streams

package module
v0.2.6
Published: May 4, 2022 License: MIT Imports: 17 Imported by: 1

README

✉ Streams


Streams is a toolkit crafted for data-in-motion ecosystems written in Go.

Requirements

  • Go version >= 1.17

Overall Architecture

Streams is composed of several inner components that collaborate with each other to accomplish basic stream-handling operations (publishing and consuming messages).

Streams exposes all its operational capabilities through a simple and idiomatic API, enabling interactions between the program and the actual live infrastructure using a facade component called Hub.

[Diagram: Internal Hub architecture and the flows of basic stream operations. Left: message publishing flow. Right: message consumption flow.]
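Below is a minimal wiring sketch of that flow. It is only illustrative: the import path is assumed from the module layout, fooMessage is a hypothetical payload type, and a production setup would also pass concrete Writer/Reader drivers (see the Driver section further below).

package main

import (
	"context"
	"log"

	"github.com/neutrinocorp/streams" // assumed import path; adjust to the actual module
)

// fooMessage is a hypothetical payload type used only for illustration.
type fooMessage struct {
	Foo string `json:"foo"`
}

func main() {
	// Allocate a Hub; real deployments would also set WithWriter/WithReader drivers.
	hub := streams.NewHub(
		streams.WithInstanceName("org.example.service"),
		streams.WithMarshaler(streams.JSONMarshaler{}),
	)

	// Register stream metadata, attach a stream-reading job and start background daemons.
	hub.RegisterStream(fooMessage{}, streams.StreamMetadata{Stream: "foo-stream"})
	if err := hub.Read(fooMessage{}, streams.WithHandlerFunc(func(ctx context.Context, msg streams.Message) error {
		log.Printf("received message %s from stream %s", msg.ID, msg.Stream)
		return nil // nil acknowledges the message; an error triggers the retry backoff
	})); err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	hub.Start(ctx)

	// Publish a message; stream metadata is resolved from the Stream Registry.
	if err := hub.Write(ctx, fooMessage{Foo: "bar"}); err != nil {
		log.Fatal(err)
	}
}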

Message

The Message type is the unit of information which will be used to interact with multiple systems through live infrastructure.

Streams natively implements most of the CNCF CloudEvents specification fields to keep messages passed through a stream consistent.

Just as the CloudEvents specification states, depending on the underlying communication protocol from Event Buses and Message Brokers (e.g. MQTT, Apache Kafka, raw JSON, Amazon SNS), a message will be constructed according to the given protocol.

For example, if using Apache Kafka, most of the message fields will be attached to binary headers instead of the body of the message itself. On the other hand, if using Amazon Simple Notification Service, messages will be encoded into the raw JSON template AWS specifies in its API definition for SNS. These processes are independent of the Marshaler operations; hence, the codec of the message's inner data (the actual message content) won't change.

For more information about CloudEvents, please review this repository.

Stream Registry

A Stream Registry is an in-memory key-value database, used by both Reader Node(s) and the Writer, which holds metadata about every stream that will interact with the program.

Moreover, stream metadata might contain critical information about the stream, such as its name (also called topic), the schema definition version and/or the schema definition name, so that components such as the Writer and Reader Node can find schema definitions in the Schema Registry and continue their operations normally. The stream name defined here is used by both the Writer and Reader Node(s) to interact with live infrastructure.

The Stream Registry accepts reflection-based structs, which are registered under the struct name as a string (e.g. package_name.struct_name -> main.fooMessage). In addition, the registry also accepts plain strings as keys to increase flexibility (one may use the stream name itself, e.g. foo-stream).

Note: If using plain strings as keys, remember to populate the GoType metadata field so the Reader Node handler can decode the incoming message data. If no GoType is found in the stream metadata while consuming a message, marshaling capabilities are disabled to avoid program panics.

Note: Using reflection-based stream definitions will lead to performance degradation when listening to streams.
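For illustration, a short registration sketch continuing the earlier example's hub and hypothetical fooMessage type; reflect2 refers to github.com/modern-go/reflect2, the type used by the StreamMetadata.GoType field:

// Reflection-based entry: the key becomes the struct name (e.g. "main.fooMessage").
hub.RegisterStream(fooMessage{}, streams.StreamMetadata{
	Stream: "foo-stream",
})

// Plain-string entry: remember to populate GoType so the reader can decode message data.
hub.RegisterStreamByString("foo-stream", streams.StreamMetadata{
	Stream: "foo-stream",
	GoType: reflect2.TypeOf(fooMessage{}),
})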

Unique Identifier Factory

A Unique Identifier Factory is a component which generates unique identifiers using an underlying concrete implementation of a unique identifier algorithm (e.g. UUID, NanoID). It is used by the Writer component to construct unique messages.
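A sketch of a custom factory, here returning hex-encoded random bytes from crypto/rand purely for illustration, wired into a Hub through WithIDFactory:

// randomHexFactory is a hypothetical IDFactoryFunc producing 16-byte hex identifiers.
var randomHexFactory streams.IDFactoryFunc = func() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil { // crypto/rand
		return "", err
	}
	return hex.EncodeToString(buf), nil // encoding/hex
}

// The Writer will stamp every published message with identifiers from this factory.
hub := streams.NewHub(streams.WithIDFactory(randomHexFactory))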

Schema Registry

A Schema Registry is a database which holds message schema definitions and their versioning. It ensures that every message produced and consumed by the program complies with the specified schema definition.

The registry MIGHT be implemented using either external or internal underlying solutions (e.g. a third-party service such as Amazon Glue, the host's disk, or in-memory storage).

Note: For Apache Avro message formats, the usage of a Schema Registry is a MUST in order for the Marshaler component to decode and encode message data.
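A sketch combining the built-in in-memory registry with the Avro marshaler (the schema definition, names and the fooMessage type are illustrative):

registry := streams.InMemorySchemaRegistry{}
registry.RegisterDefinition("fooMessage", `{
	"type": "record", "name": "fooMessage",
	"fields": [{"name": "foo", "type": "string"}]
}`, 1)

hub := streams.NewHub(
	streams.WithSchemaRegistry(registry),
	streams.WithMarshaler(streams.NewAvroMarshaler()),
)

// Point the stream metadata at the registered definition so the Marshaler can find it.
hub.RegisterStream(fooMessage{}, streams.StreamMetadata{
	Stream:               "foo-stream",
	SchemaDefinitionName: "fooMessage",
	SchemaVersion:        1,
})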

Marshaler

A Marshaler is a component in charge of message data encoding and decoding.

Currently, Streams ships with native Apache Avro and JSON implementations. Nevertheless, the Marshaler interface is exported through the Streams API to give developers flexibility, as it allows custom Marshaler implementations.

We are currently considering adding Protocol-Buffers and Flat/Flex Buffers codecs for edge cases where greater performance is required.
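Since the interface is exported, a custom codec can be plugged in. Below is a hedged sketch of a Marshaler backed by encoding/gob; the type name and content type are arbitrary choices, not part of Streams:

// GobMarshaler is a hypothetical Marshaler implementation. The schema definition
// argument is ignored because gob payloads are self-describing.
type GobMarshaler struct{}

func (GobMarshaler) Marshal(_ string, data interface{}) ([]byte, error) {
	var buf bytes.Buffer                      // bytes
	err := gob.NewEncoder(&buf).Encode(data)  // encoding/gob
	return buf.Bytes(), err
}

func (GobMarshaler) Unmarshal(_ string, data []byte, ref interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(ref)
}

func (GobMarshaler) ContentType() string { return "application/octet-stream" }

It could then be injected into a Hub with streams.WithMarshaler(GobMarshaler{}).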

Message Broker / Event Bus Driver

The Message Broker / Event Bus Driver is an abstract component which enables interactions between Hub internal components and the actual stream-messaging live infrastructure (e.g. Apache Kafka, Amazon SNS/SQS, In-memory).

The driver component implements both the Writer and Reader Node interfaces. Thus, by separating behaviours through interfaces, technology heterogeneity and autonomy between processes are achieved, giving the program even greater flexibility of interaction.

For example, the program might contain a Hub which publishes messages to Amazon Simple Notification Service (SNS) while one set of Reader Nodes polls messages from Amazon Simple Queue Service (SQS) queues and another set of Reader Nodes receives messages from Apache Kafka topics.
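A sketch of that heterogeneous setup; snsWriter, sqsReader and kafkaReader stand for Writer/Reader implementations obtained from driver packages, and handleFoo/handleBar are hypothetical ReaderHandleFunc handlers:

hub := streams.NewHub(streams.WithWriter(snsWriter))

// One set of reader nodes polls messages from SQS queues...
hub.ReadByStreamKey("foo-stream",
	streams.WithDriver(sqsReader),
	streams.WithGroup("foo-queue"),
	streams.WithHandlerFunc(handleFoo))

// ...while another set receives messages from an Apache Kafka topic.
hub.ReadByStreamKey("bar-topic",
	streams.WithDriver(kafkaReader),
	streams.WithGroup("bar-consumer-group"),
	streams.WithHandlerFunc(handleBar))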

Writer

A Writer is a high-level component that lets the program publish messages to desired streams defined on the message broker / event bus, so external programs may react to published messages in parallel.

Furthermore, the writer API is designed to allow chain of responsibility pattern implementations (middlewares) in order to aggregate extra behaviours when publishing messages (e.g. logging, tracing, monitoring, retries).

Streams offers native implementations through the use of a Driver. Nevertheless, developers may craft custom Writer implementations, as the Streams API exposes the Writer interface.
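A hedged sketch of such a middleware: a logging layer that wraps any other Writer before it is handed to the Hub (the logging type is illustrative, not a built-in):

// loggingWriter is a hypothetical middleware that decorates another Writer.
type loggingWriter struct {
	next streams.Writer
}

func (w loggingWriter) Write(ctx context.Context, msg streams.Message) error {
	log.Printf("publishing message %s to stream %s", msg.ID, msg.Stream)
	return w.next.Write(ctx, msg)
}

func (w loggingWriter) WriteBatch(ctx context.Context, msgs ...streams.Message) (uint32, error) {
	log.Printf("publishing a batch of %d messages", len(msgs))
	return w.next.WriteBatch(ctx, msgs...)
}

// Usage: hub := streams.NewHub(streams.WithWriter(loggingWriter{next: driverWriter}))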

Reader Registry

A Reader Registry is an in-memory database which holds information about the workers to be scheduled when the Hub starts.

Workers are also called Reader Nodes.

Reader Supervisor

The Reader Supervisor is an internal Hub component which manages Reader Node(s) lifecycles.

It forks new workers into the Reader Registry queue, and it schedules workers on Hub startup.

In addition, when forking new workers, the supervisor crafts a Reader Task template from the reader node configuration, which will later be passed to the Driver's reader node interface implementations on Hub startup. This template is used internally by drivers to access critical data so they can interact with live infrastructure (e.g. stream/topic name, consumer groups/queues to be used, and vendor-specific configurations such as those for Amazon Web Services or Shopify's Sarama library for Apache Kafka).

Reader Node

A Reader Node is an internal Reader Supervisor component which schedules the actual stream-listening jobs. These jobs are mostly I/O blocking, so the node will try to run them concurrently if a degree of parallelism was configured for the worker.

It uses the Driver reader node interface implementation to interact with live infrastructure.

Note: In order to stop Reader Node inner processes, a context cancellation MUST be issued through the root Context originally passed on Hub startup. Moreover, every node job has an internal timeout context constructed from the root context to avoid stream-reader jobs hanging or waiting for considerable periods, which would directly affect throughput.

Note: Every Reader Node inner process runs inside a new goroutine and uses a timeout scoped context to keep process autonomy and increase overall throughput.
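A shutdown sketch using the standard library to cancel the root context on SIGINT/SIGTERM, which in turn stops every Reader Node inner process (assumes a hub allocated as in the earlier sketches; imports: context, os, os/signal, syscall):

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

hub.Start(ctx) // schedules every reader node using ctx as the root context

<-ctx.Done() // cancellation propagates to all stream-reading jobs and their scoped contexts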

ReaderHandler / ReaderHandleFunc

Each Reader Node contains a specific configuration, as previously mentioned. Aside from critical data for Driver implementations, this configuration holds the ReaderHandler interface and ReaderHandleFunc type, which represent the entry point for the message-processing operations defined by the developer (the handler for each message received from a queue/topic).

These types/interfaces let programs return an error if something fails while processing a message. If no error is returned, the Driver implementation acknowledges the message to the actual live infrastructure to avoid message re-processing. As a side note and recommendation, keep message processors idempotent to deal with the nature of distributed systems (duplicated and unordered messages).

Moreover, the ReaderHandler and ReaderHandleFunc APIs were defined to enable chain of responsibility pattern implementations (middlewares), just like the Writer API, letting developers add layers of extra behaviour when processing a message.
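A sketch of one such layer: a hypothetical timing middleware that decorates a ReaderHandleFunc before registration (handleFoo is an assumed handler):

// withTiming wraps a ReaderHandleFunc and logs how long each message took to process.
func withTiming(next streams.ReaderHandleFunc) streams.ReaderHandleFunc {
	return func(ctx context.Context, msg streams.Message) error {
		start := time.Now()
		err := next(ctx, msg)
		log.Printf("stream %s handled in %s (err: %v)", msg.Stream, time.Since(start), err)
		return err
	}
}

// Usage: hub.Read(fooMessage{}, streams.WithHandlerFunc(withTiming(handleFoo)))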

Note that Streams adds default layers of behaviour for every ReaderHandler/ReaderHandleFunc forked. These behaviours include:

  • Exponential backoff retrying (fully customizable)
  • Correlation and Causation IDs injection into the handler-scoped context
  • Unmarshaling*
  • Logging*
  • Monitoring/Metrics*
  • Tracing*

* Available if properly configured

Supported infrastructure

  • Apache Kafka (on-premise, Confluent cloud or Amazon Managed Streaming for Apache Kafka/MSK)
  • Amazon Simple Notification Service (SNS) and Simple Queue Service (SQS) with the Topic-Queue chaining pattern implementation
  • Apache Pulsar*
  • MQTT-based buses/brokers (e.g. RabbitMQ, Apache ActiveMQ)*
  • Google Cloud PubSub*
  • Microsoft Azure Service Bus*
  • Redis Streams*

* On Streams's roadmap, not yet implemented.

Documentation

Overview

Package streams is a toolkit crafted for data-in-motion ecosystems written in Go.

Index

Constants

const (
	// MarshalerProtoContentType default content-type header for Protocol Buffer marshaller.
	MarshalerProtoContentType = "application/octet-stream"
	// MarshalerJSONContentType default content-type header for JSON marshaller.
	MarshalerJSONContentType = "application/json"
	// MarshalerAvroContentType default content-type header for Apache Avro marshaller.
	MarshalerAvroContentType = "application/avro"
)
const CloudEventsSpecVersion = "1.0"

CloudEventsSpecVersion the CloudEvents specification version used by streams

Variables

var (
	// DefaultConcurrencyLevel default stream-listening jobs to be running concurrently for each ReaderNode.
	DefaultConcurrencyLevel = 1
	// DefaultRetryInitialInterval default initial interval duration between each stream-listening job provisioning on failures.
	DefaultRetryInitialInterval = time.Second * 3
	// DefaultRetryMaxInterval default maximum interval duration between each stream-listening job provisioning on failures.
	DefaultRetryMaxInterval = time.Second * 15
	// DefaultRetryTimeout default duration of each stream-listening job provisioning on failures.
	DefaultRetryTimeout = time.Second * 15
	// DefaultMaxHandlerPoolSize default pool size of goroutines for ReaderNode's Reader(s) / ReaderHandleFunc(s) executions.
	DefaultMaxHandlerPoolSize = 10
)
var (
	// DefaultHubInstanceName default instance names for nameless Hub instances
	DefaultHubInstanceName = "com.streams"
)
var (
	// ErrInvalidProtocolBufferFormat the given data is not a valid protocol buffer message
	ErrInvalidProtocolBufferFormat = errors.New("streams: Invalid protocol buffer data")
)
var ErrMissingSchemaDefinition = errors.New("streams: Missing stream schema definition in schema registry")

ErrMissingSchemaDefinition the requested stream message definition was not found in the SchemaRegistry

var ErrMissingStream = errors.New("streams: Missing stream entry in stream registry")

ErrMissingStream the requested stream was not found in the StreamRegistry

var ErrMissingWriterDriver = errors.New("streams: Missing writer driver")

ErrMissingWriterDriver no publisher driver was found.

var ReaderBaseBehaviours = []ReaderBehaviour{
	unmarshalReaderBehaviour,
	injectGroupReaderBehaviour,
	injectTxIDsReaderBehaviour,
	retryReaderBehaviour,
}

ReaderBaseBehaviours default ReaderBehaviours

Behaviours will be executed in descending order

var ReaderBaseBehavioursNoUnmarshal = []ReaderBehaviour{
	injectGroupReaderBehaviour,
	injectTxIDsReaderBehaviour,
	retryReaderBehaviour,
}

ReaderBaseBehavioursNoUnmarshal default ReaderBehaviours without unmarshaling

Behaviours will be executed in descending order

Functions

func InjectMessageCausationID

func InjectMessageCausationID(ctx context.Context, messageID string) string

InjectMessageCausationID injects the causation id from the given context if available. If not, it will use the message id as fallback.

func InjectMessageCorrelationID

func InjectMessageCorrelationID(ctx context.Context, messageID string) string

InjectMessageCorrelationID injects the correlation id from the given context if available. If not, it will use the message id as fallback.

func Read added in v0.2.1

func Read(message interface{}, opts ...ReaderNodeOption) error

Read registers a new stream-reading background job.

If reading from a Google Protocol Buffers message pipeline, DO NOT use a pointer as the message schema to avoid marshaling problems.

func ReadByStreamKey added in v0.2.1

func ReadByStreamKey(stream string, opts ...ReaderNodeOption)

ReadByStreamKey registers a new stream-reading background job using the raw stream identifier (e.g. topic name).

func RegisterStream added in v0.2.1

func RegisterStream(message interface{}, metadata StreamMetadata)

RegisterStream creates a relation between a stream message type and metadata.

If registering a Google Protocol Buffers message, DO NOT use a pointer as the message schema to avoid marshaling problems.

func RegisterStreamByString added in v0.2.1

func RegisterStreamByString(messageType string, metadata StreamMetadata)

RegisterStreamByString creates a relation between a string key and metadata.

func Start added in v0.2.1

func Start(ctx context.Context)

Start initiates all daemon processes (e.g. stream-reading jobs).

func Write added in v0.2.1

func Write(ctx context.Context, message interface{}) error

Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.
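The package-level functions operate on DefaultHub (see the Hub type below), which must be allocated manually, in practice with a concrete Writer driver. A short sketch, with fooMessage as a hypothetical registered type:

streams.DefaultHub = streams.NewHub() // the simple API requires manual allocation

streams.RegisterStream(fooMessage{}, streams.StreamMetadata{Stream: "foo-stream"})

if err := streams.Write(context.Background(), fooMessage{Foo: "bar"}); err != nil {
	log.Fatal(err)
}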

func WriteBatch added in v0.2.1

func WriteBatch(ctx context.Context, messages ...interface{}) (uint32, error)

WriteBatch inserts a set of messages into a stream assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

If an item from the batch fails, other items will fail too

func WriteByMessageKey added in v0.2.1

func WriteByMessageKey(ctx context.Context, messageKey string, message interface{}) error

WriteByMessageKey inserts a message into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

func WriteByMessageKeyBatch added in v0.2.1

func WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) (uint32, error)

WriteByMessageKeyBatch inserts a set of messages into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

If an item from the batch fails, other items will fail too

func WriteRawMessage added in v0.2.1

func WriteRawMessage(ctx context.Context, message Message) error

WriteRawMessage inserts a raw transport message into a stream in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

func WriteRawMessageBatch added in v0.2.1

func WriteRawMessageBatch(ctx context.Context, messages ...Message) (uint32, error)

WriteRawMessageBatch inserts a set of raw transport messages into a stream in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

The whole batch will be passed to the underlying Writer driver implementation as every driver has its own way to deal with batches

Types

type AvroMarshaler

type AvroMarshaler struct {
	HashingFactory Hashing64AlgorithmFactory
	// contains filtered or unexported fields
}

AvroMarshaler handles data transformation between primitives and Apache Avro format.

Apache Avro REQUIRES a defined SchemaRegistry to decode/encode data.

func NewAvroMarshaler

func NewAvroMarshaler() AvroMarshaler

NewAvroMarshaler allocates a new Apache Avro marshaler with a simple caching system to reduce memory footprint and computational usage when parsing Avro schema definition files.

func (AvroMarshaler) ContentType

func (a AvroMarshaler) ContentType() string

ContentType retrieves the encoding/decoding Apache Avro format using RFC 2046 standard (application/avro).

func (AvroMarshaler) Marshal

func (a AvroMarshaler) Marshal(schemaDef string, data interface{}) (parsedData []byte, err error)

Marshal transforms a complex data type into a primitive binary array for data transportation using Apache Avro format.

func (AvroMarshaler) Unmarshal

func (a AvroMarshaler) Unmarshal(schemaDef string, data []byte, ref interface{}) (err error)

Unmarshal transforms a primitive binary array to a complex data type for data processing using Apache Avro format.

type Event

type Event interface {
	// GetSubject This describes the subject of the event in the context of the event producer (identified by source).
	// In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source, but the
	// source identifier alone might not be sufficient as a qualifier for any specific event if the source
	// context has internal sub-structure.
	//
	// Identifying the subject of the event in context metadata (opposed to only in the data payload) is particularly
	// helpful in generic subscription filtering scenarios where middleware is unable to interpret the data content.
	// In the above example, the subscriber might only be interested in blobs with names ending with '.jpg' or '.jpeg'
	// and the subject attribute allows for constructing a simple and efficient string-suffix filter for that
	// subset of events.
	GetSubject() string
}

Event is an abstract message unit used by streams-based systems to publish messages with the `subject` field of a Message populated.

type FailingMarshalerNoop

type FailingMarshalerNoop struct{}

FailingMarshalerNoop the no-operation failing Marshaler

For testing purposes only

func (FailingMarshalerNoop) ContentType

func (f FailingMarshalerNoop) ContentType() string

ContentType the failing content type operation

func (FailingMarshalerNoop) Marshal

func (f FailingMarshalerNoop) Marshal(_ string, _ interface{}) ([]byte, error)

Marshal the failing marshal operation

func (FailingMarshalerNoop) Unmarshal

func (f FailingMarshalerNoop) Unmarshal(_ string, _ []byte, _ interface{}) error

Unmarshal the failing unmarshal operation

type Hashing64AlgorithmFactory

type Hashing64AlgorithmFactory func() hash.Hash64

Hashing64AlgorithmFactory factory for hash.Hash64 algorithms (used by Apache Avro schema definition caching system)

var DefaultHashing64AlgorithmFactory Hashing64AlgorithmFactory = func() hash.Hash64 {
	return fnv.New64a()
}

DefaultHashing64AlgorithmFactory the default hashing64 algorithm factory for Marshaler schema definition caching layer
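A sketch of swapping the hashing algorithm used for the Avro schema definition cache, here using CRC-64 from the standard library purely for illustration:

marshaler := streams.NewAvroMarshaler()
marshaler.HashingFactory = func() hash.Hash64 { // hash
	return crc64.New(crc64.MakeTable(crc64.ISO)) // hash/crc64
}
// hub := streams.NewHub(streams.WithMarshaler(marshaler))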

type Hub

type Hub struct {
	InstanceName      string
	StreamRegistry    StreamRegistry
	Writer            Writer
	Marshaler         Marshaler
	IDFactory         IDFactoryFunc
	SchemaRegistry    SchemaRegistry
	Reader            Reader
	ReaderBehaviours  []ReaderBehaviour
	ReaderBaseOptions []ReaderNodeOption
	// contains filtered or unexported fields
}

Hub is the main component which enables interactions between several systems through the usage of streams.

var (
	// DefaultHub is the `streams` base instance used by the `streams` simple API. Recommended to use where only one
	// hub instance is required.
	//
	// This variable MUST be allocated manually.
	DefaultHub *Hub
	// ErrNilDefaultHub DefaultHub has not been initialized.
	ErrNilDefaultHub = errors.New("streams: DefaultHub has not been initialized")
)

func NewHub

func NewHub(opts ...HubOption) *Hub

NewHub allocates a new Hub

func (*Hub) GetStreamReaderNodes added in v0.2.4

func (h *Hub) GetStreamReaderNodes(stream string) *singlylinkedlist.List

GetStreamReaderNodes retrieves ReaderNode(s) from a stream.

func (*Hub) Read

func (h *Hub) Read(message interface{}, opts ...ReaderNodeOption) error

Read registers a new stream-listening background job.

If listening to a Google Protocol Buffers message, DO NOT use a pointer as the message schema to avoid marshaling problems.

func (*Hub) ReadByStreamKey

func (h *Hub) ReadByStreamKey(stream string, opts ...ReaderNodeOption)

ReadByStreamKey registers a new stream-listening background job using the raw stream identifier (e.g. topic name).

func (*Hub) RegisterStream

func (h *Hub) RegisterStream(message interface{}, metadata StreamMetadata)

RegisterStream creates a relation between a stream message type and metadata.

If registering a Google Protocol Buffers message, DO NOT use a pointer as the message schema to avoid marshaling problems.

func (*Hub) RegisterStreamByString

func (h *Hub) RegisterStreamByString(messageType string, metadata StreamMetadata)

RegisterStreamByString creates a relation between a string key and metadata.

func (*Hub) Start

func (h *Hub) Start(ctx context.Context)

Start initiates all daemon processes (e.g. stream-listening jobs).

func (*Hub) Write

func (h *Hub) Write(ctx context.Context, message interface{}) error

Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

func (*Hub) WriteBatch

func (h *Hub) WriteBatch(ctx context.Context, messages ...interface{}) (uint32, error)

WriteBatch inserts a set of messages into a stream assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

If an item from the batch fails, other items will fail too

func (*Hub) WriteByMessageKey

func (h *Hub) WriteByMessageKey(ctx context.Context, messageKey string, message interface{}) error

WriteByMessageKey inserts a message into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

func (*Hub) WriteByMessageKeyBatch

func (h *Hub) WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) (uint32, error)

WriteByMessageKeyBatch inserts a set of messages into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

If an item from the batch fails, other items will fail too

func (*Hub) WriteRawMessage

func (h *Hub) WriteRawMessage(ctx context.Context, message Message) error

WriteRawMessage inserts a raw transport message into a stream in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

func (*Hub) WriteRawMessageBatch

func (h *Hub) WriteRawMessageBatch(ctx context.Context, messages ...Message) (uint32, error)

WriteRawMessageBatch inserts a set of raw transport messages into a stream in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

The whole batch will be passed to the underlying Writer driver implementation as every driver has its own way to deal with batches

type HubOption

type HubOption interface {
	// contains filtered or unexported methods
}

HubOption enables configuration of a Hub instance.

func WithIDFactory

func WithIDFactory(f IDFactoryFunc) HubOption

WithIDFactory sets the default unique identifier factory of a Hub instance.

func WithInstanceName

func WithInstanceName(n string) HubOption

WithInstanceName sets the name of a Hub instance.

func WithMarshaler

func WithMarshaler(m Marshaler) HubOption

WithMarshaler sets the default marshaler of a Hub instance.

func WithReader

func WithReader(d Reader) HubOption

WithReader sets the default reader driver of a Hub instance.

func WithReaderBaseOptions

func WithReaderBaseOptions(opts ...ReaderNodeOption) HubOption

WithReaderBaseOptions sets a list of ReaderNodeOption of a Hub instance used as global options for each reader node.

func WithReaderBehaviours

func WithReaderBehaviours(b ...ReaderBehaviour) HubOption

WithReaderBehaviours sets a list of ReaderBehaviour of a Hub instance ready to be executed by every stream-reading job's ReaderFunc or Reader component.

func WithSchemaRegistry

func WithSchemaRegistry(r SchemaRegistry) HubOption

WithSchemaRegistry sets the schema registry of a Hub instance for stream message schema definitions.

func WithWriter

func WithWriter(p Writer) HubOption

WithWriter sets the writer of a Hub instance.

If both Writer and WriterFunc are defined, Writer will override WriterFunc.

type IDFactoryFunc

type IDFactoryFunc func() (string, error)

IDFactoryFunc creates a unique identifier.

var RandInt64Factory IDFactoryFunc = func() (string, error) {
	i := rand.Int63()
	return strconv.Itoa(int(i)), nil
}

RandInt64Factory creates a unique identifier using the built-in math/rand package, in 64-bit signed integer format.

var UuidIdFactory IDFactoryFunc = func() (string, error) {
	id, err := uuid.NewUUID()
	return id.String(), err
}

UuidIdFactory creates a unique identifier using UUID v4 algorithm.

type InMemorySchemaRegistry

type InMemorySchemaRegistry map[string]string

InMemorySchemaRegistry is the in-memory schema registry, crafted especially for basic and/or testing scenarios.

func (InMemorySchemaRegistry) GetSchemaDefinition

func (i InMemorySchemaRegistry) GetSchemaDefinition(name string, version int) (string, error)

GetSchemaDefinition retrieves a schema definition (in string format) from the registry

func (InMemorySchemaRegistry) RegisterDefinition

func (i InMemorySchemaRegistry) RegisterDefinition(name, def string, version int)

RegisterDefinition stores the given schema definition into the registry

type JSONMarshaler

type JSONMarshaler struct{}

JSONMarshaler handles data transformation between primitives and JSON format.

func (JSONMarshaler) ContentType

func (m JSONMarshaler) ContentType() string

ContentType retrieves the encoding/decoding JSON format using RFC 2046 standard (application/json).

func (JSONMarshaler) Marshal

func (m JSONMarshaler) Marshal(_ string, data interface{}) ([]byte, error)

Marshal transforms a complex data type into a primitive binary array for data transportation using JSON format.

func (JSONMarshaler) Unmarshal

func (m JSONMarshaler) Unmarshal(_ string, data []byte, ref interface{}) error

Unmarshal transforms a primitive binary array to a complex data type for data processing using JSON format.

type Marshaler

type Marshaler interface {
	// Marshal transforms a complex data type into a primitive binary array for data transportation.
	Marshal(schemaDef string, data interface{}) ([]byte, error)
	// Unmarshal transforms a primitive binary array to a complex data type for data processing.
	Unmarshal(schemaDef string, data []byte, ref interface{}) error
	// ContentType retrieves the encoding/decoding format using RFC 2046 standard (e.g. application/json).
	ContentType() string
}

Marshaler handles data transformation between primitives and specific codecs/formats (e.g. JSON, Apache Avro).

type Message

type Message struct {
	// Stream name of destination stream (aka. topic)
	Stream string `json:"stream"`
	// StreamVersion destination stream major version. Useful when non-backwards compatible schema update is desired.
	StreamVersion int    `json:"stream_version"`
	ID            string `json:"id"`
	Source        string `json:"source"`
	SpecVersion   string `json:"specversion"`
	Type          string `json:"type"`
	Data          []byte `json:"data"`

	DataContentType   string `json:"datacontenttype,omitempty"`
	DataSchema        string `json:"dataschema,omitempty"`
	DataSchemaVersion int    `json:"dataschemaversion,omitempty"`
	Timestamp         string `json:"time,omitempty"`
	Subject           string `json:"subject,omitempty"`

	// Streamhub fields
	CorrelationID string `json:"correlation_id"`
	CausationID   string `json:"causation_id"`

	// DecodedData data decoded using unmarshalling ReaderBehaviour component. This field is ONLY available for usage
	// from ReaderNode(s).
	DecodedData interface{} `json:"-"`
	// GroupName name of the reader group (aka. consumer group). This field is ONLY available for usage
	// from ReaderNode(s).
	GroupName string `json:"-"`
}

Message is a unit of information which holds the primitive message (data) in binary format along with multiple fields in order to preserve a schema definition within a stream pipeline.

The schema is based on the Cloud Native Computing Foundation (CNCF)'s CloudEvents specification.

For more information, please see: https://github.com/cloudevents/spec

func NewMessage

func NewMessage(args NewMessageArgs) Message

NewMessage allocates an immutable Message ready to be transported in a stream.

type MessageContextKey

type MessageContextKey string

MessageContextKey is the streams context key to inject data into transport messages.

const (
	// ContextCorrelationID is the main trace of a stream processing. Once generated, it MUST NOT be generated again
	// to keep track of the process from the beginning.
	ContextCorrelationID MessageContextKey = "shub-correlation-id"
	// ContextCausationID is reference of the last message processed. This helps to know a direct relation between
	// a new process and the past one.
	ContextCausationID MessageContextKey = "shub-causation-id"
)

type NewMessageArgs

type NewMessageArgs struct {
	SchemaVersion        int
	Data                 []byte
	ID                   string
	Source               string
	Stream               string
	StreamVersion        int
	SchemaDefinitionName string
	ContentType          string
	GroupName            string
	Subject              string
}

NewMessageArgs arguments required by NewMessage function to operate.
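A sketch of building a raw transport Message and publishing it with WriteRawMessage, bypassing registry-based marshaling (hub is an allocated Hub and the field values are illustrative):

id, _ := streams.UuidIdFactory()
msg := streams.NewMessage(streams.NewMessageArgs{
	ID:                   id,
	Source:               "org.example.service",
	Stream:               "foo-stream",
	SchemaDefinitionName: "fooMessage",
	SchemaVersion:        1,
	ContentType:          streams.MarshalerJSONContentType,
	Data:                 []byte(`{"foo":"bar"}`),
})

if err := hub.WriteRawMessage(context.Background(), msg); err != nil {
	log.Fatal(err)
}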

type NoopSchemaRegistry

type NoopSchemaRegistry struct{}

NoopSchemaRegistry is the no-operation implementation of SchemaRegistry

func (NoopSchemaRegistry) GetSchemaDefinition

func (n NoopSchemaRegistry) GetSchemaDefinition(_ string, _ int) (string, error)

GetSchemaDefinition retrieves an empty string and a nil error

type ProtocolBuffersMarshaler

type ProtocolBuffersMarshaler struct{}

ProtocolBuffersMarshaler handles data transformation between primitives and Google Protocol Buffers format

func (ProtocolBuffersMarshaler) ContentType

func (p ProtocolBuffersMarshaler) ContentType() string

ContentType retrieves the encoding/decoding Google Protocol Buffers format using the latest conventions.

More information here: https://github.com/google/protorpc/commit/eb03145a6a7c72ae6cc43867d9635a5b8d8c4545

func (ProtocolBuffersMarshaler) Marshal

func (p ProtocolBuffersMarshaler) Marshal(_ string, data interface{}) ([]byte, error)

Marshal transforms a complex data type into a primitive binary array for data transportation using Google Protocol Buffers format

func (ProtocolBuffersMarshaler) Unmarshal

func (p ProtocolBuffersMarshaler) Unmarshal(_ string, data []byte, ref interface{}) error

Unmarshal transforms a primitive binary array to a complex data type for data processing using Google Protocol Buffers format

type Reader

type Reader interface {
	// ExecuteTask starts a background stream-reading task.
	ExecuteTask(_ context.Context, _ ReaderTask) error
}

Reader defines the underlying implementation of the stream-reading job (driver), which addresses the usage of custom protocols and/or APIs from providers (Apache Kafka, Amazon SQS, ...).

type ReaderBehaviour

type ReaderBehaviour func(node *ReaderNode, hub *Hub, next ReaderHandleFunc) ReaderHandleFunc

ReaderBehaviour is a middleware function with extra functionality which will be executed prior a ReaderHandleFunc or Reader component for every stream-reading job instance registered into a Hub.

The middleware receives the contextual ReaderNode (the stream-reading job to be executed), the root Hub instance and the next handler in the chain.

Moreover, there are built-in behaviours ready to be used with streams:

- Retry backoff

- Correlation and causation ID injection

- Consumer group injection

- Auto-unmarshalling (*only if using reflection-based stream registry or GoType was defined when registering stream)

- Logging*

- Metrics*

- Tracing*

*Manual specification on configuration required

type ReaderHandleFunc

type ReaderHandleFunc func(context.Context, Message) error

ReaderHandleFunc is the execution process triggered when a message is received from a stream.

Returns an error to indicate the process has failed so Hub will retry the processing using exponential backoff.

type ReaderHandler

type ReaderHandler interface {
	// Read starts the execution process triggered when a message is received from a stream.
	//
	// Returns an error to indicate the process has failed so Hub will retry the processing using exponential backoff.
	Read(context.Context, Message) error
}

ReaderHandler is a wrapping structure of the ReaderHandleFunc handler for complex data processing scenarios.
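A sketch of a struct-based handler carrying its own dependencies, registered through WithHandler; the repository dependency, the fooMessage type and the concrete type of DecodedData are assumptions for illustration:

// fooHandler is a hypothetical ReaderHandler with an injected dependency.
type fooHandler struct {
	repo fooRepository // hypothetical dependency
}

func (h fooHandler) Read(ctx context.Context, msg streams.Message) error {
	event, ok := msg.DecodedData.(fooMessage) // concrete type depends on the registered GoType
	if !ok {
		return errors.New("unexpected message type") // returning an error triggers the retry backoff
	}
	return h.repo.Save(ctx, event)
}

// Usage: hub.Read(fooMessage{}, streams.WithHandler(fooHandler{repo: repo}))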

type ReaderHandlerNoop

type ReaderHandlerNoop struct{}

ReaderHandlerNoop the no-operation implementation of ReaderHandler

func (ReaderHandlerNoop) Read

func (r ReaderHandlerNoop) Read(_ context.Context, _ Message) error

Read the no-operation implementation of ReaderHandler.Read()

type ReaderNode

type ReaderNode struct {
	Stream                string
	HandlerFunc           ReaderHandleFunc
	Group                 string
	ProviderConfiguration interface{}
	ConcurrencyLevel      int
	RetryInitialInterval  time.Duration
	RetryMaxInterval      time.Duration
	RetryTimeout          time.Duration
	Reader                Reader
	MaxHandlerPoolSize    int
}

ReaderNode is the worker unit which schedules stream-reading job(s).

Each ReaderNode is independent of other nodes to guarantee resiliency of interleaved processes and avoid cascading failures.

func GetStreamReaderNodes added in v0.2.4

func GetStreamReaderNodes(stream string) []ReaderNode

GetStreamReaderNodes retrieves ReaderNode(s) from a stream.

type ReaderNodeOption

type ReaderNodeOption interface {
	// contains filtered or unexported methods
}

ReaderNodeOption enables configuration of a ReaderNode.

func WithConcurrencyLevel

func WithConcurrencyLevel(n int) ReaderNodeOption

WithConcurrencyLevel sets the concurrency level of a ReaderNode; in other words, the number of jobs to be scheduled by the ReaderNode.

Note: If the level is defined as less than or equal to 0, the ReaderNode will schedule 1 job.

func WithDriver

func WithDriver(d Reader) ReaderNodeOption

WithDriver sets the driver of a ReaderNode (e.g. Apache Kafka, Apache Pulsar, Amazon SQS).

func WithGroup

func WithGroup(g string) ReaderNodeOption

WithGroup sets the consumer group or queue name of a ReaderNode.

Note: It may not be available for some providers.

func WithHandler

func WithHandler(l ReaderHandler) ReaderNodeOption

WithHandler sets the ReaderHandler of a ReaderNode.

func WithHandlerFunc

func WithHandlerFunc(l ReaderHandleFunc) ReaderNodeOption

WithHandlerFunc sets the ReaderHandleFunc of a ReaderNode.

func WithMaxHandlerPoolSize

func WithMaxHandlerPoolSize(n int) ReaderNodeOption

WithMaxHandlerPoolSize sets the maximum number of goroutines executed by a ReaderNode's Reader or ReaderFunc.

Note: If the size is defined as less than or equal to 0, the ReaderNode internal implementations will allocate a semaphore of 10 goroutines per handler.

func WithProviderConfiguration

func WithProviderConfiguration(cfg interface{}) ReaderNodeOption

WithProviderConfiguration sets the custom provider configuration of a ReaderNode (e.g. aws.Config, sarama.Config).

func WithRetryInitialInterval

func WithRetryInitialInterval(d time.Duration) ReaderNodeOption

WithRetryInitialInterval sets the initial duration interval between retrying tasks of a ReaderNode.

func WithRetryMaxInterval

func WithRetryMaxInterval(d time.Duration) ReaderNodeOption

WithRetryMaxInterval sets the maximum duration interval between retrying tasks of a ReaderNode.

func WithRetryTimeout

func WithRetryTimeout(d time.Duration) ReaderNodeOption

WithRetryTimeout sets the maximum duration for retrying tasks of a ReaderNode.

type ReaderTask

type ReaderTask struct {
	Stream             string
	HandlerFunc        ReaderHandleFunc
	Group              string
	Configuration      interface{}
	Timeout            time.Duration
	MaxHandlerPoolSize int
}

ReaderTask is the job metadata to be executed by the reader node driver.

type SchemaRegistry

type SchemaRegistry interface {
	// GetSchemaDefinition retrieves a schema definition (in string format) from the registry
	GetSchemaDefinition(name string, version int) (string, error)
}

SchemaRegistry is an external storage of stream message schema definitions with proper versioning.

Examples of such schema registries are the Amazon Glue Schema Registry and the Confluent Schema Registry.

type StreamMetadata

type StreamMetadata struct {
	// Stream destination stream name (aka. topic)
	Stream string
	// StreamVersion destination stream major version. Useful when non-backwards compatible schema update is desired.
	StreamVersion int
	// SchemaDefinitionName
	SchemaDefinitionName string
	SchemaVersion        int
	GoType               reflect2.Type
}

StreamMetadata contains information of stream messages.

type StreamRegistry

type StreamRegistry struct {
	// contains filtered or unexported fields
}

StreamRegistry is an in-memory storage of streams metadata used by Hub and any external agent to set and retrieve information about a specific stream.

Uses a custom string (or Go's struct type as string) as key.

Note: A message key differs from stream name as the message key COULD be anything the developer sets within the stream registry. Thus, scenarios where multiple data types require publishing messages to the same stream are possible. Moreover, the message key is set by reflection-based registries with the reflect.TypeOf function, so it will differ from the actual stream name.

func NewStreamRegistry added in v0.2.5

func NewStreamRegistry() StreamRegistry

func (*StreamRegistry) Get

func (r *StreamRegistry) Get(message interface{}) (StreamMetadata, error)

Get retrieves a stream message metadata from a stream message type.

func (*StreamRegistry) GetByStreamName

func (r *StreamRegistry) GetByStreamName(name string) (StreamMetadata, error)

GetByStreamName retrieves a stream message metadata from a stream name.

It contains an optimistic lookup mechanism to keep constant time complexity.

If metadata is not found by the given key, the lookup falls back to an O(log n) search. This adds the GetByString base complexity to the fallback path; nevertheless, GetByString is always constant time, so it only contributes a constant term to the overall GetByStreamName complexity (e.g. if GetByString takes 49.75 ns/op, then GetByStreamName = its original ns/op + GetByString ns/op).

This optimistic lookup is done in order to keep amortized time complexity when using non-reflection based implementations on the root Hub (using only String methods from this very Stream Registry component). Thus, greater performance is achieved for scenarios when reflection-based stream registration is not required by the program.

func (*StreamRegistry) GetByString

func (r *StreamRegistry) GetByString(key string) (StreamMetadata, error)

GetByString retrieves a stream message metadata from a string key.

func (*StreamRegistry) Set

func (r *StreamRegistry) Set(message interface{}, metadata StreamMetadata)

Set creates a relation between a stream message type and metadata.

func (*StreamRegistry) SetByString

func (r *StreamRegistry) SetByString(key string, metadata StreamMetadata)

SetByString creates a relation between a string key and metadata.

type WriteByMessageKeyBatchItems

type WriteByMessageKeyBatchItems map[string]interface{}

WriteByMessageKeyBatchItems items to be written as batch on the Hub.WriteByMessageKeyBatch() function
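A sketch of a keyed batch write; the keys must match entries in the Stream Registry, and the payload types (fooMessage, barMessage) are illustrative:

items := streams.WriteByMessageKeyBatchItems{
	"foo-stream": fooMessage{Foo: "bar"},
	"bar-stream": barMessage{Bar: "baz"},
}

published, err := hub.WriteByMessageKeyBatch(context.Background(), items)
if err != nil {
	log.Printf("only %d of %d messages were published: %v", published, len(items), err)
}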

type Writer

type Writer interface {
	// Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the
	// data to a set of subscribed systems for further processing.
	Write(ctx context.Context, message Message) error
	// WriteBatch inserts a set of messages into a stream assigned to the message in the StreamRegistry in order to propagate the
	// data to a set of subscribed systems for further processing.
	//
	// Depending on the underlying Writer driver implementation, this function MIGHT return an error if a single operation failed,
	// or it MIGHT return an error if the whole operation failed. In addition, this function will return the number of
	// successful messages written into streams.
	WriteBatch(ctx context.Context, messages ...Message) (uint32, error)
}

Writer inserts messages into streams assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

This type should be provided by a streams Driver (e.g. Apache Pulsar, Apache Kafka, Amazon SNS)

var NoopWriter Writer = noopWriter{}

NoopWriter is the no-operation implementation of Writer

Directories

Path Synopsis
benchmarks
driver
shmemory
Package streams-memory is the In-Memory implementation for Streamhub-based programs.
amazon Module
examples
