config

package
v0.9.71 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2020 License: Apache-2.0 Imports: 11 Imported by: 3

Documentation

Index

Constants

View Source
const PipelineConfigV3Version = "1.0"

Variables

View Source
var (
	TxnBufferLimit = 1024
	MaxNrGravity   = 16
)
View Source
var DefaultBinlogSyncerTimeout = "10s"

Functions

This section is empty.

Types

type Config added in v0.9.10

type Config struct {
	*flag.FlagSet `json:"-"`

	EtcdEndpoints string `toml:"etcd-endpoints" json:"etcd-endpoints"`

	PipelineConfig PipelineConfigV3 `toml:"pipeline" json:"pipeline"`

	// Log related configuration.
	Log logutil.LogConfig `toml:"log" json:"log"`

	HttpAddr string `toml:"http-addr" json:"http-addr"`

	PositionFile         string `toml:"position-file" json:"position-file"`
	BlockProfileRate     int    ` toml:"-" json:"-"`
	MutexProfileFraction int    ` toml:"-" json:"-"`
	ConfigFile           string `toml:"-" json:"-"`
	ClearPosition        bool   `toml:"-" json:"-"`
	Version              bool
}

Config is the configuration.

func LoadConfigFromFile added in v0.9.10

func LoadConfigFromFile(path string) *Config

func NewConfig added in v0.9.10

func NewConfig() *Config

NewConfig creates a new config.

func NewConfigFromString

func NewConfigFromString(configString string) (*Config, error)

func (*Config) ConfigFromFile added in v0.9.10

func (c *Config) ConfigFromFile(path string) error

ConfigFromFile loads config from file.

func (*Config) ParseCmd added in v0.9.10

func (c *Config) ParseCmd(arguments []string) error

ParseCmd parses flag definitions from argument list

type DBConfig added in v0.9.44

type DBConfig struct {
	Host     string `toml:"host" json:"host" mapstructure:"host"`
	Location string `toml:"location" json:"location" mapstructure:"location"`
	Username string `toml:"username" json:"username" mapstructure:"username"`
	Password string `toml:"password" json:"password" mapstructure:"password"`
	Port     int    `toml:"port" json:"port" mapstructure:"port"`
	Schema   string `toml:"schema" json:"schema" mapstructure:"schema"`
	// Timeout for establishing connections, aka dial timeout.
	// The value must be a decimal number with a unit suffix ("ms", "s", "m", "h"), such as "30s", "0.5m" or "1m30s".
	Timeout string `toml:"timeout" json:"timeout" mapstructure:"timeout"`
	// I/O read timeout.
	// The value must be a decimal number with a unit suffix ("ms", "s", "m", "h"), such as "30s", "0.5m" or "1m30s".
	ReadTimeout string `toml:"read-timeout" json:"read-timeout" mapstructure:"read-timeout"`

	// I/O write timeout.
	// The value must be a decimal number with a unit suffix ("ms", "s", "m", "h"), such as "30s", "0.5m" or "1m30s".
	WriteTimeout string `toml:"write-timeout" json:"write-timeout" mapstructure:"write-timeout"`

	MaxIdle                int           `toml:"max-idle" json:"max-idle" mapstructure:"max-idle"`
	MaxOpen                int           `toml:"max-open" json:"max-open" mapstructure:"max-open"`
	MaxLifeTimeDurationStr string        `toml:"max-life-time-duration" json:"max-life-time-duration" mapstructure:"max-life-time-duration"`
	MaxLifeTimeDuration    time.Duration `toml:"-" json:"-" mapstructure:"-"`
}

DBConfig is the DB configuration.

func (*DBConfig) ValidateAndSetDefault added in v0.9.44

func (dbc *DBConfig) ValidateAndSetDefault() error

type Fetch

type Fetch struct {
	Min     int32 `toml:"min" json:"min"`
	Default int32 `toml:"default" json:"default"`
	Max     int32 `toml:"max" json:"max"`
}

type Flush

type Flush struct {
	Bytes       int    `mapstructure:"bytes" toml:"bytes" json:"bytes"`
	Messages    int    `mapstructure:"messages" toml:"messages" json:"messages"`
	Frequency   string `mapstructure:"frequency" toml:"frequency" json:"frequency"`
	MaxMessages int    `mapstructure:"max-messages" toml:"max-messages" json:"max-messages"`
}

type GenericPluginConfig added in v0.9.35

type GenericPluginConfig struct {
	Type   string                 `yaml:"type"  json:"type"  toml:"type"`
	Config map[string]interface{} `yaml:"config"  json:"config,omitempty"  toml:"config,omitempty"`
}

type GtmConfig added in v0.9.10

type GtmConfig struct {
	UseBufferDuration bool `mapstructure:"use-buffer-duration" toml:"use-buffer-duration" json:"use-buffer-duration"`
	BufferSize        int  `mapstructure:"buffer-size" toml:"buffer-size" json:"buffer-size"`
	ChannelSize       int  `mapstructure:"channel-size" toml:"channel-size" json:"channel-size"`
	BufferDurationMs  int  `mapstructure:"buffer-duration-ms" toml:"buffer-duration-ms" json:"buffer-duration-ms"`
}

type InputConfig added in v0.9.10

type InputConfig struct {
	Type   string                 `yaml:"type"  json:"type"  toml:"type"`
	Mode   InputMode              `yaml:"mode" json:"mode" toml:"mode"`
	Config map[string]interface{} `yaml:"config"  json:"config"  toml:"config"`
}

type InputMode added in v0.9.10

type InputMode string
const (
	Unknown     InputMode = "unknown"
	Batch       InputMode = "batch"
	Stream      InputMode = "stream"
	Replication InputMode = "replication" // scan + binlog
)

func (InputMode) Valid added in v0.9.17

func (mode InputMode) Valid() error

type KafkaCommonConfig

type KafkaCommonConfig struct {
	ClientID          string `mapstructure:"client-id" toml:"client-id" json:"client-id"`
	ChannelBufferSize int    `mapstructure:"channel-buffer-size" toml:"channel-buffer-size" json:"channel-buffer-size"`
}

type KafkaConsumerConfig

type KafkaConsumerConfig struct {
	MaxWaitTime string `mapstructure:"max-wait-time" toml:"max-wait-time" json:"max-wait-time"`
	// Fetch is the namespace for controlling how many bytes are retrieved by any
	// given request.
	Fetch Fetch `mapstructure:"fetch" toml:"fetch" json:"fetch"`

	Offsets Offsets `mapstructure:"offsets" toml:"offsets" json:"offsets"`
}

type KafkaGlobalConfig

type KafkaGlobalConfig struct {
	BrokerAddrs []string             `mapstructure:"broker-addrs" toml:"broker-addrs" json:"broker-addrs"`
	CertFile    string               `mapstructure:"cert-file" toml:"cert-file" json:"cert-file"`
	KeyFile     string               `mapstructure:"key-file" toml:"key-file" json:"key-file"`
	CaFile      string               `mapstructure:"ca-file" toml:"ca-file" json:"ca-file"`
	VerifySSL   bool                 `mapstructure:"verify-ssl" toml:"verify-ssl" json:"verify-ssl"`
	Mode        string               `mapstructure:"mode" toml:"mode" json:"mode"`
	Producer    *KafkaProducerConfig `mapstructure:"producer" toml:"producer" json:"producer"`
	Net         *KafkaNetConfig      `mapstructure:"net" toml:"net" json:"net"`
}

type KafkaNetConfig

type KafkaNetConfig struct {
	// SASL based authentication with broker. While there are multiple SASL authentication methods
	// the current implementation is limited to plaintext (SASL/PLAIN) authentication
	SASL SASL `mapstructure:"sasl" toml:"sasl" json:"sasl"`

	// KeepAlive specifies the keep-alive period for an active network connection.
	// If zero, keep-alives are disabled. (default is 0: disabled).
	KeepAlive time.Duration
}

type KafkaProducerConfig

type KafkaProducerConfig struct {
	Flush Flush `mapstructure:"flush" toml:"flush" json:"flush"`
}

type MongoConfigs added in v0.9.10

type MongoConfigs struct {
	MongoSources   []MongoSource    `toml:"mongo-sources" json:"mongo-sources"`
	PositionSource *MongoConnConfig `toml:"position-conn" json:"position-conn"`
	GtmConfig      *GtmConfig       `toml:"gtm-config" json:"gtm-config"`
}

type MongoConnConfig added in v0.9.10

type MongoConnConfig struct {
	Host     string `mapstructure:"host" toml:"host" json:"host"`
	Port     int    `mapstructure:"port" toml:"port" json:"port"`
	Username string `mapstructure:"username" toml:"username" json:"username"`
	Password string `mapstructure:"password" toml:"password" json:"password"`
	Database string `mapstructure:"database" toml:"database" json:"database"`
	Direct   bool   `mapstructure:"direct" toml:"direct" json:"direct"`
}

func (MongoConnConfig) URI added in v0.9.44

func (cfg MongoConnConfig) URI() string

type MongoPosition added in v0.9.10

type MongoPosition bson.MongoTimestamp

type MongoSource added in v0.9.10

type MongoSource struct {
	MongoConnConfig *MongoConnConfig `mapstructure:"source" toml:"source" json:"source"`
	StartPosition   *MongoPosition   `mapstructure:"start-position" toml:"start-position" json:"start-position"`
}

type MySQLBinlogPosition added in v0.9.44

type MySQLBinlogPosition struct {
	BinLogFileName string `toml:"binlog-name" json:"binlog-name" mapstructure:"binlog-name"`
	BinLogFilePos  uint32 `toml:"binlog-pos" json:"binlog-pos" mapstructure:"binlog-pos"`
	BinlogGTID     string `toml:"binlog-gtid" json:"binlog-gtid" mapstructure:"binlog-gtid"`
}

type MySQLConfig added in v0.9.10

type MySQLConfig struct {
	IgnoreBiDirectionalData bool                 `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"`
	Source                  *DBConfig            `mapstructure:"source" toml:"source" json:"source"`
	SourceSlave             *DBConfig            `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"`
	StartPosition           *MySQLBinlogPosition `mapstructure:"start-position" toml:"start-position" json:"start-position"`
}

type Offsets

type Offsets struct {
	CommitInterval string `mapstructure:"commit-interval" toml:"commit-interval" json:"commit-interval"`
}

type PipelineConfig added in v0.9.10

type PipelineConfig struct {
	PipelineName string `toml:"name" json:"name"`

	// Deprecated!
	// DetectTxn txn is used in: bi-directional transfer, dynamic route
	DetectTxn bool `toml:"detect-txn" json:"detect-txn"`

	// UniqueSourceName name of the server
	UniqueSourceName string `toml:"unique-source-name" json:"unique-source-name"`

	Input        string `toml:"input" json:"input"`
	Output       string `toml:"output" json:"output"`
	OutputFormat string `toml:"output-format" json:"output-format"`

	MongoConfig      *MongoConfigs     `toml:"mongo" json:"mongo"`
	MySQLConfig      *MySQLConfig      `toml:"mysql" json:"mysql"`
	SourceTiDBConfig *SourceTiDBConfig `toml:"source-tidb" json:"source-tidb"`
	SourceProbeCfg   *SourceProbeCfg   `toml:"source-probe-config" json:"source-probe-config"`

	KafkaGlobalConfig *KafkaGlobalConfig `toml:"kafka-global" json:"kafka-global"`

	//
	// RouteMode, DynamicKafkaRouteConfig, StaticKafkaRouteConfig, and DBRoutes
	// are route related configuration
	RouteMode string `toml:"route-mode" json:"route-mode"`

	TableConfig []*TableConfig `toml:"table-config" json:"table-config"`

	TargetMySQL *DBConfig `toml:"target-mysql" json:"target-mysql"`

	TargetMySQLWorkerCfg *TargetMySQLWorkerConfig `toml:"target-mysql-worker" json:"target-mysql-worker"`

	//
	// internal configurations that is not exposed to users
	//
	DisableBinlogChecker bool   `toml:"-" json:"-"`
	DebugBinlog          bool   `toml:"-" json:"-"`
	BinlogSyncerTimeout  string `toml:"-" json:"-"`
}

type PipelineConfigV2 added in v0.9.10

type PipelineConfigV2 struct {
	PipelineName     string                 `mapstructure:"name" toml:"name" json:"name"`
	InputPlugins     map[string]interface{} `toml:"input" json:"input" mapstructure:"input"`
	FilterPlugins    []interface{}          `mapstructure:"filters" toml:"filters" json:"filters,omitempty"`
	OutputPlugins    map[string]interface{} `mapstructure:"output" toml:"output" json:"output"`
	SchedulerPlugins map[string]interface{} `mapstructure:"scheduler" toml:"scheduler" json:"scheduler,omitempty"`
}

func DecodeTomlString added in v0.9.10

func DecodeTomlString(s string) (*PipelineConfigV2, error)

func (*PipelineConfigV2) IsV3 added in v0.9.10

func (c *PipelineConfigV2) IsV3() bool

func (*PipelineConfigV2) ToV3 added in v0.9.10

type PipelineConfigV3 added in v0.9.10

type PipelineConfigV3 struct {
	PipelineName    string                `yaml:"name" toml:"name" json:"name"`
	InternalDBName  string                `yaml:"internal-db-name" toml:"internal-db-name" json:"internal-db-name"`
	Version         string                `yaml:"version" toml:"version" json:"version"`
	InputPlugin     InputConfig           `yaml:"input" toml:"input" json:"input"`
	FilterPlugins   []GenericPluginConfig `yaml:"filters" toml:"filters" json:"filters,omitempty"`
	OutputPlugin    GenericPluginConfig   `yaml:"output" toml:"output" json:"output"`
	SchedulerPlugin *GenericPluginConfig  `yaml:"scheduler" toml:"scheduler" json:"scheduler,omitempty"`
}

func (*PipelineConfigV3) DeepCopy added in v0.9.10

func (c *PipelineConfigV3) DeepCopy() PipelineConfigV3

func (*PipelineConfigV3) SetDefault added in v0.9.10

func (c *PipelineConfigV3) SetDefault()

type SASL

type SASL struct {
	Enable   bool   `mapstructure:"enable" toml:"enable" json:"enable"`
	User     string `mapstructure:"user" toml:"user" json:"user"`
	Password string `mapstructure:"password" toml:"password" json:"password"`
}

type SourceKafkaConfig added in v0.9.10

type SourceKafkaConfig struct {
	BrokerConfig KafkaGlobalConfig    `mapstructure:"brokers" toml:"brokers" json:"brokers"`
	GroupID      string               `mapstructure:"group-id" toml:"group-id" json:"group-id"`
	Topics       []string             `mapstructure:"topics" toml:"topics" json:"topics"`
	ConsumeFrom  string               `mapstructure:"consume-from" toml:"consume-from" json:"consume-from"`
	Common       KafkaCommonConfig    `mapstructure:"common" toml:"common" json:"common"`
	Consumer     *KafkaConsumerConfig `mapstructure:"consumer" toml:"consumer" json:"consumer"`
}

type SourceProbeCfg added in v0.9.10

type SourceProbeCfg struct {
	SourceMySQL *DBConfig `mapstructure:"mysql" toml:"mysql" json:"mysql"`
	Annotation  string    `mapstructure:"annotation" toml:"annotation" json:"annotation"`
}

type SourceTiDBConfig added in v0.9.10

type SourceTiDBConfig struct {
	SourceDB    *DBConfig          `mapstructure:"source-db" toml:"source-db" json:"source-db"`
	SourceKafka *SourceKafkaConfig `mapstructure:"source-kafka" toml:"source-kafka" json:"source-kafka"`
	// OffsetStoreConfig       *SourceProbeCfg    `mapstructure:"offset-store" toml:"offset-store" json:"offset-store"`
	PositionRepo            *GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"`
	IgnoreBiDirectionalData bool                 `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"`
}

type TableConfig

type TableConfig struct {
	Schema string `toml:"schema" json:"schema"`
	Table  string `toml:"table" json:"table"`

	RenameColumns map[string]string `toml:"rename-columns" json:"rename-columns"`
	IgnoreColumns []string          `toml:"ignore-columns" json:"ignore-columns"`

	PkOverride []string `toml:"pk-override" json:"pk-override"`

	ScanColumn string `toml:"scan-column" json:"scan-column"`
	ScanType   string `toml:"scan-type" json:"scan-type"`
}

type TargetMySQLWorkerConfig

type TargetMySQLWorkerConfig struct {
	EnableDDL          bool     `toml:"enable-ddl" json:"enable-ddl"`
	UseBidirection     bool     `toml:"use-bidirection" json:"use-bidirection"`
	UseShadingProxy    bool     `toml:"use-shading-proxy" json:"use-shading-proxy"`
	SQLExecutionEngine string   `toml:"sql-execution-engine" json:"sql-execution-engine"`
	Plugins            []string `toml:"plugins" json:"plugins"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL