Documentation ¶
Index ¶
- func AutoMigrate(db *gorm.DB) error
- func NewORMClient(selfID model.CaptureID, db *gorm.DB) *ormClient
- func NewUUIDGenerator(config string, db *gorm.DB) uuidGenerator
- type CaptureOb
- func (c *CaptureOb[T]) Advance(cp metadata.CaptureProgress) error
- func (c *CaptureOb[T]) GetChangefeed(cfs ...metadata.ChangefeedUUID) (infos []*metadata.ChangefeedInfo, err error)
- func (c *CaptureOb[T]) GetChangefeedProgress(cfs ...metadata.ChangefeedUUID) (progresses map[metadata.ChangefeedUUID]metadata.ChangefeedProgress, err error)
- func (c *CaptureOb[T]) GetChangefeedState(cfs ...metadata.ChangefeedUUID) (states []*metadata.ChangefeedState, err error)
- func (c *CaptureOb[T]) OnOwnerLaunched(cf metadata.ChangefeedUUID) metadata.OwnerObservation
- func (c *CaptureOb[T]) OwnerChanges() <-chan metadata.ScheduledChangefeed
- func (c *CaptureOb[T]) PostOwnerRemoved(cf metadata.ChangefeedUUID, taskPosition metadata.ChangefeedProgress) error
- func (c *CaptureOb[T]) ProcessorChanges() <-chan metadata.ScheduledChangefeed
- func (c *CaptureOb[T]) Run(egCtx context.Context, ...) (err error)
- type ChangefeedInfoDO
- type ChangefeedStateDO
- type ClientOptionFunc
- type ControllerOb
- func (c *ControllerOb[T]) CleanupChangefeed(cf metadata.ChangefeedUUID) error
- func (c *ControllerOb[T]) CreateChangefeed(cf *metadata.ChangefeedInfo, up *model.UpstreamInfo) (metadata.ChangefeedIdent, error)
- func (c *ControllerOb[T]) GetChangefeedSchedule(cf metadata.ChangefeedUUID) (metadata.ScheduledChangefeed, error)
- func (c *ControllerOb[T]) RefreshCaptures() (captures []*model.CaptureInfo, changed bool)
- func (c *ControllerOb[T]) RemoveChangefeed(cf metadata.ChangefeedUUID) error
- func (c *ControllerOb[T]) ScheduleSnapshot() (ss []metadata.ScheduledChangefeed, cs []*model.CaptureInfo, err error)
- func (c *ControllerOb[T]) SetOwner(target metadata.ScheduledChangefeed) error
- type LeaderChecker
- type OwnerOb
- func (o *OwnerOb[T]) PauseChangefeed() error
- func (o *OwnerOb[T]) ResumeChangefeed() error
- func (o *OwnerOb[T]) Self() metadata.ChangefeedUUID
- func (o *OwnerOb[T]) SetChangefeedFailed(err *model.RunningError) error
- func (o *OwnerOb[T]) SetChangefeedFinished() error
- func (o *OwnerOb[T]) SetChangefeedPending(err *model.RunningError) error
- func (o *OwnerOb[T]) SetChangefeedRemoved() error
- func (o *OwnerOb[T]) SetChangefeedWarning(warn *model.RunningError) error
- func (o *OwnerOb[T]) UpdateChangefeed(info *metadata.ChangefeedInfo) error
- type ProgressDO
- type ScheduleDO
- type TxnAction
- type TxnContext
- type UpstreamDO
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AutoMigrate ¶
AutoMigrate checks the metadata-related tables and creates them or changes their structure as needed, based on the in-memory struct definitions.
func NewORMClient ¶
NewORMClient creates a new ORM client.
func NewUUIDGenerator ¶
NewUUIDGenerator creates a new UUID generator.
Types ¶
type CaptureOb ¶
type CaptureOb[T TxnContext] struct { // election related fields. metadata.Elector // contains filtered or unexported fields }
CaptureOb is an implementation of metadata.CaptureObservation.
func NewCaptureObservation ¶
func NewCaptureObservation( backendDB *sql.DB, selfInfo *model.CaptureInfo, opts ...ClientOptionFunc, ) (*CaptureOb[*gorm.DB], error)
NewCaptureObservation creates a capture observation.
func (*CaptureOb[T]) Advance ¶
func (c *CaptureOb[T]) Advance(cp metadata.CaptureProgress) error
Advance updates the progress of the capture.
func (*CaptureOb[T]) GetChangefeed ¶
func (c *CaptureOb[T]) GetChangefeed(cfs ...metadata.ChangefeedUUID) (infos []*metadata.ChangefeedInfo, err error)
GetChangefeed returns the changefeeds with the given UUIDs.
func (*CaptureOb[T]) GetChangefeedProgress ¶
func (c *CaptureOb[T]) GetChangefeedProgress( cfs ...metadata.ChangefeedUUID, ) (progresses map[metadata.ChangefeedUUID]metadata.ChangefeedProgress, err error)
GetChangefeedProgress returns the progress of the changefeeds with the given UUIDs, keyed by changefeed UUID.
func (*CaptureOb[T]) GetChangefeedState ¶
func (c *CaptureOb[T]) GetChangefeedState(cfs ...metadata.ChangefeedUUID) (states []*metadata.ChangefeedState, err error)
GetChangefeedState returns the states of the changefeeds with the given UUIDs.
func (*CaptureOb[T]) OnOwnerLaunched ¶
func (c *CaptureOb[T]) OnOwnerLaunched(cf metadata.ChangefeedUUID) metadata.OwnerObservation
OnOwnerLaunched is called when the owner of a changefeed is launched.
func (*CaptureOb[T]) OwnerChanges ¶
func (c *CaptureOb[T]) OwnerChanges() <-chan metadata.ScheduledChangefeed
OwnerChanges returns a channel that receives changefeeds when the owner of the changefeed changes.
func (*CaptureOb[T]) PostOwnerRemoved ¶
func (c *CaptureOb[T]) PostOwnerRemoved(cf metadata.ChangefeedUUID, taskPosition metadata.ChangefeedProgress) error
PostOwnerRemoved is called when the owner of a changefeed is removed.
func (*CaptureOb[T]) ProcessorChanges ¶
func (c *CaptureOb[T]) ProcessorChanges() <-chan metadata.ScheduledChangefeed
ProcessorChanges returns a channel that receives changefeeds when the changefeed changes.
type ChangefeedInfoDO ¶
type ChangefeedInfoDO struct { metadata.ChangefeedInfo RemovedAt *time.Time `gorm:"column:removed_at;type:datetime(6)" json:"removed_at"` Version uint64 `gorm:"column:version;type:bigint(20) unsigned;not null" json:"version"` UpdateAt time.Time `gorm:"column:update_at;type:datetime(6);not null;autoUpdateTime" json:"update_at"` }
ChangefeedInfoDO mapped from table <changefeed_info>
func (*ChangefeedInfoDO) GetKey ¶
func (c *ChangefeedInfoDO) GetKey() metadata.ChangefeedUUID
GetKey returns the key of the changefeed info.
func (*ChangefeedInfoDO) GetUpdateAt ¶
func (c *ChangefeedInfoDO) GetUpdateAt() time.Time
GetUpdateAt returns the update time of the changefeed info.
func (*ChangefeedInfoDO) GetVersion ¶
func (c *ChangefeedInfoDO) GetVersion() uint64
GetVersion returns the version of the changefeed info.
func (*ChangefeedInfoDO) TableName ¶
func (*ChangefeedInfoDO) TableName() string
TableName returns the name of the table that ChangefeedInfoDO maps to.
type ChangefeedStateDO ¶
type ChangefeedStateDO struct { metadata.ChangefeedState Version uint64 `gorm:"column:version;type:bigint(20) unsigned;not null" json:"version"` UpdateAt time.Time `gorm:"column:update_at;type:datetime(6);not null;autoUpdateTime" json:"update_at"` }
ChangefeedStateDO mapped from table <changefeed_state>
func (*ChangefeedStateDO) GetKey ¶
func (c *ChangefeedStateDO) GetKey() metadata.ChangefeedUUID
GetKey returns the key of the changefeed state.
func (*ChangefeedStateDO) GetUpdateAt ¶
func (c *ChangefeedStateDO) GetUpdateAt() time.Time
GetUpdateAt returns the update time of the changefeed state.
func (*ChangefeedStateDO) GetVersion ¶
func (c *ChangefeedStateDO) GetVersion() uint64
GetVersion returns the version of the changefeed state.
func (*ChangefeedStateDO) TableName ¶
func (*ChangefeedStateDO) TableName() string
TableName returns the name of the table that ChangefeedStateDO maps to.
type ClientOptionFunc ¶
type ClientOptionFunc func(*clientOptions)
ClientOptionFunc is the option function for the client.
func WithMaxExecTime ¶
func WithMaxExecTime(d time.Duration) ClientOptionFunc
WithMaxExecTime sets the maximum execution time of the client.
type ControllerOb ¶
type ControllerOb[T TxnContext] struct { // contains filtered or unexported fields }
ControllerOb is an implementation of metadata.ControllerObservation.
func (*ControllerOb[T]) CleanupChangefeed ¶
func (c *ControllerOb[T]) CleanupChangefeed(cf metadata.ChangefeedUUID) error
CleanupChangefeed removes the changefeed info, schedule info and state info of the given changefeed. Note that this function should only be called when the owner is removed and changefeed is marked as removed.
func (*ControllerOb[T]) CreateChangefeed ¶
func (c *ControllerOb[T]) CreateChangefeed(cf *metadata.ChangefeedInfo, up *model.UpstreamInfo) (metadata.ChangefeedIdent, error)
CreateChangefeed initializes the changefeed info, schedule info and state info of the given changefeed. It also updates or creates the upstream info depending on whether the upstream info exists.
func (*ControllerOb[T]) GetChangefeedSchedule ¶
func (c *ControllerOb[T]) GetChangefeedSchedule(cf metadata.ChangefeedUUID) (metadata.ScheduledChangefeed, error)
GetChangefeedSchedule gets the current schedule of the given changefeed.
func (*ControllerOb[T]) RefreshCaptures ¶
func (c *ControllerOb[T]) RefreshCaptures() (captures []*model.CaptureInfo, changed bool)
RefreshCaptures fetches the latest capture list in the TiCDC cluster.
func (*ControllerOb[T]) RemoveChangefeed ¶
func (c *ControllerOb[T]) RemoveChangefeed(cf metadata.ChangefeedUUID) error
RemoveChangefeed removes the changefeed info.
func (*ControllerOb[T]) ScheduleSnapshot ¶
func (c *ControllerOb[T]) ScheduleSnapshot() (ss []metadata.ScheduledChangefeed, cs []*model.CaptureInfo, err error)
ScheduleSnapshot gets a snapshot of the current schedule of all changefeeds.
func (*ControllerOb[T]) SetOwner ¶
func (c *ControllerOb[T]) SetOwner(target metadata.ScheduledChangefeed) error
SetOwner schedules a changefeed owner to the given target.
type LeaderChecker ¶
type LeaderChecker[T TxnContext] interface { TxnWithLeaderLock(ctx context.Context, leaderID string, fn func(T) error) error }
LeaderChecker enables the controller to ensure its leadership during a series of actions.
type OwnerOb ¶
type OwnerOb[T TxnContext] struct { // contains filtered or unexported fields }
OwnerOb is an implementation of metadata.OwnerObservation.
func (*OwnerOb[T]) PauseChangefeed ¶
PauseChangefeed pauses a changefeed.
func (*OwnerOb[T]) ResumeChangefeed ¶
ResumeChangefeed resumes a changefeed.
func (*OwnerOb[T]) Self ¶
func (o *OwnerOb[T]) Self() metadata.ChangefeedUUID
Self returns the UUID of the owner's changefeed.
func (*OwnerOb[T]) SetChangefeedFailed ¶
func (o *OwnerOb[T]) SetChangefeedFailed(err *model.RunningError) error
SetChangefeedFailed sets the changefeed to the failed state.
func (*OwnerOb[T]) SetChangefeedFinished ¶
SetChangefeedFinished sets the changefeed to the finished state.
func (*OwnerOb[T]) SetChangefeedPending ¶
func (o *OwnerOb[T]) SetChangefeedPending(err *model.RunningError) error
SetChangefeedPending sets the changefeed to the pending state.
func (*OwnerOb[T]) SetChangefeedRemoved ¶
SetChangefeedRemoved sets the changefeed to the removed state.
func (*OwnerOb[T]) SetChangefeedWarning ¶
func (o *OwnerOb[T]) SetChangefeedWarning(warn *model.RunningError) error
SetChangefeedWarning sets the changefeed to the warning state.
func (*OwnerOb[T]) UpdateChangefeed ¶
func (o *OwnerOb[T]) UpdateChangefeed(info *metadata.ChangefeedInfo) error
UpdateChangefeed updates changefeed metadata; it must be called on a paused changefeed.
type ProgressDO ¶
type ProgressDO struct { CaptureID model.CaptureID `gorm:"column:capture_id;type:varchar(128);primaryKey" json:"capture_id"` Progress *metadata.CaptureProgress `gorm:"column:progress;type:longtext" json:"progress"` Version uint64 `gorm:"column:version;type:bigint(20) unsigned;not null" json:"version"` UpdateAt time.Time `gorm:"column:update_at;type:datetime(6);not null;autoUpdateTime" json:"update_at"` }
ProgressDO mapped from table <progress>
func (*ProgressDO) GetKey ¶
func (p *ProgressDO) GetKey() model.CaptureID
GetKey returns the key of the progress.
func (*ProgressDO) GetUpdateAt ¶
func (p *ProgressDO) GetUpdateAt() time.Time
GetUpdateAt returns the update time of the progress.
func (*ProgressDO) GetVersion ¶
func (p *ProgressDO) GetVersion() uint64
GetVersion returns the version of the progress.
func (*ProgressDO) TableName ¶
func (*ProgressDO) TableName() string
TableName returns the name of the table that ProgressDO maps to.
type ScheduleDO ¶
type ScheduleDO struct { metadata.ScheduledChangefeed Version uint64 `gorm:"column:version;type:bigint(20) unsigned;not null" json:"version"` UpdateAt time.Time `gorm:"column:update_at;type:datetime(6);not null;autoUpdateTime" json:"update_at"` }
ScheduleDO mapped from table <schedule>
func (*ScheduleDO) GetKey ¶
func (s *ScheduleDO) GetKey() metadata.ChangefeedUUID
GetKey returns the key of the schedule.
func (*ScheduleDO) GetUpdateAt ¶
func (s *ScheduleDO) GetUpdateAt() time.Time
GetUpdateAt returns the update time of the schedule.
func (*ScheduleDO) GetVersion ¶
func (s *ScheduleDO) GetVersion() uint64
GetVersion returns the version of the schedule.
func (*ScheduleDO) TableName ¶
func (*ScheduleDO) TableName() string
TableName returns the name of the table that ScheduleDO maps to.
type TxnAction ¶
type TxnAction[T TxnContext] func(T) error
TxnAction is a series of operations that can be executed in a transaction and the generic type T represents the transaction context.
Note that in the current implementation the metadata operation and leader check are always in the same transaction context.
type TxnContext ¶
TxnContext is a type set that can be used as the transaction context.
type UpstreamDO ¶
type UpstreamDO struct { ID uint64 `gorm:"column:id;type:bigint(20) unsigned;primaryKey" json:"id"` Endpoints string `gorm:"column:endpoints;type:text;not null" json:"endpoints"` Config *security.Credential `gorm:"column:config;type:text" json:"config"` Version uint64 `gorm:"column:version;type:bigint(20) unsigned;not null" json:"version"` UpdateAt time.Time `gorm:"column:update_at;type:datetime(6);not null;autoUpdateTime" json:"update_at"` }
UpstreamDO mapped from table <upstream>
func (*UpstreamDO) GetKey ¶
func (u *UpstreamDO) GetKey() uint64
GetKey returns the key of the upstream.
func (*UpstreamDO) GetUpdateAt ¶
func (u *UpstreamDO) GetUpdateAt() time.Time
GetUpdateAt returns the update time of the upstream.
func (*UpstreamDO) GetVersion ¶
func (u *UpstreamDO) GetVersion() uint64
GetVersion returns the version of the upstream.
func (*UpstreamDO) TableName ¶
func (*UpstreamDO) TableName() string
TableName returns the name of the table that UpstreamDO maps to.