Documentation ¶
Overview ¶
Package taskhawk is a replacement for Celery that works on AWS and GCP, while keeping things simple and straightforward. Any unbound function can be converted into a Taskhawk task.
For inter-service messaging, see Hedwig: https://cloudchacho.github.io/hedwig/.
Provisioning ¶
Taskhawk works on SQS or Pub/Sub as backing queues. Before you can publish tasks, you need to provision the required infra. This may be done manually, or, preferably, using Terraform. Taskhawk provides tools to make infra configuration easier: see Taskhawk Terraform (https://github.com/cloudchacho/terraform-google-taskhawk) for further details.
Using Taskhawk ¶
If your function takes multiple arguments, bundle them into a single input struct and convert the function into a "Task" as shown here:
type SendEmailTaskInput struct {...}

func SendEmail(ctx context.Context, input *SendEmailTaskInput) error {
	// send email
}
Tasks may accept input of arbitrary pointer type as long as it's serializable to JSON. Remember to export fields!
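The reminder about exported fields can be illustrated with a small standalone sketch. It does not use Taskhawk itself; it only assumes that the input is encoded with encoding/json between dispatch and execution, and the struct fields shown are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SendEmailTaskInput is a hypothetical task input. Only exported fields
// survive a JSON round trip.
type SendEmailTaskInput struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	retries int    // unexported: silently skipped by encoding/json
}

// roundTrip encodes the input to JSON and decodes it into a fresh value,
// mimicking what is assumed to happen between dispatch and execution.
func roundTrip(in *SendEmailTaskInput) (*SendEmailTaskInput, error) {
	payload, err := json.Marshal(in)
	if err != nil {
		return nil, err
	}
	out := &SendEmailTaskInput{}
	if err := json.Unmarshal(payload, out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(&SendEmailTaskInput{To: "a@example.com", Subject: "Hi", retries: 3})
	if err != nil {
		panic(err)
	}
	// The retries field comes back as its zero value.
	fmt.Printf("%+v\n", *out)
}
```

The unexported field is dropped without any error, which is why a forgotten capital letter tends to show up only as missing data on the consumer side.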
Then, define your backend:
settings := aws.Settings{
	AWSAccessKey: <YOUR AWS ACCESS KEY>,
	AWSAccountID: <YOUR AWS ACCOUNT ID>,
	AWSRegion:    <YOUR AWS REGION>,
	AWSSecretKey: <YOUR AWS SECRET KEY>,
	Queue:        <YOUR TASKHAWK QUEUE>,
}
backend := aws.NewBackend(settings, nil)
Before a task can be dispatched, it must be registered with a hub:
hub := taskhawk.NewHub(taskhawk.Config{...}, backend)
task, err := taskhawk.RegisterTask(hub, "SendEmailTask", SendEmail)
And finally, dispatch your task asynchronously:
task.Dispatch(ctx, &SendEmailTaskInput{...})
If you want to include a custom header with the message (for example, you can include a request_id field for cross-application tracing), you can set it on the input object (HeadersCarrier interface).
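As a rough sketch of the header mechanism, the input struct below implements the two HeadersCarrier methods and attaches a request_id before dispatch. The interface is redeclared locally so the example runs without importing taskhawk; the struct, the helper newInput, and the choice to exclude headers from the JSON payload are assumptions made for illustration.

```go
package main

import "fmt"

// HeadersCarrier mirrors the interface documented in this package so the
// sketch compiles without importing taskhawk.
type HeadersCarrier interface {
	SetHeaders(map[string]string)
	GetHeaders() map[string]string
}

// SendEmailTaskInput is a hypothetical input that carries custom headers.
// Headers is tagged `json:"-"` on the assumption that headers travel
// separately from the JSON payload.
type SendEmailTaskInput struct {
	To      string            `json:"to"`
	Headers map[string]string `json:"-"`
}

// SetHeaders stores the headers on the input.
func (i *SendEmailTaskInput) SetHeaders(h map[string]string) { i.Headers = h }

// GetHeaders returns the headers previously stored on the input.
func (i *SendEmailTaskInput) GetHeaders() map[string]string { return i.Headers }

// Compile-time check that the input satisfies the interface.
var _ HeadersCarrier = &SendEmailTaskInput{}

// newInput is a hypothetical helper that builds an input with a
// request_id header attached.
func newInput(to, requestID string) *SendEmailTaskInput {
	in := &SendEmailTaskInput{To: to}
	in.SetHeaders(map[string]string{"request_id": requestID})
	return in
}

func main() {
	in := newInput("a@example.com", "req-123")
	fmt.Println(in.GetHeaders()["request_id"])
}
```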
If you want to customize priority, you can do it like so:
task.DispatchWithPriority(ctx, &SendEmailTaskInput{...}, taskhawk.PriorityHigh)
Tasks are held in SQS queue / Pub/Sub subscription until they're successfully executed, or until they fail a configurable number of times. Failed tasks are moved to a Dead Letter Queue, where they're held for 14 days, and may be examined for further debugging.
Priority ¶
Taskhawk provides 4 priority queues to use, which may be customized per task, or per message. For more details, see https://godoc.org/github.com/cloudchacho/taskhawk-go/taskhawk#Priority.
Metadata and Headers ¶
If your input struct satisfies `taskhawk.MetadataSetter` interface, it'll be filled in with the following attributes:
id: task identifier. This represents a run of a task.
priority: the priority this task message was dispatched with.
receipt: SQS receipt for the task. This may be used to extend message visibility if the task is running longer than expected.
timestamp: task dispatch epoch timestamp
version: message format version.
If your input struct satisfies the HeadersCarrier interface, it'll be filled with the custom Taskhawk headers that the task was dispatched with.
For a compile-time type assertion check, you may add (in global scope):
var _ taskhawk.MetadataSetter = &SendEmailTaskInput{}
var _ taskhawk.HeadersCarrier = &SendEmailTaskInput{}
This snippet won't consume memory or do anything at runtime.
Consumer ¶
A consumer for workers can be started as follows:
err := hub.ListenForMessages(ctx, taskhawk.ListenRequest{...}, backend)
This is a blocking function, so if you want to listen to multiple priority queues, you'll need to run these on separate goroutines.
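One way to run several blocking listeners side by side is a goroutine per priority plus a WaitGroup. The sketch below uses a stand-in listenFunc instead of the real hub call, and assumes the listener returns once its context is canceled; listenAll and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// listenFunc stands in for a blocking call such as hub.ListenForMessages;
// it is assumed to return once ctx is canceled.
type listenFunc func(ctx context.Context, priority string) error

// listenAll starts one goroutine per priority and waits for all of them.
// It returns the first non-nil error recorded, if any.
func listenAll(ctx context.Context, listen listenFunc, priorities []string) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, p := range priorities {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			if err := listen(ctx, p); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = fmt.Errorf("listener %q: %w", p, err)
				}
				mu.Unlock()
			}
		}(p)
	}
	wg.Wait()
	return firstErr
}

// demo runs two fake listeners until a short timeout expires and reports
// how many listeners were started.
func demo() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	var started int32
	fake := func(ctx context.Context, priority string) error {
		atomic.AddInt32(&started, 1)
		<-ctx.Done() // block like a real listener would
		return nil
	}
	err := listenAll(ctx, fake, []string{"default", "high"})
	return int(atomic.LoadInt32(&started)), err
}

func main() {
	n, err := demo()
	fmt.Println("listeners started:", n, "error:", err)
}
```

In a real worker the fake function would be replaced by a closure around the hub's listen call, with the context tied to process shutdown signals.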
For more complete code, see examples.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrRetry = errors.New("Retry error")
ErrRetry should cause the task to retry, but not treat the retry as an error
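A task body might use this sentinel roughly as sketched below. The variable errRetry is a local stand-in for ErrRetry so the example runs on its own, and errNotReady and sendEmail are hypothetical. The sentinel is returned unwrapped because this documentation does not say whether the library matches it by identity or via errors.Is.

```go
package main

import (
	"errors"
	"fmt"
)

// errRetry stands in for taskhawk.ErrRetry so the sketch runs standalone.
var errRetry = errors.New("Retry error")

// errNotReady is a hypothetical transient condition.
var errNotReady = errors.New("upstream not ready")

// sendEmail is a hypothetical task body. It returns the retry sentinel for
// a transient condition and a wrapped error for anything else.
func sendEmail(upstream func() error) error {
	if err := upstream(); err != nil {
		if errors.Is(err, errNotReady) {
			// Ask for a retry without reporting a failure.
			return errRetry
		}
		return fmt.Errorf("send email: %w", err)
	}
	return nil
}

func main() {
	fmt.Println(sendEmail(func() error { return errNotReady }))
	fmt.Println(sendEmail(func() error { return nil }))
}
```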
var (
ErrTaskNotFound = errors.New("task not found")
)
ErrTaskNotFound indicates that task was not found
Functions ¶
This section is empty.
Types ¶
type Config ¶ added in v0.2.0
type Config struct {
	// Sync changes taskhawk dispatch to synchronous mode. This is similar
	// to Celery's Eager mode and is helpful for integration testing
	Sync bool

	// Instrumenter for the consumer
	Instrumenter Instrumenter

	// GetLogger returns the logger object for given context
	GetLogger GetLoggerFunc
}
Config used to configure Taskhawk Hub
type ConsumerBackend ¶ added in v0.2.0
type ConsumerBackend interface {
	// Receive messages from configured queue(s) and provide it through the channel. This should run indefinitely
	// until the context is canceled. Provider metadata should include all info necessary to ack/nack a message.
	// The channel must not be closed by the backend.
	Receive(ctx context.Context, priority Priority, numMessages uint32, visibilityTimeout time.Duration, messageCh chan<- ReceivedMessage) error

	// NackMessage nacks a message on the queue
	NackMessage(ctx context.Context, providerMetadata any) error

	// AckMessage acknowledges a message on the queue
	AckMessage(ctx context.Context, providerMetadata any) error

	// RequeueDLQ re-queues everything in the taskhawk DLQ back into the taskhawk queue
	RequeueDLQ(ctx context.Context, priority Priority, numMessages uint32, visibilityTimeout time.Duration) error
}
ConsumerBackend is used for consuming messages from a transport
type GetLoggerFunc ¶ added in v0.2.0
GetLoggerFunc returns the logger object
func LogrusGetLoggerFunc ¶ added in v0.2.0
func LogrusGetLoggerFunc(fn func(ctx context.Context) *logrus.Entry) GetLoggerFunc
type HeadersCarrier ¶ added in v0.2.0
type HeadersCarrier interface {
	// SetHeaders sets the headers on a task input
	SetHeaders(map[string]string)

	// GetHeaders returns the headers set on a task input
	GetHeaders() map[string]string
}
HeadersCarrier interface needs to be implemented by the input struct if your task needs to get custom headers set during dispatch
type Hub ¶ added in v0.2.0
type Hub struct {
// contains filtered or unexported fields
}
Hub is the central struct used to dispatch Taskhawk tasks / run consumer
func NewHub ¶ added in v0.2.0
func NewHub(config Config, backend PublisherBackend) *Hub
NewHub creates a hub
func (*Hub) ListenForMessages ¶ added in v0.2.0
func (h *Hub) ListenForMessages(ctx context.Context, request ListenRequest, backend ConsumerBackend) error
ListenForMessages starts a taskhawk listener for the provided message types
Cancelable context may be used to cancel processing of messages
func (*Hub) RequeueDLQ ¶ added in v0.2.0
func (h *Hub) RequeueDLQ(ctx context.Context, request ListenRequest, backend ConsumerBackend) error
RequeueDLQ re-queues everything in the taskhawk DLQ back into the taskhawk queue
type Instrumenter ¶ added in v0.2.0
type Instrumenter interface {
	// OnReceive is called as soon as possible after a message is received from the backend. Caller must call
	// the returned finalized function when processing for the message is finished (typically done via defer).
	// The context must be replaced with the returned context for the remainder of the operation.
	// This is where a new span must be started.
	OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func())

	// OnTask is called when a message has been received from the backend and decoded
	// This is where span attributes, such as name, may be updated.
	OnTask(ctx context.Context, taskName string)

	// OnDispatch is called right before a message is published. Caller must call
	// the returned finalized function when publishing for the message is finished (typically done via defer).
	// The attributes may be updated to include trace id for downstream consumers.
	OnDispatch(ctx context.Context, taskName string, attributes map[string]string) (context.Context, map[string]string, func())
}
Instrumenter defines the interface for Taskhawk's instrumentation
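To make the three hooks more concrete, here is a minimal in-memory sketch of an implementation. The interface is redeclared locally so the code runs without taskhawk; simpleInstrumenter, traceThrough, and the "trace_id" attribute name are assumptions for illustration, and a real implementation would start and finish tracing spans instead of appending strings.

```go
package main

import (
	"context"
	"fmt"
)

// Instrumenter mirrors the interface documented above.
type Instrumenter interface {
	OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func())
	OnTask(ctx context.Context, taskName string)
	OnDispatch(ctx context.Context, taskName string, attributes map[string]string) (context.Context, map[string]string, func())
}

// traceIDKey is the context key under which the trace id is stored.
type traceIDKey struct{}

// simpleInstrumenter is a hypothetical implementation that propagates a
// "trace_id" attribute (an assumed name) and records events in memory.
// It is not safe for concurrent use.
type simpleInstrumenter struct {
	events []string
}

var _ Instrumenter = &simpleInstrumenter{}

func (s *simpleInstrumenter) OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func()) {
	ctx = context.WithValue(ctx, traceIDKey{}, attributes["trace_id"])
	s.events = append(s.events, "receive")
	return ctx, func() { s.events = append(s.events, "receive done") }
}

func (s *simpleInstrumenter) OnTask(ctx context.Context, taskName string) {
	s.events = append(s.events, "task "+taskName)
}

func (s *simpleInstrumenter) OnDispatch(ctx context.Context, taskName string, attributes map[string]string) (context.Context, map[string]string, func()) {
	// Copy attributes so the caller's map is not mutated.
	out := make(map[string]string, len(attributes)+1)
	for k, v := range attributes {
		out[k] = v
	}
	if id, ok := ctx.Value(traceIDKey{}).(string); ok && id != "" {
		out["trace_id"] = id
	}
	s.events = append(s.events, "dispatch "+taskName)
	return ctx, out, func() { s.events = append(s.events, "dispatch done") }
}

// traceThrough simulates receiving a message and dispatching a follow-up
// task, returning the trace id attached to the outgoing attributes.
func traceThrough(inst Instrumenter, traceID string) string {
	ctx, done := inst.OnReceive(context.Background(), map[string]string{"trace_id": traceID})
	defer done()
	inst.OnTask(ctx, "SendEmailTask")
	_, attrs, finish := inst.OnDispatch(ctx, "FollowUpTask", map[string]string{})
	defer finish()
	return attrs["trace_id"]
}

func main() {
	s := &simpleInstrumenter{}
	fmt.Println(traceThrough(s, "trace-42"))
	fmt.Println(s.events)
}
```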
type ListenRequest ¶
type ListenRequest struct {
	// Priority queue to listen to
	Priority Priority
	// How many messages to fetch at one time
	NumMessages uint32 // default 1
	// How long should the message be hidden from other consumers?
	VisibilityTimeout time.Duration // defaults to queue configuration
	// How many goroutines to spin for processing messages concurrently
	NumConcurrency uint32 // default 1
}
ListenRequest represents a request to listen for messages
type Logger ¶ added in v0.2.0
type Logger interface {
	// Error logs an error with a message. `fields` can be used as additional metadata for structured logging.
	// You can generally expect one of these fields to be available: message_sqs_id, message_sns_id.
	// By default fields are logged as a map using fmt.Sprintf
	Error(err error, message string, fields LoggingFields)

	// Warn logs a warn level log with a message. `fields` param works the same as `Error`.
	Warn(err error, message string, fields LoggingFields)

	// Info logs an info level log with a message. `fields` param works the same as `Error`.
	Info(message string, fields LoggingFields)

	// Debug logs a debug level log with a message. `fields` param works the same as `Error`.
	Debug(message string, fields LoggingFields)
}
Logger represents a logging interface that this library expects
type LoggingFields ¶ added in v0.2.0
type MetadataSetter ¶ added in v0.2.0
type MetadataSetter interface {
	// SetID sets the message id
	SetID(string)

	// SetPriority sets the priority a message was dispatched with
	SetPriority(Priority)

	// SetProviderMetadata represents backend provider specific metadata, e.g. AWS receipt, or Pub/Sub ack ID
	// For concrete type of metadata, check the documentation of your backend class
	SetProviderMetadata(any)

	// SetTimestamp sets the message dispatch timestamp
	SetTimestamp(time.Time)

	// SetVersion sets the message schema version
	SetVersion(Version)
}
MetadataSetter interface needs to be implemented by the input struct if your task needs to get metadata (message id, etc.)
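Because the interface has five setters, one convenient pattern is a small reusable struct that implements them once and is embedded into each input. The sketch below assumes this pattern; Priority and Version are local stand-ins for the taskhawk types (the underlying type of Version is a guess), and Metadata, fill, and the version string are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Priority and Version are local stand-ins for the taskhawk types so the
// sketch runs standalone. The underlying type of Version is an assumption.
type Priority int
type Version string

// MetadataSetter mirrors the interface documented above.
type MetadataSetter interface {
	SetID(string)
	SetPriority(Priority)
	SetProviderMetadata(any)
	SetTimestamp(time.Time)
	SetVersion(Version)
}

// Metadata is a hypothetical reusable struct that implements the setters
// once, so that any input can gain them through embedding.
type Metadata struct {
	ID               string
	Priority         Priority
	ProviderMetadata any
	Timestamp        time.Time
	Version          Version
}

func (m *Metadata) SetID(id string)            { m.ID = id }
func (m *Metadata) SetPriority(p Priority)     { m.Priority = p }
func (m *Metadata) SetProviderMetadata(pm any) { m.ProviderMetadata = pm }
func (m *Metadata) SetTimestamp(t time.Time)   { m.Timestamp = t }
func (m *Metadata) SetVersion(v Version)       { m.Version = v }

// SendEmailTaskInput embeds Metadata; the `json:"-"` tag keeps it out of
// the serialized payload (an assumption about how inputs are encoded).
type SendEmailTaskInput struct {
	To       string `json:"to"`
	Metadata `json:"-"`
}

// Compile-time check that the embedding promotes all five setters.
var _ MetadataSetter = &SendEmailTaskInput{}

// fill imitates what a consumer might do before invoking the task.
func fill(in MetadataSetter, id string) {
	in.SetID(id)
	in.SetPriority(Priority(0))
	in.SetTimestamp(time.Now())
	in.SetVersion("1.0") // hypothetical version string
}

func main() {
	in := &SendEmailTaskInput{To: "a@example.com"}
	fill(in, "task-run-1")
	fmt.Println(in.ID, in.Version)
}
```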
type Priority ¶
type Priority int
Priority of a task. This may be used to differentiate batch jobs from other tasks for example.
High and low priority queues provide independent scaling knobs for your use-case.
const (
	// PriorityDefault is the default priority of a task if nothing is specified. In most cases,
	// using just the default queue should work fine.
	PriorityDefault Priority = iota // Keep default first so empty values automatically default
	PriorityLow
	PriorityHigh
	// PriorityBulk queue will typically have different monitoring, and may be used for bulk jobs,
	// such as sending push notifications to all users. This allows you to effectively
	// throttle the tasks.
	PriorityBulk
)
Priority for a task
func (Priority) MarshalJSON ¶
MarshalJSON changes Priority to a JSON string
func (*Priority) UnmarshalJSON ¶
UnmarshalJSON changes priority from a JSON string to Priority
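The general technique of mapping an iota-based enum to JSON strings can be sketched as follows. The type is redeclared locally, and the wire strings in the names map are assumptions chosen for illustration; they are not necessarily the strings Taskhawk emits.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Priority is a local copy of the enum so the sketch runs standalone.
type Priority int

const (
	PriorityDefault Priority = iota
	PriorityLow
	PriorityHigh
	PriorityBulk
)

// names holds assumed wire strings, not necessarily those taskhawk uses.
var names = map[Priority]string{
	PriorityDefault: "default",
	PriorityLow:     "low",
	PriorityHigh:    "high",
	PriorityBulk:    "bulk",
}

// MarshalJSON encodes the priority as a JSON string.
func (p Priority) MarshalJSON() ([]byte, error) {
	name, ok := names[p]
	if !ok {
		return nil, fmt.Errorf("unknown priority %d", int(p))
	}
	return json.Marshal(name)
}

// UnmarshalJSON decodes a JSON string back into a priority.
func (p *Priority) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	for k, v := range names {
		if v == s {
			*p = k
			return nil
		}
	}
	return fmt.Errorf("unknown priority %q", s)
}

// encode returns the JSON text for a priority.
func encode(p Priority) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

// decode parses JSON text into a priority.
func decode(s string) (Priority, error) {
	var p Priority
	err := json.Unmarshal([]byte(s), &p)
	return p, err
}

func main() {
	s, _ := encode(PriorityHigh)
	p, _ := decode(`"bulk"`)
	fmt.Println(s, p == PriorityBulk)
}
```

Encoding as a string rather than the underlying integer keeps messages readable and avoids breaking consumers if the constant order ever changes.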
type PublisherBackend ¶ added in v0.2.0
type PublisherBackend interface {
	// Publish a message represented by the payload, with specified attributes to the topic with specified priority
	Publish(ctx context.Context, payload []byte, attributes map[string]string, priority Priority) (string, error)
}
PublisherBackend is used to publish messages to a transport
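Since the interface has a single method, an in-memory fake is easy to write for unit tests. The sketch below redeclares the interface and Priority locally so it runs without taskhawk; memoryBackend, publishN, and the "msg-N" id format are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Priority is a local stand-in for the taskhawk type.
type Priority int

// PublisherBackend mirrors the interface documented above.
type PublisherBackend interface {
	Publish(ctx context.Context, payload []byte, attributes map[string]string, priority Priority) (string, error)
}

// published records one call to Publish.
type published struct {
	payload    []byte
	attributes map[string]string
	priority   Priority
}

// memoryBackend is a hypothetical fake that keeps messages in memory.
type memoryBackend struct {
	mu       sync.Mutex
	messages []published
}

var _ PublisherBackend = &memoryBackend{}

// Publish stores a copy of the payload and returns a sequential message id.
func (m *memoryBackend) Publish(ctx context.Context, payload []byte, attributes map[string]string, priority Priority) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.messages = append(m.messages, published{
		payload:    append([]byte(nil), payload...),
		attributes: attributes,
		priority:   priority,
	})
	return fmt.Sprintf("msg-%d", len(m.messages)), nil
}

// publishN publishes n small payloads and returns the ids handed back.
func publishN(n int) []string {
	backend := &memoryBackend{}
	ids := make([]string, 0, n)
	for i := 0; i < n; i++ {
		id, err := backend.Publish(context.Background(), []byte(`{}`), nil, Priority(0))
		if err != nil {
			panic(err)
		}
		ids = append(ids, id)
	}
	return ids
}

func main() {
	fmt.Println(publishN(2))
}
```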
type ReceivedMessage ¶ added in v0.2.0
ReceivedMessage is the message as received by a transport backend.
type StdLogger ¶ added in v0.2.0
type StdLogger struct{}
func (*StdLogger) Debug ¶ added in v0.2.0
func (s *StdLogger) Debug(message string, fields LoggingFields)
func (*StdLogger) Error ¶ added in v0.2.0
func (s *StdLogger) Error(err error, message string, fields LoggingFields)
func (*StdLogger) Info ¶ added in v0.2.0
func (s *StdLogger) Info(message string, fields LoggingFields)
type Task ¶
type Task[T any] struct {
	// contains filtered or unexported fields
}
func RegisterTask ¶ added in v0.2.0
RegisterTask registers the task to the hub with priority 'default'. Priority may be overridden at dispatch time using `DispatchWithPriority`.
func RegisterTaskWithPriority ¶ added in v0.2.0
func RegisterTaskWithPriority[T any](h *Hub, taskName string, taskFn TaskFn[T], defaultPriority Priority) (Task[T], error)
RegisterTaskWithPriority registers the task to the hub with specified priority. This will set the default priority, and may be overridden at dispatch time using `DispatchWithPriority`.