taskhawk

v0.2.0
Published: Apr 19, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

README

Taskhawk Go


Taskhawk is an async task execution framework (à la celery) that works on AWS and GCP, while keeping things simple and straightforward. Any unbound function can be converted into a Taskhawk task.

Only Go 1.18+ is supported currently.

This project uses semantic versioning.

Quick Start

First, install the library:

go get github.com/cloudchacho/taskhawk-go

If your function takes multiple arguments, convert your function into a "Task" as shown here:

type SendEmailTaskInput struct {...}

func SendEmail(ctx context.Context, input *SendEmailTaskInput) error {
    // send email
    return nil
}

Tasks may accept input of arbitrary pointer type as long as it's serializable to JSON. Remember to export fields!
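
The reminder about exported fields matters because encoding/json silently skips unexported ones. The following is a minimal sketch using only the standard library; the field names on SendEmailTaskInput are hypothetical, since the original elides them, and encodeInput is an illustrative helper rather than part of Taskhawk.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SendEmailTaskInput is a hypothetical input type; the field names are
// illustrative and not taken from Taskhawk.
type SendEmailTaskInput struct {
	To      string `json:"to"`      // exported: included in the payload
	Subject string `json:"subject"` // exported: included in the payload
	retries int    // unexported: silently dropped by encoding/json
}

// encodeInput serializes the input the way a JSON-based transport would.
func encodeInput(in *SendEmailTaskInput) (string, error) {
	b, err := json.Marshal(in)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeInput(&SendEmailTaskInput{To: "a@example.com", Subject: "hi", retries: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // the retries field does not appear in the output
}
```

If a value is missing on the consumer side, an unexported field on the input struct is a likely cause.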

Then, define your backend:

settings := aws.Settings{
    AWSAccessKey: <YOUR AWS ACCESS KEY>,
    AWSAccountID: <YOUR AWS ACCOUNT ID>,
    AWSRegion: <YOUR AWS REGION>,
    AWSSecretKey: <YOUR AWS SECRET KEY>,

    Queue: <YOUR TASKHAWK QUEUE>,
}
backend := aws.NewBackend(settings, nil)

Before the task can be dispatched, it must be registered, as shown below.

hub := taskhawk.NewHub(taskhawk.Config{...}, backend)
task, err := taskhawk.RegisterTask(hub, "SendEmailTask", SendEmail)

And finally, dispatch your task asynchronously:

err = task.Dispatch(ctx, &SendEmailTaskInput{...})

Development

Prerequisites

Install go1.18.x

Getting Started

Assuming that you have Go installed, set up your environment like so:

$ cd ${GOPATH}/src/github.com/cloudchacho/taskhawk-go
$ go build

Running tests

$ make test  
# OR
$ go test -tags test ./...

Getting Help

We use GitHub issues for tracking bugs and feature requests.

  • If you think you have found a bug, please open an issue.

Release notes

Current version: v0.2.0-dev

v0.1.0
  • Initial version

Documentation

Overview

Package taskhawk is a replacement for celery that works on AWS and GCP, while keeping things simple and straightforward. Any unbound function can be converted into a Taskhawk task.

For inter-service messaging, see Hedwig: https://cloudchacho.github.io/hedwig/.

Provisioning

Taskhawk works on SQS or Pub/Sub as backing queues. Before you can publish tasks, you need to provision the required infra. This may be done manually, or, preferably, using Terraform. Taskhawk provides tools to make infra configuration easier: see Taskhawk Terraform (https://github.com/cloudchacho/terraform-google-taskhawk) for further details.

Using Taskhawk

If your function takes multiple arguments, convert your function into a "Task" as shown here:

type SendEmailTaskInput struct {...}

func SendEmail(ctx context.Context, input *SendEmailTaskInput) error {
	// send email
	return nil
}

Tasks may accept input of arbitrary pointer type as long as it's serializable to JSON. Remember to export fields!

Then, define your backend:

settings := aws.Settings{
	AWSAccessKey: <YOUR AWS ACCESS KEY>,
	AWSAccountID: <YOUR AWS ACCOUNT ID>,
	AWSRegion: <YOUR AWS REGION>,
	AWSSecretKey: <YOUR AWS SECRET KEY>,

	Queue: <YOUR TASKHAWK QUEUE>,
}
backend := aws.NewBackend(settings, nil)

Before the task can be dispatched, it must be registered, as shown below.

hub := taskhawk.NewHub(taskhawk.Config{...}, backend)
task, err := taskhawk.RegisterTask(hub, "SendEmailTask", SendEmail)

And finally, dispatch your task asynchronously:

err = task.Dispatch(ctx, &SendEmailTaskInput{...})

If you want to include a custom header with the message (for example, a request_id field for cross-application tracing), set it on the input object, which must implement the HeadersCarrier interface.

If you want to customize priority, you can do it like so:

err = task.DispatchWithPriority(ctx, &SendEmailTaskInput{...}, taskhawk.PriorityHigh)

Tasks are held in the SQS queue or Pub/Sub subscription until they're successfully executed, or until they fail a configurable number of times. Failed tasks are moved to a Dead Letter Queue, where they're held for 14 days and may be examined for debugging.

Priority

Taskhawk provides 4 priority queues to use, which may be customized per task, or per message. For more details, see https://godoc.org/github.com/cloudchacho/taskhawk-go/taskhawk#Priority.

Metadata and Headers

If your input struct satisfies `taskhawk.MetadataSetter` interface, it'll be filled in with the following attributes:

id: task identifier. This represents a run of a task.

priority: the priority this task message was dispatched with.

receipt: SQS receipt for the task. This may be used to extend message visibility if the task is running longer than expected.

timestamp: task dispatch epoch timestamp

version: message format version.

If your input struct satisfies the HeadersCarrier interface, it'll be filled with the custom Taskhawk headers that the task was dispatched with.

For a compile-time type assertion check, you may add (in global scope):

var _ taskhawk.MetadataSetter = &SendEmailTaskInput{}
var _ taskhawk.HeadersCarrier = &SendEmailTaskInput{}

This snippet won't consume memory or do anything at runtime.

Consumer

A consumer for workers can be started as follows:

err := hub.ListenForMessages(ctx, taskhawk.ListenRequest{...}, backend)

This is a blocking function, so if you want to listen to multiple priority queues, you'll need to run these on separate goroutines.
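
One way to run several blocking listeners side by side is a goroutine per priority plus a sync.WaitGroup. The sketch below does not import Taskhawk: listenFunc is a hypothetical stand-in for a blocking call such as hub.ListenForMessages, and the priority names are plain strings chosen for illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// listenFunc stands in for a blocking call such as hub.ListenForMessages;
// it is an assumption for this sketch, not part of the Taskhawk API.
type listenFunc func(ctx context.Context, priority string) error

// runListeners starts one goroutine per priority, waits for all of them to
// return, and reports each listener's error keyed by priority.
func runListeners(ctx context.Context, listen listenFunc, priorities []string) map[string]error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs = make(map[string]error, len(priorities))
	)
	for _, p := range priorities {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			err := listen(ctx, p) // blocks until the listener stops
			mu.Lock()
			errs[p] = err
			mu.Unlock()
		}(p)
	}
	wg.Wait()
	return errs
}

// blockUntilCanceled is a stub listener that blocks until ctx is done.
func blockUntilCanceled(ctx context.Context, _ string) error {
	<-ctx.Done()
	return ctx.Err()
}

// demo runs two stub listeners that stop when the context times out.
func demo() map[string]error {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	return runListeners(ctx, blockUntilCanceled, []string{"default", "high"})
}

func main() {
	for priority, err := range demo() {
		fmt.Println(priority, err)
	}
}
```

Canceling the shared context stops every listener at once, which is convenient for graceful shutdown.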

For more complete code, see examples.

Index

Constants

This section is empty.

Variables

var ErrRetry = errors.New("Retry error")

ErrRetry should cause the task to retry, but not treat the retry as an error
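
As a rough illustration of the idea, a task can return a sentinel error to request a retry, and the caller can tell it apart from a real failure with errors.Is. The sketch uses a local errRetry in place of taskhawk.ErrRetry; classify and sendEmail are hypothetical, and how Taskhawk handles the sentinel internally is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errRetry is a local stand-in for taskhawk.ErrRetry so the sketch compiles
// without the library.
var errRetry = errors.New("Retry error")

// classify shows one way a consumer could tell a requested retry apart from
// a real failure. How Taskhawk does this internally is an assumption here.
func classify(err error) string {
	switch {
	case err == nil:
		return "ack"
	case errors.Is(err, errRetry):
		return "retry"
	default:
		return "error"
	}
}

// sendEmail is a hypothetical task body that asks for a retry when a
// downstream dependency is not ready yet.
func sendEmail(ready bool) error {
	if !ready {
		return errRetry
	}
	return nil
}

func main() {
	fmt.Println(classify(sendEmail(false)))
	fmt.Println(classify(sendEmail(true)))
	fmt.Println(classify(errors.New("smtp: connection refused")))
}
```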

var (
	ErrTaskNotFound = errors.New("task not found")
)

ErrTaskNotFound indicates that task was not found

Functions

This section is empty.

Types

type Config added in v0.2.0

type Config struct {
	// Sync changes taskhawk dispatch to synchronous mode. This is similar
	// to Celery's Eager mode and is helpful for integration testing
	Sync bool

	// Instrumenter for the consumer
	Instrumenter Instrumenter

	// GetLogger returns the logger object for given context
	GetLogger GetLoggerFunc
}

Config used to configure Taskhawk Hub

type ConsumerBackend added in v0.2.0

type ConsumerBackend interface {
	// Receive messages from configured queue(s) and provide it through the channel. This should run indefinitely
	// until the context is canceled. Provider metadata should include all info necessary to ack/nack a message.
	// The channel must not be closed by the backend.
	Receive(ctx context.Context, priority Priority, numMessages uint32, visibilityTimeout time.Duration, messageCh chan<- ReceivedMessage) error

	// NackMessage nacks a message on the queue
	NackMessage(ctx context.Context, providerMetadata any) error

	// AckMessage acknowledges a message on the queue
	AckMessage(ctx context.Context, providerMetadata any) error

	// RequeueDLQ re-queues everything in the taskhawk DLQ back into the taskhawk queue
	RequeueDLQ(ctx context.Context, priority Priority, numMessages uint32, visibilityTimeout time.Duration) error
}

ConsumerBackend is used for consuming messages from a transport
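
To make the Receive contract concrete (run until the context is canceled, never close the channel), here is a sketch of a hypothetical in-memory backend that could be useful in tests. The Priority and ReceivedMessage types are local mirrors of the ones documented here, memoryBackend is not part of Taskhawk, and RequeueDLQ is omitted, so the type does not satisfy the full interface.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local mirrors of the Taskhawk types used by ConsumerBackend, so the sketch
// compiles on its own.
type Priority int

type ReceivedMessage struct {
	Payload          []byte
	Attributes       map[string]string
	ProviderMetadata any
}

// memoryBackend is a hypothetical in-memory backend. Only Receive,
// AckMessage and NackMessage are sketched.
type memoryBackend struct {
	pending chan []byte // messages waiting for delivery
	acked   chan any    // provider metadata of acknowledged messages
}

// Receive forwards pending messages until ctx is canceled. It never closes
// messageCh, as the interface contract requires.
func (b *memoryBackend) Receive(ctx context.Context, _ Priority, _ uint32, _ time.Duration, messageCh chan<- ReceivedMessage) error {
	id := 0
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case payload := <-b.pending:
			id++
			msg := ReceivedMessage{Payload: payload, ProviderMetadata: id}
			select {
			case messageCh <- msg:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// AckMessage records the metadata of a processed message.
func (b *memoryBackend) AckMessage(_ context.Context, providerMetadata any) error {
	b.acked <- providerMetadata
	return nil
}

// NackMessage is a no-op here; a real backend would make the message
// visible to consumers again.
func (b *memoryBackend) NackMessage(_ context.Context, _ any) error { return nil }

// demo delivers one message through the backend and returns its payload.
func demo() string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the Receive goroutine
	b := &memoryBackend{pending: make(chan []byte, 1), acked: make(chan any, 1)}
	b.pending <- []byte(`{"to":"a@example.com"}`)
	messageCh := make(chan ReceivedMessage)
	go b.Receive(ctx, 0, 1, 0, messageCh)
	msg := <-messageCh
	_ = b.AckMessage(ctx, msg.ProviderMetadata)
	return string(msg.Payload)
}

func main() {
	fmt.Println(demo())
}
```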

type GetLoggerFunc added in v0.2.0

type GetLoggerFunc func(ctx context.Context) Logger

GetLoggerFunc returns the logger object

func LogrusGetLoggerFunc added in v0.2.0

func LogrusGetLoggerFunc(fn func(ctx context.Context) *logrus.Entry) GetLoggerFunc

type HeadersCarrier added in v0.2.0

type HeadersCarrier interface {
	// SetHeaders sets the headers on a task input
	SetHeaders(map[string]string)

	// GetHeaders returns the headers set on a task input
	GetHeaders() map[string]string
}

HeadersCarrier interface needs to be implemented by the input struct if your task needs to get custom headers set during dispatch
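
A minimal sketch of an input struct that carries headers might look like the following. The interface is mirrored locally so the example compiles without Taskhawk; the SendEmailTaskInput fields, the request_id header name, and the requestID helper are illustrative assumptions.

```go
package main

import "fmt"

// HeadersCarrier mirrors the interface above so the sketch compiles alone.
type HeadersCarrier interface {
	SetHeaders(map[string]string)
	GetHeaders() map[string]string
}

// SendEmailTaskInput is a hypothetical input type. The headers field is
// unexported, so encoding/json leaves it out of the serialized input.
type SendEmailTaskInput struct {
	To      string `json:"to"`
	headers map[string]string
}

// SetHeaders stores the headers on the input.
func (i *SendEmailTaskInput) SetHeaders(h map[string]string) { i.headers = h }

// GetHeaders returns the headers stored on the input.
func (i *SendEmailTaskInput) GetHeaders() map[string]string { return i.headers }

// Compile-time check that the pointer type satisfies the interface.
var _ HeadersCarrier = &SendEmailTaskInput{}

// requestID reads a hypothetical tracing header from any carrier.
func requestID(c HeadersCarrier) string { return c.GetHeaders()["request_id"] }

func main() {
	in := &SendEmailTaskInput{To: "a@example.com"}
	in.SetHeaders(map[string]string{"request_id": "req-123"})
	fmt.Println(requestID(in))
}
```

Both methods use pointer receivers, so it is the pointer type that satisfies the interface, which matches tasks taking their input by pointer.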

type Hub added in v0.2.0

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the central struct used to dispatch Taskhawk tasks / run consumer

func NewHub added in v0.2.0

func NewHub(config Config, backend PublisherBackend) *Hub

NewHub creates a hub

func (*Hub) ListenForMessages added in v0.2.0

func (h *Hub) ListenForMessages(ctx context.Context, request ListenRequest, backend ConsumerBackend) error

ListenForMessages starts a taskhawk listener for the provided message types

Cancelable context may be used to cancel processing of messages

func (*Hub) RequeueDLQ added in v0.2.0

func (h *Hub) RequeueDLQ(ctx context.Context, request ListenRequest, backend ConsumerBackend) error

RequeueDLQ re-queues everything in the taskhawk DLQ back into the taskhawk queue

type Instrumenter added in v0.2.0

type Instrumenter interface {
	// OnReceive is called as soon as possible after a message is received from the backend. Caller must call
	// the returned finalized function when processing for the message is finished (typically done via defer).
	// The context must be replaced with the returned context for the remainder of the operation.
	// This is where a new span must be started.
	OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func())

	// OnTask is called when a message has been received from the backend and decoded
	// This is where span attributes, such as name, may be updated.
	OnTask(ctx context.Context, taskName string)

	// OnDispatch is called right before a message is published. Caller must call
	// the returned finalized function when publishing for the message is finished (typically done via defer).
	// The attributes may be updated to include trace id for downstream consumers.
	OnDispatch(ctx context.Context, taskName string, attributes map[string]string) (context.Context, map[string]string, func())
}

Instrumenter defines the interface for Taskhawk's instrumentation
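
The call order described in the comments above can be sketched with a small implementation that records events instead of talking to a real tracing system. The interface is mirrored locally; recordingInstrumenter, the trace_id attribute name, and its value are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Instrumenter mirrors the interface above so the sketch compiles alone.
type Instrumenter interface {
	OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func())
	OnTask(ctx context.Context, taskName string)
	OnDispatch(ctx context.Context, taskName string, attributes map[string]string) (context.Context, map[string]string, func())
}

// traceKey is the context key under which the sketch stores a trace ID.
type traceKey struct{}

// recordingInstrumenter is a hypothetical implementation that records
// events in memory. It is not safe for concurrent use.
type recordingInstrumenter struct {
	events []string
}

func (r *recordingInstrumenter) OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func()) {
	r.events = append(r.events, "receive:start")
	// Carry the upstream trace ID (if any) in the returned context.
	ctx = context.WithValue(ctx, traceKey{}, attributes["trace_id"])
	return ctx, func() { r.events = append(r.events, "receive:end") }
}

func (r *recordingInstrumenter) OnTask(_ context.Context, taskName string) {
	r.events = append(r.events, "task:"+taskName)
}

func (r *recordingInstrumenter) OnDispatch(ctx context.Context, taskName string, attributes map[string]string) (context.Context, map[string]string, func()) {
	// Copy the attributes before adding the trace ID so the caller's map
	// is left untouched.
	out := make(map[string]string, len(attributes)+1)
	for k, v := range attributes {
		out[k] = v
	}
	out["trace_id"] = "trace-1" // hypothetical attribute name and value
	r.events = append(r.events, "dispatch:"+taskName)
	return ctx, out, func() { r.events = append(r.events, "dispatch:end") }
}

// Compile-time check that the sketch satisfies the mirrored interface.
var _ Instrumenter = &recordingInstrumenter{}

// demo simulates the calls a consumer would make for one message.
func demo() []string {
	r := &recordingInstrumenter{}
	ctx, done := r.OnReceive(context.Background(), map[string]string{"trace_id": "trace-1"})
	r.OnTask(ctx, "SendEmailTask")
	done()
	return r.events
}

func main() {
	fmt.Println(demo())
}
```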

type ListenRequest

type ListenRequest struct {
	// Priority queue to listen to
	Priority Priority

	// How many messages to fetch at one time
	NumMessages uint32 // default 1

	// How long should the message be hidden from other consumers?
	VisibilityTimeout time.Duration // defaults to queue configuration

	// How many goroutines to spin for processing messages concurrently
	NumConcurrency uint32 // default 1
}

ListenRequest represents a request to listen for messages

type Logger added in v0.2.0

type Logger interface {
	// Error logs an error with a message. `fields` can be used as additional metadata for structured logging.
	// You can generally expect one of these fields to be available: message_sqs_id, message_sns_id.
	// By default fields are logged as a map using fmt.Sprintf
	Error(err error, message string, fields LoggingFields)

	// Warn logs a warn level log with a message. `fields` param works the same as `Error`.
	Warn(err error, message string, fields LoggingFields)

	// Info logs an info level log with a message. `fields` param works the same as `Error`.
	Info(message string, fields LoggingFields)

	// Debug logs a debug level log with a message. `fields` param works the same as `Error`.
	Debug(message string, fields LoggingFields)
}

Logger represents a logging interface that this library expects

type LoggingFields added in v0.2.0

type LoggingFields map[string]any

type MetadataSetter added in v0.2.0

type MetadataSetter interface {
	// SetID sets the message id
	SetID(string)

	// SetPriority sets the priority a message was dispatched with
	SetPriority(Priority)

	// SetProviderMetadata represents backend provider specific metadata, e.g. AWS receipt, or Pub/Sub ack ID
	// For concrete type of metadata, check the documentation of your backend class
	SetProviderMetadata(any)

	// SetTimestamp sets the message dispatch timestamp
	SetTimestamp(time.Time)

	// SetVersion sets the message schema version
	SetVersion(Version)
}

MetadataSetter interface needs to be implemented by the input struct if your task needs to get metadata (message ID, etc.)
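
Implementing five setters on every input type gets repetitive, so one option is a small embeddable struct. This is a sketch under assumptions: the interface, Priority, and Version are mirrored locally, taskMetadata and fill are hypothetical helpers, and fill only imitates what a consumer might do before invoking the task.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the Taskhawk types so the sketch compiles alone.
type Priority int
type Version string

type MetadataSetter interface {
	SetID(string)
	SetPriority(Priority)
	SetProviderMetadata(any)
	SetTimestamp(time.Time)
	SetVersion(Version)
}

// taskMetadata is a hypothetical helper; embedding it gives any input type
// the full MetadataSetter method set on its pointer type.
type taskMetadata struct {
	id        string
	priority  Priority
	provider  any
	timestamp time.Time
	version   Version
}

func (m *taskMetadata) SetID(id string)           { m.id = id }
func (m *taskMetadata) SetPriority(p Priority)    { m.priority = p }
func (m *taskMetadata) SetProviderMetadata(v any) { m.provider = v }
func (m *taskMetadata) SetTimestamp(t time.Time)  { m.timestamp = t }
func (m *taskMetadata) SetVersion(v Version)      { m.version = v }

// SendEmailTaskInput is a hypothetical input type with embedded metadata.
type SendEmailTaskInput struct {
	taskMetadata
	To string `json:"to"`
}

// Compile-time check that the pointer type satisfies the interface.
var _ MetadataSetter = &SendEmailTaskInput{}

// fill imitates a consumer populating metadata before running the task.
func fill(m MetadataSetter) {
	m.SetID("task-1")
	m.SetPriority(0)
	m.SetTimestamp(time.Unix(0, 0))
	m.SetVersion("1.0")
}

// demo fills a fresh input and returns its ID and message version.
func demo() string {
	in := &SendEmailTaskInput{To: "a@example.com"}
	fill(in)
	return in.id + " " + string(in.version)
}

func main() {
	fmt.Println(demo())
}
```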

type Priority

type Priority int

Priority of a task. This may be used to differentiate batch jobs from other tasks for example.

High and low priority queues provide independent scaling knobs for your use-case.

const (
	// PriorityDefault is the default priority of a task if nothing is specified. In most cases,
	// using just the default queue should work fine.
	PriorityDefault Priority = iota // Keep default first so empty values automatically default
	PriorityLow
	PriorityHigh
	// PriorityBulk queue will typically have different monitoring, and may be used for bulk jobs,
	// such as sending push notifications to all users. This allows you to effectively
	// throttle the tasks.
	PriorityBulk
)

Priority for a task

func (Priority) MarshalJSON

func (p Priority) MarshalJSON() ([]byte, error)

MarshalJSON changes Priority to a JSON string

func (*Priority) UnmarshalJSON

func (p *Priority) UnmarshalJSON(b []byte) error

UnmarshalJSON changes priority from a JSON string to Priority
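
The general technique behind this pair of methods, mapping an iota-based enum to JSON strings and back, can be sketched as follows. The wire names in the names map are hypothetical: this documentation does not show the strings Taskhawk actually emits, and roundTrip is an illustrative helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Priority mirrors the documented type and constants.
type Priority int

const (
	PriorityDefault Priority = iota
	PriorityLow
	PriorityHigh
	PriorityBulk
)

// names holds hypothetical wire names; the strings Taskhawk actually uses
// are not shown in this documentation.
var names = map[Priority]string{
	PriorityDefault: "default",
	PriorityLow:     "low",
	PriorityHigh:    "high",
	PriorityBulk:    "bulk",
}

// MarshalJSON encodes the priority as a JSON string.
func (p Priority) MarshalJSON() ([]byte, error) {
	name, ok := names[p]
	if !ok {
		return nil, fmt.Errorf("unknown priority %d", int(p))
	}
	return json.Marshal(name)
}

// UnmarshalJSON decodes a JSON string back into a priority.
func (p *Priority) UnmarshalJSON(b []byte) error {
	var name string
	if err := json.Unmarshal(b, &name); err != nil {
		return err
	}
	for k, v := range names {
		if v == name {
			*p = k
			return nil
		}
	}
	return fmt.Errorf("unknown priority %q", name)
}

// roundTrip encodes p and decodes the result again.
func roundTrip(p Priority) (string, Priority, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", 0, err
	}
	var back Priority
	err = json.Unmarshal(b, &back)
	return string(b), back, err
}

func main() {
	encoded, back, err := roundTrip(PriorityHigh)
	fmt.Println(encoded, back == PriorityHigh, err)
}
```

Because PriorityDefault is the zero value, a Priority that was never set behaves as the default priority.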

type PublisherBackend added in v0.2.0

type PublisherBackend interface {
	// Publish a message represented by the payload, with specified attributes to the topic with specified priority
	Publish(ctx context.Context, payload []byte, attributes map[string]string, priority Priority) (string, error)
}

PublisherBackend is used to publish messages to a transport
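
For unit tests, a publisher that simply records what it was asked to publish can stand in for SQS or Pub/Sub. The sketch below mirrors the interface locally; memoryPublisher and published are hypothetical, and treating the returned string as a message ID is an assumption, since the documentation does not say what it represents.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local mirrors so the sketch compiles without Taskhawk.
type Priority int

type PublisherBackend interface {
	Publish(ctx context.Context, payload []byte, attributes map[string]string, priority Priority) (string, error)
}

// published is one recorded call to Publish.
type published struct {
	Payload    []byte
	Attributes map[string]string
	Priority   Priority
}

// memoryPublisher is a hypothetical backend that keeps messages in memory.
type memoryPublisher struct {
	mu       sync.Mutex
	messages []published
}

// Publish records the message and returns a counter-based identifier.
// Treating the returned string as a message ID is an assumption.
func (m *memoryPublisher) Publish(ctx context.Context, payload []byte, attributes map[string]string, priority Priority) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err // respect cancellation before doing any work
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.messages = append(m.messages, published{Payload: payload, Attributes: attributes, Priority: priority})
	return fmt.Sprintf("msg-%d", len(m.messages)), nil
}

// Compile-time check that the sketch satisfies the mirrored interface.
var _ PublisherBackend = &memoryPublisher{}

// demo publishes one message and returns the identifier it was given.
func demo() (string, error) {
	p := &memoryPublisher{}
	return p.Publish(context.Background(), []byte(`{"to":"a@example.com"}`), nil, 0)
}

func main() {
	id, err := demo()
	fmt.Println(id, err)
}
```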

type ReceivedMessage added in v0.2.0

type ReceivedMessage struct {
	Payload          []byte
	Attributes       map[string]string
	ProviderMetadata any
}

ReceivedMessage is the message as received by a transport backend.

type StdLogger added in v0.2.0

type StdLogger struct{}

func (*StdLogger) Debug added in v0.2.0

func (s *StdLogger) Debug(message string, fields LoggingFields)

func (*StdLogger) Error added in v0.2.0

func (s *StdLogger) Error(err error, message string, fields LoggingFields)

func (*StdLogger) Info added in v0.2.0

func (s *StdLogger) Info(message string, fields LoggingFields)

func (*StdLogger) Warn added in v0.2.0

func (s *StdLogger) Warn(err error, message string, fields LoggingFields)

type Task

type Task[T any] struct {
	// contains filtered or unexported fields
}

func RegisterTask added in v0.2.0

func RegisterTask[T any](h *Hub, taskName string, taskFn TaskFn[T]) (Task[T], error)

RegisterTask registers the task to the hub with priority 'default'. Priority may be overridden at dispatch time using `DispatchWithPriority`.

func RegisterTaskWithPriority added in v0.2.0

func RegisterTaskWithPriority[T any](h *Hub, taskName string, taskFn TaskFn[T], defaultPriority Priority) (Task[T], error)

RegisterTaskWithPriority registers the task to the hub with specified priority. This will set the default priority, and may be overridden at dispatch time using `DispatchWithPriority`.

func (Task[T]) Dispatch added in v0.2.0

func (t Task[T]) Dispatch(ctx context.Context, input *T) error

func (Task[T]) DispatchWithPriority added in v0.2.0

func (t Task[T]) DispatchWithPriority(ctx context.Context, input *T, priority Priority) error

type TaskFn added in v0.2.0

type TaskFn[T any] func(ctx context.Context, input *T) error
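
To illustrate why a generic TaskFn pairs well with JSON-serializable inputs, here is a sketch of how a consumer could decode a payload into a fresh *T and call the task. TaskFn is mirrored locally; run, Email, and demo are hypothetical and do not describe Taskhawk's internals.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// TaskFn mirrors the documented type so the sketch compiles alone.
type TaskFn[T any] func(ctx context.Context, input *T) error

// run is a hypothetical helper: it decodes the payload into a new T and
// passes the pointer to the task function.
func run[T any](ctx context.Context, fn TaskFn[T], payload []byte) error {
	input := new(T)
	if err := json.Unmarshal(payload, input); err != nil {
		return fmt.Errorf("decode input: %w", err)
	}
	return fn(ctx, input)
}

// Email is a hypothetical task input.
type Email struct {
	To string `json:"to"`
}

// demo runs a task against a JSON payload and returns what the task saw.
func demo() (string, error) {
	var got string
	fn := func(_ context.Context, in *Email) error {
		got = in.To
		return nil
	}
	err := run[Email](context.Background(), fn, []byte(`{"to":"a@example.com"}`))
	return got, err
}

func main() {
	to, err := demo()
	fmt.Println(to, err)
}
```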

type Version

type Version string

Version represents the message format version

const (
	// Version10 represents the first version of the message format schema
	Version10 Version = "1.0"

	// CurrentVersion represents the current version of the taskhawk message schema
	CurrentVersion = Version10
)

Directories

Path Synopsis
internal
