pubsub

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2023 License: MIT Imports: 18 Imported by: 2

README

aws-pub-sub

Package to publish and consumer large messages via AWS SNS.

Documentation

Index

Constants

View Source
const (

	// MESSAGE_TYPE_SNS is used if a message is send via AWS SNS directly.
	MESSAGE_TYPE_SNS string = "SNS"

	// MESSAGE_TYPE_S3 is used if a message is to large for SNS and therefore be uploaded to ans S4 bucket.
	MESSAGE_TYPE_S3 string = "S3"
)
View Source
const MESSAGE_TYPE_ATTRIBUTE string = "MESSAGE_TYPE"

MESSAGE_TYPE_ATTRIBUTE is the message attribute name where message type is placed.

Variables

View Source
var File_message_proto protoreflect.FileDescriptor

Functions

func UnmarshalMessage

func UnmarshalMessage(messageData string, message proto.Message) error

UnmarshalMessage run proto unnarshaller to convert given message data into passed message.

Types

type Consumer

type Consumer interface {

	// Process receives messages for furthe processing.
	Process(messageData string) error
}

Consumer gets messages for processing.

type ExampleMessage

type ExampleMessage struct {

	// Value is content of this test message.
	Value string `protobuf:"bytes,1,opt,name=Value,proto3" json:"Value,omitempty"`
	// Timestamp for testing.
	Timestamp *timestamp.Timestamp `protobuf:"bytes,2,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"`
	// contains filtered or unexported fields
}

ExampleMessage is used for testing.

func (*ExampleMessage) Descriptor deprecated

func (*ExampleMessage) Descriptor() ([]byte, []int)

Deprecated: Use ExampleMessage.ProtoReflect.Descriptor instead.

func (*ExampleMessage) GetTimestamp

func (x *ExampleMessage) GetTimestamp() *timestamp.Timestamp

func (*ExampleMessage) GetValue

func (x *ExampleMessage) GetValue() string

func (*ExampleMessage) ProtoMessage

func (*ExampleMessage) ProtoMessage()

func (*ExampleMessage) ProtoReflect

func (x *ExampleMessage) ProtoReflect() protoreflect.Message

func (*ExampleMessage) Reset

func (x *ExampleMessage) Reset()

func (*ExampleMessage) String

func (x *ExampleMessage) String() string

type LambdaHandler

type LambdaHandler interface {

	// Receive is a AWS lambda event handler to consumer SNS messages.
	Receive(ctx context.Context, snsEvent events.SNSEvent)

	// Subsribe is used to register message consumer for specific topics.
	Subsribe(topicArn string, consumer Consumer)
}

LambdaHandler invoked by messges forwarder from a AWS SNS topic.

type LambdaMessageSubscriber

type LambdaMessageSubscriber struct {
	// contains filtered or unexported fields
}

LambdaMessageSubscriber handles message received from AWS sNS and forwards this messages, depending on topic to registered consumers.

func NewLambdaMessageSubscriber

func NewLambdaMessageSubscriber(logger log.Logger) *LambdaMessageSubscriber

func (*LambdaMessageSubscriber) Receive

func (subsriber *LambdaMessageSubscriber) Receive(ctx context.Context, snsEvent events.SNSEvent)

Receive is a AWS lambda event handler to consumer SNS messages.

func (*LambdaMessageSubscriber) Subsribe

func (subsriber *LambdaMessageSubscriber) Subsribe(topicArn string, consumer Consumer)

Subsribe is used to register message consumer for specific topics.

type LogConsumer

type LogConsumer struct {
	// contains filtered or unexported fields
}

LogConsumer simple log received message.

func NewLogConsumer

func NewLogConsumer(logger log.Logger) *LogConsumer

NewLogConsumer returns a new log consumer which logs received messaged to passed logger.

func (*LogConsumer) Process

func (consumer *LogConsumer) Process(messageData string) error

Process log given message to used logger.

type ObjectId

type ObjectId string

ObjectId is used to identify messages passed to a persistence layer.

type Persistence

type Persistence interface {

	// Upload a message to a persitence location. Returns an object id if passed message has been persisted successful.
	Upload(topic string, message proto.Message) (*ObjectId, error)

	// Download can be used to retrieve a message by given object id.
	Download(id *ObjectId) (proto.Message, error)

	// Type returns type of used persistence layer.
	Type() string
}

Persistence is used to store large message.

type Publisher

type Publisher interface {

	// Send will publish a message.
	Send(topicArn string, message proto.Message) error
}

Publisher sends messages.

func NewSnsPublisher

func NewSnsPublisher(conf config.Config) Publisher

NewSnsPublisher creates a new publisher with default settings.

type SnsPublisher

type SnsPublisher struct {
	// contains filtered or unexported fields
}

SnsPublisher is a client to send messages to AWS SNS. In case a message exceeds SNS message size imit a persitence layer can be used to store such a large message and distribute its id via SNS, only.

func (*SnsPublisher) Send

func (publisher *SnsPublisher) Send(topicArn string, message proto.Message) error

Send publishes passed message to given SNS topic. Is case given message exceeds SNS message size limit it used a perstience layer to store this message and will publish its id, only.

Directories

Path Synopsis
lambda module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL