stream

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2020 License: MIT Imports: 8 Imported by: 1

Documentation

Index

Constants

View Source
const AtsuUnsetGroupId = "atsu-unset-group-id"

AtsuUnsetGroupId is used by library callers, but also manually set as default below.

View Source
const DefaultFlushInterval = 100
View Source
const SessionTimeoutDefault = 6000 // ms

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaStreamConfig

type KafkaStreamConfig interface {
	NewProducer(km *kafka.ConfigMap) (*kafka.Producer, error)
	NewConsumer(km *kafka.ConfigMap) (*kafka.Consumer, error)
	SetDeliveryError(f func(*kafka.Message))
	SetTopic(topic string)
	SetPrefix(prefix string)
	SetBrokers(brokers string)
	SetFlags()
	ProducerDefaults() *kafka.ConfigMap
	Produce(topic *string, value []byte) error
	Flush(ms int) int
	FullTopic(t string) string
	ChannelProduce(topic *string, value []byte)
	GetConsumer() *kafka.Consumer
	GetProducer() *kafka.Producer
	GetBrokers() string
	GetPrefix() string
	Close() error
}

type StreamConfig

type StreamConfig struct {
	Brokers  string `json:"brokers" yaml:"brokers" default:"kafka-atsu-prod-01:9092,kafka-atsu-prod-02:9092,kafka-atsu-prod-03:9092"`
	Prefix   string `json:"prefix" yaml:"prefix" default:"atsu"` // defaults to atsu
	Topic    string `json:"topic" yaml:"topic" default:"unset"`  // defaults to unset
	Messages int    `json:"messages" yaml:"messages"`
	Bytes    int    `json:"bytes" yaml:"bytes"`

	Timeout  time.Duration `ignored:"true" json:"-"` // Must be set explicitly
	Interval time.Duration `ignored:"true" json:"-"` // Must be set explicitly

	Offset          string `default:"latest" json:"offset" yaml:"offset"`
	GroupId         string `default:"atsu-unset-group-id" split_words:"true" json:"group_id" yaml:"group_id"`
	Glob            bool   `default:"false" json:"glob" yaml:"glob"`
	DeliveryReports bool   `default:"false" json:"reports" yaml:"reports"`

	Codec string `default:"none" json:"codec" yaml:"codec"`
	// contains filtered or unexported fields
}

StreamConfig provides Kafka-related configuration

func (StreamConfig) ChannelProduce

func (sc StreamConfig) ChannelProduce(topic *string, value []byte)

func (*StreamConfig) Close

func (sc *StreamConfig) Close() error

func (*StreamConfig) Consume

func (sc *StreamConfig) Consume(consumer StreamConsumer, config interface{}) error

func (StreamConfig) Flush

func (sc StreamConfig) Flush(ms int) int

func (StreamConfig) FullTopic

func (sc StreamConfig) FullTopic(t string) string

FullTopic returns prefix.topic (XXX don't like this naming yet) if t == "" the sc.Topic will be used

func (StreamConfig) GetBrokers

func (sc StreamConfig) GetBrokers() string

func (StreamConfig) GetConsumer

func (sc StreamConfig) GetConsumer() *kafka.Consumer

/ XXX Temporary functions to allow more advanced usage

func (StreamConfig) GetPrefix

func (sc StreamConfig) GetPrefix() string

func (StreamConfig) GetProducer

func (sc StreamConfig) GetProducer() *kafka.Producer

func (*StreamConfig) NewConsumer

func (sc *StreamConfig) NewConsumer(km *kafka.ConfigMap) (*kafka.Consumer, error)

NewConsumer() creates a new Kafka consumer and subscribes to the underlying topic

func (*StreamConfig) NewProducer

func (sc *StreamConfig) NewProducer(km *kafka.ConfigMap) (*kafka.Producer, error)

NewProducer() creates a new Kafka producer

func (*StreamConfig) Produce

func (sc *StreamConfig) Produce(topic *string, value []byte) error

func (StreamConfig) ProducerDefaults

func (sc StreamConfig) ProducerDefaults() *kafka.ConfigMap

producerDefaults returns a *kafka.ConfigMap with sane defaults

func (*StreamConfig) SetBrokers

func (sc *StreamConfig) SetBrokers(brokers string)

func (*StreamConfig) SetDeliveryError

func (sc *StreamConfig) SetDeliveryError(f func(*kafka.Message))

func (*StreamConfig) SetFlags

func (sc *StreamConfig) SetFlags()

SetFlags to install various command-line flag(s)

func (*StreamConfig) SetPrefix

func (sc *StreamConfig) SetPrefix(prefix string)

func (*StreamConfig) SetTopic

func (sc *StreamConfig) SetTopic(topic string)

func (StreamConfig) String

func (sc StreamConfig) String() string

String returns JSON representation

type StreamConsumer

type StreamConsumer interface {
	Start(*StreamConfig, interface{}) error
	Message(*kafka.Message) error // error != nil, stop consumer
	Interval(time.Time) error     // error != nil, stop consumer
	Timeout(time.Time, bool) bool // bool != false, stop consumer
	Error(kafka.Error) bool       // bool != false, stop consumer
	Process() (bool, error)
	Finish() error // This value will be returned to the caller

	DoneCh() <-chan bool
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL