Documentation ¶
Overview ¶
Package linda is a background manager to poll jobs from broker and dispatch them to multi workers.
Linda Broker provides a unified API across different broker (queue) services.
Brokers allow you to defer the processing of a time consuming task.
Use ReleaseWithDelay func, you can implement a cron job service.
Simple Usage:
package main import ( "fmt" "github.com/amlun/linda" "github.com/sirupsen/logrus" "os" "os/signal" "syscall" "time" ) func init() { linda.RegisterWorkers("printArgs", PrintArgs) } func main() { logrus.SetLevel(logrus.DebugLevel) // broker b, _ := linda.NewBroker("redis://localhost:6379/") // saver s, _ := linda.NewSaver("redis://localhost:6379/") // config c := linda.Config{ Queue: "test", Timeout: 60, Interval: time.Second, WorkerNum: 4, } quit := signals() linda.Init(c, b, s) go func() { defer func() { linda.Quit() }() <-quit }() if err := linda.Run(); err != nil { fmt.Println("Error:", err) } } func PrintArgs(args ...interface{}) error { fmt.Println(args) return nil } // Signal Handling func signals() <-chan bool { quit := make(chan bool) go func() { signals := make(chan os.Signal) defer close(signals) signal.Notify(signals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt) defer signalStop(signals) <-signals quit <- true }() return quit } // Stops signals channel. func signalStop(c chan<- os.Signal) { signal.Stop(c) }
Index ¶
- Constants
- Variables
- func Close()
- func Init(c Config, b Broker, s Saver) error
- func Quit()
- func RegisterBroker(scheme string, broker Broker)
- func RegisterSaver(scheme string, saver Saver)
- func RegisterWorkers(class string, worker workerFunc)
- func Run() error
- type Broker
- type Config
- type Job
- type Payload
- type RedisBroker
- func (r *RedisBroker) Close() error
- func (r *RedisBroker) Connect(rawUrl string, timeout time.Duration) error
- func (r *RedisBroker) Delete(queue, id string) error
- func (r *RedisBroker) Later(queue, id string, delay int64) error
- func (r *RedisBroker) MigrateExpiredJobs(queue string)
- func (r *RedisBroker) Push(queue, id string) error
- func (r *RedisBroker) Release(queue, id string, delay int64) error
- func (r *RedisBroker) Reserve(queue string, timeout int64) (id string, err error)
- type RedisSaver
- type Saver
- type State
Constants ¶
const ( // QueueName // the main queue name QueueName = "queue:%s" // ReservedQueueName // pop the job and send it to reserved queue ReservedQueueName = "queue:%s:reserved" // DelayedQueueName // push the job back into delayed queue DelayedQueueName = "queue:%s:delayed" )
const ( // ReserveScript -- Reserve the first job off of the queue... // KEYS[1] - The queue to pop jobs from, for example: queues:foo // KEYS[2] - The queue to place reserved jobs on, for example: queues:foo:reserved // ARGV[1] - The time at which the reserved job will expire ReserveScript = `` /* 168-byte string literal not displayed */ // ReleaseScript -- Remove the job from the current queue... // KEYS[1] - The "delayed" queue we release jobs onto, for example: queues:foo:delayed // KEYS[2] - The queue the jobs are currently on, for example: queues:foo:reserved // ARGV[1] - The raw payload of the job to add to the "delayed" queue // ARGV[2] - The UNIX timestamp at which the job should become available ReleaseScript = `` /* 143-byte string literal not displayed */ // MigrateJobsScript -- Get all of the jobs with an expired "score"... // KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved // KEYS[2] - The queue we are moving jobs to, for example: queues:foo // ARGV[1] - The current UNIX timestamp MigrateJobsScript = `` /* 517-byte string literal not displayed */ )
const (
// Job Info Prefix
JobInfoPrefix = "linda:job:%s:info"
)
Variables ¶
var (
ErrNotInitialized = errors.New("you must init linda first")
)
var (
UnknownBroker = errors.New("unknown broker scheme")
)
var (
UnknownSaver = errors.New("unknown saver scheme")
)
Functions ¶
func RegisterBroker ¶
RegisterBroker is used to register brokers with scheme name You can use your own broker driver
func RegisterSaver ¶
RegisterSaver is used to register savers with scheme name You can use your own saver driver
func RegisterWorkers ¶
func RegisterWorkers(class string, worker workerFunc)
RegisterWorkers register worker with workerFunc map to the Job Payload.Class
Types ¶
type Broker ¶
type Broker interface { Connect(rawUrl string, timeout time.Duration) error Close() error MigrateExpiredJobs(queue string) Reserve(queue string, timeout int64) (string, error) Delete(queue, id string) error Release(queue, id string, delay int64) error Push(queue, id string) error Later(queue, id string, delay int64) error }
Broker is message transport[MQ] it provides a unified API, support multi drivers
type Job ¶
type Job struct { ID string `json:"id"` Queue string `json:"queue"` Period int64 `json:"period"` Retry int64 `json:"retry"` CreatedAt time.Time `json:"created_at"` Payload Payload `json:"payload"` State State `json:"state"` }
Job is the basic unit of this package it contains queue name and payload
type Payload ¶
type Payload struct { Class string `json:"class"` Args []interface{} `json:"args"` }
Payload is the job's payload
type RedisBroker ¶
type RedisBroker struct {
// contains filtered or unexported fields
}
RedisBroker broker driver with redis
func (*RedisBroker) Connect ¶
func (r *RedisBroker) Connect(rawUrl string, timeout time.Duration) error
Connect broker backend with url
func (*RedisBroker) Delete ¶
func (r *RedisBroker) Delete(queue, id string) error
Delete the reserved job [id] from broker most of the time it means the job has been done successfully
func (*RedisBroker) Later ¶
func (r *RedisBroker) Later(queue, id string, delay int64) error
Later is used for push a job in to the queue with a delay(second) time the job should be handled in the future time
func (*RedisBroker) MigrateExpiredJobs ¶
func (r *RedisBroker) MigrateExpiredJobs(queue string)
MigrateExpiredJobs is used for migrate expired jobs to ready queue
func (*RedisBroker) Push ¶
func (r *RedisBroker) Push(queue, id string) error
Push a job in to the queue
func (*RedisBroker) Release ¶
func (r *RedisBroker) Release(queue, id string, delay int64) error
Release is used for release the reserved job and push it back in to ready queue withe a delay(second) time this function maybe used for cron jobs
func (*RedisBroker) Reserve ¶
func (r *RedisBroker) Reserve(queue string, timeout int64) (id string, err error)
Reserve out a job [id] from broker with its life time if the reserved job is out of time(second) poller will kick it back in to ready queue if time out is 0, it means the job will be delete directly
type RedisSaver ¶
type RedisSaver struct {
// contains filtered or unexported fields
}