Documentation ¶
Overview ¶
Package celery helps to work with Celery (place tasks in queues and execute them).
Index ¶
- Constants
- type App
- type AsyncParam
- type Broker
- type Config
- type Middleware
- type Option
- func WithBroker(broker Broker) Option
- func WithCustomTaskSerializer(serializer protocol.Serializer, mime, encoding string) Option
- func WithLogger(logger log.Logger) Option
- func WithMaxWorkers(n int) Option
- func WithMiddlewares(chain ...Middleware) Option
- func WithTaskProtocol(version int) Option
- func WithTaskSerializer(mime string) Option
- type TaskF
- type TaskParam
- func (p *TaskParam) Args() []interface{}
- func (p *TaskParam) Get(name string) (v interface{}, ok bool)
- func (p *TaskParam) Kwargs() map[string]interface{}
- func (p *TaskParam) MustBool(name string) bool
- func (p *TaskParam) MustFloat(name string) float64
- func (p *TaskParam) MustInt(name string) int
- func (p *TaskParam) MustString(name string) string
- func (p *TaskParam) NameArgs(name ...string)
Examples ¶
Constants ¶
const (
	// ContextKeyTaskName is a context key to access task names.
	ContextKeyTaskName contextKey = iota
)
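A typed context key like this one is usually read back with a type assertion. The sketch below is self-contained and makes two assumptions that this page does not confirm: the task name is stored as a string, and a worker attaches it with something like the hypothetical newTaskContext helper. The contextKey type and the constant are local stand-ins that mirror the declaration above.

```go
package main

import (
	"context"
	"fmt"
)

// contextKey and ContextKeyTaskName are local stand-ins that mirror the
// declaration above so the example compiles on its own.
type contextKey int

const ContextKeyTaskName contextKey = iota

// newTaskContext is a hypothetical helper showing how a worker could
// attach the task name before calling the task function.
func newTaskContext(name string) context.Context {
	return context.WithValue(context.Background(), ContextKeyTaskName, name)
}

// taskName returns the task name stored in ctx, if any.
// Assumption: the value is stored as a string.
func taskName(ctx context.Context) (string, bool) {
	name, ok := ctx.Value(ContextKeyTaskName).(string)
	return name, ok
}

func main() {
	ctx := newTaskContext("myproject.apps.myapp.tasks.mytask")
	fmt.Println(taskName(ctx))

	// A context without the key yields an empty name and false.
	fmt.Println(taskName(context.Background()))
}
```

Using an unexported key type keeps the key from colliding with context values set by other packages.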
const DefaultMaxWorkers = 1000
DefaultMaxWorkers is the default upper limit of goroutines allowed to process Celery tasks. Note, the workers are launched only when there are tasks to process.
Let's say it takes ~5s to process a task on average, so 1000 goroutines should be able to handle 200 tasks per second (X = N / R = 1000 / 5) according to Little's law N = X * R.
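The arithmetic above can be checked with a few lines of Go. This is only an illustration of Little's law; the function names throughput and workersFor are made up for the example and are not part of the package.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// throughput applies Little's law (N = X * R) solved for X:
// workers is N, avgLatency is R, and the result is X in tasks per second.
func throughput(workers int, avgLatency time.Duration) float64 {
	return float64(workers) / avgLatency.Seconds()
}

// workersFor solves the same law for N, rounding up to whole goroutines.
func workersFor(tasksPerSecond float64, avgLatency time.Duration) int {
	return int(math.Ceil(tasksPerSecond * avgLatency.Seconds()))
}

func main() {
	fmt.Println(throughput(1000, 5*time.Second)) // 200
	fmt.Println(workersFor(200, 5*time.Second))  // 1000
}
```

The same helper can be used to pick a value for WithMaxWorkers when the expected task rate and average task duration are known.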
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type App ¶
type App struct {
// contains filtered or unexported fields
}
App is a Celery app to produce or consume tasks asynchronously.
func NewApp ¶
NewApp creates a Celery app. The default broker is Redis, which is assumed to run on localhost. When producing tasks, the default message serializer is json and the protocol is v2.
func (*App) ApplyAsync ¶ added in v0.0.4
func (a *App) ApplyAsync(path, queue string, p *AsyncParam) error
ApplyAsync sends a task message.
func (*App) Delay ¶
Delay is a shortcut to send a task message, i.e., it places the task associated with given Python path into queue.
func (*App) Register ¶
Register associates the task with given Python path and queue. For example, when "myproject.apps.myapp.tasks.mytask" is seen in "important" queue, the TaskF task is executed.
Note, the method is not concurrency safe. The tasks mustn't be registered after the app starts processing tasks.
type AsyncParam ¶ added in v0.0.4
type AsyncParam struct {
	// Args is a list of arguments.
	// It will be an empty list if not provided.
	Args []interface{}
	// Kwargs is a dictionary of keyword arguments.
	// It will be an empty dictionary if not provided.
	Kwargs map[string]interface{}
	// Expires is an expiration date.
	// If not provided the message will never expire.
	Expires time.Time
}
AsyncParam represents parameters for sending a task message.
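The defaults described in the field comments can be sketched as follows. This is not the package's encoder: the payload type and the encode function are hypothetical, and the JSON shape is chosen only to make the defaults visible (an empty list, an empty dictionary, and no expiration when Expires is the zero time).

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// AsyncParam is a local copy of the struct documented above.
type AsyncParam struct {
	Args    []interface{}
	Kwargs  map[string]interface{}
	Expires time.Time
}

// payload is a hypothetical wire shape used only for this illustration.
type payload struct {
	Args    []interface{}          `json:"args"`
	Kwargs  map[string]interface{} `json:"kwargs"`
	Expires *time.Time             `json:"expires"`
}

// encode applies the documented defaults and returns the payload as JSON.
func encode(p *AsyncParam) (string, error) {
	out := payload{
		Args:   []interface{}{},          // empty list if not provided
		Kwargs: map[string]interface{}{}, // empty dictionary if not provided
	}
	if p != nil {
		if p.Args != nil {
			out.Args = p.Args
		}
		if p.Kwargs != nil {
			out.Kwargs = p.Kwargs
		}
		// The zero time means "never expires", so expires stays null.
		if !p.Expires.IsZero() {
			t := p.Expires.UTC()
			out.Expires = &t
		}
	}
	b, err := json.Marshal(out)
	return string(b), err
}

func main() {
	s, _ := encode(nil)
	fmt.Println(s) // {"args":[],"kwargs":{},"expires":null}

	s, _ = encode(&AsyncParam{Args: []interface{}{2, 3}})
	fmt.Println(s) // {"args":[2,3],"kwargs":{},"expires":null}
}
```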
type Broker ¶
type Broker interface {
	// Send puts a message to a queue.
	// Note, the method is safe to call concurrently.
	Send(msg []byte, queue string) error
	// Observe sets the queues from which the tasks should be received.
	// Note, the method is not concurrency safe.
	Observe(queues []string)
	// Receive returns a raw message from one of the queues.
	// It blocks until there is a message available for consumption.
	// Note, the method is not concurrency safe.
	Receive() ([]byte, error)
}
Broker is responsible for receiving and sending task messages. For example, it knows how to read a message from a given queue in Redis. The messages can be in different formats depending on the Celery protocol version.
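To make the contract concrete, here is a minimal in-memory implementation of the interface, the kind of thing that could be handy in tests. It is a sketch, not one of the brokers shipped with the package: memBroker and newMemBroker are hypothetical names, and the interface is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker mirrors the interface documented above.
type Broker interface {
	Send(msg []byte, queue string) error
	Observe(queues []string)
	Receive() ([]byte, error)
}

// memBroker is a hypothetical in-memory broker.
type memBroker struct {
	mu       sync.Mutex
	ready    *sync.Cond          // signalled whenever a message is sent
	pending  map[string][][]byte // queue name -> messages in FIFO order
	observed []string
}

func newMemBroker() *memBroker {
	b := &memBroker{pending: make(map[string][][]byte)}
	b.ready = sync.NewCond(&b.mu)
	return b
}

// Send appends msg to the queue and wakes up blocked receivers.
func (b *memBroker) Send(msg []byte, queue string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending[queue] = append(b.pending[queue], msg)
	b.ready.Broadcast()
	return nil
}

// Observe replaces the set of queues Receive reads from.
func (b *memBroker) Observe(queues []string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.observed = append([]string(nil), queues...)
}

// Receive blocks until one of the observed queues has a message.
func (b *memBroker) Receive() ([]byte, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for {
		for _, q := range b.observed {
			if msgs := b.pending[q]; len(msgs) > 0 {
				b.pending[q] = msgs[1:]
				return msgs[0], nil
			}
		}
		b.ready.Wait()
	}
}

func main() {
	var b Broker = newMemBroker()
	b.Observe([]string{"important"})
	_ = b.Send([]byte(`{"task":"demo"}`), "important")
	msg, err := b.Receive()
	fmt.Println(string(msg), err)
}
```

A broker like this could presumably be passed to WithBroker, although how the app drives Observe and Receive is not described on this page.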
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config represents Celery settings.
type Middleware ¶ added in v0.0.3
Middleware is a chainable behavior modifier for tasks. For example, a caller can collect task metrics.
type Option ¶
type Option func(*Config)
Option sets up a Config.
func WithBroker ¶
func WithBroker(broker Broker) Option
WithBroker allows a caller to replace the default broker.
func WithCustomTaskSerializer ¶
func WithCustomTaskSerializer(serializer protocol.Serializer, mime, encoding string) Option
WithCustomTaskSerializer registers a custom serializer where mime is the mime-type describing the serialized structure, e.g., application/json, and encoding is the content encoding which is usually utf-8 or binary.
func WithMaxWorkers ¶
func WithMaxWorkers(n int) Option
WithMaxWorkers sets an upper limit of goroutines allowed to process Celery tasks.
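One common way to cap the number of goroutines while still starting them only when there is work is a buffered channel used as a counting semaphore. The sketch below illustrates that general pattern; it is an assumption about how such a limit can be enforced, not a description of the package's internals, and runBounded is a made-up name.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded runs every task in its own goroutine but never more than
// maxWorkers at once (maxWorkers must be positive). It returns the
// highest concurrency observed.
func runBounded(maxWorkers int, tasks []func()) int {
	var (
		sem     = make(chan struct{}, maxWorkers) // counting semaphore
		wg      sync.WaitGroup
		running int32
		peak    int32
	)
	for _, task := range tasks {
		sem <- struct{}{} // blocks while maxWorkers goroutines are busy
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next task

			// Track the peak number of goroutines running at once.
			n := atomic.AddInt32(&running, 1)
			for {
				old := atomic.LoadInt32(&peak)
				if n <= old || atomic.CompareAndSwapInt32(&peak, old, n) {
					break
				}
			}
			task()
			atomic.AddInt32(&running, -1)
		}(task)
	}
	wg.Wait()
	return int(atomic.LoadInt32(&peak))
}

func main() {
	tasks := make([]func(), 10)
	for i := range tasks {
		tasks[i] = func() {} // stand-in for real work
	}
	peak := runBounded(3, tasks)
	fmt.Println(peak <= 3) // true
}
```

Because a goroutine is started only after a slot is acquired, an idle app holds no worker goroutines at all.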
func WithMiddlewares ¶ added in v0.0.3
func WithMiddlewares(chain ...Middleware) Option
WithMiddlewares sets a chain of task middlewares. The first middleware is treated as the outermost middleware.
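The ordering rule ("the first middleware is the outermost") can be illustrated with a small chaining helper. The real TaskF and Middleware signatures are not shown on this page, so taskFunc and middleware below are simplified, hypothetical stand-ins; only the wrapping order is the point of the example.

```go
package main

import "fmt"

// taskFunc and middleware are hypothetical stand-ins; the package's real
// TaskF and Middleware signatures are not shown in this section.
type taskFunc func() error
type middleware func(next taskFunc) taskFunc

// chain wraps task so that the first middleware ends up outermost.
func chain(task taskFunc, mws ...middleware) taskFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		task = mws[i](task)
	}
	return task
}

// tag returns a middleware that records when it is entered and left.
func tag(name string, trace *[]string) middleware {
	return func(next taskFunc) taskFunc {
		return func() error {
			*trace = append(*trace, name+" in")
			err := next()
			*trace = append(*trace, name+" out")
			return err
		}
	}
}

func main() {
	var trace []string
	task := func() error {
		trace = append(trace, "task")
		return nil
	}
	run := chain(task, tag("metrics", &trace), tag("recover", &trace))
	_ = run()
	fmt.Println(trace)
	// Output: [metrics in recover in task recover out metrics out]
}
```

Wrapping from the last middleware to the first is what makes the first one run before and after everything else.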
func WithTaskProtocol ¶
func WithTaskProtocol(version int) Option
WithTaskProtocol sets the default task message protocol version used to send tasks. It is equivalent to CELERY_TASK_PROTOCOL in Python.
func WithTaskSerializer ¶
func WithTaskSerializer(mime string) Option
WithTaskSerializer sets a serializer mime-type, e.g., the message's body is encoded in JSON when a task is sent to the broker. It is equivalent to CELERY_TASK_SERIALIZER in Python.
type TaskF ¶
TaskF represents a Celery task implemented by the client. The returned error is only logged; it doesn't affect anything else.
type TaskParam ¶
type TaskParam struct {
// contains filtered or unexported fields
}
TaskParam provides access to task's positional and keyword arguments. A task function might not know upfront how parameters will be supplied from the caller. They could be passed as positional arguments f(2, 3), keyword arguments f(a=2, b=3) or a mix of both f(2, b=3). In this case the arguments should be named and accessed by name, see NameArgs and Get methods.
Methods prefixed with Must panic if they can't find an argument name or can't cast it to the corresponding type. The panic is logged by a worker and it doesn't affect other tasks.
Example ¶
var (
	args   = []interface{}{2}
	kwargs = map[string]interface{}{"b": 3}
)
p := NewTaskParam(args, kwargs)
p.NameArgs("a", "b")
fmt.Println(p.Get("a"))
fmt.Println(p.Get("b"))
fmt.Println(p.Get("c"))
Output:
2 true
3 true
<nil> false
func NewTaskParam ¶
NewTaskParam returns a task param which facilitates access to args and kwargs.
func (*TaskParam) Args ¶
func (p *TaskParam) Args() []interface{}
Args returns task's positional arguments.
func (*TaskParam) Get ¶
func (p *TaskParam) Get(name string) (v interface{}, ok bool)
Get returns a parameter by name. It first looks the name up in Kwargs, and then in Args if the client provided names for them (see NameArgs).
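The lookup order can be sketched with a small stand-in type. This is not the package's implementation: params, nameArgs and get are hypothetical names that only mimic the documented behavior of TaskParam, NameArgs and Get.

```go
package main

import "fmt"

// params is a hypothetical stand-in for TaskParam.
type params struct {
	args     []interface{}
	kwargs   map[string]interface{}
	argNames []string
}

// nameArgs assigns names to positional arguments in order.
func (p *params) nameArgs(names ...string) { p.argNames = names }

// get checks kwargs first, then positional args whose names were provided.
func (p *params) get(name string) (interface{}, bool) {
	if v, ok := p.kwargs[name]; ok {
		return v, true
	}
	for i, n := range p.argNames {
		if n == name && i < len(p.args) {
			return p.args[i], true
		}
	}
	return nil, false
}

func main() {
	p := &params{
		args:   []interface{}{2},
		kwargs: map[string]interface{}{"b": 3},
	}
	p.nameArgs("a", "b")
	fmt.Println(p.get("a")) // 2 true
	fmt.Println(p.get("b")) // 3 true
	fmt.Println(p.get("c")) // <nil> false
}
```

Here "b" is named as the second positional argument, but only one positional argument was passed, so its value comes from kwargs.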
func (*TaskParam) MustBool ¶
func (p *TaskParam) MustBool(name string) bool
MustBool looks up a parameter by name and casts it to boolean. It panics if a parameter is missing or of a wrong type.
func (*TaskParam) MustFloat ¶
func (p *TaskParam) MustFloat(name string) float64
MustFloat looks up a parameter by name and casts it to float. It panics if a parameter is missing or of a wrong type.
func (*TaskParam) MustInt ¶
func (p *TaskParam) MustInt(name string) int
MustInt looks up a parameter by name and casts it to integer. It panics if a parameter is missing or of a wrong type.
func (*TaskParam) MustString ¶
func (p *TaskParam) MustString(name string) string
MustString looks up a parameter by name and casts it to string. It panics if a parameter is missing or of a wrong type.
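The Must-style accessors rely on the worker recovering from panics so that one bad task does not affect the others. A minimal sketch of that pattern follows; mustInt and runTask are hypothetical helpers written for this example, and the way the package itself recovers and logs panics may differ.

```go
package main

import "fmt"

// mustInt is a hypothetical Must-style accessor: it panics when the
// parameter is missing or is not an int.
func mustInt(kwargs map[string]interface{}, name string) int {
	v, ok := kwargs[name]
	if !ok {
		panic(fmt.Sprintf("param %q not found", name))
	}
	n, ok := v.(int)
	if !ok {
		panic(fmt.Sprintf("param %q is %T, not int", name, v))
	}
	return n
}

// runTask executes task and converts a panic into an error, so one bad
// task cannot take down the goroutines processing other tasks.
func runTask(task func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task panicked: %v", r)
		}
	}()
	return task()
}

func main() {
	kwargs := map[string]interface{}{"a": 2, "b": "three"}
	err := runTask(func() error {
		fmt.Println(mustInt(kwargs, "a") + mustInt(kwargs, "b"))
		return nil
	})
	fmt.Println(err) // task panicked: param "b" is string, not int
}
```

When a missing or mistyped argument should be handled rather than logged, Get with an explicit type check is the safer choice.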
Directories ¶
Path | Synopsis |
---|---|
examples (module) | |
goredis | Package goredis implements a Celery broker using Redis and https://github.com/redis/go-redis. |
internal | |
protocol | Package protocol provides means to encode/decode task messages as described in https://github.com/celery/celery/blob/master/docs/internals/protocol.rst. |
redis | Package redis implements a Celery broker using Redis and github.com/gomodule/redigo. |