workpool

package
v0.0.0-...-3270118
Published: Aug 15, 2018 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CommandPipe

type CommandPipe struct {
	// contains filtered or unexported fields
}

CommandPipe is used to send text commands (typically JSON) to a worker.

func NewCommandPipe

func NewCommandPipe() *CommandPipe

NewCommandPipe is the default factory method for CommandPipe

func (*CommandPipe) Register

func (cp *CommandPipe) Register(cmd *exec.Cmd)

Register connects the pipe with a provided command (Cmd)

func (*CommandPipe) SendBytes

func (cp *CommandPipe) SendBytes(command []byte)

SendBytes sends specified bytes to the pipe

type Master

type Master struct {
	// contains filtered or unexported fields
}

Master handles distribution of tasks to workers and also collecting results.

func NewMaster

func NewMaster(conf *MasterConf) *Master

NewMaster is a standard constructor for Master

func (*Master) GetTask

func (m *Master) GetTask(taskID string) *Task

GetTask returns a specific task identified by task ID. In case there is no such task, nil is returned.

func (*Master) Info

func (m *Master) Info() *MasterInfo

Info returns overview information used on the "info" page of the API server.

func (*Master) Reload

func (m *Master) Reload()

Reload reloads Master and all the workers. This can be used to update service configuration.

func (*Master) SendTask

func (m *Master) SendTask(name string, jsonArgs []byte) *Task

SendTask sends a new task to Master

func (*Master) Start

func (m *Master) Start()

Start initializes all the worker processes and starts to listen for tasks. The function is non-blocking.

func (*Master) Stop

func (m *Master) Stop()

Stop stops all the workers

type MasterConf

type MasterConf struct {

	// PoolSize specifies number of workers
	PoolSize int `json:"poolSize"`

	// Program specifies program name (basically the first
	// element of a command we want to use as a worker)
	Program string `json:"program"`

	// ProgramArgs specifies all other command arguments
	// ("the rest" in list terminology)
	ProgramArgs []string `json:"programArgs"`

	// ExecMaxSeconds specifies the maximum time a task has
	// to execute (i.e. we start counting once the
	// task is actually started, not when it is enqueued).
	ExecMaxSeconds int `json:"execMaxSeconds"`

	TaskResultPersistMaxSeconds int `json:"taskResultPersistMaxSeconds"`

	MaxResponsePipeBufferSize int `json:"maxResponsePipeBufferSize"`
}

MasterConf is a Master configuration
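The JSON tags on MasterConf suggest that the configuration can be loaded from a JSON document. The following is a minimal sketch of that, not the package's own loader: the struct is a local copy of the one above (so the example compiles without importing the package), parseConf is a hypothetical helper, and every value in sampleConf is made up. Only the key names come from the struct tags.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MasterConf is a local copy of the documented struct, repeated here
// only to keep the sketch self-contained.
type MasterConf struct {
	PoolSize                    int      `json:"poolSize"`
	Program                     string   `json:"program"`
	ProgramArgs                 []string `json:"programArgs"`
	ExecMaxSeconds              int      `json:"execMaxSeconds"`
	TaskResultPersistMaxSeconds int      `json:"taskResultPersistMaxSeconds"`
	MaxResponsePipeBufferSize   int      `json:"maxResponsePipeBufferSize"`
}

// sampleConf holds illustrative values only (assumption, not from the package).
const sampleConf = `{
	"poolSize": 4,
	"program": "python3",
	"programArgs": ["worker.py"],
	"execMaxSeconds": 60,
	"taskResultPersistMaxSeconds": 300,
	"maxResponsePipeBufferSize": 1048576
}`

// parseConf is a hypothetical helper that decodes a JSON document
// into a MasterConf value.
func parseConf(data []byte) (*MasterConf, error) {
	var conf MasterConf
	if err := json.Unmarshal(data, &conf); err != nil {
		return nil, err
	}
	return &conf, nil
}

func main() {
	conf, err := parseConf([]byte(sampleConf))
	if err != nil {
		fmt.Println("cannot parse configuration:", err)
		return
	}
	fmt.Printf("%d workers running %s %v\n", conf.PoolSize, conf.Program, conf.ProgramArgs)
}
```

With the real package, a value decoded this way would presumably be passed to NewMaster.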

type MasterInfo

type MasterInfo struct {
	PoolSize    int
	WorkersInfo []WorkerInfo
}

type ResponsePipe

type ResponsePipe struct {
	// contains filtered or unexported fields
}

ResponsePipe is used to receive data from a worker command-line program. Because the data can be quite large, the internal Scanner can be configured to work with a buffer of a custom size.

func NewResponsePipe

func NewResponsePipe(maxBufferSize int) *ResponsePipe

NewResponsePipe is a default factory for ResponsePipe

func (*ResponsePipe) Channel

func (cp *ResponsePipe) Channel() chan string

Channel returns the pipe's channel, to which received text lines are sent.

func (*ResponsePipe) Register

func (cp *ResponsePipe) Register(cmd *exec.Cmd)

Register configures the pipe to work with a specified command (Cmd).

type Task

type Task struct {
	TaskID  string      `json:"taskID"`
	Status  int         `json:"status"`
	Fn      string      `json:"fn"`
	Args    interface{} `json:"args"`
	Error   string      `json:"error"`
	Result  interface{} `json:"result"`
	Created int64       `json:"created"`
	Updated int64       `json:"updated"`
}

func (*Task) AgeSecons

func (t *Task) AgeSecons() int

func (*Task) IsDone

func (t *Task) IsDone() bool

func (*Task) SecondsSinceUpdate

func (t *Task) SecondsSinceUpdate() int

func (*Task) String

func (t *Task) String() string

func (*Task) Touch

func (t *Task) Touch()

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker controls execution of an external task (program) via Cmd. The task must be able to receive commands via its standard input and report its status via its standard output. In general, we assume that each line in these streams represents either a command or a response. This means the data must be encoded so that any newline characters contained in a command do not split it into multiple commands.

func NewWorker

func NewWorker(workerEvent chan *WorkerStatus, maxResponsePipeBufferSize int, command string, args ...string) *Worker

NewWorker is a default factory for Worker

func (*Worker) Call

func (w *Worker) Call(taskID string, fn string, args interface{})

Call sends a specified command to the worker. Generally, args is expected to be JSON-encodable data. Konserver does not care about its contents and just passes it to the worker.

func (*Worker) GetPID

func (w *Worker) GetPID() int

GetPID returns the PID of the corresponding external task. If nothing is running yet, -1 is returned.

func (*Worker) Info

func (w *Worker) Info() WorkerInfo

func (*Worker) Reload

func (w *Worker) Reload()

Reload sends SIGHUP to the running task

func (*Worker) Start

func (w *Worker) Start()

Start runs the Worker: both in-memory communication pipes are set up and the Worker listens on a dedicated channel for the task's responses.

func (*Worker) Stop

func (w *Worker) Stop()

Stop kills the external task

func (*Worker) String

func (w *Worker) String() string

type WorkerInfo

type WorkerInfo struct {
	PID        int
	LastStatus string
	TaskID     string
}

type WorkerStatus

type WorkerStatus struct {
	TaskID    string      `json:"taskID"`
	Status    int         `json:"status"`
	Error     string      `json:"error"`
	Traceback []string    `json:"traceback"`
	Result    interface{} `json:"result"`
	// contains filtered or unexported fields
}

WorkerStatus describes the current state of a worker and, if applicable, information about its task.

func (*WorkerStatus) IsDone

func (ws *WorkerStatus) IsDone() bool

func (*WorkerStatus) ReadableStatus

func (ws *WorkerStatus) ReadableStatus() string

func (*WorkerStatus) Worker

func (ws *WorkerStatus) Worker() *Worker
