server

package
v4.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 9, 2023 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrFetchingBrokers error.
	ErrFetchingBrokers = status.Error(codes.Internal, "error fetching brokers")
	// ErrBrokerNotExist error.
	ErrBrokerNotExist = status.Error(codes.FailedPrecondition, "broker does not exist")
	// ErrBrokerIDEmpty error.
	ErrBrokerIDEmpty = status.Error(codes.InvalidArgument, "broker Id field must be specified")
	// ErrBrokerIDsEmpty error.
	ErrBrokerIDsEmpty = status.Error(codes.InvalidArgument, "broker Ids field must be specified")
)
View Source
var (
	// ErrFetchingTopics error.
	ErrFetchingTopics = status.Error(codes.Internal, "error fetching topics")
	// ErrTopicNotExist error.
	ErrTopicNotExist = status.Error(codes.NotFound, "topic does not exist")
	// ErrTopicNameEmpty error.
	ErrTopicNameEmpty = status.Error(codes.InvalidArgument, "topic Name field must be specified")
	// ErrTopicFieldMissing error.
	ErrTopicFieldMissing = status.Error(codes.InvalidArgument, "topic field missing in request body")
	// ErrTopicAlreadyExists error.
	ErrTopicAlreadyExists = status.Error(codes.AlreadyExists, "topic already exists")
	// ErrInsufficientBrokers error.
	ErrInsufficientBrokers = status.Error(codes.FailedPrecondition, "insufficient number of brokers")
	// ErrInvalidBrokerId error.
	ErrInvalidBrokerId = status.Error(codes.FailedPrecondition, "invalid broker id")
	// ErrTaggingTopicTimedOut error.
	ErrTaggingTopicTimedOut = status.Error(codes.DeadlineExceeded, "tagging topic timed out")
)
View Source
var (
	// ErrInvalidKafkaObjectType error.
	ErrInvalidKafkaObjectType = errors.New("invalid Kafka object type")
	// ErrKafkaObjectDoesNotExist error.
	ErrKafkaObjectDoesNotExist = errors.New("requested Kafka object does not exist")
	// ErrNilTagSet error.
	ErrNilTagSet = errors.New("must provide a non-nil or non-empty TagSet")
	// ErrNilTags error.
	ErrNilTags = errors.New("must provide a non-nil or non-empty tags")
)
View Source
var (
	// ErrGroupIDEmpty error.
	ErrGroupIDEmpty = status.Error(codes.InvalidArgument, "GroupId field must be specified")
)
View Source
var (
	// ErrRequestThrottleTimeout error.
	ErrRequestThrottleTimeout = errors.New("wait time exceeded")
)
View Source
var TagMarkTimeKey = "tagMarkedForDeletionTime"

Functions

func PartitionMapToReplicaAssignment

func PartitionMapToReplicaAssignment(pm *mapper.PartitionMap) kafkaadmin.ReplicaAssignment

PartitionMapToReplicaAssignment takes a *mapper.PartitionMap and transforms it into a kafkaadmin.ReplicaAssignment.

Types

type BrokerSet

type BrokerSet map[uint32]*pb.Broker

BrokerSet is a mapping of broker IDs to *pb.Broker.

func (BrokerSet) IDs

func (b BrokerSet) IDs() []uint32

IDs returns a []uint32 of IDs from a BrokerSet.

type Checkpoint

type Checkpoint struct {
	Topic           string
	Partition       uint32
	ConsumerGroupID string
	UpstreamOffset  uint64
	Offset          uint64
	Metadata        string
}

Checkpoint holds a record emitted from the MirrorCheckpointConnector in MirrorMaker2.

type Config

type Config struct {
	HTTPListen                 string
	GRPCListen                 string
	ReadReqRate                int
	WriteReqRate               int
	ZKTagsPrefix               string
	DefaultRequestTimeout      time.Duration
	TagCleanupFrequencyMinutes int
	TagAllowedStalenessMinutes int
	// contains filtered or unexported fields
}

Config holds Server configurations.

type ErrReservedTag

type ErrReservedTag struct {
	// contains filtered or unexported fields
}

ErrReservedTag error.

func (ErrReservedTag) Error

func (e ErrReservedTag) Error() string

type KafkaObject

type KafkaObject struct {
	Type string
	ID   string
}

KafkaObject holds an object type (broker, topic) and object identifier (ID, name).

func (KafkaObject) Complete

func (o KafkaObject) Complete() bool

Complete checks if a KafkaObject is valid and has a non-empty ID field value.

func (KafkaObject) Valid

func (o KafkaObject) Valid() bool

Valid checks if a KafkaObject has a valid Type field value.

type RequestThrottle

type RequestThrottle interface {
	Request(context.Context) error
}

RequestThrottle controls request rates with a configurable burst capacity and per-second rate backed with a token bucket.

func NewRequestThrottle

func NewRequestThrottle(cfg RequestThrottleConfig) (RequestThrottle, error)

NewRequestThrottle initializes a RequestThrottle.

type RequestThrottleConfig

type RequestThrottleConfig struct {
	// Burst capacity.
	Capacity int
	// Request rate (reqs/s).
	Rate int
}

RequestThrottleConfig specifies the RequestThrottle burst capacity and per-second rate limit.

type ReservedFields

type ReservedFields map[string]map[string]struct{}

ReservedFields is a mapping of object types (topic, broker) to a set of fields reserved for internal use; these are default fields that become searchable through the tags interface.

func GetReservedFields

func GetReservedFields() ReservedFields

GetReservedFields returns a map of proto message types to field names considered reserved for internal use. All fields specified in the Registry proto messages are discovered here and reserved by default.

type Server

type Server struct {
	pb.UnimplementedRegistryServer
	Locking    cluster.Lock
	HTTPListen string
	GRPCListen string
	ZK         kafkazk.Handler

	Tags *TagHandler
	// contains filtered or unexported fields
}

Server implements the registry APIs.

func NewServer

func NewServer(c Config) (*Server, error)

NewServer initializes a *Server.

func (*Server) BrokerMappings

func (s *Server) BrokerMappings(ctx context.Context, req *pb.BrokerRequest) (*pb.TopicResponse, error)

BrokerMappings returns all topic names that have at least one partition held by the requested broker. The broker is specified in the BrokerRequest.ID field.

func (*Server) CreateTopic

func (s *Server) CreateTopic(ctx context.Context, req *pb.CreateTopicRequest) (*pb.Empty, error)

CreateTopic creates a topic if it doesn't exist. Topic tags can optionally be set at topic creation time. Additionally, topics can be created on a target set of brokers by specifying the broker tag(s) in the request.

func (*Server) DeleteBrokerTags

func (s *Server) DeleteBrokerTags(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)

DeleteBrokerTags deletes custom tags for the specified broker.

func (*Server) DeleteStaleTags

func (s *Server) DeleteStaleTags(ctx context.Context, now func() time.Time, c Config) error

DeleteStaleTags deletes any tags that have not had a kafka resource associated with them.

func (*Server) DeleteTopic

func (s *Server) DeleteTopic(ctx context.Context, req *pb.TopicRequest) (*pb.Empty, error)

DeleteTopic deletes the topic specified in the req.Topic.Name field.

func (*Server) DeleteTopicTags

func (s *Server) DeleteTopicTags(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)

DeleteTopicTags deletes custom tags for the specified topic.

func (*Server) DialZK

func (s *Server) DialZK(ctx context.Context, wg *sync.WaitGroup, c *kafkazk.Config) error

DialZK takes a Context, WaitGroup and *kafkazk.Config and initializes a kafkazk.Handler. A background shutdown procedure is called when the context is cancelled.

func (*Server) EnablingLocking

func (s *Server) EnablingLocking(c *kafkazk.Config) error

EnablingLocking uses distributed locking for write operations.

func (*Server) GetBrokers

func (s *Server) GetBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)

GetBrokers gets brokers. If the input *pb.BrokerRequest Id field is non-zero, the specified broker is matched if it exists. Otherwise, all brokers found in ZooKeeper are matched. Matched brokers are then filtered by all tags specified, if specified, in the *pb.BrokerRequest tag field.

func (*Server) GetTopics

func (s *Server) GetTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)

GetTopics gets topics. If the input *pb.TopicRequest Name field is non-nil, the specified topic is matched if it exists. Otherwise, all topics found in ZooKeeper are matched. Matched topics are then filtered by all tags specified, if specified, in the *pb.TopicRequest tag field.

func (*Server) InitKafkaAdmin

func (s *Server) InitKafkaAdmin(ctx context.Context, wg *sync.WaitGroup, cfg kafkaadmin.Config) error

InitKafkaAdmin takes a Context, WaitGroup and a kafkaadmin.Config and initializes an admin.Client. A background shutdown procedure is called when the context is cancelled.

func (*Server) InitKafkaConsumer

func (s *Server) InitKafkaConsumer(ctx context.Context, wg *sync.WaitGroup, cfg kafkaadmin.Config) error

InitKafkaConsumer takes a Context, WaitGroup and a kafkaadmin.Config and initializes a kafka.Consumer. A background shutdown procedure is called when the context is cancelled.

func (*Server) ListBrokers

func (s *Server) ListBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)

ListBrokers gets broker IDs. If the input *pb.BrokerRequest Id field is non-zero, the specified broker is matched if it exists. Otherwise, all brokers found in ZooKeeper are matched. Matched brokers are then filtered by all tags specified, if specified, in the *pb.BrokerRequest tag field.

func (*Server) ListTopics

func (s *Server) ListTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)

ListTopics gets topic names. If the input *pb.TopicRequest Name field is non-nil, the specified topic is matched if it exists. Otherwise, all topics found in ZooKeeper are matched. Matched topics are then filtered by all tags specified, if specified, in the *pb.TopicRequest tag field.

func (*Server) LogRequest

func (s *Server) LogRequest(ctx context.Context, params string, reqID uint64)

LogRequest takes a request context and input parameters as a string and logs the request data.

func (*Server) MarkForDeletion

func (s *Server) MarkForDeletion(ctx context.Context, now func() time.Time) error

MarkForDeletion marks stored tags that have been stranded without an associated kafka resource.

func (*Server) ReassigningTopics

func (s *Server) ReassigningTopics(ctx context.Context, _ *pb.Empty) (*pb.TopicResponse, error)

ReassigningTopics returns a *pb.TopicResponse holding the names of all topics currently undergoing reassignment.

func (*Server) RunHTTP

func (s *Server) RunHTTP(ctx context.Context, wg *sync.WaitGroup) error

RunHTTP runs the HTTP endpoint.

func (*Server) RunRPC

func (s *Server) RunRPC(ctx context.Context, wg *sync.WaitGroup) error

RunRPC runs the gRPC endpoint.

func (*Server) RunTagCleanup

func (s *Server) RunTagCleanup(ctx context.Context, wg *sync.WaitGroup, c Config) error

RunTagCleanup starts a background process deleting stale tags.

func (*Server) TagBroker

func (s *Server) TagBroker(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)

TagBroker sets custom tags for the specified broker. Any previously existing tags that were not specified in the request remain unmodified.

func (*Server) TagBrokers

func (s *Server) TagBrokers(ctx context.Context, req *pb.TagBrokersRequest) (*pb.TagBrokersResponse, error)

TagBrokers sets custom tags for the specified brokers. Any previously existing tags that were not specified in the request remain unmodified.

func (*Server) TagTopic

func (s *Server) TagTopic(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)

TagTopic sets custom tags for the specified topic. Any previously existing tags that were not specified in the request remain unmodified.

func (*Server) TopicMappings

func (s *Server) TopicMappings(ctx context.Context, req *pb.TopicRequest) (*pb.BrokerResponse, error)

TopicMappings returns all broker IDs that hold at least one partition for the requested topic. The topic is specified in the TopicRequest.Name field.

func (*Server) TranslateOffsets

func (s *Server) TranslateOffsets(ctx context.Context, req *pb.TranslateOffsetRequest) (*pb.TranslateOffsetResponse, error)

TranslateOffsets translates the last committed remote consumer group's offset into the corresponding local offsets.

func (*Server) UnderReplicatedTopics

func (s *Server) UnderReplicatedTopics(ctx context.Context, _ *pb.Empty) (*pb.TopicResponse, error)

UnderReplicatedTopics returns a *pb.TopicResponse holding the names of all under replicated topics.

func (*Server) UnmappedBrokers

func (s *Server) UnmappedBrokers(ctx context.Context, req *pb.UnmappedBrokersRequest) (*pb.BrokerResponse, error)

UnmappedBrokers returns a list of broker IDs that hold no partitions. An optional list of topic names can be specified in the UnmappedBrokersRequest exclude field where partitions for those topics are not considered. For example, broker 1000 holds no partitions other than one belonging to the 'test0' topic. If UnmappedBrokers is called with 'test0' specified as an exclude name, broker 1000 will be returned in the BrokerResponse as an unmapped broker.

func (*Server) ValidateRequest

func (s *Server) ValidateRequest(ctx context.Context, req interface{}, kind int) (context.Context, context.CancelFunc, error)

ValidateRequest takes an incoming request context, params, and request kind. The request is logged and checked against the appropriate request throttler. If the incoming context did not have a deadline set, a derived context is created with the server default timeout. The child context and error are returned.

type TagCleaner

type TagCleaner struct {
	// contains filtered or unexported fields
}

func (*TagCleaner) RunTagCleanup

func (tc *TagCleaner) RunTagCleanup(s *Server, ctx context.Context, c Config)

RunTagCleanup regularly checks for tags that are stale and need cleanup.

type TagHandler

type TagHandler struct {
	Store TagStorage
}

TagHandler provides object filtering by tags along with tag storage and retrieval.

func NewTagHandler

func NewTagHandler(c TagHandlerConfig) (*TagHandler, error)

NewTagHandler initializes a TagHandler.

func (*TagHandler) FilterBrokers

func (t *TagHandler) FilterBrokers(in BrokerSet, tags Tags) (BrokerSet, error)

FilterBrokers takes a map of broker IDs to *pb.Broker and tags KV list. A filtered map is returned that includes brokers where all tag values match the provided input tag KVs. Additionally, any custom tags persisted in the TagStorage backend are populated into the Tags field for each matched object.

func (*TagHandler) FilterTopics

func (t *TagHandler) FilterTopics(in TopicSet, tags Tags) (TopicSet, error)

FilterTopics takes a map of topic names to *pb.Topic and tags KV list. A filtered map is returned that includes topics where all tag values match the provided input tag KVs. Additionally, any custom tags persisted in the TagStorage backend are populated into the Tags field for each matched object.

func (*TagHandler) TagSetFromObject

func (t *TagHandler) TagSetFromObject(o interface{}) (TagSet, error)

TagSetFromObject takes a protobuf type and returns the default TagSet along with any user-defined tags.

type TagHandlerConfig

type TagHandlerConfig struct {
	Prefix string
}

TagHandlerConfig holds TagHandler configuration.

type TagSet

type TagSet map[string]string

TagSet is a map of key:values.

func (TagSet) Equal

func (t1 TagSet) Equal(t2 TagSet) bool

Equal checks if the input TagSet has the same key:value pairs as the calling TagSet.

func (TagSet) Keys

func (t TagSet) Keys() []string

Keys returns a []string of all tag keys for a TagSet.

func (TagSet) Tags

func (t TagSet) Tags() Tags

Tags takes a TagSet and returns a Tags.

type TagStorage

type TagStorage interface {
	LoadReservedFields(ReservedFields) error
	FieldReserved(KafkaObject, string) bool
	SetTags(KafkaObject, TagSet) error
	GetTags(KafkaObject) (TagSet, error)
	DeleteTags(KafkaObject, []string) error
	GetAllTags() (map[KafkaObject]TagSet, error)
}

TagStorage handles tag persistence to stable storage.

type Tags

type Tags []string

Tags is a []string of "key:value" pairs.

func (Tags) Keys

func (t Tags) Keys() []string

Keys returns a []string of all tag keys for a Tags. It's possible to receive fully formed tags or just tag keys.

func (Tags) TagSet

func (t Tags) TagSet() (TagSet, error)

TagSet takes a Tags and returns a TagSet and error for any malformed tags. Tags are expected to be formatted as a comma delimited "key:value,key2:value2" string. TODO normalize all tag usage to lower case.

type TopicSet

type TopicSet map[string]*pb.Topic

TopicSet is a mapping of topic name to *pb.Topic.

func TopicSetFromSlice

func TopicSetFromSlice(s []string) TopicSet

TopicSetFromSlice converts a slice into a TopicSet for convenience.

func (TopicSet) Names

func (t TopicSet) Names() []string

Names returns a []string of topic names from a TopicSet.

type ZKTagStorage

type ZKTagStorage struct {
	ReservedFields ReservedFields
	Prefix         string
	ZK             kafkazk.Handler
}

ZKTagStorage implements tag persistence in ZooKeeper.

func NewZKTagStorage

func NewZKTagStorage(c ZKTagStorageConfig) (*ZKTagStorage, error)

NewZKTagStorage initializes a ZKTagStorage.

func (*ZKTagStorage) DeleteTags

func (t *ZKTagStorage) DeleteTags(o KafkaObject, keysToDelete []string) error

DeleteTags deletes all tags in the list of keys for the requested KafkaObject.

func (*ZKTagStorage) FieldReserved

func (t *ZKTagStorage) FieldReserved(o KafkaObject, f string) bool

FieldReserved takes a KafkaObject and field name. A bool is returned that indicates whether the field is reserved for the respective KafkaObject type.

func (*ZKTagStorage) GetAllTags

func (t *ZKTagStorage) GetAllTags() (map[KafkaObject]TagSet, error)

GetAllTags returns all tags stored in the tagstore, keyed by the resource they correspond to.

func (*ZKTagStorage) GetAllTagsForType

func (t *ZKTagStorage) GetAllTagsForType(kafkaObjectType string) (map[KafkaObject]TagSet, error)

GetAllTagsForType gets all the tags for objects of the given type. A convenience method that makes getting every tag a little easier.

func (*ZKTagStorage) GetTags

func (t *ZKTagStorage) GetTags(o KafkaObject) (TagSet, error)

GetTags returns the TagSet for the requested KafkaObject.

func (*ZKTagStorage) Init

func (t *ZKTagStorage) Init() error

Init ensures the ZooKeeper connection is ready and any required znodes are created.

func (*ZKTagStorage) LoadReservedFields

func (t *ZKTagStorage) LoadReservedFields(r ReservedFields) error

LoadReservedFields takes a ReservedFields and stores it at ZKTagStorage.ReservedFields and returns an error.

func (*ZKTagStorage) SetTags

func (t *ZKTagStorage) SetTags(o KafkaObject, ts TagSet) error

SetTags takes a KafkaObject and TagSet and sets the tag key:values for the object.

type ZKTagStorageConfig

type ZKTagStorageConfig struct {
	ZKAddr string
	Prefix string
}

ZKTagStorageConfig holds ZKTagStorage configs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL