batch

v0.0.0-...-93a203a
Published: Jun 26, 2023 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EntryValue

type EntryValue interface {
	CleanUp()
}

type GetLimiterFunc

type GetLimiterFunc func(key string) *rate.Limiter

GetLimiterFunc returns the limiter for a specific batch key. The function must return the same value for any given key.
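One way to satisfy the "same value for any given key" requirement is to create a limiter lazily and cache it per key. The following is a minimal sketch, not the package's own code. To stay within the standard library it uses a hypothetical stand-in type, limiter, where a real implementation would return a *rate.Limiter; getLimiterFunc and newPerKeyLimiters are likewise illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical stand-in for rate.Limiter so that this sketch
// compiles without external modules.
type limiter struct {
	ratePerSecond float64
	burst         int
}

// getLimiterFunc mirrors the shape of GetLimiterFunc using the stand-in type.
type getLimiterFunc func(key string) *limiter

// newPerKeyLimiters returns a function that creates one limiter per key on
// first use and returns that same pointer on every later call for the key.
func newPerKeyLimiters(ratePerSecond float64, burst int) getLimiterFunc {
	var mu sync.Mutex
	limiters := make(map[string]*limiter)
	return func(key string) *limiter {
		mu.Lock()
		defer mu.Unlock()
		l, ok := limiters[key]
		if !ok {
			l = &limiter{ratePerSecond: ratePerSecond, burst: burst}
			limiters[key] = l
		}
		return l
	}
}

func main() {
	get := newPerKeyLimiters(10, 1)
	fmt.Println(get("a") == get("a")) // same key: same pointer
	fmt.Println(get("a") == get("b")) // different keys: different pointers
}
```

Returning a fresh limiter on every call would reset the token bucket each time, so throttling would never take effect; caching per key avoids that.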

type GetProcessorMetricsRecorderFunc

type GetProcessorMetricsRecorderFunc func() ProcessorMetricsRecorder

GetProcessorMetricsRecorderFunc returns a metrics recorder a Processor can use to record metrics.

type MetricsRecorder

type MetricsRecorder interface {
	// Records the rate limit delay, if any, incurred by the current batch.
	RecordRateLimitDelay(delay time.Duration)

	// Records completion of processing for the current batch.
	RecordBatchCompletion(size int, err error)
}

MetricsRecorder defines an interface for recording batch metrics.

type ProcessFunc

type ProcessFunc func(ctx context.Context, key string, values []interface{}) ([]interface{}, error)

ProcessFunc is the function that processes a batch of values and returns results.

An implementation receives a slice of values to process. It must return a slice of results of the same size, with each element holding the result for the value at the same index in the values slice. If a result entry contains an error, that error is returned to the caller that submitted the corresponding value. The batch function may return an empty or nil slice if and only if it returns an error.
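The contract above can be illustrated with a small function that has the ProcessFunc signature. This is a sketch under assumptions: processFunc is a local copy of the type so the example compiles on its own, and doubleInts, runDemo, and runCanceled are hypothetical names. It shows a per-entry failure (an error stored at the failing index) and a whole-batch failure (a nil slice with a non-nil error).

```go
package main

import (
	"context"
	"fmt"
)

// processFunc is a local copy of the ProcessFunc signature documented above.
type processFunc func(ctx context.Context, key string, values []interface{}) ([]interface{}, error)

// doubleInts doubles every int in the batch and keeps results index-aligned
// with the input values.
func doubleInts(ctx context.Context, key string, values []interface{}) ([]interface{}, error) {
	if err := ctx.Err(); err != nil {
		// Whole-batch failure: a nil slice is allowed only alongside an error.
		return nil, err
	}
	results := make([]interface{}, len(values))
	for i, v := range values {
		n, ok := v.(int)
		if !ok {
			// Per-entry failure: the error occupies the same index as the value.
			results[i] = fmt.Errorf("key %q: value %d has type %T, want int", key, i, v)
			continue
		}
		results[i] = 2 * n
	}
	return results, nil
}

// runDemo processes a batch in which the middle value is not an int.
func runDemo() ([]interface{}, error) {
	var fn processFunc = doubleInts
	return fn(context.Background(), "demo", []interface{}{1, "two", 3})
}

// runCanceled processes a batch with an already canceled context.
func runCanceled() ([]interface{}, error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return doubleInts(ctx, "demo", []interface{}{1})
}

func main() {
	results, err := runDemo()
	fmt.Println("batch error:", err)
	for i, r := range results {
		if e, ok := r.(error); ok {
			fmt.Printf("value %d failed: %v\n", i, e)
			continue
		}
		fmt.Printf("value %d -> %v\n", i, r)
	}
}
```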

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor provides facilities to batch values in keyed batches and invoke a function to process a batch of values. It guarantees that only a single invocation of the function can be in-flight at any given time for each keyed batch.

func NewProcessor

func NewProcessor(fn ProcessFunc, options ...ProcessorOption) *Processor

NewProcessor returns a new batch processor.

func (*Processor) Delete

func (p *Processor) Delete(key string)

Delete removes the keyed batch. It returns when all batch processing has stopped for the specified key. Operations currently in flight will complete, but any pending requests will be canceled.

func (*Processor) Do

func (p *Processor) Do(ctx context.Context, key string, value interface{}) (interface{}, error)

Do adds the value to the keyed batch and invokes the batch processing function. It guarantees that only one invocation may be in-flight at any given time for the specified key.

func (*Processor) DoChan

func (p *Processor) DoChan(ctx context.Context, key string, value interface{}) <-chan (Result)

DoChan performs the same action as Do, but returns a channel that the caller uses to receive results asynchronously.

type ProcessorMetricsRecorder

type ProcessorMetricsRecorder interface {
	// Records the processing start for a single batch of values for the specified key.
	RecordBatchStart(key string) MetricsRecorder
}

ProcessorMetricsRecorder defines an interface for recording batch processor metrics.
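The two metrics interfaces are designed to work as a pair: RecordBatchStart is called once per batch and returns a MetricsRecorder scoped to that batch. Below is a minimal in-memory sketch of both, assuming local copies of the interface declarations so it compiles on its own. The counters and batchRecorder types and the runDemo helper are hypothetical; the call sequence in runDemo only imitates what a caller of these interfaces might do.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Local copies of the interfaces documented above.
type MetricsRecorder interface {
	RecordRateLimitDelay(delay time.Duration)
	RecordBatchCompletion(size int, err error)
}

type ProcessorMetricsRecorder interface {
	RecordBatchStart(key string) MetricsRecorder
}

// counters is a hypothetical in-memory sink that aggregates metrics per key.
type counters struct {
	mu      sync.Mutex
	started map[string]int
	values  map[string]int
	failed  map[string]int
	delay   map[string]time.Duration
}

func newCounters() *counters {
	return &counters{
		started: map[string]int{},
		values:  map[string]int{},
		failed:  map[string]int{},
		delay:   map[string]time.Duration{},
	}
}

// RecordBatchStart counts the batch and returns a recorder bound to its key.
func (c *counters) RecordBatchStart(key string) MetricsRecorder {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.started[key]++
	return &batchRecorder{parent: c, key: key}
}

// batchRecorder records metrics for a single batch of one key.
type batchRecorder struct {
	parent *counters
	key    string
}

func (b *batchRecorder) RecordRateLimitDelay(delay time.Duration) {
	b.parent.mu.Lock()
	defer b.parent.mu.Unlock()
	b.parent.delay[b.key] += delay
}

func (b *batchRecorder) RecordBatchCompletion(size int, err error) {
	b.parent.mu.Lock()
	defer b.parent.mu.Unlock()
	b.parent.values[b.key] += size
	if err != nil {
		b.parent.failed[b.key]++
	}
}

// runDemo imitates two batches for one key: one delayed success, one failure.
func runDemo() *counters {
	c := newCounters()
	var pr ProcessorMetricsRecorder = c
	r := pr.RecordBatchStart("vm-1")
	r.RecordRateLimitDelay(50 * time.Millisecond)
	r.RecordBatchCompletion(3, nil)
	r = pr.RecordBatchStart("vm-1")
	r.RecordBatchCompletion(2, errors.New("boom"))
	return c
}

func main() {
	c := runDemo()
	fmt.Println(c.started["vm-1"], c.values["vm-1"], c.failed["vm-1"], c.delay["vm-1"])
}
```

A value such as newCounters() could then be passed to WithMetricsRecorder, provided it implements the package's own ProcessorMetricsRecorder rather than the local copy used here.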

type ProcessorOption

type ProcessorOption func(p *Processor)

ProcessorOption modifies the Processor configuration.
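ProcessorOption follows the functional options pattern: each option is a function that mutates the Processor, and the constructor applies the options in order. The sketch below illustrates the pattern only. The processor struct, its fields, and the default value are hypothetical, since the real Processor's fields are unexported and not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// processor is a hypothetical stand-in for Processor; the field names are
// assumptions made for this illustration.
type processor struct {
	delayBeforeStart time.Duration
	verboseLevel     int
}

// processorOption mirrors the shape of ProcessorOption.
type processorOption func(p *processor)

// withDelayBeforeStart returns an option that sets the start delay.
func withDelayBeforeStart(delay time.Duration) processorOption {
	return func(p *processor) { p.delayBeforeStart = delay }
}

// withVerboseLogLevel returns an option that sets the verbose log level.
func withVerboseLogLevel(level int) processorOption {
	return func(p *processor) { p.verboseLevel = level }
}

// newProcessor starts from defaults (hypothetical here) and applies each
// option in order, so a later option overrides an earlier one.
func newProcessor(options ...processorOption) *processor {
	p := &processor{verboseLevel: 1}
	for _, opt := range options {
		opt(p)
	}
	return p
}

func main() {
	p := newProcessor(
		withDelayBeforeStart(100*time.Millisecond),
		withVerboseLogLevel(4),
	)
	fmt.Println(p.delayBeforeStart, p.verboseLevel)
}
```

The pattern keeps the constructor signature stable as new settings are added, which matches the variadic options parameter of NewProcessor.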

func WithBatchLimits

func WithBatchLimits(limit rate.Limit, burst int) ProcessorOption

WithBatchLimits applies rate limits that are used to throttle batch processing of each key independently.

func WithDelayBeforeStart

func WithDelayBeforeStart(delay time.Duration) ProcessorOption

WithDelayBeforeStart applies the specified time delay before starting batch processing for each unique key.

func WithGlobalLimiter

func WithGlobalLimiter(limiter *rate.Limiter) ProcessorOption

WithGlobalLimiter applies a rate limiter that is used to throttle batch processing of all keys.

func WithGlobalLimits

func WithGlobalLimits(limit rate.Limit, burst int) ProcessorOption

WithGlobalLimits applies rate limits that are used to throttle batch processing of all keys.

func WithLogger

func WithLogger(l logr.Logger) ProcessorOption

WithLogger sets the logger.

func WithMetricsRecorder

func WithMetricsRecorder(metricsRecorder ProcessorMetricsRecorder) ProcessorOption

WithMetricsRecorder sets the metrics recorder.

func WithVerboseLogLevel

func WithVerboseLogLevel(level int) ProcessorOption

WithVerboseLogLevel sets the verbose logging level.

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result represents the result of a batched call for a specific value.
