m_etcd

package
v0.0.0-...-8de5e17
Published: Nov 28, 2016 License: Apache-2.0 Imports: 13 Imported by: 0

README

metafora etcd client

See Documentation/etcd.md for details.

Testing

Testing the metafora etcd client requires fresh etcd instances to be running. The instances should be reachable at localhost:5001,localhost:5002,localhost:5003; otherwise, export an equivalent connection string in the ETCDCTL_PEERS environment variable. The ETCDTESTS environment variable must also be set, or the tests will be skipped.

An example of running the integration tests is given in the command line below:

ETCDTESTS=1 IP="127.0.0.1" ETCDCTL_PEERS="$IP:5001,$IP:5002,$IP:5003"  go test -v
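
The gating described above can be sketched in Go. This is a minimal illustration, not the package's actual test code: peersFromEnv and defaultPeers are hypothetical names, and falling back to the localhost peers when ETCDCTL_PEERS is unset is an assumption based on the README text.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// defaultPeers mirrors the connection string mentioned in the README.
const defaultPeers = "localhost:5001,localhost:5002,localhost:5003"

// peersFromEnv is a hypothetical helper. It returns the etcd peers to test
// against, or false when ETCDTESTS is unset and the tests should be skipped.
// The environment lookup is injected so the helper can be exercised without
// touching the real process environment.
func peersFromEnv(getenv func(string) string) ([]string, bool) {
	if getenv("ETCDTESTS") == "" {
		return nil, false
	}
	raw := getenv("ETCDCTL_PEERS")
	if raw == "" {
		raw = defaultPeers // assumed fallback
	}
	return strings.Split(raw, ","), true
}

func main() {
	peers, ok := peersFromEnv(os.Getenv)
	if !ok {
		fmt.Println("ETCDTESTS not set; skipping etcd integration tests")
		return
	}
	fmt.Println("testing against peers:", peers)
}
```

In a real test file, the false branch would typically call t.Skip instead of printing.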

Documentation

Overview

Package m_etcd contains implementations of all Metafora interfaces using etcd as the broker/backing store.

See https://github.com/lytics/metafora/Documentation/etcd.md for details.

Index

Constants

const (
	DefaultClaimTTL uint64 = 180 // 3 minutes in seconds
	DefaultNodeTTL  uint64 = 60  // seconds

	TasksPath    = "tasks"
	NodesPath    = "nodes"
	CommandsPath = "commands"
	MetadataKey  = "_metafora" // _{KEYs} are hidden files, so this will not trigger our watches
	OwnerMarker  = "owner"
	PropsKey     = "props"

	// Etcd error codes are passed directly through go-etcd from the HTTP response.
	// To look up the error codes, see:
	//       https://github.com/coreos/etcd/blob/master/error/error.go#L67
	EcodeKeyNotFound   = 100
	EcodeCompareFailed = 101
	EcodeNodeExist     = 105
	EcodeExpiredIndex  = 401 // The event in requested index is outdated and cleared
)
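
As a rough illustration of how the path constants might combine into etcd keys, the sketch below builds a claim-marker key and checks for hidden keys. The layout namespace/tasks/<task ID>/owner is an assumption, not confirmed by this page; claimKey and isHidden are hypothetical helpers, and the constants are redeclared locally so the example stands alone.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// Local copies of the package constants used in this sketch.
const (
	TasksPath   = "tasks"
	OwnerMarker = "owner"
	MetadataKey = "_metafora"
)

// claimKey builds the assumed location of a task's claim marker:
// /<namespace>/tasks/<taskID>/owner. path.Join cleans duplicate slashes,
// so the namespace may be given with or without a leading "/".
func claimKey(namespace, taskID string) string {
	return path.Join("/", namespace, TasksPath, taskID, OwnerMarker)
}

// isHidden reports whether the last element of key starts with an
// underscore, which etcd treats as a hidden key.
func isHidden(key string) bool {
	return strings.HasPrefix(path.Base(key), "_")
}

func main() {
	fmt.Println(claimKey("/work", "task-1"))
	fmt.Println(isHidden(path.Join("/work", MetadataKey)))
}
```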

Variables

var (

	// ErrRefreshFailed is an error returned when coordinator fails to update
	// its node key in the database.
	ErrRefreshFailed = errors.New("unable to refresh node key before deadline")
)

Functions

func DefaultTaskFunc

func DefaultTaskFunc(id, _ string) metafora.Task

DefaultTaskFunc is the default new task function used by the EtcdCoordinator and does not attempt to process the properties value.

func New

New creates a Metafora Coordinator, State Machine, State Store, Fair Balancer, and Commander, all backed by etcd.

Create a Config and implement your task handler as a StatefulHandler. Then New will create all the components needed to call metafora.NewConsumer:

	conf := m_etcd.NewConfig("node1", "work", hosts) // "node1" is a placeholder consumer name
	coord, hf, bal, err := m_etcd.New(conf, customHandler)
	if err != nil { /* ...exit... */ }
	consumer, err := metafora.NewConsumer(coord, hf, bal)

func NewClient

func NewClient(namespace string, hosts []string) metafora.Client

NewClient creates a new client using an etcd backend.

func NewCommandListener

func NewCommandListener(task metafora.Task, namespace string, c *etcd.Client) statemachine.CommandListener

NewCommandListener makes a statemachine.CommandListener implementation backed by etcd. The namespace should match the coordinator's; commands use a different path within the namespace than tasks or nodes.

func NewCommander

func NewCommander(namespace string, c *etcd.Client) statemachine.Commander

func NewFairBalancer

func NewFairBalancer(conf *Config) metafora.Balancer

NewFairBalancer creates a new metafora.DefaultFairBalancer that uses etcd for counting tasks per node.

func NewStateStore

func NewStateStore(namespace string, etcdc *etcd.Client) statemachine.StateStore

NewStateStore returns a StateStore implementation that persists task states in etcd.

Types

type Config

type Config struct {
	// Namespace is the key prefix to allow for multitenant use of etcd.
	//
	// Namespaces must start with a / (added by NewConfig if needed).
	Namespace string

	// Name of this Metafora consumer. Only one instance of a Name is allowed to
	// run in a Namespace at a time, so if you set the Name to hostname you can
	// effectively limit Metafora to one process per server.
	Name string

	// Hosts are the URLs to create etcd clients with.
	Hosts []string

	// ClaimTTL is the timeout on task claim markers in seconds.
	//
	// Since every task must update its claim before the TTL expires, setting
	// this lower will increase the load on etcd. Setting it higher increases
	// the time it takes for a task to be rescheduled if the node it was
	// running on shuts down uncleanly (or is separated by a network
	// partition).
	//
	// If 0 it is set to DefaultClaimTTL
	ClaimTTL uint64

	// NodeTTL is the timeout on the node's name entry in seconds.
	//
	// If 0 it is set to DefaultNodeTTL
	NodeTTL uint64

	// NewTaskFunc is the function called to unmarshal tasks from etcd into a
	// custom struct. The struct must implement the metafora.Task interface.
	//
	// If nil it is set to DefaultTaskFunc
	NewTaskFunc TaskFunc
}

func NewConfig

func NewConfig(name, namespace string, hosts []string) *Config

NewConfig creates a Config with the required fields and uses defaults for the others.

Panics on empty values.
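
The documented behavior (required fields, a leading / on the namespace, default TTLs, and a panic on empty values) could look roughly like the sketch below. It is an approximation built only from the descriptions on this page, not the package's source: config and newConfig are local stand-ins, and the panic message is invented.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the documented defaults, in seconds.
const (
	DefaultClaimTTL uint64 = 180
	DefaultNodeTTL  uint64 = 60
)

// config is a local stand-in holding a subset of m_etcd.Config's fields.
type config struct {
	Name      string
	Namespace string
	Hosts     []string
	ClaimTTL  uint64
	NodeTTL   uint64
}

// newConfig sketches the documented NewConfig behavior: it panics on empty
// values, prefixes the namespace with "/" if needed, and applies defaults.
func newConfig(name, namespace string, hosts []string) *config {
	if name == "" || namespace == "" || len(hosts) == 0 {
		panic("invalid etcd config: empty name, namespace, or hosts")
	}
	if !strings.HasPrefix(namespace, "/") {
		namespace = "/" + namespace
	}
	return &config{
		Name:      name,
		Namespace: namespace,
		Hosts:     hosts,
		ClaimTTL:  DefaultClaimTTL,
		NodeTTL:   DefaultNodeTTL,
	}
}

func main() {
	c := newConfig("node1", "work", []string{"http://localhost:5001"})
	fmt.Printf("%+v\n", *c)
}
```

Because empty values panic, callers that build a config from user input may want to validate it before calling the constructor.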

func (*Config) Copy

func (c *Config) Copy() *Config

Copy returns a shallow copy of this config.

func (*Config) String

func (c *Config) String() string

type EtcdCoordinator

type EtcdCoordinator struct {
	// contains filtered or unexported fields
}

func NewEtcdCoordinator

func NewEtcdCoordinator(conf *Config) (*EtcdCoordinator, error)

NewEtcdCoordinator creates a new Metafora Coordinator implementation using etcd as the broker. If no node ID is specified, a unique one will be generated.

Coordinator methods will be called by the core Metafora Consumer. Calling Init, Close, etc. from your own code will lead to undefined behavior.

func (*EtcdCoordinator) Claim

func (ec *EtcdCoordinator) Claim(task metafora.Task) bool

Claim is called by the Consumer when a Balancer has determined that a task ID can be claimed. Claim returns false if the task could not be claimed, whether because of an error, because the task was already completed, or because another consumer has already claimed it.

func (*EtcdCoordinator) Close

func (ec *EtcdCoordinator) Close()

Close stops the coordinator and causes blocking Watch and Command methods to return zero values. It does not release tasks.

func (*EtcdCoordinator) Command

func (ec *EtcdCoordinator) Command() (metafora.Command, error)

Command blocks until a command for this node is received from the broker by the coordinator.

func (*EtcdCoordinator) Done

func (ec *EtcdCoordinator) Done(task metafora.Task)

Done deletes the task.

func (*EtcdCoordinator) Errors

func (ec *EtcdCoordinator) Errors() <-chan error

func (*EtcdCoordinator) Init

Init is called once by the consumer to provide a Logger to Coordinator implementations.

func (*EtcdCoordinator) Name

func (ec *EtcdCoordinator) Name() string

func (*EtcdCoordinator) Release

func (ec *EtcdCoordinator) Release(task metafora.Task)

Release deletes the claim file.

func (*EtcdCoordinator) Watch

func (ec *EtcdCoordinator) Watch(out chan<- metafora.Task) error

Watch streams tasks from etcd watches or GETs until Close is called or etcd is unreachable (in which case an error is returned).

type TaskFunc

type TaskFunc func(id, value string) metafora.Task

TaskFunc creates a Task interface from a task ID and etcd Node. The Node corresponds to the task directory.

Implementations must support value being an empty string.

If nil is returned the task is ignored.
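
A custom TaskFunc might look like the following sketch. Several parts are assumptions: Task is a local stand-in for metafora.Task (assumed here to require only an ID method), emailTask is a hypothetical task type, and the value is assumed to be JSON-encoded. The function handles an empty value and returns nil on malformed input so the task is ignored.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Task is a local stand-in for metafora.Task, assumed to require only ID().
type Task interface {
	ID() string
}

// emailTask is a hypothetical custom task carrying one property.
type emailTask struct {
	id string
	To string `json:"to"`
}

func (t *emailTask) ID() string { return t.id }

// newEmailTask has the same shape as TaskFunc: func(id, value string) Task.
func newEmailTask(id, value string) Task {
	t := &emailTask{id: id}
	if value == "" {
		// Implementations must support an empty value.
		return t
	}
	if err := json.Unmarshal([]byte(value), t); err != nil {
		// Returning a nil interface causes the task to be ignored.
		return nil
	}
	return t
}

func main() {
	task := newEmailTask("task-1", `{"to":"ops@example.com"}`)
	fmt.Println(task.ID(), task.(*emailTask).To)
}
```

Note the bare return nil: returning a nil *emailTask instead would produce a non-nil interface value, which a nil check on the result would not catch.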

Directories

Path Synopsis
Package testutil is a collection of utilities for use by Metafora's etcd tests.
