linda

package module
v0.0.0-...-330980f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2019 License: MIT Imports: 10 Imported by: 0

README

Linda

Build Status GoDoc GoReport License

Linda is a background manager to poll jobs from broker and dispatch them to multi workers.

Linda Broker provides a unified API across different broker (queue) services.

Linda Saver provides a unified API across different saver (db) services.

Brokers allow you to defer the processing of a time consuming task.

When job done, use Release func to release the job with a delay (seconds), you can implement a cron job service.

The real period is job.Period + Interval

Inspiration comes from beanstalkd and goworker

Installation

To install Linda, use

go get github.com/amlun/linda

to get the main package, and then use glide

glide install

to install the dependencies

Getting Started

Terminology
  • Broker

message transport [MQ]

  • Saver

job info storage [Database]

  • poller

poll job from the broker and send to local job channels

poller also migrate the expire jobs

  • worker

worker is the main process to work the job

Worker Type
type workerFunc func(...interface{}) error
Register Worker
linda.RegisterWorkers("MyClass", myFunc)
Broker Interface
type Broker interface {
	Connect(url *neturl.URL) error
	Close() error
	MigrateExpiredJobs(queue string)
	Reserve(queue string, timeout int64) (string, error)
	Delete(queue, id string) error
	Release(queue, id string, delay int64) error
	Push(queue, id string) error
	Later(queue, id string, delay int64) error
}
Saver Interface
type Saver interface {
	Connect(url *neturl.URL) error
	Close() error
	Put(job *Job) error
	Get(id string) (*Job, error)
}
Examples

Add jobs to saver and push them to broker

go run example/push_jobs/main.go

example/push_jobs/main.go

package main

import (
	"github.com/amlun/linda"
	"github.com/sirupsen/logrus"
	"time"
)

func main() {
	var err error
	var b linda.Broker
	var s linda.Saver
	// broker
	if b, err = linda.NewBroker("redis://localhost:6379/"); err != nil {
		logrus.Error(err)
		return
	}
	// saver
	if s, err = linda.NewSaver("redis://localhost:6379/"); err != nil {
		logrus.Error(err)
		return
	}
	// job
	var jobID = "1"
	var queue = "test"
	var job = &linda.Job{
		ID:        jobID,
		Queue:     queue,
		Period:    60,
		Retry:     3,
		CreatedAt: time.Now(),
		Payload: linda.Payload{
			Class: "printArgs",
			Args:  []interface{}{"a", "b", "c"},
		},
	}
	// save job
	if err = s.Put(job); err != nil {
		logrus.Error(err)
		return
	}
	// push to broker
	if err = b.Push(queue, jobID); err != nil {
		logrus.Error(err)
		return
	}
}

Worker run to consume the job

go run example/print_args/main.go

example/print_args/main.go

package main

import (
	"fmt"
	"github.com/amlun/linda"
	"github.com/sirupsen/logrus"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func init() {
	linda.RegisterWorkers("printArgs", PrintArgs)
}

func main() {
	logrus.SetLevel(logrus.DebugLevel)
	// broker
	b, _ := linda.NewBroker("redis://localhost:6379/")
	// saver
	s, _ := linda.NewSaver("redis://localhost:6379/")
	// config
	c := linda.Config{
		Queue:     "test",
		Timeout:   60,
		Interval:  time.Second,
		WorkerNum: 4,
	}
	quit := signals()
	linda.Init(c, b, s)
	go func() {
		defer func() {
			linda.Quit()
		}()
		<-quit
	}()

	if err := linda.Run(); err != nil {
		fmt.Println("Error:", err)
	}
}

func PrintArgs(args ...interface{}) error {
	fmt.Println(args)
	return nil
}

// Signal Handling
func signals() <-chan bool {
	quit := make(chan bool)
	go func() {
		signals := make(chan os.Signal)
		defer close(signals)
		signal.Notify(signals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt)
		defer signalStop(signals)
		<-signals
		quit <- true
	}()
	return quit
}

// Stops signals channel.
func signalStop(c chan<- os.Signal) {
	signal.Stop(c)
}

Features

Broker List
  • Redis
  • beanstalkd
  • RabbitMQ

Design

System Design

system-design

Job State
   later                                release
  ----------------> [DELAYED] <------------.
                        |                   |
                   kick | (time passes)     |
                        |                   |
   push                 v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                        ^                   |
                         \                  |
                          `-----------------'
                           kick (time out)
 

Thanks

Documentation

Overview

Package linda is a background manager to poll jobs from broker and dispatch them to multi workers.

Linda Broker provides a unified API across different broker (queue) services.

Brokers allow you to defer the processing of a time consuming task.

Use ReleaseWithDelay func, you can implement a cron job service.

Simple Usage:

package main

import (
	"fmt"
	"github.com/amlun/linda"
	"github.com/sirupsen/logrus"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func init() {
	linda.RegisterWorkers("printArgs", PrintArgs)
}

func main() {
	logrus.SetLevel(logrus.DebugLevel)
	// broker
	b, _ := linda.NewBroker("redis://localhost:6379/")
	// saver
	s, _ := linda.NewSaver("redis://localhost:6379/")
	// config
	c := linda.Config{
		Queue:     "test",
		Timeout:   60,
		Interval:  time.Second,
		WorkerNum: 4,
	}
	quit := signals()
	linda.Init(c, b, s)
	go func() {
		defer func() {
			linda.Quit()
		}()
		<-quit
	}()

	if err := linda.Run(); err != nil {
		fmt.Println("Error:", err)
	}
}

func PrintArgs(args ...interface{}) error {
	fmt.Println(args)
	return nil
}

// Signal Handling
func signals() <-chan bool {
	quit := make(chan bool)
	go func() {
		signals := make(chan os.Signal)
		defer close(signals)
		signal.Notify(signals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt)
		defer signalStop(signals)
		<-signals
		quit <- true
	}()
	return quit
}

// Stops signals channel.
func signalStop(c chan<- os.Signal) {
	signal.Stop(c)
}

Index

Constants

View Source
const (
	// QueueName
	// the main queue name
	QueueName = "queue:%s"
	// ReservedQueueName
	// pop the job and send it to reserved queue
	ReservedQueueName = "queue:%s:reserved"
	// DelayedQueueName
	// push the job back into delayed queue
	DelayedQueueName = "queue:%s:delayed"
)
View Source
const (

	// ReserveScript -- Reserve the first job off of the queue...
	// KEYS[1] - The queue to pop jobs from, for example: queues:foo
	// KEYS[2] - The queue to place reserved jobs on, for example: queues:foo:reserved
	// ARGV[1] - The time at which the reserved job will expire
	ReserveScript = `` /* 168-byte string literal not displayed */

	// ReleaseScript -- Remove the job from the current queue...
	// KEYS[1] - The "delayed" queue we release jobs onto, for example: queues:foo:delayed
	// KEYS[2] - The queue the jobs are currently on, for example: queues:foo:reserved
	// ARGV[1] - The raw payload of the job to add to the "delayed" queue
	// ARGV[2] - The UNIX timestamp at which the job should become available
	ReleaseScript = `` /* 143-byte string literal not displayed */

	// MigrateJobsScript -- Get all of the jobs with an expired "score"...
	// KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
	// KEYS[2] - The queue we are moving jobs to, for example: queues:foo
	// ARGV[1] - The current UNIX timestamp
	MigrateJobsScript = `` /* 517-byte string literal not displayed */

)
View Source
const (
	// Job Info Prefix
	JobInfoPrefix = "linda:job:%s:info"
)

Variables

View Source
var (
	ErrNotInitialized = errors.New("you must init linda first")
)
View Source
var (
	UnknownBroker = errors.New("unknown broker scheme")
)
View Source
var (
	UnknownSaver = errors.New("unknown saver scheme")
)

Functions

func Close

func Close()

Close linda with close broker and saver

func Init

func Init(c Config, b Broker, s Saver) error

Open linda with config get instance of broker

func Quit

func Quit()

func RegisterBroker

func RegisterBroker(scheme string, broker Broker)

RegisterBroker is used to register brokers with scheme name You can use your own broker driver

func RegisterSaver

func RegisterSaver(scheme string, saver Saver)

RegisterSaver is used to register savers with scheme name You can use your own saver driver

func RegisterWorkers

func RegisterWorkers(class string, worker workerFunc)

RegisterWorkers register worker with workerFunc map to the Job Payload.Class

func Run

func Run() error

Run linda, it also call init function self

Types

type Broker

type Broker interface {
	Connect(rawUrl string, timeout time.Duration) error
	Close() error
	MigrateExpiredJobs(queue string)
	Reserve(queue string, timeout int64) (string, error)
	Delete(queue, id string) error
	Release(queue, id string, delay int64) error
	Push(queue, id string) error
	Later(queue, id string, delay int64) error
}

Broker is message transport[MQ] it provides a unified API, support multi drivers

func NewBroker

func NewBroker(rawUrl string) (Broker, error)

NewBroker will get an instance of broker with url string if there is no matched scheme, return error now broker only support redis

type Config

type Config struct {
	Queue     string
	Timeout   int64
	Interval  time.Duration
	PollerNum int
	WorkerNum int
}

type Job

type Job struct {
	ID        string    `json:"id"`
	Queue     string    `json:"queue"`
	Period    int64     `json:"period"`
	Retry     int64     `json:"retry"`
	CreatedAt time.Time `json:"created_at"`
	Payload   Payload   `json:"payload"`
	State     State     `json:"state"`
}

Job is the basic unit of this package it contains queue name and payload

func (*Job) String

func (j *Job) String() string

String format job to string

type Payload

type Payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

Payload is the job's payload

type RedisBroker

type RedisBroker struct {
	// contains filtered or unexported fields
}

RedisBroker broker driver with redis

func (*RedisBroker) Close

func (r *RedisBroker) Close() error

Close the broker

func (*RedisBroker) Connect

func (r *RedisBroker) Connect(rawUrl string, timeout time.Duration) error

Connect broker backend with url

func (*RedisBroker) Delete

func (r *RedisBroker) Delete(queue, id string) error

Delete the reserved job [id] from broker most of the time it means the job has been done successfully

func (*RedisBroker) Later

func (r *RedisBroker) Later(queue, id string, delay int64) error

Later is used for push a job in to the queue with a delay(second) time the job should be handled in the future time

func (*RedisBroker) MigrateExpiredJobs

func (r *RedisBroker) MigrateExpiredJobs(queue string)

MigrateExpiredJobs is used for migrate expired jobs to ready queue

func (*RedisBroker) Push

func (r *RedisBroker) Push(queue, id string) error

Push a job in to the queue

func (*RedisBroker) Release

func (r *RedisBroker) Release(queue, id string, delay int64) error

Release is used for release the reserved job and push it back in to ready queue withe a delay(second) time this function maybe used for cron jobs

func (*RedisBroker) Reserve

func (r *RedisBroker) Reserve(queue string, timeout int64) (id string, err error)

Reserve out a job [id] from broker with its life time if the reserved job is out of time(second) poller will kick it back in to ready queue if time out is 0, it means the job will be delete directly

type RedisSaver

type RedisSaver struct {
	// contains filtered or unexported fields
}

func (*RedisSaver) Close

func (r *RedisSaver) Close() error

Close the saver

func (*RedisSaver) Connect

func (r *RedisSaver) Connect(rawUrl string, timeout time.Duration) error

Connect saver backend with url

func (*RedisSaver) Delete

func (r *RedisSaver) Delete(id string) error

Delete the job from saver

func (*RedisSaver) Get

func (r *RedisSaver) Get(id string) (*Job, error)

Get the job from saver

func (*RedisSaver) Put

func (r *RedisSaver) Put(job *Job) error

Put the job to saver

type Saver

type Saver interface {
	Connect(url string, timeout time.Duration) error
	Close() error
	Put(job *Job) error
	Get(id string) (*Job, error)
	Delete(id string) error
}

Saver is job saved on [Storage] it provides a unified API, support multi drivers

func NewSaver

func NewSaver(rawUrl string) (Saver, error)

NewSaver will get an instance of saver with url string if there is no matched scheme, return error now saver only support redis

type State

type State struct {
	RunTimes  int64     `json:"run_times"`
	LastRunAt time.Time `json:"last_run_at"`
	Retries   int64     `json:"retries"`
}

State is the job's running state

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL