tasqueue

Version: v0.0.1 Published: May 13, 2023 License: BSD-2-Clause Imports: 13 Imported by: 0

README

taskqueue

Tasqueue is a simple, lightweight distributed job/worker implementation in Go.

This version has been extended from the original with support for batch operations, setting user-controlled meta information and querying.

Installation

go get -u github.com/voidshard/tasqueue

Concepts

  • tasqueue.Broker is a generic interface to enqueue and consume messages from a single queue. Currently supported brokers are redis and nats-jetstream. Note: the broker (or your Enqueue and Consume implementation) must guarantee atomicity, i.e. Tasqueue does not provide locking to ensure that each job is consumed only once.
  • tasqueue.Results is a generic interface to store the status and results of jobs. Currently we support postgres and sqlite.
  • tasqueue.Task is a pre-registered job handler. It stores a handler function that is called to process a job. It also stores callbacks (if set through options), which are executed during different states of a job.
  • tasqueue.Job represents a unit of work pushed to a queue for consumption. It holds:
    • []byte payload (encoded in any manner, if required)
    • task name used to identify the pre-registered task which will process the job.
  • tasqueue.Group represents a set of jobs that can be done simultaneously. The group is complete when each job is complete.
  • tasqueue.Chain represents a set of jobs that must be run one after the other. The chain is complete when the last job is complete.

Server

A tasqueue server is the main store that holds the broker and the results interfaces. It also acts as a hub to register tasks.

Server Options

Server options are used to configure the server. Broker and Results are mandatory, while the logger and the OpenTelemetry trace provider are optional. Refer to the in-memory example for an OpenTelemetry implementation.

type ServerOpts struct {
	// Mandatory results & broker implementations.
	Broker        Broker
	Results       Results

	// Optional logger and telemetry provider.
	Logger        logf.Logger
	TraceProvider *trace.TracerProvider
}

Usage

package main

import (
	"log"

	"github.com/voidshard/tasqueue"
	rb "github.com/voidshard/tasqueue/brokers/redis"
	rp "github.com/voidshard/tasqueue/results/postgres"
	"github.com/zerodha/logf"
)

func main() {
	lo := logf.New(logf.Opts{})

	broker := rb.New(rb.Options{
		Addrs:    []string{"127.0.0.1:6379"},
		Password: "",
		DB:       0,
	}, lo)
	results := rp.New(rp.Options{
		Host:      "localhost",
		Port:      5432,
		Database:  "tasqueue",
		User:      "postgres",
		Password:  "mysecretpassword",
	}, lo)

	srv, err := tasqueue.NewServer(tasqueue.ServerOpts{
		Broker:        broker,
		Results:       results,
		Logger:        lo,
	})
	if err != nil {
		log.Fatal(err)
	}
}

Task Options

Concurrency is the number of processors run for this task. Queue is the queue to consume for this task. Task options also contain callbacks that are executed on a state change.

type TaskOpts struct {
	Concurrency  uint32
	Queue        string
	SuccessCB    func(JobCtx)
	ProcessingCB func(JobCtx)
	RetryingCB   func(JobCtx)
	FailedCB     func(JobCtx)
}

Registering tasks

A task can be registered by supplying a name, handler and options. Jobs can be processed using a task registered by a particular name. A handler is a function with the signature func([]byte, JobCtx) error. It is the responsibility of the handler to deal with the []byte payload in whatever manner (decode, if required).

package tasks

import (
	"encoding/json"

	"github.com/voidshard/tasqueue"
)

type SumPayload struct {
	Arg1 int `json:"arg1"`
	Arg2 int `json:"arg2"`
}

type SumResult struct {
	Result int `json:"result"`
}

// SumProcessor saves the sum of two integer arguments as the job result.
func SumProcessor(b []byte, m tasqueue.JobCtx) error {
	var pl SumPayload
	if err := json.Unmarshal(b, &pl); err != nil {
		return err
	}

	rs, err := json.Marshal(SumResult{Result: pl.Arg1 + pl.Arg2})
	if err != nil {
		return err
	}

	return m.Save(rs)
}

srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{Concurrency: 5})

Start server

Start() starts the job consumer and processor. It is a blocking function. It listens for jobs on the queue and spawns processor goroutines.

srv.Start(ctx)
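
Because Start() blocks, a program that also enqueues jobs typically runs it in its own goroutine. The following is a minimal sketch of that pattern, not part of the library: runInBackground and demoStart are hypothetical helpers, and the fake start function assumes that Start returns once its context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// runInBackground calls the blocking start function in its own goroutine and
// returns a channel that is closed once start has returned.
// With a real server, start would be srv.Start.
func runInBackground(ctx context.Context, start func(context.Context)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		start(ctx)
	}()
	return done
}

// demoStart uses a fake start function that blocks until ctx is cancelled,
// which is how this sketch assumes Start behaves.
func demoStart() bool {
	ctx, cancel := context.WithCancel(context.Background())
	stopped := false
	fakeStart := func(ctx context.Context) {
		<-ctx.Done()
		stopped = true
	}
	done := runInBackground(ctx, fakeStart)
	cancel() // for example on shutdown, or once all jobs have been enqueued
	<-done   // wait for the consumer loop to exit
	return stopped
}

func main() {
	fmt.Println(demoStart())
}
```

With a real server the call would look like done := runInBackground(ctx, srv.Start), followed by enqueuing jobs from the main goroutine.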

Job

A tasqueue job represents a unit of work pushed onto the queue, that requires processing using a registered Task. It holds a []byte payload, a task name (which will process the payload) and various options.

Job Options

// JobOpts holds the various options available to configure a job.
type JobOpts struct {
	Correlation

	Queue      string // default: `tasqueue:tasks`
	MaxRetries uint32 // default: `1`
	Schedule   string // cron schedule for the job
}

// Correlation contains extra values a user can either set or filter by
type Correlation struct {
	// ID constrained to 255 chars
	ID string

	// Arbitrary name constrained to 255 chars
	Name string
}
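
The comments above state a 255 character limit on both correlation fields but do not say where it is enforced. A caller may want to check the limit before enqueuing. The sketch below is one way to do that; checkCorrelation is a hypothetical helper, and it counts bytes rather than runes, which is the stricter reading of "chars".

```go
package main

import (
	"fmt"
	"strings"
)

// maxCorrelationLen is the limit stated in the Correlation comments.
const maxCorrelationLen = 255

// checkCorrelation returns an error if either field exceeds the limit.
// It measures length in bytes (an assumption of this sketch).
func checkCorrelation(id, name string) error {
	if len(id) > maxCorrelationLen {
		return fmt.Errorf("correlation id is %d bytes, limit is %d", len(id), maxCorrelationLen)
	}
	if len(name) > maxCorrelationLen {
		return fmt.Errorf("correlation name is %d bytes, limit is %d", len(name), maxCorrelationLen)
	}
	return nil
}

func main() {
	fmt.Println(checkCorrelation("my-id", "some name"))
	fmt.Println(checkCorrelation(strings.Repeat("x", 256), ""))
}
```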

Creating a job

NewJob returns a job with the supplied payload. It accepts the name of the task, the payload and the job options.

b, _ := json.Marshal(tasks.SumPayload{Arg1: 5, Arg2: 4})
job, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
if err != nil {
	log.Fatal(err)
}

// optionally set correlation fields
job.SetCorrelationID("my-id")
job.SetName("some name")

Enqueuing a job

Once a job is created, it can be enqueued via the server for processing. Calling srv.Enqueue returns a job uuid which can be used to query the status of the job.

uuid, err := srv.Enqueue(ctx, job)
if err != nil {
	log.Fatal(err)
}

Getting a job message

To query the details of a job that was enqueued, we can use srv.GetJob. It returns a JobMessage which contains details related to a job.

jobMsg, err := srv.GetJob(ctx, uuid)
if err != nil {
	log.Fatal(err)
}
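
Enqueue only returns a uuid, so a caller that needs the outcome has to poll GetJob until the job reaches a final status. The sketch below shows one possible polling loop. waitForJob and demoWait are hypothetical helpers; the status lookup is passed in as a function so the block runs without a broker, and the status strings are copied from the package constants listed under Constants.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Status strings copied from the tasqueue constants StatusDone and StatusFailed.
const (
	statusDone   = "successful"
	statusFailed = "failed"
)

// waitForJob polls fetch until the job reaches a terminal status or ctx ends.
// With a real server, fetch would wrap srv.GetJob(ctx, uuid) and return
// jobMsg.Status.
func waitForJob(ctx context.Context, every time.Duration, fetch func(context.Context) (string, error)) (string, error) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		status, err := fetch(ctx)
		if err != nil {
			return "", err
		}
		if status == statusDone || status == statusFailed {
			return status, nil
		}
		select {
		case <-ctx.Done():
			return status, ctx.Err()
		case <-ticker.C:
		}
	}
}

// demoWait simulates a job that is queued, then processing, then successful.
func demoWait() (string, int, error) {
	states := []string{"queued", "processing", "successful"}
	calls := 0
	fetch := func(context.Context) (string, error) {
		s := states[calls]
		calls++
		return s, nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	status, err := waitForJob(ctx, time.Millisecond, fetch)
	return status, calls, err
}

func main() {
	status, calls, err := demoWait()
	fmt.Println(status, calls, err)
}
```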

Fields available in a JobMessage (embeds Meta):

// Meta contains fields related to a job. These are updated when a task is consumed.
type Meta struct {
	Correlation

	UUID        string
	Status      string
	Queue       string
	Schedule    string
	MaxRetry    uint32
	Retried     uint32
	PrevErr     string
	ProcessedAt time.Time
}

JobCtx

JobCtx is passed to handler functions and callbacks. It can be used to view the job's meta information (through its Meta field) and to save arbitrary results for a job using func (c *JobCtx) Save(b []byte) error.

Group

A tasqueue group holds multiple jobs and pushes them all simultaneously onto the queue. The Group is considered successful only if all the jobs finish successfully.

Creating a group

NewGroup returns a Group holding the jobs passed.

var group []tasqueue.Job

for i := 0; i < 3; i++ {
	b, _ := json.Marshal(tasks.SumPayload{Arg1: i, Arg2: 4})
	job, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
	if err != nil {
		log.Fatal(err)
	}
	group = append(group, job)
}

grp, err := tasqueue.NewGroup(group...)
if err != nil {
	log.Fatal(err)
}

Enqueuing a group

Once a group is created, it can be enqueued via the server for processing. Calling srv.EnqueueGroup returns a group uuid which can be used to query the status of the group.

groupUUID, err := srv.EnqueueGroup(ctx, grp)
if err != nil {
	log.Fatal(err)
}

Getting a group message

To query the details of a group that was enqueued, we can use srv.GetGroup. It returns a GroupMessage which contains details related to a group.

groupMsg, err := srv.GetGroup(ctx, groupUUID)
if err != nil {
	log.Fatal(err)
}

Fields available in a GroupMessage (embeds GroupMeta):

// GroupMeta contains fields related to a group job. These are updated when a task is consumed.
type GroupMeta struct {
	Correlation

	UUID   string
	Status string
	// JobStatus is a map of individual job uuid -> status
	JobStatus map[string]string
}
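
The JobStatus map can be summarised on the caller's side, for example to report progress. The following sketch works on a plain map[string]string of the same shape as GroupMeta.JobStatus. countByStatus and allSuccessful are hypothetical helpers, and the success rule simply follows the statement above that a group is successful only if all jobs finish successfully.

```go
package main

import "fmt"

// statusDone is copied from the tasqueue constant StatusDone.
const statusDone = "successful"

// countByStatus tallies how many jobs are in each status.
func countByStatus(jobStatus map[string]string) map[string]int {
	counts := make(map[string]int)
	for _, status := range jobStatus {
		counts[status]++
	}
	return counts
}

// allSuccessful reports whether the map is non-empty and every job succeeded.
func allSuccessful(jobStatus map[string]string) bool {
	if len(jobStatus) == 0 {
		return false
	}
	for _, status := range jobStatus {
		if status != statusDone {
			return false
		}
	}
	return true
}

func main() {
	// With a real server this map would be groupMsg.JobStatus.
	js := map[string]string{
		"uuid-1": "successful",
		"uuid-2": "processing",
		"uuid-3": "successful",
	}
	fmt.Println(countByStatus(js), allSuccessful(js))
}
```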

Chain

A tasqueue chain holds multiple jobs and pushes them one after the other (each job is pushed after the previous one succeeds). The Chain is considered successful only if the final job completes successfully.

Creating a chain

NewChain returns a chain holding the jobs in the order they were passed.

var chain []tasqueue.Job

for i := 0; i < 3; i++ {
	b, _ := json.Marshal(tasks.SumPayload{Arg1: i, Arg2: 4})
	task, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
	if err != nil {
		log.Fatal(err)
	}
	chain = append(chain, task)
}

chn, err := tasqueue.NewChain(chain...)
if err != nil {
	log.Fatal(err)
}

Enqueuing a chain

Once a chain is created, it can be enqueued via the server for processing. Calling srv.EnqueueChain returns a chain uuid which can be used to query the status of the chain.

chainUUID, err := srv.EnqueueChain(ctx, chn)
if err != nil {
	log.Fatal(err)
}

Getting results of previous job in a chain

A job in the chain can access the results of the previous job in the chain by getting JobCtx.Meta.PrevJobResults. This will contain any job result saved by the previous job by JobCtx.Save().
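
As an illustration, a handler in a chain of "add" jobs could build on the previous sum. The sketch below keeps the decoding logic in a plain function so it runs on its own. addToPrevious is a hypothetical helper, and it assumes that the last element of PrevJobResults belongs to the immediately preceding job, which the source does not state.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// SumResult mirrors the result type saved by SumProcessor.
type SumResult struct {
	Result int `json:"result"`
}

// addToPrevious decodes the last entry of prevResults as a SumResult,
// adds n to it and returns the new result encoded as JSON.
func addToPrevious(prevResults [][]byte, n int) ([]byte, error) {
	if len(prevResults) == 0 {
		return nil, errors.New("no previous job results")
	}
	var prev SumResult
	if err := json.Unmarshal(prevResults[len(prevResults)-1], &prev); err != nil {
		return nil, err
	}
	return json.Marshal(SumResult{Result: prev.Result + n})
}

func main() {
	// Inside a handler, prevResults would be m.Meta.PrevJobResults and the
	// returned bytes would be passed to m.Save.
	out, err := addToPrevious([][]byte{[]byte(`{"result":9}`)}, 4)
	fmt.Println(string(out), err)
}
```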

Getting a chain message

To query the details of a chain that was enqueued, we can use srv.GetChain. It returns a ChainMessage which contains details related to a chain.

chainMsg, err := srv.GetChain(ctx, chainUUID)
if err != nil {
	log.Fatal(err)
}

Fields available in a ChainMessage (embeds ChainMeta):

// ChainMeta contains fields related to a chain job.
type ChainMeta struct {
	Correlation

	UUID string
	// Status of the overall chain
	Status string
	// UUID of the current job part of chain
	JobUUID string
	// List of UUIDs of completed jobs
	PrevJobs []string
}

Result

A result is arbitrary []byte data saved by a handler or callback via JobCtx.Save().

Querying

Functions are exposed to query Jobs, Groups and Chains by passing a set of filters. Results are paginated and a pagination token is returned with each query. An empty token indicates there are no more results.

filter := &tasqueue.Filter{Op: tasqueue.OpIn, Field: tasqueue.FieldCorrelationID, Value: []string{"my-id"}}
jobs, token, err := srv.QueryJobs(context.Background(), token, filter)

Currently you can filter via:

  • uuid
  • status
  • correlation id
  • created at
  • modified at

With operations for:

  • in
  • not in
  • less than
  • greater than

Note that 'in' and 'not in' require the filter 'value' to be a []string, while 'less than' and 'greater than' expect an int. Created at and modified at are in seconds since the epoch.
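
To read every page of a query, the returned token is fed back into the next call until it comes back empty. The sketch below shows that loop in generic form (Go 1.18 or later). collectAll and demoCollect are hypothetical helpers, and the loop assumes that the first page is requested with an empty token, which the source does not state explicitly.

```go
package main

import "fmt"

// collectAll follows pagination tokens until an empty token is returned.
// fetch has the shape of srv.QueryJobs with ctx and filters already bound.
// Assumption: the first page is requested with an empty token.
func collectAll[T any](fetch func(token string) ([]T, string, error)) ([]T, error) {
	var all []T
	token := ""
	for {
		items, next, err := fetch(token)
		if err != nil {
			return all, err
		}
		all = append(all, items...)
		if next == "" {
			return all, nil
		}
		token = next
	}
}

// demoCollect pages through a fake two-page result set.
func demoCollect() ([]string, error) {
	pages := map[string][]string{"": {"a", "b"}, "p2": {"c"}}
	nextToken := map[string]string{"": "p2", "p2": ""}
	return collectAll(func(token string) ([]string, string, error) {
		return pages[token], nextToken[token], nil
	})
}

func main() {
	all, err := demoCollect()
	fmt.Println(all, err)
}
```

With a real server, the closure would return srv.QueryJobs(ctx, token, filter) and T would be *tasqueue.JobMessage.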

Credits

  • @knadh for the logo & feature suggestions

License

BSD-2-Clause-FreeBSD

Documentation

Index

Constants

const (

	// This is the initial state when a job is pushed onto the broker.
	StatusStarted = "queued"

	// This is the state when a worker has received a job.
	StatusProcessing = "processing"

	// The state when a job completes, but returns an error (and all retries are over).
	StatusFailed = "failed"

	// The state when a job completes without any error.
	StatusDone = "successful"

	// The state when a job errors out and is queued again to be retried.
	// This state is analogous to StatusStarted.
	StatusRetrying = "retrying"
)
const (
	DefaultQueue = "tasqueue:tasks"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker interface {
	// Enqueue places a task in the queue
	Enqueue(ctx context.Context, msg []byte, queue string) error

	// Consume listens for tasks on the queue and calls processor
	Consume(ctx context.Context, work chan []byte, queue string)
}

Broker is something to manage the job queue(s)
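
To make the contract concrete, here is a minimal in-memory sketch of the interface built on Go channels. It is not one of the brokers shipped with the package: memBroker, newMemBroker and demoBroker are hypothetical names, the interface is copied from above so the block compiles on its own, and the per-queue buffer size of 100 is arbitrary. A channel delivers each message to exactly one receiver, which is the kind of atomicity the README asks a broker to provide.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Broker is copied from the interface above so the sketch compiles on its own.
type Broker interface {
	Enqueue(ctx context.Context, msg []byte, queue string) error
	Consume(ctx context.Context, work chan []byte, queue string)
}

// memBroker keeps one buffered channel per queue name.
type memBroker struct {
	mu     sync.Mutex
	queues map[string]chan []byte
}

func newMemBroker() *memBroker {
	return &memBroker{queues: make(map[string]chan []byte)}
}

// queue returns the channel for name, creating it on first use.
func (b *memBroker) queue(name string) chan []byte {
	b.mu.Lock()
	defer b.mu.Unlock()
	q, ok := b.queues[name]
	if !ok {
		q = make(chan []byte, 100)
		b.queues[name] = q
	}
	return q
}

// Enqueue blocks while the queue is full, until ctx is cancelled.
func (b *memBroker) Enqueue(ctx context.Context, msg []byte, queue string) error {
	select {
	case b.queue(queue) <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Consume forwards messages from the queue to work until ctx is cancelled.
func (b *memBroker) Consume(ctx context.Context, work chan []byte, queue string) {
	q := b.queue(queue)
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-q:
			select {
			case work <- msg:
			case <-ctx.Done():
				return
			}
		}
	}
}

// Compile-time check that memBroker satisfies Broker.
var _ Broker = (*memBroker)(nil)

// demoBroker enqueues one message and reads it back through Consume.
func demoBroker() string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	b := newMemBroker()
	work := make(chan []byte)
	go b.Consume(ctx, work, "tasqueue:tasks")
	if err := b.Enqueue(ctx, []byte("hello"), "tasqueue:tasks"); err != nil {
		return err.Error()
	}
	return string(<-work)
}

func main() {
	fmt.Println(demoBroker())
}
```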

type Chain

type Chain struct {
	Jobs []Job
	Opts ChainOpts
}

func NewChain

func NewChain(j ...Job) (Chain, error)

NewChain() accepts a list of jobs and creates a chain by setting the OnSuccess job of the i'th job to the (i+1)'th job, hence forming a "chain". It returns a Chain holding the jobs, which can be enqueued with EnqueueChain.

func (*Chain) SetCorrelationID

func (c *Chain) SetCorrelationID(id string)

func (*Chain) SetName

func (c *Chain) SetName(name string)

type ChainMessage

type ChainMessage struct {
	ChainMeta
	Chain *Chain
}

ChainMessage is a wrapper over Chain, containing meta info such as status, uuid. A ChainMessage is stored in the results store.

type ChainMeta

type ChainMeta struct {
	Correlation

	UUID string
	// Status of the overall chain
	Status string
	// UUID of the current job part of chain
	JobUUID string
	// List of UUIDs of completed jobs
	PrevJobs []string
}

ChainMeta contains fields related to a chain job.

type ChainOpts

type ChainOpts struct {
	Correlation
}

ChainOpts holds the various options available to configure a chain.

type Correlation

type Correlation struct {
	// ID constrained to 255 chars
	ID string

	// Arbitrary name constrained to 255 chars
	Name string

	// Kind, set internally to either job, chain or group
	Kind Kind
}

Correlation is user data that can be used to identify group(s) of jobs. These are always optional.

type Field

type Field string
const (
	// FieldID is the system's randomly chosen ID for an object
	FieldID Field = "id"

	// FieldStatus is the status of the object (e.g. "queued", "successful", "failed")
	FieldStatus Field = "status"

	// FieldCorrelationID is the user given ID for an object (ie. SetCorrelationID)
	FieldCorrelationID Field = "correlation_id"

	// FieldTimeCreated is the unix time (seconds) the object was created
	FieldTimeCreated Field = "created_at"

	// FieldTimeModified is the unix time (seconds) the object was last modified
	FieldTimeModified Field = "modified_at"
)

type Filter

type Filter struct {
	Field Field
	Op    Op
	Value interface{}
}

Filter is a (Field, Op, Values) tuple to filter objects by. For example, to filter jobs by status:

Filter{Field: FieldStatus, Op: OpIn, Value: []string{"queued", "successful"}}

Multiple Filters are ANDed together.
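
The AND semantics and the value types expected by each Op can be illustrated with a small in-memory matcher. This is only a sketch of the semantics and not how the bundled results stores evaluate filters. The Field, Op and Filter declarations are copied from this page so the block compiles on its own, while record, matches and matchesAll are hypothetical. A filter whose Value has the wrong type never matches in this sketch.

```go
package main

import "fmt"

type Field string
type Op string

const (
	FieldStatus      Field = "status"
	FieldTimeCreated Field = "created_at"

	OpIn          Op = "in"
	OpNotIn       Op = "not_in"
	OpLessThan    Op = "lt"
	OpGreaterThan Op = "gt"
)

type Filter struct {
	Field Field
	Op    Op
	Value interface{}
}

// record is a hypothetical flattened row with string and integer fields.
type record struct {
	strs map[Field]string
	ints map[Field]int
}

// matches reports whether rec satisfies a single filter.
func matches(rec record, f Filter) bool {
	switch f.Op {
	case OpIn, OpNotIn:
		vals, ok := f.Value.([]string)
		if !ok {
			return false
		}
		found := false
		for _, v := range vals {
			if v == rec.strs[f.Field] {
				found = true
				break
			}
		}
		return found == (f.Op == OpIn)
	case OpLessThan, OpGreaterThan:
		n, ok := f.Value.(int)
		if !ok {
			return false
		}
		if f.Op == OpLessThan {
			return rec.ints[f.Field] < n
		}
		return rec.ints[f.Field] > n
	}
	return false
}

// matchesAll ANDs the filters: every one of them must match.
func matchesAll(rec record, filters ...Filter) bool {
	for _, f := range filters {
		if !matches(rec, f) {
			return false
		}
	}
	return true
}

func main() {
	rec := record{
		strs: map[Field]string{FieldStatus: "successful"},
		ints: map[Field]int{FieldTimeCreated: 100},
	}
	byStatus := Filter{Field: FieldStatus, Op: OpIn, Value: []string{"queued", "successful"}}
	after50 := Filter{Field: FieldTimeCreated, Op: OpGreaterThan, Value: 50}
	before80 := Filter{Field: FieldTimeCreated, Op: OpLessThan, Value: 80}
	fmt.Println(matchesAll(rec, byStatus, after50))
	fmt.Println(matchesAll(rec, byStatus, after50, before80))
}
```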

type Group

type Group struct {
	Jobs []Job
	Opts GroupOpts
}

func NewGroup

func NewGroup(j ...Job) (Group, error)

NewGroup() accepts a list of jobs and creates a group.

func (*Group) SetCorrelationID

func (t *Group) SetCorrelationID(id string)

func (*Group) SetName

func (t *Group) SetName(name string)

type GroupMessage

type GroupMessage struct {
	GroupMeta
	Group *Group
}

GroupMessage is a wrapper over Group, containing meta info such as status, uuid. A GroupMessage is stored in the results store.

type GroupMeta

type GroupMeta struct {
	Correlation

	UUID   string
	Status string
	// JobStatus is a map of job uuid -> status
	JobStatus map[string]string
}

GroupMeta contains fields related to a group job. These are updated when a task is consumed.

type GroupOpts

type GroupOpts struct {
	Correlation
}

GroupOpts holds the various options available to configure a group.

type Job

type Job struct {
	// If task is successful, the OnSuccess jobs are enqueued.
	OnSuccess *Job
	Task      string
	Payload   []byte

	Opts JobOpts
}

Job represents a unit of work pushed by producers. It is the responsibility of the task handler to unmarshal (if required) the payload and process it in any manner.

func NewJob

func NewJob(handler string, payload []byte, opts JobOpts) (Job, error)

NewJob returns a job with arbitrary payload. It accepts the name of the task, the payload and the job options.

func (*Job) SetCorrelationID

func (t *Job) SetCorrelationID(id string)

func (*Job) SetName

func (t *Job) SetName(name string)

type JobCtx

type JobCtx struct {
	Meta Meta
	// contains filtered or unexported fields
}

JobCtx is passed onto handler functions. It allows access to a job's meta information to the handler.

func (*JobCtx) Save

func (c *JobCtx) Save(resultData []byte) error

Save() sets arbitrary results for a job in the results store.

type JobMessage

type JobMessage struct {
	Meta
	Job *Job
}

JobMessage is a wrapper over Job, used to transport the job over a broker. It contains additional fields such as status and a UUID.

type JobOpts

type JobOpts struct {
	Correlation

	Queue      string
	MaxRetries uint32
	Schedule   string
}

JobOpts holds the various options available to configure a job.

type Kind

type Kind string
const (
	KindJob   Kind = "job"
	KindChain Kind = "chain"
	KindGroup Kind = "group"
)

type Meta

type Meta struct {
	Correlation

	UUID          string
	OnSuccessUUID string
	Status        string
	Queue         string
	Schedule      string
	MaxRetry      uint32
	Retried       uint32
	PrevErr       string
	ProcessedAt   time.Time

	JobResult      []byte
	PrevJobResults [][]byte
}

Meta contains fields related to a job. These are updated when a task is consumed.

func DefaultMeta

func DefaultMeta(opts JobOpts) Meta

DefaultMeta returns Meta with a UUID and other defaults filled in.

type Op

type Op string

Op is some operation to compare field(s)

const (
	// OpIn is a set membership test
	// Implies Value is []string
	OpIn Op = "in"

	// OpNotIn is a set non-membership test
	// Implies Value is []string
	OpNotIn Op = "not_in"

	// OpLessThan is a less than test
	// Implies Value is an int
	OpLessThan Op = "lt"

	// OpGreaterThan is a greater than test
	// Implies Value is an int
	OpGreaterThan Op = "gt"
)

type Opts

type Opts interface {
	Name() string
	Value() interface{}
}

Opts is an interface to define arbitrary options.

type QueryResult

type QueryResult struct {
	Correlation

	UUID   string
	Status string

	Data []byte
}

type Results

type Results interface {
	Query(ctx context.Context, token string, filters ...*Filter) ([]*QueryResult, string, error)
	Get(ctx context.Context, uuid string) ([]byte, error)
	Set(ctx context.Context, uuid, status string, meta *Correlation, b []byte) error
}

Results handles storing and retrieving results

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the main store that holds the broker and the results communication interfaces. It also stores the registered tasks.

func NewServer

func NewServer(o ServerOpts) (*Server, error)

NewServer() returns a new instance of server, with sane defaults.

func (*Server) Enqueue

func (s *Server) Enqueue(ctx context.Context, t Job) (string, error)

Enqueue() accepts a job and returns the assigned UUID. The following steps take place:

  1. Converts it into a job message, which assigns a UUID (among other meta info) to the job.
  2. Sets the job status as "started" on the results store.
  3. Enqueues the job (if the job is scheduled, pushes it onto the scheduler).

func (*Server) EnqueueChain

func (s *Server) EnqueueChain(ctx context.Context, c Chain) (string, error)

func (*Server) EnqueueGroup

func (s *Server) EnqueueGroup(ctx context.Context, t Group) (string, error)

EnqueueGroup() accepts a group and returns the assigned UUID. The following steps take place:

  1. Converts it into a group message, which assigns a UUID (among other meta info) to the group.
  2. Sets the group status as "started" on the results store.
  3. Loops over all jobs that are part of the group and enqueues each job.
  4. The job status map is updated with the uuids of each enqueued job.

func (*Server) GetChain

func (s *Server) GetChain(ctx context.Context, uuid string) (ChainMessage, error)

func (*Server) GetGroup

func (s *Server) GetGroup(ctx context.Context, uuid string) (GroupMessage, error)

func (*Server) GetJob

func (s *Server) GetJob(ctx context.Context, uuid string) (JobMessage, error)

GetJob accepts a UUID and returns the job message in the results store. This is useful to check the status of a job message.

func (*Server) QueryChains

func (s *Server) QueryChains(ctx context.Context, token string, filters ...*Filter) ([]*ChainMessage, string, error)

QueryChains accepts a list of filters and returns a list of chains that match the filters. It also returns a token, which can be used to paginate through the results.

func (*Server) QueryGroups

func (s *Server) QueryGroups(ctx context.Context, token string, filters ...*Filter) ([]*GroupMessage, string, error)

QueryGroups accepts a list of filters and returns a list of groups that match the filters. It also returns a token, which can be used to paginate through the results.

func (*Server) QueryJobs

func (s *Server) QueryJobs(ctx context.Context, token string, filters ...*Filter) ([]*JobMessage, string, error)

QueryJobs accepts a list of filters and returns a list of jobs that match the filters. It also returns a token, which can be used to paginate through the results.

func (*Server) RegisterTask

func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts)

RegisterTask maps a new task against the tasks map on the server. It accepts different options for the task (to set callbacks).

func (*Server) Start

func (s *Server) Start(ctx context.Context)

Start() starts the job consumer and processor. It is a blocking function.

type ServerOpts

type ServerOpts struct {
	Broker        Broker
	Results       Results
	Logger        logf.Logger
	TraceProvider *trace.TracerProvider
}

type Task

type Task struct {
	Name string

	Opts TaskOpts
	// contains filtered or unexported fields
}

Task is a pre-registered job handler. It stores the callbacks (set through options), which are called during different states of a job.

type TaskOpts

type TaskOpts struct {
	Concurrency uint32
	Queue       string

	SuccessCB    func(JobCtx)
	ProcessingCB func(JobCtx)
	RetryingCB   func(JobCtx)
	FailedCB     func(JobCtx)
}

Directories

Path Synopsis
brokers
cmd
tq
examples
internal
results
