framework

package
v0.0.0-...-be15534 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: Apache-2.0 Imports: 47 Imported by: 0

Documentation

Index

Constants

View Source
// Reason codes of type MasterFailoverReasonCode (see MasterFailoverReason.Code).
const (
	// MasterTimedOut has value 1: iota starts at 0 and the expression adds 1,
	// so the zero value of MasterFailoverReasonCode matches no defined code.
	MasterTimedOut = MasterFailoverReasonCode(iota + 1)
	// MasterReportedError has value 2 (the expression above is repeated with the next iota).
	MasterReportedError
)

Defines all reason codes

View Source
// ExitReason values, numbered consecutively from 0 via iota.
const (
	// ExitReasonUnknown is 0, i.e. the zero value of ExitReason.
	ExitReasonUnknown = ExitReason(iota)
	// ExitReasonFinished is 1.
	ExitReasonFinished
	// ExitReasonCanceled is 2.
	ExitReasonCanceled
	// ExitReasonFailed is 3.
	ExitReasonFailed
)

Defines the ExitReason values.

Variables

This section is empty.

Functions

func MockBaseMasterCreateWorker

func MockBaseMasterCreateWorker(
	t *testing.T,
	master *DefaultBaseMaster,
	workerType frameModel.WorkerType,
	config WorkerConfig,
	masterID frameModel.MasterID,
	workerID frameModel.WorkerID,
	executorID model.ExecutorID,
	resources []resModel.ResourceID,
	workerEpoch frameModel.Epoch,
)

MockBaseMasterCreateWorker mocks to create worker in base master

func MockBaseMasterCreateWorkerMetScheduleTaskError

func MockBaseMasterCreateWorkerMetScheduleTaskError(
	t *testing.T,
	master *DefaultBaseMaster,
	workerType frameModel.WorkerType,
	config WorkerConfig,
	masterID frameModel.MasterID,
	workerID frameModel.WorkerID,
	executorID model.ExecutorID,
)

MockBaseMasterCreateWorkerMetScheduleTaskError mocks ScheduleTask meets error

func MockBaseMasterWorkerHeartbeat

func MockBaseMasterWorkerHeartbeat(
	t *testing.T,
	master *DefaultBaseMaster,
	masterID frameModel.MasterID,
	workerID frameModel.WorkerID,
	executorID p2p.NodeID,
) error

MockBaseMasterWorkerHeartbeat sends HeartbeatPingMessage with mock message handler

func MockBaseMasterWorkerUpdateStatus

func MockBaseMasterWorkerUpdateStatus(
	ctx context.Context,
	t *testing.T,
	master *DefaultBaseMaster,
	masterID frameModel.MasterID,
	workerID frameModel.WorkerID,
	executorID p2p.NodeID,
	status *frameModel.WorkerStatus,
)

MockBaseMasterWorkerUpdateStatus mocks to store status in metastore and sends WorkerStatusMessage.

func MockBaseWorkerCheckSendMessage

func MockBaseWorkerCheckSendMessage(
	t *testing.T,
	worker *DefaultBaseWorker,
	topic p2p.Topic,
	message interface{},
)

MockBaseWorkerCheckSendMessage checks that one message can be received from the mock message sender

func MockBaseWorkerWaitUpdateStatus

func MockBaseWorkerWaitUpdateStatus(
	t *testing.T,
	worker *DefaultBaseWorker,
)

MockBaseWorkerWaitUpdateStatus checks that an update status message can be received from the mock message sender

func MockMasterPrepareMeta

func MockMasterPrepareMeta(ctx context.Context, t *testing.T, master *MockMasterImpl)

MockMasterPrepareMeta simulates the meta persistence for MockMasterImpl

func MustConvertWorkerType2JobType

func MustConvertWorkerType2JobType(tp WorkerType) engineModel.JobType

MustConvertWorkerType2JobType returns the job type for the given worker type. It panics if the conversion fails. TODO: let user register a unique identifier for the metric prefix

Types

type BaseJobMaster

type BaseJobMaster interface {
	Worker

	// MetaKVClient returns the business metastore KV client with job-level isolation.
	MetaKVClient() metaModel.KVClient

	// MetricFactory returns a Prometheus factory with some underlying labels (e.g. job-id, work-id).
	MetricFactory() promutil.Factory

	// Logger returns a zap logger with some underlying fields (e.g. job-id).
	Logger() *zap.Logger

	// GetWorkers returns the handles of all workers, from which we can get the worker status, the worker ID and
	// the method for sending messages to a specific worker.
	GetWorkers() map[frameModel.WorkerID]WorkerHandle

	// CreateWorker requires the framework to dispatch a new worker.
	// If the worker needs to access certain file system resources, it must pass
	// the resource ID via CreateWorkerOpt.
	CreateWorker(
		workerType frameModel.WorkerType,
		config WorkerConfig,
		opts ...CreateWorkerOpt,
	) (frameModel.WorkerID, error)

	// UpdateJobStatus updates the jobmaster (worker of jobmanager) status and
	// sends a 'status updated' message to jobmanager.
	UpdateJobStatus(ctx context.Context, status frameModel.WorkerStatus) error

	// CurrentEpoch returns the epoch of the current job.
	CurrentEpoch() frameModel.Epoch

	// SendMessage sends a message of a specific topic to jobmanager in a blocking or nonblocking way.
	SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error

	// Exit should be called when jobmaster (in user logic) wants to exit.
	// exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed
	Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error

	// IsMasterReady returns whether the master has received heartbeats for all
	// workers after a fail-over. If this is the first time the JobMaster started up,
	// the return value is always true.
	IsMasterReady() bool

	// IsBaseJobMaster is an empty function used to prevent accidental implementation
	// of this interface.
	IsBaseJobMaster()

	// GetEnabledBucketStorage returns whether the bucket storage is enabled and the corresponding resource type
	// if the bucket exists.
	GetEnabledBucketStorage() (bool, resModel.ResourceType)
}

BaseJobMaster defines an interface that can work as a job master, it embeds a Worker interface which can run on dataflow engine runtime, and also provides some utility methods.

func NewBaseJobMaster

func NewBaseJobMaster(
	ctx *dcontext.Context,
	jobMasterImpl JobMasterImpl,
	masterID frameModel.MasterID,
	workerID frameModel.WorkerID,
	tp frameModel.WorkerType,
	workerEpoch frameModel.Epoch,
) BaseJobMaster

NewBaseJobMaster creates a new DefaultBaseJobMaster instance

type BaseJobMasterExt

type BaseJobMasterExt interface {
	// TriggerOpenAPIInitialize is used to trigger the initialization of the OpenAPI handler.
	// It just delegates to JobMasterImpl.OnOpenAPIInitialized.
	TriggerOpenAPIInitialize(apiGroup *gin.RouterGroup)

	// IsBaseJobMasterExt is an empty function used to prevent accidental implementation
	// of this interface.
	IsBaseJobMasterExt()
}

BaseJobMasterExt extends BaseJobMaster with some extra methods. These methods are used by the framework and are not visible to JobMasterImpl.

type BaseMaster

type BaseMaster interface {
	Master

	// MetaKVClient returns the business metastore KV client with job-level isolation.
	MetaKVClient() metaModel.KVClient

	// MetricFactory returns a Prometheus factory with some underlying labels (e.g. job-id, work-id).
	MetricFactory() promutil.Factory

	// Logger returns a zap logger with some underlying fields (e.g. job-id).
	Logger() *zap.Logger

	// MasterMeta returns the metadata of the master.
	MasterMeta() *frameModel.MasterMeta

	// GetWorkers returns the handles of all workers, from which we can get the worker status, the worker ID and
	// the method for sending messages to a specific worker.
	GetWorkers() map[frameModel.WorkerID]WorkerHandle

	// IsMasterReady returns whether the master has received heartbeats for all
	// workers after a fail-over. If this is the first time the JobMaster started up,
	// the return value is always true.
	IsMasterReady() bool

	// Exit should be called when master (in user logic) wants to exit.
	// exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed
	// NOTE: Currently, no implementation uses this method, but we still keep it to make the interface intact.
	Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error

	// CreateWorker is the latest version of CreateWorker, but with
	// a more flexible way of passing options.
	// If the worker needs to access certain file system resources, it must pass
	// the resource ID via CreateWorkerOpt.
	CreateWorker(
		workerType frameModel.WorkerType,
		config WorkerConfig,
		opts ...CreateWorkerOpt,
	) (frameModel.WorkerID, error)
}

BaseMaster defines the master interface, it embeds the Master interface and contains more core logic of a master

func NewBaseMaster

func NewBaseMaster(
	ctx *dcontext.Context,
	impl MasterImpl,
	id frameModel.MasterID,
	tp frameModel.WorkerType,
) BaseMaster

NewBaseMaster creates a new DefaultBaseMaster instance

type BaseWorker

type BaseWorker interface {
	Worker

	// MetaKVClient returns the business metastore KV client with job-level isolation.
	MetaKVClient() metaModel.KVClient

	// MetricFactory returns a Prometheus factory with some underlying labels (e.g. job-id, work-id).
	MetricFactory() promutil.Factory

	// Logger returns a zap logger with some underlying fields (e.g. job-id).
	Logger() *zap.Logger

	// UpdateStatus persists the status to the framework metastore if the worker status has changed and
	// sends a 'status updated' message to the master.
	UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error

	// SendMessage sends a message of a specific topic to the master in a blocking or nonblocking way.
	SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error

	// OpenStorage creates a resource and returns the resource handle.
	OpenStorage(
		ctx context.Context, resourcePath resModel.ResourceID, opts ...broker.OpenStorageOption,
	) (broker.Handle, error)

	// GetEnabledBucketStorage returns whether the bucket storage is enabled and
	// the resource type if the bucket exists.
	GetEnabledBucketStorage() (bool, resModel.ResourceType)

	// Exit should be called when worker (in user logic) wants to exit.
	// exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed
	Exit(ctx context.Context, exitReason ExitReason, err error, extBytes []byte) error
}

BaseWorker defines the worker interface. It embeds a Worker interface and adds more utility methods. TODO: decouple the BaseWorker and WorkerService (for business)

func NewBaseWorker

func NewBaseWorker(
	ctx *dcontext.Context,
	impl WorkerImpl,
	workerID frameModel.WorkerID,
	masterID frameModel.MasterID,
	tp frameModel.WorkerType,
	epoch frameModel.Epoch,
) BaseWorker

NewBaseWorker creates a new BaseWorker instance

type BaseWorkerForTesting

type BaseWorkerForTesting struct {
	// DefaultBaseWorker is embedded by pointer, so its exported methods are
	// promoted to BaseWorkerForTesting.
	*DefaultBaseWorker
	// Broker is a mock broker exposed to tests.
	// NOTE(review): presumably the broker backing the embedded worker — confirm in MockBaseWorker.
	Broker *broker.MockBroker
}

BaseWorkerForTesting mocks base worker

func MockBaseWorker

func MockBaseWorker(
	workerID frameModel.WorkerID,
	masterID frameModel.MasterID,
	workerImpl WorkerImpl,
) *BaseWorkerForTesting

MockBaseWorker creates a mock base worker for test

type CreateWorkerOpt

type CreateWorkerOpt = master.CreateWorkerOpt

CreateWorkerOpt specifies an option for creating a worker.

func CreateWorkerWithResourceRequirements

func CreateWorkerWithResourceRequirements(resources ...resModel.ResourceID) CreateWorkerOpt

CreateWorkerWithResourceRequirements specifies the resource requirement of a worker.

func CreateWorkerWithSelectors

func CreateWorkerWithSelectors(selectors ...*label.Selector) CreateWorkerOpt

CreateWorkerWithSelectors specifies the selectors used to dispatch the worker.

type DefaultBaseJobMaster

type DefaultBaseJobMaster struct {
	// contains filtered or unexported fields
}

DefaultBaseJobMaster implements BaseJobMaster interface

func (*DefaultBaseJobMaster) Close

func (d *DefaultBaseJobMaster) Close(ctx context.Context) error

Close implements BaseJobMaster.Close

func (*DefaultBaseJobMaster) CreateWorker

func (d *DefaultBaseJobMaster) CreateWorker(
	workerType frameModel.WorkerType,
	config WorkerConfig,
	opts ...CreateWorkerOpt,
) (frameModel.WorkerID, error)

CreateWorker implements BaseJobMaster.CreateWorker

func (*DefaultBaseJobMaster) CurrentEpoch

func (d *DefaultBaseJobMaster) CurrentEpoch() frameModel.Epoch

CurrentEpoch implements BaseJobMaster.CurrentEpoch

func (*DefaultBaseJobMaster) Exit

func (d *DefaultBaseJobMaster) Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error

Exit implements BaseJobMaster.Exit

func (*DefaultBaseJobMaster) GetEnabledBucketStorage

func (d *DefaultBaseJobMaster) GetEnabledBucketStorage() (bool, resModel.ResourceType)

GetEnabledBucketStorage implements BaseJobMaster.GetEnabledBucketStorage

func (*DefaultBaseJobMaster) GetWorkers

GetWorkers implements BaseJobMaster.GetWorkers

func (*DefaultBaseJobMaster) ID

ID delegates the ID of inner worker

func (*DefaultBaseJobMaster) Init

Init implements BaseJobMaster.Init

func (*DefaultBaseJobMaster) IsBaseJobMaster

func (d *DefaultBaseJobMaster) IsBaseJobMaster()

IsBaseJobMaster implements BaseJobMaster.IsBaseJobMaster

func (*DefaultBaseJobMaster) IsBaseJobMasterExt

func (d *DefaultBaseJobMaster) IsBaseJobMasterExt()

IsBaseJobMasterExt implements BaseJobMasterExt.IsBaseJobMasterExt.

func (*DefaultBaseJobMaster) IsMasterReady

func (d *DefaultBaseJobMaster) IsMasterReady() bool

IsMasterReady implements BaseJobMaster.IsMasterReady

func (*DefaultBaseJobMaster) Logger

func (d *DefaultBaseJobMaster) Logger() *zap.Logger

Logger implements BaseJobMaster.Logger

func (*DefaultBaseJobMaster) MetaKVClient

func (d *DefaultBaseJobMaster) MetaKVClient() metaModel.KVClient

MetaKVClient implements BaseJobMaster.MetaKVClient

func (*DefaultBaseJobMaster) MetricFactory

func (d *DefaultBaseJobMaster) MetricFactory() promutil.Factory

MetricFactory implements BaseJobMaster.MetricFactory

func (*DefaultBaseJobMaster) NotifyExit

func (d *DefaultBaseJobMaster) NotifyExit(ctx context.Context, errIn error) (retErr error)

NotifyExit implements BaseJobMaster interface

func (*DefaultBaseJobMaster) Poll

Poll implements BaseJobMaster.Poll

func (*DefaultBaseJobMaster) SendMessage

func (d *DefaultBaseJobMaster) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error

SendMessage delegates the SendMessage of inner worker

func (*DefaultBaseJobMaster) Stop

Stop implements BaseJobMaster.Stop

func (*DefaultBaseJobMaster) TriggerOpenAPIInitialize

func (d *DefaultBaseJobMaster) TriggerOpenAPIInitialize(apiGroup *gin.RouterGroup)

TriggerOpenAPIInitialize implements BaseJobMasterExt.TriggerOpenAPIInitialize.

func (*DefaultBaseJobMaster) UpdateJobStatus

func (d *DefaultBaseJobMaster) UpdateJobStatus(ctx context.Context, status frameModel.WorkerStatus) error

UpdateJobStatus implements BaseJobMaster.UpdateJobStatus

func (*DefaultBaseJobMaster) UpdateStatus

func (d *DefaultBaseJobMaster) UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error

UpdateStatus delegates the UpdateStatus of inner worker

type DefaultBaseMaster

type DefaultBaseMaster struct {
	// Impl is the MasterImpl paired with this base master.
	// NOTE(review): presumably the business-logic callbacks the framework invokes — confirm.
	Impl MasterImpl
	// contains filtered or unexported fields
}

DefaultBaseMaster implements BaseMaster interface

func MockBaseMaster

func MockBaseMaster(t *testing.T, id frameModel.MasterID, masterImpl MasterImpl) *DefaultBaseMaster

MockBaseMaster returns a mock DefaultBaseMaster

func (*DefaultBaseMaster) Close

func (m *DefaultBaseMaster) Close(ctx context.Context) error

Close implements BaseMaster.Close

func (*DefaultBaseMaster) CreateWorker

func (m *DefaultBaseMaster) CreateWorker(
	workerType frameModel.WorkerType,
	config WorkerConfig,
	opts ...CreateWorkerOpt,
) (frameModel.WorkerID, error)

CreateWorker implements BaseMaster.CreateWorker

func (*DefaultBaseMaster) DeleteProjectInfo

func (m *DefaultBaseMaster) DeleteProjectInfo(workerID frameModel.WorkerID)

DeleteProjectInfo deletes the project info of a specific worker. NOTICE: Only used by JobManager when stopping a job

func (*DefaultBaseMaster) Exit

func (m *DefaultBaseMaster) Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error

Exit implements BaseMaster.Exit NOTE: Currently, no implement has used this method, but we still keep it to make the interface intact

func (*DefaultBaseMaster) GetProjectInfo

func (m *DefaultBaseMaster) GetProjectInfo(masterID frameModel.MasterID) tenant.ProjectInfo

GetProjectInfo get the project info of the worker [WARN]: Once 'DeleteProjectInfo' is called, 'GetProjectInfo' may return unexpected project info For JobManager: It will set the <jobID, projectInfo> pair in advance. So if we call 'GetProjectInfo' before 'DeleteProjectInfo', we can expect a correct projectInfo. For JobMaster: Master and worker always have the same projectInfo and workerProjectMap is empty

func (*DefaultBaseMaster) GetWorkers

func (m *DefaultBaseMaster) GetWorkers() map[frameModel.WorkerID]WorkerHandle

GetWorkers implements BaseMaster.GetWorkers

func (*DefaultBaseMaster) Init

func (m *DefaultBaseMaster) Init(ctx context.Context) error

Init implements BaseMaster.Init

func (*DefaultBaseMaster) InitProjectInfosAfterRecover

func (m *DefaultBaseMaster) InitProjectInfosAfterRecover(jobs []*frameModel.MasterMeta)

InitProjectInfosAfterRecover sets project infos for all workers after the master recovers. NOTICE: Only used by JobManager during failover

func (*DefaultBaseMaster) IsMasterReady

func (m *DefaultBaseMaster) IsMasterReady() bool

IsMasterReady implements BaseMaster.IsMasterReady

func (*DefaultBaseMaster) Logger

func (m *DefaultBaseMaster) Logger() *zap.Logger

Logger implements BaseMaster.Logger

func (*DefaultBaseMaster) MasterID

func (m *DefaultBaseMaster) MasterID() frameModel.MasterID

MasterID implements BaseMaster.MasterID

func (*DefaultBaseMaster) MasterMeta

func (m *DefaultBaseMaster) MasterMeta() *frameModel.MasterMeta

MasterMeta implements BaseMaster.MasterMeta

func (*DefaultBaseMaster) MetaKVClient

func (m *DefaultBaseMaster) MetaKVClient() metaModel.KVClient

MetaKVClient returns the business space metaclient

func (*DefaultBaseMaster) MetricFactory

func (m *DefaultBaseMaster) MetricFactory() promutil.Factory

MetricFactory implements BaseMaster.MetricFactory

func (*DefaultBaseMaster) NotifyExit

func (m *DefaultBaseMaster) NotifyExit(ctx context.Context, errIn error) error

NotifyExit implements BaseMaster.NotifyExit

func (*DefaultBaseMaster) Poll

func (m *DefaultBaseMaster) Poll(ctx context.Context) error

Poll implements BaseMaster.Poll

func (*DefaultBaseMaster) PrepareWorkerConfig

func (m *DefaultBaseMaster) PrepareWorkerConfig(
	workerType frameModel.WorkerType, config WorkerConfig,
) (rawConfig []byte, workerID frameModel.WorkerID, err error)

PrepareWorkerConfig extracts information from WorkerConfig into detail fields.

  • If workerType is master type, the config is a `*MasterMeta` struct and contains the pre-allocated master ID and the JSON-marshalled config.
  • If workerType is worker type, the config is a user defined config struct, we marshal it to byte slice as returned config, and generate a random WorkerID.

func (*DefaultBaseMaster) SetProjectInfo

func (m *DefaultBaseMaster) SetProjectInfo(workerID frameModel.WorkerID, projectInfo tenant.ProjectInfo)

SetProjectInfo set the project info of specific worker [NOTICE]: Only used by JobManager to set project for different job(worker for jobmanager)

func (*DefaultBaseMaster) Stop

func (m *DefaultBaseMaster) Stop(ctx context.Context) error

Stop implements Master.Stop

type DefaultBaseWorker

type DefaultBaseWorker struct {
	// Impl is the WorkerImpl implementation passed in from business logic.
	Impl WorkerImpl
	// contains filtered or unexported fields
}

DefaultBaseWorker implements BaseWorker interface, it also embeds an Impl which implements the WorkerImpl interface and passed from business logic.

func (*DefaultBaseWorker) Close

func (w *DefaultBaseWorker) Close(ctx context.Context) error

Close implements BaseWorker.Close TODO remove the return value from the signature.

func (*DefaultBaseWorker) Exit

func (w *DefaultBaseWorker) Exit(ctx context.Context, exitReason ExitReason, err error, extBytes []byte) (errRet error)

Exit implements BaseWorker.Exit

func (*DefaultBaseWorker) GetEnabledBucketStorage

func (w *DefaultBaseWorker) GetEnabledBucketStorage() (bool, resModel.ResourceType)

GetEnabledBucketStorage implements BaseWorker.GetEnabledBucketStorage

func (*DefaultBaseWorker) ID

ID implements BaseWorker.ID

func (*DefaultBaseWorker) Init

func (w *DefaultBaseWorker) Init(ctx context.Context) error

Init implements BaseWorker.Init

func (*DefaultBaseWorker) Logger

func (w *DefaultBaseWorker) Logger() *zap.Logger

Logger implements BaseWorker.Logger

func (*DefaultBaseWorker) MetaKVClient

func (w *DefaultBaseWorker) MetaKVClient() metaModel.KVClient

MetaKVClient implements BaseWorker.MetaKVClient

func (*DefaultBaseWorker) MetricFactory

func (w *DefaultBaseWorker) MetricFactory() promutil.Factory

MetricFactory implements BaseWorker.MetricFactory

func (*DefaultBaseWorker) NotifyExit

func (w *DefaultBaseWorker) NotifyExit(ctx context.Context, errIn error) (retErr error)

NotifyExit implements BaseWorker.NotifyExit

func (*DefaultBaseWorker) OpenStorage

func (w *DefaultBaseWorker) OpenStorage(
	ctx context.Context, resourcePath resModel.ResourceID, opts ...broker.OpenStorageOption,
) (broker.Handle, error)

OpenStorage implements BaseWorker.OpenStorage

func (*DefaultBaseWorker) Poll

func (w *DefaultBaseWorker) Poll(ctx context.Context) error

Poll implements BaseWorker.Poll

func (*DefaultBaseWorker) SendMessage

func (w *DefaultBaseWorker) SendMessage(
	ctx context.Context,
	topic p2p.Topic,
	message interface{},
	nonblocking bool,
) error

SendMessage implements BaseWorker.SendMessage

func (*DefaultBaseWorker) Stop

func (w *DefaultBaseWorker) Stop(ctx context.Context) error

Stop implements Worker.Stop, works the same as Worker.Close

func (*DefaultBaseWorker) UpdateStatus

func (w *DefaultBaseWorker) UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error

UpdateStatus updates the worker's status and tries to notify the master. The status is persisted if State or ErrorMsg has changed. Refer to (*WorkerState).HasSignificantChange.

If UpdateStatus returns without an error, then the status must have been persisted, but there is no guarantee that the master has received a notification. Note that if the master cannot handle the notifications fast enough, notifications can be lost.

type ExitReason

type ExitReason int

ExitReason is the type for exit reason

func WorkerStateToExitReason

func WorkerStateToExitReason(code frameModel.WorkerState) ExitReason

WorkerStateToExitReason translates WorkerState to ExitReason TODO: business logic should not sense 'WorkerState'

type JobMasterImpl

type JobMasterImpl interface {
	MasterImpl

	// OnCancel is triggered when a cancel message is received. It can be
	// triggered multiple times.
	// TODO: when it returns error, framework should close this jobmaster.
	OnCancel(ctx context.Context) error
	// OnOpenAPIInitialized is called as the first callback function of the JobMasterImpl
	// instance; the business logic should only register the OpenAPI handler in it.
	// The implementation must not retain the apiGroup.
	// Note: this function is called before Init().
	// Concurrent safety:
	// - this function is called as the first callback function of a JobMasterImpl
	//   instance, and it's not concurrent with other callbacks.
	OnOpenAPIInitialized(apiGroup *gin.RouterGroup)

	// IsJobMasterImpl is an empty function used to prevent accidental implementation
	// of this interface.
	IsJobMasterImpl()
}

JobMasterImpl is the implementation of a job master of the dataflow engine. The implementation struct must embed the framework.BaseJobMaster interface; this interface will be initialized by the framework.

type Master

type Master interface {
	// Init, Poll, Close and Stop each take a context and report failure
	// through the returned error.
	Init(ctx context.Context) error
	Poll(ctx context.Context) error
	// MasterID returns the master's frameModel.MasterID.
	MasterID() frameModel.MasterID
	Close(ctx context.Context) error
	Stop(ctx context.Context) error
	// NotifyExit is passed errIn by the caller.
	// NOTE(review): presumably the error that caused the exit — confirm.
	NotifyExit(ctx context.Context, errIn error) error
}

Master defines a basic interface that can run in dataflow engine runtime

type MasterFailoverReason

type MasterFailoverReason struct {
	// Code is the failover reason code (e.g. MasterTimedOut, MasterReportedError).
	Code     MasterFailoverReasonCode
	// ErrorMsg is the error message accompanying the reason code.
	ErrorMsg string
}

MasterFailoverReason contains failover reason code and error message

type MasterFailoverReasonCode

type MasterFailoverReasonCode int32

MasterFailoverReasonCode is used as reason code

type MasterImpl

type MasterImpl interface {
	// InitImpl is called the first time the MasterImpl instance is initialized,
	// after OnOpenAPIInitialized. When InitImpl returns without error, the framework
	// will try to persist an internal state so that a later failover calls OnMasterRecovered
	// rather than InitImpl.
	// Return:
	// - error to let the framework call CloseImpl, and the framework may retry InitImpl
	//   later a number of times. For non-retryable failure, business logic should
	//   call Exit.
	// Concurrent safety:
	// - this function is not concurrent with other callbacks.
	InitImpl(ctx context.Context) error

	// OnMasterRecovered is called when the framework has failed over the MasterImpl
	// instance after an error. For this MasterImpl instance, it's called after OnOpenAPIInitialized.
	// Return:
	// - error to let the framework call CloseImpl.
	// Concurrent safety:
	// - this function is not concurrent with other callbacks.
	OnMasterRecovered(ctx context.Context) error

	// Tick is called on a fixed interval after MasterImpl's InitImpl or OnMasterRecovered;
	// business logic can do some periodic tasks here.
	// Return:
	// - error to let the framework call CloseImpl.
	// Concurrent safety:
	// - this function may be concurrently called with other callbacks except for
	//   Tick itself, OnOpenAPIInitialized, InitImpl, OnMasterRecovered, CloseImpl,
	//   StopImpl.
	Tick(ctx context.Context) error

	// OnWorkerDispatched is called when the asynchronous action of CreateWorker
	// is finished. Only after OnWorkerDispatched may OnWorkerOnline and OnWorkerStatusUpdated
	// of the same worker be called.
	// Return:
	// - error to let the framework call CloseImpl.
	// Concurrent safety:
	// - this function may be concurrently called with another worker's OnWorkerXXX,
	//   Tick, CloseImpl, StopImpl, OnCancel.
	OnWorkerDispatched(worker WorkerHandle, result error) error

	// OnWorkerOnline is called when the first heartbeat for a worker is received.
	// NOTE: OnWorkerOffline can appear without OnWorkerOnline.
	// Return:
	// - error to let the framework call CloseImpl.
	// Concurrent safety:
	// - this function may be concurrently called with another worker's OnWorkerXXX,
	//   Tick, CloseImpl, StopImpl, OnCancel, the same worker's OnWorkerStatusUpdated.
	OnWorkerOnline(worker WorkerHandle) error

	// OnWorkerOffline is called as the consequence of a worker's Exit or heartbeat
	// timeout. It's the last callback function among OnWorkerXXX for a worker.
	// Return:
	// - error to let the framework call CloseImpl.
	// Concurrent safety:
	// - this function may be concurrently called with another worker's OnWorkerXXX,
	//   Tick, CloseImpl, StopImpl, OnCancel.
	OnWorkerOffline(worker WorkerHandle, reason error) error

	// OnWorkerMessage is called when a customized message is received.
	OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message interface{}) error

	// OnWorkerStatusUpdated is called as the consequence of a worker's UpdateStatus.
	// Return:
	// - error to let the framework call CloseImpl.
	// Concurrent safety:
	// - this function may be concurrently called with another worker's OnWorkerXXX,
	//   Tick, CloseImpl, StopImpl, OnCancel, the same worker's OnWorkerOnline.
	OnWorkerStatusUpdated(worker WorkerHandle, newStatus *frameModel.WorkerStatus) error

	// CloseImpl is called as the consequence of returning an error from InitImpl,
	// OnMasterRecovered or Tick; the Tick will be stopped after entering this function.
	// The framework may try to create a new MasterImpl instance afterwards.
	// Business logic is expected to release resources here, but business developers
	// should be aware that if the runtime crashes, CloseImpl will not get a chance
	// to be called.
	// TODO: no other callbacks will be called after and concurrent with CloseImpl
	// Concurrent safety:
	// - this function may be concurrently called with OnWorkerMessage, OnCancel,
	//   OnWorkerDispatched, OnWorkerOnline, OnWorkerOffline, OnWorkerStatusUpdated.
	CloseImpl(ctx context.Context)

	// StopImpl is called as the consequence of business logic calling Exit. Tick will
	// be stopped after entering this function, and the framework will treat this MasterImpl
	// as non-recoverable.
	// There's at most one invocation of StopImpl after Exit. If the runtime
	// crashes, StopImpl will not get a chance to be called.
	// Concurrent safety:
	// - this function may be concurrently called with OnWorkerMessage, OnCancel,
	//   OnWorkerDispatched, OnWorkerOnline, OnWorkerOffline, OnWorkerStatusUpdated.
	StopImpl(ctx context.Context)
}

MasterImpl defines the interface to implement a master, business logic can be added in the functions of this interface

type MessageRouter

type MessageRouter struct {
	// contains filtered or unexported fields
}

MessageRouter is a SPSC(single producer, single consumer) work model, since the message frequency is not high, we use a simple channel for message transit.

func NewMessageRouter

func NewMessageRouter(
	workerID frameModel.WorkerID,
	pool workerpool.AsyncPool,
	bufferSize int,
	routeFn func(topic p2p.Topic, msg p2p.MessageValue) error,
) *MessageRouter

NewMessageRouter creates a new MessageRouter

func (*MessageRouter) AppendMessage

func (r *MessageRouter) AppendMessage(topic p2p.Topic, msg p2p.MessageValue)

AppendMessage always appends a new message to the buffer; if the message buffer is full, it evicts the oldest message

func (*MessageRouter) Tick

func (r *MessageRouter) Tick(ctx context.Context) error

Tick should be called periodically, it receives message from buffer and route it

type MockHandle

type MockHandle = master.MockHandle

MockHandle is a mock for WorkerHandle. Re-exported for testing.

type MockMasterImpl

type MockMasterImpl struct {
	// mock.Mock is embedded, so its exported methods are promoted to MockMasterImpl.
	// NOTE(review): presumably testify's mock.Mock — confirm against the file's imports.
	mock.Mock

	// DefaultBaseMaster is embedded by pointer, so its exported methods are
	// promoted to MockMasterImpl as well.
	*DefaultBaseMaster
	// contains filtered or unexported fields
}

MockMasterImpl implements a mock MasterImpl

func NewMockMasterImpl

func NewMockMasterImpl(t *testing.T, masterID, id frameModel.MasterID) *MockMasterImpl

NewMockMasterImpl creates a new MockMasterImpl instance

func (*MockMasterImpl) CloseImpl

func (m *MockMasterImpl) CloseImpl(ctx context.Context)

CloseImpl implements MasterImpl.CloseImpl

func (*MockMasterImpl) GetFrameMetaClient

func (m *MockMasterImpl) GetFrameMetaClient() pkgOrm.Client

GetFrameMetaClient returns the framework meta client.

func (*MockMasterImpl) InitImpl

func (m *MockMasterImpl) InitImpl(ctx context.Context) error

InitImpl implements MasterImpl.InitImpl

func (*MockMasterImpl) MasterClient

func (m *MockMasterImpl) MasterClient() *client.MockServerMasterClient

MasterClient returns internal server master client

func (*MockMasterImpl) OnMasterRecovered

func (m *MockMasterImpl) OnMasterRecovered(ctx context.Context) error

OnMasterRecovered implements MasterImpl.OnMasterRecovered

func (*MockMasterImpl) OnWorkerDispatched

func (m *MockMasterImpl) OnWorkerDispatched(worker WorkerHandle, result error) error

OnWorkerDispatched implements MasterImpl.OnWorkerDispatched

func (*MockMasterImpl) OnWorkerMessage

func (m *MockMasterImpl) OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message interface{}) error

OnWorkerMessage implements MasterImpl.OnWorkerMessage

func (*MockMasterImpl) OnWorkerOffline

func (m *MockMasterImpl) OnWorkerOffline(worker WorkerHandle, reason error) error

OnWorkerOffline implements MasterImpl.OnWorkerOffline

func (*MockMasterImpl) OnWorkerOnline

func (m *MockMasterImpl) OnWorkerOnline(worker WorkerHandle) error

OnWorkerOnline implements MasterImpl.OnWorkerOnline

func (*MockMasterImpl) OnWorkerStatusUpdated

func (m *MockMasterImpl) OnWorkerStatusUpdated(worker WorkerHandle, newStatus *frameModel.WorkerStatus) error

OnWorkerStatusUpdated implements MasterImpl.OnWorkerStatusUpdated

func (*MockMasterImpl) Reset

func (m *MockMasterImpl) Reset()

Reset resets the mock data.

func (*MockMasterImpl) StopImpl

func (m *MockMasterImpl) StopImpl(ctx context.Context)

StopImpl implements MasterImpl.StopImpl

func (*MockMasterImpl) Tick

func (m *MockMasterImpl) Tick(ctx context.Context) error

Tick implements MasterImpl.Tick

func (*MockMasterImpl) TickCount

func (m *MockMasterImpl) TickCount() int64

TickCount returns the number of times Tick has been invoked

type MockWorkerHandler

type MockWorkerHandler struct {
	// mock.Mock is embedded, so its exported methods are promoted to MockWorkerHandler.
	mock.Mock

	// WorkerID is the worker ID carried by this mock handle.
	// NOTE(review): presumably the value reported by ID() — confirm.
	WorkerID frameModel.WorkerID
}

MockWorkerHandler implements WorkerHandle, RunningHandle and TombstoneHandle interface

func (*MockWorkerHandler) CleanTombstone

func (m *MockWorkerHandler) CleanTombstone(ctx context.Context) error

CleanTombstone implements TombstoneHandle.CleanTombstone

func (*MockWorkerHandler) GetTombstone

func (m *MockWorkerHandler) GetTombstone() master.TombstoneHandle

GetTombstone implements WorkerHandle.GetTombstone

func (*MockWorkerHandler) ID

ID implements WorkerHandle.ID

func (*MockWorkerHandler) IsTombStone

func (m *MockWorkerHandler) IsTombStone() bool

IsTombStone implements WorkerHandle.IsTombStone

func (*MockWorkerHandler) SendMessage

func (m *MockWorkerHandler) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error

SendMessage implements RunningHandle.SendMessage

func (*MockWorkerHandler) Status

Status implements WorkerHandle.Status

func (*MockWorkerHandler) Unwrap

Unwrap implements WorkerHandle.Unwrap

type Worker

type Worker interface {
	// Init, Poll, Close and Stop each take a context and report failure
	// through the returned error.
	Init(ctx context.Context) error
	Poll(ctx context.Context) error
	// ID returns the worker's runtime.RunnableID.
	ID() runtime.RunnableID
	Close(ctx context.Context) error
	Stop(ctx context.Context) error
	// NotifyExit is passed errIn by the caller.
	// NOTE(review): presumably the error that caused the exit — confirm.
	NotifyExit(ctx context.Context, errIn error) error
}

Worker defines an interface that provides all methods that will be used in runtime(runner container)

type WorkerConfig

type WorkerConfig = interface{}

WorkerConfig stores worker config in any type

type WorkerHandle

type WorkerHandle = master.WorkerHandle

WorkerHandle alias to master.WorkerHandle

type WorkerImpl

type WorkerImpl interface {
	// InitImpl is called as the consequence of CreateWorker from jobmaster or failover;
	// business logic is expected to do initialization here.
	// Return:
	// - error to let the framework call CloseImpl.
	// Concurrent safety:
	// - this function is called as the first callback function of a WorkerImpl
	//   instance, and it's not concurrent with other callbacks.
	InitImpl(ctx context.Context) error

	// Tick is called on a fixed interval after WorkerImpl is initialized; business
	// logic can do some periodic tasks here.
	// Return:
	// - error to let the framework call CloseImpl.
	// Concurrent safety:
	// - this function may be concurrently called with OnMasterMessage.
	Tick(ctx context.Context) error

	// OnMasterMessage is called when the worker receives a master message; business developers
	// do not need to implement it.
	// TODO: move it out of WorkerImpl and should not be concurrent with CloseImpl.
	OnMasterMessage(ctx context.Context, topic p2p.Topic, message p2p.MessageValue) error

	// CloseImpl is called as the consequence of returning an error from InitImpl or
	// Tick; the Tick will be stopped after entering this function. Business logic
	// is expected to release resources here, but business developers should be aware
	// that if the runtime crashes, CloseImpl will not get a chance to be called.
	// CloseImpl will only be called once.
	// TODO: no other callbacks will be called after CloseImpl
	// Concurrent safety:
	// - this function may be concurrently called with OnMasterMessage.
	CloseImpl(ctx context.Context)
}

WorkerImpl is the implementation of a worker of the dataflow engine. The implementation struct must embed the framework.BaseWorker interface; this interface will be initialized by the framework.

type WorkerType

type WorkerType = frameModel.WorkerType

WorkerType alias to model.WorkerType

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL