stores

package
v0.0.0-...-7055b2f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2021 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultEventsRetentionCheckInterval is the default interval at which the
	// background expiration task runs.
	DefaultEventsRetentionCheckInterval = time.Minute

	// DefaultSegmentEventsChanSize is the default size of the buffered events channel.
	DefaultSegmentEventsChanSize = 100
)
View Source
var (
	// DefaultS3KeyPrefix is the default key prefix for stored segment objects.
	DefaultS3KeyPrefix = "data"

	// DefaultS3TempKeyPrefix is the default key prefix for temporary segment objects.
	DefaultS3TempKeyPrefix = "temp"
)
View Source
var (
	// DefaultSQSReceiveMaxNumberOfMessages is the default maximum number of messages to return in each receive messages call.
	DefaultSQSReceiveMaxNumberOfMessages = 10

	// DefaultSQSReceiveWaitTime is the default duration for which the receive call waits for a message to arrive in the queue before returning.
	DefaultSQSReceiveWaitTime = 20 * time.Second

	// DefaultSQSMessageVisibilityTimeout is the default duration that the received messages are hidden from subsequent retrieve requests.
	DefaultSQSMessageVisibilityTimeout = 5 * time.Second

	// DefaultSQSMessageMaxRetryCount is the default maximum number of retries for a failed message before it is dropped.
	DefaultSQSMessageMaxRetryCount = 3

	// DefaultSQSEventsChanSize is the default size of the buffered events channel.
	DefaultSQSEventsChanSize = 100
)

Functions

This section is empty.

Types

type CheckpointStore

type CheckpointStore struct {
	// contains filtered or unexported fields
}

CheckpointStore represents the checkpoint store used by the ingress controller

func NewCheckpointStore

func NewCheckpointStore(config CheckpointStoreConfig) (*CheckpointStore, error)

NewCheckpointStore returns a new CheckpointStore instance

func (*CheckpointStore) Load

func (r *CheckpointStore) Load(region, topic string, partition uint32) *core.Checkpoint

Load returns the last saved checkpoint

func (*CheckpointStore) Save

func (r *CheckpointStore) Save(checkpoint core.Checkpoint) error

Save persists the checkpoint

func (*CheckpointStore) Start

func (r *CheckpointStore) Start() error

Start will start updating store state

func (*CheckpointStore) Stop

func (r *CheckpointStore) Stop()

Stop will stop updating store state

type CheckpointStoreConfig

type CheckpointStoreConfig struct {
	// PubSub is used to:
	//   - store ingress checkpoints, and
	//   - distribute the checkpoints to all running instances.
	//
	// Field value is required.
	PubSub core.Factory `required:"true"`

	// Unique name that identifies the local region/data center/cloud.
	//
	// Field value is required.
	LocalRegion string `required:"true"`

	// Kafka topic name for ingress checkpoints.
	//
	// The topic cleanup policy needs to be set to 'compact' to retain
	// only the last checkpoint for each <region, topic, partition> tuple.
	// This avoids unnecessary disk space consumption and improves startup time.
	//
	// Field value is required.
	Topic string `required:"true"`
}

CheckpointStoreConfig is the checkpoint store configuration

func (CheckpointStoreConfig) Get

func (c CheckpointStoreConfig) Get() (interface{}, error)

Get creates the corresponding instance

type ConsistentSegmentStore

type ConsistentSegmentStore struct {
	// contains filtered or unexported fields
}

ConsistentSegmentStore is the decorator that handles segment events to provide a consistent view of underlying store state.

func NewConsistentSegmentStore

func NewConsistentSegmentStore(config ConsistentSegmentStoreConfig) (*ConsistentSegmentStore, error)

NewConsistentSegmentStore returns a new ConsistentSegmentStore instance

func (*ConsistentSegmentStore) Create

Create will create a new segment file

func (*ConsistentSegmentStore) Delete

func (r *ConsistentSegmentStore) Delete(ctx context.Context, segment core.Segment) error

Delete removes the provided segment

func (*ConsistentSegmentStore) Events

func (r *ConsistentSegmentStore) Events() <-chan core.SegmentEventRequest

Events returns the segment event channel

func (*ConsistentSegmentStore) ListSegments

func (r *ConsistentSegmentStore) ListSegments(ctx context.Context, region, topic string, partition uint32) (map[core.Segment]core.SegmentInfo, error)

ListSegments returns the requested segments

func (*ConsistentSegmentStore) Open

Open will open the segment for reading

func (*ConsistentSegmentStore) Start

func (r *ConsistentSegmentStore) Start() error

Start will start updating store state

func (*ConsistentSegmentStore) Stop

func (r *ConsistentSegmentStore) Stop()

Stop will stop updating store state

type ConsistentSegmentStoreConfig

type ConsistentSegmentStoreConfig struct {
	// PubSub is used to:
	//   - distribute the incoming events to all running instances,
	//   - temporarily store events for group rebalance scenarios, and
	//   - address problems that can arise due to the AWS S3 eventual consistency model.
	//
	// Field value is required.
	PubSub core.Factory `required:"true"`

	// SegmentStore is the underlying segment store implementation.
	//
	// Field value is required.
	SegmentStore core.Factory `required:"true"`

	// SegmentEventSource is the source of segment events for the underlying segment store.
	//
	// Field value is required.
	SegmentEventSource core.Factory `required:"true"`

	// Kafka topic name for storing segment events.
	//
	// The topic cleanup policy needs to be set to 'delete' with appropriate
	// retention time set to discard old segment events.
	//
	// Field value is required.
	Topic string `required:"true"`

	// The duration each segment event is tracked.
	//
	// Should match the configured Kafka topic retention time.
	//
	// Field value is required.
	EventsRetention time.Duration `min:"10m"`

	// The interval at which the background expiration task runs to remove old events from memory.
	//
	// Default value is set via DefaultEventsRetentionCheckInterval variable.
	EventsRetentionCheckInterval time.Duration `min:"1ms" max:"10m"`

	// Size of the buffered events channel.
	//
	// Default value is set via DefaultSegmentEventsChanSize variable.
	EventsChanSize int `min:"1"`
}

ConsistentSegmentStoreConfig is the ConsistentSegmentStore configuration.

func (ConsistentSegmentStoreConfig) Get

func (c ConsistentSegmentStoreConfig) Get() (interface{}, error)

Get creates the corresponding instance

type S3SegmentStore

type S3SegmentStore struct {
	// contains filtered or unexported fields
}

S3SegmentStore is the segment storage backed by AWS S3

func NewS3SegmentStore

func NewS3SegmentStore(config S3SegmentStoreConfig) (*S3SegmentStore, error)

NewS3SegmentStore returns a new S3SegmentStore instance

func (*S3SegmentStore) Create

Create will create a new segment file

func (*S3SegmentStore) Delete

func (r *S3SegmentStore) Delete(ctx context.Context, segment core.Segment) error

Delete removes the provided segment

func (*S3SegmentStore) Events

func (r *S3SegmentStore) Events() <-chan core.SegmentEventRequest

Events channel is not supported in this implementation

func (*S3SegmentStore) ListSegments

func (r *S3SegmentStore) ListSegments(ctx context.Context, region, topic string, partition uint32) (map[core.Segment]core.SegmentInfo, error)

ListSegments returns all segments for the provided parameters

func (*S3SegmentStore) Open

func (r *S3SegmentStore) Open(ctx context.Context, segment core.Segment) (core.SegmentReader, error)

Open will open the segment for reading

type S3SegmentStoreConfig

type S3SegmentStoreConfig struct {
	// The segment file format used to read and write segments.
	//
	// Field value is required.
	SegmentFormat core.Factory `required:"true"`

	// The AWS config object used to create the AWS S3 client.
	//
	// Field value is required.
	AWSConfig *aws.Config `required:"true"`

	// The AWS session object used to create the AWS S3 client.
	//
	// Field value is required.
	AWSSession *session.Session `required:"true"`

	// Bucket name to store segments.
	//
	// Field value is required.
	Bucket string `required:"true"`

	// Key prefix for written segment objects.
	//
	// Default value is set via DefaultS3KeyPrefix variable.
	KeyPrefix string

	// Key prefix for temporary segment objects.
	//
	// Default value is set via DefaultS3TempKeyPrefix variable.
	TempKeyPrefix string

	// Breaker enables tracking AWS S3 client error rate.
	//
	// Default value is set via DefaultS3Breaker variable.
	// NOTE(review): DefaultS3Breaker is not among the exported variables
	// documented for this package; confirm the name and whether it is exported.
	Breaker core.Breaker
}

S3SegmentStoreConfig is the configuration for AWS S3 segment storage

func (S3SegmentStoreConfig) Get

func (c S3SegmentStoreConfig) Get() (interface{}, error)

Get creates the corresponding instance

type SQSSegmentEventSource

type SQSSegmentEventSource struct {
	// contains filtered or unexported fields
}

SQSSegmentEventSource is the worker for AWS S3 notification events sent to AWS SQS

func NewSQSSegmentEventSource

func NewSQSSegmentEventSource(config SQSSegmentEventSourceConfig) (*SQSSegmentEventSource, error)

NewSQSSegmentEventSource returns a new SQSSegmentEventSource instance

func (*SQSSegmentEventSource) Events

func (w *SQSSegmentEventSource) Events() <-chan core.SegmentEventRequest

Events returns the segment event channel

func (*SQSSegmentEventSource) Start

func (w *SQSSegmentEventSource) Start() error

Start will start the worker

func (*SQSSegmentEventSource) Stop

func (w *SQSSegmentEventSource) Stop()

Stop will stop the worker

type SQSSegmentEventSourceConfig

type SQSSegmentEventSourceConfig struct {
	// The AWS config object used to create the AWS SQS client.
	//
	// Field value is required.
	AWSConfig *aws.Config `required:"true"`

	// The AWS session object used to create the AWS SQS client.
	//
	// Field value is required.
	AWSSession *session.Session `required:"true"`

	// AWS SQS queue name where AWS S3 notification events are published.
	//
	// The implementation expects both Created and Removed event types to be enabled
	// for keys storing segments (i.e. the keys with S3KeyPrefix).
	// NOTE(review): the original text referred to "DataKeyPrefix", which does not
	// appear in this package's documented API; S3KeyPrefix is presumably the
	// intended field. Confirm.
	//
	// Check the AWS S3 documentation for instructions on how to enable event notifications:
	// https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
	//
	// Field value is required.
	QueueName string `required:"true"`

	// Key prefix for segment notification events.
	//
	// Default value is set via DefaultS3KeyPrefix variable.
	S3KeyPrefix string

	// The maximum number of messages to return in each receive messages call.
	//
	// Default value is set via DefaultSQSReceiveMaxNumberOfMessages variable.
	ReceiveMaxNumberOfMessages int `min:"1" max:"10"`

	// The duration for which the receive call waits for a message to arrive in the queue before returning.
	//
	// The value must be greater than zero to enable the use of long polling.
	// NOTE(review): the min tag on this field is 5s, which is stricter than
	// "greater than zero"; confirm which bound is intended.
	// Default value is set via DefaultSQSReceiveWaitTime variable.
	ReceiveWaitTime time.Duration `min:"5s"`

	// The duration that the received messages are hidden from subsequent retrieve requests.
	//
	// Default value is set via DefaultSQSMessageVisibilityTimeout variable.
	MessageVisibilityTimeout time.Duration `min:"1s"`

	// Maximum number of retries for a failed message before it is dropped.
	//
	// Default value is set via DefaultSQSMessageMaxRetryCount variable.
	MessageMaxRetryCount int `min:"1"`

	// Size of the buffered events channel.
	//
	// Default value is set via DefaultSQSEventsChanSize variable.
	EventsChanSize int `min:"1"`

	// Breaker enables tracking AWS SQS client error rate.
	//
	// Default value is set via DefaultSQSBreaker variable.
	// NOTE(review): DefaultSQSBreaker is not among the exported variables
	// documented for this package; confirm the name and whether it is exported.
	Breaker core.Breaker
}

SQSSegmentEventSourceConfig is the configuration for AWS SQS segment event source

func (SQSSegmentEventSourceConfig) Get

func (c SQSSegmentEventSourceConfig) Get() (interface{}, error)

Get creates the corresponding instance

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL