kafka

package
v0.73.2
Published: Oct 14, 2022 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package kafka provides a client with included tracing capabilities.

Deprecated: The Kafka client package is superseded by the `github.com/beatlabs/patron/client/kafka/v2` package. Refer to that package's documentation and examples for usage.

This package is frozen and no new functionality will be added.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncProducer

type AsyncProducer struct {
	// contains filtered or unexported fields
}

AsyncProducer is an asynchronous Kafka producer.

func (*AsyncProducer) ActiveBrokers

func (p *AsyncProducer) ActiveBrokers() []string

ActiveBrokers returns a list of active brokers' addresses.

func (*AsyncProducer) Close

func (ap *AsyncProducer) Close() error

Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory.

func (*AsyncProducer) Send

func (ap *AsyncProducer) Send(ctx context.Context, msg *Message) error

Send sends a message to a topic asynchronously. Producer errors are delivered on the error channel returned by Builder.CreateAsync when the AsyncProducer was created.

type Builder added in v0.41.0

type Builder struct {
	// contains filtered or unexported fields
}

Builder gathers all required and optional properties, in order to construct a Kafka AsyncProducer/SyncProducer.

func NewBuilder deprecated

func NewBuilder(brokers []string) *Builder

NewBuilder initiates the AsyncProducer/SyncProducer builder chain. The builder instantiates the component using default values for EncodeFunc and Content-Type header.

Deprecated: The Kafka client package is superseded by the `github.com/beatlabs/patron/client/kafka/v2` package. Refer to that package's documentation and examples for usage.

This package is frozen and no new functionality will be added.

func (*Builder) CreateAsync added in v0.41.0

func (ab *Builder) CreateAsync() (*AsyncProducer, <-chan error, error)

CreateAsync constructs the AsyncProducer component by applying the gathered properties.

func (*Builder) CreateSync added in v0.41.0

func (ab *Builder) CreateSync() (*SyncProducer, error)

CreateSync constructs the SyncProducer component by applying the gathered properties.

func (*Builder) WithEncoder added in v0.41.0

func (ab *Builder) WithEncoder(enc encoding.EncodeFunc, contentType string) *Builder

WithEncoder sets a specific encoder implementation and the Content-Type header value. If it is not called, the builder defaults to JSON.

func (*Builder) WithRequiredAcksPolicy added in v0.41.0

func (ab *Builder) WithRequiredAcksPolicy(ack RequiredAcks) *Builder

WithRequiredAcksPolicy sets how many replica acknowledgements the broker must see before responding.

func (*Builder) WithTimeout added in v0.41.0

func (ab *Builder) WithTimeout(dial time.Duration) *Builder

WithTimeout sets the dial timeout for the sync or async producer.

func (*Builder) WithVersion added in v0.41.0

func (ab *Builder) WithVersion(version string) *Builder

WithVersion sets the Kafka version for the AsyncProducer/SyncProducer.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is an abstraction of a Kafka message.

func NewMessage

func NewMessage(t string, b interface{}) *Message

NewMessage creates a new message.

func NewMessageWithKey

func NewMessageWithKey(t string, b interface{}, k string) (*Message, error)

NewMessageWithKey creates a new message with an associated key.

func (*Message) SetHeader added in v0.47.0

func (m *Message) SetHeader(key, value string)

SetHeader sets a message header. Multiple headers with the same key are supported. Headers are only set if the Kafka version is 0.11 or later.
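The statement that repeated keys are supported implies append semantics rather than map-style overwriting. The sketch below models only that documented behavior. The `message` and `header` types and the `values` method are local stand-ins invented for illustration; how the package stores headers internally is not shown in this documentation.

```go
package main

import "fmt"

// header and message are local stand-ins, not the package's types.
type header struct{ key, value string }

type message struct{ headers []header }

// SetHeader appends instead of overwriting, so repeated keys are retained.
func (m *message) SetHeader(key, value string) {
	m.headers = append(m.headers, header{key: key, value: value})
}

// values is a hypothetical accessor returning every value stored under key,
// in insertion order.
func (m *message) values(key string) []string {
	var out []string
	for _, h := range m.headers {
		if h.key == key {
			out = append(out, h.value)
		}
	}
	return out
}

func main() {
	m := &message{}
	m.SetHeader("trace-id", "abc")
	m.SetHeader("trace-id", "def") // same key again: both values are kept
	m.SetHeader("tenant", "acme")
	fmt.Println(m.values("trace-id"))
}
```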

type Producer

type Producer interface {
	Send(ctx context.Context, msg *Message) error
	Close() error
}

Producer is the interface for Kafka producers.

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding.

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	WaitForAll RequiredAcks = -1
)
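The three policies trade latency against durability: NoResponse gives no delivery confirmation, while WaitForAll waits for every in-sync replica. Applications often select the policy from configuration, which could be sketched as follows. The `RequiredAcks` type and constants are redeclared locally to mirror the ones above; `parseRequiredAcks` and the accepted strings ("none", "local", "all") are assumptions of this sketch, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// RequiredAcks and its constants mirror the declarations documented above.
type RequiredAcks int16

const (
	NoResponse   RequiredAcks = 0
	WaitForLocal RequiredAcks = 1
	WaitForAll   RequiredAcks = -1
)

// parseRequiredAcks is a hypothetical helper that maps a configuration
// string to an acknowledgement policy. The accepted strings are assumptions.
func parseRequiredAcks(s string) (RequiredAcks, error) {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "none":
		return NoResponse, nil
	case "local":
		return WaitForLocal, nil
	case "all":
		return WaitForAll, nil
	default:
		return 0, fmt.Errorf("unknown acks policy %q", s)
	}
}

func main() {
	for _, s := range []string{"none", "local", "all", "quorum"} {
		ack, err := parseRequiredAcks(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> %d\n", s, ack)
	}
}
```

With the real package, the parsed value would be passed to Builder.WithRequiredAcksPolicy.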

type SyncProducer added in v0.41.0

type SyncProducer struct {
	// contains filtered or unexported fields
}

SyncProducer is a synchronous Kafka producer.

func (*SyncProducer) ActiveBrokers added in v0.41.0

func (p *SyncProducer) ActiveBrokers() []string

ActiveBrokers returns a list of active brokers' addresses.

func (*SyncProducer) Close added in v0.41.0

func (p *SyncProducer) Close() error

Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory.

func (*SyncProducer) Send added in v0.41.0

func (p *SyncProducer) Send(ctx context.Context, msg *Message) error

Send sends a message to a topic.

Directories

Path Synopsis
Package v2 provides a client with included tracing capabilities.
