kafka

package
v0.5.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2023 | License: Apache-2.0 | Imports: 25 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	RequireNone = kafka.RequireNone
	RequireOne  = kafka.RequireOne
	RequireAll  = kafka.RequireAll
)

Variables

This section is empty.

Functions

func HeadersFromContext

func HeadersFromContext(ctx context.Context) ([]kafka.Header, bool)

func NewHeadersContext

func NewHeadersContext(ctx context.Context, headers ...kafka.Header) context.Context

func WithSyncCall added in v0.2.0

func WithSyncCall() queue.CallOptions

Types

type Balancer added in v0.1.3

type Balancer = kafka.Balancer

type BalancerFunc added in v0.1.3

type BalancerFunc = kafka.BalancerFunc

type CRC32Balancer added in v0.1.3

type CRC32Balancer = kafka.CRC32Balancer

type Conf

type Conf struct {
	Brokers    []string
	Group      string
	Topic      string
	Offset     string `json:",options=first|last,default=last"`
	Conns      int    `json:",default=1"`
	Consumers  int    `json:",default=8"`
	Processors int    `json:",default=8"`
	MinBytes   int    `json:",default=10240"`    // 10K
	MaxBytes   int    `json:",default=10485760"` // 10M
	Username   string `json:",optional"`
	Password   string `json:",optional"`
}

type Hash added in v0.1.3

type Hash = kafka.Hash

type Headers added in v0.0.11

type Headers struct {
	// contains filtered or unexported fields
}

func (*Headers) Get added in v0.0.11

func (h *Headers) Get(key string) string

func (*Headers) Keys added in v0.0.11

func (h *Headers) Keys() []string

func (*Headers) Set added in v0.0.11

func (h *Headers) Set(key string, value string)

type LeastBytes added in v0.1.3

type LeastBytes = kafka.LeastBytes

type Murmur2Balancer added in v0.1.3

type Murmur2Balancer = kafka.Murmur2Balancer

type PushOption

type PushOption func(options *pushOptions)

func WithAuth added in v0.5.0

func WithAuth(username, password string) PushOption

func WithBalancer added in v0.1.3

func WithBalancer(balancer Balancer) PushOption

func WithBatchBytes added in v0.1.6

func WithBatchBytes(batchBytes int64) PushOption

func WithBatchSize added in v0.1.6

func WithBatchSize(batchSize int) PushOption

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

func WithCompletion added in v0.0.12

func WithCompletion(completion func(messages []kafka.Message, err error)) PushOption

func WithDisableAutoTopicCreation added in v0.1.5

func WithDisableAutoTopicCreation() PushOption

func WithFlushInterval

func WithFlushInterval(flushInterval time.Duration) PushOption

func WithRequiredAcks added in v0.1.6

func WithRequiredAcks(requiredAcks RequiredAcks) PushOption

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher

func (*Pusher) Close

func (p *Pusher) Close() error

func (*Pusher) Name

func (p *Pusher) Name() string

func (*Pusher) Push

func (p *Pusher) Push(ctx context.Context, k, v []byte, opts ...queue.CallOptions) (
	interface{}, error)

func (*Pusher) Start added in v0.1.4

func (p *Pusher) Start()

func (*Pusher) Stop added in v0.1.4

func (p *Pusher) Stop()

type QueueOption

type QueueOption func(*queueOptions)

func WithCommitInterval

func WithCommitInterval(interval time.Duration) QueueOption

func WithMaxWait

func WithMaxWait(wait time.Duration) QueueOption

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) QueueOption

type Queues

type Queues struct {
	// contains filtered or unexported fields
}

func MustNewQueue

func MustNewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) *Queues

func NewQueue

func NewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) (*Queues, error)

func (Queues) Start

func (q Queues) Start()

func (Queues) Stop

func (q Queues) Stop()

type ReferenceHash added in v0.1.3

type ReferenceHash = kafka.ReferenceHash

type RequiredAcks added in v0.1.6

type RequiredAcks = kafka.RequiredAcks

type RoundRobin added in v0.1.3

type RoundRobin = kafka.RoundRobin

type XXHashBalancer added in v0.1.4

type XXHashBalancer struct {
	// contains filtered or unexported fields
}

func (*XXHashBalancer) Balance added in v0.1.4

func (x *XXHashBalancer) Balance(msg kafka.Message, partitions ...int) (partition int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL