machinery: github.com/RichardKnop/machinery/v1 Index | Files | Directories

package machinery

import "github.com/RichardKnop/machinery/v1"

Index

Package Files

factories.go package.go server.go worker.go

Variables

var (
    // ErrWorkerQuitGracefully is return when worker quit gracefully
    ErrWorkerQuitGracefully = errors.New("Worker quit gracefully")
    // ErrWorkerQuitGracefully is return when worker quit abruptly
    ErrWorkerQuitAbruptly = errors.New("Worker quit abruptly")
)

func BackendFactory Uses

func BackendFactory(cnf *config.Config) (backendiface.Backend, error)

BackendFactory creates a new object of backends.Interface Currently supported backends are AMQP/S and Memcache

func BrokerFactory Uses

func BrokerFactory(cnf *config.Config) (brokeriface.Broker, error)

BrokerFactory creates a new object of iface.Broker Currently only AMQP/S broker is supported

func ParseGCPPubSubURL Uses

func ParseGCPPubSubURL(url string) (string, string, error)

ParseGCPPubSubURL Parse GCP Pub/Sub URL url: gcppubsub://YOUR_GCP_PROJECT_ID/YOUR_PUBSUB_SUBSCRIPTION_NAME

func ParseRedisSocketURL Uses

func ParseRedisSocketURL(url string) (path, password string, db int, err error)

ParseRedisSocketURL extracts Redis connection options from a URL with the redis+socket:// scheme. This scheme is not standard (or even de facto) and is used as a transitional mechanism until the the config package gains the proper facilities to support socket-based connections.

func ParseRedisURL Uses

func ParseRedisURL(url string) (host, password string, db int, err error)

ParseRedisURL ...

type Server Uses

type Server struct {
    // contains filtered or unexported fields
}

Server is the main Machinery object and stores all configuration All the tasks workers process are registered against the server

func NewServer Uses

func NewServer(cnf *config.Config) (*Server, error)

NewServer creates Server instance

func NewServerWithBrokerBackend Uses

func NewServerWithBrokerBackend(cnf *config.Config, brokerServer brokersiface.Broker, backendServer backendsiface.Backend) *Server

NewServerWithBrokerBackend ...

func (*Server) GetBackend Uses

func (server *Server) GetBackend() backendsiface.Backend

GetBackend returns backend

func (*Server) GetBroker Uses

func (server *Server) GetBroker() brokersiface.Broker

GetBroker returns broker

func (*Server) GetConfig Uses

func (server *Server) GetConfig() *config.Config

GetConfig returns connection object

func (*Server) GetRegisteredTask Uses

func (server *Server) GetRegisteredTask(name string) (interface{}, error)

GetRegisteredTask returns registered task by name

func (*Server) GetRegisteredTaskNames Uses

func (server *Server) GetRegisteredTaskNames() []string

GetRegisteredTaskNames returns slice of registered task names

func (*Server) IsTaskRegistered Uses

func (server *Server) IsTaskRegistered(name string) bool

IsTaskRegistered returns true if the task name is registered with this broker

func (*Server) NewCustomQueueWorker Uses

func (server *Server) NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *Worker

NewCustomQueueWorker creates Worker instance with Custom Queue

func (*Server) NewWorker Uses

func (server *Server) NewWorker(consumerTag string, concurrency int) *Worker

NewWorker creates Worker instance

func (*Server) RegisterTask Uses

func (server *Server) RegisterTask(name string, taskFunc interface{}) error

RegisterTask registers a single task

func (*Server) RegisterTasks Uses

func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error

RegisterTasks registers all tasks at once

func (*Server) SendChain Uses

func (server *Server) SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, error)

SendChain triggers a chain of tasks

func (*Server) SendChainWithContext Uses

func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)

SendChainWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendChord Uses

func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)

SendChord triggers a group of parallel tasks with a callback

func (*Server) SendChordWithContext Uses

func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)

SendChordWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendGroup Uses

func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)

SendGroup triggers a group of parallel tasks

func (*Server) SendGroupWithContext Uses

func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)

SendGroupWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendTask Uses

func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult, error)

SendTask publishes a task to the default queue

func (*Server) SendTaskWithContext Uses

func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)

SendTaskWithContext will inject the trace context in the signature headers before publishing it

func (*Server) SetBackend Uses

func (server *Server) SetBackend(backend backendsiface.Backend)

SetBackend sets backend

func (*Server) SetBroker Uses

func (server *Server) SetBroker(broker brokersiface.Broker)

SetBroker sets broker

func (*Server) SetConfig Uses

func (server *Server) SetConfig(cnf *config.Config)

SetConfig sets config

func (*Server) SetPreTaskHandler Uses

func (server *Server) SetPreTaskHandler(handler func(*tasks.Signature))

SetPreTaskHandler Sets pre publish handler

type Worker Uses

type Worker struct {
    ConsumerTag string
    Concurrency int
    Queue       string
    // contains filtered or unexported fields
}

Worker represents a single worker process

func (*Worker) CustomQueue Uses

func (worker *Worker) CustomQueue() string

CustomQueue returns Custom Queue of the running worker process

func (*Worker) GetServer Uses

func (worker *Worker) GetServer() *Server

GetServer returns server

func (*Worker) Launch Uses

func (worker *Worker) Launch() error

Launch starts a new worker process. The worker subscribes to the default queue and processes incoming registered tasks

func (*Worker) LaunchAsync Uses

func (worker *Worker) LaunchAsync(errorsChan chan<- error)

LaunchAsync is a non blocking version of Launch

func (*Worker) Process Uses

func (worker *Worker) Process(signature *tasks.Signature) error

Process handles received tasks and triggers success/error callbacks

func (*Worker) Quit Uses

func (worker *Worker) Quit()

Quit tears down the running worker process

func (*Worker) SetErrorHandler Uses

func (worker *Worker) SetErrorHandler(handler func(err error))

SetErrorHandler sets a custom error handler for task errors A default behavior is just to log the error after all the retry attempts fail

func (*Worker) SetPostTaskHandler Uses

func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature))

SetPostTaskHandler sets a custom handler for the end of a job

func (*Worker) SetPreTaskHandler Uses

func (worker *Worker) SetPreTaskHandler(handler func(*tasks.Signature))

SetPreTaskHandler sets a custom handler func before a job is started

Directories

PathSynopsis
backends
backends/amqp
backends/dynamodb
backends/eager
backends/iface
backends/memcache
backends/mongo
backends/null
backends/redis
backends/result
brokers
brokers/amqp
brokers/eager
brokers/errs
brokers/gcppubsub
brokers/iface
brokers/redis
brokers/sqs
common
config
log
retry
tasks
tracing

Package machinery imports 34 packages (graph) and is imported by 42 packages. Updated 2020-03-04. Refresh now. Tools for package owners.