etcd

package
v0.0.0-...-a5247d6
Warning

This package is not in the latest version of its module.

Published: Nov 14, 2017 License: GPL-3.0 Imports: 26 Imported by: 0

Documentation

Overview

Package etcd implements the distributed key value store integration. This also takes care of managing and clustering the embedded etcd server. The elastic etcd algorithm works in the following way:

* When you start up mgmt, you can pass it a list of seeds.
* If no seeds are given, then assume you are the first server and start up.
* If a seed is given, connect as a client, and optionally volunteer to be a server.
* All volunteering clients should listen for a message from the master for nomination.
* If a client has been nominated, it should start up a server.
* All servers should listen for their nomination to be removed and shut down if so.
* The elected leader should decide who to nominate/unnominate to keep the right number of servers.
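The last step of the algorithm above, where the elected leader keeps the server count at the target, can be sketched as follows. This is a minimal illustration and not the package's implementation: `decide` is a hypothetical helper, and the selection order (alphabetical by hostname) is an assumption made only to keep the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// decide is a hypothetical helper, not part of this package. Given the ideal
// cluster size, the hostnames currently nominated as servers, and the
// hostnames that have volunteered, it returns who a leader might nominate or
// unnominate to approach the ideal size.
func decide(ideal int, nominated, volunteers []string) (add, remove []string) {
	isNominated := make(map[string]bool, len(nominated))
	for _, h := range nominated {
		isNominated[h] = true
	}
	// Work on sorted copies so that the outcome is deterministic.
	vols := append([]string(nil), volunteers...)
	sort.Strings(vols)
	noms := append([]string(nil), nominated...)
	sort.Strings(noms)

	count := len(noms)
	// Too few servers: nominate volunteers that are not yet servers.
	for _, h := range vols {
		if count >= ideal {
			break
		}
		if !isNominated[h] {
			add = append(add, h)
			count++
		}
	}
	// Too many servers: unnominate from the end of the sorted list.
	for i := len(noms) - 1; i >= 0 && count > ideal; i-- {
		remove = append(remove, noms[i])
		count--
	}
	return add, remove
}

func main() {
	add, remove := decide(3, []string{"h1"}, []string{"h1", "h2", "h3", "h4"})
	fmt.Println("nominate:", add, "unnominate:", remove)
	// nominate: [h2 h3] unnominate: []
}
```

In the real package the corresponding actions would presumably go through functions such as Nominate, Nominated, Volunteers and GetClusterSize, listed further down this page.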

Smoke testing:

	mkdir /tmp/mgmt{A..E}
	./mgmt run --yaml examples/etcd1a.yaml --hostname h1 --tmp-prefix --no-pgp
	./mgmt run --yaml examples/etcd1b.yaml --hostname h2 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382
	./mgmt run --yaml examples/etcd1c.yaml --hostname h3 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384
	ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/idealClusterSize 3
	./mgmt run --yaml examples/etcd1d.yaml --hostname h4 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386
	./mgmt run --yaml examples/etcd1e.yaml --hostname h5 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2387 --server-urls http://127.0.0.1:2388
	ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
	ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5
	ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list

Constants

const (
	NS = "_mgmt" // root namespace for mgmt operations

	MaxStartServerTimeout = 60 // max number of seconds to wait for server to start
	MaxStartServerRetries = 3  // number of times to retry starting the etcd server

	DefaultIdealClusterSize = 5 // default ideal cluster size target for initial seed
	DefaultClientURL        = "127.0.0.1:2379"
	DefaultServerURL        = "127.0.0.1:2380"
)

Constant parameters which may need to be tweaked or customized.
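The smoke test in the overview writes to the key `/_mgmt/idealClusterSize`, which sits under the NS root namespace. A small sketch of composing such keys is shown below. The `nsKey` helper is hypothetical; how the package builds its keys internally is not shown on this page.

```go
package main

import (
	"fmt"
	"path"
)

// NS mirrors the NS constant listed above.
const NS = "_mgmt"

// nsKey is a hypothetical helper that joins path parts under the root
// namespace, producing an absolute key.
func nsKey(parts ...string) string {
	return path.Join(append([]string{"/", NS}, parts...)...)
}

func main() {
	fmt.Println(nsKey("idealClusterSize")) // /_mgmt/idealClusterSize
}
```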

Variables

var ErrNotExist = errors.New("errNotExist")

ErrNotExist is returned when GetStr cannot find the requested key. TODO: https://dave.cheney.net/2016/04/07/constant-errors
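A caller would typically compare the returned error against this sentinel to tell a missing key apart from other failures. The sketch below uses a local stand-in for ErrNotExist and a hypothetical in-memory `getStr`, since the real GetStr needs a running *EmbdEtcd.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotExist is a local stand-in for the package variable of the same name.
var ErrNotExist = errors.New("errNotExist")

// getStr is a hypothetical in-memory substitute for GetStr.
func getStr(store map[string]string, key string) (string, error) {
	val, ok := store[key]
	if !ok {
		return "", ErrNotExist
	}
	return val, nil
}

func main() {
	store := map[string]string{"greeting": "hello"}
	if _, err := getStr(store, "missing"); errors.Is(err, ErrNotExist) {
		fmt.Println("key not found, using a default")
	}
}
```

The World.StrIsNotExist method listed further down wraps the same kind of check.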

Functions

func AddHostnameConvergedWatcher

func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error)

AddHostnameConvergedWatcher adds a watcher with a callback that runs on hostname state changes.

func AdvertiseEndpoints

func AdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error

AdvertiseEndpoints advertises the list of available client endpoints.

func ApplyDeltaEvents

func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, error)

ApplyDeltaEvents modifies a URLsMap with the deltas from a WatchResponse.
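The idea of applying deltas to a URLs map can be sketched with plain types. The `event` struct and `applyDeltas` function are hypothetical simplifications: the real function works on the unexported contents of *RE and on etcdtypes.URLsMap.

```go
package main

import "fmt"

// event is a hypothetical, simplified watch event: a put carries the new
// URLs for a hostname, while a delete only names the hostname.
type event struct {
	Delete   bool
	Hostname string
	URLs     []string
}

// applyDeltas returns a copy of urlsMap with the events applied in order.
func applyDeltas(urlsMap map[string][]string, events []event) map[string][]string {
	out := make(map[string][]string, len(urlsMap))
	for k, v := range urlsMap {
		out[k] = v
	}
	for _, ev := range events {
		if ev.Delete {
			delete(out, ev.Hostname)
			continue
		}
		out[ev.Hostname] = ev.URLs
	}
	return out
}

func main() {
	current := map[string][]string{"h1": {"http://127.0.0.1:2380"}}
	updated := applyDeltas(current, []event{
		{Hostname: "h2", URLs: []string{"http://127.0.0.1:2382"}},
		{Delete: true, Hostname: "h1"},
	})
	fmt.Println(updated) // map[h2:[http://127.0.0.1:2382]]
}
```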

func Endpoints

func Endpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error)

Endpoints returns a urls map of available etcd server endpoints.

func GetClusterSize

func GetClusterSize(obj *EmbdEtcd) (uint16, error)

GetClusterSize gets the ideal target cluster size of etcd peers.

func GetResources

func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error)

GetResources collects all of the resources which match a filter from etcd. If the kindFilter or hostnameFilter is empty, then it assumes no filtering. TODO: Expand this with a more powerful filter based on what we eventually support in our collect DSL, ideally a server-side filter like WithFilter(). We could do this if the pattern was /$NS/exported/$kind/$hostname/$uid = $data.

func GetStr

func GetStr(obj *EmbdEtcd, key string) (string, error)

GetStr collects the string which matches a global namespace in etcd.

func GetStrMap

func GetStrMap(obj *EmbdEtcd, hostnameFilter []string, key string) (map[string]string, error)

GetStrMap collects all of the strings which match a namespace in etcd.

func HostnameConverged

func HostnameConverged(obj *EmbdEtcd) (map[string]bool, error)

HostnameConverged returns a map of every hostname's converged state.
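A consumer of this map, for example inside a callback passed to AddHostnameConvergedWatcher, might reduce it to a single answer. The helper below is hypothetical, and treating an empty map as not converged is a choice made for this sketch only.

```go
package main

import "fmt"

// allConverged reports whether every hostname in the map is converged.
// An empty map is treated as not converged in this sketch.
func allConverged(status map[string]bool) bool {
	if len(status) == 0 {
		return false
	}
	for _, ok := range status {
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allConverged(map[string]bool{"h1": true, "h2": false})) // false
	fmt.Println(allConverged(map[string]bool{"h1": true, "h2": true}))  // true
}
```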

func Leader

func Leader(obj *EmbdEtcd) (string, error)

Leader returns the current leader of the etcd server cluster.

func MemberAdd

func MemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error)

MemberAdd adds a member to the cluster.

func MemberRemove

func MemberRemove(obj *EmbdEtcd, mID uint64) (bool, error)

MemberRemove removes a member by mID. It returns whether the removal worked, and separately whether there was an error. This is because it might have run without error even though the member wasn't found, for example.

func Members

func Members(obj *EmbdEtcd) (map[uint64]string, error)

Members returns information on cluster membership. The member IDs are the keys, because an empty name means the member is unstarted. TODO: consider queueing this through the main loop with CtxError(ctx, err)
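Because an empty name marks an unstarted member, a caller can pick those members out of the returned map. The `unstarted` helper below is hypothetical and works on a plain map of the same shape.

```go
package main

import (
	"fmt"
	"sort"
)

// unstarted returns the IDs of members whose name is still empty, in
// ascending order.
func unstarted(members map[uint64]string) []uint64 {
	var ids []uint64
	for id, name := range members {
		if name == "" {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	members := map[uint64]string{1: "h1", 2: "", 3: "h3"}
	fmt.Println(unstarted(members)) // [2]
}
```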

func Nominate

func Nominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error

Nominate nominates a particular client to be a server (peer).

func Nominated

func Nominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error)

Nominated returns a urls map of nominated etcd server volunteers. NOTE: I know 'nominees' might be more correct, but is less consistent here

func SetClusterSize

func SetClusterSize(obj *EmbdEtcd, value uint16) error

SetClusterSize sets the ideal target cluster size of etcd peers.

func SetHostnameConverged

func SetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error

SetHostnameConverged sets whether a specific hostname is converged.

func SetResources

func SetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error

SetResources exports all of the resources which we pass in to etcd.

func SetStr

func SetStr(obj *EmbdEtcd, key string, data *string) error

SetStr sets a key to a certain value. If the value is nil, then it deletes the key. Otherwise the value should point to a string. TODO: TTL or delete disconnect?

func SetStrMap

func SetStrMap(obj *EmbdEtcd, hostname, key string, data *string) error

SetStrMap sets a key and hostname pair to a certain value. If the value is nil, then it deletes the key. Otherwise the value should point to a string. TODO: TTL or delete disconnect?
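Both SetStr and SetStrMap take a *string so that nil can mean "delete". The sketch below shows that convention on an in-memory map. The `setStr` function here is a hypothetical analogue, not the package function.

```go
package main

import "fmt"

// setStr is a hypothetical in-memory analogue of the nil-means-delete
// convention: a nil pointer removes the key, anything else stores the value.
func setStr(store map[string]string, key string, data *string) {
	if data == nil {
		delete(store, key)
		return
	}
	store[key] = *data
}

func main() {
	store := map[string]string{}
	val := "hello"
	setStr(store, "greeting", &val)
	fmt.Println(store) // map[greeting:hello]
	setStr(store, "greeting", nil)
	fmt.Println(len(store)) // 0
}
```

Using a pointer also lets a caller store the empty string, which a plain string parameter could not distinguish from a delete request.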

func Volunteer

func Volunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error

Volunteer offers yourself up to be a server if needed.

func Volunteers

func Volunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error)

Volunteers returns a urls map of available etcd server volunteers.

func WatchResources

func WatchResources(obj *EmbdEtcd) chan error

WatchResources returns a channel that outputs events when exported resources change. TODO: Filter our watch (on the server side if possible) based on the collection prefixes and filters that we care about...

func WatchStr

func WatchStr(obj *EmbdEtcd, key string) chan error

WatchStr returns a channel which spits out events on key activity. FIXME: It should close the channel when it's done, and spit out errors when something goes wrong.

func WatchStrMap

func WatchStrMap(obj *EmbdEtcd, key string) chan error

WatchStrMap returns a channel which spits out events on key activity. FIXME: It should close the channel when it's done, and spit out errors when something goes wrong.
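Since the FIXME notes say these channels are not closed when the watch is done, a consumer probably wants its own stop signal. The loop below is a sketch under two assumptions that this page does not confirm: a nil value on the channel signals an event, and a non-nil value signals a failure. The `consume` function and its parameters are hypothetical.

```go
package main

import "fmt"

// consume reads from an event channel until done is closed, the channel is
// closed, or an error arrives. It calls onEvent once per nil value received.
func consume(events <-chan error, done <-chan struct{}, onEvent func()) error {
	for {
		select {
		case err, ok := <-events:
			if !ok {
				return nil // channel closed
			}
			if err != nil {
				return err
			}
			onEvent()
		case <-done:
			return nil // caller asked us to stop
		}
	}
}

func main() {
	events := make(chan error, 2)
	done := make(chan struct{})
	events <- nil
	events <- nil
	close(events) // closed here only so that the demo terminates
	count := 0
	err := consume(events, done, func() { count++ })
	fmt.Println(count, err) // 2 <nil>
}
```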

Types

type AW

type AW struct {
	// contains filtered or unexported fields
}

AW is a struct for the AddWatcher queue.

type CtxDelayErr

type CtxDelayErr struct {
	Delta   time.Duration
	Message string
}

CtxDelayErr requests a retry in Delta duration.

func (*CtxDelayErr) Error

func (obj *CtxDelayErr) Error() string

type CtxPermanentErr

type CtxPermanentErr struct {
	Message string
}

CtxPermanentErr is a permanent failure error to notify about borkage.

func (*CtxPermanentErr) Error

func (obj *CtxPermanentErr) Error() string

type CtxReconnectErr

type CtxReconnectErr struct {
	Message string
}

CtxReconnectErr requests a client reconnect to the new endpoint list.

func (*CtxReconnectErr) Error

func (obj *CtxReconnectErr) Error() string

type CtxRetriesErr

type CtxRetriesErr struct {
	Retries uint
	Message string
}

CtxRetriesErr lets you retry as long as you have retries available. TODO: consider combining this with CtxDelayErr

func (*CtxRetriesErr) Error

func (obj *CtxRetriesErr) Error() string
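These error types let a caller signal how a failed operation should be handled. The sketch below redeclares three of them locally and dispatches on the concrete type with errors.As. The `classify` function and the Error() message formats are assumptions for illustration; how CtxError actually reacts to each type is not shown on this page, and CtxReconnectErr is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of the error types listed above. The Error() message formats
// are assumptions made for this sketch.
type CtxDelayErr struct {
	Delta   time.Duration
	Message string
}

func (obj *CtxDelayErr) Error() string {
	return fmt.Sprintf("CtxDelayErr(%v): %s", obj.Delta, obj.Message)
}

type CtxRetriesErr struct {
	Retries uint
	Message string
}

func (obj *CtxRetriesErr) Error() string {
	return fmt.Sprintf("CtxRetriesErr(%d): %s", obj.Retries, obj.Message)
}

type CtxPermanentErr struct {
	Message string
}

func (obj *CtxPermanentErr) Error() string {
	return "CtxPermanentErr: " + obj.Message
}

// classify is a hypothetical dispatcher that turns an error into a decision.
func classify(err error) string {
	var delay *CtxDelayErr
	var retries *CtxRetriesErr
	var permanent *CtxPermanentErr
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &delay):
		return "retry after " + delay.Delta.String()
	case errors.As(err, &retries):
		if retries.Retries == 0 {
			return "give up: no retries left"
		}
		return fmt.Sprintf("retry (%d left)", retries.Retries)
	case errors.As(err, &permanent):
		return "give up: permanent failure"
	default:
		return "unknown error"
	}
}

func main() {
	fmt.Println(classify(&CtxDelayErr{Delta: 2 * time.Second, Message: "busy"})) // retry after 2s
	fmt.Println(classify(&CtxRetriesErr{Retries: 3, Message: "flaky"}))          // retry (3 left)
	fmt.Println(classify(&CtxPermanentErr{Message: "broken"}))                   // give up: permanent failure
}
```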

type DL

type DL struct {
	// contains filtered or unexported fields
}

DL is a struct for the delete queue.

type EmbdEtcd

type EmbdEtcd struct {
	// contains filtered or unexported fields
}

EmbdEtcd provides the embedded server and client etcd functionality.

func NewEmbdEtcd

func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs, advertiseClientURLs, advertiseServerURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd

NewEmbdEtcd creates the top level embedded etcd struct client and server obj.

func (*EmbdEtcd) AddWatcher

func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error)

AddWatcher queues up an add watcher request and returns a cancel function. Remember to add the etcd.WithPrefix() option if you want to watch recursively.
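The "register a callback, get back a cancel function" shape of AddWatcher can be sketched without etcd. The `watchers` type below is hypothetical and keeps callbacks in memory; the real method queues the request through the main loop and watches an etcd path.

```go
package main

import (
	"fmt"
	"sync"
)

// watchers is a hypothetical in-memory registry of callbacks.
type watchers struct {
	mu   sync.Mutex
	next int
	cbs  map[int]func(key string) error
}

// add registers a callback and returns a function that unregisters it.
func (w *watchers) add(cb func(key string) error) func() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.cbs == nil {
		w.cbs = make(map[int]func(key string) error)
	}
	id := w.next
	w.next++
	w.cbs[id] = cb
	return func() {
		w.mu.Lock()
		defer w.mu.Unlock()
		delete(w.cbs, id)
	}
}

// notify runs every registered callback and returns how many ran.
func (w *watchers) notify(key string) int {
	w.mu.Lock()
	cbs := make([]func(key string) error, 0, len(w.cbs))
	for _, cb := range w.cbs {
		cbs = append(cbs, cb)
	}
	w.mu.Unlock()
	for _, cb := range cbs {
		_ = cb(key) // errors are ignored in this sketch
	}
	return len(cbs)
}

func main() {
	w := &watchers{}
	cancel := w.add(func(key string) error {
		fmt.Println("changed:", key)
		return nil
	})
	fmt.Println(w.notify("/_mgmt/example")) // prints "changed: /_mgmt/example", then 1
	cancel()
	fmt.Println(w.notify("/_mgmt/example")) // 0
}
```

The key `/_mgmt/example` is made up for the demo.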

func (*EmbdEtcd) CancelCtx

func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func())

CancelCtx adds a tracked cancel function around an existing context.

func (*EmbdEtcd) CbLoop

func (obj *EmbdEtcd) CbLoop()

CbLoop is the loop where callback execution is serialized.

func (*EmbdEtcd) ComplexGet

func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error)

ComplexGet performs a get operation and waits for an ACK to continue. It can accept more arguments that are useful for the less common operations. TODO: perhaps a get should never cause an un-converge?

func (*EmbdEtcd) Connect

func (obj *EmbdEtcd) Connect(reconnect bool) error

Connect connects the client to a server, and then builds the *API structs. If reconnect is true, it will force a reconnect with new config endpoints.

func (*EmbdEtcd) CtxError

func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, error)

CtxError is called whenever there is a connection or other client problem that needs to be resolved before we can continue, e.g. connection disconnected, change of server to connect to, etc. It modifies the context if needed.

func (*EmbdEtcd) Delete

func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error)

Delete performs a delete operation and waits for an ACK to continue.

func (*EmbdEtcd) Destroy

func (obj *EmbdEtcd) Destroy() error

Destroy cleans up the entire embedded etcd system. Use DestroyServer if you only want to shutdown the embedded server portion.

func (*EmbdEtcd) DestroyServer

func (obj *EmbdEtcd) DestroyServer() error

DestroyServer shuts down the embedded etcd server portion.

func (*EmbdEtcd) Get

func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error)

Get performs a get operation and waits for an ACK to continue.

func (*EmbdEtcd) GetConfig

func (obj *EmbdEtcd) GetConfig() etcd.Config

GetConfig returns the config struct to be used for the etcd client connect.

func (*EmbdEtcd) LocalhostClientURLs

func (obj *EmbdEtcd) LocalhostClientURLs() etcdtypes.URLs

LocalhostClientURLs returns the most localhost-like URLs for direct connection. This gets clients to talk to the local servers first before searching remotely.

func (*EmbdEtcd) Loop

func (obj *EmbdEtcd) Loop()

Loop is the main loop where everything is serialized.

func (*EmbdEtcd) ServerReady

func (obj *EmbdEtcd) ServerReady() <-chan struct{}

ServerReady returns on a channel when the server has started successfully.
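A caller would usually pair such a readiness channel with a timeout. The sketch below assumes only that something becomes receivable on the channel once the server is up, which works whether the channel is closed or sent to. `waitReady` is a hypothetical helper, and the lower-case constant mirrors MaxStartServerTimeout from the constants above.

```go
package main

import (
	"fmt"
	"time"
)

// maxStartServerTimeout mirrors MaxStartServerTimeout (in seconds).
const maxStartServerTimeout = 60

// waitReady blocks until ready fires or the timeout elapses, and reports
// whether the ready signal arrived in time.
func waitReady(ready <-chan struct{}, timeout time.Duration) bool {
	select {
	case <-ready:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	ready := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // pretend the server takes a moment to start
		close(ready)
	}()
	fmt.Println(waitReady(ready, maxStartServerTimeout*time.Second)) // true
}
```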

func (*EmbdEtcd) Set

func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error

Set queues up a set operation to occur using our mainloop.

func (*EmbdEtcd) StartServer

func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) error

StartServer kicks off a new embedded etcd server.

func (*EmbdEtcd) Startup

func (obj *EmbdEtcd) Startup() error

Startup is the main entry point to kick off the embedded etcd client & server.

func (*EmbdEtcd) TimeoutCtx

func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.Context, func())

TimeoutCtx adds a tracked cancel function with timeout around an existing context.

func (*EmbdEtcd) Txn

func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error)

Txn performs a transaction and waits for an ACK to continue.

type Flags

type Flags struct {
	Debug   bool // add additional log messages
	Trace   bool // add execution flow log messages
	Verbose bool // add extra log message output
}

Flags are some constant flags which are used throughout the program.

type GQ

type GQ struct {
	// contains filtered or unexported fields
}

GQ is a struct for the get queue.

type KV

type KV struct {
	// contains filtered or unexported fields
}

KV is a key + value struct to hold the two items together.

type RE

type RE struct {
	// contains filtered or unexported fields
}

RE is a response + error struct since these two values often occur together. This is now called an event with the move to the etcd v3 API.

type TN

type TN struct {
	// contains filtered or unexported fields
}

TN is a struct for the txn queue.

type World

type World struct {
	Hostname string // uuid for the consumer of these
	EmbdEtcd *EmbdEtcd
}

World is an etcd backed implementation of the World interface.

func (*World) ResCollect

func (obj *World) ResCollect(hostnameFilter, kindFilter []string) ([]resources.Res, error)

ResCollect gets the collection of exported resources which match the filter. It does this atomically so that a call always returns a complete collection.

func (*World) ResExport

func (obj *World) ResExport(resourceList []resources.Res) error

ResExport exports a list of resources under our hostname namespace. Subsequent calls replace the previously set collection atomically.

func (*World) ResWatch

func (obj *World) ResWatch() chan error

ResWatch returns a channel which spits out events on possible exported resource changes.

func (*World) StrDel

func (obj *World) StrDel(namespace string) error

StrDel deletes the value in a particular namespace.

func (*World) StrGet

func (obj *World) StrGet(namespace string) (string, error)

StrGet returns the value for the given namespace.

func (*World) StrIsNotExist

func (obj *World) StrIsNotExist(err error) bool

StrIsNotExist returns whether the error from StrGet is a key missing error.

func (*World) StrMapDel

func (obj *World) StrMapDel(namespace string) error

StrMapDel deletes the value in a particular namespace.

func (*World) StrMapGet

func (obj *World) StrMapGet(namespace string) (map[string]string, error)

StrMapGet returns a map of hostnames to values in the given namespace.

func (*World) StrMapSet

func (obj *World) StrMapSet(namespace, value string) error

StrMapSet sets the namespace value to a particular string under the identity of its own hostname.
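The StrMap functions give each namespace one value per hostname: StrMapSet writes under the caller's own hostname, and StrMapGet reads everyone's values. The in-memory `world` type below is a hypothetical stand-in that shows this data shape; it does not talk to etcd, and the "load" namespace is made up for the demo.

```go
package main

import "fmt"

// world is a hypothetical in-memory stand-in: namespace -> hostname -> value.
type world struct {
	hostname string
	data     map[string]map[string]string // shared between hosts in this demo
}

// strMapSet stores value in the namespace under this world's own hostname.
func (w *world) strMapSet(namespace, value string) {
	if w.data[namespace] == nil {
		w.data[namespace] = make(map[string]string)
	}
	w.data[namespace][w.hostname] = value
}

// strMapGet returns a copy of the hostname-to-value map for the namespace.
func (w *world) strMapGet(namespace string) map[string]string {
	out := make(map[string]string)
	for host, val := range w.data[namespace] {
		out[host] = val
	}
	return out
}

func main() {
	shared := map[string]map[string]string{}
	h1 := &world{hostname: "h1", data: shared}
	h2 := &world{hostname: "h2", data: shared}
	h1.strMapSet("load", "0.3")
	h2.strMapSet("load", "1.2")
	fmt.Println(h1.strMapGet("load")) // map[h1:0.3 h2:1.2]
}
```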

func (*World) StrMapWatch

func (obj *World) StrMapWatch(namespace string) chan error

StrMapWatch returns a channel which spits out events on possible string changes.

func (*World) StrSet

func (obj *World) StrSet(namespace, value string) error

StrSet sets the namespace value to a particular string.

func (*World) StrWatch

func (obj *World) StrWatch(namespace string) chan error

StrWatch returns a channel which spits out events on possible string changes.
