dispatcher

package module
v0.0.0-...-af19a5a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2019 License: GPL-3.0 Imports: 13 Imported by: 1

README

Dispatcher

Go Report Card GoDoc TravisCI Codecov

Overview

Dispatcher is an asynchronous task queue/job queue based on distributed message passing. Dispatcher can send tasks to queue and execute them asynchronously on different servers.

Goals:

  1. Reconnection ability and its configuration
  2. Graceful quit (stop all workers, wait until all tasks will be finished, close connection)
  3. Ability to create as many workers as we wish from only one connection
  4. Simplicity
  5. Ability to configure timeouts for tasks
  6. Ability to limit number of parallel task for every worker

Non goals:

  1. Handling results of executed tasks
Content
Getting started
Example

Sender: link

Worker: link

$ go get github.com/gofort/dispatcher
$ cd $GOPATH/src/github.com/gofort/dispatcher
$ go run examples/sender/sender.go             #sends one task as example
$ go run examples/worker/worker.go             #executes one task and waits until you send more
Installation

Requirements:

  1. Go > 1.6
  2. RabbitMQ
go get github.com/gofort/dispatcher
Before reading description

This library is created around AMQP protocol and this is why I advice to read RabbitMQ tutorial, because a lot of stuff are covered there. If you don't know what is queue or routing key it is a must.

Server

The main goal of server is to handle AMQP connection properly. If AMQP connection was broken, server should try to reconnect to it and when connection will be restored, all workers should be restored too. Server has also a publisher inside. Publisher has its own AMQP channel and it only sends messages.

This is full server configuration:

type ServerConfig struct {
    AMQPConnectionString        string
    ReconnectionRetries         int
    ReconnectionIntervalSeconds int64
    TLSConfig                   *tls.Config
    SecureConnection            bool
    DebugMode                   bool // for default logger only
    InitQueues                  []Queue
    Exchange                    string // required
    DefaultRoutingKey           string // required
    Logger                      Log
}

Configuration description:

  • AMQPConnectionString: example - amqp://guest:guest@localhost:5672/ (amqp://username:password@host:port)
  • ReconnectionRetries: number of retries of reconnecting to AMQP after connection problems.
  • ReconnectionIntervalSeconds: interval in seconds between reconnection retries.
  • SecureConnection: if true, uses TLSConfig with param InsecureSkipVerify.
  • DebugMode: extended logger, works only with default logger (logrus), if you use your custom logger, enable debug level in it yourself.
  • InitQueues: queues and their binding keys which server will create during first start.
  • Exchange: exchange which will be used by dispatcher.
  • DefaultRoutingKey: default routing key for publishing tasks (you can set routing key manually in every task if you want).
  • Logger: your custom logger which will be used everywhere in Dispatcher.

Custom logger interface:

type Log interface {
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
}
Task
Structure

All tasks which workers handle and publisher sends have the following structure:

type Task struct {
	UUID       string                 `json:"uuid"`
	Name       string                 `json:"name"`
	RoutingKey string                 `json:"-"`
	Args       []TaskArgument         `json:"args"`
	Headers    map[string]interface{} `json:"-"`
}
type TaskArgument struct {
	Type  string      `json:"type"`
	Value interface{} `json:"value"`
}

Description of task structure:

  • UUID: task uuid, if empty - will be generated by dispatcher.
  • Name: task name, can't be empty, because when you register tasks in workers, workers won't handle tasks which are not registered in it and each task has it'w own function.
  • RoutingKey: if empty - task will be sended to default routing key (which is in server config).
  • Args: task arguments which consists of type and value.
  • Headers: are used to direct the task.

TaskArgument example:

args := []dispatcher.TaskArgument{
    {
        Type:  "string",
        Value: "simple string",
    },
    {
        Type:  "int",
        Value: 1,
    },
}

// Task with such arguments will call this function:
func SomeFunc(somestr string, someint int) {}
// or task can be with results (remember, dispatcher doesn't handle results - ignores them, save them yourself where you need):
func SomeFunc(somestr string, someint int) (string, error) {}

Available types of arguments:

  • bool
  • string
  • int int8 int16 int32 int64
  • uint uint8 uint16 uint32 uint64
  • float32 float64
Config

Task config is what you register in worker (during worker creation).

type TaskConfig struct {
	TimeoutSeconds     int64
	Function           interface{}
	TaskUUIDAsFirstArg bool
}

Description:

  • TimeoutSeconds - timeout after which worker will take new task (but this task won't be stopped!), it is made for a case when your task can be frozen by something.
  • Function - function which will be called when worker receive this task.
  • TaskUUIDAsFirstArg - because dispatcher doesn't handle results of your tasks, you should handle it yourself, this is why worker can pass task uuid as first argument to this type of task.
// TaskUUIDAsFirstArg = false
func SomeFunc(somestr string, someint int) {}
// TaskUUIDAsFirstArg = true
func SomeFunc(taskuuid string, somestr string, someint int) {}
How to send

Example: link

server, _, err := dispatcher.NewServer(&cfg)
if err != nil {
    log.Println(err.Error())
    return
}

task := dispatcher.Task{
    Name: "task_1",
    Args: []dispatcher.TaskArgument{
        {
            Type:  "string",
            Value: "simple string",
        },
        {
            Type:  "int",
            Value: 1,
        },
    },
}

// Here we sending task to a queue
if err := server.Publish(&task); err != nil {
    log.Println(err.Error())
    return
}
Worker

Worker receives tasks from queues and call functions which were registered in it. Worker calls functions with arguments which were in task. Remember, workers don't know anything about results of your tasks, handle them your self, which is why worker can pass task UUID (see TaskConfig) as first argument to functions if you wish. If somebody closes worker (you or server after all reconnection retries), worker will wait until all tasks which it called will be finished.

Example: link

This is full worker configuration:

type WorkerConfig struct {
	Limit       int
	Queue       string // required
	BindingKeys []string
	Name        string // required
}

Configuration description:

  • Limit - number of tasks which worker can handle concurrently.
  • Queue - queue which worker will consume.
  • BindingKeys - keys which worker will bind to queue during its creation.
  • Name - worker name (also consumer tag).

Tasks registering example:

tasks := make(map[string]dispatcher.TaskConfig)

// Task configuration where we pass function which will be executed by this worker when this task will be received
tasks["task_1"] = dispatcher.TaskConfig{
    Function: func(str string, someint int) {
        log.Printf("Example function arguments: string - %s, int - %d\n", str, someint)
        log.Println("Example function completed!")
    },
}

// This function creates worker, but he won't start to consume messages here
worker, err := server.NewWorker(&workercfg, tasks)
if err != nil {
    log.Println(err.Error())
    return
}
Development

Updating dependencies:

$ go get github.com/tools/godep            #if you don't have godep
$ go get -u
$ godep save

Testing:

For testing you should have set environment variable DISPATCHER_AMQP_CON which is equal to AMQP connection string, example: amqp://guest:guest@localhost:5672/

$ go test -v
Special thanks
  1. Richard Knop for his Machinery project which was an example for this project and a bit of code was taken from there.

Documentation

Overview

Package dispatcher is an asynchronous task queue/job queue based on distributed message passing.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Log

type Log interface {
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
}

Log is an interface of logger which is used in dispatcher. By default dispatcher uses logrus. You can pass your own logger which fits this interface to dispatcher in server config.

type Queue

type Queue struct {
	Name        string
	BindingKeys []string
}

Queue for creating during server creation. Has name and binding keys in it.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server contains AMQP connection and creates publisher. Server is a parent of workers and publisher.

func NewServer

func NewServer(cfg *ServerConfig) (*Server, chan struct{}, error)

NewServer creates new server from config and connects to AMQP.

func (*Server) Close

func (s *Server) Close()

Close is a complicated function which handles graceful quit of everything which dispatcher has (workers, publisher and connection). At first it stops reconnection process, then it closes publisher, after this it closes all workers and waits until all of them will finish their tasks and closes their channels. After all of this it closes AMQP connection.

func (*Server) GetWorkerByName

func (s *Server) GetWorkerByName(name string) (*Worker, error)

GetWorkerByName returns a pointer to a Worker by its name.

func (*Server) NewWorker

func (s *Server) NewWorker(cfg *WorkerConfig, tasks map[string]TaskConfig) (*Worker, error)

NewWorker creates new worker instance. Takes WorkerConfig and map of TaskConfigs. Map of TaskConfigs needs for task registration inside of this worker.

func (Server) Publish

func (s Server) Publish(task *Task) error

Publish method is used for publishing tasks.

type ServerConfig

type ServerConfig struct {
	AMQPConnectionString        string
	ReconnectionRetriesForever  bool
	ReconnectionRetries         int
	ReconnectionIntervalSeconds int64
	TLSConfig                   *tls.Config
	SecureConnection            bool
	DebugMode                   bool // for default logger only
	InitQueues                  []Queue
	Exchange                    string // required
	DefaultRoutingKey           string // required
	Logger                      Log
}

ServerConfig is a configuration which needs for server creation.

AMQPConnectionString example: amqp://guest:guest@localhost:5672/

ReconnectionRetries - number of reconnection retries, when all retries exceed, server will be closed.

ReconnectionIntervalSeconds - interval in seconds between every retry.

SecureConnection - if true, uses TLSConfig with param InsecureSkipVerify.

DebugMode - if true, enables debug level in logger (by default dispatcher uses logrus and this option enables debug level in it, if you use your own logger, omit this option).

InitQueues - pass queues and binding keys to this field and server will create all of them if they don't exists.

DefaultRoutingKey - default routing key for publishing messages.

Logger - custom logger if you don't want to use dispatcher's default logrus.

type Task

type Task struct {
	UUID       string                 `json:"uuid"`
	Name       string                 `json:"name"`
	RoutingKey string                 `json:"-"`
	Args       []TaskArgument         `json:"args"`
	Headers    map[string]interface{} `json:"-"`
}

Task is a task which can be send to AMQP. Workers receive this tasks and handles them via parsing their arguments. You can pass exchange and routing key to task if you want, they will be used in publish function.

type TaskArgument

type TaskArgument struct {
	Type  string      `json:"type"`
	Value interface{} `json:"value"`
}

TaskArgument is an argument which will be passed to function. For example, task with such arguments will call the following function:

Arguments:

 t := []TaskArgument{
	  TaskArgument{
	  	Type: "int",
	  	Value: 3,
	  },
	  TaskArgument{
	  	Type: "string",
	  	Value: "I am a string",
	  },
 }

Function:

func (myInt int, myAwesomeString string) error {}

Types which can be used: bool, string, int int8 int16 int32 int64, uint uint8 uint16 uint32 uint64, float32 float64

type TaskConfig

type TaskConfig struct {
	TimeoutSeconds     int64
	Function           interface{}
	TaskUUIDAsFirstArg bool
}

TaskConfig is task configuration which is needed for task registration in worker. Contains function which will be called by worker and timeout. Timeout is needed in case your task executing for about half an hour but you expected only 1 minute. When timeout exceeded next task will be taken, but that old task will not be stopped. TaskUUIDAsFirstArg - makes task UUID as first argument of all tasks which this worker calls.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker instance. Consists of channel which consume queue.

func (*Worker) Close

func (w *Worker) Close()

Close function gracefully closes worker. At first this function stops worker consuming, then waits until all started by this worker tasks will be finished after all of this it closes channel. This function is also used by server close function for graceful quit of all workers.

func (*Worker) Start

func (w *Worker) Start(s *Server) error

Start function starts consuming of queue. Needs server as an argument because only server contains AMQP connection and this function creates AMQP channel for a worker from connection.

type WorkerConfig

type WorkerConfig struct {
	Limit       int
	Queue       string // required
	BindingKeys []string
	Name        string // required
}

WorkerConfig is a configuration for new worker which you want to create.

Limit - number of parallel tasks which will be executed.

Queue - name of queue which worker will consume.

Binding keys - biding keys for queue which will be created.

Name - worker name

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL