libkafka

package module
v0.0.30
Published: Jan 18, 2022 License: BSD-3-Clause Imports: 4 Imported by: 4

README

Package libkafka is a low-level Go library for producing to and consuming from Kafka 1.0+. It has no external dependencies and is not modeled on the Java client. All API calls are synchronous and all code executes in the calling goroutine.

Project Scope

The library focuses on non-transactional production and consumption. It implements a single-partition Producer and Consumer. Multi-partition producers and consumers are built on top of this library (example: https://github.com/mkocikowski/kafkaclient).

Development status / "roadmap"

As of 2020-04-30 the focus is on producer code. Consumer code has been validated (a working multi-partition consumer with sticky consumption, coordinated over the group membership protocol, has been built on top of the kafkaclient library), but only to confirm there were no design blind alleys. Next steps are partitioned production and mTLS. Consumer work will come after that.

Get Started

Read the documentation for the "batch" and "client" packages. Everything is in godoc.

Design Decisions

  1. Focus on record batches. The Kafka protocol's Produce and Fetch API calls operate on sets of record batches. The record batch is the unit at which messages are produced and fetched; it is also the unit at which data is partitioned and compressed. In libkafka, producers and consumers operate on batches of records. Building and parsing record batches is separate from producing and fetching. Record batch compression and decompression implementations are provided by the library user (see the sketch after this list).
  2. Synchronous single-partition calls. The Kafka wire protocol is asynchronous: on a single connection there can be multiple requests awaiting a response from the broker. In addition, many API calls (such as Produce and Fetch) can combine data for multiple topics and partitions in a single call. Libkafka instead maintains a separate connection for every topic-partition; calls on that connection are synchronous, and each call carries data for only one topic-partition. That makes call handling (and failure handling) logic simpler.
  3. Wide use of reflection. All API calls (requests and responses) are defined as structs and marshaled using reflection. This is not a performance problem, because API calls are infrequent. Marshaling and unmarshaling of individual records within record batches (which has a big performance impact) is done without reflection.
  4. Limited use of data hiding. The library is not intended to be childproof. Most internal structures are exposed to make debugging and metrics collection easier.
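
Because compression is pluggable, the user supplies the codec. Below is a minimal sketch of a gzip codec pair; the method set (Compress/Decompress on []byte returning ([]byte, error), plus Type() int16) is an assumption about the batch.Compressor and batch.Decompressor interfaces, so check the batch package godoc for the exact shape. Codec id 1 is gzip in the Kafka protocol.

import (
	"bytes"
	"compress/gzip"
	"io"
)

// GzipCodec is a hypothetical user-supplied codec. The method set is an
// assumption about batch.Compressor / batch.Decompressor; verify against
// the batch package before use.
type GzipCodec struct{}

// Compress gzips a marshaled record batch.
func (GzipCodec) Compress(b []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(b); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Decompress un-gzips a fetched record batch.
func (GzipCodec) Decompress(b []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(b))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// Type returns the Kafka compression codec id (1 = gzip).
func (GzipCodec) Type() int16 { return 1 }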

Documentation


Index

Constants

const (
	ERR_UNKNOWN_SERVER_ERROR                  = -1
	ERR_NONE                                  = 0
	ERR_OFFSET_OUT_OF_RANGE                   = 1
	ERR_CORRUPT_MESSAGE                       = 2 // retriable: True
	ERR_UNKNOWN_TOPIC_OR_PARTITION            = 3 // retriable: True
	ERR_INVALID_FETCH_SIZE                    = 4
	ERR_LEADER_NOT_AVAILABLE                  = 5 // retriable: True
	ERR_NOT_LEADER_FOR_PARTITION              = 6 // retriable: True
	ERR_REQUEST_TIMED_OUT                     = 7 // retriable: True
	ERR_BROKER_NOT_AVAILABLE                  = 8
	ERR_REPLICA_NOT_AVAILABLE                 = 9
	ERR_MESSAGE_TOO_LARGE                     = 10
	ERR_STALE_CONTROLLER_EPOCH                = 11
	ERR_OFFSET_METADATA_TOO_LARGE             = 12
	ERR_NETWORK_EXCEPTION                     = 13 // retriable: True
	ERR_COORDINATOR_LOAD_IN_PROGRESS          = 14 // retriable: True
	ERR_COORDINATOR_NOT_AVAILABLE             = 15 // retriable: True
	ERR_NOT_COORDINATOR                       = 16 // retriable: True
	ERR_INVALID_TOPIC_EXCEPTION               = 17
	ERR_RECORD_LIST_TOO_LARGE                 = 18
	ERR_NOT_ENOUGH_REPLICAS                   = 19 // retriable: True
	ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND      = 20 // retriable: True
	ERR_INVALID_REQUIRED_ACKS                 = 21
	ERR_ILLEGAL_GENERATION                    = 22
	ERR_INCONSISTENT_GROUP_PROTOCOL           = 23
	ERR_INVALID_GROUP_ID                      = 24
	ERR_UNKNOWN_MEMBER_ID                     = 25
	ERR_INVALID_SESSION_TIMEOUT               = 26
	ERR_REBALANCE_IN_PROGRESS                 = 27
	ERR_INVALID_COMMIT_OFFSET_SIZE            = 28
	ERR_TOPIC_AUTHORIZATION_FAILED            = 29
	ERR_GROUP_AUTHORIZATION_FAILED            = 30
	ERR_CLUSTER_AUTHORIZATION_FAILED          = 31
	ERR_INVALID_TIMESTAMP                     = 32
	ERR_UNSUPPORTED_SASL_MECHANISM            = 33
	ERR_ILLEGAL_SASL_STATE                    = 34
	ERR_UNSUPPORTED_VERSION                   = 35
	ERR_TOPIC_ALREADY_EXISTS                  = 36
	ERR_INVALID_PARTITIONS                    = 37
	ERR_INVALID_REPLICATION_FACTOR            = 38
	ERR_INVALID_REPLICA_ASSIGNMENT            = 39
	ERR_INVALID_CONFIG                        = 40
	ERR_NOT_CONTROLLER                        = 41 // retriable: True
	ERR_INVALID_REQUEST                       = 42
	ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT        = 43
	ERR_POLICY_VIOLATION                      = 44
	ERR_OUT_OF_ORDER_SEQUENCE_NUMBER          = 45
	ERR_DUPLICATE_SEQUENCE_NUMBER             = 46
	ERR_INVALID_PRODUCER_EPOCH                = 47
	ERR_INVALID_TXN_STATE                     = 48
	ERR_INVALID_PRODUCER_ID_MAPPING           = 49
	ERR_INVALID_TRANSACTION_TIMEOUT           = 50
	ERR_CONCURRENT_TRANSACTIONS               = 51
	ERR_TRANSACTION_COORDINATOR_FENCED        = 52
	ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53
	ERR_SECURITY_DISABLED                     = 54
	ERR_OPERATION_NOT_ATTEMPTED               = 55
	ERR_KAFKA_STORAGE_ERROR                   = 56 // retriable: True
	ERR_LOG_DIR_NOT_FOUND                     = 57
	ERR_SASL_AUTHENTICATION_FAILED            = 58
	ERR_UNKNOWN_PRODUCER_ID                   = 59
	ERR_REASSIGNMENT_IN_PROGRESS              = 60
	ERR_DELEGATION_TOKEN_AUTH_DISABLED        = 61
	ERR_DELEGATION_TOKEN_NOT_FOUND            = 62
	ERR_DELEGATION_TOKEN_OWNER_MISMATCH       = 63
	ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED  = 64
	ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65
	ERR_DELEGATION_TOKEN_EXPIRED              = 66
	ERR_INVALID_PRINCIPAL_TYPE                = 67
	ERR_NON_EMPTY_GROUP                       = 68
	ERR_GROUP_ID_NOT_FOUND                    = 69
	ERR_FETCH_SESSION_ID_NOT_FOUND            = 70 // retriable: True
	ERR_INVALID_FETCH_SESSION_EPOCH           = 71 // retriable: True
	ERR_LISTENER_NOT_FOUND                    = 72 // retriable: True
	ERR_TOPIC_DELETION_DISABLED               = 73
	ERR_FENCED_LEADER_EPOCH                   = 74 // retriable: True
	ERR_UNKNOWN_LEADER_EPOCH                  = 75 // retriable: True
	ERR_UNSUPPORTED_COMPRESSION_TYPE          = 76
	ERR_STALE_BROKER_EPOCH                    = 77
	ERR_OFFSET_NOT_AVAILABLE                  = 78 // retriable: True
	ERR_MEMBER_ID_REQUIRED                    = 79
	ERR_PREFERRED_LEADER_NOT_AVAILABLE        = 80 // retriable: True
	ERR_GROUP_MAX_SIZE_REACHED                = 81
)
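
The "retriable: True" annotations mark codes a caller may retry. A minimal sketch of classifying a response code against these constants (abbreviated; only a few codes are listed, extend as needed):

// retriable reports whether a broker error code is marked retriable in
// the table above.
func retriable(code int16) bool {
	switch code {
	case libkafka.ERR_CORRUPT_MESSAGE,
		libkafka.ERR_UNKNOWN_TOPIC_OR_PARTITION,
		libkafka.ERR_LEADER_NOT_AVAILABLE,
		libkafka.ERR_NOT_LEADER_FOR_PARTITION,
		libkafka.ERR_REQUEST_TIMED_OUT,
		libkafka.ERR_NETWORK_EXCEPTION,
		libkafka.ERR_FENCED_LEADER_EPOCH:
		return true
	}
	return false
}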

Variables

var (
	// DialTimeout value is used in net.DialTimeout calls to connect to
	// kafka brokers (partition leaders, group coordinators, bootstrap
	// hosts).
	DialTimeout = 5 * time.Second
	// RequestTimeout is used for setting deadlines while communicating
	// over TCP. No single API call (request-response) can take longer
	// than RequestTimeout. Set it to zero to prevent setting connection
	// deadlines. MaxWaitTimeMs for fetch requests should not be greater
	// than RequestTimeout.
	RequestTimeout = 60 * time.Second
	// ConnectionTTL specifies the max time a partition-client connection
	// to a broker will stay open (connection will be closed and re-opened
	// on first request after the TTL). The TTL counts from the time
	// connection was opened, not when it was last used. Default value of 0
	// means "ignore this setting" (connections will stay open "forever").
	ConnectionTTL time.Duration = 0
)

Changing these timeouts is not safe for concurrent use. If you want to change them, do it once, at program startup.
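
For example (a sketch; the values are illustrative, not recommendations):

func init() {
	// Configure package-level timeouts once, before any connections are
	// opened; these variables must not be changed concurrently with use.
	libkafka.DialTimeout = 10 * time.Second
	libkafka.RequestTimeout = 30 * time.Second
	libkafka.ConnectionTTL = 15 * time.Minute // 0 = connections stay open forever
}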

Functions

This section is empty.

Types

type Batch

type Batch = batch.Batch

type Compressor added in v0.0.5

type Compressor = batch.Compressor

type Decompressor added in v0.0.5

type Decompressor = batch.Decompressor

type Error added in v0.0.9

type Error struct {
	Code    int16
	Message string
}

func (Error) Error added in v0.0.9

func (e Error) Error() string
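
A hedged sketch of surfacing the broker error code and message; this assumes the Error value can be recovered with errors.As (check the client package for how broker errors are wrapped):

var kerr libkafka.Error
if errors.As(err, &kerr) {
	log.Printf("broker error %d: %s", kerr.Code, kerr.Message)
	// kerr.Code can be compared against the ERR_* constants above.
}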

type Record

type Record = record.Record

func NewRecord

func NewRecord(key, value []byte) *Record
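
A minimal usage sketch; a nil key is allowed (Kafka records may have null keys):

a := libkafka.NewRecord([]byte("key-1"), []byte("value-1"))
b := libkafka.NewRecord(nil, []byte("value-2")) // nil key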

Directories

Path	Synopsis
api	Package api defines Kafka protocol requests and responses.
batch	Package batch implements functions for building, marshaling, and unmarshaling Kafka record batches.
client	Package client has code for making api calls to brokers.
client/fetcher	Package fetcher implements a single partition Kafka fetcher.
client/producer	Package producer implements a single partition Kafka producer.
record	Package record implements functions for marshaling and unmarshaling individual Kafka records.
varint	Package varint implements varint and ZigZag encoding and decoding.
wire	Package wire implements functions for marshaling and unmarshaling Kafka requests and responses.
