endpoint

package
v0.5.0
Published: May 3, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package endpoint contains components that are composed to configure an 'endpoint', which can send and receive messages to other endpoints.

Constants

const DefaultTimeout = 5 * time.Second

DefaultTimeout is the default timeout duration to use if none is given.

Variables

var DefaultRetryPolicy = NewExponentialBackoffPolicy(
	3,
	0,
	1*time.Second,
	1*time.Hour,
)

DefaultRetryPolicy is the default RetryPolicy.

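It allows for 3 immediate attempts, after which each retry is delayed exponentially, starting from a base delay of 1 second and capped at a maximum delay of 1 hour. Because the maximum attempt count is given as zero, the message is retried indefinitely rather than being rejected after a fixed number of attempts.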

var DefaultValidators = []Validator{
	&SelfValidator{},
}

DefaultValidators is the set of validators used to validate outgoing messages if no other set of validators is configured on the endpoint.

Functions

func WithEnvelope

func WithEnvelope(p context.Context, env InboundEnvelope) context.Context

WithEnvelope returns a new context derived from p that contains env. The envelope can be retrieved from the context with GetEnvelope().

Types

type Acknowledger

type Acknowledger interface {
	// Ack acknowledges the message, indicating that it was handled successfully
	// and does not need to be retried.
	Ack(ctx context.Context) error

	// Retry requeues the message so that it is retried at some point in the
	// future.
	//
	// d is a hint as to how long the transport should wait before retrying
	// this message.
	Retry(ctx context.Context, err error, d time.Duration) error

	// Reject indicates that the message could not be handled and should not be
	// retried. Depending on the transport, this may move the message to some form
	// of error queue or otherwise drop the message completely.
	Reject(ctx context.Context, err error) error
}

Acknowledger is an interface for acknowledging a specific inbound message.

type AttemptID

type AttemptID struct {
	ident.ID
}

AttemptID uniquely identifies an attempt to process a message.

func GenerateAttemptID

func GenerateAttemptID() AttemptID

GenerateAttemptID generates a new unique identifier for a processing attempt.

func MustParseAttemptID

func MustParseAttemptID(s string) AttemptID

MustParseAttemptID parses s into an attempt ID and returns it. It panics if s is empty.

func ParseAttemptID

func ParseAttemptID(s string) (AttemptID, error)

ParseAttemptID parses s into an attempt ID and returns it. It returns an error if s is empty.
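The difference between the two parsing functions follows the usual Go "Parse / MustParse" convention. The sketch below illustrates that convention with stand-in names (`attemptID`, `parseAttemptID`, `mustParseAttemptID`); it assumes the only validation is the empty-string check described above and does not reproduce the package's ident.ID type.

```go
package main

import (
	"errors"
	"fmt"
)

// attemptID is a stand-in for AttemptID; the real type embeds ident.ID.
type attemptID struct{ value string }

// parseAttemptID reports an error for an empty string, as described above.
func parseAttemptID(s string) (attemptID, error) {
	if s == "" {
		return attemptID{}, errors.New("attempt ID must not be empty")
	}
	return attemptID{value: s}, nil
}

// mustParseAttemptID wraps parseAttemptID and panics instead of returning
// an error. This suits literals that are known to be valid.
func mustParseAttemptID(s string) attemptID {
	id, err := parseAttemptID(s)
	if err != nil {
		panic(err)
	}
	return id
}

func main() {
	id := mustParseAttemptID("attempt-1")
	fmt.Println(id.value)

	_, err := parseAttemptID("")
	fmt.Println(err != nil)
}
```

Prefer the error-returning form for values that come from a transport or other untrusted input, so that a malformed message does not crash the endpoint.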

type BufferedSink

type BufferedSink struct {
	// contains filtered or unexported fields
}

BufferedSink is a MessageSink that buffers message envelopes in memory.

func (*BufferedSink) Accept

func (s *BufferedSink) Accept(ctx context.Context, env OutboundEnvelope) error

Accept buffers env in memory.

func (*BufferedSink) Envelopes

func (s *BufferedSink) Envelopes() []OutboundEnvelope

Envelopes returns the message envelopes that have been buffered.

func (*BufferedSink) Reset

func (s *BufferedSink) Reset()

Reset removes the buffered message envelopes.

func (*BufferedSink) TakeEnvelopes

func (s *BufferedSink) TakeEnvelopes() []OutboundEnvelope

TakeEnvelopes returns the message envelopes that have been buffered and resets the sink in a single operation.

type Endpoint

type Endpoint struct {
	Name              string
	OutboundTransport OutboundTransport
	InboundTransport  InboundTransport
	InboundPipeline   InboundPipeline
	OutboundPipeline  OutboundPipeline
	RetryPolicy       RetryPolicy
	SenderValidators  []Validator
	Tracer            opentracing.Tracer
	// contains filtered or unexported fields
}

Endpoint is a named source and recipient of messages.

func (*Endpoint) NewSender

func (ep *Endpoint) NewSender(ctx context.Context) (ax.Sender, error)

NewSender returns an ax.Sender that can be used to send messages from this endpoint.

func (*Endpoint) StartReceiving

func (ep *Endpoint) StartReceiving(ctx context.Context) error

StartReceiving processes inbound messages until an error occurs or ctx is canceled.

type InboundEnvelope

type InboundEnvelope struct {
	ax.Envelope

	// SourceEndpoint is the endpoint that sent the message.
	SourceEndpoint string

	// AttemptID uniquely identifies the attempt to process this message.
	AttemptID AttemptID

	// AttemptCount is the number of times that an attempt has been made to process
	// this message.
	//
	// Messages may be retried after a failure handling the message, or if
	// an endpoint crashes, for example. Not all transports support an attempt
	// count. If the attempt count is unknown, it is set to zero.
	//
	// The attempt count may be reset if a message is manually re-queued after
	// being rejected by the retry policy.
	AttemptCount uint

	// SpanContext is the tracing context that was propagated with the message.
	SpanContext opentracing.SpanContext
}

InboundEnvelope is a specialization of ax.Envelope for messages that are received by an endpoint.

Inbound envelopes traverse an InboundPipeline.

func GetEnvelope

func GetEnvelope(ctx context.Context) (env InboundEnvelope, ok bool)

GetEnvelope returns the message envelope contained in ctx. If ctx does not contain an envelope then ok is false.

type InboundPipeline

type InboundPipeline interface {
	// Initialize is called during initialization of the endpoint, after the
	// transport is initialized. It can be used to inspect or further
	// configure the endpoint as per the needs of the pipeline.
	Initialize(ctx context.Context, ep *Endpoint) error

	// Accept forwards an inbound message through the pipeline until
	// it is handled by some application-defined message handler(s).
	Accept(ctx context.Context, sink MessageSink, env InboundEnvelope) error
}

InboundPipeline is an interface for a message pipeline that processes messages received from the message transport.

A "stage" within the pipeline is simply an implementation of the InboundPipeline interface that forwards messages to another pipeline.

type InboundRejecter

type InboundRejecter struct {
	Validators []Validator
	Next       InboundPipeline
}

InboundRejecter is an inbound pipeline stage that validates messages before forwarding them to the next pipeline stage. It uses a set of validators distinct from those configured in the endpoint.

func (*InboundRejecter) Accept

func (i *InboundRejecter) Accept(
	ctx context.Context,
	sink MessageSink,
	env InboundEnvelope,
) error

Accept forwards an inbound message to the next pipeline stage only if it is successfully validated.

func (*InboundRejecter) Initialize

func (i *InboundRejecter) Initialize(
	ctx context.Context,
	ep *Endpoint,
) error

Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.

type InboundTransport

type InboundTransport interface {
	// Initialize sets up the transport to communicate as an endpoint named ep.
	Initialize(ctx context.Context, ep string) error

	// Subscribe configures the transport to listen to messages of type mt that are
	// sent using op.
	Subscribe(ctx context.Context, op Operation, mt ax.MessageTypeSet) error

	// Receive returns the next message sent to this endpoint.
	// It blocks until a message is available, or ctx is canceled.
	Receive(ctx context.Context) (InboundEnvelope, Acknowledger, error)
}

InboundTransport is an interface for receiving messages from endpoints.

type MessageSink

type MessageSink interface {
	// Accept processes the message encapsulated in env.
	Accept(ctx context.Context, env OutboundEnvelope) error
}

MessageSink is an interface that accepts outbound message envelopes as input.

type Operation

type Operation int

Operation is an enumeration of transport operations that can be performed in order to send an outbound message.

const (
	// OpSendUnicast is an outbound transport operation that sends a message to
	// a specific endpoint as determined by the outbound message's
	// DestinationEndpoint property.
	OpSendUnicast Operation = iota

	// OpSendMulticast is an outbound transport operation that sends a message
	// to all of its subscribers.
	OpSendMulticast
)

type OutboundEnvelope

type OutboundEnvelope struct {
	ax.Envelope

	// Operation is the operation to be performed on the message. It dictates
	// how the message is sent by the transport.
	Operation Operation

	// DestinationEndpoint is the endpoint to which the message is sent when
	// Operation is OpSendUnicast. The field is ignored for other operations.
	DestinationEndpoint string

	// SpanContext is the tracing context to propagate with the message.
	SpanContext opentracing.SpanContext
}

OutboundEnvelope is a specialization of ax.Envelope for messages that are sent by an endpoint.

Outbound envelopes traverse an OutboundPipeline.
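The Operation field determines how a transport selects recipients. The sketch below illustrates that dispatch for a hypothetical in-memory transport. `recipients`, the `subscribers` slice, and the error returned for an empty destination are assumptions of this example, not behavior documented for any real transport.

```go
package main

import (
	"errors"
	"fmt"
)

// operation mirrors the Operation enumeration.
type operation int

const (
	opSendUnicast operation = iota
	opSendMulticast
)

// envelope is a stand-in for OutboundEnvelope.
type envelope struct {
	Operation           operation
	DestinationEndpoint string
}

// recipients is a hypothetical helper that resolves which endpoints should
// receive env, given the endpoints subscribed to its message type.
func recipients(env envelope, subscribers []string) ([]string, error) {
	switch env.Operation {
	case opSendUnicast:
		if env.DestinationEndpoint == "" {
			return nil, errors.New("unicast message has no destination endpoint")
		}
		return []string{env.DestinationEndpoint}, nil
	case opSendMulticast:
		// DestinationEndpoint is ignored for multicast operations.
		return subscribers, nil
	default:
		return nil, fmt.Errorf("unknown operation: %d", env.Operation)
	}
}

func main() {
	subs := []string{"audit", "reporting"}

	to, err := recipients(envelope{Operation: opSendUnicast, DestinationEndpoint: "billing"}, subs)
	fmt.Println(to, err)

	to, err = recipients(envelope{Operation: opSendMulticast}, subs)
	fmt.Println(to, err)
}
```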

type OutboundPipeline

type OutboundPipeline interface {
	MessageSink

	// Initialize is called during initialization of the endpoint, after the
	// transport is initialized. It can be used to inspect or further
	// configure the endpoint as per the needs of the pipeline.
	Initialize(ctx context.Context, ep *Endpoint) error
}

OutboundPipeline is an interface for a message pipeline that processes messages that are sent via the message transport.

A "stage" within the pipeline is simply an implementation of the OutboundPipeline interface that forwards messages to another pipeline.

type OutboundRejecter

type OutboundRejecter struct {
	Validators []Validator
	Next       OutboundPipeline
}

OutboundRejecter is an outbound pipeline stage that validates messages before forwarding them to the next pipeline stage. It uses a set of validators distinct from those configured in the endpoint.

func (*OutboundRejecter) Accept

func (o *OutboundRejecter) Accept(
	ctx context.Context,
	env OutboundEnvelope,
) error

Accept forwards an outbound message to the next pipeline stage only if it is successfully validated.

func (*OutboundRejecter) Initialize

func (o *OutboundRejecter) Initialize(
	ctx context.Context,
	ep *Endpoint,
) error

Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.

type OutboundTracer

type OutboundTracer struct {
	Tracer opentracing.Tracer
	Next   OutboundPipeline
}

OutboundTracer is an implementation of OutboundPipeline that starts a new OpenTracing span before forwarding to the next stage.

func (OutboundTracer) Accept

func (s OutboundTracer) Accept(ctx context.Context, env OutboundEnvelope) error

Accept processes the message encapsulated in env.

func (OutboundTracer) Initialize

func (s OutboundTracer) Initialize(ctx context.Context, ep *Endpoint) error

Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.

type OutboundTransport

type OutboundTransport interface {
	// Initialize sets up the transport to communicate as an endpoint named ep.
	Initialize(ctx context.Context, ep string) error

	// Send sends env via the transport.
	Send(ctx context.Context, env OutboundEnvelope) error
}

OutboundTransport is an interface for sending messages to endpoints.

type RetryPolicy

type RetryPolicy func(InboundEnvelope, error) (time.Duration, bool)

RetryPolicy is a function responsible for determining whether or not a message should be retried.

It returns the delay that should occur before retrying, and a bool indicating whether or not the message should be retried at all.

func NewExponentialBackoffPolicy

func NewExponentialBackoffPolicy(
	ir, mr uint,
	bt, mt time.Duration,
) RetryPolicy

NewExponentialBackoffPolicy returns a retry policy that allows a fixed number of immediate attempts, after which retries are delayed exponentially until some maximum delay is reached.

Optionally, the message can be rejected after some fixed number of retries.

ir is the number of immediate attempts. mr is the maximum total attempts before rejecting the message. If mr is zero, the message is retried indefinitely.

bt is a "base" delay between retries. It is used as a multiplier for the backoff duration. mt is the maximum delay between retries.

type SelfValidatingMessage

type SelfValidatingMessage interface {
	ax.Message

	// Validate returns a non-nil error if the message is invalid. It is up to
	// the message implementation to check validity criteria.
	//
	// This method is invoked by SelfValidator, which is one of the default
	// validators, to verify the message.
	Validate() error
}

SelfValidatingMessage is a message that can perform its own superficial validation.

type SelfValidator

type SelfValidator struct{}

SelfValidator is one of the default message validators. It validates the message if it implements the SelfValidatingMessage interface.

func (SelfValidator) Validate

func (SelfValidator) Validate(
	ctx context.Context,
	m ax.Message,
) error

Validate validates m if it implements SelfValidatingMessage. It returns the error returned by m.Validate(). If m does not implement SelfValidatingMessage then no validation is performed and nil is returned.
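The behavior described above is an optional-interface check. A minimal sketch of it, using stand-in types in place of ax.Message and SelfValidatingMessage (`createAccount` and `ping` are hypothetical message types):

```go
package main

import (
	"errors"
	"fmt"
)

// message is a stand-in for ax.Message.
type message interface{}

// selfValidating is a stand-in for SelfValidatingMessage.
type selfValidating interface {
	Validate() error
}

// validateSelf delegates to m.Validate() when m can validate itself, and
// otherwise performs no validation.
func validateSelf(m message) error {
	if v, ok := m.(selfValidating); ok {
		return v.Validate()
	}
	return nil
}

// createAccount is a hypothetical message that validates itself.
type createAccount struct{ Name string }

func (c createAccount) Validate() error {
	if c.Name == "" {
		return errors.New("name must not be empty")
	}
	return nil
}

// ping is a hypothetical message with no Validate method.
type ping struct{}

func main() {
	fmt.Println(validateSelf(createAccount{Name: "alice"}))
	fmt.Println(validateSelf(createAccount{}))
	fmt.Println(validateSelf(ping{}))
}
```

Note that a message without a Validate method passes silently, so this check complements, rather than replaces, application-defined validators.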

type SinkSender

type SinkSender struct {
	Sink       MessageSink
	Validators []Validator
}

SinkSender is an implementation of ax.Sender that passes messages to a message sink.

func (SinkSender) ExecuteCommand

func (s SinkSender) ExecuteCommand(
	ctx context.Context,
	m ax.Command,
	opts ...ax.ExecuteOption,
) (ax.Envelope, error)

ExecuteCommand sends a command message.

If ctx contains a message envelope, m is sent as a child of the message in that envelope.

func (SinkSender) PublishEvent

func (s SinkSender) PublishEvent(
	ctx context.Context,
	m ax.Event,
	opts ...ax.PublishOption,
) (ax.Envelope, error)

PublishEvent sends an event message.

If ctx contains a message envelope, m is sent as a child of the message in that envelope.

type TimeLimiter

type TimeLimiter struct {
	Timeout time.Duration
	Next    InboundPipeline
}

TimeLimiter is an inbound pipeline that sets a context timeout before forwarding on to the next stage.

func (TimeLimiter) Accept

func (tl TimeLimiter) Accept(ctx context.Context, sink MessageSink, env InboundEnvelope) error

Accept forwards an inbound message through the pipeline until it is handled by some application-defined message handler(s).

func (TimeLimiter) Initialize

func (tl TimeLimiter) Initialize(ctx context.Context, ep *Endpoint) error

Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.

type TransportStage

type TransportStage struct {
	// contains filtered or unexported fields
}

TransportStage is an outbound pipeline stage that forwards messages to a transport. It is typically used as the last stage in an outbound pipeline.

func (*TransportStage) Accept

func (s *TransportStage) Accept(ctx context.Context, env OutboundEnvelope) error

Accept sends env via the transport.

func (*TransportStage) Initialize

func (s *TransportStage) Initialize(ctx context.Context, ep *Endpoint) error

Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.

type Validator

type Validator interface {
	// Validate checks if m is valid.
	//
	// It returns a non-nil error if the message is invalid. The meaning of
	// 'valid' is implementation-defined.
	Validate(ctx context.Context, m ax.Message) error
}

Validator is an interface for validating messages.

Application-defined validators can be implemented to provide superficial and domain validation. Each endpoint has a set of validators that are used to validate outgoing messages. Additionally, InboundRejecter and OutboundRejecter can be used to perform message validation at any point in a pipeline.
