kafka

package module
v1.2.84 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

README

Kafka

Install

go get github.com/aacfactory/fns-contrib/message-queues/kafka

Usage

app.Deploy(kafka.New(kafka.WithConsumeHandler(name, handleFn)))

Config

kafka:
  brokers: 
    - "192.168.0.1:9092"
  producers:
    enable: true
    num: 4
    partitioner:
      name: "round_robin"
  consumers:
    nameA:
      group: "groupId"
      topics:
        - "topicA"

As proxy

publishErr := kafka.Publish(ctx, kafka.NewMessage(topic, key, body))

Documentation

Index

Constants

View Source
const (
	UnknownTimestamp = iota - 1
	DefaultTimestamp
	AlternativeTimestamp
)

Variables

This section is empty.

Functions

func New added in v1.2.11

func New(options ...Option) services.Listenable

func Publish

func Publish(ctx context.Context, message ...ProducerMessage) (err error)

func PublishAsync added in v1.2.11

func PublishAsync(ctx context.Context, message ...ProducerMessage) (err error)

Types

type ConsumeErrorHandler added in v1.2.11

type ConsumeErrorHandler func(topic string, partition int32, cause error)

type ConsumeHandler added in v1.2.11

type ConsumeHandler func(ctx context.Context, value []byte, meta Meta) (err error)

ConsumeHandler value is uncompressed

type Consumer

type Consumer interface {
	Listen(ctx context.Context) (err error)
	Shutdown(ctx context.Context)
}

func NewGroupConsumer added in v1.2.11

func NewGroupConsumer(log logs.Logger, maxPollRecords int, partitionBuffer int, opts []kgo.Opt, handler ConsumeHandler, errorHandler ConsumeErrorHandler) (v Consumer, err error)

type ConsumerOptions added in v1.2.11

type ConsumerOptions struct {
	Log logs.Logger

	Config configs.ConsumerConfig
}

type GroupConsumer added in v1.2.11

type GroupConsumer struct {
	// contains filtered or unexported fields
}

func (*GroupConsumer) Listen added in v1.2.11

func (consumer *GroupConsumer) Listen(ctx context.Context) (err error)

func (*GroupConsumer) Shutdown added in v1.2.11

func (consumer *GroupConsumer) Shutdown(ctx context.Context)

type GroupPartitionConsumer added in v1.2.11

type GroupPartitionConsumer struct {
	// contains filtered or unexported fields
}

func (*GroupPartitionConsumer) Consume added in v1.2.11

func (consumer *GroupPartitionConsumer) Consume(ctx context.Context)
type Header struct {
	Key   string `json:"key" avro:"key"`
	Value []byte `json:"value" avro:"value"`
}

type Headers added in v1.2.11

type Headers []Header

func (Headers) ConvertToKafkaHeaders added in v1.2.11

func (headers Headers) ConvertToKafkaHeaders() (v []kgo.RecordHeader)

type Meta added in v1.2.11

type Meta struct {
	Topic           string
	Headers         Headers
	Key             []byte
	Partition       int32
	Offset          int64
	Time            time.Time
	ProducerEpoch   int16
	ProducerId      int64
	LeaderEpoch     int32
	TimestampType   TimestampType
	IsTransactional bool
	IsControl       bool
}

type Option added in v1.2.11

type Option func(options *Options)

func WithConsumeErrorHandler added in v1.2.11

func WithConsumeErrorHandler(handler ConsumeErrorHandler) Option

func WithConsumeHandler added in v1.2.11

func WithConsumeHandler(name string, handler ConsumeHandler) Option

type Options added in v1.2.11

type Options struct {
	// contains filtered or unexported fields
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer added in v1.2.11

func NewProducer(log logs.Logger, num int, opts []kgo.Opt) (v *Producer, err error)

func (*Producer) Client added in v1.2.11

func (producer *Producer) Client() (v *kgo.Client)

func (*Producer) Publish

func (producer *Producer) Publish(ctx context.Context, messages []ProducerMessage, async bool) (err error)

func (*Producer) Shutdown added in v1.2.11

func (producer *Producer) Shutdown(_ context.Context)

type ProducerMessage added in v1.2.11

type ProducerMessage struct {
	Topic   string  `json:"topic" avro:"topic"`
	Key     []byte  `json:"key" avro:"key"`
	Body    []byte  `json:"body" avro:"body"`
	Headers Headers `json:"headers" avro:"headers"`
}

func NewMessage added in v1.2.11

func NewMessage(topic string, key []byte, body []byte) ProducerMessage

func (ProducerMessage) AddHeader added in v1.2.11

func (msg ProducerMessage) AddHeader(key string, value []byte) ProducerMessage

func (ProducerMessage) Validate added in v1.2.11

func (msg ProducerMessage) Validate() (err error)

type TimestampType added in v1.2.11

type TimestampType int8

type TopicPartitionKey added in v1.2.11

type TopicPartitionKey struct {
	Topic     string
	Partition int32
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL