triper

package module
v0.0.0-...-36f5f09 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 28, 2022 License: MIT Imports: 9 Imported by: 0

README

Triper

CQRS/ES toolkit for Go.

CQRS stands for Command Query Responsibility Segregation. It's a pattern that I first heard described by Greg Young. At its heart is the notion that you can use a different model to update information than the model you use to read information.

The mainstream approach people use for interacting with an information system is to treat it as a CRUD datastore. By this I mean that we have a mental model of some record structure where we can create new records, read records, update existing records, and delete records when we're done with them. In the simplest case, our interactions are all about storing and retrieving these records.

Event Sourcing ensures that every change to the state of an application is captured in an event object, and that these event objects are themselves stored in the sequence they were applied for the same lifetime as the application state itself.

Examples

The bank account example shows a full example with deposits and withdrawals.

Usage

There are 3 basic units of work: event, command and aggregate.

Command

A command describes an action that should be performed; it's always named in the imperative mood, such as PerformDeposit or CreateAccount.

Let’s start with some code:

import "github.com/mishudark/triper"

// PerformDeposit to an account
type PerformDeposit struct {
	triper.BaseCommand
	Amount int
}

At the beginning, we create the PerformDeposit command. It contains an anonymous struct field of type triper.BaseCommand. This means PerformDeposit automatically acquires all the methods of triper.BaseCommand.

You can also define custom fields. In this case, Amount contains the quantity to be deposited into an account.
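To illustrate how embedding promotes methods, here is a minimal sketch that compiles without the library. It uses baseCommand, a simplified local stand-in for triper.BaseCommand with only two of its documented fields, and newDeposit, a hypothetical helper that is not part of Triper.

```go
package main

import "fmt"

// baseCommand is a simplified, local stand-in for triper.BaseCommand.
type baseCommand struct {
	AggregateID   string
	AggregateType string
}

// GetAggregateID is promoted to every struct that embeds baseCommand.
func (b *baseCommand) GetAggregateID() string { return b.AggregateID }

// PerformDeposit embeds baseCommand and adds its own custom field.
type PerformDeposit struct {
	baseCommand
	Amount int
}

// newDeposit is a hypothetical helper that fills in the embedded fields.
func newDeposit(accountID string, amount int) *PerformDeposit {
	cmd := &PerformDeposit{Amount: amount}
	cmd.AggregateID = accountID
	cmd.AggregateType = "Account"
	return cmd
}

func main() {
	cmd := newDeposit("acc-1", 50)
	// GetAggregateID is defined on baseCommand but callable on PerformDeposit.
	fmt.Println(cmd.GetAggregateID(), cmd.Amount)
}
```

Because the stand-in method has a pointer receiver, the sketch passes the command around as a pointer; whether your own commands should be values or pointers depends on the receivers used by the library version you build against.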

Event

An event is the notification that something happened in the past. You can view an event as the representation of the reaction to a command after being executed. All events should be represented as verbs in the past tense such as CustomerRelocated, CargoShipped or InventoryLossageRecorded.

We create the DepositPerformed event; it's a plain Go struct, and it's the past-tense equivalent of the previous command PerformDeposit:

// DepositPerformed event
type DepositPerformed struct {
	Amount int
}

Aggregate

The aggregate is a logical boundary for things that can change in a business transaction of a given context. In the Triper context, it is the unit that processes commands and produces events.

Show me the code!

import "github.com/mishudark/triper"

// Account of bank
type Account struct {
	triper.BaseAggregate
	Owner   string
	Balance int
}

We create the Account aggregate. It contains an anonymous struct field of type triper.BaseAggregate. This means Account automatically acquires all the methods of triper.BaseAggregate.

Additionally, Account has the fields Balance and Owner, which represent the basic info of this context.

Now that we have our aggregate, we need to process the PerformDeposit command that we created earlier:

// HandleCommand create events and validate based on such command
func (a *Account) HandleCommand(command triper.Command) error {
	event := triper.Event{
		AggregateID:   a.ID,
		AggregateType: "Account",
	}

	switch c := command.(type) {
	case CreateAccount:
		event.AggregateID = c.AggregateID
		event.Data = &AccountCreated{c.Owner}

	case PerformDeposit:
		event.Data = &DepositPerformed{
			c.Amount,
		}
	}

	triper.ReduceHelper(a, event, true)
	return nil
}

First, we create an event with the basic info: AggregateID as an identifier and AggregateType with the same name as our aggregate. Next, we use a type switch to determine the type of the command and produce an event as a result.

Finally, the event should be applied to our aggregate; we use the helper triper.ReduceHelper with the params aggregate, event and a last argument (commit) set to true, meaning the event should be stored and published via the event store and event publisher.

Note: triper.BaseAggregate has some helper methods to make our life easier. We use HandleCommand to process a command and produce the respective event.
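The comment on HandleCommand mentions validation, which the listing does not show. The following sketch is one possible way to reject a bad command before an event is produced. The decide function, the errInvalidAmount error and the rule that a deposit must be positive are assumptions made for illustration, and the command and event types are local stand-ins so that the block compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// PerformDeposit and DepositPerformed are local stand-ins for the
// command and event defined earlier.
type PerformDeposit struct{ Amount int }
type DepositPerformed struct{ Amount int }

// errInvalidAmount is a hypothetical validation error.
var errInvalidAmount = errors.New("deposit amount must be positive")

// decide validates a command and returns the event data it produces.
// No event data is returned when validation fails.
func decide(command interface{}) (interface{}, error) {
	switch c := command.(type) {
	case PerformDeposit:
		if c.Amount <= 0 {
			return nil, errInvalidAmount
		}
		return &DepositPerformed{Amount: c.Amount}, nil
	default:
		// Unknown commands are rejected instead of producing an empty event.
		return nil, fmt.Errorf("unknown command %T", command)
	}
}

func main() {
	if _, err := decide(PerformDeposit{Amount: -5}); err != nil {
		fmt.Println("rejected:", err)
	}
	data, _ := decide(PerformDeposit{Amount: 20})
	fmt.Printf("%+v\n", data)
}
```

Returning an error before any event is applied keeps invalid commands out of the event history, which matters because stored events are never rewritten.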

The last step in the aggregate journey is to apply the events to our aggregate:

// Reduce applies an event to the account
func (a *Account) Reduce(event triper.Event) error {
	switch e := event.Data.(type) {
	case *AccountCreated:
		a.Owner = e.Owner
		a.ID = event.AggregateID
	case *DepositPerformed:
		a.Balance += e.Amount
	}
	return nil
}

Again, we use a type switch to determine the type of the event (note that events are pointers) and apply the respective changes.

Note: The aggregate is never saved in its current state. Instead, it is stored as a series of events that can recreate the aggregate in its last state.

Saving the events, publishing them, and recreating an aggregate from the event store are handled by Triper out of the box.
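To make the idea of recreating an aggregate from its events concrete, here is a minimal sketch of a replay loop. It is not Triper's implementation: event, apply and replay are hypothetical local names, and the structs are reduced stand-ins for the types used above.

```go
package main

import "fmt"

// event is a reduced stand-in for triper.Event.
type event struct {
	AggregateID string
	Version     int
	Data        interface{}
}

type AccountCreated struct{ Owner string }
type DepositPerformed struct{ Amount int }

// Account holds only the state derived from events.
type Account struct {
	ID      string
	Owner   string
	Balance int
	Version int
}

// apply mutates the account according to a single event.
func (a *Account) apply(e event) {
	switch d := e.Data.(type) {
	case *AccountCreated:
		a.ID = e.AggregateID
		a.Owner = d.Owner
	case *DepositPerformed:
		a.Balance += d.Amount
	}
	a.Version = e.Version
}

// replay rebuilds an account by applying its history in order.
func replay(history []event) *Account {
	a := &Account{}
	for _, e := range history {
		a.apply(e)
	}
	return a
}

func main() {
	history := []event{
		{AggregateID: "acc-1", Version: 1, Data: &AccountCreated{Owner: "mishudark"}},
		{AggregateID: "acc-1", Version: 2, Data: &DepositPerformed{Amount: 100}},
		{AggregateID: "acc-1", Version: 3, Data: &DepositPerformed{Amount: 25}},
	}
	a := replay(history)
	fmt.Println(a.Owner, a.Balance, a.Version)
}
```

The order of the history matters: the same events applied in the same sequence always yield the same state, which is what allows the current state to be discarded and rebuilt on demand.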

Config

Triper needs to be configured to manage events and commands, and to know where to store and publish events.

Event Store

Currently, MongoDB is supported. RethinkDB support is planned.

We create an event store with config.Mongo; it accepts host, port and table as arguments:

import "github.com/mishudark/triper/config"
...

config.Mongo("localhost", 27017, "bank") // event store

Event Publisher

RabbitMQ and Nats.io are supported.

We create an event bus with config.Nats; it accepts the connection URL and useSSL as arguments:

import "github.com/mishudark/triper/config"
...

config.Nats("nats://ruser:T0pS3cr3t@localhost:4222", false) // event bus

Wire it all together

Now that we have all the pieces, we can register our events, commands and aggregates:

import (
	"github.com/mishudark/triper"
	"github.com/mishudark/triper/commandhandler/basic"
	"github.com/mishudark/triper/config"
	"github.com/mishudark/triper/examples/bank"
)

func getConfig() (triper.CommandBus, error) {
	// register events
	reg := triper.NewEventRegister()
	reg.Set(bank.AccountCreated{})
	reg.Set(bank.DepositPerformed{})
	reg.Set(bank.WithdrawalPerformed{})

	// wire all parts together
	return config.NewClient(
		config.Mongo("localhost", 27017, "bank"),                    // event store
		config.Nats("nats://ruser:T0pS3cr3t@localhost:4222", false), // event bus
		config.AsyncCommandBus(30),                                  // command bus
		config.WireCommands(
			&bank.Account{},          // aggregate
			basic.NewCommandHandler,  // command handler
			"bank",                   // event store bucket
			"account",                // event store subset
			bank.CreateAccount{},     // command
			bank.PerformDeposit{},    // command
			bank.PerformWithdrawal{}, // command
		),
	)
}

Now you are ready to process commands:

uuid, _ := utils.UUID()

// 1) Create an account
var account bank.CreateAccount
account.AggregateID = uuid
account.Owner = "mishudark"

commandBus.HandleCommand(account)

First, we generate a new UUID, because this is a new account and it needs a unique identifier. Once we have created the basic structure of our CreateAccount command, we only need to send it using the command bus created in our config.

Event consumer

You should listen to your event bus. The format of the event is always the same; only the data key changes, depending on your event struct.

{
  "id": "0000XSNJG0SB2WDBTATBYEC51P",
  "aggregate_id": "0000XSNJG0N0ZVS3YXM4D7ZZ9Z",
  "aggregate_type": "Account",
  "version": 1,
  "type": "AccountCreated",
  "data": {
    "owner": "mishudark"
  }
}
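A consumer can decode this envelope in two steps: first the common fields, then the data key once the type is known. The sketch below uses only the standard library. The envelope and accountCreated structs and the ownerFromMessage function are hypothetical names, and how the raw message bytes arrive from NATS or RabbitMQ is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope mirrors the JSON keys shown above. Data stays raw until
// the event type is known.
type envelope struct {
	ID            string          `json:"id"`
	AggregateID   string          `json:"aggregate_id"`
	AggregateType string          `json:"aggregate_type"`
	Version       int             `json:"version"`
	Type          string          `json:"type"`
	Data          json.RawMessage `json:"data"`
}

// accountCreated matches the data payload of an AccountCreated event.
type accountCreated struct {
	Owner string `json:"owner"`
}

// ownerFromMessage decodes one message and returns the owner carried
// by an AccountCreated event.
func ownerFromMessage(msg []byte) (string, error) {
	var env envelope
	if err := json.Unmarshal(msg, &env); err != nil {
		return "", err
	}
	switch env.Type {
	case "AccountCreated":
		var data accountCreated
		if err := json.Unmarshal(env.Data, &data); err != nil {
			return "", err
		}
		return data.Owner, nil
	default:
		return "", fmt.Errorf("unhandled event type %q", env.Type)
	}
}

func main() {
	msg := []byte(`{
	  "id": "0000XSNJG0SB2WDBTATBYEC51P",
	  "aggregate_id": "0000XSNJG0N0ZVS3YXM4D7ZZ9Z",
	  "aggregate_type": "Account",
	  "version": 1,
	  "type": "AccountCreated",
	  "data": {"owner": "mishudark"}
	}`)
	owner, err := ownerFromMessage(msg)
	fmt.Println(owner, err)
}
```

Keeping the payload as json.RawMessage avoids decoding into a generic map and lets each event type have its own struct.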

Prior Art

Documentation

Functions

func Dispatch

func Dispatch(aggregate AggregateHandler, event Event)

Dispatch processes the event and commits it

func GenerateUUID

func GenerateUUID() string

GenerateUUID returns a ULID

func GetTypeName

func GetTypeName(source interface{}) (reflect.Type, string)

GetTypeName returns the reflect.Type and the type name of the given struct

func NewFailure

func NewFailure(err error, typ FailureType, command Command) error

NewFailure returns an alert that implements the error interface

func ReduceHelper

func ReduceHelper(aggregate AggregateHandler, event Event, commit bool)

ReduceHelper increments the version of an aggregate and applies the change itself

Types

type AggregateHandler

type AggregateHandler interface {
	// LoadsFromHistory(events []Event)
	Reduce(event Event) error
	HandleCommand(Command) error
	AddEvent(Event)
	AttachCommandID(id string)
	Uncommited() []Event
	ClearUncommited()
	IncrementVersion()
	GetID() string
	GetVersion() int
	AddError(error)
	GetError() error
	HasError() bool
}

AggregateHandler defines the methods to process commands

type BaseAggregate

type BaseAggregate struct {
	ID      string
	Type    string
	Version int
	Changes []Event
	Error   error
}

BaseAggregate contains the basic info that all aggregates should have

func (*BaseAggregate) AddError

func (b *BaseAggregate) AddError(err error)

AddError to the aggregate

func (*BaseAggregate) AddEvent

func (b *BaseAggregate) AddEvent(event Event)

AddEvent to the aggregate

func (*BaseAggregate) AttachCommandID

func (b *BaseAggregate) AttachCommandID(id string)

AttachCommandID to every change for traceability

func (*BaseAggregate) ClearUncommited

func (b *BaseAggregate) ClearUncommited()

ClearUncommited clears the uncommitted events

func (*BaseAggregate) GetError

func (b *BaseAggregate) GetError() error

GetError returns a list of errors

func (*BaseAggregate) GetID

func (b *BaseAggregate) GetID() string

GetID of the current aggregate

func (*BaseAggregate) GetVersion

func (b *BaseAggregate) GetVersion() int

GetVersion of the current aggregate

func (*BaseAggregate) HasError

func (b *BaseAggregate) HasError() bool

HasError returns true if it contains at least one error

func (*BaseAggregate) IncrementVersion

func (b *BaseAggregate) IncrementVersion()

IncrementVersion adds 1 to the current version

func (*BaseAggregate) Uncommited

func (b *BaseAggregate) Uncommited() []Event

Uncommited returns the events to be saved

type BaseCommand

type BaseCommand struct {
	ID            string
	Type          string
	AggregateID   string
	AggregateType string
	Version       int
}

BaseCommand contains the basic info that all commands should have

func (*BaseCommand) GenerateUUID

func (b *BaseCommand) GenerateUUID()

GenerateUUID generates a UUID

func (*BaseCommand) GetAggregateID

func (b *BaseCommand) GetAggregateID() string

GetAggregateID returns the command aggregate ID

func (*BaseCommand) GetAggregateType

func (b *BaseCommand) GetAggregateType() string

GetAggregateType returns the command aggregate type

func (*BaseCommand) GetID

func (b *BaseCommand) GetID() string

GetID returns the command ID

func (*BaseCommand) GetType

func (b *BaseCommand) GetType() string

GetType returns the command type

func (*BaseCommand) GetVersion

func (b *BaseCommand) GetVersion() int

GetVersion of the command

func (*BaseCommand) IsValid

func (b *BaseCommand) IsValid() bool

IsValid validates the command

type Command

type Command interface {
	GetType() string
	GetID() string
	GenerateUUID()
	GetAggregateID() string
	GetAggregateType() string
	IsValid() bool
	GetVersion() int
}

Command contains the methods to retrieve basic info about it

type CommandBus

type CommandBus interface {
	HandleCommand(command Command) (id string)
}

CommandBus serves as the bridge between commands and the command handler; it should manage the queues
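As an illustration of what managing the queues can mean, the sketch below implements an asynchronous bus with a buffered channel and a fixed pool of workers. It is an assumption about the general technique, not a description of config.AsyncCommandBus. The names command, asyncBus, newAsyncBus and Close are hypothetical, and command is a reduced stand-in for the Command interface.

```go
package main

import (
	"fmt"
	"sync"
)

// command is a reduced stand-in for the Command interface.
type command struct{ ID, Name string }

// asyncBus queues commands and hands them to a pool of workers.
type asyncBus struct {
	queue chan command
	wg    sync.WaitGroup
}

// newAsyncBus starts the given number of workers, each calling handle
// for every command it receives from the queue.
func newAsyncBus(workers int, handle func(command)) *asyncBus {
	b := &asyncBus{queue: make(chan command, workers)}
	for i := 0; i < workers; i++ {
		b.wg.Add(1)
		go func() {
			defer b.wg.Done()
			for c := range b.queue {
				handle(c)
			}
		}()
	}
	return b
}

// HandleCommand enqueues the command and returns its ID without
// waiting for the command to be processed.
func (b *asyncBus) HandleCommand(c command) string {
	b.queue <- c
	return c.ID
}

// Close stops accepting commands and waits for the workers to finish.
func (b *asyncBus) Close() {
	close(b.queue)
	b.wg.Wait()
}

func main() {
	var mu sync.Mutex
	handled := 0
	bus := newAsyncBus(3, func(c command) {
		mu.Lock()
		handled++
		mu.Unlock()
	})
	for i := 0; i < 10; i++ {
		bus.HandleCommand(command{ID: fmt.Sprint(i), Name: "PerformDeposit"})
	}
	bus.Close()
	fmt.Println("handled:", handled)
}
```

Since the caller only gets an ID back, the outcome of a command has to be observed elsewhere, for example through the events or failures published on the event bus.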

type CommandHandler

type CommandHandler interface {
	Handle(command Command) error
}

CommandHandler defines the contract to handle commands

type CommandHandlerRegister

type CommandHandlerRegister interface {
	Add(command interface{}, handler CommandHandler)
	GetHandler(command interface{}) (CommandHandler, error)
}

CommandHandlerRegister stores the handlers for commands

type CommandRegister

type CommandRegister struct {
	// contains filtered or unexported fields
}

CommandRegister contains a registry of commands and their handlers

func NewCommandRegister

func NewCommandRegister() *CommandRegister

NewCommandRegister creates a new CommandRegister

func (*CommandRegister) Add

func (c *CommandRegister) Add(command interface{}, handler CommandHandler)

Add a new command with its handler

func (*CommandRegister) GetHandler

func (c *CommandRegister) GetHandler(command interface{}) (CommandHandler, error)

GetHandler returns the handler for a command

type Event

type Event struct {
	ID            string      `json:"id"`
	AggregateID   string      `json:"aggregate_id"`
	AggregateType string      `json:"aggregate_type"`
	CommandID     string      `json:"command_id"`
	Version       int         `json:"version"`
	Type          string      `json:"type"`
	Data          interface{} `json:"data"`
}

Event stores the data for every event

type EventBus

type EventBus interface {
	Publish(event Event, bucket, subset string) error
}

EventBus defines the methods for managing the event publisher and consumer

type EventStore

type EventStore interface {
	Save(events []Event, version int) error
	SafeSave(events []Event, version int) error
	Load(aggregateID string) ([]Event, error)
}

EventStore saves the events from an aggregate
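For tests or experiments, an in-memory store with the same three methods can be sketched as follows. The meaning of the version argument is an assumption here: Save treats it as the number of events the caller expects to be stored already (optimistic concurrency), while SafeSave skips that check. Event is a reduced local stand-in, and memoryStore and errVersionMismatch are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Event is a reduced stand-in for triper.Event.
type Event struct {
	AggregateID string
	Version     int
	Type        string
}

// errVersionMismatch is a hypothetical error for stale writers.
var errVersionMismatch = errors.New("version mismatch")

// memoryStore keeps the events of every aggregate in a map.
type memoryStore struct {
	mu     sync.Mutex
	events map[string][]Event
}

func newMemoryStore() *memoryStore {
	return &memoryStore{events: map[string][]Event{}}
}

// Save appends events only if version equals the number of events
// already stored for the aggregate (assumed semantics).
func (s *memoryStore) Save(events []Event, version int) error {
	if len(events) == 0 {
		return nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	id := events[0].AggregateID
	if len(s.events[id]) != version {
		return errVersionMismatch
	}
	s.events[id] = append(s.events[id], events...)
	return nil
}

// SafeSave appends events without checking the version.
func (s *memoryStore) SafeSave(events []Event, version int) error {
	if len(events) == 0 {
		return nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	id := events[0].AggregateID
	s.events[id] = append(s.events[id], events...)
	return nil
}

// Load returns a copy of the events stored for the aggregate.
func (s *memoryStore) Load(aggregateID string) ([]Event, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]Event, len(s.events[aggregateID]))
	copy(out, s.events[aggregateID])
	return out, nil
}

func main() {
	store := newMemoryStore()
	created := []Event{{AggregateID: "acc-1", Version: 1, Type: "AccountCreated"}}
	fmt.Println(store.Save(created, 0))
	// A second writer that still believes the stream is empty is rejected.
	stale := []Event{{AggregateID: "acc-1", Version: 1, Type: "DepositPerformed"}}
	fmt.Println(store.Save(stale, 0))
}
```

The version check is what prevents two concurrent commands from both appending an event on top of the same prior state.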

type EventType

type EventType struct {
	// contains filtered or unexported fields
}

EventType implements the EventTypeRegister interface

func (*EventType) Count

func (e *EventType) Count() int

Count the quantity of events registered

func (*EventType) Events

func (e *EventType) Events() []string

Events registered

func (*EventType) Get

func (e *EventType) Get(name string) (interface{}, error)

Get a type based on its name

func (*EventType) Set

func (e *EventType) Set(source interface{})

Set a new type

type EventTypeRegister

type EventTypeRegister interface {
	Register
	Events() []string
}

EventTypeRegister defines the register for all the event types that are carried in the Data field of the Event struct

func NewEventRegister

func NewEventRegister() EventTypeRegister

NewEventRegister returns an EventTypeRegister
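A register of this kind is needed because stored events carry only a type name, and the concrete struct has to be recreated before the data can be decoded into it. The sketch below shows one way to build such a register with the reflect package. It is an illustration of the technique, not Triper's implementation, and registry and newRegistry are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
)

// registry maps a type name to its reflect.Type.
type registry struct {
	types map[string]reflect.Type
}

func newRegistry() *registry {
	return &registry{types: map[string]reflect.Type{}}
}

// Set records the struct type of source under its type name.
// Pointers are dereferenced so both T{} and &T{} register T.
func (r *registry) Set(source interface{}) {
	t := reflect.TypeOf(source)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	r.types[t.Name()] = t
}

// Get returns a pointer to a new zero value of the named type.
func (r *registry) Get(name string) (interface{}, error) {
	t, ok := r.types[name]
	if !ok {
		return nil, fmt.Errorf("type %q is not registered", name)
	}
	return reflect.New(t).Interface(), nil
}

// Count returns the number of registered types.
func (r *registry) Count() int { return len(r.types) }

// DepositPerformed is a local copy of the event defined earlier.
type DepositPerformed struct{ Amount int }

func main() {
	reg := newRegistry()
	reg.Set(DepositPerformed{})
	v, err := reg.Get("DepositPerformed")
	fmt.Printf("%T %v\n", v, err)
}
```

Get returns a pointer, which matches the note in the README that events are handled as pointers in the type switch.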

type Failure

type Failure struct {
	CommandID      string      `json:"command_id"`
	CommandType    string      `json:"command_type"`
	CommandVersion int         `json:"command_version"`
	AggregateID    string      `json:"aggregate_id"`
	AggregateType  string      `json:"aggregate_type"`
	Type           FailureType `json:"type"`
	Err            error       `json:"error"`
}

Failure is an error while the command is being processed

func (Failure) Error

func (f Failure) Error() string

type FailureType

type FailureType string

FailureType defines the kind of alert (error) raised while a command is being processed

const (
	FailureLoadingEvents     FailureType = "loading_events"
	FailureReplayingEvents   FailureType = "replying_events"
	FailureProcessingCommand FailureType = "processing_command"
	FailureInvalidID         FailureType = "invalid_id"
	FailureSavingOnStorage   FailureType = "saving_on_storage"
	FailurePublishingEvents  FailureType = "publishing_events"
	FailureVersionMissmatch  FailureType = "version_missmatch"
)
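Because a failure is returned as a plain error, callers can recover the failure type with errors.As from the standard library. The sketch below uses failure, a reduced local stand-in for the Failure struct, and failureType, a hypothetical helper; the format of the Error string is also an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// FailureType and the constant below are local copies of the
// documented declarations.
type FailureType string

const FailureSavingOnStorage FailureType = "saving_on_storage"

// failure is a reduced stand-in for triper.Failure.
type failure struct {
	CommandID string
	Type      FailureType
	Err       error
}

// Error makes failure satisfy the error interface.
func (f failure) Error() string {
	return fmt.Sprintf("%s: %v", f.Type, f.Err)
}

// failureType reports the FailureType carried by err, if any.
func failureType(err error) (FailureType, bool) {
	var f failure
	if errors.As(err, &f) {
		return f.Type, true
	}
	return "", false
}

func main() {
	var err error = failure{
		CommandID: "cmd-1",
		Type:      FailureSavingOnStorage,
		Err:       errors.New("connection refused"),
	}
	if typ, ok := failureType(err); ok {
		fmt.Println("failure type:", typ)
	}
}
```

Branching on the type rather than on the error text lets a caller decide, for example, to retry a storage failure but reject an invalid ID outright.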


type Register

type Register interface {
	Set(source interface{})
	Get(name string) (interface{}, error)
	Count() int
}

Register defines generic methods to create a registry

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository is responsible for generating an aggregate, saving its events and publishing them

func NewRepository

func NewRepository(store EventStore, bus EventBus) *Repository

NewRepository creates a repository with event store and event bus access

func (*Repository) Load

func (r *Repository) Load(aggregate AggregateHandler, id string) error

Load restores the last state of an aggregate

func (*Repository) PublishError

func (r *Repository) PublishError(err error, command Command, bucket, subset string) error

PublishError to an eventBus

func (*Repository) PublishEvents

func (r *Repository) PublishEvents(aggregate AggregateHandler, bucket, subset string) error

PublishEvents to an eventBus

func (*Repository) SafeSave

func (r *Repository) SafeSave(aggregate AggregateHandler, version int) error

SafeSave saves the events without checking the version

func (*Repository) Save

func (r *Repository) Save(aggregate AggregateHandler, version int) error

Save saves the events and publishes them to the event bus

Directories

Path Synopsis
commandbus
commandhandler
eventstore
examples
