core

package
v0.9.71 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2020 License: Apache-2.0 Imports: 15 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HashConfig

func HashConfig(config string) string

func IsInternalTrafficV2 added in v0.9.47

func IsInternalTrafficV2(db string, tbl string) bool

func MatchTxnTagPipelineName added in v0.9.47

func MatchTxnTagPipelineName(patterns []string, msg *Msg) (string, bool)

func SafeEncodeString added in v0.9.1

func SafeEncodeString(s string) string

Types

type AsynchronousOutput

type AsynchronousOutput interface {
	Start(msgAcker MsgAcker) error
	Output
}

type DDLMsg

type DDLMsg struct {
	Statement string
	AST       ast.StmtNode
}

type DMLMsg

type DMLMsg struct {
	Operation DMLOp
	Data      map[string]interface{}
	Old       map[string]interface{}
	Pks       map[string]interface{}
}

type DMLOp

type DMLOp string
const (
	Insert DMLOp = "insert"
	Update DMLOp = "update"
	Delete DMLOp = "delete"
)

type Emitter

type Emitter interface {
	// Emit use fs to modify messages and submit job to scheduler
	// msg is the message to send
	//
	Emit(msg *Msg) error
	Close() error
}

type EmptyRouter added in v0.9.17

type EmptyRouter struct{}

func (EmptyRouter) Exists added in v0.9.17

func (EmptyRouter) Exists(msg *Msg) bool

type IFilter

type IFilter interface {
	Configure(configData map[string]interface{}) error
	Filter(msg *Msg) (continueNext bool, err error)
	Close() error
}

type IFilterFactory

type IFilterFactory interface {
	NewFilter() IFilter
}

type IMatcher

type IMatcher interface {
	Configure(arg interface{}) error
	Match(msg *Msg) bool
}

type IMatcherFactory

type IMatcherFactory interface {
	NewMatcher() IMatcher
}

type IMatcherGroup

type IMatcherGroup []IMatcher

func (IMatcherGroup) Match

func (matcherGroup IMatcherGroup) Match(msg *Msg) bool

Match returns true if all matcher returns true

type Input

type Input interface {
	Start(emitter Emitter, router Router, positionCache position_cache.PositionCacheInterface) error
	Close()
	Stage() config.InputMode
	Done() chan position_repos.Position
	SendDeadSignal() error // for test only
	Wait()
}

type Msg

type Msg struct {
	Phase

	Type     MsgType
	Host     string
	Database string
	Table    string

	DdlMsg *DDLMsg
	DmlMsg *DMLMsg

	//
	// Timestamp, TimeZone, Oplog will be deprecated.
	//
	Timestamp time.Time // event generated at source
	TimeZone  *time.Location
	Oplog     *gtm.Op

	InputStreamKey *string

	// OutputDepHashed defines the dependency of this msg.
	OutputDepHashes []OutputHash
	Done            chan struct{}

	InputSequence *int64

	InputContext        interface{}
	AfterCommitCallback MsgCallbackFunc
	AfterAckCallback    MsgCallbackFunc
}

func (*Msg) BeforeWindowMoveForward added in v0.9.20

func (msg *Msg) BeforeWindowMoveForward()

func (*Msg) EventTime added in v0.9.20

func (msg *Msg) EventTime() time.Time

func (*Msg) GetPkSign

func (msg *Msg) GetPkSign() string

func (*Msg) ProcessTime added in v0.9.20

func (msg *Msg) ProcessTime() time.Time

func (*Msg) SequenceNumber added in v0.9.20

func (msg *Msg) SequenceNumber() int64

func (*Msg) String added in v0.9.44

func (msg *Msg) String() string

type MsgAcker

type MsgAcker interface {
	AckMsg(msg *Msg) error
}

type MsgCallbackFunc added in v0.9.29

type MsgCallbackFunc func(m *Msg) error

type MsgSubmitter

type MsgSubmitter interface {
	SubmitMsg(msg *Msg) error
}

type MsgType

type MsgType string
const (
	MsgDML MsgType = "dml"
	MsgDDL MsgType = "ddl"

	// ctl message is internal messages like
	// heartbeat, barrier that is not dml/ddl.
	MsgCtl MsgType = "ctl"

	// MsgCloseInputStream is used to tell the scheduler to close a stream
	MsgCloseInputStream MsgType = "closeInput"
)

type Output

type Output interface {
	Execute(msgs []*Msg) error
	GetRouter() Router
	Close()
}

type OutputHash added in v0.9.29

type OutputHash struct {
	Name string
	H    uint64
}

OutputHash defines the hash value of the message's output. Name is used just for better debug/test purpose.

type Phase added in v0.9.20

type Phase struct {
	Start          time.Time
	EnterEmitter   time.Time // also leave input
	LeaveEmitter   time.Time
	EnterScheduler time.Time // also enter submitter
	LeaveScheduler time.Time // also leave acker
	LeaveSubmitter time.Time
	EnterAcker     time.Time // also leave output
	EnterOutput    time.Time
}

type PositionCacheCreator added in v0.9.17

type PositionCacheCreator interface {
	NewPositionCache() (position_cache.PositionCacheInterface, error)
}

type Router added in v0.9.17

type Router interface {
	Exists(msg *Msg) bool
}

type Scheduler

type Scheduler interface {
	MsgSubmitter
	MsgAcker
	Healthy() bool
	Start(output Output) error
	Close()
}

type SynchronousOutput

type SynchronousOutput interface {
	Start() error
	Output
}

type TaskReportStage

type TaskReportStage string
const (
	ReportStageFull        TaskReportStage = "Full"
	ReportStageIncremental TaskReportStage = "Incremental"
)

type TaskReportStatus

type TaskReportStatus struct {
	Name       string          `json:"name"`
	ConfigHash string          `json:"configHash"`
	Position   string          `json:"position"`
	Stage      TaskReportStage `json:"stage"`
	Version    string          `json:"version"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL