eventstore

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2022 License: MIT Imports: 14 Imported by: 1

README

EventStore

GoDoc

High-performance event store used by Brobridge Gravity.

Benchmark

Here are the benchmark results:

$ go test -v -bench Benchmark -run Benchmark
goos: darwin
goarch: amd64
pkg: github.com/BrobridgeOrg/EventStore
cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz
BenchmarkWrite
BenchmarkWrite-16                  	  208004	      5060 ns/op
BenchmarkEventThroughput
BenchmarkEventThroughput-16        	  119781	      9529 ns/op
BenchmarkSnapshotPerformance
BenchmarkSnapshotPerformance-16    	   62036	     17749 ns/op
PASS
ok  	github.com/BrobridgeOrg/EventStore	20.779s

License

Licensed under the MIT License.

Authors

Copyright (c) 2020 Fred Chien fred@brobridge.com

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	PrefixState        = []byte("s")
	PrefixEvent        = []byte("e")
	PrefixSnapshotData = []byte("d")
)
View Source
var (
	StateClassStore    = []byte("s")
	StateGroupEvent    = []byte("e")
	StateGroupSnapshot = []byte("s")
)
View Source
var (
	StatePathEventRev        = bytes.Join([][]byte{StateClassStore, StateGroupEvent, []byte("rev")}, []byte("."))
	StatePathEventLastSeq    = bytes.Join([][]byte{StateClassStore, StateGroupEvent, []byte("lastSeq")}, []byte("."))
	StatePathEventCount      = bytes.Join([][]byte{StateClassStore, StateGroupEvent, []byte("count")}, []byte("."))
	StatePathSnapshotLastSeq = bytes.Join([][]byte{StateClassStore, StateGroupSnapshot, []byte("lastSeq")}, []byte("."))
	StatePathSnapshotCount   = bytes.Join([][]byte{StateClassStore, StateGroupSnapshot, []byte("count")}, []byte("."))
)
View Source
var (
	ErrInvalidKey = errors.New("EventStore: invalid key")
)
View Source
var (
	ErrRecordNotFound = errors.New("EventStore: record not found")
)
View Source
var (
	ErrSnapshotNotEnabled = errors.New("EventStore: snapshot not enabled")
)
View Source
var (
	ErrStateEntryNotFound = errors.New("store: state entry not found")
)

Functions

func BytesToFloat64 added in v0.2.0

func BytesToFloat64(data []byte) float64

func BytesToInt64 added in v0.2.0

func BytesToInt64(data []byte) int64

func BytesToString added in v0.1.7

func BytesToString(b []byte) string

func BytesToUint64

func BytesToUint64(data []byte) uint64

func Float64ToBytes added in v0.2.0

func Float64ToBytes(n float64) []byte

func GetFixedSizeBytes

func GetFixedSizeBytes(size int, str string) []byte

func Int64ToBytes added in v0.2.0

func Int64ToBytes(n int64) []byte

func StrToBytes

func StrToBytes(s string) []byte

func Uint64ToBytes

func Uint64ToBytes(n uint64) []byte

Types

type ColumnFamily added in v0.0.14

type ColumnFamily struct {
	Store  *Store
	Name   string
	Symbol []byte
	Prefix []byte
}

func NewColumnFamily added in v0.0.14

func NewColumnFamily(store *Store, name string, symbol []byte) *ColumnFamily

func (*ColumnFamily) Delete added in v0.0.23

func (cf *ColumnFamily) Delete(b *pebble.Batch, key []byte) error

func (*ColumnFamily) Get added in v0.2.0

func (cf *ColumnFamily) Get(b *pebble.Batch, key []byte) ([]byte, io.Closer, error)

func (*ColumnFamily) List added in v0.2.0

func (cf *ColumnFamily) List(rawPrefix []byte, targetPrimaryKey []byte, opts *ListOptions) (*Cursor, error)

func (*ColumnFamily) Write added in v0.0.16

func (cf *ColumnFamily) Write(b *pebble.Batch, key []byte, data []byte) error

type Counter

type Counter uint64

func (*Counter) Count

func (counter *Counter) Count() uint64

func (*Counter) Increase

func (counter *Counter) Increase(delta uint64) uint64

func (*Counter) SetCount

func (counter *Counter) SetCount(val uint64)

func (*Counter) String

func (counter *Counter) String() string

type Cursor added in v0.2.0

type Cursor struct {
	// contains filtered or unexported fields
}

func (*Cursor) Close added in v0.2.0

func (cur *Cursor) Close() error

func (*Cursor) EOF added in v0.2.0

func (cur *Cursor) EOF() bool

func (*Cursor) GetData added in v0.2.0

func (cur *Cursor) GetData() []byte

func (*Cursor) GetKey added in v0.2.0

func (cur *Cursor) GetKey() []byte

func (*Cursor) Next added in v0.2.0

func (cur *Cursor) Next() bool

type Event

type Event struct {
	Sequence uint64
	Data     []byte
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent() *Event

func (*Event) Ack

func (event *Event) Ack()

func (*Event) Release added in v0.0.21

func (event *Event) Release()

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

func CreateEventStore

func CreateEventStore(options *Options) (*EventStore, error)

func (*EventStore) Close

func (eventstore *EventStore) Close()

func (*EventStore) GetStore

func (eventstore *EventStore) GetStore(storeName string, opts ...StoreOpt) (*Store, error)

func (*EventStore) RecoverSnapshot added in v0.0.2

func (eventstore *EventStore) RecoverSnapshot(store *Store) error

func (*EventStore) SetSnapshotHandler

func (eventstore *EventStore) SetSnapshotHandler(fn func(*SnapshotRequest) error)

func (*EventStore) TakeSnapshot

func (eventstore *EventStore) TakeSnapshot(b *pebble.Batch, store *Store, seq uint64, data []byte) error

func (*EventStore) UnregisterStore

func (eventstore *EventStore) UnregisterStore(name string)

type ListOptions added in v0.2.0

type ListOptions struct {
	Prefix           []byte
	WithoutRawPrefix bool
}

type Merger added in v0.0.15

type Merger struct {
	// contains filtered or unexported fields
}

func NewMerger added in v0.1.3

func NewMerger() *Merger

func (*Merger) Finish added in v0.0.15

func (m *Merger) Finish(includesBase bool) ([]byte, io.Closer, error)

func (*Merger) MergeNewer added in v0.0.15

func (m *Merger) MergeNewer(value []byte) error

func (*Merger) MergeOlder added in v0.0.15

func (m *Merger) MergeOlder(value []byte) error

type Options

type Options struct {
	DatabasePath     string
	BypassEventStore bool
	EnabledSnapshot  bool
	SnapshotOptions  *SnapshotOptions
}

func NewOptions

func NewOptions() *Options

type Record added in v0.0.12

type Record struct {
	Key  []byte
	Data []byte
}

func NewRecord added in v0.0.12

func NewRecord() *Record

func (*Record) Release added in v0.0.21

func (r *Record) Release()

type SnapshotController

type SnapshotController struct {
	// contains filtered or unexported fields
}

func NewSnapshotController

func NewSnapshotController(options *SnapshotOptions) *SnapshotController

func (*SnapshotController) RecoverSnapshot added in v0.0.2

func (ss *SnapshotController) RecoverSnapshot(store *Store) error

func (*SnapshotController) Request

func (ss *SnapshotController) Request(b *pebble.Batch, store *Store, seq uint64, data []byte) error

func (*SnapshotController) SetHandler

func (ss *SnapshotController) SetHandler(fn func(*SnapshotRequest) error)

type SnapshotOptions

type SnapshotOptions struct {
	WorkerCount int32
	BufferSize  int
}

func NewSnapshotOptions

func NewSnapshotOptions() *SnapshotOptions

type SnapshotRequest

type SnapshotRequest struct {
	Sequence uint64
	Store    *Store
	Data     []byte
	Batch    *pebble.Batch
}

func NewSnapshotRequest

func NewSnapshotRequest() *SnapshotRequest

func (*SnapshotRequest) Delete

func (request *SnapshotRequest) Delete(collection []byte, key []byte) error

func (*SnapshotRequest) Get

func (request *SnapshotRequest) Get(collection []byte, key []byte) ([]byte, error)

func (*SnapshotRequest) UpdateDurableState added in v0.0.2

func (request *SnapshotRequest) UpdateDurableState(b *pebble.Batch, collection []byte) error

func (*SnapshotRequest) Upsert

func (request *SnapshotRequest) Upsert(collection []byte, key []byte, value []byte, fn func([]byte, []byte) []byte) error

type SnapshotView added in v0.0.8

type SnapshotView struct {
	// contains filtered or unexported fields
}

func NewSnapshotView added in v0.0.8

func NewSnapshotView(store *Store) *SnapshotView

func (*SnapshotView) Fetch added in v0.0.8

func (sv *SnapshotView) Fetch(collection []byte, key []byte, offset uint64, count int) ([]*Record, error)

func (*SnapshotView) Get added in v0.1.2

func (sv *SnapshotView) Get(collection []byte, key []byte) ([]byte, error)

func (*SnapshotView) Initialize added in v0.0.8

func (sv *SnapshotView) Initialize() error

func (*SnapshotView) Release added in v0.0.8

func (sv *SnapshotView) Release() error

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(eventstore *EventStore, storeName string, opts ...StoreOpt) (*Store, error)

func (*Store) Close

func (store *Store) Close()

func (*Store) CreateSnapshotView added in v0.0.8

func (store *Store) CreateSnapshotView() *SnapshotView

func (*Store) Delete added in v0.0.23

func (store *Store) Delete(seq uint64) error

func (*Store) DeleteState added in v0.2.0

func (store *Store) DeleteState(b *pebble.Batch, class []byte, group []byte, key []byte) error

func (*Store) DispatchEvent

func (store *Store) DispatchEvent()

func (*Store) Fetch added in v0.0.5

func (store *Store) Fetch(startAt uint64, offset uint64, count int) ([]*Event, error)

func (*Store) Get added in v0.2.0

func (store *Store) Get(seq uint64) ([]byte, error)

func (*Store) GetDurableState

func (store *Store) GetDurableState(durableName string) (uint64, error)

func (*Store) GetStateBytes added in v0.2.0

func (store *Store) GetStateBytes(class []byte, group []byte, key []byte) ([]byte, error)

func (*Store) GetStateFloat64 added in v0.2.0

func (store *Store) GetStateFloat64(class []byte, group []byte, key []byte) (float64, error)

func (*Store) GetStateInt64 added in v0.2.0

func (store *Store) GetStateInt64(class []byte, group []byte, key []byte) (int64, error)

func (*Store) GetStateString added in v0.2.0

func (store *Store) GetStateString(class []byte, group []byte, key []byte) (string, error)

func (*Store) GetStateUint64 added in v0.2.0

func (store *Store) GetStateUint64(class []byte, group []byte, key []byte) (uint64, error)

func (*Store) ListStates added in v0.2.0

func (store *Store) ListStates(class []byte, group []byte, key []byte) (*Cursor, error)

func (*Store) SetStateBytes added in v0.2.0

func (store *Store) SetStateBytes(b *pebble.Batch, class []byte, group []byte, key []byte, value []byte) error

func (*Store) SetStateBytesByPath added in v0.2.1

func (store *Store) SetStateBytesByPath(b *pebble.Batch, key []byte, value []byte) error

func (*Store) SetStateFloat64 added in v0.2.0

func (store *Store) SetStateFloat64(b *pebble.Batch, class []byte, group []byte, key []byte, value float64) error

func (*Store) SetStateInt64 added in v0.2.0

func (store *Store) SetStateInt64(b *pebble.Batch, class []byte, group []byte, key []byte, value int64) error

func (*Store) SetStateString added in v0.2.0

func (store *Store) SetStateString(b *pebble.Batch, class []byte, group []byte, key []byte, value string) error

func (*Store) SetStateUint64 added in v0.2.0

func (store *Store) SetStateUint64(b *pebble.Batch, class []byte, group []byte, key []byte, value uint64) error

func (*Store) SetStateUint64ByPath added in v0.2.1

func (store *Store) SetStateUint64ByPath(b *pebble.Batch, key []byte, value uint64) error

func (*Store) State added in v0.2.0

func (store *Store) State() *StoreState

func (*Store) Subscribe

func (store *Store) Subscribe(fn StoreHandler, opts ...SubOpt) (*Subscription, error)

func (*Store) UpdateDurableState

func (store *Store) UpdateDurableState(b *pebble.Batch, durableName string, lastSeq uint64) error

func (*Store) Write

func (store *Store) Write(data []byte, rev uint64) (uint64, error)

type StoreHandler

type StoreHandler func(*Event)

type StoreOpt added in v0.2.0

type StoreOpt func(s *Store)

func WithMaxSyncInterval added in v0.2.0

func WithMaxSyncInterval(interval time.Duration) StoreOpt

func WithSnapshot added in v0.2.0

func WithSnapshot(enabled bool) StoreOpt

type StoreState added in v0.2.0

type StoreState struct {
	// contains filtered or unexported fields
}

func (*StoreState) Count added in v0.2.0

func (ss *StoreState) Count() uint64

func (*StoreState) LastSeq added in v0.2.0

func (ss *StoreState) LastSeq() uint64

func (*StoreState) Rev added in v0.2.1

func (ss *StoreState) Rev() uint64

func (*StoreState) SnapshotCount added in v0.2.0

func (ss *StoreState) SnapshotCount() uint64

func (*StoreState) SnapshotLastSeq added in v0.2.0

func (ss *StoreState) SnapshotLastSeq() uint64

type SubOpt added in v0.1.0

type SubOpt func(s *Subscription)

func DurableName added in v0.1.0

func DurableName(durable string) SubOpt

func StartAtSequence added in v0.1.0

func StartAtSequence(seq uint64) SubOpt

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(store *Store, cf *ColumnFamily, fn StoreHandler, opts ...SubOpt) *Subscription

func (*Subscription) Close

func (sub *Subscription) Close()

func (*Subscription) Trigger

func (sub *Subscription) Trigger() error

func (*Subscription) Watch

func (sub *Subscription) Watch()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL