metafora

package module
v0.0.0-...-3c171a9
Warning: This package is not in the latest version of its module.

Published: Aug 15, 2023 License: Apache-2.0 Imports: 10 Imported by: 15

README

metafora


Metafora is a Go library designed to run long-running (minutes to permanent) tasks in a cluster.

IRC: #lytics/metafora on irc.gitter.im

Features

  • Distributed - horizontally scalable
  • Elastic - online cluster resizing with automated rebalancing
  • Masterless - work stealing, not assigning, pluggable balancing
  • Fault tolerant - tasks are reassigned if nodes disappear
  • Simple - few states, no checkpointing, no configuration management
  • Extensible - well defined interfaces for implementing balancing and coordinating
  • Exactly-once - designed to enforce that one and only one instance of each submitted task is running

Metafora is a library for building distributed task work systems. You're responsible for creating a main() entrypoint for your application, writing a metafora.Handler and HandlerFunc to actually process tasks, and then starting Metafora's Consumer.

Metafora's task state machine is implemented as a Handler adapter. Simply implement your task processor as a StatefulHandler function, and create a metafora.Handler with statemachine.New.

Example

koalemosd is a sample consumer implementation that can be run as a daemon (it requires etcd). koalemosctl is a sample command line client for submitting tasks to koalemosd.

# Install etcd as per https://go.etcd.io/etcd#getting-etcd
# Run the following in one terminal:
go get -v -u github.com/lytics/metafora/examples/koalemosd
koalemosd

# Run the client in another
go get -v -u github.com/lytics/metafora/examples/koalemosctl
koalemosctl sleep 3 # where "sleep 3" is any command on your $PATH

Since koalemosd is a simple wrapper around OS processes, it does not use the state machine (statemachine.StatefulHandler).

Terms

Balancer: Go interface consulted by Consumer for determining which tasks can be claimed and which should be released. See balancer.go.
Broker: external task and command store like etcd for the Coordinator to use.
Consumer: core work runner. Integrates Balancer, Coordinator, and Handlers to get work done.
Coordinator: client Go interface to Broker. See coordinator.go.
Handler: Go interface for executing tasks.
Task: unit of work. Executed by Handlers.

FAQ

Q. Is it ready for production use?

Yes. Metafora with the etcd coordinator has been the production work system at Lytics since January 2014 and runs thousands of tasks concurrently across a cluster of VMs.

Since Metafora is still under heavy development, you probably want to pin the dependencies to a commit hash or tag to keep the API stable. The master branch is automatically tested and is safe for use if you can tolerate API changes.

Q. Where is the metaforad daemon?

It doesn't exist. Metafora is a library for you to import and use in a service you write. Metafora handles task management but leaves implementation details such as task implementation and daemonization up to the user.

FAQ continued in Documentation...

Documentation

Overview

Metafora is a library for building distributed work systems. It's masterless and extensible via core Balancer and Coordinator interfaces.

If you use the built-in FairBalancer and EtcdCoordinator, all you have to do is implement a Handler and HandlerFunc, and then run the Consumer.

See https://github.com/lytics/metafora

Index

Constants

This section is empty.

Variables

var BalanceEvery = 15 * time.Minute //TODO make balance wait configurable
var Debug func(v ...interface{}) = gou.Debug
var Debugf func(format string, v ...interface{}) = gou.Debugf
var DumbBalancer = dumbBalancer{}

DumbBalancer is the simplest possible balancer implementation, which simply accepts all tasks. Since it has no state, a single global instance exists.

var Error func(v ...interface{}) = gou.Error
var Errorf func(format string, v ...interface{}) = gou.Errorf
var Info func(v ...interface{}) = gou.Info
var Infof func(format string, v ...interface{}) = gou.Infof
var LogLevel int = gou.LogLevel
var NoDelay = time.Time{}

NoDelay is simply the zero value for time and meant to be a more meaningful value for CanClaim methods to return instead of initializing a new empty time struct.
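The sketch below shows how NoDelay and an accept-everything balancer like DumbBalancer fit the Balancer interface. It declares local stand-ins that mirror the documented signatures so it compiles without the metafora module, and it reduces BalancerContext to an empty interface. The type names acceptAll and idTask are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// NoDelay mirrors metafora.NoDelay: the zero value of time.Time.
var NoDelay = time.Time{}

// Local stand-ins for the documented interfaces (assumption: declared here
// only so the sketch compiles standalone). BalancerContext is simplified to
// an empty interface; the real one exposes Tasks().
type Task interface{ ID() string }

type BalancerContext interface{}

type Balancer interface {
	Init(BalancerContext)
	CanClaim(task Task) (ignoreUntil time.Time, claim bool)
	Balance() (release []string)
}

// acceptAll is a hypothetical stateless balancer in the spirit of
// DumbBalancer: it claims every task and never releases any.
type acceptAll struct{}

func (acceptAll) Init(BalancerContext) {}

// CanClaim returns NoDelay because ignoreUntil only matters when a claim
// is denied.
func (acceptAll) CanClaim(Task) (time.Time, bool) { return NoDelay, true }

func (acceptAll) Balance() []string { return nil }

// Compile-time check against the stand-in interface.
var _ Balancer = acceptAll{}

// idTask is a minimal Task, comparable to what NewTask returns.
type idTask string

func (t idTask) ID() string { return string(t) }

func main() {
	b := acceptAll{}
	until, ok := b.CanClaim(idTask("task-1"))
	fmt.Println(ok, until.IsZero()) // true true
	fmt.Println(len(b.Balance()))   // 0
}
```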

var Warn func(v ...interface{}) = gou.Warn
var Warnf func(format string, v ...interface{}) = gou.Warnf

Functions

func SetLogger

func SetLogger(l LogOutputter)

SetLogger switches where Metafora logs.

Types

type Balancer

type Balancer interface {
	// Init is called once and only once before any other Balancer methods are
	// called. The context argument is meant to expose functionality that might
	// be useful for CanClaim and Balance implementations.
	Init(BalancerContext)

	// CanClaim should return true if the consumer should accept a task.
	//
	// When denying a claim by returning false, CanClaim should return the time
	// at which to reconsider the task for claiming.
	CanClaim(task Task) (ignoreUntil time.Time, claim bool)

	// Balance should return the list of Task IDs that should be released. The
	// criteria used to determine which tasks should be released is left up to
	// the implementation.
	Balance() (release []string)
}

Balancer is the core task balancing interface. Without a master, Metafora clusters are cooperatively balanced: each node needs to know how to balance itself.
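The following sketches a cooperative balancer under a hypothetical policy that caps the number of tasks per node. CanClaim defers claims once the node is full, and Balance never releases. The interfaces are simplified local copies of the documented ones. The names capBalancer, fakeCtx, and claimWith are illustrative and are not part of metafora.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified local stand-ins for the documented interfaces (assumption:
// the real RunningTask also exposes Started, Stopped, and Handler).
type Task interface{ ID() string }

type RunningTask interface{ Task() Task }

type BalancerContext interface{ Tasks() []RunningTask }

type Balancer interface {
	Init(BalancerContext)
	CanClaim(task Task) (ignoreUntil time.Time, claim bool)
	Balance() (release []string)
}

// capBalancer is a hypothetical balancer: it stops claiming once this node
// runs max tasks and asks to reconsider a denied task after retry.
type capBalancer struct {
	ctx   BalancerContext
	max   int
	retry time.Duration
}

func (b *capBalancer) Init(ctx BalancerContext) { b.ctx = ctx }

func (b *capBalancer) CanClaim(Task) (time.Time, bool) {
	if len(b.ctx.Tasks()) >= b.max {
		return time.Now().Add(b.retry), false
	}
	return time.Time{}, true
}

// Balance releases nothing; a full node simply stops claiming new work.
func (b *capBalancer) Balance() []string { return nil }

var _ Balancer = (*capBalancer)(nil)

// fakeCtx is a test double that reports n running tasks.
type fakeCtx struct{ n int }

func (f fakeCtx) Tasks() []RunningTask { return make([]RunningTask, f.n) }

type idTask string

func (t idTask) ID() string { return string(t) }

// claimWith builds a fresh balancer (Init is called exactly once, as the
// interface requires) and asks whether it would claim one more task.
func claimWith(running int) (time.Time, bool) {
	b := &capBalancer{max: 2, retry: time.Minute}
	b.Init(fakeCtx{n: running})
	return b.CanClaim(idTask("task"))
}

func main() {
	_, ok := claimWith(1)
	fmt.Println(ok) // true
	until, ok := claimWith(2)
	fmt.Println(ok, !until.IsZero()) // false true
}
```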

func NewDefaultFairBalancer

func NewDefaultFairBalancer(nodeid string, cs ClusterState) Balancer

NewDefaultFairBalancer creates a new FairBalancer but requires a ClusterState implementation to gain more information about the cluster than BalancerContext provides.

func NewDefaultFairBalancerWithThreshold

func NewDefaultFairBalancerWithThreshold(nodeid string, cs ClusterState, threshold float64) Balancer

NewDefaultFairBalancerWithThreshold allows callers to override FairBalancer's default 120% task load release threshold.

type BalancerContext

type BalancerContext interface {
	// Tasks returns a sorted list of task IDs owned by this Consumer. The
	// Consumer stops task manipulations during claiming and balancing, so the
	// list will be accurate unless a task naturally completes.
	Tasks() []RunningTask
}

BalancerContext is a limited interface exposed to Balancers from the Consumer for access to limited Consumer state.

type Client

type Client interface {
	// SubmitTask submits a task to the system, the task id must be unique.
	SubmitTask(Task) error

	// Delete a task
	DeleteTask(taskId string) error

	// SubmitCommand submits a command to a particular node.
	SubmitCommand(node string, command Command) error

	// Nodes retrieves the current set of registered nodes.
	Nodes() ([]string, error)
}

type ClusterState

type ClusterState interface {
	// Provide the current number of jobs
	NodeTaskCount() (map[string]int, error)
}

ClusterState provides information about the cluster for use by FairBalancer.

type Command

type Command interface {
	// Name returns the name of the command.
	Name() string

	// Parameters returns the parameters, if any, the command will be executed
	// with.
	Parameters() map[string]interface{}

	// Marshal turns a command into its wire representation.
	Marshal() ([]byte, error)
}

Commands are a way clients can communicate directly with nodes for cluster maintenance.

Use the Command functions to generate implementations of this interface. Metafora's consumer will discard unknown commands.

func CommandBalance

func CommandBalance() Command

CommandBalance forces the node's balancer.Balance method to be called even if frozen.

func CommandFreeze

func CommandFreeze() Command

CommandFreeze stops all task watching and balancing.

func CommandStopTask

func CommandStopTask(task string) Command

CommandStopTask forces a node to stop a task even if frozen.

func CommandUnfreeze

func CommandUnfreeze() Command

CommandUnfreeze resumes task watching and balancing.

func UnmarshalCommand

func UnmarshalCommand(p []byte) (Command, error)

UnmarshalCommand parses a command from its wire representation.
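This section does not specify the wire format, so the sketch below assumes JSON purely for illustration. It implements the documented Command interface with a hypothetical jsonCommand type and round-trips a command shaped like CommandStopTask. The field names and the command name "stop_task" are assumptions, and metafora's own encoding may differ.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Command mirrors the documented interface.
type Command interface {
	Name() string
	Parameters() map[string]interface{}
	Marshal() ([]byte, error)
}

// jsonCommand is a hypothetical Command encoded as JSON. The field names
// and the "stop_task" name are assumptions, not metafora's wire format.
type jsonCommand struct {
	C string                 `json:"command"`
	P map[string]interface{} `json:"parameters,omitempty"`
}

func (c *jsonCommand) Name() string                       { return c.C }
func (c *jsonCommand) Parameters() map[string]interface{} { return c.P }
func (c *jsonCommand) Marshal() ([]byte, error)           { return json.Marshal(c) }

// unmarshalCommand plays the role of UnmarshalCommand for this format.
func unmarshalCommand(p []byte) (Command, error) {
	c := &jsonCommand{}
	if err := json.Unmarshal(p, c); err != nil {
		return nil, err
	}
	if c.C == "" {
		return nil, errors.New("command has no name")
	}
	return c, nil
}

// stopTask builds a command shaped like CommandStopTask(task).
func stopTask(task string) Command {
	return &jsonCommand{C: "stop_task", P: map[string]interface{}{"task": task}}
}

// roundTrip marshals a command and parses it back.
func roundTrip() (name, task string, err error) {
	wire, err := stopTask("task-42").Marshal()
	if err != nil {
		return "", "", err
	}
	c, err := unmarshalCommand(wire)
	if err != nil {
		return "", "", err
	}
	// JSON strings decode back into interface{} values holding a string.
	task, _ = c.Parameters()["task"].(string)
	return c.Name(), task, nil
}

func main() {
	fmt.Println(roundTrip()) // stop_task task-42 <nil>
}
```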

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is the core Metafora task runner.

func NewConsumer

func NewConsumer(coord Coordinator, h HandlerFunc, b Balancer) (*Consumer, error)

NewConsumer returns a new consumer and calls Init on the Balancer and Coordinator.

func (*Consumer) Frozen

func (c *Consumer) Frozen() bool

Frozen returns true if Metafora is no longer watching for new tasks or rebalancing.

Metafora will remain frozen until receiving an Unfreeze command or it is restarted (frozen state is not persisted).

func (*Consumer) Ignores

func (c *Consumer) Ignores() []string

Ignores returns a list of all ignored tasks.

func (*Consumer) Run

func (c *Consumer) Run()

Run is the core run loop of Metafora. It is responsible for calling into the Coordinator to claim work and Balancer to rebalance work.

Run blocks until Shutdown is called or an internal error occurs.

func (*Consumer) Shutdown

func (c *Consumer) Shutdown()

Shutdown stops the main Run loop, calls Stop on all handlers, and calls Close on the Coordinator. Running tasks will be released for other nodes to claim.
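Run blocks until Shutdown is called, so a service typically runs it in a goroutine and calls Shutdown when it receives a termination signal. The sketch below assumes that pattern. runner is a local interface matching Consumer's Run and Shutdown methods, and fakeConsumer stands in for a real *Consumer so the example runs on its own. Both names are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runner is the Run/Shutdown method set of *metafora.Consumer, declared
// locally so this sketch compiles without the module.
type runner interface {
	Run()
	Shutdown()
}

// shutdownSignals returns a channel that receives SIGINT and SIGTERM.
func shutdownSignals() <-chan os.Signal {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	return sigs
}

// serve runs r until a signal arrives, then calls Shutdown and waits for
// Run to return. If Run exits on its own (an internal error), serve
// returns too.
func serve(r runner, sigs <-chan os.Signal) {
	done := make(chan struct{})
	go func() {
		r.Run()
		close(done)
	}()
	select {
	case <-sigs:
		r.Shutdown()
		<-done
	case <-done:
	}
}

// fakeConsumer is a test double whose Run blocks until Shutdown.
type fakeConsumer struct{ stop chan struct{} }

func (f *fakeConsumer) Run()      { <-f.stop }
func (f *fakeConsumer) Shutdown() { close(f.stop) }

// demo delivers a synthetic SIGTERM and reports whether serve returned.
func demo() bool {
	sigs := make(chan os.Signal, 1)
	sigs <- syscall.SIGTERM
	finished := make(chan struct{})
	go func() {
		serve(&fakeConsumer{stop: make(chan struct{})}, sigs)
		close(finished)
	}()
	select {
	case <-finished:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	// In a real service: serve(consumer, shutdownSignals())
	fmt.Println(demo()) // true
}
```

In a real service, pass the *Consumer returned by NewConsumer, together with shutdownSignals(), to serve.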

func (*Consumer) String

func (c *Consumer) String() string

func (*Consumer) Tasks

func (c *Consumer) Tasks() []RunningTask

Tasks returns a lexicographically sorted list of running Task IDs.

type Coordinator

type Coordinator interface {
	// Init is called once by the consumer to provide a Logger to Coordinator
	// implementations. NewConsumer will return Init's return value.
	Init(CoordinatorContext) error

	// Watch the broker for claimable tasks. Watch blocks until Close is called
	// or it encounters an error. Tasks are sent to consumer via the tasks chan.
	Watch(tasks chan<- Task) (err error)

	// Claim is called by the Consumer when a Balancer has determined that a task
	// ID can be claimed. Claim returns false if another consumer has already
	// claimed the ID.
	Claim(Task) bool

	// Release a task for other consumers to claim. May be called after Close.
	Release(Task)

	// Done is called by Metafora when a task has been completed and should never
	// be scheduled to run again (in other words: deleted from the broker).
	//
	// May be called after Close.
	Done(Task)

	// Command blocks until a command for this node is received from the broker
	// by the coordinator. Command must return (nil, nil) when Close is called.
	Command() (Command, error)

	// Close the coordinator. Stop waiting for tasks and commands. Remove node from broker.
	//
	// Do not release tasks. The consumer will handle task releasing.
	Close()

	// Name of the coordinator for use in logs and other tooling.
	Name() string
}

Coordinator is the core interface Metafora uses to discover, claim, release, and complete tasks, as well as to receive commands.
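The Claim contract (Claim returns false if another consumer already owns the ID) can be illustrated with a process-local claim table. This is a hypothetical sketch only. A real Coordinator, such as the etcd one, has to make claims atomic in the shared broker. The claimTable type and its methods are illustrative names, not metafora's API.

```go
package main

import (
	"fmt"
	"sync"
)

// claimTable is a hypothetical, process-local stand-in for a broker's
// claim records. It only illustrates the Claim/Release contract; a real
// Coordinator must make claims atomic in the shared broker instead.
type claimTable struct {
	mu     sync.Mutex
	owners map[string]string // task ID -> owning node ID
}

func newClaimTable() *claimTable {
	return &claimTable{owners: make(map[string]string)}
}

// Claim returns false if another node already owns the task.
func (c *claimTable) Claim(node, task string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if owner, ok := c.owners[task]; ok && owner != node {
		return false
	}
	c.owners[task] = node
	return true
}

// Release frees a task for other nodes; only its owner can release it.
func (c *claimTable) Release(node, task string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.owners[task] == node {
		delete(c.owners, task)
	}
}

// raceClaims has n nodes claim the same task concurrently and counts winners.
func raceClaims(n int) int {
	t := newClaimTable()
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		wins int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(node string) {
			defer wg.Done()
			if t.Claim(node, "task-1") {
				mu.Lock()
				wins++
				mu.Unlock()
			}
		}(fmt.Sprintf("node-%d", i))
	}
	wg.Wait()
	return wins
}

func main() {
	t := newClaimTable()
	fmt.Println(t.Claim("node-a", "task-1")) // true
	fmt.Println(t.Claim("node-b", "task-1")) // false
	t.Release("node-a", "task-1")
	fmt.Println(t.Claim("node-b", "task-1")) // true
	fmt.Println(raceClaims(50))              // 1
}
```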

type CoordinatorContext

type CoordinatorContext interface {
	// Lost is called by the Coordinator when a claimed task is lost to another
	// node. The Consumer will stop the task locally.
	//
	// Since this implies there is a window of time where the task is executing
	// more than once, this is a sign of an unhealthy cluster.
	Lost(Task)
}

CoordinatorContext is the context passed to coordinators by the core consumer.

type FairBalancer

type FairBalancer struct {
	// contains filtered or unexported fields
}

FairBalancer is an implementation of Balancer that attempts to randomly release tasks when the number currently running on this node is greater than some percentage of the cluster average (default 120%).

This balancer will claim all tasks which were not released on the last call to Balance.

func (*FairBalancer) Balance

func (e *FairBalancer) Balance() []string

Balance releases tasks if this node runs more than 120% of the cluster's average task count (the default threshold).
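To make the threshold concrete, the sketch below computes the cluster average from a map shaped like the result of ClusterState.NodeTaskCount and compares this node's count against threshold × average. Releasing back down to the average is an assumed policy. FairBalancer chooses tasks randomly and may size its release differently. Here threshold is a ratio (1.2 for 120%). Whether NewDefaultFairBalancerWithThreshold uses the same scale is not stated above.

```go
package main

import "fmt"

// releaseCount returns how many tasks node might shed when it runs more
// than threshold times the cluster average. Releasing back down to the
// average is an assumed policy, not necessarily FairBalancer's.
func releaseCount(counts map[string]int, node string, threshold float64) int {
	if len(counts) == 0 {
		return 0
	}
	total := 0
	for _, n := range counts {
		total += n
	}
	avg := float64(total) / float64(len(counts))
	mine := float64(counts[node])
	if mine <= avg*threshold {
		return 0 // at or under the threshold: keep everything
	}
	return int(mine - avg)
}

func main() {
	// Shaped like ClusterState.NodeTaskCount output: node ID -> task count.
	counts := map[string]int{"a": 14, "b": 8, "c": 8}
	// average = 30/3 = 10, limit = 10 * 1.2 = 12; node a runs 14 > 12
	fmt.Println(releaseCount(counts, "a", 1.2)) // 4
	fmt.Println(releaseCount(counts, "b", 1.2)) // 0
}
```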

func (*FairBalancer) CanClaim

func (e *FairBalancer) CanClaim(task Task) (time.Time, bool)

CanClaim rejects tasks for a period of time if the last balance released tasks. Otherwise all tasks are accepted.

func (*FairBalancer) Init

func (e *FairBalancer) Init(s BalancerContext)

type Handler

type Handler interface {
	// Run handles a task and blocks until completion or Stop is called.
	//
	// If Run returns true, Metafora will mark the task as Done via the
	// Coordinator. The task will not be rescheduled.
	//
	// If Run returns false, Metafora will Release the task via the Coordinator.
	// The task will be scheduled to run again.
	//
	// Panics are treated the same as returning true.
	Run() (done bool)

	// Stop signals to the handler to shutdown gracefully. Stop implementations
	// should not block until Run exits.
	//
	// Stop may be called more than once but calls are serialized. Implementations
	// may perform different operations on subsequent calls to Stop to implement
	// graceful vs. forced shutdown conditions.
	//
	// Run probably wants to return false when stop is called, but this is left
	// up to the implementation as races between Run finishing and Stop being
	// called can happen.
	Stop()
}

Handler is the core task handling interface. The Consumer will create a new Handler for each claimed task, call Run once and only once, and call Stop when the task should persist its progress and exit.
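The per-task lifecycle described above can be sketched from the consumer's side. Run is called once, a return of true or a panic means the task is done, and false means it is released. onDone and onRelease are hypothetical hooks standing in for the Coordinator's Done and Release. This is an illustration of the documented contract, not the Consumer's actual implementation.

```go
package main

import "fmt"

// Handler mirrors the documented interface.
type Handler interface {
	Run() (done bool)
	Stop()
}

// runTask drives one Handler per the documented contract: call Run once;
// true or a panic means done, false means release. onDone and onRelease
// are hypothetical hooks for the Coordinator's Done and Release.
func runTask(h Handler, onDone, onRelease func()) {
	done := func() (ok bool) {
		defer func() {
			if r := recover(); r != nil {
				ok = true // a panic is treated like returning true
			}
		}()
		return h.Run()
	}()
	if done {
		onDone()
	} else {
		onRelease()
	}
}

// stub is a test Handler with a fixed outcome.
type stub struct {
	result bool
	panics bool
}

func (s stub) Run() bool {
	if s.panics {
		panic("boom")
	}
	return s.result
}

func (stub) Stop() {}

// outcome reports which hook runTask invoked.
func outcome(h Handler) string {
	out := ""
	runTask(h, func() { out = "done" }, func() { out = "released" })
	return out
}

func main() {
	fmt.Println(outcome(stub{result: true}))  // done
	fmt.Println(outcome(stub{result: false})) // released
	fmt.Println(outcome(stub{panics: true}))  // done
}
```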

type HandlerFunc

type HandlerFunc func(Task) Handler

HandlerFunc is called by the Consumer to create a new Handler for each task.

HandlerFunc is meant to be the New function for handlers. Since Run and Stop are called concurrently, any state used by both should be initialized in the HandlerFunc. Since HandlerFunc is uninterruptible, it should do only the minimum amount of work necessary to initialize a handler.
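Here is a sketch of a Handler and its HandlerFunc for a hypothetical task whose work is simply waiting for a fixed duration. As recommended, the stop channel used by both Run and Stop is created in the HandlerFunc, and Stop returns without waiting for Run. Local stand-in types mirror the documented interfaces. The names timedHandler and newTimedHandler are illustrative.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Local stand-ins mirroring the documented types.
type Task interface{ ID() string }

type Handler interface {
	Run() (done bool)
	Stop()
}

type HandlerFunc func(Task) Handler

// timedHandler is a hypothetical handler whose "work" is waiting for a
// fixed duration. Run and Stop share the stop channel, so the HandlerFunc
// creates it before either method can be called.
type timedHandler struct {
	task     Task
	work     time.Duration
	stop     chan struct{}
	stopOnce sync.Once
}

// newTimedHandler returns a HandlerFunc that does only cheap setup,
// because HandlerFunc cannot be interrupted.
func newTimedHandler(work time.Duration) HandlerFunc {
	return func(t Task) Handler {
		return &timedHandler{task: t, work: work, stop: make(chan struct{})}
	}
}

func (h *timedHandler) Run() bool {
	timer := time.NewTimer(h.work)
	defer timer.Stop()
	select {
	case <-timer.C:
		return true // finished: Metafora marks the task Done
	case <-h.stop:
		return false // stopped early: the task is released
	}
}

// Stop only signals Run and returns immediately; sync.Once makes repeated
// calls safe. A real handler might escalate on later calls instead.
func (h *timedHandler) Stop() {
	h.stopOnce.Do(func() { close(h.stop) })
}

type idTask string

func (t idTask) ID() string { return string(t) }

func completes() bool {
	return newTimedHandler(time.Millisecond)(idTask("t1")).Run()
}

func stoppedEarly() bool {
	h := newTimedHandler(time.Hour)(idTask("t2"))
	h.Stop()
	h.Stop() // second call is a no-op
	return !h.Run()
}

func main() {
	fmt.Println(completes(), stoppedEarly()) // true true
}
```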

func SimpleHandler

func SimpleHandler(f func(t Task, stop <-chan bool) bool) HandlerFunc

SimpleHandler creates a HandlerFunc for a simple function that accepts a stop channel. The channel will be closed when Stop is called.
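The sketch below shows one way such an adapter could work. It uses a struct that closes the stop channel exactly once, so repeated Stop calls are safe. Only the documented behavior comes from metafora. The simpleHandler type and the simpleHandlerFunc name are hypothetical and are deliberately named to avoid implying that this is SimpleHandler's real source.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins mirroring the documented types.
type Task interface{ ID() string }

type Handler interface {
	Run() (done bool)
	Stop()
}

type HandlerFunc func(Task) Handler

// simpleHandler adapts a plain function to Handler. Its internals are an
// assumption; only "stop is closed when Stop is called" is documented.
type simpleHandler struct {
	task Task
	f    func(Task, <-chan bool) bool
	stop chan bool
	once sync.Once
}

func (h *simpleHandler) Run() bool { return h.f(h.task, h.stop) }

// Stop closes the channel once, so repeated Stop calls do not panic.
func (h *simpleHandler) Stop() { h.once.Do(func() { close(h.stop) }) }

func simpleHandlerFunc(f func(t Task, stop <-chan bool) bool) HandlerFunc {
	return func(t Task) Handler {
		return &simpleHandler{task: t, f: f, stop: make(chan bool)}
	}
}

type idTask string

func (t idTask) ID() string { return string(t) }

// waitForStop is an example task body that runs until stopped, then
// returns false so the task is released for another node.
func waitForStop(t Task, stop <-chan bool) bool {
	<-stop
	return false
}

// demo runs waitForStop in the background and stops it twice.
func demo() bool {
	h := simpleHandlerFunc(waitForStop)(idTask("t1"))
	result := make(chan bool)
	go func() { result <- h.Run() }()
	h.Stop()
	h.Stop()
	return <-result
}

func main() {
	fmt.Println(demo()) // false
}
```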

type LogOutputter

type LogOutputter interface {
	Output(calldepth int, s string) error
}
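The standard library's *log.Logger has an Output(calldepth int, s string) error method, so it already satisfies this interface and can be passed to SetLogger. The sketch below checks this against a local copy of LogOutputter and writes to a buffer instead of stderr.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// LogOutputter mirrors metafora's interface. The standard library's
// *log.Logger already has a matching Output method.
type LogOutputter interface {
	Output(calldepth int, s string) error
}

// Compile-time check that *log.Logger satisfies the interface.
var _ LogOutputter = (*log.Logger)(nil)

// logDemo writes one message through a *log.Logger used as a LogOutputter.
func logDemo() string {
	var buf bytes.Buffer
	var out LogOutputter = log.New(&buf, "metafora: ", 0) // 0: no timestamp
	_ = out.Output(2, "hello")
	return buf.String()
}

func main() {
	fmt.Print(logDemo()) // metafora: hello
	// With the real package this would be, for example:
	//   metafora.SetLogger(log.New(os.Stderr, "metafora: ", log.LstdFlags))
}
```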

type ResourceBalancer

type ResourceBalancer struct {
	// contains filtered or unexported fields
}

ResourceBalancer is a balancer implementation which uses two thresholds to limit claiming and rebalance work based upon a resource reported by a ResourceReporter. When the claim threshold is exceeded, no new work will be claimed. When the release threshold is exceeded, work will be released until usage falls below that threshold. The claim threshold must be less than the release threshold (otherwise claims would continue only to have the work rebalanced).

Even below the claim limit, claims are delayed by the percent of resources used (in milliseconds) to give less loaded nodes a claim advantage.

The balancer releases the oldest tasks first (skipping those that are already stopping) to try to prevent rebalancing the same tasks repeatedly within a cluster.
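The following sketches the oldest-first selection described above. It uses a hypothetical simplified runningTask struct in place of the RunningTask interface. The number of tasks to release (n) is left to the caller, because the exact count ResourceBalancer computes is not given here.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// runningTask is a hypothetical, simplified view of RunningTask: an ID,
// when it started, and when Stop was first called (zero if never).
type runningTask struct {
	id      string
	started time.Time
	stopped time.Time
}

// pickReleases returns up to n task IDs, oldest first, skipping tasks
// that are already stopping. Choosing n is left to the caller here.
func pickReleases(tasks []runningTask, n int) []string {
	candidates := make([]runningTask, 0, len(tasks))
	for _, t := range tasks {
		if t.stopped.IsZero() {
			candidates = append(candidates, t)
		}
	}
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].started.Before(candidates[j].started)
	})
	var ids []string
	for i := 0; i < len(candidates) && i < n; i++ {
		ids = append(ids, candidates[i].id)
	}
	return ids
}

func demo() []string {
	base := time.Date(2023, 8, 15, 0, 0, 0, 0, time.UTC)
	tasks := []runningTask{
		{id: "new", started: base.Add(3 * time.Hour)},
		{id: "oldest", started: base, stopped: base.Add(4 * time.Hour)}, // already stopping
		{id: "old", started: base.Add(time.Hour)},
		{id: "mid", started: base.Add(2 * time.Hour)},
	}
	return pickReleases(tasks, 2)
}

func main() {
	fmt.Println(demo()) // [old mid]
}
```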

func NewResourceBalancer

func NewResourceBalancer(src ResourceReporter, claimLimit, releaseLimit int) (*ResourceBalancer, error)

NewResourceBalancer creates a new ResourceBalancer or returns an error if the limits are invalid.

Limits should be a percentage expressed as an integer between 1 and 100 inclusive.
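The documented limit rules and the claim delay can be expressed as two small functions. This is a sketch under stated assumptions: the error messages, the integer-percent rounding, and refusing claims when total is zero are illustrative choices, not ResourceBalancer's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// validateLimits applies the documented rules: both limits are integer
// percentages in [1, 100] and the claim limit is below the release limit.
// The error messages are illustrative.
func validateLimits(claimLimit, releaseLimit int) error {
	if claimLimit < 1 || claimLimit > 100 || releaseLimit < 1 || releaseLimit > 100 {
		return errors.New("limits must be between 1 and 100")
	}
	if claimLimit >= releaseLimit {
		return errors.New("claim limit must be less than release limit")
	}
	return nil
}

// claimDecision refuses claims once usage exceeds claimLimit percent and
// otherwise delays the claim by one millisecond per percent used.
// Refusing when total is zero is an assumption.
func claimDecision(used, total uint64, claimLimit int) (time.Duration, bool) {
	if total == 0 {
		return 0, false
	}
	pct := int(used * 100 / total)
	if pct > claimLimit {
		return 0, false
	}
	return time.Duration(pct) * time.Millisecond, true
}

func main() {
	fmt.Println(validateLimits(80, 95), validateLimits(95, 80) != nil) // <nil> true
	fmt.Println(claimDecision(25, 100, 80))                            // 25ms true
	fmt.Println(claimDecision(90, 100, 80))                            // 0s false
}
```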

func (*ResourceBalancer) Balance

func (b *ResourceBalancer) Balance() []string

func (*ResourceBalancer) CanClaim

func (b *ResourceBalancer) CanClaim(string) bool

func (*ResourceBalancer) Init

func (b *ResourceBalancer) Init(ctx BalancerContext)

type ResourceReporter

type ResourceReporter interface {
	// Used returns the amount of a resource used and the total amount of that
	// resource.
	Used() (used uint64, total uint64)

	// String returns the unit resources are reported in.
	String() string
}

ResourceReporter is required by the ResourceBalancer to read the resource being used for balancing.
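The sketch below is a hypothetical ResourceReporter that measures Go heap usage against a fixed, operator-chosen budget. runtime.ReadMemStats and MemStats.HeapAlloc are part of the standard library. The heapReporter type and the use of a budget as "total" are assumptions.

```go
package main

import (
	"fmt"
	"runtime"
)

// ResourceReporter mirrors the documented interface.
type ResourceReporter interface {
	Used() (used uint64, total uint64)
	String() string
}

// heapReporter is a hypothetical reporter: heap bytes currently allocated,
// measured against a fixed budget chosen by the operator.
type heapReporter struct{ budget uint64 }

func (r heapReporter) Used() (uint64, uint64) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m.HeapAlloc, r.budget
}

// String reports the unit, as the interface requires.
func (heapReporter) String() string { return "bytes" }

var _ ResourceReporter = heapReporter{}

func main() {
	r := heapReporter{budget: 1 << 30} // hypothetical 1 GiB budget
	used, total := r.Used()
	fmt.Printf("%d of %d %s used (%.1f%%)\n", used, total, r, 100*float64(used)/float64(total))
}
```

ReadMemStats briefly stops the world, so a reporter like this should not be sampled in a tight loop.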

type RunningTask

type RunningTask interface {
	Task() Task

	// Started is the time the task was started by this consumer.
	Started() time.Time

	// Stopped is the first time Stop() was called on this task or zero if it has
	// yet to be called. Tasks may take an indeterminate amount of time to
	// shutdown after Stop() is called.
	Stopped() time.Time

	// Handler implementation called for this task.
	Handler() Handler
}

RunningTask represents tasks running within a consumer.

type SleepBalancer

type SleepBalancer struct {
	// contains filtered or unexported fields
}

SleepBalancer is a simplistic Balancer implementation which sleeps 30ms per claimed task in its CanClaim() method. This means the node with the fewest claimed tasks in a cluster should sleep the shortest length of time and win the claim race.

It never releases tasks during Balance() calls.

func (*SleepBalancer) Balance

func (*SleepBalancer) Balance() []string

Balance never returns any tasks for the sleepy balancer.

func (*SleepBalancer) CanClaim

func (b *SleepBalancer) CanClaim(string) bool

CanClaim sleeps 30ms per claimed task.
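The arithmetic behind the claim race is simple: the sleep grows linearly with the number of claimed tasks. The sketch below computes the delay and simulates two nodes racing. The channel-based race is a hypothetical stand-in for a Coordinator's Claim, not a description of how SleepBalancer is wired internally.

```go
package main

import (
	"fmt"
	"time"
)

// perTask is the documented 30ms sleep per claimed task.
const perTask = 30 * time.Millisecond

// claimDelay is how long a SleepBalancer-style CanClaim would sleep on a
// node that already runs the given number of claimed tasks.
func claimDelay(claimed int) time.Duration {
	return time.Duration(claimed) * perTask
}

// race simulates nodes sleeping before a claim; the first to wake wins.
// The buffered channel is a stand-in for a Coordinator's Claim.
func race(counts map[string]int) string {
	winner := make(chan string, len(counts))
	for node, n := range counts {
		go func(node string, n int) {
			time.Sleep(claimDelay(n))
			winner <- node
		}(node, n)
	}
	return <-winner
}

func main() {
	fmt.Println(claimDelay(2), claimDelay(10))                 // 60ms 300ms
	fmt.Println(race(map[string]int{"light": 2, "heavy": 10})) // light
}
```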

func (*SleepBalancer) Init

func (b *SleepBalancer) Init(ctx BalancerContext)

Init is called by the Consumer.

type Task

type Task interface {
	// ID is the immutable globally unique ID for this task.
	ID() string
}

Task is the minimum interface for Tasks to implement.

func NewTask

func NewTask(id string) Task

NewTask creates the most basic Task implementation: just a string ID.
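Tasks can carry more than an ID, because any type with an ID() string method satisfies the interface. The reportTask type below is hypothetical. Whether its extra fields persist through the broker depends on the Coordinator in use.

```go
package main

import "fmt"

// Task mirrors the documented interface.
type Task interface{ ID() string }

// idTask is the simplest Task, comparable to what NewTask returns.
type idTask string

func (t idTask) ID() string { return string(t) }

// reportTask is a hypothetical richer Task carrying data beyond its ID.
type reportTask struct {
	TaskID   string
	Customer string
}

func (t *reportTask) ID() string { return t.TaskID }

// describe shows a handler recovering the concrete type with a type switch.
func describe(t Task) string {
	switch v := t.(type) {
	case *reportTask:
		return v.ID() + " for " + v.Customer
	default:
		return v.ID()
	}
}

func main() {
	fmt.Println(describe(idTask("plain")))                             // plain
	fmt.Println(describe(&reportTask{TaskID: "r1", Customer: "acme"})) // r1 for acme
}
```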

Directories

Path Synopsis
cmd
Package metcdv3 contains implementations of all Metafora interfaces using etcd as the broker/backing store.
testutil
Package testutil is a collection of utilities for use by Metafora's etcd tests.
Statemachine is a featureful statemachine implementation for Metafora handlers to use.
