gocelery: github.com/shicky/gocelery Index | Examples | Files

package gocelery

import "github.com/shicky/gocelery"

Package gocelery is Celery Distributed Task Queue in Go

Celery distributed tasks are used heavily in many Python web applications. This library allows you to implement Celery workers in Go as well as submit Celery tasks from Go.

This package can also be used as a pure Go distributed task queue.

Supported brokers/backends

* Redis (broker/backend)
* AMQP (broker/backend)

Celery must be configured to use JSON instead of the default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass the configuration parameters below to use JSON.

CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']  # Ignore other content
CELERY_RESULT_SERIALIZER='json'
CELERY_ENABLE_UTC=True

Code:


// initialize celery client
cli, err := NewCeleryClient(
    NewRedisCeleryBroker("redis://"),
    NewRedisCeleryBackend("redis://"),
    1,
)
if err != nil {
    // nothing below can work without a client, so fail loudly
    // instead of discarding the error
    panic(err)
}

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.Delay(taskName, argA, argB)
if err != nil {
    panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
    panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))

Code:


// initialize celery client
cli, err := NewCeleryClient(
    NewRedisCeleryBroker("redis://"),
    NewRedisCeleryBackend("redis://"),
    1,
)
if err != nil {
    // nothing below can work without a client, so fail loudly
    // instead of discarding the error
    panic(err)
}

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task, passing the arguments by name
asyncResult, err := cli.DelayKwargs(
    taskName,
    map[string]interface{}{
        "a": argA,
        "b": argB,
    },
)
if err != nil {
    panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
    panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))

Code:


// initialize celery client
cli, err := NewCeleryClient(
    NewRedisCeleryBroker("redis://"),
    NewRedisCeleryBackend("redis://"),
    5,  // number of workers
)
if err != nil {
    // nothing below can work without a client, so fail loudly
    // instead of discarding the error
    panic(err)
}

// task
add := func(a, b int) int {
    return a + b
}

// register task
cli.Register("add", add)

// start workers (non-blocking call)
cli.StartWorker()

// wait for client request
time.Sleep(10 * time.Second)

// stop workers gracefully (blocking call)
cli.StopWorker()

Code:


// initialize celery client
cli, err := NewCeleryClient(
    NewRedisCeleryBroker("redis://"),
    NewRedisCeleryBackend("redis://"),
    1,
)
if err != nil {
    // nothing below can work without a client, so fail loudly
    // instead of discarding the error
    panic(err)
}

// task
add := func(a, b int) int {
    return a + b
}

// register task
cli.Register("add", add)

// context with cancelFunc to handle exit gracefully
ctx, cancel := context.WithCancel(context.Background())

// start workers (non-blocking call)
cli.StartWorkerWithContext(ctx)

// wait for client request
time.Sleep(10 * time.Second)

// stop workers by cancelling context
cancel()

// optional: wait for all workers to terminate
cli.WaitForStopWorker()

Code:

package main

import (
    "fmt"
    "time"
)

// exampleAddTask is integer addition task
// with named arguments
type exampleAddTask struct {
    a int
    b int
}

// intKwarg looks up name in kwargs and converts the value to int.
// It returns an "undefined kwarg" error when the key is absent and a
// "malformed kwarg" error when the value is not a float64.
// NOTE(review): float64 is presumably expected because kwargs arrive
// JSON-decoded, where numbers are float64 - confirm against the caller.
func intKwarg(kwargs map[string]interface{}, name string) (int, error) {
    raw, ok := kwargs[name]
    if !ok {
        return 0, fmt.Errorf("undefined kwarg %s", name)
    }
    num, ok := raw.(float64)
    if !ok {
        return 0, fmt.Errorf("malformed kwarg %s", name)
    }
    return int(num), nil
}

// ParseKwargs reads the "a" and "b" keyword arguments into the task.
// Both values are validated before either field is assigned, so a
// failed parse leaves the task unchanged.
func (t *exampleAddTask) ParseKwargs(kwargs map[string]interface{}) error {
    a, err := intKwarg(kwargs, "a")
    if err != nil {
        return err
    }
    b, err := intKwarg(kwargs, "b")
    if err != nil {
        return err
    }
    t.a, t.b = a, b
    return nil
}

// RunTask returns the sum of the two parsed operands; the error is always nil.
func (t *exampleAddTask) RunTask() (interface{}, error) {
    return t.a + t.b, nil
}

// main registers the add task, runs the workers for ten seconds so
// clients can submit requests, then shuts the workers down.
func main() {

    // initialize celery client
    cli, err := NewCeleryClient(
        NewRedisCeleryBroker("redis://"),
        NewRedisCeleryBackend("redis://"),
        5,  // number of workers
    )
    if err != nil {
        // nothing below can work without a client, so fail loudly
        // instead of discarding the error
        panic(err)
    }

    // register task
    cli.Register("add", &exampleAddTask{})

    // start workers (non-blocking call)
    cli.StartWorker()

    // wait for client request
    time.Sleep(10 * time.Second)

    // stop workers gracefully (blocking call)
    cli.StopWorker()
}

Index

Examples

Package Files

amqp.go amqp_backend.go amqp_broker.go convert.go doc.go gocelery.go message.go redis_backend.go redis_broker.go worker.go

func GetRealValue Uses

func GetRealValue(val *reflect.Value) interface{}

GetRealValue returns the real value of a reflect.Value. Required for JSON marshalling.

func NewAMQPConnection Uses

func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel)

NewAMQPConnection creates new AMQP channel

func NewRedisPool Uses

func NewRedisPool(uri string) *redis.Pool

NewRedisPool creates pool of redis connections from given connection string

type AMQPCeleryBackend Uses

type AMQPCeleryBackend struct {
    *amqp.Channel
    // contains filtered or unexported fields
}

AMQPCeleryBackend is CeleryBackend for AMQP

func NewAMQPCeleryBackend Uses

func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend

NewAMQPCeleryBackend creates new AMQPCeleryBackend

func NewAMQPCeleryBackendByConnAndChannel Uses

func NewAMQPCeleryBackendByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBackend

NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP connection and channel

func (*AMQPCeleryBackend) GetResult Uses

func (b *AMQPCeleryBackend) GetResult(taskID string) (*ResultMessage, error)

GetResult retrieves result from AMQP queue

func (*AMQPCeleryBackend) Reconnect Uses

func (b *AMQPCeleryBackend) Reconnect()

Reconnect reconnects to AMQP server

func (*AMQPCeleryBackend) SetResult Uses

func (b *AMQPCeleryBackend) SetResult(taskID string, result *ResultMessage) error

SetResult sets result back to AMQP queue

type AMQPCeleryBroker Uses

type AMQPCeleryBroker struct {
    *amqp.Channel
    // contains filtered or unexported fields
}

AMQPCeleryBroker is CeleryBroker for AMQP

func NewAMQPCeleryBroker Uses

func NewAMQPCeleryBroker(host string) *AMQPCeleryBroker

NewAMQPCeleryBroker creates new AMQPCeleryBroker

func NewAMQPCeleryBrokerByConnAndChannel Uses

func NewAMQPCeleryBrokerByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBroker

NewAMQPCeleryBrokerByConnAndChannel creates new AMQPCeleryBroker using AMQP conn and channel

func (*AMQPCeleryBroker) CreateExchange Uses

func (b *AMQPCeleryBroker) CreateExchange() error

CreateExchange declares AMQP exchange with stored configuration

func (*AMQPCeleryBroker) CreateQueue Uses

func (b *AMQPCeleryBroker) CreateQueue() error

CreateQueue declares AMQP Queue with stored configuration

func (*AMQPCeleryBroker) GetTaskMessage Uses

func (b *AMQPCeleryBroker) GetTaskMessage() (*TaskMessage, error)

GetTaskMessage retrieves task message from AMQP queue

func (*AMQPCeleryBroker) SendCeleryMessage Uses

func (b *AMQPCeleryBroker) SendCeleryMessage(message *CeleryMessage) error

SendCeleryMessage sends CeleryMessage to broker

func (*AMQPCeleryBroker) StartConsumingChannel Uses

func (b *AMQPCeleryBroker) StartConsumingChannel() error

StartConsumingChannel spawns receiving channel on AMQP queue

type AMQPExchange Uses

type AMQPExchange struct {
    Name       string
    Type       string
    Durable    bool
    AutoDelete bool
}

AMQPExchange stores AMQP Exchange configuration

func NewAMQPExchange Uses

func NewAMQPExchange(name string) *AMQPExchange

NewAMQPExchange creates new AMQPExchange

type AMQPQueue Uses

type AMQPQueue struct {
    Name       string
    Durable    bool
    AutoDelete bool
}

AMQPQueue stores AMQP Queue configuration

func NewAMQPQueue Uses

func NewAMQPQueue(name string) *AMQPQueue

NewAMQPQueue creates new AMQPQueue

type AsyncResult Uses

type AsyncResult struct {
    TaskID string
    // contains filtered or unexported fields
}

AsyncResult represents pending result

func (*AsyncResult) AsyncGet Uses

func (ar *AsyncResult) AsyncGet() (interface{}, error)

AsyncGet gets actual result from backend and returns nil if not available

func (*AsyncResult) Get Uses

func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)

Get gets actual result from backend. It blocks for the period of time set by timeout and returns an error if the result is unavailable.

func (*AsyncResult) Ready Uses

func (ar *AsyncResult) Ready() (bool, error)

Ready checks if actual result is ready

type CeleryBackend Uses

type CeleryBackend interface {
    GetResult(string) (*ResultMessage, error) // must be non-blocking
    SetResult(taskID string, result *ResultMessage) error
}

CeleryBackend is interface for celery backend database

type CeleryBroker Uses

type CeleryBroker interface {
    SendCeleryMessage(*CeleryMessage) error
    GetTaskMessage() (*TaskMessage, error) // must be non-blocking
}

CeleryBroker is interface for celery broker database

type CeleryClient Uses

type CeleryClient struct {
    // contains filtered or unexported fields
}

CeleryClient provides API for sending celery tasks

func NewCeleryClient Uses

func NewCeleryClient(broker CeleryBroker, backend CeleryBackend, numWorkers int) (*CeleryClient, error)

NewCeleryClient creates new celery client

func (*CeleryClient) Delay Uses

func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)

Delay gets asynchronous result

func (*CeleryClient) DelayKwargs Uses

func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)

DelayKwargs gets asynchronous results with argument map

func (*CeleryClient) Register Uses

func (cc *CeleryClient) Register(name string, task interface{})

Register task

func (*CeleryClient) StartWorker Uses

func (cc *CeleryClient) StartWorker()

StartWorker starts celery workers

func (*CeleryClient) StartWorkerWithContext Uses

func (cc *CeleryClient) StartWorkerWithContext(ctx context.Context)

StartWorkerWithContext starts celery workers with given parent context

func (*CeleryClient) StopWorker Uses

func (cc *CeleryClient) StopWorker()

StopWorker stops celery workers

func (*CeleryClient) WaitForStopWorker Uses

func (cc *CeleryClient) WaitForStopWorker()

WaitForStopWorker waits for celery workers to terminate

type CeleryDeliveryInfo Uses

type CeleryDeliveryInfo struct {
    Priority   int    `json:"priority"`
    RoutingKey string `json:"routing_key"`
    Exchange   string `json:"exchange"`
}

CeleryDeliveryInfo represents deliveryinfo json

type CeleryMessage Uses

type CeleryMessage struct {
    Body            string                 `json:"body"`
    Headers         map[string]interface{} `json:"headers,omitempty"`
    ContentType     string                 `json:"content-type"`
    Properties      CeleryProperties       `json:"properties"`
    ContentEncoding string                 `json:"content-encoding"`
}

CeleryMessage is the actual message to be sent to the broker (Redis or AMQP)

func (*CeleryMessage) GetTaskMessage Uses

func (cm *CeleryMessage) GetTaskMessage() *TaskMessage

GetTaskMessage retrieve and decode task messages from broker

type CeleryProperties Uses

type CeleryProperties struct {
    BodyEncoding  string             `json:"body_encoding"`
    CorrelationID string             `json:"correlation_id"`
    ReplyTo       string             `json:"reply_to"`
    DeliveryInfo  CeleryDeliveryInfo `json:"delivery_info"`
    DeliveryMode  int                `json:"delivery_mode"`
    DeliveryTag   string             `json:"delivery_tag"`
}

CeleryProperties represents properties json

type CeleryTask Uses

type CeleryTask interface {

    // ParseKwargs - define a method to parse kwargs
    ParseKwargs(map[string]interface{}) error

    // RunTask - define a method for execution
    RunTask() (interface{}, error)
}

CeleryTask is an interface that represents an actual task. Passing a CeleryTask interface instead of a function pointer avoids reflection and may have a performance gain. ResultMessage must be obtained using GetResultMessage()

type CeleryWorker Uses

type CeleryWorker struct {
    // contains filtered or unexported fields
}

CeleryWorker represents distributed task worker

func NewCeleryWorker Uses

func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int) *CeleryWorker

NewCeleryWorker returns new celery worker

func (*CeleryWorker) GetNumWorkers Uses

func (w *CeleryWorker) GetNumWorkers() int

GetNumWorkers returns number of currently running workers

func (*CeleryWorker) GetTask Uses

func (w *CeleryWorker) GetTask(name string) interface{}

GetTask retrieves registered task

func (*CeleryWorker) Register Uses

func (w *CeleryWorker) Register(name string, task interface{})

Register registers tasks (functions)

func (*CeleryWorker) RunTask Uses

func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)

RunTask runs celery task

func (*CeleryWorker) StartWorker Uses

func (w *CeleryWorker) StartWorker()

StartWorker starts celery workers

func (*CeleryWorker) StartWorkerWithContext Uses

func (w *CeleryWorker) StartWorkerWithContext(ctx context.Context)

StartWorkerWithContext starts celery worker(s) with given parent context

func (*CeleryWorker) StopWait Uses

func (w *CeleryWorker) StopWait()

StopWait waits for celery workers to terminate

func (*CeleryWorker) StopWorker Uses

func (w *CeleryWorker) StopWorker()

StopWorker stops celery workers

type RedisCeleryBackend Uses

type RedisCeleryBackend struct {
    *redis.Pool
}

RedisCeleryBackend is celery backend for redis

func NewRedisCeleryBackend Uses

func NewRedisCeleryBackend(uri string) *RedisCeleryBackend

NewRedisCeleryBackend creates new RedisCeleryBackend

func (*RedisCeleryBackend) GetResult Uses

func (cb *RedisCeleryBackend) GetResult(taskID string) (*ResultMessage, error)

GetResult queries redis backend to get asynchronous result

func (*RedisCeleryBackend) SetResult Uses

func (cb *RedisCeleryBackend) SetResult(taskID string, result *ResultMessage) error

SetResult pushes result back into redis backend

type RedisCeleryBroker Uses

type RedisCeleryBroker struct {
    *redis.Pool
    // contains filtered or unexported fields
}

RedisCeleryBroker is celery broker for redis

func NewRedisCeleryBroker Uses

func NewRedisCeleryBroker(uri string) *RedisCeleryBroker

NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri

func (*RedisCeleryBroker) GetCeleryMessage Uses

func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error)

GetCeleryMessage retrieves celery message from redis queue

func (*RedisCeleryBroker) GetTaskMessage Uses

func (cb *RedisCeleryBroker) GetTaskMessage() (*TaskMessage, error)

GetTaskMessage retrieves task message from redis queue

func (*RedisCeleryBroker) SendCeleryMessage Uses

func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error

SendCeleryMessage sends CeleryMessage to redis queue

type ResultMessage Uses

type ResultMessage struct {
    ID        string        `json:"task_id"`
    Status    string        `json:"status"`
    Traceback interface{}   `json:"traceback"`
    Result    interface{}   `json:"result"`
    Children  []interface{} `json:"children"`
}

ResultMessage is the return message received from the backend

type TaskMessage Uses

type TaskMessage struct {
    ID      string                 `json:"id"`
    Task    string                 `json:"task"`
    Args    []interface{}          `json:"args"`
    Kwargs  map[string]interface{} `json:"kwargs"`
    Retries int                    `json:"retries"`
    ETA     *string                `json:"eta"`
}

TaskMessage is celery-compatible message

func DecodeTaskMessage Uses

func DecodeTaskMessage(encodedBody string) (*TaskMessage, error)

DecodeTaskMessage decodes base64 encoded body and returns TaskMessage object

func (*TaskMessage) Encode Uses

func (tm *TaskMessage) Encode() (string, error)

Encode returns base64 json encoded string

Package gocelery imports 12 packages (graph). Updated 2019-04-19. Refresh now. Tools for package owners.