nq

package module
v0.3.0 Latest
Published: Aug 30, 2022 License: MIT Imports: 18 Imported by: 1

README

Reliable, Efficient and Cancellable Distributed Task Queue in Go


NQ (Nats Queue) is a Go package for queuing and processing jobs in the background with workers. It is based on nats, with a focus on the cancellability of enqueued jobs.

NQ requires a nats-server version that supports both jetstream and the key-value store.

How does it work?

[Figure: Task Queue]
This package was designed such that a task should always be cancellable by the client. Workers can be configured to cancel and quit instantly upon a network partition (e.g. a disconnect from nats-server).

Features

Task Options Walkthrough

Watch for updates

( Introduced in v0.3 )

Listen for updates to task metadata

func main() {
	client := nq.NewPublishClient(nq.NatsClientOpt{
		Addr: "nats://127.0.0.1:4222",
	}, nq.NoAuthentcation(),
	)

	defer client.Close()

	bytesPayload1, err := json.Marshal(UrlPayload{Url: "https://httpstat.us/200?sleep=10000"})
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	task1 := nq.NewTask(QueueDev, bytesPayload1)
	if ack, err := client.Enqueue(task1); err == nil {
		log.Printf("Watching updates queue=%s taskID=%s payload=%s", ack.Queue, ack.ID, ack.Payload)
		wg.Add(1)
		updates, err := client.GetUpdates(ack.ID)
		if err != nil {
			panic(err)
		}
		// listening for updates
		go func() {
			defer wg.Done()

			for {
				msg, ok := <-updates
				if !ok {
					// channel closed
					return
				}
				log.Printf("Change detected, status=%s", msg.GetStatus())
			}
		}()
	} else {
		log.Printf("err=%s", err)
	}
	wg.Wait()
}

2022/08/29 22:17:15 Watching updates queue=scrap-url-dev taskID=yzaKwBIcbGEt8sMGgMJcZ0 payload={"url":"https://httpstat.us/200?sleep=10000"}
2022/08/29 22:17:15 Change detected, status=pending
2022/08/29 22:17:16 Change detected, status=processing
2022/08/29 22:17:28 Change detected, status=completed

Retrying

By default, a task is submitted for retry if it returns a non-nil error.

// a task that will be retried 2 times before being marked as `failed`
taskWithRetry := nq.NewTask("my-queue", bytesPayload, nq.Retry(2))

A custom filtering function can be used to treat only specific errors as failures. Here, if a task fails with ErrFailedDueToInvalidApiKeys, it is considered a failure and will be retried.

var ErrFailedDueToInvalidApiKeys = errors.New("failed to perform task, invalid api keys")

srv := nq.NewServer(nq.NatsClientOpt{Addr: nats.DefaultURL}, nq.Config{
	IsFailureFn: func(err error) bool {
		return errors.Is(err, ErrFailedDueToInvalidApiKeys)
	},
	ServerName:  nq.GenerateServerName(),
})

Deadline / Timeout for tasks

// a task that executes till time.Now() + 1 hour
taskWithDeadline := nq.NewTask("my-queue", bytesPayload, nq.Deadline(time.Now().Add(time.Hour)), nq.TaskID("deadlineTaskID"))

// a task that executes for 10 minutes
taskWithTimeout := nq.NewTask("my-queue", bytesPayload, nq.Timeout(time.Minute * 10), nq.TaskID("timeoutTaskID"))

Task cancellations

Tasks that are either waiting for execution or being executed on any worker can be cancelled. Cancelling a task requires its taskID.

// Cancel a task by ID
taskSignature := nq.NewTask("my-queue", []byte{})
ack, err := client.Enqueue(taskSignature)
if err != nil {
	panic(err)
}
err = client.Cancel(ack.ID) // check err here as well

A task can handle cancellation like so:

func longRunningOperation(ctx context.Context, task *nq.TaskPayload) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	for i := 0; i < 1000; i++ {
		timeout := time.Millisecond * 20
		println("sleeping for: ",timeout)
		time.Sleep(timeout)
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
	return nil
}

NOTE: Successful cancellation depends on task function respecting context.Done().

Automatic Failover

The ShutdownOnNatsDisconnect option shuts down the workers and the server if the connection to nats-server is broken. This is useful when tasks must be cancellable at all times.

Note: When a disconnect is observed, workers stop processing new messages. Workers are cancelled after the shutdownTimeout duration. Any tasks not completed by then are cancelled and remain available in the task queue for future or other workers to process.

The worker server shuts down automatically if at any time it is incapable of respecting a cancel request, e.g. on losing its connection to nats-server.

srv := nq.NewServer(nq.NatsClientOpt{
	Addr: "nats://127.0.0.1:4222",
}, nq.Config{
	ServerName:  nq.GenerateServerName(),
	Concurrency: 2,
	LogLevel:    nq.InfoLevel,
}, nq.ShutdownOnNatsDisconnect(),
)
$ go run examples/simple.go sub
nq: pid=24914 2022/08/21 15:43:45.650999 INFO: Registered queue=scrap-url-dev
nq: pid=24914 2022/08/21 15:43:45.652720 INFO: Started Server@DumbmachinePro-local/24914
nq: pid=24914 2022/08/21 15:43:45.652739 INFO: [*] Listening for messages
nq: pid=24914 2022/08/21 15:43:45.652742 INFO: cmd/ctrl + c to terminate the process
nq: pid=24914 2022/08/21 15:43:45.652744 INFO: cmd/ctrl + z to stop processing new tasks
nq: pid=24914 2022/08/21 15:43:48.363110 ERROR: Disconnected from nats
nq: pid=24914 2022/08/21 15:43:48.363173 INFO: Starting graceful shutdown
nq: pid=24914 2022/08/21 15:43:53.363535 INFO: Waiting for all workers to finish...
nq: pid=24914 2022/08/21 15:43:53.363550 INFO: All workers have finished
nq: pid=24914 2022/08/21 15:43:53.363570 INFO: Exiting

Reconnection

The server can be configured not to shut down and instead try to reconnect to nats when disconnected.

srv := nq.NewServer(nq.NatsClientOpt{
	Addr:          "nats://127.0.0.1:4222",
	ReconnectWait: time.Second * 5, // controls timeout between reconnects
	MaxReconnects: 100,             // controls total number of reconnects before giving up
}, nq.Config{ServerName: "local-serv-1"})

If nats-server is up again:

  1. With previous state (i.e. with expected queue data)

    nq: pid=7988 2022/08/22 17:24:44.349815 INFO: Registered queue=scrap-url-dev
    nq: pid=7988 2022/08/22 17:24:44.356378 INFO: Registered queue=another-one
    nq: pid=7988 2022/08/22 17:24:44.356393 INFO: Started Server@DumbmachinePro-local/7988
    nq: pid=7988 2022/08/22 17:24:44.356444 INFO: [*] Listening for messages
    nq: pid=7988 2022/08/22 17:24:44.356455 INFO: cmd/ctrl + c to terminate the process
    nq: pid=7988 2022/08/22 17:24:44.356459 INFO: cmd/ctrl + z to stop processing new tasks
    disconnected from nats
    2022/08/22 22:55:02 reconnection found nats://127.0.0.1:4222
    nq: pid=7988 2022/08/22 17:25:02.860051 INFO: Re-registering subscriptions to nats-server
    nq: pid=7988 2022/08/22 17:25:02.864988 INFO: Registration successful[nats://127.0.0.1:4222]
    disconnected from nats
    
  2. Without previous state: if registered queues are not found in nats-server, they will be created.

    nq: pid=7998 2022/08/22 17:26:44.349815 INFO: Registered queue=scrap-url-dev
    nq: pid=7998 2022/08/22 17:26:44.356378 INFO: Registered queue=another-one
    nq: pid=7998 2022/08/22 17:26:44.356393 INFO: Started Server@DumbmachinePro-local/7998
    nq: pid=7998 2022/08/22 17:26:44.356444 INFO: [*] Listening for messages
    nq: pid=7998 2022/08/22 17:26:44.356455 INFO: cmd/ctrl + c to terminate the process
    nq: pid=7998 2022/08/22 17:26:44.356459 INFO: cmd/ctrl + z to stop processing new tasks
    disconnected from nats
    2022/08/22 22:57:25 reconnection found nats://127.0.0.1:4222
    nq: pid=7998 2022/08/22 17:27:25.518079 INFO: Re-registering subscriptions to nats-server
    nq: pid=7998 2022/08/22 17:27:25.524895 WARN: stream=scrap-url-dev re-registering
    nq: pid=7998 2022/08/22 17:27:25.542725 INFO: Registered queue=scrap-url-dev
    nq: pid=7998 2022/08/22 17:27:25.543668 WARN: stream=another-one re-registering
    nq: pid=7998 2022/08/22 17:27:25.554961 INFO: Registered queue=another-one
    nq: pid=7998 2022/08/22 17:27:25.555002 INFO: Registration successful[nats://127.0.0.1:4222]
    

Monitoring and Alerting

Refer to the nats monitoring section and the monitoring tool by nats-io.

CLI Usage

Install CLI

go install github.com/dumbmachine/nq/tools/nq@latest
  • Cancel task
$ nq -u nats://127.0.0.1:4222 task cancel --id customID
Cancel message sent task=customID
  • Status of task
$ nq -u nats://127.0.0.1:4222 task status --id customID
taskID=customID status=Cancelled
  • Queue stats
$ nq -u nats://127.0.0.1:4222 queue stats --name scrap-url-dev
queue: scrap-url-dev | MessagesPending: 11 | Size: 3025 Bytes

Quickstart

Install NQ library

go get -u github.com/dumbmachine/nq

Make sure you have nats-server running locally or in a container. Example:

docker run --rm -p 4222:4222 --name nats-server -ti nats:latest -js

Now create a client to publish jobs.

// Creating publish client
package main

import (
	"encoding/json"
	"log"

	"github.com/dumbmachine/nq"
)

type Payload struct {
	Url string `json:"url"`
}

func main() {
	client := nq.NewPublishClient(nq.NatsClientOpt{
		Addr: "nats://127.0.0.1:4222",
	}, nq.NoAuthentcation(),
	// see godoc for more options
	)
	defer client.Close()

	bPayload, err := json.Marshal(Payload{Url: "https://httpstat.us/200?sleep=10000"})
	if err != nil {
		log.Println(err)
	}

	taskSig := nq.NewTask("scrap-url-dev", bPayload)
	if ack, err := client.Enqueue(taskSig); err == nil {
		log.Printf("Submitted queue=%s taskID=%s payload=%s", ack.Queue, ack.ID, ack.Payload)
	} else {
		log.Printf("err=%s", err)
	}
}


// creating worker server
package main

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	"time"

	"github.com/dumbmachine/nq"
)

type Payload struct {
	Url string `json:"url"`
}

// Processing function
func fetchHTML(ctx context.Context, task *nq.TaskPayload) error {
	var payload Payload
	if err := json.Unmarshal(task.Payload, &payload); err != nil {
		return errors.New("invalid payload")
	}
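	// bind the request to ctx so that cancelling the task aborts the HTTP call
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, payload.Url, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close() // release the connection
	return nil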
}

func main() {

	srv := nq.NewServer(nq.NatsClientOpt{
		Addr:          "nats://127.0.0.1:4222",
		ReconnectWait: time.Second * 2,
		MaxReconnects: 100,
	}, nq.Config{
		ServerName:  nq.GenerateServerName(),
		Concurrency: 1,
		LogLevel:    nq.InfoLevel,
	},
	)

	srv.Register("scrap-url-dev", fetchHTML)

	if err := srv.Run(); err != nil {
		panic(err)
	}
}

Note: New messages are fetched from the queue in the sequential order of their registration. NQ does not yet implement any custom priority order for registered queues.

To learn more about nq APIs, see godoc

Acknowledgements

Async : Many of the design ideas are taken from async

Documentation

Overview

nq provides a go package to publish/process tasks via nats

Constants

const (
	// waiting for task to be received by worker
	Pending = iota

	// task is being processed by a worker
	Processing

	// taskFN returns an error
	Failed

	// successfully processed
	Completed

	// cancelled by user
	Cancelled

	// deleted before being run
	Deleted
)

Possible task statuses

Variables

var ErrCannotCancelDeletedTask = errors.New("deleted task cannot be cancelled") // trying to cancel an already deleted task
var ErrFailedToConnect = errors.New("failed to connect to nats client")
var ErrInvalidTaskPayload = errors.New("invalid task payload") // Happens when malformed data is sent to task-stream
var ErrNonCancellableState = errors.New("cannot cancel task, in uncancellable state")
var ErrQueueNotFound = errors.New("nq: queue not found")
var ErrServerClosed = errors.New("nq: Server closed")
var ErrServerNameEmpty = errors.New("server name cannot be empty")
var ErrStreamNotCreated = errors.New("nq: stream not created")
var ErrTaskIDEmpty = errors.New("nq: task id cannot be empty")
var ErrTaskNotFound = errors.New("task not found")

Functions

func CancelStreamNameToStreamName

func CancelStreamNameToStreamName(stream, subject string) string

func EncodeTMToJSON

func EncodeTMToJSON(t *TaskMessage) ([]byte, error)

func GenerateServerName

func GenerateServerName() string

Generates a server name, combination of hostname and process id

func NoAuthentcation

func NoAuthentcation() noAuthentication

Connect to nats-server without any authentication

This is the default.

func ShutdownOnNatsDisconnect

func ShutdownOnNatsDisconnect() shutdownOnNatsDisconnect

Shutdown server and workers if connection with nats-server is broken. Results in any executing tasks being cancelled, if not finished in time specified by `shutdownTimeout`. Option is useful when workers should be `cancellable` at all times.

By default, inactive

func StreamNameToCancelStreamName

func StreamNameToCancelStreamName(subject string) string

StreamNameToCancelStreamName returns the name of the stream responsible for cancellation of tasks in the given stream

func StreamNameToDurableStreamName

func StreamNameToDurableStreamName(srvName, stream string) string

Returns a durable name for stream

Helps re-establish the connection to nats-server while maintaining sequence state

func TokenAuthentication

func TokenAuthentication(username, password string) tokenAuthentication

Connect to nats-server using token authentication

Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/tokens

func UserPassAuthentcation

func UserPassAuthentcation(username, password string) uPassAuthentication

Connect to nats-server using username:password pair

Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/username_password

Types

type CancelPayload

type CancelPayload string

type CancellationStore

type CancellationStore struct {
	// contains filtered or unexported fields
}

func NewCancelations

func NewCancelations() CancellationStore

NewCancelations returns a CancellationStore instance.

func (*CancellationStore) Add

func (c *CancellationStore) Add(id string, fn context.CancelFunc)

Add adds a new cancel func to the collection.

func (*CancellationStore) Delete

func (c *CancellationStore) Delete(id string)

Delete deletes a cancel func from the collection given an id.

func (*CancellationStore) Get

func (c *CancellationStore) Get(id string) (fn context.CancelFunc, ok bool)

Get returns a cancel func given an id.

type ClientConnectionOption

type ClientConnectionOption interface {
	String() string
	Type() ClientOptionType
	Value() interface{}
}

type ClientOption

type ClientOption struct {
	Timeout              time.Duration //todo
	AuthenticationType   ClientOptionType
	AuthenticationObject interface{}
	NatsOption           []nats.Option
	// Defaults to false
	ShutdownOnNatsDisconnect bool
}

Internal representation of options for nats-server connection

type ClientOptionType

type ClientOptionType int
const (
	// Authentication types
	// TODO: error on using multiple of below
	UPassAuthenticationOpt ClientOptionType = iota
	TokenAuthenticationOpt
	NoAuthenticationOpt

	// General options
	ShutdownOnNatsDisconnectOpt
)

type Config

type Config struct {
	// Durable name for this worker. Required to re-establish connection
	// to nats-server
	ServerName string

	// Maximum number of concurrent processing of tasks.
	//
	// If set to a zero or negative value, the number of CPUs usable by the current process is picked.
	Concurrency int

	// Predicate function to determine whether the error returned from a task is a failure.
	// If function returns true, Server will retry the task ( bounded by retry-limit set on task )
	//
	// By default, the function returns true for any non-nil error.
	IsFailureFn func(error) bool

	// Logger specifies the logger used by the server instance.
	//
	// go's logger is used by default.
	Logger ilog.Base

	// LogLevel specifies the minimum log level to enable.
	//
	// InfoLevel is used by default.
	LogLevel LogLevel

	// ShutdownTimeout specifies the duration to wait to let workers finish their tasks
	// before forcing them to abort when stopping the server.
	//
	// Defaults to timeout of 5 seconds.
	ShutdownTimeout time.Duration
}

Server config

type Inspector added in v0.3.0

type Inspector struct {
	// contains filtered or unexported fields
}

func NewInspector added in v0.3.0

func NewInspector(broker *NatsBroker) *Inspector

func (*Inspector) AddAnother added in v0.3.0

func (i *Inspector) AddAnother(queue string, sendUpdatesTo chan string) error

func (*Inspector) Queues added in v0.3.0

func (i *Inspector) Queues()

func (*Inspector) Servers added in v0.3.0

func (i *Inspector) Servers()

type ListenUpdates added in v0.3.0

type ListenUpdates struct {
	// contains filtered or unexported fields
}

type LogLevel

type LogLevel int32

LogLevel represents a log level.

const (

	// DebugLevel is the lowest level of logging.
	// Debug logs are intended for debugging and development purposes.
	DebugLevel LogLevel

	// InfoLevel is used for general informational log messages.
	InfoLevel

	// WarnLevel is used for undesired but relatively expected events,
	// which may indicate a problem.
	WarnLevel

	// ErrorLevel is used for undesired and unexpected events that
	// the program can recover from.
	ErrorLevel

	// FatalLevel is used for undesired and unexpected events that
	// the program cannot recover from.
	FatalLevel
)

type NatsBroker

type NatsBroker struct {
	// contains filtered or unexported fields
}

func NewNatsBroker

func NewNatsBroker(conf NatsClientOpt, opt ClientOption, natsConnectionClosed chan struct{}, forceReRegister chan struct{}) (*NatsBroker, error)

NewNatsBroker returns a new instance of NatsBroker.

TODO: Allow users to specify `forceReRegister` as a boolean

func (*NatsBroker) AddStream

func (n *NatsBroker) AddStream(conf nats.StreamConfig) error

func (*NatsBroker) Cancel

func (n *NatsBroker) Cancel(subject string, id string) (*TaskMessage, error)

func (*NatsBroker) Close

func (n *NatsBroker) Close() error

func (*NatsBroker) ConnectoQueue

func (n *NatsBroker) ConnectoQueue(q *Queue) error

Creates the queue stream if it does not exist

Also creates the underlying nats-stream for the queue and cancel-queue

func (*NatsBroker) DeleteStream

func (n *NatsBroker) DeleteStream(name string) error

func (*NatsBroker) Ping

func (n *NatsBroker) Ping() error

func (*NatsBroker) Publish

func (n *NatsBroker) Publish(subject string, payload []byte) (*nats.PubAck, error)

func (*NatsBroker) PublishWithMeta

func (n *NatsBroker) PublishWithMeta(msg *TaskMessage) (*TaskMessage, error)

func (*NatsBroker) Stats

func (n *NatsBroker) Stats(q *Queue) error

Temporary function that fulfills statistic demands from nq-cli

func (*NatsBroker) Submit

func (n *NatsBroker) Submit(subject string, payload TaskPayload) (*TaskMessage, error)

type NatsClientOpt

type NatsClientOpt struct {
	// nats server address
	Addr string

	// Name for key-value store used to store task metadata
	//
	// Defaults to nq
	DBName string

	// ReconnectWait is an Option to set the wait time between reconnect attempts.
	//
	// Defaults to 10 seconds
	ReconnectWait time.Duration

	// MaxReconnects is an Option to set the maximum number of reconnect attempts.
	//
	// Defaults to 100
	MaxReconnects int
}

NatsClientOpt represent NATS connection configuration option.

type ProcessingFunc

type ProcessingFunc func(context.Context, *TaskPayload) error

Signature for the function executed by a worker. `ProcessingFunc` values are registered to subjects and process messages published by the client

type PublishClient

type PublishClient struct {
	// contains filtered or unexported fields
}

Client responsible for interaction with nq tasks

Client is used to enqueue / cancel tasks or fetch metadata for tasks

func NewPublishClient

func NewPublishClient(config NatsClientOpt, opts ...ClientConnectionOption) *PublishClient

NewPublishClient returns a new Client instance, given nats connection options, to interact with nq tasks

func (*PublishClient) Cancel

func (p *PublishClient) Cancel(id string) error

Cancel sends `cancel` request for given task to workers

func (*PublishClient) CancelInQueue

func (p *PublishClient) CancelInQueue(id string, qname string) error

Faster than using `Cancel` method, if queue name is known

func (*PublishClient) Close

func (p *PublishClient) Close() error

Close closes the connection with nats

func (*PublishClient) DeleteQueue

func (p *PublishClient) DeleteQueue(qname string)

Delete a queue

Deletes the underlying nats stream associated with a queue

func (*PublishClient) Enqueue

func (p *PublishClient) Enqueue(task *Task, opts ...TaskOption) (*TaskMessage, error)

Enqueue can be used to enqueue the given task to a queue

Returns the TaskMessage and a nil error if the task is enqueued successfully, else a non-nil error

func (*PublishClient) Fetch

func (p *PublishClient) Fetch(id string) (*TaskMessage, error)

Fetch fetches TaskMessage for given task

func (*PublishClient) GetUpdates added in v0.3.0

func (p *PublishClient) GetUpdates(taskID string) (chan *TaskMessage, error)

GetUpdates can be used to get changes to a task's metadata

Returns an error if it fails to start watching for changes. The channel is closed once the task reaches a terminal state

func (*PublishClient) Stats

func (p *PublishClient) Stats(queue string) error

type PullAction

type PullAction struct {
	Q            *Queue
	Subscription *nats.Subscription
	Fn           ProcessingFunc
}

type PullStore

type PullStore struct {
	// contains filtered or unexported fields
}

func NewPullStore

func NewPullStore() PullStore

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Internal `Queue`s represent an abstraction over a nats stream -> subject

func NewQueue

func NewQueue(name string) *Queue

func (*Queue) DurableStream

func (q *Queue) DurableStream(prefix string) string

type ResultHandlerIFACE

type ResultHandlerIFACE interface {
	// Get the result of a task in nats kv store
	Get(id string) (*TaskMessage, error)
	// Set the result of a task in nats kv store
	Set(id string, data []byte) error
	Watch(id string) (chan *TaskMessage, error)
	GetAllKeys(id string, data []byte) ([]string, error)
}

type ResultHandlerNats

type ResultHandlerNats struct {
	// contains filtered or unexported fields
}

func NewResultHandlerNats

func NewResultHandlerNats(name string, js nats.JetStreamContext) *ResultHandlerNats

func (*ResultHandlerNats) Get

func (rn *ResultHandlerNats) Get(id string) (*TaskMessage, error)

func (*ResultHandlerNats) GetAllKeys

func (rn *ResultHandlerNats) GetAllKeys(id string, data []byte) ([]string, error)

Get all keys from nats key-value store

func (*ResultHandlerNats) Set

func (rn *ResultHandlerNats) Set(id string, data []byte) error

func (*ResultHandlerNats) Watch added in v0.3.0

func (rn *ResultHandlerNats) Watch(id string) (chan *TaskMessage, error)

type Server

type Server struct {
	// contains filtered or unexported fields
}

Responsible for task lifecycle management and processing

func NewServer

func NewServer(natsConfig NatsClientOpt, servCfg Config, opts ...ClientConnectionOption) *Server

func (*Server) Register

func (srv *Server) Register(qname string, fn ProcessingFunc)

Subscribe to a stream

func (*Server) Run

func (srv *Server) Run() error

Run starts the task processing and blocks until an os signal to exit the program is received. Once it receives a signal, it gracefully shuts down all active workers and other goroutines to process the tasks.

Run returns any error encountered at server startup time. If the server has already been shutdown, ErrServerClosed is returned.

func (*Server) Shutdown

func (srv *Server) Shutdown()

Shutdown gracefully shuts down the server. It gracefully closes all active workers. The server will wait for active workers to finish processing tasks for the duration specified in Config.ShutdownTimeout. If a worker doesn't finish processing a task within the timeout, the task will be pushed back to the queue.

func (*Server) Start

func (srv *Server) Start() error

Start starts the worker server. Once the server has started, it pulls tasks off queues, starts a worker goroutine for each task, and then calls the registered ProcessingFunc to process it. Tasks are processed concurrently by the workers, up to the limit specified in Config.Concurrency.

Start returns any error encountered at server startup time. If the server has already been shutdown, ErrServerClosed is returned.

func (*Server) Stop

func (srv *Server) Stop()

Stop signals the server to stop pulling new tasks off queues. Stop can be used before shutting down the server to ensure that all currently active tasks are processed before server shutdown.

Stop does not shutdown the server, make sure to call Shutdown before exit.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is a representation of work to be performed by a worker

func NewTask

func NewTask(queue string, payload []byte, opts ...TaskOption) *Task

NewTask returns a new Task given queue and byte payload

TaskOption can be used to configure task processing

type TaskCancellationMessage

type TaskCancellationMessage struct {
	// ID corresponds to task's ID
	ID string

	// StreamName is the name of stream whose subject is handled by this task
	StreamName string
}

type TaskMessage

type TaskMessage struct {

	// Sequence indicates sequence number of message in nats jetstream
	Sequence uint64

	// ID is a unique identifier for each task, used for cancellation.
	ID string

	// StreamName is autofilled
	StreamName string

	// Queue is the name of the queue the task was enqueued to
	Queue string

	// Payload holds data needed to process the task.
	Payload []byte

	// Status indicates the status of task execution
	Status int

	// Timeout specifies timeout in seconds.
	// Use zero to indicate no deadline.
	Timeout int64

	// Deadline specifies the deadline for the task in Unix time.
	// Use zero to indicate no deadline.
	Deadline int64

	// CompletedAt is the time the task was processed successfully in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// Negative value indicates cancelled.
	// Use zero to indicate no value.
	CompletedAt int64

	// Current retry count
	//
	// autofilled
	CurrentRetry int

	// Total number of retries possible for this task
	MaxRetry int
	// contains filtered or unexported fields
}

func DecodeTMFromJSON

func DecodeTMFromJSON(data []byte) (*TaskMessage, error)

func (*TaskMessage) GetStatus

func (msg *TaskMessage) GetStatus() string

type TaskOption

type TaskOption interface {
	String() string
	Type() TaskOptionType
	Value() interface{}
}

func Deadline

func Deadline(t time.Time) TaskOption

Deadline returns an option to specify the deadline for the given task.

If both Deadline and Timeout options are set, whichever comes earliest will be used.

func Retry

func Retry(n int) TaskOption

Retry returns an option to specify the maximum number of times a task will be retried before being marked as failed.

A negative retry count is assigned defaultRetry (0)

func TaskID

func TaskID(id string) TaskOption

TaskID returns an option to specify the task ID

func Timeout

func Timeout(d time.Duration) TaskOption

Timeout returns an option to specify how long a task can run before being cancelled.

Zero duration means no limit ( math.MaxInt32 )

If both Deadline and Timeout options are set, whichever comes earliest will be used.

type TaskOptionType

type TaskOptionType int
const (
	MaxRetryOpt TaskOptionType = iota
	TaskIDOpt
	// QueueOpt
	TimeoutOpt
	DeadlineOpt
)

type TaskPayload

type TaskPayload struct {
	ID      string
	Payload []byte
}

Directories

Path Synopsis
internal
log
tools module
