river

package
v0.0.0-...-91fe0c8
Published: Apr 11, 2018 License: MIT Imports: 27 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrRuleNotExist = errors.New("rule is not exist")

Functions

This section is empty.

Types

type Config

type Config struct {
	MyAddr     string `toml:"my_addr"`
	MyUser     string `toml:"my_user"`
	MyPassword string `toml:"my_pass"`
	MyCharset  string `toml:"my_charset"`

	ESHttps    bool   `toml:"es_https"`
	ESAddr     string `toml:"es_addr"`
	ESUser     string `toml:"es_user"`
	ESPassword string `toml:"es_pass"`

	StatAddr string `toml:"stat_addr"`

	ServerID uint32 `toml:"server_id"`
	Flavor   string `toml:"flavor"`
	DataDir  string `toml:"data_dir"`

	DumpExec       string `toml:"mysqldump"`
	SkipMasterData bool   `toml:"skip_master_data"`

	Sources []SourceConfig `toml:"source"`

	Rules []*Rule `toml:"rule"`

	BulkSize int `toml:"bulk_size"`

	FlushBulkTime TomlDuration `toml:"flush_bulk_time"`

	SkipNoPkTable bool `toml:"skip_no_pk_table"`
}

func NewConfig

func NewConfig(data string) (*Config, error)

func NewConfigWithFile

func NewConfigWithFile(name string) (*Config, error)
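
NewConfig takes the configuration as a string, and NewConfigWithFile takes a file name. The toml struct tags on Config, SourceConfig and Rule suggest a TOML document shaped roughly like the one below. This is only a sketch: every value (addresses, user, schema and table names, sizes) is a placeholder chosen for illustration, and the constant name sampleConfig is hypothetical. The program just prints the text so it runs without the river package.

```go
package main

import "fmt"

// sampleConfig is a hypothetical TOML document whose keys mirror the toml
// struct tags documented on this page. All values are placeholders.
const sampleConfig = `
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = ""

es_addr = "127.0.0.1:9200"

server_id = 1001
flavor = "mysql"

bulk_size = 128
flush_bulk_time = "200ms"

# One [[source]] entry per element of Config.Sources.
[[source]]
schema = "test"
tables = ["t"]

# One [[rule]] entry per element of Config.Rules.
[[rule]]
schema = "test"
table = "t"
index = "test"
type = "t"

# Rule.FieldMapping: MySQL field name = Elasticsearch field name.
[rule.field]
title = "my_title"
`

func main() {
	// In a real program this string would presumably be passed to NewConfig.
	fmt.Print(sampleConfig)
}
```

Saving the same text to a file and passing its name to NewConfigWithFile should be the file-based equivalent.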

type River

type River struct {
	// contains filtered or unexported fields
}

In Elasticsearch, a river is a pluggable service running within Elasticsearch that pulls data and then indexes it into Elasticsearch. We use this definition here too, although it may not run within Elasticsearch. Maybe later I can implement an actual river in Elasticsearch, but I must learn Java first. :-)

func NewRiver

func NewRiver(c *Config) (*River, error)

func (*River) Close

func (r *River) Close()

func (*River) Ctx

func (r *River) Ctx() context.Context

func (*River) Run

func (r *River) Run() error

Run syncs data from MySQL and inserts it into Elasticsearch.

type Rule

type Rule struct {
	Schema string   `toml:"schema"`
	Table  string   `toml:"table"`
	Index  string   `toml:"index"`
	Type   string   `toml:"type"`
	Parent string   `toml:"parent"`
	ID     []string `toml:"id"`

	// By default, a MySQL table field name is mapped to the same Elasticsearch field name.
	// Sometimes you want to use a different name, e.g. the MySQL field name is title,
	// but in Elasticsearch you want to name it my_title.
	FieldMapping map[string]string `toml:"field"`

	// MySQL table information
	TableInfo *schema.Table

	// Only MySQL fields listed in Filter are synced; by default all fields are synced.
	Filter []string `toml:"filter"`

	// Elasticsearch pipeline
	// To pre-process documents before indexing
	Pipeline string `toml:"pipeline"`
}

If you want to sync MySQL data into Elasticsearch, you must set a rule to let us know how to do it. The mapping rule may be this: schema + table <-> index + document type. Schema and table are for MySQL; index and document type are for Elasticsearch.
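
The FieldMapping comment in Rule describes renaming a MySQL field (title) to a different Elasticsearch field (my_title). A minimal sketch of that lookup follows, assuming unmapped fields keep their MySQL name. The helper esFieldName is hypothetical and is not part of this package, whose actual mapping logic is not shown on this page and may do more.

```go
package main

import "fmt"

// esFieldName is a hypothetical helper (not part of the river package).
// It returns the Elasticsearch field name for a MySQL column: the mapped
// name when the column has an entry in mapping, otherwise the column itself.
func esFieldName(mapping map[string]string, column string) string {
	if name, ok := mapping[column]; ok && name != "" {
		return name
	}
	return column
}

func main() {
	// Same shape as Rule.FieldMapping: MySQL name -> Elasticsearch name.
	mapping := map[string]string{"title": "my_title"}

	fmt.Println(esFieldName(mapping, "title")) // my_title
	fmt.Println(esFieldName(mapping, "id"))    // id
}
```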

func (*Rule) CheckFilter

func (r *Rule) CheckFilter(field string) bool
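
CheckFilter has no doc comment, but the Filter field comment says that only listed fields are synced and that all fields are synced by default. The sketch below shows one way such a check could behave under that reading. The type rule and method checkFilter are local stand-ins written for illustration, not the package's own code.

```go
package main

import "fmt"

// rule is a hypothetical stand-in for river.Rule that keeps only Filter.
type rule struct {
	Filter []string
}

// checkFilter reports whether field should be synced: every field passes
// when no filter is configured, otherwise only fields listed in Filter pass.
func (r *rule) checkFilter(field string) bool {
	if len(r.Filter) == 0 {
		return true
	}
	for _, f := range r.Filter {
		if f == field {
			return true
		}
	}
	return false
}

func main() {
	all := &rule{}
	some := &rule{Filter: []string{"id", "title"}}

	fmt.Println(all.checkFilter("body"))   // true
	fmt.Println(some.checkFilter("title")) // true
	fmt.Println(some.checkFilter("body"))  // false
}
```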

type SourceConfig

type SourceConfig struct {
	Schema string   `toml:"schema"`
	Tables []string `toml:"tables"`
}

type TomlDuration

type TomlDuration struct {
	time.Duration
}

func (*TomlDuration) UnmarshalText

func (d *TomlDuration) UnmarshalText(text []byte) error
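
UnmarshalText has the signature of encoding.TextUnmarshaler, which is presumably how a TOML string such as "200ms" in flush_bulk_time becomes a time.Duration. The sketch below reproduces that idea with a local type, assuming the text is parsed with time.ParseDuration. The name tomlDuration is a stand-in, and the package's real method body is not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// tomlDuration is a hypothetical stand-in for river.TomlDuration.
type tomlDuration struct {
	time.Duration
}

// UnmarshalText parses text such as "200ms" or "1m30s" into the embedded
// Duration. It assumes time.ParseDuration syntax.
func (d *tomlDuration) UnmarshalText(text []byte) error {
	var err error
	d.Duration, err = time.ParseDuration(string(text))
	return err
}

func main() {
	var d tomlDuration
	if err := d.UnmarshalText([]byte("200ms")); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(d.Duration) // 200ms
}
```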
