ego

v1.1.5
Published: Feb 8, 2024 License: MIT Imports: 20 Imported by: 0

README

eGo

eGo is a minimal library that helps build event-sourcing and CQRS applications through a simple interface. It lets developers define their commands, events and states using Google Protocol Buffers. Under the hood, eGo leverages Go-Akt to scale out and guarantee performant, reliable persistence.

Features

Domain Entity/Aggregate Root

The aggregate root is crucial for maintaining data consistency, especially in distributed systems. It defines how to handle the various commands (requests to perform actions), which are always directed at the aggregate root. In eGo, commands sent to the aggregate root are processed in order. Processing a command may generate events, which are then stored in an events store. Every persisted event carries a revision number and a timestamp that help track it. The aggregate root is also responsible for defining how to handle the events produced by its command handlers. Handling those events builds the new state of the aggregate root. When running in cluster mode, aggregate roots are sharded.

  • Command handlers: The command handlers define how to handle each incoming command, which validations must be applied, and finally, which events will be persisted, if any. When there is no event to be persisted, nil can be returned as a no-op. Command handlers are the meat of the event sourced actor. They encode the business rules of your event sourced actor and act as the guardian of the Aggregate's consistency. The command handler must first validate that the incoming command can be applied to the current model state. Any decision should be based solely on the data passed in the command and the state of the Behavior. On successful validation, one or more events expressing the mutations are persisted. Once the events are persisted, they are applied to the state, producing a new valid state.
  • Event handlers: The event handlers mutate the state of the Aggregate by applying the events to it. Event handlers must be pure functions because they are also used when instantiating the Aggregate and replaying the events store.
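
The split between the two kinds of handlers can be sketched in plain Go. This is only an illustration, not eGo's API: the Account, CreditAccount and AccountCredited structs are hypothetical stand-ins for generated protocol buffers messages, and handleCommand, handleEvent and replay are names made up for this sketch. It shows a command handler that validates against the prior state and returns nil as a no-op, and a pure event handler that can be replayed to rebuild the state.

```go
package main

import (
	"errors"
	"fmt"
)

// Account is the aggregate state (hypothetical stand-in for a protobuf message).
type Account struct {
	ID      string
	Balance float64
}

// CreditAccount is a command (hypothetical stand-in).
type CreditAccount struct {
	AccountID string
	Amount    float64
}

// AccountCredited is the event recording a successful credit (hypothetical stand-in).
type AccountCredited struct {
	AccountID string
	Amount    float64
}

// handleCommand validates the command against the prior state and returns the
// event to persist. A nil event with a nil error means "nothing to persist".
func handleCommand(cmd any, prior Account) (any, error) {
	switch c := cmd.(type) {
	case CreditAccount:
		if c.AccountID != prior.ID {
			return nil, errors.New("command addressed to another account")
		}
		if c.Amount < 0 {
			return nil, errors.New("credit amount must not be negative")
		}
		if c.Amount == 0 {
			return nil, nil // no-op: the state would not change
		}
		return AccountCredited{AccountID: c.AccountID, Amount: c.Amount}, nil
	default:
		return nil, errors.New("unhandled command")
	}
}

// handleEvent is a pure function: same event and prior state, same new state.
func handleEvent(evt any, prior Account) (Account, error) {
	switch e := evt.(type) {
	case AccountCredited:
		return Account{ID: prior.ID, Balance: prior.Balance + e.Amount}, nil
	default:
		return prior, errors.New("unhandled event")
	}
}

// replay rebuilds the state by applying the stored events in order.
func replay(initial Account, events []any) (Account, error) {
	state := initial
	for _, evt := range events {
		next, err := handleEvent(evt, state)
		if err != nil {
			return state, err
		}
		state = next
	}
	return state, nil
}

func main() {
	initial := Account{ID: "acc-1", Balance: 500}
	evt, err := handleCommand(CreditAccount{AccountID: "acc-1", Amount: 250}, initial)
	if err != nil {
		panic(err)
	}
	state, err := replay(initial, []any{evt})
	if err != nil {
		panic(err)
	}
	fmt.Println(state.Balance) // prints 750
}
```

Because handleEvent has no side effects, replaying the same events always yields the same state, which is the property the purity requirement protects.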

To define an Aggregate Root, one needs to:

  1. define the state of the aggregate root as a Google Protocol Buffers message
  2. define the various commands that will be handled by the aggregate root
  3. define the various events that result from the command handlers and that will be handled by the aggregate root to return its new state
  4. implement the EntityBehavior[T State] interface, where T is the generated Go struct of the previously defined aggregate root state.

Events Stream

Every event handled by the Aggregate Root is pushed to an events stream. This enables real-time processing of events without having to interact with the events store.
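
The idea of an events stream can be illustrated with a small in-process fan-out built on channels. This is a conceptual sketch only and does not reflect how eGo or Go-Akt implement their stream: the Stream and Event types and their methods are hypothetical, and dropping events for a slow subscriber is a design choice made here for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for a persisted event.
type Event struct {
	EntityID string
	Revision uint64
	Payload  string
}

// Stream fans out every published event to all subscribers.
type Stream struct {
	mu   sync.RWMutex
	subs []chan Event
}

// Subscribe registers a subscriber and returns its buffered channel.
func (s *Stream) Subscribe(buffer int) <-chan Event {
	ch := make(chan Event, buffer)
	s.mu.Lock()
	s.subs = append(s.subs, ch)
	s.mu.Unlock()
	return ch
}

// Publish pushes the event to every subscriber without blocking the writer.
func (s *Stream) Publish(evt Event) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, ch := range s.subs {
		select {
		case ch <- evt:
		default: // subscriber buffer is full: drop instead of blocking
		}
	}
}

// Close closes all subscriber channels so that consumers can stop ranging.
func (s *Stream) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ch := range s.subs {
		close(ch)
	}
	s.subs = nil
}

func main() {
	stream := &Stream{}
	sub := stream.Subscribe(8)
	stream.Publish(Event{EntityID: "acc-1", Revision: 1, Payload: "AccountCreated"})
	stream.Publish(Event{EntityID: "acc-1", Revision: 2, Payload: "AccountCredited"})
	stream.Close()
	for evt := range sub {
		fmt.Printf("%s rev=%d %s\n", evt.EntityID, evt.Revision, evt.Payload)
	}
}
```

A consumer reads from its channel as events arrive, so it never has to query the events store to react to new events.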

Projection

One can add a projection to the eGo engine to help build a read model. Projections in eGo rely on an offset store to track how far they have consumed events persisted by the write model. The offset used in eGo is a timestamp-based offset.
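
A timestamp-based offset can be sketched as follows. This is an assumption-laden illustration rather than eGo's projection code: Event, OffsetStore and project are hypothetical names, timestamps are taken to be Unix milliseconds, and the journal is assumed to be ordered by timestamp with no two events sharing one.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a persisted event.
// Timestamp is assumed to be in Unix milliseconds.
type Event struct {
	EntityID  string
	Timestamp int64
	Amount    float64
}

// OffsetStore remembers, per projection name, the timestamp of the last
// event that projection has consumed.
type OffsetStore map[string]int64

// project hands every event newer than the stored offset to handle and
// advances the offset after each successfully handled event.
func project(name string, journal []Event, offsets OffsetStore, handle func(Event) error) error {
	for _, evt := range journal {
		if evt.Timestamp <= offsets[name] {
			continue // already consumed in a previous run
		}
		if err := handle(evt); err != nil {
			return err // offset is not advanced, so the event is retried next run
		}
		offsets[name] = evt.Timestamp
	}
	return nil
}

func main() {
	journal := []Event{
		{EntityID: "acc-1", Timestamp: 1000, Amount: 500},
		{EntityID: "acc-1", Timestamp: 2000, Amount: 250},
	}
	offsets := OffsetStore{}
	balances := map[string]float64{} // the read model
	handler := func(evt Event) error {
		balances[evt.EntityID] += evt.Amount
		return nil
	}
	// The second run finds nothing newer than the offset and changes nothing.
	for run := 0; run < 2; run++ {
		if err := project("balances", journal, offsets, handler); err != nil {
			panic(err)
		}
	}
	fmt.Println(balances["acc-1"], offsets["balances"]) // prints 750 2000
}
```

Storing the offset outside the read model is what lets a projection resume after a restart without reprocessing events it has already applied.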

Events Store

One can implement a custom events store. See EventsStore. eGo comes packaged with two events stores, one of which is the in-memory store (eventstore/memory) used in the sample below.

Offsets Store

One can implement a custom offsets store. See OffsetStore. eGo comes packaged with two offset stores.

Cluster

The cluster mode heavily relies on Go-Akt clustering.
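
To give an intuition for what sharding aggregate roots across partitions means, here is a generic sketch that hashes an entity ID into a fixed number of partitions. It is not Go-Akt's actual placement scheme, which is not described here: partitionFor is a hypothetical helper, and FNV-1a is simply a convenient hash from the standard library.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps an entity ID to one of partitionsCount partitions.
// partitionsCount must be greater than zero.
func partitionFor(entityID string, partitionsCount uint64) uint64 {
	h := fnv.New64a()
	_, _ = h.Write([]byte(entityID)) // Write on an FNV hash never returns an error
	return h.Sum64() % partitionsCount
}

func main() {
	const partitionsCount = 20
	for _, id := range []string{"acc-1", "acc-2", "acc-3"} {
		fmt.Printf("%s -> partition %d\n", id, partitionFor(id, partitionsCount))
	}
}
```

The same ID always lands in the same partition, so every command for a given aggregate root is routed to a single owner, which is what makes in-order processing possible in a cluster.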

Examples

Check the examples

Installation

go get github.com/tochemey/ego

Sample

package main

import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/google/uuid"
	"github.com/tochemey/ego"
	"github.com/tochemey/ego/eventstore/memory"
	samplepb "github.com/tochemey/ego/example/pbs/sample/pb/v1"
	"google.golang.org/protobuf/proto"
)

func main() {
	// create the go context
	ctx := context.Background()
	// create the event store
	eventStore := memory.NewEventsStore()
	// create the ego engine
	e := ego.NewEngine("Sample", eventStore)
	// start ego engine
	_ = e.Start(ctx)
	// create a persistence id
	entityID := uuid.NewString()
	// create an entity behavior with a given id
	behavior := NewAccountBehavior(entityID)
	// create an entity
	entity, _ := ego.NewEntity[*samplepb.Account](ctx, behavior, e)

	// send some commands to the pid
	var command proto.Message
	// create an account
	command = &samplepb.CreateAccount{
		AccountId:      entityID,
		AccountBalance: 500.00,
	}
	// send the command to the actor. Please don't ignore the error in production-grade code
	account, _, _ := entity.SendCommand(ctx, command)

	log.Printf("current balance: %v", account.GetAccountBalance())

	// send another command to credit the balance
	command = &samplepb.CreditAccount{
		AccountId: entityID,
		Balance:   250,
	}
	account, _, _ = entity.SendCommand(ctx, command)
	log.Printf("current balance: %v", account.GetAccountBalance())

	// capture ctrl+c
	interruptSignal := make(chan os.Signal, 1)
	signal.Notify(interruptSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	<-interruptSignal

	// stop the actor system
	_ = e.Stop(ctx)
	os.Exit(0)
}

// AccountBehavior implements ego.EntityBehavior
type AccountBehavior struct {
	id string
}

// compile-time check that AccountBehavior implements ego.EntityBehavior
var _ ego.EntityBehavior[*samplepb.Account] = &AccountBehavior{}

// NewAccountBehavior creates an instance of AccountBehavior
func NewAccountBehavior(id string) *AccountBehavior {
	return &AccountBehavior{id: id}
}

// ID returns the id
func (a *AccountBehavior) ID() string {
	return a.id
}

// InitialState returns the initial state
func (a *AccountBehavior) InitialState() *samplepb.Account {
	return new(samplepb.Account)
}

// HandleCommand handles every command that is sent to the persistent behavior
func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) {
	switch cmd := command.(type) {
	case *samplepb.CreateAccount:
		// TODO in a production-grade app, validate the command using the prior state
		return &samplepb.AccountCreated{
			AccountId:      cmd.GetAccountId(),
			AccountBalance: cmd.GetAccountBalance(),
		}, nil

	case *samplepb.CreditAccount:
		// TODO in a production-grade app, validate the command using the prior state
		return &samplepb.AccountCredited{
			AccountId:      cmd.GetAccountId(),
			AccountBalance: cmd.GetBalance(),
		}, nil

	default:
		return nil, errors.New("unhandled command")
	}
}

// HandleEvent handles every event emitted
func (a *AccountBehavior) HandleEvent(_ context.Context, event ego.Event, priorState *samplepb.Account) (state *samplepb.Account, err error) {
	switch evt := event.(type) {
	case *samplepb.AccountCreated:
		return &samplepb.Account{
			AccountId:      evt.GetAccountId(),
			AccountBalance: evt.GetAccountBalance(),
		}, nil

	case *samplepb.AccountCredited:
		bal := priorState.GetAccountBalance() + evt.GetAccountBalance()
		return &samplepb.Account{
			AccountId:      evt.GetAccountId(),
			AccountBalance: bal,
		}, nil

	default:
		return nil, errors.New("unhandled event")
	}
}

Contribution

Contributions are welcome! The project adheres to Semantic Versioning and Conventional Commits. This repo uses Earthly.

To contribute please:

  • Fork the repository
  • Create a feature branch
  • Submit a pull request

Test & Linter

Prior to submitting a pull request, please run:

earthly +test

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrEngineRequired is returned when the eGo engine is not set
	ErrEngineRequired = errors.New("eGo engine is not defined")
	// ErrEngineNotStarted is returned when the eGo engine has not started
	ErrEngineNotStarted = errors.New("eGo engine has not started")
	// ErrUndefinedEntity is returned when sending a command to an undefined entity
	ErrUndefinedEntity = errors.New("eGo entity is not defined")
)

Functions

This section is empty.

Types

type Command

type Command proto.Message

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine represents the engine that empowers the various entities

func NewEngine

func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option) *Engine

NewEngine creates an instance of Engine

func (*Engine) AddProjection added in v1.1.0

func (x *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error

AddProjection adds a projection to the running eGo engine and starts it

func (*Engine) Start

func (x *Engine) Start(ctx context.Context) error

Start starts the ego engine

func (*Engine) Stop

func (x *Engine) Stop(ctx context.Context) error

Stop stops the ego engine

func (*Engine) Subscribe added in v1.1.0

func (x *Engine) Subscribe(ctx context.Context) (eventstream.Subscriber, error)

Subscribe creates an events subscriber

type Entity

type Entity[T State] struct {
	// contains filtered or unexported fields
}

Entity defines the event sourced persistent entity. It handles commands in order.

func NewEntity

func NewEntity[T State](ctx context.Context, behavior EntityBehavior[T], engine *Engine) (*Entity[T], error)

NewEntity creates an instance of Entity

func (Entity[T]) SendCommand

func (x Entity[T]) SendCommand(ctx context.Context, command Command) (resultingState T, revision uint64, err error)

SendCommand sends a command to a given entity ref. This will return:

  1. the resulting state after the command has been handled and the emitted event persisted
  2. nil when there is no resulting state or no event persisted
  3. an error in case of error

type EntityBehavior

type EntityBehavior[T State] interface {
	// ID defines the id that will be used in the event journal.
	// This helps track the entity in the events store.
	ID() string
	// InitialState returns the event sourced actor initial state.
	// This is set as the initial state when no snapshots are found for the entity
	InitialState() T
	// HandleCommand helps handle commands received by the event sourced actor. The command handlers define how to handle each incoming command,
	// which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can
	// be returned as a no-op. Command handlers are the meat of the event sourced actor.
	// They encode the business rules of your event sourced actor and act as a guardian of the event sourced actor consistency.
	// The command handler must first validate that the incoming command can be applied to the current model state.
	//  Any decision should be solely based on the data passed in the commands and the state of the Behavior.
	// In case of successful validation, one or more events expressing the mutations are persisted.
	// Once the events are persisted, they are applied to the state producing a new valid state.
	HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error)
	// HandleEvent handles events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it.
	// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
	HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error)
}

EntityBehavior defines an event sourced behavior when modeling a CQRS EntityBehavior.

type Event

type Event proto.Message

type Option

type Option interface {
	// Apply sets the Option value of a config.
	Apply(e *Engine)
}

Option is the interface that applies a configuration option.

func WithCluster

func WithCluster(discoProvider discovery.Provider, config discovery.Config, partitionsCount uint64) Option

WithCluster enables cluster mode

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger sets the logger

func WithTelemetry

func WithTelemetry(telemetry *telemetry.Telemetry) Option

WithTelemetry sets the telemetry engine

type OptionFunc

type OptionFunc func(e *Engine)

OptionFunc implements the Option interface.

func (OptionFunc) Apply

func (f OptionFunc) Apply(e *Engine)

Apply applies the options to Engine
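
Option and OptionFunc follow the functional options pattern. The sketch below reproduces that pattern in a self-contained form to show how such options are applied by a constructor. The Engine struct, its fields, NewEngine's body and WithPartitions are hypothetical stand-ins written for this illustration, not eGo's actual internals.

```go
package main

import "fmt"

// Engine is a hypothetical stand-in with a few configurable fields.
type Engine struct {
	name            string
	clustered       bool
	partitionsCount uint64
}

// Option applies a configuration option to an Engine.
type Option interface {
	Apply(e *Engine)
}

// OptionFunc adapts a plain function to the Option interface.
type OptionFunc func(e *Engine)

// Apply calls the wrapped function.
func (f OptionFunc) Apply(e *Engine) { f(e) }

// WithPartitions is a hypothetical option that turns on clustering.
func WithPartitions(count uint64) Option {
	return OptionFunc(func(e *Engine) {
		e.clustered = true
		e.partitionsCount = count
	})
}

// NewEngine builds an Engine with defaults, then applies each option in order.
func NewEngine(name string, opts ...Option) *Engine {
	e := &Engine{name: name}
	for _, opt := range opts {
		opt.Apply(e)
	}
	return e
}

func main() {
	standalone := NewEngine("Sample")
	clustered := NewEngine("Sample", WithPartitions(20))
	fmt.Println(standalone.clustered, clustered.clustered, clustered.partitionsCount)
	// prints: false true 20
}
```

The pattern keeps the constructor signature stable: new settings are added as new option functions instead of new parameters.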

type State

type State proto.Message

Directories

Path Synopsis
internal
test