protocol

package
v1.6.0

Warning: This package is not in the latest version of its module.
Published: Feb 28, 2023 License: Apache-2.0 Imports: 4 Imported by: 5

Documentation

Overview

Package protocol - Burrow types and interfaces. The protocol module provides the definitions for most of the common Burrow types and interfaces that are used in the rest of the application. The documentation here is primarily targeted at developers of Burrow modules, and not the end user.

Types

type ApplicationContext

type ApplicationContext struct {
	// Logger is a configured zap.Logger instance. It is to be used by the main routine directly, and the main routine
	// creates loggers for each of the coordinators to use that have fields set to identify that coordinator.
	//
	// This field can be set prior to calling core.Start() in order to pre-configure the logger. If it is not set,
	// core.Start() will set up a default logger using the application config.
	Logger *zap.Logger

	// LogLevel is an AtomicLevel instance that has been used to set the default level of the Logger. It is used to
	// dynamically adjust the logging level (such as via an HTTP call)
	//
	// If Logger has been set prior to calling core.Start(), LogLevel must be set as well.
	LogLevel *zap.AtomicLevel

	// This is used by the main Burrow routines to signal that the configuration is valid. The rest of the code should
	// not care about this, as the application will exit if the configuration is not valid.
	ConfigurationValid bool

	// This is a ZookeeperClient that can be used by any coordinator or module in order to store metadata about their
	// operation, or to create locks. Any module that uses this client must honor the ZookeeperRoot, which is the root
	// path under which all ZNodes should be created.
	Zookeeper     ZookeeperClient
	ZookeeperRoot string

	// This is a boolean flag which is set or unset by the Zookeeper coordinator to signal when the connection is
	// established. It should be used in coordination with the ZookeeperExpired condition to monitor when the session
	// has expired. This indicates locks have been released or watches must be reset.
	ZookeeperConnected bool
	ZookeeperExpired   *sync.Cond

	// This is the channel over which any module should send a consumer group evaluation request. It is serviced by the
	// evaluator Coordinator, and passed to an appropriate evaluator module.
	EvaluatorChannel chan *EvaluatorRequest

	// This is the channel over which any module should send storage requests for storage of offsets and group
	// information, or to fetch the same information. It is serviced by the storage Coordinator.
	StorageChannel chan *StorageRequest

	// This is a boolean flag which is set by the last subsystem, the consumer, in order to signal when Burrow is ready
	AppReady bool
}

ApplicationContext is a structure that holds objects that are used across all coordinators and modules. This is used in lieu of passing individual arguments to all functions.

type ConsumerGroupStatus

type ConsumerGroupStatus struct {
	// The name of the cluster in which the group exists
	Cluster string `json:"cluster"`

	// The name of the consumer group
	Group string `json:"group"`

	// The status of the consumer group. This is either NOTFOUND, OK, WARN, or ERR. It is calculated from the highest
	// Status for the individual partitions
	Status StatusConstant `json:"status"`

	// A number between 0.0 and 1.0 that describes how complete the partition information is for this group.
	// A partition that has a Complete value of less than 1.0 will be treated as zero.
	Complete float32 `json:"complete"`

	// A slice of PartitionStatus objects showing individual partition status. If the request ShowAll field was true,
	// this slice will contain every partition consumed by the group. If ShowAll was false, this slice will only
	// contain the partitions that have a status of WARN or above.
	Partitions []*PartitionStatus `json:"partitions"`

	// A count of the total number of partitions that the group has committed offsets for. Note, this may not be the
	// same as the total number of partitions consumed by the group, if Burrow has not seen commits for all partitions
	// yet.
	TotalPartitions int `json:"partition_count"`

	// A PartitionStatus object for the partition with the highest CurrentLag value
	Maxlag *PartitionStatus `json:"maxlag"`

	// The sum of all partition CurrentLag values for the group
	TotalLag uint64 `json:"totallag"`
}

ConsumerGroupStatus is the response object that is sent in reply to an EvaluatorRequest. It describes the current status of a single consumer group.
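As a rough illustration of how the group-level fields relate to the per-partition ones, the sketch below folds a slice of partition statuses into a group status, a total lag, and the partition with the highest lag. It uses trimmed local copies of the types rather than importing Burrow, and the helper `summarize` is hypothetical: it follows the field comments above and is not the evaluator's actual algorithm.

```go
package main

import "fmt"

// StatusConstant and PartitionStatus are trimmed local copies of the
// protocol types so that this example compiles without importing Burrow.
type StatusConstant int

const (
	StatusNotFound StatusConstant = iota
	StatusOK
	StatusWarning
	StatusError
	StatusStop
	StatusStall
	StatusRewind
)

type PartitionStatus struct {
	Topic      string
	Partition  int32
	Status     StatusConstant
	CurrentLag uint64
}

// summarize is a hypothetical helper. It returns the highest partition status
// (collapsed to StatusError for the partition-only states), the sum of all
// CurrentLag values, and the partition with the highest CurrentLag.
func summarize(parts []*PartitionStatus) (StatusConstant, uint64, *PartitionStatus) {
	status := StatusOK
	var totalLag uint64
	var maxLag *PartitionStatus
	for _, p := range parts {
		totalLag += p.CurrentLag
		if maxLag == nil || p.CurrentLag > maxLag.CurrentLag {
			maxLag = p
		}
		if p.Status > status {
			status = p.Status
		}
	}
	// Stop, Stall, and Rewind are not used for group status; a group with
	// such partitions is reported as StatusError.
	if status > StatusError {
		status = StatusError
	}
	return status, totalLag, maxLag
}

func main() {
	parts := []*PartitionStatus{
		{Topic: "orders", Partition: 0, Status: StatusOK, CurrentLag: 5},
		{Topic: "orders", Partition: 1, Status: StatusStall, CurrentLag: 40},
	}
	status, total, worst := summarize(parts)
	fmt.Println(status == StatusError, total, worst.Partition)
}
```

The topic name and lag numbers are made up for the example.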

type ConsumerOffset

type ConsumerOffset struct {
	// The offset that is stored
	Offset int64 `json:"offset"`

	// The offset of this __consumer_offsets commit
	Order int64 `json:"-"`

	// The timestamp at which the offset was committed
	Timestamp int64 `json:"timestamp"`

	// The timestamp at which the commit was seen by Burrow
	ObservedTimestamp int64 `json:"observedAt"`

	// The number of messages that the consumer was behind at the time that the offset was committed. This number is
	// not updated after the offset was committed, so it does not represent the current lag of the consumer.
	Lag *Lag `json:"lag"`
}

ConsumerOffset represents a single stored offset. It is used as part of the response to a StorageFetchConsumer request.

type ConsumerPartition

type ConsumerPartition struct {
	// A slice containing a ConsumerOffset object for each offset Burrow has stored for this partition. This can be any
	// length up to the number of intervals Burrow has been configured to store, depending on how many offset commits
	// have been seen for this partition
	Offsets []*ConsumerOffset `json:"offsets"`

	// A slice containing the history of broker offsets stored for this partition. This is used for evaluation only,
	// and as such it is not provided when encoding to JSON (for HTTP responses)
	BrokerOffsets []int64 `json:"-"`

	// A string that describes the consumer host that currently owns this partition, if the information is available
	// (for active new consumers)
	Owner string `json:"owner"`

	// A string containing the client_id set by the consumer (for active new consumers)
	ClientID string `json:"client_id"`

	// The current number of messages that the consumer is behind for this partition. This is calculated using the
	// last committed offset and the current broker end offset
	CurrentLag uint64 `json:"current-lag"`
}

ConsumerPartition represents the information stored for a group for a single partition. It is used as part of the response to a StorageFetchConsumer request.

type ConsumerPartitions

type ConsumerPartitions []*ConsumerPartition

ConsumerPartitions describes all partitions for a single topic. The index indicates the partition ID, and the value is a pointer to a ConsumerPartition object with the offset information for that partition.

type ConsumerTopics

type ConsumerTopics map[string]ConsumerPartitions

ConsumerTopics is the response that is sent for a StorageFetchConsumer request. It is a map of topic names to ConsumerPartitions objects that describe that topic.
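Because the slice index of a ConsumerPartitions value is the partition ID, walking a ConsumerTopics response is a nested range loop. The sketch below uses trimmed local copies of the types, and the helper `laggingPartitions` is hypothetical. The nil check is a defensive assumption, since the documentation above does not say whether entries can be nil.

```go
package main

import "fmt"

// Trimmed local copies of the protocol types, so the example is self-contained.
type ConsumerPartition struct {
	Owner      string
	CurrentLag uint64
}

type ConsumerPartitions []*ConsumerPartition

type ConsumerTopics map[string]ConsumerPartitions

// laggingPartitions is a hypothetical helper that returns, for each topic,
// the IDs of the partitions whose CurrentLag exceeds threshold.
func laggingPartitions(topics ConsumerTopics, threshold uint64) map[string][]int32 {
	result := make(map[string][]int32)
	for topic, partitions := range topics {
		for id, p := range partitions {
			if p == nil {
				continue // assumption: tolerate missing partition entries
			}
			if p.CurrentLag > threshold {
				// The slice index is the partition ID.
				result[topic] = append(result[topic], int32(id))
			}
		}
	}
	return result
}

func main() {
	topics := ConsumerTopics{
		"orders": {{Owner: "host-a", CurrentLag: 0}, {Owner: "host-b", CurrentLag: 120}},
		"audit":  {{CurrentLag: 3}},
	}
	fmt.Println(laggingPartitions(topics, 100))
}
```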

type Coordinator

type Coordinator interface {
	// Configure is called to initially set up the coordinator. In this func, it should validate any configurations
	// that the coordinator requires, and then call the Configure func for each of its modules. If there are any errors
	// in configuration, it is expected that this call will panic. The coordinator may also set up data structures that
	// are critical for the subsystem, such as communication channels. It must NOT make any connections to resources
	// outside of the coordinator itself, including either the storage or evaluator channels in the application context.
	Configure()

	// Start is called to start the operation of the coordinator. In this func, the coordinator should call the Start
	// func for any of its modules, and then start any additional logic the coordinator needs to run. This func must
	// return (any running code must be started as a goroutine). If there is a problem starting up, the coordinator
	// should stop anything it has already started and return a non-nil error.
	Start() error

	// Stop is called to stop operation of the coordinator. In this func, the coordinator should call the Stop func for
	// any of its modules, and stop any goroutines that it has started. While it can return an error if there is a
	// problem, the errors are mostly ignored.
	Stop() error
}

Coordinator is a common interface for all subsystem coordinators so that the core routine can manage them in a consistent manner. The interface provides a way to configure the coordinator, and then methods to start it and stop it safely. It is expected that when any of these funcs are called, the coordinator will then call the corresponding func on its modules.

The struct that implements this interface is expected to have, at the very least, an App field and a Log field. The App field will contain the protocol.ApplicationContext object with resources that the coordinator and modules may use. The Log field will be set up with a logger that has fields set that identify the coordinator. Both are set up by the core routine before Configure is called. The coordinator can use Log to create the individual loggers for the modules it controls.
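The lifecycle contract described above (fan Configure, Start, and Stop out to the modules, and undo a partial start on failure) can be sketched as follows. The interfaces are local copies of the ones on this page; `demoCoordinator`, `demoModule`, and the `"demo."` config root are hypothetical names invented for the example, and the App and Log fields are left out to keep it dependency-free.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the protocol interfaces.
type Module interface {
	Configure(name string, configRoot string)
	Start() error
	Stop() error
}

type Coordinator interface {
	Configure()
	Start() error
	Stop() error
}

// demoModule is a hypothetical module that can be told to fail on Start.
type demoModule struct {
	name    string
	failing bool
	running bool
}

func (m *demoModule) Configure(name, configRoot string) { m.name = name }

func (m *demoModule) Start() error {
	if m.failing {
		return errors.New(m.name + ": cannot start")
	}
	m.running = true
	return nil
}

func (m *demoModule) Stop() error { m.running = false; return nil }

// demoCoordinator is a hypothetical coordinator over a fixed set of modules.
type demoCoordinator struct {
	names   []string // start order
	modules map[string]Module
}

func (c *demoCoordinator) Configure() {
	for _, name := range c.names {
		c.modules[name].Configure(name, "demo."+name)
	}
}

// Start starts each module in order. If one fails, the modules that were
// already started are stopped again and a non-nil error is returned.
func (c *demoCoordinator) Start() error {
	for i, name := range c.names {
		if err := c.modules[name].Start(); err != nil {
			for _, started := range c.names[:i] {
				_ = c.modules[started].Stop()
			}
			return fmt.Errorf("starting module %q: %w", name, err)
		}
	}
	return nil
}

func (c *demoCoordinator) Stop() error {
	for _, name := range c.names {
		_ = c.modules[name].Stop() // errors are mostly ignored on shutdown
	}
	return nil
}

// Compile-time check that demoCoordinator satisfies the interface.
var _ Coordinator = (*demoCoordinator)(nil)

func main() {
	c := &demoCoordinator{
		names:   []string{"a", "b"},
		modules: map[string]Module{"a": &demoModule{}, "b": &demoModule{failing: true}},
	}
	c.Configure()
	fmt.Println(c.Start())
}
```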

type EvaluatorRequest

type EvaluatorRequest struct {
	// Reply is the channel over which the evaluator will send the status response. The sender should expect to receive
	// only one message over this channel for each request, and the channel will not be closed after the response is
	// sent (to facilitate the notifier, which uses a single channel for all responses)
	Reply chan *ConsumerGroupStatus

	// The name of the cluster in which the group is found
	Cluster string

	// The name of the group to get the status for
	Group string

	// If ShowAll is true, the returned status object contains a partition entry for every partition the group consumes,
	// regardless of the state of that partition. If false (the default), only partitions that have a status of WARN
	// or above are returned in the status object.
	ShowAll bool
}

EvaluatorRequest is sent over the EvaluatorChannel that is stored in the application context. It is a query for the status of a group in a cluster. The response to this query is sent over the reply channel. This request is typically used in the HTTP server and notifier subsystems.
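The request and reply pattern can be sketched with plain channels. The types below are trimmed local copies (Status is simplified to a string), and `fakeEvaluator` is a hypothetical stand-in for the evaluator coordinator that always answers "OK". Since the Reply channel is not closed after the response, the caller reads exactly one value from it.

```go
package main

import "fmt"

// Trimmed local copies of the protocol types; Status is simplified to a string.
type ConsumerGroupStatus struct {
	Cluster string
	Group   string
	Status  string
}

type EvaluatorRequest struct {
	Reply   chan *ConsumerGroupStatus
	Cluster string
	Group   string
	ShowAll bool
}

// fakeEvaluator is a hypothetical stand-in for the evaluator coordinator.
// It answers every request with an "OK" status and never closes Reply.
func fakeEvaluator(requests <-chan *EvaluatorRequest) {
	for req := range requests {
		req.Reply <- &ConsumerGroupStatus{Cluster: req.Cluster, Group: req.Group, Status: "OK"}
	}
}

// groupStatus sends one request and waits for the single response.
func groupStatus(requests chan<- *EvaluatorRequest, cluster, group string) *ConsumerGroupStatus {
	// Buffered so the evaluator can reply without waiting for the receiver.
	reply := make(chan *ConsumerGroupStatus, 1)
	requests <- &EvaluatorRequest{Reply: reply, Cluster: cluster, Group: group}
	return <-reply
}

func main() {
	requests := make(chan *EvaluatorRequest)
	go fakeEvaluator(requests)

	status := groupStatus(requests, "local", "payments")
	fmt.Println(status.Cluster, status.Group, status.Status)
	close(requests)
}
```

In a real caller it is probably worth wrapping the receive in a select with a timeout, so that a stalled evaluator cannot block the requester indefinitely.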

type Lag added in v1.3.0

type Lag struct {
	Value uint64
}

Lag is a thin wrapper around a uint64. It is used through a pointer (*Lag) so that the value can be nil.

func (Lag) MarshalJSON added in v1.3.0

func (lag Lag) MarshalJSON() ([]byte, error)

MarshalJSON encodes lag as a nullable number rather than a nested struct.

func (*Lag) UnmarshalJSON added in v1.3.0

func (lag *Lag) UnmarshalJSON(b []byte) error

UnmarshalJSON reads lag from a JSON number
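One way to get the nullable-number behavior described above is to delegate to encoding/json for the inner uint64 and rely on the standard handling of nil pointers. This is a sketch with local copies of Lag and a trimmed ConsumerOffset; the method bodies are an assumption about how such a wrapper can be written, not a copy of the package's implementation. The helpers `encode` and `decode` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Lag is a local copy of the wrapper type.
type Lag struct {
	Value uint64
}

// MarshalJSON writes the lag as a bare JSON number.
func (lag Lag) MarshalJSON() ([]byte, error) {
	return json.Marshal(lag.Value)
}

// UnmarshalJSON reads the lag from a bare JSON number.
func (lag *Lag) UnmarshalJSON(b []byte) error {
	return json.Unmarshal(b, &lag.Value)
}

// ConsumerOffset is trimmed to the two fields that matter here.
type ConsumerOffset struct {
	Offset int64 `json:"offset"`
	Lag    *Lag  `json:"lag"`
}

// encode is a hypothetical helper that returns the JSON form as a string.
func encode(o ConsumerOffset) string {
	b, err := json.Marshal(o)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

// decode is a hypothetical helper that parses a JSON document.
func decode(s string) (ConsumerOffset, error) {
	var o ConsumerOffset
	err := json.Unmarshal([]byte(s), &o)
	return o, err
}

func main() {
	fmt.Println(encode(ConsumerOffset{Offset: 100, Lag: &Lag{Value: 7}})) // {"offset":100,"lag":7}
	fmt.Println(encode(ConsumerOffset{Offset: 100}))                      // {"offset":100,"lag":null}

	o, err := decode(`{"offset":5,"lag":null}`)
	fmt.Println(o.Lag == nil, err) // a JSON null leaves the pointer nil
}
```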

type Module

type Module interface {
	// Configure is called to initially set up the module. The name of the module, as well as the root string to be
	// used when looking up configurations with viper, are provided. In this func, the module must completely validate
	// its own configuration, and panic if it is not correct. It may also set up data structures that are critical
	// for the module. It must NOT make any connections to resources outside of the module itself, including either
	// the storage or evaluator channels in the application context.
	Configure(name string, configRoot string)

	// Start is called to start the operation of the module. In this func, the module should make connections to
	// external resources and start operation. This func must return (any running code must be started as a goroutine).
	// If there is a problem starting up, the module should stop anything it has already started and return a non-nil
	// error.
	Start() error

	// Stop is called to stop operation of the module. In this func, the module should clean up any goroutines it has
	// started and close any external connections. While it can return an error if there is a problem, the errors are
	// mostly ignored.
	Stop() error
}

Module is a common interface for all modules so that they can be manipulated by the coordinators in the same way. The interface provides a way to configure the module, and then methods to start it and stop it safely. Each coordinator may have its own Module interface definition, as well, that adds specific requirements for that type of module.

The struct that implements this interface is expected to have, at the very least, an App field and a Log field. The App field will contain the protocol.ApplicationContext object with resources that the module may use. The Log field will be set up with a logger that has fields set that identify the module. Both are set up by the module's coordinator before Configure is called.
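A minimal module that follows these rules might look like the sketch below: Configure validates settings and panics on bad input without touching anything external, Start launches a goroutine and returns immediately, and Stop signals the goroutine and waits for it to exit. Everything here is hypothetical: `tickerModule`, the `settings` map (a stand-in for the viper lookup that real modules use), and the `demo.ticker` config root.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// settings is a hypothetical stand-in for configuration looked up via viper.
var settings = map[string]string{"demo.ticker.interval": "5ms"}

// tickerModule is a hypothetical module that counts timer ticks.
type tickerModule struct {
	name     string
	interval time.Duration
	quit     chan struct{}
	done     sync.WaitGroup
	ticks    int // only read after Stop has returned
}

// Configure validates the configuration and panics if it is not usable.
// It makes no outside connections.
func (m *tickerModule) Configure(name string, configRoot string) {
	m.name = name
	d, err := time.ParseDuration(settings[configRoot+".interval"])
	if err != nil || d <= 0 {
		panic(fmt.Sprintf("module %s: invalid interval under %s", name, configRoot))
	}
	m.interval = d
	m.quit = make(chan struct{})
}

// Start launches the worker goroutine and returns right away.
func (m *tickerModule) Start() error {
	m.done.Add(1)
	go func() {
		defer m.done.Done()
		ticker := time.NewTicker(m.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				m.ticks++
			case <-m.quit:
				return
			}
		}
	}()
	return nil
}

// Stop signals the goroutine to exit and waits until it has done so.
func (m *tickerModule) Stop() error {
	close(m.quit)
	m.done.Wait()
	return nil
}

func main() {
	m := &tickerModule{}
	m.Configure("ticker", "demo.ticker")
	if err := m.Start(); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	time.Sleep(20 * time.Millisecond)
	_ = m.Stop()
	fmt.Println("module", m.name, "stopped cleanly")
}
```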

type PartitionStatus

type PartitionStatus struct {
	// The topic name for this partition
	Topic string `json:"topic"`

	// The partition ID
	Partition int32 `json:"partition"`

	// If available (for active new consumers), the consumer host that currently owns this partition
	Owner string `json:"owner"`

	// If available (for active new consumers), the client_id of the consumer that currently owns this partition
	ClientID string `json:"client_id"`

	// The status of the partition
	Status StatusConstant `json:"status"`

	// A ConsumerOffset object that describes the first (oldest) offset that Burrow is storing for this partition
	Start *ConsumerOffset `json:"start"`

	// A ConsumerOffset object that describes the last (latest) offset that Burrow is storing for this partition
	End *ConsumerOffset `json:"end"`

	// The current number of messages that the consumer is behind for this partition. This is calculated using the
	// last committed offset and the current broker end offset
	CurrentLag uint64 `json:"current_lag"`

	// A number between 0.0 and 1.0 that describes how complete the offset information is for this partition.
	// For example, if Burrow has been configured to store 10 offsets, and Burrow has only stored 7 commits for this
	// partition, Complete will be 0.7
	Complete float32 `json:"complete"`
}

PartitionStatus represents the state of a single consumed partition.

type StatusConstant

type StatusConstant int

StatusConstant describes the state of a partition or group as a single value. These values are ordered from least to most "bad", with zero being reserved to indicate that a group is not found.

const (
	// StatusNotFound indicates that the consumer group does not exist. It is not used for partition status.
	StatusNotFound StatusConstant = 0

	// StatusOK indicates that a partition is in a good state. For a group, it indicates that all partitions are in a
	// good state.
	StatusOK StatusConstant = 1

	// StatusWarning indicates that a partition is lagging - it is making progress, but falling further behind. For a
	// group, it indicates that one or more partitions are lagging.
	StatusWarning StatusConstant = 2

	// StatusError indicates that a group has one or more partitions that are in the Stop, Stall, or Rewind states. It
	// is not used for partition status.
	StatusError StatusConstant = 3

	// StatusStop indicates that the consumer has not committed an offset for that partition in some time, and the lag
	// is non-zero. It is not used for group status.
	StatusStop StatusConstant = 4

	// StatusStall indicates that the consumer is committing offsets for the partition, but they are not increasing and
	// the lag is non-zero. It is not used for group status.
	StatusStall StatusConstant = 5

	// StatusRewind indicates that the consumer has committed an offset for the partition that is less than the
	// previous offset. It is not used for group status.
	StatusRewind StatusConstant = 6
)

func (StatusConstant) MarshalJSON

func (c StatusConstant) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface. The status is the string representation of StatusConstant.

func (StatusConstant) MarshalText

func (c StatusConstant) MarshalText() ([]byte, error)

MarshalText implements the encoding.TextMarshaler interface. The status is the string representation of StatusConstant.

func (StatusConstant) String

func (c StatusConstant) String() string

String returns a string representation of a StatusConstant

type StorageRequest

type StorageRequest struct {
	// The type of request that this struct encapsulates
	RequestType StorageRequestConstant

	// If the RequestType is a "Fetch" request, Reply must contain a channel to receive the response on
	Reply chan interface{}

	// The name of the cluster to which the request applies. Required for all request types except StorageFetchClusters
	Cluster string

	// The name of the consumer group to which the request applies
	Group string

	// The name of the topic to which the request applies
	Topic string

	// The ID of the partition to which the request applies
	Partition int32

	// For StorageSetBrokerOffset requests, TopicPartitionCount indicates the total number of partitions for the topic
	TopicPartitionCount int32

	// For StorageSetBrokerOffset and StorageSetConsumerOffset requests, the offset to store
	Offset int64

	// For StorageSetConsumerOffset requests, the offset of the offset commit itself (i.e. the __consumer_offsets offset)
	Order int64

	// For StorageSetConsumerOffset requests, the timestamp of the offset being stored
	Timestamp int64

	// For StorageSetConsumerOwner requests, a string describing the consumer host that owns the partition
	Owner string

	// For StorageSetConsumerOwner requests, a string containing the client_id set by the consumer
	ClientID string
}

StorageRequest is sent over the StorageChannel that is stored in the application context. It is a request to either send information to the storage subsystem or retrieve information from it. The RequestType indicates the particular type of request. "Set" and "Clear" requests do not get a response. "Fetch" requests will send a response over the Reply channel supplied in the request.
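Since Reply is a `chan interface{}`, a fetch caller has to assert the concrete type that the request type promises. The sketch below shows a "Set" request (no reply) followed by a StorageFetchClusters request (replies with a []string). The types are trimmed local copies, and `fakeStorage` and `fetchClusters` are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed local copies of the protocol types and two of the constants.
type StorageRequestConstant int

const (
	StorageSetBrokerOffset StorageRequestConstant = 0
	StorageFetchClusters   StorageRequestConstant = 5
)

type StorageRequest struct {
	RequestType         StorageRequestConstant
	Reply               chan interface{}
	Cluster             string
	Topic               string
	Partition           int32
	TopicPartitionCount int32
	Offset              int64
}

// fakeStorage is a hypothetical stand-in for the storage coordinator.
// "Set" requests are consumed silently; StorageFetchClusters gets a reply.
func fakeStorage(requests <-chan *StorageRequest, clusters []string) {
	for req := range requests {
		if req.RequestType == StorageFetchClusters {
			req.Reply <- clusters
		}
	}
}

// fetchClusters sends a fetch request and asserts the documented reply type.
func fetchClusters(requests chan<- *StorageRequest) ([]string, error) {
	reply := make(chan interface{}, 1)
	requests <- &StorageRequest{RequestType: StorageFetchClusters, Reply: reply}
	clusters, ok := (<-reply).([]string)
	if !ok {
		return nil, errors.New("unexpected reply type for StorageFetchClusters")
	}
	return clusters, nil
}

func main() {
	requests := make(chan *StorageRequest)
	go fakeStorage(requests, []string{"local", "remote"})

	// A "Set" request carries no Reply channel and gets no response.
	requests <- &StorageRequest{
		RequestType:         StorageSetBrokerOffset,
		Cluster:             "local",
		Topic:               "orders",
		Partition:           0,
		TopicPartitionCount: 1,
		Offset:              4242,
	}

	clusters, err := fetchClusters(requests)
	fmt.Println(clusters, err)
	close(requests)
}
```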

type StorageRequestConstant

type StorageRequestConstant int

StorageRequestConstant is used in StorageRequest to indicate the type of request. Numeric ordering is not important.

const (
	// StorageSetBrokerOffset is the request type to store a broker offset. Requires Cluster, Topic, Partition,
	// TopicPartitionCount, and Offset fields
	StorageSetBrokerOffset StorageRequestConstant = 0

	// StorageSetConsumerOffset is the request type to store a consumer offset. Requires Cluster, Group, Topic,
	// Partition, Offset, and Timestamp fields
	StorageSetConsumerOffset StorageRequestConstant = 1

	// StorageSetConsumerOwner is the request type to store a consumer owner. Requires Cluster, Group, Topic, Partition,
	// and Owner fields
	StorageSetConsumerOwner StorageRequestConstant = 2

	// StorageSetDeleteTopic is the request type to remove a topic from the broker and all consumers. Requires Cluster,
	// Group, and Topic fields
	StorageSetDeleteTopic StorageRequestConstant = 3

	// StorageSetDeleteGroup is the request type to remove a consumer group. Requires Cluster and Group fields
	StorageSetDeleteGroup StorageRequestConstant = 4

	// StorageFetchClusters is the request type to retrieve a list of clusters. Requires Reply. Returns a []string
	StorageFetchClusters StorageRequestConstant = 5

	// StorageFetchConsumers is the request type to retrieve a list of consumer groups in a cluster. Requires Reply and
	// Cluster fields. Returns a []string
	StorageFetchConsumers StorageRequestConstant = 6

	// StorageFetchTopics is the request type to retrieve a list of topics in a cluster. Requires Reply and Cluster
	// fields. Returns a []string
	StorageFetchTopics StorageRequestConstant = 7

	// StorageFetchConsumer is the request type to retrieve all stored information for a single consumer group. Requires
	// Reply, Cluster, and Group fields. Returns a ConsumerTopics object
	StorageFetchConsumer StorageRequestConstant = 8

	// StorageFetchTopic is the request type to retrieve the current broker offsets (one per partition) for a topic.
	// Requires Reply, Cluster, and Topic fields.
	// Returns a []int64
	StorageFetchTopic StorageRequestConstant = 9

	// StorageClearConsumerOwners is the request type to remove all partition owner information for a single group.
	// Requires Cluster and Group fields
	StorageClearConsumerOwners StorageRequestConstant = 10

	// StorageFetchConsumersForTopic is the request type to obtain a list of all consumer groups consuming from a topic.
	// Returns a []string
	StorageFetchConsumersForTopic StorageRequestConstant = 11
)

func (StorageRequestConstant) MarshalJSON

func (c StorageRequestConstant) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface. The value is the string representation of StorageRequestConstant.

func (StorageRequestConstant) MarshalText

func (c StorageRequestConstant) MarshalText() ([]byte, error)

MarshalText implements the encoding.TextMarshaler interface. The value is the string representation of StorageRequestConstant.

func (StorageRequestConstant) String

func (c StorageRequestConstant) String() string

String returns a string representation of a StorageRequestConstant for logging.

type ZookeeperClient

type ZookeeperClient interface {
	// Close the connection to Zookeeper
	Close()

	// For the given path in Zookeeper, return a slice of strings which list the immediate child nodes. This method also
	// sets a watch on the children of the specified path, providing an event channel that will receive a message when
	// the watch fires
	ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error)

	// For the given path in Zookeeper, return the data in the node as a byte slice. This method also sets a watch on
	// the specified node, providing an event channel that will receive a message when the watch fires
	GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)

	// For the given path in Zookeeper, return a boolean stating whether or not the node exists.
	// The method does not set a watch on the node, but verifies existence of a node to avoid authentication error.
	Exists(path string) (bool, *zk.Stat, error)

	// For the given path in Zookeeper, return a boolean stating whether or not the node exists. This method also sets
	// a watch on the node (exists if it does not currently exist, or a data watch otherwise), providing an event
	// channel that will receive a message when the watch fires
	ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)

	// Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be
	// provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is
	// desired, specify
	//  zk.WorldACL(zk.PermAll)
	Create(string, []byte, int32, []zk.ACL) (string, error)

	// NewLock creates a lock using the provided path. Multiple Zookeeper clients, using the same lock path, can
	// synchronize with each other to assure that only one client has the lock at any point.
	NewLock(path string) ZookeeperLock
}

ZookeeperClient is a minimal interface for working with a Zookeeper connection. We provide this interface, rather than using the underlying library directly, as it makes it easier to test code that uses Zookeeper. This interface should be expanded with additional methods as needed.

Note that the interface is specified in the protocol package, rather than in the helper package or the zookeeper coordinator package, as it has to be referenced by ApplicationContext. Moving it elsewhere generates a dependency loop.

type ZookeeperLock

type ZookeeperLock interface {
	// Lock acquires the lock, blocking until it is able to do so, and returns nil. If the lock cannot be acquired, such
	// as if the session has been lost, a non-nil error will be returned instead.
	Lock() error

	// Unlock releases the lock, returning nil. If there is an error releasing the lock, such as if it is not held, an
	// error is returned instead.
	Unlock() error
}

ZookeeperLock is an interface for the operation of a lock in Zookeeper. Multiple Zookeeper clients, using the same lock path, can synchronize with each other to assure that only one client has the lock at any point.
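Because the lock is an interface, code that needs mutual exclusion can be exercised against an in-memory fake instead of a live Zookeeper ensemble. The sketch below is one such fake together with a small acquire, run, release helper. `memLock`, `withLock`, and `countWithLock` are hypothetical names, and the fake only serializes goroutines inside one process; it does not model session loss.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ZookeeperLock is a local copy of the protocol interface.
type ZookeeperLock interface {
	Lock() error
	Unlock() error
}

// memLock is a hypothetical in-memory fake backed by a one-slot channel.
type memLock struct {
	slot chan struct{}
}

func newMemLock() *memLock { return &memLock{slot: make(chan struct{}, 1)} }

// Lock blocks until the slot is free, then takes it.
func (l *memLock) Lock() error {
	l.slot <- struct{}{}
	return nil
}

// Unlock frees the slot, or reports an error if the lock is not held.
func (l *memLock) Unlock() error {
	select {
	case <-l.slot:
		return nil
	default:
		return errors.New("lock not held")
	}
}

// withLock runs critical while holding the lock and then releases it.
func withLock(lock ZookeeperLock, critical func()) error {
	if err := lock.Lock(); err != nil {
		return fmt.Errorf("acquiring lock: %w", err)
	}
	critical()
	return lock.Unlock()
}

// countWithLock increments a shared counter from several goroutines,
// each time under the lock, and returns the final value.
func countWithLock(lock ZookeeperLock, workers int) int {
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = withLock(lock, func() { counter++ })
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countWithLock(newMemLock(), 50))
}
```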
