sarama

package v0.0.0-...-1efbfc0
Published: Jan 6, 2017 License: MIT Imports: 19 Imported by: 0

README

a stripped down version of github.com/Shopify/sarama

Documentation

Overview

Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.

To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
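
As a hedged sketch against the full upstream github.com/Shopify/sarama API (the high-level producer types are not part of this stripped-down package), producing a single message synchronously looks roughly like this; the broker address and topic are illustrative:

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // strongest durability setting
	config.Producer.Return.Successes = true         // the SyncProducer requires this

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "my-topic",
		Value: sarama.StringEncoder("hello"),
	})
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("stored at partition %d, offset %d", partition, offset)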

To consume messages, use the Consumer. Note that Sarama's Consumer implementation does not currently support automatic consumer-group rebalancing and offset tracking. For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.

For lower-level needs, the Broker and Request/Response objects permit precise control over each connection and message sent on the wire; the Client provides higher-level metadata management that is shared between the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Metrics are exposed through the https://github.com/rcrowley/go-metrics library in a local registry.
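
The snippet below is a sketch of draining such a registry using go-metrics itself; how this package hands its local registry to the caller is not documented here (in upstream sarama it is available as Config.MetricRegistry), so registry stands in for whatever handle you have:

	// Periodically write all gathered metrics to stderr. metrics.Log
	// blocks in a loop, hence the goroutine.
	go metrics.Log(
		registry, // assumed: the registry Sarama is writing into
		30*time.Second,
		log.New(os.Stderr, "sarama-metrics: ", log.LstdFlags),
	)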

Broker related metrics:

+----------------------------------------------+------------+---------------------------------------------------------------+
| Name                                         | Type       | Description                                                   |
+----------------------------------------------+------------+---------------------------------------------------------------+
| incoming-byte-rate                           | meter      | Bytes/second read off all brokers                             |
| incoming-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second read off a given broker                          |
| outgoing-byte-rate                           | meter      | Bytes/second written to all brokers                           |
| outgoing-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second written to a given broker                        |
| request-rate                                 | meter      | Requests/second sent to all brokers                           |
| request-rate-for-broker-<broker-id>          | meter      | Requests/second sent to a given broker                        |
| request-size                                 | histogram  | Distribution of the request size in bytes for all brokers     |
| request-size-for-broker-<broker-id>          | histogram  | Distribution of the request size in bytes for a given broker  |
| request-latency-in-ms                        | histogram  | Distribution of the request latency in ms for all brokers     |
| request-latency-in-ms-for-broker-<broker-id> | histogram  | Distribution of the request latency in ms for a given broker  |
| response-rate                                | meter      | Responses/second received from all brokers                    |
| response-rate-for-broker-<broker-id>         | meter      | Responses/second received from a given broker                 |
| response-size                                | histogram  | Distribution of the response size in bytes for all brokers    |
| response-size-for-broker-<broker-id>         | histogram  | Distribution of the response size in bytes for a given broker |
+----------------------------------------------+------------+---------------------------------------------------------------+

Note that we do not gather specific metrics for seed brokers, but they are part of the "all brokers" metrics.

Producer related metrics:

+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| Name                                      | Type       | Description                                                                          |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| batch-size                                | histogram  | Distribution of the number of bytes sent per partition per request for all topics    |
| batch-size-for-topic-<topic>              | histogram  | Distribution of the number of bytes sent per partition per request for a given topic |
| record-send-rate                          | meter      | Records/second sent to all topics                                                    |
| record-send-rate-for-topic-<topic>        | meter      | Records/second sent to a given topic                                                 |
| records-per-request                       | histogram  | Distribution of the number of records sent per request for all topics                |
| records-per-request-for-topic-<topic>     | histogram  | Distribution of the number of records sent per request for a given topic             |
| compression-ratio                         | histogram  | Distribution of the compression ratio times 100 of record batches for all topics     |
| compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic  |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+

Index

Constants

View Source
const GroupGenerationUndefined = -1

GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.

View Source
const ReceiveTime int64 = -1

ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used with message version 1, which requires Kafka 0.8.2 or later.
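
Taken together, the two constants cover the common self-managed case. A minimal sketch (group, topic, and offset are illustrative):

	req := &OffsetCommitRequest{
		Version:                 1, // message version 1 requires Kafka 0.8.2+
		ConsumerGroup:           "my-group",
		ConsumerGroupGeneration: GroupGenerationUndefined, // group does not use Kafka for partition management
	}
	// Let the broker stamp the commit with its own receive time.
	req.AddBlock("my-topic", 0, 42, ReceiveTime, "")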

Variables

View Source
var (
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
)

Effective constants defining the supported Kafka versions.

View Source
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")

ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.

View Source
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

ErrClosedClient is the error returned when a method is called on a client that has been closed.

View Source
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")

ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

View Source
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

View Source
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).

View Source
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max.

View Source
var ErrNotConnected = errors.New("kafka: broker not connected")

ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

View Source
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")

ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

View Source
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")

ErrShuttingDown is returned when a producer receives a message during shutdown.

View Source
var MaxRequestSize int32 = 100 * 1024 * 1024

MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in a PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.

View Source
var MaxResponseSize int32 = 100 * 1024 * 1024

MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to protect the client from running out of memory. Please note that brokers do not have any natural limit on the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers (see https://issues.apache.org/jira/browse/KAFKA-2063).
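
Both limits are plain package-level variables, so a client worried about memory can simply lower them before connecting; the values below are illustrative:

	// Cap requests and responses at 10 MiB instead of the 100 MiB default.
	MaxRequestSize = 10 * 1024 * 1024
	MaxResponseSize = 10 * 1024 * 1024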

View Source
var PanicHandler func(interface{})

PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
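
A minimal sketch that logs the panic value and then re-raises it, so internal failures are recorded rather than lost:

	PanicHandler = func(v interface{}) {
		log.Printf("sarama internal panic: %v", v)
		panic(v) // re-raise so the process still fails loudly
	}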

Functions

func Decode

func Decode(buf []byte, body VersionedDecoder) error

func DecodeHeader

func DecodeHeader(buf []byte) (int32, int32, error)

func Encode

func Encode(correlationID int32, clientID string, body ProtocolBody) ([]byte, error)

TODO: reuse buffer
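
A hedged sketch of the round trip these three functions support. The meaning of DecodeHeader's two results (assumed here to be payload length and correlation ID) and the exact header size Decode expects to be stripped are assumptions, since neither is documented above:

	func roundTrip(respBuf []byte) (*MetadataResponse, error) {
		// Encode a metadata request into its wire format.
		wire, err := Encode(42, "my-client", &MetadataRequest{Topics: []string{"my-topic"}})
		if err != nil {
			return nil, err
		}
		_ = wire // ... would be written to the broker connection, respBuf read back ...

		// Assumed: DecodeHeader yields the payload length and correlation ID.
		if _, correlationID, err := DecodeHeader(respBuf); err != nil {
			return nil, err
		} else if correlationID != 42 {
			return nil, ErrIncompleteResponse // placeholder error choice
		}
		res := &MetadataResponse{}
		// Assumed: the response header is 8 bytes (length + correlation ID).
		return res, Decode(respBuf[8:], res)
	}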

Types

type ApiVersionsRequest

type ApiVersionsRequest struct {
}

type ApiVersionsResponse

type ApiVersionsResponse struct {
	Err         KError
	ApiVersions []*ApiVersionsResponseBlock
}

type ApiVersionsResponseBlock

type ApiVersionsResponseBlock struct {
	ApiKey     int16
	MinVersion int16
	MaxVersion int16
}

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func (*Broker) Addr

func (b *Broker) Addr() string

Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.

func (*Broker) ID

func (b *Broker) ID() int32

ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

func (ByteEncoder) Length

func (b ByteEncoder) Length() int

type CompressionCodec

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
	CompressionLZ4    CompressionCodec = 3
)

type ConfigurationError

type ConfigurationError string

ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.

func (ConfigurationError) Error

func (err ConfigurationError) Error() string

type ConsumerGroupMemberAssignment

type ConsumerGroupMemberAssignment struct {
	Version  int16
	Topics   map[string][]int32
	UserData []byte
}

type ConsumerGroupMemberMetadata

type ConsumerGroupMemberMetadata struct {
	Version  int16
	Topics   []string
	UserData []byte
}

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	ConsumerGroup string
}

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Err             KError
	Coordinator     *Broker
	CoordinatorID   int32  // deprecated: use Coordinator.ID()
	CoordinatorHost string // deprecated: use Coordinator.Addr()
	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
}

type DescribeGroupsRequest

type DescribeGroupsRequest struct {
	Groups []string
}

func (*DescribeGroupsRequest) AddGroup

func (r *DescribeGroupsRequest) AddGroup(group string)

type DescribeGroupsResponse

type DescribeGroupsResponse struct {
	Groups []*GroupDescription
}

type Encoder

type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
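
ByteEncoder and StringEncoder (below) cover the common cases; a custom implementation just has to keep Encode() and Length() consistent. A hedged sketch of a hypothetical JSON encoder (not part of this package) that marshals once and caches the result so the two methods agree:

	type jsonEncoder struct {
		value   interface{}
		encoded []byte
		err     error
	}

	func (j *jsonEncoder) ensure() {
		if j.encoded == nil && j.err == nil {
			j.encoded, j.err = json.Marshal(j.value)
		}
	}

	func (j *jsonEncoder) Encode() ([]byte, error) {
		j.ensure()
		return j.encoded, j.err
	}

	func (j *jsonEncoder) Length() int {
		j.ensure()
		return len(j.encoded)
	}

A *jsonEncoder can then be used anywhere an Encoder is expected.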

type FetchRequest

type FetchRequest struct {
	MaxWaitTime int32
	MinBytes    int32
	Version     int16
	// contains filtered or unexported fields
}

func (*FetchRequest) AddBlock

func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32)
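
For example, a request for up to 32 KiB from a single partition, waiting at most 250 ms for at least one byte to accumulate (topic and offset are illustrative):

	req := &FetchRequest{
		MaxWaitTime: 250, // milliseconds
		MinBytes:    1,
	}
	req.AddBlock("my-topic", 0, 1234, 32*1024) // fetch from offset 1234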

type FetchResponse

type FetchResponse struct {
	Blocks       map[string]map[int32]*FetchResponseBlock
	ThrottleTime time.Duration
	Version      int16 // v1 requires 0.9+, v2 requires 0.10+
}

func (*FetchResponse) AddError

func (r *FetchResponse) AddError(topic string, partition int32, err KError)

func (*FetchResponse) AddMessage

func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) GetBlock

func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock
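
The Add* helpers make it straightforward to hand-build a canned response, e.g. for a mock broker in tests; a small sketch:

	res := &FetchResponse{}
	res.AddMessage("my-topic", 0, nil, StringEncoder("hello"), 5) // nil key
	res.AddError("my-topic", 1, ErrNotLeaderForPartition)

	block := res.GetBlock("my-topic", 0) // block.MsgSet now holds the one message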

type FetchResponseBlock

type FetchResponseBlock struct {
	Err                 KError
	HighWaterMarkOffset int64
	MsgSet              MessageSet
}

type GroupDescription

type GroupDescription struct {
	Err          KError
	GroupId      string
	State        string
	ProtocolType string
	Protocol     string
	Members      map[string]*GroupMemberDescription
}

type GroupMemberDescription

type GroupMemberDescription struct {
	ClientId         string
	ClientHost       string
	MemberMetadata   []byte
	MemberAssignment []byte
}

func (*GroupMemberDescription) GetMemberAssignment

func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

func (*GroupMemberDescription) GetMemberMetadata

func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error)

type HeartbeatRequest

type HeartbeatRequest struct {
	GroupId      string
	GenerationId int32
	MemberId     string
}

type HeartbeatResponse

type HeartbeatResponse struct {
	Err KError
}

type JoinGroupRequest

type JoinGroupRequest struct {
	GroupId        string
	SessionTimeout int32
	MemberId       string
	ProtocolType   string
	GroupProtocols map[string][]byte
}

func (*JoinGroupRequest) AddGroupProtocol

func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte)

func (*JoinGroupRequest) AddGroupProtocolMetadata

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error

type JoinGroupResponse

type JoinGroupResponse struct {
	Err           KError
	GenerationId  int32
	GroupProtocol string
	LeaderId      string
	MemberId      string
	Members       map[string][]byte
}

func (*JoinGroupResponse) GetMembers

func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error)

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
	ErrNoError                         KError = 0
	ErrUnknown                         KError = -1
	ErrOffsetOutOfRange                KError = 1
	ErrInvalidMessage                  KError = 2
	ErrUnknownTopicOrPartition         KError = 3
	ErrInvalidMessageSize              KError = 4
	ErrLeaderNotAvailable              KError = 5
	ErrNotLeaderForPartition           KError = 6
	ErrRequestTimedOut                 KError = 7
	ErrBrokerNotAvailable              KError = 8
	ErrReplicaNotAvailable             KError = 9
	ErrMessageSizeTooLarge             KError = 10
	ErrStaleControllerEpochCode        KError = 11
	ErrOffsetMetadataTooLarge          KError = 12
	ErrNetworkException                KError = 13
	ErrOffsetsLoadInProgress           KError = 14
	ErrConsumerCoordinatorNotAvailable KError = 15
	ErrNotCoordinatorForConsumer       KError = 16
	ErrInvalidTopic                    KError = 17
	ErrMessageSetSizeTooLarge          KError = 18
	ErrNotEnoughReplicas               KError = 19
	ErrNotEnoughReplicasAfterAppend    KError = 20
	ErrInvalidRequiredAcks             KError = 21
	ErrIllegalGeneration               KError = 22
	ErrInconsistentGroupProtocol       KError = 23
	ErrInvalidGroupId                  KError = 24
	ErrUnknownMemberId                 KError = 25
	ErrInvalidSessionTimeout           KError = 26
	ErrRebalanceInProgress             KError = 27
	ErrInvalidCommitOffsetSize         KError = 28
	ErrTopicAuthorizationFailed        KError = 29
	ErrGroupAuthorizationFailed        KError = 30
	ErrClusterAuthorizationFailed      KError = 31
	ErrInvalidTimestamp                KError = 32
	ErrUnsupportedSASLMechanism        KError = 33
	ErrIllegalSASLState                KError = 34
	ErrUnsupportedVersion              KError = 35
	ErrUnsupportedForMessageFormat     KError = 43
)

Numeric error codes returned by the Kafka server.
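
Because KError implements the error interface (below), response error fields can be compared directly against these constants. A sketch, where block stands for any response block with an Err field:

	switch block.Err {
	case ErrNoError:
		// success, nothing to do
	case ErrNotLeaderForPartition, ErrLeaderNotAvailable:
		// leadership moved: refresh metadata and retry (retry logic elided)
	default:
		return block.Err
	}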

func (KError) Error

func (err KError) Error() string

type KafkaVersion

type KafkaVersion struct {
	// contains filtered or unexported fields
}

KafkaVersion instances represent versions of the upstream Kafka broker.

func (KafkaVersion) IsAtLeast

func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool

IsAtLeast returns true if and only if the version it is called on is greater than or equal to the version passed in:

V1.IsAtLeast(V2) // false
V2.IsAtLeast(V1) // true

type LeaveGroupRequest

type LeaveGroupRequest struct {
	GroupId  string
	MemberId string
}

type LeaveGroupResponse

type LeaveGroupResponse struct {
	Err KError
}

type ListGroupsRequest

type ListGroupsRequest struct {
}

type ListGroupsResponse

type ListGroupsResponse struct {
	Err    KError
	Groups map[string]string
}

type Message

type Message struct {
	Codec     CompressionCodec // codec used to compress the message contents
	Key       []byte           // the message key, may be nil
	Value     []byte           // the message contents
	Set       *MessageSet      // the message set a message might wrap
	Version   int8             // v1 requires Kafka 0.10
	Timestamp time.Time        // the timestamp of the message (version 1+ only)
	// contains filtered or unexported fields
}

type MessageBlock

type MessageBlock struct {
	Offset int64
	Msg    *Message
}

func (*MessageBlock) Messages

func (msb *MessageBlock) Messages() []*MessageBlock

Messages is a convenience helper that returns all the message blocks wrapped in this block: the inner set for a compressed message, or the block itself otherwise.

type MessageSet

type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	Messages               []*MessageBlock
}

func (*MessageSet) AddMessage

func (ms *MessageSet) AddMessage(msg *Message)

func (*MessageSet) AllMessages

func (ms *MessageSet) AllMessages() []*Message

type MetadataRequest

type MetadataRequest struct {
	Topics []string
}

type MetadataResponse

type MetadataResponse struct {
	Brokers []*Broker
	Topics  []*TopicMetadata
}

func (*MetadataResponse) AddBroker

func (r *MetadataResponse) AddBroker(addr string, id int32)

func (*MetadataResponse) AddTopic

func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata

func (*MetadataResponse) AddTopicPartition

func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError)
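
As with the fetch response helpers, these make it easy to assemble a response by hand, e.g. to seed a mock broker; the address and IDs below are illustrative:

	res := &MetadataResponse{}
	res.AddBroker("localhost:9092", 0)
	res.AddTopic("my-topic", ErrNoError)
	res.AddTopicPartition("my-topic", 0, 0, []int32{0}, []int32{0}, ErrNoError)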

type OffsetCommitRequest

type OffsetCommitRequest struct {
	ConsumerGroup           string
	ConsumerGroupGeneration int32  // v1 or later
	ConsumerID              string // v1 or later
	RetentionTime           int64  // v2 or later

	// Version can be:
	// - 0 (kafka 0.8.1 and later)
	// - 1 (kafka 0.8.2 and later)
	// - 2 (kafka 0.9.0 and later)
	Version int16
	// contains filtered or unexported fields
}

func (*OffsetCommitRequest) AddBlock

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

type OffsetCommitResponse

type OffsetCommitResponse struct {
	Errors map[string]map[int32]KError
}

func (*OffsetCommitResponse) AddError

func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError)

type OffsetFetchRequest

type OffsetFetchRequest struct {
	ConsumerGroup string
	Version       int16
	// contains filtered or unexported fields
}

func (*OffsetFetchRequest) AddPartition

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Blocks map[string]map[int32]*OffsetFetchResponseBlock
}

func (*OffsetFetchResponse) AddBlock

func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock)

func (*OffsetFetchResponse) GetBlock

func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock

type OffsetFetchResponseBlock

type OffsetFetchResponseBlock struct {
	Offset   int64
	Metadata string
	Err      KError
}

type OffsetRequest

type OffsetRequest struct {
	Version int16
	// contains filtered or unexported fields
}

func (*OffsetRequest) AddBlock

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32)

type OffsetResponse

type OffsetResponse struct {
	Version int16
	Blocks  map[string]map[int32]*OffsetResponseBlock
}

func (*OffsetResponse) AddTopicPartition

func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)

func (*OffsetResponse) GetBlock

func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock

type OffsetResponseBlock

type OffsetResponseBlock struct {
	Err       KError
	Offsets   []int64 // Version 0
	Offset    int64   // Version 1
	Timestamp int64   // Version 1
}

type PacketDecodingError

type PacketDecodingError struct {
	Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (PacketDecodingError) Error

func (err PacketDecodingError) Error() string

type PacketEncodingError

type PacketEncodingError struct {
	Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

func (PacketEncodingError) Error

func (err PacketEncodingError) Error() string

type PartitionMetadata

type PartitionMetadata struct {
	Err      KError
	ID       int32
	Leader   int32
	Replicas []int32
	Isr      []int32
}

type ProduceRequest

type ProduceRequest struct {
	RequiredAcks RequiredAcks
	Timeout      int32
	Version      int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
	// contains filtered or unexported fields
}

func (*ProduceRequest) AddMessage

func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

func (*ProduceRequest) AddSet

func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)

type ProduceResponse

type ProduceResponse struct {
	Blocks       map[string]map[int32]*ProduceResponseBlock
	Version      int16
	ThrottleTime time.Duration // only provided if Version >= 1
}

func (*ProduceResponse) AddTopicPartition

func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)

func (*ProduceResponse) GetBlock

func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock

type ProduceResponseBlock

type ProduceResponseBlock struct {
	Err    KError
	Offset int64
	// only provided if Version >= 2 and the broker is configured with `LogAppendTime`
	Timestamp time.Time
}

type ProtocolBody

type ProtocolBody interface {
	VersionedDecoder
	// contains filtered or unexported methods
}

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements), but in 0.8.2.0 and later this will raise an exception (it has been replaced by the `min.insync.replicas` setting in the broker configuration).

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)
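
A sketch of a produce request that waits for the full ISR, with a 10-second broker-side timeout (contents are illustrative):

	req := &ProduceRequest{
		RequiredAcks: WaitForAll,
		Timeout:      10000, // milliseconds
	}
	req.AddMessage("my-topic", 0, &Message{
		Codec: CompressionNone,
		Value: []byte("hello"),
	})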

type SaslHandshakeRequest

type SaslHandshakeRequest struct {
	Mechanism string
}

type SaslHandshakeResponse

type SaslHandshakeResponse struct {
	Err               KError
	EnabledMechanisms []string
}

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log error messages.

var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)

Logger is the instance of a StdLogger interface that Sarama writes connection management events to. By default it is set to discard all log messages via ioutil.Discard, but you can set it to redirect wherever you want.
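
For example, to see connection events on stderr:

	Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)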

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

func (StringEncoder) Length

func (s StringEncoder) Length() int

type SyncGroupRequest

type SyncGroupRequest struct {
	GroupId          string
	GenerationId     int32
	MemberId         string
	GroupAssignments map[string][]byte
}

func (*SyncGroupRequest) AddGroupAssignment

func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte)

func (*SyncGroupRequest) AddGroupAssignmentMember

func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error

type SyncGroupResponse

type SyncGroupResponse struct {
	Err              KError
	MemberAssignment []byte
}

func (*SyncGroupResponse) GetMemberAssignment

func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

type TopicMetadata

type TopicMetadata struct {
	Err        KError
	Name       string
	Partitions []*PartitionMetadata
}

type VersionedDecoder

type VersionedDecoder interface {
	// contains filtered or unexported methods
}
