pipe

package
v1.0.2
Warning

This package is not in the latest version of its module.

Published: Nov 29, 2022 License: MIT Imports: 17 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// IsSIGPIPE is an `ErrorMatcher` that matches `*exec.ExitError`s
	// that were caused by SIGPIPE. The match for `*exec.ExitError`s
	// uses `errors.As()`. Use like
	//
	//     p.Add(IgnoreError(someStage, IsSIGPIPE))
	IsSIGPIPE = IsSignal(syscall.SIGPIPE)

	// IsEPIPE is an `ErrorMatcher` that matches `syscall.EPIPE` using
	// `errors.Is()`. Use like
	//
	//     p.Add(IgnoreError(someStage, IsEPIPE))
	IsEPIPE = IsError(syscall.EPIPE)

	// IsErrClosedPipe is an `ErrorMatcher` that matches
	// `io.ErrClosedPipe` using `errors.Is()`. (`io.ErrClosedPipe` is
	// the error that results from writing to a closed
	// `*io.PipeWriter`.) Use like
	//
	//     p.Add(IgnoreError(someStage, IsErrClosedPipe))
	IsErrClosedPipe = IsError(io.ErrClosedPipe)

	// IsPipeError is an `ErrorMatcher` that matches a few different
	// errors that typically result if a stage writes to a subsequent
	// stage that has stopped reading from its stdin. Use like
	//
	//     p.Add(IgnoreError(someStage, IsPipeError))
	IsPipeError = AnyError(IsSIGPIPE, IsEPIPE, IsErrClosedPipe)
)
var ErrMemoryLimitExceeded = errors.New("memory limit exceeded")

ErrMemoryLimitExceeded is the error that `MemoryLimit` uses, when necessary, to kill a process whose memory usage exceeds the limit.

var FinishEarly = errors.New("finish stage early")

FinishEarly is an error that can be returned by a `Stage` to request that the iteration be ended early (possibly without reading all of its input). This "error" is considered a successful return, and is not reported to the caller.

Functions

func ScanLFTerminatedLines

func ScanLFTerminatedLines(data []byte, atEOF bool) (advance int, token []byte, err error)

ScanLFTerminatedLines is a `bufio.SplitFunc` that splits its input into lines at LF characters (not treating CR specially).

Types

type AppendVars

type AppendVars func(context.Context, []EnvVar) []EnvVar

type ContextValueFunc

type ContextValueFunc func(context.Context) (string, bool)

type ContextValuesFunc

type ContextValuesFunc func(context.Context) []EnvVar

type Env

type Env struct {
	// The directory in which external commands should be executed by
	// default.
	Dir string

	// Vars are extra environment variables. These will override any
	// environment variables that would be inherited from the current
	// process.
	Vars []AppendVars
}

Env represents the environment that a pipeline stage should run in. It is passed to `Stage.Start()`.

type EnvVar

type EnvVar struct {
	// The name of the environment variable.
	Key string
	// The value.
	Value string
}

EnvVar represents an environment variable that will be provided to any child process spawned in this pipeline.

type ErrorFilter

type ErrorFilter func(err error) error

ErrorFilter is a function that can filter errors from `Stage.Wait()`. The original error (possibly nil) is passed in as an argument, and whatever the function returns is the error (possibly nil) that is actually emitted.

type ErrorMatcher

type ErrorMatcher func(err error) bool

ErrorMatcher decides whether its argument matches some class of errors (e.g., errors that we want to ignore). The function will only be invoked for non-nil errors.

func AnyError

func AnyError(ems ...ErrorMatcher) ErrorMatcher

AnyError returns an `ErrorMatcher` that returns true for an error that matches any of the `ems`.

func IsError

func IsError(target error) ErrorMatcher

IsError returns an `ErrorMatcher` for the specified target error, matched using `errors.Is()`. Use like

p.Add(pipe.IgnoreError(someStage, pipe.IsError(io.EOF)))

func IsSignal

func IsSignal(signal syscall.Signal) ErrorMatcher

IsSignal returns an `ErrorMatcher` that matches `*exec.ExitError`s that were caused by the specified signal. The match for `*exec.ExitError`s uses `errors.As()`. Note that under Windows this always returns false, because on that platform `WaitStatus.Signaled()` isn't implemented (it is hardcoded to return `false`).

type Event

type Event struct {
	Command string
	Msg     string
	Err     error
	Context map[string]interface{}
}

Event represents anything that could happen during the pipeline execution.

type LimitableStage

type LimitableStage interface {
	Stage

	GetRSSAnon(context.Context) (uint64, error)
	Kill(error)
}

LimitableStage is the superset of Stage that must be implemented by stages passed to MemoryLimit and MemoryObserver.

type LinewiseStageFunc

type LinewiseStageFunc func(
	ctx context.Context, env Env, line []byte, stdout *bufio.Writer,
) error

LinewiseStageFunc is a function that can be embedded in a `goStage`. It is called once per line in the input (where "line" can be defined via any `bufio.Scanner`). It should process the line and may write whatever it likes to `stdout`, which is a buffered writer whose contents are forwarded to the input of the next stage of the pipeline. The function needn't write one line of output per line of input.

The function mustn't retain copies of `line`, since it may be overwritten every time the function is called.

The function needn't flush or close `stdout` (this will be done automatically when all of the input has been processed).

If there is an error parsing the input into lines, or if this function returns an error, then the whole pipeline will be aborted with that error. However, if the function returns the special error `pipe.FinishEarly`, the stage will stop processing immediately with a `nil` error value.

The function will be called in a separate goroutine, so it must be careful to synchronize any data access aside from writing to `stdout`.

type NewPipeFn

type NewPipeFn func(opts ...Option) *Pipeline

type NewScannerFunc

type NewScannerFunc func(r io.Reader) (Scanner, error)

NewScannerFunc is used to create a `Scanner` for scanning input that is coming from `r`.

type Option

type Option func(*Pipeline)

Option is the type of the functional options that configure a `Pipeline` when passed to `New()`.

func WithDir

func WithDir(dir string) Option

WithDir sets the default directory for running external commands.

func WithEnvVar

func WithEnvVar(key, value string) Option

WithEnvVar appends an environment variable for the pipeline.

func WithEnvVarFunc

func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option

WithEnvVarFunc appends a context-based environment variable for the pipeline.

func WithEnvVars

func WithEnvVars(b []EnvVar) Option

WithEnvVars appends several environment variables for the pipeline.

func WithEnvVarsFunc

func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option

WithEnvVarsFunc appends several context-based environment variables for the pipeline.

func WithEventHandler

func WithEventHandler(handler func(e *Event)) Option

WithEventHandler sets an event handler for the pipeline. When a handler is set, an event is emitted for each process.

func WithStdin

func WithStdin(stdin io.Reader) Option

WithStdin assigns stdin to the first command in the pipeline.

func WithStdout

func WithStdout(stdout io.Writer) Option

WithStdout assigns stdout to the last command in the pipeline.

func WithStdoutCloser

func WithStdoutCloser(stdout io.WriteCloser) Option

WithStdoutCloser assigns stdout to the last command in the pipeline, and closes stdout when it's done.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline represents a Unix-like pipe that can include multiple stages, which may be external processes or stages written in Go.

func New

func New(options ...Option) *Pipeline

New returns a `*Pipeline` with all of the `options` applied.

func (*Pipeline) Add

func (p *Pipeline) Add(stages ...Stage)

Add appends one or more stages to the pipeline.

func (*Pipeline) AddWithIgnoredError

func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage)

AddWithIgnoredError appends one or more stages to the pipeline, ignoring any errors from those stages that are matched by `em`.

func (*Pipeline) Output

func (p *Pipeline) Output(ctx context.Context) ([]byte, error)

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context) error

Run starts and waits for the commands in the pipeline.

func (*Pipeline) Start

func (p *Pipeline) Start(ctx context.Context) error

Start starts the commands in the pipeline. If `Start()` exits without an error, `Wait()` must also be called, to allow all resources to be freed.

func (*Pipeline) Wait

func (p *Pipeline) Wait() error

Wait waits for each stage in the pipeline to exit.

type Scanner

type Scanner interface {
	Scan() bool
	Bytes() []byte
	Err() error
}

Scanner defines the interface (which is implemented by `bufio.Scanner`) that is needed by `ScannerFunction()`. See `bufio.Scanner` for how these methods should behave.

type Stage

type Stage interface {
	// Name returns the name of the stage.
	Name() string

	// Start starts the stage in the background, in the environment
	// described by `env`, and using `stdin` as input. (`stdin` should
	// be set to `nil` if the stage is to receive no input, which
	// might be the case for the first stage in a pipeline.) It
	// returns an `io.ReadCloser` from which the stage's output can be
	// read (or `nil` if it generates no output, which should only be
	// the case for the last stage in a pipeline). It is the stage's
	// responsibility to close `stdin` (if it is not nil) when it has
	// read all of the input that it needs, and to close the write end
	// of its output reader when it is done, as that is generally how
	// the subsequent stage knows that it has received all of its
	// input and can finish its work, too.
	//
	// If `Start()` returns without an error, `Wait()` must also be
	// called, to allow all resources to be freed.
	Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.ReadCloser, error)

	// Wait waits for the stage to be done, either because it has
	// finished or because it has been killed due to the expiration of
	// the context passed to `Start()`.
	Wait() error
}

Stage is an element of a `Pipeline`.

func Command

func Command(command string, args ...string) Stage

Command returns a pipeline `Stage` based on the specified external `command`, run with the given command-line `args`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.

func CommandStage

func CommandStage(name string, cmd *exec.Cmd) Stage

CommandStage returns a pipeline `Stage` with the name `name`, based on the specified `cmd`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.

func FilterError

func FilterError(s Stage, filter ErrorFilter) Stage

func Function

func Function(name string, f StageFunc) Stage

Function returns a pipeline `Stage` that will run a `StageFunc` in a separate goroutine to process the data. See `StageFunc` for more information.

func IgnoreError

func IgnoreError(s Stage, em ErrorMatcher) Stage

IgnoreError creates a stage that acts like `s` except that it ignores any errors that are matched by `em`. Use like

p.Add(pipe.IgnoreError(
    someStage,
    func(err error) bool {
        var myError *MyErrorType
        return errors.As(err, &myError) && myError.foo == 42
    },
))

The second argument can also be one of the `ErrorMatcher`s provided by this package (e.g., `IsError(target)`, `IsSignal(signal)`, `IsSIGPIPE`, `IsEPIPE`, `IsErrClosedPipe`, `IsPipeError`), a function from the standard library with the same signature (e.g., `os.IsTimeout`), or a combination of these (e.g., `AnyError(IsSIGPIPE, os.IsTimeout)`).

func LinewiseFunction

func LinewiseFunction(name string, f LinewiseStageFunc) Stage

LinewiseFunction returns a function-based `Stage`. The input will be split into LF-terminated lines and passed to the function one line at a time (without the LF). The function may emit output to its `stdout` argument. See the definition of `LinewiseStageFunc` for more information.

Note that the stage will emit an error if any line (including its end-of-line terminator) exceeds 64 KiB in length. If this limit is too low, use `ScannerFunction()` directly with your own `NewScannerFunc` as argument, or use `Function()` directly with your own `StageFunc`.

func MemoryLimit

func MemoryLimit(stage Stage, byteLimit uint64, eventHandler func(e *Event)) Stage

MemoryLimit watches the memory usage of the stage and stops it if it exceeds the given limit.

func MemoryObserver

func MemoryObserver(stage Stage, eventHandler func(e *Event)) Stage

MemoryObserver watches memory use of the stage and logs the maximum value when the stage exits.

func Print

func Print(a ...interface{}) Stage

func Printf

func Printf(format string, a ...interface{}) Stage

func Println

func Println(a ...interface{}) Stage

func ScannerFunction

func ScannerFunction(
	name string, newScanner NewScannerFunc, f LinewiseStageFunc,
) Stage

ScannerFunction creates a function-based `Stage`. The function will be passed input, one line at a time, and may emit output. See the definition of `LinewiseStageFunc` for more information.

type StageFunc

type StageFunc func(ctx context.Context, env Env, stdin io.Reader, stdout io.Writer) error

StageFunc is a function that can be used to power a `goStage`. It should read its input from `stdin` and write its output to `stdout`. `stdin` and `stdout` will be closed automatically (if necessary) once the function returns.

Neither `stdin` nor `stdout` is necessarily buffered. If the `StageFunc` requires buffering, it must arrange that itself.

A `StageFunc` is run in a separate goroutine, so it must be careful to synchronize any data access aside from reading and writing.
