delayed

package
v0.0.0-...-b9b3a47 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2023 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusStopped uint32 = iota
	StatusRunning
	StatusStopping
)

Variables

View Source
var InvalidRedisReplyError = errors.New("Invalid redis reply")

Functions

func NewRedisPool

func NewRedisPool(address string, options ...redis.DialOption) *redis.Pool

NewRedisPool creates a new redis pool.

func RandHexString

func RandHexString(size int) string

RandHexString generates a random hex string.

func Recover

func Recover()

Recover recovers from a panic.

Types

type GoTask

type GoTask struct {
	// contains filtered or unexported fields
}

GoTask stores a RawGoTask and its serialized data.

func DeserializeGoTask

func DeserializeGoTask(data []byte) (task *GoTask, err error)

DeserializeGoTask creates a new GoTask from the serialized data.

func NewGoTask

func NewGoTask(funcPath string, arg ...interface{}) *GoTask

NewGoTask creates a new GoTask by the function path.

func NewGoTaskOfFunc

func NewGoTaskOfFunc(f interface{}, arg ...interface{}) *GoTask

NewGoTaskOfFunc creates a new GoTask from a function. It is about 100x slower than NewGoTask.

func (*GoTask) Equal

func (t *GoTask) Equal(task *GoTask) bool

Equal reports whether two tasks are equal. It may return false if one task has not been serialized and the other was deserialized.

func (*GoTask) Serialize

func (t *GoTask) Serialize() (data []byte, err error)

Serialize returns the serialized data of the task.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler stores a function and other information about how to call it.

func NewHandler

func NewHandler(f interface{}) (h *Handler)

NewHandler creates a handler for a function.

func (*Handler) Call

func (h *Handler) Call(payload []byte) (result []reflect.Value, err error)

Call executes the function of a handler.

type PyTask

type PyTask struct {
	// contains filtered or unexported fields
}

PyTask stores a RawPyTask and its serialized data.

func NewPyTask

func NewPyTask(funcPath string, args, kwArgs interface{}) *PyTask

NewPyTask creates a new PyTask by the function path.

func (*PyTask) Serialize

func (t *PyTask) Serialize() (data []byte, err error)

Serialize returns the serialized data of the task.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the struct of a task queue.

func NewQueue

func NewQueue(name string, redisPool *redis.Pool, options ...QueueOption) *Queue

NewQueue creates a new queue.

func (*Queue) Clear

func (q *Queue) Clear() error

Clear removes all data related to the queue in Redis.

func (*Queue) Dequeue

func (q *Queue) Dequeue() (task *GoTask, err error)

Dequeue pops a task from the front of the queue.

func (*Queue) Enqueue

func (q *Queue) Enqueue(task Task) (err error)

Enqueue appends a task to the queue.

func (*Queue) Len

func (q *Queue) Len() (count int, err error)

Len returns the task count of the queue.

func (*Queue) Release

func (q *Queue) Release() (err error)

Release releases the currently dequeued task. It should be called after finishing a task.

func (*Queue) RequeueLost

func (q *Queue) RequeueLost() (count int, err error)

RequeueLost finds lost tasks and recovers them. It should be called periodically to prevent losing tasks. Lost tasks are those that were popped from the queue but never released because their worker died.

type QueueOption

type QueueOption func(*Queue)

func DequeueTimeout

func DequeueTimeout(d time.Duration) QueueOption

DequeueTimeout sets the dequeue timeout of a queue. It must be larger than 1 ms, because Redis BLPOP treats a timeout less than or equal to 0.001 second as 0 (block forever).

func KeepAliveTimeout

func KeepAliveTimeout(d time.Duration) QueueOption

KeepAliveTimeout sets the keep alive timeout of the worker of a queue.

type RawGoTask

type RawGoTask struct {
	FuncPath string
	Payload  []byte // serialized arg
}

RawGoTask stores the fields that need to be serialized for a GoTask.

type RawPyTask

type RawPyTask struct {
	FuncPath string
	Args     interface{} // must be slice, array or nil
	KwArgs   interface{} // must be map, struct or nil
}

RawPyTask stores the fields that need to be serialized for a PyTask.

type Sweeper

type Sweeper struct {
	// contains filtered or unexported fields
}

Sweeper keeps recovering lost tasks.

func NewSweeper

func NewSweeper(queues ...*Queue) *Sweeper

NewSweeper creates a new sweeper.

func (*Sweeper) Run

func (s *Sweeper) Run()

Run starts the sweeper.

func (*Sweeper) SetInterval

func (s *Sweeper) SetInterval(interval time.Duration)

SetInterval sets the interval of the sweeper.

func (*Sweeper) Stop

func (s *Sweeper) Stop()

Stop stops the sweeper.

type Task

type Task interface {
	Serialize() ([]byte, error)
	// contains filtered or unexported methods
}

Task is the interface of both GoTask and PyTask.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker keeps dequeuing and processing Go tasks.

func NewWorker

func NewWorker(queue *Queue, options ...WorkerOption) *Worker

NewWorker creates a new worker.

func (*Worker) Die

func (w *Worker) Die()

Die marks the worker as dead.

func (*Worker) Execute

func (w *Worker) Execute(t *GoTask)

Execute executes a task.

func (*Worker) KeepAlive

func (w *Worker) KeepAlive()

KeepAlive keeps the worker alive.

func (*Worker) RegisterHandlers

func (w *Worker) RegisterHandlers(funcs ...interface{})

RegisterHandlers registers handlers. Tasks whose function has not been registered will be ignored.

func (*Worker) Run

func (w *Worker) Run()

Run starts the worker.

func (*Worker) Stop

func (w *Worker) Stop()

Stop stops the worker.

type WorkerOption

type WorkerOption func(*Worker)

func KeepAliveDuration

func KeepAliveDuration(d time.Duration) WorkerOption

KeepAliveDuration sets the keep alive duration of a worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL