kafka

package
v1.0.0-beta.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2021 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrSaramaSessionContextDone is a sentinel error carrying the message
// "session context done"; compare against it with errors.Is.
// NOTE(review): presumably returned when a Sarama consumer-group session's
// context is done — the returning code is not visible here; confirm.
var ErrSaramaSessionContextDone = errors.New("session context done")

Functions

func NewTLSConfig

func NewTLSConfig(configTLS TLSConfig) (*tls.Config, error)

Types

type AvroEncoder

// AvroEncoder pairs a schema ID with message content; its Encode and Length
// methods operate on both fields together.
//
// SchemaID is the identifier of the schema that goes with Content.
// NOTE(review): presumably a schema-registry ID — confirm.
//
// Content is the message payload.
// NOTE(review): presumably already Avro-encoded bytes — confirm.
type AvroEncoder struct {
	SchemaID int
	Content  []byte
}

TODO: move to serializer.

AvroEncoder encodes the schemaId and the Avro message.

func (*AvroEncoder) Encode

func (a *AvroEncoder) Encode() ([]byte, error)

func (*AvroEncoder) Length

func (a *AvroEncoder) Length() int

Length returns the length of schemaId and Content.

type AvroProducer

// AvroProducer is created with NewAvroProducer and exposes Add and Close.
// All of its state is unexported.
// NOTE(review): Add presumably Avro-encodes value using the given schema and
// produces it to topic — the implementation is not visible here; confirm.
type AvroProducer struct {
	// contains filtered or unexported fields
}

func NewAvroProducer

func NewAvroProducer(
	brokers []string,
	kafkaVersion string,
	configTLS TLSConfig,
) (
	*AvroProducer, error,
)

func (*AvroProducer) Add

func (c *AvroProducer) Add(
	topic string,
	schema string,
	schemaID int,
	key []byte,
	value map[string]interface{},
) error

func (*AvroProducer) Close

func (c *AvroProducer) Close()

type Client

// Client is the set of Kafka cluster queries exposed by this package: topic
// listing, offset lookups, and consumer group listing and deletion.
// Instances are obtained from NewClient.
type Client interface {
	// Topics returns all the topics present in Kafka. It keeps a cache
	// which is refreshed every cacheValidity seconds.
	Topics() ([]string, error)

	// LastOffset returns the current offset for the topic partition.
	LastOffset(topic string, partition int32) (int64, error)

	// CurrentOffset talks to Kafka and finds the current offset for the
	// consumer group. It calls all brokers to determine the current
	// offset. If the group is not found it returns -1.
	// NOTE(review): id is presumably the consumer group ID — confirm.
	CurrentOffset(id string, topic string, partition int32) (int64, error)

	// ListConsumerGroups lists the consumer groups available in the cluster.
	// NOTE(review): the meaning of the returned map's keys and values is
	// not visible here — presumably group name to protocol type; confirm.
	ListConsumerGroups() (map[string]string, error)

	// DeleteConsumerGroup deletes the consumer group with the given name.
	DeleteConsumerGroup(name string) error
}

func NewClient

func NewClient(
	brokers []string,
	version string,
	configTLS TLSConfig,
) (
	Client, error,
)

type ConsumerGroupConfig

// ConsumerGroupConfig is the YAML-mapped configuration for one consumer
// group: its group ID, topic regexes, loader topic prefix, and the nested
// Kafka and Sarama settings. It is the config argument of NewConsumerGroup
// and NewSaramaConsumerGroup.
//
// NOTE(review): TopicRegexes is a single string; how multiple regexes are
// separated is not visible here — confirm.
type ConsumerGroupConfig struct {
	GroupID      string `yaml:"groupID"`
	TopicRegexes string `yaml:"topicRegexes"`
	// TODO: LoaderTopicPrefix is "" for loader consumer groups;
	// it should be an optional field (a pointer).
	LoaderTopicPrefix string       `yaml:"loaderTopicPrefix"` // default is there
	Kafka             KafkaConfig  `yaml:"kafka"`
	Sarama            SaramaConfig `yaml:"sarama"`
}

type ConsumerGroupInterface

// ConsumerGroupInterface is the consumer group abstraction returned by
// NewConsumerGroup and NewSaramaConsumerGroup and accepted by NewManager.
type ConsumerGroupInterface interface {
	// Topics returns a list of topic names.
	// NOTE(review): presumably all topics known to the cluster — confirm.
	Topics() ([]string, error)

	// RefreshMetadata takes a list of topics and
	// queries the cluster to refresh the
	// available metadata for those topics.
	// If no topics are provided, it will refresh
	// metadata for all topics.
	RefreshMetadata(topics ...string) error

	// LastOffset returns an offset for the given topic partition.
	// NOTE(review): presumably the same semantics as Client.LastOffset —
	// confirm.
	LastOffset(topic string, partition int32) (int64, error)

	// Consume consumes the given topics under ctx.
	// NOTE(review): presumably blocks until the session ends or ctx is
	// cancelled, as sarama.ConsumerGroup.Consume does — confirm.
	Consume(ctx context.Context, topics []string) error

	// Close closes the consumer group and reports any error from doing so.
	Close() error
}

func NewConsumerGroup

func NewConsumerGroup(
	config ConsumerGroupConfig,
	consumerGroupHandler sarama.ConsumerGroupHandler,
) (
	ConsumerGroupInterface,
	error,
)

func NewSaramaConsumerGroup

func NewSaramaConsumerGroup(
	config ConsumerGroupConfig,
	consumerGroupHandler sarama.ConsumerGroupHandler,
) (
	ConsumerGroupInterface,
	error,
)

type KafkaConfig

// KafkaConfig is the YAML-mapped Kafka connection configuration: brokers,
// Kafka version, client selection, and TLS settings.
//
// NOTE(review): Brokers is a single string — presumably a comma-separated
// list of broker addresses; confirm against the code that parses it.
// NOTE(review): KafkaClient presumably selects the client implementation
// (for example Sarama) — confirm.
type KafkaConfig struct {
	Brokers     string    `yaml:"brokers"`
	Version     string    `yaml:"version"`     // default is there
	KafkaClient string    `yaml:"kafkaClient"` // default is there
	TLSConfig   TLSConfig `yaml:"tlsConfig"`
}

type Manager

// Manager is built by NewManager from a consumer group, its group ID, and
// topic regexes. It exposes Consume and SyncTopics, both of which take a
// context and a *sync.WaitGroup.
type Manager struct {

	// Ready is used to signal the main thread about the readiness of
	// the manager.
	Ready chan bool
	// contains filtered or unexported fields
}

func NewManager

func NewManager(
	consumerGroup ConsumerGroupInterface,
	consumerGroupID string,
	regexes string,

) *Manager

func (*Manager) Consume

func (c *Manager) Consume(ctx context.Context, wg *sync.WaitGroup)

func (*Manager) SyncTopics

func (c *Manager) SyncTopics(
	ctx context.Context,
	wg *sync.WaitGroup,
)

type SaramaConfig

// SaramaConfig is the YAML-mapped set of Sarama consumer options.
//
// The pointer fields (SessionTimeoutSeconds, HearbeatIntervalSeconds,
// MaxProcessingTime) are optional and tagged omitempty; the trailing
// comments state their defaults.
//
// NOTE(review): HearbeatIntervalSeconds and its yaml key
// "hearbeatIntervalSeconds" are misspelled ("heartbeat"). Renaming either
// would break existing callers and config files, so they are left as is.
// NOTE(review): the units of MaxProcessingTime are not visible here —
// confirm before setting it.
type SaramaConfig struct {
	Assignor                string `yaml:"assignor"` // default is there
	Oldest                  bool   `yaml:"oldest"`
	Log                     bool   `yaml:"log"`
	AutoCommit              bool   `yaml:"autoCommit"`
	SessionTimeoutSeconds   *int   `yaml:"sessionTimeoutSeconds,omitempty"`   // default 20s
	HearbeatIntervalSeconds *int   `yaml:"hearbeatIntervalSeconds,omitempty"` // default 6s
	MaxProcessingTime       *int32 `yaml:"maxProcessingTime,omitempty"`       // default is of sarama
}

type TLSConfig

// TLSConfig stores the base64-encoded strings for the certificates and keys
// required for TLS setup and client authentication with Kafka.
// NewTLSConfig takes a TLSConfig and returns a *tls.Config.
//
// Enable switches TLS on; UserCert and UserKey hold the client certificate
// and key, and CACert holds the CA certificate.
type TLSConfig struct {
	Enable   bool   `yaml:"enable"`
	UserCert string `yaml:"userCert"`
	UserKey  string `yaml:"userKey"`
	CACert   string `yaml:"caCert"`
}

TLSConfig stores the base64-encoded strings for all the certificates and keys required for TLS setup and client authentication with Kafka.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL