celeriac

v0.0.0-...-688a651

This package is not in the latest version of its module.

Published: Oct 16, 2020 License: MIT Imports: 12 Imported by: 1

README

Celeriac

Go client library for interacting with and monitoring Celery workers and tasks.

It provides functionality to place tasks on the task queue, as well as monitor both task and worker events.

Dependencies

This library depends upon the following packages:

  • github.com/streadway/amqp
  • github.com/sirupsen/logrus
  • github.com/nu7hatch/gouuid
  • github.com/mailru/easyjson

Install easyjson

$ go get -u github.com/mailru/easyjson/...

Usage

Install the package with go get:

$ go get github.com/svcavallar/celeriac.v1

The package is imported under the name celeriac:

package main

import (
	"log"
	"os"

	"github.com/svcavallar/celeriac.v1"
)

func main() {
	taskBrokerURI := "amqp://user:pass@localhost:5672/vhost"

	// Connect to RabbitMQ task queue
	TaskQueueMgr, err := celeriac.NewTaskQueueMgr(taskBrokerURI)
	if err != nil {
		log.Printf("Failed to connect to task queue: %v", err)
		os.Exit(-1)
	}

	log.Printf("Service connected to task queue - (URL: %s)", taskBrokerURI)

	// Goroutine to monitor the Celery events emitted on the celeriac events channel
	go func() {
		for ev := range TaskQueueMgr.Monitor.EventsChannel {
			switch x := ev.(type) {
			case nil:
				// Nothing to report for an empty event
			case *celeriac.WorkerEvent:
				log.Printf("Celery Event Channel: Worker event - %s [Hostname]: %s", x.Type, x.Hostname)
			case *celeriac.TaskEvent:
				log.Printf("Celery Event Channel: Task event - %s [ID]: %s", x.Type, x.UUID)
			case *celeriac.Event:
				log.Printf("Celery Event Channel: General event - %s [Hostname]: %s - [Data]: %v", x.Type, x.Hostname, x.Data)
			default:
				log.Printf("Celery Event Channel: Unhandled event: %v", ev)
			}
		}
	}()

	// Block forever so the monitoring goroutine keeps running;
	// without this, main returns and the program exits immediately.
	select {}
}

Dispatching Tasks

By Name

This creates and dispatches a task incorporating the supplied data. The task is automatically assigned a UUID, which is returned in the task object. The UUID takes the form "6ba7b810-9dad-11d1-80b4-00c04fd430c8".

// Dispatch a new task
taskName := "root.test.task"
taskData := map[string]interface{}{
	"foo": "bar",
}
routingKey := "root.test"

task, err := TaskQueueMgr.DispatchTask(taskName, taskData, routingKey)
if err != nil {
	log.Printf("Failed to dispatch task to queue: %v", err)
} else {
	log.Printf("Dispatched task with ID: %s", task.ID)
}

By ID & Name

This will create and dispatch a task incorporating the supplied data, and identified by the user-supplied task identifier.

// Dispatch a new task with a caller-supplied ID
taskID := "my_task_id_123456789"
taskName := "root.test.task"
taskData := map[string]interface{}{
	"foo": "bar",
}
routingKey := "root.test"

task, err := TaskQueueMgr.DispatchTaskWithID(taskID, taskName, taskData, routingKey)
if err != nil {
	log.Printf("Failed to dispatch task to queue: %v", err)
} else {
	log.Printf("Dispatched task with ID: %s", task.ID)
}

Modifying task_event.go

If you modify the properties of any of the structs in task_event.go, you will need to regenerate the easyjson version of this file with the following command:

$ easyjson -all task_event.go

Processing Redis Backend Result Automatically

If you use a Redis backend for storing results, you can process new or updated entries by subscribing to Redis keyspace events. This avoids polling for results, and is convenient to integrate using my Go helper package go-redis-event-sink, available at https://github.com/svcavallar/go-redis-event-sink

An example of how to use it is provided in that repository. Essentially, just provide it with the Celery task key pattern to watch: celery-task-meta-*
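
To illustrate the key pattern, here is a minimal sketch of how a subscriber might recognise task result keys in keyspace notifications. It does not use go-redis-event-sink, whose API is not shown here; the function name taskIDFromChannel is hypothetical, and the sketch assumes the standard Redis channel form "__keyspace@<db>__:<key>".

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// taskMetaMask is the key pattern named in the README.
const taskMetaMask = "celery-task-meta-*"

// taskIDFromChannel (hypothetical helper) extracts the Celery task ID from a
// Redis keyspace notification channel such as
// "__keyspace@0__:celery-task-meta-<id>". It reports false when the channel
// does not refer to a task result key.
func taskIDFromChannel(channel string) (string, bool) {
	// Keyspace channels have the form "__keyspace@<db>__:<key>".
	idx := strings.Index(channel, "__:")
	if !strings.HasPrefix(channel, "__keyspace@") || idx < 0 {
		return "", false
	}
	key := channel[idx+len("__:"):]

	// path.Match treats "*" as any run of non-slash characters.
	if ok, err := path.Match(taskMetaMask, key); err != nil || !ok {
		return "", false
	}
	return strings.TrimPrefix(key, "celery-task-meta-"), true
}

func main() {
	for _, ch := range []string{
		"__keyspace@0__:celery-task-meta-6ba7b810-9dad-11d1-80b4-00c04fd430c8",
		"__keyspace@0__:some-other-key",
	} {
		id, ok := taskIDFromChannel(ch)
		fmt.Printf("channel=%q taskID=%q matched=%v\n", ch, id, ok)
	}
}
```

With the task ID in hand, a subscriber could then read the stored result for that key instead of polling.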

Documentation

Overview

Package celeriac is a package for interacting with Celery.

It provides functionality to place tasks on the task queue, as well as monitor task and worker events.

Constants

const (
	// ConstPublishTaskContentType is the content type of the task data to be published
	ConstPublishTaskContentType = "application/json"

	// ConstPublishTaskContentEncoding is the content encoding type of the task data to be published
	ConstPublishTaskContentEncoding = "utf-8"

	// ConstTaskDefaultExchangeName is the default exchange name to use when publishing a task
	ConstTaskDefaultExchangeName = ""

	// ConstTaskDefaultRoutingKey is the default routing key to use when publishing a task
	ConstTaskDefaultRoutingKey = "celery"

	// ConstTaskControlExchangeName is the exchange name for dispatching task control commands
	ConstTaskControlExchangeName = "celery.pidbox"

	// ConstEventsMonitorExchangeName is the exchange name used for Celery events
	ConstEventsMonitorExchangeName = "celeryev"

	// ConstEventsMonitorExchangeType is the exchange type for the events monitor
	ConstEventsMonitorExchangeType = "topic"

	// ConstEventsMonitorQueueName is the queue name of the events monitor
	ConstEventsMonitorQueueName = "celeriac-events-monitor-queue"

	// ConstEventsMonitorBindingKey is the binding key for the events monitor
	ConstEventsMonitorBindingKey = "*.*"

	// ConstEventsMonitorConsumerTag is the consumer tag name for the events monitor
	ConstEventsMonitorConsumerTag = "celeriac-events-monitor"

	// ConstTimeFormat is the general format for all timestamps
	ConstTimeFormat = "2006-01-02T15:04:05.999999"

	// ConstEventTypeWorkerOnline is the event type when a Celery worker comes online
	ConstEventTypeWorkerOnline = "worker-online"

	// ConstEventTypeWorkerOffline is the event type when a Celery worker goes offline
	ConstEventTypeWorkerOffline = "worker-offline"

	// ConstEventTypeWorkerHeartbeat is the event type when a Celery worker is online and "alive"
	ConstEventTypeWorkerHeartbeat = "worker-heartbeat"

	// ConstEventTypeTaskSent is the event type when a Celery task is sent
	ConstEventTypeTaskSent = "task-sent"

	// ConstEventTypeTaskReceived is the event type when a Celery worker receives a task
	ConstEventTypeTaskReceived = "task-received"

	// ConstEventTypeTaskStarted is the event type when a Celery worker starts a task
	ConstEventTypeTaskStarted = "task-started"

	// ConstEventTypeTaskSucceeded is the event type when a Celery worker completes a task
	ConstEventTypeTaskSucceeded = "task-succeeded"

	// ConstEventTypeTaskFailed is the event type when a Celery worker fails to complete a task
	ConstEventTypeTaskFailed = "task-failed"

	// ConstEventTypeTaskRevoked is the event type when a Celery worker has its task revoked
	ConstEventTypeTaskRevoked = "task-revoked"

	// ConstEventTypeTaskRetried is the event type when a Celery worker retries a task
	ConstEventTypeTaskRetried = "task-retried"
)
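
The events monitor binds its queue to a topic exchange with the binding key "*.*". As a rough illustration of what that key accepts, the sketch below implements AMQP topic matching, where "*" stands for exactly one dot-separated word and "#" for zero or more. The function names are hypothetical, and the sample routing keys (such as "worker.heartbeat") are an assumption about how Celery names its event routing keys, not something stated in this package's documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether routingKey matches an AMQP topic binding pattern.
// "*" matches exactly one dot-separated word; "#" matches zero or more words.
func topicMatch(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

// matchWords compares pattern words against routing key words recursively.
func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	if p[0] == "#" {
		// Let "#" consume zero, one, two, ... words.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	}
	if len(k) == 0 {
		return false
	}
	if p[0] == "*" || p[0] == k[0] {
		return matchWords(p[1:], k[1:])
	}
	return false
}

func main() {
	// Assumed example routing keys; only two-word keys match "*.*".
	for _, key := range []string{"worker.heartbeat", "task.succeeded", "task", "a.b.c"} {
		fmt.Printf("%-18s matches *.*: %v\n", key, topicMatch("*.*", key))
	}
}
```

Under these rules "*.*" accepts any two-word routing key, so a single binding would cover both worker and task events if they are published with two-word keys.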

Variables

var (
	// ErrInvalidTaskID is raised when an invalid task ID has been detected
	ErrInvalidTaskID = errors.New("invalid task ID specified")

	// ErrInvalidTaskName is raised when an invalid task name has been detected
	ErrInvalidTaskName = errors.New("invalid task name specified")
)

Global Errors

Functions

func Fail

func Fail(err error, msg string)

Fail logs the error and exits the program. Only use this to handle critical errors.

func Log

func Log(err error, msg string)

Log only logs the error but doesn't exit the program. Use this to log errors that should not exit the program.

Types

type Event

type Event struct {
	// Type is the Celery event type. See supported events listed in "constants.go"
	Type string `json:"type"`

	// Hostname is the name of the host on which the Celery worker is operating
	Hostname string `json:"hostname"`

	// Timestamp is the current time of the event
	Timestamp float32 `json:"timestamp"`

	// PID is the process ID
	PID int `json:"pid"`

	// Clock is the current clock time
	Clock int `json:"clock"`

	// UTCOffset is the offset from UTC for the time when this event is valid
	UTCOffset int `json:"utcoffset"`

	// Data is a property allowing extra data to be sent through for custom events from a Celery worker
	Data interface{} `json:"data, omitempty"`
}

Event defines a base event emitted by Celery workers.

func NewEvent

func NewEvent() *Event

NewEvent is a factory function to create a new Event object

func (Event) MarshalEasyJSON

func (v Event) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (Event) MarshalJSON

func (v Event) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*Event) TimestampFormatted

func (event *Event) TimestampFormatted() string

TimestampFormatted returns a formatted string representation of the task event timestamp
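
As a rough idea of what such formatting involves, the sketch below converts a Unix timestamp in fractional seconds to a string using the same layout as ConstTimeFormat. It is not the library's implementation: formatTimestamp is a hypothetical name, rendering in UTC is an assumption, and the sketch takes a float64 because a float32 holds only about seven significant decimal digits, which is too few to keep sub-second precision for present-day Unix timestamps.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// timeFormat mirrors the value of ConstTimeFormat.
const timeFormat = "2006-01-02T15:04:05.999999"

// formatTimestamp (hypothetical helper) converts a Unix timestamp expressed
// in fractional seconds into a UTC string using timeFormat.
func formatTimestamp(ts float64) string {
	sec, frac := math.Modf(ts)
	t := time.Unix(int64(sec), int64(frac*1e9)).UTC()
	return t.Format(timeFormat)
}

func main() {
	fmt.Println(formatTimestamp(0))   // 1970-01-01T00:00:00
	fmt.Println(formatTimestamp(1.5)) // 1970-01-01T00:00:01.5
}
```

The ".999999" part of the layout prints up to six fractional digits and drops trailing zeros, which is why a whole-second timestamp has no fractional part at all.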

func (*Event) UnmarshalEasyJSON

func (v *Event) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*Event) UnmarshalJSON

func (v *Event) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type PingCmd

type PingCmd struct {
	// contains filtered or unexported fields
}

PingCmd is a wrapper to a command

func NewPingCmd

func NewPingCmd() *PingCmd

NewPingCmd creates a new command for pinging workers

type RateLimitTaskCmd

type RateLimitTaskCmd struct {
	Arguments rateLimitTaskArgs `json:"arguments"`
	// contains filtered or unexported fields
}

RateLimitTaskCmd is a wrapper to a command

func NewRateLimitTaskCmd

func NewRateLimitTaskCmd(taskName string, rateLimit string) *RateLimitTaskCmd

NewRateLimitTaskCmd creates a new command for rate limiting a task

  • taskName: Name of task to change rate limit for
  • rateLimit: The rate limit as tasks per second, or a rate limit string (`"100/m"`, etc.; see :attr:`celery.task.base.Task.rate_limit` for more information)
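
To make the rate limit string format concrete, here is a small sketch that converts such a string into tasks per second. It assumes the usual Celery suffixes "/s", "/m" and "/h", with a bare number meaning tasks per second; parseRateLimit is a hypothetical helper and is not part of this package, which passes the string to the workers as given.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseRateLimit (hypothetical helper) converts a Celery-style rate limit
// string such as "100/m" into tasks per second.
func parseRateLimit(s string) (float64, error) {
	parts := strings.SplitN(strings.TrimSpace(s), "/", 2)
	n, err := strconv.ParseFloat(parts[0], 64)
	if err != nil {
		return 0, fmt.Errorf("invalid rate limit %q: %w", s, err)
	}
	if len(parts) == 1 {
		return n, nil // bare number: tasks per second
	}
	switch parts[1] {
	case "s":
		return n, nil
	case "m":
		return n / 60, nil
	case "h":
		return n / 3600, nil
	default:
		return 0, fmt.Errorf("invalid rate limit unit in %q", s)
	}
}

func main() {
	for _, s := range []string{"10", "120/m", "7200/h", "5/x"} {
		rate, err := parseRateLimit(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%-7s = %g tasks/second\n", s, rate)
	}
}
```

A check like this could be run before calling RateLimitTask to catch malformed strings on the client side.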

type RevokeTaskCmd

type RevokeTaskCmd struct {
	Arguments revokeTaskArgs `json:"arguments"`
	// contains filtered or unexported fields
}

RevokeTaskCmd is a wrapper to a command

func NewRevokeTaskCmd

func NewRevokeTaskCmd(taskID string, terminateProcess bool) *RevokeTaskCmd

NewRevokeTaskCmd creates a new command for revoking a task by given id

If a task is revoked, the workers will ignore the task and not execute it after all.

type Task

type Task struct {
	// TaskName is the name of the task
	TaskName string `json:"task"`

	// ID is the task UUID
	ID string `json:"id"`

	// Args are task arguments (optional)
	Args []string `json:"args, omitempty"`

	// KWArgs are keyword arguments (optional)
	KWArgs map[string]interface{} `json:"kwargs, omitempty"`

	// Retries is a number of retries to perform if an error occurs (optional)
	Retries int `json:"retries, omitempty"`

	// ETA is the estimated completion time (optional)
	ETA *time.Time `json:"eta, omitempty"`

	// Expires is the time when this task will expire (optional)
	Expires *time.Time `json:"expires, omitempty"`
}

Task is a representation of a Celery task

func NewTask

func NewTask(taskName string, args []string, kwargs map[string]interface{}) (*Task, error)

NewTask is a factory function that creates and returns a pointer to a new task object

func NewTaskWithID

func NewTaskWithID(taskID string, taskName string, args []string, kwargs map[string]interface{}) (*Task, error)

NewTaskWithID is a factory function that creates and returns a pointer to a new task object, allowing caller to specify the task ID.

func (*Task) MarshalJSON

func (task *Task) MarshalJSON() ([]byte, error)

MarshalJSON marshals a Task object into a JSON byte array.

Time properties are converted to UTC and formatted as ISO 8601.

type TaskEvent

type TaskEvent struct {
	Type      string  `json:"type"`
	Hostname  string  `json:"hostname"`
	Timestamp float32 `json:"timestamp"`
	PID       int     `json:"pid"`
	Clock     int     `json:"clock"`
	UTCOffset int     `json:"utcoffset"`

	// UUID is the id of the task
	UUID string `json:"uuid"`

	// Name is the textual name of the task executed
	Name string `json:"name, omitempty"`

	// Args is a string of the arguments passed to the task
	Args string `json:"args, omitempty"`

	// Kwargs is a string of the key-word arguments passed to the task
	Kwargs string `json:"kwargs, omitempty"`

	// Result is a string containing the result of a completed task
	Result string `json:"result, omitempty"`

	// Runtime is the execution time
	Runtime float32 `json:"runtime, omitempty"`

	// Retries is the number of re-tries this task has performed
	Retries int `json:"retries, omitempty"`

	// ETA is the explicit time and date to run the retry at.
	ETA interface{} `json:"eta, omitempty"`

	// Expires is the datetime, or the number of seconds in the future, at which the task should expire
	Expires interface{} `json:"expires, omitempty"`

	// Exception is a string containing error/exception information
	Exception string `json:"exception, omitempty"`

	// Traceback is a string containing extended error information
	Traceback string `json:"traceback, omitempty"`

	// Terminated is a flag indicating whether the task has been terminated
	Terminated bool `json:"terminated, omitempty"`

	// Signum is the signal number
	Signum interface{} `json:"signum, omitempty"`

	// Expired is a flag indicating whether the task has expired due to factors
	Expired bool `json:"expired, omitempty"`
}

TaskEvent is the JSON schema for Celery task events

func NewTaskEvent

func NewTaskEvent() *TaskEvent

NewTaskEvent is a factory function to create a new TaskEvent object

func (TaskEvent) MarshalEasyJSON

func (v TaskEvent) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (TaskEvent) MarshalJSON

func (v TaskEvent) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*TaskEvent) UnmarshalEasyJSON

func (v *TaskEvent) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*TaskEvent) UnmarshalJSON

func (v *TaskEvent) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type TaskEventsList

type TaskEventsList []TaskEvent

TaskEventsList is an array of task events

type TaskMonitor

type TaskMonitor struct {

	// Public channel on which events are piped
	EventsChannel chan interface{}
	// contains filtered or unexported fields
}

TaskMonitor is a Celery task event consumer

func NewTaskMonitor

func NewTaskMonitor(connection *amqp.Connection, channel *amqp.Channel,
	exchangeName string, exchangeType string, queueName string, bindingKey string, ctag string) (*TaskMonitor, error)

NewTaskMonitor is a factory function that creates a new Celery consumer

func (*TaskMonitor) SetMonitorWorkerHeartbeatEvents

func (monitor *TaskMonitor) SetMonitorWorkerHeartbeatEvents(processHeartbeatEvents bool)

SetMonitorWorkerHeartbeatEvents sets whether heartbeat events emitted by workers are processed.

NOTE: By default this is set to 'false' so as to minimize unnecessary "noisy heartbeat" events.
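
The effect of this switch can be pictured as a filter in front of the events channel. The sketch below is a self-contained illustration, not the monitor's actual code: the event type and filterEvents function are hypothetical stand-ins, and only the "worker-heartbeat" type string comes from the constants listed earlier.

```go
package main

import "fmt"

// eventTypeWorkerHeartbeat mirrors ConstEventTypeWorkerHeartbeat.
const eventTypeWorkerHeartbeat = "worker-heartbeat"

// event is a minimal hypothetical stand-in for the library's event types.
type event struct {
	Type     string
	Hostname string
}

// filterEvents forwards events from in to the returned channel, dropping
// heartbeats unless processHeartbeats is true. The returned channel is
// closed once in is closed and drained.
func filterEvents(in <-chan event, processHeartbeats bool) <-chan event {
	out := make(chan event)
	go func() {
		defer close(out)
		for ev := range in {
			if ev.Type == eventTypeWorkerHeartbeat && !processHeartbeats {
				continue
			}
			out <- ev
		}
	}()
	return out
}

func main() {
	in := make(chan event, 3)
	in <- event{"worker-online", "celery@worker1"}
	in <- event{"worker-heartbeat", "celery@worker1"}
	in <- event{"task-started", "celery@worker1"}
	close(in)

	// With heartbeats disabled, only the first and last events are printed.
	for ev := range filterEvents(in, false) {
		fmt.Println(ev.Type)
	}
}
```

Dropping heartbeats before they reach consumers keeps the channel quiet when workers are idle, which matches the default described in the note.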

func (*TaskMonitor) Shutdown

func (monitor *TaskMonitor) Shutdown() error

Shutdown stops all monitoring, cleaning up any open connections

type TaskQueueMgr

type TaskQueueMgr struct {
	Monitor *TaskMonitor
	// contains filtered or unexported fields
}

TaskQueueMgr defines a manager for interacting with a Celery task queue

func NewTaskQueueMgr

func NewTaskQueueMgr(brokerURI string) (*TaskQueueMgr, error)

NewTaskQueueMgr is a factory function that creates a new instance of the TaskQueueMgr

func (*TaskQueueMgr) Close

func (taskQueueMgr *TaskQueueMgr) Close()

Close performs appropriate cleanup of any open task queue connections

func (*TaskQueueMgr) DispatchTask

func (taskQueueMgr *TaskQueueMgr) DispatchTask(taskName string, taskData map[string]interface{}, routingKey string) (*Task, error)

DispatchTask places a new task on the Celery task queue. Creates a new Task based on the supplied task name and data.

func (*TaskQueueMgr) DispatchTaskWithID

func (taskQueueMgr *TaskQueueMgr) DispatchTaskWithID(taskID string, taskName string, taskData map[string]interface{}, routingKey string) (*Task, error)

DispatchTaskWithID places a new task with the specified ID on the Celery task queue. Creates a new Task based on the supplied task name and data.

func (*TaskQueueMgr) Ping

func (taskQueueMgr *TaskQueueMgr) Ping() error

Ping attempts to ping Celery workers

func (*TaskQueueMgr) RateLimitTask

func (taskQueueMgr *TaskQueueMgr) RateLimitTask(taskName string, rateLimit string) error

RateLimitTask attempts to set the rate limit for tasks by type

func (*TaskQueueMgr) RevokeTask

func (taskQueueMgr *TaskQueueMgr) RevokeTask(taskID string) error

RevokeTask attempts to notify Celery workers that the specified task needs revoking

func (*TaskQueueMgr) TimeLimitTask

func (taskQueueMgr *TaskQueueMgr) TimeLimitTask(taskName string, hardLimit string, softLimit string) error

TimeLimitTask attempts to set time limits for tasks by type

type TimeLimitTaskCmd

type TimeLimitTaskCmd struct {
	Arguments timeLimitTaskArgs `json:"arguments"`
	// contains filtered or unexported fields
}

TimeLimitTaskCmd is a wrapper to a command

func NewTimeLimitTaskCmd

func NewTimeLimitTaskCmd(taskName string, hardLimit string, softLimit string) *TimeLimitTaskCmd

NewTimeLimitTaskCmd creates a new command for setting the time limits of a task

  • taskName: Name of task to change time limits for
  • hardLimit: New hard time limit (in seconds)
  • softLimit: New soft time limit (in seconds)

type WorkerEvent

type WorkerEvent struct {
	Type      string  `json:"type"`
	Hostname  string  `json:"hostname"`
	Timestamp float32 `json:"timestamp"`
	PID       int     `json:"pid"`
	Clock     int     `json:"clock"`
	UTCOffset int     `json:"utcoffset"`

	// SWSystem is the software system being used
	SWSystem string `json:"sw_sys"`

	// SWVersion is the software version being used
	SWVersion string `json:"sw_ver"`

	// LoadAverage is an array of average CPU loadings for the worker
	LoadAverage []float32 `json:"loadavg"`

	// Freq is the worker heartbeat frequency, in seconds
	Freq float32 `json:"freq"`

	// SWIdentity is the software identity
	SWIdentity string `json:"sw_ident"`

	// Processed is the number of items processed
	Processed int `json:"processed, omitempty"`

	// Active is the number of tasks the worker is currently executing
	Active int `json:"active, omitempty"`
}

WorkerEvent defines an event emitted by workers, specific to its operation. Event "types" emitted are:

  • "worker-online"
  • "worker-offline"
  • "worker-heartbeat"

Example worker event JSON:

{
	"sw_sys": "Darwin",
	"clock": 74,
	"timestamp": 1843965659.580637,
	"hostname": "celery@worker1.My-Mac.local",
	"pid": 10837,
	"sw_ver": "3.1.18",
	"utcoffset": -11,
	"loadavg": [2.0, 2.41, 2.54],
	"processed": 6,
	"active": 0,
	"freq": 2.0,
	"type": "worker-offline",
	"sw_ident": "py-celery"
}
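
To show how a payload like the one above maps onto Go fields, the sketch below decodes it with encoding/json into a local struct that mirrors a subset of WorkerEvent. The workerEvent type and decodeWorkerEvent function are hypothetical stand-ins; the library itself uses easyjson-generated code, and the sketch uses float64 for numeric fields as its own assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// workerEvent mirrors a subset of the documented WorkerEvent fields.
// JSON keys not listed here (sw_sys, sw_ver, ...) are ignored on decode.
type workerEvent struct {
	Type        string    `json:"type"`
	Hostname    string    `json:"hostname"`
	Timestamp   float64   `json:"timestamp"`
	PID         int       `json:"pid"`
	LoadAverage []float64 `json:"loadavg"`
	Processed   int       `json:"processed"`
	Active      int       `json:"active"`
}

// sampleWorkerEvent is the example payload from the documentation.
const sampleWorkerEvent = `{
	"sw_sys": "Darwin",
	"clock": 74,
	"timestamp": 1843965659.580637,
	"hostname": "celery@worker1.My-Mac.local",
	"pid": 10837,
	"sw_ver": "3.1.18",
	"utcoffset": -11,
	"loadavg": [2.0, 2.41, 2.54],
	"processed": 6,
	"active": 0,
	"freq": 2.0,
	"type": "worker-offline",
	"sw_ident": "py-celery"
}`

// decodeWorkerEvent (hypothetical helper) parses a worker event payload.
func decodeWorkerEvent(data []byte) (*workerEvent, error) {
	var ev workerEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return nil, err
	}
	return &ev, nil
}

func main() {
	ev, err := decodeWorkerEvent([]byte(sampleWorkerEvent))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s from %s (pid %d), processed=%d, loadavg=%v\n",
		ev.Type, ev.Hostname, ev.PID, ev.Processed, ev.LoadAverage)
}
```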

func NewWorkerEvent

func NewWorkerEvent() *WorkerEvent

NewWorkerEvent is a factory function to create a new WorkerEvent object

func (WorkerEvent) MarshalEasyJSON

func (v WorkerEvent) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (WorkerEvent) MarshalJSON

func (v WorkerEvent) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*WorkerEvent) UnmarshalEasyJSON

func (v *WorkerEvent) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*WorkerEvent) UnmarshalJSON

func (v *WorkerEvent) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface
