Documentation ¶
Overview ¶
Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.
To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
To consume messages, use the Consumer. Note that Sarama's Consumer implementation does not currently support automatic consumer-group rebalancing and offset tracking. For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
For lower-level needs, the Broker and Request/Response objects permit precise control over each connection and message sent on the wire; the Client provides higher-level metadata management that is shared between the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
Metrics are exposed through the https://github.com/rcrowley/go-metrics library in a local registry.
Broker related metrics:
+----------------------------------------------+------------+---------------------------------------------------------------+
| Name                                         | Type       | Description                                                   |
+----------------------------------------------+------------+---------------------------------------------------------------+
| incoming-byte-rate                           | meter      | Bytes/second read off all brokers                             |
| incoming-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second read off a given broker                          |
| outgoing-byte-rate                           | meter      | Bytes/second written off all brokers                          |
| outgoing-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second written off a given broker                       |
| request-rate                                 | meter      | Requests/second sent to all brokers                           |
| request-rate-for-broker-<broker-id>          | meter      | Requests/second sent to a given broker                        |
| request-size                                 | histogram  | Distribution of the request size in bytes for all brokers     |
| request-size-for-broker-<broker-id>          | histogram  | Distribution of the request size in bytes for a given broker  |
| request-latency-in-ms                        | histogram  | Distribution of the request latency in ms for all brokers     |
| request-latency-in-ms-for-broker-<broker-id> | histogram  | Distribution of the request latency in ms for a given broker  |
| response-rate                                | meter      | Responses/second received from all brokers                    |
| response-rate-for-broker-<broker-id>         | meter      | Responses/second received from a given broker                 |
| response-size                                | histogram  | Distribution of the response size in bytes for all brokers    |
| response-size-for-broker-<broker-id>         | histogram  | Distribution of the response size in bytes for a given broker |
+----------------------------------------------+------------+---------------------------------------------------------------+
Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
Producer related metrics:
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| Name                                      | Type       | Description                                                                          |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| batch-size                                | histogram  | Distribution of the number of bytes sent per partition per request for all topics    |
| batch-size-for-topic-<topic>              | histogram  | Distribution of the number of bytes sent per partition per request for a given topic |
| record-send-rate                          | meter      | Records/second sent to all topics                                                    |
| record-send-rate-for-topic-<topic>        | meter      | Records/second sent to a given topic                                                 |
| records-per-request                       | histogram  | Distribution of the number of records sent per request for all topics                |
| records-per-request-for-topic-<topic>     | histogram  | Distribution of the number of records sent per request for a given topic             |
| compression-ratio                         | histogram  | Distribution of the compression ratio times 100 of record batches for all topics     |
| compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic  |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
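The per-broker and per-topic metric names in the tables above follow a simple suffix pattern. The sketch below shows one way to build such names when looking metrics up in a registry. The helper names `brokerMetricName` and `topicMetricName` are hypothetical and not part of this package, and the sketch assumes the topic name is used verbatim, which the tables do not confirm.

```go
package main

import "fmt"

// brokerMetricName builds a per-broker metric name following the
// "<name>-for-broker-<broker-id>" pattern shown in the broker table.
// Hypothetical helper, for illustration only.
func brokerMetricName(name string, brokerID int32) string {
	return fmt.Sprintf("%s-for-broker-%d", name, brokerID)
}

// topicMetricName builds a per-topic metric name following the
// "<name>-for-topic-<topic>" pattern shown in the producer table.
// Assumption: the topic is appended as-is, without sanitizing.
func topicMetricName(name, topic string) string {
	return fmt.Sprintf("%s-for-topic-%s", name, topic)
}

func main() {
	fmt.Println(brokerMetricName("request-rate", 3))
	fmt.Println(topicMetricName("batch-size", "events"))
}
```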
Index ¶
- Constants
- Variables
- func Decode(buf []byte, body VersionedDecoder) error
- func DecodeHeader(buf []byte) (int32, int32, error)
- func Encode(correlationID int32, clientID string, body ProtocolBody) ([]byte, error)
- type ApiVersionsRequest
- type ApiVersionsResponse
- type ApiVersionsResponseBlock
- type Broker
- type ByteEncoder
- type CompressionCodec
- type ConfigurationError
- type ConsumerGroupMemberAssignment
- type ConsumerGroupMemberMetadata
- type ConsumerMetadataRequest
- type ConsumerMetadataResponse
- type DescribeGroupsRequest
- type DescribeGroupsResponse
- type Encoder
- type FetchRequest
- type FetchResponse
- type FetchResponseBlock
- type GroupDescription
- type GroupMemberDescription
- type HeartbeatRequest
- type HeartbeatResponse
- type JoinGroupRequest
- type JoinGroupResponse
- type KError
- type KafkaVersion
- type LeaveGroupRequest
- type LeaveGroupResponse
- type ListGroupsRequest
- type ListGroupsResponse
- type Message
- type MessageBlock
- type MessageSet
- type MetadataRequest
- type MetadataResponse
- type OffsetCommitRequest
- type OffsetCommitResponse
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetFetchResponseBlock
- type OffsetRequest
- type OffsetResponse
- type OffsetResponseBlock
- type PacketDecodingError
- type PacketEncodingError
- type PartitionMetadata
- type ProduceRequest
- type ProduceResponse
- type ProduceResponseBlock
- type ProtocolBody
- type RequiredAcks
- type SaslHandshakeRequest
- type SaslHandshakeResponse
- type StdLogger
- type StringEncoder
- type SyncGroupRequest
- type SyncGroupResponse
- type TopicMetadata
- type VersionedDecoder
Constants ¶
const GroupGenerationUndefined = -1
GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.
const ReceiveTime int64 = -1
ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires Kafka 0.8.2.
Variables ¶
var (
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
)
Effective constants defining the supported Kafka versions.
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")
ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
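To illustrate why a truncated trailing message is expected rather than fatal, here is a minimal sketch of a decoder that stops cleanly at an incomplete record. It assumes a simplified framing of `[int32 length][payload]` per record, which is not the real Kafka message-set layout, and `readRecords` is a hypothetical name.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// readRecords reads consecutive [int32 big-endian length][payload] records
// from buf. If the buffer ends in the middle of a record, the complete
// records read so far are returned and partial is set to true.
// Simplified framing for illustration; not the actual Kafka wire format.
func readRecords(buf []byte) (records [][]byte, partial bool) {
	for len(buf) > 0 {
		if len(buf) < 4 {
			return records, true // length prefix itself is truncated
		}
		n := binary.BigEndian.Uint32(buf)
		if uint64(len(buf)-4) < uint64(n) {
			return records, true // payload is truncated
		}
		end := 4 + int(n)
		records = append(records, buf[4:end])
		buf = buf[end:]
	}
	return records, false
}

func main() {
	// Two complete records followed by a third that claims 5 bytes but has 1.
	buf := []byte{0, 0, 0, 2, 'h', 'i', 0, 0, 0, 3, 'a', 'b', 'c', 0, 0, 0, 5, 'x'}
	records, partial := readRecords(buf)
	fmt.Println(len(records), partial)
}
```

A caller in this sketch would process the complete records and re-request from the offset of the partial one.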
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")
ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
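The range check described above can be sketched as follows. The names `checkPartition` and `errInvalidPartition` are local stand-ins for illustration, not the package's own implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidPartition is a local stand-in for ErrInvalidPartition.
var errInvalidPartition = errors.New("partitioner returned an invalid partition index")

// checkPartition reports an error unless index lies in [0, numPartitions-1].
func checkPartition(index, numPartitions int32) error {
	if index < 0 || index >= numPartitions {
		return errInvalidPartition
	}
	return nil
}

func main() {
	fmt.Println(checkPartition(2, 3)) // last valid index for 3 partitions
	fmt.Println(checkPartition(3, 3)) // one past the end
}
```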
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max.
var ErrNotConnected = errors.New("kafka: broker not connected")
ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")
ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")
ErrShuttingDown is returned when a producer receives a message during shutdown.
var MaxRequestSize int32 = 100 * 1024 * 1024
MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in a PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.
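As a rough sketch of the size guard described above: the default works out to 104857600 bytes, and a request is rejected only when it is strictly larger than the limit. `maxRequestSize` and `fitsInRequest` are local stand-ins, not the package's own encoder logic.

```go
package main

import "fmt"

// maxRequestSize mirrors the documented default of 100 MiB.
const maxRequestSize int32 = 100 * 1024 * 1024

// fitsInRequest reports whether an encoded request of the given length
// is within the limit. Hypothetical helper for illustration.
func fitsInRequest(encodedLen int) bool {
	return encodedLen <= int(maxRequestSize)
}

func main() {
	fmt.Println(maxRequestSize)
	fmt.Println(fitsInRequest(1024), fitsInRequest(200*1024*1024))
}
```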
var MaxResponseSize int32 = 100 * 1024 * 1024
MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to protect the client from running out of memory. Please note that brokers do not have any natural limit on the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers (see https://issues.apache.org/jira/browse/KAFKA-2063).
var PanicHandler func(interface{})
PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
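A minimal sketch of how such a hook is typically wired around internal goroutines. `panicHandler` is a local stand-in for the package variable, and `withRecover` is a hypothetical wrapper name. With a nil handler nothing is recovered, matching the documented default.

```go
package main

import "fmt"

// panicHandler is a local stand-in for the package-level PanicHandler.
var panicHandler func(interface{})

// withRecover runs fn. If fn panics and a handler is set, the panic value
// is passed to the handler; with a nil handler the panic propagates.
func withRecover(fn func()) {
	defer func() {
		if handler := panicHandler; handler != nil {
			if v := recover(); v != nil {
				handler(v)
			}
		}
	}()
	fn()
}

func main() {
	panicHandler = func(v interface{}) { fmt.Println("recovered:", v) }

	done := make(chan struct{})
	go func() {
		defer close(done)
		withRecover(func() { panic("boom") })
	}()
	<-done
}
```

Setting the handler before starting any producers or consumers avoids a window in which a background goroutine could panic unobserved.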
Functions ¶
func Decode ¶
func Decode(buf []byte, body VersionedDecoder) error
Types ¶
type ApiVersionsRequest ¶
type ApiVersionsRequest struct { }
type ApiVersionsResponse ¶
type ApiVersionsResponse struct {
	Err         KError
	ApiVersions []*ApiVersionsResponseBlock
}
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
type ByteEncoder ¶
type ByteEncoder []byte
ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.
func (ByteEncoder) Encode ¶
func (b ByteEncoder) Encode() ([]byte, error)
func (ByteEncoder) Length ¶
func (b ByteEncoder) Length() int
type CompressionCodec ¶
type CompressionCodec int8
CompressionCodec represents the various compression codecs recognized by Kafka in messages.
const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
	CompressionLZ4    CompressionCodec = 3
)
type ConfigurationError ¶
type ConfigurationError string
ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.
func (ConfigurationError) Error ¶
func (err ConfigurationError) Error() string
type ConsumerMetadataRequest ¶
type ConsumerMetadataRequest struct {
ConsumerGroup string
}
type DescribeGroupsRequest ¶
type DescribeGroupsRequest struct {
Groups []string
}
func (*DescribeGroupsRequest) AddGroup ¶
func (r *DescribeGroupsRequest) AddGroup(group string)
type DescribeGroupsResponse ¶
type DescribeGroupsResponse struct {
Groups []*GroupDescription
}
type Encoder ¶
Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
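A sketch of a custom encoder that honors the `Length()` contract by encoding once up front. The `Encoder` interface is redeclared locally from the method signatures shown for ByteEncoder and StringEncoder, and `jsonEncoder` is a hypothetical type, not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder is redeclared here so the sketch is self-contained; it is an
// assumption based on the Encode and Length methods documented above.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// jsonEncoder marshals its value once at construction time, so Length()
// always equals len() of the bytes returned by Encode().
type jsonEncoder struct {
	data []byte
	err  error
}

func newJSONEncoder(v interface{}) *jsonEncoder {
	data, err := json.Marshal(v)
	return &jsonEncoder{data: data, err: err}
}

func (e *jsonEncoder) Encode() ([]byte, error) { return e.data, e.err }
func (e *jsonEncoder) Length() int             { return len(e.data) }

func main() {
	var enc Encoder = newJSONEncoder(map[string]int{"id": 7})
	b, err := enc.Encode()
	fmt.Println(string(b), err, enc.Length() == len(b))
}
```

Caching the encoded bytes trades a little memory for a cheap, consistent `Length()`.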
type FetchRequest ¶
type FetchResponse ¶
type FetchResponse struct {
	Blocks       map[string]map[int32]*FetchResponseBlock
	ThrottleTime time.Duration
	Version      int16 // v1 requires 0.9+, v2 requires 0.10+
}
func (*FetchResponse) AddError ¶
func (r *FetchResponse) AddError(topic string, partition int32, err KError)
func (*FetchResponse) AddMessage ¶
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)
func (*FetchResponse) GetBlock ¶
func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock
type FetchResponseBlock ¶
type FetchResponseBlock struct {
	Err                 KError
	HighWaterMarkOffset int64
	MsgSet              MessageSet
}
type GroupDescription ¶
type GroupMemberDescription ¶
type GroupMemberDescription struct {
	ClientId         string
	ClientHost       string
	MemberMetadata   []byte
	MemberAssignment []byte
}
func (*GroupMemberDescription) GetMemberAssignment ¶
func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)
func (*GroupMemberDescription) GetMemberMetadata ¶
func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error)
type HeartbeatRequest ¶
type HeartbeatResponse ¶
type HeartbeatResponse struct {
Err KError
}
type JoinGroupRequest ¶
type JoinGroupRequest struct {
	GroupId        string
	SessionTimeout int32
	MemberId       string
	ProtocolType   string
	GroupProtocols map[string][]byte
}
func (*JoinGroupRequest) AddGroupProtocol ¶
func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte)
func (*JoinGroupRequest) AddGroupProtocolMetadata ¶
func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error
type JoinGroupResponse ¶
type JoinGroupResponse struct {
	Err           KError
	GenerationId  int32
	GroupProtocol string
	LeaderId      string
	MemberId      string
	Members       map[string][]byte
}
func (*JoinGroupResponse) GetMembers ¶
func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error)
type KError ¶
type KError int16
KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
const (
	ErrNoError                         KError = 0
	ErrUnknown                         KError = -1
	ErrOffsetOutOfRange                KError = 1
	ErrInvalidMessage                  KError = 2
	ErrUnknownTopicOrPartition         KError = 3
	ErrInvalidMessageSize              KError = 4
	ErrLeaderNotAvailable              KError = 5
	ErrNotLeaderForPartition           KError = 6
	ErrRequestTimedOut                 KError = 7
	ErrBrokerNotAvailable              KError = 8
	ErrReplicaNotAvailable             KError = 9
	ErrMessageSizeTooLarge             KError = 10
	ErrStaleControllerEpochCode        KError = 11
	ErrOffsetMetadataTooLarge          KError = 12
	ErrNetworkException                KError = 13
	ErrOffsetsLoadInProgress           KError = 14
	ErrConsumerCoordinatorNotAvailable KError = 15
	ErrNotCoordinatorForConsumer       KError = 16
	ErrInvalidTopic                    KError = 17
	ErrMessageSetSizeTooLarge          KError = 18
	ErrNotEnoughReplicas               KError = 19
	ErrNotEnoughReplicasAfterAppend    KError = 20
	ErrInvalidRequiredAcks             KError = 21
	ErrIllegalGeneration               KError = 22
	ErrInconsistentGroupProtocol       KError = 23
	ErrInvalidGroupId                  KError = 24
	ErrUnknownMemberId                 KError = 25
	ErrInvalidSessionTimeout           KError = 26
	ErrRebalanceInProgress             KError = 27
	ErrInvalidCommitOffsetSize         KError = 28
	ErrTopicAuthorizationFailed        KError = 29
	ErrGroupAuthorizationFailed        KError = 30
	ErrClusterAuthorizationFailed      KError = 31
	ErrInvalidTimestamp                KError = 32
	ErrUnsupportedSASLMechanism        KError = 33
	ErrIllegalSASLState                KError = 34
	ErrUnsupportedVersion              KError = 35
	ErrUnsupportedForMessageFormat     KError = 43
)
Numeric error codes returned by the Kafka server.
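Because KError is an int16, a code on the wire occupies two bytes in network (big-endian) byte order, like other Kafka protocol integers. The sketch below decodes such a code. `KError` and a few of its constants are redeclared locally so the example is self-contained, and `decodeKError` is a hypothetical helper rather than this package's decoder.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// KError and the constants below are local copies of the documented values.
type KError int16

const (
	ErrNoError                 KError = 0
	ErrUnknown                 KError = -1
	ErrUnknownTopicOrPartition KError = 3
)

// decodeKError reads a big-endian int16 error code from the start of buf.
func decodeKError(buf []byte) (KError, error) {
	if len(buf) < 2 {
		return 0, fmt.Errorf("need 2 bytes for an error code, got %d", len(buf))
	}
	return KError(int16(binary.BigEndian.Uint16(buf))), nil
}

func main() {
	code, _ := decodeKError([]byte{0x00, 0x03})
	fmt.Println(code == ErrUnknownTopicOrPartition)

	// 0xFFFF is -1 in two's complement, i.e. ErrUnknown.
	code, _ = decodeKError([]byte{0xFF, 0xFF})
	fmt.Println(code == ErrUnknown)
}
```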
type KafkaVersion ¶
type KafkaVersion struct {
// contains filtered or unexported fields
}
KafkaVersion instances represent versions of the upstream Kafka broker.
func (KafkaVersion) IsAtLeast ¶
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool
IsAtLeast returns true if and only if the version it is called on is greater than or equal to the version passed in:
V1.IsAtLeast(V2) // false
V2.IsAtLeast(V1) // true
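A component-wise comparison is one way to implement this kind of check, and it avoids the pitfall of comparing version strings ("0.10" sorts before "0.9" lexically). The `version` type and `isAtLeast` method below are a local sketch. KafkaVersion's fields are unexported, so its actual representation is an assumption here.

```go
package main

import "fmt"

// version holds four numeric components, most significant first.
// Local stand-in; not the package's KafkaVersion type.
type version [4]uint

// isAtLeast compares component by component and returns true when v is
// greater than or equal to other.
func (v version) isAtLeast(other version) bool {
	for i := range v {
		if v[i] > other[i] {
			return true
		}
		if v[i] < other[i] {
			return false
		}
	}
	return true // all components equal
}

func main() {
	v0900 := version{0, 9, 0, 0}
	v01000 := version{0, 10, 0, 0}
	fmt.Println(v0900.isAtLeast(v01000))
	fmt.Println(v01000.isAtLeast(v0900))
}
```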
type LeaveGroupRequest ¶
type LeaveGroupResponse ¶
type LeaveGroupResponse struct {
Err KError
}
type ListGroupsRequest ¶
type ListGroupsRequest struct { }
type ListGroupsResponse ¶
type Message ¶
type Message struct {
	Codec     CompressionCodec // codec used to compress the message contents
	Key       []byte           // the message key, may be nil
	Value     []byte           // the message contents
	Set       *MessageSet      // the message set a message might wrap
	Version   int8             // v1 requires Kafka 0.10
	Timestamp time.Time        // the timestamp of the message (version 1+ only)
	// contains filtered or unexported fields
}
type MessageBlock ¶
func (*MessageBlock) Messages ¶
func (msb *MessageBlock) Messages() []*MessageBlock
Messages is a convenience helper which returns either all the messages that are wrapped in this block or, if it wraps none, the block itself.
type MessageSet ¶
type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	Messages               []*MessageBlock
}
func (*MessageSet) AddMessage ¶
func (ms *MessageSet) AddMessage(msg *Message)
func (*MessageSet) AllMessages ¶
func (ms *MessageSet) AllMessages() []*Message
type MetadataRequest ¶
type MetadataRequest struct {
Topics []string
}
type MetadataResponse ¶
type MetadataResponse struct {
	Brokers []*Broker
	Topics  []*TopicMetadata
}
func (*MetadataResponse) AddBroker ¶
func (r *MetadataResponse) AddBroker(addr string, id int32)
func (*MetadataResponse) AddTopic ¶
func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata
func (*MetadataResponse) AddTopicPartition ¶
func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError)
type OffsetCommitRequest ¶
type OffsetCommitRequest struct {
	ConsumerGroup           string
	ConsumerGroupGeneration int32  // v1 or later
	ConsumerID              string // v1 or later
	RetentionTime           int64  // v2 or later

	// Version can be:
	// - 0 (kafka 0.8.1 and later)
	// - 1 (kafka 0.8.2 and later)
	// - 2 (kafka 0.9.0 and later)
	Version int16
	// contains filtered or unexported fields
}
type OffsetCommitResponse ¶
type OffsetFetchRequest ¶
type OffsetFetchRequest struct {
	ConsumerGroup string
	Version       int16
	// contains filtered or unexported fields
}
func (*OffsetFetchRequest) AddPartition ¶
func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)
type OffsetFetchResponse ¶
type OffsetFetchResponse struct {
Blocks map[string]map[int32]*OffsetFetchResponseBlock
}
func (*OffsetFetchResponse) AddBlock ¶
func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock)
func (*OffsetFetchResponse) GetBlock ¶
func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock
type OffsetRequest ¶
type OffsetRequest struct {
	Version int16
	// contains filtered or unexported fields
}
type OffsetResponse ¶
type OffsetResponse struct {
	Version int16
	Blocks  map[string]map[int32]*OffsetResponseBlock
}
func (*OffsetResponse) AddTopicPartition ¶
func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)
func (*OffsetResponse) GetBlock ¶
func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock
type OffsetResponseBlock ¶
type PacketDecodingError ¶
type PacketDecodingError struct {
Info string
}
PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.
func (PacketDecodingError) Error ¶
func (err PacketDecodingError) Error() string
type PacketEncodingError ¶
type PacketEncodingError struct {
Info string
}
PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
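The length limit comes from the wire format: Kafka protocol strings carry a signed 16-bit big-endian length prefix, so the largest encodable string is 32767 bytes. A minimal sketch of such an encoder follows. `encodeString` and `errStringTooLong` are hypothetical names, not this package's encoder.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

// errStringTooLong is a local stand-in for the encoding error.
var errStringTooLong = errors.New("string too long to encode")

// encodeString writes s as [int16 big-endian length][bytes], rejecting
// strings whose byte length does not fit in an int16.
func encodeString(s string) ([]byte, error) {
	if len(s) > math.MaxInt16 {
		return nil, errStringTooLong
	}
	buf := make([]byte, 2+len(s))
	binary.BigEndian.PutUint16(buf, uint16(len(s)))
	copy(buf[2:], s)
	return buf, nil
}

func main() {
	b, err := encodeString("ab")
	fmt.Println(b, err)

	_, err = encodeString(string(make([]byte, 40000)))
	fmt.Println(err)
}
```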
func (PacketEncodingError) Error ¶
func (err PacketEncodingError) Error() string
type PartitionMetadata ¶
type ProduceRequest ¶
type ProduceRequest struct {
	RequiredAcks RequiredAcks
	Timeout      int32
	Version      int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
	// contains filtered or unexported fields
}
func (*ProduceRequest) AddMessage ¶
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)
func (*ProduceRequest) AddSet ¶
func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)
type ProduceResponse ¶
type ProduceResponse struct {
	Blocks       map[string]map[int32]*ProduceResponseBlock
	Version      int16
	ThrottleTime time.Duration // only provided if Version >= 1
}
func (*ProduceResponse) AddTopicPartition ¶
func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)
func (*ProduceResponse) GetBlock ¶
func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock
type ProduceResponseBlock ¶
type ProtocolBody ¶
type ProtocolBody interface {
	VersionedDecoder
	// contains filtered or unexported methods
}
type RequiredAcks ¶
type RequiredAcks int16
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements), but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.insync.replicas` value in the broker's configuration).
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)
type SaslHandshakeRequest ¶
type SaslHandshakeRequest struct {
Mechanism string
}
type SaslHandshakeResponse ¶
type StdLogger ¶
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
StdLogger is used to log error messages.
type StringEncoder ¶
type StringEncoder string
StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.
func (StringEncoder) Encode ¶
func (s StringEncoder) Encode() ([]byte, error)
func (StringEncoder) Length ¶
func (s StringEncoder) Length() int
type SyncGroupRequest ¶
type SyncGroupRequest struct {
	GroupId          string
	GenerationId     int32
	MemberId         string
	GroupAssignments map[string][]byte
}
func (*SyncGroupRequest) AddGroupAssignment ¶
func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte)
func (*SyncGroupRequest) AddGroupAssignmentMember ¶
func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error
type SyncGroupResponse ¶
func (*SyncGroupResponse) GetMemberAssignment ¶
func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)
type TopicMetadata ¶
type TopicMetadata struct {
	Err        KError
	Name       string
	Partitions []*PartitionMetadata
}
type VersionedDecoder ¶
type VersionedDecoder interface {
// contains filtered or unexported methods
}
Source Files ¶
- api_versions_request.go
- api_versions_response.go
- broker.go
- consumer_group_members.go
- consumer_metadata_request.go
- consumer_metadata_response.go
- crc32_field.go
- describe_groups_request.go
- describe_groups_response.go
- encoder_decoder.go
- errors.go
- fetch_request.go
- fetch_response.go
- heartbeat_request.go
- heartbeat_response.go
- join_group_request.go
- join_group_response.go
- leave_group_request.go
- leave_group_response.go
- length_field.go
- list_groups_request.go
- list_groups_response.go
- message.go
- message_set.go
- metadata_request.go
- metadata_response.go
- metrics.go
- mqxx_bridge.go
- offset_commit_request.go
- offset_commit_response.go
- offset_fetch_request.go
- offset_fetch_response.go
- offset_request.go
- offset_response.go
- packet_decoder.go
- packet_encoder.go
- prep_encoder.go
- produce_request.go
- produce_response.go
- real_decoder.go
- real_encoder.go
- request.go
- response_header.go
- sarama.go
- sasl_handshake_request.go
- sasl_handshake_response.go
- sync_group_request.go
- sync_group_response.go
- utils.go