sink

package
v0.0.2
Published: Sep 5, 2019 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Shard

func Shard(rows []client.Row, destination Destination, jobConfig *rules.JobConfig) (map[uint32][]client.Row, int)
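
Shard partitions a batch of rows into per-shard sub-batches keyed by shard ID, using the primary-key columns described by the Destination; sharding presumably applies only when Destination.NumShards is non-zero. A minimal usage sketch follows: the import paths follow the aresdb repository layout, the table and column names are made up, and the meaning of the integer return value is not documented on this page, so all three are assumptions.

package main

import (
	"fmt"

	"github.com/uber/aresdb/client"
	"github.com/uber/aresdb/subscriber/common/rules"
	"github.com/uber/aresdb/subscriber/common/sink"
)

func main() {
	// client.Row is a positional slice of column values.
	rows := []client.Row{
		{"trip-1", int32(10)},
		{"trip-2", int32(20)},
	}
	dest := sink.Destination{
		Table:       "trips",
		ColumnNames: []string{"trip_id", "fare"},
		PrimaryKeys: map[string]int{"trip_id": 0},
		NumShards:   4,
	}
	// An empty JobConfig keeps the sketch short; a real job passes
	// its parsed rules here.
	shards, n := sink.Shard(rows, dest, &rules.JobConfig{})
	fmt.Printf("Shard returned count %d\n", n)
	for shardID, shardRows := range shards {
		fmt.Printf("shard %d: %d rows\n", shardID, len(shardRows))
	}
}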

Types

type AresDatabase

type AresDatabase struct {
	ServiceConfig config.ServiceConfig
	JobConfig     *rules.JobConfig
	Scope         tally.Scope
	ClusterName   string
	Connector     client.Connector
}

AresDatabase is an implementation of the Sink interface for saving data into AresDB

func (*AresDatabase) Cluster

func (db *AresDatabase) Cluster() string

Cluster returns the DB cluster name

func (*AresDatabase) Save

func (db *AresDatabase) Save(destination Destination, rows []client.Row) error

Save saves a batch of row objects into a destination

func (*AresDatabase) Shutdown

func (db *AresDatabase) Shutdown()

Shutdown will clean up resources that need to be cleaned up
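
Callers typically hold an AresDatabase behind the Sink interface and drive the same lifecycle for any backend: tag work with Cluster, write with Save, and release connections with Shutdown. A small helper sketch; saveBatch and its package name are hypothetical, and the import paths are assumptions:

package example

import (
	"log"

	"github.com/uber/aresdb/client"
	"github.com/uber/aresdb/subscriber/common/sink"
)

// saveBatch writes one batch through any Sink, AresDatabase included.
func saveBatch(db sink.Sink, dest sink.Destination, rows []client.Row) error {
	// Cluster is useful for tagging logs and metrics per target cluster.
	log.Printf("saving %d rows to table %s on cluster %s",
		len(rows), dest.Table, db.Cluster())
	return db.Save(dest, rows)
}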

type Destination

type Destination struct {
	// Table is the table name
	Table string
	// ColumnNames is the sorted list of column names
	ColumnNames []string
	// PrimaryKeys maps a primary-key column name to its column ID after sorting
	PrimaryKeys map[string]int
	// PrimaryKeysInSchema maps a primary-key column name to its column ID as defined in the schema
	PrimaryKeysInSchema map[string]int
	// AresUpdateModes defines the column update modes
	AresUpdateModes []memCom.ColumnUpdateMode
	// NumShards is the number of shards in the AresDB cluster
	NumShards uint32
}

Destination contains the table and columns that each job stores data into; it also records the behavior when encountering key errors
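
The two primary-key maps exist because ColumnNames is sorted, so a key column's position in the batch can differ from its position in the table schema. A hedged example of a fully populated Destination: the table, column names, and shard count are made up, the import paths follow the aresdb layout, and the zero-valued slice stands in for the real constants defined alongside memCom.ColumnUpdateMode.

package example

import (
	memCom "github.com/uber/aresdb/memstore/common"
	"github.com/uber/aresdb/subscriber/common/sink"
)

var dest = sink.Destination{
	Table: "trips",
	// Sorted column order used for each batch.
	ColumnNames: []string{"city_id", "fare", "trip_id"},
	// trip_id sits at index 2 of ColumnNames above...
	PrimaryKeys: map[string]int{"trip_id": 2},
	// ...but at index 0 in the table schema.
	PrimaryKeysInSchema: map[string]int{"trip_id": 0},
	// One update mode per column; zero values used here as placeholders.
	AresUpdateModes: make([]memCom.ColumnUpdateMode, 3),
	// Non-zero NumShards enables sharding via Shard above.
	NumShards: 8,
}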

type KafkaPublisher

type KafkaPublisher struct {
	sarama.SyncProducer
	client.UpsertBatchBuilder

	ServiceConfig config.ServiceConfig
	JobConfig     *rules.JobConfig
	Scope         tally.Scope
	ClusterName   string
}

func (*KafkaPublisher) Cluster

func (kp *KafkaPublisher) Cluster() string

Cluster returns the DB cluster name

func (*KafkaPublisher) Save

func (kp *KafkaPublisher) Save(destination Destination, rows []client.Row) error

Save saves a batch of row objects into a destination

func (*KafkaPublisher) Shutdown

func (kp *KafkaPublisher) Shutdown()

Shutdown will clean up resources that need to be cleaned up
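
Because KafkaPublisher embeds sarama.SyncProducer and client.UpsertBatchBuilder, their methods are promoted onto it; Save presumably builds upsert batches and publishes them through the embedded producer. The promotion can be seen directly, though real callers should go through Save. A sketch with a hypothetical topic name, where kp is assumed to come from NewKafkaPublisher:

package example

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/uber/aresdb/subscriber/common/sink"
)

// publishRaw calls the promoted sarama.SyncProducer method directly,
// purely to illustrate the embedding.
func publishRaw(kp *sink.KafkaPublisher, payload []byte) {
	partition, offset, err := kp.SendMessage(&sarama.ProducerMessage{
		Topic: "ares-upsert-batches", // hypothetical topic name
		Value: sarama.ByteEncoder(payload),
	})
	if err != nil {
		log.Printf("publish failed: %v", err)
		return
	}
	log.Printf("published to partition %d at offset %d", partition, offset)
}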

type Sink

type Sink interface {
	// Cluster returns the DB cluster name
	Cluster() string

	// Save will save the rows into underlying database
	Save(destination Destination, rows []client.Row) error

	// Shutdown will close the connections to the database
	Shutdown()
}

Sink is an abstraction for interactions with the downstream storage layer
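
Because the interface is small, a stub implementation is enough to exercise a pipeline without a live AresDB or Kafka cluster. A minimal sketch of a hypothetical logging Sink:

package example

import (
	"log"

	"github.com/uber/aresdb/client"
	"github.com/uber/aresdb/subscriber/common/sink"
)

// logSink is a hypothetical no-op Sink useful for wiring up tests.
type logSink struct{ cluster string }

func (s *logSink) Cluster() string { return s.cluster }

func (s *logSink) Save(dest sink.Destination, rows []client.Row) error {
	log.Printf("would save %d rows into table %q", len(rows), dest.Table)
	return nil
}

func (s *logSink) Shutdown() {} // nothing to close

// Compile-time check that logSink satisfies Sink.
var _ sink.Sink = (*logSink)(nil)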

func NewAresDatabase

func NewAresDatabase(
	serviceConfig config.ServiceConfig, jobConfig *rules.JobConfig, cluster string,
	sinkCfg config.SinkConfig, aresControllerClient controllerCli.ControllerClient) (Sink, error)

NewAresDatabase initializes an AresDatabase sink for the given cluster
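
A hedged construction sketch: the cluster name is hypothetical, the configuration values are assumed to come from the subscriber's parsed config, and the import paths follow the aresdb layout.

package example

import (
	"github.com/uber/aresdb/client"
	controllerCli "github.com/uber/aresdb/controller/client"
	"github.com/uber/aresdb/subscriber/common/rules"
	"github.com/uber/aresdb/subscriber/common/sink"
	"github.com/uber/aresdb/subscriber/config"
)

func writeOnce(serviceCfg config.ServiceConfig, jobCfg *rules.JobConfig,
	sinkCfg config.SinkConfig, ctrl controllerCli.ControllerClient,
	dest sink.Destination, rows []client.Row) error {

	// "ares-dev-01" is a hypothetical cluster name.
	db, err := sink.NewAresDatabase(serviceCfg, jobCfg, "ares-dev-01", sinkCfg, ctrl)
	if err != nil {
		return err
	}
	defer db.Shutdown() // release the underlying connector
	return db.Save(dest, rows)
}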

func NewKafkaPublisher

func NewKafkaPublisher(serviceConfig config.ServiceConfig, jobConfig *rules.JobConfig, cluster string,
	sinkCfg config.SinkConfig, aresControllerClient controllerCli.ControllerClient) (Sink, error)
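
NewKafkaPublisher carries no doc comment on this page; by symmetry with NewAresDatabase it evidently initializes a KafkaPublisher sink from the same arguments. Since both constructors return Sink, the backend can be selected at startup. In this sketch the useKafka flag is invented for illustration; the real selection field in SinkConfig is not shown here.

package example

import (
	controllerCli "github.com/uber/aresdb/controller/client"
	"github.com/uber/aresdb/subscriber/common/rules"
	"github.com/uber/aresdb/subscriber/common/sink"
	"github.com/uber/aresdb/subscriber/config"
)

// newSink picks a backend behind the shared Sink interface.
func newSink(useKafka bool, serviceCfg config.ServiceConfig,
	jobCfg *rules.JobConfig, cluster string, sinkCfg config.SinkConfig,
	ctrl controllerCli.ControllerClient) (sink.Sink, error) {

	if useKafka {
		return sink.NewKafkaPublisher(serviceCfg, jobCfg, cluster, sinkCfg, ctrl)
	}
	return sink.NewAresDatabase(serviceCfg, jobCfg, cluster, sinkCfg, ctrl)
}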
