Documentation
Overview
Package stream provides filters that can be chained together in a manner similar to Unix pipelines. A simple example that prints all Go files under the current directory:
    stream.Run(
        stream.Find("."),
        stream.Grep(`\.go$`),
        stream.WriteLines(os.Stdout),
    )
stream.Run is passed a list of filters that are chained together (stream.Find, stream.Grep, stream.WriteLines are filters). Each filter takes as input a sequence of strings and produces a sequence of strings. The empty sequence is passed as input to the first filter. The output of one filter is fed as input to the next filter.
stream.Run is just one way to execute filters. Others are stream.Contents (returns the output of the last filter as a []string), and stream.ForEach (executes a supplied function for every output item).
Error handling
Filter execution can result in errors. These are returned as ordinary error values from stream functions such as Run. For example, the following call returns a non-nil error because "[" is not a valid regular expression:
    err := stream.Run(
        stream.Items("hello", "world"),
        stream.Grep("["), // Invalid regular expression
        stream.WriteLines(os.Stdout),
    )
    // err will be non-nil
User-defined filters
Each filter takes as input a sequence of strings (read from a channel) and produces as output a sequence of strings (written to a channel). The stream package provides many useful filters, and applications can easily define their own. For example, here is a filter that repeats every input n times:
    func Repeat(n int) stream.FilterFunc {
        return func(arg stream.Arg) error {
            for s := range arg.In {
                for i := 0; i < n; i++ {
                    arg.Out <- s
                }
            }
            return nil
        }
    }

    stream.Run(
        stream.Items("hello", "world"),
        Repeat(2),
        stream.WriteLines(os.Stdout),
    )
The output will be:
    hello
    hello
    world
    world
Note that Repeat returns a FilterFunc, a function type that implements the Filter interface. This is a common implementation pattern: many simple filters can be expressed as a single function of type FilterFunc.
Tunable filters
FilterFunc is an appropriate type to use for most filters, like Repeat above. However, some filters benefit from further customization. Such filters provide their own implementation of the Filter interface with extra methods. For example, stream.Sort provides extra methods that control how items are sorted:
    stream.Run(
        stream.Command("ls", "-l"),
        stream.Sort().Num(5), // Sort numerically by size (column 5)
        stream.WriteLines(os.Stdout),
    )
Acknowledgments
The interface of this package is inspired by the http://labix.org/pipe package. Users may wish to consider that package in case it fits their needs better.
Index
- func Contents(filters ...Filter) ([]string, error)
- func ForEach(filter Filter, fn func(s string)) error
- func Run(filters ...Filter) error
- type Arg
- type Filter
- func Cat(filenames ...string) Filter
- func Columns(columns ...int) Filter
- func Command(command string, args ...string) Filter
- func DropFirst(n int) Filter
- func DropLast(n int) Filter
- func First(n int) Filter
- func Grep(r string) Filter
- func GrepNot(r string) Filter
- func If(fn func(string) bool) Filter
- func Items(items ...string) Filter
- func Last(n int) Filter
- func Map(fn func(string) string) Filter
- func NumberLines() Filter
- func Numbers(x, y int) Filter
- func Parallel(n int, f Filter) Filter
- func ReadLines(reader io.Reader) Filter
- func Repeat(s string, n int) Filter
- func Reverse() Filter
- func Sample(n int) Filter
- func SampleWithSeed(n int, seed int64) Filter
- func Sequence(filters ...Filter) Filter
- func Substitute(r, replacement string) Filter
- func Uniq() Filter
- func UniqWithCount() Filter
- func WriteLines(writer io.Writer) Filter
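- type FilterFunc
- func (f FilterFunc) RunFilter(arg Arg) error
- type FindFilter
- func Find(dir string) *FindFilter
- func (f *FindFilter) IfMode(fn func(os.FileMode) bool) *FindFilter
- func (f *FindFilter) RunFilter(arg Arg) error
- func (f *FindFilter) SkipDirIf(fn func(d string) bool) *FindFilter
- type SortFilter
- func Sort() *SortFilter
- func (s *SortFilter) By(less func(a, b string) bool) *SortFilter
- func (s *SortFilter) Num(n int) *SortFilter
- func (s *SortFilter) NumDecreasing(n int) *SortFilter
- func (s *SortFilter) RunFilter(arg Arg) error
- func (s *SortFilter) Text(n int) *SortFilter
- func (s *SortFilter) TextDecreasing(n int) *SortFilter
- type XargsFilter
- func Xargs(command string, args ...string) *XargsFilter
- func (x *XargsFilter) LimitArgs(n int) *XargsFilter
- func (x *XargsFilter) RunFilter(arg Arg) error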
Examples
- Cat
- Columns
- Command
- Command (OutputOnly)
- Command (WithError)
- Contents
- DropFirst
- DropLast
- Find
- Find (Error)
- FindFilter.SkipDirIf
- First
- ForEach
- Grep
- GrepNot
- If
- Items
- Last
- Map
- NumberLines
- Numbers
- Parallel
- ReadLines
- Reverse
- Run
- Sample
- SampleWithSeed
- Sequence
- Sort
- Sort (MultipleColumns)
- Substitute
- Uniq
- UniqWithCount
- WriteLines
- Xargs
- Xargs (SplitArguments)
- XargsFilter.LimitArgs
Constants
This section is empty.
Variables
This section is empty.
Functions
func Contents
func Contents(filters ...Filter) ([]string, error)
Contents runs filters and returns a slice containing all items output by the last filter, along with any error reported during execution.
Example
    package main

    import (
        "fmt"

        "github.com/ghemawat/stream"
    )

    func main() {
        out, err := stream.Contents(stream.Numbers(1, 3))
        fmt.Println(out, err)
    }
Output:
    [1 2 3] <nil>
func ForEach
func ForEach(filter Filter, fn func(s string)) error
ForEach calls fn(s) for every item s in the output of filter and returns either nil or any error reported by the execution of the filter.
Example
    package main

    import (
        "fmt"

        "github.com/ghemawat/stream"
    )

    func main() {
        err := stream.ForEach(stream.Numbers(1, 5), func(s string) {
            fmt.Print(s)
        })
        if err != nil {
            panic(err)
        }
    }
Output:
    12345
func Run
func Run(filters ...Filter) error
Run executes the sequence of filters and discards all output. It returns either nil or an error if any filter reported an error.
Example
    package main

    import (
        "fmt"
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        err := stream.Run(
            stream.Items("line 1", "line 2"),
            stream.WriteLines(os.Stdout),
        )
        fmt.Println("error:", err)
    }
Output:
    line 1
    line 2
    error: <nil>
Types
type Arg
Arg contains the data passed to Filter.RunFilter. Arg.In is a channel that produces the input to the filter, and Arg.Out is a channel that receives the output from the filter.
type Filter
    type Filter interface {
        // RunFilter reads a sequence of items from Arg.In and produces a
        // sequence of items on Arg.Out. RunFilter returns nil on success,
        // an error otherwise. RunFilter must *not* close the Arg.Out
        // channel.
        RunFilter(Arg) error
    }
The Filter interface represents a process that takes as input a sequence of strings from a channel and produces a sequence on another channel.
func Cat
func Cat(filenames ...string) Filter
Cat emits each line from each named file in order. If no arguments are specified, Cat copies its input to its output.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Cat("stream_test.go"),
            stream.Grep("^func ExampleCat"),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    func ExampleCat() {
func Columns
func Columns(columns ...int) Filter
Columns splits each item into columns and yields the concatenation (separated by spaces) of the columns whose numbers are passed as arguments, in the order given. Columns are numbered starting at 1. If a column number is bigger than the number of columns in an item, it is skipped.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Items("hello world"),
            stream.Columns(2, 3, 1),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    world hello
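The column-selection rules can be modeled with a small helper. This is a sketch under assumptions: columns is a hypothetical function, and splitting on runs of whitespace (strings.Fields) is assumed rather than documented above.

```go
package main

import (
	"fmt"
	"strings"
)

// columns picks 1-based column numbers in the order given, skips numbers
// past the end of the item, and joins the picks with single spaces.
// Whitespace splitting via strings.Fields is an assumption.
func columns(item string, cols ...int) string {
	fields := strings.Fields(item)
	var picked []string
	for _, c := range cols {
		if c >= 1 && c <= len(fields) {
			picked = append(picked, fields[c-1])
		}
	}
	return strings.Join(picked, " ")
}

func main() {
	fmt.Println(columns("hello world", 2, 3, 1)) // world hello
}
```

Whether Columns emits an empty line or nothing at all for an item with no selected columns is not stated above; this helper simply returns the empty string.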
func Command
func Command(command string, args ...string) Filter
Command executes "command args...".
The filter's input items are fed as standard input to the command, one line per input item. The standard output of the command is split into lines, and the lines form the output of the filter (with trailing newlines removed).
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 100),
            stream.Command("wc", "-l"),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    100
Example (OutputOnly)
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Command("find", ".", "-type", "f", "-print"),
            stream.Grep(`^\./stream.*\.go$`),
            stream.Sort(),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    ./stream.go
    ./stream_test.go
Example (WithError)
    package main

    import (
        "fmt"

        "github.com/ghemawat/stream"
    )

    func main() {
        err := stream.Run(stream.Command("no_such_command"))
        if err == nil {
            fmt.Println("execution of missing command succeeded unexpectedly")
        }
    }
Output:
func DropFirst
func DropFirst(n int) Filter
DropFirst yields all items except for the first n items that it receives.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 10),
            stream.DropFirst(8),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    9
    10
func DropLast
func DropLast(n int) Filter
DropLast yields all items except for the last n items that it receives.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 10),
            stream.DropLast(3),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    1
    2
    3
    4
    5
    6
    7
func First
func First(n int) Filter
First yields the first n items that it receives.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 10),
            stream.First(3),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    1
    2
    3
func Grep
func Grep(r string) Filter
Grep emits every input x that matches the regular expression r.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 12),
            stream.Grep(".."),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    10
    11
    12
func GrepNot
func GrepNot(r string) Filter
GrepNot emits every input x that does not match the regular expression r.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 12),
            stream.GrepNot("^.$"),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    10
    11
    12
func If
func If(fn func(string) bool) Filter
If emits every input x for which fn(x) is true.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 12),
            stream.If(func(s string) bool { return len(s) > 1 }),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    10
    11
    12
func Items
func Items(items ...string) Filter
Items emits the given items in order.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Items("hello", "world"),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    hello
    world
func Last
func Last(n int) Filter
Last yields the last n items that it receives.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 10),
            stream.Last(2),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    9
    10
func Map
func Map(fn func(string) string) Filter
Map calls fn(x) for every item x and yields the outputs of the fn calls.
Example
    package main

    import (
        "fmt"
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Items("hello", "there", "how", "are", "you?"),
            stream.Map(func(s string) string {
                return fmt.Sprintf("%d %s", len(s), s)
            }),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    5 hello
    5 there
    3 how
    3 are
    4 you?
func NumberLines
func NumberLines() Filter
NumberLines prefixes each item with its index in the input sequence (starting at 1) followed by a space.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Items("a", "b"),
            stream.NumberLines(),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    1 a
    2 b
func Numbers
func Numbers(x, y int) Filter
Numbers emits the integers from x to y, inclusive.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(2, 5),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    2
    3
    4
    5
func Parallel
func Parallel(n int, f Filter) Filter
Parallel returns a Filter that runs n copies of f. The input to the Parallel filter is divided among the n copies. The output of the n copies is merged (in an unspecified order) and forms the output of the Parallel filter.
Example
    package main

    import (
        "fmt"
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Items("hello", "there", "how", "are", "you?"),
            stream.Parallel(4,
                stream.Map(func(s string) string {
                    return fmt.Sprintf("%d %s", len(s), s)
                }),
            ),
            stream.Sort(),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    3 are
    3 how
    4 you?
    5 hello
    5 there
func ReadLines
func ReadLines(reader io.Reader) Filter
ReadLines emits each line found in reader.
Example
    package main

    import (
        "bytes"
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.ReadLines(bytes.NewBufferString("the\nquick\nbrown\nfox\n")),
            stream.Sort(),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    brown
    fox
    quick
    the
func Reverse
func Reverse() Filter
Reverse yields items in the reverse of the order it received them.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Items("a", "b"),
            stream.Reverse(),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    b
    a
func Sample
func Sample(n int) Filter
Sample picks n pseudo-randomly chosen input items. Different executions of a Sample filter may choose different items.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(100, 200),
            stream.Sample(4),
            stream.WriteLines(os.Stdout),
        )
        // Output not checked since it is non-deterministic.
    }
Output:
func SampleWithSeed
func SampleWithSeed(n int, seed int64) Filter
SampleWithSeed picks n pseudo-randomly chosen input items. It uses seed to seed its random number generator, so different executions of SampleWithSeed with the same arguments will choose the same items.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 100),
            stream.SampleWithSeed(2, 100),
            stream.Sort().Num(1),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    11
    46
func Sequence
func Sequence(filters ...Filter) Filter
Sequence returns a filter that chains all of its filter arguments together: the output of each filter is fed as input to the next one.
Example
    package main

    import (
        "fmt"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.ForEach(stream.Sequence(
            stream.Numbers(1, 25),
            stream.Grep("3"),
        ), func(s string) {
            fmt.Println(s)
        })
    }
Output:
    3
    13
    23
func Substitute
func Substitute(r, replacement string) Filter
Substitute replaces all occurrences of the regular expression r in an input item with replacement. The replacement string can contain $1, $2, etc., which represent submatches of r.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 5),
            stream.Substitute("(3)", "$1$1"),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    1
    2
    33
    4
    5
func Uniq
func Uniq() Filter
Uniq squashes adjacent identical items in arg.In into a single output.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Items("a", "b", "b", "c", "b"),
            stream.Uniq(),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    a
    b
    c
    b
func UniqWithCount
func UniqWithCount() Filter
UniqWithCount squashes adjacent identical items in arg.In into a single output prefixed with the count of identical items followed by a space.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Items("a", "b", "b", "c"),
            stream.UniqWithCount(),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    1 a
    2 b
    1 c
func WriteLines
func WriteLines(writer io.Writer) Filter
WriteLines prints each input item s followed by a newline to writer, and in addition it emits s. Therefore WriteLines can be used like the "tee" command, which is often useful for debugging.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 3),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    1
    2
    3
type FilterFunc
FilterFunc is an adapter type that allows the use of ordinary functions as Filters. If f is a function with the appropriate signature, FilterFunc(f) is a Filter that calls f.
func (FilterFunc) RunFilter
func (f FilterFunc) RunFilter(arg Arg) error
RunFilter calls f(arg). It implements the Filter interface.
type FindFilter
    type FindFilter struct {
        // contains filtered or unexported fields
    }
FindFilter is a filter that produces matching nodes under a filesystem directory.
func Find
func Find(dir string) *FindFilter
Find returns a filter that produces matching nodes under a filesystem directory. The items yielded by the filter will be prefixed by dir. E.g., if dir contains subdir/file, the filter will yield dir/subdir/file. By default, the filter matches all types of files (regular files, directories, symbolic links, etc.). This behavior can be adjusted by calling FindFilter methods before executing the filter.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Find(".").IfMode(os.FileMode.IsRegular),
            stream.Grep("stream"),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    stream.go
    stream_test.go
Example (Error)
    package main

    import (
        "fmt"

        "github.com/ghemawat/stream"
    )

    func main() {
        err := stream.Run(stream.Find("/no_such_dir"))
        if err == nil {
            fmt.Println("stream.Find did not return expected error")
        }
    }
Output:
func (*FindFilter) IfMode
func (f *FindFilter) IfMode(fn func(os.FileMode) bool) *FindFilter
IfMode adjusts f so it only matches nodes for which fn(mode) returns true.
func (*FindFilter) RunFilter
func (f *FindFilter) RunFilter(arg Arg) error
RunFilter yields contents of a filesystem tree. It implements the Filter interface.
func (*FindFilter) SkipDirIf
func (f *FindFilter) SkipDirIf(fn func(d string) bool) *FindFilter
SkipDirIf adjusts f so that if fn(d) returns true for a directory d, d and all of d's descendants are skipped.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Find(".").SkipDirIf(func(d string) bool { return d == ".git" }),
            stream.Grep("x"),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    regexp.go
    xargs.go
type SortFilter
    type SortFilter struct {
        // contains filtered or unexported fields
    }
SortFilter is a Filter that sorts its input items by a sequence of sort keys. If two items compare equal by all specified sort keys (this always happens if no sort keys are specified), the items are compared lexicographically.
func Sort
func Sort() *SortFilter
Sort returns a filter that sorts its input items. By default, the filter sorts items lexicographically. However, this can be adjusted by calling methods like Num and Text that add sorting keys to the filter.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Items("banana", "apple", "cheese", "apple"),
            stream.Sort(),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    apple
    apple
    banana
    cheese
Example (MultipleColumns)
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        // Sort numerically by column 1. Break ties by sorting
        // lexicographically by column 2.
        stream.Run(
            stream.Items(
                "1970 march",
                "1970 feb",
                "1950 june",
                "1980 sep",
            ),
            stream.Sort().Num(1).Text(2),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    1950 june
    1970 feb
    1970 march
    1980 sep
func (*SortFilter) By
func (s *SortFilter) By(less func(a, b string) bool) *SortFilter
By adds a sort key to sort by the output of the specified less function.
func (*SortFilter) Num
func (s *SortFilter) Num(n int) *SortFilter
Num sets the next sort key to sort by column n in numeric order. Column 0 means the entire string. Items that do not have column n sort to the front. Items whose column n is not a number sort to the end.
func (*SortFilter) NumDecreasing
func (s *SortFilter) NumDecreasing(n int) *SortFilter
NumDecreasing sets the next sort key to sort by column n in reverse numeric order. Column 0 means the entire string. Items that do not have column n sort to the end. Items whose column n is not a number sort to the front.
func (*SortFilter) RunFilter
func (s *SortFilter) RunFilter(arg Arg) error
RunFilter sorts items by the specified sorting keys. It implements the Filter interface.
func (*SortFilter) Text
func (s *SortFilter) Text(n int) *SortFilter
Text sets the next sort key to sort by column n in lexicographic order. Column 0 means the entire string. Items that do not have column n sort to the front.
func (*SortFilter) TextDecreasing
func (s *SortFilter) TextDecreasing(n int) *SortFilter
TextDecreasing sets the next sort key to sort by column n in reverse lexicographic order. Column 0 means the entire string. Items that do not have column n sort to the end.
type XargsFilter
    type XargsFilter struct {
        // contains filtered or unexported fields
    }
XargsFilter is a Filter that applies a command to every input item.
func Xargs
func Xargs(command string, args ...string) *XargsFilter
Xargs returns a filter that executes "command args... items..." where items are the input to the filter. The handling of items may be split across multiple executions of command (typically to meet command line length restrictions). The standard output of the execution(s) is split into lines and the lines form the output of the filter (with trailing newlines removed).
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 5),
            stream.Xargs("echo"),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    1 2 3 4 5
Example (SplitArguments)
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        // Xargs should split the long list of arguments into
        // three executions to keep command length below 4096.
        stream.Run(
            stream.Numbers(1, 2000),
            stream.Xargs("echo"),
            stream.Command("wc", "-l"),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    3
func (*XargsFilter) LimitArgs
func (x *XargsFilter) LimitArgs(n int) *XargsFilter
LimitArgs adjusts x so that no more than n input items are passed to a single command execution. If not called, an unspecified number of input items may be handled via a single command execution.
Example
    package main

    import (
        "os"

        "github.com/ghemawat/stream"
    )

    func main() {
        stream.Run(
            stream.Numbers(1, 5),
            stream.Xargs("echo").LimitArgs(2),
            stream.WriteLines(os.Stdout),
        )
    }
Output:
    1 2
    3 4
    5
func (*XargsFilter) RunFilter
func (x *XargsFilter) RunFilter(arg Arg) error
RunFilter implements the Filter interface: it reads a sequence of items from arg.In and passes them as arguments to "command args...".