logical_broker

package
v0.0.0-...-fcf41b5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2022 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// Reserved internal names; both values share the "__krouter_" prefix.
// NOTE(review): InternalTopicTopicPointers is presumably the topic read by
// Controller.ConsumeTopicPointers and written by the API*TopicPointer methods,
// and InternalControlKey a key for control records — confirm against usage.
const (
	InternalControlKey         = "__krouter_control"
	InternalTopicTopicPointers = "__krouter_topic_pointers"
)
View Source
// fmt-style templates for consumer-group keys. Every template begins with
// "{group-%s}", so the first verb is always a string.
// GroupTopicPartitionOffsetRedisKeyFmt extends GroupTopicOffsetRedisKeyFmt and
// expands to "{group-%s}-offset-topic-%s-partition-%d": two string verbs
// followed by one integer verb.
// NOTE(review): presumably the verbs are group name, topic name and partition
// (cf. the parameters of Controller.OffsetCommit), and the braces are a Redis
// Cluster hash tag keeping all keys of one group in the same slot — confirm.
const (
	GroupCoordinatorRedisKeyFmt          = "{group-%s}-coordinator"
	GroupGenerationRedisKeyFmt           = "{group-%s}-generation"
	GroupTopicOffsetRedisKeyFmt          = "{group-%s}-offset-topic"
	GroupTopicPartitionOffsetRedisKeyFmt = GroupTopicOffsetRedisKeyFmt + "-%s-partition-%d"
)
View Source
// InternalTopicAcls is a reserved internal name with the "__krouter_" prefix.
// NOTE(review): presumably the topic that Authorizer.ConsumeAcls reads ACL
// records from — confirm against the implementation.
const (
	InternalTopicAcls = "__krouter_acls"
)
View Source
// InternalTopicBrokers is a reserved internal name with the "__krouter_" prefix.
// NOTE(review): presumably the topic that LogicalBroker.ConsumeBrokers reads
// broker registrations from — confirm against the implementation.
const (
	InternalTopicBrokers = "__krouter_brokers"
)
View Source
// InternalTopicTopicConfig is a reserved internal name with the "__krouter_"
// prefix.
// NOTE(review): presumably the topic that Cluster.ConsumeTopicConfigs reads
// topic configuration records from — confirm against the implementation.
const (
	InternalTopicTopicConfig = "__krouter_topic_configs"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Authorizer

type Authorizer struct {
	// contains filtered or unexported fields
}

func NewAuthorizer

func NewAuthorizer(log logr.Logger, kafkaClient *kgo.Client) (*Authorizer, error)

func (*Authorizer) ConsumeAcls

func (c *Authorizer) ConsumeAcls()

func (*Authorizer) CreateAcl

func (c *Authorizer) CreateAcl(operation models.ACLOperation, resourceType models.ACLResourceType, patternType models.ACLPatternType, resourceName string, principal string, permission models.ACLPermission) error

func (*Authorizer) CreateTestACLs

func (c *Authorizer) CreateTestACLs() error

func (*Authorizer) GetAcls

func (c *Authorizer) GetAcls(resourceType models.ACLResourceType, resourceName *string, patternType models.ACLPatternType, principal *string, operation models.ACLOperation, permission models.ACLPermission) []*models.ACL

func (*Authorizer) WaitSynced

func (c *Authorizer) WaitSynced()

type Cluster

type Cluster struct {
	Name string
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(name string, addrs []string, log logr.Logger, controller *Controller) (*Cluster, error)

func (*Cluster) APICreateTopic

func (c *Cluster) APICreateTopic(topicName string, partitions int32, replicationFactor int16, config map[string]*string) (*models.Topic, error)

func (*Cluster) APIDeleteTopicCluster

func (c *Cluster) APIDeleteTopicCluster(topicName string) error

func (*Cluster) APIGetTopic

func (c *Cluster) APIGetTopic(topicName string) (*models.Topic, error)

func (*Cluster) APIUpdateTopic

func (c *Cluster) APIUpdateTopic(topicName string, partitions int32, config map[string]*string) (*models.Topic, error)

func (*Cluster) Close

func (c *Cluster) Close() error

func (*Cluster) ConsumeTopicConfigs

func (c *Cluster) ConsumeTopicConfigs()

func (*Cluster) Fetch

func (c *Cluster) Fetch(brokerID int32, request *kmsg.FetchRequest) (*kmsg.FetchResponse, error)

func (*Cluster) ListOffsets

func (c *Cluster) ListOffsets(brokerID int32, request *kmsg.ListOffsetsRequest) (*kmsg.ListOffsetsResponse, error)

func (*Cluster) Produce

func (c *Cluster) Produce(brokerID int32, transactionID *string, timeoutMillis int32, topics []kmsg.ProduceRequestTopic) (*kmsg.ProduceResponse, error)

func (*Cluster) TopicMetadata

func (c *Cluster) TopicMetadata(ctx context.Context, topics []string) (*kmsg.MetadataResponse, error)

func (*Cluster) WaitSynced

func (c *Cluster) WaitSynced()

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

func NewController

func NewController(log logr.Logger, addrs []string, redisClient *redisw.RedisClient) (*Controller, error)

func (*Controller) APIDeleteTopicPointer

func (c *Controller) APIDeleteTopicPointer(topicName string) error

func (*Controller) APIGetTopicPointer

func (c *Controller) APIGetTopicPointer(topicName string) (*string, error)

func (*Controller) APISetTopicPointer

func (c *Controller) APISetTopicPointer(topicName string, cluster string) error

func (*Controller) ConsumeTopicPointers

func (c *Controller) ConsumeTopicPointers()

func (*Controller) CreateInternalTopics

func (c *Controller) CreateInternalTopics() error

func (*Controller) DescribeGroup

func (c *Controller) DescribeGroup(group string) (*kmsg.DescribeGroupsResponse, error)

func (*Controller) FindGroupCoordinator

func (c *Controller) FindGroupCoordinator(consumerGroup string) (*kmsg.FindCoordinatorResponse, error)

func (*Controller) FindTransactionCoordinator

func (c *Controller) FindTransactionCoordinator(transaction string) (*kmsg.FindCoordinatorResponse, error)

func (*Controller) GetAuthorizer

func (c *Controller) GetAuthorizer() *Authorizer

func (*Controller) HeartBeat

func (c *Controller) HeartBeat(request *kmsg.HeartbeatRequest) (*kmsg.HeartbeatResponse, error)

func (*Controller) InitProducer

func (c *Controller) InitProducer(transactionTimeoutDuration time.Duration) (*kmsg.InitProducerIDResponse, error)

func (*Controller) JoinGroup

func (c *Controller) JoinGroup(request *kmsg.JoinGroupRequest) (*kmsg.JoinGroupResponse, error)

func (*Controller) LeaveGroup

func (c *Controller) LeaveGroup(request *kmsg.LeaveGroupRequest) (*kmsg.LeaveGroupResponse, error)

func (*Controller) OffsetCommit

func (c *Controller) OffsetCommit(group, topic string, groupGenerationId, partition int32, offset int64) (errors.KafkaError, error)

func (*Controller) OffsetFetch

func (c *Controller) OffsetFetch(group, topic string, partition int32) (int64, error)

func (*Controller) OffsetFetchAllTopics

func (c *Controller) OffsetFetchAllTopics(group string) (map[string]map[int32]int64, error)

func (*Controller) Start

func (c *Controller) Start() error

func (*Controller) SyncGroupCustom

func (c *Controller) SyncGroupCustom(request *franz.SyncGroupRequest) (*kmsg.SyncGroupResponse, error)

type LogicalBroker

type LogicalBroker struct {
	AdvertiseListener *net.TCPAddr
	ClusterID         string
	BrokerID          int32
	// contains filtered or unexported fields
}

func InitBroker

func InitBroker(log logr.Logger, advertiseListener *net.TCPAddr, clusterID string, brokerID int32, redisAddresses []string) (*LogicalBroker, error)

func (*LogicalBroker) Close

func (b *LogicalBroker) Close() error

func (*LogicalBroker) ConsumeBrokers

func (b *LogicalBroker) ConsumeBrokers()

func (*LogicalBroker) GetBroker

func (b *LogicalBroker) GetBroker(brokerID int32) *models.Broker

func (*LogicalBroker) GetClusterByTopic

func (b *LogicalBroker) GetClusterByTopic(topicName string) *Cluster

func (*LogicalBroker) GetClusters

func (b *LogicalBroker) GetClusters() map[string]*Cluster

func (*LogicalBroker) GetController

func (b *LogicalBroker) GetController() *Controller

func (*LogicalBroker) GetRegisteredBrokers

func (b *LogicalBroker) GetRegisteredBrokers() []*models.Broker

func (*LogicalBroker) GetTopic

func (b *LogicalBroker) GetTopic(topicName string) (*Cluster, *models.Topic)

func (*LogicalBroker) GetTopics

func (b *LogicalBroker) GetTopics() ([]*models.Topic, error)

func (*LogicalBroker) Start

func (b *LogicalBroker) Start() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL