franz

package
v0.0.0-...-c9decf8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoMessages = errors.New("no messages available")
	ErrNoRegistry = errors.New("registry undefined")
)

Functions

func ValidateACLs

func ValidateACLs(kafkaAcls KafkaACLs) bool

ValidateACLs runs some sanity checks on a list of ACLs currently it just checks for duplicated ACL entries

Types

type ACL

type ACL struct {
	Host           string `json:"host"`
	Operation      string `json:"operation"`
	PermissionType string `json:"permissionType" yaml:"action"`
	Principal      string `json:"principal"`
}

ACL represents a single ACL

type ACLDiff

type ACLDiff struct {
	ToCreate []sarama.ResourceAcls
	ToDelete []sarama.ResourceAcls
}

ACLDiff holds the diff in sarama form

func (ACLDiff) Transform

func (a ACLDiff) Transform() ACLDiffTransformed

Transform transforms from sarama to franz native form.

type ACLDiffTransformed

type ACLDiffTransformed struct {
	ToCreate KafkaACLs `json:"to_create" yaml:"to_create"`
	ToDelete KafkaACLs `json:"to_delete" yaml:"to_delete"`
}

ACLDiffTransformed holds the diff in transformed, franz native form

type ACLs

type ACLs struct {
	Version     int    `json:"version" yaml:"-"`
	Name        string `json:"name" yaml:"name"`
	PatternType string `json:"pattern_type" yaml:"resourcePatternType,omitempty"` // optional, exists only for kafka version 2.1 and above
	ACL         []ACL  `json:"acls" yaml:"acl"`
}

ACLs represents a versioned set of access control lists for a kafka resource. TODO: maybe name it ResourceACLs to align it with sarama

type AlteredConfigs

type AlteredConfigs struct {
	TopicName string
	Configs   map[string]*string
}

type ClusterAdmin

type ClusterAdmin struct {
	// contains filtered or unexported fields
}

func (*ClusterAdmin) GetACLsDiff

func (c *ClusterAdmin) GetACLsDiff(kafkaAcls KafkaACLs) (ACLDiff, error)

GetACLsDiff returns a diff between the passed in ACLs and the current configured ACLs

func (*ClusterAdmin) GetTransformedAcls

func (c *ClusterAdmin) GetTransformedAcls() (KafkaACLs, error)

GetTransformedAcls returns the current ACL configuration in franz native form

func (*ClusterAdmin) SetKafkaAcls

func (c *ClusterAdmin) SetKafkaAcls(diff ACLDiff) error

SetKafkaAcls takes a list of resource ACLs and does the appropriate actions (create, delete) to install these ACLs in Kafka

func (*ClusterAdmin) SetKafkaTopics

func (c *ClusterAdmin) SetKafkaTopics(diff TopicDiff) error

SetKafkaTopics creates/alters/deletes the topics in Kafka to reach the configuration given in the file.

type Config

type Config struct {
	KafkaVersion   string
	Brokers        []string
	SchemaRegistry string
	*TLSConfig
}

type Franz

type Franz struct {
	// contains filtered or unexported fields
}

func New

func New(c Config, verbose bool) (*Franz, error)

func (*Franz) Close

func (f *Franz) Close() error

func (*Franz) GetACLsDiff

func (f *Franz) GetACLsDiff(kafkaACLs KafkaACLs) (ACLDiff, error)

GetACLsDiff returns a diff between the passed in ACLs and the current configured ACLs

func (*Franz) GetAcls

func (f *Franz) GetAcls() (KafkaACLs, error)

getAcls is the command run by kafka acls list

func (*Franz) GetTopicsDiff

func (f *Franz) GetTopicsDiff(topics []Topic) (TopicDiff, error)

func (*Franz) GetTopicsExisting

func (f *Franz) GetTopicsExisting(includeInternal bool) ([]Topic, error)

GetTopicsExisting retrieves the topic names and configuration parameters The configuration parameters are filtered - only the non-default are returned

func (*Franz) HistoryEntries

func (f *Franz) HistoryEntries(req HistoryRequest) ([]Message, error)

func (*Franz) Monitor

func (f *Franz) Monitor(req MonitorRequest) (*Receiver, error)

func (*Franz) NewProducer

func (f *Franz) NewProducer() (*Producer, error)

func (*Franz) Registry

func (f *Franz) Registry() Registry

func (*Franz) SetACLs

func (f *Franz) SetACLs(diff ACLDiff) error

setAcls is the command run by kafka acls setAcls

func (*Franz) SetTopics

func (f *Franz) SetTopics(diff TopicDiff) error

func (*Franz) Status

func (f *Franz) Status() ([]Status, error)

type HistoryRequest

type HistoryRequest struct {
	Topic      string
	From, To   time.Time // To takes precedence over Count
	Count      int64
	Partitions []int32
	Decode     bool
}

type KafkaACLs

type KafkaACLs struct {
	Cluster       []ACLs
	ConsumerGroup []ACLs
	Topic         []ACLs
}

type Message

type Message struct {
	Topic      string
	Timestamp  time.Time
	Partition  int32
	Key, Value string
	Offset     int64
}

type MonitorRequest

type MonitorRequest struct {
	Topic      string
	Partitions []int32
	Count      int64
	Follow     bool
	Decode     bool
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) SendMessage

func (p *Producer) SendMessage(topic, msg, key string) error

func (*Producer) SendMessageEncoded

func (p *Producer) SendMessageEncoded(topic, msg, key string, schemaID uint32) error

SendMessageEncoded encodes and sends the JSON format msg with Avro serialization

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

func (*Receiver) Next

func (r *Receiver) Next() (Message, error)

Next retrieves the next message. If there are no more messages, io.EOF is returned. Not thread-safe.

func (*Receiver) Stop

func (r *Receiver) Stop()

Stop instructs the receiver to finish receiving messages. Does not block until all goroutines have finished. Instead, Next() can be called to drain the remaining messages and wait until all goroutines have finished.

type Registry

type Registry interface {
	Subjects() ([]string, error)
	SchemaByID(uint32) (string, error)
	SchemaBySubject(string) (Schema, error)
}

type Result

type Result struct {
	// contains filtered or unexported fields
}

type Schema

type Schema schemaregistry.Schema

type Status

type Status struct {
	Topic     string
	Partition int32
	Leader    int32
	Replicas  []int32
	ISR       []int32
	Internal  bool
}

type TLSConfig

type TLSConfig struct {
	CertFile, KeyFile, CaFile string
}

type Topic

type Topic struct {
	Name              string               `json:"name" yaml:"name"`
	NumPartitions     int                  `json:"num_partitions" yaml:"partitions"`
	ReplicationFactor int                  `json:"replication_factor" yaml:"replication"`
	Configs           []sarama.ConfigEntry `json:"configs" yaml:"configs"`
}

Topic represents a Kafka topic with its configuration

type TopicDiff

type TopicDiff struct {
	ToCreate []Topic          `json:"to_create" yaml:"to_create"`
	ToDelete []Topic          `json:"to_delete" yaml:"to_delete"`
	ToAlter  []AlteredConfigs `json:"to_alter" yaml:"to_alter"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL