kafka

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2021 License: Apache-2.0 Imports: 13 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrMessageInvalidConsumer = errors.New(`[message] invalid consumer`)

ErrMessageInvalidConsumer error

Functions

This section is empty.

Types

type Option added in v1.0.0

type Option func(options *Options)

Option function type

func WithBrokers added in v1.1.0

func WithBrokers(brokers ...string) Option

WithBrokers overrides the list of brokers to connect

func WithClientID added in v1.0.0

func WithClientID(clientID string) Option

WithClientID value

func WithClusterConfig added in v1.0.0

func WithClusterConfig(clusterConfig *cluster.Config) Option

WithClusterConfig custom config

func WithCompression added in v1.0.0

func WithCompression(codec sarama.CompressionCodec, level int) Option

WithCompression of the pipe

func WithErrorHandler added in v1.0.0

func WithErrorHandler(h nc.ErrorHandler) Option

WithErrorHandler set handler of error processing

func WithFlashFrequency added in v1.0.0

func WithFlashFrequency(frequency time.Duration) Option

WithFlashFrequency of flushing

func WithFlashMessages added in v1.0.0

func WithFlashMessages(messageCount int) Option

WithFlashMessages minimal count

func WithGroupName added in v1.0.0

func WithGroupName(name string) Option

WithGroupName of the message consuming

func WithKafkaURL added in v1.1.0

func WithKafkaURL(urlString string) Option

WithKafkaURL is an Option to set the URL the client should connect to. The url can contain username/password semantics. e.g. kafka://derek:pass@localhost:4222/{groupName}?topics=topic1,topic2 Comma separated arrays are also supported, e.g. urlA, urlB.

func WithKafkaVersion added in v1.0.0

func WithKafkaVersion(version sarama.KafkaVersion) Option

WithKafkaVersion minimal version

func WithPanicHandler added in v1.0.0

func WithPanicHandler(h nc.PanicHandler) Option

WithPanicHandler set handler of panic processing

func WithPublisherErrorHandler added in v1.0.0

func WithPublisherErrorHandler(h PublisherErrorHandler) Option

WithPublisherErrorHandler set handler of the sarama errors

func WithPublisherSuccessHandler added in v1.0.0

func WithPublisherSuccessHandler(h PublisherSuccessHandler) Option

WithPublisherSuccessHandler set handler of the sarama success

func WithSaramaConfig added in v1.0.0

func WithSaramaConfig(streamConfig *sarama.Config) Option

WithSaramaConfig custom config

func WithSubscriberNotificationHandler added in v1.0.0

func WithSubscriberNotificationHandler(h SubscriberNotificationHandler) Option

WithSubscriberNotificationHandler set handler of the cluster group notifications

func WithTopics added in v1.1.0

func WithTopics(topics ...string) Option

WithTopics will set the list of topics for publishing or subscribing

type Options added in v1.0.0

type Options struct {
	ClusterConfig cluster.Config

	// IsSynchronous type of producer
	// TODO: make it work for sync publisher
	IsSynchronous bool

	// Brokers contains list of broker hosts with port
	Brokers []string

	// Name of the subscription group
	GroupName string

	// Names of topics for subscribing or publishing
	Topics []string

	// ErrorHandler of message processing
	ErrorHandler nc.ErrorHandler

	// PanicHandler process panic
	PanicHandler nc.PanicHandler

	// PublisherErrorHandler provides handler of message send errors
	PublisherErrorHandler PublisherErrorHandler

	// PublisherSuccessHandler provides handler of message send success
	PublisherSuccessHandler PublisherSuccessHandler

	// SubscriberNotificationHandler provides handler of received messages
	SubscriberNotificationHandler SubscriberNotificationHandler

	// Message encoder interface
	Encoder encoder.Encoder

	// Logger of subscriber
	Logger loggerInterface
}

Options for publisher or subscriber

type Publisher added in v1.0.0

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher implementation of Publisher interface for the Kafka driver

func MustNewPublisher added in v1.0.0

func MustNewPublisher(ctx context.Context, options ...Option) *Publisher

MustNewPublisher connection or panic

func NewPublisher added in v1.0.0

func NewPublisher(ctx context.Context, options ...Option) (*Publisher, error)

NewPublisher to the kafka with some brokers and topics for sending

func (*Publisher) Close added in v1.0.0

func (p *Publisher) Close() error

Close kafka producer

func (*Publisher) Publish added in v1.0.0

func (p *Publisher) Publish(ctx context.Context, messages ...interface{}) (err error)

Publish one or more messages to the pub-service

type PublisherErrorHandler added in v1.0.0

type PublisherErrorHandler func(*sarama.ProducerError)

PublisherErrorHandler callback function

type PublisherSuccessHandler added in v1.0.0

type PublisherSuccessHandler func(*sarama.ProducerMessage)

PublisherSuccessHandler callback function

type Subscriber

type Subscriber struct {
	notificationcenter.ModelSubscriber
	// contains filtered or unexported fields
}

Subscriber for kafka

func NewSubscriber

func NewSubscriber(options ...Option) (*Subscriber, error)

NewSubscriber connection to kafka "group" from list of topics

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close kafka consumer

func (*Subscriber) Listen

func (s *Subscriber) Listen(ctx context.Context) (err error)

Listen kafka consumer

type SubscriberNotificationHandler added in v1.0.0

type SubscriberNotificationHandler func(notification *cluster.Notification)

SubscriberNotificationHandler callback function

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL