sqstransport

package module
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2021 License: MIT Imports: 7 Imported by: 0

README

PkgGoDev

sqstransport

This package contains a go-kit transport implementation for AWS SQS.

sub := New(
    WithBefore(...),
    WithBefore(...),
    UseHandler(...),        // handle the request,
    UseDecodeRequest(...),  // decode the incoming message into an endpoint request object,
    UseResponseHandler(...),
    UseResponseHandler(...),
    UseInputFactory(...),   // create a *sqs.ReceiveMessageInput instance,
    WithBaseContext(...),   // used for processing each new message
    WithErrorHandler(...),
)

go func() { _ = sub.Serve(client) }()

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrAlreadyStarted = errors.New("already started")
)

Functions

This section is empty.

Types

type AfterBatchFunc added in v0.2.0

type AfterBatchFunc func(ctx context.Context)

type Client

type Client interface {
	ReceiveMessage(ctx context.Context,
		params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
}

type DecodeRequestFunc

type DecodeRequestFunc func(context.Context, types.Message) (request interface{}, err error)

type DecoderError added in v0.4.0

type DecoderError struct {
	Err error
	Msg types.Message
}

func (*DecoderError) Error added in v0.4.0

func (obj *DecoderError) Error() string

type HandlerError added in v0.3.0

type HandlerError struct {
	Err     error
	Request interface{}
	Msg     types.Message
}

HandlerError is used for triggering the error handler when the handler returns an error.

func (*HandlerError) Error added in v0.3.0

func (obj *HandlerError) Error() string

type InputFactory added in v0.5.0

type InputFactory func() (params *sqs.ReceiveMessageInput, optFns []func(*sqs.Options))

type Option added in v0.5.0

type Option func(*Subscriber)

func UseDecodeRequest added in v0.5.0

func UseDecodeRequest(decodeRequest DecodeRequestFunc) Option

UseDecodeRequest sets the decoder function which is required.

func UseHandler added in v0.5.0

func UseHandler(handler endpoint.Endpoint) Option

UseHandler sets the handler function which is required.

func UseInputFactory added in v0.5.0

func UseInputFactory(inputFactory InputFactory) Option

UseInputFactory sets the InputFactory function which is required. It must return a non-nil params. It can return nil for optFns.

func UseResponseHandler added in v0.5.0

func UseResponseHandler(responseHandler ...ResponseFunc) Option

UseResponseHandler adds a ResponseHandler function which is required. Can be called multiple times. Any actions required after executing handler can take place here. Like deleting the message after being successfully processed.

func WithAfterBatch added in v0.5.0

func WithAfterBatch(afterBatch AfterBatchFunc) Option

WithAfterBatch adds an AfterBatch function which is optional. It is called after a batch of messages passed to the Runner.

func WithBaseContext added in v0.5.0

func WithBaseContext(ctxFac func() context.Context) Option

WithBaseContext sets the base context for the subscriber. If not provided, will be context.Background.

func WithBefore added in v0.5.0

func WithBefore(before ...RequestFunc) Option

WithBefore adds RequestFunc which is optional. Can be called multiple times. Can be used for starting a keep-in-flight hearbeat - an example. They run before DecodeRequest and can put additional data inside the context. If returns a nil context, it causes a panic.

func WithErrorHandler added in v0.5.0

func WithErrorHandler(errorHandler transport.ErrorHandler) Option

WithErrorHandler sets the error handler which is optional.

func WithRunner added in v0.5.0

func WithRunner(runner Runner) Option

WithRunner sets the Runner. If not provided, the default runner will be used. All the Befor functions, decoding the message, handling the message and handling the response are executed by the Runner.

type RequestFunc

type RequestFunc func(context.Context, types.Message) context.Context

type ResponseFunc added in v0.3.0

type ResponseFunc func(ctx context.Context, msg types.Message, response interface{}) context.Context

type Runner

type Runner interface {
	Run(func())
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber is a go-kit sqs transport. Before, DecodeRequest, Handler, and ResponseHandler run inside the same anonymous function. This anonymous function creates a context, which uses the BaseContext as the parent, and is canceled when the function execution is finished. This anonymous function is passed to the runner. AfterBatch is run after each batch of messages.

Example
client := mockClient10msec()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

handled := make(chan struct{})
sut := New(
	UseHandler(func(ctx context.Context, request interface{}) (response interface{}, err error) {
		select {
		case <-handled:
			return nil, nil
		default:
		}

		defer close(handled)

		// processing the request inside the endpoint
		fmt.Println(request)

		return nil, nil
	}),
	UseInputFactory(defaultInputFactory),
	UseDecodeRequest(nopDecodeRequest),
	UseResponseHandler(nopResponseHandler...),
)

go func() { _ = sut.Serve(ctx, client) }()

<-handled
Output:

MSG-1

func New added in v0.5.0

func New(options ...Option) *Subscriber

New returns a new Subscriber. Mandatory options start with "Use...". (maybe they should be explicit, might change later ¯\_(ツ)_/¯)

func (*Subscriber) Serve

func (obj *Subscriber) Serve(ctx context.Context, l Client) error

Serve starts receiving messages from the queue and calling the handler on each.

func (*Subscriber) Shutdown

func (obj *Subscriber) Shutdown()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL