franz

package module
v0.0.0-...-c84241b
Published: Jan 12, 2018 License: Apache-2.0 Imports: 21 Imported by: 1

README

Franz

A Go library that implements only the Kafka protocol, based heavily on the work in segmentio/kafka-go.

Unlike other libraries that implement high-level Kafka readers, Franz is intended only to implement the core Kafka API protocol.

Segment

Thanks to Segment for creating kafka-go, the kafka library that much of this is based on.

Documentation

Constants

This section is empty.

Variables

var (
	// DefaultClientID is the default value used as ClientID of kafka
	// connections.
	DefaultClientID string
)

var DefaultDialer = &Dialer{
	Timeout:   10 * time.Second,
	DualStack: true,
}

DefaultDialer is the default dialer used when none is specified.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

A Batch is an iterator over a sequence of messages fetched from a kafka server.

Batches are created by calling (*Conn).ReadBatch. They hold an internal lock on the connection, which is released when the batch is closed. Failing to call a batch's Close method will likely result in a deadlock when trying to use the connection.

Batches are safe to use concurrently from multiple goroutines.

func (*Batch) Close

func (batch *Batch) Close() error

Close closes the batch, releasing the connection lock and returning an error if reading the batch failed for any reason.

func (*Batch) HighWaterMark

func (batch *Batch) HighWaterMark() int64

HighWaterMark returns the current highest watermark in a partition.

func (*Batch) Offset

func (batch *Batch) Offset() int64

Offset returns the offset of the next message in the batch.

func (*Batch) Read

func (batch *Batch) Read(b []byte) (int, error)

Read reads the value of the next message from the batch into b, returning the number of bytes read, or an error if the next message couldn't be read.

If an error is returned the batch cannot be used anymore and calling Read again will keep returning that error. All errors except io.EOF (indicating that the program consumed all messages from the batch) are also returned by Close.

The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.
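The error rules above suggest a simple consumption loop: treat io.EOF as the normal end of the batch, stop on any other error, and always call Close. The following is a minimal sketch under stated assumptions, not code from this package: messageBatch, fakeBatch, and drain are hypothetical names, and fakeBatch is an in-memory stand-in that only imitates the documented Read and Close behavior.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// messageBatch mirrors the documented Read and Close methods of Batch.
// It is a local stand-in so the sketch compiles without the package.
type messageBatch interface {
	Read(b []byte) (int, error)
	Close() error
}

// fakeBatch is a hypothetical in-memory batch used only for this demo.
type fakeBatch struct {
	values [][]byte
	err    error // sticky: once set, every later Read returns it
}

func (f *fakeBatch) Read(b []byte) (int, error) {
	if f.err != nil {
		return 0, f.err
	}
	if len(f.values) == 0 {
		f.err = io.EOF
		return 0, f.err
	}
	v := f.values[0]
	if len(b) < len(v) {
		f.err = io.ErrShortBuffer
		return 0, f.err
	}
	f.values = f.values[1:]
	return copy(b, v), nil
}

// Close reports the sticky error unless it is io.EOF.
func (f *fakeBatch) Close() error {
	if f.err == io.EOF {
		return nil
	}
	return f.err
}

// drain reads every message value from the batch and always closes it,
// so the connection lock described above would be released.
func drain(batch messageBatch, bufSize int) (values []string, err error) {
	defer func() {
		if cerr := batch.Close(); err == nil {
			err = cerr
		}
	}()
	buf := make([]byte, bufSize)
	for {
		n, rerr := batch.Read(buf)
		if errors.Is(rerr, io.EOF) {
			return values, nil // all messages consumed
		}
		if rerr != nil {
			return values, rerr // the batch is unusable from here on
		}
		values = append(values, string(buf[:n]))
	}
}

func main() {
	values, err := drain(&fakeBatch{values: [][]byte{[]byte("a"), []byte("bc")}}, 16)
	fmt.Println(values, err)

	_, err = drain(&fakeBatch{values: [][]byte{[]byte("too long")}}, 4)
	fmt.Println(errors.Is(err, io.ErrShortBuffer))
}
```

Because a failed Read leaves the batch unusable, the buffer cannot be grown and the read retried; when message sizes are unknown, ReadMessage avoids the problem at the cost of extra allocations.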

func (*Batch) ReadMessage

func (batch *Batch) ReadMessage() (Message, error)

ReadMessage reads and returns the next message from the batch.

Because this method allocates memory buffers for the message key and value, it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.

func (*Batch) Throttle

func (batch *Batch) Throttle() time.Duration

Throttle gives the throttling duration applied by the kafka server on the connection.

type Broker

type Broker struct {
	Host string
	Port int
	ID   int
}

Broker carries the metadata associated with a kafka broker.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents a connection to a kafka broker.

Instances of Conn are safe to use concurrently from multiple goroutines.

func Dial

func Dial(network string, address string) (*Conn, error)

Dial is a convenience wrapper for DefaultDialer.Dial.

func DialContext

func DialContext(ctx context.Context, network string, address string) (*Conn, error)

DialContext is a convenience wrapper for DefaultDialer.DialContext.

func DialLeader

func DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)

DialLeader is a convenience wrapper for DefaultDialer.DialLeader.

func NewConn

func NewConn(conn net.Conn, topic string, partition int) *Conn

NewConn returns a new kafka connection for the given topic and partition.

func NewConnWith

func NewConnWith(conn net.Conn, config ConnConfig) *Conn

NewConnWith returns a new kafka connection configured with config.

func (*Conn) Close

func (c *Conn) Close() error

Close closes the kafka connection.

func (*Conn) DescribeGroupsV1

func (c *Conn) DescribeGroupsV1(request DescribeGroupsRequestV1) (DescribeGroupsResponseV1, error)

DescribeGroupsV1 retrieves the specified groups.

See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups

func (*Conn) FindCoordinatorV1

func (c *Conn) FindCoordinatorV1(request FindCoordinatorRequestV1) (FindCoordinatorResponseV1, error)

FindCoordinatorV1 finds the coordinator for the specified group or transaction.

See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator

func (*Conn) ListGroupsV1

func (c *Conn) ListGroupsV1(request ListGroupsRequestV1) (ListGroupsResponseV1, error)

ListGroupsV1 lists all the consumer groups.

See http://kafka.apache.org/protocol.html#The_Messages_ListGroups

func (*Conn) ListOffsetsV1

func (c *Conn) ListOffsetsV1(request ListOffsetRequestV1) (ListOffsetResponseV1, error)

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

LocalAddr returns the local network address.

func (*Conn) MetadataV0

func (c *Conn) MetadataV0(request MetadataRequestV0) (*MetadataResponseV0, error)

MetadataV0 requests cluster metadata (brokers and topic partitions) for the specified topics.

See http://kafka.apache.org/protocol.html#The_Messages_Metadata

func (*Conn) OffsetFetchV3

func (c *Conn) OffsetFetchV3(request OffsetFetchRequestV3) (OffsetFetchResponseV3, error)

OffsetFetchV3 fetches the offsets for the specified topic partitions.

See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

type ConnConfig

type ConnConfig struct {
	ClientID  string
	Topic     string
	Partition int
}

ConnConfig is a configuration object used to create new instances of Conn.

type DescribeGroupsRequestV1

type DescribeGroupsRequestV1 struct {
	// List of groupIds to request metadata for (an empty groupId array
	// will return empty group metadata).
	GroupIDs []string
}

See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups

type DescribeGroupsResponseV1

type DescribeGroupsResponseV1 struct {
	// Duration in milliseconds for which the request was throttled due
	// to quota violation (Zero if the request did not violate any quota)
	ThrottleTimeMS int32

	// Groups holds selected group information
	Groups []DescribeGroupsResponseV1Group
}

type DescribeGroupsResponseV1Group

type DescribeGroupsResponseV1Group struct {
	// ErrorCode holds response error code
	ErrorCode int16

	// GroupID holds the unique group identifier
	GroupID string

	// State holds current state of the group (one of: Dead, Stable, AwaitingSync,
	// PreparingRebalance, or empty if there is no active group)
	State string

	// ProtocolType holds the current group protocol type (will be empty if there is
	// no active group)
	ProtocolType string

	// Protocol holds the current group protocol (only provided if the group is Stable)
	Protocol string

	// Members contains the current group members (only provided if the group is not Dead)
	Members []DescribeGroupsResponseV1Member
}

type DescribeGroupsResponseV1Member

type DescribeGroupsResponseV1Member struct {
	// MemberID assigned by the group coordinator
	MemberID string

	// ClientID used in the member's latest join group request
	ClientID string

	// ClientHost used in the request session corresponding to the member's
	// join group.
	ClientHost string

	// MemberMetadata holds the metadata corresponding to the current group
	// protocol in use (will only be present if the group is stable).
	MemberMetadata []byte

	// MemberAssignments provided by the group leader (will only be present if
	// the group is stable).
	//
	// See consumer groups section of https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
	MemberAssignments []byte
}

type Dialer

type Dialer struct {
	// Unique identifier for client connections established by this Dialer.
	ClientID string

	// Timeout is the maximum amount of time a dial will wait for a connect to
	// complete. If Deadline is also set, it may fail earlier.
	//
	// The default is no timeout.
	//
	// When dialing a name with multiple IP addresses, the timeout may be
	// divided between them.
	//
	// With or without a timeout, the operating system may impose its own
	// earlier timeout. For instance, TCP timeouts are often around 3 minutes.
	Timeout time.Duration

	// Deadline is the absolute point in time after which dials will fail.
	// If Timeout is set, it may fail earlier.
	// Zero means no deadline, or dependent on the operating system as with the
	// Timeout option.
	Deadline time.Time

	// LocalAddr is the local address to use when dialing an address.
	// The address must be of a compatible type for the network being dialed.
	// If nil, a local address is automatically chosen.
	LocalAddr net.Addr

	// DualStack enables RFC 6555-compliant "Happy Eyeballs" dialing when the
	// network is "tcp" and the destination is a host name with both IPv4 and
	// IPv6 addresses. This allows a client to tolerate networks where one
	// address family is silently broken.
	DualStack bool

	// FallbackDelay specifies the length of time to wait before spawning a
	// fallback connection, when DualStack is enabled.
	// If zero, a default delay of 300ms is used.
	FallbackDelay time.Duration

	// KeepAlive specifies the keep-alive period for an active network
	// connection.
	// If zero, keep-alives are not enabled. Network protocols that do not
	// support keep-alives ignore this field.
	KeepAlive time.Duration

	// Resolver optionally specifies an alternate resolver to use.
	Resolver Resolver

	// TLS enables Dialer to open secure connections.  If nil, standard net.Conn
	// will be used.
	TLS *tls.Config
}

The Dialer type mirrors the net.Dialer API but is designed to open kafka connections instead of raw network connections.

func (*Dialer) Dial

func (d *Dialer) Dial(network string, address string) (*Conn, error)

Dial connects to the address on the named network.

func (*Dialer) DialContext

func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error)

DialContext connects to the address on the named network using the provided context.

The provided Context must be non-nil. If the context expires before the connection is complete, an error is returned. Once successfully connected, any expiration of the context will not affect the connection.

When using TCP, and the host in the address parameter resolves to multiple network addresses, any dial timeout (from d.Timeout or ctx) is spread over each consecutive dial, such that each is given an appropriate fraction of the time to connect. For example, if a host has 4 IP addresses and the timeout is 1 minute, the connect to each single address will be given 15 seconds to complete before trying the next one.
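The arithmetic in the example above can be sketched as an even split of the remaining time budget over the addresses still to be tried. This is an illustration only: perAddressTimeout is a hypothetical helper, not part of this package, and real dialers may apply extra rules (such as a minimum budget per address) that are not modeled here.

```go
package main

import (
	"fmt"
	"time"
)

// perAddressTimeout divides what is left of the dial budget evenly among
// the addresses that have not been tried yet.
func perAddressTimeout(remaining time.Duration, addrsRemaining int) time.Duration {
	if addrsRemaining <= 0 {
		return 0
	}
	return remaining / time.Duration(addrsRemaining)
}

func main() {
	// 4 addresses sharing a 1 minute timeout.
	fmt.Println(perAddressTimeout(time.Minute, 4)) // prints "15s"
}
```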

func (*Dialer) DialLeader

func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)

DialLeader opens a connection to the leader of the partition for a given topic.

The address given to the DialLeader method may not be the one that the connection is ultimately established to, because the dialer looks up the partition leader for the topic and returns a connection to that server. The original address is only used to discover the configuration of the Kafka cluster being connected to.

func (*Dialer) LookupLeader

func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, partition int) (Broker, error)

LookupLeader searches for the kafka broker that is the leader of the partition for a given topic, returning a Broker value representing it.

func (*Dialer) LookupPartitions

func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error)

LookupPartitions returns the list of partitions that exist for the given topic.

type DurationStats

type DurationStats struct {
	Avg time.Duration `metric:"avg" type:"gauge"`
	Min time.Duration `metric:"min" type:"gauge"`
	Max time.Duration `metric:"max" type:"gauge"`
}

DurationStats is a data structure that carries a summary of observed duration values. The average, minimum, and maximum are reported.

type Error

type Error int

Error represents the different error codes that may be returned by kafka.

const (
	Unknown                            Error = -1
	OffsetOutOfRange                   Error = 1
	InvalidMessage                     Error = 2
	UnknownTopicOrPartition            Error = 3
	InvalidMessageSize                 Error = 4
	LeaderNotAvailable                 Error = 5
	NotLeaderForPartition              Error = 6
	RequestTimedOut                    Error = 7
	BrokerNotAvailable                 Error = 8
	ReplicaNotAvailable                Error = 9
	MessageSizeTooLarge                Error = 10
	StaleControllerEpoch               Error = 11
	OffsetMetadataTooLarge             Error = 12
	GroupLoadInProgress                Error = 14
	GroupCoordinatorNotAvailable       Error = 15
	NotCoordinatorForGroup             Error = 16
	InvalidTopic                       Error = 17
	RecordListTooLarge                 Error = 18
	NotEnoughReplicas                  Error = 19
	NotEnoughReplicasAfterAppend       Error = 20
	InvalidRequiredAcks                Error = 21
	IllegalGeneration                  Error = 22
	InconsistentGroupProtocol          Error = 23
	InvalidGroupId                     Error = 24
	UnknownMemberId                    Error = 25
	InvalidSessionTimeout              Error = 26
	RebalanceInProgress                Error = 27
	InvalidCommitOffsetSize            Error = 28
	TopicAuthorizationFailed           Error = 29
	GroupAuthorizationFailed           Error = 30
	ClusterAuthorizationFailed         Error = 31
	InvalidTimestamp                   Error = 32
	UnsupportedSASLMechanism           Error = 33
	IllegalSASLState                   Error = 34
	UnsupportedVersion                 Error = 35
	TopicAlreadyExists                 Error = 36
	InvalidPartitionNumber             Error = 37
	InvalidReplicationFactor           Error = 38
	InvalidReplicaAssignment           Error = 39
	InvalidConfiguration               Error = 40
	NotController                      Error = 41
	InvalidRequest                     Error = 42
	UnsupportedForMessageFormat        Error = 43
	PolicyViolation                    Error = 44
	OutOfOrderSequenceNumber           Error = 45
	DuplicateSequenceNumber            Error = 46
	InvalidProducerEpoch               Error = 47
	InvalidTransactionState            Error = 48
	InvalidProducerIDMapping           Error = 49
	InvalidTransactionTimeout          Error = 50
	ConcurrentTransactions             Error = 51
	TransactionCoordinatorFenced       Error = 52
	TransactionalIDAuthorizationFailed Error = 53
	SecurityDisabled                   Error = 54
	BrokerAuthorizationFailed          Error = 55
)

func (Error) Description

func (e Error) Description() string

Description returns a human-readable description of the cause of the error.

func (Error) Error

func (e Error) Error() string

Error satisfies the error interface.

func (Error) Temporary

func (e Error) Temporary() bool

Temporary returns true if the operation that generated the error may succeed if retried at a later time.
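A caller can use Temporary to decide whether an operation is worth retrying. The sketch below assumes nothing about which Kafka error codes report themselves as temporary: kafkaError is a hypothetical stand-in for the Error type, and shouldRetry and retry are illustrative helpers, not functions of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// kafkaError is a hypothetical stand-in that exposes the same Temporary
// method as the documented Error type.
type kafkaError struct {
	msg       string
	temporary bool
}

func (e kafkaError) Error() string   { return e.msg }
func (e kafkaError) Temporary() bool { return e.temporary }

// shouldRetry reports whether err, or an error it wraps, says it is temporary.
func shouldRetry(err error) bool {
	var t interface{ Temporary() bool }
	return errors.As(err, &t) && t.Temporary()
}

// retry calls op up to attempts times. It stops early on success or on an
// error that is not temporary, and returns how many calls were made.
func retry(attempts int, op func() error) (calls int, err error) {
	for calls < attempts {
		calls++
		if err = op(); err == nil || !shouldRetry(err) {
			return calls, err
		}
	}
	return calls, err
}

func main() {
	n := 0
	calls, err := retry(5, func() error {
		n++
		if n < 3 {
			return kafkaError{msg: "try again later", temporary: true}
		}
		return nil
	})
	fmt.Println(calls, err)

	calls, err = retry(5, func() error {
		return kafkaError{msg: "permanent failure"}
	})
	fmt.Println(calls, err)
}
```

A production retry loop would normally add a delay or backoff between attempts; that is left out to keep the sketch short.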

func (Error) Timeout

func (e Error) Timeout() bool

Timeout returns true if the error was due to a timeout.

func (Error) Title

func (e Error) Title() string

Title returns a human-readable title for the error.

type FindCoordinatorRequestV1

type FindCoordinatorRequestV1 struct {
	// CoordinatorKey holds id to use for finding the coordinator (for groups, this is
	// the groupId, for transactional producers, this is the transactional id)
	CoordinatorKey string

	// CoordinatorType indicates type of coordinator to find (0 = group, 1 = transaction)
	CoordinatorType int8
}

FindCoordinatorRequestV1 requests the coordinator for the specified group or transaction

See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator

type FindCoordinatorResponseV1

type FindCoordinatorResponseV1 struct {
	// ThrottleTimeMS holds the duration in milliseconds for which the request
	// was throttled due to quota violation (Zero if the request did not violate
	// any quota)
	ThrottleTimeMS int32

	// ErrorCode holds response error code
	ErrorCode int16

	// ErrorMessage holds response error message
	ErrorMessage string

	// Coordinator holds host and port information for the coordinator
	Coordinator FindCoordinatorResponseV1Coordinator
}

type FindCoordinatorResponseV1Coordinator

type FindCoordinatorResponseV1Coordinator struct {
	// NodeID holds the broker id.
	NodeID int32

	// Host of the broker
	Host string

	// Port on which broker accepts requests
	Port int32
}

type ListGroupsRequestV1

type ListGroupsRequestV1 struct {
}

type ListGroupsResponseV1

type ListGroupsResponseV1 struct {
	// ThrottleTimeMS holds the duration in milliseconds for which the request
	// was throttled due to quota violation (Zero if the request did not violate
	// any quota)
	ThrottleTimeMS int32

	// ErrorCode holds response error code
	ErrorCode int16
	Groups    []ListGroupsResponseV1Group
}

type ListGroupsResponseV1Group

type ListGroupsResponseV1Group struct {
	// GroupID holds the unique group identifier
	GroupID      string
	ProtocolType string
}

type ListOffsetRequestV1

type ListOffsetRequestV1 struct {
	ReplicaID int32
	Topics    []ListOffsetRequestV1Topic
}

type ListOffsetRequestV1Partition

type ListOffsetRequestV1Partition struct {
	Partition int32

	// Time is used to ask for all messages before a certain time (ms).
	//
	// There are two special values. Specify -1 to receive the latest offset (i.e.
	// the offset of the next coming message) and -2 to receive the earliest
	// available offset. This applies to all versions of the API. Note that because
	// offsets are pulled in descending order, asking for the earliest offset will
	// always return you a single element.
	Time int64
}
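The two special Time values are easy to mistype, so a caller might wrap them in small helpers. This is a sketch under assumptions: listOffsetPartition is a local mirror of the documented struct, and timeLatest, timeEarliest, latest, earliest, and before are hypothetical names that this package does not define.

```go
package main

import (
	"fmt"
	"time"
)

// Special values for the Time field, as described in the field comment.
const (
	timeLatest   int64 = -1 // offset of the next message to arrive
	timeEarliest int64 = -2 // earliest available offset
)

// listOffsetPartition mirrors the documented fields of
// ListOffsetRequestV1Partition so the sketch is self-contained.
type listOffsetPartition struct {
	Partition int32
	Time      int64
}

// latest asks for the latest offset of a partition.
func latest(partition int32) listOffsetPartition {
	return listOffsetPartition{Partition: partition, Time: timeLatest}
}

// earliest asks for the earliest available offset of a partition.
func earliest(partition int32) listOffsetPartition {
	return listOffsetPartition{Partition: partition, Time: timeEarliest}
}

// before asks about messages before t, expressed in milliseconds
// since the Unix epoch.
func before(partition int32, t time.Time) listOffsetPartition {
	return listOffsetPartition{Partition: partition, Time: t.UnixNano() / int64(time.Millisecond)}
}

func main() {
	fmt.Println(latest(0), earliest(0), before(0, time.Unix(1, 0)))
}
```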

type ListOffsetRequestV1Topic

type ListOffsetRequestV1Topic struct {
	TopicName  string
	Partitions []ListOffsetRequestV1Partition
}

type ListOffsetResponseV1

type ListOffsetResponseV1 struct {
	Responses []ListOffsetResponseV1Response
}

type ListOffsetResponseV1Partition

type ListOffsetResponseV1Partition struct {
	Partition int32
	ErrorCode int16
	Timestamp int64
	Offset    int64
}

type ListOffsetResponseV1Response

type ListOffsetResponseV1Response struct {
	Topic              string
	PartitionResponses []ListOffsetResponseV1Partition
}

type Message

type Message struct {
	// Topic is read-only and MUST NOT be set when writing messages
	Topic string

	// Partition is read-only and MUST NOT be set when writing messages
	Partition int
	Offset    int64
	Key       []byte
	Value     []byte
	Time      time.Time
}

Message is a data structure representing kafka messages.

type MetadataRequestV0

type MetadataRequestV0 []string

type MetadataResponseV0

type MetadataResponseV0 struct {
	Brokers []*MetadataResponseV0Broker
	Topics  []*MetadataResponseV0Topic
}

func (*MetadataResponseV0) Free

func (t *MetadataResponseV0) Free()

type MetadataResponseV0Broker

type MetadataResponseV0Broker struct {
	NodeID int32
	Host   string
	Port   int32
}

func (*MetadataResponseV0Broker) Free

func (t *MetadataResponseV0Broker) Free()

type MetadataResponseV0Partition

type MetadataResponseV0Partition struct {
	PartitionErrorCode int16
	PartitionID        int32
	Leader             int32
	Replicas           []int32
	Isr                []int32
}

type MetadataResponseV0Topic

type MetadataResponseV0Topic struct {
	TopicErrorCode int16
	TopicName      string
	Partitions     []MetadataResponseV0Partition
}

func (*MetadataResponseV0Topic) Free

func (t *MetadataResponseV0Topic) Free()

type OffsetFetchRequestV3

type OffsetFetchRequestV3 struct {
	// GroupID holds the unique group identifier
	GroupID string

	// Topics to fetch offsets.
	Topics []OffsetFetchRequestV3Topic
}

type OffsetFetchRequestV3Topic

type OffsetFetchRequestV3Topic struct {
	// Topic name
	Topic string

	// Partitions to fetch offsets
	Partitions []int32
}

type OffsetFetchResponseV3

type OffsetFetchResponseV3 struct {
	// ThrottleTimeMS holds the duration in milliseconds for which the request
	// was throttled due to quota violation (Zero if the request did not violate
	// any quota)
	ThrottleTimeMS int32

	// Responses holds topic partition offsets
	Responses []OffsetFetchResponseV3Response

	// ErrorCode holds response error code
	ErrorCode int16
}

type OffsetFetchResponseV3PartitionResponse

type OffsetFetchResponseV3PartitionResponse struct {
	// Partition ID
	Partition int32

	// Offset of last committed message
	Offset int64

	// Metadata client wants to keep
	Metadata string

	// ErrorCode holds response error code
	ErrorCode int16
}

type OffsetFetchResponseV3Response

type OffsetFetchResponseV3Response struct {
	// Topic name
	Topic string

	// PartitionResponses holds offsets by partition
	PartitionResponses []OffsetFetchResponseV3PartitionResponse
}
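The response nests partitions inside topics, and each partition carries its own ErrorCode, so callers usually flatten the structure while checking for errors (in the Kafka protocol, an error code of 0 means no error). The sketch below works on local mirrors of the documented types; partitionResponse, topicResponse, and committedOffsets are hypothetical names, not part of this package.

```go
package main

import "fmt"

// Local mirrors of the documented response types, so the sketch is
// self-contained.
type partitionResponse struct {
	Partition int32
	Offset    int64
	Metadata  string
	ErrorCode int16
}

type topicResponse struct {
	Topic              string
	PartitionResponses []partitionResponse
}

// committedOffsets flattens the nested response into topic -> partition ->
// offset, stopping at the first partition with a non-zero error code.
func committedOffsets(responses []topicResponse) (map[string]map[int32]int64, error) {
	offsets := make(map[string]map[int32]int64, len(responses))
	for _, t := range responses {
		for _, p := range t.PartitionResponses {
			if p.ErrorCode != 0 {
				return nil, fmt.Errorf("topic %q partition %d: kafka error code %d",
					t.Topic, p.Partition, p.ErrorCode)
			}
			if offsets[t.Topic] == nil {
				offsets[t.Topic] = make(map[int32]int64)
			}
			offsets[t.Topic][p.Partition] = p.Offset
		}
	}
	return offsets, nil
}

func main() {
	offsets, err := committedOffsets([]topicResponse{
		{Topic: "events", PartitionResponses: []partitionResponse{
			{Partition: 0, Offset: 42},
			{Partition: 1, Offset: 7},
		}},
	})
	fmt.Println(offsets, err)
}
```

With the real types, a non-zero code could be converted with Error(code) to obtain its title and description instead of printing the raw number.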

type Partition

type Partition struct {
	Topic    string
	Leader   Broker
	Replicas []Broker
	Isr      []Broker
	ID       int
}

Partition carries the metadata associated with a kafka partition.

type Resolver

type Resolver interface {
	// LookupHost looks up the given host using the local resolver.
	// It returns a slice of that host's addresses.
	LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

The Resolver interface is used as an abstraction to provide service discovery of the hosts of a kafka cluster.
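As an illustration, a Resolver can be as simple as a fixed table of addresses, which is handy in tests or when broker addresses come from static configuration. In this sketch the Resolver interface is copied from the declaration above so the code is self-contained; staticResolver, lookup, and the host name kafka.internal are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Resolver is copied from the package documentation above.
type Resolver interface {
	LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

// staticResolver answers lookups from a fixed host -> addresses table.
type staticResolver map[string][]string

// LookupHost returns a copy of the configured addresses for host.
func (r staticResolver) LookupHost(ctx context.Context, host string) ([]string, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // honor cancellation and deadlines
	}
	addrs, ok := r[host]
	if !ok {
		return nil, fmt.Errorf("static resolver: unknown host %q", host)
	}
	return append([]string(nil), addrs...), nil
}

// Compile-time check that staticResolver satisfies Resolver.
var _ Resolver = staticResolver(nil)

// lookup is a small convenience wrapper using a background context.
func lookup(r Resolver, host string) ([]string, error) {
	return r.LookupHost(context.Background(), host)
}

func main() {
	r := staticResolver{"kafka.internal": {"10.0.0.1", "10.0.0.2"}}
	fmt.Println(lookup(r, "kafka.internal"))
	fmt.Println(lookup(r, "missing"))
}
```

A value like this would be assigned to the Resolver field of a Dialer.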

type SummaryStats

type SummaryStats struct {
	Avg int64 `metric:"avg" type:"gauge"`
	Min int64 `metric:"min" type:"gauge"`
	Max int64 `metric:"max" type:"gauge"`
}

SummaryStats is a data structure that carries a summary of observed values. The average, minimum, and maximum are reported.
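To make the three fields concrete, here is one way such a summary could be computed from a slice of observations. How this package actually gathers its statistics is not documented here, so treat this only as an illustration; summary and summarize are hypothetical names.

```go
package main

import "fmt"

// summary mirrors the documented fields of SummaryStats, without the
// struct tags.
type summary struct {
	Avg int64
	Min int64
	Max int64
}

// summarize computes the integer average, minimum, and maximum of values.
// An empty input yields the zero summary.
func summarize(values []int64) summary {
	if len(values) == 0 {
		return summary{}
	}
	s := summary{Min: values[0], Max: values[0]}
	var total int64
	for _, v := range values {
		total += v
		if v < s.Min {
			s.Min = v
		}
		if v > s.Max {
			s.Max = v
		}
	}
	s.Avg = total / int64(len(values))
	return s
}

func main() {
	fmt.Printf("%+v\n", summarize([]int64{10, 20, 60})) // prints "{Avg:30 Min:10 Max:60}"
}
```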
