workerstd

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2023 License: MPL-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package workerstd contains common functions for setting up an asynchronous task worker using gocloud.dev pub/sub.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunWithSignalHandler

func RunWithSignalHandler(app *App) (returnErr error)

RunWithSignalHandler runs a worker process described by the App struct in the background, and implements a signal handler in the foreground that traps the INT and TERM signals. When the INT or TERM signal is sent to the process, this will start a graceful shutdown of the worker app, waiting up to ShutdownTimeout duration for all the worker threads to stop processing.

Types

type App

type App struct {
	Broker          *Broker
	Logger          *zap.SugaredLogger
	TaskHandler     TaskHandler
	ShutdownTimeout time.Duration

	// ReceiveTaskFn is called to receive a task from the Sub client. Ideally this is not necessary, but because
	// proto.Message is a pointer type, we can't instantiate the struct without knowing what protobuf message we want to
	// unmarshal to.
	ReceiveTaskFn func(ctx context.Context, subClt *SubClient) (proto.Message, *pubsub.Message, error)

	// CloseFn is called on close. contain Any additional close routine should be handled in the custom close function passed in here.
	CloseFn func() error
}

type Broker

type Broker struct {
	// Engine is the engine of the message broker. Must be one of azuresb or rabbitmq.
	Engine string `mapstructure:"engine"`

	// TopicName is the message queue topic where messages are published. This corresponds to the exchange when using
	// rabbitmq.
	TopicName string `mapstructure:"topic"`

	// ConnectionString is the connection string for connecting to the specific broker. For AzureSB, this is the
	// ServiceBus Namespace FQDN (NAMESPACE.servicebus.windows.net), while for RabbitMQ this is the server URL in the
	// format USERNAME:PASSWORD@HOST:PORT.
	ConnectionString string `mapstructure:"connstring"`

	// ServiceBusSubscriptionName is the Azure ServiceBus Topic Subscription that the worker should consume as. If
	// blank, assume that the topic is an Azure ServiceBus Queue instead of a Topic. This is only used with Azure
	// ServiceBus.
	ServiceBusSubscriptionName string `mapstructure:"subscription"`
}

Broker represents configuration options for the message queue broker used to enqueue tasks for the worker. This can be embedded in a viper compatible config struct.

type PubClient

type PubClient struct {
	// contains filtered or unexported fields
}

func NewPubClient

func NewPubClient(logger *zap.SugaredLogger, broker *Broker, ctx context.Context) (*PubClient, error)

NewPubClient returns an initialized publisher client for the configured broker from the given application config.

func (*PubClient) Close

func (clt *PubClient) Close() error

Close will close all the associated connections of the given publisher client.

func (*PubClient) SendTask

func (clt *PubClient) SendTask(task proto.Message) error

SendTask will send a protobuf encoded message representing a worker task across the open pubsub topic.

type SubClient

type SubClient struct {
	// contains filtered or unexported fields
}

func NewSubClient

func NewSubClient(logger *zap.SugaredLogger, broker *Broker, ctx context.Context) (*SubClient, error)

NewSubClient returns an initialized subscriber client for the configured broker from the given application config.

func (*SubClient) Close

func (clt *SubClient) Close() error

Close will close all the associated connections of the given publisher client.

func (*SubClient) ReceiveTask

func (clt *SubClient) ReceiveTask(ctx context.Context, taskPtr proto.Message) (*pubsub.Message, error)

ReceiveTask will pull a task from the subscription channel and attempt to decode the received message into a task. Note that this will block the thread if there are no messages available in the topic. IMPORTANT: The caller must acknowledge the message once the task is successfully processed, either using Ack or Nack.

type TaskHandler

type TaskHandler interface {
	HandleTaskMsg(proto.Message, *pubsub.Message) error
}

TaskHandler is the interface that task handlers passed to the worker app should implement.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL