telly

package
v1.2.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2022 License: BSD-3-Clause Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type InsertHook

type InsertHook func(key []byte, value interface{}) interface{}

type Option

type Option func(*options)

Option defines an option to modify the behavior of telly

func WithClientID

func WithClientID(clientID string) Option

WithClientID overwrites the default client ID to create consumers for sarama

func WithConsumerSaramaBuilder

func WithConsumerSaramaBuilder(cgb goka.SaramaConsumerBuilder) Option

WithConsumerSaramaBuilder replaces the default consumer group builder

func WithInitialLoad

func WithInitialLoad(loadFromPast time.Duration) Option

WithInitialLoad adds loading old data from the topic initially into rethinkdb. if loadFromPast == -1, it will load from the beginning. if loadFromPast == 0, loading will start from the end of the topic.

func WithInsertHook

func WithInsertHook(hook InsertHook) Option

WithInsertHook adds a hook that gets called on every new message added to the database

func WithPrimaryKey

func WithPrimaryKey(fieldName string) Option

WithPrimaryKey specifies the field-name that is being used as primary key when creating the table. If the table exists and the key is different, Telly will return an error, so changing the key is impossible without deleting the table first. By default, field name 'id' is used by rethinkdb. If your data type does not provide this field and the primary key is not set, rethinkdb will create a new id, thus making it impossible to overwrite entries. Note that we cannot use the message-key provided by Kafka. If you need to use the key, add an InsertHook which returns a new data structure containing the key as needed.

func WithRetention

func WithRetention(retention time.Duration, updatedFieldName string) Option

WithRetention runs a cleaner go-routine that cleans entries, oder than passed retention updatedFieldName specifies the rethinkdb-field (for nesting, do 'nested.field.timestamp') Note: telly does not check, if this column exists or if it actually contains a valid timestamp. It blindly deletes every row which has a value "older" than the one provided

func WithTester

func WithTester(tt *tester.Tester) Option

WithTester modifies the

func WithTopicManagerBuilder

func WithTopicManagerBuilder(tmb goka.TopicManagerBuilder) Option

WithTopicManagerBuilder replaces the default topic manager.

type Telly

type Telly struct {
	// contains filtered or unexported fields
}

Telly imports from kafka to rethinkdb

func NewTelly

func NewTelly(ctx context.Context, executor rdb.QueryExecutor, dbName string,
	table string, inputTopic string, codec goka.Codec,
	options ...Option) (*Telly, error)

NewTelly creates a new telly importer

func (*Telly) DropOffsets

func (t *Telly) DropOffsets() error

DropOffsets deletes the offsets document to make telly start from scratch

func (*Telly) Executor

func (t *Telly) Executor() rdb.QueryExecutor

func (*Telly) Run

func (t *Telly) Run(ctx context.Context, brokers []string) error

Run starts the importer and cleaner with passed brokers

func (*Telly) Table

func (t *Telly) Table() rdb.Term

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL