streams

package module
v0.0.1-alpha.8
Published: Apr 25, 2023 License: MIT Imports: 15 Imported by: 6

README

Streams

Streams is a stream-based communication toolkit made for data-in-motion platforms that enables applications to use streaming communication mechanisms for efficient data transfer.

Installation

To install the library, you need Go installed on your system. Then run the following commands to fetch the core package and the driver you intend to use:

go get github.com/alexandria-oss/streams
go get github.com/alexandria-oss/streams/driver/YOUR_DRIVER

Usage

To use the library in your application, import it and create a new Bus instance:

import (
    "github.com/alexandria-oss/streams"
    "github.com/alexandria-oss/streams/driver/chanbuf"
)

func main() {
    // Setup both reader and writer instances to be used by the bus. Chanbuf is the in-memory channel-backed driver.
    var reader streams.Reader = chanbuf.NewReader(nil)
    var writer streams.Writer = chanbuf.NewWriter(nil)
    
    bus := streams.NewBus(writer, reader)
}

Sending Data

To send data using the library, use the Publish method:

err := bus.Publish(context.TODO(), YourMessage{})

Receiving Data

To receive data using the library, you need to register a callback function using the Subscribe method:

bus.Subscribe("user.created", UserCreated{}, func(ctx context.Context, msg streams.Message) error {
  userEvent := UserCreated{}
  if err := codec.Unmarshal(msg.ContentType, msg.Data, &userEvent); err != nil {
    return err
  }
  log.Printf("[At subscriber 0] %s", userEvent)
  // DO SOMETHING HERE...
  return nil // Returning an error here will NOT acknowledge the message, so it will be retried or simply fail.
})

Examples

The following example shows how you can use Streams in your application:

Example 1: Sending and Receiving Data
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/alexandria-oss/streams"
	"github.com/alexandria-oss/streams/codec"
	"github.com/alexandria-oss/streams/driver/chanbuf"
)

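// UserCreated is the event published and consumed in this example.
// It satisfies streams.Event through GetHeaders and GetKey.
type UserCreated struct {
	UserID      string
	DisplayName string
}

func (u UserCreated) GetHeaders() map[string]string { return nil }

func (u UserCreated) GetKey() string { return u.UserID }

func main() {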
    var reader streams.Reader = chanbuf.NewReader(nil)
    var writer streams.Writer = chanbuf.NewWriter(nil)
    
    bus := streams.NewBus(writer, reader)
    
    bus.Subscribe("user.created", UserCreated{}, func(ctx context.Context, msg streams.Message) error {
      userEvent := UserCreated{}
      if err := codec.Unmarshal(msg.ContentType, msg.Data, &userEvent); err != nil {
        return err
      }
      log.Printf("[At subscriber 0] %s", userEvent)
      return nil
    })
  
    sysChan := make(chan os.Signal, 2)
    signal.Notify(sysChan, os.Interrupt, syscall.SIGTERM)
    go func() {
      // This step is required if you used chanbuf.Bus implementation. This
      // routine will start the underlying channel-based bus.
      chanbuf.Start()
    }()
    go func() {
      // Start the bus inside a goroutine as bus MIGHT block I/O.
      if err := bus.Start(); err != nil {
        log.Print(err)
        os.Exit(1)
      }
    }()
    go func() {
      // Publish your messages as expected.
      err := bus.Publish(context.TODO(), UserCreated{
        UserID:      "123",
        DisplayName: "Joe Doe",
      })
      if err != nil {
        log.Print(err)
      }
    }()
    <-sysChan
    // Shut down the bus instances to release their underlying resources.
    chanbuf.Shutdown()
    _ = bus.Shutdown()
}

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrBusIsShutdown          = errors.New("streams: bus has been terminated")
	ErrEmptyMessage           = errors.New("streams: message is empty")
	ErrUnrecoverable          = errors.New("streams: unrecoverable error")
	ErrEventNotFound          = errors.New("streams: event not found")
	ErrNoSubscriberRegistered = errors.New("streams: subscriber scheduler has no subscriber tasks")
)

Functions

func NewKSUID

func NewKSUID() (string, error)

NewKSUID generates a k-sortable unique identifier (KSUID).

func NewUUID

func NewUUID() (string, error)

NewUUID generates a universally unique identifier (UUID).

Types

type Bus

A Bus is the top-level component used by systems to interact with data-in-motion platforms. If finer-grained tuning or experience is desired, use low-level components such as Writer or Reader.

func NewBus

func NewBus(w Writer, r Reader, options ...PublisherOption) *Bus

NewBus allocates a Bus instance. Specify options to customize Publisher's default configurations.

type DeduplicationStorage

type DeduplicationStorage interface {
	// Commit registers and acknowledges a message has been processed correctly.
	Commit(ctx context.Context, workerID, messageID string)
	// IsDuplicated indicates if a message has been processed before.
	IsDuplicated(ctx context.Context, workerID, messageID string) (bool, error)
}

A DeduplicationStorage is a special kind of storage used by a data-in-motion system to keep track of duplicate messages.

It is recommended to use in-memory (or at least external) storage to increase performance significantly and to reduce backpressure on the main database.
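As an illustration of the interface contract, the following is a minimal sketch of a map-backed implementation. The interface is redeclared locally so the snippet compiles on its own; `mapDeduplicationStorage` and the `#` key delimiter are hypothetical names chosen for this example and are not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// DeduplicationStorage mirrors the interface documented above.
type DeduplicationStorage interface {
	Commit(ctx context.Context, workerID, messageID string)
	IsDuplicated(ctx context.Context, workerID, messageID string) (bool, error)
}

// mapDeduplicationStorage is a hypothetical in-memory implementation.
type mapDeduplicationStorage struct {
	mu   sync.RWMutex
	seen map[string]struct{}
}

func newMapDeduplicationStorage() *mapDeduplicationStorage {
	return &mapDeduplicationStorage{seen: make(map[string]struct{})}
}

// dedupKey scopes a message ID to a worker so workers stay isolated.
func dedupKey(workerID, messageID string) string {
	return workerID + "#" + messageID
}

// Commit records that workerID has processed messageID.
func (s *mapDeduplicationStorage) Commit(_ context.Context, workerID, messageID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seen[dedupKey(workerID, messageID)] = struct{}{}
}

// IsDuplicated reports whether workerID has already committed messageID.
func (s *mapDeduplicationStorage) IsDuplicated(_ context.Context, workerID, messageID string) (bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.seen[dedupKey(workerID, messageID)]
	return ok, nil
}

// Compile-time check that the sketch satisfies the interface.
var _ DeduplicationStorage = (*mapDeduplicationStorage)(nil)

// demoDedup checks a message before and after committing it, and for another worker.
func demoDedup() (before, after, otherWorker bool) {
	ctx := context.Background()
	var storage DeduplicationStorage = newMapDeduplicationStorage()
	before, _ = storage.IsDuplicated(ctx, "worker-a", "msg-1")
	storage.Commit(ctx, "worker-a", "msg-1")
	after, _ = storage.IsDuplicated(ctx, "worker-a", "msg-1")
	otherWorker, _ = storage.IsDuplicated(ctx, "worker-b", "msg-1")
	return before, after, otherWorker
}

func main() {
	fmt.Println(demoDedup())
}
```

Like the embedded storage, a map of this kind lives only as long as the process, so it suits tests and single-node setups rather than durable deduplication.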

type EmbeddedDeduplicationStorage

type EmbeddedDeduplicationStorage struct {
	// contains filtered or unexported fields
}

An EmbeddedDeduplicationStorage is the in-memory concrete implementation of DeduplicationStorage, using allegro's BigCache as its high-performance underlying storage.

Note that using this storage makes your computing instance stateful: if the node goes down, the deduplication database is lost as well.

func NewEmbeddedDeduplicationStorage

func NewEmbeddedDeduplicationStorage(cfg EmbeddedDeduplicationStorageConfig, cache *bigcache.BigCache) EmbeddedDeduplicationStorage

NewEmbeddedDeduplicationStorage allocates an in-memory DeduplicationStorage instance.

func (EmbeddedDeduplicationStorage) Commit

func (e EmbeddedDeduplicationStorage) Commit(_ context.Context, workerID, messageID string)

func (EmbeddedDeduplicationStorage) IsDuplicated

func (e EmbeddedDeduplicationStorage) IsDuplicated(_ context.Context, workerID, messageID string) (bool, error)

type EmbeddedDeduplicationStorageConfig

type EmbeddedDeduplicationStorageConfig struct {
	Logger       *log.Logger
	ErrorLogger  *log.Logger
	KeyDelimiter string
}

EmbeddedDeduplicationStorageConfig is the configuration schema for EmbeddedDeduplicationStorage.

type ErrUnrecoverableWrap

type ErrUnrecoverableWrap struct {
	ParentErr error
}

An ErrUnrecoverableWrap is a special wrapper for errors that have no recoverable action.

func (ErrUnrecoverableWrap) Error

func (u ErrUnrecoverableWrap) Error() string

func (ErrUnrecoverableWrap) String

func (u ErrUnrecoverableWrap) String() string

func (ErrUnrecoverableWrap) Unwrap

func (u ErrUnrecoverableWrap) Unwrap() error

Unwrap returns the ErrUnrecoverable variable. Thus, calls to errors.Is(err, ErrUnrecoverable) will detect this wrapper as an unrecoverable error.
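The effect of Unwrap on errors.Is can be sketched with local stand-ins for the documented declarations. The body of Error and the `shouldRetry` helper are assumptions made for this example; only the Unwrap behavior is taken from the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrUnrecoverable mirrors the package-level sentinel error.
var ErrUnrecoverable = errors.New("streams: unrecoverable error")

// ErrUnrecoverableWrap mirrors the documented wrapper type.
type ErrUnrecoverableWrap struct {
	ParentErr error
}

// Error formats the wrapped error; the exact format is an assumption.
func (u ErrUnrecoverableWrap) Error() string {
	if u.ParentErr == nil {
		return ErrUnrecoverable.Error()
	}
	return ErrUnrecoverable.Error() + ": " + u.ParentErr.Error()
}

// Unwrap returns the sentinel so errors.Is can match it.
func (u ErrUnrecoverableWrap) Unwrap() error { return ErrUnrecoverable }

// shouldRetry is a hypothetical helper: retry anything except unrecoverable errors.
func shouldRetry(err error) bool {
	return err != nil && !errors.Is(err, ErrUnrecoverable)
}

// demoRetryDecisions evaluates a transient error and a wrapped unrecoverable one.
func demoRetryDecisions() (transient, fatal bool) {
	transient = shouldRetry(errors.New("connection reset"))
	fatal = shouldRetry(ErrUnrecoverableWrap{ParentErr: errors.New("malformed payload")})
	return transient, fatal
}

func main() {
	fmt.Println(demoRetryDecisions())
}
```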

type Event

type Event interface {
	// GetHeaders allocates a set of key-value items to be passed through data-in-motion platforms as message headers.
	GetHeaders() map[string]string
	// GetKey retrieves a key used by certain data-in-motion platforms (e.g. Apache Kafka) to route
	// messages with the same key to certain partitions/queues. Leave empty if routing is NOT desired.
	GetKey() string
}

An Event is an abstraction unit of Message. It represents factual information about a happening inside a system (hence it is immutable).
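A type satisfies Event by providing both methods. The sketch below redeclares the interface locally so it compiles on its own; `UserCreated` follows the README example, while the `event-source` header and the `describe` helper are hypothetical.

```go
package main

import "fmt"

// Event mirrors the interface documented above.
type Event interface {
	GetHeaders() map[string]string
	GetKey() string
}

// UserCreated is an example event; its fields follow the README.
type UserCreated struct {
	UserID      string
	DisplayName string
}

// GetHeaders returns metadata that travels with the message as headers.
func (u UserCreated) GetHeaders() map[string]string {
	return map[string]string{"event-source": "user-service"}
}

// GetKey uses the user ID so events of one user share a routing key.
func (u UserCreated) GetKey() string { return u.UserID }

// Compile-time check that UserCreated satisfies Event.
var _ Event = UserCreated{}

// describe summarizes the routing key and the number of headers of an Event.
func describe(e Event) string {
	return fmt.Sprintf("key=%s headers=%d", e.GetKey(), len(e.GetHeaders()))
}

func main() {
	fmt.Println(describe(UserCreated{UserID: "123", DisplayName: "Joe Doe"}))
}
```

Returning an empty string from GetKey would leave routing to the underlying platform, as noted in the interface comment.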

type EventRegistry

type EventRegistry map[string]string

An EventRegistry is a low-level storage used to create relationships between Event types and topics (streams).

func (EventRegistry) GetEventTopic

func (r EventRegistry) GetEventTopic(event Event) (string, error)

GetEventTopic retrieves the attached topic of the Event. Returns ErrEventNotFound if Event entry is not available.

func (EventRegistry) RegisterEvent

func (r EventRegistry) RegisterEvent(event Event, topic string)

RegisterEvent creates a relationship between an Event type and a topic (stream).
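The register-then-look-up flow can be sketched as follows. How the real registry derives its map key from an Event is not documented here, so this example assumes the Go type name (via the `%T` verb); `eventKey`, `UserCreated` and `UserDeleted` are hypothetical and all types are local stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

// Event mirrors the documented interface.
type Event interface {
	GetHeaders() map[string]string
	GetKey() string
}

// ErrEventNotFound mirrors the package-level sentinel error.
var ErrEventNotFound = errors.New("streams: event not found")

// EventRegistry mirrors the documented map type.
type EventRegistry map[string]string

// eventKey is an assumption: the Go type name identifies the Event type.
func eventKey(event Event) string { return fmt.Sprintf("%T", event) }

// RegisterEvent relates the Event type to a topic.
func (r EventRegistry) RegisterEvent(event Event, topic string) {
	r[eventKey(event)] = topic
}

// GetEventTopic returns the topic of the Event type, or ErrEventNotFound.
func (r EventRegistry) GetEventTopic(event Event) (string, error) {
	topic, ok := r[eventKey(event)]
	if !ok {
		return "", ErrEventNotFound
	}
	return topic, nil
}

type UserCreated struct{ UserID string }

func (UserCreated) GetHeaders() map[string]string { return nil }
func (u UserCreated) GetKey() string              { return u.UserID }

type UserDeleted struct{ UserID string }

func (UserDeleted) GetHeaders() map[string]string { return nil }
func (u UserDeleted) GetKey() string              { return u.UserID }

// demoRegistry registers one event type and looks up both a known and an unknown one.
func demoRegistry() (topic string, notFound bool) {
	reg := EventRegistry{}
	reg.RegisterEvent(UserCreated{}, "user.created")
	topic, _ = reg.GetEventTopic(UserCreated{UserID: "123"})
	_, err := reg.GetEventTopic(UserDeleted{})
	return topic, errors.Is(err, ErrEventNotFound)
}

func main() {
	fmt.Println(demoRegistry())
}
```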

type IdentifierFactory

type IdentifierFactory func() (string, error)

An IdentifierFactory is a small component function that generates unique identifiers.
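Any function with this signature can serve as a factory. The sketch below shows a hypothetical sequential factory, which can be handy for deterministic IDs in tests; `newSequentialFactory` is not part of the package, and the type is redeclared locally so the snippet compiles on its own.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// IdentifierFactory mirrors the documented function type.
type IdentifierFactory func() (string, error)

// newSequentialFactory returns a factory that yields prefix-1, prefix-2, ...
// It is safe for concurrent use thanks to the atomic counter.
func newSequentialFactory(prefix string) IdentifierFactory {
	var counter uint64
	return func() (string, error) {
		next := atomic.AddUint64(&counter, 1)
		return fmt.Sprintf("%s-%d", prefix, next), nil
	}
}

// demoFactory generates two consecutive identifiers.
func demoFactory() (string, string) {
	factory := newSequentialFactory("msg")
	first, _ := factory()
	second, _ := factory()
	return first, second
}

func main() {
	fmt.Println(demoFactory())
}
```

Based on its signature, a factory like this could presumably be handed to a Publisher through WithIdentifierFactory.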

type Message

type Message struct {
	ID         string            `json:"message_id"`      // Message unique identifier.
	StreamName string            `json:"stream_name"`     // Name of the stream this Message will be/is transported.
	StreamKey  string            `json:"stream_key"`      // A key used by underlying systems to route a Message to specific partitions/queues.
	Headers    map[string]string `json:"message_headers"` // Map which passes additional context and metadata about the message.
	// Type of Data content.
	//
	// The usage of the RFC2046 MIME specification is preferred (e.g. application/json, application/xml).
	//
	// [Reference](https://www.rfc-editor.org/rfc/rfc2046).
	ContentType string    `json:"content_type"`
	Data        []byte    `json:"data"`         // Encoded information generated by a system.
	Time        time.Time `json:"message_time"` // Timestamp of a Message publishing operation.
	DecodedData any       `json:"-"`            // Only available on readers. Decoded Data using an underlying codec.Codec implementation.
}

A Message is the unit of transport containing information generated by a system. In addition, it contains metadata used by `streams` internal mechanisms. A Message will be/is transported through one or more streams.

type NoopWriter

type NoopWriter struct {
	WantWriterErr error
}

NoopWriter is a no-operation Writer instance.

func (NoopWriter) Write

func (n NoopWriter) Write(_ context.Context, _ []Message) error

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

A Publisher is a high-level component which writes Event(s) into topics (streams). Depending on the underlying Writer, publish routines will write Event(s) in batches or one-by-one.

func NewPublisher

func NewPublisher(w Writer, eventReg EventRegistry, options ...PublisherOption) Publisher

NewPublisher allocates a new Publisher instance ready to be used. Specify options to customize default configurations.

func (Publisher) Publish

func (p Publisher) Publish(ctx context.Context, events ...Event) error

Publish writes Event(s) into a topic specified on each Event.

func (Publisher) PublishToTopic

func (p Publisher) PublishToTopic(ctx context.Context, topic string, events ...Event) error

PublishToTopic writes Event(s) into a specific topic.

type PublisherConfig

type PublisherConfig struct {
	IdentifierFactory IdentifierFactory
	Codec             codec.Codec
}

PublisherConfig is the Publisher configuration.

type PublisherOption

type PublisherOption interface {
	// contains filtered or unexported methods
}

func WithIdentifierFactory

func WithIdentifierFactory(f IdentifierFactory) PublisherOption

func WithPublisherCodec

func WithPublisherCodec(c codec.Codec) PublisherOption

type ReadTask

type ReadTask struct {
	Stream       string
	Handler      ReaderHandleFunc
	ExternalArgs map[string]any
}

A ReadTask is the unit of information a SubscriberScheduler passes to Reader workers in order to start stream-reading jobs. Use ExternalArgs to specify driver-specific configuration.

func (*ReadTask) SetArg

func (t *ReadTask) SetArg(key string, value any) *ReadTask

SetArg sets an entry into ExternalArgs and returns the ReadTask instance ready to be chained to another builder routine (Fluent API-like). Arguments will be later passed to Reader concrete implementations; useful for situations where readers accept extra arguments such as consumer group identifiers (Apache Kafka).
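The fluent style described above can be sketched with a local, trimmed-down stand-in for ReadTask. The lazy map allocation inside SetArg and the argument keys `group_id` and `max_in_flight` are assumptions for illustration; real drivers define their own keys.

```go
package main

import "fmt"

// ReadTask is a reduced stand-in for the documented struct (Handler omitted).
type ReadTask struct {
	Stream       string
	ExternalArgs map[string]any
}

// SetArg stores a driver-specific argument and returns the task for chaining.
func (t *ReadTask) SetArg(key string, value any) *ReadTask {
	if t.ExternalArgs == nil {
		t.ExternalArgs = make(map[string]any)
	}
	t.ExternalArgs[key] = value
	return t
}

// demoSetArg chains two SetArg calls and reports what was stored.
func demoSetArg() (int, any) {
	task := &ReadTask{Stream: "user.created"}
	task.SetArg("group_id", "billing-service").SetArg("max_in_flight", 10)
	return len(task.ExternalArgs), task.ExternalArgs["group_id"]
}

func main() {
	fmt.Println(demoSetArg())
}
```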

func (*ReadTask) WithMiddleware

func (t *ReadTask) WithMiddleware(middlewareFunc ReaderMiddlewareFunc) *ReadTask

WithMiddleware appends a ReaderMiddlewareFunc instance to ReadTask.Handler; this is also known as the chain of responsibility pattern.

type Reader

type Reader interface {
	// Read reads from the specified stream in ReadTask, blocking the I/O. Every time a new message arrives,
	// Reader will execute ReadTask.Handler routine in a separate goroutine.
	//
	// Use ctx context.Context to signal shutdowns.
	Read(ctx context.Context, task ReadTask) error
}

A Reader is a low-level component which allows systems to read from a stream.

type ReaderHandleFunc

type ReaderHandleFunc func(ctx context.Context, msg Message) error

ReaderHandleFunc is the routine executed for each message received by Reader instances.

type ReaderMiddlewareFunc

type ReaderMiddlewareFunc func(next ReaderHandleFunc) ReaderHandleFunc
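A ReaderMiddlewareFunc takes a handler and returns a new handler that wraps it. The following sketch uses local stand-ins for the documented types to show the resulting call order; the `tag` middleware is hypothetical and only records when it runs.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Message is a reduced stand-in for streams.Message.
type Message struct{ ID string }

// ReaderHandleFunc and ReaderMiddlewareFunc mirror the documented types.
type ReaderHandleFunc func(ctx context.Context, msg Message) error
type ReaderMiddlewareFunc func(next ReaderHandleFunc) ReaderHandleFunc

// tag is a hypothetical middleware that records when it runs around next.
func tag(name string, trace *[]string) ReaderMiddlewareFunc {
	return func(next ReaderHandleFunc) ReaderHandleFunc {
		return func(ctx context.Context, msg Message) error {
			*trace = append(*trace, name+":before")
			err := next(ctx, msg)
			*trace = append(*trace, name+":after")
			return err
		}
	}
}

// demoChain wraps a handler with two middlewares and returns the call order.
func demoChain() string {
	var trace []string
	handler := ReaderHandleFunc(func(_ context.Context, msg Message) error {
		trace = append(trace, "handler:"+msg.ID)
		return nil
	})
	// Each middleware wraps the previous result, so the last one applied runs first.
	for _, mw := range []ReaderMiddlewareFunc{tag("inner", &trace), tag("outer", &trace)} {
		handler = mw(handler)
	}
	_ = handler(context.Background(), Message{ID: "1"})
	return strings.Join(trace, " ")
}

func main() {
	fmt.Println(demoChain())
}
```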

func WithDeadLetterQueue

func WithDeadLetterQueue(writer Writer) ReaderMiddlewareFunc

WithDeadLetterQueue appends to ReaderHandleFunc(s) a mechanism to send poisoned or failed (after retries) messages to a dead-letter queue (DLQ). The dead-letter queue MIGHT retain these messages for a longer time than a normal queue.

Dead-letter queue messages are emitted to the Message.StreamName but with the suffix ".dlq".

After failures, a dead-letter queue comes into play as engineering teams can manually/automatically enqueue failed messages again into the original queue (i.e. re-drive/replay policies), so messages can be processed again without further overhead.

Moreover, this dead-letter queue could not only be a message bus like Apache Kafka or services like Amazon SQS; even a blob storage service like Amazon S3 could implement Writer and retain failed messages.
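The idea can be sketched as a middleware over local stand-in types. This is not the package's implementation: `withDeadLetterSketch` and `memoryWriter` are hypothetical, and acknowledging the message once the dead-letter write succeeds is an assumption of this example. Only the ".dlq" suffix comes from the description above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message, Writer and the handler types are reduced stand-ins for the documented ones.
type Message struct {
	ID         string
	StreamName string
}

type Writer interface {
	Write(ctx context.Context, msgBatch []Message) error
}

type ReaderHandleFunc func(ctx context.Context, msg Message) error
type ReaderMiddlewareFunc func(next ReaderHandleFunc) ReaderHandleFunc

// memoryWriter is a hypothetical Writer that keeps messages in a slice.
type memoryWriter struct{ written []Message }

func (w *memoryWriter) Write(_ context.Context, msgBatch []Message) error {
	w.written = append(w.written, msgBatch...)
	return nil
}

// withDeadLetterSketch forwards failed messages to w under "<stream>.dlq".
func withDeadLetterSketch(w Writer) ReaderMiddlewareFunc {
	return func(next ReaderHandleFunc) ReaderHandleFunc {
		return func(ctx context.Context, msg Message) error {
			if err := next(ctx, msg); err == nil {
				return nil
			}
			// msg is a copy, so renaming the stream does not affect the caller.
			msg.StreamName += ".dlq"
			// Assumption: a successful dead-letter write counts as handled.
			return w.Write(ctx, []Message{msg})
		}
	}
}

// demoDeadLetter runs a failing handler and inspects the dead-letter writer.
func demoDeadLetter() (string, int) {
	dlq := &memoryWriter{}
	failing := ReaderHandleFunc(func(context.Context, Message) error {
		return errors.New("cannot process message")
	})
	handler := withDeadLetterSketch(dlq)(failing)
	_ = handler(context.Background(), Message{ID: "1", StreamName: "user.created"})
	return dlq.written[0].StreamName, len(dlq.written)
}

func main() {
	fmt.Println(demoDeadLetter())
}
```

Because the middleware only depends on the Writer interface, the same wrapper would work with a queue-backed or blob-storage-backed writer.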

func WithDeduplication

func WithDeduplication(group string, storage DeduplicationStorage) ReaderMiddlewareFunc

WithDeduplication appends to ReaderHandleFunc(s) a mechanism to deduplicate processed messages, ensuring idempotency. It uses DeduplicationStorage to keep track of processed messages, and a group identifier to enable multiple isolated workers to commit their processed messages to the same storage.

func WithReaderErrorLogger

func WithReaderErrorLogger(logger *log.Logger) ReaderMiddlewareFunc

WithReaderErrorLogger appends to ReaderHandleFunc(s) a mechanism to log errors using a logger instance.

func WithReaderRetry

func WithReaderRetry(retry *retrier.Retrier) ReaderMiddlewareFunc

WithReaderRetry appends to ReaderHandleFunc(s) a mechanism to retry up to N times. It uses retrier.Retrier to enable advanced backoff mechanisms such as exponential backoff with jitter.
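To illustrate the retry idea without the retrier dependency, the sketch below swaps in a plain standard-library loop with exponential backoff and no jitter. It is not the package's implementation; `withRetrySketch` and the local types are hypothetical stand-ins.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Message and the handler types are reduced stand-ins for the documented ones.
type Message struct{ ID string }

type ReaderHandleFunc func(ctx context.Context, msg Message) error
type ReaderMiddlewareFunc func(next ReaderHandleFunc) ReaderHandleFunc

// withRetrySketch retries next up to maxAttempts times, doubling the delay each time.
func withRetrySketch(maxAttempts int, base time.Duration) ReaderMiddlewareFunc {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	return func(next ReaderHandleFunc) ReaderHandleFunc {
		return func(ctx context.Context, msg Message) error {
			var err error
			delay := base
			for attempt := 1; attempt <= maxAttempts; attempt++ {
				if err = next(ctx, msg); err == nil {
					return nil
				}
				if attempt == maxAttempts {
					break
				}
				// Wait before the next attempt, but stop early on shutdown.
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(delay):
				}
				delay *= 2
			}
			return err
		}
	}
}

// demoRetry runs a handler that fails twice and then succeeds.
func demoRetry() (int, error) {
	calls := 0
	flaky := ReaderHandleFunc(func(context.Context, Message) error {
		calls++
		if calls < 3 {
			return errors.New("temporary failure")
		}
		return nil
	})
	err := withRetrySketch(5, time.Millisecond)(flaky)(context.Background(), Message{ID: "1"})
	return calls, err
}

func main() {
	fmt.Println(demoRetry())
}
```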

type SubscriberScheduler

type SubscriberScheduler struct {
	// contains filtered or unexported fields
}

A SubscriberScheduler is a high-level component used to manage and schedule Reader tasks.

Zero value is NOT ready to use.

func NewSubscriberScheduler

func NewSubscriberScheduler(r Reader, eventReg EventRegistry) SubscriberScheduler

NewSubscriberScheduler allocates a new SubscriberScheduler instance ready to be used.

func (*SubscriberScheduler) Shutdown

func (r *SubscriberScheduler) Shutdown() error

Shutdown triggers a graceful shutdown of the running ReadTask workers. This routine will block I/O until all workers have been properly shut down.

func (*SubscriberScheduler) Start

func (r *SubscriberScheduler) Start() error

Start schedules and spins up a worker for each registered ReadTask.

func (*SubscriberScheduler) Subscribe

func (r *SubscriberScheduler) Subscribe(topic string, event Event, handler ReaderHandleFunc) *ReadTask

Subscribe registers a stream reading job using Event registered topic from EventRegistry. This routine will append a new entry to EventRegistry if Event was not found at first try, automating event-topic registration.

Returns a ReadTask instance ready to be chained to another builder routine (Fluent API-like).

func (*SubscriberScheduler) SubscribeEvent

func (r *SubscriberScheduler) SubscribeEvent(event Event, handler ReaderHandleFunc) *ReadTask

SubscribeEvent registers a stream reading job using Event registered topic from EventRegistry. This routine will panic if Event was not previously registered.

Returns a ReadTask instance ready to be chained to another builder routine (Fluent API-like).

func (*SubscriberScheduler) SubscribeEventSafe

func (r *SubscriberScheduler) SubscribeEventSafe(event Event, handler ReaderHandleFunc) (*ReadTask, error)

SubscribeEventSafe registers a stream reading job using Event registered topic from EventRegistry. Returns ErrEventNotFound if Event was not previously registered.

func (*SubscriberScheduler) SubscribeTopic

func (r *SubscriberScheduler) SubscribeTopic(topic string, handler ReaderHandleFunc) *ReadTask

SubscribeTopic registers a stream reading job to a specific topic. Returns a ReadTask instance ready to be chained to another builder routine (Fluent API-like).

type Writer

type Writer interface {
	// Write writes a message batch into a stream specified on each Message through the Message.StreamName field.
	//
	// Depending on the underlying Writer implementation, this routine will write messages in actual batches,
	// batch chunks or one-by-one.
	Write(ctx context.Context, msgBatch []Message) error
}

A Writer is a low-level component that writes messages into streams. Each message MUST have its own stream name specified.
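A custom Writer only needs the single Write method. The following sketch uses local stand-ins for the documented types: `memoryWriter` is a hypothetical implementation that groups messages by stream and rejects a batch when any message lacks a stream name, reflecting the requirement stated above.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a reduced stand-in for streams.Message.
type Message struct {
	ID         string
	StreamName string
	Data       []byte
}

// Writer mirrors the documented interface.
type Writer interface {
	Write(ctx context.Context, msgBatch []Message) error
}

// memoryWriter is a hypothetical Writer that groups messages by stream name.
type memoryWriter struct {
	mu      sync.Mutex
	streams map[string][]Message
}

func newMemoryWriter() *memoryWriter {
	return &memoryWriter{streams: make(map[string][]Message)}
}

// Write validates the whole batch first, then stores each message under its stream.
func (w *memoryWriter) Write(_ context.Context, msgBatch []Message) error {
	for _, msg := range msgBatch {
		if msg.StreamName == "" {
			return fmt.Errorf("message %q has no stream name", msg.ID)
		}
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	for _, msg := range msgBatch {
		w.streams[msg.StreamName] = append(w.streams[msg.StreamName], msg)
	}
	return nil
}

// Compile-time check that memoryWriter satisfies Writer.
var _ Writer = (*memoryWriter)(nil)

// demoWriter writes a valid batch and then an invalid one.
func demoWriter() (created int, rejected bool) {
	w := newMemoryWriter()
	ctx := context.Background()
	_ = w.Write(ctx, []Message{
		{ID: "1", StreamName: "user.created"},
		{ID: "2", StreamName: "user.created"},
		{ID: "3", StreamName: "user.deleted"},
	})
	err := w.Write(ctx, []Message{{ID: "4"}})
	return len(w.streams["user.created"]), err != nil
}

func main() {
	fmt.Println(demoWriter())
}
```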

Directories

Path              Synopsis
driver
driver/amazon     Module
driver/dynamodb   Module
driver/kafka      Module
driver/sns        Module
driver/sql        Module
driver/sqs        Module
examples
internal
proxy
