stream: github.com/ghemawat/stream

package stream

import "github.com/ghemawat/stream"

Package stream provides filters that can be chained together in a manner similar to Unix pipelines. A simple example that prints all Go files under the current directory:

stream.Run(
	stream.Find("."),
	stream.Grep(`\.go$`),
	stream.WriteLines(os.Stdout),
)

stream.Run is passed a list of filters that are chained together (stream.Find, stream.Grep, stream.WriteLines are filters). Each filter takes as input a sequence of strings and produces a sequence of strings. The empty sequence is passed as input to the first filter. The output of one filter is fed as input to the next filter.

stream.Run is just one way to execute filters. The others are stream.Contents, which returns the output of the last filter as a []string, and stream.ForEach, which executes a supplied function for every output item.
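
The chaining model can be illustrated without the package itself. The following is a minimal standard-library sketch, not the package's implementation; the names filter, items, upper, and contents are hypothetical. It assumes each filter runs in its own goroutine, that the first filter receives an already-closed (empty) input channel, and that the runner closes each output channel once the filter returns.

```go
package main

import (
	"fmt"
	"strings"
)

// filter is a hypothetical stand-in for a pipeline stage: it reads
// strings from in and writes strings to out.
type filter func(in <-chan string, out chan<- string)

// items ignores its input and emits the given strings.
func items(list ...string) filter {
	return func(in <-chan string, out chan<- string) {
		for range in {
			// Discard any input.
		}
		for _, s := range list {
			out <- s
		}
	}
}

// upper emits an upper-cased copy of every input item.
func upper(in <-chan string, out chan<- string) {
	for s := range in {
		out <- strings.ToUpper(s)
	}
}

// contents chains the filters and collects the output of the last one.
func contents(filters ...filter) []string {
	first := make(chan string)
	close(first) // The first filter sees an empty input sequence.
	var in <-chan string = first
	for _, f := range filters {
		out := make(chan string)
		go func(f filter, in <-chan string, out chan<- string) {
			f(in, out)
			close(out) // The runner, not the filter, closes the output.
		}(f, in, out)
		in = out
	}
	var result []string
	for s := range in {
		result = append(result, s)
	}
	return result
}

func main() {
	fmt.Println(contents(items("hello", "world"), upper)) // [HELLO WORLD]
}
```

Because the channels in this sketch are unbuffered, a stage that stops reading early would leave its upstream goroutine blocked; a real runner has to handle that case.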

Error handling

Filter execution can result in errors. These are returned as ordinary error values from the stream functions. For example, the following call returns a non-nil error:

err := stream.Run(
	stream.Items("hello", "world"),
	stream.Grep("["), // Invalid regular expression
	stream.WriteLines(os.Stdout),
)
// err will be non-nil
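
The error in this example comes from the pattern itself. Assuming the pattern is compiled with Go's standard regexp package (an assumption about the implementation), the failure can be reproduced directly. The helper name compiles is hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

// compiles reports whether pattern is a valid Go regular expression.
func compiles(pattern string) bool {
	_, err := regexp.Compile(pattern)
	return err == nil
}

func main() {
	fmt.Println(compiles("["))     // false: the character class is never closed
	fmt.Println(compiles(`\.go$`)) // true
}
```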

User-defined filters

Each filter takes as input a sequence of strings (read from a channel) and produces as output a sequence of strings (written to a channel). The stream package provides a number of useful filters, and applications can easily define their own. For example, here is a filter that repeats every input n times:

func Repeat(n int) stream.FilterFunc {
	return func(arg stream.Arg) error {
		for s := range arg.In {
			for i := 0; i < n; i++ {
				arg.Out <- s
			}
		}
		return nil
	}
}

stream.Run(
	stream.Items("hello", "world"),
	Repeat(2),
	stream.WriteLines(os.Stdout),
)

The output will be:

hello
hello
world
world

Note that Repeat returns a FilterFunc, a function type that implements the Filter interface. This is a common implementation pattern: many simple filters can be expressed as a single function of type FilterFunc.

Tunable filters

FilterFunc is an appropriate type for most filters, such as Repeat above. However, for some filters, dynamic customization is appropriate. Such filters provide their own implementation of the Filter interface with extra methods. For example, stream.Sort provides extra methods that can be used to control how items are sorted:

stream.Run(
	stream.Command("ls", "-l"),
	stream.Sort().Num(5),  // Sort numerically by size (column 5)
	stream.WriteLines(os.Stdout),
)

Acknowledgments

The interface of this package is inspired by the http://labix.org/pipe package. Users may wish to consider that package in case it fits their needs better.

Package Files

command.go filters.go find.go firstlast.go io.go parallel.go regexp.go sample.go sort.go stream.go xargs.go

func Contents

func Contents(filters ...Filter) ([]string, error)

Contents returns a slice that contains all items that are the output of filters.

Code:

out, err := stream.Contents(stream.Numbers(1, 3))
fmt.Println(out, err)

Output:

[1 2 3] <nil>

func ForEach

func ForEach(filter Filter, fn func(s string)) error

ForEach calls fn(s) for every item s in the output of filter and returns either nil, or any error reported by the execution of the filter.

Code:

err := stream.ForEach(stream.Numbers(1, 5), func(s string) {
    fmt.Print(s)
})
if err != nil {
    panic(err)
}

Output:

12345

func Run

func Run(filters ...Filter) error

Run executes the sequence of filters and discards all output. It returns nil, or an error if any filter reported one.

Code:

err := stream.Run(
    stream.Items("line 1", "line 2"),
    stream.WriteLines(os.Stdout),
)
fmt.Println("error:", err)

Output:

line 1
line 2
error: <nil>

type Arg

type Arg struct {
    In  <-chan string
    Out chan<- string
    // contains filtered or unexported fields
}

Arg contains the data passed to Filter.RunFilter. Arg.In is a channel that produces the input to the filter, and Arg.Out is a channel that receives the output from the filter.

type Filter

type Filter interface {
    // RunFilter reads a sequence of items from Arg.In and produces a
    // sequence of items on Arg.Out.  RunFilter returns nil on success,
    // an error otherwise.  RunFilter must *not* close the Arg.Out
    // channel.
    RunFilter(Arg) error
}

The Filter interface represents a process that takes as input a sequence of strings from a channel and produces a sequence on another channel.

func Cat

func Cat(filenames ...string) Filter

Cat emits each line from each named file in order. If no arguments are specified, Cat copies its input to its output.

Code:

stream.Run(
    stream.Cat("stream_test.go"),
    stream.Grep("^func ExampleCat"),
    stream.WriteLines(os.Stdout),
)

Output:

func ExampleCat() {

func Columns

func Columns(columns ...int) Filter

Columns splits each item into columns and yields the concatenation (separated by spaces) of the columns whose numbers are passed as arguments. Columns are numbered starting at 1. If a column number is bigger than the number of columns in an item, it is skipped.

Code:

stream.Run(
    stream.Items("hello world"),
    stream.Columns(2, 3, 1),
    stream.WriteLines(os.Stdout),
)

Output:

world hello
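
The column rules (1-based numbering, out-of-range columns skipped) can be sketched with the standard library. This is not the package's code: selectColumns is a hypothetical helper, and splitting on whitespace with strings.Fields is an assumption about how columns are delimited.

```go
package main

import (
	"fmt"
	"strings"
)

// selectColumns splits item on whitespace and joins the requested
// 1-based columns with single spaces, skipping column numbers that
// fall outside the item.
func selectColumns(item string, columns ...int) string {
	fields := strings.Fields(item)
	var picked []string
	for _, c := range columns {
		if c >= 1 && c <= len(fields) {
			picked = append(picked, fields[c-1])
		}
	}
	return strings.Join(picked, " ")
}

func main() {
	// Column 3 does not exist in "hello world", so it is skipped.
	fmt.Println(selectColumns("hello world", 2, 3, 1)) // world hello
}
```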

func Command

func Command(command string, args ...string) Filter

Command executes "command args...".

The filter's input items are fed as standard input to the command, one line per input item. The standard output of the command is split into lines and the lines form the output of the filter (with trailing newlines removed).

Code:

stream.Run(
    stream.Numbers(1, 100),
    stream.Command("wc", "-l"),
    stream.WriteLines(os.Stdout),
)

Output:

100

Code:

stream.Run(
    stream.Command("find", ".", "-type", "f", "-print"),
    stream.Grep(`^\./stream.*\.go$`),
    stream.Sort(),
    stream.WriteLines(os.Stdout),
)

Output:

./stream.go
./stream_test.go

Code:

err := stream.Run(stream.Command("no_such_command"))
if err == nil {
    fmt.Println("execution of missing command succeeded unexpectedly")
}

func DropFirst

func DropFirst(n int) Filter

DropFirst yields all items except for the first n items that it receives.

Code:

stream.Run(
    stream.Numbers(1, 10),
    stream.DropFirst(8),
    stream.WriteLines(os.Stdout),
)

Output:

9
10

func DropLast

func DropLast(n int) Filter

DropLast yields all items except for the last n items that it receives.

Code:

stream.Run(
    stream.Numbers(1, 10),
    stream.DropLast(3),
    stream.WriteLines(os.Stdout),
)

Output:

1
2
3
4
5
6
7

func First

func First(n int) Filter

First yields the first n items that it receives.

Code:

stream.Run(
    stream.Numbers(1, 10),
    stream.First(3),
    stream.WriteLines(os.Stdout),
)

Output:

1
2
3

func Grep

func Grep(r string) Filter

Grep emits every input x that matches the regular expression r.

Code:

stream.Run(
    stream.Numbers(1, 12),
    stream.Grep(".."),
    stream.WriteLines(os.Stdout),
)

Output:

10
11
12

func GrepNot

func GrepNot(r string) Filter

GrepNot emits every input x that does not match the regular expression r.

Code:

stream.Run(
    stream.Numbers(1, 12),
    stream.GrepNot("^.$"),
    stream.WriteLines(os.Stdout),
)

Output:

10
11
12

func If

func If(fn func(string) bool) Filter

If emits every input x for which fn(x) is true.

Code:

stream.Run(
    stream.Numbers(1, 12),
    stream.If(func(s string) bool { return len(s) > 1 }),
    stream.WriteLines(os.Stdout),
)

Output:

10
11
12

func Items

func Items(items ...string) Filter

Items emits items.

Code:

stream.Run(
    stream.Items("hello", "world"),
    stream.WriteLines(os.Stdout),
)

Output:

hello
world

func Last

func Last(n int) Filter

Last yields the last n items that it receives.

Code:

stream.Run(
    stream.Numbers(1, 10),
    stream.Last(2),
    stream.WriteLines(os.Stdout),
)

Output:

9
10

func Map

func Map(fn func(string) string) Filter

Map calls fn(x) for every item x and yields the outputs of the fn calls.

Code:

stream.Run(
    stream.Items("hello", "there", "how", "are", "you?"),
    stream.Map(func(s string) string {
        return fmt.Sprintf("%d %s", len(s), s)
    }),
    stream.WriteLines(os.Stdout),
)

Output:

5 hello
5 there
3 how
3 are
4 you?

func NumberLines

func NumberLines() Filter

NumberLines prefixes each item with its index in the input sequence (starting at 1) followed by a space.

Code:

stream.Run(
    stream.Items("a", "b"),
    stream.NumberLines(),
    stream.WriteLines(os.Stdout),
)

Output:

    1 a
    2 b

func Numbers

func Numbers(x, y int) Filter

Numbers emits the integers from x to y, inclusive.

Code:

stream.Run(
    stream.Numbers(2, 5),
    stream.WriteLines(os.Stdout),
)

Output:

2
3
4
5

func Parallel

func Parallel(n int, f Filter) Filter

Parallel returns a Filter that runs n copies of f. The input to the Parallel Filter is divided up amongst the n copies. The output of the n copies is merged (in an unspecified order) and forms the output of the Parallel filter.

Code:

stream.Run(
    stream.Items("hello", "there", "how", "are", "you?"),
    stream.Parallel(4,
        stream.Map(func(s string) string {
            return fmt.Sprintf("%d %s", len(s), s)
        }),
    ),
    stream.Sort(),
    stream.WriteLines(os.Stdout),
)

Output:

3 are
3 how
4 you?
5 hello
5 there

func ReadLines

func ReadLines(reader io.Reader) Filter

ReadLines emits each line found in reader.

Code:

stream.Run(
    stream.ReadLines(bytes.NewBufferString("the\nquick\nbrown\nfox\n")),
    stream.Sort(),
    stream.WriteLines(os.Stdout),
)

Output:

brown
fox
quick
the

func Repeat

func Repeat(s string, n int) Filter

Repeat emits n copies of s.

func Reverse

func Reverse() Filter

Reverse yields items in the reverse of the order it received them.

Code:

stream.Run(
    stream.Items("a", "b"),
    stream.Reverse(),
    stream.WriteLines(os.Stdout),
)

Output:

b
a

func Sample

func Sample(n int) Filter

Sample picks n pseudo-randomly chosen input items. Different executions of a Sample filter will choose different items.

Code:

stream.Run(
    stream.Numbers(100, 200),
    stream.Sample(4),
    stream.WriteLines(os.Stdout),
)
// Output not checked since it is non-deterministic.

func SampleWithSeed

func SampleWithSeed(n int, seed int64) Filter

SampleWithSeed picks n pseudo-randomly chosen input items. It uses seed as the argument for its random number generation, and therefore different executions of SampleWithSeed with the same arguments will choose the same items.

Code:

stream.Run(
    stream.Numbers(1, 100),
    stream.SampleWithSeed(2, 100),
    stream.Sort().Num(1),
    stream.WriteLines(os.Stdout),
)

Output:

11
46

func Sequence

func Sequence(filters ...Filter) Filter

Sequence returns a filter that is the concatenation of all filter arguments. The output of a filter is fed as input to the next filter.

Code:

stream.ForEach(stream.Sequence(
    stream.Numbers(1, 25),
    stream.Grep("3"),
), func(s string) { fmt.Println(s) })

Output:

3
13
23

func Substitute

func Substitute(r, replacement string) Filter

Substitute replaces all occurrences of the regular expression r in an input item with replacement. The replacement string can contain $1, $2, etc. which represent submatches of r.

Code:

stream.Run(
    stream.Numbers(1, 5),
    stream.Substitute("(3)", "$1$1"),
    stream.WriteLines(os.Stdout),
)

Output:

1
2
33
4
5
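
The $1, $2 replacement syntax matches that of regexp.ReplaceAllString in the standard library. The sketch below assumes that Substitute behaves like that function (an assumption, not confirmed by this page); substitute is a hypothetical helper operating on a single string.

```go
package main

import (
	"fmt"
	"regexp"
)

// substitute replaces every match of r in item with replacement, where
// $1, $2, ... in replacement refer to submatches of r.
func substitute(r, replacement, item string) (string, error) {
	re, err := regexp.Compile(r)
	if err != nil {
		return "", err
	}
	return re.ReplaceAllString(item, replacement), nil
}

func main() {
	out, err := substitute(`(\w+)@(\w+)`, "$2 at $1", "user@host")
	fmt.Println(out, err) // host at user <nil>
}
```

With the standard regexp package, a reference followed directly by a letter, digit, or underscore must be written with braces: "${1}x" refers to submatch 1, whereas "$1x" is read as a group named "1x".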

func Uniq

func Uniq() Filter

Uniq squashes adjacent identical items in arg.In into a single output.

Code:

stream.Run(
    stream.Items("a", "b", "b", "c", "b"),
    stream.Uniq(),
    stream.WriteLines(os.Stdout),
)

Output:

a
b
c
b

func UniqWithCount

func UniqWithCount() Filter

UniqWithCount squashes adjacent identical items in arg.In into a single output prefixed with the count of identical items followed by a space.

Code:

stream.Run(
    stream.Items("a", "b", "b", "c"),
    stream.UniqWithCount(),
    stream.WriteLines(os.Stdout),
)

Output:

1 a
2 b
1 c
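
The squashing rule only merges adjacent duplicates, so an item that reappears later starts a new run. A slice-based sketch of the rule follows; uniqWithCount is a hypothetical helper, not the package's channel-based filter.

```go
package main

import "fmt"

// uniqWithCount collapses each run of adjacent identical items into a
// single "count item" string.
func uniqWithCount(items []string) []string {
	var out []string
	for i := 0; i < len(items); {
		// Advance j to the end of the run that starts at i.
		j := i
		for j < len(items) && items[j] == items[i] {
			j++
		}
		out = append(out, fmt.Sprintf("%d %s", j-i, items[i]))
		i = j
	}
	return out
}

func main() {
	// The trailing "b" is not adjacent to the earlier run of "b" items,
	// so it is reported separately.
	fmt.Println(uniqWithCount([]string{"a", "b", "b", "c", "b"})) // [1 a 2 b 1 c 1 b]
}
```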

func WriteLines

func WriteLines(writer io.Writer) Filter

WriteLines prints each input item s followed by a newline to writer; and in addition it emits s. Therefore WriteLines() can be used like the "tee" command, which can often be useful for debugging.

Code:

stream.Run(
    stream.Numbers(1, 3),
    stream.WriteLines(os.Stdout),
)

Output:

1
2
3

type FilterFunc

type FilterFunc func(Arg) error

FilterFunc is an adapter type that allows the use of ordinary functions as Filters. If f is a function with the appropriate signature, FilterFunc(f) is a Filter that calls f.
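
This adapter follows the same idea as http.HandlerFunc in the standard library. The sketch below redeclares simplified copies of Arg, Filter, and FilterFunc locally so that it runs without the package (the real Arg has additional unexported fields); double and runOne are hypothetical names.

```go
package main

import "fmt"

// Arg, Filter, and FilterFunc are local, simplified copies of the
// declarations documented on this page.
type Arg struct {
	In  <-chan string
	Out chan<- string
}

type Filter interface {
	RunFilter(Arg) error
}

type FilterFunc func(Arg) error

// RunFilter calls f itself, which makes every FilterFunc a Filter.
func (f FilterFunc) RunFilter(arg Arg) error { return f(arg) }

// double is an ordinary function with the FilterFunc signature.
func double(arg Arg) error {
	for s := range arg.In {
		arg.Out <- s + s
	}
	return nil
}

// runOne feeds input to a single filter and collects its output.
func runOne(f Filter, input ...string) ([]string, error) {
	in := make(chan string, len(input))
	for _, s := range input {
		in <- s
	}
	close(in)
	out := make(chan string)
	errc := make(chan error, 1)
	go func() {
		errc <- f.RunFilter(Arg{In: in, Out: out})
		close(out) // The caller closes Out; the filter must not.
	}()
	var result []string
	for s := range out {
		result = append(result, s)
	}
	return result, <-errc
}

func main() {
	out, err := runOne(FilterFunc(double), "ab", "c")
	fmt.Println(out, err) // [abab cc] <nil>
}
```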

func (FilterFunc) RunFilter

func (f FilterFunc) RunFilter(arg Arg) error

RunFilter calls this function. It implements the Filter interface.

type FindFilter

type FindFilter struct {
    // contains filtered or unexported fields
}

FindFilter is a filter that produces matching nodes under a filesystem directory.

func Find

func Find(dir string) *FindFilter

Find returns a filter that produces matching nodes under a filesystem directory. The items yielded by the filter will be prefixed by dir. E.g., if dir contains subdir/file, the filter will yield dir/subdir/file. By default, the filter matches all types of files (regular files, directories, symbolic links, etc.). This behavior can be adjusted by calling FindFilter methods before executing the filter.

Code:

stream.Run(
    stream.Find(".").IfMode(os.FileMode.IsRegular),
    stream.Grep("stream"),
    stream.WriteLines(os.Stdout),
)

Output:

stream.go
stream_test.go

Code:

err := stream.Run(stream.Find("/no_such_dir"))
if err == nil {
    fmt.Println("stream.Find did not return expected error")
}

func (*FindFilter) IfMode

func (f *FindFilter) IfMode(fn func(os.FileMode) bool) *FindFilter

IfMode adjusts f so it only matches nodes for which fn(mode) returns true.

func (*FindFilter) RunFilter

func (f *FindFilter) RunFilter(arg Arg) error

RunFilter yields contents of a filesystem tree. It implements the Filter interface.

func (*FindFilter) SkipDirIf

func (f *FindFilter) SkipDirIf(fn func(d string) bool) *FindFilter

SkipDirIf adjusts f so that if fn(d) returns true for a directory d, d and all of d's descendants are skipped.

Code:

stream.Run(
    stream.Find(".").SkipDirIf(func(d string) bool { return d == ".git" }),
    stream.Grep("x"),
    stream.WriteLines(os.Stdout),
)

Output:

regexp.go
xargs.go

type SortFilter

type SortFilter struct {
    // contains filtered or unexported fields
}

SortFilter is a Filter that sorts its input items by a sequence of sort keys. If two items compare equal by all specified sort keys (this always happens if no sort keys are specified), the items are compared lexicographically.

func Sort

func Sort() *SortFilter

Sort returns a filter that sorts its input items. By default, the filter sorts items lexicographically. However, this can be adjusted by calling methods such as Num and Text that add sorting keys to the filter.

Code:

stream.Run(
    stream.Items("banana", "apple", "cheese", "apple"),
    stream.Sort(),
    stream.WriteLines(os.Stdout),
)

Output:

apple
apple
banana
cheese

Code:

// Sort numerically by column 1. Break ties by sorting
// lexicographically by column 2.
stream.Run(
    stream.Items(
        "1970 march",
        "1970 feb",
        "1950 june",
        "1980 sep",
    ),
    stream.Sort().Num(1).Text(2),
    stream.WriteLines(os.Stdout),
)

Output:

1950 june
1970 feb
1970 march
1980 sep

func (*SortFilter) By

func (s *SortFilter) By(less func(a, b string) bool) *SortFilter

By adds a sort key to sort by the output of the specified less function.

func (*SortFilter) Num

func (s *SortFilter) Num(n int) *SortFilter

Num sets the next sort key to sort by column n in numeric order. Column 0 means the entire string. Items that do not have column n sort to the front. Items whose column n is not a number sort to the end.
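
The ordering rules for Num (missing column first, numbers in increasing order, non-numbers last) can be sketched as a comparison function. This is an illustration, not the package's code: numKey and sortByNum are hypothetical, columns are assumed to be whitespace-separated, and parsing with strconv.ParseFloat is an assumption about what counts as a number. Ties are broken lexicographically on the whole item, as described for SortFilter.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// numKey classifies column n of item (1-based; 0 means the whole string).
// rank 0: column missing, rank 1: numeric, rank 2: not a number.
func numKey(item string, n int) (rank int, value float64) {
	col := item
	if n > 0 {
		fields := strings.Fields(item)
		if n > len(fields) {
			return 0, 0
		}
		col = fields[n-1]
	}
	v, err := strconv.ParseFloat(col, 64)
	if err != nil {
		return 2, 0
	}
	return 1, v
}

// sortByNum sorts items in place by column n in numeric order.
func sortByNum(items []string, n int) {
	sort.Slice(items, func(i, j int) bool {
		ri, vi := numKey(items[i], n)
		rj, vj := numKey(items[j], n)
		if ri != rj {
			return ri < rj
		}
		if vi != vj {
			return vi < vj
		}
		return items[i] < items[j] // Tie: compare lexicographically.
	})
}

func main() {
	items := []string{"10 b", "x a", "9 c", ""}
	sortByNum(items, 1)
	fmt.Printf("%q\n", items) // ["" "9 c" "10 b" "x a"]
}
```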

func (*SortFilter) NumDecreasing

func (s *SortFilter) NumDecreasing(n int) *SortFilter

NumDecreasing sets the next sort key to sort by column n in reverse numeric order. Column 0 means the entire string. Items that do not have column n sort to the end. Items whose column n is not a number sort to the front.

func (*SortFilter) RunFilter

func (s *SortFilter) RunFilter(arg Arg) error

RunFilter sorts items by the specified sorting keys. It implements the Filter interface.

func (*SortFilter) Text

func (s *SortFilter) Text(n int) *SortFilter

Text sets the next sort key to sort by column n in lexicographic order. Column 0 means the entire string. Items that do not have column n sort to the front.

func (*SortFilter) TextDecreasing

func (s *SortFilter) TextDecreasing(n int) *SortFilter

TextDecreasing sets the next sort key to sort by column n in reverse lexicographic order. Column 0 means the entire string. Items that do not have column n sort to the end.

type XargsFilter

type XargsFilter struct {
    // contains filtered or unexported fields
}

XargsFilter is a Filter that applies a command to every input item.

func Xargs

func Xargs(command string, args ...string) *XargsFilter

Xargs returns a filter that executes "command args... items..." where items are the input to the filter. The handling of items may be split across multiple executions of command (typically to meet command line length restrictions). The standard output of the execution(s) is split into lines and the lines form the output of the filter (with trailing newlines removed).

Code:

stream.Run(
    stream.Numbers(1, 5),
    stream.Xargs("echo"),
    stream.WriteLines(os.Stdout),
)

Output:

1 2 3 4 5

Code:

// Xargs should split the long list of arguments into
// three executions to keep command length below 4096.
stream.Run(
    stream.Numbers(1, 2000),
    stream.Xargs("echo"),
    stream.Command("wc", "-l"),
    stream.WriteLines(os.Stdout),
)

Output:

3

func (*XargsFilter) LimitArgs

func (x *XargsFilter) LimitArgs(n int) *XargsFilter

LimitArgs adjusts x so that no more than n input items are passed to a single command execution. If not called, an unspecified number of input items may be handled via a single command execution.

Code:

stream.Run(
    stream.Numbers(1, 5),
    stream.Xargs("echo").LimitArgs(2),
    stream.WriteLines(os.Stdout),
)

Output:

1 2
3 4
5

func (*XargsFilter) RunFilter

func (x *XargsFilter) RunFilter(arg Arg) error

RunFilter implements the Filter interface: it reads a sequence of items from arg.In and passes them as arguments to "command args...".

Package stream imports 13 packages and is imported by 2 packages. Updated 2018-12-04.