celery

Version: v0.0.0-...-e42251a
Published: Feb 21, 2015 | License: BSD-3-Clause | Imports: 14 | Imported by: 0

README

GOCELERY

golang workers for celery.

In development, do not use unless you want to contribute. It won't do anything as is.

Documentation

Overview

Golang workers implementation for the Celery distributed task queue http://www.celeryproject.org/.

types package provides common types for celery.

Constants

const (
	// Statuses
	PENDING  = State("PENDING")  // Task state is unknown (assumed pending since you know the id).
	RECEIVED = State("RECEIVED") // Task was received by a worker.
	STARTED  = State("STARTED")  // Task was started by a worker (:setting:`CELERY_TRACK_STARTED`).
	SUCCESS  = State("SUCCESS")  // Task succeeded
	FAILURE  = State("FAILURE")  // Task failed
	REVOKED  = State("REVOKED")  // Task was revoked.
	RETRY    = State("RETRY")    // Task is waiting for retry.
	IGNORED  = State("IGNORED")
	REJECTED = State("REJECTED")
)

Variables

This section is empty.

Functions

func ContextFromMessage

func ContextFromMessage(parent context.Context, msg Message) context.Context

ContextFromMessage prepares a context from a parent context and a message.

func RegisterMessageDecoder

func RegisterMessageDecoder(contentType string, decoder DecoderFunc)

RegisterMessageDecoder registers a DecoderFunc function for a given content type.

func RetryNTimes

func RetryNTimes(n int, ctx context.Context, err error, delay time.Duration) error

RetryNTimes is a helper function that returns a Retryable error if the task has been tried fewer than n times. The retry is scheduled after delay raised to the power of the number of previous tries.

Types

type Backend

type Backend interface {
	Publish(Task, *ResultMeta)
}

Backend is the interface for publishers of task results.

type Config

type Config struct {
	BrokerURL          string
	CelerydConcurrency int
	CeleryAcksLate     bool
}

func ConfigFromEnv

func ConfigFromEnv() Config
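
ConfigFromEnv carries no description here, so the following is only a rough sketch of how a Config could be filled from environment variables. The helper configFromLookup, the variable names BROKER_URL, CELERYD_CONCURRENCY and CELERY_ACKS_LATE (borrowed from Celery's setting names), and the choice to ignore unparsable values are all assumptions, not the package's documented behaviour.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Config mirrors the struct documented above.
type Config struct {
	BrokerURL          string
	CelerydConcurrency int
	CeleryAcksLate     bool
}

// configFromLookup is a hypothetical stand-in for ConfigFromEnv.
// The variable names are assumptions; unparsable values are left at zero.
func configFromLookup(lookup func(string) (string, bool)) Config {
	var c Config
	if v, ok := lookup("BROKER_URL"); ok {
		c.BrokerURL = v
	}
	if v, ok := lookup("CELERYD_CONCURRENCY"); ok {
		if n, err := strconv.Atoi(v); err == nil {
			c.CelerydConcurrency = n
		}
	}
	if v, ok := lookup("CELERY_ACKS_LATE"); ok {
		if b, err := strconv.ParseBool(v); err == nil {
			c.CeleryAcksLate = b
		}
	}
	return c
}

func main() {
	// os.LookupEnv has exactly the lookup signature used above.
	c := configFromLookup(os.LookupEnv)
	fmt.Printf("%+v\n", c)
}
```

Passing the lookup function in, rather than calling os.LookupEnv directly, keeps the sketch testable without touching the process environment.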

type DecoderFunc

type DecoderFunc func([]byte) (Message, error)

DecoderFunc is a func that parses bytes and returns a Message.

type DiscardBackend

type DiscardBackend struct{}

DiscardBackend is a Backend that does nothing.

func (DiscardBackend) Publish

func (DiscardBackend) Publish(Task, *ResultMeta)

Publish implements the Backend interface.
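
As an illustration of the Backend interface, here is a minimal sketch of a backend that keeps results in memory instead of discarding them. The types are local stand-ins trimmed to what the sketch needs (Task is reduced to its Msg method), and memoryBackend, get and fakeTask are hypothetical names that do not exist in the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins for the documented types, reduced for this sketch.
type State string
type Result interface{}

type ResultMeta struct {
	Status State
	Result Result
	TaskId string
}

type Message struct{ ID string }

// Task is narrowed to Msg; the documented interface has more methods.
type Task interface{ Msg() Message }

type Backend interface {
	Publish(Task, *ResultMeta)
}

// memoryBackend is a hypothetical Backend that stores results by task ID.
type memoryBackend struct {
	mu      sync.Mutex
	results map[string]*ResultMeta
}

func newMemoryBackend() *memoryBackend {
	return &memoryBackend{results: make(map[string]*ResultMeta)}
}

// Publish implements Backend; the mutex makes it safe for concurrent workers.
func (b *memoryBackend) Publish(t Task, r *ResultMeta) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.results[t.Msg().ID] = r
}

// get returns the stored result for a task ID, if any.
func (b *memoryBackend) get(id string) (*ResultMeta, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	r, ok := b.results[id]
	return r, ok
}

// fakeTask is a hypothetical Task used only to drive the example.
type fakeTask struct{ id string }

func (t fakeTask) Msg() Message { return Message{ID: t.id} }

func main() {
	b := newMemoryBackend()
	var _ Backend = b // compile-time check that the interface is satisfied
	b.Publish(fakeTask{id: "abc"}, &ResultMeta{Status: State("SUCCESS"), Result: 3.0, TaskId: "abc"})
	if r, ok := b.get("abc"); ok {
		fmt.Println(r.Status, r.Result)
	}
}
```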

type HandleFunc

type HandleFunc func(context.Context, []interface{}, map[string]interface{}) (interface{}, error)

HandleFunc is the type for functions that run tasks and return their results.

type Message

type Message struct {
	Task    string
	ID      string
	Args    []interface{}
	KwArgs  map[string]interface{}
	Retries int
	ETA     time.Time
	Expires time.Time
}

Message is a Celery protocol v1 message, as described at http://celery.readthedocs.org/en/latest/internals/protocol.html

func DecodeMessage

func DecodeMessage(contentType string, p []byte) (Message, error)

DecodeMessage decodes a message using registered decoders.
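
The pairing of RegisterMessageDecoder and DecodeMessage suggests a registry keyed by content type. The sketch below shows one way such a registry could work; it is not the package's implementation. The lower-case functions are local stand-ins, Message is trimmed to three fields, and the JSON field names task, id and args are assumed from the Celery v1 message protocol.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a trimmed local stand-in for the documented struct.
type Message struct {
	Task string
	ID   string
	Args []interface{}
}

type DecoderFunc func([]byte) (Message, error)

// decoders maps a content type to its decoder. It is not safe for
// concurrent registration; register everything before decoding starts.
var decoders = map[string]DecoderFunc{}

func registerMessageDecoder(contentType string, d DecoderFunc) {
	decoders[contentType] = d
}

func decodeMessage(contentType string, p []byte) (Message, error) {
	d, ok := decoders[contentType]
	if !ok {
		return Message{}, fmt.Errorf("no decoder registered for content type %q", contentType)
	}
	return d(p)
}

// decodeJSON is a hypothetical JSON decoder; the wire field names are assumed.
func decodeJSON(p []byte) (Message, error) {
	var raw struct {
		Task string        `json:"task"`
		ID   string        `json:"id"`
		Args []interface{} `json:"args"`
	}
	if err := json.Unmarshal(p, &raw); err != nil {
		return Message{}, err
	}
	return Message{Task: raw.Task, ID: raw.ID, Args: raw.Args}, nil
}

func main() {
	registerMessageDecoder("application/json", decodeJSON)
	msg, err := decodeMessage("application/json", []byte(`{"task":"tasks.add","id":"abc","args":[1,2]}`))
	fmt.Println(msg.Task, msg.ID, len(msg.Args), err)
}
```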

func MsgFromContext

func MsgFromContext(ctx context.Context) Message

MsgFromContext can be called within a task function to get the celery message from the context.
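
ContextFromMessage and MsgFromContext read like the usual context-value pattern. A minimal sketch of that pattern follows, assuming an unexported key type and using the standard library context package rather than golang.org/x/context. The lower-case functions and roundTripID are hypothetical stand-ins, and returning a zero Message when none is stored is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a trimmed local stand-in for the documented struct.
type Message struct {
	Task    string
	ID      string
	Retries int
}

// msgKey is an unexported key type, so no other package can collide with it.
type msgKey struct{}

// contextFromMessage stores msg in a child of parent.
func contextFromMessage(parent context.Context, msg Message) context.Context {
	return context.WithValue(parent, msgKey{}, msg)
}

// msgFromContext returns the stored message, or a zero Message if none is set.
func msgFromContext(ctx context.Context) Message {
	msg, _ := ctx.Value(msgKey{}).(Message)
	return msg
}

// roundTripID stores a message with the given ID and reads the ID back,
// the way a task function would through its context argument.
func roundTripID(id string) string {
	ctx := contextFromMessage(context.Background(), Message{Task: "tasks.add", ID: id})
	return msgFromContext(ctx).ID
}

func main() {
	fmt.Println(roundTripID("abc"))
}
```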

type Result

type Result interface{}

Result is the result type returned by tasks. The result encoder should be able to encode it.

type ResultMeta

type ResultMeta struct {
	Status    State  `json:"status"`
	Result    Result `json:"result"`
	Traceback string `json:"trackback"`
	TaskId    string `json:"task_id"`
}

type RetryError

type RetryError struct {
	// contains filtered or unexported fields
}

RetryError is a Retryable error implementation that wraps other errors.

func (*RetryError) At

func (re *RetryError) At() time.Time

At implements the Retryable interface. It is the time at which the task should be retried.

type Retryable

type Retryable interface {
	error
	At() time.Time // the time at which the task should be retried.
}

Retryable is the interface for retryable errors.

func Again

func Again(reason string, delay time.Duration) Retryable

Again is a helper function to retry a task with a reason after delay.

func Retry

func Retry(err error, delay time.Duration) Retryable

Retry is a helper function to retry a task on error after delay.
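
To make the Retryable contract concrete, the sketch below shows a wrapping error with an At method and two helpers shaped like Retry and Again. retryError, retry, again, isRetryable and errFinal are hypothetical local names, and computing At as time.Now().Add(delay) is an assumption about how the delay is applied.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type Retryable interface {
	error
	At() time.Time // the time at which the task should be retried
}

// retryError is a hypothetical wrapper, similar in spirit to RetryError.
type retryError struct {
	err error
	at  time.Time
}

func (re *retryError) Error() string { return re.err.Error() }
func (re *retryError) At() time.Time { return re.at }
func (re *retryError) Unwrap() error { return re.err }

// retry wraps err so that the task is retried after delay (assumed semantics).
func retry(err error, delay time.Duration) Retryable {
	return &retryError{err: err, at: time.Now().Add(delay)}
}

// again is like retry but starts from a plain reason string.
func again(reason string, delay time.Duration) Retryable {
	return retry(errors.New(reason), delay)
}

// isRetryable reports whether err, or anything it wraps, is Retryable.
func isRetryable(err error) bool {
	var r Retryable
	return errors.As(err, &r)
}

// errFinal stands for a failure that should not be retried.
var errFinal = errors.New("final failure")

func main() {
	err := again("broker busy", 30*time.Second)
	fmt.Println(isRetryable(err), isRetryable(errFinal))
	fmt.Println(err.At().After(time.Now()))
}
```

A task function would return such an error from its HandleFunc; a caller can then use the At value to decide when to reschedule the task.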

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler pulls tasks from a subscriber and publishes them when their ETA is reached.

func NewScheduler

func NewScheduler(sub Subscriber) *Scheduler

NewScheduler returns a new scheduler pulling tasks from sub.

func (*Scheduler) Close

func (s *Scheduler) Close() error

Close implements the Subscriber interface. It stops publishing new tasks.

func (*Scheduler) Publish

func (s *Scheduler) Publish(eta time.Time, t Task)

Publish publishes a task at the given ETA.

func (*Scheduler) Subscribe

func (s *Scheduler) Subscribe() <-chan Task

Subscribe implements the Subscriber interface.

type State

type State string

type Subscriber

type Subscriber interface {
	Subscribe() <-chan Task
	Close() error
}

Subscriber is the interface for components that produce tasks.
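
For illustration, a Subscriber can be as small as a fixed list of messages pushed down a channel. The sketch below assumes the channel is closed once the source is exhausted or Close is called, which the documentation does not state. sliceSubscriber, staticTask and collectIDs are hypothetical names, and Task is again narrowed to its Msg method.

```go
package main

import (
	"fmt"
	"sync"
)

type Message struct{ Task, ID string }

// Task is narrowed to Msg; the documented interface has more methods.
type Task interface{ Msg() Message }

type Subscriber interface {
	Subscribe() <-chan Task
	Close() error
}

type staticTask struct{ msg Message }

func (t staticTask) Msg() Message { return t.msg }

// sliceSubscriber is a hypothetical Subscriber that emits a fixed list.
type sliceSubscriber struct {
	msgs []Message
	quit chan struct{}
	once sync.Once
}

func newSliceSubscriber(msgs ...Message) *sliceSubscriber {
	return &sliceSubscriber{msgs: msgs, quit: make(chan struct{})}
}

// Subscribe streams the tasks and closes the channel when done or closed.
func (s *sliceSubscriber) Subscribe() <-chan Task {
	out := make(chan Task)
	go func() {
		defer close(out)
		for _, m := range s.msgs {
			select {
			case out <- staticTask{msg: m}:
			case <-s.quit:
				return
			}
		}
	}()
	return out
}

// Close stops delivery; sync.Once makes repeated calls safe.
func (s *sliceSubscriber) Close() error {
	s.once.Do(func() { close(s.quit) })
	return nil
}

// collectIDs drains a subscriber and returns the task IDs in order.
func collectIDs(sub Subscriber) []string {
	var ids []string
	for t := range sub.Subscribe() {
		ids = append(ids, t.Msg().ID)
	}
	return ids
}

func main() {
	sub := newSliceSubscriber(Message{Task: "tasks.add", ID: "1"}, Message{Task: "tasks.add", ID: "2"})
	defer sub.Close()
	fmt.Println(collectIDs(sub))
}
```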

type Task

type Task interface {
	context.Context
	Msg() Message
	Ack() error
	Reject(requeue bool) error
}

Task is the interface for tasks.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker runs tasks and publishes their results.

func NewWorker

func NewWorker(concurrency int, sub Subscriber, backend Backend, retry *Scheduler) *Worker

NewWorker returns a new worker.

concurrency is the number of concurrent goroutines that run tasks.

sub is the Subscriber that supplies the tasks (usually a Scheduler).

Results are published to backend.

retry is a Scheduler used for tasks that are retried after some time (usually same as sub). It can be nil, in which case the tasks are not retried.

func (*Worker) Close

func (w *Worker) Close() error

Close closes the worker and stops it from processing new tasks.

func (*Worker) Register

func (w *Worker) Register(name string, h HandleFunc)

Register registers a HandleFunc function with the given name. It should be used before Start.
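
A HandleFunc receives the message's positional and keyword arguments untyped, so the handler has to check them itself. The sketch below shows a handler for a hypothetical tasks.add task and a name-to-handler lookup. registry, run and runTask are stand-ins for whatever the Worker does internally, which is not documented here, and it uses the standard library context package.

```go
package main

import (
	"context"
	"fmt"
)

type HandleFunc func(context.Context, []interface{}, map[string]interface{}) (interface{}, error)

// registry is a hypothetical name-to-handler table.
type registry map[string]HandleFunc

func (r registry) register(name string, h HandleFunc) { r[name] = h }

// run looks up the handler for name and calls it with the given arguments.
func (r registry) run(ctx context.Context, name string, args []interface{}, kwargs map[string]interface{}) (interface{}, error) {
	h, ok := r[name]
	if !ok {
		return nil, fmt.Errorf("no handler registered for task %q", name)
	}
	return h(ctx, args, kwargs)
}

// add validates its untyped arguments before using them. JSON numbers
// decode to float64, which is why that type is expected here.
func add(_ context.Context, args []interface{}, _ map[string]interface{}) (interface{}, error) {
	if len(args) != 2 {
		return nil, fmt.Errorf("tasks.add expects 2 arguments, got %d", len(args))
	}
	a, okA := args[0].(float64)
	b, okB := args[1].(float64)
	if !okA || !okB {
		return nil, fmt.Errorf("tasks.add expects float64 arguments, got %T and %T", args[0], args[1])
	}
	return a + b, nil
}

// runTask registers add and dispatches one call by task name.
func runTask(name string, args ...interface{}) (interface{}, error) {
	r := registry{}
	r.register("tasks.add", add)
	return r.run(context.Background(), name, args, nil)
}

func main() {
	fmt.Println(runTask("tasks.add", 1.0, 2.0))
}
```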

func (*Worker) RegisterFunc

func (w *Worker) RegisterFunc(name string, fn interface{})

RegisterFunc registers a function that must take a golang.org/x/context.Context as its first argument. The remaining arguments are filled from the message arguments. The function must return two values of the form (interface{}, error), and the first returned value must be serializable by the backend so that the result can be published.

w.RegisterFunc("tasks.add", func(ctx context.Context, a float64, b float64) (float64, error) {
	return a + b, nil
})

It should be used before Start.
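
One plausible way to bridge a typed function to a HandleFunc is reflection. The sketch below is a guess at the general technique, not the package's code: adapt and callAdd are hypothetical names, it uses the standard library context package, keyword arguments are ignored, and arguments are converted with reflect's Convert, which is more permissive than a real implementation might want.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

type HandleFunc func(context.Context, []interface{}, map[string]interface{}) (interface{}, error)

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// adapt wraps fn, which must look like func(context.Context, ...) (T, error),
// as a HandleFunc. Hypothetical helper; kwargs are ignored in this sketch.
func adapt(fn interface{}) (HandleFunc, error) {
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func {
		return nil, errors.New("fn is not a function")
	}
	t := v.Type()
	if t.IsVariadic() || t.NumIn() < 1 || t.In(0) != ctxType {
		return nil, errors.New("fn must be non-variadic and take a context.Context first")
	}
	if t.NumOut() != 2 || t.Out(1) != errType {
		return nil, errors.New("fn must return (value, error)")
	}
	return func(ctx context.Context, args []interface{}, _ map[string]interface{}) (interface{}, error) {
		if len(args) != t.NumIn()-1 {
			return nil, fmt.Errorf("expected %d arguments, got %d", t.NumIn()-1, len(args))
		}
		in := []reflect.Value{reflect.ValueOf(ctx)}
		for i, a := range args {
			want := t.In(i + 1)
			av := reflect.ValueOf(a)
			// A nil argument yields an invalid Value and is rejected here.
			if !av.IsValid() || !av.Type().ConvertibleTo(want) {
				return nil, fmt.Errorf("argument %d: cannot use %T as %s", i, a, want)
			}
			in = append(in, av.Convert(want))
		}
		out := v.Call(in)
		if !out[1].IsNil() {
			return nil, out[1].Interface().(error)
		}
		return out[0].Interface(), nil
	}, nil
}

// callAdd adapts a typed add function and invokes it with untyped arguments.
func callAdd(a, b interface{}) (interface{}, error) {
	h, err := adapt(func(ctx context.Context, a float64, b float64) (float64, error) {
		return a + b, nil
	})
	if err != nil {
		return nil, err
	}
	return h(context.Background(), []interface{}{a, b}, nil)
}

func main() {
	fmt.Println(callAdd(1.5, 2.0))
}
```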

func (*Worker) Start

func (w *Worker) Start()

Start starts the worker. It should be called after task registration is complete.

func (*Worker) Wait

func (w *Worker) Wait()

Directories

Package amqpbackend provides an AMQP result backend implementation.
Package amqpconsumer implements a Subscriber that pulls messages from AMQP.
Package amqputil provides utilities to work with the http://github.com/streadway/amqp/ package.
Package json implements decoding of celery JSON messages.
Package server provides utilities to run celery workers.
Package syncutil provides various concurrency mechanisms.
