Documentation ¶
Overview ¶
Package m_etcd contains implementations of all Metafora interfaces using etcd as the broker/backing store.
See https://github.com/lytics/metafora/Documentation/etcd.md for details.
Index ¶
- Constants
- Variables
- func DefaultTaskFunc(id, _ string) metafora.Task
- func New(conf *Config, h statemachine.StatefulHandler) (metafora.Coordinator, metafora.HandlerFunc, metafora.Balancer, error)
- func NewClient(namespace string, hosts []string) metafora.Client
- func NewCommandListener(task metafora.Task, namespace string, c *etcd.Client) statemachine.CommandListener
- func NewCommander(namespace string, c *etcd.Client) statemachine.Commander
- func NewFairBalancer(conf *Config) metafora.Balancer
- func NewStateStore(namespace string, etcdc *etcd.Client) statemachine.StateStore
- type Config
- type EtcdCoordinator
- func (ec *EtcdCoordinator) Claim(task metafora.Task) bool
- func (ec *EtcdCoordinator) Close()
- func (ec *EtcdCoordinator) Command() (metafora.Command, error)
- func (ec *EtcdCoordinator) Done(task metafora.Task)
- func (ec *EtcdCoordinator) Errors() <-chan error
- func (ec *EtcdCoordinator) Init(cordCtx metafora.CoordinatorContext) error
- func (ec *EtcdCoordinator) Name() string
- func (ec *EtcdCoordinator) Release(task metafora.Task)
- func (ec *EtcdCoordinator) Watch(out chan<- metafora.Task) error
- type TaskFunc
Constants ¶
const ( DefaultClaimTTL uint64 = 180 // 3 minutes in seconds DefaultNodeTTL uint64 = 60 // seconds TasksPath = "tasks" NodesPath = "nodes" CommandsPath = "commands" MetadataKey = "_metafora" // _{KEYs} are hidden files, so this will not trigger our watches OwnerMarker = "owner" PropsKey = "props" //Etcd Error codes are passed directly through go-etcd from the http response, //So to find the error codes use this ref: // https://github.com/coreos/etcd/blob/master/error/error.go#L67 EcodeKeyNotFound = 100 EcodeCompareFailed = 101 EcodeNodeExist = 105 EcodeExpiredIndex = 401 // The event in requested index is outdated and cleared )
Variables ¶
var ( // ErrRefreshFailed is an error returned when coordinator fails to update // its node key in the database. ErrRefreshFailed = errors.New("unable to refresh node key before deadline") )
Functions ¶
func DefaultTaskFunc ¶
DefaultTaskFunc is the default new task function used by the EtcdCoordinator and does not attempt to process the properties value.
func New ¶
func New(conf *Config, h statemachine.StatefulHandler) ( metafora.Coordinator, metafora.HandlerFunc, metafora.Balancer, error)
New creates a Metafora Coordinator, State Machine, State Store, Fair Balancer, and Commander, all backed by etcd.
Create a Config and implement your task handler as a StatefulHandler. Then New will create all the components needed to call metafora.NewConsumer:
conf := m_etcd.NewConfig("work", hosts) coord, hf, bal, err := m_etcd.New(conf, customHandler) if err != nil { /* ...exit... */ } consumer, err := metafora.NewConsumer(coord, hf, bal)
func NewCommandListener ¶
func NewCommandListener(task metafora.Task, namespace string, c *etcd.Client) statemachine.CommandListener
NewCommandListener makes a statemachine.CommandListener implementation backed by etcd. The namespace should be the same as the coordinator as commands use a separate path within a namespace than tasks or nodes.
func NewCommander ¶
func NewCommander(namespace string, c *etcd.Client) statemachine.Commander
func NewFairBalancer ¶
NewFairBalancer creates a new metafora.DefaultFairBalancer that uses etcd for counting tasks per node.
func NewStateStore ¶
func NewStateStore(namespace string, etcdc *etcd.Client) statemachine.StateStore
NewStateStore returns a StateStore implementation that persists task states in etcd.
Types ¶
type Config ¶
type Config struct { // Namespace is the key prefix to allow for multitenant use of etcd. // // Namespaces must start with a / (added by NewConfig if needed). Namespace string // Name of this Metafora consumer. Only one instance of a Name is allowed to // run in a Namespace at a time, so if you set the Name to hostname you can // effectively limit Metafora to one process per server. Name string // Hosts are the URLs to create etcd clients with. Hosts []string // ClaimTTL is the timeout on task claim markers in seconds. // // Since every task must update its claim before the TTL expires, setting // this lower will increase the load on etcd. Setting this setting higher // increases the amount of time it takes a task to be rescheduled if the node // it was running on shutsdown uncleanly (or is separated by a network // partition). // // If 0 it is set to DefaultClaimTTL ClaimTTL uint64 // NodeTTL is the timeout on the node's name entry in seconds. // // If 0 it is set to DefaultNodeTTL NodeTTL uint64 // NewTaskFunc is the function called to unmarshal tasks from etcd into a // custom struct. The struct must implement the metafora.Task interface. // // If nil it is set to DefaultTaskFunc NewTaskFunc TaskFunc }
type EtcdCoordinator ¶
type EtcdCoordinator struct {
// contains filtered or unexported fields
}
func NewEtcdCoordinator ¶
func NewEtcdCoordinator(conf *Config) (*EtcdCoordinator, error)
NewEtcdCoordinator creates a new Metafora Coordinator implementation using etcd as the broker. If no node ID is specified, a unique one will be generated.
Coordinator methods will be called by the core Metafora Consumer. Calling Init, Close, etc. from your own code will lead to undefined behavior.
func (*EtcdCoordinator) Claim ¶
func (ec *EtcdCoordinator) Claim(task metafora.Task) bool
Claim is called by the Consumer when a Balancer has determined that a task ID can be claimed. Claim returns false if the task could not be claimed. Either due to error, the task being completed, or another consumer has already claimed it.
func (*EtcdCoordinator) Close ¶
func (ec *EtcdCoordinator) Close()
Close stops the coordinator and causes blocking Watch and Command methods to return zero values. It does not release tasks.
func (*EtcdCoordinator) Command ¶
func (ec *EtcdCoordinator) Command() (metafora.Command, error)
Command blocks until a command for this node is received from the broker by the coordinator.
func (*EtcdCoordinator) Done ¶
func (ec *EtcdCoordinator) Done(task metafora.Task)
Done deletes the task.
func (*EtcdCoordinator) Errors ¶
func (ec *EtcdCoordinator) Errors() <-chan error
func (*EtcdCoordinator) Init ¶
func (ec *EtcdCoordinator) Init(cordCtx metafora.CoordinatorContext) error
Init is called once by the consumer to provide a Logger to Coordinator implementations.
func (*EtcdCoordinator) Name ¶
func (ec *EtcdCoordinator) Name() string
func (*EtcdCoordinator) Release ¶
func (ec *EtcdCoordinator) Release(task metafora.Task)
Release deletes the claim file.