worker

Version: v1.0.0
Published: Sep 8, 2019 License: MIT Imports: 5 Imported by: 5

README

Worker

Package worker adds an abstraction layer around background jobs. It lets you run a job periodically, observe its execution time, and control concurrent execution.

A group of workers lets you control when jobs start and, when all jobs need to stop, wait until every running worker has finished.

Features

  • Scheduling: use one of the existing worker.By* schedule functions. Cron schedule specs are supported via the robfig/cron parser.
  • Control concurrent execution across multiple instances with worker.WithLock. See the existing lockers.
  • Observe job execution time with worker.SetObserver. Works well with the prometheus/client_golang package.
  • Graceful stop: wait until all running jobs have completed.

Example

wg := worker.NewGroup()
wg.Add(
    worker.
        New(func(context.Context) {}).
        ByTicker(time.Second),

    worker.
        New(func(context.Context) {}).
        ByTimer(time.Second),

    worker.
        New(func(context.Context) {}).
        ByCronSpec("@every 1s"),
)
wg.Run()

Lockers

You can use Redis locks to control exclusive job execution:

l := locker.NewRedis(radix.Client, "job_lock_name", locker.RedisLockTTL(time.Minute))

w := worker.
        New(func(context.Context) {}).
        WithLock(l)

// The job is executed only if the `job_lock_name` Redis key does not exist.
w.Run(context.Background())

Documentation

Overview

Package worker adds an abstraction layer around background jobs. It lets you run a job periodically, observe its execution time, and control concurrent execution. A group of workers lets you control when jobs start and, when all jobs need to stop, wait until every running worker has finished.

Usage

Create a group and workers with empty jobs:

wg := worker.NewGroup()
w1 := worker.New(func(context.Context) {})
w2 := worker.New(func(context.Context) {})
w3 := worker.New(func(context.Context) {})

Add the workers to the group and run all jobs:

wg.Add(w1, w2, w3)
wg.Run()

Stop all workers:

wg.Stop()

Periodic jobs

Set the job execution period on a worker (only the last schedule set is applied):

w := worker.New(func(context.Context) {})
w.ByTicker(time.Second)
w.ByTimer(time.Second)
w.ByCronSpec("@every 1s")

or set a custom schedule function:

// run 3 times
w.BySchedule(func(ctx context.Context, j worker.Job) worker.Job {
	return func(ctx context.Context) {
		for i := 0; i < 3; i++ {
			j(ctx)
		}
	}
})

Exclusive jobs

Control concurrent execution across a single instance or multiple instances with lockers:

worker.
	New(func(context.Context) {}).
	WithLock(worker.Locker).
	Run(context.Background())

Observe execution time

Collect job execution time metrics:

w.SetObserver(func(d float64) {
	fmt.Printf("time elapsed %.3fs", d)
})

Types

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group of workers controls background job execution and allows all running background jobs to be stopped gracefully.

func NewGroup

func NewGroup() *Group

NewGroup returns a new workers group.

func (*Group) Add

func (g *Group) Add(workers ...*Worker)

Add adds workers to the group. If the group is already running, each added worker starts immediately.

func (*Group) Run

func (g *Group) Run()

Run starts each worker in a separate goroutine under wait group control.

func (*Group) Stop

func (g *Group) Stop()

Stop cancels the workers' context and waits until all running workers have completed. Be careful: it can deadlock if a worker hangs.

type Job

type Job func(context.Context)

Job is the target background job.

type LockFunc

type LockFunc func(context.Context, Job) Job

LockFunc is a job wrapper that controls exclusive execution.

func WithLock

func WithLock(l Locker) LockFunc

WithLock returns a func that calls the job under the lock.

type Locker

type Locker interface {
	// Lock acquire lock for job, returns error when the job should not be started
	Lock() error
	// Unlock release acquired lock
	Unlock()
}

Locker interface
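Any type with these two methods satisfies the interface. As an illustration, the sketch below is a hypothetical in-process locker that refuses a second Lock until Unlock is called. It only guards jobs inside one process, unlike the Redis locker from the README, and memoryLocker is not a type the package provides.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Locker mirrors the documented interface.
type Locker interface {
	Lock() error
	Unlock()
}

var errLocked = errors.New("lock already held")

// memoryLocker is a hypothetical in-process Locker.
type memoryLocker struct {
	mu   sync.Mutex
	held bool
}

// Lock fails immediately instead of blocking, so an overlapping
// job run can be skipped rather than queued.
func (l *memoryLocker) Lock() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.held {
		return errLocked
	}
	l.held = true
	return nil
}

// Unlock releases the lock so that the next Lock succeeds.
func (l *memoryLocker) Unlock() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.held = false
}

// Compile-time check that memoryLocker satisfies Locker.
var _ Locker = (*memoryLocker)(nil)

func main() {
	l := &memoryLocker{}
	fmt.Println(l.Lock()) // <nil>
	fmt.Println(l.Lock()) // lock already held
	l.Unlock()
	fmt.Println(l.Lock()) // <nil>
}
```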

type ObserveFunc

type ObserveFunc func(float64)

ObserveFunc receives the job execution duration in seconds.

type ScheduleFunc

type ScheduleFunc func(context.Context, Job) Job

ScheduleFunc is a job wrapper that implements a job run schedule.

func ByCronSchedule

func ByCronSchedule(schedule string) ScheduleFunc

ByCronSchedule returns a job wrapper func that runs the job on a cron schedule, using the robfig/cron parser to parse the cron spec. It panics if the schedule spec is invalid.

func ByTicker

func ByTicker(period time.Duration) ScheduleFunc

ByTicker returns a job wrapper func that runs the job by ticker, once every period.

func ByTimer

func ByTimer(period time.Duration) ScheduleFunc

ByTimer returns a job wrapper func that runs the job each period after the previous run has completed.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a builder for a job with an optional schedule and exclusive execution control.

func New

func New(job Job) *Worker

New returns a new worker with the target job.

func (*Worker) ByCronSpec

func (w *Worker) ByCronSpec(spec string) *Worker

ByCronSpec sets the schedule job wrapper from a cron spec.

func (*Worker) BySchedule

func (w *Worker) BySchedule(s ScheduleFunc) *Worker

BySchedule sets the schedule wrapper func for the job.

func (*Worker) ByTicker

func (w *Worker) ByTicker(period time.Duration) *Worker

ByTicker sets the ticker schedule job wrapper with the given period.

func (*Worker) ByTimer

func (w *Worker) ByTimer(period time.Duration) *Worker

ByTimer sets the timer schedule job wrapper with the given period.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Run runs the job, wrapping it in the metrics, lock, and schedule wrappers.

func (*Worker) SetImmediately

func (w *Worker) SetImmediately(executeOnRun bool) *Worker

SetImmediately sets whether the job is executed immediately on Run.

func (*Worker) SetObserver

func (w *Worker) SetObserver(observe ObserveFunc) *Worker

SetObserver sets the job duration observer.

func (*Worker) WithLock

func (w *Worker) WithLock(l Locker) *Worker

WithLock sets the job lock wrapper.

Directories

Path Synopsis
locker module
