Documentation ¶
Index ¶
- Variables
- func ValidateACLs(kafkaAcls KafkaACLs) bool
- type ACL
- type ACLDiff
- type ACLDiffTransformed
- type ACLs
- type AlteredConfigs
- type ClusterAdmin
- type Config
- type Franz
- func (f *Franz) Close() error
- func (f *Franz) GetACLsDiff(kafkaACLs KafkaACLs) (ACLDiff, error)
- func (f *Franz) GetAcls() (KafkaACLs, error)
- func (f *Franz) GetTopicsDiff(topics []Topic) (TopicDiff, error)
- func (f *Franz) GetTopicsExisting(includeInternal bool) ([]Topic, error)
- func (f *Franz) HistoryEntries(req HistoryRequest) ([]Message, error)
- func (f *Franz) Monitor(req MonitorRequest) (*Receiver, error)
- func (f *Franz) NewProducer() (*Producer, error)
- func (f *Franz) Registry() Registry
- func (f *Franz) SetACLs(diff ACLDiff) error
- func (f *Franz) SetTopics(diff TopicDiff) error
- func (f *Franz) Status() ([]Status, error)
- type HistoryRequest
- type KafkaACLs
- type Message
- type MonitorRequest
- type Producer
- type Receiver
- type Registry
- type Result
- type Schema
- type Status
- type TLSConfig
- type Topic
- type TopicDiff
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoMessages = errors.New("no messages available") ErrNoRegistry = errors.New("registry undefined") )
var OperationCode = map[string]sarama.AclOperation{ "Unknown": sarama.AclOperationUnknown, "Any": sarama.AclOperationAny, "All": sarama.AclOperationAll, "Read": sarama.AclOperationRead, "Write": sarama.AclOperationWrite, "Create": sarama.AclOperationCreate, "Delete": sarama.AclOperationDelete, "Alter": sarama.AclOperationAlter, "Describe": sarama.AclOperationDescribe, "CusterAction": sarama.AclOperationClusterAction, "DescribeConfigs": sarama.AclOperationDescribeConfigs, "AlterConfigs": sarama.AclOperationAlterConfigs, "IdempotentWrite": sarama.AclOperationIdempotentWrite, }
var PermissionCode = map[string]sarama.AclPermissionType{ "Unknown": sarama.AclPermissionUnknown, "Any": sarama.AclPermissionAny, "Deny": sarama.AclPermissionDeny, "Allow": sarama.AclPermissionAllow, }
Functions ¶
func ValidateACLs ¶
ValidateACLs runs some sanity checks on a list of ACLs currently it just checks for duplicated ACL entries
Types ¶
type ACL ¶
type ACL struct { Host string `json:"host"` Operation string `json:"operation"` PermissionType string `json:"permissionType" yaml:"action"` Principal string `json:"principal"` }
ACL represents a single ACL
type ACLDiff ¶
type ACLDiff struct { ToCreate []sarama.ResourceAcls ToDelete []sarama.ResourceAcls }
ACLDiff holds the diff in sarama form
func (ACLDiff) Transform ¶
func (a ACLDiff) Transform() ACLDiffTransformed
Transform transforms from sarama to franz native form.
type ACLDiffTransformed ¶
type ACLDiffTransformed struct { ToCreate KafkaACLs `json:"to_create" yaml:"to_create"` ToDelete KafkaACLs `json:"to_delete" yaml:"to_delete"` }
ACLDiffTransformed holds the diff in transformed, franz native form
type ACLs ¶
type ACLs struct { Version int `json:"version" yaml:"-"` Name string `json:"name" yaml:"name"` PatternType string `json:"pattern_type" yaml:"resourcePatternType,omitempty"` // optional, exists only for kafka version 2.1 and above ACL []ACL `json:"acls" yaml:"acl"` }
ACLs represents a versioned set of access control lists for a kafka resource. TODO: maybe name it ResourceACLs to align it with sarama
type AlteredConfigs ¶
type ClusterAdmin ¶
type ClusterAdmin struct {
// contains filtered or unexported fields
}
func (*ClusterAdmin) GetACLsDiff ¶
func (c *ClusterAdmin) GetACLsDiff(kafkaAcls KafkaACLs) (ACLDiff, error)
GetACLsDiff returns a diff between the passed in ACLs and the current configured ACLs
func (*ClusterAdmin) GetTransformedAcls ¶
func (c *ClusterAdmin) GetTransformedAcls() (KafkaACLs, error)
GetTransformedAcls returns the current ACL configuration in franz native form
func (*ClusterAdmin) SetKafkaAcls ¶
func (c *ClusterAdmin) SetKafkaAcls(diff ACLDiff) error
SetKafkaAcls takes a list of resource ACLs and does the appropriate actions (create, delete) to install these ACLs in Kafka
func (*ClusterAdmin) SetKafkaTopics ¶
func (c *ClusterAdmin) SetKafkaTopics(diff TopicDiff) error
SetKafkaTopics creates/alters/deletes the topics in Kafka to reach the configuration given in the file.
type Franz ¶
type Franz struct {
// contains filtered or unexported fields
}
func (*Franz) GetACLsDiff ¶
GetACLsDiff returns a diff between the passed in ACLs and the current configured ACLs
func (*Franz) GetTopicsExisting ¶
GetTopicsExisting retrieves the topic names and configuration parameters The configuration parameters are filtered - only the non-default are returned
func (*Franz) HistoryEntries ¶
func (f *Franz) HistoryEntries(req HistoryRequest) ([]Message, error)
func (*Franz) NewProducer ¶
type HistoryRequest ¶
type MonitorRequest ¶
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func (*Producer) SendMessage ¶
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
type Schema ¶
type Schema schemaregistry.Schema
type Topic ¶
type Topic struct { Name string `json:"name" yaml:"name"` NumPartitions int `json:"num_partitions" yaml:"partitions"` ReplicationFactor int `json:"replication_factor" yaml:"replication"` Configs []sarama.ConfigEntry `json:"configs" yaml:"configs"` }
Topic represents a Kafka topic with its configuration
type TopicDiff ¶
type TopicDiff struct { ToCreate []Topic `json:"to_create" yaml:"to_create"` ToDelete []Topic `json:"to_delete" yaml:"to_delete"` ToAlter []AlteredConfigs `json:"to_alter" yaml:"to_alter"` }