agent

package module
Published: Oct 6, 2020 License: MIT Imports: 6 Imported by: 2

README

sync4kafka

This is a simple synchronization library that connects to a single Kafka broker and syncs all messages from all topics/partitions that the broker is leader for into an outbound Go channel.

It dynamically listens for broker metadata updates, picks up or drops re-assigned partitions, and keeps consuming all relevant messages.

Primarily developed for https://github.com/dvsekhvalnov/k-ray and unlikely to be very useful outside of it.

Status

Experimental. Born from a hackathon project. Does not really follow Go best practices and patterns for channels.
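
Usage

A minimal sketch of consuming messages via Sync. The import path and broker address are assumptions (the package name agent is taken from this page):

package main

import (
	"fmt"

	agent "github.com/dvsekhvalnov/sync4kafka" // import path is an assumption
)

func main() {
	cfg := agent.NewConfig()
	cfg.BrokerUrl = "localhost:9092" // local broker address is an assumption

	// Sync returns a read-only channel of consumed messages.
	for msg := range agent.Sync(cfg) {
		fmt.Printf("%s/%d@%d: %s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
	}
}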

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume

func Consume(in <-chan *MetadataChanges, ctx *globals) <-chan *sarama.ConsumerMessage

func Diff

func Diff(in <-chan *Server, ctx *globals) <-chan *MetadataChanges

func FetchOffsets

func FetchOffsets(ctx *globals, partition *PartitionInfo) int64

func MarkOffsets

func MarkOffsets(ctx *globals, offset int64, topic string, partition int32)

func Refresh

func Refresh(in <-chan time.Time, ctx *globals) <-chan *Server

func Sync

func Sync(cfg *Config) <-chan *sarama.ConsumerMessage

func TrackOffsets

func TrackOffsets(in <-chan *sarama.ConsumerMessage, ctx *globals) <-chan *sarama.ConsumerMessage

Types

type Config

type Config struct {
	*sarama.Config

	// Local broker URL to connect to
	BrokerUrl string

	// How often to refresh metadata about topics, in ms
	MetadataRefreshMs int64

	// Optional. Broker advertised URL used to figure out the local broker ID.
	// If not provided, auto-discovery based on existing network interfaces
	// will be attempted.
	BrokerAdvertisedUrl string

	// Optional. Whether to commit & track consumed partition offsets. When set to 'true',
	// the position is periodically committed back to the Kafka __consumer_offsets topic.
	// Default is 'false'.
	AutotrackOffsets bool

	// Optional. Consumer group name used when AutotrackOffsets is set to 'true'
	ConsumerGroup string

	// Optional. Callback to retrieve partition offset information stored outside of Kafka.
	// When AutotrackOffsets is used, this value is ignored and Kafka-based offset management
	// is used. Default is no offset tracking: all matching topics will be re-consumed.
	FetchOffsets func(partition *PartitionInfo) int64
	// contains filtered or unexported fields
}
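
A sketch of the two offset-handling options. The group name and the returned offset are hypothetical, for illustration only:

cfg := agent.NewConfig()
cfg.BrokerUrl = "localhost:9092"

// Option 1: let the library commit positions to __consumer_offsets.
cfg.AutotrackOffsets = true
cfg.ConsumerGroup = "my-sync-group" // hypothetical group name

// Option 2: supply offsets stored outside Kafka
// (ignored when AutotrackOffsets is set to true).
cfg.FetchOffsets = func(p *agent.PartitionInfo) int64 {
	// Look up the last processed offset for p.Topic / p.ID in external
	// storage; a fixed value is returned here purely for illustration.
	return 0
}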

func NewConfig

func NewConfig() *Config

func (*Config) Exclude

func (cfg *Config) Exclude(patterns ...string) *Config

func (*Config) Include

func (cfg *Config) Include(patterns ...string) *Config
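
Include and Exclude return the receiver, so calls can be chained. A sketch, assuming the patterns are matched against topic names (the exact matching semantics are not documented here):

cfg := agent.NewConfig().
	Include("orders", "payments"). // hypothetical topic patterns
	Exclude("orders-dlq")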

type KafkaPartition

type KafkaPartition struct {
	Topic    string
	Metadata *sarama.PartitionMetadata
}

type MetadataChanges

type MetadataChanges struct {
	Added   []PartitionInfo
	Removed []PartitionInfo
}

type PartitionInfo

type PartitionInfo struct {
	Topic string
	ID    int32
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func DiscoverServer

func DiscoverServer(url string, cfg *Config) *Server

func RefreshMetadata

func RefreshMetadata(ctx *globals) *Server

func (*Server) Partitions

func (server *Server) Partitions() []KafkaPartition
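
A sketch of listing the partitions a broker is leader for; the broker address is an assumption:

cfg := agent.NewConfig()
server := agent.DiscoverServer("localhost:9092", cfg)

for _, p := range server.Partitions() {
	fmt.Printf("topic=%s partition=%d\n", p.Topic, p.Metadata.ID)
}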

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

Log is used for library logging

var Log StdLogger = log.New(ioutil.Discard, "[KafkaSyncer] [DEBUG]", log.LstdFlags)
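
Log discards output by default. To see the library's debug output, assign any StdLogger implementation, e.g. a standard library logger:

agent.Log = log.New(os.Stderr, "[KafkaSyncer] [DEBUG] ", log.LstdFlags)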

