storage

package
Version: v0.14.1 (not the latest version of this module; see the warning below)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2021 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Encoding identifiers. The values are spelled out explicitly rather than
// derived from iota.
// NOTE(review): these presumably populate Entry.Encoding and may therefore be
// persisted; if so, existing values must never be renumbered — confirm.
const (
	EncodingJSON     = 0
	EncodingProtobuf = 1
	EncodingMsgPack  = 2
	EncodingGob      = 3
)
View Source
// Size limits used by the package.
// NOTE(review): where and how these limits are enforced is not visible in
// this listing.
const (
	// MAX_EVENT_DATA_SIZE is 1024.
	// NOTE(review): presumably the maximum length of an event's Data in
	// bytes (cf. ErrExceededEventDataSize) — confirm unit and inclusivity.
	MAX_EVENT_DATA_SIZE = 1024
	// MAX_FETCH_SIZE is 100.
	// NOTE(review): presumably the cap applied to the limit arguments of the
	// read methods — confirm.
	MAX_FETCH_SIZE      = 100
)
View Source
// GLOBAL_STREAM is the untyped string constant "$global".
// NOTE(review): presumably the reserved name of a stream that spans all
// events — confirm against the store implementation.
const GLOBAL_STREAM = "$global"

Variables

View Source
// Sentinel errors defined by the package. Each one is a distinct errors.New
// value, so callers can compare a returned error against it (for example with
// errors.Is). The message of each error states the condition it represents.
// NOTE(review): which functions return which error is not visible in this
// listing.
var (
	ErrConcurrentStreamModification = errors.New("concurrent stream modification")
	ErrGappedStream                 = errors.New("given version leaves gap in stream")
	ErrEmptyEventType               = errors.New("event type cannot be empty")
	ErrEmptyEvents                  = errors.New("list of events is empty")
	ErrZeroStream                   = errors.New("stream cannot be all zeroes")
	ErrWrongVersion                 = errors.New("wrong event version")
	ErrExceededEventDataSize        = errors.New("exceeded event data size")
	ErrExceededCorrelationIDSize    = errors.New("exceeded event correlation id size")
	ErrInvalidDatabaseName          = errors.New("invalid database name")
	ErrInvalidStreamName            = errors.New("invalid stream name")
	ErrDatabaseDoesNotExist         = errors.New("database does not exist")
	ErrEventNotFound                = errors.New("event not found")
)
View Source
// Byte-slice values exported by the package: a "/" separator and three
// single-byte prefixes (0, 1 and 2).
// NOTE(review): presumably used to compose keys in the underlying key-value
// store, one prefix per record kind — confirm.
// NOTE(review): these are mutable package-level slices; callers must not
// modify them.
var (
	// SEPERATOR (sic) is the single byte "/". The misspelling is part of the
	// exported name, so it cannot be corrected without breaking callers.
	SEPERATOR       = []byte("/")
	STREAM_PREFIX   = []byte{0}
	EVENT_PREFIX    = []byte{1}
	SEQUENCE_PREFIX = []byte{2}
)
View Source
// File_proto_storage_proto is a protoreflect.FileDescriptor.
// NOTE(review): the name follows the pattern emitted by protoc-gen-go for a
// file at proto/storage.proto, so this is presumably generated code that
// should not be edited by hand — confirm.
var File_proto_storage_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type Entry

// Entry is a protobuf message type that pairs an int32 encoding value with a
// byte payload.
type Entry struct {
	// Encoding is protobuf field 1 (varint).
	// NOTE(review): presumably one of the Encoding* constants, describing how
	// Value is serialized — confirm.
	Encoding int32  `protobuf:"varint,1,opt,name=encoding,proto3" json:"encoding,omitempty"`
	// Value is protobuf field 2 (bytes).
	Value    []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
	// contains filtered or unexported fields
}

func (*Entry) Descriptor deprecated

func (*Entry) Descriptor() ([]byte, []int)

Deprecated: Use Entry.ProtoReflect.Descriptor instead.

func (*Entry) GetEncoding

func (x *Entry) GetEncoding() int32

func (*Entry) GetValue

func (x *Entry) GetValue() []byte

func (*Entry) ProtoMessage

func (*Entry) ProtoMessage()

func (*Entry) ProtoReflect

func (x *Entry) ProtoReflect() protoreflect.Message

func (*Entry) Reset

func (x *Entry) Reset()

func (*Entry) String

func (x *Entry) String() string

type Event

// Event is the JSON-tagged event record returned by the EventStore read
// methods GetEvent and GetEventsFromStream.
type Event struct {
	// ID is the event's identifier; GetEvent looks events up by this value's type.
	ID             string    `json:"id"`
	// Stream is the name of the stream the event is associated with.
	Stream         string    `json:"stream"`
	// NOTE(review): Sequence is presumably the event's position within
	// Stream, and GlobalSequence its position across all streams — confirm,
	// including whether numbering starts at 0 or 1.
	Sequence       uint64    `json:"sequence"`
	GlobalSequence uint64    `json:"global_sequence"`
	// Type and Data mirror the fields of the same name on EventRequest.
	Type           string    `json:"type"`
	Data           []byte    `json:"data"`
	// CausationID and CorrelationID mirror the fields of the same name on
	// EventRequest.
	CausationID    string    `json:"causation_id"`
	CorrelationID  string    `json:"correlation_id"`
	// Timestamp is a time.Time, unlike PersistedEvent.Timestamp which is an
	// int64.
	Timestamp      time.Time `json:"timestamp"`
}

type EventRequest

// EventRequest is the element type of the events slice accepted by
// AppendToStream. It carries the Type, Data, CausationID and CorrelationID
// fields of Event, but none of its ID, Stream, sequence or Timestamp fields.
// NOTE(review): those remaining fields are presumably assigned by the store
// when the event is appended — confirm.
type EventRequest struct {
	// NOTE(review): presumably must be non-empty (cf. ErrEmptyEventType) —
	// confirm.
	Type          string `json:"type"`
	// NOTE(review): presumably limited by MAX_EVENT_DATA_SIZE (cf.
	// ErrExceededEventDataSize) — confirm.
	Data          []byte `json:"data"`
	CausationID   string `json:"causation_id"`
	// NOTE(review): presumably length-limited (cf.
	// ErrExceededCorrelationIDSize); the limit is not visible here.
	CorrelationID string `json:"correlation_id"`
}

type EventStore

// EventStore is the interface for appending events to named streams and
// reading them back. Every method reports failure through its final error
// result. The *Store type in this package exposes a method with the same
// signature for each method declared here.
type EventStore interface {
	// AppendToStream takes a stream name, a sequence number and a batch of
	// EventRequest values, and returns a slice of strings.
	// NOTE(review): sequence is presumably the expected stream version used
	// for optimistic concurrency (cf. ErrConcurrentStreamModification,
	// ErrWrongVersion, ErrGappedStream), and the returned strings presumably
	// the IDs of the appended events — confirm.
	AppendToStream(stream string, sequence uint64, events []EventRequest) ([]string, error)

	// GetStream takes a stream name, a sequence number and a limit, and
	// returns a slice of strings.
	// NOTE(review): presumably the event IDs of the stream starting at
	// sequence, at most limit of them — confirm.
	GetStream(stream string, sequence uint64, limit uint32) ([]string, error)

	// GetEvent returns the Event for the given id.
	// NOTE(review): presumably fails with ErrEventNotFound for an unknown id
	// — confirm.
	GetEvent(id string) (Event, error)

	// GetEventsFromStream has the same parameters as GetStream but returns
	// Event values instead of strings.
	GetEventsFromStream(stream string, sequence uint64, limit uint32) ([]Event, error)

	// ListStreams returns a slice of strings selected by skip and limit.
	// NOTE(review): presumably stream names, paged by skip/limit; the
	// ordering is not visible here.
	ListStreams(skip uint32, limit uint32) ([]string, error)

	// StreamCount returns a uint64 count.
	// NOTE(review): presumably the number of streams in the store — confirm.
	StreamCount() (uint64, error)

	// EventCount returns a uint64 count.
	// NOTE(review): presumably the number of events in the store — confirm.
	EventCount() (uint64, error)

	// StreamSequence returns a uint64 for the named stream.
	// NOTE(review): presumably the stream's current (latest) sequence number
	// — confirm, including the result for a stream that does not exist.
	StreamSequence(stream string) (uint64, error)

	// DBSize returns an int64 size.
	// NOTE(review): presumably the on-disk size of the database in bytes —
	// confirm unit.
	DBSize() (int64, error)

	// Close has the shape of io.Closer's method.
	// NOTE(review): presumably releases the underlying database — confirm.
	Close() error
}

type PersistedEvent

// PersistedEvent is a protobuf message type whose exported fields correspond
// one-to-one with those of Event; the differences visible here are the
// Id/CorrelationId/CausationId spellings and an int64 Timestamp in place of
// time.Time.
// NOTE(review): presumably the on-disk representation of Event — confirm.
// The protobuf field numbers in the tags (2-10) are part of the serialized
// format and do not follow declaration order.
type PersistedEvent struct {
	Id             string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
	Stream         string `protobuf:"bytes,3,opt,name=stream,proto3" json:"stream,omitempty"`
	Sequence       uint64 `protobuf:"varint,4,opt,name=sequence,proto3" json:"sequence,omitempty"`
	GlobalSequence uint64 `protobuf:"varint,10,opt,name=global_sequence,json=globalSequence,proto3" json:"global_sequence,omitempty"`
	Type           string `protobuf:"bytes,5,opt,name=type,proto3" json:"type,omitempty"`
	// NOTE(review): integer timestamp; the unit (seconds, milliseconds,
	// nanoseconds) and epoch are not visible here — confirm before converting.
	Timestamp      int64  `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	CorrelationId  string `protobuf:"bytes,7,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"`
	CausationId    string `protobuf:"bytes,9,opt,name=causation_id,json=causationId,proto3" json:"causation_id,omitempty"`
	Data           []byte `protobuf:"bytes,8,opt,name=data,proto3" json:"data,omitempty"`
	// contains filtered or unexported fields
}

func (*PersistedEvent) Descriptor deprecated

func (*PersistedEvent) Descriptor() ([]byte, []int)

Deprecated: Use PersistedEvent.ProtoReflect.Descriptor instead.

func (*PersistedEvent) GetCausationId

func (x *PersistedEvent) GetCausationId() string

func (*PersistedEvent) GetCorrelationId

func (x *PersistedEvent) GetCorrelationId() string

func (*PersistedEvent) GetData

func (x *PersistedEvent) GetData() []byte

func (*PersistedEvent) GetGlobalSequence

func (x *PersistedEvent) GetGlobalSequence() uint64

func (*PersistedEvent) GetId

func (x *PersistedEvent) GetId() string

func (*PersistedEvent) GetSequence

func (x *PersistedEvent) GetSequence() uint64

func (*PersistedEvent) GetStream

func (x *PersistedEvent) GetStream() string

func (*PersistedEvent) GetTimestamp

func (x *PersistedEvent) GetTimestamp() int64

func (*PersistedEvent) GetType

func (x *PersistedEvent) GetType() string

func (*PersistedEvent) ProtoMessage

func (*PersistedEvent) ProtoMessage()

func (*PersistedEvent) ProtoReflect

func (x *PersistedEvent) ProtoReflect() protoreflect.Message

func (*PersistedEvent) Reset

func (x *PersistedEvent) Reset()

func (*PersistedEvent) String

func (x *PersistedEvent) String() string

type Store

// Store is an opaque type with no exported fields; values are obtained from
// NewStore, which takes a *badger.DB. The methods listed for *Store include
// every method of EventStore, plus AppendToStreamWithRetry.
type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(db *badger.DB) (*Store, error)

func (*Store) AppendToStream

func (s *Store) AppendToStream(stream string, sequence uint64, events []EventRequest) ([]string, error)

func (*Store) AppendToStreamWithRetry

func (s *Store) AppendToStreamWithRetry(stream string, sequence uint64, events []EventRequest, retries int) ([]string, error)

func (*Store) Close

func (s *Store) Close() error

func (*Store) DBSize

func (s *Store) DBSize() (int64, error)

func (*Store) EventCount

func (s *Store) EventCount() (uint64, error)

func (*Store) GetEvent

func (s *Store) GetEvent(id string) (Event, error)

func (*Store) GetEventsFromStream

func (s *Store) GetEventsFromStream(stream string, sequence uint64, limit uint32) ([]Event, error)

func (*Store) GetStream

func (s *Store) GetStream(stream string, sequence uint64, limit uint32) ([]string, error)

func (*Store) ListStreams

func (s *Store) ListStreams(skip uint32, limit uint32) ([]string, error)

func (*Store) StreamCount

func (s *Store) StreamCount() (uint64, error)

func (*Store) StreamSequence

func (s *Store) StreamSequence(stream string) (uint64, error)

type Stream

// Stream is a protobuf message type carrying a stream's id, a sequence number
// and a creation timestamp.
// NOTE(review): presumably the persisted per-stream metadata record —
// confirm.
type Stream struct {
	Id        string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
	// NOTE(review): presumably the stream's latest sequence number (cf.
	// StreamSequence) — confirm.
	Sequence  uint64 `protobuf:"varint,3,opt,name=sequence,proto3" json:"sequence,omitempty"`
	// NOTE(review): integer timestamp; unit and epoch are not visible here.
	CreatedAt int64  `protobuf:"varint,4,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
	// contains filtered or unexported fields
}

func (*Stream) Descriptor deprecated

func (*Stream) Descriptor() ([]byte, []int)

Deprecated: Use Stream.ProtoReflect.Descriptor instead.

func (*Stream) GetCreatedAt

func (x *Stream) GetCreatedAt() int64

func (*Stream) GetId

func (x *Stream) GetId() string

func (*Stream) GetSequence

func (x *Stream) GetSequence() uint64

func (*Stream) ProtoMessage

func (*Stream) ProtoMessage()

func (*Stream) ProtoReflect

func (x *Stream) ProtoReflect() protoreflect.Message

func (*Stream) Reset

func (x *Stream) Reset()

func (*Stream) String

func (x *Stream) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL