Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( OffsetOldest = sarama.OffsetOldest OffsetNewest = sarama.OffsetNewest )
Initial offset type
var InitialOffset = OffsetNewest
InitialOffset allows to configure global initial offset from which to start consuming partitions which doesn't have offsets stored in the kafka_offsets table
var KafkaConfig *sarama.Config
KafkaConfig global per process Sarama config
var Pipes map[string]constructor
Pipes is the list of registered pipes Plugins insert their constructors into this map
Functions ¶
func CacheDestroy ¶
func CacheDestroy()
CacheDestroy releases all resources associated with cached pipes
Types ¶
type Consumer ¶
type Consumer interface { Close() error //CloseOnFailure doesn't save offsets CloseOnFailure() error Message() chan interface{} Error() chan error FetchNext() (interface{}, error) //Allows to explicitly persists current consumer position SaveOffset() error //SetFormat allow to tell consumer the format of the file when there is no //header SetFormat(format string) }
Consumer consumer interface for the pipe
type Header ¶
type Header struct { Format string Filters []string `json:",omitempty"` Schema []byte `json:",omitempty"` Delimited bool `json:",omitempty"` HMAC string `json:"HMAC-SHA256,omitempty"` IV string `json:"AES256-CFB-IV,omitempty"` }
Header represent file metadata in the beginning of the file
type KafkaPipe ¶
type KafkaPipe struct {
// contains filtered or unexported fields
}
KafkaPipe is wrapper on top of Sarama library to produce/consume through kafka
- after failure shutdown pipe guarantees to resent last batchSize messages,
meaning batchSize messages may be in flight, reading (batchSize+1)th message automatically acknowledges previous batch.
- producer caches and sends maximum batchSize messages at once
func (*KafkaPipe) Config ¶
func (p *KafkaPipe) Config() *config.PipeConfig
Config returns pipe configuration
func (*KafkaPipe) NewConsumer ¶
NewConsumer registers a new kafka consumer
func (*KafkaPipe) NewProducer ¶
NewProducer registers a new sync producer
type Pipe ¶
type Pipe interface { NewConsumer(topic string) (Consumer, error) NewProducer(topic string) (Producer, error) Type() string Config() *config.PipeConfig Close() error }
Pipe connects named producers and consumers
type Producer ¶
type Producer interface { Push(data interface{}) error PushK(key string, data interface{}) error PushSchema(key string, data []byte) error //PushBatch queues the messages instead of sending immediately PushBatch(key string, data interface{}) error //PushCommit writes out all the messages queued by PushBatch PushBatchCommit() error Close() error CloseOnFailure() error SetFormat(format string) PartitionKey(source string, key string) string }
Producer producer interface for pipe