Documentation ¶
Index ¶
- Constants
- func ChunkedRecordStream(in RecordStream, batchSize int, timeout time.Duration) <-chan Records
- func ContainsEventWithIndex(page hyper.Item, i uint64) bool
- func Decode(data json.RawMessage, v interface{}) error
- func Encode(v interface{}) (json.RawMessage, error)
- func Follow() func(*Streamer) error
- func From(version uint64) func(*Streamer) error
- func FromCurrent() func(*Streamer) error
- func HandleGETStream(store Store, stream string) func(w http.ResponseWriter, r *http.Request)
- func Named(name string) func(*Streamer) error
- func Response(msg string, errs ...error) hyper.Item
- func Timeout(d time.Duration) func(*Streamer) error
- func UseClient(client *http.Client) func(*Streamer) error
- type BasicStore
- func (s *BasicStore) Append(streamID string, expectedVersion uint64, records Records) error
- func (s *BasicStore) Close() error
- func (s *BasicStore) Load(streamID string) RecordStream
- func (s *BasicStore) LoadFrom(streamID string, skip uint64) RecordStream
- func (s *BasicStore) LoadSlice(streamID string, skip uint64, limit uint64) (*Slice, error)
- func (s *BasicStore) SubscribeToStream(streamID string) Subscription
- func (s *BasicStore) SubscribeToStreamFrom(streamID string, version uint64) Subscription
- func (s *BasicStore) SubscribeToStreamFromCurrent(streamID string) Subscription
- func (s *BasicStore) Version(streamID string) uint64
- type ChangeRecorder
- type ChunkedStore
- func (s *ChunkedStore) Append(streamID string, expectedVersion uint64, records Records) error
- func (s *ChunkedStore) Close() error
- func (s *ChunkedStore) Load(streamID string) RecordStream
- func (s *ChunkedStore) LoadFrom(streamID string, skip uint64) RecordStream
- func (s *ChunkedStore) LoadSlice(streamID string, skip uint64, limit uint64) (*Slice, error)
- func (s *ChunkedStore) SubscribeToStream(streamID string) Subscription
- func (s *ChunkedStore) SubscribeToStreamFrom(streamID string, version uint64) Subscription
- func (s *ChunkedStore) SubscribeToStreamFromCurrent(streamID string) Subscription
- func (s *ChunkedStore) Version(streamID string) uint64
- type Codec
- type CodecOption
- type Event
- type EventStream
- type Events
- type ExtractIDFunc
- type Feeder
- type OptimisticConcurrencyError
- type Record
- type RecordMutation
- type RecordStream
- type Records
- type Server
- type Slice
- type Store
- type Streamer
- type StreamerOption
- type Subscription
Constants ¶
View Source
const (
	HeaderEtag        = "Etag"
	HeaderLongPoll    = "Long-Poll"
	HeaderIfNoneMatch = "If-None-Match"
)
View Source
const (
All = "$all"
)
Variables ¶
This section is empty.
Functions ¶
func ChunkedRecordStream ¶ added in v0.1.12
func ChunkedRecordStream(in RecordStream, batchSize int, timeout time.Duration) <-chan Records
func Decode ¶
func Decode(data json.RawMessage, v interface{}) error
func Encode ¶
func Encode(v interface{}) (json.RawMessage, error)
func FromCurrent ¶
func HandleGETStream ¶
Types ¶
type BasicStore ¶ added in v0.1.5
type BasicStore struct {
// contains filtered or unexported fields
}
func NewBasicStore ¶ added in v0.1.5
func NewBasicStore(dataSourceName string) (*BasicStore, error)
func (*BasicStore) Append ¶ added in v0.1.5
func (s *BasicStore) Append(streamID string, expectedVersion uint64, records Records) error
func (*BasicStore) Close ¶ added in v0.1.5
func (s *BasicStore) Close() error
func (*BasicStore) Load ¶ added in v0.1.5
func (s *BasicStore) Load(streamID string) RecordStream
func (*BasicStore) LoadFrom ¶ added in v0.1.5
func (s *BasicStore) LoadFrom(streamID string, skip uint64) RecordStream
func (*BasicStore) SubscribeToStream ¶ added in v0.1.5
func (s *BasicStore) SubscribeToStream(streamID string) Subscription
func (*BasicStore) SubscribeToStreamFrom ¶ added in v0.1.5
func (s *BasicStore) SubscribeToStreamFrom(streamID string, version uint64) Subscription
func (*BasicStore) SubscribeToStreamFromCurrent ¶ added in v0.1.5
func (s *BasicStore) SubscribeToStreamFromCurrent(streamID string) Subscription
func (*BasicStore) Version ¶ added in v0.1.5
func (s *BasicStore) Version(streamID string) uint64
type ChangeRecorder ¶
type ChangeRecorder struct {
// contains filtered or unexported fields
}
ChangeRecorder can be used to represent a unit of work within an event sourced system.
func NewChangeRecorder ¶
func NewChangeRecorder() *ChangeRecorder
func (*ChangeRecorder) Changes ¶
func (cr *ChangeRecorder) Changes() Events
Changes retrieves all currently pending changes.
func (*ChangeRecorder) ClearChanges ¶
func (cr *ChangeRecorder) ClearChanges()
ClearChanges will remove all pending changes from the underlying data structure.
func (*ChangeRecorder) Record ¶
func (cr *ChangeRecorder) Record(e Event)
Record will add an event to the list of pending changes.
type ChunkedStore ¶ added in v0.1.7
type ChunkedStore struct {
// contains filtered or unexported fields
}
func NewChunkedStore ¶ added in v0.1.7
func NewChunkedStore(dataSourceName string) (*ChunkedStore, error)
func (*ChunkedStore) Append ¶ added in v0.1.7
func (s *ChunkedStore) Append(streamID string, expectedVersion uint64, records Records) error
func (*ChunkedStore) Close ¶ added in v0.1.7
func (s *ChunkedStore) Close() error
func (*ChunkedStore) Load ¶ added in v0.1.7
func (s *ChunkedStore) Load(streamID string) RecordStream
func (*ChunkedStore) LoadFrom ¶ added in v0.1.7
func (s *ChunkedStore) LoadFrom(streamID string, skip uint64) RecordStream
func (*ChunkedStore) SubscribeToStream ¶ added in v0.1.7
func (s *ChunkedStore) SubscribeToStream(streamID string) Subscription
func (*ChunkedStore) SubscribeToStreamFrom ¶ added in v0.1.7
func (s *ChunkedStore) SubscribeToStreamFrom(streamID string, version uint64) Subscription
func (*ChunkedStore) SubscribeToStreamFromCurrent ¶ added in v0.1.7
func (s *ChunkedStore) SubscribeToStreamFromCurrent(streamID string) Subscription
func (*ChunkedStore) Version ¶ added in v0.1.7
func (s *ChunkedStore) Version(streamID string) uint64
type Codec ¶
type Codec struct {
	*io.TypeRegistry
	// contains filtered or unexported fields
}
func NewCodec ¶
func NewCodec(opts ...CodecOption) *Codec
NewCodec creates a new Codec to encode Events into Records and decode Records into Events.
type CodecOption ¶ added in v0.1.9
type CodecOption func(*Codec)
func ExtractID ¶ added in v0.1.9
func ExtractID(fn ExtractIDFunc) CodecOption
ExtractID sets an ExtractIDFunc for the codec.
type EventStream ¶
type EventStream <-chan Event
type ExtractIDFunc ¶ added in v0.1.9
func IDByField ¶ added in v0.1.9
func IDByField(fieldName string) ExtractIDFunc
IDByField creates an ExtractIDFunc that uses reflection to extract a string from the field named fieldName. If that field is not present or the extracted string is empty, a UUID v4 is generated instead.
type OptimisticConcurrencyError ¶
func (OptimisticConcurrencyError) Error ¶
func (e OptimisticConcurrencyError) Error() string
type Record ¶
type Record struct {
	ID                string          `json:"id,omitempty"`        // the unique id of the event
	StreamID          string          `json:"stream-id"`           // the id of the current stream
	StreamIndex       uint64          `json:"stream-index"`        // the index of the event within the current stream
	OriginStreamID    string          `json:"origin-stream-id"`    // the id of the origin stream
	OriginStreamIndex uint64          `json:"origin-stream-index"` // the index of the event within the origin stream
	RecordedOn        time.Time       `json:"recorded-on"`         // the time the event was first recorded
	Type              string          `json:"type"`                // the type of the event
	Data              json.RawMessage `json:"data,omitempty"`      // the data of the event
	Metadata          json.RawMessage `json:"metadata,omitempty"`  // metadata to the event
}
type RecordMutation ¶
type RecordMutation func(r *Record)
func WithMetadata ¶
func WithMetadata(v interface{}) RecordMutation
type RecordStream ¶
type RecordStream <-chan Record
func (RecordStream) Records ¶ added in v0.1.7
func (rs RecordStream) Records() Records
type Records ¶
type Records []Record
func (Records) Stream ¶
func (rs Records) Stream() RecordStream
type Store ¶
type Store interface {
	io.Closer
	Version(streamID string) uint64
	Load(streamID string) RecordStream
	LoadFrom(streamID string, skip uint64) RecordStream
	LoadSlice(streamID string, skip uint64, limit uint64) (*Slice, error)
	Append(streamID string, expectedVersion uint64, records Records) error
	SubscribeToStream(streamID string) Subscription
	SubscribeToStreamFrom(streamID string, version uint64) Subscription
	SubscribeToStreamFromCurrent(streamID string) Subscription
}
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
func NewStreamer ¶
func NewStreamer(url string, opts ...StreamerOption) (*Streamer, error)
func (*Streamer) Stream ¶
func (s *Streamer) Stream() RecordStream
type StreamerOption ¶ added in v0.1.9
type Subscription ¶
type Subscription interface {
	Records() RecordStream
	On(callback func(r Record))
	Cancel() error
}
func SubscribeToStream ¶
func SubscribeToStream(url string, auxOptions ...StreamerOption) (Subscription, error)
func SubscribeToStreamFrom ¶
func SubscribeToStreamFrom(url string, version uint64, auxOptions ...StreamerOption) (Subscription, error)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.