persistence

package
v0.0.0-...-83edfdb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2021 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package persistence is a generated protocol buffer package.

It is generated from these files:

testprotos.proto

It has these top-level messages:

TestEvent
TestSnapshot

Index

Constants

This section is empty.

Variables

View Source
var ErrMarshalling = fmt.Errorf("Persistence provider failed with marshalling error")

ErrMarshalling will be provided to panic on marshalling failures

View Source
var ErrPersistenceFailed = fmt.Errorf("Persistence provider failed to persist event")

ErrPersistenceFailed is the panic reason if PersistEvent fails to write to persistence provider

View Source
var ErrPersistingSnapshot = fmt.Errorf("Persistence provider failed to persist snapshot")

ErrPersistingSnapshot will be provided to panic on PersistSnapshot failures

View Source
var ErrReadingEvents = fmt.Errorf("Persistence provider failed to read events")

ErrReadingEvents is the panic reason if GetEvents fails to read from persistence provider

Functions

func CreateDBConnection

func CreateDBConnection(url *url.URL) (*sqlx.DB, error)

CreateDBConnection sets up a DB connection and ensures required tables exist

func ResetTestDB

func ResetTestDB(dbPath string)

ResetTestDB clears the test database

func TestDBURL

func TestDBURL() (*url.URL, string)

TestDBURL is the location of the test DB

func Using

func Using(provider Provider) func(next actor.ActorFunc) actor.ActorFunc

Using adds the persistence provider to a given actor

Types

type InMemoryProvider

type InMemoryProvider struct {
	// contains filtered or unexported fields
}

InMemoryProvider is a proto.actor persistence provider

func (*InMemoryProvider) GetEvents

func (provider *InMemoryProvider) GetEvents(actorName string, eventIndexStart int, callback func(index int, e interface{}))

GetEvents implements ProviderState.GetEvents

func (*InMemoryProvider) GetSnapshot

func (provider *InMemoryProvider) GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool)

GetSnapshot implements ProviderState.GetSnapshot

func (*InMemoryProvider) GetSnapshotInterval

func (provider *InMemoryProvider) GetSnapshotInterval() int

GetSnapshotInterval implements ProviderState.GetSnapshotInterval

func (*InMemoryProvider) PersistEvent

func (provider *InMemoryProvider) PersistEvent(actorName string, eventIndex int, event proto.Message)

PersistEvent implements ProviderState.PersistEvent

func (*InMemoryProvider) PersistSnapshot

func (provider *InMemoryProvider) PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message)

PersistSnapshot implements ProviderState.PersistSnapshot

func (*InMemoryProvider) Restart

func (provider *InMemoryProvider) Restart()

Restart implements ProviderState.Restart

type Mixin

type Mixin struct {
	// contains filtered or unexported fields
}

Mixin is the persistence mixin for actors

func (*Mixin) Name

func (mixin *Mixin) Name() string

Name is the actor's persistence name

func (*Mixin) PersistReceive

func (mixin *Mixin) PersistReceive(message proto.Message)

PersistReceive saves an event to the actor's journal

func (*Mixin) PersistSnapshot

func (mixin *Mixin) PersistSnapshot(snapshot proto.Message)

PersistSnapshot overwrites an actor's current snapshot

func (*Mixin) Recovering

func (mixin *Mixin) Recovering() bool

Recovering indicates whether this actor is recovering (in which case all messages are replays) or not

type Provider

type Provider interface {
	GetState() ProviderState
}

Provider is the abstraction used for persistence

type ProviderState

type ProviderState interface {
	Restart()
	GetSnapshotInterval() int
	GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool)
	GetEvents(actorName string, eventIndexStart int, callback func(messageIndex int, e interface{}))
	PersistEvent(actorName string, eventIndex int, event proto.Message)
	PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message)
}

ProviderState is the contract with a given persistence provider

func NewInMemoryProvider

func NewInMemoryProvider(snapshotInterval int) ProviderState

NewInMemoryProvider creates a new in-memory provider

func NewSQLProvider

func NewSQLProvider(db *sqlx.DB, snapshotInterval int) (ProviderState, error)

NewSQLProvider creates a journal/snapshot provider with an SQL db backing it

type StreamCallBack

type StreamCallBack func(event *StreamEvent)

StreamCallBack is a callback when an event is delivered to a persistence stream

type StreamEvent

type StreamEvent struct {
	ActorName  string
	EventIndex int
	Event      proto.Message
}

StreamEvent adds metadata to an actor's message

type StreamPredicate

type StreamPredicate func(event *StreamEvent) bool

StreamPredicate filters a stream

type StreamingProvider

type StreamingProvider struct {
	// contains filtered or unexported fields
}

StreamingProvider is a wrapper for an existing provider that adds event streaming support

func NewStreamingProvider

func NewStreamingProvider(target ProviderState) *StreamingProvider

NewStreamingProvider wraps an existing provider to provide a stream of events

func (*StreamingProvider) GetEventStream

func (p *StreamingProvider) GetEventStream() *eventstream.EventStream

GetEventStream returns the underlying proto.actor stream for a streaming provider

func (*StreamingProvider) GetState

func (p *StreamingProvider) GetState() ProviderState

GetState returns the persistence.ProviderState associated with this provider

func (*StreamingProvider) GetStreamingState

func (p *StreamingProvider) GetStreamingState() StreamingProviderState

GetStreamingState returns the persistence.StreamingProviderState associated with this provider

type StreamingProviderState

type StreamingProviderState interface {
	ProviderState
	// StreamNewEvents sends any events that match the predicate to fn as they arrive
	StreamNewEvents(predicate StreamPredicate, fn StreamCallBack) *eventstream.Subscription
	// SubscribeActorJournal streams all events that were persisted as they were saved by a given actor after fromIndex, and continues to stream any new events to the subscription
	SubscribeActorJournal(persistenceName string, fromIndex int, fn StreamCallBack) *eventstream.Subscription
	// QueryActorJournal searches the actor journal from a given index and sends all events that match predicate to fn
	QueryActorJournal(persistenceName string, fromIndex int, predicate StreamPredicate, fn StreamCallBack)
	// UnsubscribeStream closes a subscription
	UnsubscribeStream(sub *eventstream.Subscription)
}

StreamingProviderState provides streaming access to an actor's events (including historical journal)

type TestEvent

type TestEvent struct {
	Index     uint32 `protobuf:"varint,1,opt,name=index" json:"index,omitempty"`
	StringVal string `protobuf:"bytes,2,opt,name=string_val,json=stringVal" json:"string_val,omitempty"`
}

func (*TestEvent) Descriptor

func (*TestEvent) Descriptor() ([]byte, []int)

func (*TestEvent) GetIndex

func (m *TestEvent) GetIndex() uint32

func (*TestEvent) GetStringVal

func (m *TestEvent) GetStringVal() string

func (*TestEvent) ProtoMessage

func (*TestEvent) ProtoMessage()

func (*TestEvent) Reset

func (m *TestEvent) Reset()

func (*TestEvent) String

func (m *TestEvent) String() string

type TestSnapshot

type TestSnapshot struct {
	StringVal string `protobuf:"bytes,1,opt,name=string_val,json=stringVal" json:"string_val,omitempty"`
	Index     uint32 `protobuf:"varint,2,opt,name=index" json:"index,omitempty"`
}

func (*TestSnapshot) Descriptor

func (*TestSnapshot) Descriptor() ([]byte, []int)

func (*TestSnapshot) GetIndex

func (m *TestSnapshot) GetIndex() uint32

func (*TestSnapshot) GetStringVal

func (m *TestSnapshot) GetStringVal() string

func (*TestSnapshot) ProtoMessage

func (*TestSnapshot) ProtoMessage()

func (*TestSnapshot) Reset

func (m *TestSnapshot) Reset()

func (*TestSnapshot) String

func (m *TestSnapshot) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL