tasks

package
v0.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2022 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Tuning constants for the task manager/scheduler.
const (
	// ManagerBufferSize is the size of the buffered channels used by the
	// TaskManager.
	ManagerBufferSize uint64 = 1024
	// ManagerNetworkTimeout is the timeout for network interactions
	// (one second).
	ManagerNetworkTimeout time.Duration = 1 * time.Second
	// ManagerProcessingTime is the time in which the scheduler is going to
	// enter its main loop to spawn tasks.
	ManagerProcessingTime time.Duration = 3 * time.Second
	// ManagerHeightToleranceBeforeRemoving is how many blocks to wait, after a
	// kill signal was sent to a task, before considering the task unresponsive
	// and removing it from the scheduler mapping.
	ManagerHeightToleranceBeforeRemoving uint64 = 50
	// ManagerResponseToleranceBeforeRemoving is how many blocks to wait before
	// removing a response from the TaskManager.
	ManagerResponseToleranceBeforeRemoving uint64 = 50
)
View Source
// Common error messages. Every entry except NobodyToAccuse is a format string
// containing a single %v verb for the underlying cause.
const (
	// Common errors.
	ErrorLoadingDkgState              = "error loading dkgState: %v"
	ErrorDuringPreparation            = "error during the preparation: %v"
	ErrorGettingAccusableParticipants = "error getting accusableParticipants: %v"
	ErrorGettingValidators            = "error getting validators: %v"
	FailedGettingTxnOpts              = "failed getting txn opts: %v"
	FailedGettingCallOpts             = "failed getting call opts: %v"
	FailedGettingIsValidator          = "failed getting isValidator: %v"
	NobodyToAccuse                    = "nobody to accuse"
)
View Source
const (
	// ExecutorPoolingTime is the time used when polling to check whether a
	// task has completed (seven seconds).
	// NOTE(review): presumably this is the interval between polls rather than
	// the total polling duration — confirm against the executor.
	ExecutorPoolingTime time.Duration = 7 * time.Second
)

Variables

View Source
// Sentinel errors for aborted task execution.
var (
	// ErrTaskKilled indicates that the task was killed and its execution is
	// being aborted.
	ErrTaskKilled                   = errors.New("the task was killed, aborting execution")
	// ErrTaskExecutionMechanismClosed indicates that the tasks execution
	// mechanism is closed and execution is being aborted.
	ErrTaskExecutionMechanismClosed = errors.New("tasks execution mechanism is closed, aborting execution")
)

Functions

This section is empty.

Types

type BaseTask

// BaseTask holds the data common to every task: its identity, its block
// window (Start/End) and its execution options. The exported fields are
// serialized to JSON under the keys given in their struct tags.
type BaseTask struct {

	// ID is the unique identifier of the task.
	ID string `json:"id"`
	// Name is the task name/type.
	Name string `json:"name"`
	// AllowMultiExecution reports whether this task can be executed in
	// parallel with other tasks of the same type/name.
	AllowMultiExecution bool `json:"allowMultiExecution"`
	// SubscribeOptions holds the subscription options (whether the task
	// should be retried, finality delay, etc).
	SubscribeOptions *transaction.SubscribeOptions `json:"subscribeOptions"`
	// Start is the block at which the task should be started. If Start is 0,
	// the task is started immediately.
	Start uint64 `json:"start"`
	// End is the block at which the task should end. If End is 0, the task
	// runs forever (until it succeeds or is killed; be careful when using
	// this). Otherwise, the task ends at the specified block.
	End uint64 `json:"end"`
	// contains filtered or unexported fields
}

func NewBaseTask

func NewBaseTask(start uint64, end uint64, allowMultiExecution bool, subscribeOptions *transaction.SubscribeOptions) *BaseTask

NewBaseTask creates a new Base task. BaseTask should be the base of any task. This function is called outside the scheduler to create the object to be scheduled.

func (*BaseTask) Finish

func (bt *BaseTask) Finish(err error)

Finish executes the cleanup logic once a task finishes.

func (*BaseTask) GetAllowMultiExecution

func (bt *BaseTask) GetAllowMultiExecution() bool

GetAllowMultiExecution reports whether a task type allows multiple executions.

func (*BaseTask) GetClient

func (bt *BaseTask) GetClient() layer1.Client

GetClient returns the layer1 client implemented by the task.

func (*BaseTask) GetContractsHandler

func (bt *BaseTask) GetContractsHandler() layer1.AllSmartContracts

GetContractsHandler returns the handler that has access to all different layer1 smart contracts.

func (*BaseTask) GetDB

func (bt *BaseTask) GetDB() *db.Database

GetDB returns the database where the task can save and load its state.

func (*BaseTask) GetEnd

func (bt *BaseTask) GetEnd() uint64

GetEnd gets the end block of a task. If it is 0, the task does not have an end block.

func (*BaseTask) GetId

func (bt *BaseTask) GetId() string

GetId gets the task unique ID.

func (*BaseTask) GetLogger

func (bt *BaseTask) GetLogger() *logrus.Entry

GetLogger returns the task logger.

func (*BaseTask) GetName

func (bt *BaseTask) GetName() string

GetName gets the name of the task.

func (*BaseTask) GetStart

func (bt *BaseTask) GetStart() uint64

GetStart gets the start date of a task. Returns 0 if a task does not have a start date (started immediately).

func (*BaseTask) GetSubscribeOptions

func (bt *BaseTask) GetSubscribeOptions() *transaction.SubscribeOptions

GetSubscribeOptions gets the transactionWatcher subscribeOptions specific for a task.

func (*BaseTask) Initialize

func (bt *BaseTask) Initialize(database *db.Database, logger *logrus.Entry, eth layer1.Client, contracts layer1.AllSmartContracts, name string, id string, start uint64, end uint64, allowMultiExecution bool, subscribeOptions *transaction.SubscribeOptions, taskResponseChan InternalTaskResponseChan) error

Initialize initializes the task after its creation. It should only be called by the task scheduler during task spawn, as a separate goroutine. This function receives all the parameters needed for task execution and for control by the scheduler.

func (*BaseTask) Kill added in v0.0.8

func (bt *BaseTask) Kill()

Kill kills a running task. This can only be done once.

func (*BaseTask) KillChan added in v0.0.8

func (bt *BaseTask) KillChan() <-chan struct{}

KillChan returns a channel that is closed when the Task was killed.

func (*BaseTask) Lock added in v0.0.7

func (bt *BaseTask) Lock()

func (*BaseTask) Unlock added in v0.0.7

func (bt *BaseTask) Unlock()

func (*BaseTask) WasKilled

func (bt *BaseTask) WasKilled() bool

WasKilled returns true if the task was killed.

type InternalTaskResponseChan added in v0.0.7

// InternalTaskResponseChan is to be implemented by a response channel used for
// communication between the TaskManager and the TaskExecutor.
type InternalTaskResponseChan interface {
	// Add takes a task id and an error.
	// NOTE(review): presumably it records the outcome of the task with that
	// id (a nil err meaning success) — confirm against the implementation.
	Add(id string, err error)
}

InternalTaskResponseChan to be implemented by a response channel used for communication between the TaskManager and TaskExecutor.

type Task

// Task is the interface to be implemented by every task that is used by the
// TaskHandler.
type Task interface {
	// Lock and Unlock carry no description in this documentation.
	// NOTE(review): presumably they guard the task's internal state — confirm.
	Lock()
	Unlock()
	// Initialize initializes the task after its creation, receiving the
	// database, logger, layer1 client, contracts handler, identity, block
	// window, execution options and response channel.
	Initialize(database *db.Database, logger *logrus.Entry, eth layer1.Client, contracts layer1.AllSmartContracts, name string, id string, start uint64, end uint64, allowMultiExecution bool, subscribeOptions *transaction.SubscribeOptions, taskResponseChan InternalTaskResponseChan) error
	// Prepare, Execute and ShouldExecute each take a context and report
	// failures as a *TaskErr; Execute additionally returns a transaction.
	// NOTE(review): their exact contract is not described here — presumably
	// the task-specific life-cycle steps; confirm against the implementations.
	Prepare(ctx context.Context) *TaskErr
	Execute(ctx context.Context) (*types.Transaction, *TaskErr)
	ShouldExecute(ctx context.Context) (bool, *TaskErr)
	// Finish executes the cleanup logic once a task finishes.
	Finish(err error)
	// Kill kills a running task.
	Kill()
	// KillChan returns a channel that is closed when the task was killed.
	KillChan() <-chan struct{}
	// WasKilled returns true if the task was killed.
	WasKilled() bool
	// GetId gets the task unique ID.
	GetId() string
	// GetStart gets the start block of the task (0 means started immediately).
	GetStart() uint64
	// GetEnd gets the end block of the task (0 means no end block).
	GetEnd() uint64
	// GetName gets the name of the task.
	GetName() string
	// GetAllowMultiExecution reports whether the task type allows multiple
	// executions.
	GetAllowMultiExecution() bool
	// GetSubscribeOptions gets the subscribe options specific to the task.
	GetSubscribeOptions() *transaction.SubscribeOptions
	// GetClient returns the layer1 client used by the task.
	GetClient() layer1.Client
	// GetContractsHandler returns the handler with access to the layer1 smart
	// contracts.
	GetContractsHandler() layer1.AllSmartContracts
	// GetLogger returns the task logger.
	GetLogger() *logrus.Entry
}

Task is the interface to be implemented by every task that is used by the TaskHandler.

type TaskErr

// TaskErr is the error type returned by task life-cycle methods. It is built
// by NewTaskErr from a message and a recoverable flag, and exposes them via
// Error and IsRecoverable; its fields are unexported.
type TaskErr struct {
	// contains filtered or unexported fields
}

func NewTaskErr

func NewTaskErr(message string, isRecoverable bool) *TaskErr

func (*TaskErr) Error

func (e *TaskErr) Error() string

func (*TaskErr) IsRecoverable

func (e *TaskErr) IsRecoverable() bool

type TaskState

// TaskState is to be implemented by every task for persistence.
type TaskState interface {
	// PersistState takes a badger transaction and returns an error.
	// NOTE(review): presumably it writes the task state through txn — confirm.
	PersistState(txn *badger.Txn) error
	// LoadState takes a badger transaction and returns an error.
	// NOTE(review): presumably it reads the task state through txn — confirm.
	LoadState(txn *badger.Txn) error
}

TaskState to be implemented by every task for persistence.

Directories

Path Synopsis
dkg
tests

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL