Documentation ¶
Overview ¶
Package gocelery is an implementation of the Celery distributed task queue in Go.
Celery distributed tasks are used heavily in many Python web applications, and this library allows you to implement Celery workers in Go as well as submit Celery tasks from Go.
This package can also be used as a pure Go distributed task queue.
Supported brokers/backends
- Redis (broker/backend)
- AMQP (broker/backend)
Celery must be configured to use JSON instead of the default pickle encoding, because Go currently has no stable support for decoding pickle objects. Pass the configuration parameters below to use JSON.
CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']  # Ignore other content
CELERY_RESULT_SERIALIZER='json'
CELERY_ENABLE_UTC=True
Index ¶
- func GetRealValue(val *reflect.Value) interface{}
- func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel)
- func NewRedisPool(uri string) *redis.Pool
- func SetLogger(logger Logger)
- type AMQPCeleryBackend
- type AMQPCeleryBroker
- func (b *AMQPCeleryBroker) CreateExchange() error
- func (b *AMQPCeleryBroker) CreateQueue() error
- func (b *AMQPCeleryBroker) GetAckLate() bool
- func (b *AMQPCeleryBroker) GetConnection() *amqp.Connection
- func (b *AMQPCeleryBroker) GetExchange() string
- func (b *AMQPCeleryBroker) GetQueue() string
- func (b *AMQPCeleryBroker) GetTaskMessage() (*TaskMessage, *amqp.Delivery, error)
- func (b *AMQPCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
- type AMQPCeleryBrokerConfig
- type AMQPCeleryExchangeConfig
- type AMQPCeleryQueueConfig
- type AMQPExchange
- type AMQPQueue
- type AsyncResult
- type CeleryBackend
- type CeleryBroker
- type CeleryClient
- func (cc *CeleryClient) ApplyAsync(task string, exchange string, routingKey string, kwargs interface{}, ...) (*AsyncResult, error)
- func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) Register(name string, task interface{})
- func (cc *CeleryClient) StartWorker()
- func (cc *CeleryClient) StopWorker()
- type CeleryDeliveryInfo
- type CeleryLogger
- func (c *CeleryLogger) Debug(args ...interface{})
- func (c *CeleryLogger) Debugf(format string, args ...interface{})
- func (c *CeleryLogger) Error(args ...interface{})
- func (c *CeleryLogger) Errorf(format string, args ...interface{})
- func (c *CeleryLogger) Fatal(args ...interface{})
- func (c *CeleryLogger) Fatalf(format string, args ...interface{})
- func (c *CeleryLogger) Info(args ...interface{})
- func (c *CeleryLogger) Infof(format string, args ...interface{})
- func (c *CeleryLogger) Panic(args ...interface{})
- func (c *CeleryLogger) Panicf(format string, args ...interface{})
- func (c *CeleryLogger) Print(args ...interface{})
- func (c *CeleryLogger) Warn(args ...interface{})
- func (c *CeleryLogger) Warnf(format string, args ...interface{})
- type CeleryMessage
- type CeleryProperties
- type CeleryTask
- type CeleryWorker
- func (w *CeleryWorker) GetNumWorkers() int
- func (w *CeleryWorker) GetTask(name string) interface{}
- func (w *CeleryWorker) Register(name string, task interface{})
- func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)
- func (w *CeleryWorker) StartWorker()
- func (w *CeleryWorker) StopWorker()
- type DeliveredMessage
- type Logger
- type RedisCeleryBackend
- type RedisCeleryBroker
- type ResultMessage
- type TaskMessage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetRealValue ¶
GetRealValue returns the real value of a reflect.Value. Required for JSON marshalling.
func NewAMQPConnection ¶
func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel)
NewAMQPConnection creates a new AMQP connection and channel
func NewRedisPool ¶
NewRedisPool creates pool of redis connections from given uri
Types ¶
type AMQPCeleryBackend ¶
AMQPCeleryBackend CeleryBackend for AMQP
func NewAMQPCeleryBackend ¶
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend
NewAMQPCeleryBackend creates new AMQPCeleryBackend
func NewAMQPCeleryBackendByConnAndChannel ¶
func NewAMQPCeleryBackendByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBackend
NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP conn and channel
func (*AMQPCeleryBackend) GetResult ¶
func (b *AMQPCeleryBackend) GetResult(taskID string) (*ResultMessage, error)
GetResult retrieves result from AMQP Queue
func (*AMQPCeleryBackend) Reconnect ¶
func (b *AMQPCeleryBackend) Reconnect()
Reconnect reconnects to AMQP server
func (*AMQPCeleryBackend) SetResult ¶
func (b *AMQPCeleryBackend) SetResult(taskID string, result *ResultMessage) error
SetResult sets result back to AMQP Queue
type AMQPCeleryBroker ¶
type AMQPCeleryBroker struct { *amqp.Channel Connection *amqp.Connection Exchange *AMQPExchange Queue *AMQPQueue // contains filtered or unexported fields }
AMQPCeleryBroker is CeleryBroker for AMQP
func NewAMQPCeleryBroker ¶
func NewAMQPCeleryBroker(conn *amqp.Connection, channel *amqp.Channel, config *AMQPCeleryBrokerConfig) *AMQPCeleryBroker
NewAMQPCeleryBroker creates new AMQPCeleryBroker using the given AMQP conn, channel, and broker config
func (*AMQPCeleryBroker) CreateExchange ¶
func (b *AMQPCeleryBroker) CreateExchange() error
CreateExchange declares AMQP Exchange with stored configuration
func (*AMQPCeleryBroker) CreateQueue ¶
func (b *AMQPCeleryBroker) CreateQueue() error
CreateQueue declares AMQP Queue with stored configuration
func (*AMQPCeleryBroker) GetAckLate ¶
func (b *AMQPCeleryBroker) GetAckLate() bool
func (*AMQPCeleryBroker) GetConnection ¶
func (b *AMQPCeleryBroker) GetConnection() *amqp.Connection
func (*AMQPCeleryBroker) GetExchange ¶
func (b *AMQPCeleryBroker) GetExchange() string
func (*AMQPCeleryBroker) GetQueue ¶
func (b *AMQPCeleryBroker) GetQueue() string
func (*AMQPCeleryBroker) GetTaskMessage ¶
func (b *AMQPCeleryBroker) GetTaskMessage() (*TaskMessage, *amqp.Delivery, error)
GetTaskMessage retrieves task message from AMQP Queue
func (*AMQPCeleryBroker) SendCeleryMessage ¶
func (b *AMQPCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
SendCeleryMessage sends CeleryMessage to broker
type AMQPCeleryBrokerConfig ¶
type AMQPCeleryBrokerConfig struct { AckLate bool PrefetchCount int Exchange AMQPCeleryExchangeConfig Queue AMQPCeleryQueueConfig CreateExchange bool CreateQueue bool }
type AMQPCeleryQueueConfig ¶
type AMQPExchange ¶
AMQPExchange stores AMQP Exchange configuration
func NewAMQPExchange ¶
func NewAMQPExchange(config AMQPCeleryExchangeConfig) *AMQPExchange
NewAMQPExchange creates new AMQPExchange
type AMQPQueue ¶
AMQPQueue stores AMQP Queue configuration
func NewAMQPQueue ¶
func NewAMQPQueue(config AMQPCeleryQueueConfig) *AMQPQueue
NewAMQPQueue creates new AMQPQueue
type AsyncResult ¶
type AsyncResult struct {
// contains filtered or unexported fields
}
AsyncResult is pending result
func (*AsyncResult) AsyncGet ¶
func (ar *AsyncResult) AsyncGet() (interface{}, error)
AsyncGet gets the actual result from the backend and returns nil if it is not available
func (*AsyncResult) Get ¶
func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)
Get gets the actual result from the backend. It blocks for the period of time set by timeout and returns an error if the result is unavailable.
func (*AsyncResult) Ready ¶
func (ar *AsyncResult) Ready() (bool, error)
Ready checks if actual result is ready
type CeleryBackend ¶
type CeleryBackend interface { GetResult(string) (*ResultMessage, error) // must be non-blocking SetResult(taskID string, result *ResultMessage) error }
CeleryBackend is interface for celery backend database
type CeleryBroker ¶
type CeleryBroker interface { SendCeleryMessage(*CeleryMessage) error GetTaskMessage() (*TaskMessage, *amqp.Delivery, error) // must be non-blocking GetAckLate() bool GetExchange() string GetQueue() string GetConnection() *amqp.Connection }
CeleryBroker is interface for celery broker database
type CeleryClient ¶
type CeleryClient struct { Broker CeleryBroker // contains filtered or unexported fields }
CeleryClient provides API for sending celery tasks
func NewCeleryClient ¶
func NewCeleryClient(broker CeleryBroker, backend CeleryBackend, numWorkers int) (*CeleryClient, error)
NewCeleryClient creates new celery client
func (*CeleryClient) ApplyAsync ¶
func (cc *CeleryClient) ApplyAsync(task string, exchange string, routingKey string, kwargs interface{}, args ...interface{}) (*AsyncResult, error)
func (*CeleryClient) Delay ¶
func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)
Delay gets asynchronous result
func (*CeleryClient) DelayKwargs ¶
func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)
DelayKwargs gets asynchronous results with argument map
func (*CeleryClient) Register ¶
func (cc *CeleryClient) Register(name string, task interface{})
Register task
func (*CeleryClient) StartWorker ¶
func (cc *CeleryClient) StartWorker()
StartWorker starts celery workers
func (*CeleryClient) StopWorker ¶
func (cc *CeleryClient) StopWorker()
StopWorker stops celery workers
type CeleryDeliveryInfo ¶
type CeleryDeliveryInfo struct { Priority int `json:"priority"` RoutingKey string `json:"routing_key"` Exchange string `json:"Exchange"` }
CeleryDeliveryInfo represents the delivery_info JSON object
type CeleryLogger ¶
type CeleryLogger struct {
// contains filtered or unexported fields
}
func NewCeleryLogger ¶
func NewCeleryLogger() *CeleryLogger
func (*CeleryLogger) Debug ¶
func (c *CeleryLogger) Debug(args ...interface{})
func (*CeleryLogger) Debugf ¶
func (c *CeleryLogger) Debugf(format string, args ...interface{})
func (*CeleryLogger) Error ¶
func (c *CeleryLogger) Error(args ...interface{})
func (*CeleryLogger) Errorf ¶
func (c *CeleryLogger) Errorf(format string, args ...interface{})
func (*CeleryLogger) Fatal ¶
func (c *CeleryLogger) Fatal(args ...interface{})
func (*CeleryLogger) Fatalf ¶
func (c *CeleryLogger) Fatalf(format string, args ...interface{})
func (*CeleryLogger) Info ¶
func (c *CeleryLogger) Info(args ...interface{})
func (*CeleryLogger) Infof ¶
func (c *CeleryLogger) Infof(format string, args ...interface{})
func (*CeleryLogger) Panic ¶
func (c *CeleryLogger) Panic(args ...interface{})
func (*CeleryLogger) Panicf ¶
func (c *CeleryLogger) Panicf(format string, args ...interface{})
func (*CeleryLogger) Print ¶
func (c *CeleryLogger) Print(args ...interface{})
func (*CeleryLogger) Warn ¶
func (c *CeleryLogger) Warn(args ...interface{})
func (*CeleryLogger) Warnf ¶
func (c *CeleryLogger) Warnf(format string, args ...interface{})
type CeleryMessage ¶
type CeleryMessage struct { Body string `json:"body"` Headers map[string]interface{} `json:"headers"` ContentType string `json:"content-type"` Properties CeleryProperties `json:"properties"` ContentEncoding string `json:"content-encoding"` }
CeleryMessage is the actual message to be sent to the broker
func (*CeleryMessage) GetTaskMessage ¶
func (cm *CeleryMessage) GetTaskMessage() *TaskMessage
GetTaskMessage retrieve and decode task messages from broker
type CeleryProperties ¶
type CeleryProperties struct { BodyEncoding string `json:"body_encoding"` CorrelationID string `json:"correlation_id"` ReplyTo string `json:"replay_to"` DeliveryInfo CeleryDeliveryInfo `json:"delivery_info"` DeliveryMode int `json:"delivery_mode"` DeliveryTag string `json:"delivery_tag"` }
CeleryProperties represents properties json
type CeleryTask ¶
type CeleryTask interface { // ParseKwargs - define a method to parse kwargs ParseKwargs(interface{}) error // RunTask - define a method to run RunTask() (interface{}, error) }
CeleryTask is an interface that represents an actual task. Passing a CeleryTask interface instead of a function pointer avoids reflection and may yield a performance gain. ResultMessage must be obtained using GetResultMessage().
type CeleryWorker ¶
type CeleryWorker struct {
// contains filtered or unexported fields
}
CeleryWorker represents distributed task worker
func NewCeleryWorker ¶
func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int) *CeleryWorker
NewCeleryWorker returns new celery worker
func (*CeleryWorker) GetNumWorkers ¶
func (w *CeleryWorker) GetNumWorkers() int
GetNumWorkers returns number of currently running workers
func (*CeleryWorker) GetTask ¶
func (w *CeleryWorker) GetTask(name string) interface{}
GetTask retrieves registered task
func (*CeleryWorker) Register ¶
func (w *CeleryWorker) Register(name string, task interface{})
Register registers tasks (functions)
func (*CeleryWorker) RunTask ¶
func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)
RunTask runs celery task
func (*CeleryWorker) StartWorker ¶
func (w *CeleryWorker) StartWorker()
StartWorker starts celery worker
func (*CeleryWorker) StopWorker ¶
func (w *CeleryWorker) StopWorker()
StopWorker stops celery workers
type DeliveredMessage ¶
type DeliveredMessage struct {
// contains filtered or unexported fields
}
type Logger ¶
type Logger interface { Debugf(format string, args ...interface{}) Infof(format string, args ...interface{}) Warnf(format string, args ...interface{}) Errorf(format string, args ...interface{}) Fatalf(format string, args ...interface{}) Panicf(format string, args ...interface{}) // Debug(args ...interface{}) Info(args ...interface{}) Print(args ...interface{}) Warn(args ...interface{}) Error(args ...interface{}) Fatal(args ...interface{}) Panic(args ...interface{}) }
type RedisCeleryBackend ¶
RedisCeleryBackend is CeleryBackend for Redis
func NewRedisCeleryBackend ¶
func NewRedisCeleryBackend(uri string) *RedisCeleryBackend
NewRedisCeleryBackend creates new RedisCeleryBackend
func (*RedisCeleryBackend) GetResult ¶
func (cb *RedisCeleryBackend) GetResult(taskID string) (*ResultMessage, error)
GetResult calls the API to get the asynchronous result. It should be called by AsyncResult.
func (*RedisCeleryBackend) SetResult ¶
func (cb *RedisCeleryBackend) SetResult(taskID string, result *ResultMessage) error
SetResult pushes result back into backend
type RedisCeleryBroker ¶
RedisCeleryBroker is CeleryBroker for Redis
func NewRedisCeleryBroker ¶
func NewRedisCeleryBroker(uri string) *RedisCeleryBroker
NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri
func (*RedisCeleryBroker) GetCeleryMessage ¶
func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error)
GetCeleryMessage retrieves celery message from redis Queue
func (*RedisCeleryBroker) GetTaskMessage ¶
func (cb *RedisCeleryBroker) GetTaskMessage() (*TaskMessage, error)
GetTaskMessage retrieves task message from redis Queue
func (*RedisCeleryBroker) SendCeleryMessage ¶
func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
SendCeleryMessage sends CeleryMessage to redis Queue
type ResultMessage ¶
type ResultMessage struct { ID string `json:"task_id"` Status string `json:"status"` Traceback interface{} `json:"traceback"` Result interface{} `json:"result"` Children []interface{} `json:"children"` }
ResultMessage is return message received from broker
type TaskMessage ¶
type TaskMessage struct { ID string `json:"id"` Task string `json:"task"` Args []interface{} `json:"args"` Kwargs interface{} `json:"kwargs"` Retries int `json:"retries"` ETA string `json:"eta"` }
TaskMessage is celery-compatible message
func DecodeTaskMessage ¶
func DecodeTaskMessage(encodedBody string) (*TaskMessage, error)
DecodeTaskMessage decodes a base64-encoded body and returns a TaskMessage object
func (*TaskMessage) Encode ¶
func (tm *TaskMessage) Encode() (string, error)
Encode returns a base64-encoded JSON string