Documentation ¶
Index ¶
- Variables
- func ScanLFTerminatedLines(data []byte, atEOF bool) (advance int, token []byte, err error)
- type Env
- type ErrorFilter
- type ErrorMatcher
- type LinewiseStageFunc
- type NewScannerFunc
- type Option
- type Pipeline
- func (p *Pipeline) Add(stages ...Stage)
- func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage)
- func (p *Pipeline) Output(ctx context.Context) ([]byte, error)
- func (p *Pipeline) Run(ctx context.Context) error
- func (p *Pipeline) Start(ctx context.Context) error
- func (p *Pipeline) Wait() error
- type Scanner
- type Stage
- func Command(command string, args ...string) Stage
- func CommandStage(name string, cmd *exec.Cmd) Stage
- func FilterError(s Stage, filter ErrorFilter) Stage
- func Function(name string, f StageFunc) Stage
- func IgnoreError(s Stage, em ErrorMatcher) Stage
- func LinewiseFunction(name string, f LinewiseStageFunc) Stage
- func Print(a ...interface{}) Stage
- func Printf(format string, a ...interface{}) Stage
- func Println(a ...interface{}) Stage
- func ScannerFunction(name string, newScanner NewScannerFunc, f LinewiseStageFunc) Stage
- type StageFunc
Constants ¶
This section is empty.
Variables ¶
var ( // IsSIGPIPE is an `ErrorMatcher` that matches `*exec.ExitError`s // that were caused by SIGPIPE. The match for `*exec.ExitError`s // uses `errors.As()`. Use like // // p.Add(IgnoreError(someStage, IsSIGPIPE)) IsSIGPIPE = IsSignal(syscall.SIGPIPE) // IsEPIPE is an `ErrorMatcher` that matches `syscall.EPIPE` using // `errors.Is()`. Use like // // p.Add(IgnoreError(someStage, IsEPIPE)) IsEPIPE = IsError(syscall.EPIPE) // IsErrClosedPipe is an `ErrorMatcher` that matches // `io.ErrClosedPipe` using `errors.Is()`. (`io.ErrClosedPipe` is // the error that results from writing to a closed // `*io.PipeWriter`.) Use like // // p.Add(IgnoreError(someStage, IsErrClosedPipe)) IsErrClosedPipe = IsError(io.ErrClosedPipe) // IsPipeError is an `ErrorMatcher` that matches a few different // errors that typically result if a stage writes to a subsequent // stage that has stopped reading from its stdin. Use like // // p.Add(IgnoreError(someStage, IsPipeError)) IsPipeError = AnyError(IsSIGPIPE, IsEPIPE, IsErrClosedPipe) )
var FinishEarly = errors.New("finish stage early")
FinishEarly is an error that can be returned by a `Stage` to request that the iteration be ended early (possibly without reading all of its input). This "error" is considered a successful return, and is not reported to the caller.
Functions ¶
Types ¶
type Env ¶
type Env struct { // The directory in which external commands should be executed by // default. Dir string }
Env represents the environment that a pipeline stage should run in. It is passed to `Stage.Start()`.
type ErrorFilter ¶
ErrorFilter is a function that can filter errors from `Stage.Wait()`. The original error (possibly nil) is passed in as an argument, and whatever the function returns is the error (possibly nil) that is actually emitted.
type ErrorMatcher ¶
ErrorMatcher decides whether its argument matches some class of errors (e.g., errors that we want to ignore). The function will only be invoked for non-nil errors.
func AnyError ¶
func AnyError(ems ...ErrorMatcher) ErrorMatcher
AnyError returns an `ErrorMatcher` that returns true for an error that matches any of the `ems`.
func IsError ¶
func IsError(target error) ErrorMatcher
IsError returns an ErrorIdentifier for the specified target error, matched using `errors.Is()`. Use like
p.Add(pipe.IgnoreError(someStage, IsError(io.EOF)))
func IsSignal ¶
func IsSignal(signal syscall.Signal) ErrorMatcher
IsSIGPIPE returns an `ErrorMatcher` that matches `*exec.ExitError`s that were caused by the specified signal. The match for `*exec.ExitError`s uses `errors.As()`. Note that under Windows this always returns false, because on that platform `WaitStatus.Signaled()` isn't implemented (it is hardcoded to return `false`).
type LinewiseStageFunc ¶
type LinewiseStageFunc func( ctx context.Context, env Env, line []byte, stdout *bufio.Writer, ) error
LinewiseStageFunc is a function that can be embedded in a `goStage`. It is called once per line in the input (where "line" can be defined via any `bufio.Scanner`). It should process the line and may write whatever it likes to `stdout`, which is a buffered writer whose contents are forwarded to the input of the next stage of the pipeline. The function needn't write one line of output per line of input.
The function mustn't retain copies of `line`, since it may be overwritten every time the function is called.
The function needn't flush or close `stdout` (this will be done automatically when all of the input has been processed).
If there is an error parsing the input into lines, or if this function returns an error, then the whole pipeline will be aborted with that error. However, if the function returns the special error `pipe.FinishEarly`, the stage will stop processing immediately with a `nil` error value.
The function will be called in a separate goroutine, so it must be careful to synchronize any data access aside from writing to `stdout`.
type NewScannerFunc ¶
NewScannerFunc is used to create a `Scanner` for scanning input that is coming from `r`.
type Option ¶
type Option func(*Pipeline)
Option is a type alias for Pipeline functional options.
func WithStdout ¶
WithStdout assigns stdout to the last command in the pipeline.
func WithStdoutCloser ¶
func WithStdoutCloser(stdout io.WriteCloser) Option
WithStdoutCloser assigns stdout to the last command in the pipeline, and closes stdout when it's done.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline represents a Unix-like pipe that can include multiple stages, including external processes but also and stages written in Go.
func (*Pipeline) AddWithIgnoredError ¶
func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage)
AddWithIgnoredError appends one or more stages that are ignoring the passed in error to the pipeline.
type Scanner ¶
Scanner defines the interface (which is implemented by `bufio.Scanner`) that is needed by `AddScannerFunction()`. See `bufio.Scanner` for how these methods should behave.
type Stage ¶
type Stage interface { // Name returns the name of the stage. Name() string // Start starts the stage in the background, in the environment // described by `env`, and using `stdin` as input. (`stdin` should // be set to `nil` if the stage is to receive no input, which // might be the case for the first stage in a pipeline.) It // returns an `io.ReadCloser` from which the stage's output can be // read (or `nil` if it generates no output, which should only be // the case for the last stage in a pipeline). It is the stages' // responsibility to close `stdin` (if it is not nil) when it has // read all of the input that it needs, and to close the write end // of its output reader when it is done, as that is generally how // the subsequent stage knows that it has received all of its // input and can finish its work, too. // // If `Start()` returns without an error, `Wait()` must also be // called, to allow all resources to be freed. Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.ReadCloser, error) // Wait waits for the stage to be done, either because it has // finished or because it has been killed due to the expiration of // the context passed to `Start()`. Wait() error }
Stage is an element of a `Pipeline`.
func Command ¶
Command returns a pipeline `Stage` based on the specified external `command`, run with the given command-line `args`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.
func CommandStage ¶
Command returns a pipeline `Stage` with the name `name`, based on the specified `cmd`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.
func FilterError ¶
func FilterError(s Stage, filter ErrorFilter) Stage
func Function ¶
Function returns a pipeline `Stage` that will run a `StageFunc` in a separate goroutine to process the data. See `StageFunc` for more information.
func IgnoreError ¶
func IgnoreError(s Stage, em ErrorMatcher) Stage
IgnoreError creates a stage that acts like `s` except that it ignores any errors that are matched by `em`. Use like
p.Add(pipe.IgnoreError( someStage, func(err error) bool { var myError *MyErrorType return errors.As(err, &myError) && myError.foo == 42 }, )
The second argument can also be one of the `ErrorMatcher`s that are provided by this package (e.g., `IsError(target)`, IsSignal(signal), `IsSIGPIPE`, `IsEPIPE`, `IsPipeError`), or one of the functions from the standard library that has the same signature (e.g., `os.IsTimeout`), or some combination of these (e.g., `AnyError(IsSIGPIPE, os.IsTimeout)`).
func LinewiseFunction ¶
func LinewiseFunction(name string, f LinewiseStageFunc) Stage
LinewiseFunction returns a function-based `Stage`. The input will be split into LF-terminated lines and passed to the function one line at a time (without the LF). The function may emit output to its `stdout` argument. See the definition of `LinewiseStageFunc` for more information.
Note that the stage will emit an error if any line (including its end-of-line terminator) exceeds 64 kiB in length. If this is too short, use `ScannerFunction()` directly with your own `NewScannerFunc` as argument, or use `Function()` directly with your own `StageFunc`.
func ScannerFunction ¶
func ScannerFunction( name string, newScanner NewScannerFunc, f LinewiseStageFunc, ) Stage
ScannerFunction creates a function-based `Stage`. The function will be passed input, one line at a time, and may emit output. See the definition of `LinewiseStageFunc` for more information.
type StageFunc ¶
StageFunc is a function that can be used to power a `goStage`. It should read its input from `stdin` and write its output to `stdout`. `stdin` and `stdout` will be closed automatically (if necessary) once the function returns.
Neither `stdin` nor `stdout` are necessarily buffered. If the `StageFunc` requires buffering, it needs to arrange that itself.
A `StageFunc` is run in a separate goroutine, so it must be careful to synchronize any data access aside from reading and writing.