executor

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package executor contains basic implementation of an executor framework. The client facing interface is provided by the ExecutionService which creates ExecutorPool and Dispatcher. Task interface is defined so client would implement these methods on client specific Type to undertake specific work. When a task is submitted to the ExecutionService it submits it to the Dispatcher which assigns a channel on which task execution result / response is received. Along with attaching a channel to the task, it creates a separate routine to listen to response arrival. If the submitted task is blocking the calling routine is actually waiting for the response so Dispatcher returns the response upon receiving as well as undertakes the house keeping work of recycling the channel for another task. If the submitted task is asynchronous, caller is returned upon task submission even though there is a routine waiting for the response to undertake the house keeping.

ExecutorPool maintains a set of executors for async tasks and another one for blocking tasks.

MIT License Author: Umesh Patil, Neosemantix, Inc.

Index

Constants

View Source
const DefaultCfgFileName = "default-cfg.json"

Name of a configuration file which contains default values; in the same folder where you would find execution-service.go. If user allows to use the default configuration, then in absence of user provided configuration values in this file will be used.

View Source
const ExecServiceCfgJsonElementName = "ExecServiceSettings"

Name of the Json element in any Json Configuration file which contains ExecServiceCfg structure value. Note that we do not support only part settings, the constant refers to a Json segment which will contain values for all 3 config structures.

View Source
const TaskStatusCompletedFailed = 500
View Source
const TaskStatusCompletedSuccessfully = 200
View Source
const TaskStatusFailedToSubmit = 1
View Source
const TaskStatusNotSubmitted = 0
View Source
const TaskStatusSubmitted = 100
View Source
const TestTaskExecDurationLowerLimit = 8
View Source
const TestTaskExecDurationUpperLimit = 128

Variables

View Source
var GlobalRand *rand.Rand
View Source
var StaticBox *packr.Box

Functions

func GetRandomBoolean

func GetRandomBoolean() bool

func RandomTestTaskExecTime

func RandomTestTaskExecTime() int

func SetupRand

func SetupRand()

func SetupTestTask

func SetupTestTask()

Types

type Dispatcher

type Dispatcher struct {
	JobStats *TaskStats
	// contains filtered or unexported fields
}

Dispatcher type which hold reference to executor pool, channels used for getting back task execution results and go routines waiting on task results.

func NewDispatcher

func NewDispatcher(cfg DispatcherCfg, ep *ExecutorPool) *Dispatcher

create a dispatcher with the given number of Response channel counts max channel count should be equal to number of executors in the executor pool if tasks are generally very short running, channels can be less we each dedicated channel for each task execution

task submission on the dispatcher happens in the calling 'go routine / thread' (Alternate design could be waiting tasks are in map, we use one single fixed channel on which responses for all tasks are published and on the receiving side based on task id of the response object, we match waiting tasks in the map (key task id) and release the response.)

cc: how many channels to create to listen back task result

cp: capacity - buffer size - for each channel

ep: executor pool

wfc: whether to block the submission for availability a channel to hear back the task result. It does not apply for async tasks.

func (*Dispatcher) Start

func (disp *Dispatcher) Start()

func (*Dispatcher) Stop

func (disp *Dispatcher) Stop()

func (*Dispatcher) Submit

func (disp *Dispatcher) Submit(tsk Task) (error, *Response)

type DispatcherCfg

type DispatcherCfg struct {

	// Number of channels used to receive back task execution results
	ChannelCount int `json:"channel_count"`

	// Channel buffer size
	ChannelCapacity int `json:"channel_capacity"`

	// Whether caller should wait for response channel availability while
	// submitting a task
	WaitForChanAvail bool `json:"wait_for_chan_avail"`
}

type ExecCfg

type ExecCfg struct {

	// How many maximum number tasks accepted by the executor when it is
	// already executing a task. These tasks will form the queue.
	TaskQueueCapacity int `json:"task_queue_capacity"`

	// If true, despite the full task queue capacity, caller invoking
	// Submit method will wait i.e. will be blocked. By default we keep it
	// false. So once the task queue is full, subsequent attempts to add a task
	// will fail as long as the queue if filled.
	WaitForAvailability bool `json:"wait_for_availability"`
}

Executor configuration parameters

type ExecPoolCfg

type ExecPoolCfg struct {

	// Number of executors which will be used to handle async tasks
	AsyncTaskExecutorCount int `json:"async_task_executor_count"`

	// Number of executors which will be used to hand blocking task,
	// caller is waiting for the execution result.
	BlockingTaskExecutorCount int `json:"blocking_task_executor_count"`
}

type ExecServiceCfg

type ExecServiceCfg struct {
	Dispatcher DispatcherCfg `json:"DispatcherSettings"`
	ExexPool   ExecPoolCfg   `json:"ExecPoolSettings"`
	Executor   ExecCfg       `json:"ExecutorSettings"`
	Monitoring MonitoringCfg `json:"MonitoringSettings"`
}

Configuration for the entire execution service which comprises of configuration for Dispatcher, Executor Pool and for each Executor.

func (*ExecServiceCfg) MakeExecServiceFromCfg added in v0.0.4

func (esc *ExecServiceCfg) MakeExecServiceFromCfg() *ExecutionService

Start a new execution service from the given configuration. For the returned execution service, the given cfg is in use.

type ExecutionService

type ExecutionService struct {
	Monitor         *util.Monitor // exposed for testing purposes
	ServiceCfgInUse *ExecServiceCfg
	// contains filtered or unexported fields
}

func NewExecutionService

func NewExecutionService(cfgFileName string, useDefault bool) *ExecutionService

Caller can pass the configuration file name which will contain all parameters needed to start the execution service. The file will be searched in the directory as pointed by the environmental variable GO_CFG_HOME. If the environmental variable is not set or file is not found; caller can indicate whether default configuration file is to be used or not. If configuration is found, method returns with a Fatal Log call.

func (*ExecutionService) CloneCfg added in v0.0.4

func (es *ExecutionService) CloneCfg() *ExecServiceCfg

Clone the configuration in use of the given execution service

func (ExecutionService) GetData added in v0.0.4

func (es ExecutionService) GetData() util.Blob

func (ExecutionService) Name added in v0.0.4

func (es ExecutionService) Name() string

func (*ExecutionService) Start

func (es *ExecutionService) Start()

func (*ExecutionService) Stop

func (es *ExecutionService) Stop()

func (*ExecutionService) Submit

func (es *ExecutionService) Submit(tsk Task) (error, *Response)

type Executor

type Executor interface {
	Start()

	Submit(t Task) error

	HowManyInQueue() int

	WaitForAvailability(wfa bool)

	Stop()
}

We start with core Executor contract as an interface. As expected it has common methods like Start, Stop and Submit to receive a task. User can also specify whether we wait for availability of an internal buffer to accept the incoming task.

func NewExecutor

func NewExecutor(cfg ExecCfg) Executor

type ExecutorPool

type ExecutorPool struct {
	// contains filtered or unexported fields
}

Holds two separate arrays of executors - one for blocking tasks and the other for async execution. ExecutorPool also fulfills the actual Executor contract: Start, Stop, Submit and other methods. That makes it consistent.

func NewExecutorPool

func NewExecutorPool(epCfg ExecPoolCfg, cfg ExecCfg) *ExecutorPool

async: how many executors for execution of async tasks blocked: how many executors for execution of blocked tasks wfa: wait for availability in the queue for an executor

func (*ExecutorPool) HowManyInQueue

func (es *ExecutorPool) HowManyInQueue() int

func (*ExecutorPool) Start

func (es *ExecutorPool) Start()

func (*ExecutorPool) Stop

func (es *ExecutorPool) Stop()

func (*ExecutorPool) Submit

func (es *ExecutorPool) Submit(tsk Task) error

func (*ExecutorPool) TotalExecutorCount

func (es *ExecutorPool) TotalExecutorCount() int

type MonitoringCfg added in v0.0.4

type MonitoringCfg struct {
	MonitoringFrequency int `json:"MonitoringFrequency"`
	MonDataChanBufSz    int `json:"ChannelBufferSize"`
}

Configuration about how the monitoring is done at runtime.

type Response

type Response struct {
	// Id of the task to which this response corresponds to
	TaskId int

	// Task status - whether it succeeded or failed or some other state
	Status int

	// Output of the successfully executed task.
	// For now we assume it will be a JSON string
	Result string

	Errors []error
}

func FailedToSubmitResponse

func FailedToSubmitResponse(tid int) *Response

func NewResponse

func NewResponse(tid int) *Response

type Task

type Task interface {
	GetId() int

	Execute() Response

	SetRespChan(rc chan Response)

	GetRespChan() chan Response

	IsBlocking() bool
}

Basic interface client of executor module should implement so as to get the work done. It has standard id and core execute methods. Also it needs to carry the channel with it on which the result of execution will be reported. It will also reveal whether it is a blocking task or not.

type TaskStats added in v0.0.4

type TaskStats struct {
	sync.Mutex
	UpSinceWhen            time.Time `json:"up_since_when"`
	TotalTasksSubmitted    int       `json:"total_tasks_submitted"`
	BlockingTasksSubmitted int       `json:"blocking_tasks_submitted"`
	AsyncTasksSubmitted    int       `json:"async_tasks_submitted"`
	TasksInExecution       int       `json:"tasks_in_execution"`
}

It tracks various task statistics submitted and executed through a dispatcher.

type TestTask

type TestTask struct {
	// contains filtered or unexported fields
}

func NewBlockingTestTask added in v0.0.4

func NewBlockingTestTask(ed int, blocking bool) *TestTask

func NewTestTask

func NewTestTask(ed int) *TestTask

func (*TestTask) Execute

func (tt *TestTask) Execute() Response

func (*TestTask) GetId

func (tt *TestTask) GetId() int

func (*TestTask) GetRespChan

func (tt *TestTask) GetRespChan() chan Response

func (*TestTask) IsBlocking

func (tt *TestTask) IsBlocking() bool

func (*TestTask) SetRespChan

func (tt *TestTask) SetRespChan(rc chan Response)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL