store

package
v0.1.0 Latest
Warning: This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2019 | License: AGPL-3.0 | Imports: 18 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewDuplicateEventError

func NewDuplicateEventError(eventInStore cloudevents.Event) error

func NewFossilServer

func NewFossilServer(
	collector Collector,
	factory *EventStreamFactory,
	store EventStore,
	loader EventLoader,
	lock DistributedLock,
	jwtTokenSecret string,
) *chi.Mux

Types

type Collector

type Collector interface {
	Collect(context context.Context, event *cloudevents.Event) error
}

type CollectorRouter

type CollectorRouter struct {
	// contains filtered or unexported fields
}

func NewCollectorRouter

func NewCollectorRouter(collector Collector, waiter *ConsumerWaiter) *CollectorRouter

func (*CollectorRouter) CollectEvent

func (r *CollectorRouter) CollectEvent(w http.ResponseWriter, request *http.Request)

func (*CollectorRouter) Mount

func (r *CollectorRouter) Mount(router *chi.Mux)

type ConsumerTimedOut

type ConsumerTimedOut struct {
	WaitConsumerConfiguration
}

func (*ConsumerTimedOut) Error

func (e *ConsumerTimedOut) Error() string

type ConsumerWaiter

type ConsumerWaiter struct {
	// contains filtered or unexported fields
}

func NewConsumerWaiter

func NewConsumerWaiter(broadcaster *concurrency.ChannelBroadcaster) *ConsumerWaiter

func (*ConsumerWaiter) WaitFor

func (w *ConsumerWaiter) WaitFor(ctx context.Context, configurations []WaitConsumerConfiguration) chan error

type ConsumerWaiterRouter

type ConsumerWaiterRouter struct {
	// contains filtered or unexported fields
}

func NewConsumerWaiterRouter

func NewConsumerWaiterRouter(collector Collector) *ConsumerWaiterRouter

func (*ConsumerWaiterRouter) Ack

func (*ConsumerWaiterRouter) Mount

func (cwr *ConsumerWaiterRouter) Mount(router *chi.Mux)

type DefaultCollector

type DefaultCollector struct {
	// contains filtered or unexported fields
}

func NewCollector

func NewCollector(store EventStore, publisher Publisher) *DefaultCollector

func (*DefaultCollector) Collect

func (c *DefaultCollector) Collect(context context.Context, event *cloudevents.Event) error

type DistributedLock

type DistributedLock interface {
	Lock(ctx context.Context, identifier string) error
	Release(identifier string) error
}

type DuplicateEventError

type DuplicateEventError struct {
	// contains filtered or unexported fields
}

func (*DuplicateEventError) Error

func (e *DuplicateEventError) Error() string

func (*DuplicateEventError) EventInStore

func (e *DuplicateEventError) EventInStore() cloudevents.Event

type EventAcknowledgment

type EventAcknowledgment struct {
	EventId      string `json:"event_id"`
	ConsumerName string `json:"consumer_name"`
}

type EventLoader

type EventLoader interface {
	MatchingStream(ctx context.Context, matcher events.Matcher) chan cloudevents.Event
}

type EventNotFound

type EventNotFound struct{}

func (*EventNotFound) Error

func (e *EventNotFound) Error() string

type EventStore

type EventStore interface {
	Store(ctx context.Context, stream string, event *cloudevents.Event) error
	Find(ctx context.Context, id string) (*cloudevents.Event, error)
}

type EventStreamFactory

type EventStreamFactory struct {
	Source      chan cloudevents.Event
	Broadcaster *concurrency.ChannelBroadcaster
	// contains filtered or unexported fields
}

func NewEventStreamFactory

func NewEventStreamFactory(loader EventLoader) *EventStreamFactory

func (*EventStreamFactory) NewEventStream

func (f *EventStreamFactory) NewEventStream(ctx context.Context, matcher events.Matcher) chan cloudevents.Event

type InMemoryPublisher

type InMemoryPublisher struct {
	Events []*cloudevents.Event
}

func NewInMemoryPublisher

func NewInMemoryPublisher() *InMemoryPublisher

func (*InMemoryPublisher) Publish

func (p *InMemoryPublisher) Publish(ctx context.Context, stream string, event *cloudevents.Event) error

type InMemoryStorage

type InMemoryStorage struct {
	Events []cloudevents.Event
}

func NewInMemoryStorage

func NewInMemoryStorage() *InMemoryStorage

func (*InMemoryStorage) Find

func (s *InMemoryStorage) Find(ctx context.Context, identifier string) (*cloudevents.Event, error)

func (*InMemoryStorage) MatchingStream

func (s *InMemoryStorage) MatchingStream(ctx context.Context, matcher events.Matcher) chan cloudevents.Event

func (*InMemoryStorage) Store

func (s *InMemoryStorage) Store(ctx context.Context, stream string, event *cloudevents.Event) error

type NamedConsumers

type NamedConsumers struct {
	// contains filtered or unexported fields
}

func NewNamedConsumers

func NewNamedConsumers(sseRouter *SSERouter, store EventStore, loader EventLoader, lock DistributedLock) *NamedConsumers

func (*NamedConsumers) CommitOffset

func (cg *NamedConsumers) CommitOffset(rw http.ResponseWriter, req *http.Request)

func (*NamedConsumers) Mount

func (cg *NamedConsumers) Mount(router *chi.Mux)

func (*NamedConsumers) Stream

func (cg *NamedConsumers) Stream(rw http.ResponseWriter, req *http.Request)

type Publisher

type Publisher interface {
	Publish(context context.Context, stream string, event *cloudevents.Event) error
}

type Router

type Router struct {
	// contains filtered or unexported fields
}

type SSERouter

type SSERouter struct {
	// contains filtered or unexported fields
}

func NewSSERouter

func NewSSERouter(eventStreamFactory *EventStreamFactory, store EventStore) *SSERouter

func (*SSERouter) Mount

func (r *SSERouter) Mount(router *chi.Mux)

func (*SSERouter) StreamEvents

func (r *SSERouter) StreamEvents(rw http.ResponseWriter, req *http.Request)

type SequenceNumberDoNotMatchError

type SequenceNumberDoNotMatchError struct{}

func (*SequenceNumberDoNotMatchError) Error

type WaitConsumerConfiguration

type WaitConsumerConfiguration struct {
	EventId      string
	ConsumerName string
	Timeout      time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL