pubsub

package
v0.13.2-gc72711f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultPollingInterval between SQS polling attempts.
	DefaultPollingInterval = 5 * time.Second

	// DefaultVisibilityTimeout sets how long SQS will wait for the subscriber to remove the
	// message from the queue.
	// See: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
	DefaultVisibilityTimeout = 5 * time.Second

	// DefaultRetries is the number of times we will try to remove the message from the SQS queue
	DefaultRetries = 3
)

Variables

This section is empty.

Functions

func GetQueueUrl

func GetQueueUrl(client *sqs.SQS, topic string) string

GetQueueUrl retrieves from AWS SQS the URL for the queue, given the topic name

Types

type Base64ProtoMarshaler

type Base64ProtoMarshaler struct{}

Base64ProtoMarshaler is a simple implementation of the `ProtoTextMarshaler` interface, that encodes the Protobuf message as a Base64 string.

func (*Base64ProtoMarshaler) MarshalToText

func (m *Base64ProtoMarshaler) MarshalToText(msg proto.Message) (string, error)

func (*Base64ProtoMarshaler) UnmarshalFromText

func (m *Base64ProtoMarshaler) UnmarshalFromText(text string, msg proto.Message) error

type EventsListener

type EventsListener struct {
	// contains filtered or unexported fields
}

An EventsListener will process `EventRequests` in a separate goroutine.

The messages are polled from the `events` channel, and if any error is encountered, error messages are posted on a `notifications` channel for further processing upstream.

func NewEventsListener

func NewEventsListener(options *ListenerOptions) *EventsListener

func (*EventsListener) ListenForMessages

func (listener *EventsListener) ListenForMessages()

func (*EventsListener) PostNotificationAndReportOutcome

func (listener *EventsListener) PostNotificationAndReportOutcome(eventResponse *protos.EventResponse)

func (*EventsListener) SetLogLevel

func (listener *EventsListener) SetLogLevel(level log.LogLevel)

SetLogLevel to implement the log.Loggable interface

type ListenerOptions

type ListenerOptions struct {
	EventsChannel        <-chan protos.EventRequest
	NotificationsChannel chan<- protos.EventResponse
	StatemachinesStore   storage.StoreManager
	ListenersPoolSize    int8
}

ListenerOptions are used to configure an EventsListener at creation and are used to decouple the internals of the listener from its exposed configuration.

type ProtoTextMarshaler

type ProtoTextMarshaler interface {
	MarshalToText(proto.Message) (string, error)
	UnmarshalFromText(string, *proto.Message) error
}

ProtoTextMarshaler is an interface that allows for marshaling and unmarshaling of Protobuf messages to and from text. This is useful when we need to send Protobuf messages as text, for example when using SQS.

type SqsPublisher

type SqsPublisher struct {
	// contains filtered or unexported fields
}

SqsPublisher is a wrapper around the AWS SQS client, and is used to publish messages to provided queues when outcomes are encountered.

func NewSqsPublisher

func NewSqsPublisher(channel <-chan protos.EventResponse, awsUrl *string) *SqsPublisher

NewSqsPublisher will create a new `Publisher` to send error notifications received on the `errorsChannel` to an SQS `dead-letter queue`.

The `awsUrl` is the URL of the AWS SQS service, which can be obtained from the AWS Console, or by the local AWS CLI.

func (*SqsPublisher) Publish

func (s *SqsPublisher) Publish(errorsTopic string)

Publish receives notifications from the SqsPublisher channel, and sends a message to a topic.

func (*SqsPublisher) SetLogLevel

func (s *SqsPublisher) SetLogLevel(level slf4go.LogLevel)

SetLogLevel allows the SqsPublisher to implement the log.Loggable interface

type SqsSubscriber

type SqsSubscriber struct {
	Timeout              time.Duration
	PollingInterval      time.Duration
	MessageRemoveRetries int
	// contains filtered or unexported fields
}

SqsSubscriber is a wrapper around the AWS SQS client, and is used to subscribe to Events. The subscriber will poll the queue for new messages, and will post them on the `events` channel from where an `EventsListener` will process them.

func NewSqsSubscriber

func NewSqsSubscriber(eventsChannel chan<- protos.EventRequest, sqsUrl *string) *SqsSubscriber

NewSqsSubscriber will create a new `Subscriber` to listen to incoming api.Event from a SQS `queue`.

func (*SqsSubscriber) ProcessMessage

func (s *SqsSubscriber) ProcessMessage(msg *sqs.Message, queueUrl *string)

func (*SqsSubscriber) SetLogLevel

func (s *SqsSubscriber) SetLogLevel(level log.LogLevel)

SetLogLevel allows the SqsSubscriber to implement the log.Loggable interface

func (*SqsSubscriber) Subscribe

func (s *SqsSubscriber) Subscribe(topic string, done <-chan interface{})

Subscribe runs until signaled on the Done channel and listens for incoming Events

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL