client

package module
v0.0.0-...-1da1c1d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2016 License: Apache-2.0 Imports: 18 Imported by: 5

README

kafka-client

GoDoc Build Status Go Report Card Coverage Status

Apache Kafka low level client in Golang

Installation:

  1. Install Golang http://golang.org/doc/install
  2. Make sure env variables GOPATH and GOROOT exist and point to correct places
  3. go get github.com/serejja/kafka-client
  4. go test -v to make sure it works

You may also want to spin up a local broker at localhost:9092 for the functional test to work as well (it will be skipped otherwise).

Documentation

Index

Constants

View Source
const EarliestTime int64 = -2

EarliestTime is a value used to request for the earliest available offset.

View Source
const InvalidOffset int64 = -1

InvalidOffset is a constant that is used to denote an invalid or uninitialized offset.

View Source
const LatestTime int64 = -1

LatestTime is a value used to request for the latest offset (i.e. the offset of the next coming message).

Variables

BrokerErrors are mappings between Kafka error codes and actual error messages.

View Source
var ErrBrokerNotAvailable = errors.New("Broker is likely not alive.")

ErrBrokerNotAvailable is a mapping for Kafka error code 8.

View Source
var ErrClusterAuthorizationFailed = errors.New("The client is not authorized to use an inter-broker or administrative API.")

ErrClusterAuthorizationFailed is a mapping for Kafka error code 31

View Source
var ErrConfigEmptyClientID = errors.New("ClientID cannot be empty.")

ErrConfigEmptyClientID happens when trying to create a new client with empty ClientID value.

View Source
var ErrConfigInvalidCommitOffsetBackoff = errors.New("CommitOffsetBackoff must be at least 1ms.")

ErrConfigInvalidCommitOffsetBackoff happens when trying to create a new client with too small CommitOffsetBackoff value.

View Source
var ErrConfigInvalidCommitOffsetRetries = errors.New("CommitOffsetRetries cannot be less than 0.")

ErrConfigInvalidCommitOffsetRetries happens when trying to create a new client with too small CommitOffsetRetries value.

View Source
var ErrConfigInvalidConnectTimeout = errors.New("ConnectTimeout must be at least 1ms.")

ErrConfigInvalidConnectTimeout happens when trying to create a new client with too small ConnectTimeout value.

View Source
var ErrConfigInvalidConsumerMetadataBackoff = errors.New("ConsumerMetadataBackoff must be at least 1ms.")

ErrConfigInvalidConsumerMetadataBackoff happens when trying to create a new client with too small ConsumerMetadataBackoff value.

View Source
var ErrConfigInvalidConsumerMetadataRetries = errors.New("ConsumerMetadataRetries cannot be less than 0.")

ErrConfigInvalidConsumerMetadataRetries happens when trying to create a new client with too small ConsumerMetadataRetries value.

View Source
var ErrConfigInvalidFetchSize = errors.New("FetchSize cannot be less than 1.")

ErrConfigInvalidFetchSize happens when trying to create a new client with too small FetchSize value.

View Source
var ErrConfigInvalidKeepAliveTimeout = errors.New("KeepAliveTimeout must be at least 1ms.")

ErrConfigInvalidKeepAliveTimeout happens when trying to create a new client with too small KeepAliveTimeout value.

View Source
var ErrConfigInvalidMetadataBackoff = errors.New("MetadataBackoff must be at least 1ms.")

ErrConfigInvalidMetadataBackoff happens when trying to create a new client with too small MetadataBackoff value.

View Source
var ErrConfigInvalidMetadataRetries = errors.New("MetadataRetries cannot be less than 0.")

ErrConfigInvalidMetadataRetries happens when trying to create a new client with too small MetadataRetries value.

View Source
var ErrConfigInvalidMetadataTTL = errors.New("MetadataTTL must be at least 1ms.")

ErrConfigInvalidMetadataTTL happens when trying to create a new client with too small MetadataTTL value.

View Source
var ErrConfigInvalidReadTimeout = errors.New("ReadTimeout must be at least 1ms.")

ErrConfigInvalidReadTimeout happens when trying to create a new client with too small ReadTimeout value.

View Source
var ErrConfigInvalidWriteTimeout = errors.New("WriteTimeout must be at least 1ms.")

ErrConfigInvalidWriteTimeout happens when trying to create a new client with too small WriteTimeout value.

View Source
var ErrConfigNoBrokers = errors.New("BrokerList must have at least one broker.")

ErrConfigNoBrokers happens when trying to create a new client without bootstrap brokers specified.

View Source
var ErrConsumerCoordinatorNotAvailableCode = errors.New("Offsets topic has not yet been created.")

ErrConsumerCoordinatorNotAvailableCode is a mapping for Kafka error code 15.

View Source
var ErrEOF = errors.New("End of file reached")

ErrEOF signals that an end of file or stream has been reached unexpectedly.

View Source
var ErrFailedToGetLeader = errors.New("Failed to get leader.")

ErrFailedToGetLeader happens when TopicMetadataResponse does not contain leader metadata for requested topic and partition.

View Source
var ErrFailedToGetMetadata = errors.New("Failed to get topic metadata.")

ErrFailedToGetMetadata happens when TopicMetadataResponse does not contain metadata for requested topic.

View Source
var ErrGroupAuthorizationFailed = errors.New("The client is not authorized to access a particular groupId.")

ErrGroupAuthorizationFailed is a mapping for Kafka error code 30

View Source
var ErrIllegalGeneration = errors.New("The generation id provided in the request is not the current generation.")

ErrIllegalGeneration is a mapping for Kafka error code 22

View Source
var ErrInconsistentGroupProtocol = errors.New("Provided protocol type or set of protocols is not compatible with the current group.")

ErrInconsistentGroupProtocol is a mapping for Kafka error code 23

View Source
var ErrInvalidCommitOffsetSize = errors.New("Offset commit was rejected because of oversize metadata.")

ErrInvalidCommitOffsetSize is a mapping for Kafka error code 28

View Source
var ErrInvalidGroupID = errors.New("The groupId is empty or null.")

ErrInvalidGroupID is a mapping for Kafka error code 24

View Source
var ErrInvalidMessage = errors.New("Message contents does not match its CRC")

ErrInvalidMessage is a mapping for Kafka error code 2.

View Source
var ErrInvalidMessageSize = errors.New("The message has a negative size")

ErrInvalidMessageSize is a mapping for Kafka error code 4.

View Source
var ErrInvalidRequiredAcks = errors.New("The requested requiredAcks is invalid.")

ErrInvalidRequiredAcks is a mapping for Kafka error code 21

View Source
var ErrInvalidSessionTimeout = errors.New("The requested session timeout is outside of the allowed range on the broker.")

ErrInvalidSessionTimeout is a mapping for Kafka error code 26

View Source
var ErrInvalidTopicCode = errors.New("Attempt to access an invalid topic.")

ErrInvalidTopicCode is a mapping for Kafka error code 17

View Source
var ErrLeaderNotAvailable = errors.New("In the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.")

ErrLeaderNotAvailable is a mapping for Kafka error code 5.

View Source
var ErrMessageSizeTooLarge = errors.New("You've just attempted to produce a message of size larger than broker is allowed to accept.")

ErrMessageSizeTooLarge is a mapping for Kafka error code 10.

View Source
var ErrNoClientConfig = errors.New("ClientConfig cannot be nil.")

ErrNoClientConfig happens when trying to create a new client without a configuration.

View Source
var ErrNoDataToUncompress = errors.New("No data to uncompress")

ErrNoDataToUncompress happens when a compressed message is empty.

View Source
var ErrNoError = errors.New("No error - it worked!")

ErrNoError is a mapping for Kafka error code 0.

View Source
var ErrNotCoordinatorForConsumerCode = errors.New("There is no coordinator for this consumer.")

ErrNotCoordinatorForConsumerCode is a mapping for Kafka error code 16.

View Source
var ErrNotEnoughReplicas = errors.New("The number of in-sync replicas is lower than the configured minimum.")

ErrNotEnoughReplicas is a mapping for Kafka error code 19

View Source
var ErrNotEnoughReplicasAfterAppend = errors.New("The message was written to the log, but with fewer in-sync replicas than required.")

ErrNotEnoughReplicasAfterAppend is a mapping for Kafka error code 20

View Source
var ErrNotLeaderForPartition = errors.New("You've just attempted to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.")

ErrNotLeaderForPartition is a mapping for Kafka error code 6.

View Source
var ErrOffsetMetadataTooLargeCode = errors.New("You've jsut specified a string larger than configured maximum for offset metadata.")

ErrOffsetMetadataTooLargeCode is a mapping for Kafka error code 12.

View Source
var ErrOffsetOutOfRange = errors.New("The requested offset is outside the range of offsets maintained by the server for the given topic/partition.")

ErrOffsetOutOfRange is a mapping for Kafka error code 1.

View Source
var ErrOffsetsLoadInProgressCode = errors.New("Offset loading is in progress. (Usually happens after a leader change for that offsets topic partition).")

ErrOffsetsLoadInProgressCode is a mapping for Kafka error code 14.

View Source
var ErrRebalanceInProgress = errors.New("The coordinator has begun rebalancing the group. This indicates to the client that it should rejoin the group.")

ErrRebalanceInProgress is a mapping for Kafka error code 27

View Source
var ErrRecordListTooLarge = errors.New("Message batch exceeds the maximum configured segment size.")

ErrRecordListTooLarge is a mapping for Kafka error code 18

View Source
var ErrReplicaNotAvailable = errors.New("Replica is expected on a broker, but is not (this can be safely ignored).")

ErrReplicaNotAvailable is a mapping for Kafka error code 9.

View Source
var ErrRequestTimedOut = errors.New("Request exceeds the user-specified time limit in the request.")

ErrRequestTimedOut is a mapping for Kafka error code 7.

View Source
var ErrStaleControllerEpochCode = errors.New("Broker-to-broker communication fault.")

ErrStaleControllerEpochCode is a mapping for Kafka error code 11.

View Source
var ErrTopicAuthorizationFailed = errors.New("The client is not authorized to access the requested topic.")

ErrTopicAuthorizationFailed is a mapping for Kafka error code 29

View Source
var ErrUnknown = errors.New("An unexpected server error")

ErrUnknown is a mapping for Kafka error code -1.

View Source
var ErrUnknownMemberID = errors.New("The memberId is not in the current generation.")

ErrUnknownMemberID is a mapping for Kafka error code 25

View Source
var ErrUnknownTopicOrPartition = errors.New("This request is for a topic or partition that does not exist on this broker.")

ErrUnknownTopicOrPartition is a mapping for Kafka erfror code 3.

Functions

func ReadMessageSet

func ReadMessageSet(decoder Decoder) ([]*MessageAndOffset, *DecodingError)

ReadMessageSet decodes a nested message set if the MessageAndOffset is compressed.

Types

type BinaryDecoder

type BinaryDecoder struct {
	// contains filtered or unexported fields
}

BinaryDecoder implements Decoder and is able to decode a Kafka wire protocol message into actual data.

func NewBinaryDecoder

func NewBinaryDecoder(raw []byte) *BinaryDecoder

NewBinaryDecoder creates a new BinaryDecoder that will decode a given []byte.

func (*BinaryDecoder) GetBytes

func (bd *BinaryDecoder) GetBytes() ([]byte, error)

GetBytes gets a []byte from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt16

func (bd *BinaryDecoder) GetInt16() (int16, error)

GetInt16 gets an int16 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt32

func (bd *BinaryDecoder) GetInt32() (int32, error)

GetInt32 gets an int32 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt64

func (bd *BinaryDecoder) GetInt64() (int64, error)

GetInt64 gets an int64 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt8

func (bd *BinaryDecoder) GetInt8() (int8, error)

GetInt8 gets an int8 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetString

func (bd *BinaryDecoder) GetString() (string, error)

GetString gets a string from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) Remaining

func (bd *BinaryDecoder) Remaining() int

Remaining tells how many bytes left unread in this decoder.

type BinaryEncoder

type BinaryEncoder struct {
	// contains filtered or unexported fields
}

BinaryEncoder implements Decoder and is able to encode actual data into a Kafka wire protocol byte sequence.

func NewBinaryEncoder

func NewBinaryEncoder(buffer []byte) *BinaryEncoder

NewBinaryEncoder creates a new BinaryEncoder that will write into a given []byte.

func (*BinaryEncoder) Reserve

func (be *BinaryEncoder) Reserve(slice UpdatableSlice)

Reserve reserves a place for an updatable slice.

func (*BinaryEncoder) Reset

func (be *BinaryEncoder) Reset()

func (*BinaryEncoder) Size

func (be *BinaryEncoder) Size() int32

Size returns the size in bytes written to this encoder.

func (*BinaryEncoder) UpdateReserved

func (be *BinaryEncoder) UpdateReserved()

UpdateReserved tells the last reserved slice to be updated with new data.

func (*BinaryEncoder) WriteBytes

func (be *BinaryEncoder) WriteBytes(value []byte)

WriteBytes writes a []byte to this encoder.

func (*BinaryEncoder) WriteInt16

func (be *BinaryEncoder) WriteInt16(value int16)

WriteInt16 writes an int16 to this encoder.

func (*BinaryEncoder) WriteInt32

func (be *BinaryEncoder) WriteInt32(value int32)

WriteInt32 writes an int32 to this encoder.

func (*BinaryEncoder) WriteInt64

func (be *BinaryEncoder) WriteInt64(value int64)

WriteInt64 writes an int64 to this encoder.

func (*BinaryEncoder) WriteInt8

func (be *BinaryEncoder) WriteInt8(value int8)

WriteInt8 writes an int8 to this encoder.

func (*BinaryEncoder) WriteString

func (be *BinaryEncoder) WriteString(value string)

WriteString writes a string to this encoder.

type Broker

type Broker struct {
	ID   int32
	Host string
	Port int32
}

Broker contains information about a Kafka broker in cluster - its ID, host name and port.

func (*Broker) Read

func (n *Broker) Read(decoder Decoder) *DecodingError

func (*Broker) String

func (n *Broker) String() string

type BrokerConnection

type BrokerConnection struct {
	// contains filtered or unexported fields
}

BrokerConnection manages TCP connections to a single broker.

func NewBrokerConnection

func NewBrokerConnection(broker *Broker, keepAliveTimeout time.Duration) *BrokerConnection

NewBrokerConnection creates a new BrokerConnection to a given broker with a given TCP keep alive timeout.

func (*BrokerConnection) GetConnection

func (bc *BrokerConnection) GetConnection() (*net.TCPConn, error)

GetConnection either gets an existing connection from pool or creates a new one. May return an error if fails to open a new connection.

func (*BrokerConnection) ReleaseConnection

func (bc *BrokerConnection) ReleaseConnection(conn *net.TCPConn)

ReleaseConnection puts an existing connection back to pool to be reused later.

type Brokers

type Brokers struct {
	// contains filtered or unexported fields
}

Brokers provide information about Kafka cluster and expose convenience functions to simplify interaction with it.

func NewBrokers

func NewBrokers(keepAliveTimeout time.Duration) *Brokers

NewBrokers creates new Brokers with provided TCP keep alive timeout for all connection pools that will be created by this structure.

func (*Brokers) Add

func (b *Brokers) Add(broker *Broker)

Add adds a new Kafka broker infromation to this Brokers structure.

func (*Brokers) Get

func (b *Brokers) Get(id int32) *BrokerConnection

Get gets a BrokerConnection for a given broker ID.

func (*Brokers) GetAll

func (b *Brokers) GetAll() []*BrokerConnection

GetAll gets all BrokerConnections for this Brokers structure.

func (*Brokers) NextCorrelationID

func (b *Brokers) NextCorrelationID() int32

NextCorrelationID returns a next sequential request correlation ID.

func (*Brokers) Remove

func (b *Brokers) Remove(id int32)

Remove removes Kafka broker information from this Brokers structure.

func (*Brokers) Update

func (b *Brokers) Update(broker *Broker)

Update updates Kafka broker information in this Brokers structure.

type Client

type Client interface {
	// GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have.
	// Passing it an empty topic list will retrieve metadata for all topics in a cluster.
	GetTopicMetadata(topics []string) (*MetadataResponse, error)

	// GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.
	// More on offset time here - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
	GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)

	// Fetch issues a single fetch request to a broker responsible for a given topic and partition and returns a FetchResponse that contains messages starting from a given offset.
	Fetch(topic string, partition int32, offset int64) (*FetchResponse, error)

	// GetOffset gets the offset for a given group, topic and partition from Kafka. A part of new offset management API.
	GetOffset(group string, topic string, partition int32) (int64, error)

	// CommitOffset commits the offset for a given group, topic and partition to Kafka. A part of new offset management API.
	CommitOffset(group string, topic string, partition int32, offset int64) error

	GetLeader(topic string, partition int32) (*BrokerConnection, error)

	GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error)

	// Metadata returns a structure that holds all topic and broker metadata.
	Metadata() *Metadata

	// Tells the Client to close all existing connections and stop.
	// This method is NOT blocking but returns a channel which will be closed once the closing is finished.
	Close() <-chan struct{}
}

Client is an interface that should provide ways to clearly interact with Kafka cluster and hide all broker management stuff from user.

type CompressionCodec

type CompressionCodec int

CompressionCodec is a compression codec id used to distinguish various compression types.

const (
	// CompressionNone is a compression codec id for uncompressed data.
	CompressionNone CompressionCodec = 0

	// CompressionGZIP is a compression codec id for GZIP compression.
	CompressionGZIP CompressionCodec = 1

	// CompressionSnappy is a compression codec id for Snappy compression.
	CompressionSnappy CompressionCodec = 2

	// CompressionLZ4 is a compression codec id for LZ4 compression.
	CompressionLZ4 CompressionCodec = 3
)

type Config

type Config struct {
	// BrokerList is a bootstrap list to discover other brokers in a cluster. At least one broker is required.
	BrokerList []string

	// ReadTimeout is a timeout to read the response from a TCP socket.
	ReadTimeout time.Duration

	// WriteTimeout is a timeout to write the request to a TCP socket.
	WriteTimeout time.Duration

	// ConnectTimeout is a timeout to connect to a TCP socket.
	ConnectTimeout time.Duration

	// Sets whether the connection should be kept alive.
	KeepAlive bool

	// A keep alive period for a TCP connection.
	KeepAliveTimeout time.Duration

	// Maximum fetch size in bytes which will be used in all Consume() calls.
	FetchSize int32

	// The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block
	FetchMinBytes int32

	// The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy FetchMinBytes
	FetchMaxWaitTime int32

	// Number of retries to get topic metadata.
	MetadataRetries int

	// Backoff value between topic metadata requests.
	MetadataBackoff time.Duration

	// MetadataTTL is how long topic metadata is considered valid. Used to refresh metadata from time to time even if no leader changes occurred.
	MetadataTTL time.Duration

	// Number of retries to commit an offset.
	CommitOffsetRetries int

	// Backoff value between commit offset requests.
	CommitOffsetBackoff time.Duration

	// Number of retries to get consumer metadata.
	ConsumerMetadataRetries int

	// Backoff value between consumer metadata requests.
	ConsumerMetadataBackoff time.Duration

	// ClientID that will be used by a Client to identify client requests by broker.
	ClientID string
}

Config is used to pass multiple configuration values for a Client

func NewConfig

func NewConfig() *Config

NewConfig returns a new ClientConfig with sane defaults.

func (*Config) Validate

func (cc *Config) Validate() error

Validate validates this ClientConfig. Returns a corresponding error if the ClientConfig is invalid and nil otherwise.

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	Group string
}

ConsumerMetadataRequest is used to discover the current offset coordinator to issue its offset commit and fetch requests.

func NewConsumerMetadataRequest

func NewConsumerMetadataRequest(group string) *ConsumerMetadataRequest

NewConsumerMetadataRequest creates a new ConsumerMetadataRequest for a given consumer group.

func (*ConsumerMetadataRequest) Key

func (cmr *ConsumerMetadataRequest) Key() int16

Key returns the Kafka API key for ConsumerMetadataRequest.

func (*ConsumerMetadataRequest) Version

func (cmr *ConsumerMetadataRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*ConsumerMetadataRequest) Write

func (cmr *ConsumerMetadataRequest) Write(encoder Encoder)

Write writes the ConsumerMetadataRequest to the given Encoder.

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Error       error
	Coordinator *Broker
}

ConsumerMetadataResponse contains information about the current offset coordinator and error if it occurred.

func (*ConsumerMetadataResponse) Read

func (cmr *ConsumerMetadataResponse) Read(decoder Decoder) *DecodingError

type CorrelationIDGenerator

type CorrelationIDGenerator struct {
	// contains filtered or unexported fields
}

CorrelationIDGenerator is a simple structure that provides thread-safe correlation ID generation.

func (*CorrelationIDGenerator) NextCorrelationID

func (c *CorrelationIDGenerator) NextCorrelationID() int32

NextCorrelationID returns a next sequential request correlation ID.

type CrcSlice

type CrcSlice struct {
	// contains filtered or unexported fields
}

CrcSlice is used to calculate the CRC32 value of the message.

func (*CrcSlice) GetPosition

func (cs *CrcSlice) GetPosition() int

GetPosition gets the position within the encoder to be updated later.

func (*CrcSlice) GetReserveLength

func (cs *CrcSlice) GetReserveLength() int

GetReserveLength returns the length to reserve for this slice.

func (*CrcSlice) SetPosition

func (cs *CrcSlice) SetPosition(pos int)

SetPosition sets the current position within the encoder to be updated later.

func (*CrcSlice) Update

func (cs *CrcSlice) Update(slice []byte)

Update this slice. At this point all necessary data should be written to encoder.

type Decoder

type Decoder interface {
	// Gets an int8 from this decoder. Returns EOF if end of stream is reached.
	GetInt8() (int8, error)

	// Gets an int16 from this decoder. Returns EOF if end of stream is reached.
	GetInt16() (int16, error)

	// Gets an int32 from this decoder. Returns EOF if end of stream is reached.
	GetInt32() (int32, error)

	// Gets an int64 from this decoder. Returns EOF if end of stream is reached.
	GetInt64() (int64, error)

	// Gets a []byte from this decoder. Returns EOF if end of stream is reached.
	GetBytes() ([]byte, error)

	// Gets a string from this decoder. Returns EOF if end of stream is reached.
	GetString() (string, error)

	// Tells how many bytes left unread in this decoder.
	Remaining() int
}

Decoder is able to decode a Kafka wire protocol message into actual data.

type DecodingError

type DecodingError struct {
	// contains filtered or unexported fields
}

DecodingError is an error that also holds the information about why it happened.

func NewDecodingError

func NewDecodingError(err error, reason string) *DecodingError

NewDecodingError creates a new DecodingError with a given error message and reason.

func (*DecodingError) Error

func (de *DecodingError) Error() error

Error returns the error message for this DecodingError.

func (*DecodingError) Reason

func (de *DecodingError) Reason() string

Reason returns the reason for this DecodingError.

type DescribeGroupsRequest

type DescribeGroupsRequest struct {
	Groups []string
}

DescribeGroupsRequest is used to fetch consumer group state information from Kafka cluster.

func (*DescribeGroupsRequest) Key

Key returns the Kafka API key for DescribeGroupsRequest.

func (*DescribeGroupsRequest) Version

func (*DescribeGroupsRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*DescribeGroupsRequest) Write

func (dgr *DescribeGroupsRequest) Write(encoder Encoder)

type DescribeGroupsResponse

type DescribeGroupsResponse struct {
	Groups []*GroupDescription
}

DescribeGroupsResponse is used to hold consumer group state information from Kafka cluster.

func (*DescribeGroupsResponse) Read

func (dgr *DescribeGroupsResponse) Read(decoder Decoder) *DecodingError

type Encoder

type Encoder interface {
	// Writes an int8 to this encoder.
	WriteInt8(int8)

	// Writes an int16 to this encoder.
	WriteInt16(int16)

	// Writes an int32 to this encoder.
	WriteInt32(int32)

	// Writes an int64 to this encoder.
	WriteInt64(int64)

	// Writes a []byte to this encoder.
	WriteBytes([]byte)

	// Writes a string to this encoder.
	WriteString(string)

	// Returns the size in bytes written to this encoder.
	Size() int32

	// Reserves a place for an updatable slice.
	// This is used as an optimization for length and crc fields.
	// The encoder reserves a place for this data and updates it later instead of pre-calculating it and doing redundant work.
	Reserve(UpdatableSlice)

	// Tells the last reserved slice to be updated with new data.
	UpdateReserved()
}

Encoder is able to encode actual data into a Kafka wire protocol byte sequence.

type FetchRequest

type FetchRequest struct {
	MaxWait     int32
	MinBytes    int32
	RequestInfo map[string][]*PartitionFetchInfo
}

FetchRequest is used to fetch a chunk of one or more logs for some topic-partitions.

func (*FetchRequest) AddFetch

func (fr *FetchRequest) AddFetch(topic string, partition int32, offset int64, fetchSize int32)

AddFetch is a convenience method to add a PartitionFetchInfo.

func (*FetchRequest) Key

func (fr *FetchRequest) Key() int16

Key returns the Kafka API key for FetchRequest.

func (*FetchRequest) Version

func (fr *FetchRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*FetchRequest) Write

func (fr *FetchRequest) Write(encoder Encoder)

Write writes the FetchRequest to the given Encoder.

type FetchResponse

type FetchResponse struct {
	Data map[string]map[int32]*FetchResponsePartitionData
}

FetchResponse contains FetchResponseData for all requested topics and partitions.

func (*FetchResponse) CollectMessages

func (fr *FetchResponse) CollectMessages(collector func(topic string, partition int32, offset int64, key []byte, value []byte) error) error

CollectMessages traverses this FetchResponse and applies a collector function to each message giving the possibility to avoid response -> kafka-client.Message -> other.Message conversion if necessary.

func (*FetchResponse) Error

func (fr *FetchResponse) Error(topic string, partition int32) error

Error returns the error message for a given topic and pertion of this FetchResponse

func (*FetchResponse) GetMessages

func (fr *FetchResponse) GetMessages() ([]*MessageAndMetadata, error)

GetMessages traverses this FetchResponse and collects all messages. Returns an error if FetchResponse contains one. Messages should be ordered by offset.

func (*FetchResponse) Read

func (fr *FetchResponse) Read(decoder Decoder) *DecodingError

type FetchResponsePartitionData

type FetchResponsePartitionData struct {
	Error               error
	HighwaterMarkOffset int64
	Messages            []*MessageAndOffset
}

FetchResponsePartitionData contains fetched messages for a single partition, the offset at the end of the log for this partition and an error code.

func (*FetchResponsePartitionData) Read

func (frd *FetchResponsePartitionData) Read(decoder Decoder) *DecodingError

type GroupDescription

type GroupDescription struct {
	Error        error
	GroupID      string
	State        string
	ProtocolType string
	Protocol     string
	Members      []*GroupMemberDescription
}

GroupDescription holds information about a single consumer group in a Kafka cluster.

func (*GroupDescription) Read

func (gd *GroupDescription) Read(decoder Decoder) *DecodingError

type GroupMemberDescription

type GroupMemberDescription struct {
	MemberID         string
	ClientID         string
	ClientHost       string
	MemberMetadata   []byte
	MemberAssignment []byte
}

GroupMemberDescription holds information about a single consumer group member in a Kafka cluster.

func (*GroupMemberDescription) Read

func (gmd *GroupMemberDescription) Read(decoder Decoder) *DecodingError

type GroupProtocol

type GroupProtocol struct {
	ProtocolName     string
	ProtocolMetadata []byte
}

GroupProtocol carries additional protocol information for a ProtocolType in JoinGroupRequest.

type HeartbeatRequest

type HeartbeatRequest struct {
	GroupID      string
	GenerationID int32
	MemberID     string
}

HeartbeatRequest is used to keep a member alive in a group.

func (*HeartbeatRequest) Key

func (*HeartbeatRequest) Key() int16

Key returns the Kafka API key for HeartbeatRequest.

func (*HeartbeatRequest) Version

func (*HeartbeatRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*HeartbeatRequest) Write

func (hr *HeartbeatRequest) Write(encoder Encoder)

type HeartbeatResponse

type HeartbeatResponse struct {
	Error error
}

HeartbeatResponse signals whether the sent HeartbeatRequest succeeded or not, and tells why if not.

func (*HeartbeatResponse) Read

func (hr *HeartbeatResponse) Read(decoder Decoder) *DecodingError

type JoinGroupRequest

type JoinGroupRequest struct {
	GroupID        string
	SessionTimeout int32
	MemberID       string
	ProtocolType   string
	GroupProtocols []*GroupProtocol
}

JoinGroupRequest is used to become a member of a group, creating it if there are no active members.

func (*JoinGroupRequest) Key

func (jgr *JoinGroupRequest) Key() int16

Key returns the Kafka API key for JoinGroupRequest.

func (*JoinGroupRequest) Version

func (jgr *JoinGroupRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*JoinGroupRequest) Write

func (jgr *JoinGroupRequest) Write(encoder Encoder)

type JoinGroupResponse

type JoinGroupResponse struct {
	Error         error
	GenerationID  int32
	GroupProtocol string
	LeaderID      string
	MemberID      string
	Members       map[string][]byte
}

JoinGroupResponse kindly asks you to write a meaningful comment when you get a chance.

func (*JoinGroupResponse) Read

func (jgr *JoinGroupResponse) Read(decoder Decoder) *DecodingError

type KafkaClient

type KafkaClient struct {
	// contains filtered or unexported fields
}

KafkaClient is a default (and only one for now) Client implementation for kafka-client library.

func New

func New(config *Config) (*KafkaClient, error)

New creates a new KafkaClient with a given ClientConfig. May return an error if the passed config is invalid.

func (*KafkaClient) Close

func (c *KafkaClient) Close() <-chan struct{}

Close tells the Client to close all existing connections and stop. This method is NOT blocking but returns a channel which will be closed once the closing is finished.

func (*KafkaClient) CommitOffset

func (c *KafkaClient) CommitOffset(group string, topic string, partition int32, offset int64) error

CommitOffset commits the offset for a given group, topic and partition to Kafka. A part of new offset management API.

func (*KafkaClient) Fetch

func (c *KafkaClient) Fetch(topic string, partition int32, offset int64) (*FetchResponse, error)

Fetch issues a single fetch request to a broker responsible for a given topic and partition and returns a FetchResponse that contains messages starting from a given offset.

func (*KafkaClient) GetAvailableOffset

func (c *KafkaClient) GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)

GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.

func (*KafkaClient) GetConsumerMetadata

func (c *KafkaClient) GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error)

GetConsumerMetadata returns a ConsumerMetadataResponse for a given consumer group. May return an error if fails to get consumer metadata for whatever reason within ConsumerMetadataRetries retries.

func (*KafkaClient) GetLeader

func (c *KafkaClient) GetLeader(topic string, partition int32) (*BrokerConnection, error)

GetLeader returns a leader broker for a given topic and partition. Returns an error if fails to get leader for whatever reason for MetadataRetries retries.

func (*KafkaClient) GetOffset

func (c *KafkaClient) GetOffset(group string, topic string, partition int32) (int64, error)

GetOffset gets the offset for a given group, topic and partition from Kafka. A part of new offset management API.

func (*KafkaClient) GetTopicMetadata

func (c *KafkaClient) GetTopicMetadata(topics []string) (*MetadataResponse, error)

GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have. Passing it an empty topic list will retrieve metadata for all topics in a cluster.

func (*KafkaClient) Metadata

func (c *KafkaClient) Metadata() *Metadata

Metadata returns Metadata structure used by this Client.

type LeaveGroupRequest

type LeaveGroupRequest struct {
	GroupID  string
	MemberID string
}

LeaveGroupRequest is used to directly depart a group.

func (*LeaveGroupRequest) Key

func (*LeaveGroupRequest) Key() int16

Key returns the Kafka API key for LeaveGroupRequest.

func (*LeaveGroupRequest) Version

func (*LeaveGroupRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*LeaveGroupRequest) Write

func (lgr *LeaveGroupRequest) Write(encoder Encoder)

type LeaveGroupResponse

type LeaveGroupResponse struct {
	Error error
}

LeaveGroupResponse contains whether the member successfully left a group and contains a failure reason if not.

func (*LeaveGroupResponse) Read

func (lgr *LeaveGroupResponse) Read(decoder Decoder) *DecodingError

type LengthSlice

type LengthSlice struct {
	// contains filtered or unexported fields
}

LengthSlice is used to determine the length of upcoming message.

func (*LengthSlice) GetPosition

func (ls *LengthSlice) GetPosition() int

GetPosition gets the position within the encoder to be updated later.

func (*LengthSlice) GetReserveLength

func (ls *LengthSlice) GetReserveLength() int

GetReserveLength returns the length to reserve for this slice.

func (*LengthSlice) SetPosition

func (ls *LengthSlice) SetPosition(pos int)

SetPosition sets the current position within the encoder to be updated later.

func (*LengthSlice) Update

func (ls *LengthSlice) Update(slice []byte)

Update this slice. At this point all necessary data should be written to encoder.

type ListGroupsRequest

type ListGroupsRequest struct{}

ListGroupsRequest is used to list the current groups managed by a broker

func (*ListGroupsRequest) Key

func (*ListGroupsRequest) Key() int16

Key returns the Kafka API key for ListGroupsRequest.

func (*ListGroupsRequest) Version

func (*ListGroupsRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*ListGroupsRequest) Write

func (*ListGroupsRequest) Write(encoder Encoder)

type ListGroupsResponse

type ListGroupsResponse struct {
	Error  error
	Groups map[string]string
}

ListGroupsResponse lists the current groups managed by a broker and contains an error if it happened.

func (*ListGroupsResponse) Read

func (lgr *ListGroupsResponse) Read(decoder Decoder) *DecodingError

type Message

type Message struct {
	Crc        int32
	MagicByte  int8
	Attributes int8
	Key        []byte
	Value      []byte

	Nested []*MessageAndOffset
}

Message contains a single message and its metadata or a nested message set if compression is used.

func (*Message) Read

func (md *Message) Read(decoder Decoder) *DecodingError

func (*Message) Write

func (md *Message) Write(encoder Encoder)

TODO compress and write if needed

type MessageAndMetadata

type MessageAndMetadata struct {
	Topic     string
	Partition int32
	Offset    int64
	Key       []byte
	Value     []byte
}

MessageAndMetadata is a single message and its metadata.

type MessageAndOffset

type MessageAndOffset struct {
	Offset  int64
	Message *Message
}

MessageAndOffset is a single message or a message set (if it is compressed) with its offset value.

func (*MessageAndOffset) Read

func (mo *MessageAndOffset) Read(decoder Decoder) *DecodingError

func (*MessageAndOffset) Write

func (mo *MessageAndOffset) Write(encoder Encoder)

type Metadata

type Metadata struct {
	Brokers *Brokers
	// contains filtered or unexported fields
}

Metadata is a helper structure that provides access to topic/partition leaders and offset coordinators, caches them, refreshes and invalidates when necessary.

func NewMetadata

func NewMetadata(kafkaClient Client, brokers *Brokers, metadataTTL time.Duration) *Metadata

NewMetadata creates a new Metadata for a given Client and Brokers with metadata cache TTL set to metadataTTL.

func (*Metadata) Invalidate

func (m *Metadata) Invalidate(topic string)

Invalidate forcibly invalidates metadata cache for a given topic so that next time it is requested it is guaranteed to be refreshed.

func (*Metadata) Leader

func (m *Metadata) Leader(topic string, partition int32) (int32, error)

Leader tries to get a leader for a topic and partition from cache. Automatically refreshes if the leader information is missing or expired. May return an error if fails to get a leader for whatever reason.

func (*Metadata) OffsetCoordinator

func (m *Metadata) OffsetCoordinator(group string) (*BrokerConnection, error)

OffsetCoordinator returns a BrokerConnection for an offset coordinator for a given group ID. May return an error if fails to to get metadata for whatever reason.

func (*Metadata) PartitionsFor

func (m *Metadata) PartitionsFor(topic string) ([]int32, error)

PartitionsFor returns a sorted slice of partitions for a given topic. Automatically refreshes metadata if it is missing or expired. May return an error if fails to to get metadata for whatever reason.

func (*Metadata) Refresh

func (m *Metadata) Refresh(topics []string) error

Refresh forces metadata refresh for given topics. If the argument is empty or nil, metadata for all known topics by a Kafka cluster will be requested. May return an error if fails to to get metadata for whatever reason.

func (*Metadata) TopicMetadata

func (m *Metadata) TopicMetadata(topic string) (map[int32]int32, error)

TopicMetadata returns a map where keys are partitions of a topic and values are leader broker IDs. Automatically refreshes metadata if it is missing or expired. May return an error if fails to to get metadata for whatever reason.

type MetadataResponse

type MetadataResponse struct {
	Brokers        []*Broker
	TopicsMetadata []*TopicMetadata
}

MetadataResponse contains information about brokers in cluster and topics that exist.

func (*MetadataResponse) Read

func (tmr *MetadataResponse) Read(decoder Decoder) *DecodingError

type OffsetAndMetadata

type OffsetAndMetadata struct {
	Offset    int64
	Timestamp int64
	Metadata  string
}

OffsetAndMetadata contains offset for a partition and optional metadata.

type OffsetCommitRequest

type OffsetCommitRequest struct {
	GroupID     string
	RequestInfo map[string]map[int32]*OffsetAndMetadata
}

OffsetCommitRequest is used to commit offsets for a group/topic/partition.

func NewOffsetCommitRequest

func NewOffsetCommitRequest(group string) *OffsetCommitRequest

NewOffsetCommitRequest creates a new OffsetCommitRequest for a given consumer group.

func (*OffsetCommitRequest) AddOffset

func (ocr *OffsetCommitRequest) AddOffset(topic string, partition int32, offset int64, timestamp int64, metadata string)

AddOffset is a convenience method to add an offset for a topic partition.

func (*OffsetCommitRequest) Key

func (ocr *OffsetCommitRequest) Key() int16

Key returns the Kafka API key for OffsetCommitRequest.

func (*OffsetCommitRequest) Version

func (ocr *OffsetCommitRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*OffsetCommitRequest) Write

func (ocr *OffsetCommitRequest) Write(encoder Encoder)

type OffsetCommitResponse

type OffsetCommitResponse struct {
	CommitStatus map[string]map[int32]error
}

OffsetCommitResponse contains errors for partitions if they occur.

func (*OffsetCommitResponse) Read

func (ocr *OffsetCommitResponse) Read(decoder Decoder) *DecodingError

type OffsetFetchRequest

type OffsetFetchRequest struct {
	GroupID     string
	RequestInfo map[string][]int32
}

OffsetFetchRequest is used to fetch offsets for a consumer group and given topic partitions.

func NewOffsetFetchRequest

func NewOffsetFetchRequest(group string) *OffsetFetchRequest

NewOffsetFetchRequest creates a new OffsetFetchRequest for a given consumer group.

func (*OffsetFetchRequest) AddOffset

func (ofr *OffsetFetchRequest) AddOffset(topic string, partition int32)

AddOffset is a convenience method to add a topic partition to this OffsetFetchRequest.

func (*OffsetFetchRequest) Key

func (ofr *OffsetFetchRequest) Key() int16

Key returns the Kafka API key for OffsetFetchRequest.

func (*OffsetFetchRequest) Version

func (ofr *OffsetFetchRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*OffsetFetchRequest) Write

func (ofr *OffsetFetchRequest) Write(encoder Encoder)

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Offsets map[string]map[int32]*OffsetMetadataAndError
}

OffsetFetchResponse contains fetched offsets for each requested topic partition.

func (*OffsetFetchResponse) Read

func (ofr *OffsetFetchResponse) Read(decoder Decoder) *DecodingError

type OffsetMetadataAndError

type OffsetMetadataAndError struct {
	Offset   int64
	Metadata string
	Error    error
}

OffsetMetadataAndError contains a fetched offset for a topic partition, optional metadata and an error if it occurred.

func (*OffsetMetadataAndError) Read

func (ofr *OffsetMetadataAndError) Read(decoder Decoder) *DecodingError

type OffsetRequest

type OffsetRequest struct {
	RequestInfo map[string][]*PartitionOffsetRequestInfo
}

OffsetRequest describes the valid offset range available for a set of topic-partitions.

func (*OffsetRequest) AddPartitionOffsetRequestInfo

func (or *OffsetRequest) AddPartitionOffsetRequestInfo(topic string, partition int32, time int64, maxNumOffsets int32)

AddPartitionOffsetRequestInfo is a convenience method to add a PartitionOffsetRequestInfo to this request.

func (*OffsetRequest) Key

func (or *OffsetRequest) Key() int16

Key returns the Kafka API key for OffsetRequest.

func (*OffsetRequest) Version

func (or *OffsetRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*OffsetRequest) Write

func (or *OffsetRequest) Write(encoder Encoder)

type OffsetResponse

type OffsetResponse struct {
	PartitionErrorAndOffsets map[string]map[int32]*PartitionOffsetsResponse
}

OffsetResponse contains the starting offset of each segment for the requested partition as well as the "log end offset" i.e. the offset of the next message that would be appended to the given partition.

func (*OffsetResponse) Read

func (or *OffsetResponse) Read(decoder Decoder) *DecodingError

type PartitionFetchInfo

type PartitionFetchInfo struct {
	Partition int32
	Offset    int64
	FetchSize int32
}

PartitionFetchInfo contains information about what partition to fetch, what offset to fetch from and the maximum bytes to include in the message set for this partition.

type PartitionMetadata

type PartitionMetadata struct {
	Error       error
	PartitionID int32
	Leader      int32
	Replicas    []int32
	ISR         []int32
}

PartitionMetadata contains information about a topic partition - its id, leader, replicas, ISRs and error if it occurred.

func (*PartitionMetadata) Read

func (pm *PartitionMetadata) Read(decoder Decoder) *DecodingError

type PartitionOffsetRequestInfo

type PartitionOffsetRequestInfo struct {
	Partition     int32
	Time          int64
	MaxNumOffsets int32
}

PartitionOffsetRequestInfo contains partition specific configurations to fetch offsets.

type PartitionOffsetsResponse

type PartitionOffsetsResponse struct {
	Error   error
	Offsets []int64
}

PartitionOffsetsResponse contain offsets for a single partition and an error if it occurred.

func (*PartitionOffsetsResponse) Read

func (po *PartitionOffsetsResponse) Read(decoder Decoder) *DecodingError

type ProduceRequest

type ProduceRequest struct {
	RequiredAcks int16
	AckTimeoutMs int32
	Data         map[string]map[int32][]*MessageAndOffset
}

ProduceRequest is used to send message sets to the server.

func (*ProduceRequest) AddMessage

func (pr *ProduceRequest) AddMessage(topic string, partition int32, message *Message)

AddMessage is a convenience method to add a single message to be produced to a topic partition.

func (*ProduceRequest) Key

func (pr *ProduceRequest) Key() int16

Key returns the Kafka API key for ProduceRequest.

func (*ProduceRequest) Version

func (pr *ProduceRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*ProduceRequest) Write

func (pr *ProduceRequest) Write(encoder Encoder)

type ProduceResponse

type ProduceResponse struct {
	Status map[string]map[int32]*ProduceResponseStatus
}

ProduceResponse contains highest assigned offsets by topic partitions and errors if they occurred.

func (*ProduceResponse) Read

func (pr *ProduceResponse) Read(decoder Decoder) *DecodingError

type ProduceResponseStatus

type ProduceResponseStatus struct {
	Error  error
	Offset int64
}

ProduceResponseStatus contains a highest assigned offset from a ProduceRequest and an error if it occurred.

type Request

type Request interface {
	// Writes the Request to the given Encoder.
	Write(Encoder)

	// Returns the Kafka API key for this Request.
	Key() int16

	// Returns the Kafka request version for backwards compatibility.
	Version() int16
}

Request is a generic interface for any request issued to Kafka. Must be able to identify and write itself.

type RequestHeader

type RequestHeader struct {
	// contains filtered or unexported fields
}

RequestHeader is used to decouple the message header/metadata writing from the actual message. It is able to accept a request and encode/write it according to Kafka Wire Protocol format adding the correlation id and client id to the request.

func NewRequestHeader

func NewRequestHeader(correlationID int32, clientID string, request Request) *RequestHeader

NewRequestHeader creates a new RequestHeader holding the correlation id, client id and the actual request.

func (*RequestHeader) Size

func (rw *RequestHeader) Size() int32

Size returns the size in bytes needed to write this request, including the length field. This value will be used when allocating memory for a byte array.

func (*RequestHeader) Write

func (rw *RequestHeader) Write(encoder Encoder)

Write writes this RequestHeader into a given Encoder.

type Response

type Response interface {
	// Read the Response from the given Decoder. May return a DecodingError if the response is invalid.
	Read(Decoder) *DecodingError
}

Response is a generic interface for any response received from Kafka. Must be able to read itself.

type SizingEncoder

type SizingEncoder struct {
	// contains filtered or unexported fields
}

SizingEncoder is used to determine the size for []byte that will hold the actual encoded data. This is used as an optimization as it is cheaper to run once and determine the size instead of growing the slice dynamically.

func NewSizingEncoder

func NewSizingEncoder() *SizingEncoder

NewSizingEncoder creates a new SizingEncoder

func (*SizingEncoder) Reserve

func (se *SizingEncoder) Reserve(slice UpdatableSlice)

Reserve reserves a place for an updatable slice.

func (*SizingEncoder) Reset

func (se *SizingEncoder) Reset()

Reset resets this encoders value to 0 for reusing.

func (*SizingEncoder) Size

func (se *SizingEncoder) Size() int32

Size returns the size in bytes written to this encoder.

func (*SizingEncoder) UpdateReserved

func (se *SizingEncoder) UpdateReserved()

UpdateReserved tells the last reserved slice to be updated with new data.

func (*SizingEncoder) WriteBytes

func (se *SizingEncoder) WriteBytes(value []byte)

WriteBytes writes a []byte to this encoder.

func (*SizingEncoder) WriteInt16

func (se *SizingEncoder) WriteInt16(int16)

WriteInt16 writes an int16 to this encoder.

func (*SizingEncoder) WriteInt32

func (se *SizingEncoder) WriteInt32(int32)

WriteInt32 writes an int32 to this encoder.

func (*SizingEncoder) WriteInt64

func (se *SizingEncoder) WriteInt64(int64)

WriteInt64 writes an int64 to this encoder.

func (*SizingEncoder) WriteInt8

func (se *SizingEncoder) WriteInt8(int8)

WriteInt8 writes an int8 to this encoder.

func (*SizingEncoder) WriteString

func (se *SizingEncoder) WriteString(value string)

WriteString writes a string to this encoder.

type SyncGroupRequest

type SyncGroupRequest struct {
	GroupID         string
	GenerationID    int32
	MemberID        string
	GroupAssignment map[string][]byte
}

SyncGroupRequest is used to synchronize state for all members of a group.

func (*SyncGroupRequest) Key

func (*SyncGroupRequest) Key() int16

Key returns the Kafka API key for SyncGroupRequest.

func (*SyncGroupRequest) Version

func (*SyncGroupRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*SyncGroupRequest) Write

func (sgr *SyncGroupRequest) Write(encoder Encoder)

type SyncGroupResponse

type SyncGroupResponse struct {
	Error            error
	MemberAssignment []byte
}

SyncGroupResponse contains information about partition distribution within a group.

func (*SyncGroupResponse) Read

func (sgr *SyncGroupResponse) Read(decoder Decoder) *DecodingError

type TopicMetadata

type TopicMetadata struct {
	Error              error
	Topic              string
	PartitionsMetadata []*PartitionMetadata
}

TopicMetadata contains information about topic - its name, number of partitions, leaders, ISRs and errors if they occur.

func (*TopicMetadata) Read

func (tm *TopicMetadata) Read(decoder Decoder) *DecodingError

type TopicMetadataRequest

type TopicMetadataRequest struct {
	Topics []string
}

TopicMetadataRequest is used to get topics, their partitions, leader brokers for them and where these brokers are located.

func NewMetadataRequest

func NewMetadataRequest(topics []string) *TopicMetadataRequest

NewMetadataRequest creates a new MetadataRequest to fetch metadata for given topics. Passing it an empty slice will request metadata for all topics.

func (*TopicMetadataRequest) Key

func (mr *TopicMetadataRequest) Key() int16

Key returns the Kafka API key for TopicMetadataRequest.

func (*TopicMetadataRequest) Version

func (mr *TopicMetadataRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*TopicMetadataRequest) Write

func (mr *TopicMetadataRequest) Write(encoder Encoder)

type UpdatableSlice

type UpdatableSlice interface {
	// Returns the length to reserve for this slice.
	GetReserveLength() int

	// Set the current position within the encoder to be updated later.
	SetPosition(int)

	// Get the position within the encoder to be updated later.
	GetPosition() int

	// Update this slice. At this point all necessary data should be written to encoder.
	Update([]byte)
}

UpdatableSlice is an interface that is used when the encoder has to write the value based on bytes that are not yet written (e.g. calculate the CRC of the message).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL