goevents

Version: v0.0.0-...-29f8316
Published: Nov 2, 2021 License: Apache-2.0 Imports: 6 Imported by: 0

README

Go channels for distributed systems

Benefits:

  • Use Go channels transparently over a messaging technology of your choice
  • Write idiomatic Go code instead of using vendor-specific APIs
  • Write simple, independent unit tests (no need to mock interfaces or run the queue technology)

Usage

The following code receives added-wishlist items from one queue and sends recommendations to another queue.

wishlistItems, _ := transport.Receive("added-wishlist-items")
recommendations, _ := transport.Send("recommendations")

for received := range wishlistItems {
	// Channel to subscribe to the result of sending the event
	sendResult := make(chan error, 1)

	recommendations <- goevents.EventEnvelop{
		Result: sendResult,
		Event:  goevents.JsonEvent(received.Context(), getRecommendation(received.Event)),
	}

	// Acknowledge the processing of the received event,
	// forwarding the send result (nil on success, an error otherwise)
	received.Result <- <-sendResult
}

Features

  • Producers can
    • explicitly receive and handle the message's publishing result, enabling an at-least-once delivery guarantee
    • or fire-and-forget (removing the latency related to message publishing)
  • Consumers must acknowledge the processing of each message to enable message redelivery (at-least-once processing guarantee).
  • Support for graceful shutdown to prevent message loss by ensuring all messages are flushed to the queue
  • Middlewares to customize the processing of events, e.g. error handling, logging, retry

Supported queue technologies

  • Amazon SQS
  • Amazon SNS (sending events only)
  • Kafka
  • RabbitMQ
  • Redis
  • Postgres database queue (supports an at-least-once delivery guarantee for producers using transactions)

Documentation

Index

Constants

const (
	DefaultPublisherCount        = 16
	DefaultSendChannelBufferSize = 1024
)

Variables

This section is empty.

Functions

func BindJson

func BindJson(e Event, destination interface{}) error

Types

type Event

type Event interface {
	// Context returns the context from which the event was created.
	// The returned context is used to propagate request-scoped values;
	// cancellation signals are not used.
	Context() context.Context

	// Content of the event
	Body() ([]byte, error)
}

func JsonEvent

func JsonEvent(ctx context.Context, payload interface{}) Event

func StringEvent

func StringEvent(ctx context.Context, str string) Event

type EventEnvelop

type EventEnvelop struct {
	// For sending messages:
	//   Result is an optional channel that receives the result of publishing the message, which is
	//     + nil if the message was sent successfully
	//     + an error if the message couldn't be sent
	//
	// For receiving messages, the Result channel is used by the receiver to acknowledge processing of the message.
	//     + The receiver sends nil if the message was processed successfully
	//     + The receiver sends an error if the message cannot be processed
	Result chan<- error

	Event
}

type MiddleWare

type MiddleWare func(queue Queue) Queue

type Option

type Option = func(c *writeGroupConfig)

func WithBufferSize

func WithBufferSize(bufferSize int) Option

func WithMiddlewares

func WithMiddlewares(middlewares ...MiddleWare) Option

func WithPublisherCount

func WithPublisherCount(count int) Option

type Queue

type Queue interface {
	// Send delivers message to the queue
	Send(ctx context.Context, event Event) error

	// Receive returns messages from the queue
	Receive(ctx context.Context) ([]EventEnvelop, error)
}

type QueueProvider

type QueueProvider interface {
	// Get returns the Queue identified by name to send and receive messages
	Get(ctx context.Context, name string) (Queue, error)
}

type Transport

type Transport interface {
	Send(topic string) (chan<- EventEnvelop, error)
	Receive(topic string) (<-chan EventEnvelop, error)
	Shutdown(ctx context.Context)
}

func NewTransport

func NewTransport(queueProvider QueueProvider, options ...Option) Transport

Directories

Path Synopsis
examples
queues
sns
sqs
