radish

package module
v0.0.0-...-36fb391 Latest Latest
Published: May 23, 2020 License: BSD-3-Clause Imports: 15 Imported by: 0

README

Radish


Radish is a stateless asynchronous task queue and handler framework. Radish is designed to maximize the resources of a single node by flexibly increasing and decreasing the number of worker goroutines that handle tasks. A radish server allows users to scale the number of workers that handle generic tasks and to add tasks to the queue, and it reports metrics to Prometheus for easy tracking and management. Radish also provides a CLI program for interacting with servers that are running the radish service.

Radish is intended to be used as a framework to create asynchronous task handling services that do not rely on an intermediate message broker like RabbitMQ or Redis. The statelessness of Radish makes it much simpler to use, but it also means that Radish does not guarantee fault tolerance in task handling. It is up to the application using Radish to determine how to handle task scheduling and timeouts as well as success and failure callbacks. Applications do this by defining task handlers that implement the Task interface and registering them with the radish server. Tasks can then be queued using the Delay method or by submitting a Queue request to the API server. On success or failure, the worker calls one of the handler's callback methods and then moves on to the next task.
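The handle-then-callback flow described above can be sketched with the standard library alone. This is a minimal illustration, not Radish's implementation: the future, handler, recorder, and run names are hypothetical, and plain string ids stand in for uuid.UUID.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// future is a simplified, hypothetical stand-in for an enqueued task.
type future struct {
	id     string
	params []byte
}

// handler is a simplified, hypothetical stand-in for the Task interface.
type handler interface {
	Handle(id string, params []byte) error
	Success(id string)
	Failure(id string, err error)
}

// worker handles futures until the channel is closed, invoking exactly one
// callback per future before moving on to the next one.
func worker(tasks <-chan future, h handler, wg *sync.WaitGroup) {
	defer wg.Done()
	for f := range tasks {
		if err := h.Handle(f.id, f.params); err != nil {
			h.Failure(f.id, err)
			continue
		}
		h.Success(f.id)
	}
}

// recorder counts callbacks and fails any future that has empty params.
type recorder struct {
	mu        sync.Mutex
	succeeded int
	failed    int
}

func (r *recorder) Handle(id string, params []byte) error {
	if len(params) == 0 {
		return errors.New("empty params")
	}
	return nil
}

func (r *recorder) Success(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.succeeded++
}

func (r *recorder) Failure(id string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.failed++
}

// run pushes the futures through n workers and waits for them to finish.
func run(n int, futures []future, h handler) {
	tasks := make(chan future, len(futures))
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go worker(tasks, h, &wg)
	}
	for _, f := range futures {
		tasks <- f
	}
	close(tasks)
	wg.Wait()
}

func main() {
	r := &recorder{}
	run(2, []future{{"a", []byte("x")}, {"b", nil}, {"c", []byte("y")}}, r)
	fmt.Println(r.succeeded, r.failed)
}
```

Because several workers call the same handler concurrently, the recorder guards its counters with a mutex; real handlers need the same care.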

Task Handlers

A task handler is implemented by defining a struct that implements the Task interface and registering it with the radish task queue. Custom tasks must specify a Name() method that uniquely identifies the type of task it is (which is also used when queueing tasks) as well as a Handle() method. The Handle() method must accept a uuid, which describes the future being handled (in case the application wants to implement statefulness) as well as generic parameters as a byte slice. We have chosen []byte for parameters so that applications can define any serialization format they choose, e.g. json or protobuf.

type SendEmail struct {}

func (t *SendEmail) Name() string {
    return "sendEmail"
}

func (t *SendEmail) Handle(id uuid.UUID, params []byte) error {
    return nil // do the work here; a non-nil error triggers Failure
}

func (t *SendEmail) Success(id uuid.UUID, params []byte) {}

func (t *SendEmail) Failure(id uuid.UUID, err error, params []byte) {}

Task handlers may also implement two callbacks: Success() and Failure(). Both of these callbacks take parameters that are specific to those methods and must be provided with the task being queued. The Failure() method will additionally be passed the error that caused the task to fail.
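As a minimal sketch of how a handler might decode its []byte parameters, the example below assumes JSON serialization. The emailParams struct and its fields are hypothetical, and a plain string stands in for uuid.UUID so that the block compiles with the standard library alone.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// emailParams is a hypothetical payload; the queue only sees it as []byte.
type emailParams struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// SendEmail mirrors the shape of a task handler. The id is a plain string
// here (an assumption) instead of uuid.UUID to avoid an external dependency.
type SendEmail struct{}

func (t *SendEmail) Name() string { return "sendEmail" }

// Handle decodes the JSON parameters and validates them. A real handler
// would send the message after validation.
func (t *SendEmail) Handle(id string, params []byte) error {
	var p emailParams
	if err := json.Unmarshal(params, &p); err != nil {
		return fmt.Errorf("task %s: could not decode params: %w", id, err)
	}
	if p.To == "" {
		return errors.New("task " + id + ": missing recipient")
	}
	return nil
}

func main() {
	task := &SendEmail{}
	err := task.Handle("example-id", []byte(`{"to": "jdoe@example.com", "subject": "hi"}`))
	fmt.Println(task.Name(), err)
}
```

Returning a descriptive error from Handle is what gives the Failure callback something useful to act on.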

Radish Quick Start

Once we have defined our custom task handlers, we can register them and begin delaying tasks for asynchronous processing. If we have two task handlers, SendEmail and DailyReport whose names are "sendEmail" and "dailyReport" respectively, then the simplest way we can get started is as follows:

queue, err := radish.New(nil, new(SendEmail), new(DailyReport))
id, err := queue.Delay("sendEmail", []byte("jdoe@example.com"), nil, nil)
id, err = queue.Delay("dailyReport", []byte("2020-04-07"), nil, nil)

When the task queue is created, it immediately launches workers (one per CPU on the machine) to start handling tasks. You can then delay tasks, which returns the unique id of the task's future (which you can use for bookkeeping in the success or failure callbacks). In this example, the tasks are submitted with an email address and a date respectively, but with no parameters for success or failure handling.

Configuring Radish

More detailed configuration and registration is possible with radish. In the quick start example we passed a nil configuration as the first argument to New(), which made the radish queue use reasonable defaults. We can configure it more specifically using the Config object:

config := &radish.Config{Workers: 4, QueueSize: 10000}
queue, err := radish.New(config)

The config is validated when the queue is created, and an invalid configuration causes New() to return an error. We can also manually register tasks with the queue (and register tasks at runtime) as follows:

err := queue.Register(new(SendEmail))

This allows the queue to be dynamic and handle different tasks at different times. It is also possible to scale the number of workers at runtime:

queue.AddWorkers(8)
queue.RemoveWorkers(2)
queue.SetWorkers(4)
queue.NumWorkers()

The queue can also be scaled and tasks delayed using the Radish service.
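One way to picture runtime scaling is a pool whose workers select on a shared stop channel. The sketch below is an assumption about how such scaling could work, not Radish's code; the pool type and its fields are hypothetical, and only the method names echo the calls shown above.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical, simplified worker pool; it is not Radish's code.
type pool struct {
	mu      sync.Mutex
	tasks   chan func()
	stop    chan struct{}
	workers int
}

func newPool(queueSize int) *pool {
	return &pool{
		tasks: make(chan func(), queueSize),
		stop:  make(chan struct{}),
	}
}

// work runs tasks until it receives a stop signal between two tasks.
func (p *pool) work() {
	for {
		select {
		case <-p.stop:
			return
		case task := <-p.tasks:
			task()
		}
	}
}

// AddWorkers starts n more worker goroutines.
func (p *pool) AddWorkers(n int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for i := 0; i < n; i++ {
		go p.work()
		p.workers++
	}
}

// RemoveWorkers stops n workers; each send blocks until a worker is idle.
func (p *pool) RemoveWorkers(n int) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n < 0 || n > p.workers {
		return fmt.Errorf("cannot remove %d of %d workers", n, p.workers)
	}
	for i := 0; i < n; i++ {
		p.stop <- struct{}{}
		p.workers--
	}
	return nil
}

// NumWorkers returns the number of running workers.
func (p *pool) NumWorkers() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.workers
}

// SetWorkers adds or removes workers to reach n. The count is read and then
// changed in two steps, which is good enough for a sketch.
func (p *pool) SetWorkers(n int) error {
	switch cur := p.NumWorkers(); {
	case n > cur:
		p.AddWorkers(n - cur)
	case n < cur:
		return p.RemoveWorkers(cur - n)
	}
	return nil
}

func main() {
	p := newPool(16)
	p.AddWorkers(8)
	if err := p.RemoveWorkers(2); err != nil {
		fmt.Println(err)
	}
	if err := p.SetWorkers(4); err != nil {
		fmt.Println(err)
	}
	fmt.Println(p.NumWorkers())
}
```

Using an unbuffered stop channel means a worker only exits between tasks, so a task that is already running is never interrupted.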

Radish Service

Radish implements a gRPC API so that remote clients can connect and get the queue status, delay tasks, and scale the number of workers. The simplest way to run this service is as follows:

queue.Listen()

This will serve on the address and port specified in the configuration and block until an interrupt signal is received from the OS, which will shut down the queue. Applications that run the listener in its own goroutine can also manually call:

queue.Shutdown()

This gracefully shuts down the queue, completing any tasks that are in flight and not accepting new tasks. Applications that need to specify their own services using gRPC or HTTP servers can manually run the service as follows:

sock, err := net.Listen("tcp", "0.0.0.0:80")
srv := grpc.NewServer()
api.RegisterRadishServer(srv, queue)
// Register additional gRPC services here

srv.Serve(sock)

The radish CLI command can then be used to access the service and submit tasks.

Metrics

Radish also serves a metrics endpoint that can be polled by Prometheus. Radish keeps track of the following metrics associated with the task queue:

  • radish.workers: A gauge that tracks the number of workers over time as users issue scale requests.
  • radish.queue_size: A gauge that tracks the number of the tasks in the queue currently awaiting handling.
  • radish.percent_full: A gauge that tracks the relative fullness of the task queue based on the configured queue size.
  • radish.tasks_succeeded: A counter that tracks the number of tasks that have been handled and succeeded, labeled by task name.
  • radish.tasks_failed: A counter that tracks the number of tasks that have been handled and failed, labeled by task name.
  • radish.task_latency: A histogram that tracks the amount of time it takes to handle the task and its success or failure callback in milliseconds; labeled by task name and result (success or failure).
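As an illustration of the percent_full gauge, a relative fullness value can be derived from a buffered channel's length and capacity. This sketch assumes the queue is a buffered channel and that the value is expressed on a 0 to 100 scale; neither detail is confirmed here, and percentFull is a hypothetical helper.

```go
package main

import "fmt"

// percentFull returns how full a buffered channel is as a percentage of its
// capacity. An unbuffered channel is reported as 0 to avoid dividing by zero.
func percentFull(queue chan []byte) float64 {
	if cap(queue) == 0 {
		return 0
	}
	return float64(len(queue)) / float64(cap(queue)) * 100
}

func main() {
	queue := make(chan []byte, 4)
	queue <- []byte("a")
	fmt.Println(percentFull(queue)) // one of four slots is in use
}
```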

Coming soon: If you have your own Prometheus endpoint, you will be able to register Radish metrics manually without serving them in Radish.

Radish CLI

The radish CLI utility is found in cmd/radish and can be installed as follows:

$ go get github.com/kansaslabs/radish/cmd/radish

This utility allows you to interact with any radish server and can be used to manage your task queue services out of the box. You can view the commands and options using radish --help. In order to connect to a radish server you need to specify options as follows:

$ radish -a localhost:5356 -U

This connects radish to a server on port 5356 on the local host without TLS (the -U stands for "unsecure"). Note that you can also use the $RADISH_ENDPOINT and $RADISH_UNSECURE environment variables.

The misspelling of "unsecure" is a joke: radish is not insecure, it is just not connecting with encryption.

After the connection options are specified you can use a command to interact with the server. For example to set the number of workers you can use the scale command:

$ radish -a localhost:5356 -U scale -w 12

To get the status of the server and the currently registered tasks you can use the status command:

$ radish -a localhost:5356 -U status

Finally, once you know the names of the tasks that the radish server is handling, you can queue tasks as follows:

$ radish -a localhost:5356 -U queue -t mytask -p '{"my": "data"}'

The CLI is meant to help you get started quickly with Radish task queues without having to write your own interfaces or servers.

Turnip

An example metrics server with tasks that simply wait and have a random chance of failure is defined in cmd/turnip. This server is also used to benchmark Radish performance and throughput with variable length tasks. See the examples/README.md for more on how to get started with Turnip.

To build the Turnip image ensure you're in the root of the repository:

$ docker build -t kansaslabs/turnip:latest -f examples/Dockerfile .

KansasLabs administrators can then push this image to Dockerhub as follows:

$ docker push kansaslabs/turnip:latest

Documentation

Overview

Package radish is a stateless asynchronous task queue and handler framework. Radish is designed to maximize the resources of a single node by flexibly increasing and decreasing the number of worker goroutines that handle tasks. A radish server allows users to scale the number of workers that handle generic tasks and to add tasks to the queue, and it reports metrics to Prometheus for easy tracking and management. Radish also provides a CLI program for interacting with servers that are running the radish service.

Radish is intended to be used as a framework to create asynchronous task handling services that do not rely on an intermediate message broker like RabbitMQ or Redis. The statelessness of Radish makes it much simpler to use, but it also means that Radish does not guarantee fault tolerance in task handling. It is up to the application using Radish to determine how to handle task scheduling and timeouts as well as success and failure callbacks. Applications do this by defining task handlers that implement the Task interface and registering them with the radish server. Tasks can then be queued using the Delay method or by submitting a Queue request to the API server. On success or failure, the worker calls one of the handler's callback methods and then moves on to the next task.

Task Handlers

A task handler is implemented by defining a struct that implements the Task interface and registering it with the Radish task queue. Custom tasks must specify a Name method that uniquely identifies the type of task it is (which is also used when queueing tasks) as well as a Handle method. The Handle method must accept a uuid, which describes the future being handled (in case the application wants to implement statefulness) as well as generic parameters as a byte slice. We have chosen []byte for parameters so that applications can define any serialization format they choose, e.g. json or protobuf.

type SendEmail struct {}

func (t *SendEmail) Name() string {
	return "sendEmail"
}

func (t *SendEmail) Handle(id uuid.UUID, params []byte) error {
	return nil // do the work here; a non-nil error triggers Failure
}

func (t *SendEmail) Success(id uuid.UUID, params []byte) {}

func (t *SendEmail) Failure(id uuid.UUID, err error, params []byte) {}

Task handlers may also implement two callbacks: Success and Failure. Both of these callbacks take parameters that are specific to those methods and must be provided with the task being queued. The Failure method will additionally be passed the error that caused the task to fail.

Radish Quick Start

Once we have defined our custom task handlers, we can register them and begin delaying tasks for asynchronous processing. If we have two task handlers, SendEmail and DailyReport whose names are "sendEmail" and "dailyReport" respectively, then the simplest way we can get started is as follows:

queue, err := radish.New(nil, new(SendEmail), new(DailyReport))
id, err := queue.Delay("sendEmail", []byte("jdoe@example.com"), nil, nil)
id, err = queue.Delay("dailyReport", []byte("2020-04-07"), nil, nil)

When the task queue is created, it immediately launches workers (one per CPU on the machine) to start handling tasks. You can then delay tasks, which returns the unique id of the task's future (which you can use for bookkeeping in the success or failure callbacks). In this example, the tasks are submitted with an email address and a date respectively, but with no parameters for success or failure handling.

Configuring Radish

More detailed configuration and registration is possible with radish. In the quick start example we passed a nil configuration as the first argument to New, which made the radish queue use reasonable defaults. We can configure it more specifically using the Config object:

config := &radish.Config{Workers: 4, QueueSize: 10000}
queue, err := radish.New(config)

The config is validated when the queue is created, and an invalid configuration causes New to return an error. We can also manually register tasks with the queue (and register tasks at runtime) as follows:

err := queue.Register(new(SendEmail))

This allows the queue to be dynamic and handle different tasks at different times. It is also possible to scale the number of workers at runtime:

queue.AddWorkers(8)
queue.RemoveWorkers(2)
queue.SetWorkers(4)
queue.NumWorkers()

The queue can also be scaled and tasks delayed using the Radish service.

Radish Service

Radish implements a gRPC API so that remote clients can connect and get the queue status, delay tasks, and scale the number of workers. The simplest way to run this service is as follows:

queue.Listen()

This will serve on the address and port specified in the configuration and block until an interrupt signal is received from the OS, which will shut down the queue. Applications that run the listener in its own goroutine can also manually call:

queue.Shutdown()

This gracefully shuts down the queue, completing any tasks that are in flight and not accepting new tasks. Applications that need to specify their own services using gRPC or HTTP servers can manually run the service as follows:

sock, err := net.Listen("tcp", "0.0.0.0:80")
srv := grpc.NewServer()
api.RegisterRadishServer(srv, queue)
// Register additional gRPC services here

srv.Serve(sock)

The radish CLI command can then be used to access the service and submit tasks.

Metrics

Radish also serves a metrics endpoint that can be polled by Prometheus. Radish keeps track of the following metrics associated with the task queue:

  • radish.workers: A gauge that tracks the number of workers over time as users issue scale requests.
  • radish.queue_size: A gauge that tracks the number of the tasks in the queue currently awaiting handling.
  • radish.percent_full: A gauge that tracks the relative fullness of the task queue based on the configured queue size.
  • radish.tasks_succeeded: A counter that tracks the number of tasks that have been handled and succeeded, labeled by task name.
  • radish.tasks_failed: A counter that tracks the number of tasks that have been handled and failed, labeled by task name.
  • radish.task_latency: A histogram that tracks the amount of time it takes to handle the task in milliseconds; labeled by task name and result.

Coming soon: If you have your own Prometheus endpoint, you will be able to register Radish metrics manually without serving them in Radish.

Radish CLI

The radish CLI utility is found in `cmd/radish` and can be installed as follows:

$ go get github.com/kansaslabs/radish/cmd/radish

This utility allows you to interact with any radish server and can be used to manage your task queue services out of the box. You can view the commands and options using the --help flag. In order to connect to a radish server you need to specify options as follows:

$ radish -a localhost:5356 -U

This connects radish to a server on port 5356 on the local host without TLS (the -U stands for "unsecure"). Note that you can also use the $RADISH_ENDPOINT and $RADISH_UNSECURE environment variables. The misspelling of "unsecure" is a joke: radish is not insecure, it is just not connecting with encryption.

After the connection options are specified you can use a command to interact with the server. For example to set the number of workers you can use the scale command:

$ radish -a localhost:5356 -U scale -w 12

To get the status of the server and the currently registered tasks you can use the status command:

$ radish -a localhost:5356 -U status

Finally, once you know the names of the tasks that the radish server is handling, you can queue tasks as follows:

$ radish -a localhost:5356 -U queue -t mytask -p '{"my": "data"}'

The CLI is meant to help you get started quickly with Radish task queues without having to write your own interfaces or servers.

Index

Constants

const (
	ErrUnknown int32 = iota
	ErrInvalidConfig
	ErrTaskAlreadyRegistered
	ErrTaskNotRegistered
	ErrNoWorkers
	ErrInvalidWorkers
	ErrBadGateway
)

Error codes that are common to the radish server

const PackageVersion = "1.0"

PackageVersion of the current Radish implementation

Variables

This section is empty.

Functions

func Errorf

func Errorf(code int32, format string, a ...interface{}) error

Errorf is a passthrough to api.Errorf, implemented here to allow for radish.Errorf calls.

Types

type Config

type Config struct {
	QueueSize        int    // specifies the size of the tasks channel, delay requests will block if the queue is full (default 5000, cannot be 0)
	Workers          int    // the number of workers to start radish with (default is num cpus)
	Addr             string // server address to listen on (default :5356)
	MetricsAddr      string // address to serve prometheus metrics on (default :9090)
	SuppressMetrics  bool   // do not register or serve prometheus metrics (default false)
	LogLevel         string // the level to log at (default is info)
	CautionThreshold uint   // the number of messages accumulated before issuing another caution
}

Config allows you to specify runtime options to the Radish server and job queue.

func (*Config) Validate

func (c *Config) Validate() (err error)

Validate the config and populate any defaults for zero valued configurations
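A sketch of what default population could look like, using the defaults documented in the Config field comments. The config type below is a hypothetical, trimmed-down copy of those fields, and rejecting negative values is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
)

// config is a hypothetical, trimmed-down copy of the documented Config fields.
type config struct {
	QueueSize int
	Workers   int
	Addr      string
}

// validate fills in the documented defaults for zero values. Rejecting
// negative values is an assumption made for this sketch.
func (c *config) validate() error {
	if c.QueueSize < 0 || c.Workers < 0 {
		return errors.New("queue size and workers must not be negative")
	}
	if c.QueueSize == 0 {
		c.QueueSize = 5000
	}
	if c.Workers == 0 {
		c.Workers = runtime.NumCPU()
	}
	if c.Addr == "" {
		c.Addr = ":5356"
	}
	return nil
}

func main() {
	c := &config{Workers: 4}
	if err := c.validate(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%+v\n", *c)
}
```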

type Future

type Future struct {
	ID      uuid.UUID // Task ID
	Task    string    // Task type
	Params  []byte    // the serialized parameters of the future
	Success []byte    // the serialized parameters to pass to the success function
	Failure []byte    // the serialized parameters to pass to the failure function on error
}

Future represents an enqueued task and its serialized parameters

type Radish

type Radish struct {
	sync.RWMutex // server concurrency control for both workers and registration
	// contains filtered or unexported fields
}

Radish is a stateless task queue. It listens to requests via the gRPC api to enqueue tasks (or they can be enqueued directly in code) and manages workers to handle each task in the order they are received. Before running the server, tasks must be registered so that the Radish queue knows how to handle them.

func New

func New(config *Config, tasks ...Task) (r *Radish, err error)

New creates a Radish object with the specified config and registers the specified task handlers. If the handler cannot be registered or the config is invalid an error is returned.

func (*Radish) AddWorkers

func (r *Radish) AddWorkers(n int) (err error)

AddWorkers to process tasks. Note that this is thread-safe but does start goroutines.

func (*Radish) Delay

func (r *Radish) Delay(task string, params, success, failure []byte) (id uuid.UUID, err error)

Delay creates a new future and adds it to the task queue if the handler has been registered.
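The check-then-enqueue behavior can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: the queue and future types are hypothetical, a map stands in for the handler registry, and a random hex string stands in for the uuid.UUID future id.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// future is a hypothetical stand-in for the documented Future struct.
type future struct {
	ID     string
	Task   string
	Params []byte
}

// queue is a hypothetical stand-in for the task queue.
type queue struct {
	registered map[string]bool
	tasks      chan future
}

// delay enqueues a future only if a handler is registered under the name.
func (q *queue) delay(task string, params []byte) (string, error) {
	if !q.registered[task] {
		return "", fmt.Errorf("task %q is not registered", task)
	}
	raw := make([]byte, 16)
	if _, err := rand.Read(raw); err != nil {
		return "", err
	}
	f := future{ID: hex.EncodeToString(raw), Task: task, Params: params}
	q.tasks <- f // blocks while the buffered channel is full
	return f.ID, nil
}

func main() {
	q := &queue{
		registered: map[string]bool{"sendEmail": true},
		tasks:      make(chan future, 8),
	}
	id, err := q.delay("sendEmail", []byte("jdoe@example.com"))
	fmt.Println(len(id), err)
	_, err = q.delay("unknown", nil)
	fmt.Println(err)
}
```

The blocking send matches the QueueSize comment in Config, which notes that delay requests block when the queue is full.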

func (*Radish) Handler

func (r *Radish) Handler(task string) (handler Task, err error)

Handler is a thread-safe mechanism to fetch a task handler or check if it exists.

func (*Radish) Listen

func (r *Radish) Listen() (err error)

Listen on the configured address and port for API requests and run prometheus metrics server.

func (*Radish) NumWorkers

func (r *Radish) NumWorkers() int

NumWorkers returns the number of currently running workers

func (*Radish) Queue

func (r *Radish) Queue(ctx context.Context, in *api.QueueRequest) (rep *api.QueueReply, err error)

Queue an asynchronous task from a gRPC request.

func (*Radish) Register

func (r *Radish) Register(task Task) (err error)

Register a task handler with the Radish task queue.

func (*Radish) RemoveWorkers

func (r *Radish) RemoveWorkers(n int) (err error)

RemoveWorkers by stopping them gracefully after they've completed the given task.

func (*Radish) Scale

func (r *Radish) Scale(ctx context.Context, in *api.ScaleRequest) (rep *api.ScaleReply, err error)

Scale the number of workers on the server.

func (*Radish) SetWorkers

func (r *Radish) SetWorkers(n int) (err error)

SetWorkers to the specified number of workers. Does nothing if n == number of workers that are running. Adds workers if n > number of workers and removes workers if n < number of workers.

func (*Radish) Shutdown

func (r *Radish) Shutdown() (err error)

Shutdown the queue gracefully, stopping the server, completing any tasks in flight and stopping workers. Tasks cannot be delayed after shutdown is called.

func (*Radish) Status

func (r *Radish) Status(ctx context.Context, in *api.StatusRequest) (rep *api.StatusReply, err error)

Status returns information about the state of the radish task queue.

type Task

type Task interface {
	Name() string                                   // should return a unique name for the specified task
	Handle(id uuid.UUID, params []byte) error       // handle the task with the specified params in any serialization format
	Success(id uuid.UUID, params []byte)            // callback for when the task has successfully been completed without error
	Failure(id uuid.UUID, err error, params []byte) // callback for when the task could not be completed with the error
}

Task specifies the interface for custom task types to be implemented. When registering a task with the radish server, it is important to note that the task methods may be called from multiple goroutines; therefore the Handle, Success, and Failure methods must all be thread safe. You can treat the task similar to an http.Handler, constructing per-request handling functions on demand.

TODO: require context and deadlines for task completion. Move ID to the context.

Directories

Path          Synopsis
api           Package api defines the Radish gRPC service.
cmd/radish    The radish cli program is a utility for interacting with the radish service.
cmd/turnip    The turnip command demonstrates how to use the Radish framework.
