overseer

Version: v1.0.5
Published: Jan 28, 2022 License: MIT Imports: 16 Imported by: 0

README


Overseer

Simple process manager library.

  • Note: The master branch is the development branch. To make sure you use the correct version, use the repository tags.

At the heart of this library is os/exec.Cmd from the Go standard library, and the first wrapper around it is the Cmd struct.
The Overseer struct can supervise one or more Cmds running at the same time.
You can safely run multiple Overseer instances at the same time.

There are 3 states in the normal lifecycle of a proc: starting, running, finished.
If the process is killed prematurely, the states are: starting, running, interrupted.
If the process cannot start, the states are: starting, fatal.
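
The three lifecycles above can be summarized as a small lookup. This is only a sketch built from the state names in the text; the outcome type and the lifecycle function are hypothetical and are not part of the library.

```go
package main

import "fmt"

// outcome is a hypothetical label for how a proc ended; it is not a library type.
type outcome int

const (
	exitedNormally outcome = iota // the process ran and exited on its own
	killedEarly                   // the process was stopped or signaled while running
	failedToStart                 // the process could not be started at all
)

// lifecycle returns the sequence of states described above for each outcome.
func lifecycle(o outcome) []string {
	switch o {
	case killedEarly:
		return []string{"starting", "running", "interrupted"}
	case failedToStart:
		return []string{"starting", "fatal"}
	default:
		return []string{"starting", "running", "finished"}
	}
}

func main() {
	for _, o := range []outcome{exitedNormally, killedEarly, failedToStart} {
		fmt.Println(lifecycle(o))
	}
}
```

Every lifecycle ends in exactly one final state, which is why a watcher receives only 2 or 3 messages per proc.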

Overseer API

Setting up a logger is optional, but if you want a custom one, it must be set up before creating a new Overseer.
By default, the logger is DefaultLogger from ShinyTrinkets/meta-logger/default.go.
To disable logging completely, provide an implementation of the Logger interface whose Info and Error functions do nothing.
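
A do-nothing logger could look roughly like the sketch below. The Attrs and Logger types here are local stand-ins: the text only says that the interface has Info and Error functions, so the exact signatures, and the builder shape taken by SetupLogBuilder, are assumptions that should be checked against meta-logger.

```go
package main

import "fmt"

// Attrs and Logger are local stand-ins. The real definitions live in
// meta-logger; the signatures used here are assumed, not confirmed.
type Attrs map[string]interface{}

type Logger interface {
	Info(msg string, v ...Attrs)
	Error(msg string, v ...Attrs)
}

// nopLogger discards every message.
type nopLogger struct{}

func (nopLogger) Info(msg string, v ...Attrs)  {}
func (nopLogger) Error(msg string, v ...Attrs) {}

// newNopLogger has the assumed shape of a log builder:
// it takes a logger name and returns a Logger.
func newNopLogger(name string) Logger { return nopLogger{} }

func main() {
	var log Logger = newNopLogger("overseer")
	log.Info("this is discarded")
	log.Error("so is this", Attrs{"id": "proc1"})
	fmt.Println("nothing was logged")
}
```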

  • NewOverseer() - Returns a new instance of the Overseer process manager.
  • Add(id string, exec string, args ...interface{}) - Register a proc, without starting it. The id must be unique. The name of the executable is exec. The args of the executable are args.
  • Remove(id string) - Unregister a proc, only if it's not running. The id must be unique.
  • SuperviseAll() - This is the main function. Supervise all registered processes and block until they finish. This includes killing all the processes when the main program exits. The function can be called again, after all the processes are finished. The status of the running processes can be watched live with the Watch() function.
  • Supervise(id string) - Supervise one registered process and block until it finishes. This includes checking if the process was killed from the outside, delaying the start and restarting in case of failure (failure means the program has an exit code != 0 or it ran with errors). The function can be called again, after the process is finished.
  • Watch(outputChan chan *ProcessJSON) - Subscribe to all state changes via the provided output channel. The channel will receive status changes for all the added procs, but you can easily identify the one you are interested in from the ID, Group, etc. Note that for each proc you will receive only 2 or 3 messages that represent all the possible states (e.g. starting, running, finished).
  • UnWatch(outputChan chan *ProcessJSON) - Un-subscribe from the state changes, by un-registering the channel.
  • Stop(id string) - Stops the process by sending its process group a SIGTERM signal and resets RetryTimes to 0 so the process doesn't restart.
  • Signal(id string, sig syscall.Signal) - Sends an OS signal to the process group.
  • StopAll(kill bool) - Cycles and stops all processes. If "kill" is false, all procs receive SIGTERM to allow a graceful shut down. If "kill" is true, all procs receive SIGKILL and they are killed immediately.
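
The restart-on-failure behaviour described for Supervise can be pictured with the sketch below. It is not the library's implementation: runFunc and supervise are hypothetical names, the process is replaced by a plain function, and the assumption that a proc runs once plus at most RetryTimes restarts is mine.

```go
package main

import (
	"errors"
	"fmt"
)

// runFunc stands in for starting a process and waiting for it to end.
// It returns the exit code and any Go error.
type runFunc func() (exitCode int, err error)

// supervise runs the process and restarts it after a failure, at most
// retryTimes times. Failure means a non-zero exit code or a Go error.
// It returns the exit code of the last run.
func supervise(run runFunc, retryTimes uint) int {
	for {
		code, err := run()
		if code == 0 && err == nil {
			return code // success, nothing to restart
		}
		if retryTimes == 0 {
			return code // retries exhausted
		}
		retryTimes--
	}
}

func main() {
	attempts := 0
	flaky := func() (int, error) {
		attempts++
		if attempts < 3 {
			return 1, errors.New("exit status 1")
		}
		return 0, nil
	}
	code := supervise(flaky, 5)
	fmt.Println(code, attempts) // prints "0 3"
}
```

Setting the retry budget to 0, as Stop is described to do, makes the loop return after the current run.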

Cmd API

It's recommended to use the higher level Overseer, instead of Cmd directly.
If you use Cmd directly, keep in mind that it is one use only. After starting an instance, it cannot be started again. However, you can Clone your instance and start the clone. The Supervise method from the Overseer does all of that for you.

  • NewCmd(name string, args ...interface{}) - Returns a new instance of Cmd.
  • Clone() - Clones a Cmd. All the options are copied, but the run state of the original object is not carried over to the clone.
  • Start() - Starts the command and immediately returns a channel that the caller can use to receive the final Status of the command when it ends. The function can only be called once.
  • Stop() - Stops the command by sending its process group a SIGTERM signal.
  • Signal(sig syscall.Signal) - Sends an OS signal to the process group.
  • Status() - Returns the Status of the command at any time. The Status struct contains: PID, exit code, Error (if any), Start and Stop timestamps, and Runtime in seconds.
  • IsInitialState() - true if the Cmd is in initial state.
  • IsRunningState() - true if the Cmd is starting, or running.
  • IsFinalState() - true if the Cmd is in a final state.

Project highlights

  • real-time status
  • real-time stdout and stderr
  • complete and consolidated return
  • easy to track process state
  • proper process termination on program exit
  • portable command line binary for managing procs
  • heavily tested, very good test coverage
  • no race conditions

For examples of usage, please check the Examples folder, the manager tests, the Overseer command line app, or the Spinal app.

Similar libraries

Icon is made by Freepik from www.flaticon.com and licensed by CC 3.0 BY.


License

MIT © Cristi Constantin.

Documentation

Overview

Package overseer runs external commands with concurrent access to output and status. It wraps the Go standard library os/exec.Command to correctly handle reading output (STDOUT and STDERR) while a command is running, and to handle killing a command. All operations are safe to call from multiple goroutines.

Credit: https://github.com/go-cmd/cmd Copyright (c) 2017 go-cmd & contributors. The architecture is quite heavily modified from the original version.

A basic example that runs env and prints its output:

package main

import (
    "fmt"

    cmd "github.com/ShinyTrinkets/overseer"
)

func main() {
    // Create a Cmd
    envCmd := cmd.NewCmd("env")

    // Run and wait for Cmd to return Status
    status := <-envCmd.Start()

    // Print each line of STDOUT from Cmd
    for _, line := range status.Stdout {
        fmt.Println(line)
    }
}

Commands can be run synchronously (blocking) or asynchronously (non-blocking):

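// Blocking: start the Cmd and wait for its final Status
envCmd := cmd.NewCmd("env") // create
status := <-envCmd.Start()  // run blocking

// Non-blocking: an alternative to the above, shown on a fresh Cmd
otherCmd := cmd.NewCmd("env")
statusChan := otherCmd.Start() // run non-blocking
// Do other work while Cmd is running...
status = <-statusChan // blocks until the command finishes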

Start returns a channel to which the final Status is sent when the command finishes for any reason. The first example blocks receiving on the channel. The second example is non-blocking because it saves the channel and receives on it later. Only one final status is sent to the channel; use Done for multiple goroutines to wait for the command to finish, then call Status to get the final status.
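
The difference between the single final status and Done can be sketched with plain channels. The job and result types below are hypothetical and do not run any process; they only model a buffered channel that receives one value and a done channel that is closed for every waiter.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a hypothetical stand-in for the final status of a command.
type result struct{ exit int }

// job models the two notification paths: a buffered channel that receives
// exactly one final result, and a done channel that is closed for all waiters.
type job struct {
	mu       sync.Mutex
	final    result
	statusCh chan result
	doneCh   chan struct{}
}

func startJob(work func() int) *job {
	j := &job{statusCh: make(chan result, 1), doneCh: make(chan struct{})}
	go func() {
		r := result{exit: work()}
		j.mu.Lock()
		j.final = r
		j.mu.Unlock()
		j.statusCh <- r // one value only; a single receiver gets it
		close(j.doneCh) // every waiter is released
	}()
	return j
}

// status returns the stored result; call it after doneCh is closed.
func (j *job) status() result {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.final
}

func main() {
	j := startJob(func() int { return 0 })

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-j.doneCh // many goroutines can wait on a closed channel
			_ = j.status()
		}()
	}
	wg.Wait()

	fmt.Println((<-j.statusCh).exit) // the single final value, prints 0
}
```

A second receive on statusCh would block forever, which is why several waiters should use the done channel instead.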

Constants

const (
	// DEFAULT_LINE_BUFFER_SIZE is the default size of the OutputStream line buffer.
	// The default value is usually sufficient, but if ErrLineBufferOverflow errors
	// occur, try increasing the size by calling OutputStream.SetLineBufferSize.
	DEFAULT_LINE_BUFFER_SIZE = 16384

	// DEFAULT_STREAM_CHAN_SIZE is the default string channel size for a Cmd when
	// Options.Streaming is true. The string channel size can have a minor
	// performance impact if too small by causing OutputStream.Write to block
	// excessively.
	DEFAULT_STREAM_CHAN_SIZE = 1000
)
const (
	INITIAL = iota
	IDLE
	STARTING
	RUNNING
	STOPPING
	INTERRUPT // final state (used when stopped or signaled)
	FINISHED  // final state (used when there was a natural exit)
	FATAL     // final state (used when there was an error while starting)

)
const (
	IDLE_STATE     = "idle"
	DONE_STATE     = "done"
	FATAL_STATE    = "fatal"
	RUNNING_STATE  = "running"
	STOPPING_STATE = "stopping"
	UNKNOWN_STATE  = "unknown"
)

Variables

var NewPublisher = pubsub.NewPublisher

Functions

func SetupLogBuilder added in v1.0.0

func SetupLogBuilder(b ml.LogBuilderType)

SetupLogBuilder is called to add user's provided log builder. It's just a wrapper around meta-logger SetupLogBuilder.

Types

type Attrs added in v1.0.0

type Attrs = ml.Attrs

Attrs is a type alias

type Backoff

type Backoff struct {
	//Factor is the multiplying factor for each increment step
	Factor float64
	//Jitter eases contention by randomizing backoff steps
	Jitter bool
	//Min and Max are the minimum and maximum values of the counter
	Min, Max time.Duration
	// contains filtered or unexported fields
}

Backoff is a time.Duration counter, starting at Min. After every call to the Duration method the current timing is multiplied by Factor, but it never exceeds Max.

Backoff is not generally concurrent-safe, but the ForAttempt method can be used concurrently.

func (*Backoff) Attempt

func (b *Backoff) Attempt() float64

Attempt returns the current attempt counter value.

func (*Backoff) Duration

func (b *Backoff) Duration() time.Duration

Duration returns the duration for the current attempt before incrementing the attempt counter. See ForAttempt.

func (*Backoff) ForAttempt

func (b *Backoff) ForAttempt(attempt float64) time.Duration

ForAttempt returns the duration for a specific attempt. This is useful if you have a large number of independent Backoffs, but don't want to use unnecessary memory storing the Backoff parameters per Backoff. The first attempt should be 0.

ForAttempt is concurrent-safe.

func (*Backoff) Reset

func (b *Backoff) Reset()

Reset restarts the current attempt counter at zero.

type Cmd

type Cmd struct {
	*sync.Mutex

	Name       string
	Group      string
	Args       []string
	Env        []string
	Dir        string
	DelayStart uint        // Nr of milli-seconds to delay the start (used by the manager)
	RetryTimes uint        // Nr of times to restart on failure (used by the manager)
	Stdout     chan string // streaming STDOUT if enabled, else nil (see Options)
	Stderr     chan string // streaming STDERR if enabled, else nil (see Options)
	State      CmdState    // The state of the cmd (stopped, started, etc)
	// contains filtered or unexported fields
}

Cmd represents an external command, similar to the Go built-in os/exec.Cmd. A Cmd cannot be reused after calling Start, but it can be cloned with Clone. To create a new Cmd, call NewCmd.

func NewCmd

func NewCmd(name string, args ...interface{}) *Cmd

NewCmd creates a new Cmd for the given command name and arguments. The command is not started until Start is called.

func (*Cmd) Clone added in v1.0.0

func (c *Cmd) Clone() *Cmd

Clone clones a Cmd. All the options are copied, but the run state of the original object is not carried over to the clone.

func (*Cmd) Done

func (c *Cmd) Done() <-chan struct{}

Done returns a channel that's closed when the command stops running. This method is useful for multiple goroutines to wait for the command to finish. Call Status after the command finishes to get its final status.

func (*Cmd) IsFinalState

func (c *Cmd) IsFinalState() bool

IsFinalState returns true if the Cmd is in a final state. Final states are definitive and cannot be exited from.

func (*Cmd) IsInitialState added in v1.0.0

func (c *Cmd) IsInitialState() bool

IsInitialState returns true if the Cmd is in the initial state.

func (*Cmd) IsRunningState added in v1.0.0

func (c *Cmd) IsRunningState() bool

IsRunningState returns true if the Cmd is starting, or running.

func (*Cmd) Signal

func (c *Cmd) Signal(sig syscall.Signal) error

Signal sends OS signal to the process group.

func (*Cmd) Start

func (c *Cmd) Start() <-chan Status

Start starts the command and immediately returns a channel that the caller can use to receive the final Status of the command when it ends. The caller can start the command and wait like,

status := <-myCmd.Start() // blocking

or start the command asynchronously and be notified later when it ends,

statusChan := myCmd.Start() // non-blocking
// Do other work while Cmd is running...
status := <-statusChan // blocking

Exactly one Status is sent on the channel when the command ends. The channel is not closed. Any Go error is set to Status.Error. Start is idempotent; it always returns the same channel.

func (*Cmd) Status

func (c *Cmd) Status() Status

Status returns the Status of the command at any time. It is safe to call concurrently by multiple goroutines.

With buffered output, Status.Stdout and Status.Stderr contain the full output as of the Status call time. For example, if the command counts to 3 and three calls are made between counts, Status.Stdout contains:

"1"
"1 2"
"1 2 3"

The caller is responsible for tailing the buffered output if needed. Else, consider using streaming output. When the command finishes, buffered output is complete and final.

Status.Runtime is updated while the command is running and final when it finishes.
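
Tailing buffered output means remembering how many lines were already seen and printing only the rest on each poll. The tail helper below is a hypothetical sketch that works on plain slices; in real use the snapshots would come from Status.Stdout.

```go
package main

import "fmt"

// tail returns the lines added since the previous poll and the new offset.
// lines is the full buffered output as of the latest poll.
func tail(lines []string, seen int) (fresh []string, next int) {
	if seen > len(lines) {
		seen = len(lines)
	}
	return lines[seen:], len(lines)
}

func main() {
	// Three snapshots, as in the counting example above.
	polls := [][]string{{"1"}, {"1", "2"}, {"1", "2", "3"}}
	seen := 0
	for _, snapshot := range polls {
		var fresh []string
		fresh, seen = tail(snapshot, seen)
		fmt.Println(fresh) // prints [1], then [2], then [3]
	}
}
```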

func (*Cmd) Stop

func (c *Cmd) Stop() error

Stop stops the command by sending its process group a SIGTERM signal. Stop makes sure the command doesn't restart, by resetting the retry times. Stop is idempotent. An error should only be returned in the rare case that Stop is called immediately after the command ends but before Start can update its internal state.

type CmdState

type CmdState uint8

CmdState represents a Cmd state

func (CmdState) String

func (p CmdState) String() string

type ErrLineBufferOverflow

type ErrLineBufferOverflow struct {
	Line       string // Unterminated line that caused the error
	BufferSize int    // Internal line buffer size
	BufferFree int    // Free bytes in line buffer
}

ErrLineBufferOverflow is returned by OutputStream.Write when the internal line buffer is filled before a newline character is written to terminate a line. Increasing the line buffer size by calling OutputStream.SetLineBufferSize can help prevent this error.

func (ErrLineBufferOverflow) Error

func (e ErrLineBufferOverflow) Error() string

type LogMsg added in v1.0.0

type LogMsg struct {
	Type uint8
	Text string
}

type Logger added in v1.0.0

type Logger = ml.Logger

Logger is a type alias

type Options

type Options struct {
	Group      string
	Dir        string
	Env        []string
	DelayStart uint
	RetryTimes uint

	// If Buffered is true, STDOUT and STDERR are written to Status.Stdout and
	// Status.Stderr. The caller can call Cmd.Status to read output at intervals.
	// See Cmd.Status for more info.
	Buffered bool

	// If Streaming is true, Cmd.Stdout and Cmd.Stderr channels are created and
	// STDOUT and STDERR output lines are written to them in real time. This is
	// faster and more efficient than polling Cmd.Status. The caller must read both
	// streaming channels, else lines are dropped silently.
	Streaming bool
}

Options represents customizations for NewCmd.

type OutputBuffer

type OutputBuffer struct {
	*sync.Mutex
	// contains filtered or unexported fields
}

OutputBuffer represents command output that is saved, line by line, in an unbounded buffer. It is safe for multiple goroutines to read while the command is running and after it has finished. If output is small (a few megabytes) and not read frequently, an output buffer is a good solution.

A Cmd in this package uses an OutputBuffer for both STDOUT and STDERR by default when created by calling NewCmd. To use OutputBuffer directly with a Go standard library os/exec.Command:

import "os/exec"
import cmd "github.com/ShinyTrinkets/overseer"
runnableCmd := exec.Command(...)
stdout := cmd.NewOutputBuffer()
runnableCmd.Stdout = stdout

While runnableCmd is running, call stdout.Lines() to read all output currently written.
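
The idea behind an output buffer can be shown with a small mutex-guarded io.Writer. The lineBuffer type below is a simplified, hypothetical stand-in written with the standard library only; it is not the package's OutputBuffer.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
	"sync"
)

// lineBuffer is a simplified stand-in for an unbounded, mutex-guarded buffer.
type lineBuffer struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

// Write implements io.Writer, so the buffer could be assigned to exec.Cmd.Stdout.
func (b *lineBuffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf.Write(p)
}

// Lines returns everything written so far, split into lines without "\r\n".
func (b *lineBuffer) Lines() []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	text := strings.TrimRight(b.buf.String(), "\r\n")
	if text == "" {
		return nil
	}
	lines := strings.Split(text, "\n")
	for i, l := range lines {
		lines[i] = strings.TrimSuffix(l, "\r")
	}
	return lines
}

func main() {
	b := &lineBuffer{}
	fmt.Fprint(b, "one\r\ntwo\n")
	fmt.Println(b.Lines()) // prints [one two]
	fmt.Fprint(b, "three\n")
	fmt.Println(b.Lines()) // prints [one two three]
}
```

Each call re-reads the whole buffer, which is fine for small output but is the reason streaming is preferred for large or frequently read output.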

func NewOutputBuffer

func NewOutputBuffer() *OutputBuffer

NewOutputBuffer creates a new output buffer. The buffer is unbounded and safe for multiple goroutines to read while the command is running by calling Lines.

func (*OutputBuffer) Lines

func (rw *OutputBuffer) Lines() []string

Lines returns lines of output written by the Cmd. It is safe to call while the Cmd is running and after it has finished. Subsequent calls return more lines, if more lines were written. "\r\n" are stripped from the lines.

func (*OutputBuffer) Write

func (rw *OutputBuffer) Write(p []byte) (n int, err error)

Write makes OutputBuffer implement the io.Writer interface. Do not call this function directly.

type OutputStream

type OutputStream struct {
	// contains filtered or unexported fields
}

OutputStream represents real time, line by line output from a running Cmd. Lines are terminated by a single newline preceded by an optional carriage return. Both newline and carriage return are stripped from the line when sent to a caller-provided channel.

The caller must begin receiving before starting the Cmd. Write blocks on the channel; the caller must always read the channel. The channel is not closed by the OutputStream.

A Cmd in this package uses an OutputStream for both STDOUT and STDERR when created by calling NewCmd and Options.Streaming is true. To use OutputStream directly with a Go standard library os/exec.Command:

import "os/exec"
import cmd "github.com/ShinyTrinkets/overseer"

stdoutChan := make(chan string, 100)
go func() {
    for line := range stdoutChan {
        // Do something with the line
    }
}()

runnableCmd := exec.Command(...)
stdout := cmd.NewOutputStream(stdoutChan)
runnableCmd.Stdout = stdout

While runnableCmd is running, lines are sent to the channel as soon as they are written and newline-terminated by the command. After the command finishes, the caller should wait for the last lines to be sent:

for len(stdoutChan) > 0 {
    time.Sleep(10 * time.Millisecond)
}

Since the channel is not closed by the OutputStream, the two indications that all lines have been sent and received are the command finishing and the channel size being zero.
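
The line splitting described above can be sketched as a small io.Writer that keeps the unterminated tail of the output and sends each complete line to a channel. The lineStream type and errOverflow value are hypothetical; the overflow check is a guess at the behaviour behind ErrLineBufferOverflow, not the package's code.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errOverflow is a stand-in for the line buffer overflow error.
var errOverflow = errors.New("line buffer overflow")

// lineStream splits written bytes into lines and sends each one to a channel.
type lineStream struct {
	ch      chan string
	pending []byte // bytes of the current, not yet terminated line
	limit   int    // maximum size of an unterminated line
}

// Write implements io.Writer; it blocks when the channel is full.
func (s *lineStream) Write(p []byte) (int, error) {
	rest := p
	for {
		i := bytes.IndexByte(rest, '\n')
		if i < 0 {
			break
		}
		line := append(s.pending, rest[:i]...)
		// Strip the optional carriage return and send a copy of the line.
		s.ch <- string(bytes.TrimSuffix(line, []byte("\r")))
		s.pending = s.pending[:0]
		rest = rest[i+1:]
	}
	if len(s.pending)+len(rest) > s.limit {
		return len(p) - len(rest), errOverflow
	}
	s.pending = append(s.pending, rest...)
	return len(p), nil
}

func main() {
	ch := make(chan string, 10)
	s := &lineStream{ch: ch, limit: 16}
	fmt.Fprint(s, "hel")            // no newline yet, nothing is sent
	fmt.Fprint(s, "lo\r\nworld\n") // completes "hello" and sends "world"
	for len(ch) > 0 {               // drain, as in the wait loop above
		fmt.Println(<-ch)
	}
}
```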

func NewOutputStream

func NewOutputStream(streamChan chan string) *OutputStream

NewOutputStream creates a new streaming output on the given channel. The caller must begin receiving on the channel before the command is started. The OutputStream never closes the channel.

func (*OutputStream) Lines

func (rw *OutputStream) Lines() <-chan string

Lines returns the channel to which lines are sent. This is the same channel passed to NewOutputStream.

func (*OutputStream) SetLineBufferSize

func (rw *OutputStream) SetLineBufferSize(n int)

SetLineBufferSize sets the internal line buffer size. The default is DEFAULT_LINE_BUFFER_SIZE. This function must be called immediately after NewOutputStream, and it is not safe to call by multiple goroutines.

Increasing the line buffer size can help reduce ErrLineBufferOverflow errors.

func (*OutputStream) Write

func (rw *OutputStream) Write(p []byte) (n int, err error)

Write makes OutputStream implement the io.Writer interface. Do not call this function directly.

type Overseer

type Overseer struct {
	// contains filtered or unexported fields
}

Overseer structure. For instantiating, it's best to use the NewOverseer() function.

func NewOverseer

func NewOverseer() *Overseer

NewOverseer creates a new process manager. After creating it, Add the procs and call SuperviseAll.

func (*Overseer) Add

func (ovr *Overseer) Add(id string, exec string, args ...interface{}) *Cmd

Add (register) a process, without starting it.

func (*Overseer) Cmd added in v1.0.1

func (ovr *Overseer) Cmd(id string) *Cmd

func (*Overseer) HasProc

func (ovr *Overseer) HasProc(id string) bool

HasProc checks if a proc has been added to the manager.

func (*Overseer) IsRunning added in v1.0.0

func (ovr *Overseer) IsRunning() bool

IsRunning returns true if SuperviseAll is running.

func (*Overseer) IsStopping added in v1.0.0

func (ovr *Overseer) IsStopping() bool

IsStopping returns true if StopAll was called and the procs are preparing to close.

func (*Overseer) ListAll

func (ovr *Overseer) ListAll() []string

ListAll returns the names of all the procs in alphabetic order.

func (*Overseer) ListGroup added in v1.0.0

func (ovr *Overseer) ListGroup(group string) []string

ListGroup returns the names of all the procs from a specific group.

func (*Overseer) Remove

func (ovr *Overseer) Remove(id string) bool

Remove (un-register) a process, if it's not running.

func (*Overseer) Signal

func (ovr *Overseer) Signal(id string, sig syscall.Signal) error

Signal sends OS signal to the process group.

func (*Overseer) Status

func (ovr *Overseer) Status(id string) *ProcessJSON

Status returns the status of a child process (PID, exit code, error and so on), ready to be converted to JSON. Use it with the names returned by ListAll() or ListGroup().

func (*Overseer) Stop

func (ovr *Overseer) Stop(id string) error

Stop the process by sending its process group a SIGTERM signal. The process can be started again, if needed.

func (*Overseer) StopAll

func (ovr *Overseer) StopAll(kill bool)

StopAll cycles and kills all child procs. Used when exiting the program.

func (*Overseer) SubscribeStateEvent added in v1.0.2

func (or *Overseer) SubscribeStateEvent() <-chan interface{}

SubscribeStateEvent returns the channel emitting state changes.

func (*Overseer) Supervise

func (ovr *Overseer) Supervise(id string) int

Supervise launches a process and restarts it in case of failure.

func (*Overseer) SuperviseAll

func (ovr *Overseer) SuperviseAll()

SuperviseAll is the *main* function. Supervise all registered processes and wait for them to finish.

func (*Overseer) UnWatch added in v1.0.0

func (ovr *Overseer) UnWatch(outputChan chan *ProcessJSON)

UnWatch allows un-subscribing from state changes.

func (*Overseer) UnWatchLogs added in v1.0.0

func (ovr *Overseer) UnWatchLogs(logChan chan *LogMsg)

UnWatchLogs allows un-subscribing from log messages.

func (*Overseer) WaitIdle added in v1.0.1

func (or *Overseer) WaitIdle()

WaitIdle blocks until the Overseer next reaches the IDLE state (all procs finished).

func (*Overseer) Watch added in v1.0.0

func (ovr *Overseer) Watch(outputChan chan *ProcessJSON)

Watch allows subscribing to state changes via provided output channel.
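
Subscribing and un-subscribing with caller-provided channels can be pictured as a small fan-out hub. The hub and event types below are hypothetical and only model the Watch and UnWatch pattern; they say nothing about how the Overseer is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical, trimmed-down state change message.
type event struct {
	ID    string
	State string
}

// hub fans state changes out to every subscribed channel.
type hub struct {
	mu   sync.Mutex
	subs map[chan *event]struct{}
}

func newHub() *hub { return &hub{subs: make(map[chan *event]struct{})} }

// watch registers a caller-provided channel.
func (h *hub) watch(ch chan *event) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.subs[ch] = struct{}{}
}

// unwatch removes a previously registered channel.
func (h *hub) unwatch(ch chan *event) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.subs, ch)
}

// publish sends the event to all subscribers; it blocks on a full channel.
func (h *hub) publish(e *event) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.subs {
		ch <- e
	}
}

func main() {
	h := newHub()
	ch := make(chan *event, 8)
	h.watch(ch)
	for _, st := range []string{"starting", "running", "finished"} {
		h.publish(&event{ID: "proc1", State: st})
	}
	h.unwatch(ch)
	h.publish(&event{ID: "proc1", State: "starting"}) // not delivered
	close(ch)
	for e := range ch {
		fmt.Println(e.ID, e.State)
	}
}
```

The consumer filters by ID or group on its side, since every subscriber receives the changes for all procs.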

func (*Overseer) WatchLogs added in v1.0.0

func (ovr *Overseer) WatchLogs(logChan chan *LogMsg)

WatchLogs allows subscribing to log messages via provided output channel.

type OvrState added in v1.0.0

type OvrState uint8

OvrState represents an Overseer state

func (OvrState) String added in v1.0.0

func (p OvrState) String() string

type ProcessJSON added in v1.0.0

type ProcessJSON struct {
	ID         string    `json:"id"`
	Group      string    `json:"group"`
	Cmd        string    `json:"cmd"`
	Dir        string    `json:"dir"`
	PID        int       `json:"PID"`
	State      string    `json:"state"`
	ExitCode   int       `json:"exitCode"` // exit code of process
	Error      error     `json:"error"`    // Go error
	StartTime  time.Time `json:"startTime"`
	DelayStart uint      `json:"delayStart"`
	RetryTimes uint      `json:"retryTimes"`
}

ProcessJSON public structure

type Publisher added in v1.0.5

type Publisher = pubsub.Publisher

Publisher is a type alias

type Status

type Status struct {
	Cmd     string
	PID     int
	Exit    int      // exit code of process
	Error   error    // Go error
	StartTs int64    // Unix ts (nanoseconds), zero if Cmd not started
	StopTs  int64    // Unix ts (nanoseconds), zero if Cmd not started or running
	Runtime float64  // seconds, zero if Cmd not started
	Stdout  []string // buffered STDOUT; see Cmd.Status for more info
	Stderr  []string // buffered STDERR; see Cmd.Status for more info
}

Status represents the running status and consolidated return of a Cmd. It can be obtained any time by calling Cmd.Status. If StartTs > 0, the command has started. If StopTs > 0, the command has stopped. After the command finishes for any reason, this combination of values indicates success (presuming the command only exits zero on success):

Exit     = 0
Error    = nil

Error is a Go error from the underlying os/exec.Cmd.Start or os/exec.Cmd.Wait. If not nil, the command either failed to start (it never ran) or it started but was terminated unexpectedly (probably signaled). In either case, the command failed. Callers should check Error first. If nil, then check Exit and Status.

Directories

Path Synopsis
examples
