polly

package
v0.8.2
Published: Mar 15, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func AsCloudEvent added in v0.3.0

func AsCloudEvent(message kafka.Message) (cloudevents.Event, error)

AsCloudEvent is a helper function that unmarshals a Kafka Message into a CloudEvent.

func DumpMessage

func DumpMessage(_ context.Context, message kafka.Message)

DumpMessage is a simple handler function that can be used as a HandleMessageFunc. It dumps information about the received Kafka Message and the payload contained therein.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a high-level Kafka consumer client.

func NewClient

func NewClient(options *Options) *Client

func NewClientFromEnv

func NewClientFromEnv() (*Client, error)

NewClientFromEnv delegates to NewClient and returns a properly configured, ready-to-use Client that invokes the callback function for every received message using the default KafkaConsumerTopic. Spec: see https://github.com/segmentio/kafka-go#reader-

func (*Client) Poll added in v0.3.0

func (c *Client) Poll(ctx context.Context, rc kafka.ReaderConfig, msgHandler HandleMessageFunc) error

Poll uses the kafka-go Reader, which automatically handles reconnections and offset management and exposes an API that supports asynchronous cancellations and timeouts using Go contexts. See https://github.com/segmentio/kafka-go#reader- and this tutorial: https://www.sohamkamani.com/golang/working-with-kafka/

func (*Client) String added in v0.5.2

func (c *Client) String() string

String returns a string representation of the client instance.

func (*Client) WaitForClose added in v0.3.0

func (c *Client) WaitForClose(ctx context.Context)

WaitForClose blocks until the consumer WaitGroup counter is zero or a timeout is reached.

type ErrorLoggerWrapper

type ErrorLoggerWrapper struct {
	// contains filtered or unexported fields
}

func (ErrorLoggerWrapper) Printf

func (l ErrorLoggerWrapper) Printf(format string, v ...interface{})

type HandleMessageFunc

type HandleMessageFunc func(ctx context.Context, message kafka.Message)

HandleMessageFunc is the handler type: the consumer passes each received message to a function that matches this type.

type LoggerWrapper

type LoggerWrapper struct {
	// contains filtered or unexported fields
}

LoggerWrapper wraps a zerolog logger so it can be used as the logger in a kafka-go ReaderConfig. Example:

r := kafka.NewReader(kafka.ReaderConfig{
	Logger:      LoggerWrapper{delegate: k.logger},
})

func (LoggerWrapper) Printf

func (l LoggerWrapper) Printf(format string, v ...interface{})

type MessageReader

type MessageReader interface {
	ReadMessage(ctx context.Context) (kafka.Message, error)
	Close() error
}

MessageReader is an interface that makes it easy to mock the real kafka.Reader in Poll() for testing purposes.

type Options

type Options struct {
	BootstrapServers string `required:"false" default:"localhost:9092" desc:"Kafka Bootstrap server(s)" split_words:"true"`
	// ProducerClientID   string `required:"false" default:"kafkaClient" desc:"Client Id for Message Producer" split_words:"true"`
	ConsumerAPIKey     string `required:"false" default:"" desc:"Kafka API Key Key for consumer (user)"  split_words:"true"`
	ConsumerAPISecret  string `required:"false" default:"" desc:"Kafka API Secret for consumer (password)" split_words:"true"`
	ConsumerGroupID    string `required:"false" default:"default" desc:"Used as default id for KafkaConsumerGroups" split_words:"true"`
	ConsumerMaxReceive int32  `required:"false" default:"-1" desc:"Max num of received messages, default -1 (unlimited), useful for dev" split_words:"true"`
	ConsumerStartLast  bool   `required:"false" default:"false" desc:"Whether to start consuming at the last offset (default: first)" split_words:"true"`
	Debug              bool   `default:"false" desc:"Debug mode, registers logger for kafka packages" split_words:"true"`
}

Options holds the Kafka context parameters populated by envconfig in NewClientFromEnv...()

func NewOptionsFromEnv

func NewOptionsFromEnv() (*Options, error)

NewOptionsFromEnv uses environment configuration with the default prefix "kafka" to initialize Options.

func NewOptionsFromEnvWithPrefix

func NewOptionsFromEnvWithPrefix(prefix string) (*Options, error)

NewOptionsFromEnvWithPrefix is the same as NewOptionsFromEnv but allows a custom prefix.

func (Options) StartOffset

func (o Options) StartOffset() int64

StartOffset provides the reader start offset depending on ConsumerStartLast (true == last, else first, consistent with the field description in Options). The corresponding kafka-go constants are:

LastOffset int64 = -1 // The most recent offset available for a partition.
FirstOffset int64 = -2 // The least recent offset available for a partition.
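The mapping can be sketched as follows. This is an assumption-laden illustration rather than the package's code: it takes the ConsumerStartLast field description ("start consuming at the last offset, default: first") as authoritative, and it redeclares the two offset values locally (`lastOffset`, `firstOffset`) instead of importing kafka-go. The `options` type is a hypothetical stand-in for Options.

```go
package main

import "fmt"

// Local copies of the offset values quoted above (assumed to match kafka-go).
const (
	lastOffset  int64 = -1 // most recent offset available for a partition
	firstOffset int64 = -2 // least recent offset available for a partition
)

// options is a hypothetical stand-in carrying only the relevant field.
type options struct {
	ConsumerStartLast bool
}

// startOffset returns lastOffset when ConsumerStartLast is set, else firstOffset.
func (o options) startOffset() int64 {
	if o.ConsumerStartLast {
		return lastOffset
	}
	return firstOffset
}

func main() {
	fmt.Println(options{ConsumerStartLast: false}.startOffset()) // -2
	fmt.Println(options{ConsumerStartLast: true}.startOffset())  // -1
}
```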

func (Options) String added in v0.5.2

func (o Options) String() string

String returns a string representation of the object (but hides sensitive information).
