jq

v0.5.4 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 1, 2021 License: MIT Imports: 10 Imported by: 3

README

Redis job queue

A simple job queue that uses Redis as its backend.

It is built on Redis Lists only, not Redis Streams.

Use this for:

  • The simplest multi-publisher, multi-worker model.
  • Workloads where small, random processing delays are acceptable.
  • Cases where you want to know exactly what the queue does.

Not for:

  • Blocking consumers
  • Fan-out model
  • Strict message time ordering

Documentation

Types

type HandlerFunc

type HandlerFunc func(job *Job) error

HandlerFunc is your custom function for processing a job. Note: it must be thread-safe, because it is called in parallel.
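
One way to meet the thread-safety requirement is to keep all shared state behind atomic operations (or a mutex). The sketch below is illustrative only: Job and HandlerFunc are local copies of the declarations on this page so that it compiles on its own, while counter, handler, count and runParallel are hypothetical names that are not part of the package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Job and HandlerFunc are local copies of the declarations documented on
// this page, repeated so the sketch compiles without importing the package.
type Job struct {
	ID      string
	PubAt   time.Time
	Retried int
	Payload []byte
}

type HandlerFunc func(job *Job) error

// counter is a hypothetical piece of shared state touched by the handler.
type counter struct {
	handled int64
}

// handler returns a HandlerFunc that is safe to call from many goroutines:
// the only shared state is updated with an atomic add.
func (c *counter) handler() HandlerFunc {
	return func(job *Job) error {
		if job == nil {
			return fmt.Errorf("nil job")
		}
		atomic.AddInt64(&c.handled, 1)
		return nil
	}
}

// count reads the shared counter atomically.
func (c *counter) count() int64 {
	return atomic.LoadInt64(&c.handled)
}

// runParallel calls handle from n goroutines at once, imitating what a
// pool of parallel workers would do (an assumption for this demo).
func runParallel(handle HandlerFunc, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = handle(&Job{ID: fmt.Sprint(i)})
		}(i)
	}
	wg.Wait()
}

func main() {
	c := &counter{}
	runParallel(c.handler(), 100)
	fmt.Println(c.count())
}
```

A handler that writes to a plain map or increments an ordinary int would race under the same test; running it with the -race flag is a quick way to check.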

type Job

type Job struct {
	ID      string
	PubAt   time.Time
	Retried int
	Payload []byte
}

Job is the value stored in the Redis list.

func (*Job) Bind

func (j *Job) Bind(v interface{}) error

Bind decodes the job payload into the target v. Make sure v is a pointer to the same type that you published.

type Logger

type Logger interface {
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

Logger can be a logrus logger, a zap sugared logger, or your own implementation.
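
For the "your own" case, a small adapter over the standard library's log package is enough to satisfy the interface. This is a minimal sketch: the Logger interface is copied from this page, and stdLogger, newStdLogger and capture are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"strings"
)

// Logger is a local copy of the interface documented on this page.
type Logger interface {
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

// stdLogger is a hypothetical adapter that forwards each level to a
// standard library *log.Logger, prefixing the line with the level name.
type stdLogger struct {
	l *log.Logger
}

// newStdLogger builds an adapter that writes to w without timestamps.
func newStdLogger(w io.Writer) stdLogger {
	return stdLogger{l: log.New(w, "", 0)}
}

func (s stdLogger) Debugf(format string, args ...interface{}) {
	s.l.Printf("DEBUG "+format, args...)
}

func (s stdLogger) Infof(format string, args ...interface{}) {
	s.l.Printf("INFO "+format, args...)
}

func (s stdLogger) Errorf(format string, args ...interface{}) {
	s.l.Printf("ERROR "+format, args...)
}

// Compile-time check that stdLogger satisfies Logger.
var _ Logger = stdLogger{}

// capture runs fn with a logger that writes into memory and returns
// everything that was logged; handy for demos and tests.
func capture(fn func(Logger)) string {
	var sb strings.Builder
	fn(newStdLogger(&sb))
	return sb.String()
}

func main() {
	out := capture(func(l Logger) {
		l.Infof("worker started, parallel=%d", 2)
	})
	fmt.Print(out)
}
```

A value like this could then be assigned to the Logger field of WorkerOptions.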

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a single queue.

func NewQueue

func NewQueue(name string, rdb RedisClient) *Queue

NewQueue creates a queue.

func (*Queue) Drop added in v0.2.0

func (q *Queue) Drop(job *Job)

Drop drops the job. If SafeDrop is true, the job is put into the drop queue.

func (*Queue) Get

func (q *Queue) Get() (*Job, error)

Get fetches a single job from Redis. The returned error may be redis.Nil; use errors.Is to check for it. This function is not normally needed unless you are writing your own worker; otherwise use the built-in StartWorker().

func (*Queue) Pub

func (q *Queue) Pub(payload interface{}) (string, error)

Pub publishes a job to the queue. The payload must be marshallable by msgpack (https://github.com/vmihailenco/msgpack).

func (*Queue) PubTo

func (q *Queue) PubTo(name string, payload interface{}) (string, error)

PubTo publishes a job to another queue in the same Redis instance.

func (*Queue) Retry

func (q *Queue) Retry(ctx context.Context, job *Job)

Retry retries the job. Before sending the job back to the queue, it sleeps for a while. Use the context to interrupt this sleep, for example when the worker is about to restart. This function is not normally needed unless you are writing your own worker; otherwise use the built-in StartWorker().

func (*Queue) StartWorker

func (q *Queue) StartWorker(ctx context.Context, handle HandlerFunc, opt *WorkerOptions)

StartWorker is a blocking call.

func (*Queue) Status added in v0.2.0

func (q *Queue) Status() (*Status, error)

Status returns the status of the queue.

type RedisClient

type RedisClient interface {
	Get(ctx context.Context, key string) *redis.StringCmd
	Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
	Del(ctx context.Context, keys ...string) *redis.IntCmd
	Incr(ctx context.Context, key string) *redis.IntCmd
	HIncrBy(ctx context.Context, key, field string, incr int64) *redis.IntCmd
	HGetAll(ctx context.Context, key string) *redis.StringStringMapCmd
	Exists(ctx context.Context, keys ...string) *redis.IntCmd
	Expire(ctx context.Context, key string, expiration time.Duration) *redis.BoolCmd
	LPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
	RPop(ctx context.Context, key string) *redis.StringCmd
	LLen(ctx context.Context, key string) *redis.IntCmd
	TxPipeline() redis.Pipeliner
}

RedisClient is an interface because go-redis provides several kinds of clients.

type ReportFunc added in v0.5.0

type ReportFunc func(status *Status)

ReportFunc works together with the worker option "Idle" and lets you customize how the counters are reported.
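
A reporter can be as small as a function that formats the counters and hands them to a sink of your choice. In this sketch Status and ReportFunc are local copies of the declarations on this page, and summarize and newReporter are hypothetical helpers.

```go
package main

import "fmt"

// Status and ReportFunc are local copies of the declarations on this page.
type Status struct {
	IsRunning bool
	Process   int
	Success   int
	Failed    int
	Dropped   int
	Total     int
}

type ReportFunc func(status *Status)

// summarize renders the counters as a single line.
func summarize(s *Status) string {
	return fmt.Sprintf(
		"running=%t process=%d success=%d failed=%d dropped=%d total=%d",
		s.IsRunning, s.Process, s.Success, s.Failed, s.Dropped, s.Total,
	)
}

// newReporter returns a ReportFunc that passes the summary line to sink,
// which could write to a log, a metrics client, or a test buffer.
func newReporter(sink func(line string)) ReportFunc {
	return func(status *Status) {
		if status == nil {
			return // nothing to report
		}
		sink(summarize(status))
	}
}

func main() {
	report := newReporter(func(line string) { fmt.Println(line) })
	report(&Status{IsRunning: true, Success: 8, Failed: 2, Total: 10})
}
```

A function built this way could be assigned to the Reporter field of WorkerOptions.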

type Status added in v0.2.0

type Status struct {
	IsRunning bool
	Process   int
	Success   int
	Failed    int
	Dropped   int
	Total     int
}

type WorkerOptions

type WorkerOptions struct {
	// Maximum number of retries if the job handler fails. Default: 10
	MaxRetry int
	// Number of parallel workers. Default: 2
	Parallel int64
	// How long a worker pauses when there is no job. Default: 3s
	Interval time.Duration
	// If the workers are inactive for this duration, the watcher clears the counters and makes a report. Default: 3min
	Idle time.Duration
	// Works together with "Idle" to customize the report.
	Reporter ReportFunc
	// How long to wait before retrying after a Redis server error. Default: 1min
	Recover time.Duration
	// If true, a job that exceeds the max retry count is saved to the dropped queue; otherwise it is discarded.
	// Default is false, to avoid memory leaks.
	SafeDrop bool
	// Custom logger.
	Logger Logger
	// If you pass in a wait group, the worker releases it at the end of its life.
	WG *sync.WaitGroup
}

WorkerOptions holds optional settings for starting a worker.
