sarama: github.com/Shopify/sarama

package sarama

import "github.com/Shopify/sarama"

Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.

To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

To consume messages, use the Consumer. Note that Sarama's Consumer implementation does not currently support automatic consumer-group rebalancing and offset tracking. For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.

For lower-level needs, the Broker and Request/Response objects permit precise control over each connection and message sent on the wire; the Client provides higher-level metadata management that is shared between the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Metrics are exposed through the https://github.com/rcrowley/go-metrics library in a local registry.

Broker related metrics:

+----------------------------------------------+------------+---------------------------------------------------------------+
| Name                                         | Type       | Description                                                   |
+----------------------------------------------+------------+---------------------------------------------------------------+
| incoming-byte-rate                           | meter      | Bytes/second read off all brokers                             |
| incoming-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second read off a given broker                          |
| outgoing-byte-rate                           | meter      | Bytes/second written to all brokers                           |
| outgoing-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second written to a given broker                        |
| request-rate                                 | meter      | Requests/second sent to all brokers                           |
| request-rate-for-broker-<broker-id>          | meter      | Requests/second sent to a given broker                        |
| request-size                                 | histogram  | Distribution of the request size in bytes for all brokers     |
| request-size-for-broker-<broker-id>          | histogram  | Distribution of the request size in bytes for a given broker  |
| request-latency-in-ms                        | histogram  | Distribution of the request latency in ms for all brokers     |
| request-latency-in-ms-for-broker-<broker-id> | histogram  | Distribution of the request latency in ms for a given broker  |
| response-rate                                | meter      | Responses/second received from all brokers                    |
| response-rate-for-broker-<broker-id>         | meter      | Responses/second received from a given broker                 |
| response-size                                | histogram  | Distribution of the response size in bytes for all brokers    |
| response-size-for-broker-<broker-id>         | histogram  | Distribution of the response size in bytes for a given broker |
+----------------------------------------------+------------+---------------------------------------------------------------+

Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.

Producer related metrics:

+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| Name                                      | Type       | Description                                                                          |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| batch-size                                | histogram  | Distribution of the number of bytes sent per partition per request for all topics    |
| batch-size-for-topic-<topic>              | histogram  | Distribution of the number of bytes sent per partition per request for a given topic |
| record-send-rate                          | meter      | Records/second sent to all topics                                                    |
| record-send-rate-for-topic-<topic>        | meter      | Records/second sent to a given topic                                                 |
| records-per-request                       | histogram  | Distribution of the number of records sent per request for all topics                |
| records-per-request-for-topic-<topic>     | histogram  | Distribution of the number of records sent per request for a given topic             |
| compression-ratio                         | histogram  | Distribution of the compression ratio times 100 of record batches for all topics     |
| compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic  |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+

Package Files

acl_bindings.go acl_create_request.go acl_create_response.go acl_delete_request.go acl_delete_response.go acl_describe_request.go acl_describe_response.go acl_filter.go acl_types.go add_offsets_to_txn_request.go add_offsets_to_txn_response.go add_partitions_to_txn_request.go add_partitions_to_txn_response.go admin.go alter_configs_request.go alter_configs_response.go api_versions_request.go api_versions_response.go async_producer.go broker.go client.go config.go config_resource_type.go consumer.go consumer_group_members.go consumer_metadata_request.go consumer_metadata_response.go crc32_field.go create_partitions_request.go create_partitions_response.go create_topics_request.go create_topics_response.go delete_groups_request.go delete_groups_response.go delete_records_request.go delete_records_response.go delete_topics_request.go delete_topics_response.go describe_configs_request.go describe_configs_response.go describe_groups_request.go describe_groups_response.go encoder_decoder.go end_txn_request.go end_txn_response.go errors.go fetch_request.go fetch_response.go find_coordinator_request.go find_coordinator_response.go heartbeat_request.go heartbeat_response.go init_producer_id_request.go init_producer_id_response.go join_group_request.go join_group_response.go leave_group_request.go leave_group_response.go length_field.go list_groups_request.go list_groups_response.go message.go message_set.go metadata_request.go metadata_response.go metrics.go mockbroker.go mockresponses.go offset_commit_request.go offset_commit_response.go offset_fetch_request.go offset_fetch_response.go offset_manager.go offset_request.go offset_response.go packet_decoder.go packet_encoder.go partitioner.go prep_encoder.go produce_request.go produce_response.go produce_set.go real_decoder.go real_encoder.go record.go record_batch.go records.go request.go response_header.go sarama.go sasl_handshake_request.go sasl_handshake_response.go sync_group_request.go sync_group_response.go 
sync_producer.go timestamp.go txn_offset_commit_request.go txn_offset_commit_response.go utils.go

Constants

const (
    // OffsetNewest stands for the log head offset, i.e. the offset that will be
    // assigned to the next message that will be produced to the partition. You
    // can send this to a client's GetOffset method to get this offset, or when
    // calling ConsumePartition to start consuming new messages.
    OffsetNewest int64 = -1
    // OffsetOldest stands for the oldest offset available on the broker for a
    // partition. You can send this to a client's GetOffset method to get this
    // offset, or when calling ConsumePartition to start consuming from the
    // oldest offset that is still available on the broker.
    OffsetOldest int64 = -2
)
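
Both sentinels are negative, so they can never collide with a real offset. The following is a minimal sketch of how a caller might turn a requested starting offset into a concrete one. `resolveStart`, `errOutOfRange`, and the `oldest`/`newest` parameters are hypothetical, and the constants are redeclared locally so the snippet runs without Sarama or a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the sentinel values documented above.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// errOutOfRange is a hypothetical error for this sketch.
var errOutOfRange = errors.New("requested offset is outside the available range")

// resolveStart maps a requested offset (a sentinel or a concrete value) onto
// the range [oldest, newest] available for a partition. newest is the offset
// that will be assigned to the next produced message.
func resolveStart(requested, oldest, newest int64) (int64, error) {
	switch {
	case requested == offsetNewest:
		return newest, nil
	case requested == offsetOldest:
		return oldest, nil
	case requested >= oldest && requested <= newest:
		return requested, nil
	default:
		return 0, errOutOfRange
	}
}

func main() {
	for _, req := range []int64{offsetNewest, offsetOldest, 120, 5} {
		off, err := resolveStart(req, 100, 250)
		fmt.Println(req, off, err)
	}
}
```
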
const CompressionLevelDefault = -1000

CompressionLevelDefault is the constant to use in CompressionLevel to have the default compression level for any codec. The value is chosen so that it does not collide with any existing compression level.
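
As an illustration of the sentinel idea, the sketch below maps the sentinel onto gzip's own default level before compressing. It uses only the Go standard library; `gzipCompress` and `gzipDecompress` are hypothetical helpers, not Sarama functions, and the constant is redeclared locally.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compressionLevelDefault mirrors the sentinel documented above.
const compressionLevelDefault = -1000

// gzipCompress compresses data, falling back to gzip's default level when the
// caller passes the sentinel instead of a concrete level.
func gzipCompress(data []byte, level int) ([]byte, error) {
	if level == compressionLevelDefault {
		level = gzip.DefaultCompression
	}
	var buf bytes.Buffer
	w, err := gzip.NewWriterLevel(&buf, level)
	if err != nil {
		return nil, err // level is outside the range gzip accepts
	}
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// gzipDecompress reverses gzipCompress.
func gzipDecompress(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	out, err := gzipCompress([]byte("testing 123 testing 123"), compressionLevelDefault)
	if err != nil {
		panic(err)
	}
	back, err := gzipDecompress(out)
	fmt.Println(string(back), err)
}
```

Because -1000 is not a level that gzip accepts, passing the sentinel straight to the codec would fail, which is why it has to be translated first.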

const GroupGenerationUndefined = -1

GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.

const ReceiveTime int64 = -1

ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires Kafka 0.8.2.

Variables

var (
    V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
    V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
    V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
    V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
    V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
    V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
    V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
    V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
    V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
    V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
    V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
    V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
    V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
    V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
    V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
    V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
    V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)

    SupportedVersions = []KafkaVersion{
        V0_8_2_0,
        V0_8_2_1,
        V0_8_2_2,
        V0_9_0_0,
        V0_9_0_1,
        V0_10_0_0,
        V0_10_0_1,
        V0_10_1_0,
        V0_10_1_1,
        V0_10_2_0,
        V0_10_2_1,
        V0_11_0_0,
        V0_11_0_1,
        V0_11_0_2,
        V1_0_0_0,
        V1_1_0_0,
        V2_0_0_0,
    }
    MinVersion = V0_8_2_0
    MaxVersion = V2_0_0_0
)

Effective constants defining the supported Kafka versions.
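
Version values like these are typically used to gate features on the broker version. The sketch below shows a component-wise comparison on a local stand-in type; `version` and `isAtLeast` are hypothetical names for this example and do not use Sarama's own KafkaVersion type.

```go
package main

import "fmt"

// version is a hypothetical stand-in for a four-component Kafka version.
type version [4]uint

// isAtLeast reports whether v is the same as or newer than other, comparing
// components from most to least significant.
func (v version) isAtLeast(other version) bool {
	for i := range v {
		if v[i] > other[i] {
			return true
		}
		if v[i] < other[i] {
			return false
		}
	}
	return true // all components equal
}

func main() {
	required := version{0, 9, 0, 0}
	fmt.Println(version{0, 10, 2, 0}.isAtLeast(required)) // true
	fmt.Println(version{0, 8, 2, 2}.isAtLeast(required))  // false
}
```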

var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")

ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.

var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

ErrClosedClient is the error returned when a method is called on a client that has been closed.

var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.

var ErrControllerNotAvailable = errors.New("kafka: controller is not available")

ErrControllerNotAvailable is returned when the server did not return a valid controller ID. This may happen when the Kafka server's version is lower than 0.10.0.0.

var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")

ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

ErrInsufficientData is returned when the packet being decoded is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
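
To illustrate why a truncated tail can be tolerated, the sketch below decodes a buffer of length-prefixed entries and keeps every entry that decoded completely. The framing is deliberately simplified (a 4-byte length followed by the payload) and is not Kafka's actual message-set layout; `decodeAll`, `frame`, and `errInsufficientData` are hypothetical names local to this example.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errInsufficientData = errors.New("insufficient data to decode packet, more bytes expected")

// decodeAll reads consecutive entries, each a 4-byte big-endian length
// followed by that many payload bytes. It returns the entries that decoded
// completely; err is errInsufficientData when the buffer ends mid-entry.
func decodeAll(buf []byte) ([][]byte, error) {
	var entries [][]byte
	for len(buf) > 0 {
		if len(buf) < 4 {
			return entries, errInsufficientData
		}
		n := binary.BigEndian.Uint32(buf)
		if uint64(len(buf)-4) < uint64(n) {
			return entries, errInsufficientData
		}
		end := 4 + int(n)
		entries = append(entries, buf[4:end])
		buf = buf[end:]
	}
	return entries, nil
}

// frame prefixes payload with its 4-byte big-endian length.
func frame(payload []byte) []byte {
	out := make([]byte, 4, 4+len(payload))
	binary.BigEndian.PutUint32(out, uint32(len(payload)))
	return append(out, payload...)
}

func main() {
	buf := append(frame([]byte("ab")), frame([]byte("cde"))...)
	// Simulate a response cut off mid-entry: keep only part of the last frame.
	buf = append(buf, frame([]byte("truncated"))[:6]...)

	entries, err := decodeAll(buf)
	fmt.Println(len(entries), err)
}
```

A caller can treat the error as a signal to use the complete entries it already has and fetch again from the next offset.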

var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
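
The range check described above can be sketched as follows. `hashPartition` is a hypothetical FNV-1a based partitioner written for this example (it is not Sarama's hash partitioner), and `checkPartition` enforces the range [0, numPartitions-1].

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

var errInvalidPartition = errors.New("partitioner returned an invalid partition index")

// hashPartition picks a partition from the FNV-1a hash of the key.
// The caller must pass numPartitions > 0.
func hashPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(numPartitions))
}

// checkPartition rejects any index outside [0, numPartitions-1].
func checkPartition(p, numPartitions int32) error {
	if p < 0 || p >= numPartitions {
		return errInvalidPartition
	}
	return nil
}

func main() {
	const numPartitions = 8
	p := hashPartition([]byte("user-42"), numPartitions)
	fmt.Println(p, checkPartition(p, numPartitions))
	fmt.Println(checkPartition(numPartitions, numPartitions))
}
```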

var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max.

var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")

ErrNoTopicsToUpdateMetadata is returned when Metadata.Full is set to false but no specific topics were found to update the metadata.

var ErrNotConnected = errors.New("kafka: broker not connected")

ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")

ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")

ErrShuttingDown is returned when a producer receives a message during shutdown.

var MaxRequestSize int32 = 100 * 1024 * 1024

MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in a PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.

var MaxResponseSize int32 = 100 * 1024 * 1024

MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to protect the client from running out of memory. Please note that brokers do not have any natural limit on the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers (see https://issues.apache.org/jira/browse/KAFKA-2063).

var NoNode = &Broker{id: -1, addr: ":-1"}
var PanicHandler func(interface{})

PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
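
A panic in a goroutine started by a library cannot be recovered from the caller's goroutine, which is why a package-level hook is needed. The sketch below shows one way such a hook could be wired: `panicHandler` and `withRecover` are local stand-ins written for this example, not Sarama's internals.

```go
package main

import "fmt"

// panicHandler is a local stand-in for the package-level hook described above.
var panicHandler func(interface{})

// withRecover runs fn and, if a handler is installed, routes any panic raised
// by fn to the handler instead of letting it propagate. With a nil handler the
// panic is not recovered.
func withRecover(fn func()) {
	defer func() {
		if panicHandler != nil {
			if r := recover(); r != nil {
				panicHandler(r)
			}
		}
	}()
	fn()
}

func main() {
	panicHandler = func(v interface{}) { fmt.Println("recovered:", v) }

	done := make(chan struct{})
	go func() {
		defer close(done)
		withRecover(func() { panic("boom") })
	}()
	<-done // prints "recovered: boom" before the goroutine finishes
}
```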

type AbortedTransaction

type AbortedTransaction struct {
    ProducerID  int64
    FirstOffset int64
}

type Acl

type Acl struct {
    Principal      string
    Host           string
    Operation      AclOperation
    PermissionType AclPermissionType
}

type AclCreation

type AclCreation struct {
    Resource
    Acl
}

type AclCreationResponse

type AclCreationResponse struct {
    Err    KError
    ErrMsg *string
}

type AclFilter

type AclFilter struct {
    ResourceType   AclResourceType
    ResourceName   *string
    Principal      *string
    Host           *string
    Operation      AclOperation
    PermissionType AclPermissionType
}

type AclOperation

type AclOperation int
const (
    AclOperationUnknown         AclOperation = 0
    AclOperationAny             AclOperation = 1
    AclOperationAll             AclOperation = 2
    AclOperationRead            AclOperation = 3
    AclOperationWrite           AclOperation = 4
    AclOperationCreate          AclOperation = 5
    AclOperationDelete          AclOperation = 6
    AclOperationAlter           AclOperation = 7
    AclOperationDescribe        AclOperation = 8
    AclOperationClusterAction   AclOperation = 9
    AclOperationDescribeConfigs AclOperation = 10
    AclOperationAlterConfigs    AclOperation = 11
    AclOperationIdempotentWrite AclOperation = 12
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java

type AclPermissionType

type AclPermissionType int
const (
    AclPermissionUnknown AclPermissionType = 0
    AclPermissionAny     AclPermissionType = 1
    AclPermissionDeny    AclPermissionType = 2
    AclPermissionAllow   AclPermissionType = 3
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java

type AclResourceType

type AclResourceType int
const (
    AclResourceUnknown         AclResourceType = 0
    AclResourceAny             AclResourceType = 1
    AclResourceTopic           AclResourceType = 2
    AclResourceGroup           AclResourceType = 3
    AclResourceCluster         AclResourceType = 4
    AclResourceTransactionalID AclResourceType = 5
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java

type AddOffsetsToTxnRequest

type AddOffsetsToTxnRequest struct {
    TransactionalID string
    ProducerID      int64
    ProducerEpoch   int16
    GroupID         string
}

type AddOffsetsToTxnResponse

type AddOffsetsToTxnResponse struct {
    ThrottleTime time.Duration
    Err          KError
}

type AddPartitionsToTxnRequest

type AddPartitionsToTxnRequest struct {
    TransactionalID string
    ProducerID      int64
    ProducerEpoch   int16
    TopicPartitions map[string][]int32
}

type AddPartitionsToTxnResponse

type AddPartitionsToTxnResponse struct {
    ThrottleTime time.Duration
    Errors       map[string][]*PartitionError
}

type AlterConfigsRequest

type AlterConfigsRequest struct {
    Resources    []*AlterConfigsResource
    ValidateOnly bool
}

type AlterConfigsResource

type AlterConfigsResource struct {
    Type          ConfigResourceType
    Name          string
    ConfigEntries map[string]*string
}

type AlterConfigsResourceResponse

type AlterConfigsResourceResponse struct {
    ErrorCode int16
    ErrorMsg  string
    Type      ConfigResourceType
    Name      string
}

type AlterConfigsResponse

type AlterConfigsResponse struct {
    ThrottleTime time.Duration
    Resources    []*AlterConfigsResourceResponse
}

type ApiVersionsRequest

type ApiVersionsRequest struct {
}

type ApiVersionsResponse

type ApiVersionsResponse struct {
    Err         KError
    ApiVersions []*ApiVersionsResponseBlock
}

type ApiVersionsResponseBlock

type ApiVersionsResponseBlock struct {
    ApiKey     int16
    MinVersion int16
    MaxVersion int16
}

type AsyncProducer

type AsyncProducer interface {

    // AsyncClose triggers a shutdown of the producer. The shutdown has completed
    // when both the Errors and Successes channels have been closed. When calling
    // AsyncClose, you *must* continue to read from those channels in order to
    // drain the results of any messages in flight.
    AsyncClose()

    // Close shuts down the producer and waits for any buffered messages to be
    // flushed. You must call this function before a producer object passes out of
    // scope, as it may otherwise leak memory. You must call this before calling
    // Close on the underlying client.
    Close() error

    // Input is the input channel for the user to write messages to that they
    // wish to send.
    Input() chan<- *ProducerMessage

    // Successes is the success output channel back to the user when Return.Successes is
    // enabled. If Return.Successes is true, you MUST read from this channel or the
    // Producer will deadlock. It is suggested that you send and read messages
    // together in a single select statement.
    Successes() <-chan *ProducerMessage

    // Errors is the error output channel back to the user. You MUST read from this
    // channel or the Producer will deadlock when the channel is full. Alternatively,
    // you can set Producer.Return.Errors in your config to false, which prevents
    // errors from being returned.
    Errors() <-chan *ProducerError
}

AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope.

This example shows how to use the producer with separate goroutines reading from the Successes and Errors channels. Note that in order for the Successes channel to be populated, you have to set config.Producer.Return.Successes to true.

Code:

config := NewConfig()
config.Producer.Return.Successes = true
producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}

// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var (
    wg                          sync.WaitGroup
    enqueued, successes, errors int
)

wg.Add(1)
go func() {
    defer wg.Done()
    for range producer.Successes() {
        successes++
    }
}()

wg.Add(1)
go func() {
    defer wg.Done()
    for err := range producer.Errors() {
        log.Println(err)
        errors++
    }
}()

ProducerLoop:
for {
    message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
    select {
    case producer.Input() <- message:
        enqueued++

    case <-signals:
        producer.AsyncClose() // Trigger a shutdown of the producer.
        break ProducerLoop
    }
}

wg.Wait()

log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)

This example shows how to use the producer while simultaneously reading the Errors channel to know about any failures.

Code:

producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
    panic(err)
}

defer func() {
    if err := producer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var enqueued, errors int
ProducerLoop:
for {
    select {
    case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
        enqueued++
    case err := <-producer.Errors():
        log.Println("Failed to produce message", err)
        errors++
    case <-signals:
        break ProducerLoop
    }
}

log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)

func NewAsyncProducer

func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error)

NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.

func NewAsyncProducerFromClient

func NewAsyncProducerFromClient(client Client) (AsyncProducer, error)

NewAsyncProducerFromClient creates a new AsyncProducer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.

type Broker

type Broker struct {
    // contains filtered or unexported fields
}

Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.

Code:

broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
    panic(err)
}

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata(&request)
if err != nil {
    _ = broker.Close()
    panic(err)
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

if err = broker.Close(); err != nil {
    panic(err)
}

func NewBroker

func NewBroker(addr string) *Broker

NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect; you have to call Open() for that.

func (*Broker) AddOffsetsToTxn

func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error)

func (*Broker) AddPartitionsToTxn

func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error)

func (*Broker) Addr

func (b *Broker) Addr() string

Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.

func (*Broker) AlterConfigs

func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error)

func (*Broker) ApiVersions

func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error)

func (*Broker) Close

func (b *Broker) Close() error

func (*Broker) CommitOffset

func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error)

func (*Broker) Connected

func (b *Broker) Connected() (bool, error)

Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.

func (*Broker) CreateAcls

func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error)

func (*Broker) CreatePartitions

func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error)

func (*Broker) CreateTopics

func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error)

func (*Broker) DeleteAcls

func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error)

func (*Broker) DeleteGroups

func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error)

func (*Broker) DeleteRecords

func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error)

func (*Broker) DeleteTopics

func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error)

func (*Broker) DescribeAcls

func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error)

func (*Broker) DescribeConfigs

func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error)

func (*Broker) DescribeGroups

func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error)

func (*Broker) EndTxn

func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error)

func (*Broker) Fetch

func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error)

func (*Broker) FetchOffset

func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error)

func (*Broker) FindCoordinator

func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error)

func (*Broker) GetAvailableOffsets

func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error)

func (*Broker) GetConsumerMetadata

func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)

func (*Broker) GetMetadata

func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error)

func (*Broker) Heartbeat

func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error)

func (*Broker) ID

func (b *Broker) ID() int32

ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

func (*Broker) InitProducerID

func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error)

func (*Broker) JoinGroup

func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error)

func (*Broker) LeaveGroup

func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error)

func (*Broker) ListGroups

func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error)

func (*Broker) Open

func (b *Broker) Open(conf *Config) error

Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or ErrAlreadyConnected. If conf is nil, the result of NewConfig() is used.

func (*Broker) Produce

func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error)

func (*Broker) SyncGroup

func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error)

func (*Broker) TxnOffsetCommit

func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error)

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

func (ByteEncoder) Length

func (b ByteEncoder) Length() int
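
Any type with the same Encode and Length methods can play the same role as ByteEncoder. The sketch below declares a local `encoder` interface with that method set, a `byteEncoder` stand-in, and a hypothetical `jsonEncoder` that marshals once up front so that Length and Encode always agree. None of these names come from Sarama; they are redeclared here so the example runs on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encoder mirrors the two-method shape that ByteEncoder satisfies.
type encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// byteEncoder is a local stand-in for ByteEncoder.
type byteEncoder []byte

func (b byteEncoder) Encode() ([]byte, error) { return b, nil }
func (b byteEncoder) Length() int             { return len(b) }

// jsonEncoder is a hypothetical custom encoder holding pre-marshaled JSON.
type jsonEncoder struct{ data []byte }

// newJSONEncoder marshals v immediately so marshaling errors surface early.
func newJSONEncoder(v interface{}) (*jsonEncoder, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return &jsonEncoder{data: data}, nil
}

func (j *jsonEncoder) Encode() ([]byte, error) { return j.data, nil }
func (j *jsonEncoder) Length() int             { return len(j.data) }

// describe accepts anything with the encoder method set.
func describe(e encoder) string {
	b, err := e.Encode()
	if err != nil {
		return "error: " + err.Error()
	}
	return fmt.Sprintf("%d bytes: %s", e.Length(), b)
}

func main() {
	fmt.Println(describe(byteEncoder("raw")))
	e, err := newJSONEncoder(map[string]int{"a": 1})
	if err != nil {
		panic(err)
	}
	fmt.Println(describe(e))
}
```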

type Client

type Client interface {
    // Config returns the Config struct of the client. This struct should not be
    // altered after it has been created.
    Config() *Config

    // Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
    Controller() (*Broker, error)

    // Brokers returns the current set of active brokers as retrieved from cluster metadata.
    Brokers() []*Broker

    // Topics returns the set of available topics as retrieved from cluster metadata.
    Topics() ([]string, error)

    // Partitions returns the sorted list of all partition IDs for the given topic.
    Partitions(topic string) ([]int32, error)

    // WritablePartitions returns the sorted list of all writable partition IDs for
    // the given topic, where "writable" means "having a valid leader accepting
    // writes".
    WritablePartitions(topic string) ([]int32, error)

    // Leader returns the broker object that is the leader of the current
    // topic/partition, as determined by querying the cluster metadata.
    Leader(topic string, partitionID int32) (*Broker, error)

    // Replicas returns the set of all replica IDs for the given partition.
    Replicas(topic string, partitionID int32) ([]int32, error)

    // InSyncReplicas returns the set of all in-sync replica IDs for the given
    // partition. In-sync replicas are replicas which are fully caught up with
    // the partition leader.
    InSyncReplicas(topic string, partitionID int32) ([]int32, error)

    // RefreshMetadata takes a list of topics and queries the cluster to refresh the
    // available metadata for those topics. If no topics are provided, it will refresh
    // metadata for all topics.
    RefreshMetadata(topics ...string) error

    // GetOffset queries the cluster to get the most recent available offset at the
    // given time (in milliseconds) on the topic/partition combination.
    // Time should be OffsetOldest for the earliest available offset,
    // OffsetNewest for the offset of the message that will be produced next, or a time.
    GetOffset(topic string, partitionID int32, time int64) (int64, error)

    // Coordinator returns the coordinating broker for a consumer group. It will
    // return a locally cached value if it's available. You can call
    // RefreshCoordinator to update the cached value. This function only works on
    // Kafka 0.8.2 and higher.
    Coordinator(consumerGroup string) (*Broker, error)

    // RefreshCoordinator retrieves the coordinator for a consumer group and stores it
    // in local cache. This function only works on Kafka 0.8.2 and higher.
    RefreshCoordinator(consumerGroup string) error

    // Close shuts down all broker connections managed by this client. It is required
    // to call this function before a client object passes out of scope, as it will
    // otherwise leak memory. You must close any Producers or Consumers using a client
    // before you close the client.
    Close() error

    // Closed returns true if the client has already had Close called on it.
    Closed() bool
}

Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks; it will not be garbage-collected automatically when it passes out of scope. It is safe to share a client amongst many users; however, Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default one client per producer/consumer.
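
The metadata methods above compose naturally. The following is a minimal sketch, not part of Sarama, that reports partitions whose in-sync replica set is smaller than their full replica set. The replicaLister interface, the underReplicated helper and the fakeCluster type are hypothetical names introduced here; replicaLister copies three method signatures from Client, so a real Client should satisfy it, while fakeCluster only exists so the sketch runs without a broker.

```go
package main

import "fmt"

// replicaLister is a hypothetical, narrowed view of Client containing only
// the three methods this sketch needs.
type replicaLister interface {
	Partitions(topic string) ([]int32, error)
	Replicas(topic string, partitionID int32) ([]int32, error)
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)
}

// underReplicated returns the partitions of topic whose in-sync replica set
// is smaller than the full replica set.
func underReplicated(c replicaLister, topic string) ([]int32, error) {
	partitions, err := c.Partitions(topic)
	if err != nil {
		return nil, err
	}
	var lagging []int32
	for _, p := range partitions {
		all, err := c.Replicas(topic, p)
		if err != nil {
			return nil, err
		}
		isr, err := c.InSyncReplicas(topic, p)
		if err != nil {
			return nil, err
		}
		if len(isr) < len(all) {
			lagging = append(lagging, p)
		}
	}
	return lagging, nil
}

// fakeCluster is an in-memory stand-in (an assumption of this sketch) that
// ignores the topic name and serves fixed metadata.
type fakeCluster struct {
	partitions []int32
	replicas   map[int32][]int32
	inSync     map[int32][]int32
}

func (f fakeCluster) Partitions(string) ([]int32, error) { return f.partitions, nil }

func (f fakeCluster) Replicas(_ string, p int32) ([]int32, error) { return f.replicas[p], nil }

func (f fakeCluster) InSyncReplicas(_ string, p int32) ([]int32, error) { return f.inSync[p], nil }

func main() {
	cluster := fakeCluster{
		partitions: []int32{0, 1, 2},
		replicas:   map[int32][]int32{0: {1, 2, 3}, 1: {1, 2, 3}, 2: {1, 2, 3}},
		inSync:     map[int32][]int32{0: {1, 2, 3}, 1: {1, 3}, 2: {1, 2, 3}},
	}
	lagging, err := underReplicated(cluster, "my_topic")
	fmt.Println(lagging, err)
}
```

Because the helper depends on a small interface rather than on Client directly, it can be exercised in tests without a running cluster.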

func NewClient Uses

func NewClient(addrs []string, conf *Config) (Client, error)

NewClient creates a new Client. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.

type ClusterAdmin Uses

type ClusterAdmin interface {
    // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
    // It may take several seconds after CreateTopic returns success for all the brokers
    // to become aware that the topic has been created. During this time, listTopics
    // may not return information about the new topic. The validateOnly option is supported from version 0.10.2.0.
    CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

    // Delete a topic. It may take several seconds after DeleteTopic returns success
    // for all the brokers to become aware that the topic is gone.
    // During this time, listTopics may continue to return information about the deleted topic.
    // If delete.topic.enable is false on the brokers, deleteTopic will mark
    // the topic for deletion, but not actually delete it.
    // This operation is supported by brokers with version 0.10.1.0 or higher.
    DeleteTopic(topic string) error

    // Increase the number of partitions of the topic according to the given count.
    // If partitions are increased for a topic that has a key, the partition logic or ordering of
    // the messages will be affected. It may take several seconds after this method returns
    // success for all the brokers to become aware that the partitions have been created.
    // During this time, ClusterAdmin#describeTopics may not return information about the
    // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
    CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

    // Delete records whose offset is smaller than the given offset of the corresponding partition.
    // This operation is supported by brokers with version 0.11.0.0 or higher.
    DeleteRecords(topic string, partitionOffsets map[int32]int64) error

    // Get the configuration for the specified resources.
    // The returned configuration includes default values; the Default field
    // can be used to distinguish them from user-supplied values.
    // Config entries where ReadOnly is true cannot be updated.
    // The value of config entries where Sensitive is true is always nil so
    // sensitive information is not disclosed.
    // This operation is supported by brokers with version 0.11.0.0 or higher.
    DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)

    // Update the configuration for the specified resources with the default options.
    // This operation is supported by brokers with version 0.11.0.0 or higher.
    // Topic is currently the only resource type with configs that can be updated.
    // Updates are not transactional so they may succeed for some resources while
    // failing for others. The configs for a particular resource are updated atomically.
    AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

    // Creates access control lists (ACLs) which are bound to specific resources.
    // This operation is not transactional so it may succeed for some ACLs while failing for others.
    // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
    // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
    CreateACL(resource Resource, acl Acl) error

    // Lists access control lists (ACLs) according to the supplied filter.
    // It may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls.
    // This operation is supported by brokers with version 0.11.0.0 or higher.
    ListAcls(filter AclFilter) ([]ResourceAcls, error)

    // Deletes access control lists (ACLs) according to the supplied filters.
    // This operation is not transactional so it may succeed for some ACLs while failing for others.
    // This operation is supported by brokers with version 0.11.0.0 or higher.
    DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

    // Close shuts down the admin and closes underlying client.
    Close() error
}

ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker version required. You MUST call Close() on a client to avoid leaks.

func NewClusterAdmin Uses

func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error)

NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.

type CompressionCodec Uses

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
    CompressionNone   CompressionCodec = 0
    CompressionGZIP   CompressionCodec = 1
    CompressionSnappy CompressionCodec = 2
    CompressionLZ4    CompressionCodec = 3
)

func (CompressionCodec) String Uses

func (cc CompressionCodec) String() string

type Config Uses

type Config struct {
    // Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
    Admin struct {
        // The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
        // including topics, brokers, configurations and ACLs (defaults to 3 seconds).
        Timeout time.Duration
    }

    // Net is the namespace for network-level properties used by the Broker, and
    // shared by the Client/Producer/Consumer.
    Net struct {
        // How many outstanding requests a connection is allowed to have before
        // sending on it blocks (default 5).
        MaxOpenRequests int

        // All three of the below configurations are similar to the
        // `socket.timeout.ms` setting in JVM kafka. All of them default
        // to 30 seconds.
        DialTimeout  time.Duration // How long to wait for the initial connection.
        ReadTimeout  time.Duration // How long to wait for a response.
        WriteTimeout time.Duration // How long to wait for a transmit.

        TLS struct {
            // Whether or not to use TLS when connecting to the broker
            // (defaults to false).
            Enable bool
            // The TLS configuration to use for secure connections if
            // enabled (defaults to nil).
            Config *tls.Config
        }

        // SASL based authentication with broker. While there are multiple SASL authentication methods
        // the current implementation is limited to plaintext (SASL/PLAIN) authentication
        SASL struct {
            // Whether or not to use SASL authentication when connecting to the broker
            // (defaults to false).
            Enable bool
            // Whether or not to send the Kafka SASL handshake first if enabled
            // (defaults to true). You should only set this to false if you're using
            // a non-Kafka SASL proxy.
            Handshake bool
            // Username and password for SASL/PLAIN authentication.
            User     string
            Password string
        }

        // KeepAlive specifies the keep-alive period for an active network connection.
        // If zero, keep-alives are disabled. (default is 0: disabled).
        KeepAlive time.Duration

        // LocalAddr is the local address to use when dialing an
        // address. The address must be of a compatible type for the
        // network being dialed.
        // If nil, a local address is automatically chosen.
        LocalAddr net.Addr
    }

    // Metadata is the namespace for metadata management properties used by the
    // Client, and shared by the Producer/Consumer.
    Metadata struct {
        Retry struct {
            // The total number of times to retry a metadata request when the
            // cluster is in the middle of a leader election (default 3).
            Max int
            // How long to wait for leader election to occur before retrying
            // (default 250ms). Similar to the JVM's `retry.backoff.ms`.
            Backoff time.Duration
        }
        // How frequently to refresh the cluster metadata in the background.
        // Defaults to 10 minutes. Set to 0 to disable. Similar to
        // `topic.metadata.refresh.interval.ms` in the JVM version.
        RefreshFrequency time.Duration

        // Whether to maintain a full set of metadata for all topics, or just
        // the minimal set that has been necessary so far. The full set is simpler
        // and usually more convenient, but can take up a substantial amount of
        // memory if you have many topics and partitions. Defaults to true.
        Full bool
    }

    // Producer is the namespace for configuration related to producing messages,
    // used by the Producer.
    Producer struct {
        // The maximum permitted size of a message (defaults to 1000000). Should be
        // set equal to or smaller than the broker's `message.max.bytes`.
        MaxMessageBytes int
        // The level of acknowledgement reliability needed from the broker (defaults
        // to WaitForLocal). Equivalent to the `request.required.acks` setting of the
        // JVM producer.
        RequiredAcks RequiredAcks
        // The maximum duration the broker will wait for the receipt of the number of
        // RequiredAcks (defaults to 10 seconds). This is only relevant when
        // RequiredAcks is set to WaitForAll or a number > 1. Only supports
        // millisecond resolution, nanoseconds will be truncated. Equivalent to
        // the JVM producer's `request.timeout.ms` setting.
        Timeout time.Duration
        // The type of compression to use on messages (defaults to no compression).
        // Similar to `compression.codec` setting of the JVM producer.
        Compression CompressionCodec
        // The level of compression to use on messages. The meaning depends
        // on the actual compression type used and defaults to default compression
        // level for the codec.
        CompressionLevel int
        // Generates partitioners for choosing the partition to send messages to
        // (defaults to hashing the message key). Similar to the `partitioner.class`
        // setting for the JVM producer.
        Partitioner PartitionerConstructor

        // Return specifies what channels will be populated. If they are set to true,
        // you must read from the respective channels to prevent deadlock. If,
        // however, this config is used to create a `SyncProducer`, both must be set
        // to true and you shall not read from the channels since the producer does
        // this internally.
        Return struct {
            // If enabled, successfully delivered messages will be returned on the
            // Successes channel (default disabled).
            Successes bool

            // If enabled, messages that failed to deliver will be returned on the
            // Errors channel, including error (default enabled).
            Errors bool
        }

        // The following config options control how often messages are batched up and
        // sent to the broker. By default, messages are sent as fast as possible, and
        // all messages received while the current batch is in-flight are placed
        // into the subsequent batch.
        Flush struct {
            // The best-effort number of bytes needed to trigger a flush. Use the
            // global sarama.MaxRequestSize to set a hard upper limit.
            Bytes int
            // The best-effort number of messages needed to trigger a flush. Use
            // `MaxMessages` to set a hard upper limit.
            Messages int
            // The best-effort frequency of flushes. Equivalent to
            // `queue.buffering.max.ms` setting of JVM producer.
            Frequency time.Duration
            // The maximum number of messages the producer will send in a single
            // broker request. Defaults to 0 for unlimited. Similar to
            // `queue.buffering.max.messages` in the JVM producer.
            MaxMessages int
        }

        Retry struct {
            // The total number of times to retry sending a message (default 3).
            // Similar to the `message.send.max.retries` setting of the JVM producer.
            Max int
            // How long to wait for the cluster to settle between retries
            // (default 100ms). Similar to the `retry.backoff.ms` setting of the
            // JVM producer.
            Backoff time.Duration
        }
    }

    // Consumer is the namespace for configuration related to consuming messages,
    // used by the Consumer.
    //
    // Note that Sarama's Consumer type does not currently support automatic
    // consumer-group rebalancing and offset tracking.  For Zookeeper-based
    // tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka
    // library builds on Sarama to add this support. For Kafka-based tracking
    // (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library
    // builds on Sarama to add this support.
    Consumer struct {
        Retry struct {
            // How long to wait after failing to read from a partition before
            // trying again (default 2s).
            Backoff time.Duration
        }

        // Fetch is the namespace for controlling how many bytes are retrieved by any
        // given request.
        Fetch struct {
            // The minimum number of message bytes to fetch in a request - the broker
            // will wait until at least this many are available. The default is 1,
            // as 0 causes the consumer to spin when no messages are available.
            // Equivalent to the JVM's `fetch.min.bytes`.
            Min int32
            // The default number of message bytes to fetch from the broker in each
            // request (default 1MB). This should be larger than the majority of
            // your messages, or else the consumer will spend a lot of time
            // negotiating sizes and not actually consuming. Similar to the JVM's
            // `fetch.message.max.bytes`.
            Default int32
            // The maximum number of message bytes to fetch from the broker in a
            // single request. Messages larger than this will return
            // ErrMessageTooLarge and will not be consumable, so you must be sure
            // this is at least as large as your largest message. Defaults to 0
            // (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
            // global `sarama.MaxResponseSize` still applies.
            Max int32
        }
        // The maximum amount of time the broker will wait for Consumer.Fetch.Min
        // bytes to become available before it returns fewer than that anyways. The
        // default is 250ms, since 0 causes the consumer to spin when no events are
        // available. 100-500ms is a reasonable range for most cases. Kafka only
        // supports precision up to milliseconds; nanoseconds will be truncated.
        // Equivalent to the JVM's `fetch.wait.max.ms`.
        MaxWaitTime time.Duration

        // The maximum amount of time the consumer expects the user to take to
        // process a message. If writing to the Messages channel takes longer
        // than this, that partition will stop fetching more messages until it
        // can proceed again.
        // Note that, since the Messages channel is buffered, the actual grace time is
        // (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
        // If a message is not written to the Messages channel between two ticks
        // of the expiryTicker then a timeout is detected.
        // Using a ticker instead of a timer to detect timeouts should typically
        // result in many fewer calls to Timer functions which may result in a
        // significant performance improvement if many messages are being sent
        // and timeouts are infrequent.
        // The disadvantage of using a ticker instead of a timer is that
        // timeouts will be less accurate. That is, the effective timeout could
        // be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
        // example, if `MaxProcessingTime` is 100ms then a delay of 180ms
        // between two messages being sent may not be recognized as a timeout.
        MaxProcessingTime time.Duration

        // Return specifies what channels will be populated. If they are set to true,
        // you must read from them to prevent deadlock.
        Return struct {
            // If enabled, any errors that occurred while consuming are returned on
            // the Errors channel (default disabled).
            Errors bool
        }

        // Offsets specifies configuration for how and when to commit consumed
        // offsets. This currently requires the manual use of an OffsetManager
        // but will eventually be automated.
        Offsets struct {
            // How frequently to commit updated offsets. Defaults to 1s.
            CommitInterval time.Duration

            // The initial offset to use if no offset was previously committed.
            // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
            Initial int64

            // The retention duration for committed offsets. If zero, disabled
            // (in which case the `offsets.retention.minutes` option on the
            // broker will be used).  Kafka only supports precision up to
            // milliseconds; nanoseconds will be truncated. Requires Kafka
            // broker version 0.9.0 or later.
            // (default is 0: disabled).
            Retention time.Duration

            Retry struct {
                // The total number of times to retry failing commit
                // requests during OffsetManager shutdown (default 3).
                Max int
            }
        }
    }

    // A user-provided string sent with every request to the brokers for logging,
    // debugging, and auditing purposes. Defaults to "sarama", but you should
    // probably set it to something specific to your application.
    ClientID string
    // The number of events to buffer in internal and external channels. This
    // permits the producer and consumer to continue processing some messages
    // in the background while user code is working, greatly improving throughput.
    // Defaults to 256.
    ChannelBufferSize int
    // The version of Kafka that Sarama will assume it is running against.
    // Defaults to the oldest supported stable version. Since Kafka provides
    // backwards-compatibility, setting it to a version older than you have
    // will not break anything, although it may prevent you from using the
    // latest features. Setting it to a version greater than you are actually
    // running may lead to random breakage.
    Version KafkaVersion
    // The registry to define metrics into.
    // Defaults to a local registry.
    // If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true"
    // prior to starting Sarama.
    // See Examples on how to use the metrics registry.
    MetricRegistry metrics.Registry
}

Config is used to pass multiple configuration options to Sarama's constructors.

This example shows how to integrate with an existing registry and publish metrics to standard output.

Code:

// Our application registry
appMetricRegistry := metrics.NewRegistry()
appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
appGauge.Update(1)

config := NewConfig()
// Use a prefix registry instead of the default local one
config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")

// Simulate a metric created by sarama without starting a broker
saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
saramaGauge.Update(2)

metrics.WriteOnce(appMetricRegistry, os.Stdout)

Output:

gauge m1
  value:               1
gauge sarama.m2
  value:               2

func NewConfig Uses

func NewConfig() *Config

NewConfig returns a new configuration instance with sane defaults.

func (*Config) Validate Uses

func (c *Config) Validate() error

Validate checks a Config instance. It will return a ConfigurationError if the specified values don't make sense.

type ConfigEntry Uses

type ConfigEntry struct {
    Name      string
    Value     string
    ReadOnly  bool
    Default   bool
    Sensitive bool
}

type ConfigResource Uses

type ConfigResource struct {
    Type        ConfigResourceType
    Name        string
    ConfigNames []string
}

type ConfigResourceType Uses

type ConfigResourceType int8

const (
    UnknownResource ConfigResourceType = 0
    AnyResource     ConfigResourceType = 1
    TopicResource   ConfigResourceType = 2
    GroupResource   ConfigResourceType = 3
    ClusterResource ConfigResourceType = 4
    BrokerResource  ConfigResourceType = 5
)

type ConfigurationError Uses

type ConfigurationError string

ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.

func (ConfigurationError) Error Uses

func (err ConfigurationError) Error() string

type Consumer Uses

type Consumer interface {

    // Topics returns the set of available topics as retrieved from the cluster
    // metadata. This method is the same as Client.Topics(), and is provided for
    // convenience.
    Topics() ([]string, error)

    // Partitions returns the sorted list of all partition IDs for the given topic.
    // This method is the same as Client.Partitions(), and is provided for convenience.
    Partitions(topic string) ([]int32, error)

    // ConsumePartition creates a PartitionConsumer on the given topic/partition with
    // the given offset. It will return an error if this Consumer is already consuming
    // on the given topic/partition. Offset can be a literal offset, or OffsetNewest
    // or OffsetOldest.
    ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

    // HighWaterMarks returns the current high water marks for each topic and partition.
    // Consistency between partitions is not guaranteed since high water marks are updated separately.
    HighWaterMarks() map[string]map[int32]int64

    // Close shuts down the consumer. It must be called after all child
    // PartitionConsumers have already been closed.
    Close() error
}

Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.

Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking. For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.

This example shows how to use the consumer to read messages from a single partition.

Code:

consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
    panic(err)
}

defer func() {
    if err := consumer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
    panic(err)
}

defer func() {
    if err := partitionConsumer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumed := 0
ConsumerLoop:
for {
    select {
    case msg := <-partitionConsumer.Messages():
        log.Printf("Consumed message offset %d\n", msg.Offset)
        consumed++
    case <-signals:
        break ConsumerLoop
    }
}

log.Printf("Consumed: %d\n", consumed)
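
The map returned by HighWaterMarks can be combined with the offsets an application tracks itself to estimate how far behind it is. The following sketch assumes a hypothetical lag helper and assumes the caller records, per topic and partition, the next offset it intends to read; neither is part of Sarama.

```go
package main

import "fmt"

// lag returns, per topic and partition, the difference between the high
// water mark and the next offset the caller intends to read. Partitions
// missing from next are skipped.
func lag(highWaterMarks, next map[string]map[int32]int64) map[string]map[int32]int64 {
	out := make(map[string]map[int32]int64)
	for topic, partitions := range highWaterMarks {
		for partition, hwm := range partitions {
			// Indexing a nil inner map is safe and reports ok == false.
			offset, ok := next[topic][partition]
			if !ok {
				continue
			}
			if out[topic] == nil {
				out[topic] = make(map[int32]int64)
			}
			out[topic][partition] = hwm - offset
		}
	}
	return out
}

func main() {
	// Shaped like the return value of Consumer.HighWaterMarks().
	hwm := map[string]map[int32]int64{"my_topic": {0: 120, 1: 45}}
	// Offsets tracked by the application (an assumption of this sketch).
	next := map[string]map[int32]int64{"my_topic": {0: 100, 1: 45}}

	fmt.Println(lag(hwm, next))
}
```

Because high water marks are updated separately per partition, the result is a point-in-time estimate rather than a consistent snapshot.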

func NewConsumer Uses

func NewConsumer(addrs []string, config *Config) (Consumer, error)

NewConsumer creates a new consumer using the given broker addresses and configuration.

func NewConsumerFromClient Uses

func NewConsumerFromClient(client Client) (Consumer, error)

NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.

type ConsumerError Uses

type ConsumerError struct {
    Topic     string
    Partition int32
    Err       error
}

ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.

func (ConsumerError) Error Uses

func (ce ConsumerError) Error() string

type ConsumerErrors Uses

type ConsumerErrors []*ConsumerError

ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. It can be returned from the PartitionConsumer's Close method to avoid the need to manually drain errors when stopping.

func (ConsumerErrors) Error Uses

func (ce ConsumerErrors) Error() string

type ConsumerGroupMemberAssignment Uses

type ConsumerGroupMemberAssignment struct {
    Version  int16
    Topics   map[string][]int32
    UserData []byte
}

type ConsumerGroupMemberMetadata Uses

type ConsumerGroupMemberMetadata struct {
    Version  int16
    Topics   []string
    UserData []byte
}

type ConsumerMessage Uses

type ConsumerMessage struct {
    Key, Value     []byte
    Topic          string
    Partition      int32
    Offset         int64
    Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
    BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
    Headers        []*RecordHeader // only set if kafka is version 0.11+
}

ConsumerMessage encapsulates a Kafka message returned by the consumer.

type ConsumerMetadataRequest Uses

type ConsumerMetadataRequest struct {
    ConsumerGroup string
}

type ConsumerMetadataResponse Uses

type ConsumerMetadataResponse struct {
    Err             KError
    Coordinator     *Broker
    CoordinatorID   int32  // deprecated: use Coordinator.ID()
    CoordinatorHost string // deprecated: use Coordinator.Addr()
    CoordinatorPort int32  // deprecated: use Coordinator.Addr()
}

type CoordinatorType Uses

type CoordinatorType int8

const (
    CoordinatorGroup       CoordinatorType = 0
    CoordinatorTransaction CoordinatorType = 1
)

type CreateAclsRequest Uses

type CreateAclsRequest struct {
    AclCreations []*AclCreation
}

type CreateAclsResponse Uses

type CreateAclsResponse struct {
    ThrottleTime         time.Duration
    AclCreationResponses []*AclCreationResponse
}

type CreatePartitionsRequest Uses

type CreatePartitionsRequest struct {
    TopicPartitions map[string]*TopicPartition
    Timeout         time.Duration
    ValidateOnly    bool
}

type CreatePartitionsResponse Uses

type CreatePartitionsResponse struct {
    ThrottleTime         time.Duration
    TopicPartitionErrors map[string]*TopicPartitionError
}

type CreateTopicsRequest Uses

type CreateTopicsRequest struct {
    Version int16

    TopicDetails map[string]*TopicDetail
    Timeout      time.Duration
    ValidateOnly bool
}

type CreateTopicsResponse Uses

type CreateTopicsResponse struct {
    Version      int16
    ThrottleTime time.Duration
    TopicErrors  map[string]*TopicError
}

type DeleteAclsRequest Uses

type DeleteAclsRequest struct {
    Filters []*AclFilter
}

type DeleteAclsResponse Uses

type DeleteAclsResponse struct {
    ThrottleTime    time.Duration
    FilterResponses []*FilterResponse
}

type DeleteGroupsRequest Uses

type DeleteGroupsRequest struct {
    Groups []string
}

func (*DeleteGroupsRequest) AddGroup Uses

func (r *DeleteGroupsRequest) AddGroup(group string)

type DeleteGroupsResponse Uses

type DeleteGroupsResponse struct {
    ThrottleTime    time.Duration
    GroupErrorCodes map[string]KError
}

type DeleteRecordsRequest Uses

type DeleteRecordsRequest struct {
    Topics  map[string]*DeleteRecordsRequestTopic
    Timeout time.Duration
}

type DeleteRecordsRequestTopic Uses

type DeleteRecordsRequestTopic struct {
    PartitionOffsets map[int32]int64 // partition => offset
}

type DeleteRecordsResponse Uses

type DeleteRecordsResponse struct {
    Version      int16
    ThrottleTime time.Duration
    Topics       map[string]*DeleteRecordsResponseTopic
}

type DeleteRecordsResponsePartition Uses

type DeleteRecordsResponsePartition struct {
    LowWatermark int64
    Err          KError
}

type DeleteRecordsResponseTopic Uses

type DeleteRecordsResponseTopic struct {
    Partitions map[int32]*DeleteRecordsResponsePartition
}

type DeleteTopicsRequest Uses

type DeleteTopicsRequest struct {
    Version int16
    Topics  []string
    Timeout time.Duration
}

type DeleteTopicsResponse Uses

type DeleteTopicsResponse struct {
    Version         int16
    ThrottleTime    time.Duration
    TopicErrorCodes map[string]KError
}

type DescribeAclsRequest Uses

type DescribeAclsRequest struct {
    AclFilter
}

type DescribeAclsResponse Uses

type DescribeAclsResponse struct {
    ThrottleTime time.Duration
    Err          KError
    ErrMsg       *string
    ResourceAcls []*ResourceAcls
}

type DescribeConfigsRequest Uses

type DescribeConfigsRequest struct {
    Resources []*ConfigResource
}

type DescribeConfigsResponse Uses

type DescribeConfigsResponse struct {
    ThrottleTime time.Duration
    Resources    []*ResourceResponse
}

type DescribeGroupsRequest Uses

type DescribeGroupsRequest struct {
    Groups []string
}

func (*DescribeGroupsRequest) AddGroup Uses

func (r *DescribeGroupsRequest) AddGroup(group string)

type DescribeGroupsResponse Uses

type DescribeGroupsResponse struct {
    Groups []*GroupDescription
}

type DynamicConsistencyPartitioner Uses

type DynamicConsistencyPartitioner interface {
    Partitioner

    // MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
    // but takes in the message being partitioned so that the partitioner can
    // make a per-message determination.
    MessageRequiresConsistency(message *ProducerMessage) bool
}

DynamicConsistencyPartitioner can optionally be implemented by Partitioners in order to allow more flexibility than is originally allowed by the RequiresConsistency method in the Partitioner interface. This allows partitioners to require consistency sometimes, but not all times. It's useful for, e.g., the HashPartitioner, which does not require consistency if the message key is nil.
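
As a rough illustration of a per-message consistency decision, the sketch below hashes keyed messages and spreads unkeyed ones round-robin. All types are local stand-ins so the code runs on its own: ProducerMessage is reduced to a byte-slice Key, the Partition method signature on the local Partitioner interface is an assumption about the real one, and keyedPartitioner is a hypothetical name rather than Sarama's HashPartitioner.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// ProducerMessage is a simplified stand-in; only the key matters here.
type ProducerMessage struct {
	Key []byte
}

// Partitioner is a local stand-in; the Partition signature is assumed.
type Partitioner interface {
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
	RequiresConsistency() bool
}

// DynamicConsistencyPartitioner mirrors the interface documented above.
type DynamicConsistencyPartitioner interface {
	Partitioner
	MessageRequiresConsistency(message *ProducerMessage) bool
}

// keyedPartitioner hashes the key when one is present and otherwise
// cycles through the partitions in order.
type keyedPartitioner struct {
	next int32
}

// Compile-time check that the sketch satisfies the local interface.
var _ DynamicConsistencyPartitioner = (*keyedPartitioner)(nil)

func (p *keyedPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return 0, errors.New("numPartitions must be positive")
	}
	if msg.Key == nil {
		chosen := p.next % numPartitions
		p.next = (chosen + 1) % numPartitions
		return chosen, nil
	}
	h := fnv.New32a()
	_, _ = h.Write(msg.Key) // hash.Hash writes never return an error
	return int32(h.Sum32() % uint32(numPartitions)), nil
}

// RequiresConsistency is the coarse, partitioner-wide answer.
func (p *keyedPartitioner) RequiresConsistency() bool { return true }

// MessageRequiresConsistency refines it: only keyed messages must keep
// mapping to the same partition.
func (p *keyedPartitioner) MessageRequiresConsistency(msg *ProducerMessage) bool {
	return msg.Key != nil
}

func main() {
	p := &keyedPartitioner{}
	keyed := &ProducerMessage{Key: []byte("user-42")}
	a, _ := p.Partition(keyed, 8)
	b, _ := p.Partition(keyed, 8)
	fmt.Println(a == b, p.MessageRequiresConsistency(keyed))
	fmt.Println(p.MessageRequiresConsistency(&ProducerMessage{}))
}
```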

type Encoder Uses

type Encoder interface {
    Encode() ([]byte, error)
    Length() int
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
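A minimal sketch of a custom Encoder follows. It redeclares the interface locally so that it compiles on its own; `jsonEncoder` and `newJSONEncoder` are hypothetical names chosen for this example and are not part of sarama. Marshaling once and caching the bytes is one simple way to guarantee that Length() matches len() of the Encode() result.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder mirrors the interface documented above.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// jsonEncoder (hypothetical) marshals a value to JSON once and caches the
// result, so Encode and Length always agree.
type jsonEncoder struct {
	data []byte
	err  error
}

func newJSONEncoder(v interface{}) *jsonEncoder {
	data, err := json.Marshal(v)
	return &jsonEncoder{data: data, err: err}
}

// Encode returns the cached bytes, or the marshaling error if there was one.
func (e *jsonEncoder) Encode() ([]byte, error) { return e.data, e.err }

// Length reports the size of the cached encoding without re-encoding.
func (e *jsonEncoder) Length() int { return len(e.data) }

func main() {
	var enc Encoder = newJSONEncoder(map[string]int{"id": 7})
	b, err := enc.Encode()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b), enc.Length() == len(b)) // prints: {"id":7} true
}
```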

type EndTxnRequest Uses

type EndTxnRequest struct {
    TransactionalID   string
    ProducerID        int64
    ProducerEpoch     int16
    TransactionResult bool
}

type EndTxnResponse Uses

type EndTxnResponse struct {
    ThrottleTime time.Duration
    Err          KError
}

type FetchRequest Uses

type FetchRequest struct {
    MaxWaitTime int32
    MinBytes    int32
    MaxBytes    int32
    Version     int16
    Isolation   IsolationLevel
    // contains filtered or unexported fields
}

FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes

func (*FetchRequest) AddBlock Uses

func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32)

type FetchResponse Uses

type FetchResponse struct {
    Blocks       map[string]map[int32]*FetchResponseBlock
    ThrottleTime time.Duration
    Version      int16 // v1 requires 0.9+, v2 requires 0.10+
}

func (*FetchResponse) AddError Uses

func (r *FetchResponse) AddError(topic string, partition int32, err KError)

func (*FetchResponse) AddMessage Uses

func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) AddRecord Uses

func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) GetBlock Uses

func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock

func (*FetchResponse) SetLastOffsetDelta Uses

func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32)

func (*FetchResponse) SetLastStableOffset Uses

func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64)

type FetchResponseBlock Uses

type FetchResponseBlock struct {
    Err                 KError
    HighWaterMarkOffset int64
    LastStableOffset    int64
    AbortedTransactions []*AbortedTransaction
    Records             *Records // deprecated: use FetchResponseBlock.RecordsSet
    RecordsSet          []*Records
    Partial             bool
}

type FilterResponse Uses

type FilterResponse struct {
    Err          KError
    ErrMsg       *string
    MatchingAcls []*MatchingAcl
}

type FindCoordinatorRequest Uses

type FindCoordinatorRequest struct {
    Version         int16
    CoordinatorKey  string
    CoordinatorType CoordinatorType
}

type FindCoordinatorResponse Uses

type FindCoordinatorResponse struct {
    Version      int16
    ThrottleTime time.Duration
    Err          KError
    ErrMsg       *string
    Coordinator  *Broker
}

type GroupDescription Uses

type GroupDescription struct {
    Err          KError
    GroupId      string
    State        string
    ProtocolType string
    Protocol     string
    Members      map[string]*GroupMemberDescription
}

type GroupMemberDescription Uses

type GroupMemberDescription struct {
    ClientId         string
    ClientHost       string
    MemberMetadata   []byte
    MemberAssignment []byte
}

func (*GroupMemberDescription) GetMemberAssignment Uses

func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

func (*GroupMemberDescription) GetMemberMetadata Uses

func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error)

type GroupProtocol Uses

type GroupProtocol struct {
    Name     string
    Metadata []byte
}

type HashPartitionerOption Uses

type HashPartitionerOption func(*hashPartitioner)

HashPartitionerOption lets you modify the default values of the partitioner.

func WithAbsFirst Uses

func WithAbsFirst() HashPartitionerOption

WithAbsFirst means that the partitioner handles absolute values in the same way as the reference Java implementation

func WithCustomFallbackPartitioner Uses

func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption

WithCustomFallbackPartitioner lets you specify which HashPartitioner to use when the distribution key is empty.

func WithCustomHashFunction Uses

func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption

WithCustomHashFunction lets you specify what hash function to use for the partitioning

type HeartbeatRequest Uses

type HeartbeatRequest struct {
    GroupId      string
    GenerationId int32
    MemberId     string
}

type HeartbeatResponse Uses

type HeartbeatResponse struct {
    Err KError
}

type InitProducerIDRequest Uses

type InitProducerIDRequest struct {
    TransactionalID    *string
    TransactionTimeout time.Duration
}

type InitProducerIDResponse Uses

type InitProducerIDResponse struct {
    ThrottleTime  time.Duration
    Err           KError
    ProducerID    int64
    ProducerEpoch int16
}

type IsolationLevel Uses

type IsolationLevel int8
const (
    ReadUncommitted IsolationLevel = 0
    ReadCommitted   IsolationLevel = 1
)

type JoinGroupRequest Uses

type JoinGroupRequest struct {
    Version               int16
    GroupId               string
    SessionTimeout        int32
    RebalanceTimeout      int32
    MemberId              string
    ProtocolType          string
    GroupProtocols        map[string][]byte // deprecated; use OrderedGroupProtocols
    OrderedGroupProtocols []*GroupProtocol
}

func (*JoinGroupRequest) AddGroupProtocol Uses

func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte)

func (*JoinGroupRequest) AddGroupProtocolMetadata Uses

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error

type JoinGroupResponse Uses

type JoinGroupResponse struct {
    Version       int16
    ThrottleTime  int32
    Err           KError
    GenerationId  int32
    GroupProtocol string
    LeaderId      string
    MemberId      string
    Members       map[string][]byte
}

func (*JoinGroupResponse) GetMembers Uses

func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error)

type KError Uses

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
    ErrNoError                            KError = 0
    ErrUnknown                            KError = -1
    ErrOffsetOutOfRange                   KError = 1
    ErrInvalidMessage                     KError = 2
    ErrUnknownTopicOrPartition            KError = 3
    ErrInvalidMessageSize                 KError = 4
    ErrLeaderNotAvailable                 KError = 5
    ErrNotLeaderForPartition              KError = 6
    ErrRequestTimedOut                    KError = 7
    ErrBrokerNotAvailable                 KError = 8
    ErrReplicaNotAvailable                KError = 9
    ErrMessageSizeTooLarge                KError = 10
    ErrStaleControllerEpochCode           KError = 11
    ErrOffsetMetadataTooLarge             KError = 12
    ErrNetworkException                   KError = 13
    ErrOffsetsLoadInProgress              KError = 14
    ErrConsumerCoordinatorNotAvailable    KError = 15
    ErrNotCoordinatorForConsumer          KError = 16
    ErrInvalidTopic                       KError = 17
    ErrMessageSetSizeTooLarge             KError = 18
    ErrNotEnoughReplicas                  KError = 19
    ErrNotEnoughReplicasAfterAppend       KError = 20
    ErrInvalidRequiredAcks                KError = 21
    ErrIllegalGeneration                  KError = 22
    ErrInconsistentGroupProtocol          KError = 23
    ErrInvalidGroupId                     KError = 24
    ErrUnknownMemberId                    KError = 25
    ErrInvalidSessionTimeout              KError = 26
    ErrRebalanceInProgress                KError = 27
    ErrInvalidCommitOffsetSize            KError = 28
    ErrTopicAuthorizationFailed           KError = 29
    ErrGroupAuthorizationFailed           KError = 30
    ErrClusterAuthorizationFailed         KError = 31
    ErrInvalidTimestamp                   KError = 32
    ErrUnsupportedSASLMechanism           KError = 33
    ErrIllegalSASLState                   KError = 34
    ErrUnsupportedVersion                 KError = 35
    ErrTopicAlreadyExists                 KError = 36
    ErrInvalidPartitions                  KError = 37
    ErrInvalidReplicationFactor           KError = 38
    ErrInvalidReplicaAssignment           KError = 39
    ErrInvalidConfig                      KError = 40
    ErrNotController                      KError = 41
    ErrInvalidRequest                     KError = 42
    ErrUnsupportedForMessageFormat        KError = 43
    ErrPolicyViolation                    KError = 44
    ErrOutOfOrderSequenceNumber           KError = 45
    ErrDuplicateSequenceNumber            KError = 46
    ErrInvalidProducerEpoch               KError = 47
    ErrInvalidTxnState                    KError = 48
    ErrInvalidProducerIDMapping           KError = 49
    ErrInvalidTransactionTimeout          KError = 50
    ErrConcurrentTransactions             KError = 51
    ErrTransactionCoordinatorFenced       KError = 52
    ErrTransactionalIDAuthorizationFailed KError = 53
    ErrSecurityDisabled                   KError = 54
    ErrOperationNotAttempted              KError = 55
    ErrKafkaStorageError                  KError = 56
    ErrLogDirNotFound                     KError = 57
    ErrSASLAuthenticationFailed           KError = 58
    ErrUnknownProducerID                  KError = 59
    ErrReassignmentInProgress             KError = 60
)

Numeric error codes returned by the Kafka server.

func (KError) Error Uses

func (err KError) Error() string
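Because KError implements the error interface, callers can recover the numeric code from a returned (possibly wrapped) error. The sketch below uses a local stand-in for KError with a few of the codes listed above; the `isTransient` helper, its choice of which codes count as transient, and the message format produced by `Error` are assumptions made for illustration, not sarama behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// KError is a local stand-in for the sarama type of the same name.
type KError int16

// A small subset of the codes listed above, with the same numeric values.
const (
	ErrNoError                  KError = 0
	ErrLeaderNotAvailable       KError = 5
	ErrNotLeaderForPartition    KError = 6
	ErrTopicAuthorizationFailed KError = 29
)

// Error uses a made-up message format; the real messages differ.
func (err KError) Error() string {
	return fmt.Sprintf("kafka server: error code %d", int16(err))
}

// isTransient (hypothetical) reports whether err carries a broker error code
// that this example treats as worth retrying after a metadata refresh.
func isTransient(err error) bool {
	var kerr KError
	if !errors.As(err, &kerr) {
		return false // not a broker error at all
	}
	switch kerr {
	case ErrLeaderNotAvailable, ErrNotLeaderForPartition:
		return true
	}
	return false
}

func main() {
	wrapped := fmt.Errorf("fetching partition 0: %w", ErrNotLeaderForPartition)
	fmt.Println(isTransient(wrapped))                     // prints: true
	fmt.Println(isTransient(ErrTopicAuthorizationFailed)) // prints: false
}
```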

type KafkaVersion Uses

type KafkaVersion struct {
    // contains filtered or unexported fields
}

KafkaVersion instances represent versions of the upstream Kafka broker.

func ParseKafkaVersion Uses

func ParseKafkaVersion(s string) (KafkaVersion, error)

func (KafkaVersion) IsAtLeast Uses

func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool

IsAtLeast returns true if and only if the version it is called on is greater than or equal to the version passed in:

V1.IsAtLeast(V2) // false
V2.IsAtLeast(V1) // true

func (KafkaVersion) String Uses

func (v KafkaVersion) String() string

type LeaveGroupRequest Uses

type LeaveGroupRequest struct {
    GroupId  string
    MemberId string
}

type LeaveGroupResponse Uses

type LeaveGroupResponse struct {
    Err KError
}

type ListGroupsRequest Uses

type ListGroupsRequest struct {
}

type ListGroupsResponse Uses

type ListGroupsResponse struct {
    Err    KError
    Groups map[string]string
}

type MatchingAcl Uses

type MatchingAcl struct {
    Err    KError
    ErrMsg *string
    Resource
    Acl
}

type Message Uses

type Message struct {
    Codec            CompressionCodec // codec used to compress the message contents
    CompressionLevel int              // compression level
    Key              []byte           // the message key, may be nil
    Value            []byte           // the message contents
    Set              *MessageSet      // the message set a message might wrap
    Version          int8             // v1 requires Kafka 0.10
    Timestamp        time.Time        // the timestamp of the message (version 1+ only)
    // contains filtered or unexported fields
}

type MessageBlock Uses

type MessageBlock struct {
    Offset int64
    Msg    *Message
}

func (*MessageBlock) Messages Uses

func (msb *MessageBlock) Messages() []*MessageBlock

Messages is a convenience helper that returns all the messages wrapped in this block.

type MessageSet Uses

type MessageSet struct {
    PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
    OverflowMessage        bool // whether the set on the wire contained an overflow message
    Messages               []*MessageBlock
}

type MetadataRequest Uses

type MetadataRequest struct {
    Version                int16
    Topics                 []string
    AllowAutoTopicCreation bool
}

type MetadataResponse Uses

type MetadataResponse struct {
    Version        int16
    ThrottleTimeMs int32
    Brokers        []*Broker
    ClusterID      *string
    ControllerID   int32
    Topics         []*TopicMetadata
}

func (*MetadataResponse) AddBroker Uses

func (r *MetadataResponse) AddBroker(addr string, id int32)

func (*MetadataResponse) AddTopic Uses

func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata

func (*MetadataResponse) AddTopicPartition Uses

func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError)

type MockAlterConfigsResponse Uses

type MockAlterConfigsResponse struct {
    // contains filtered or unexported fields
}

func NewMockAlterConfigsResponse Uses

func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse

func (*MockAlterConfigsResponse) For Uses

func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder

type MockBroker Uses

type MockBroker struct {
    // contains filtered or unexported fields
}

MockBroker is a mock Kafka broker that is used in unit tests. It is exposed to facilitate testing of higher-level or specialized consumers and producers built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol, but rather provides a facility to do that. It takes care of the TCP transport, request unmarshaling, and response marshaling, and leaves it to the test writer to program MockBroker behaviour that is correct according to the Kafka API protocol.

MockBroker is implemented as a TCP server listening on a kernel-selected localhost port that can accept many connections. It reads Kafka requests from those connections and returns responses programmed by the SetHandlerByMap function. If a MockBroker receives a request that it has no programmed response for, then it returns nothing and the request times out.

A set of MockRequest builders to define mappings used by MockBroker is provided by Sarama. But users can develop MockRequests of their own and use them along with or instead of the standard ones.

When running tests with MockBroker it is strongly recommended to specify a timeout to `go test` so that if the broker hangs waiting for a response, the test panics.

It is not necessary to prefix the message length or correlation ID to your response bytes; the server does that automatically as a convenience.

func NewMockBroker Uses

func NewMockBroker(t TestReporter, brokerID int32) *MockBroker

NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the test framework and the ID to assign to the broker. If an error occurs it is simply logged to the TestReporter and the broker exits.

func NewMockBrokerAddr Uses

func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker

NewMockBrokerAddr behaves like NewMockBroker but listens on the address you give it rather than just some ephemeral port.

func NewMockBrokerListener Uses

func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker

NewMockBrokerListener behaves like NewMockBrokerAddr but accepts connections on the listener specified.

func (*MockBroker) Addr Uses

func (b *MockBroker) Addr() string

Addr returns the broker connection string in the form "<address>:<port>".

func (*MockBroker) BrokerID Uses

func (b *MockBroker) BrokerID() int32

BrokerID returns broker ID assigned to the broker.

func (*MockBroker) Close Uses

func (b *MockBroker) Close()

Close terminates the broker, blocking until it has stopped its internal goroutines and released all resources.

func (*MockBroker) History Uses

func (b *MockBroker) History() []RequestResponse

History returns a slice of RequestResponse pairs in the order they were processed by the broker. Note that in case of multiple connections to the broker the order expected by a test can be different from the order recorded in the history, unless some synchronization is implemented in the test.

func (*MockBroker) Port Uses

func (b *MockBroker) Port() int32

Port returns the TCP port number the broker is listening for requests on.

func (*MockBroker) Returns Uses

func (b *MockBroker) Returns(e encoder)

func (*MockBroker) SetHandlerByMap Uses

func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse)

SetHandlerByMap defines a mapping of Request types to MockResponses. When a request is received by the broker, it looks up the request type in the map and uses the MockResponse instance it finds to generate an appropriate reply. If the request type is not found in the map then nothing is sent.

func (*MockBroker) SetLatency Uses

func (b *MockBroker) SetLatency(latency time.Duration)

SetLatency makes the broker pause for the specified period before every reply.

func (*MockBroker) SetNotifier Uses

func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc)

SetNotifier sets a function that will be invoked whenever a request has been processed successfully; the function receives the number of bytes read and written.

type MockConsumerMetadataResponse Uses

type MockConsumerMetadataResponse struct {
    // contains filtered or unexported fields
}

MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.

func NewMockConsumerMetadataResponse Uses

func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse

func (*MockConsumerMetadataResponse) For Uses

func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder

func (*MockConsumerMetadataResponse) SetCoordinator Uses

func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse

func (*MockConsumerMetadataResponse) SetError Uses

func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse

type MockCreateAclsResponse Uses

type MockCreateAclsResponse struct {
    // contains filtered or unexported fields
}

func NewMockCreateAclsResponse Uses

func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse

func (*MockCreateAclsResponse) For Uses

func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder

type MockCreatePartitionsResponse Uses

type MockCreatePartitionsResponse struct {
    // contains filtered or unexported fields
}

func NewMockCreatePartitionsResponse Uses

func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse

func (*MockCreatePartitionsResponse) For Uses

func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder

type MockCreateTopicsResponse Uses

type MockCreateTopicsResponse struct {
    // contains filtered or unexported fields
}

func NewMockCreateTopicsResponse Uses

func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse

func (*MockCreateTopicsResponse) For Uses

func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder

type MockDeleteAclsResponse Uses

type MockDeleteAclsResponse struct {
    // contains filtered or unexported fields
}

func NewMockDeleteAclsResponse Uses

func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse

func (*MockDeleteAclsResponse) For Uses

func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder

type MockDeleteRecordsResponse Uses

type MockDeleteRecordsResponse struct {
    // contains filtered or unexported fields
}

func NewMockDeleteRecordsResponse Uses

func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse

func (*MockDeleteRecordsResponse) For Uses

func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder

type MockDeleteTopicsResponse Uses

type MockDeleteTopicsResponse struct {
    // contains filtered or unexported fields
}

func NewMockDeleteTopicsResponse Uses

func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse

func (*MockDeleteTopicsResponse) For Uses

func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder

type MockDescribeConfigsResponse Uses

type MockDescribeConfigsResponse struct {
    // contains filtered or unexported fields
}

func NewMockDescribeConfigsResponse Uses

func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse

func (*MockDescribeConfigsResponse) For Uses

func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder

type MockFetchResponse Uses

type MockFetchResponse struct {
    // contains filtered or unexported fields
}

MockFetchResponse is a `FetchResponse` builder.

func NewMockFetchResponse Uses

func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse

func (*MockFetchResponse) For Uses

func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder

func (*MockFetchResponse) SetHighWaterMark Uses

func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse

func (*MockFetchResponse) SetMessage Uses

func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse

func (*MockFetchResponse) SetVersion Uses

func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse

type MockFindCoordinatorResponse Uses

type MockFindCoordinatorResponse struct {
    // contains filtered or unexported fields
}

MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.

func NewMockFindCoordinatorResponse Uses

func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse

func (*MockFindCoordinatorResponse) For Uses

func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder

func (*MockFindCoordinatorResponse) SetCoordinator Uses

func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse

func (*MockFindCoordinatorResponse) SetError Uses

func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse

type MockListAclsResponse Uses

type MockListAclsResponse struct {
    // contains filtered or unexported fields
}

func NewMockListAclsResponse Uses

func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse

func (*MockListAclsResponse) For Uses

func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder

type MockMetadataResponse Uses

type MockMetadataResponse struct {
    // contains filtered or unexported fields
}

MockMetadataResponse is a `MetadataResponse` builder.

func NewMockMetadataResponse Uses

func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse

func (*MockMetadataResponse) For Uses

func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder

func (*MockMetadataResponse) SetBroker Uses

func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse

func (*MockMetadataResponse) SetController Uses

func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse

func (*MockMetadataResponse) SetLeader Uses

func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse

type MockOffsetCommitResponse Uses

type MockOffsetCommitResponse struct {
    // contains filtered or unexported fields
}

MockOffsetCommitResponse is an `OffsetCommitResponse` builder.

func NewMockOffsetCommitResponse Uses

func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse

func (*MockOffsetCommitResponse) For Uses

func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder

func (*MockOffsetCommitResponse) SetError Uses

func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse

type MockOffsetFetchResponse Uses

type MockOffsetFetchResponse struct {
    // contains filtered or unexported fields
}

MockOffsetFetchResponse is an `OffsetFetchResponse` builder.

func NewMockOffsetFetchResponse Uses

func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse

func (*MockOffsetFetchResponse) For Uses

func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder

func (*MockOffsetFetchResponse) SetOffset Uses

func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse

type MockOffsetResponse Uses

type MockOffsetResponse struct {
    // contains filtered or unexported fields
}

MockOffsetResponse is an `OffsetResponse` builder.

func NewMockOffsetResponse Uses

func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse

func (*MockOffsetResponse) For Uses

func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder

func (*MockOffsetResponse) SetOffset Uses

func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse

func (*MockOffsetResponse) SetVersion Uses

func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse

type MockProduceResponse Uses

type MockProduceResponse struct {
    // contains filtered or unexported fields
}

MockProduceResponse is a `ProduceResponse` builder.

func NewMockProduceResponse Uses

func NewMockProduceResponse(t TestReporter) *MockProduceResponse

func (*MockProduceResponse) For Uses

func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder

func (*MockProduceResponse) SetError Uses

func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse

func (*MockProduceResponse) SetVersion Uses

func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse

type MockResponse Uses

type MockResponse interface {
    For(reqBody versionedDecoder) (res encoder)
}

MockResponse is a response builder interface. It defines one method that generates a response based on a request body. MockResponses are used to program the behavior of MockBroker in tests.

type MockSequence Uses

type MockSequence struct {
    // contains filtered or unexported fields
}

MockSequence is a mock response builder that is created from a sequence of concrete responses. Each time a `MockBroker` calls its `For` method, the next response from the sequence is returned. Once the end of the sequence is reached, the last element of the sequence is returned for every subsequent call.
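The "advance, then repeat the last element" behaviour described above can be sketched with a small stand-alone type. This is only an illustration of the documented semantics, using strings in place of real responses; the `sequence` type and `newSequence` constructor are hypothetical and are not how sarama implements MockSequence.

```go
package main

import "fmt"

// sequence (hypothetical) hands out its responses in order and then keeps
// returning the final one.
type sequence struct {
	responses []string
	next      int
}

// newSequence requires at least one response, since there would otherwise be
// nothing to repeat once the sequence is exhausted.
func newSequence(responses ...string) *sequence {
	if len(responses) == 0 {
		panic("sequence needs at least one response")
	}
	return &sequence{responses: responses}
}

// For returns the current response and advances unless it is already at the
// last element.
func (s *sequence) For() string {
	res := s.responses[s.next]
	if s.next < len(s.responses)-1 {
		s.next++
	}
	return res
}

func main() {
	seq := newSequence("first", "second")
	for i := 0; i < 4; i++ {
		fmt.Println(seq.For()) // prints: first, second, second, second
	}
}
```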

func NewMockSequence Uses

func NewMockSequence(responses ...interface{}) *MockSequence

func (*MockSequence) For Uses

func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder)

type MockWrapper Uses

type MockWrapper struct {
    // contains filtered or unexported fields
}

MockWrapper is a mock response builder that returns a particular concrete response regardless of the actual request passed to the `For` method.

func NewMockWrapper Uses

func NewMockWrapper(res encoder) *MockWrapper

func (*MockWrapper) For Uses

func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder)

type OffsetCommitRequest Uses

type OffsetCommitRequest struct {
    ConsumerGroup           string
    ConsumerGroupGeneration int32  // v1 or later
    ConsumerID              string // v1 or later
    RetentionTime           int64  // v2 or later

    // Version can be:
    // - 0 (kafka 0.8.1 and later)
    // - 1 (kafka 0.8.2 and later)
    // - 2 (kafka 0.9.0 and later)
    Version int16
    // contains filtered or unexported fields
}

func (*OffsetCommitRequest) AddBlock Uses

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

func (*OffsetCommitRequest) Offset Uses

func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error)

type OffsetCommitResponse Uses

type OffsetCommitResponse struct {
    Errors map[string]map[int32]KError
}

func (*OffsetCommitResponse) AddError Uses

func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError)

type OffsetFetchRequest Uses

type OffsetFetchRequest struct {
    ConsumerGroup string
    Version       int16
    // contains filtered or unexported fields
}

func (*OffsetFetchRequest) AddPartition Uses

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

type OffsetFetchResponse Uses

type OffsetFetchResponse struct {
    Blocks map[string]map[int32]*OffsetFetchResponseBlock
}

func (*OffsetFetchResponse) AddBlock Uses

func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock)

func (*OffsetFetchResponse) GetBlock Uses

func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock

type OffsetFetchResponseBlock Uses

type OffsetFetchResponseBlock struct {
    Offset   int64
    Metadata string
    Err      KError
}

type OffsetManager Uses

type OffsetManager interface {
    // ManagePartition creates a PartitionOffsetManager on the given topic/partition.
    // It will return an error if this OffsetManager is already managing the given
    // topic/partition.
    ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

    // Close stops the OffsetManager from managing offsets. It is required to call
    // this function before an OffsetManager object passes out of scope, as it
    // will otherwise leak memory. You must call this after all the
    // PartitionOffsetManagers are closed.
    Close() error
}

OffsetManager uses Kafka to store and fetch consumed partition offsets.

func NewOffsetManagerFromClient Uses

func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error)

NewOffsetManagerFromClient creates a new OffsetManager from the given client. It is still necessary to call Close() on the underlying client when finished with the offset manager.

type OffsetRequest Uses

type OffsetRequest struct {
    Version int16
    // contains filtered or unexported fields
}

func (*OffsetRequest) AddBlock Uses

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32)

type OffsetResponse Uses

type OffsetResponse struct {
    Version int16
    Blocks  map[string]map[int32]*OffsetResponseBlock
}

func (*OffsetResponse) AddTopicPartition Uses

func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)

func (*OffsetResponse) GetBlock Uses

func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock

type OffsetResponseBlock Uses

type OffsetResponseBlock struct {
    Err       KError
    Offsets   []int64 // Version 0
    Offset    int64   // Version 1
    Timestamp int64   // Version 1
}

type PacketDecodingError Uses

type PacketDecodingError struct {
    Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (PacketDecodingError) Error Uses

func (err PacketDecodingError) Error() string

type PacketEncodingError Uses

type PacketEncodingError struct {
    Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
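The length limit comes from the Kafka wire format, in which a string is written as a big-endian int16 length followed by the bytes. The sketch below shows that constraint in isolation, assuming the limit is enforced at math.MaxInt16 bytes; `encodeString` is a hypothetical helper, and both the local `PacketEncodingError` stand-in and its message text are assumptions rather than sarama's actual encoder.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
	"strings"
)

// PacketEncodingError is a local stand-in with the same field as the
// documented type.
type PacketEncodingError struct {
	Info string
}

func (err PacketEncodingError) Error() string {
	return "error encoding packet: " + err.Info
}

// encodeString (hypothetical) writes s as a big-endian int16 length followed
// by the raw bytes, rejecting strings whose length does not fit in an int16.
func encodeString(s string) ([]byte, error) {
	if len(s) > math.MaxInt16 {
		return nil, PacketEncodingError{Info: "string too long"}
	}
	buf := make([]byte, 2+len(s))
	binary.BigEndian.PutUint16(buf, uint16(len(s)))
	copy(buf[2:], s)
	return buf, nil
}

func main() {
	b, err := encodeString("ab")
	fmt.Println(b, err) // prints: [0 2 97 98] <nil>

	_, err = encodeString(strings.Repeat("x", 40000))
	fmt.Println(err) // prints: error encoding packet: string too long
}
```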

func (PacketEncodingError) Error Uses

func (err PacketEncodingError) Error() string

type PartitionConsumer Uses

type PartitionConsumer interface {

    // AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
    // should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
    // function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
    // this before calling Close on the underlying client.
    AsyncClose()

    // Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
    // the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
    // the Messages channel when this function is called, you will be competing with Close for messages; consider
    // calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
    // out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
    Close() error

    // Messages returns the read channel for the messages that are returned by
    // the broker.
    Messages() <-chan *ConsumerMessage

    // Errors returns a read channel of errors that occurred during consuming, if
    // enabled. By default, errors are logged and not returned over this channel.
    // If you want to implement any custom error handling, set your config's
    // Consumer.Return.Errors setting to true, and read from this channel.
    Errors() <-chan *ConsumerError

    // HighWaterMarkOffset returns the high water mark offset of the partition,
    // i.e. the offset that will be used for the next message that will be produced.
    // You can use this to determine how far behind the processing is.
    HighWaterMarkOffset() int64
}

PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.

The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported as out of range by the brokers. In this case you should decide what you want to do (try a different offset, notify a human, etc.) and handle it appropriately. For all other error cases, it will just keep retrying. By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.

To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of consumer tear-down and return immediately. Continue to loop, servicing the Messages channel until the teardown process that AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will also drain the Messages channel, harvest all errors, and return them once cleanup has completed.
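The shutdown sequence described above can be sketched without a broker. In the code below, `fakeConsumer` is a hypothetical stand-in that only mimics the Messages/AsyncClose shape of a PartitionConsumer (with integers in place of messages); it is not Sarama code. The point is the control flow: request shutdown from inside the loop, then keep ranging until the channel is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeConsumer is a hypothetical stand-in with the same Messages/AsyncClose
// shape as a PartitionConsumer. It is not Sarama's implementation.
type fakeConsumer struct {
	messages chan int
	stop     chan struct{}
	once     sync.Once
}

func newFakeConsumer() *fakeConsumer {
	c := &fakeConsumer{messages: make(chan int), stop: make(chan struct{})}
	go func() {
		// Teardown closes the Messages channel, which ends the reader's range loop.
		defer close(c.messages)
		for offset := 0; ; offset++ {
			select {
			case c.messages <- offset:
			case <-c.stop:
				return
			}
		}
	}()
	return c
}

func (c *fakeConsumer) Messages() <-chan int { return c.messages }

// AsyncClose signals shutdown and returns immediately.
func (c *fakeConsumer) AsyncClose() { c.once.Do(func() { close(c.stop) }) }

// consume ranges over Messages and requests shutdown after limit messages.
// It keeps servicing the channel until teardown closes it.
func consume(c *fakeConsumer, limit int) int {
	seen := 0
	for range c.Messages() {
		seen++
		if seen == limit {
			c.AsyncClose() // do not break; the range ends when the channel closes
		}
	}
	return seen
}

func main() {
	fmt.Println("messages seen:", consume(newFakeConsumer(), 3))
}
```

Note that a few messages may still arrive after AsyncClose is called, which is why the loop keeps running instead of breaking out.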

type PartitionError

type PartitionError struct {
    Partition int32
    Err       KError
}

type PartitionMetadata

type PartitionMetadata struct {
    Err             KError
    ID              int32
    Leader          int32
    Replicas        []int32
    Isr             []int32
    OfflineReplicas []int32
}

type PartitionOffsetManager

type PartitionOffsetManager interface {
    // NextOffset returns the next offset that should be consumed for the managed
    // partition, accompanied by metadata which can be used to reconstruct the state
    // of the partition consumer when it resumes. NextOffset() will return
    // `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
    // was committed for this partition yet.
    NextOffset() (int64, string)

    // MarkOffset marks the provided offset, alongside a metadata string
    // that represents the state of the partition consumer at that point in time. The
    // metadata string can be used by another consumer to restore that state, so it
    // can resume consumption.
    //
    // To follow upstream conventions, you are expected to mark the offset of the
    // next message to read, not the last message read. Thus, when calling `MarkOffset`
    // you should typically add one to the offset of the last consumed message.
    //
    // Note: calling MarkOffset does not necessarily commit the offset to the backend
    // store immediately for efficiency reasons, and it may never be committed if
    // your application crashes. This means that you may end up processing the same
    // message twice, and your processing should ideally be idempotent.
    MarkOffset(offset int64, metadata string)

    // ResetOffset resets to the provided offset, alongside a metadata string that
    // represents the state of the partition consumer at that point in time. Reset
    // acts as a counterpart to MarkOffset, the difference being that it allows
    // resetting an offset to an earlier or smaller value, whereas MarkOffset only
    // allows incrementing the offset. See MarkOffset for more details.
    ResetOffset(offset int64, metadata string)

    // Errors returns a read channel of errors that occur during offset management, if
    // enabled. By default, errors are logged and not returned over this channel. If
    // you want to implement any custom error handling, set your config's
    // Consumer.Return.Errors setting to true, and read from this channel.
    Errors() <-chan *ConsumerError

    // AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
    // return immediately, after which you should wait until the 'errors' channel has
    // been drained and closed. It is required to call this function, or Close before
    // a consumer object passes out of scope, as it will otherwise leak memory. You
    // must call this before calling Close on the underlying client.
    AsyncClose()

    // Close stops the PartitionOffsetManager from managing offsets. It is required to
    // call this function (or AsyncClose) before a PartitionOffsetManager object
    // passes out of scope, as it will otherwise leak memory. You must call this
    // before calling Close on the underlying client.
    Close() error
}

PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close() on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.
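The marking convention in the MarkOffset and ResetOffset comments (mark the offset of the next message to read; MarkOffset only moves forward, ResetOffset can move back) can be illustrated with a toy in-memory tracker. `offsetTracker` is a hypothetical type that does not talk to Kafka, and the exact comparison rules it uses are an assumption made for this sketch.

```go
package main

import "fmt"

// offsetTracker is a hypothetical in-memory stand-in for the marking rules
// described above. It does not commit anything to Kafka.
type offsetTracker struct {
	next     int64
	metadata string
}

// MarkOffset only moves the stored offset forward.
func (t *offsetTracker) MarkOffset(offset int64, metadata string) {
	if offset > t.next {
		t.next, t.metadata = offset, metadata
	}
}

// ResetOffset only moves the stored offset to an equal or earlier value.
func (t *offsetTracker) ResetOffset(offset int64, metadata string) {
	if offset <= t.next {
		t.next, t.metadata = offset, metadata
	}
}

func main() {
	tracker := &offsetTracker{}

	lastConsumed := int64(41)
	tracker.MarkOffset(lastConsumed+1, "") // mark the next offset to read
	tracker.MarkOffset(10, "")             // ignored: marking never moves backward
	fmt.Println(tracker.next)

	tracker.ResetOffset(10, "replay") // an explicit rewind is allowed
	fmt.Println(tracker.next, tracker.metadata)
}
```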

type PartitionOffsetMetadata

type PartitionOffsetMetadata struct {
    Partition int32
    Offset    int64
    Metadata  *string
}

type Partitioner

type Partitioner interface {
    // Partition takes a message and partition count and chooses a partition
    Partition(message *ProducerMessage, numPartitions int32) (int32, error)

    // RequiresConsistency indicates to the user of the partitioner whether the
    // mapping of key->partition is consistent or not. Specifically, if a
    // partitioner requires consistency then it must be allowed to choose from all
    // partitions (even ones known to be unavailable), and its choice must be
    // respected by the caller. The obvious example is the HashPartitioner.
    RequiresConsistency() bool
}

Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.

This example shows how to assign partitions to your messages manually.

Code:

config := NewConfig()

// First, we tell the producer that we are going to partition ourselves.
config.Producer.Partitioner = NewManualPartitioner

producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    log.Fatal(err)
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Println("Failed to close producer:", err)
    }
}()

// Now, we set the Partition field of the ProducerMessage struct.
msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Fatalln("Failed to produce message to kafka cluster:", err)
}

if partition != 6 {
    log.Fatal("Message should have been produced to partition 6!")
}

log.Printf("Produced message to partition %d with offset %d", partition, offset)

By default, Sarama uses the message's key to consistently assign a partition to a message using hashing. If no key is set, a random partition will be chosen. This example shows how you can partition messages randomly, even when a key is set, by overriding Config.Producer.Partitioner.

Code:

config := NewConfig()
config.Producer.Partitioner = NewRandomPartitioner

producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    log.Fatal(err)
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Println("Failed to close producer:", err)
    }
}()

msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Fatalln("Failed to produce message to kafka cluster:", err)
}

log.Printf("Produced message to partition %d with offset %d", partition, offset)

func NewHashPartitioner

func NewHashPartitioner(topic string) Partitioner

NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, modulo the number of partitions. This ensures that messages with the same key always end up on the same partition.
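The keyed path can be reproduced with the standard library's `hash/fnv` package. The helper below, `hashPartition`, is a hypothetical name for this sketch and not Sarama's implementation; it covers only the non-nil-key case, and the way it forces a non-negative result is an assumption.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition is a hypothetical helper: it hashes the key with 32-bit
// FNV-1a and reduces the result modulo numPartitions.
func hashPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p // the conversion to int32 can produce a negative remainder
	}
	return p
}

func main() {
	// The same key always maps to the same partition.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", key, hashPartition([]byte(key), 12))
	}
}
```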

func NewManualPartitioner

func NewManualPartitioner(topic string) Partitioner

NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided ProducerMessage's Partition field as the partition to produce to.

func NewRandomPartitioner

func NewRandomPartitioner(topic string) Partitioner

NewRandomPartitioner returns a Partitioner which chooses a random partition each time.

func NewReferenceHashPartitioner

func NewReferenceHashPartitioner(topic string) Partitioner

NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values in the same way as the reference Java implementation. NewHashPartitioner was supposed to do that but it had a mistake and now there are people depending on both behaviours. This will all go away on the next major version bump.
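The difference only shows up when the 32-bit hash, reinterpreted as a signed integer, is negative. The sketch below contrasts two ways of forcing a non-negative partition number. Which expression each constructor actually uses is an assumption here (negating the remainder for the older behaviour, clearing the sign bit for the reference behaviour), and both helper names are hypothetical.

```go
package main

import "fmt"

// negateRemainder takes the signed remainder and flips its sign if negative.
func negateRemainder(hash uint32, numPartitions int32) int32 {
	p := int32(hash) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

// maskSignBit clears the sign bit before taking the remainder.
func maskSignBit(hash uint32, numPartitions int32) int32 {
	return (int32(hash) & 0x7fffffff) % numPartitions
}

func main() {
	const partitions = 10
	// 7 is non-negative as an int32; 0xFFFFFFFF becomes -1.
	for _, h := range []uint32{7, 0xFFFFFFFF} {
		fmt.Printf("hash %#x: negate=%d mask=%d\n",
			h, negateRemainder(h, partitions), maskSignBit(h, partitions))
	}
}
```

For hashes that are non-negative as signed integers the two approaches agree; for the others they can pick different partitions, which is why switching between them changes where existing keys land.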

func NewRoundRobinPartitioner

func NewRoundRobinPartitioner(topic string) Partitioner

NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.

type PartitionerConstructor

type PartitionerConstructor func(topic string) Partitioner

PartitionerConstructor is the type for a function capable of constructing new Partitioners.

func NewCustomHashPartitioner

func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor

NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of a custom hasher. The argument is a function that provides a new instance implementing the hash.Hash32 interface. This ensures that each partition dispatcher gets its own hasher, avoiding the concurrency issues that sharing an instance would cause.

func NewCustomPartitioner

func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor

NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options.

type ProduceRequest

type ProduceRequest struct {
    TransactionalID *string
    RequiredAcks    RequiredAcks
    Timeout         int32
    Version         int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
    // contains filtered or unexported fields
}

func (*ProduceRequest) AddBatch

func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch)

func (*ProduceRequest) AddMessage

func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

func (*ProduceRequest) AddSet

func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)

type ProduceResponse

type ProduceResponse struct {
    Blocks       map[string]map[int32]*ProduceResponseBlock
    Version      int16
    ThrottleTime time.Duration // only provided if Version >= 1
}

func (*ProduceResponse) AddTopicPartition

func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)

func (*ProduceResponse) GetBlock

func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock

type ProduceResponseBlock

type ProduceResponseBlock struct {
    Err    KError
    Offset int64
    // only provided if Version >= 2 and the broker is configured with `LogAppendTime`
    Timestamp time.Time
}

type ProducerError

type ProducerError struct {
    Msg *ProducerMessage
    Err error
}

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

func (ProducerError) Error

func (pe ProducerError) Error() string

type ProducerErrors

type ProducerErrors []*ProducerError

ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.

func (ProducerErrors) Error

func (pe ProducerErrors) Error() string

type ProducerMessage

type ProducerMessage struct {
    Topic string // The Kafka topic for this message.
    // The partitioning key for this message. Pre-existing Encoders include
    // StringEncoder and ByteEncoder.
    Key Encoder
    // The actual message to store in Kafka. Pre-existing Encoders include
    // StringEncoder and ByteEncoder.
    Value Encoder

    // The headers are key-value pairs that are transparently passed
    // by Kafka between producers and consumers.
    Headers []RecordHeader

    // This field is used to hold arbitrary data you wish to include so it
    // will be available when receiving on the Successes and Errors channels.
    // Sarama completely ignores this field; it is only to be used for
    // pass-through data.
    Metadata interface{}

    // Offset is the offset of the message stored on the broker. This is only
    // guaranteed to be defined if the message was successfully delivered and
    // RequiredAcks is not NoResponse.
    Offset int64
    // Partition is the partition that the message was sent to. This is only
    // guaranteed to be defined if the message was successfully delivered.
    Partition int32
    // Timestamp is the timestamp assigned to the message by the broker. This
    // is only guaranteed to be defined if the message was successfully
    // delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
    // least version 0.10.0.
    Timestamp time.Time
    // contains filtered or unexported fields
}

ProducerMessage is the collection of elements passed to the Producer in order to send a message.

type Record

type Record struct {
    Attributes     int8
    TimestampDelta time.Duration
    OffsetDelta    int64
    Key            []byte
    Value          []byte
    Headers        []*RecordHeader
    // contains filtered or unexported fields
}

type RecordBatch

type RecordBatch struct {
    FirstOffset           int64
    PartitionLeaderEpoch  int32
    Version               int8
    Codec                 CompressionCodec
    CompressionLevel      int
    Control               bool
    LastOffsetDelta       int32
    FirstTimestamp        time.Time
    MaxTimestamp          time.Time
    ProducerID            int64
    ProducerEpoch         int16
    FirstSequence         int32
    Records               []*Record
    PartialTrailingRecord bool
    // contains filtered or unexported fields
}

type RecordHeader

type RecordHeader struct {
    Key   []byte
    Value []byte
}

type Records

type Records struct {
    MsgSet      *MessageSet
    RecordBatch *RecordBatch
    // contains filtered or unexported fields
}

Records implements a union type containing either a RecordBatch or a legacy MessageSet.

type RequestNotifierFunc

type RequestNotifierFunc func(bytesRead, bytesWritten int)

RequestNotifierFunc is invoked when a mock broker processes a request successfully and provides the number of bytes read and written.

type RequestResponse

type RequestResponse struct {
    Request  protocolBody
    Response encoder
}

RequestResponse represents a Request/Response pair processed by MockBroker.

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements), but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.insync.replicas` value in the broker's configuration).

const (
    // NoResponse doesn't send any response, the TCP ACK is all you get.
    NoResponse RequiredAcks = 0
    // WaitForLocal waits for only the local commit to succeed before responding.
    WaitForLocal RequiredAcks = 1
    // WaitForAll waits for all in-sync replicas to commit before responding.
    // The minimum number of in-sync replicas is configured on the broker via
    // the `min.insync.replicas` configuration key.
    WaitForAll RequiredAcks = -1
)

type Resource

type Resource struct {
    ResourceType AclResourceType
    ResourceName string
}

type ResourceAcls

type ResourceAcls struct {
    Resource
    Acls []*Acl
}

type ResourceResponse

type ResourceResponse struct {
    ErrorCode int16
    ErrorMsg  string
    Type      ConfigResourceType
    Name      string
    Configs   []*ConfigEntry
}

type SaslHandshakeRequest

type SaslHandshakeRequest struct {
    Mechanism string
}

type SaslHandshakeResponse

type SaslHandshakeResponse struct {
    Err               KError
    EnabledMechanisms []string
}

type StdLogger

type StdLogger interface {
    Print(v ...interface{})
    Printf(format string, v ...interface{})
    Println(v ...interface{})
}

StdLogger is used to log error messages.

var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)

Logger is the instance of a StdLogger interface that Sarama writes connection management events to. By default it is set to discard all log messages via ioutil.Discard, but you can set it to redirect wherever you want.

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

func (StringEncoder) Length

func (s StringEncoder) Length() int
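Any type with the same Encode/Length method pair can presumably serve as a Key or Value. As an illustration, the sketch below defines `jsonEncoder`, a hypothetical type that is not part of Sarama, which marshals a value once up front so that Encode and Length always agree. That the Encoder interface consists of exactly these two methods is an assumption based on the method set listed here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonEncoder is a hypothetical type exposing the same Encode/Length method
// pair as StringEncoder, but for a JSON-serialisable value.
type jsonEncoder struct {
	encoded []byte
	err     error
}

// newJSONEncoder marshals v once and caches the result.
func newJSONEncoder(v interface{}) *jsonEncoder {
	b, err := json.Marshal(v)
	return &jsonEncoder{encoded: b, err: err}
}

// Encode returns the cached bytes, or the marshalling error if there was one.
func (e *jsonEncoder) Encode() ([]byte, error) { return e.encoded, e.err }

// Length reports the size of the cached encoding.
func (e *jsonEncoder) Length() int { return len(e.encoded) }

func main() {
	enc := newJSONEncoder(map[string]int{"id": 7})
	payload, err := enc.Encode()
	fmt.Println(string(payload), enc.Length(), err)
}
```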

type SyncGroupRequest

type SyncGroupRequest struct {
    GroupId          string
    GenerationId     int32
    MemberId         string
    GroupAssignments map[string][]byte
}

func (*SyncGroupRequest) AddGroupAssignment

func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte)

func (*SyncGroupRequest) AddGroupAssignmentMember

func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error

type SyncGroupResponse

type SyncGroupResponse struct {
    Err              KError
    MemberAssignment []byte
}

func (*SyncGroupResponse) GetMemberAssignment

func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

type SyncProducer

type SyncProducer interface {

    // SendMessage produces a given message, and returns only when it either has
    // succeeded or failed to produce. It will return the partition and the offset
    // of the produced message, or an error if the message failed to produce.
    SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)

    // SendMessages produces a given set of messages, and returns only when all
    // messages in the set have either succeeded or failed. Note that messages
    // can succeed and fail individually; if some succeed and some fail,
    // SendMessages will return an error.
    SendMessages(msgs []*ProducerMessage) error

    // Close shuts down the producer and waits for any buffered messages to be
    // flushed. You must call this function before a producer object passes out of
    // scope, as it may otherwise leak memory. You must call this before calling
    // Close on the underlying client.
    Close() error
}

SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.

The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual durability guarantee provided when a message is acknowledged depends on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.

This example shows the basic usage pattern of the SyncProducer.

Code:

producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
    log.Fatalln(err)
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Printf("FAILED to send message: %s\n", err)
} else {
    log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}

func NewSyncProducer

func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error)

NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.

func NewSyncProducerFromClient

func NewSyncProducerFromClient(client Client) (SyncProducer, error)

NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.

type TestReporter

type TestReporter interface {
    Error(...interface{})
    Errorf(string, ...interface{})
    Fatal(...interface{})
    Fatalf(string, ...interface{})
}

TestReporter has methods matching Go's testing.T to avoid importing `testing` in the main part of the library.

type Timestamp

type Timestamp struct {
    *time.Time
}

type TopicDetail

type TopicDetail struct {
    NumPartitions     int32
    ReplicationFactor int16
    ReplicaAssignment map[int32][]int32
    ConfigEntries     map[string]*string
}

type TopicError

type TopicError struct {
    Err    KError
    ErrMsg *string
}

type TopicMetadata

type TopicMetadata struct {
    Err        KError
    Name       string
    IsInternal bool // Only valid for Version >= 1
    Partitions []*PartitionMetadata
}

type TopicPartition

type TopicPartition struct {
    Count      int32
    Assignment [][]int32
}

type TopicPartitionError

type TopicPartitionError struct {
    Err    KError
    ErrMsg *string
}

type TxnOffsetCommitRequest

type TxnOffsetCommitRequest struct {
    TransactionalID string
    GroupID         string
    ProducerID      int64
    ProducerEpoch   int16
    Topics          map[string][]*PartitionOffsetMetadata
}

type TxnOffsetCommitResponse

type TxnOffsetCommitResponse struct {
    ThrottleTime time.Duration
    Topics       map[string][]*PartitionError
}

Directories

Path                                   Synopsis
examples/http_server
mocks                                  Package mocks provides mocks that can be used for testing applications that use Sarama.
tools/kafka-console-consumer
tools/kafka-console-partitionconsumer
tools/kafka-console-producer

Package sarama imports 30 packages and is imported by 948 packages. Updated 2018-09-07.