Documentation ¶
Overview ¶
Package celeriac is a package for interacting with Celery.
It provides functionality to place tasks on the task queue, as well as monitor task and worker events.
Index ¶
- Constants
- Variables
- func Fail(err error, msg string)
- func Log(err error, msg string)
- type Event
- type PingCmd
- type RateLimitTaskCmd
- type RevokeTaskCmd
- type Task
- type TaskEvent
- type TaskEventsList
- type TaskMonitor
- type TaskQueueMgr
- func (taskQueueMgr *TaskQueueMgr) Close()
- func (taskQueueMgr *TaskQueueMgr) DispatchTask(taskName string, taskData map[string]interface{}, routingKey string) (*Task, error)
- func (taskQueueMgr *TaskQueueMgr) DispatchTaskWithID(taskID string, taskName string, taskData map[string]interface{}, ...) (*Task, error)
- func (taskQueueMgr *TaskQueueMgr) Ping() error
- func (taskQueueMgr *TaskQueueMgr) RateLimitTask(taskName string, rateLimit string) error
- func (taskQueueMgr *TaskQueueMgr) RevokeTask(taskID string) error
- func (taskQueueMgr *TaskQueueMgr) TimeLimitTask(taskName string, hardLimit string, softLimit string) error
- type TimeLimitTaskCmd
- type WorkerEvent
Constants ¶
const ( // ConstPublishTaskContentType is the content type of the task data to be published ConstPublishTaskContentType = "application/json" // ConstPublishTaskContentEncoding is the content encoding type of the task data to be published ConstPublishTaskContentEncoding = "utf-8" // ConstTaskDefaultExchangeName is the default exchange name to use when publishing a task ConstTaskDefaultExchangeName = "" // ConstTaskDefaultRoutingKey is the default routing key to use when publishing a task ConstTaskDefaultRoutingKey = "celery" // ConstTaskControlExchangeName is the exchange name for dispatching task control commands ConstTaskControlExchangeName = "celery.pidbox" // ConstEventsMonitorExchangeName is the exchange name used for Celery events ConstEventsMonitorExchangeName = "celeryev" // ConstEventsMonitorExchangeType is the exchange type for the events monitor ConstEventsMonitorExchangeType = "topic" // ConstEventsMonitorQueueName is the queue name of the events monitor ConstEventsMonitorQueueName = "celeriac-events-monitor-queue" // ConstEventsMonitorBindingKey is the binding key for the events monitor ConstEventsMonitorBindingKey = "*.*" // ConstEventsMonitorConsumerTag is the consumer tag name for the events monitor ConstEventsMonitorConsumerTag = "celeriac-events-monitor" // ConstTimeFormat is the general format for all timestamps ConstTimeFormat = "2006-01-02T15:04:05.999999" // ConstEventTypeWorkerOnline is the event type when a Celery worker comes online ConstEventTypeWorkerOnline = "worker-online" // ConstEventTypeWorkerOffline is the event type when a Celery worker goes offline ConstEventTypeWorkerOffline = "worker-offline" // ConstEventTypeWorkerHeartbeat is the event type when a Celery worker is online and "alive" ConstEventTypeWorkerHeartbeat = "worker-heartbeat" // ConstEventTypeTaskSent is the event type when a Celery task is sent ConstEventTypeTaskSent = "task-sent" // ConstEventTypeTaskReceived is the event type when a Celery worker receives a task ConstEventTypeTaskReceived = "task-received" // ConstEventTypeTaskStarted is the event type when a Celery worker starts a task ConstEventTypeTaskStarted = "task-started" // ConstEventTypeTaskSucceeded is the event type when a Celery worker completes a task ConstEventTypeTaskSucceeded = "task-succeeded" // ConstEventTypeTaskFailed is the event type when a Celery worker fails to complete a task ConstEventTypeTaskFailed = "task-failed" // ConstEventTypeTaskRevoked is the event type when a Celery worker has its task revoked ConstEventTypeTaskRevoked = "task-revoked" // ConstEventTypeTaskRetried is the event type when a Celery worker retries a task ConstEventTypeTaskRetried = "task-retried" )
Variables ¶
var ( // ErrInvalidTaskID is raised when an invalid task ID has been detected ErrInvalidTaskID = errors.New("invalid task ID specified") // ErrInvalidTaskName is raised when an invalid task name has been detected ErrInvalidTaskName = errors.New("invalid task name specified") )
Global Errors
Functions ¶
Types ¶
type Event ¶
type Event struct { // Type is the Celery event type. See supported events listed in "constants.go" Type string `json:"type"` // Hostname is the name of the host on which the Celery worker is operating Hostname string `json:"hostname"` // Timestamp is the current time of the event Timestamp float32 `json:"timestamp"` // PID is the process ID PID int `json:"pid"` // Clock is the current clock time Clock int `json:"clock"` // UTCOffset is the offset from UTC for the time when this event is valid UTCOffset int `json:"utcoffset"` // Data is a property allowing extra data to be sent through for custom events from a Celery worker Data interface{} `json:"data, omitempty"` }
Event defines a base event emitted by Celery workers.
func (Event) MarshalEasyJSON ¶
MarshalEasyJSON supports easyjson.Marshaler interface
func (Event) MarshalJSON ¶
MarshalJSON supports json.Marshaler interface
func (*Event) TimestampFormatted ¶
TimestampFormatted returns a formatted string representation of the task event timestamp
func (*Event) UnmarshalEasyJSON ¶
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*Event) UnmarshalJSON ¶
UnmarshalJSON supports json.Unmarshaler interface
type PingCmd ¶
type PingCmd struct {
// contains filtered or unexported fields
}
PingCmd is a wrapper to a command
type RateLimitTaskCmd ¶
type RateLimitTaskCmd struct { Arguments rateLimitTaskArgs `json:"arguments"` // contains filtered or unexported fields }
RateLimitTaskCmd is a wrapper to a command
func NewRateLimitTaskCmd ¶
func NewRateLimitTaskCmd(taskName string, rateLimit string) *RateLimitTaskCmd
NewRateLimitTaskCmd creates a new command for rate limiting a task
taskName: Name of task to change rate limit for rateLimit: The rate limit as tasks per second, or a rate limit string (`"100/m"`, etc.
see :attr:`celery.task.base.Task.rate_limit` for more information)
type RevokeTaskCmd ¶
type RevokeTaskCmd struct { Arguments revokeTaskArgs `json:"arguments"` // contains filtered or unexported fields }
RevokeTaskCmd is a wrapper to a command
func NewRevokeTaskCmd ¶
func NewRevokeTaskCmd(taskID string, terminateProcess bool) *RevokeTaskCmd
NewRevokeTaskCmd creates a new command for revoking a task by given id
If a task is revoked, the workers will ignore the task and not execute it after all.
type Task ¶
type Task struct { // TaskName is the name of the task TaskName string `json:"task"` // ID is the task UUID ID string `json:"id"` // Args are task arguments (optional) Args []string `json:"args, omitempty"` // KWArgs are keyword arguments (optional) KWArgs map[string]interface{} `json:"kwargs, omitempty"` // Retries is a number of retries to perform if an error occurs (optional) Retries int `json:"retries, omitempty"` // ETA is the estimated completion time (optional) ETA *time.Time `json:"eta, omitempty"` // Expires is the time when this task will expire (optional) Expires *time.Time `json:"expires, omitempty"` }
Task is a representation of a Celery task
func NewTask ¶
NewTask is a factory function that creates and returns a pointer to a new task object
func NewTaskWithID ¶
func NewTaskWithID(taskID string, taskName string, args []string, kwargs map[string]interface{}) (*Task, error)
NewTaskWithID is a factory function that creates and returns a pointer to a new task object, allowing caller to specify the task ID.
func (*Task) MarshalJSON ¶
MarshalJSON marshals a Task object into a json bytes array
Time properties are converted to UTC and formatted in ISO8601
type TaskEvent ¶
type TaskEvent struct { Type string `json:"type"` Hostname string `json:"hostname"` Timestamp float32 `json:"timestamp"` PID int `json:"pid"` Clock int `json:"clock"` UTCOffset int `json:"utcoffset"` // UUID is the id of the task UUID string `json:"uuid"` // Name is the textual name of the task executed Name string `json:"name,omitempty"` // Args is a string of the arguments passed to the task Args string `json:"args,omitempty"` // Kwargs is a string of the key-word arguments passed to the task Kwargs string `json:"kwargs,omitempty"` // Runtime is the execution time Runtime float32 `json:"runtime,omitempty"` // Retries is the number of re-tries this task has performed Retries int `json:"retries,omitempty"` // ETA is the explicit time and date to run the retry at. ETA interface{} `json:"eta,omitempty"` // Expires is the datetime or seconds in the future for the task should expire Expires interface{} `json:"expires,omitempty"` // Terminated is a flag indicating whether the task has been terminated Terminated bool `json:"terminated,omitempty"` // Signum is the signal number Signum interface{} `json:"signum,omitempty"` // Expired is a flag indicating whether the task has expired due to factors Expired bool `json:"expired,omitempty"` // Queue may be set for a Celery task, as a rule cross-reference with RoutingKey Queue string `json:"queue,omitempty"` }
TaskEvent is the JSON schema for Celery task events
func NewTaskEvent ¶
func NewTaskEvent() *TaskEvent
NewTaskEvent is a factory function to create a new TaskEvent object
func (TaskEvent) MarshalEasyJSON ¶
MarshalEasyJSON supports easyjson.Marshaler interface
func (TaskEvent) MarshalJSON ¶
MarshalJSON supports json.Marshaler interface
func (*TaskEvent) UnmarshalEasyJSON ¶
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*TaskEvent) UnmarshalJSON ¶
UnmarshalJSON supports json.Unmarshaler interface
type TaskMonitor ¶
type TaskMonitor struct { // Public channel on which events are piped EventsChannel chan interface{} // contains filtered or unexported fields }
TaskMonitor is a Celery task event consumer
func NewTaskMonitor ¶
func NewTaskMonitor(connection *amqp.Connection, channel *amqp.Channel, exchangeName string, exchangeType string, queueName string, bindingKey string, ctag string) (*TaskMonitor, error)
NewTaskMonitor is a factory function that creates a new Celery consumer
func (*TaskMonitor) SetMonitorWorkerHeartbeatEvents ¶
func (monitor *TaskMonitor) SetMonitorWorkerHeartbeatEvents(processHeartbeatEvents bool)
SetMonitorWorkerHeartbeatEvents sets the property whether to process heartbeat events emitted by workers.
NOTE: By default this is set to 'false' so as to minimize unnecessary "noisy heartbeat" events.
func (*TaskMonitor) Shutdown ¶
func (monitor *TaskMonitor) Shutdown() error
Shutdown stops all monitoring, cleaning up any open connections
type TaskQueueMgr ¶
type TaskQueueMgr struct { Monitor *TaskMonitor // contains filtered or unexported fields }
TaskQueueMgr defines a manager for interacting with a Celery task queue
func NewTaskQueueMgr ¶
func NewTaskQueueMgr(brokerURI string) (*TaskQueueMgr, error)
NewTaskQueueMgr is a factory function that creates a new instance of the TaskQueueMgr
func (*TaskQueueMgr) Close ¶
func (taskQueueMgr *TaskQueueMgr) Close()
Close performs appropriate cleanup of any open task queue connections
func (*TaskQueueMgr) DispatchTask ¶
func (taskQueueMgr *TaskQueueMgr) DispatchTask(taskName string, taskData map[string]interface{}, routingKey string) (*Task, error)
DispatchTask places a new task on the Celery task queue Creates a new Task based on the supplied task name and data
func (*TaskQueueMgr) DispatchTaskWithID ¶
func (taskQueueMgr *TaskQueueMgr) DispatchTaskWithID(taskID string, taskName string, taskData map[string]interface{}, routingKey string) (*Task, error)
DispatchTaskWithID places a new task with the specified ID on the Celery task queue Creates a new Task based on the supplied task name and data
func (*TaskQueueMgr) Ping ¶
func (taskQueueMgr *TaskQueueMgr) Ping() error
Ping attempts to ping Celery workers
func (*TaskQueueMgr) RateLimitTask ¶
func (taskQueueMgr *TaskQueueMgr) RateLimitTask(taskName string, rateLimit string) error
RateLimitTask attempts to set rate limit tasks by type
func (*TaskQueueMgr) RevokeTask ¶
func (taskQueueMgr *TaskQueueMgr) RevokeTask(taskID string) error
RevokeTask attempts to notify Celery workers that the specified task needs revoking
func (*TaskQueueMgr) TimeLimitTask ¶
func (taskQueueMgr *TaskQueueMgr) TimeLimitTask(taskName string, hardLimit string, softLimit string) error
TimeLimitTask attempts to set time limits for task by type
type TimeLimitTaskCmd ¶
type TimeLimitTaskCmd struct { Arguments timeLimitTaskArgs `json:"arguments"` // contains filtered or unexported fields }
TimeLimitTaskCmd is a wrapper to a command
func NewTimeLimitTaskCmd ¶
func NewTimeLimitTaskCmd(taskName string, hardLimit string, softLimit string) *TimeLimitTaskCmd
NewTimeLimitTaskCmd creates a new command for rate limiting a task
taskName: Name of task to change rate limit for hardLimit: New hard time limit (in seconds) softLimit: New soft time limit (in seconds)
type WorkerEvent ¶
type WorkerEvent struct { Type string `json:"type"` Hostname string `json:"hostname"` Timestamp float32 `json:"timestamp"` PID int `json:"pid"` Clock int `json:"clock"` UTCOffset int `json:"utcoffset"` // SWSystem is the software system being used SWSystem string `json:"sw_sys"` // SWVersion is the software version being used SWVersion string `json:"sw_ver"` // LoadAverage is an array of average CPU loadings for the worker LoadAverage []float32 `json:"loadavg"` // Freq is the worker frequency use Freq float32 `json:"freq"` // SWIdentity is the software identity SWIdentity string `json:"sw_ident"` // Processed is the number of items processed Processed int `json:"processed, omitempt"` // Active is the active number of workers Active int `json:"active, omitempty"` }
WorkerEvent defines an event emitted by workers, specific to its operation. Event "types" emitted are:
- "worker-online"
- "worker-offline"
- "worker-heartbeat"
Example worker event json:
{ "sw_sys": "Darwin", "clock": 74, "timestamp": 1843965659.580637, "hostname": "celery@worker1.My-Mac.local", "pid": 10837, "sw_ver": "3.1.18", "utcoffset": -11, "loadavg": [2.0, 2.41, 2.54], "processed": 6, "active": 0, "freq": 2.0, "type": "worker-offline", "sw_ident": "py-celery" }
func NewWorkerEvent ¶
func NewWorkerEvent() *WorkerEvent
NewWorkerEvent is a factory function to create a new WorkerEvent object
func (WorkerEvent) MarshalEasyJSON ¶
func (v WorkerEvent) MarshalEasyJSON(w *jwriter.Writer)
MarshalEasyJSON supports easyjson.Marshaler interface
func (WorkerEvent) MarshalJSON ¶
func (v WorkerEvent) MarshalJSON() ([]byte, error)
MarshalJSON supports json.Marshaler interface
func (*WorkerEvent) UnmarshalEasyJSON ¶
func (v *WorkerEvent) UnmarshalEasyJSON(l *jlexer.Lexer)
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*WorkerEvent) UnmarshalJSON ¶
func (v *WorkerEvent) UnmarshalJSON(data []byte) error
UnmarshalJSON supports json.Unmarshaler interface