kafkasync

package module
v1.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2020 License: LGPL-3.0 Imports: 8 Imported by: 0

README

A generic package for synchronizing a key/value source to a Kafka topic.

Example

package main

import (
	"flag"
	"strings"
	"time"

	"github.com/mcluseau/kafka-sync"
	"github.com/Shopify/sarama"
	"github.com/golang/glog"
)

func main() {
	brokers := flag.String("brokers", "kafka:9092", "Kafka brokers, comma separated")
	flag.Set("logtostderr", "true")
	flag.Parse()

	conf := sarama.NewConfig()
	conf.Producer.Return.Successes = true
	conf.Producer.RequiredAcks = sarama.WaitForAll

	kafka, err := sarama.NewClient(strings.Split(*brokers, ","), conf)
	if err != nil {
		glog.Fatal(err)
	}

	kvSource := make(chan kafkasync.KeyValue, 10)
	go fetchData(kvSource)

	syncer := kafkasync.New("tests.data2kafka")
	stats, err := syncer.Sync(kafka, kvSource)
	if err != nil {
		glog.Fatal(err)
	}

	glog.Infof("Sync stats:")
	stats.Log()

	if stats.ErrorCount > 0 {
		glog.Fatalf("%d send errors", stats.Count)
	}
}

// fetchData pushes three sample key/value pairs into kvSource and closes the
// channel when done: one fixed pair, one whose value embeds the current time,
// and one whose key embeds the current time.
func fetchData(kvSource chan kafkasync.KeyValue) {
	defer close(kvSource)

	// emit wraps a key and a value into a KeyValue and sends it.
	emit := func(key, value string) {
		kvSource <- kafkasync.KeyValue{Key: []byte(key), Value: []byte(value)}
	}

	emit("test-key", "test-value")
	emit("test-key-changing", "test-value "+time.Now().String())
	emit("test-key-var "+time.Now().String(), "test-value")
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Debug is a package-level switch that is off by default.
// NOTE(review): presumably it enables extra diagnostic output; where it is
// read is not visible here — confirm against the package source.
var Debug = false

Functions

This section is empty.

Types

type KeyValue

// KeyValue is an alias for diff.KeyValue, the key/value pair type carried by
// the source channel passed to the Sync methods.
type KeyValue = diff.KeyValue

type Stats

// Stats holds the counters and durations reported for a synchronization run.
// The Sync methods return a *Stats.
type Stats struct {
	// Diff statistics
	// NOTE(review): presumably the number of keys in each diff outcome
	// (created, modified, deleted, unchanged) — confirm against the diff package.
	Created   uint64
	Modified  uint64
	Deleted   uint64
	Unchanged uint64

	// Producer statistics
	// NOTE(review): presumably messages handed to the producer, and how many
	// of them succeeded or failed — confirm against SetupProducer.
	SendCount    uint64
	SuccessCount int64
	ErrorCount   int64

	// The count of defined key values.
	Count uint64

	// Performance statistics
	// NOTE(review): presumably the number of messages read from the topic and
	// the time spent in each phase — confirm against Sync.
	MessagesInTopic   uint64
	ReadTopicDuration time.Duration
	SyncDuration      time.Duration
	TotalDuration     time.Duration
	// contains filtered or unexported fields
}

func NewStats

func NewStats() *Stats

func (*Stats) Elapsed

func (stats *Stats) Elapsed() time.Duration

func (*Stats) Log

func (stats *Stats) Log()

func (*Stats) LogString added in v1.0.1

func (stats *Stats) LogString() string

type Syncer

// Syncer holds the settings of a topic synchronization: the target topic and
// partition, the value written for removed keys, and a dry-run switch.
type Syncer struct {
	// The topic to synchronize.
	Topic string

	// The topic's partition to synchronize.
	Partition int32

	// The value to use when a key is removed.
	RemovedValue []byte

	// When true, messages are not really sent.
	DryRun bool
}

func New

func New(topic string) Syncer

func (*Syncer) ApplyChanges

func (s *Syncer) ApplyChanges(changes <-chan diff.Change, send func(KeyValue), stats *Stats, cancel <-chan bool)

func (*Syncer) IndexTopic

func (s *Syncer) IndexTopic(kafka sarama.Client, index diff.Index) (msgCount uint64, err error)

func (*Syncer) SetupProducer

func (s *Syncer) SetupProducer(kafka sarama.Client, stats *Stats) (send func(KeyValue), finish func())

func (Syncer) Sync

func (s Syncer) Sync(kafka sarama.Client, kvSource <-chan KeyValue, cancel <-chan bool) (stats *Stats, err error)

Sync synchronizes a key-indexed data source with a topic.

The kvSource channel provides values in the reference store. It MUST NOT produce duplicate keys.

func (Syncer) SyncWithIndex

func (s Syncer) SyncWithIndex(kafka sarama.Client, kvSource <-chan KeyValue, topicIndex diff.Index, cancel <-chan bool) (stats *Stats, err error)

SyncWithIndex synchronizes a data source with a topic, using the given index.

The kvSource channel provides values in the reference store. It MUST NOT produce duplicate keys.

func (Syncer) SyncWithPrepopulatedIndex

func (s Syncer) SyncWithPrepopulatedIndex(kafka sarama.Client, kvSource <-chan KeyValue, topicIndex diff.Index, cancel <-chan bool) (stats *Stats, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL