relay

package
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2021 License: Apache-2.0 Imports: 36 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NewRelay = NewRealRelay

NewRelay creates an instance of Relay.

Functions

func RegisterMetrics

func RegisterMetrics(registry *prometheus.Registry)

RegisterMetrics register metrics.

Types

type Config

type Config struct {
	EnableGTID  bool     `toml:"enable-gtid" json:"enable-gtid"`
	AutoFixGTID bool     `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
	RelayDir    string   `toml:"relay-dir" json:"relay-dir"`
	ServerID    uint32   `toml:"server-id" json:"server-id"`
	Flavor      string   `toml:"flavor" json:"flavor"`
	Charset     string   `toml:"charset" json:"charset"`
	From        DBConfig `toml:"data-source" json:"data-source"`

	// synchronous start point (if no meta saved before)
	// do not need to specify binlog-pos, because relay will fetch the whole file
	BinLogName string `toml:"binlog-name" json:"binlog-name"`
	BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"`

	// for binlog reader retry
	ReaderRetry retry.ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"`
}

Config is the configuration for Relay.

func (*Config) String

func (c *Config) String() string

type DBConfig

type DBConfig struct {
	Host     string `toml:"host" json:"host"`
	User     string `toml:"user" json:"user"`
	Password string `toml:"password" json:"-"` // omit it for privacy
	Port     int    `toml:"port" json:"port"`
}

DBConfig is the DB configuration.

type LocalMeta

type LocalMeta struct {
	sync.RWMutex

	BinLogName string `toml:"binlog-name" json:"binlog-name"`
	BinLogPos  uint32 `toml:"binlog-pos" json:"binlog-pos"`
	BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"`
	// contains filtered or unexported fields
}

LocalMeta implements Meta by save info in local

func (*LocalMeta) AddDir

func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID gtid.Set) error

AddDir implements Meta.AddDir

func (*LocalMeta) AdjustWithStartPos

func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error)

AdjustWithStartPos implements Meta.AdjustWithStartPos, return whether adjusted

func (*LocalMeta) Dir

func (lm *LocalMeta) Dir() string

Dir implements Meta.Dir

func (*LocalMeta) Dirty

func (lm *LocalMeta) Dirty() bool

Dirty implements Meta.Dirty

func (*LocalMeta) Flush

func (lm *LocalMeta) Flush() error

Flush implements Meta.Flush

func (*LocalMeta) GTID

func (lm *LocalMeta) GTID() (string, gtid.Set)

GTID implements Meta.GTID

func (*LocalMeta) Load

func (lm *LocalMeta) Load() error

Load implements Meta.Load

func (*LocalMeta) Pos

func (lm *LocalMeta) Pos() (string, mysql.Position)

Pos implements Meta.Pos

func (*LocalMeta) Save

func (lm *LocalMeta) Save(pos mysql.Position, gset gtid.Set) error

Save implements Meta.Save

func (*LocalMeta) String

func (lm *LocalMeta) String() string

String implements Meta.String

func (*LocalMeta) TrimUUIDs

func (lm *LocalMeta) TrimUUIDs() ([]string, error)

TrimUUIDs implements Meta.TrimUUIDs

func (*LocalMeta) UUID

func (lm *LocalMeta) UUID() string

UUID implements Meta.UUID

type Meta

type Meta interface {
	// Load loads meta information for the recently active server
	Load() error

	// AdjustWithStartPos adjusts current pos / GTID with start pos
	// if current pos / GTID is meaningless, update to start pos or last pos when start pos is meaningless
	// else do nothing
	AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error)

	// Save saves meta information
	Save(pos mysql.Position, gset gtid.Set) error

	// Flush flushes meta information
	Flush() error

	// Dirty checks whether meta in memory is dirty (need to Flush)
	Dirty() bool

	// AddDir adds sub relay directory for server UUID (without suffix)
	// the added sub relay directory's suffix is incremented
	// after sub relay directory added, the internal binlog pos should be reset
	// and binlog pos will be set again when new binlog events received
	// @serverUUID should be a server_uuid for MySQL or MariaDB
	// if set @newPos / @newGTID, old value will be replaced
	AddDir(serverUUID string, newPos *mysql.Position, newGTID gtid.Set) error

	// Pos returns current (UUID with suffix, Position) pair
	Pos() (string, mysql.Position)

	// GTID returns current (UUID with suffix, GTID) pair
	GTID() (string, gtid.Set)

	// UUID returns current UUID (with suffix)
	UUID() string

	// TrimUUIDs trim invalid UUIDs from memory and update the server-uuid.index file
	// return trimmed UUIDs
	TrimUUIDs() ([]string, error)

	// Dir returns current relay log (sub) directory
	Dir() string

	// String returns string representation of current meta info
	String() string
}

Meta represents binlog meta information for sync source when re-syncing, we should reload meta info to guarantee continuous transmission in order to support master-slave switching, Meta should support switching binlog meta info to newer master should support the case, where switching from A to B, then switching from B back to A

func NewLocalMeta

func NewLocalMeta(flavor, baseDir string) Meta

NewLocalMeta creates a new LocalMeta

type Process

type Process interface {
	// Init initial relat log unit
	Init(ctx context.Context) (err error)
	// Process run background logic of relay log unit
	Process(ctx context.Context, pr chan pb.ProcessResult)
	// SwitchMaster switches relay's master server
	SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error
	// Migrate  resets  binlog position
	Migrate(ctx context.Context, binlogName string, binlogPos uint32) error
	// ActiveRelayLog returns the earliest active relay log info in this operator
	ActiveRelayLog() *pkgstreamer.RelayLogInfo
	// Reload reloads config
	Reload(newCfg *Config) error
	// Update updates config
	Update(cfg *config.SubTaskConfig) error
	// Resume resumes paused relay log process unit
	Resume(ctx context.Context, pr chan pb.ProcessResult)
	// Pause pauses a running relay log process unit
	Pause()
	// Error returns error message if having one
	Error() interface{}
	// Status returns status of relay log process unit
	Status() interface{}
	// Close does some clean works
	Close()
	// IsClosed returns whether relay log process unit was closed
	IsClosed() bool
}

Process defines mysql-like relay log process unit

func NewRealRelay

func NewRealRelay(cfg *Config) Process

NewRealRelay creates an instance of Relay.

type Relay

type Relay struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Relay relays mysql binlog to local file.

func (*Relay) ActiveRelayLog

func (r *Relay) ActiveRelayLog() *pkgstreamer.RelayLogInfo

ActiveRelayLog returns the current active RelayLogInfo

func (*Relay) Close

func (r *Relay) Close()

Close implements the dm.Unit interface.

func (*Relay) Error

func (r *Relay) Error() interface{}

Error implements the dm.Unit interface.

func (*Relay) Init

func (r *Relay) Init(ctx context.Context) (err error)

Init implements the dm.Unit interface.

func (*Relay) IsClosed

func (r *Relay) IsClosed() bool

IsClosed tells whether Relay unit is closed or not.

func (*Relay) IsFreshTask

func (r *Relay) IsFreshTask() (bool, error)

IsFreshTask implements Unit.IsFreshTask

func (*Relay) Migrate

func (r *Relay) Migrate(ctx context.Context, binlogName string, binlogPos uint32) error

Migrate reset binlog pos and name, create sub dir

func (*Relay) Pause

func (r *Relay) Pause()

Pause pauses the process, it can be resumed later

func (*Relay) Process

func (r *Relay) Process(ctx context.Context, pr chan pb.ProcessResult)

Process implements the dm.Unit interface.

func (*Relay) Reload

func (r *Relay) Reload(newCfg *Config) error

Reload updates config

func (*Relay) Resume

func (r *Relay) Resume(ctx context.Context, pr chan pb.ProcessResult)

Resume resumes the paused process

func (*Relay) Status

func (r *Relay) Status() interface{}

Status implements the dm.Unit interface.

func (*Relay) SwitchMaster

func (r *Relay) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error

SwitchMaster switches relay's master server before call this from dmctl, you must ensure that relay catches up previous master we can not check this automatically in this func because master already changed switch master server steps:

  1. use dmctl to pause relay
  2. ensure relay catching up current master server (use `query-status`)
  3. switch master server for upstream * change relay's master config, TODO * change master behind VIP
  4. use dmctl to switch relay's master server (use `switch-relay-master`)
  5. use dmctl to resume relay

func (*Relay) Type

func (r *Relay) Type() pb.UnitType

Type implements the dm.Unit interface.

func (*Relay) Update

func (r *Relay) Update(cfg *config.SubTaskConfig) error

Update implements Unit.Update

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL