event

package module
v0.1.14 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2022 License: MIT Imports: 26 Imported by: 2

README

event

Description

A simple event-sourcing library with an embeddable event-store based on sqlite3.

Examples

  • examples/user shows an example of how the library may be used in an event-sourcing scenario. This includes defining domain events and an aggregate, saving and loading that aggregate, and creating projections.

Documentation

Index

Constants

View Source
const (
	HeaderEtag        = "Etag"
	HeaderLongPoll    = "Long-Poll"
	HeaderIfNoneMatch = "If-None-Match"
)
View Source
const (
	All = "$all"
)

Variables

This section is empty.

Functions

func ChunkedRecordStream added in v0.1.12

func ChunkedRecordStream(in RecordStream, batchSize int, timeout time.Duration) <-chan Records

func ContainsEventWithIndex

func ContainsEventWithIndex(page hyper.Item, i uint64) bool

func Decode

func Decode(data json.RawMessage, v interface{}) error

func Encode

func Encode(v interface{}) (json.RawMessage, error)

func Follow

func Follow() func(*Streamer) error

func From

func From(version uint64) func(*Streamer) error

func FromCurrent

func FromCurrent() func(*Streamer) error

func HandleGETStream

func HandleGETStream(store Store, stream string) func(w http.ResponseWriter, r *http.Request)

func Named

func Named(name string) func(*Streamer) error

func Response added in v0.1.5

func Response(msg string, errs ...error) hyper.Item

func Timeout

func Timeout(d time.Duration) func(*Streamer) error

func UseClient

func UseClient(client *http.Client) func(*Streamer) error

Types

type BasicStore added in v0.1.5

type BasicStore struct {
	// contains filtered or unexported fields
}

func NewBasicStore added in v0.1.5

func NewBasicStore(dataSourceName string) (*BasicStore, error)

func (*BasicStore) Append added in v0.1.5

func (s *BasicStore) Append(streamID string, expectedVersion uint64, records Records) error

func (*BasicStore) Close added in v0.1.5

func (s *BasicStore) Close() error

func (*BasicStore) Load added in v0.1.5

func (s *BasicStore) Load(streamID string) RecordStream

func (*BasicStore) LoadFrom added in v0.1.5

func (s *BasicStore) LoadFrom(streamID string, skip uint64) RecordStream

func (*BasicStore) LoadSlice added in v0.1.5

func (s *BasicStore) LoadSlice(streamID string, skip uint64, limit uint64) (*Slice, error)

func (*BasicStore) SubscribeToStream added in v0.1.5

func (s *BasicStore) SubscribeToStream(streamID string) Subscription

func (*BasicStore) SubscribeToStreamFrom added in v0.1.5

func (s *BasicStore) SubscribeToStreamFrom(streamID string, version uint64) Subscription

func (*BasicStore) SubscribeToStreamFromCurrent added in v0.1.5

func (s *BasicStore) SubscribeToStreamFromCurrent(streamID string) Subscription

func (*BasicStore) Version added in v0.1.5

func (s *BasicStore) Version(streamID string) uint64

type ChangeRecorder

type ChangeRecorder struct {
	// contains filtered or unexported fields
}

ChangeRecorder can be used to represent a unit of work within an event sourced system.

func NewChangeRecorder

func NewChangeRecorder() *ChangeRecorder

func (*ChangeRecorder) Changes

func (cr *ChangeRecorder) Changes() Events

Changes retrieves all currently pending changes.

func (*ChangeRecorder) ClearChanges

func (cr *ChangeRecorder) ClearChanges()

ClearChanges will remove all pending changes from the underlying data structure.

func (*ChangeRecorder) Record

func (cr *ChangeRecorder) Record(e Event)

Record will add an event to the list of pending changes.

type ChunkedStore added in v0.1.7

type ChunkedStore struct {
	// contains filtered or unexported fields
}

func NewChunkedStore added in v0.1.7

func NewChunkedStore(dataSourceName string) (*ChunkedStore, error)

func (*ChunkedStore) Append added in v0.1.7

func (s *ChunkedStore) Append(streamID string, expectedVersion uint64, records Records) error

func (*ChunkedStore) Close added in v0.1.7

func (s *ChunkedStore) Close() error

func (*ChunkedStore) Load added in v0.1.7

func (s *ChunkedStore) Load(streamID string) RecordStream

func (*ChunkedStore) LoadFrom added in v0.1.7

func (s *ChunkedStore) LoadFrom(streamID string, skip uint64) RecordStream

func (*ChunkedStore) LoadSlice added in v0.1.7

func (s *ChunkedStore) LoadSlice(streamID string, skip uint64, limit uint64) (*Slice, error)

func (*ChunkedStore) SubscribeToStream added in v0.1.7

func (s *ChunkedStore) SubscribeToStream(streamID string) Subscription

func (*ChunkedStore) SubscribeToStreamFrom added in v0.1.7

func (s *ChunkedStore) SubscribeToStreamFrom(streamID string, version uint64) Subscription

func (*ChunkedStore) SubscribeToStreamFromCurrent added in v0.1.7

func (s *ChunkedStore) SubscribeToStreamFromCurrent(streamID string) Subscription

func (*ChunkedStore) Version added in v0.1.7

func (s *ChunkedStore) Version(streamID string) uint64

type Codec

type Codec struct {
	*io.TypeRegistry
	// contains filtered or unexported fields
}

func NewCodec

func NewCodec(opts ...CodecOption) *Codec

NewCodec creates a new Codec to encode Events into Records and decode Records into Events.

func (*Codec) Decode

func (c *Codec) Decode(record Record) (Event, error)

func (*Codec) DecodeAll

func (c *Codec) DecodeAll(records Records) (Events, error)

func (*Codec) Encode

func (c *Codec) Encode(event Event, muts ...RecordMutation) (Record, error)

func (*Codec) EncodeAll

func (c *Codec) EncodeAll(events Events, muts ...RecordMutation) (Records, error)

type CodecOption added in v0.1.9

type CodecOption func(*Codec)

func ExtractID added in v0.1.9

func ExtractID(fn ExtractIDFunc) CodecOption

ExtractID sets an ExtractIDFunc for the codec.

type Event

type Event interface{}

type EventStream

type EventStream <-chan Event

type Events

type Events []Event

func (Events) Stream

func (es Events) Stream() EventStream

type ExtractIDFunc added in v0.1.9

type ExtractIDFunc func(e Event) string

func IDByField added in v0.1.9

func IDByField(fieldName string) ExtractIDFunc

IDByField creates an ExtractIDFunc that uses reflection to extract a string from the field named fieldName. If that field is not present or the extracted string is empty, a UUID v4 is generated as the result.

type Feeder

type Feeder struct {
	Store    Store
	StreamID string
	PageSize uint64
}

func NewFeeder

func NewFeeder(store Store, streamID string) *Feeder

func (*Feeder) Page

func (f *Feeder) Page(url *url.URL) hyper.Item

type OptimisticConcurrencyError

type OptimisticConcurrencyError struct {
	Stream   string
	Expected uint64
	Actual   uint64
}

func (OptimisticConcurrencyError) Error

type Record

type Record struct {
	ID                string          `json:"id,omitempty"`        // the unique id of the event
	StreamID          string          `json:"stream-id"`           // the id of the current stream
	StreamIndex       uint64          `json:"stream-index"`        // the index of the event within the current stream
	OriginStreamID    string          `json:"origin-stream-id"`    // the id of the origin stream
	OriginStreamIndex uint64          `json:"origin-stream-index"` // the index of the event within the origin stream
	RecordedOn        time.Time       `json:"recorded-on"`         // the time the event was first recorded
	Type              string          `json:"type"`                // the type of the event
	Data              json.RawMessage `json:"data,omitempty"`      // the data of the event
	Metadata          json.RawMessage `json:"metadata,omitempty"`  // metadata to the event
}

type RecordMutation

type RecordMutation func(r *Record)

func WithMetadata

func WithMetadata(v interface{}) RecordMutation

type RecordStream

type RecordStream <-chan Record

func (RecordStream) Records added in v0.1.7

func (rs RecordStream) Records() Records

type Records

type Records []Record

func (Records) Stream

func (rs Records) Stream() RecordStream

type Server added in v0.1.5

type Server struct {
	// contains filtered or unexported fields
}

func NewServer added in v0.1.5

func NewServer(store Store) (*Server, error)

func (*Server) Close added in v0.1.5

func (s *Server) Close() error

func (*Server) ServeHTTP added in v0.1.13

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Slice

type Slice struct {
	StreamID      string  `json:"stream-id"`
	From          uint64  `json:"from"`
	Next          uint64  `json:"next"`
	Records       Records `json:"records"`
	IsEndOfStream bool    `json:"is-end-of-stream"`
}

type Store

type Store interface {
	io.Closer
	Version(streamID string) uint64
	Load(streamID string) RecordStream
	LoadFrom(streamID string, skip uint64) RecordStream
	LoadSlice(streamID string, skip uint64, limit uint64) (*Slice, error)
	Append(streamID string, expectedVersion uint64, records Records) error
	SubscribeToStream(streamID string) Subscription
	SubscribeToStreamFrom(streamID string, version uint64) Subscription
	SubscribeToStreamFromCurrent(streamID string) Subscription
}

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

func NewStreamer

func NewStreamer(url string, opts ...StreamerOption) (*Streamer, error)

func (*Streamer) Close

func (s *Streamer) Close() error

func (*Streamer) Stream

func (s *Streamer) Stream() RecordStream

func (*Streamer) URL

func (s *Streamer) URL() string

type StreamerOption added in v0.1.9

type StreamerOption func(*Streamer) error

type Subscription

type Subscription interface {
	Records() RecordStream
	On(callback func(r Record))
	Cancel() error
}

func SubscribeToStream

func SubscribeToStream(url string, auxOptions ...StreamerOption) (Subscription, error)

func SubscribeToStreamFrom

func SubscribeToStreamFrom(url string, version uint64, auxOptions ...StreamerOption) (Subscription, error)

Directories

Path Synopsis
cmd
es
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL