taskmanager

package
v0.0.0-...-7558982 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2018 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Command

type Command struct {
	Type         string                 `json:"type"`
	TaskName     string                 `json:"taskname,omitempty"`
	Params       map[string]interface{} `json:"params,omitempty"`
	Timeout      int64                  `json:"timeout"`
	ReplyChannel chan CommandReply
}

Command sent on the command channel. Might be specific to a task or generic. The type can be one of 'status', 'set', 'stop', 'listworkers', 'stopworkers' or 'stoppedworkers'

func (*Command) Broadcast

func (cmd *Command) Broadcast(outChannels map[int]chan Command) bool

Broadcast the command to other channels, wait for all the replies and close the channel

func (*Command) Fail

func (cmd *Command) Fail(msg string) bool

Fail sends a Reply with a failure message

func (*Command) Forward

func (cmd *Command) Forward(outChannel chan Command) bool

Forward the command to another channel, wait for the reply and close the channel

func (*Command) SafeReply

func (cmd *Command) SafeReply(reply CommandReply) error

SafeReply sends a reply but recovers from the panic if the output channel is closed

func (*Command) SafeSend

func (cmd *Command) SafeSend(out chan Command) error

SafeSend attempts sending a command to an output channel, recovering from the panic if the channel was closed in the meanwhile

func (Command) Send

func (cmd Command) Send(outChannel chan Command) string

Send the Command and get the response(s) as string

func (Command) String

func (cmd Command) String() string

Implement String() interface

func (*Command) Success

func (cmd *Command) Success(msg string) bool

Success sends a Reply with a success message

func (Command) ToJSON

func (cmd Command) ToJSON() []byte

ToJSON converts an interaction object to a JSON byte array

type CommandReply

type CommandReply struct {
	Reply  string
	Params map[string]interface{}
	Error  error
}

CommandReply is the type of a reply on the ReplyChannel for a Command. It contains the successful response (string) or an error on command failure

func (CommandReply) String

func (r CommandReply) String() string

String implements the Stringer interface

type HTTPHandler

type HTTPHandler struct {
	Host           string
	Port           int
	CommandChannel chan Command
	Logger         *log.Logger
}

HTTPHandler holds the configuration for the HTTP handler

func (*HTTPHandler) Run

func (handler *HTTPHandler) Run()

Run the HTTP handler, which exposes an HTTP interface to control tasks

type JobqueueKeepAliveHandler

type JobqueueKeepAliveHandler struct {
	Host   string
	Port   int
	Topic  string
	Logger *log.Logger
}

JobqueueKeepAliveHandler contains the configuration for the Keep-Alive handler

func (*JobqueueKeepAliveHandler) Run

func (handler *JobqueueKeepAliveHandler) Run(keepalives chan<- KeepAlive)

Run method: Listen to keep-alive messages sent via ZeroMQ and forward them to a channel

type KeepAlive

type KeepAlive struct {
	Pid      int
	TaskName string
}

KeepAlive contains the Pid of the process and the name of the Task

func (KeepAlive) String

func (x KeepAlive) String() string

Implement String() interface

type KeepAliveConf

type KeepAliveConf struct {
	InboundPort  int    `json:"inbound_port,omitempty"`
	InternalPort int    `json:"internal_port,omitempty"`
	Host         string `json:"host,omitempty"`
	StallTimeout int64  `json:"stall_timeout,omitempty"`
	GracePeriod  int64  `json:"grace_period,omitempty"`
}

KeepAliveConf contains the configuration for Keep-Alive handler and ZeroMQ channel

type Runner

type Runner struct {
	Conf TaskManagerConf
	// contains filtered or unexported fields
}

Runner is a container for Task Managers

func NewRunner

func NewRunner(taskMgrConf TaskManagerConf) (Runner, error)

NewRunner Returns an instance of a Task Manager Runner

func (*Runner) ListTasks

func (taskRunner *Runner) ListTasks() []string

ListTasks - List available tasks

func (*Runner) Run

func (taskRunner *Runner) Run()

Run a task, and keep its workers at the desired cardinality

type SignalHandler

type SignalHandler struct {
	CommandChannel chan Command
	Logger         *log.Logger
	ForceTimeout   int64
}

SignalHandler wrap the command channel

func (*SignalHandler) Run

func (handler *SignalHandler) Run()

Run the Signal handler, to intercept interrupts and shut down processes cleanly

type TaskManager

type TaskManager struct {
	Name          string   `json:"name,omitempty"`           // task name
	Cmd           string   `json:"cmd,omitempty"`            // cli command
	Args          []string `json:"args,omitempty"`           // cli args
	Cardinality   int      `json:"cardinality,omitempty"`    // number of workers
	StallTimeout  int64    `json:"stall_timeout,omitempty"`  // consider the worker dead if no keep-alives are received for this period (ms)
	GracePeriod   int64    `json:"grace_period,omitempty"`   // grace period (ms) before killing a worker after being asked to stop
	AutoStart     bool     `json:"autostart,omitempty"`      // whether to start the task automatically
	CaptureOutput bool     `json:"capture_output,omitempty"` // whether to capture the output and send it to stdout
	Active        bool
	// contains filtered or unexported fields
}

A TaskManager is a process manager for a specific task, keeping the cardinality of the number of worker processes to the desired value, and managing keep-alives

func NewTaskManager

func NewTaskManager(name string, keepAliveConf KeepAliveConf, feedback chan Command) TaskManager

NewTaskManager creates a new Task Manager instance

func (*TaskManager) CopyFrom

func (task *TaskManager) CopyFrom(autotask TaskManager, tPath string)

CopyFrom Updates settings for a task manager from another task manager

func (*TaskManager) ListWorkers

func (task *TaskManager) ListWorkers() []string

ListWorkers returns status information from each worker process for this task

func (*TaskManager) MaintainWorkerCardinality

func (task *TaskManager) MaintainWorkerCardinality() error

MaintainWorkerCardinality keeps the number of workers to the desired cardinality

func (*TaskManager) RunCommand

func (task *TaskManager) RunCommand(cmd Command)

RunCommand runs a command on this task. Results are sent to the reply channel of the command itself

func (*TaskManager) Set

func (task *TaskManager) Set(cmd Command)

Set task options (only "cardinality" is supported ATM)

func (*TaskManager) Start

func (task *TaskManager) Start(commands chan Command, cmd Command)

Start the workers for this task

func (*TaskManager) StartWorker

func (task *TaskManager) StartWorker() error

StartWorker creates a new worker process

func (*TaskManager) Status

func (task *TaskManager) Status() (ret map[string]string)

Status gets the status for this task (number of active workers, last alive TS, etc.)

func (*TaskManager) Stop

func (task *TaskManager) Stop()

Stop asks all of this task's workers stop gracefully (or forcefully if they don't terminate in a timely fashion)

func (*TaskManager) StopWorker

func (task *TaskManager) StopWorker(pid int, ch chan Command)

StopWorker sends a SIGTERM signal to the worker process identified by the given pid

func (*TaskManager) StopWorkers

func (task *TaskManager) StopWorkers()

StopWorkers asks all workers for this task to stop and waits for them to terminate

func (*TaskManager) StopWorkersByPid

func (task *TaskManager) StopWorkersByPid(pids []int)

StopWorkersByPid asks all workers in the pid list to stop and waits for them to terminate

type TaskManagerConf

type TaskManagerConf struct {
	Path         string                 `json:"path,omitempty"`
	FileSuffix   string                 `json:"filesuffix,omitempty"`
	Port         int                    `json:"port,omitempty"`
	Autotasks    map[string]TaskManager `json:"autotasks,omitempty"`
	Keepalives   KeepAliveConf          `json:"keepalives,omitempty"`
	ForceTimeout int64                  `json:"force_timeout"`
	Profiler     profiler.Config        `json:"profiler"`
}

TaskManagerConf contains the configuration for the Task Manager

type Worker

type Worker struct {
	Taskname            string         `json:"taskname"`
	Command             string         `json:"command,omitempty"`
	Args                []string       `json:"args,omitempty"`
	CaptureOutput       bool           `json:"capture_output,omitempty"`
	StallTimeout        int64          `json:"stall_timeout,omitempty"`
	GracePeriod         int64          `json:"grace_period,omitempty"` // grace period (ms) before killing a worker after being asked to stop
	Pid                 int            `json:"pid,omitempty"`
	StartedAt           time.Time      `json:"started_at,omitempty"`
	LastAliveAt         time.Time      `json:"last_alive_at,omitempty"`
	Logger              *log.Logger    `json:"-"` // don't export
	TaskFeedbackChannel chan<- Command `json:"-"` // don't export
	CommandsChannel     chan Command   `json:"-"` // don't export
	// contains filtered or unexported fields
}

Worker manages a single worker process

func (*Worker) HasStalled

func (w *Worker) HasStalled() bool

HasStalled checks if the worker process is alive and has sent a keep-alive message recently

func (*Worker) Info

func (w *Worker) Info(replyChan chan<- CommandReply)

Info returns the status of the current worker process

func (*Worker) IsProcessAlive

func (w *Worker) IsProcessAlive() bool

IsProcessAlive checks if the process is still around @see http://stackoverflow.com/questions/15204162/check-if-a-process-exists-in-go-way

func (*Worker) Start

func (w *Worker) Start() (*WorkerInfo, error)

Start spawns a new worker process

func (*Worker) Stop

func (w *Worker) Stop(replyChan chan<- CommandReply)

Stop the worker process

func (*Worker) String

func (w *Worker) String() string

String implements the Stringer interface

type WorkerInfo

type WorkerInfo struct {
	Pid              int
	KeepAliveChannel chan KeepAlive
	CommandChannel   chan Command
}

WorkerInfo is a wrapper for the process pid and the channels to communicate with the task manager

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL