river

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2023 License: MIT Imports: 23 Imported by: 0

README

单元测试环境部署

elasticsearch部署

用于测试的es配置参考/etc/river-test/elasticsearch.yml

network.host: 0.0.0.0

http:
  port: 9200

script.engine.groovy:
  inline:
    update: on
    search: on
  file:
    update: on
    search: on

elasticfence.disabled: true

运行容器

docker run --name es-river-test -p 9200:9200 \
    -v /etc/river-test/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
    -d img.ones.pro/library/elasticsearch-v2.4.1:v0.0.3-109-g9d19c

mysql部署

用于测试的mysql参考配置/etc/river-test/my.cnf

[mysqld]

port = 3306

init-connect = 'SET NAMES utf8mb4'
character-set-server = utf8mb4
performance_schema_max_table_instances = 200
table_definition_cache=200
table_open_cache=128

character_set_server=utf8mb4
collation_server=utf8mb4_unicode_ci
init_connect='SET NAMES utf8mb4'

skip-name-resolve
back_log = 600
max_connections = 1000
max_connect_errors = 6000
open_files_limit = 65535
max_allowed_packet = 512M
binlog_cache_size = 1M
max_heap_table_size = 8M
tmp_table_size = 16M
read_buffer_size = 2M
read_rnd_buffer_size = 8M
sort_buffer_size = 8M
join_buffer_size = 8M
thread_cache_size = 8
query_cache_size = 8M
query_cache_limit = 2M
key_buffer_size = 4M
ft_min_word_len = 4
transaction_isolation = REPEATABLE-READ


datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
server-id = 1
log-bin = mysql-bin
log_slave_updates = on
auto_increment_increment=1
auto_increment_offset=1


log_bin = mysql-bin
binlog_format = row
expire_logs_days = 7
slow_query_log = 1
long_query_time = 1
slow_query_log_file = /var/lib/mysql/mysql-slow.log
performance_schema = 0
explicit_defaults_for_timestamp
skip-external-locking
default-storage-engine = InnoDB
innodb_file_per_table = 1
innodb_open_files = 500
innodb_buffer_pool_size = 8G
innodb_write_io_threads = 4
innodb_read_io_threads = 4
innodb_thread_concurrency = 0
innodb_purge_threads = 1
innodb_flush_log_at_trx_commit = 2
innodb_log_buffer_size = 2M
innodb_log_file_size = 32M
innodb_log_files_in_group = 3
innodb_max_dirty_pages_pct = 90
innodb_lock_wait_timeout = 120
bulk_insert_buffer_size = 8M
myisam_sort_buffer_size = 8M
myisam_max_sort_file_size = 10G
myisam_repair_threads = 1
interactive_timeout = 28800
wait_timeout = 28800
net_read_timeout = 600
net_write_timeout = 600


#GTID:
gtid_mode=on
enforce_gtid_consistency=on
session_track_gtids=OWN_GTID
session_track_state_change=ON

#skip-grant-tables

# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

启动容器

docker run --name mysql-river-test -p 3306:3306 \
    -v /etc/river-test/my.cnf:/etc/my.cnf \
    -e MYSQL_ROOT_PASSWORD=root -d img.ones.pro/library/mysql:v5.7.30

docker run --name mysql-health-test -p 3307:3306 \
    -v /etc/river-test/my.cnf:/etc/my.cnf \
    -e MYSQL_ROOT_PASSWORD=root -d img.ones.pro/library/mysql:v5.7.30

授权及建库

docker exec mysql-river-test  mysql -uroot -proot -e "grant all privileges  on *.* to bang@'%' identified by \"bang\";flush privileges;create database test;"
docker exec mysql-health-test   mysql -uroot -proot -e "grant all privileges  on *.* to bang@'%' identified by \"bang\";flush privileges;create database test;"

运行单元测试

go test

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrRuleNotExist = errors.New("rule is not exist")

Functions

func EquelMysqlGTIDSet

func EquelMysqlGTIDSet(set1, set2 *mysql.MysqlGTIDSet) bool

func GetPKValues

func GetPKValues(table *schema.Table, row []interface{}) ([]interface{}, error)

Get primary keys in one row for a table; a table may use multiple fields as the PK

func GetStopPosByUUIDSet

func GetStopPosByUUIDSet(uuidSet *mysql.UUIDSet) int64

func HtmlStrip

func HtmlStrip(input interface{}) (string, error)

func IsStringSliceDiff

func IsStringSliceDiff(str1 []string, str2 []string) bool

func ParseMysqlGTIDSet

func ParseMysqlGTIDSet(str string) (*mysql.MysqlGTIDSet, error)

Types

type ActionMapping

type ActionMapping struct {
	DBAction       string `toml:"db_action"`
	ESAction       string `toml:"es_action"`
	Script         string `toml:"script"`
	ScriptFile     string `toml:"script_file"`
	ScriptId       string `toml:"script_id"`
	ScriptedUpsert bool   `toml:"scripted_upsert"`
}

type Config

type Config struct {
	MyAddr     string `toml:"my_addr"`
	MyUser     string `toml:"my_user"`
	MyPassword string `toml:"my_pass"`
	MyCharset  string `toml:"my_charset"`

	ESAddr string `toml:"es_addr"`

	StatAddr string `toml:"stat_addr"`

	ServerID uint32 `toml:"server_id"`
	Flavor   string `toml:"flavor"`
	DataDir  string `toml:"data_dir"`

	DumpExec string `toml:"mysqldump"`

	Sources []SourceConfig `toml:"source"`

	Rules []*Rule `toml:"rule"`

	HealthCheckInterval     int    `toml:"health_check_interval"`
	HealthCheckPosThreshold int    `toml:"health_check_pos_threshold"`
	HealthCheckEnable       bool   `toml:"health_check_enable"`
	HealthCheckOuputDir     string `toml:"health_check_output_dir"`
}

func NewConfig

func NewConfig(data string) (*Config, error)

func NewConfigWithFile

func NewConfigWithFile(name string) (*Config, error)

type River

type River struct {
	// contains filtered or unexported fields
}

In Elasticsearch, a river is a pluggable service within Elasticsearch that pulls data and then indexes it into Elasticsearch. We use this definition here too, although it may not run within Elasticsearch. Maybe later I can implement an actual river in Elasticsearch, but I must learn Java. :-)

func NewRiver

func NewRiver(c *Config) (*River, error)

func (*River) Close

func (r *River) Close()

Close closes the River

func (*River) Ctx

func (r *River) Ctx() context.Context

Ctx returns the internal context for outside use.

func (*River) Run

func (r *River) Run() error

type Rule

type Rule struct {
	Schema string `toml:"schema"`
	Table  string `toml:"table"`
	Index  string `toml:"index"`
	Type   string `toml:"type"`
	Parent string `toml:"parent"`

	IDColumns        string `toml:"id_columns"`
	HtmlStripColumns string `toml:"html_strip_columns"`
	JSONColumns      string `toml:"json_columns"`
	// Default, a MySQL table field name is mapped to Elasticsearch field name.
	// Sometimes, you want to use different name, e.g, the MySQL file name is title,
	// but in Elasticsearch, you want to name it my_title.
	FieldMapping map[string]string `toml:"field"`

	// 指定es字段可由其他es字段拼接而成,目前仅支持INSERT数据库事件
	// 后续可优化成 "{{id_key}}{{number}}" 的模版形式以支持更多场景
	ESFieldMapping map[string]string `toml:"es_field"`

	// MySQL table information
	TableInfo *schema.Table

	CustomActionMapping []*ActionMapping `toml:"action"`

	ActionMapping map[string]*ActionMapping
}

If you want to sync MySQL data into Elasticsearch, you must set a rule to let us know how to do it. The mapping rule may look like this: schema + table <-> index + document type. Schema and table are for MySQL; index and document type are for Elasticsearch.

type SourceConfig

type SourceConfig struct {
	Schema string   `toml:"schema"`
	Tables []string `toml:"tables"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL