queue

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrInvalidProvider is returned when the queue provider is invalid.
	ErrInvalidProvider = errors.New("config: invalid queue provider")
)

Functions

func ConsumFailInc

func ConsumFailInc(name string)

ConsumFailInc increments the number of jobs that failed to be consumed.

func ConsumSuccessInc

func ConsumSuccessInc(name string)

ConsumSuccessInc increments the number of jobs that were consumed successfully.

func Consumers

func Consumers() map[string]Consumer

Consumers returns the map of consumers.

func InFailInc

func InFailInc(name string)

InFailInc increments the number of jobs that failed to be enqueued.

func InSuccessInc

func InSuccessInc(name string)

InSuccessInc increments the number of jobs that were enqueued successfully.

func MetricTurnon

func MetricTurnon()

MetricTurnon turns on the metric exporter.

func ProcessTimeHist

func ProcessTimeHist(name string, previous time.Time)

ProcessTimeHist records statistics on job processing time.

func RegisterConsumer

func RegisterConsumer(name string, f Consumer)

RegisterConsumer registers a queue job consumer.

Types

type Base

type Base struct {
	Opts Options

	Debug   bool
	Enqueue func([]byte) error
	// contains filtered or unexported fields
}

Base struct

func NewBase

func NewBase(opts Options) *Base

NewBase returns a base queue.

func (*Base) Backoff

func (b *Base) Backoff()

Backoff is used to back off.

func (*Base) BackoffReset

func (b *Base) BackoffReset()

BackoffReset is used to reset the backoff.

func (*Base) Do

func (b *Base) Do(job Jober, fn Consumer, cbs ...func(error)) (err error)

Do is used to run the queue consume action.

func (*Base) GetConsumer

func (b *Base) GetConsumer(name string) (Consumer, error)

GetConsumer looks up a consumer by name.

func (*Base) In

func (b *Base) In(job Jober) (err error)

In is used to enqueue a job.

func (*Base) Parse

func (b *Base) Parse(payload string) (j Jober, err error)

Parse is used to parse job info.

func (*Base) RegisterConsumer

func (b *Base) RegisterConsumer(name string, f Consumer)

RegisterConsumer is used to register a queue consumer.

func (*Base) Run

func (b *Base) Run()

Run is used to run the job-handling loop.

func (*Base) SemAcquire

func (b *Base) SemAcquire(n int)

SemAcquire is used to acquire the semaphore.

func (*Base) SemRelease

func (b *Base) SemRelease(n int)

SemRelease is used to release the semaphore.

type Consumer

type Consumer func(job Jober) error

Consumer defines a job consumer.

type Job

type Job struct {
	Name     string
	Payload  []byte
	MaxTries int
	Timeout  time.Duration
}

Job struct

func (*Job) GetMaxTries

func (j *Job) GetMaxTries() int

GetMaxTries returns the maximum number of tries for the job.

func (*Job) GetName

func (j *Job) GetName() string

GetName returns the name of the job.

func (*Job) GetPayload

func (j *Job) GetPayload() []byte

GetPayload returns the payload of the job.

func (*Job) PackPayload

func (j *Job) PackPayload() (packed []byte, err error)

PackPayload is used to pack the data of the job.

func (*Job) Serialize

func (j *Job) Serialize() (data []byte, err error)

Serialize is used to serialize the job.

func (*Job) UnpackPayload

func (j *Job) UnpackPayload(v interface{}) error

UnpackPayload is used to unpack the data of the job.

func (*Job) Unserialize

func (j *Job) Unserialize() (Job, error)

Unserialize is used to unserialize the job.

type Jober

type Jober interface {
	GetName() string
	GetMaxTries() int
	Serialize() ([]byte, error)
	Unserialize() (Job, error)
	PackPayload() ([]byte, error)
	UnpackPayload(interface{}) error
	GetPayload() []byte
}

Jober defines the job interface.

type Options

type Options struct {
	Name        string
	Driver      string
	Parallel    int
	Debug       bool
	Metric      bool
	Consume     bool
	WaitTimeOut time.Duration
	Logger      *logrus.Logger
}

Options struct

func (*Options) Init

func (opts *Options) Init()

Init initializes the options.

type Queuer

type Queuer interface {
	In(job Jober) (err error)
	Parse(payload string) (j Jober, err error)
	Do(job Jober, fn Consumer, cbs ...func(error)) (err error)
	RegisterConsumer(name string, f Consumer)
	Run()
}

Queuer interface

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL