marshal

package marshal

Published: Mar 29, 2018 License: BSD-3-Clause Imports: 16 Imported by: 0

Documentation

Constants

const (
	// MarshalTopic is the main topic used for coordination. This must be constant across all
	// consumers that you want to coordinate.
	MarshalTopic = "__marshal"

	// HeartbeatInterval is the main timing used to determine how "chatty" the system is and how
	// fast it responds to failures of consumers. THIS VALUE MUST BE THE SAME BETWEEN ALL CONSUMERS
	// as it is critical to coordination.
	HeartbeatInterval = 60 // Measured in seconds.
)

Variables

This section is empty.

Functions

func SetLogger

func SetLogger(l *logging.Logger)

SetLogger can be called with a logging.Logger in order to overwrite our internal logger. Useful if you need to control the logging (such as in tests).
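
For example, a minimal sketch of overriding the internal logger. It assumes the logger comes from the op/go-logging package and that this package is imported as github.com/dropbox/marshal/marshal; both are assumptions about the module layout, not stated on this page.

package main

import (
	"os"

	logging "github.com/op/go-logging"   // assumed logging package providing *logging.Logger
	"github.com/dropbox/marshal/marshal" // assumed import path for this package
)

func main() {
	// Send Marshal's internal log output through a backend we control,
	// e.g. to silence or redirect it in tests.
	backend := logging.NewLogBackend(os.Stderr, "marshal: ", 0)
	logging.SetBackend(backend)
	marshal.SetLogger(logging.MustGetLogger("myapp"))
}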

Types

type CommitToken

type CommitToken struct {
	// contains filtered or unexported fields
}

CommitToken is a minimal structure that contains only the information necessary to mark a message committed. This is done so that you can throw away the message instead of holding on to it in memory.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer allows you to safely consume data from a given topic in such a way that you don't need to worry about partitions and can safely split the load across as many processes as might be consuming from this topic. However, you should ONLY create one Consumer per topic in your application!

func (*Consumer) Commit

func (c *Consumer) Commit(msg *Message) error

Commit is called when you've finished processing a message. This operation marks the offset as committed internally and is suitable for at-least-once processing because we do not immediately write the offsets to storage. We will flush the offsets periodically (based on the heartbeat interval).

func (*Consumer) CommitByToken

func (c *Consumer) CommitByToken(token CommitToken) error

CommitByToken is called when you've finished processing a message. In the at-least-once consumption case, this will allow the "last processed offset" to move forward so that we can never see this message again. This particular method is used when you've only got a CommitToken to commit from.

func (*Consumer) ConsumeChannel

func (c *Consumer) ConsumeChannel() <-chan *Message

ConsumeChannel returns a read-only channel. Messages that are retrieved from Kafka will be made available in this channel.
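
A sketch of an at-least-once consume loop built on ConsumeChannel and Commit. It assumes a *Consumer has already been created (see NewConsumer below), that handleMessage is an application-supplied handler, and that the standard library log package and this package are imported.

// consumeLoop processes messages as they arrive and only commits the ones
// that were handled successfully, so failed messages can be redelivered
// after a restart (at-least-once semantics).
func consumeLoop(consumer *marshal.Consumer) {
	for msg := range consumer.ConsumeChannel() {
		if err := handleMessage(msg); err != nil {
			log.Printf("processing failed, not committing: %v", err)
			continue
		}
		if err := consumer.Commit(msg); err != nil {
			log.Printf("commit failed: %v", err)
		}
	}
}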

func (*Consumer) Flush

func (c *Consumer) Flush() error

Flush will cause us to update all of the committed offsets. This operation can be performed to periodically sync offsets without waiting on the internal flushing mechanism.

func (*Consumer) GetCurrentLag

func (c *Consumer) GetCurrentLag() int64

GetCurrentLag returns the number of messages that this consumer is lagging by. Note that this value can be unstable in the beginning of a run, as we might not have claimed all of the partitions we will end up claiming, or we might have overclaimed and need to back off. Ideally this will settle towards 0. If it continues to rise, that implies there isn't enough consumer capacity.

func (*Consumer) GetCurrentLoad

func (c *Consumer) GetCurrentLoad() int

GetCurrentLoad returns a number representing the "load" of this consumer. Think of this like a load average in Unix systems: the numbers are kind of related to how much work the system is doing, but by themselves they don't tell you much.

func (*Consumer) GetCurrentTopicClaims

func (c *Consumer) GetCurrentTopicClaims() (map[string]bool, error)

GetCurrentTopicClaims returns the topics that are currently claimed by this consumer. It is relevant only when ClaimEntireTopic is set.

func (*Consumer) PrintState

func (c *Consumer) PrintState()

PrintState outputs the status of the consumer.

func (*Consumer) Terminate

func (c *Consumer) Terminate(release bool) bool

Terminate instructs the consumer to clean up and allow other consumers to begin consuming. (If you do not call this method before exiting, things will still work, but more slowly.)
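
A sketch of a shutdown path. The release argument is assumed to control whether claims are given up immediately; passing true lets other consumers in the group take over the partitions right away.

// shutdown stops the consumer and releases its claims so the rest of the
// group can pick them up promptly. Passing false instead is assumed to
// hold on to the claims, which pairs with ConsumerOptions.FastReclaim for
// fast restarts of the same ClientID.
func shutdown(consumer *marshal.Consumer) {
	if consumer.Terminated() {
		return
	}
	consumer.Terminate(true)
}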

func (*Consumer) Terminated

func (c *Consumer) Terminated() bool

Terminated returns whether or not this consumer has been terminated.

func (*Consumer) TopicClaims

func (c *Consumer) TopicClaims() <-chan map[string]bool

TopicClaims returns a read-only channel that receives updates for topic claims. It is only relevant when ClaimEntireTopic is set.

type ConsumerOptions

type ConsumerOptions struct {
	// FastReclaim instructs the consumer to attempt to reclaim any partitions
	// that are presently claimed by the ClientID/GroupID we have. This is useful
	// for situations where your ClientID is predictable/stable and you want to
	// minimize churn during restarts. This is dangerous if you have two copies
	// of your application running with the same ClientID/GroupID.
	// TODO: Create an instance ID for Marshaler such that we can detect when
	// someone else has decided to use our Client/Group.
	//
	// Note that this option ignores MaximumClaims, so it is possible to
	// exceed the claim limit if the ClientID previously held more claims.
	FastReclaim bool

	// ClaimEntireTopic makes Marshal handle claims on the entire topic rather than
	// on a per-partition basis. This is used with sharded produce/consume setups.
	// Defaults to false.
	ClaimEntireTopic bool

	// GreedyClaims indicates whether we should attempt to claim all unclaimed
	// partitions on start. This is appropriate in low QPS type environments.
	// Defaults to false/off.
	GreedyClaims bool

	// ReleaseClaimsIfBehind indicates whether to release a claim if a consumer
	// is consuming at a rate slower than the partition is being produced to.
	ReleaseClaimsIfBehind bool

	// The maximum number of claims this Consumer is allowed to hold simultaneously.
	// MaximumClaims indicates the maximum number of partitions to be claimed when
	// ClaimEntireTopic is set to false. Otherwise, it indicates the maximum number
	// of topics to claim.
	// Set to 0 (default) to allow an unlimited number of claims.
	//
	// Using this option will leave some partitions/topics completely unclaimed
	// if the number of Consumers in this GroupID falls below the number of
	// partitions/topics that exist.
	//
	// Note this limit does not apply to claims made via FastReclaim.
	MaximumClaims int
}

ConsumerOptions represents all of the options that a consumer can be configured with.

func NewConsumerOptions

func NewConsumerOptions() ConsumerOptions

NewConsumerOptions returns a default set of options for the Consumer.
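
A sketch of adjusting the defaults before creating a consumer; the marshaler variable and topic name are placeholders.

// Start from the defaults and cap how many partitions this process may claim.
options := marshal.NewConsumerOptions()
options.MaximumClaims = 4    // hold at most four partition claims at once
options.GreedyClaims = false // claim gradually rather than grabbing everything at start

consumer, err := marshaler.NewConsumer([]string{"my-topic"}, options)
if err != nil {
	// handle the error
}
defer consumer.Terminate(true)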

type KafkaCluster

type KafkaCluster struct {
	// contains filtered or unexported fields
}

KafkaCluster is a user-agnostic view of the world. It connects to a Kafka cluster and runs rationalizers to observe the complete world state.

func Dial

func Dial(name string, brokers []string, options MarshalOptions) (*KafkaCluster, error)

Dial returns a new cluster object which can be used to instantiate a number of Marshalers that all use the same cluster. Pass a MarshalOptions value; NewMarshalOptions returns a reasonable default set.
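
A sketch of the cluster-based setup, assuming reachable broker addresses and the import path github.com/dropbox/marshal/marshal:

cluster, err := marshal.Dial("my-cluster",
	[]string{"broker1:9092", "broker2:9092"}, marshal.NewMarshalOptions())
if err != nil {
	// handle the connection failure
}
defer cluster.Terminate()

// Multiple Marshalers can share the same underlying cluster.
m, err := cluster.NewMarshaler("client-1", "group-1")
if err != nil {
	// handle the error
}
defer m.Terminate()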

func (*KafkaCluster) IsGroupPaused

func (c *KafkaCluster) IsGroupPaused(groupID string) bool

IsGroupPaused returns true if the given consumer group is paused. TODO(pihu) This just checks the expiry time, and not the admin ID.

func (*KafkaCluster) NewMarshaler

func (c *KafkaCluster) NewMarshaler(clientID, groupID string) (*Marshaler, error)

NewMarshaler creates a Marshaler off of an existing cluster. This is more efficient if you're creating multiple instances, since they can share the same underlying cluster.

func (*KafkaCluster) Terminate

func (c *KafkaCluster) Terminate()

Terminate is called when we're done with the marshaler and want to shut down.

func (*KafkaCluster) Terminated

func (c *KafkaCluster) Terminated() bool

Terminated returns whether or not we have been terminated.

type MarshalOptions

type MarshalOptions struct {
	// BrokerConnectionLimit is used to set the maximum simultaneous number of connections
	// that can be made to each broker.
	// Default: 30.
	BrokerConnectionLimit int

	// ConsumeRequestTimeout sets the time that we ask Kafka to wait before returning any
	// data to us. Setting this high uses more connections and can lead to some latency
	// but keeps the load on Kafka minimal. Use this to balance QPS against latency.
	//
	// Default: 1 millisecond.
	ConsumeRequestTimeout time.Duration

	// MarshalRequestTimeout is used for our coordination requests. This should be reasonable
	// at default, but is left as a tunable in case you have clients that are claiming an
	// extremely large number of partitions and are too slow. The overall Marshal latency
	// is impacted by this value as well as the MarshalRequestRetryWait below.
	//
	// Default: 1 millisecond.
	MarshalRequestTimeout time.Duration

	// MarshalRequestRetryWait is the time between consume requests Marshal generates. This
	// should be set to balance the above timeouts to prevent hammering the server.
	//
	// Default: 500 milliseconds.
	MarshalRequestRetryWait time.Duration

	// MaxMessageSize is the maximum size in bytes of messages that can be returned. This
	// must be set to the size of the largest messages your cluster is allowed to store,
	// else you will end up with stalled streams. That is, Kafka will never send you a
	// message larger than this value, but we can't detect that; we just think there is
	// no data.
	//
	// Default: 2,000,000 bytes.
	MaxMessageSize int32

	// MaxMessageQueue is the number of messages to retrieve from Kafka and store in-memory
	// waiting for consumption. This is per-Consumer and independent of message size so you
	// should adjust this for your consumption patterns.
	//
	// Default: 1000 messages.
	MaxMessageQueue int
}

MarshalOptions contains various tunables that can be used to adjust the configuration of the underlying system.

func NewMarshalOptions

func NewMarshalOptions() MarshalOptions

NewMarshalOptions returns a set of MarshalOptions populated with defaults.
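
A sketch of tuning a couple of the defaults before dialing; the values shown are illustrative only, and the time package is assumed to be imported.

options := marshal.NewMarshalOptions()
options.MaxMessageSize = 5000000                      // match the largest message the cluster allows
options.ConsumeRequestTimeout = 50 * time.Millisecond // fewer requests at the cost of some latency

cluster, err := marshal.Dial("my-cluster", []string{"broker1:9092"}, options)
if err != nil {
	// handle the error
}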

type Marshaler

type Marshaler struct {
	// contains filtered or unexported fields
}

Marshaler is the coordinator type. It is designed to be used once per (client, group) and is thread safe. Creating one of these will create connections to your Kafka cluster and begin actively monitoring the coordination topic.

func NewMarshaler

func NewMarshaler(clientID, groupID string, brokers []string) (*Marshaler, error)

NewMarshaler connects to a cluster (given broker addresses) and prepares to handle marshalling requests. Given the way this system works, the marshaler has to process all messages in the topic before it's safely able to begin operating. This might take a while. NOTE: If you are creating multiple marshalers in your program, you should instead call Dial and then use the NewMarshaler method on that object.
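
A minimal single-marshaler sketch, assuming one broker address and a placeholder topic name; the consume loop itself is shown under ConsumeChannel above.

m, err := marshal.NewMarshaler("client-1", "group-1", []string{"broker1:9092"})
if err != nil {
	// handle the error
}
defer m.Terminate()

consumer, err := m.NewConsumer([]string{"my-topic"}, marshal.NewConsumerOptions())
if err != nil {
	// handle the error
}
defer consumer.Terminate(true)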

func (*Marshaler) ClaimPartition

func (m *Marshaler) ClaimPartition(topicName string, partID int) bool

ClaimPartition is how you can actually claim a partition. If you call this, Marshal will attempt to claim the partition on your behalf. This is the low-level function; you probably want to use a Consumer instead. Returns a bool indicating whether the claim succeeded and you can continue.

func (*Marshaler) Claimed

func (m *Marshaler) Claimed(topicName string, partID int) bool

Claimed returns the current status on whether or not a partition is claimed by any consumer in our group (including ourselves). A topic/partition that does not exist is considered to be unclaimed.

func (*Marshaler) ClientID

func (m *Marshaler) ClientID() string

ClientID returns the client ID we're using.

func (*Marshaler) CommitOffsets

func (m *Marshaler) CommitOffsets(topicName string, partID int, offset int64) error

CommitOffsets will commit the partition offsets to Kafka so it's available in the long-term storage of the offset coordination system. Note: this method does not ensure that this Marshal instance owns the topic/partition in question.

func (*Marshaler) GetLastPartitionClaim

func (m *Marshaler) GetLastPartitionClaim(topicName string, partID int) PartitionClaim

GetLastPartitionClaim returns a PartitionClaim structure for a given partition. The structure describes the consumer that is currently or most recently claiming this partition. This is a copy of the claim structure, so changing it cannot change the world state.

func (*Marshaler) GetPartitionClaim

func (m *Marshaler) GetPartitionClaim(topicName string, partID int) PartitionClaim

GetPartitionClaim returns a PartitionClaim structure for a given partition. The structure describes the consumer that is currently claiming this partition. This is a copy of the claim structure, so changing it cannot change the world state.

func (*Marshaler) GetPartitionOffsets

func (m *Marshaler) GetPartitionOffsets(topicName string, partID int) (PartitionOffsets, error)

GetPartitionOffsets returns the current state of a topic/partition. This has to hit Kafka thrice to ask about a partition, but it returns the full state of information that can be used to calculate consumer lag.

func (*Marshaler) GroupID

func (m *Marshaler) GroupID() string

GroupID returns the group ID we're using.

func (*Marshaler) Heartbeat

func (m *Marshaler) Heartbeat(topicName string, partID int, offset int64) error

Heartbeat will send an update for other people to know that we're still alive and still owning this partition. Returns an error if anything has gone wrong (at which point we can no longer assert we have the lock).

func (*Marshaler) NewConsumer

func (m *Marshaler) NewConsumer(topicNames []string, options ConsumerOptions) (*Consumer, error)

NewConsumer instantiates a consumer object for a given topic. You must create a separate consumer for every individual topic that you want to consume from. Please see the documentation on ConsumerOptions.

func (*Marshaler) Partitions

func (m *Marshaler) Partitions(topicName string) int

Partitions returns the count of how many partitions are in a given topic. Returns 0 if a topic is unknown.

func (*Marshaler) PrintState

func (m *Marshaler) PrintState()

PrintState will take the current state of the Marshal world and print it verbosely to the logging output. This is used in the rare case where we're self-terminating or on request from the user.

func (*Marshaler) ReleasePartition

func (m *Marshaler) ReleasePartition(topicName string, partID int, offset int64) error

ReleasePartition will send an update for other people to know that we're done with a partition. Returns an error if anything has gone wrong (at which point we can no longer assert we have the lock).
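
A sketch of the low-level claim lifecycle with ClaimPartition, Heartbeat, and ReleasePartition. Most applications should let a Consumer manage this; the topic, partition, and offset handling here are only illustrative.

// claimAndRelease walks through claiming a single partition, heartbeating
// while it is held, and releasing it when done.
func claimAndRelease(m *marshal.Marshaler) {
	topic, partID := "my-topic", 0

	if !m.ClaimPartition(topic, partID) {
		// someone else holds the claim; try another partition or back off
		return
	}

	offset := int64(0) // a real consumer advances this as it processes messages

	// Heartbeat periodically so the rest of the group knows we are still alive.
	if err := m.Heartbeat(topic, partID, offset); err != nil {
		// we can no longer assert that we hold the claim
	}

	// When finished, hand the partition back so another consumer can claim it.
	if err := m.ReleasePartition(topic, partID, offset); err != nil {
		// the release failed; the claim will eventually expire via missed heartbeats
	}
}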

func (*Marshaler) Terminate

func (m *Marshaler) Terminate()

Terminate is called when we're done with the marshaler and want to shut down.

func (*Marshaler) Terminated

func (m *Marshaler) Terminated() bool

Terminated returns whether or not we have been terminated.

func (*Marshaler) Topics

func (m *Marshaler) Topics() []string

Topics returns the list of known topics.

type Message

type Message proto.Message

Message is a container for Kafka messages.

func (*Message) CommitToken

func (m *Message) CommitToken() CommitToken

CommitToken returns a CommitToken for a message. This can be passed to the CommitByToken method.
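
A sketch of committing by token so only the small token needs to be retained while work is still in flight; it assumes an existing consumer, the standard library log package, and a purely illustrative processing goroutine.

for msg := range consumer.ConsumeChannel() {
	// Take the small token now; the full message no longer needs to be
	// retained once processing has copied whatever it needs out of it.
	token := msg.CommitToken()

	go func(token marshal.CommitToken) {
		// ... do the actual work here ...
		if err := consumer.CommitByToken(token); err != nil {
			log.Printf("commit failed: %v", err)
		}
	}(token)
}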

type PartitionClaim

type PartitionClaim struct {
	InstanceID    string
	ClientID      string
	GroupID       string
	LastRelease   int64
	LastHeartbeat int64
	CurrentOffset int64
	// contains filtered or unexported fields
}

PartitionClaim contains claim information about a given partition.

func (*PartitionClaim) Claimed

func (p *PartitionClaim) Claimed() bool

Claimed returns whether or not the PartitionClaim indicates a valid (as of this invocation) claim.

type PartitionOffsets

type PartitionOffsets struct {
	Current   int64
	Earliest  int64
	Latest    int64
	Committed int64
}

PartitionOffsets is a record of offsets for a given partition. Contains information combined from Kafka and our current state.

A Kafka partition consists of N messages with offsets. In the basic case, you can think of an offset like an array index. With log compaction and other trickery it acts more like a sparse array, but it's a close enough metaphor.

We keep track of four values for offsets:

 offsets       1     2     3     7     9    10    11
partition  [ msg1, msg2, msg3, msg4, msg5, msg6, msg7, ... ]
              ^                  ^                      ^
              \- Earliest        |                      |
                                 \- Current          Latest

In this example, Earliest is 1, which is the "oldest" offset within the partition. At any given time this offset might become invalid (if a log rolls, for example), so we might update it.

Current is 7, which is the offset of the NEXT message to be consumed, i.e., this message has not been consumed yet.

Latest is 12, which is the offset that Kafka will assign to the message that next gets committed to the partition. This offset does not yet exist, and might never.

Committed is the value recorded in Kafka's committed offsets system.
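
In the example above, the consumer still has the span from Current to Latest ahead of it (12 - 7 = 5 offsets), while Current minus Committed measures how far the in-memory commits are ahead of what has been flushed to Kafka. A sketch using GetPartitionOffsets, assuming a marshaler m and the fmt import:

offsets, err := m.GetPartitionOffsets("my-topic", 0)
if err != nil {
	// handle the error
}

lag := offsets.Latest - offsets.Current          // produced but not yet consumed
unflushed := offsets.Current - offsets.Committed // consumed but not yet flushed to Kafka

fmt.Printf("partition 0: lag=%d unflushed=%d\n", lag, unflushed)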
