eda

README

eda

eda is a library for implementing event-driven architectures. It provides a thin layer on top of backends that support ordered, durable streams with a publish/subscribe interface. The current implementation uses NATS Streaming as a backend.

Status

The library is in an alpha stage; feedback and discussion on API design and scope are welcome. Check out the issues for open topics.

Use Case

The primary use case this library is designed to support is applications involving "domain events", that is, events that carry information about something that occurred in a domain model and must be made available to other consumers.

One application of this is as a building block for systems using the CQRS pattern, where events produced on the write side (as a result of handling a command) need to be published so the read side can consume them and update its internal indexes.

Another related use case is Event Sourcing, which is generally spoken of in the context of an "aggregate". The pattern requires each aggregate instance to maintain its own stream of events acting as an internal changelog. This stream is generally "private" from other consumers and requires a single handler to apply events in order to maintain a consistent internal state.

This library could be used for this, but the backends do not currently generalize well to tens or hundreds of thousands of streams. One strategy is to multiplex events from multiple aggregates onto a single stream and have handlers ignore events that are not specific to the target aggregate, as sketched below. The basic trade-off is between the number of streams (which may be limited by the backend) and the latency of reading events on a multiplexed stream.
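
As a rough illustration of the multiplexing strategy, the sketch below wraps a handler so it only processes events for a single aggregate instance; everything else on the shared stream is skipped. This is a sketch only: the filterAggregate helper is hypothetical, while the Event.Aggregate field and the Handler type come from this library.

// filterAggregate wraps a handler so it only processes events for one
// aggregate instance on a multiplexed stream. Events for other aggregates
// are acknowledged and skipped.
func filterAggregate(aggregateID string, next eda.Handler) eda.Handler {
  return func(ctx context.Context, evt *eda.Event) error {
    if evt.Aggregate != aggregateID {
      // Not our aggregate; nothing to do.
      return nil
    }
    return next(ctx, evt)
  }
}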

Examples

See the examples directory.

Install

Requires Go 1.9+

go get -u github.com/chop-dbhi/eda/...

Backend

This library relies on NATS Streaming as a backend. There are several ways of running the server; see the NATS Streaming documentation for the available options.

In any case, the suggested command line options for full durability are:

$ nats-streaming-server \
  --cluster_id test-cluster \
  --store file \
  --dir data \
  --max_channels 0 \
  --max_subs 0 \
  --max_msgs 0 \
  --max_bytes 0 \
  --max_age 0s

Get Started

Connecting to the backend

The first step is to establish a connection to the NATS Streaming server. The example below uses the default address and cluster ID. Connect also takes a client ID, which identifies the connection itself. Choose the client ID thoughtfully, since it is used as part of the key for tracking subscriber progress in streams. It is also added to events published by this connection for traceability.

// Establish a connection to NATS, specifying the server address, the
// cluster ID, and a client ID for this connection.
conn, err := eda.Connect(
  "nats://localhost:4222",
  "test-cluster",
  "test-client",
)
if err != nil {
  log.Fatal(err)
}

// Close on exit.
defer conn.Close()

Publishing events

To publish an event, simply use the Publish method passing an Event value.

id, err := conn.Publish("subjects", &eda.Event{
  Type: "subject-enrolled",
  Data: eda.JSON(&Subject{
    ID: "3292329",
  }),
})
if err != nil {
  log.Fatal(err)
}

By convention, Type should be past tense, since it describes something that has already happened. The event Data should give consumers sufficient information about what occurred. Helper functions are provided to encode bytes, strings, JSON, and Protocol Buffer types.

A couple of additional fields can be supplied, including Cause, an identifier of the upstream event (or other trigger) that caused this event, and Meta, a map of arbitrary key-value pairs for additional information.

The return values are the unique ID of the event and an error if publishing to the server failed.
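
For example, the returned ID can be used as the Cause of a follow-up event, with extra context carried in Meta. This is a sketch continuing from the Publish call above; the follow-up event type and metadata keys are made up.

// Publish a follow-up event, linking it to the previous event via Cause
// and attaching arbitrary key-value metadata.
_, err = conn.Publish("subjects", &eda.Event{
  Type:  "subject-welcome-email-sent",
  Cause: id,
  Data:  eda.String("welcome email queued"),
  Meta: map[string]string{
    "source": "enrollment-service",
  },
})
if err != nil {
  log.Fatal(err)
}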

Consuming events

The first thing to do is create a Handler function that will be called for events as they are received from the server. The handler signature, shown below, includes a context.Context value and the received event.

handle := func(ctx context.Context, evt *eda.Event) error {
  switch evt.Type {
  case "subject-enrolled":
    var s Subject
    if err := evt.Data.Decode(&s); err != nil {
      return err
    }

    // Do something with subject.
  }

  return nil
}

To start receiving events, create a subscription, which takes the name of the stream to subscribe to, the handler, and subscription options.

sub, err := conn.Subscribe("subjects", handle, &eda.SubscriptionOptions{
  Backfill: true,
  Durable:  true,
  Serial:   true,
})
if err != nil {
  log.Fatal(err)
}
defer sub.Close()

In this case, we want to ensure events are processed in order even if an error occurs (Serial), read the backfill of events in the stream to "catch up" to the current state (Backfill), and make the subscription durable so a reconnect resumes where it left off (Durable).
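
If a durable subscriber later needs to rebuild its state from scratch, for example after fixing a bug in a handler, the Reset option replays the stream from the beginning. The sketch below assumes the conn and handle values from above; the subscriber name is made up and the time package is imported.

// Replay the full stream for this durable subscriber, discarding its
// saved offset, e.g. to rebuild a read model after fixing a handler bug.
sub, err := conn.Subscribe("subjects", handle, &eda.SubscriptionOptions{
  Name:     "subjects-indexer",
  Backfill: true,
  Durable:  true,
  Serial:   true,
  Reset:    true,
  Timeout:  30 * time.Second,
})
if err != nil {
  log.Fatal(err)
}
defer sub.Close()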

License

MIT

Documentation

Overview

The eda package is a library for implementing event-driven architectures. It provides a thin layer on top of backends that support ordered streams with a publish/subscribe interface. The current implementation uses NATS Streaming: https://github.com/nats-io/nats-streaming-server, but additional backends could be supported.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Conn

type Conn interface {
	// Publish publishes an event to the specified stream. It returns the ID of the event.
	Publish(stream string, evt *Event) (string, error)

	// Subscribe creates a subscription to the stream and associates the handler.
	Subscribe(stream string, handle Handler, opts *SubscriptionOptions) (Subscription, error)

	// Close closes the connection.
	Close() error
}

Conn is a connection interface to the underlying event streams backend.

func Connect

func Connect(addr, cluster, client string, opts ...ConnectOption) (Conn, error)

Connect establishes a connection to the streaming backend.

type ConnectOption

type ConnectOption func(o *ConnectOptions)

func WithLogger

func WithLogger(l Logger) ConnectOption
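
WithLogger sets the Logger used for the connection's internal logging. A minimal sketch, using a standard library *log.Logger (which satisfies the Logger interface) and the connection parameters from the Get Started section:

// Route the library's internal logging to a stdlib logger.
logger := log.New(os.Stderr, "eda: ", log.LstdFlags)

conn, err := eda.Connect(
	"nats://localhost:4222",
	"test-cluster",
	"test-client",
	eda.WithLogger(logger),
)
if err != nil {
	log.Fatal(err)
}
defer conn.Close()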

type ConnectOptions

type ConnectOptions struct {
	Logger Logger
}

func (*ConnectOptions) Apply

func (o *ConnectOptions) Apply(opts ...ConnectOption)

type Data

type Data interface {
	// Type returns the encoding type used to encode the data to bytes.
	Type() string

	// Decode decodes the underlying bytes into the passed value pointer.
	Decode(v interface{}) error

	// Encode encodes the underlying type into bytes.
	Encode() ([]byte, error)
}

Data encapsulates a value with a known encoding scheme.

func Bytes

func Bytes(b []byte) Data

Bytes returns Data that encodes and decodes the raw bytes.

func JSON

func JSON(v interface{}) Data

JSON returns Data that encodes and decodes the JSON-encodable value.

func Proto

func Proto(m proto.Message) Data

Proto returns Data that encodes and decodes the proto message.

func String

func String(s string) Data

String returns Data that encodes and decodes a string.
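
A sketch of the constructors in use; the payload values and the Subject type are taken from the README examples and are illustrative only:

// Wrap values with a known encoding for use as Event.Data.
payloads := []eda.Data{
	eda.Bytes([]byte{0x01, 0x02}),     // raw bytes
	eda.String("subject enrolled"),    // plain string
	eda.JSON(&Subject{ID: "3292329"}), // any JSON-encodable value
	// eda.Proto(m) wraps a generated proto.Message.
}

for _, d := range payloads {
	// Encode returns the underlying bytes; Type names the encoding used.
	b, err := d.Encode()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("%s: %d bytes", d.Type(), len(b))
}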

type Event

type Event struct {
	// Stream is the stream this event was published on.
	Stream string `json:"stream"`

	// ID is the globally unique ID of the event.
	ID string `json:"id"`

	// Type is the event type.
	Type string `json:"type"`

	// Time when the event was published.
	Time time.Time `json:"time"`

	// Time the event was acknowledged by the server.
	AckTime time.Time `json:"ack_time"`

	// Data is the event data.
	Data Data `json:"data"`

	// Schema is an identifier of the data schema.
	Schema string `json:"schema"`

	// Client is the ID of the client that produced this event.
	Client string `json:"client"`

	// Cause is the ID of the event that caused/resulted in this event
	// being produced.
	Cause string `json:"cause"`

	Aggregate string `json:"aggregate"`

	// Meta supports arbitrary key-value information associated with the event.
	Meta map[string]string `json:"meta,omitempty"`
	// contains filtered or unexported fields
}

Event is the top-level type that wraps the event data.

func (*Event) Is

func (e *Event) Is(types ...string) bool

Is returns true if the event is one of the passed types.
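
A short sketch of using Is inside a handler to filter on several event types (the type names here are made up):

handle := func(ctx context.Context, evt *eda.Event) error {
	// Only react to enrollment-related events; everything else is skipped.
	if !evt.Is("subject-enrolled", "subject-withdrawn") {
		return nil
	}
	// ... handle the event ...
	return nil
}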

type Handler

type Handler func(ctx context.Context, evt *Event) error

Handler is the event handler type for creating subscriptions.

type Logger

type Logger interface {
	Print(v ...interface{})
	Printf(f string, v ...interface{})
}

Logger is a minimal interface required for internal logging. This is compatible with the stdlib log.Logger type.

type Subscription

type Subscription interface {
	// Unsubscribe closes the subscription and resets the offset.
	Unsubscribe() error

	// Close closes the subscription and retains the offset.
	Close() error
}
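
The distinction matters for durable subscribers. A sketch, assuming conn and handle from the README examples and an opts value of type *SubscriptionOptions:

sub, err := conn.Subscribe("subjects", handle, opts)
if err != nil {
	log.Fatal(err)
}

// Close stops delivery but retains the durable offset, so a later
// subscription with the same name resumes where this one left off.
defer sub.Close()

// Unsubscribe would instead discard the offset entirely:
// _ = sub.Unsubscribe()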

type SubscriptionOptions

type SubscriptionOptions struct {
	// Unique name of the subscriber. This is used to keep track of the
	// offset of messages for a stream. This defaults to the stream name.
	Name string

	// If true, a new subscription will be sent the entire backlog of events
	// in the stream. This is useful for catching up to the current state of
	// the stream.
	Backfill bool

	// If true, the stream offset will be tracked for the subscriber. Upon
	// reconnect, the next message from the offset will be received.
	Durable bool

	// If true and the subscriber had a durable subscription, this will reset the
	// durable subscription. The effect is that all events from the specified
	// start position or time will be replayed.
	Reset bool

	// If true, a new event will be processed only if/when the previous event
	// was handled successfully and acknowledged. If events should be processed
	// in order, one at a time, then this should be set to true.
	Serial bool

	// The maximum time to wait before acknowledging an event was handled.
	// If the timeout is reached, the server will redeliver the event.
	Timeout time.Duration
}

Directories

Path        Synopsis
examples
internal
pb          Package pb is a generated protocol buffer package.
