Documentation ¶
Index ¶
- Constants
- func NewClient(config AWSConfig) (*kinesis.Kinesis, error)
- type AWSConfig
- type AverageStats
- type Checkpoint
- type CheckpointStrategy
- type Consumer
- func (c *Consumer) Log(level string, data map[string]interface{}, format string, args ...interface{})
- func (c *Consumer) LogEvent(event EventLog)
- func (c *Consumer) ResetIterators() error
- func (c *Consumer) Run(ctx context.Context) error
- func (c *Consumer) SetEventLogger(eventLogger EventLogger)
- func (c *Consumer) SetLogger(logger Logger)
- func (c *Consumer) Start() error
- func (c *Consumer) Stats() ConsumerStats
- func (c *Consumer) Stop() error
- type ConsumerConfig
- type ConsumerIterator
- type ConsumerOption
- func SinceLatest() ConsumerOption
- func SinceOldest() ConsumerOption
- func SinceSequence(shardID string, sequence string) ConsumerOption
- func SkipReshardingOrder() ConsumerOption
- func WithCheckpointStrategy(strategy CheckpointStrategy) ConsumerOption
- func WithClient(client kinesisiface.KinesisAPI) ConsumerOption
- func WithShards(shardIDs ...string) ConsumerOption
- func WithSpecificIterators(iterators map[string]ConsumerIterator) ConsumerOption
- type ConsumerOptions
- type ConsumerStats
- type EventLog
- type EventLogger
- type IteratorType
- type Logger
- type Message
- type MessageHandler
- type Producer
- func (producer Producer) Publish(message Message) error
- func (producer Producer) PublishBatch(messages []Message) error
- func (producer Producer) PublishBatchWithContext(ctx context.Context, messages []Message) error
- func (producer Producer) PublishWithContext(ctx context.Context, message Message) error
- type ProducerConfig
- type Runner
- type RunnerFactory
- type StreamChecker
Constants ¶
const ( // AfterRecord is a checkpoint strategy // When set it stores checkpoint in every record. AfterRecord CheckpointStrategy = iota // AfterRecordBatch is a checkpoint strategy // When set it stores checkpoint in every record batch. AfterRecordBatch // IteratorTypeTail is a iterator type that defines that consumer starts reading from tail. IteratorTypeTail IteratorType = iota // IteratorTypeHead is a iterator type that defines that consumer starts reading from beginning. IteratorTypeHead // IteratorTypeSequence is a iterator type that defines that consumer starts reading from a sequence number. IteratorTypeSequence // IteratorTypeAfterSequence is a iterator type that defines that consumer starts reading from a sequence number + 1. IteratorTypeAfterSequence )
const ( // StreamCheckedTriggered event StreamCheckedTriggered = "stream_checker_triggered" // ShardManagerTriggered event ShardManagerTriggered = "shard_manager_triggered" // ShardIteratorTriggered event ShardIteratorTriggered = "shard_iterator_triggered" // RecordProcessedSuccess event RecordProcessedSuccess = "record_processed_success" // RecordProcessedFail event RecordProcessedFail = "record_processed_fail" )
const ( // LevelDebug LogLevel LevelDebug = "debug" // LevelInfo LogLevel LevelInfo = "info" // LevelError LogLevel LevelError = "error" )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AWSConfig ¶
type AWSConfig struct { Endpoint string `json:"endpoint" mapstructure:"endpoint"` Region string `json:"region" mapstructure:"region"` }
AWSConfig is a aws configuration.
type AverageStats ¶
AverageStats holds average counters.
type Checkpoint ¶
Checkpoint manages last checkpoint.
type Consumer ¶
type Consumer struct { ConsumerOptions // contains filtered or unexported fields }
Consumer is a kinesis stream consumer.
func NewConsumer ¶
func NewConsumer(config ConsumerConfig, handler MessageHandler, checkpoint Checkpoint, opts ...ConsumerOption) (*Consumer, error)
NewConsumer creates a new kinesis consumer
func (*Consumer) Log ¶
func (c *Consumer) Log(level string, data map[string]interface{}, format string, args ...interface{})
Log main logger.
func (*Consumer) ResetIterators ¶
ResetIterators resets iterator for active shards.
func (*Consumer) SetEventLogger ¶
func (c *Consumer) SetEventLogger(eventLogger EventLogger)
SetEventLogger allows you to set the event logger.
type ConsumerConfig ¶
type ConsumerConfig struct { AWS AWSConfig `json:"aws" mapstructure:"aws"` Group string `json:"group" mapstructure:"group" validate:"nonzero"` Stream string `json:"stream" mapstructure:"stream" validate:"nonzero"` StreamCheckTick time.Duration `json:"stream_tick" mapstructure:"stream_tick"` RunnerFactoryTick time.Duration `json:"runner_factory_tick" mapstructure:"runner_factory_tick"` RunnerTick time.Duration `json:"runner_tick" mapstructure:"runner_tick"` RunnerGetRecordsRate time.Duration `json:"runner_get_records_rate" mapstructure:"runner_get_records_rate"` }
ConsumerConfig is a kinesis consumer configuration.
type ConsumerIterator ¶
type ConsumerIterator struct { Type IteratorType ShardID string Sequence string }
ConsumerIterator is a iterator configuration by shard.
type ConsumerOption ¶
type ConsumerOption func(*ConsumerOptions)
ConsumerOption is the abstract functional-parameter type used for worker configuration.
func SinceLatest ¶
func SinceLatest() ConsumerOption
SinceLatest allows you to set kinesis iterator. Starts reading just after the most recent record in the shard
func SinceOldest ¶
func SinceOldest() ConsumerOption
SinceOldest allows you to set kinesis iterator. Starts reading at the last untrimmed record in the shard in the system
func SinceSequence ¶
func SinceSequence(shardID string, sequence string) ConsumerOption
SinceSequence allows you to set kinesis iterator. Starts reading since sequence number in a specific shard.
func SkipReshardingOrder ¶
func SkipReshardingOrder() ConsumerOption
SkipReshardingOrder allows you to set consumer to start reading shards since detected.
func WithCheckpointStrategy ¶
func WithCheckpointStrategy(strategy CheckpointStrategy) ConsumerOption
WithCheckpointStrategy allows you to configure checkpoint strategy.
func WithClient ¶
func WithClient(client kinesisiface.KinesisAPI) ConsumerOption
WithClient allows you to set a kinesis client.
func WithShards ¶
func WithShards(shardIDs ...string) ConsumerOption
WithShards allows you to set a filtered shards. Consumer only reads specified shards ids.
func WithSpecificIterators ¶
func WithSpecificIterators(iterators map[string]ConsumerIterator) ConsumerOption
WithSpecificIterators allows you to set a specific iterators per shard.
type ConsumerOptions ¶
type ConsumerOptions struct {
// contains filtered or unexported fields
}
ConsumerOptions holds all consumer options.
type ConsumerStats ¶
type ConsumerStats struct { RecordsFailed AverageStats RecordsSuccess AverageStats }
ConsumerStats structure collects all stats.
type EventLogger ¶
type EventLogger interface {
LogEvent(event EventLog)
}
EventLogger callback that is called every event.
type Logger ¶
type Logger interface {
Log(level string, data map[string]interface{}, format string, args ...interface{})
}
Logger interface
type MessageHandler ¶
MessageHandler is the message handler.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer should be able to dispatch messages
func NewProducer ¶
func NewProducer(config ProducerConfig) (*Producer, error)
NewProducer creates a new kinesis producer
func (Producer) PublishBatch ¶
PublishBatch publishes contents to kinesis.
func (Producer) PublishBatchWithContext ¶
PublishBatchWithContext publishes contents to kinesis.
type ProducerConfig ¶
type ProducerConfig struct { AWS AWSConfig `json:"aws" mapstructure:"aws" validate:"nonzero"` Stream string `json:"stream" mapstructure:"stream" validate:"nonzero"` }
ProducerConfig is a kinesis producer configuration.
type Runner ¶
type Runner interface { Start(ctx context.Context) error Stop(ctx context.Context) error ShardID() string Closed() bool RestartCursor() }
Runner is a shard iterator.
type RunnerFactory ¶
RunnerFactory handler stream sharding.
type StreamChecker ¶
StreamChecker checks stream state and handles it.