kafka

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2023 License: MIT Imports: 1 Imported by: 0

README

A simple Kafka client wrapper and CLI tool.

Requirements

  • Go Version >= 1.18

Usage

Install the command-line client:

go install github.com/sko00o/kafka/cmd/kafka-cli@latest

Demo

docker compose up

Thanks

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

// Consumer is the consuming side of this package's Kafka client wrapper.
// It exposes a Run/Stop lifecycle plus a channel from which received
// messages are read.
type Consumer interface {
	// Run presumably starts consumption; it reports failure as an error.
	// NOTE(review): whether Run blocks until Stop is called is not visible
	// from this declaration — confirm against the implementations.
	Run() error
	// Stop presumably shuts the consumer down. It returns no error, so
	// callers cannot observe a failed shutdown through this method.
	Stop()
	// Receive returns a receive-only channel of Message values.
	// NOTE(review): whether the channel is closed after Stop is not
	// visible here — confirm before ranging over it.
	Receive() <-chan Message
}

type ConsumerConfig

// ConsumerConfig holds the settings for a Consumer.
//
// Every field carries a `mapstructure` tag naming the key it is decoded
// from (for example WorkerCnt <- "worker_cnt"). Defaults and accepted
// values for the string options are not visible in this declaration.
type ConsumerConfig struct {
	// Connection and subscription settings: worker count, broker
	// addresses, topics, consumer group ID and starting offset.
	// NOTE(review): the accepted values of StartOffset are not visible
	// here — confirm against the consumer implementations.
	WorkerCnt   uint32   `mapstructure:"worker_cnt"`
	Addresses   []string `mapstructure:"addresses"`
	Topics      []string `mapstructure:"topics"`
	GroupID     string   `mapstructure:"group_id"`
	StartOffset string   `mapstructure:"start_offset"`

	// sarama only
	Version      string `mapstructure:"version"`
	EnableErrors bool   `mapstructure:"enable_errors"`

	// Fetch sizing, commit behaviour and group timeouts.
	// NOTE(review): unlike the groups in ProducerConfig, this group has no
	// backend label; confirm whether these fields apply to every backend
	// or only to one of them.
	MinBytes         int           `mapstructure:"min_bytes"`
	MaxBytes         int           `mapstructure:"max_bytes"`
	CommitSync       bool          `mapstructure:"commit_sync"`
	CommitInterval   time.Duration `mapstructure:"commit_interval"`
	SessionTimeout   time.Duration `mapstructure:"session_timeout"`
	RebalanceTimeout time.Duration `mapstructure:"rebalance_timeout"`
}

type Message

// Message is a read-only view of a single received record. It exposes
// accessors only; nothing in this interface mutates or acknowledges the
// message.
type Message interface {
	// Value returns the message payload as raw bytes.
	Value() []byte
	// Topic returns the name of the topic as a string.
	Topic() string
	// Partition returns the partition number as an int32.
	Partition() int32
	// Offset returns the message offset as an int64.
	Offset() int64
}

type Producer

// Producer is the producing side of this package's Kafka client wrapper.
// It sends raw byte payloads to a named topic, with or without a key.
type Producer interface {
	// Stop presumably shuts the producer down. It returns no error.
	// NOTE(review): whether pending messages are flushed first is not
	// visible here — confirm against the implementations.
	Stop()
	// Send takes a topic name and a payload and reports failure as an
	// error.
	// NOTE(review): ProducerConfig has an Async option; whether a nil
	// error then means "delivered" or only "enqueued" should be confirmed.
	Send(topic string, value []byte) error
	// SendWithKey is Send with an additional key argument.
	// NOTE(review): presumably the key drives partition selection —
	// confirm against the implementations.
	SendWithKey(topic string, key, value []byte) error
}

type ProducerConfig

// ProducerConfig holds the settings for a Producer.
//
// Every field carries a `mapstructure` tag naming the key it is decoded
// from (for example BatchSize <- "batch_size"). Fields are grouped by the
// backend they apply to; defaults and accepted values for the string
// options are not visible in this declaration.
type ProducerConfig struct {
	// General settings: broker addresses, async mode and compression.
	// NOTE(review): the accepted Compression names are not visible here —
	// confirm against the producer implementations.
	Addresses   []string `mapstructure:"addresses"`
	Async       bool     `mapstructure:"async"`
	Compression string   `mapstructure:"compression"`

	// sarama only
	Version           string        `mapstructure:"version"`
	EnableAsyncErrors bool          `mapstructure:"enable_async_errors"`
	BufferSize        int           `mapstructure:"buffer_size"`
	DialTimeout       time.Duration `mapstructure:"dial_timeout"`

	// kafka-go only
	Balancer           string `mapstructure:"balancer"`
	BalancerConsistent bool   `mapstructure:"balancer_consistent"`
	BatchQueueSize     int    `mapstructure:"batch_queue_size"`

	// Retry, acknowledgement, batching and I/O timeout settings.
	// NOTE(review): this group has no backend label; confirm whether it
	// applies to every backend or continues the kafka-go group above.
	MaxAttempts  int           `mapstructure:"max_attempts"`
	RequiredAcks int           `mapstructure:"required_acks"`
	BatchSize    int           `mapstructure:"batch_size"`
	BatchBytes   int64         `mapstructure:"batch_bytes"`
	BatchTimeout time.Duration `mapstructure:"batch_timeout"`
	ReadTimeout  time.Duration `mapstructure:"read_timeout"`
	WriteTimeout time.Duration `mapstructure:"write_timeout"`

	// SASL carries the authentication mechanism name and credentials.
	// It is a pointer, so it may be nil.
	// NOTE(review): presumably nil means SASL is disabled — confirm.
	SASL *struct {
		Mechanism string `mapstructure:"mechanism"`
		Username  string `mapstructure:"username"`
		Password  string `mapstructure:"password"`
	} `mapstructure:"sasl"`
}

Directories

Path Synopsis
cmd
consumer
producer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL