pipe

package
v1.1.0-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2020 License: MIT Imports: 44 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	OffsetOldest = sarama.OffsetOldest
	OffsetNewest = sarama.OffsetNewest
)

Initial offset type

View Source
var InitialOffset = OffsetNewest

InitialOffset allows to configure global initial offset from which to start consuming partitions which doesn't have offsets stored in the kafka_offsets table

View Source
var KafkaConfig *sarama.Config

KafkaConfig global per process Sarama config

View Source
var Pipes map[string]constructor

Pipes is the list of registered pipes Plugins insert their constructors into this map

Functions

func CacheDestroy

func CacheDestroy()

CacheDestroy releases all resources associated with cached pipes

func DeleteKafkaOffsets

func DeleteKafkaOffsets(topic string, conn *sql.DB) error

DeleteKafkaOffsets deletes Kafka offsets of all partitions of specified topic

Types

type Consumer

type Consumer interface {
	Close() error
	//CloseOnFailure doesn't save offsets
	CloseOnFailure() error
	Message() chan interface{}
	Error() chan error
	FetchNext() (interface{}, error)
	//Allows to explicitly persists current consumer position
	SaveOffset() error

	//SetFormat allow to tell consumer the format of the file when there is no
	//header
	SetFormat(format string)
}

Consumer consumer interface for the pipe

type Header struct {
	Format    string
	Filters   []string `json:",omitempty"`
	Schema    []byte   `json:",omitempty"`
	Delimited bool     `json:",omitempty"`
	HMAC      string   `json:"HMAC-SHA256,omitempty"`
	IV        string   `json:"AES256-CFB-IV,omitempty"`
}

Header represent file metadata in the beginning of the file

type KafkaPipe

type KafkaPipe struct {
	// contains filtered or unexported fields
}

KafkaPipe is wrapper on top of Sarama library to produce/consume through kafka

  • after failure shutdown pipe guarantees to resent last batchSize messages,

meaning batchSize messages may be in flight, reading (batchSize+1)th message automatically acknowledges previous batch.

  • producer caches and sends maximum batchSize messages at once

func (*KafkaPipe) Close

func (p *KafkaPipe) Close() error

Close release resources associated with the pipe

func (*KafkaPipe) Config

func (p *KafkaPipe) Config() *config.PipeConfig

Config returns pipe configuration

func (*KafkaPipe) Init

func (p *KafkaPipe) Init() error

Init initializes Kafka pipe creating kafka_offsets table

func (*KafkaPipe) NewConsumer

func (p *KafkaPipe) NewConsumer(topic string) (Consumer, error)

NewConsumer registers a new kafka consumer

func (*KafkaPipe) NewProducer

func (p *KafkaPipe) NewProducer(topic string) (Producer, error)

NewProducer registers a new sync producer

func (*KafkaPipe) Type

func (p *KafkaPipe) Type() string

Type returns Pipe type as Kafka

type Pipe

type Pipe interface {
	NewConsumer(topic string) (Consumer, error)
	NewProducer(topic string) (Producer, error)
	Type() string
	Config() *config.PipeConfig
	Close() error
}

Pipe connects named producers and consumers

func CacheGet

func CacheGet(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error)

CacheGet returns an instance of pipe with specified config from cache or creates new one if it's not in the cache yet

func Create

func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error)

Create is a pipe factory pctx is used to be able to cancel blocking calls inside pipe, like during shutdown

type Producer

type Producer interface {
	Push(data interface{}) error
	PushK(key string, data interface{}) error
	PushSchema(key string, data []byte) error
	//PushBatch queues the messages instead of sending immediately
	PushBatch(key string, data interface{}) error
	//PushCommit writes out all the messages queued by PushBatch
	PushBatchCommit() error
	Close() error
	CloseOnFailure() error

	SetFormat(format string)

	PartitionKey(source string, key string) string
}

Producer producer interface for pipe

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL