celery

package module
v0.0.0-...-2f7120a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2013 License: BSD-3-Clause Imports: 14 Imported by: 0

README

Celery consumer

Consumes jobs that are published using Celery in Go.

Note: In very early development.

Installation

$ go get github.com/mattrobenolt/go-celery

Current status

  • AMQP consumer
  • Handles replies

TODO

  • Redis consumer
  • ETAs and retries
  • Lots of other things

Documentation

Index

Constants

View Source
const CELERY_FORMAT = `"2006-01-02T15:04:05.999999999"`

Variables

View Source
var (
	TwoSeconds          = 2 * time.Second
	MaximumRetriesError = errors.New("Maximum retries exceeded")
)
View Source
var (
	RetryError  = errors.New("Retry task again")
	RejectError = errors.New("Reject task")
)

Functions

func GetLogger

func GetLogger() log.Logger

func Init

func Init()

func RegisterTask

func RegisterTask(name string, worker Worker)

func RegisterTransport

func RegisterTransport(name string, t Transport)

func SetupLogging

func SetupLogging()

Types

type AMQP

type AMQP struct{}

func (*AMQP) Open

func (a *AMQP) Open(uri string) Driver

type AMQPDriver

type AMQPDriver struct {
	// contains filtered or unexported fields
}

func (*AMQPDriver) Bind

func (c *AMQPDriver) Bind(b *Binding) error

func (*AMQPDriver) Connect

func (c *AMQPDriver) Connect() (err error)

func (*AMQPDriver) DeclareExchange

func (c *AMQPDriver) DeclareExchange(e *Exchange) error

func (*AMQPDriver) DeclareQueue

func (c *AMQPDriver) DeclareQueue(q *Queue) error

func (*AMQPDriver) GetMessages

func (c *AMQPDriver) GetMessages(q *Queue, rate int) (<-chan *Message, error)

func (*AMQPDriver) IsConnected

func (c *AMQPDriver) IsConnected() bool

func (*AMQPDriver) Publish

func (c *AMQPDriver) Publish(p *Publishing) error

type AMQPReceipt

type AMQPReceipt struct {
	// contains filtered or unexported fields
}

func (*AMQPReceipt) Ack

func (r *AMQPReceipt) Ack()

func (*AMQPReceipt) Reject

func (r *AMQPReceipt) Reject()

func (*AMQPReceipt) Reply

func (r *AMQPReceipt) Reply(id string, data interface{})

func (*AMQPReceipt) Requeue

func (r *AMQPReceipt) Requeue()

type Binding

type Binding struct {
	Name     string
	Queue    *Queue
	Exchange *Exchange
}

func NewBinding

func NewBinding(name string, q *Queue, e *Exchange) *Binding

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(uri string) *Broker

func (*Broker) StartConsuming

func (b *Broker) StartConsuming(q *Queue, rate int) Deliveries

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(driver Driver) *Connection

func (*Connection) Bind

func (c *Connection) Bind(b *Binding) error

func (*Connection) Consume

func (c *Connection) Consume(q *Queue, rate int) (<-chan *Message, error)

func (*Connection) DeclareExchange

func (c *Connection) DeclareExchange(e *Exchange) error

func (*Connection) DeclareQueue

func (c *Connection) DeclareQueue(q *Queue) error

func (*Connection) Ping

func (c *Connection) Ping() (err error)

type Deliveries

type Deliveries chan *Task

type Driver

type Driver interface {
	Connect() error
	DeclareExchange(*Exchange) error
	DeclareQueue(*Queue) error
	Bind(*Binding) error
	GetMessages(*Queue, int) (<-chan *Message, error)
	Publish(*Publishing) error
	IsConnected() bool
}

type Exchange

type Exchange struct {
	Name               string
	Type               string
	Durable            bool
	DeleteWhenComplete bool
}

func NewDurableExchange

func NewDurableExchange(name string) *Exchange

func NewExchange

func NewExchange(name string, durable bool) *Exchange

type Message

type Message struct {
	ContentType string
	Body        []byte
	Receipt     Receipt
}

type Publishing

type Publishing struct {
	Key      string
	Exchange *Exchange
	Body     []byte
}

type Queue

type Queue struct {
	Name             string
	Durable          bool
	DeleteWhenUnused bool
	Ttl              int
}

func NewDurableQueue

func NewDurableQueue(name string) *Queue

func NewExpiringQueue

func NewExpiringQueue(name string, ttl int) *Queue

func NewQueue

func NewQueue(name string, durable bool, ttl int) *Queue

type Receipt

type Receipt interface {
	Reply(string, interface{})
	Ack()
	Requeue()
	Reject()
}

type Result

type Result struct {
	Status    ResultStatus `json:"status"`
	Traceback []string     `json:"traceback"`
	Result    interface{}  `json:"result"`
	Id        string       `json:"task_id"`
	Children  []string     `json:"children"`
}

type ResultStatus

type ResultStatus string
const (
	StatusSuccess ResultStatus = "SUCCESS"
)

type Task

type Task struct {
	Task    string                 `json:"task"`
	Id      string                 `json:"id"`
	Args    []interface{}          `json:"args"`
	Kwargs  map[string]interface{} `json:"kwargs"`
	Retries int                    `json:"retries"`
	Eta     celeryTime             `json:"eta"`
	Expires celeryTime             `json:"expires"`
	Receipt Receipt                `json:"-"`
}

func (*Task) Ack

func (t *Task) Ack(result interface{})

func (*Task) Reject

func (t *Task) Reject()

func (*Task) Requeue

func (t *Task) Requeue()

func (*Task) String

func (t *Task) String() string

type Transport

type Transport interface {
	Open(string) Driver
}

type Worker

type Worker interface {
	Exec(*Task) (interface{}, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL