orchestrator

package
v0.0.0-...-1b33b2a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package orchestrator mainly implements a ETCD worker. A ETCD worker is used to read/write data from ETCD servers based on snapshot and data patches. Here is a detailed description of how the ETCD worker works:

			   ETCD Servers
				|       ^
				|       |
	   1. Watch |       | 5. Txn
				|       |
				v       |
			    EtcdWorker
				|       ^
				|       |
	   2. Update|       | 4. DataPatch
	   +--------+       +-------+
	   |                        |
	   |                        |
	   v         3.Tick         |
	ReactorState ----------> Reactor

 1. EtcdWorker watches the txn modification log from ETCD servers
 2. EtcdWorker updates the txn modification listened from ETCD servers by calling the Update function of ReactorState
 3. EtcdWorker calls the Tick function of Reactor, and EtcdWorker make sure the state of ReactorState is a consistent snapshot of ETCD servers
 4. Reactor is implemented by the upper layer application. Usually, Reactor will produce DataPatches when the Tick function called
    EtcdWorker apply all the DataPatches produced by Reactor
 5. EtcdWorker commits a txn to ETCD according to DataPatches

The upper layer application which is a user of EtcdWorker only need to implement Reactor and ReactorState interface. The ReactorState is used to maintenance status of ETCD, and the Reactor can produce DataPatches differently according to the ReactorState. The EtcdWorker make sure any ReactorState which perceived by Reactor must be a consistent snapshot of ETCD servers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics in this file

Types

type ChangefeedReactorState

type ChangefeedReactorState struct {
	ID            model.ChangeFeedID
	Info          *model.ChangeFeedInfo
	Status        *model.ChangeFeedStatus
	TaskPositions map[model.CaptureID]*model.TaskPosition
	TaskStatuses  map[model.CaptureID]*model.TaskStatus
	Workloads     map[model.CaptureID]model.TaskWorkload
	// contains filtered or unexported fields
}

ChangefeedReactorState represents a changefeed state which stores all key-value pairs of a changefeed in ETCD

func NewChangefeedReactorState

func NewChangefeedReactorState(id model.ChangeFeedID) *ChangefeedReactorState

NewChangefeedReactorState creates a new changefeed reactor state

func (*ChangefeedReactorState) Active

func (s *ChangefeedReactorState) Active(captureID model.CaptureID) bool

Active return true if the changefeed is ready to be processed

func (*ChangefeedReactorState) CheckCaptureAlive

func (s *ChangefeedReactorState) CheckCaptureAlive(captureID model.CaptureID)

CheckCaptureAlive checks if the capture is alive, if the capture offline, the etcd worker will exit and throw the ErrLeaseExpired error.

func (*ChangefeedReactorState) CheckChangefeedNormal

func (s *ChangefeedReactorState) CheckChangefeedNormal()

CheckChangefeedNormal checks if the changefeed state is runable, if the changefeed status is not runable, the etcd worker will skip all patch of this tick the processor should call this function every tick to make sure the changefeed is runable

func (*ChangefeedReactorState) Exist

func (s *ChangefeedReactorState) Exist() bool

Exist returns false if all keys of this changefeed in ETCD is not exist

func (*ChangefeedReactorState) GetPatches

func (s *ChangefeedReactorState) GetPatches() [][]DataPatch

GetPatches implements the ReactorState interface

func (*ChangefeedReactorState) PatchInfo

PatchInfo appends a DataPatch which can modify the ChangeFeedInfo

func (*ChangefeedReactorState) PatchStatus

PatchStatus appends a DataPatch which can modify the ChangeFeedStatus

func (*ChangefeedReactorState) PatchTaskPosition

func (s *ChangefeedReactorState) PatchTaskPosition(captureID model.CaptureID, fn func(*model.TaskPosition) (*model.TaskPosition, bool, error))

PatchTaskPosition appends a DataPatch which can modify the TaskPosition of a specified capture

func (*ChangefeedReactorState) PatchTaskStatus

func (s *ChangefeedReactorState) PatchTaskStatus(captureID model.CaptureID, fn func(*model.TaskStatus) (*model.TaskStatus, bool, error))

PatchTaskStatus appends a DataPatch which can modify the TaskStatus of a specified capture

func (*ChangefeedReactorState) PatchTaskWorkload

func (s *ChangefeedReactorState) PatchTaskWorkload(captureID model.CaptureID, fn func(model.TaskWorkload) (model.TaskWorkload, bool, error))

PatchTaskWorkload appends a DataPatch which can modify the TaskWorkload of a specified capture

func (*ChangefeedReactorState) Update

func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error

Update implements the ReactorState interface

func (*ChangefeedReactorState) UpdateCDCKey

func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) error

UpdateCDCKey updates the state by a parsed etcd key

type DataPatch

type DataPatch interface {
	Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error
}

DataPatch represents an update of state

type EtcdWorker

type EtcdWorker struct {
	// contains filtered or unexported fields
}

EtcdWorker handles all interactions with Etcd

func NewEtcdWorker

func NewEtcdWorker(client *etcd.Client, prefix string, reactor Reactor, initState ReactorState) (*EtcdWorker, error)

NewEtcdWorker returns a new EtcdWorker

func (*EtcdWorker) Run

func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error

Run starts the EtcdWorker event loop. A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event. If the specified etcd session is Done, this Run function will exit with cerrors.ErrEtcdSessionDone. And the specified etcd session is nil-safety.

type GlobalReactorState

type GlobalReactorState struct {
	Owner       map[string]struct{}
	Captures    map[model.CaptureID]*model.CaptureInfo
	Changefeeds map[model.ChangeFeedID]*ChangefeedReactorState
	// contains filtered or unexported fields
}

GlobalReactorState represents a global state which stores all key-value pairs in ETCD

func NewGlobalState

func NewGlobalState() *GlobalReactorState

NewGlobalState creates a new global state

func (*GlobalReactorState) GetPatches

func (s *GlobalReactorState) GetPatches() [][]DataPatch

GetPatches implements the ReactorState interface Every []DataPatch slice in [][]DataPatch slice is the patches of a ChangefeedReactorState

func (*GlobalReactorState) SetOnCaptureAdded

func (s *GlobalReactorState) SetOnCaptureAdded(f func(captureID model.CaptureID, addr string))

SetOnCaptureAdded registers a function that is called when a capture goes online.

func (*GlobalReactorState) SetOnCaptureRemoved

func (s *GlobalReactorState) SetOnCaptureRemoved(f func(captureID model.CaptureID))

SetOnCaptureRemoved registers a function that is called when a capture goes offline.

func (*GlobalReactorState) Update

func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error

Update implements the ReactorState interface

type MultiDataPatch

type MultiDataPatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error

MultiDataPatch represents an update to many keys

func (MultiDataPatch) Patch

func (m MultiDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error

Patch implements the DataPatch interface

type Reactor

type Reactor interface {
	Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error)
}

Reactor is a stateful transform of states. It models Owner and Processor, which reacts according to updates in Etcd.

type ReactorState

type ReactorState interface {
	// Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state.
	Update(key util.EtcdKey, value []byte, isInit bool) error

	// GetPatches is called by EtcdWorker, and should return many slices of data patches that represents the changes
	// that a Reactor wants to apply to Etcd.
	// a slice of DataPatch will be committed as one ETCD txn
	GetPatches() [][]DataPatch
}

ReactorState models the Etcd state of a reactor

type ReactorStateTester

type ReactorStateTester struct {
	// contains filtered or unexported fields
}

ReactorStateTester is a helper struct for unit-testing an implementer of ReactorState

func NewReactorStateTester

func NewReactorStateTester(c *check.C, state ReactorState, initKVEntries map[string]string) *ReactorStateTester

NewReactorStateTester creates a new ReactorStateTester

func (*ReactorStateTester) ApplyPatches

func (t *ReactorStateTester) ApplyPatches() error

ApplyPatches calls the GetPatches method on the ReactorState and apply the changes to the mocked kv-store.

func (*ReactorStateTester) KVEntries

func (t *ReactorStateTester) KVEntries() map[string]string

KVEntries returns the contents of the mocked KV store.

func (*ReactorStateTester) MustApplyPatches

func (t *ReactorStateTester) MustApplyPatches()

MustApplyPatches calls ApplyPatches and must successfully

func (*ReactorStateTester) MustUpdate

func (t *ReactorStateTester) MustUpdate(key string, value []byte)

MustUpdate calls Update and must successfully

func (*ReactorStateTester) Update

func (t *ReactorStateTester) Update(key string, value []byte) error

Update is used to update keys in the mocked kv-store.

type SingleDataPatch

type SingleDataPatch struct {
	Key util.EtcdKey
	// Func should be a pure function that returns a new value given the old value.
	// The function is called each time the EtcdWorker initiates an Etcd transaction.
	Func func(old []byte) (newValue []byte, changed bool, err error)
}

SingleDataPatch represents an update to a given Etcd key

func (*SingleDataPatch) Patch

func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error

Patch implements the DataPatch interface

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL