Documentation ¶
Index ¶
- Variables
- type CheckpointStore
- type CheckpointStoreConfig
- type ConsistentSegmentStore
- func (r *ConsistentSegmentStore) Create(ctx context.Context) (core.SegmentWriter, error)
- func (r *ConsistentSegmentStore) Delete(ctx context.Context, segment core.Segment) error
- func (r *ConsistentSegmentStore) Events() <-chan core.SegmentEventRequest
- func (r *ConsistentSegmentStore) ListSegments(ctx context.Context, region, topic string, partition uint32) (map[core.Segment]core.SegmentInfo, error)
- func (r *ConsistentSegmentStore) Open(ctx context.Context, segment core.Segment) (core.SegmentReader, error)
- func (r *ConsistentSegmentStore) Start() error
- func (r *ConsistentSegmentStore) Stop()
- type ConsistentSegmentStoreConfig
- type S3SegmentStore
- func (r *S3SegmentStore) Create(ctx context.Context) (core.SegmentWriter, error)
- func (r *S3SegmentStore) Delete(ctx context.Context, segment core.Segment) error
- func (r *S3SegmentStore) Events() <-chan core.SegmentEventRequest
- func (r *S3SegmentStore) ListSegments(ctx context.Context, region, topic string, partition uint32) (map[core.Segment]core.SegmentInfo, error)
- func (r *S3SegmentStore) Open(ctx context.Context, segment core.Segment) (core.SegmentReader, error)
- type S3SegmentStoreConfig
- type SQSSegmentEventSource
- type SQSSegmentEventSourceConfig
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultEventsRetentionCheckInterval is the default interval at which the background expiration task runs. DefaultEventsRetentionCheckInterval = time.Minute // DefaultSegmentEventsChanSize is the default size of the buffered events channel. DefaultSegmentEventsChanSize = 100 )
var ( // DefaultS3KeyPrefix is the default prefix for stored segments. DefaultS3KeyPrefix = "data" // DefaultS3TempKeyPrefix is the default prefix for temporary segments. DefaultS3TempKeyPrefix = "temp" )
var ( // DefaultSQSReceiveMaxNumberOfMessages is the default maximum number of messages to return in each receive messages call. DefaultSQSReceiveMaxNumberOfMessages = 10 // DefaultSQSReceiveWaitTime is the default duration for which the receive call waits for a message to arrive in the queue before returning. DefaultSQSReceiveWaitTime = 20 * time.Second // DefaultSQSMessageVisibilityTimeout is the default duration that the received messages are hidden from subsequent retrieve requests. DefaultSQSMessageVisibilityTimeout = 5 * time.Second // DefaultSQSMessageMaxRetryCount is the default maximum number of retries for a failed message before it is dropped. DefaultSQSMessageMaxRetryCount = 3 // DefaultSQSEventsChanSize is the default size of the buffered events channel. DefaultSQSEventsChanSize = 100 )
Functions ¶
This section is empty.
Types ¶
type CheckpointStore ¶
type CheckpointStore struct {
// contains filtered or unexported fields
}
CheckpointStore represents the checkpoint store used by ingress controller
func NewCheckpointStore ¶
func NewCheckpointStore(config CheckpointStoreConfig) (*CheckpointStore, error)
NewCheckpointStore returns a new CheckpointStore instance
func (*CheckpointStore) Load ¶
func (r *CheckpointStore) Load(region, topic string, partition uint32) *core.Checkpoint
Load returns the last saved checkpoint
func (*CheckpointStore) Save ¶
func (r *CheckpointStore) Save(checkpoint core.Checkpoint) error
Save persists the checkpoint
func (*CheckpointStore) Start ¶
func (r *CheckpointStore) Start() error
Start will start updating store state
type CheckpointStoreConfig ¶
type CheckpointStoreConfig struct { // PubSub is used to: // - store ingress checkpoints, and // - distribute the checkpoints to all running instances. PubSub core.Factory `required:"true"` // Unique name that identifies the local region/data center/cloud. // // Field value is required. LocalRegion string `required:"true"` // Kafka topic name for ingress checkpoints. // // The topic cleanup policy needs to be set to 'compact' to retain // only the last checkpoint for each <region, topic, partition> tuple. // This avoids unnecessary disk space consumption and improves startup time. // // Field value is required. Topic string `required:"true"` }
CheckpointStoreConfig is the checkpoint store configuration
func (CheckpointStoreConfig) Get ¶
func (c CheckpointStoreConfig) Get() (interface{}, error)
Get creates the corresponding instance
type ConsistentSegmentStore ¶
type ConsistentSegmentStore struct {
// contains filtered or unexported fields
}
ConsistentSegmentStore is the decorator that handles segment events to provide a consistent view of underlying store state.
func NewConsistentSegmentStore ¶
func NewConsistentSegmentStore(config ConsistentSegmentStoreConfig) (*ConsistentSegmentStore, error)
NewConsistentSegmentStore returns a new ConsistentSegmentStore instance
func (*ConsistentSegmentStore) Create ¶
func (r *ConsistentSegmentStore) Create(ctx context.Context) (core.SegmentWriter, error)
Create will create a new segment file
func (*ConsistentSegmentStore) Events ¶
func (r *ConsistentSegmentStore) Events() <-chan core.SegmentEventRequest
Events returns the segment event channel
func (*ConsistentSegmentStore) ListSegments ¶
func (r *ConsistentSegmentStore) ListSegments(ctx context.Context, region, topic string, partition uint32) (map[core.Segment]core.SegmentInfo, error)
ListSegments returns the requested segments
func (*ConsistentSegmentStore) Open ¶
func (r *ConsistentSegmentStore) Open(ctx context.Context, segment core.Segment) (core.SegmentReader, error)
Open will open the segment for reading
func (*ConsistentSegmentStore) Start ¶
func (r *ConsistentSegmentStore) Start() error
Start will start updating store state
func (*ConsistentSegmentStore) Stop ¶
func (r *ConsistentSegmentStore) Stop()
Stop will stop updating store state
type ConsistentSegmentStoreConfig ¶
type ConsistentSegmentStoreConfig struct { // PubSub is used to: // - distribute the incoming events to all running instances, // - temporarily store events for group rebalance scenarios, and // - address problems that can arise due to the AWS S3 eventual consistency model. PubSub core.Factory `required:"true"` // SegmentStore is the underlying segment store implementation. SegmentStore core.Factory `required:"true"` // SegmentEventSource is the source of segment events for the underlying segment store. SegmentEventSource core.Factory `required:"true"` // Kafka topic name for storing segment events. // // The topic cleanup policy needs to be set to 'delete' with an appropriate // retention time set to discard old segment events. // // Field value is required. Topic string `required:"true"` // The duration each segment event is tracked. // // Should match the configured Kafka topic retention time. // // Field value is required. EventsRetention time.Duration `min:"10m"` // The interval at which the background expiration task runs to remove old events from memory. // // Default value is set via DefaultEventsRetentionCheckInterval variable. EventsRetentionCheckInterval time.Duration `min:"1ms" max:"10m"` // Size of the buffered events channel. // // Default value is set via DefaultSegmentEventsChanSize variable. EventsChanSize int `min:"1"` }
ConsistentSegmentStoreConfig is the ConsistentSegmentStore configuration.
func (ConsistentSegmentStoreConfig) Get ¶
func (c ConsistentSegmentStoreConfig) Get() (interface{}, error)
Get creates the corresponding instance
type S3SegmentStore ¶
type S3SegmentStore struct {
// contains filtered or unexported fields
}
S3SegmentStore is the segment storage backed by AWS S3
func NewS3SegmentStore ¶
func NewS3SegmentStore(config S3SegmentStoreConfig) (*S3SegmentStore, error)
NewS3SegmentStore returns a new S3SegmentStore instance
func (*S3SegmentStore) Create ¶
func (r *S3SegmentStore) Create(ctx context.Context) (core.SegmentWriter, error)
Create will create a new segment file
func (*S3SegmentStore) Events ¶
func (r *S3SegmentStore) Events() <-chan core.SegmentEventRequest
Events channel is not supported in this implementation
func (*S3SegmentStore) ListSegments ¶
func (r *S3SegmentStore) ListSegments(ctx context.Context, region, topic string, partition uint32) (map[core.Segment]core.SegmentInfo, error)
ListSegments returns all segments for the provided parameters
func (*S3SegmentStore) Open ¶
func (r *S3SegmentStore) Open(ctx context.Context, segment core.Segment) (core.SegmentReader, error)
Open will open the segment for reading
type S3SegmentStoreConfig ¶
type S3SegmentStoreConfig struct { // The segment file format used to read and write segments. SegmentFormat core.Factory `required:"true"` // The AWS config object used to create the AWS S3 client. // // Field value is required. AWSConfig *aws.Config `required:"true"` // The AWS session object used to create the AWS S3 client. // // Field value is required. AWSSession *session.Session `required:"true"` // Bucket name to store segments. // // Field value is required. Bucket string `required:"true"` // Key prefix for written segment objects. // // Default value is set via DefaultS3KeyPrefix variable. KeyPrefix string // Key prefix for temporary segment objects. // // Default value is set via DefaultS3TempKeyPrefix variable. TempKeyPrefix string // Breaker enables tracking AWS S3 client error rate. // // Default value is set via DefaultS3Breaker variable. Breaker core.Breaker }
S3SegmentStoreConfig is the configuration for AWS S3 segment storage
func (S3SegmentStoreConfig) Get ¶
func (c S3SegmentStoreConfig) Get() (interface{}, error)
Get creates the corresponding instance
type SQSSegmentEventSource ¶
type SQSSegmentEventSource struct {
// contains filtered or unexported fields
}
SQSSegmentEventSource is the worker for AWS S3 notification events sent to AWS SQS
func NewSQSSegmentEventSource ¶
func NewSQSSegmentEventSource(config SQSSegmentEventSourceConfig) (*SQSSegmentEventSource, error)
NewSQSSegmentEventSource returns a new SQSSegmentEventSource instance
func (*SQSSegmentEventSource) Events ¶
func (w *SQSSegmentEventSource) Events() <-chan core.SegmentEventRequest
Events returns the segment event channel
func (*SQSSegmentEventSource) Start ¶
func (w *SQSSegmentEventSource) Start() error
Start will start the worker
func (*SQSSegmentEventSource) Stop ¶
func (w *SQSSegmentEventSource) Stop()
Stop will stop the worker
type SQSSegmentEventSourceConfig ¶
type SQSSegmentEventSourceConfig struct { // The AWS config object used to create the AWS SQS client. // // Field value is required. AWSConfig *aws.Config `required:"true"` // The AWS session object used to create the AWS SQS client. // // Field value is required. AWSSession *session.Session `required:"true"` // AWS SQS queue name where AWS S3 notification events are published. // // The implementation expects both Created and Removed event types to be enabled // for keys storing segments (i.e. the keys with DataKeyPrefix). // // Check the AWS S3 documentation for instructions on how to enable event notifications: // https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html // // Field value is required. QueueName string `required:"true"` // Key prefix for segment notification events. // // Default value is set via DefaultS3KeyPrefix variable. S3KeyPrefix string // The maximum number of messages to return in each receive messages call. // // Default value is set via DefaultSQSReceiveMaxNumberOfMessages variable. ReceiveMaxNumberOfMessages int `min:"1" max:"10"` // The duration for which the receive call waits for a message to arrive in the queue before returning. // // Minimum value must be greater than zero to enable the use of long polling. // Default value is set via DefaultSQSReceiveWaitTime variable. ReceiveWaitTime time.Duration `min:"5s"` // The duration that the received messages are hidden from subsequent retrieve requests. // // Default value is set via DefaultSQSMessageVisibilityTimeout variable. MessageVisibilityTimeout time.Duration `min:"1s"` // Maximum number of retries for a failed message before it is dropped. // // Default value is set via DefaultSQSMessageMaxRetryCount variable. MessageMaxRetryCount int `min:"1"` // Size of the buffered events channel. // // Default value is set via DefaultSQSEventsChanSize variable. EventsChanSize int `min:"1"` // Breaker enables tracking AWS SQS client error rate.
// // Default value is set via DefaultSQSBreaker variable. Breaker core.Breaker }
SQSSegmentEventSourceConfig is the configuration for AWS SQS segment event source
func (SQSSegmentEventSourceConfig) Get ¶
func (c SQSSegmentEventSourceConfig) Get() (interface{}, error)
Get creates the corresponding instance