Documentation ¶
Index ¶
- Variables
- func Critical(tag interface{}, message interface{})
- func Criticalf(tag interface{}, message interface{}, params ...interface{})
- func Debug(tag interface{}, message interface{})
- func Debugf(tag interface{}, message interface{}, params ...interface{})
- func Error(tag interface{}, message interface{})
- func Errorf(tag interface{}, message interface{}, params ...interface{})
- func Info(tag interface{}, message interface{})
- func Infof(tag interface{}, message interface{}, params ...interface{})
- func ReadMessageSet(decoder Decoder) ([]*MessageAndOffset, *DecodingError)
- func Trace(tag interface{}, message interface{})
- func Tracef(tag interface{}, message interface{}, params ...interface{})
- func Warn(tag interface{}, message interface{})
- func Warnf(tag interface{}, message interface{}, params ...interface{})
- type BinaryDecoder
- func (this *BinaryDecoder) GetBytes() ([]byte, error)
- func (this *BinaryDecoder) GetInt16() (int16, error)
- func (this *BinaryDecoder) GetInt32() (int32, error)
- func (this *BinaryDecoder) GetInt64() (int64, error)
- func (this *BinaryDecoder) GetInt8() (int8, error)
- func (this *BinaryDecoder) GetString() (string, error)
- func (this *BinaryDecoder) Remaining() int
- type BinaryEncoder
- func (this *BinaryEncoder) Reserve(slice UpdatableSlice)
- func (this *BinaryEncoder) Size() int32
- func (this *BinaryEncoder) UpdateReserved()
- func (this *BinaryEncoder) WriteBytes(value []byte)
- func (this *BinaryEncoder) WriteInt16(value int16)
- func (this *BinaryEncoder) WriteInt32(value int32)
- func (this *BinaryEncoder) WriteInt64(value int64)
- func (this *BinaryEncoder) WriteInt8(value int8)
- func (this *BinaryEncoder) WriteString(value string)
- type Broker
- type CompressionCodec
- type Connector
- type ConnectorConfig
- type ConsumerMetadataRequest
- type ConsumerMetadataResponse
- type CrcSlice
- type Decoder
- type DecodingError
- type DefaultConnector
- func (this *DefaultConnector) Close() <-chan bool
- func (this *DefaultConnector) Consume(topic string, partition int32, offset int64) ([]*Message, error)
- func (this *DefaultConnector) GetAvailableOffset(topic string, partition int32, offsetTime OffsetTime) (int64, error)
- func (this *DefaultConnector) GetOffset(group string, topic string, partition int32) (int64, error)
- func (this *DefaultConnector) GetTopicMetadata(topics []string) (*TopicMetadataResponse, error)
- func (this *DefaultConnector) String() string
- type DefaultLogger
- func (dl *DefaultLogger) Critical(message string, params ...interface{})
- func (dl *DefaultLogger) Debug(message string, params ...interface{})
- func (dl *DefaultLogger) Error(message string, params ...interface{})
- func (dl *DefaultLogger) Info(message string, params ...interface{})
- func (dl *DefaultLogger) Trace(message string, params ...interface{})
- func (dl *DefaultLogger) Warn(message string, params ...interface{})
- type Encoder
- type FetchRequest
- type FetchResponse
- type FetchResponseData
- type FetchedOffset
- type KafkaLogger
- type LengthSlice
- type LogLevel
- type Message
- type MessageAndOffset
- type MessageData
- type OffsetAndMetadata
- type OffsetCommitRequest
- type OffsetCommitResponse
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetRequest
- type OffsetResponse
- type OffsetTime
- type PartitionFetchInfo
- type PartitionMetadata
- type PartitionOffsetRequestInfo
- type PartitionOffsets
- type ProduceRequest
- type ProduceResponse
- type ProduceResponseData
- type Request
- type RequestWriter
- type Response
- type SizingEncoder
- func (this *SizingEncoder) Reserve(slice UpdatableSlice)
- func (this *SizingEncoder) Size() int32
- func (this *SizingEncoder) UpdateReserved()
- func (this *SizingEncoder) WriteBytes(value []byte)
- func (this *SizingEncoder) WriteInt16(int16)
- func (this *SizingEncoder) WriteInt32(int32)
- func (this *SizingEncoder) WriteInt64(int64)
- func (this *SizingEncoder) WriteInt8(int8)
- func (this *SizingEncoder) WriteString(value string)
- type TopicMetadata
- type TopicMetadataRequest
- type TopicMetadataResponse
- type UpdatableSlice
Constants ¶
This section is empty.
Variables ¶
var BrokerErrors = map[int16]error{ -1: Unknown, 0: NoError, 1: OffsetOutOfRange, 2: InvalidMessage, 3: UnknownTopicOrPartition, 4: InvalidMessageSize, 5: LeaderNotAvailable, 6: NotLeaderForPartition, 7: RequestTimedOut, 8: BrokerNotAvailable, 9: ReplicaNotAvailable, 10: MessageSizeTooLarge, 11: StaleControllerEpochCode, 12: OffsetMetadataTooLargeCode, 14: OffsetsLoadInProgressCode, 15: ConsumerCoordinatorNotAvailableCode, 16: NotCoordinatorForConsumerCode, }
Mapping between Kafka error codes and actual error messages.
var BrokerNotAvailable = errors.New("Broker is likely not alive.")
A mapping for Kafka error code 8.
var ConsumerCoordinatorNotAvailableCode = errors.New("Offsets topic has not yet been created.")
A mapping for Kafka error code 15.
var EOF = errors.New("End of file reached")
Signals that an end of file or stream has been reached unexpectedly.
var InvalidMessage = errors.New("Message contents does not match its CRC")
A mapping for Kafka error code 2.
var InvalidMessageSize = errors.New("The message has a negative size")
A mapping for Kafka error code 4.
var LeaderNotAvailable = errors.New("In the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.")
A mapping for Kafka error code 5.
var MessageSizeTooLarge = errors.New("You've just attempted to produce a message of size larger than broker is allowed to accept.")
A mapping for Kafka error code 10.
var NoDataToUncompress = errors.New("No data to uncompress")
Happens when a compressed message is empty.
var NoError = errors.New("No error - it worked!")
A mapping for Kafka error code 0.
var NotCoordinatorForConsumerCode = errors.New("There is no coordinator for this consumer.")
A mapping for Kafka error code 16.
var NotLeaderForPartition = errors.New("You've just attempted to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.")
A mapping for Kafka error code 6.
var OffsetMetadataTooLargeCode = errors.New("You've jsut specified a string larger than configured maximum for offset metadata.")
A mapping for Kafka error code 12.
var OffsetOutOfRange = errors.New("The requested offset is outside the range of offsets maintained by the server for the given topic/partition.")
A mapping for Kafka error code 1.
var OffsetsLoadInProgressCode = errors.New("Offset loading is in progress. (Usually happens after a leader change for that offsets topic partition).")
A mapping for Kafka error code 14.
var ReplicaNotAvailable = errors.New("Replica is expected on a broker, but is not (this can be safely ignored).")
A mapping for Kafka error code 9.
var RequestTimedOut = errors.New("Request exceeds the user-specified time limit in the request.")
A mapping for Kafka error code 7.
var StaleControllerEpochCode = errors.New("Broker-to-broker communication fault.")
A mapping for Kafka error code 11.
var Unknown = errors.New("An unexpected server error")
A mapping for Kafka error code -1.
var UnknownTopicOrPartition = errors.New("This request is for a topic or partition that does not exist on this broker.")
A mapping for Kafka error code 3.
Functions ¶
func Critical ¶
func Critical(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Critical.
func Criticalf ¶
func Criticalf(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Critical.
func Debug ¶
func Debug(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Debug.
func Debugf ¶
func Debugf(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Debug.
func Error ¶
func Error(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Error.
func Errorf ¶
func Errorf(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Error.
func Info ¶
func Info(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Info.
func Infof ¶
func Infof(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Info.
func ReadMessageSet ¶
func ReadMessageSet(decoder Decoder) ([]*MessageAndOffset, *DecodingError)
func Trace ¶
func Trace(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Trace.
func Tracef ¶
func Tracef(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Trace.
Types ¶
type BinaryDecoder ¶
type BinaryDecoder struct {
// contains filtered or unexported fields
}
BinaryDecoder implements Decoder and is able to decode a Kafka wire protocol message into actual data.
func NewBinaryDecoder ¶
func NewBinaryDecoder(raw []byte) *BinaryDecoder
Creates a new BinaryDecoder that will decode a given []byte.
func (*BinaryDecoder) GetBytes ¶
func (this *BinaryDecoder) GetBytes() ([]byte, error)
Gets a []byte from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt16 ¶
func (this *BinaryDecoder) GetInt16() (int16, error)
Gets an int16 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt32 ¶
func (this *BinaryDecoder) GetInt32() (int32, error)
Gets an int32 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt64 ¶
func (this *BinaryDecoder) GetInt64() (int64, error)
Gets an int64 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt8 ¶
func (this *BinaryDecoder) GetInt8() (int8, error)
Gets an int8 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetString ¶
func (this *BinaryDecoder) GetString() (string, error)
Gets a string from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) Remaining ¶
func (this *BinaryDecoder) Remaining() int
Tells how many bytes are left unread in this decoder.
type BinaryEncoder ¶
type BinaryEncoder struct {
// contains filtered or unexported fields
}
BinaryEncoder implements Encoder and is able to encode actual data into a Kafka wire protocol byte sequence.
func NewBinaryEncoder ¶
func NewBinaryEncoder(buffer []byte) *BinaryEncoder
Creates a new BinaryEncoder that will write into a given []byte.
func (*BinaryEncoder) Reserve ¶
func (this *BinaryEncoder) Reserve(slice UpdatableSlice)
Reserves a place for an updatable slice.
func (*BinaryEncoder) Size ¶
func (this *BinaryEncoder) Size() int32
Returns the size in bytes written to this encoder.
func (*BinaryEncoder) UpdateReserved ¶
func (this *BinaryEncoder) UpdateReserved()
Tells the last reserved slice to be updated with new data.
func (*BinaryEncoder) WriteBytes ¶
func (this *BinaryEncoder) WriteBytes(value []byte)
Writes a []byte to this encoder.
func (*BinaryEncoder) WriteInt16 ¶
func (this *BinaryEncoder) WriteInt16(value int16)
Writes an int16 to this encoder.
func (*BinaryEncoder) WriteInt32 ¶
func (this *BinaryEncoder) WriteInt32(value int32)
Writes an int32 to this encoder.
func (*BinaryEncoder) WriteInt64 ¶
func (this *BinaryEncoder) WriteInt64(value int64)
Writes an int64 to this encoder.
func (*BinaryEncoder) WriteInt8 ¶
func (this *BinaryEncoder) WriteInt8(value int8)
Writes an int8 to this encoder.
func (*BinaryEncoder) WriteString ¶
func (this *BinaryEncoder) WriteString(value string)
Writes a string to this encoder.
type CompressionCodec ¶
type CompressionCodec int
const ( CompressionNone CompressionCodec = 0 CompressionGZip CompressionCodec = 1 CompressionSnappy CompressionCodec = 2 CompressionLZ4 CompressionCodec = 3 )
type Connector ¶
type Connector interface { // GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have. // Passing it an empty topic list will retrieve metadata for all topics in a cluster. GetTopicMetadata(topics []string) (*TopicMetadataResponse, error) // GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time. // More on offset time here - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest GetAvailableOffset(topic string, partition int32, offsetTime OffsetTime) (int64, error) // Consume issues a single fetch request to a broker responsible for a given topic and partition and returns messages starting from a given offset. Consume(topic string, partition int32, offset int64) ([]*Message, error) // GetOffset is a part of new offset management API. Not fully functional yet. GetOffset(group string, topic string, partition int32) (int64, error) // Tells the Connector to close all existing connections and stop. // This method is NOT blocking but returns a channel which will get a single value once the closing is finished. Close() <-chan bool }
Connector is an interface that provides a clear way to interact with a Kafka cluster and hides all broker management details from the user.
type ConnectorConfig ¶
type ConnectorConfig struct { // BrokerList is a bootstrap list to discover other brokers in a cluster. At least one broker is required. BrokerList []string // ReadTimeout is a timeout to read the response from a TCP socket. ReadTimeout time.Duration // WriteTimeout is a timeout to write the request to a TCP socket. WriteTimeout time.Duration // ConnectTimeout is a timeout to connect to a TCP socket. ConnectTimeout time.Duration // Sets whether the connection should be kept alive. KeepAlive bool // A keep alive period for a TCP connection. KeepAliveTimeout time.Duration // Maximum number of open connections for a connector. MaxConnections int // Maximum number of open connections for a single broker for a connector. MaxConnectionsPerBroker int // Maximum fetch size in bytes which will be used in all Consume() calls. FetchSize int32 // Number of retries to get topic metadata. MetadataRetries int // Backoff value between topic metadata requests. MetadataBackoff time.Duration // Client id that will be used by a connector to identify client requests by broker. ClientId string }
ConnectorConfig is used to pass multiple configuration values for a Connector.
func NewConnectorConfig ¶
func NewConnectorConfig() *ConnectorConfig
Returns a new ConnectorConfig with sane defaults.
func (*ConnectorConfig) Validate ¶
func (this *ConnectorConfig) Validate() error
Validates this ConnectorConfig. Returns a corresponding error if the ConnectorConfig is invalid and nil otherwise.
type ConsumerMetadataRequest ¶
type ConsumerMetadataRequest struct {
ConsumerGroup string
}
func NewConsumerMetadataRequest ¶
func NewConsumerMetadataRequest(group string) *ConsumerMetadataRequest
func (*ConsumerMetadataRequest) Key ¶
func (this *ConsumerMetadataRequest) Key() int16
func (*ConsumerMetadataRequest) Version ¶
func (this *ConsumerMetadataRequest) Version() int16
func (*ConsumerMetadataRequest) Write ¶
func (this *ConsumerMetadataRequest) Write(encoder Encoder)
type ConsumerMetadataResponse ¶
type ConsumerMetadataResponse struct { Error error CoordinatorId int32 CoordinatorHost string CoordinatorPort int32 }
func (*ConsumerMetadataResponse) Read ¶
func (this *ConsumerMetadataResponse) Read(decoder Decoder) *DecodingError
type CrcSlice ¶
type CrcSlice struct {
// contains filtered or unexported fields
}
CrcSlice is used to calculate the CRC32 value of the message.
func (*CrcSlice) GetPosition ¶
Get the position within the encoder to be updated later.
func (*CrcSlice) GetReserveLength ¶
Returns the length to reserve for this slice.
func (*CrcSlice) SetPosition ¶
Set the current position within the encoder to be updated later.
type Decoder ¶
type Decoder interface { // Gets an int8 from this decoder. Returns EOF if end of stream is reached. GetInt8() (int8, error) // Gets an int16 from this decoder. Returns EOF if end of stream is reached. GetInt16() (int16, error) // Gets an int32 from this decoder. Returns EOF if end of stream is reached. GetInt32() (int32, error) // Gets an int64 from this decoder. Returns EOF if end of stream is reached. GetInt64() (int64, error) // Gets a []byte from this decoder. Returns EOF if end of stream is reached. GetBytes() ([]byte, error) // Gets a string from this decoder. Returns EOF if end of stream is reached. GetString() (string, error) // Tells how many bytes left unread in this decoder. Remaining() int }
Decoder is able to decode a Kafka wire protocol message into actual data.
type DecodingError ¶
type DecodingError struct {
// contains filtered or unexported fields
}
A DecodingError is an error that also holds the information about why it happened.
func NewDecodingError ¶
func NewDecodingError(err error, reason string) *DecodingError
Creates a new DecodingError with a given error message and reason.
func (*DecodingError) Error ¶
func (this *DecodingError) Error() error
Returns the error message for this DecodingError.
func (*DecodingError) Reason ¶
func (this *DecodingError) Reason() string
Returns the reason for this DecodingError.
type DefaultConnector ¶
type DefaultConnector struct {
// contains filtered or unexported fields
}
The default (and, for now, the only) Connector implementation for the Siesta library.
func NewDefaultConnector ¶
func NewDefaultConnector(config *ConnectorConfig) (*DefaultConnector, error)
Creates a new DefaultConnector with a given ConnectorConfig. May return an error if the passed config is invalid.
func (*DefaultConnector) Close ¶
func (this *DefaultConnector) Close() <-chan bool
Tells the Connector to close all existing connections and stop. This method is NOT blocking but returns a channel which will get a single value once the closing is finished.
func (*DefaultConnector) Consume ¶
func (this *DefaultConnector) Consume(topic string, partition int32, offset int64) ([]*Message, error)
Consume issues a single fetch request to a broker responsible for a given topic and partition and returns messages starting from a given offset.
func (*DefaultConnector) GetAvailableOffset ¶
func (this *DefaultConnector) GetAvailableOffset(topic string, partition int32, offsetTime OffsetTime) (int64, error)
GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.
func (*DefaultConnector) GetOffset ¶
GetOffset is a part of new offset management API. Not fully functional yet.
func (*DefaultConnector) GetTopicMetadata ¶
func (this *DefaultConnector) GetTopicMetadata(topics []string) (*TopicMetadataResponse, error)
GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have. Passing it an empty topic list will retrieve metadata for all topics in a cluster.
func (*DefaultConnector) String ¶
func (this *DefaultConnector) String() string
Returns a string representation of this DefaultConnector.
type DefaultLogger ¶
type DefaultLogger struct {
// contains filtered or unexported fields
}
Default implementation of KafkaLogger interface used in this client.
func NewDefaultLogger ¶
func NewDefaultLogger(Level LogLevel) *DefaultLogger
Creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.
func (*DefaultLogger) Critical ¶
func (dl *DefaultLogger) Critical(message string, params ...interface{})
Formats a given message according to given params to log with level Critical.
func (*DefaultLogger) Debug ¶
func (dl *DefaultLogger) Debug(message string, params ...interface{})
Formats a given message according to given params to log with level Debug.
func (*DefaultLogger) Error ¶
func (dl *DefaultLogger) Error(message string, params ...interface{})
Formats a given message according to given params to log with level Error.
func (*DefaultLogger) Info ¶
func (dl *DefaultLogger) Info(message string, params ...interface{})
Formats a given message according to given params to log with level Info.
func (*DefaultLogger) Trace ¶
func (dl *DefaultLogger) Trace(message string, params ...interface{})
Formats a given message according to given params to log with level Trace.
func (*DefaultLogger) Warn ¶
func (dl *DefaultLogger) Warn(message string, params ...interface{})
Formats a given message according to given params to log with level Warn.
type Encoder ¶
type Encoder interface { // Writes an int8 to this encoder. WriteInt8(int8) // Writes an int16 to this encoder. WriteInt16(int16) // Writes an int32 to this encoder. WriteInt32(int32) // Writes an int64 to this encoder. WriteInt64(int64) // Writes a []byte to this encoder. WriteBytes([]byte) // Writes a string to this encoder. WriteString(string) // Returns the size in bytes written to this encoder. Size() int32 // Reserves a place for an updatable slice. // This is used as an optimization for length and crc fields. // The encoder reserves a place for this data and updates it later instead of pre-calculating it and doing redundant work. Reserve(UpdatableSlice) // Tells the last reserved slice to be updated with new data. UpdateReserved() }
Encoder is able to encode actual data into a Kafka wire protocol byte sequence.
type FetchRequest ¶
type FetchRequest struct { MaxWaitTime int32 MinBytes int32 RequestInfo map[string][]*PartitionFetchInfo }
func (*FetchRequest) AddFetch ¶
func (this *FetchRequest) AddFetch(topic string, partition int32, offset int64, fetchSize int32)
func (*FetchRequest) Key ¶
func (this *FetchRequest) Key() int16
func (*FetchRequest) Version ¶
func (this *FetchRequest) Version() int16
func (*FetchRequest) Write ¶
func (this *FetchRequest) Write(encoder Encoder)
type FetchResponse ¶
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseData
}
func (*FetchResponse) GetMessages ¶
func (this *FetchResponse) GetMessages() ([]*Message, error)
func (*FetchResponse) Read ¶
func (this *FetchResponse) Read(decoder Decoder) *DecodingError
type FetchResponseData ¶
type FetchResponseData struct { Error error HighwaterMarkOffset int64 Messages []*MessageAndOffset }
func (*FetchResponseData) Read ¶
func (this *FetchResponseData) Read(decoder Decoder) *DecodingError
type FetchedOffset ¶
func (*FetchedOffset) Read ¶
func (this *FetchedOffset) Read(decoder Decoder) *DecodingError
type KafkaLogger ¶
type KafkaLogger interface { //Formats a given message according to given params to log with level Trace. Trace(message string, params ...interface{}) //Formats a given message according to given params to log with level Debug. Debug(message string, params ...interface{}) //Formats a given message according to given params to log with level Info. Info(message string, params ...interface{}) //Formats a given message according to given params to log with level Warn. Warn(message string, params ...interface{}) //Formats a given message according to given params to log with level Error. Error(message string, params ...interface{}) //Formats a given message according to given params to log with level Critical. Critical(message string, params ...interface{}) }
Logger interface. Lets you plug in your custom logging library instead of using the built-in one.
var Logger KafkaLogger = NewDefaultLogger(InfoLevel)
Logger used by this client. Defaults to the built-in logger with Info log level.
type LengthSlice ¶
type LengthSlice struct {
// contains filtered or unexported fields
}
LengthSlice is used to determine the length of upcoming message.
func (*LengthSlice) GetPosition ¶
func (this *LengthSlice) GetPosition() int
Get the position within the encoder to be updated later.
func (*LengthSlice) GetReserveLength ¶
func (this *LengthSlice) GetReserveLength() int
Returns the length to reserve for this slice.
func (*LengthSlice) SetPosition ¶
func (this *LengthSlice) SetPosition(pos int)
Set the current position within the encoder to be updated later.
func (*LengthSlice) Update ¶
func (this *LengthSlice) Update(slice []byte)
Update this slice. At this point all necessary data should be written to encoder.
type LogLevel ¶
type LogLevel string
Represents a logging level.
const ( //Use TraceLevel for debugging to find problems in functions, variables etc. TraceLevel LogLevel = "trace" //Use DebugLevel for detailed system reports and diagnostic messages. DebugLevel LogLevel = "debug" //Use InfoLevel for general information about a running application. InfoLevel LogLevel = "info" //Use WarnLevel to indicate small errors and failures that should not happen normally but are recovered automatically. WarnLevel LogLevel = "warn" //Use ErrorLevel to indicate severe errors that affect application workflow and are not handled automatically. ErrorLevel LogLevel = "error" //Use CriticalLevel to indicate fatal errors that may cause data corruption or loss. CriticalLevel LogLevel = "critical" )
type MessageAndOffset ¶
type MessageAndOffset struct { Offset int64 Message *MessageData }
func (*MessageAndOffset) Read ¶
func (this *MessageAndOffset) Read(decoder Decoder) *DecodingError
func (*MessageAndOffset) Write ¶
func (this *MessageAndOffset) Write(encoder Encoder)
type MessageData ¶
type MessageData struct { Crc int32 MagicByte int8 Attributes int8 Key []byte Value []byte Nested []*MessageAndOffset }
func (*MessageData) Read ¶
func (this *MessageData) Read(decoder Decoder) *DecodingError
func (*MessageData) Write ¶
func (this *MessageData) Write(encoder Encoder)
TODO compress and write if needed
type OffsetAndMetadata ¶
type OffsetCommitRequest ¶
type OffsetCommitRequest struct { ConsumerGroup string Offsets map[string]map[int32]*OffsetAndMetadata }
func NewOffsetCommitRequest ¶
func NewOffsetCommitRequest(group string) *OffsetCommitRequest
func (*OffsetCommitRequest) Key ¶
func (this *OffsetCommitRequest) Key() int16
func (*OffsetCommitRequest) Version ¶
func (this *OffsetCommitRequest) Version() int16
func (*OffsetCommitRequest) Write ¶
func (this *OffsetCommitRequest) Write(encoder Encoder)
type OffsetCommitResponse ¶
func (*OffsetCommitResponse) Read ¶
func (this *OffsetCommitResponse) Read(decoder Decoder) *DecodingError
type OffsetFetchRequest ¶
func NewOffsetFetchRequest ¶
func NewOffsetFetchRequest(group string) *OffsetFetchRequest
func (*OffsetFetchRequest) AddOffset ¶
func (this *OffsetFetchRequest) AddOffset(topic string, partition int32)
func (*OffsetFetchRequest) Key ¶
func (this *OffsetFetchRequest) Key() int16
func (*OffsetFetchRequest) Version ¶
func (this *OffsetFetchRequest) Version() int16
func (*OffsetFetchRequest) Write ¶
func (this *OffsetFetchRequest) Write(encoder Encoder)
type OffsetFetchResponse ¶
type OffsetFetchResponse struct {
Offsets map[string]map[int32]*FetchedOffset
}
func (*OffsetFetchResponse) Read ¶
func (this *OffsetFetchResponse) Read(decoder Decoder) *DecodingError
type OffsetRequest ¶
type OffsetRequest struct {
RequestInfo map[string][]*PartitionOffsetRequestInfo
}
func (*OffsetRequest) AddPartitionOffsetRequestInfo ¶
func (this *OffsetRequest) AddPartitionOffsetRequestInfo(topic string, partition int32, time OffsetTime, maxNumOffsets int32)
func (*OffsetRequest) Key ¶
func (this *OffsetRequest) Key() int16
func (*OffsetRequest) Version ¶
func (this *OffsetRequest) Version() int16
func (*OffsetRequest) Write ¶
func (this *OffsetRequest) Write(encoder Encoder)
type OffsetResponse ¶
type OffsetResponse struct {
Offsets map[string]map[int32]*PartitionOffsets
}
func (*OffsetResponse) Read ¶
func (this *OffsetResponse) Read(decoder Decoder) *DecodingError
type OffsetTime ¶
type OffsetTime int64
const EarliestTime OffsetTime = -2
const LatestTime OffsetTime = -1
type PartitionFetchInfo ¶
type PartitionMetadata ¶
type PartitionMetadata struct { Error error PartitionId int32 Leader int32 Replicas []int32 Isr []int32 }
func (*PartitionMetadata) Read ¶
func (this *PartitionMetadata) Read(decoder Decoder) *DecodingError
type PartitionOffsetRequestInfo ¶
type PartitionOffsetRequestInfo struct { Partition int32 Time OffsetTime MaxNumOffsets int32 }
type PartitionOffsets ¶
func (*PartitionOffsets) Read ¶
func (this *PartitionOffsets) Read(decoder Decoder) *DecodingError
type ProduceRequest ¶
type ProduceRequest struct { RequiredAcks int16 Timeout int32 Messages map[string]map[int32][]*MessageAndOffset }
func (*ProduceRequest) AddMessage ¶
func (this *ProduceRequest) AddMessage(topic string, partition int32, message *MessageData)
func (*ProduceRequest) Key ¶
func (this *ProduceRequest) Key() int16
func (*ProduceRequest) Version ¶
func (this *ProduceRequest) Version() int16
func (*ProduceRequest) Write ¶
func (this *ProduceRequest) Write(encoder Encoder)
type ProduceResponse ¶
type ProduceResponse struct {
Blocks map[string]map[int32]*ProduceResponseData
}
func (*ProduceResponse) Read ¶
func (this *ProduceResponse) Read(decoder Decoder) *DecodingError
type ProduceResponseData ¶
type Request ¶
type Request interface { // Writes the Request to the given Encoder. Write(Encoder) // Returns the Kafka API key for this Request. Key() int16 // Returns the Kafka request version for backwards compatibility. Version() int16 }
A generic interface for any request issued to Kafka. Must be able to identify and write itself.
type RequestWriter ¶
type RequestWriter struct {
// contains filtered or unexported fields
}
RequestWriter is used to decouple the message header/metadata writing from the actual message. It is able to accept a request and encode/write it according to Kafka Wire Protocol format adding the correlation id and client id to the request.
func NewRequestWriter ¶
func NewRequestWriter(correlationId int32, clientId string, request Request) *RequestWriter
Creates a new RequestWriter holding the correlation id, client id and the actual request.
func (*RequestWriter) Size ¶
func (this *RequestWriter) Size() int32
Returns the size in bytes needed to write this request, including the length field. This value will be used when allocating memory for a byte array.
func (*RequestWriter) Write ¶
func (this *RequestWriter) Write(encoder Encoder)
Writes itself into a given Encoder.
type Response ¶
type Response interface { // Read the Response from the given Decoder. May return a DecodingError if the response is invalid. Read(Decoder) *DecodingError }
A generic interface for any response received from Kafka. Must be able to read itself.
type SizingEncoder ¶
type SizingEncoder struct {
// contains filtered or unexported fields
}
SizingEncoder is used to determine the size for []byte that will hold the actual encoded data. This is used as an optimization as it is cheaper to run once and determine the size instead of growing the slice dynamically.
func (*SizingEncoder) Reserve ¶
func (this *SizingEncoder) Reserve(slice UpdatableSlice)
Reserves a place for an updatable slice.
func (*SizingEncoder) Size ¶
func (this *SizingEncoder) Size() int32
Returns the size in bytes written to this encoder.
func (*SizingEncoder) UpdateReserved ¶
func (this *SizingEncoder) UpdateReserved()
Tells the last reserved slice to be updated with new data.
func (*SizingEncoder) WriteBytes ¶
func (this *SizingEncoder) WriteBytes(value []byte)
Writes a []byte to this encoder.
func (*SizingEncoder) WriteInt16 ¶
func (this *SizingEncoder) WriteInt16(int16)
Writes an int16 to this encoder.
func (*SizingEncoder) WriteInt32 ¶
func (this *SizingEncoder) WriteInt32(int32)
Writes an int32 to this encoder.
func (*SizingEncoder) WriteInt64 ¶
func (this *SizingEncoder) WriteInt64(int64)
Writes an int64 to this encoder.
func (*SizingEncoder) WriteInt8 ¶
func (this *SizingEncoder) WriteInt8(int8)
Writes an int8 to this encoder.
func (*SizingEncoder) WriteString ¶
func (this *SizingEncoder) WriteString(value string)
Writes a string to this encoder.
type TopicMetadata ¶
type TopicMetadata struct { Error error TopicName string PartitionMetadata []*PartitionMetadata }
func (*TopicMetadata) Read ¶
func (this *TopicMetadata) Read(decoder Decoder) *DecodingError
type TopicMetadataRequest ¶
type TopicMetadataRequest struct {
Topics []string
}
func NewTopicMetadataRequest ¶
func NewTopicMetadataRequest(topics []string) *TopicMetadataRequest
func (*TopicMetadataRequest) Key ¶
func (this *TopicMetadataRequest) Key() int16
func (*TopicMetadataRequest) Version ¶
func (this *TopicMetadataRequest) Version() int16
func (*TopicMetadataRequest) Write ¶
func (this *TopicMetadataRequest) Write(encoder Encoder)
type TopicMetadataResponse ¶
type TopicMetadataResponse struct { Brokers []*Broker TopicMetadata []*TopicMetadata }
func (*TopicMetadataResponse) Read ¶
func (this *TopicMetadataResponse) Read(decoder Decoder) *DecodingError
type UpdatableSlice ¶
type UpdatableSlice interface { // Returns the length to reserve for this slice. GetReserveLength() int // Set the current position within the encoder to be updated later. SetPosition(int) // Get the position within the encoder to be updated later. GetPosition() int // Update this slice. At this point all necessary data should be written to encoder. Update([]byte) }
UpdatableSlice is an interface that is used when the encoder has to write the value based on bytes that are not yet written (e.g. calculate the CRC of the message).