kafka

package
v4.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2020 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ByteDecoder

type ByteDecoder struct{}

ByteDecoder represents a byte decoder.

func (ByteDecoder) Decode

func (d ByteDecoder) Decode(b []byte) (interface{}, error)

Decode transforms byte data to the desired type.

type ByteEncoder

type ByteEncoder struct{}

ByteEncoder represents a byte encoder.

func (ByteEncoder) Encode

func (e ByteEncoder) Encode(v interface{}) ([]byte, error)

Encode transforms the typed data to bytes.

type Decoder

type Decoder interface {
	// Decode transforms byte data to the desired type.
	Decode([]byte) (interface{}, error)
}

Decoder represents a Kafka data decoder.

type DecoderFunc

type DecoderFunc func(value []byte) (interface{}, error)

DecoderFunc is an adapter allowing to use a function as a decoder.

var NilDecoder DecoderFunc = func([]byte) (interface{}, error) { return nil, nil }

NilDecoder is a decoder that always returns a nil, no matter the input.

func (DecoderFunc) Decode

func (f DecoderFunc) Decode(value []byte) (interface{}, error)

Decode transforms byte data to the desired type.

type Encoder

type Encoder interface {
	// Encode transforms the typed data to bytes.
	Encode(interface{}) ([]byte, error)
}

Encoder represents a Kafka data encoder.

type EncoderFunc

type EncoderFunc func(interface{}) ([]byte, error)

EncoderFunc is an adapter allowing to use a function as an encoder.

func (EncoderFunc) Encode

func (f EncoderFunc) Encode(value interface{}) ([]byte, error)

Encode transforms the typed data to bytes.

type Metadata

type Metadata []*PartitionOffset

Metadata represents an the kafka topic metadata.

func (Metadata) Merge

func (m Metadata) Merge(v streams.Metadata, s streams.MetadataStrategy) streams.Metadata

Merge merges the contained metadata into the given the metadata.

func (Metadata) WithOrigin

func (m Metadata) WithOrigin(o streams.MetadataOrigin)

WithOrigin sets the MetadataOrigin on the metadata.

type PartitionOffset

type PartitionOffset struct {
	Origin streams.MetadataOrigin

	Topic     string
	Partition int32
	Offset    int64
}

PartitionOffset represents the position in the stream of a message.

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink represents a Kafka streams sink.

func NewSink

func NewSink(c *SinkConfig) (*Sink, error)

NewSink creates a new Kafka sink.

func (*Sink) Close

func (p *Sink) Close() error

Close closes the processor.

func (*Sink) Commit

func (p *Sink) Commit(ctx context.Context) error

Commit commits a processors batch.

func (*Sink) Process

func (p *Sink) Process(msg streams.Message) error

Process processes the stream record.

func (*Sink) WithPipe

func (p *Sink) WithPipe(pipe streams.Pipe)

WithPipe sets the pipe on the Processor.

type SinkConfig

type SinkConfig struct {
	sarama.Config

	Brokers []string
	Topic   string

	KeyEncoder   Encoder
	ValueEncoder Encoder

	BatchSize int
}

SinkConfig represents the configuration of a Sink.

func NewSinkConfig

func NewSinkConfig() *SinkConfig

NewSinkConfig creates a new SinkConfig.

func (*SinkConfig) Validate

func (c *SinkConfig) Validate() error

Validate checks a Config instance. It will return a sarama.ConfigurationError if the specified values don't make sense.

type Source

type Source struct {
	// contains filtered or unexported fields
}

Source represents a Kafka stream source.

func NewSource

func NewSource(c *SourceConfig) (*Source, error)

NewSource creates a new Kafka stream source.

func (*Source) Cleanup

func (s *Source) Cleanup(_ sarama.ConsumerGroupSession) error

Cleanup is ran once for a session, after the consumption ends.

func (*Source) Close

func (s *Source) Close() error

Close closes the Source.

func (*Source) Commit

func (s *Source) Commit(v interface{}) error

Commit marks the consumed records as processed.

func (*Source) Consume

func (s *Source) Consume() (streams.Message, error)

Consume gets the next record from the Source.

func (*Source) ConsumeClaim

ConsumeClaim consumes messages from a single partition of a topic.

func (*Source) Setup

func (s *Source) Setup(session sarama.ConsumerGroupSession) error

Setup is ran once for a new consumer session, before the consumption starts.

type SourceConfig

type SourceConfig struct {
	sarama.Config

	Brokers []string
	Topic   string
	GroupID string

	Ctx          context.Context
	KeyDecoder   Decoder
	ValueDecoder Decoder

	BufferSize       int
	ErrorsBufferSize int
}

SourceConfig represents the configuration for a Kafka stream source.

func NewSourceConfig

func NewSourceConfig() *SourceConfig

NewSourceConfig creates a new Kafka source configuration.

func (*SourceConfig) Validate

func (c *SourceConfig) Validate() error

Validate checks a Config instance. It will return a sarama.ConfigurationError if the specified values don't make sense.

type StringDecoder

type StringDecoder struct{}

StringDecoder represents a string decoder.

func (StringDecoder) Decode

func (d StringDecoder) Decode(b []byte) (interface{}, error)

Decode transforms byte data to a string.

type StringEncoder

type StringEncoder struct{}

StringEncoder represents a string encoder.

func (StringEncoder) Encode

func (e StringEncoder) Encode(v interface{}) ([]byte, error)

Encode transforms the string data to bytes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL