kafkamysql

package module
v0.0.0-...-9f72696 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2017 License: MIT Imports: 13 Imported by: 0

README

Kafka -> Mysql Build Status

This package simplifies loading Kafka events into MySQL-compatible databases. You can use it as a standalone CLI tool or as a package within your application.

CLI

./kafka-mysql --brokers=127.0.0.1:9092,127.0.0.2:9092 --group=test-consumer --topics=topic_a --fields=field_a,field_b --mysql-database=test --mysql-table=test_data
Options
  • --brokers List of kafka brokers
  • --zookeepers List of zookeeper nodes (used to fetch kafka brokers if they are not specified with the --brokers parameter), default: 127.0.0.1:2181
  • --group Consumer group name
  • --topics List of kafka topics
  • --fields List of fields you want to load to database
  • --mysql-host Mysql host, default: 127.0.0.1
  • --mysql-port Mysql port, default: 3306
  • --mysql-user Mysql user, default: root
  • --mysql-password Mysql password
  • --mysql-database Mysql database name
  • --mysql-table Mysql table name
  • --upsert-interval Mysql upsert query interval (milliseconds), default: 2000ms
  • --upsert-size Number of events in one upsert query, default: 4000
  • --initial-offset Initial consumer group offset [newest, oldest], default: newest
  • --fetch-size Kafka consumer default fetch size (bytes), default: 1MB
  • --connection-timeout Kafka connection timeout (seconds), default: 1s
  • --max-retries Number of retries if query goes wrong, default: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Logger is the package-level logger. It writes to os.Stdout, prefixes each
// line with "[kafka-mysql] ", and uses whatever flags the standard logger has
// when this variable is initialized (log.Flags()).
var Logger = log.New(os.Stdout, "[kafka-mysql] ", log.Flags())
View Source
// Version is the version string of this package.
var Version = "0.1.0"

Functions

func ParseJson

func ParseJson(events [][]byte) ([]map[string]interface{}, error)

Types

type Config

// Config is the top-level configuration accepted by NewConsumer and NewLoader.
//
// NOTE(review): the per-field descriptions below are matched by name to the
// CLI options listed in the README; confirm the mapping against ParseFlags.
type Config struct {
	// ConfigFile is presumably the path of a configuration file — TODO confirm
	// how and when it is read.
	ConfigFile string

	// Fields presumably lists the event fields to load into the database
	// (cf. the --fields option).
	Fields            []string
	// ConnectionTimeout uses the JSON key "connection_timeout". Presumably the
	// Kafka connection timeout in seconds (cf. --connection-timeout) — confirm.
	ConnectionTimeout int `json:"connection_timeout"`
	// UpsertInterval uses the JSON key "upsert_interval". Presumably the
	// interval between upsert queries in milliseconds (cf. --upsert-interval).
	UpsertInterval    int `json:"upsert_interval"`
	// UpsertSize uses the JSON key "upsert_size". Presumably the number of
	// events sent in one upsert query (cf. --upsert-size).
	UpsertSize        int `json:"upsert_size"`
	// MaxRetries uses the JSON key "max_retries". Presumably the number of
	// retries when a query fails (cf. --max-retries).
	MaxRetries        int `json:"max_retries"`
	// Mysql points to the MySQL settings; see MysqlConfig.
	Mysql             *MysqlConfig
	// Kafka points to the Kafka settings; see KafkaConfig.
	Kafka             *KafkaConfig
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig() *Config

func (*Config) ParseFlags

func (c *Config) ParseFlags() error

type Consumer

// Consumer is an opaque type: it exposes no exported fields. Instances are
// created with NewConsumer from a *Config. Its Batches method takes a handler
// that receives a [][]byte and returns an error; Close returns an error.
type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(config *Config) (*Consumer, error)

func (*Consumer) Batches

func (c *Consumer) Batches(handler func([][]byte) error)

func (*Consumer) Close

func (c *Consumer) Close() error

type KafkaConfig

// KafkaConfig groups the Kafka-related settings referenced by Config.Kafka.
//
// NOTE(review): field meanings are matched by name to the README's CLI
// options; confirm against the code that consumes them.
type KafkaConfig struct {
	// Brokers presumably lists the Kafka broker addresses (cf. --brokers).
	Brokers       []string
	// Zookeepers presumably lists the ZooKeeper nodes used to discover brokers
	// when Brokers is not given (cf. --zookeepers).
	Zookeepers    []string
	// Topics presumably lists the Kafka topics to consume (cf. --topics).
	Topics        []string
	// ConsumerGroup uses the JSON key "consumer_group". Presumably the consumer
	// group name (cf. --group).
	ConsumerGroup string `json:"consumer_group"`
	// InitialOffset uses the JSON key "initial_offset". Presumably "newest" or
	// "oldest" (cf. --initial-offset) — confirm accepted values.
	InitialOffset string `json:"initial_offset"`
	// FetchSize uses the JSON key "fetch_size". Presumably the consumer's
	// default fetch size in bytes (cf. --fetch-size).
	FetchSize     int    `json:"fetch_size"`
}

type Loader

// Loader is an opaque type: it exposes no exported fields. Instances are
// created with NewLoader from a *Config. Its Upsert method takes rows as
// []map[string]interface{} and returns an int64 and an error (presumably the
// int64 is an affected-row count — confirm).
type Loader struct {
	// contains filtered or unexported fields
}

func NewLoader

func NewLoader(config *Config) (*Loader, error)

func (*Loader) Upsert

func (l *Loader) Upsert(rows []map[string]interface{}) (int64, error)

type MysqlConfig

// MysqlConfig groups the MySQL-related settings referenced by Config.Mysql.
//
// NOTE(review): field meanings and the defaults mentioned below come from the
// README's CLI option list; confirm they also apply when the struct is filled
// in programmatically.
type MysqlConfig struct {
	// Host is presumably the MySQL host (CLI default: 127.0.0.1).
	Host     string
	// Port is presumably the MySQL port (CLI default: 3306).
	Port     int
	// User is presumably the MySQL user (CLI default: root).
	User     string
	// Password is presumably the MySQL password.
	Password string
	// Database is presumably the target database name.
	Database string
	// Table is presumably the target table name.
	Table    string
}

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL