siesta

Published: Apr 21, 2015. License: Apache-2.0.

README

siesta

A low-level Apache Kafka library in Go


Installation:

  1. Install Go: http://golang.org/doc/install
  2. Make sure the GOPATH and GOROOT environment variables are set and point to the correct locations
  3. go get github.com/stealthly/siesta
  4. Run go test -v to make sure it works

You may also want to spin up a local broker at localhost:9092 so the functional tests can run (they are skipped otherwise).
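
Below is a minimal consume sketch. It assumes a broker running at localhost:9092 and an existing topic named "test"; both values are placeholders.

package main

import (
	"fmt"
	"log"

	"github.com/stealthly/siesta"
)

func main() {
	// Start from the default configuration and point it at the local broker.
	config := siesta.NewConnectorConfig()
	config.BrokerList = []string{"localhost:9092"}

	connector, err := siesta.NewDefaultConnector(config)
	if err != nil {
		log.Fatal(err)
	}
	// Close returns a channel that receives a single value once shutdown is done.
	defer func() { <-connector.Close() }()

	// Fetch messages from partition 0 of topic "test", starting at offset 0.
	messages, err := connector.Consume("test", 0, 0)
	if err != nil {
		log.Fatal(err)
	}
	for _, message := range messages {
		fmt.Printf("offset %d: %s\n", message.Offset, string(message.Value))
	}
}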

Documentation

Index

Constants

This section is empty.

Variables

Mapping between Kafka error codes and actual error messages.

var BrokerNotAvailable = errors.New("Broker is likely not alive.")

A mapping for Kafka error code 8.

var ConsumerCoordinatorNotAvailableCode = errors.New("Offsets topic has not yet been created.")

A mapping for Kafka error code 14.

var EOF = errors.New("End of file reached")

Signals that an end of file or stream has been reached unexpectedly.

var InvalidMessage = errors.New("Message contents does not match its CRC")

A mapping for Kafka error code 2.

var InvalidMessageSize = errors.New("The message has a negative size")

A mapping for Kafka error code 4.

var LeaderNotAvailable = errors.New("In the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.")

A mapping for Kafka error code 5.

var MessageSizeTooLarge = errors.New("You've just attempted to produce a message of size larger than broker is allowed to accept.")

A mapping for Kafka error code 10.

var NoDataToUncompress = errors.New("No data to uncompress")

Happens when a compressed message is empty.

var NoError = errors.New("No error - it worked!")

A mapping for Kafka error code 0.

var NotCoordinatorForConsumerCode = errors.New("There is no coordinator for this consumer.")

A mapping for Kafka error code 15.

var NotLeaderForPartition = errors.New("You've just attempted to send messages to a replica that is not the leader for some partition. It indicates that the client's metadata is out of date.")

A mapping for Kafka error code 6.

var OffsetMetadataTooLargeCode = errors.New("You've just specified a string larger than configured maximum for offset metadata.")

A mapping for Kafka error code 12.

var OffsetOutOfRange = errors.New("The requested offset is outside the range of offsets maintained by the server for the given topic/partition.")

A mapping for Kafka error code 1.

var OffsetsLoadInProgressCode = errors.New("Offset loading is in progress. (Usually happens after a leader change for that offsets topic partition).")

A mapping for Kafka error code 13.

var ReplicaNotAvailable = errors.New("Replica is expected on a broker, but is not (this can be safely ignored).")

A mapping for Kafka error code 9.

var RequestTimedOut = errors.New("Request exceeds the user-specified time limit in the request.")

A mapping for Kafka error code 7.

var StaleControllerEpochCode = errors.New("Broker-to-broker communication fault.")

A mapping for Kafka error code 11.

var Unknown = errors.New("An unexpected server error")

A mapping for Kafka error code -1.

var UnknownTopicOrPartition = errors.New("This request is for a topic or partition that does not exist on this broker.")

A mapping for Kafka error code 3.

Functions

func Critical

func Critical(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Critical.

func Criticalf

func Criticalf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Critical.

func Debug

func Debug(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Debug.

func Debugf

func Debugf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Debug.

func Error

func Error(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Error.

func Errorf

func Errorf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Error.

func Info

func Info(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Info.

func Infof

func Infof(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Info.

func ReadMessageSet

func ReadMessageSet(decoder Decoder) ([]*MessageAndOffset, *DecodingError)

func Trace

func Trace(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Trace.

func Tracef

func Tracef(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Trace.

func Warn

func Warn(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Warn.

func Warnf

func Warnf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Warn.

Types

type BinaryDecoder

type BinaryDecoder struct {
	// contains filtered or unexported fields
}

BinaryDecoder implements Decoder and is able to decode a Kafka wire protocol message into actual data.

func NewBinaryDecoder

func NewBinaryDecoder(raw []byte) *BinaryDecoder

Creates a new BinaryDecoder that will decode a given []byte.

func (*BinaryDecoder) GetBytes

func (this *BinaryDecoder) GetBytes() ([]byte, error)

Gets a []byte from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt16

func (this *BinaryDecoder) GetInt16() (int16, error)

Gets an int16 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt32

func (this *BinaryDecoder) GetInt32() (int32, error)

Gets an int32 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt64

func (this *BinaryDecoder) GetInt64() (int64, error)

Gets an int64 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt8

func (this *BinaryDecoder) GetInt8() (int8, error)

Gets an int8 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetString

func (this *BinaryDecoder) GetString() (string, error)

Gets a string from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) Remaining

func (this *BinaryDecoder) Remaining() int

Tells how many bytes are left unread in this decoder.
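
A short sketch of decoding primitive values from a raw byte slice. The input bytes are illustrative (an int16 followed by a Kafka-style length-prefixed string), not a real broker response.

package main

import (
	"fmt"
	"log"

	"github.com/stealthly/siesta"
)

func main() {
	// An int16 (42) followed by a length-prefixed string ("foo").
	raw := []byte{0x00, 0x2A, 0x00, 0x03, 'f', 'o', 'o'}
	decoder := siesta.NewBinaryDecoder(raw)

	id, err := decoder.GetInt16()
	if err != nil {
		log.Fatal(err) // EOF is returned when the stream ends unexpectedly
	}
	name, err := decoder.GetString()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(id, name, decoder.Remaining()) // 42 foo 0
}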

type BinaryEncoder

type BinaryEncoder struct {
	// contains filtered or unexported fields
}

BinaryEncoder implements Encoder and is able to encode actual data into a Kafka wire protocol byte sequence.

func NewBinaryEncoder

func NewBinaryEncoder(buffer []byte) *BinaryEncoder

Creates a new BinaryEncoder that will write into a given []byte.

func (*BinaryEncoder) Reserve

func (this *BinaryEncoder) Reserve(slice UpdatableSlice)

Reserves a place for an updatable slice.

func (*BinaryEncoder) Size

func (this *BinaryEncoder) Size() int32

Returns the size in bytes written to this encoder.

func (*BinaryEncoder) UpdateReserved

func (this *BinaryEncoder) UpdateReserved()

Tells the last reserved slice to be updated with new data.

func (*BinaryEncoder) WriteBytes

func (this *BinaryEncoder) WriteBytes(value []byte)

Writes a []byte to this encoder.

func (*BinaryEncoder) WriteInt16

func (this *BinaryEncoder) WriteInt16(value int16)

Writes an int16 to this encoder.

func (*BinaryEncoder) WriteInt32

func (this *BinaryEncoder) WriteInt32(value int32)

Writes an int32 to this encoder.

func (*BinaryEncoder) WriteInt64

func (this *BinaryEncoder) WriteInt64(value int64)

Writes an int64 to this encoder.

func (*BinaryEncoder) WriteInt8

func (this *BinaryEncoder) WriteInt8(value int8)

Writes an int8 to this encoder.

func (*BinaryEncoder) WriteString

func (this *BinaryEncoder) WriteString(value string)

Writes a string to this encoder.
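
A sketch of the usual two-pass pattern: a SizingEncoder counts the bytes needed first, then a BinaryEncoder writes the same values into a buffer of exactly that size. The values are arbitrary.

package main

import (
	"fmt"

	"github.com/stealthly/siesta"
)

func main() {
	// First pass: only count bytes.
	sizer := siesta.NewSizingEncoder()
	sizer.WriteInt16(42)
	sizer.WriteString("foo")

	// Second pass: do the actual encoding into a pre-allocated buffer.
	buffer := make([]byte, sizer.Size())
	encoder := siesta.NewBinaryEncoder(buffer)
	encoder.WriteInt16(42)
	encoder.WriteString("foo")

	fmt.Printf("encoded %d bytes: %v\n", encoder.Size(), buffer)
}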

type Broker

type Broker struct {
	NodeId int32
	Host   string
	Port   int32
}

func (*Broker) Read

func (this *Broker) Read(decoder Decoder) *DecodingError

func (*Broker) String

func (this *Broker) String() string

type CompressionCodec

type CompressionCodec int
const (
	CompressionNone   CompressionCodec = 0
	CompressionGZip   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
	CompressionLZ4    CompressionCodec = 3
)

type Connector

type Connector interface {
	// GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have.
	// Passing it an empty topic list will retrieve metadata for all topics in a cluster.
	GetTopicMetadata(topics []string) (*TopicMetadataResponse, error)

	// GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.
	// More on offset time here - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
	GetAvailableOffset(topic string, partition int32, offsetTime OffsetTime) (int64, error)

	// Consume issues a single fetch request to a broker responsible for a given topic and partition and returns messages starting from a given offset.
	Consume(topic string, partition int32, offset int64) ([]*Message, error)

	// GetOffset is a part of new offset management API. Not fully functional yet.
	GetOffset(group string, topic string, partition int32) (int64, error)

	// Tells the Connector to close all existing connections and stop.
	// This method is NOT blocking but returns a channel which will get a single value once the closing is finished.
	Close() <-chan bool
}

Connector is an interface that provides a clear way to interact with a Kafka cluster while hiding all broker management details from the user.

type ConnectorConfig

type ConnectorConfig struct {
	// BrokerList is a bootstrap list to discover other brokers in a cluster. At least one broker is required.
	BrokerList []string

	// ReadTimeout is a timeout to read the response from a TCP socket.
	ReadTimeout time.Duration

	// WriteTimeout is a timeout to write the request to a TCP socket.
	WriteTimeout time.Duration

	// ConnectTimeout is a timeout to connect to a TCP socket.
	ConnectTimeout time.Duration

	// Sets whether the connection should be kept alive.
	KeepAlive bool

	// A keep alive period for a TCP connection.
	KeepAliveTimeout time.Duration

	// Maximum number of open connections for a connector.
	MaxConnections int

	// Maximum number of open connections for a single broker for a connector.
	MaxConnectionsPerBroker int

	// Maximum fetch size in bytes which will be used in all Consume() calls.
	FetchSize int32

	// Number of retries to get topic metadata.
	MetadataRetries int

	// Backoff value between topic metadata requests.
	MetadataBackoff time.Duration

	// Client id that will be used by a connector to identify client requests by broker.
	ClientId string
}

ConnectorConfig is used to pass multiple configuration values for a Connector

func NewConnectorConfig

func NewConnectorConfig() *ConnectorConfig

Returns a new ConnectorConfig with sane defaults.

func (*ConnectorConfig) Validate

func (this *ConnectorConfig) Validate() error

Validates this ConnectorConfig. Returns a corresponding error if the ConnectorConfig is invalid and nil otherwise.
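
A sketch of adjusting the defaults before creating a connector. The broker addresses, timeout, fetch size and client id are placeholder values.

package main

import (
	"log"
	"time"

	"github.com/stealthly/siesta"
)

func main() {
	// Start from sane defaults and override only what is needed.
	config := siesta.NewConnectorConfig()
	config.BrokerList = []string{"broker1:9092", "broker2:9092"}
	config.ReadTimeout = 10 * time.Second
	config.FetchSize = 1024 * 1024
	config.ClientId = "example-client"

	if err := config.Validate(); err != nil {
		log.Fatal(err)
	}

	connector, err := siesta.NewDefaultConnector(config)
	if err != nil {
		log.Fatal(err)
	}
	<-connector.Close()
}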

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	ConsumerGroup string
}

func NewConsumerMetadataRequest

func NewConsumerMetadataRequest(group string) *ConsumerMetadataRequest

func (*ConsumerMetadataRequest) Key

func (this *ConsumerMetadataRequest) Key() int16

func (*ConsumerMetadataRequest) Version

func (this *ConsumerMetadataRequest) Version() int16

func (*ConsumerMetadataRequest) Write

func (this *ConsumerMetadataRequest) Write(encoder Encoder)

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Error           error
	CoordinatorId   int32
	CoordinatorHost string
	CoordinatorPort int32
}

func (*ConsumerMetadataResponse) Read

func (this *ConsumerMetadataResponse) Read(decoder Decoder) *DecodingError

type CrcSlice

type CrcSlice struct {
	// contains filtered or unexported fields
}

CrcSlice is used to calculate the CRC32 value of the message.

func (*CrcSlice) GetPosition

func (this *CrcSlice) GetPosition() int

Get the position within the encoder to be updated later.

func (*CrcSlice) GetReserveLength

func (this *CrcSlice) GetReserveLength() int

Returns the length to reserve for this slice.

func (*CrcSlice) SetPosition

func (this *CrcSlice) SetPosition(pos int)

Set the current position within the encoder to be updated later.

func (*CrcSlice) Update

func (this *CrcSlice) Update(slice []byte)

Update this slice. At this point all necessary data should be written to encoder.

type Decoder

type Decoder interface {
	// Gets an int8 from this decoder. Returns EOF if end of stream is reached.
	GetInt8() (int8, error)

	// Gets an int16 from this decoder. Returns EOF if end of stream is reached.
	GetInt16() (int16, error)

	// Gets an int32 from this decoder. Returns EOF if end of stream is reached.
	GetInt32() (int32, error)

	// Gets an int64 from this decoder. Returns EOF if end of stream is reached.
	GetInt64() (int64, error)

	// Gets a []byte from this decoder. Returns EOF if end of stream is reached.
	GetBytes() ([]byte, error)

	// Gets a string from this decoder. Returns EOF if end of stream is reached.
	GetString() (string, error)

	// Tells how many bytes are left unread in this decoder.
	Remaining() int
}

Decoder is able to decode a Kafka wire protocol message into actual data.

type DecodingError

type DecodingError struct {
	// contains filtered or unexported fields
}

A DecodingError is an error that also holds the information about why it happened.

func NewDecodingError

func NewDecodingError(err error, reason string) *DecodingError

Creates a new DecodingError with a given error message and reason.

func (*DecodingError) Error

func (this *DecodingError) Error() error

Returns the error message for this DecodingError.

func (*DecodingError) Reason

func (this *DecodingError) Reason() string

Returns the reason for this DecodingError.

type DefaultConnector

type DefaultConnector struct {
	// contains filtered or unexported fields
}

The default (and currently the only) Connector implementation for the Siesta library.

func NewDefaultConnector

func NewDefaultConnector(config *ConnectorConfig) (*DefaultConnector, error)

Creates a new DefaultConnector with a given ConnectorConfig. May return an error if the passed config is invalid.

func (*DefaultConnector) Close

func (this *DefaultConnector) Close() <-chan bool

Tells the Connector to close all existing connections and stop. This method is NOT blocking but returns a channel which will get a single value once the closing is finished.

func (*DefaultConnector) Consume

func (this *DefaultConnector) Consume(topic string, partition int32, offset int64) ([]*Message, error)

Consume issues a single fetch request to a broker responsible for a given topic and partition and returns messages starting from a given offset.

func (*DefaultConnector) GetAvailableOffset

func (this *DefaultConnector) GetAvailableOffset(topic string, partition int32, offsetTime OffsetTime) (int64, error)

GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.

func (*DefaultConnector) GetOffset

func (this *DefaultConnector) GetOffset(group string, topic string, partition int32) (int64, error)

GetOffset is a part of new offset management API. Not fully functional yet.

func (*DefaultConnector) GetTopicMetadata

func (this *DefaultConnector) GetTopicMetadata(topics []string) (*TopicMetadataResponse, error)

GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have. Passing it an empty topic list will retrieve metadata for all topics in a cluster.

func (*DefaultConnector) String

func (this *DefaultConnector) String() string

Returns a string representation of this DefaultConnector.
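
A sketch of discovering partition leaders via GetTopicMetadata. The broker address and topic name are placeholders; passing an empty slice would return metadata for all topics.

package main

import (
	"fmt"
	"log"

	"github.com/stealthly/siesta"
)

func main() {
	config := siesta.NewConnectorConfig()
	config.BrokerList = []string{"localhost:9092"}
	connector, err := siesta.NewDefaultConnector(config)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { <-connector.Close() }()

	response, err := connector.GetTopicMetadata([]string{"test"})
	if err != nil {
		log.Fatal(err)
	}

	for _, topic := range response.TopicMetadata {
		for _, partition := range topic.PartitionMetadata {
			fmt.Printf("topic %s partition %d: leader %d, replicas %v, isr %v\n",
				topic.TopicName, partition.PartitionId, partition.Leader,
				partition.Replicas, partition.Isr)
		}
	}
}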

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

The default implementation of the KafkaLogger interface used by this client.

func NewDefaultLogger

func NewDefaultLogger(Level LogLevel) *DefaultLogger

Creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.

func (*DefaultLogger) Critical

func (dl *DefaultLogger) Critical(message string, params ...interface{})

Formats a given message according to given params to log with level Critical.

func (*DefaultLogger) Debug

func (dl *DefaultLogger) Debug(message string, params ...interface{})

Formats a given message according to given params to log with level Debug.

func (*DefaultLogger) Error

func (dl *DefaultLogger) Error(message string, params ...interface{})

Formats a given message according to given params to log with level Error.

func (*DefaultLogger) Info

func (dl *DefaultLogger) Info(message string, params ...interface{})

Formats a given message according to given params to log with level Info.

func (*DefaultLogger) Trace

func (dl *DefaultLogger) Trace(message string, params ...interface{})

Formats a given message according to given params to log with level Trace.

func (*DefaultLogger) Warn

func (dl *DefaultLogger) Warn(message string, params ...interface{})

Formats a given message according to given params to log with level Warn.

type Encoder

type Encoder interface {
	// Writes an int8 to this encoder.
	WriteInt8(int8)

	// Writes an int16 to this encoder.
	WriteInt16(int16)

	// Writes an int32 to this encoder.
	WriteInt32(int32)

	// Writes an int64 to this encoder.
	WriteInt64(int64)

	// Writes a []byte to this encoder.
	WriteBytes([]byte)

	// Writes a string to this encoder.
	WriteString(string)

	// Returns the size in bytes written to this encoder.
	Size() int32

	// Reserves a place for an updatable slice.
	// This is used as an optimization for length and crc fields.
	// The encoder reserves a place for this data and updates it later instead of pre-calculating it and doing redundant work.
	Reserve(UpdatableSlice)

	// Tells the last reserved slice to be updated with new data.
	UpdateReserved()
}

Encoder is able to encode actual data into a Kafka wire protocol byte sequence.

type FetchRequest

type FetchRequest struct {
	MaxWaitTime int32
	MinBytes    int32
	RequestInfo map[string][]*PartitionFetchInfo
}

func (*FetchRequest) AddFetch

func (this *FetchRequest) AddFetch(topic string, partition int32, offset int64, fetchSize int32)

func (*FetchRequest) Key

func (this *FetchRequest) Key() int16

func (*FetchRequest) Version

func (this *FetchRequest) Version() int16

func (*FetchRequest) Write

func (this *FetchRequest) Write(encoder Encoder)

type FetchResponse

type FetchResponse struct {
	Blocks map[string]map[int32]*FetchResponseData
}

func (*FetchResponse) GetMessages

func (this *FetchResponse) GetMessages() ([]*Message, error)

func (*FetchResponse) Read

func (this *FetchResponse) Read(decoder Decoder) *DecodingError

type FetchResponseData

type FetchResponseData struct {
	Error               error
	HighwaterMarkOffset int64
	Messages            []*MessageAndOffset
}

func (*FetchResponseData) Read

func (this *FetchResponseData) Read(decoder Decoder) *DecodingError

type FetchedOffset

type FetchedOffset struct {
	Offset   int64
	Metadata string
	Error    error
}

func (*FetchedOffset) Read

func (this *FetchedOffset) Read(decoder Decoder) *DecodingError

type KafkaLogger

type KafkaLogger interface {
	//Formats a given message according to given params to log with level Trace.
	Trace(message string, params ...interface{})

	//Formats a given message according to given params to log with level Debug.
	Debug(message string, params ...interface{})

	//Formats a given message according to given params to log with level Info.
	Info(message string, params ...interface{})

	//Formats a given message according to given params to log with level Warn.
	Warn(message string, params ...interface{})

	//Formats a given message according to given params to log with level Error.
	Error(message string, params ...interface{})

	//Formats a given message according to given params to log with level Critical.
	Critical(message string, params ...interface{})
}

Logger interface. Lets you plug in a custom logging library instead of using the built-in one.

Logger used by this client. Defaults to the built-in logger with the Info log level.
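
A sketch of a custom KafkaLogger that forwards everything to the standard library logger. The stdLogger type is a hypothetical example, not part of siesta.

package main

import (
	"log"

	"github.com/stealthly/siesta"
)

// stdLogger is a hypothetical KafkaLogger implementation backed by the
// standard library logger.
type stdLogger struct{}

func (l stdLogger) Trace(message string, params ...interface{}) {
	log.Printf("TRACE "+message, params...)
}

func (l stdLogger) Debug(message string, params ...interface{}) {
	log.Printf("DEBUG "+message, params...)
}

func (l stdLogger) Info(message string, params ...interface{}) {
	log.Printf("INFO "+message, params...)
}

func (l stdLogger) Warn(message string, params ...interface{}) {
	log.Printf("WARN "+message, params...)
}

func (l stdLogger) Error(message string, params ...interface{}) {
	log.Printf("ERROR "+message, params...)
}

func (l stdLogger) Critical(message string, params ...interface{}) {
	log.Printf("CRITICAL "+message, params...)
}

func main() {
	// Compile-time check that stdLogger satisfies the interface, plus one call.
	var logger siesta.KafkaLogger = stdLogger{}
	logger.Info("connected to %s", "localhost:9092")
}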

type LengthSlice

type LengthSlice struct {
	// contains filtered or unexported fields
}

LengthSlice is used to determine the length of an upcoming message.

func (*LengthSlice) GetPosition

func (this *LengthSlice) GetPosition() int

Get the position within the encoder to be updated later.

func (*LengthSlice) GetReserveLength

func (this *LengthSlice) GetReserveLength() int

Returns the length to reserve for this slice.

func (*LengthSlice) SetPosition

func (this *LengthSlice) SetPosition(pos int)

Set the current position within the encoder to be updated later.

func (*LengthSlice) Update

func (this *LengthSlice) Update(slice []byte)

Update this slice. At this point all necessary data should be written to encoder.

type LogLevel

type LogLevel string

Represents a logging level.

const (
	//Use TraceLevel for debugging to find problems in functions, variables etc.
	TraceLevel LogLevel = "trace"

	//Use DebugLevel for detailed system reports and diagnostic messages.
	DebugLevel LogLevel = "debug"

	//Use InfoLevel for general information about a running application.
	InfoLevel LogLevel = "info"

	//Use WarnLevel to indicate small errors and failures that should not happen normally but are recovered automatically.
	WarnLevel LogLevel = "warn"

	//Use ErrorLevel to indicate severe errors that affect application workflow and are not handled automatically.
	ErrorLevel LogLevel = "error"

	//Use CriticalLevel to indicate fatal errors that may cause data corruption or loss.
	CriticalLevel LogLevel = "critical"
)

type Message

type Message struct {
	Topic     string
	Partition int32
	Offset    int64
	Key       []byte
	Value     []byte
}

type MessageAndOffset

type MessageAndOffset struct {
	Offset  int64
	Message *MessageData
}

func (*MessageAndOffset) Read

func (this *MessageAndOffset) Read(decoder Decoder) *DecodingError

func (*MessageAndOffset) Write

func (this *MessageAndOffset) Write(encoder Encoder)

type MessageData

type MessageData struct {
	Crc        int32
	MagicByte  int8
	Attributes int8
	Key        []byte
	Value      []byte

	Nested []*MessageAndOffset
}

func (*MessageData) Read

func (this *MessageData) Read(decoder Decoder) *DecodingError

func (*MessageData) Write

func (this *MessageData) Write(encoder Encoder)

TODO compress and write if needed

type OffsetAndMetadata

type OffsetAndMetadata struct {
	Offset    int64
	TimeStamp int64
	Metadata  string
}

type OffsetCommitRequest

type OffsetCommitRequest struct {
	ConsumerGroup string
	Offsets       map[string]map[int32]*OffsetAndMetadata
}

func NewOffsetCommitRequest

func NewOffsetCommitRequest(group string) *OffsetCommitRequest

func (*OffsetCommitRequest) AddOffset

func (this *OffsetCommitRequest) AddOffset(topic string, partition int32, offset int64, timestamp int64, metadata string)

func (*OffsetCommitRequest) Key

func (this *OffsetCommitRequest) Key() int16

func (*OffsetCommitRequest) Version

func (this *OffsetCommitRequest) Version() int16

func (*OffsetCommitRequest) Write

func (this *OffsetCommitRequest) Write(encoder Encoder)

type OffsetCommitResponse

type OffsetCommitResponse struct {
	Offsets map[string]map[int32]error
}

func (*OffsetCommitResponse) Read

func (this *OffsetCommitResponse) Read(decoder Decoder) *DecodingError

type OffsetFetchRequest

type OffsetFetchRequest struct {
	ConsumerGroup string
	Offsets       map[string][]int32
}

func NewOffsetFetchRequest

func NewOffsetFetchRequest(group string) *OffsetFetchRequest

func (*OffsetFetchRequest) AddOffset

func (this *OffsetFetchRequest) AddOffset(topic string, partition int32)

func (*OffsetFetchRequest) Key

func (this *OffsetFetchRequest) Key() int16

func (*OffsetFetchRequest) Version

func (this *OffsetFetchRequest) Version() int16

func (*OffsetFetchRequest) Write

func (this *OffsetFetchRequest) Write(encoder Encoder)

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Offsets map[string]map[int32]*FetchedOffset
}

func (*OffsetFetchResponse) Read

func (this *OffsetFetchResponse) Read(decoder Decoder) *DecodingError

type OffsetRequest

type OffsetRequest struct {
	RequestInfo map[string][]*PartitionOffsetRequestInfo
}

func (*OffsetRequest) AddPartitionOffsetRequestInfo

func (this *OffsetRequest) AddPartitionOffsetRequestInfo(topic string, partition int32, time OffsetTime, maxNumOffsets int32)

func (*OffsetRequest) Key

func (this *OffsetRequest) Key() int16

func (*OffsetRequest) Version

func (this *OffsetRequest) Version() int16

func (*OffsetRequest) Write

func (this *OffsetRequest) Write(encoder Encoder)

type OffsetResponse

type OffsetResponse struct {
	Offsets map[string]map[int32]*PartitionOffsets
}

func (*OffsetResponse) Read

func (this *OffsetResponse) Read(decoder Decoder) *DecodingError

type OffsetTime

type OffsetTime int64
const EarliestTime OffsetTime = -2
const LatestTime OffsetTime = -1
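
A sketch of querying the offset range of a partition with these constants. Connector setup is omitted and the helper function is hypothetical.

package example

import (
	"fmt"

	"github.com/stealthly/siesta"
)

// printOffsetRange prints the earliest and latest offsets available for a
// single partition of a topic.
func printOffsetRange(connector siesta.Connector, topic string, partition int32) error {
	earliest, err := connector.GetAvailableOffset(topic, partition, siesta.EarliestTime)
	if err != nil {
		return err
	}
	latest, err := connector.GetAvailableOffset(topic, partition, siesta.LatestTime)
	if err != nil {
		return err
	}
	fmt.Printf("%s/%d: earliest=%d, latest=%d\n", topic, partition, earliest, latest)
	return nil
}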

type PartitionFetchInfo

type PartitionFetchInfo struct {
	Partition   int32
	FetchOffset int64
	FetchSize   int32
}

type PartitionMetadata

type PartitionMetadata struct {
	Error       error
	PartitionId int32
	Leader      int32
	Replicas    []int32
	Isr         []int32
}

func (*PartitionMetadata) Read

func (this *PartitionMetadata) Read(decoder Decoder) *DecodingError

type PartitionOffsetRequestInfo

type PartitionOffsetRequestInfo struct {
	Partition     int32
	Time          OffsetTime
	MaxNumOffsets int32
}

type PartitionOffsets

type PartitionOffsets struct {
	Error   error
	Offsets []int64
}

func (*PartitionOffsets) Read

func (this *PartitionOffsets) Read(decoder Decoder) *DecodingError

type ProduceRequest

type ProduceRequest struct {
	RequiredAcks int16
	Timeout      int32
	Messages     map[string]map[int32][]*MessageAndOffset
}

func (*ProduceRequest) AddMessage

func (this *ProduceRequest) AddMessage(topic string, partition int32, message *MessageData)

func (*ProduceRequest) Key

func (this *ProduceRequest) Key() int16

func (*ProduceRequest) Version

func (this *ProduceRequest) Version() int16

func (*ProduceRequest) Write

func (this *ProduceRequest) Write(encoder Encoder)

type ProduceResponse

type ProduceResponse struct {
	Blocks map[string]map[int32]*ProduceResponseData
}

func (*ProduceResponse) Read

func (this *ProduceResponse) Read(decoder Decoder) *DecodingError

type ProduceResponseData

type ProduceResponseData struct {
	Error  error
	Offset int64
}

type Request

type Request interface {
	// Writes the Request to the given Encoder.
	Write(Encoder)

	// Returns the Kafka API key for this Request.
	Key() int16

	// Returns the Kafka request version for backwards compatibility.
	Version() int16
}

A generic interface for any request issued to Kafka. Must be able to identify and write itself.

type RequestWriter

type RequestWriter struct {
	// contains filtered or unexported fields
}

RequestWriter is used to decouple the message header/metadata writing from the actual message. It is able to accept a request and encode/write it according to the Kafka Wire Protocol format, adding the correlation id and client id to the request.

func NewRequestWriter

func NewRequestWriter(correlationId int32, clientId string, request Request) *RequestWriter

Creates a new RequestWriter holding the correlation id, client id and the actual request.

func (*RequestWriter) Size

func (this *RequestWriter) Size() int32

Returns the size in bytes needed to write this request, including the length field. This value will be used when allocating memory for a byte array.

func (*RequestWriter) Write

func (this *RequestWriter) Write(encoder Encoder)

Writes itself into a given Encoder.
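
A sketch of encoding a request by hand from the pieces described above; normally DefaultConnector does this internally. The correlation id and client id are arbitrary example values.

package main

import (
	"fmt"

	"github.com/stealthly/siesta"
)

func main() {
	// Wrap a TopicMetadataRequest with the header metadata Kafka expects.
	request := siesta.NewTopicMetadataRequest([]string{"test"})
	writer := siesta.NewRequestWriter(1, "example-client", request)

	// Size() already includes the length field, so the buffer can be
	// allocated up front instead of grown dynamically.
	bytes := make([]byte, writer.Size())

	// Encode the full request into the buffer; sending it over TCP is left
	// to the caller (or to DefaultConnector).
	writer.Write(siesta.NewBinaryEncoder(bytes))

	fmt.Printf("encoded request: %d bytes\n", len(bytes))
}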

type Response

type Response interface {
	// Read the Response from the given Decoder. May return a DecodingError if the response is invalid.
	Read(Decoder) *DecodingError
}

A generic interface for any response received from Kafka. Must be able to read itself.

type SizingEncoder

type SizingEncoder struct {
	// contains filtered or unexported fields
}

SizingEncoder is used to determine the size for []byte that will hold the actual encoded data. This is used as an optimization as it is cheaper to run once and determine the size instead of growing the slice dynamically.

func NewSizingEncoder

func NewSizingEncoder() *SizingEncoder

Creates a new SizingEncoder.

func (*SizingEncoder) Reserve

func (this *SizingEncoder) Reserve(slice UpdatableSlice)

Reserves a place for an updatable slice.

func (*SizingEncoder) Size

func (this *SizingEncoder) Size() int32

Returns the size in bytes written to this encoder.

func (*SizingEncoder) UpdateReserved

func (this *SizingEncoder) UpdateReserved()

Tells the last reserved slice to be updated with new data.

func (*SizingEncoder) WriteBytes

func (this *SizingEncoder) WriteBytes(value []byte)

Writes a []byte to this encoder.

func (*SizingEncoder) WriteInt16

func (this *SizingEncoder) WriteInt16(int16)

Writes an int16 to this encoder.

func (*SizingEncoder) WriteInt32

func (this *SizingEncoder) WriteInt32(int32)

Writes an int32 to this encoder.

func (*SizingEncoder) WriteInt64

func (this *SizingEncoder) WriteInt64(int64)

Writes an int64 to this encoder.

func (*SizingEncoder) WriteInt8

func (this *SizingEncoder) WriteInt8(int8)

Writes an int8 to this encoder.

func (*SizingEncoder) WriteString

func (this *SizingEncoder) WriteString(value string)

Writes a string to this encoder.

type TopicMetadata

type TopicMetadata struct {
	Error             error
	TopicName         string
	PartitionMetadata []*PartitionMetadata
}

func (*TopicMetadata) Read

func (this *TopicMetadata) Read(decoder Decoder) *DecodingError

type TopicMetadataRequest

type TopicMetadataRequest struct {
	Topics []string
}

func NewTopicMetadataRequest

func NewTopicMetadataRequest(topics []string) *TopicMetadataRequest

func (*TopicMetadataRequest) Key

func (this *TopicMetadataRequest) Key() int16

func (*TopicMetadataRequest) Version

func (this *TopicMetadataRequest) Version() int16

func (*TopicMetadataRequest) Write

func (this *TopicMetadataRequest) Write(encoder Encoder)

type TopicMetadataResponse

type TopicMetadataResponse struct {
	Brokers       []*Broker
	TopicMetadata []*TopicMetadata
}

func (*TopicMetadataResponse) Read

func (this *TopicMetadataResponse) Read(decoder Decoder) *DecodingError

type UpdatableSlice

type UpdatableSlice interface {
	// Returns the length to reserve for this slice.
	GetReserveLength() int

	// Set the current position within the encoder to be updated later.
	SetPosition(int)

	// Get the position within the encoder to be updated later.
	GetPosition() int

	// Update this slice. At this point all necessary data should be written to encoder.
	Update([]byte)
}

UpdatableSlice is an interface that is used when the encoder has to write the value based on bytes that are not yet written (e.g. calculate the CRC of the message).
