Documentation ¶
Index ¶
- Constants
- func SetLogger(l *logging.Logger)
- type CommitToken
- type Consumer
- func (c *Consumer) Commit(msg *Message) error
- func (c *Consumer) CommitByToken(token CommitToken) error
- func (c *Consumer) ConsumeChannel() <-chan *Message
- func (c *Consumer) Flush() error
- func (c *Consumer) GetCurrentLag() int64
- func (c *Consumer) GetCurrentLoad() int
- func (c *Consumer) GetCurrentTopicClaims() (map[string]bool, error)
- func (c *Consumer) PrintState()
- func (c *Consumer) Terminate(release bool) bool
- func (c *Consumer) Terminated() bool
- func (c *Consumer) TopicClaims() <-chan map[string]bool
- type ConsumerOptions
- type KafkaCluster
- type MarshalOptions
- type Marshaler
- func (m *Marshaler) ClaimPartition(topicName string, partID int) bool
- func (m *Marshaler) Claimed(topicName string, partID int) bool
- func (m *Marshaler) ClientID() string
- func (m *Marshaler) CommitOffsets(topicName string, partID int, offset int64) error
- func (m *Marshaler) GetLastPartitionClaim(topicName string, partID int) PartitionClaim
- func (m *Marshaler) GetPartitionClaim(topicName string, partID int) PartitionClaim
- func (m *Marshaler) GetPartitionOffsets(topicName string, partID int) (PartitionOffsets, error)
- func (m *Marshaler) GroupID() string
- func (m *Marshaler) Heartbeat(topicName string, partID int, offset int64) error
- func (m *Marshaler) NewConsumer(topicNames []string, options ConsumerOptions) (*Consumer, error)
- func (m *Marshaler) Partitions(topicName string) int
- func (m *Marshaler) PrintState()
- func (m *Marshaler) ReleasePartition(topicName string, partID int, offset int64) error
- func (m *Marshaler) Terminate()
- func (m *Marshaler) Terminated() bool
- func (m *Marshaler) Topics() []string
- type Message
- type PartitionClaim
- type PartitionOffsets
Constants ¶
const (
    // MarshalTopic is the main topic used for coordination. This must be constant
    // across all consumers that you want to coordinate.
    MarshalTopic = "__marshal"

    // HeartbeatInterval is the main timing used to determine how "chatty" the
    // system is and how fast it responds to failures of consumers. THIS VALUE
    // MUST BE THE SAME BETWEEN ALL CONSUMERS as it is critical to coordination.
    HeartbeatInterval = 60 // Measured in seconds.
)
Variables ¶
This section is empty.
Functions ¶
func SetLogger ¶
func SetLogger(l *logging.Logger)
Types ¶
type CommitToken ¶
type CommitToken struct {
// contains filtered or unexported fields
}
CommitToken is a minimal structure that contains only the information necessary to mark a message committed. This is done so that you can throw away the message instead of holding on to it in memory.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer allows you to safely consume data from a given topic in such a way that you don't need to worry about partitions and can safely split the load across as many processes as might be consuming from this topic. However, you should ONLY create one Consumer per topic in your application!
func (*Consumer) Commit ¶
func (c *Consumer) Commit(msg *Message) error
Commit is called when you've finished processing a message. This operation marks the offset as committed internally and is suitable for at-least-once processing because we do not immediately write the offsets to storage. We will flush the offsets periodically (based on the heartbeat interval).
func (*Consumer) CommitByToken ¶
func (c *Consumer) CommitByToken(token CommitToken) error
CommitByToken is called when you've finished processing a message. In the at-least-once consumption case, this will allow the "last processed offset" to move forward so that we can never see this message again. This particular method is used when you've only got a CommitToken to commit from.
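The point of the token flow is that you can extract a small token, discard the (possibly large) message, and commit later. The following runnable sketch illustrates the pattern; Token and message here are illustrative stand-ins for marshal's CommitToken and Message, not the library's actual types:

```go
package main

import "fmt"

// Token is an illustrative stand-in for marshal.CommitToken: it holds just
// enough state (here, an offset) to commit later without keeping the message.
type Token struct{ Offset int64 }

// message is an illustrative stand-in for marshal.Message.
type message struct {
	Offset  int64
	Payload []byte
}

// commitToken extracts the minimal commit state from a message.
func (m *message) commitToken() Token { return Token{Offset: m.Offset} }

func main() {
	msg := &message{Offset: 42, Payload: make([]byte, 1<<20)} // ~1 MiB payload
	tok := msg.commitToken()
	msg = nil // the large payload can now be garbage collected
	fmt.Println(tok.Offset)
}
```

In real code you would call msg.CommitToken(), drop the message, and later pass the token to Consumer.CommitByToken.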
func (*Consumer) ConsumeChannel ¶
func (c *Consumer) ConsumeChannel() <-chan *Message
ConsumeChannel returns a read-only channel. Messages that are retrieved from Kafka will be made available in this channel.
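In practice you range over this channel and commit each message once it has been processed. The sketch below is runnable standalone; Msg and the commit callback are stand-ins for marshal's *Message and Consumer.Commit, and in real code the channel would come from consumer.ConsumeChannel() after setting up a cluster, Marshaler, and Consumer:

```go
package main

import "fmt"

// Msg is an illustrative stand-in for *marshal.Message.
type Msg struct{ Value []byte }

// processLoop drains a message channel, handling each message and then
// committing it. In real code, msgs would be consumer.ConsumeChannel() and
// commit would be consumer.Commit.
func processLoop(msgs <-chan *Msg, commit func(*Msg) error) (int, error) {
	processed := 0
	for msg := range msgs {
		// ... application-specific handling of msg.Value goes here ...
		if err := commit(msg); err != nil {
			return processed, err
		}
		processed++
	}
	return processed, nil
}

func main() {
	ch := make(chan *Msg, 3)
	for i := 0; i < 3; i++ {
		ch <- &Msg{Value: []byte(fmt.Sprintf("payload-%d", i))}
	}
	close(ch)
	n, err := processLoop(ch, func(*Msg) error { return nil })
	fmt.Println(n, err)
}
```

Committing only after processing is what gives the at-least-once guarantee described under Commit.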
func (*Consumer) Flush ¶
func (c *Consumer) Flush() error
Flush will cause us to update all of the committed offsets. This operation can be performed to periodically sync offsets without waiting on the internal flushing mechanism.
func (*Consumer) GetCurrentLag ¶
func (c *Consumer) GetCurrentLag() int64
GetCurrentLag returns the number of messages that this consumer is lagging by. Note that this value can be unstable in the beginning of a run, as we might not have claimed all of the partitions we will end up claiming, or we might have overclaimed and need to back off. Ideally this will settle towards 0. If it continues to rise, that implies there isn't enough consumer capacity.
func (*Consumer) GetCurrentLoad ¶
func (c *Consumer) GetCurrentLoad() int
GetCurrentLoad returns a number representing the "load" of this consumer. Think of this like a load average in Unix systems: the numbers relate loosely to how much work the system is doing, but by themselves they don't tell you much.
func (*Consumer) GetCurrentTopicClaims ¶
func (c *Consumer) GetCurrentTopicClaims() (map[string]bool, error)
GetCurrentTopicClaims returns the topics that are currently claimed by this consumer. It is only relevant when ClaimEntireTopic is set.
func (*Consumer) PrintState ¶
func (c *Consumer) PrintState()
PrintState outputs the status of the consumer.
func (*Consumer) Terminate ¶
func (c *Consumer) Terminate(release bool) bool
Terminate instructs the consumer to clean up and allow other consumers to begin consuming. (If you do not call this method before exiting, things will still work, but more slowly.)
func (*Consumer) Terminated ¶
func (c *Consumer) Terminated() bool
Terminated returns whether or not this consumer has been terminated.
func (*Consumer) TopicClaims ¶
func (c *Consumer) TopicClaims() <-chan map[string]bool
TopicClaims returns a read-only channel that receives updates for topic claims. It is only relevant when ClaimEntireTopic is set.
type ConsumerOptions ¶
type ConsumerOptions struct {
    // FastReclaim instructs the consumer to attempt to reclaim any partitions
    // that are presently claimed by the ClientID/GroupID we have. This is useful
    // for situations where your ClientID is predictable/stable and you want to
    // minimize churn during restarts. This is dangerous if you have two copies
    // of your application running with the same ClientID/GroupID.
    // TODO: Create an instance ID for Marshaler such that we can detect when
    // someone else has decided to use our Client/Group.
    //
    // Note that this option ignores MaximumClaims, so it is possible to
    // exceed the claim limit if the ClientID previously held more claims.
    FastReclaim bool

    // ClaimEntireTopic makes Marshal handle claims on the entire topic rather
    // than on a per-partition basis. This is used with sharded produce/consume
    // setups. Defaults to false.
    ClaimEntireTopic bool

    // GreedyClaims indicates whether we should attempt to claim all unclaimed
    // partitions on start. This is appropriate in low QPS type environments.
    // Defaults to false/off.
    GreedyClaims bool

    // ReleaseClaimsIfBehind indicates whether to release a claim if a consumer
    // is consuming at a rate slower than the partition is being produced to.
    ReleaseClaimsIfBehind bool

    // MaximumClaims is the maximum number of claims this Consumer is allowed to
    // hold simultaneously: the maximum number of partitions to be claimed when
    // ClaimEntireTopic is false, otherwise the maximum number of topics to claim.
    // Set to 0 (default) to allow an unlimited number of claims.
    //
    // Using this option will leave some partitions/topics completely unclaimed
    // if the number of Consumers in this GroupID falls below the number of
    // partitions/topics that exist.
    //
    // Note this limit does not apply to claims made via FastReclaim.
    MaximumClaims int
}
ConsumerOptions represents all of the options that a consumer can be configured with.
func NewConsumerOptions ¶
func NewConsumerOptions() ConsumerOptions
NewConsumerOptions returns a default set of options for the Consumer.
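A common pattern is to start from the defaults and override only the fields you need. The sketch below mirrors the ConsumerOptions fields documented above with a local struct so it runs standalone; in real code you would call marshal.NewConsumerOptions() rather than the defaults() stand-in:

```go
package main

import "fmt"

// consumerOptions mirrors marshal.ConsumerOptions for illustration only.
type consumerOptions struct {
	FastReclaim           bool
	ClaimEntireTopic      bool
	GreedyClaims          bool
	ReleaseClaimsIfBehind bool
	MaximumClaims         int
}

// defaults stands in for marshal.NewConsumerOptions(); per the docs above,
// the boolean options default to false and MaximumClaims to 0 (unlimited).
func defaults() consumerOptions { return consumerOptions{} }

func main() {
	opts := defaults()
	opts.MaximumClaims = 4 // cap this consumer at four partition claims
	fmt.Printf("%+v\n", opts)
}
```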
type KafkaCluster ¶
type KafkaCluster struct {
// contains filtered or unexported fields
}
KafkaCluster is a user-agnostic view of the world. It connects to a Kafka cluster and runs rationalizers to observe the complete world state.
func Dial ¶
func Dial(name string, brokers []string, options MarshalOptions) (*KafkaCluster, error)
Dial returns a new cluster object which can be used to instantiate a number of Marshalers that all use the same cluster. The options parameter tunes the cluster's behavior; NewMarshalOptions returns a usable default set.
func (*KafkaCluster) IsGroupPaused ¶
func (c *KafkaCluster) IsGroupPaused(groupID string) bool
IsGroupPaused returns true if the given consumer group is paused. TODO(pihu) This just checks the expiry time, and not the admin ID.
func (*KafkaCluster) NewMarshaler ¶
func (c *KafkaCluster) NewMarshaler(clientID, groupID string) (*Marshaler, error)
NewMarshaler creates a Marshaler off of an existing cluster. This is more efficient if you're creating multiple instances, since they can share the same underlying cluster.
func (*KafkaCluster) Terminate ¶
func (c *KafkaCluster) Terminate()
Terminate is called when we're done with the marshaler and want to shut down.
func (*KafkaCluster) Terminated ¶
func (c *KafkaCluster) Terminated() bool
Terminated returns whether or not we have been terminated.
type MarshalOptions ¶
type MarshalOptions struct {
    // BrokerConnectionLimit is used to set the maximum simultaneous number of
    // connections that can be made to each broker.
    // Default: 30.
    BrokerConnectionLimit int

    // ConsumeRequestTimeout sets the time that we ask Kafka to wait before
    // returning any data to us. Setting this high uses more connections and can
    // lead to some latency but keeps the load on Kafka minimal. Use this to
    // balance QPS against latency.
    //
    // Default: 1 millisecond.
    ConsumeRequestTimeout time.Duration

    // MarshalRequestTimeout is used for our coordination requests. This should
    // be reasonable at default, but is left as a tunable in case you have
    // clients that are claiming an extremely large number of partitions and are
    // too slow. The overall Marshal latency is impacted by this value as well as
    // the MarshalRequestRetryWait below.
    //
    // Default: 1 millisecond.
    MarshalRequestTimeout time.Duration

    // MarshalRequestRetryWait is the time between consume requests Marshal
    // generates. This should be set to balance the above timeouts to prevent
    // hammering the server.
    //
    // Default: 500 milliseconds.
    MarshalRequestRetryWait time.Duration

    // MaxMessageSize is the maximum size in bytes of messages that can be
    // returned. This must be set to the size of the largest messages your
    // cluster is allowed to store, else you will end up with stalled streams.
    // I.e., Kafka will never send you a message if the message is larger than
    // this value, but we can't detect that; we just think there is no data.
    //
    // Default: 2,000,000 bytes.
    MaxMessageSize int32

    // MaxMessageQueue is the number of messages to retrieve from Kafka and store
    // in-memory waiting for consumption. This is per-Consumer and independent of
    // message size, so you should adjust this for your consumption patterns.
    //
    // Default: 1000 messages.
    MaxMessageQueue int
}
MarshalOptions contains various tunables that can be used to adjust the configuration of the underlying system.
func NewMarshalOptions ¶
func NewMarshalOptions() MarshalOptions
NewMarshalOptions returns a set of MarshalOptions populated with defaults.
type Marshaler ¶
type Marshaler struct {
// contains filtered or unexported fields
}
Marshaler is the coordinator type. It is designed to be used once per (client, group) and is thread safe. Creating one of these will create connections to your Kafka cluster and begin actively monitoring the coordination topic.
func NewMarshaler ¶
NewMarshaler connects to a cluster (given broker addresses) and prepares to handle marshalling requests. Given the way this system works, the marshaler has to process all messages in the topic before it's safely able to begin operating. This might take a while. NOTE: If you are creating multiple marshalers in your program, you should instead call Dial and then use the NewMarshaler method on that object.
func (*Marshaler) ClaimPartition ¶
func (m *Marshaler) ClaimPartition(topicName string, partID int) bool
ClaimPartition is how you can actually claim a partition. If you call this, Marshal will attempt to claim the partition on your behalf. This is the low-level function; you probably want to use a MarshaledConsumer. Returns a bool indicating whether the claim succeeded and whether you can continue.
func (*Marshaler) Claimed ¶
func (m *Marshaler) Claimed(topicName string, partID int) bool
Claimed returns the current status on whether or not a partition is claimed by any other consumer in our group (including ourselves). A topic/partition that does not exist is considered to be unclaimed.
func (*Marshaler) CommitOffsets ¶
func (m *Marshaler) CommitOffsets(topicName string, partID int, offset int64) error
CommitOffsets will commit the partition offsets to Kafka so it's available in the long-term storage of the offset coordination system. Note: this method does not ensure that this Marshal instance owns the topic/partition in question.
func (*Marshaler) GetLastPartitionClaim ¶
func (m *Marshaler) GetLastPartitionClaim(topicName string, partID int) PartitionClaim
GetLastPartitionClaim returns a PartitionClaim structure for a given partition. The structure describes the consumer that is currently or most recently claiming this partition. This is a copy of the claim structure, so changing it cannot change the world state.
func (*Marshaler) GetPartitionClaim ¶
func (m *Marshaler) GetPartitionClaim(topicName string, partID int) PartitionClaim
GetPartitionClaim returns a PartitionClaim structure for a given partition. The structure describes the consumer that is currently claiming this partition. This is a copy of the claim structure, so changing it cannot change the world state.
func (*Marshaler) GetPartitionOffsets ¶
func (m *Marshaler) GetPartitionOffsets(topicName string, partID int) (PartitionOffsets, error)
GetPartitionOffsets returns the current state of a topic/partition. This has to hit Kafka thrice to ask about a partition, but it returns the full state of information that can be used to calculate consumer lag.
func (*Marshaler) Heartbeat ¶
func (m *Marshaler) Heartbeat(topicName string, partID int, offset int64) error
Heartbeat will send an update for other people to know that we're still alive and still owning this partition. Returns an error if anything has gone wrong (at which point we can no longer assert we have the lock).
func (*Marshaler) NewConsumer ¶
func (m *Marshaler) NewConsumer(topicNames []string, options ConsumerOptions) (*Consumer, error)
NewConsumer instantiates a consumer object for a given topic. You must create a separate consumer for every individual topic that you want to consume from. Please see the documentation on ConsumerBehavior.
func (*Marshaler) Partitions ¶
func (m *Marshaler) Partitions(topicName string) int
Partitions returns the count of how many partitions are in a given topic. Returns 0 if a topic is unknown.
func (*Marshaler) PrintState ¶
func (m *Marshaler) PrintState()
PrintState will take the current state of the Marshal world and print it verbosely to the logging output. This is used in the rare case where we're self-terminating or on request from the user.
func (*Marshaler) ReleasePartition ¶
func (m *Marshaler) ReleasePartition(topicName string, partID int, offset int64) error
ReleasePartition will send an update for other people to know that we're done with a partition. Returns an error if anything has gone wrong (at which point we can no longer assert we have the lock).
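Taken together, ClaimPartition, Heartbeat, and ReleasePartition form the low-level claim lifecycle. The runnable sketch below exercises that lifecycle against a small interface and a fake in-memory coordinator; the interface and fake are illustrative stand-ins for *Marshaler, though the method shapes match the signatures above:

```go
package main

import "fmt"

// coordinator captures the subset of Marshaler methods used by the low-level
// claim lifecycle described above.
type coordinator interface {
	ClaimPartition(topicName string, partID int) bool
	Heartbeat(topicName string, partID int, offset int64) error
	ReleasePartition(topicName string, partID int, offset int64) error
}

// runClaim claims a partition, does some work that advances the offset,
// heartbeats to retain the lock, and finally releases with the last offset
// so another consumer can resume from there.
func runClaim(c coordinator, topic string, partID int, work func() int64) (int64, error) {
	if !c.ClaimPartition(topic, partID) {
		return 0, fmt.Errorf("partition %s:%d is already claimed", topic, partID)
	}
	offset := work()
	if err := c.Heartbeat(topic, partID, offset); err != nil {
		return offset, err // we can no longer assert we hold the lock
	}
	return offset, c.ReleasePartition(topic, partID, offset)
}

// fakeCoordinator is an in-memory stand-in for *marshal.Marshaler.
type fakeCoordinator struct{ claimed map[string]bool }

func key(topic string, partID int) string { return fmt.Sprintf("%s:%d", topic, partID) }

func (f *fakeCoordinator) ClaimPartition(topic string, partID int) bool {
	k := key(topic, partID)
	if f.claimed[k] {
		return false // someone already holds this partition
	}
	f.claimed[k] = true
	return true
}

func (f *fakeCoordinator) Heartbeat(topic string, partID int, offset int64) error {
	return nil
}

func (f *fakeCoordinator) ReleasePartition(topic string, partID int, offset int64) error {
	delete(f.claimed, key(topic, partID))
	return nil
}

func main() {
	fc := &fakeCoordinator{claimed: map[string]bool{}}
	offset, err := runClaim(fc, "events", 0, func() int64 { return 128 })
	fmt.Println(offset, err)
}
```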
func (*Marshaler) Terminate ¶
func (m *Marshaler) Terminate()
Terminate is called when we're done with the marshaler and want to shut down.
func (*Marshaler) Terminated ¶
func (m *Marshaler) Terminated() bool
Terminated returns whether or not we have been terminated.
type Message ¶
Message is a container for Kafka messages.
func (*Message) CommitToken ¶
func (m *Message) CommitToken() CommitToken
CommitToken returns a CommitToken for a message. This can be passed to the CommitByToken method.
type PartitionClaim ¶
type PartitionClaim struct {
    InstanceID    string
    ClientID      string
    GroupID       string
    LastRelease   int64
    LastHeartbeat int64
    CurrentOffset int64
    // contains filtered or unexported fields
}
PartitionClaim contains claim information about a given partition.
func (*PartitionClaim) Claimed ¶
func (p *PartitionClaim) Claimed() bool
Claimed returns whether or not the PartitionClaim indicates a valid (as of this invocation) claim.
type PartitionOffsets ¶
PartitionOffsets is a record of offsets for a given partition. Contains information combined from Kafka and our current state.
A Kafka partition consists of N messages with offsets. In the basic case, you can think of an offset like an array index. With log compaction and other trickery it acts more like a sparse array, but it's a close enough metaphor.
We keep track of four values for offsets:
    offsets      1     2     3     7     9    10    11    12
    partition [ msg1, msg2, msg3, msg4, msg5, msg6, msg7, ... ]
                 ^                 ^                        ^
                 \- Earliest       \- Current               \- Latest
In this example, Earliest is 1, which is the "oldest" offset within the partition. At any given time this offset might become invalid if a log segment rolls, so we might update it.
Current is 7, which is the offset of the NEXT message, i.e., this message has not been consumed yet.
Latest is 12, which is the offset that Kafka will assign to the message that next gets committed to the partition. This offset does not yet exist, and might never.
Committed is the value recorded in Kafka's committed offsets system.
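Given those semantics, consumer lag falls out directly as Latest minus Current. The sketch below is runnable standalone; the field names mirror the four values described above, since the actual PartitionOffsets struct layout is not shown in this documentation:

```go
package main

import "fmt"

// partitionOffsets mirrors the four offset values described above.
type partitionOffsets struct {
	Earliest, Latest, Current, Committed int64
}

// lag returns how many offsets the consumer is behind. With log compaction
// this is an upper bound on the actual number of unconsumed messages, since
// the offset space can be sparse.
func lag(o partitionOffsets) int64 {
	return o.Latest - o.Current
}

func main() {
	// The example from the text: Earliest=1, Current=7, Latest=12.
	o := partitionOffsets{Earliest: 1, Current: 7, Latest: 12}
	fmt.Println(lag(o)) // 5
}
```

Note that in the sparse example above only four messages (offsets 7, 9, 10, 11) remain unconsumed, which is why the numeric lag of 5 is an upper bound.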