rpatterns

package
v0.0.0-...-c9f981a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2024 License: MIT Imports: 13 Imported by: 8

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrBatchState = errors.New("batch error state", j.C("ERR_b3053f5f1a3ecd23"))

Functions

func Await

func Await(
	in context.Context, stream reflex.StreamFunc, pollFn func() (bool, error),
	foreignID string, eventTypes ...reflex.EventType,
) error

Await returns nil when a new event with foreignID and one of the eventTypes is received from the event streamer. It also returns nil if the pollFn returns true when it is periodically called. Cancel the input context to return early.

This function can be used to wait for some state that is associated with an event. pollFn is used to periodically query the state, while this logic waits for a new event. It is done in parallel to mitigate race conditions.

func MemCursorStore

func MemCursorStore(opts ...MemOpt) reflex.CursorStore

MemCursorStore returns an in-memory cursor store. Note that it obviously does not provide any persistence guarantees.

Use cases:

  • Testing
  • Programmatic seeding of a cursor: See ReadThroughCursorStore above.

func NewAckSpec

func NewAckSpec(stream reflex.StreamFunc, ac *AckConsumer,
	opts ...reflex.StreamOption,
) reflex.Spec

NewAckSpec returns a reflex spec for the AckConsumer.

func NewBatchSpec

func NewBatchSpec(
	stream reflex.StreamFunc,
	bc *BatchConsumer,
	opts ...reflex.StreamOption,
) reflex.Spec

NewBatchSpec returns a reflex spec for the AckConsumer.

func NewBestEffortConsumer

func NewBestEffortConsumer(name string, retries int, fn reflex.ConsumerFunc,
	opts ...reflex.ConsumerOption,
) reflex.Consumer

NewBestEffortConsumer returns a reflex consumer that ignores errors after the provided number of retries and therefore eventually continues to the next event.

func NewBootstrapConsumable

func NewBootstrapConsumable(sFn reflex.StreamFunc, cstore reflex.CursorStore,
	opts ...reflex.StreamOption,
) reflex.Consumable

NewBootstrapConsumable returns a reflex consumable that will start streaming from head if no cursor is found. This is useful if old events should be skipped for new consumers. Once running (bootstrapped), one can safely revert to reflex.NewConsumable. Deprecated: Use NewBootstrapSpec.

func NewBootstrapSpec

func NewBootstrapSpec(stream reflex.StreamFunc, cstore reflex.CursorStore,
	consumer reflex.Consumer, opts ...reflex.StreamOption,
) reflex.Spec

NewBootstrapSpec returns a reflex spec that will start streaming from head if no cursor is found. This is useful if old events should be skipped for new consumers. Once running (bootstrapped), one can safely revert to reflex.NewSpec.

func NewConcurrentSpec

func NewConcurrentSpec(
	stream reflex.StreamFunc,
	rac *ConcurrentConsumer,
	opts ...reflex.StreamOption,
) reflex.Spec

NewConcurrentSpec wraps the ConcurrentConsumer in a reflex.Spec

func NewDeadLetterConsumer

func NewDeadLetterConsumer(name string, retries int, fn reflex.ConsumerFunc, eFn reflex.ErrorInsertFunc,
	opts ...reflex.ConsumerOption,
) reflex.Consumer

NewDeadLetterConsumer returns a reflex consumer that records but ignores errors after the provided number of retries and therefore eventually continues to the next event. However, if the consumer cannot record the error it will return the error in a blocking fashion like a standard consumer.

func Parallel

func Parallel(getCtx getCtxFn, getConsumer getConsumerFn, n int, stream reflex.StreamFunc,
	cstore reflex.CursorStore, opts ...ParallelOption,
)

Parallel starts N consumers which consume the stream in parallel. Each event is consistently hashed to a consumer using the field specified in HashOption. Role scheduling combined with an appropriate getCtxFn can be used to implement distributed parallel consuming.

NOTE: N should preferably be a power of 2, and modifying N will reset the cursors.

func ParallelAck

func ParallelAck(getCtx getCtxFn, getConsumer getAckConsumerFn, n int, stream reflex.StreamFunc, opts ...ParallelOption)

ParallelAck starts N consumers which consume the stream in parallel. Each event is consistently hashed to a consumer using the field specified in HashOption. Role scheduling combined with an appropriate getCtxFn can be used to implement distributed parallel consuming. Events must be acked manually.

NOTE: N should preferably be a power of 2, and modifying N will reset the cursors.

func ParallelAckConsumer

func ParallelAckConsumer(
	shard ConsumerShard,
	store reflex.CursorStore,
	consume AckConsumerFunc,
	opts ...reflex.ConsumerOption,
) reflex.Consumer

ParallelAckConsumer constructs a AckConsumer from a ConsumerShard. This AckConsumer can be used in NewAckSpec to make it runnable.

func ParallelConsumer

func ParallelConsumer(
	shard ConsumerShard,
	consume reflex.ConsumerFunc,
	opts ...reflex.ConsumerOption,
) reflex.Consumer

ParallelConsumer constructs a reflex.Consumer from a ConsumerShard. This pattern is used when you need to customise the consume function for each shard. This reflex.Consumer can be used in reflex.NewSpec to make it runnable.

func ParallelSpecs

func ParallelSpecs(name string, n int,
	stream reflex.StreamFunc, store reflex.CursorStore,
	consume reflex.ConsumerFunc,
	opts ...ParallelOption,
) []reflex.Spec

ParallelSpecs will create n reflex.Spec structs, one for each shard. stream and store are re-used for each spec. This pattern is used when consume is the same function for each shard. See ParallelOption for more details on passing through reflex.ConsumerOption or reflex.StreamOption

func ReadThroughCursorStore

func ReadThroughCursorStore(primary, fallback reflex.CursorStore) reflex.CursorStore

ReadThroughCursorStore provides a cursor store that queries the fallback cursor store if the cursor is not found in the primary. It writes the cursor value to the primary if not. Cursor updates always go directly to the primary.

Use cases:

  • Migrating cursor stores: Use the new cursor store as the primary and the old cursor store as the fallback. Revert to just the new cursor store after the migration.
  • Programmatic seeding of a cursor: Use a MemCursorStore with the cursor seeded by WithMemCursor as the fallback and the target cursor store as the primary. Revert to just the target cursor store afterwards.

func RunForever

func RunForever(getCtx func() context.Context, req reflex.Spec)

RunForever continuously calls the run function, backing off and logging on unexpected errors.

func WithPollBackoff

func WithPollBackoff(d time.Duration) func(*Poller)

WithPollBackoff returns a option to configure the polling backoff (period) if no new events are available.

func WithSleep

func WithSleep(_ *testing.T, fn func(d time.Duration) <-chan time.Time) func(*Poller)

WithSleep returns a option to configure the sleep function for testing.

Types

type AckConsumer

type AckConsumer struct {
	// contains filtered or unexported fields
}

AckConsumer mirrors the reflex consumer except that events need to be acked explicitly. Ex. if processing batches, only the last event in the batch should be acked.

func NewAckConsumer

func NewAckConsumer(name string, cStore reflex.CursorStore,
	consume AckConsumerFunc,
	opts ...reflex.ConsumerOption,
) *AckConsumer

NewAckConsumer returns a new AckConsumer.

func (*AckConsumer) Consume

func (c *AckConsumer) Consume(ctx context.Context, e *reflex.Event) error

Consume executes the consumer business logic, converting the reflex event to an AckEvent.

func (*AckConsumer) Name

func (c *AckConsumer) Name() string

Name returns the ack consumer name.

type AckConsumerFunc

type AckConsumerFunc func(context.Context, *AckEvent) error

type AckEvent

type AckEvent struct {
	reflex.Event
	// contains filtered or unexported fields
}

AckEvent wraps a reflex event and provides an Ack method to update underlying consumer cursor.

func (*AckEvent) Ack

func (e *AckEvent) Ack(ctx context.Context) error

Ack sets (and flushes) the event id to the underlying cursor store. Note that out-of-order acks is allowed but should be avoided.

type Batch

type Batch []*reflex.Event

Batch is a batch of reflex events.

type BatchConsumeFn

type BatchConsumeFn func(context.Context, Batch) error

type BatchConsumer

type BatchConsumer struct {
	*AckConsumer
	// contains filtered or unexported fields
}

BatchConsumer provides a reflex consumer that buffers events and flushes a batch to the consume function when either flushLen or flushPeriod is reached.

It leverages the AckConsumer internally and acks (updates the cursor) after each batch is consumed.

This consumer is stateful. If the underlying stream errors reflex needs to be reset it to clear its state. It therefore implements the resetter interface. The consumer also implements the stopper interface to stop the processing go-routine when the run completes.

When the batch reaches capacity, the processing happens synchronously and the result is returned with the enqueue request. When the flush period is reached prior to batch capacity, processing happens asynchronously and the result will be returned when the next event is added to the queue. It assumes that the stream is reset to the previous cursor before sending subsequent events.

func NewBatchConsumer

func NewBatchConsumer(name string, cstore reflex.CursorStore,
	consume func(context.Context, Batch) error,
	batchPeriod time.Duration, batchLen int,
	opts ...reflex.ConsumerOption,
) *BatchConsumer

NewBatchConsumer returns a new BatchConsumer. Either batchPeriod or batchLen must be configured (non-zero).

func (*BatchConsumer) Reset

func (c *BatchConsumer) Reset(ctx context.Context) error

Reset ensures that the buffer and error is cleared enabling a clean run of the consumer whilst returning any errors that were found in the consumer.

func (*BatchConsumer) Stop

func (c *BatchConsumer) Stop() error

type ConcurrentConsumer

type ConcurrentConsumer struct {
	// contains filtered or unexported fields
}

ConcurrentConsumer will consume up to maxInFlight events concurrently. When an event has been consumed without error a new one will be picked up. This consumer is ideal when you have independent events that take a significant amount of time to process. Note that events for the same entity (foreign_id) can be processed out of order. The cursor is updated only when all events before it have been completed without error. TODO(adam): Add more options or make compatible with other Consumers

func NewConcurrentConsumer

func NewConcurrentConsumer(
	cStore reflex.CursorStore,
	consumer reflex.Consumer,
) *ConcurrentConsumer

NewConcurrentConsumer creates a new consumer with an inflight buffer of 100 events

func (*ConcurrentConsumer) Consume

func (c *ConcurrentConsumer) Consume(ctx context.Context, e *reflex.Event) error

Consume is used by reflex to feed events into the consumer

func (*ConcurrentConsumer) Name

func (c *ConcurrentConsumer) Name() string

Name returns the consumer name.

func (*ConcurrentConsumer) Reset

func (c *ConcurrentConsumer) Reset() error

Reset stops processing any current events and restarts the consumer ready to start consuming again.

type ConsumerShard

type ConsumerShard struct {
	Name string
	// contains filtered or unexported fields
}

ConsumerShard is one of n consumers, with a formatted name and a unique EventFilter

func ConsumerShards

func ConsumerShards(name string, n int, opts ...ParallelOption) []ConsumerShard

ConsumerShards gets the ConsumerShard for each of a ParallelConsumer or ParallelAckConsumer Each shard is configured to filter events such that each event is processed by one and only one shard of the returned n shards. You only need to call ConsumerShards if you're planning to use different consume functions for each shard. For most cases you can use ParallelSpecs.

func (ConsumerShard) GetFilter

func (s ConsumerShard) GetFilter() EventFilter

GetFilter gets the filter for this shard

type EventFilter

type EventFilter = reflex.EventFilter

EventFilter takes a reflex.Event and returns true if it should be allowed to be processed or false if it shouldn't. It can error if it fails to determine if the event should be processed.

type GapSequence

type GapSequence struct {
	// contains filtered or unexported fields
}

GapSequence can keep track of a maximum "done" identifier in a sequence that is consumed in ascending sequence but completed in a random sequence.

func NewGapSequence

func NewGapSequence() *GapSequence

NewGapSequence creates a new, empty GapSequence

func (GapSequence) CurrentMax

func (s GapSequence) CurrentMax() int64

CurrentMax is the largest `val` (max) that has been Done where all `val`s in Doing are greater than max.

func (*GapSequence) Doing

func (s *GapSequence) Doing(val int64)

Doing marks `val` as in progress CurrentMax will not be allowed to go >= `val` until it is Done

func (*GapSequence) Done

func (s *GapSequence) Done(val int64)

Done marks `val` as done and sets CurrentMax to the lowest done value so far

type HashOption

type HashOption int

HashOption the different hashing option to spread work over consumers.

const (
	// HashOptionEventID results in the most even distribution, but doesn't
	// provide any ordering guarantees. If no hash option is provided then
	// this option is used by default.
	HashOptionEventID HashOption = 0

	// HashOptionEventType will probably result in a very uneven distribution
	// (depending on the total number of event types), but it does guarantee
	// process ordering by type.
	HashOptionEventType HashOption = 1

	// HashOptionEventForeignID should result in a good distribution and
	// guarantees process ordering by foreign id.
	HashOptionEventForeignID HashOption = 2

	// HashOptionCustomHashFn allows the caller to provide a custom hash function
	// allowing them to tailor distribution and ordering for specific needs.
	// Deprecated: Only need to use WithHashFn
	HashOptionCustomHashFn HashOption = 3
)

type MemOpt

type MemOpt func(*memCursorStore)

MemOpt are options for the MemCursorStore

func WithMemCursor

func WithMemCursor(name, cursor string) MemOpt

WithMemCursor returns an option that stores the cursor in the MemCursorStore.

func WithMemCursorInt

func WithMemCursorInt(name string, cursor int64) MemOpt

WithMemCursorInt returns a option that stores the int cursor in the MemCursorStore.

type ParallelOption

type ParallelOption func(pc *parallelConfig)

ParallelOption configures the consumer with different behaviour

func WithConsumerSpecificOpts

func WithConsumerSpecificOpts(f ShardConsumerOpts) ParallelOption

WithConsumerSpecificOpts gets consumer options per shard

func WithHashFn

func WithHashFn(fn func(event *reflex.Event) ([]byte, error)) ParallelOption

WithHashFn specifies the custom hash function that will be used to distribute work to parallel consumers.

func WithHashOption

func WithHashOption(opt HashOption) ParallelOption

WithHashOption allows you to use one of the HashOption values to determine how events are distributed

func WithNameFormatter

func WithNameFormatter(fn func(base string, m, n int) string) ParallelOption

WithNameFormatter determines how each consumer name will be constructed. The default name formatter takes the base string and adds "_m_of_n" to the end. e.g. "test" becomes "test_3_of_8"

func WithStreamOpts

func WithStreamOpts(opts ...reflex.StreamOption) ParallelOption

WithStreamOpts passes stream options in to the reflex.Spec

type PollFunc

type PollFunc func(ctx context.Context, after string, opts ...reflex.StreamOption) ([]reflex.Event, error)

PollFunc returns subsequent events after the cursor or an error. It should return an empty list if no subsequent events are available. It may support stream options.

type Poller

type Poller struct {
	// contains filtered or unexported fields
}

Poller is an adapter that provides a reflex stream API on top of a polling (or pagination) API; so pull to push.

The polling API needs to provide consistent and stable ordering of events and reliable event IDs that are used by reflex as the cursor.

This is useful if one needs to continuously sync data from a polling API.

func NewPoller

func NewPoller(pollFunc PollFunc, opts ...func(*Poller)) Poller

NewPoller returns a new poller for the given poll function.

func (Poller) Stream

func (p Poller) Stream(ctx context.Context, after string,
	opts ...reflex.StreamOption,
) (reflex.StreamClient, error)

Stream implements reflex.StreamFunc and returns a StreamClient that streams events from the underlying polling API after the provided cursor. Stream is safe to call from multiple goroutines, but the returned StreamClient is only safe for a single goroutine to use.

type ShardConsumerOpts

type ShardConsumerOpts func(string) []reflex.ConsumerOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL