edatkafkago

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2021 License: MIT Imports: 6 Imported by: 0

README

# edat-kafka-go - Kafka for edat

Installation

go get -u github.com/stackus/edat-kafka-go

Usage Example

import (
    "github.com/stackus/edat-kafka-go"
    "github.com/stackus/edat/msg"
)

// Create a consumer and use it in a message subscriber
consumer := edatkafkago.NewConsumer(brokers, groupID)
subscriber := msg.NewSubscriber(consumer)

// Create a producer and use it in a message publisher
producer := edatkafkago.NewProducer(brokers)
publisher := msg.NewPublisher(producer)

Prerequisites

Go 1.15

Features

  • Message Consumer: NewConsumer(brokers, groupID, ...options)
  • Message Producer: NewProducer(brokers, ...options)

TODOs

  • Documentation
  • Tests, tests, and more tests

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultAckWait = time.Second * 30

DefaultAckWait is a time.Duration representing the maximum amount of time for a consumer to finish handling a message. It defaults to 30 seconds.

View Source
var DefaultSerializer = KafkaGoSerializer{}

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer implements msg.Consumer

func NewConsumer

func NewConsumer(brokers []string, groupID string, options ...ConsumerOption) *Consumer

NewConsumer constructs a new instance of Consumer

func (*Consumer) Close

func (c *Consumer) Close(context.Context) error

func (*Consumer) Listen

func (c *Consumer) Listen(ctx context.Context, channel string, consumer msg.ReceiveMessageFunc) error

type ConsumerOption

type ConsumerOption func(*Consumer)

func WithConsumerAckWait

func WithConsumerAckWait(ackWait time.Duration) ConsumerOption

func WithConsumerDialer

func WithConsumerDialer(dialer *kafka.Dialer) ConsumerOption

func WithConsumerLogger

func WithConsumerLogger(logger log.Logger) ConsumerOption

func WithConsumerSerializer

func WithConsumerSerializer(serializer Serializer) ConsumerOption

type KafkaGoSerializer

type KafkaGoSerializer struct{}

func (KafkaGoSerializer) Deserialize

func (s KafkaGoSerializer) Deserialize(message kafka.Message) (msg.Message, error)

func (KafkaGoSerializer) Serialize

func (s KafkaGoSerializer) Serialize(message msg.Message) (kafka.Message, error)

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer implements msg.Producer

func NewProducer

func NewProducer(brokers []string, options ...ProducerOption) *Producer

NewProducer constructs a new instance of Producer

func (*Producer) Close

func (p *Producer) Close(context.Context) error

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, channel string, message msg.Message) error

type ProducerOption

type ProducerOption func(*Producer)

func WithProducerLogger

func WithProducerLogger(logger log.Logger) ProducerOption

func WithProducerSerializer

func WithProducerSerializer(serializer Serializer) ProducerOption

func WithProducerTransport

func WithProducerTransport(transport *kafka.Transport) ProducerOption

type Serializer

type Serializer interface {
	Serialize(message msg.Message) (kafka.Message, error)
	Deserialize(message kafka.Message) (msg.Message, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL