kfk

package module
v2.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2020 License: Apache-2.0 Imports: 8 Imported by: 0

README

Kfk

Sarama wrapper to produce and consume messages.

Messages are enriched with message type, which is used in the consumer to offer this information to the message handlers

Message handlers implement messageHandler interface

Check kafka_test.go for more information and examples of use

Authors

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func TopicFromContext added in v2.1.2

func TopicFromContext(ctx context.Context) (string, bool)

Types

type ConsumerCfgOption

type ConsumerCfgOption func(config *sarama.Config)

func FromNewest

func FromNewest() ConsumerCfgOption

type FallbackFunc

type FallbackFunc func(context.Context, []byte) error

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(
	kafkaBrokers []string,
	consumerGroupID string,
	topics []string,
	cfgOptions ...ConsumerCfgOption,
) (*KafkaConsumer, error)

func (*KafkaConsumer) AddFallback

func (c *KafkaConsumer) AddFallback(fn FallbackFunc)

func (*KafkaConsumer) AddHandler

func (c *KafkaConsumer) AddHandler(messageType string, handler MessageHandler)

func (*KafkaConsumer) HealthCheck added in v2.1.1

func (c *KafkaConsumer) HealthCheck(_ context.Context) bool

func (*KafkaConsumer) Start

func (c *KafkaConsumer) Start(ctx context.Context) error

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

Producer.

func NewKafkaProducer

func NewKafkaProducer(kafkaBrokers []string) (*KafkaProducer, error)

NewKafkaProducer.

func (KafkaProducer) HealthCheck added in v2.1.1

func (p KafkaProducer) HealthCheck(_ context.Context) bool

func (*KafkaProducer) Send

func (p *KafkaProducer) Send(topic, key string, message interface{}) error

Send a message to a topic to be scattered using the key.

type Marshaller added in v2.1.0

type Marshaller interface {
	MarshalKFK() ([]byte, error)
}

Marshaller is an interface to serialize messages to kfkTopics.

type MessageHandler

type MessageHandler struct {
	// contains filtered or unexported fields
}

func NewHandler

func NewHandler(handlerFunc interface{}) MessageHandler

func (MessageHandler) Handle

func (m MessageHandler) Handle(ctx context.Context, msg []byte) error

type Unmarshaller added in v2.1.0

type Unmarshaller interface {
	UnmarshallKFK([]byte) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL