stream

Version: v0.0.0-...-696b145
Published: Nov 20, 2017 License: Apache-2.0 Imports: 13 Imported by: 67

README

Installation

go get github.com/ghemawat/stream

See godoc for further documentation and examples.

Documentation

Overview

Package stream provides filters that can be chained together in a manner similar to Unix pipelines. A simple example that prints all go files under the current directory:

stream.Run(
	stream.Find("."),
	stream.Grep(`\.go$`),
	stream.WriteLines(os.Stdout),
)

stream.Run is passed a list of filters that are chained together (stream.Find, stream.Grep, stream.WriteLines are filters). Each filter takes as input a sequence of strings and produces a sequence of strings. The empty sequence is passed as input to the first filter. The output of one filter is fed as input to the next filter.

stream.Run is just one way to execute filters. Others are stream.Contents (returns the output of the last filter as a []string), and stream.ForEach (executes a supplied function for every output item).
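
The chaining described above can be pictured with a small standard-library sketch. It is not the package's implementation: the names filter, contents, items, and double are hypothetical, and the wiring (one goroutine per filter, with the runner closing each output channel) is an assumption made for illustration.

```go
package main

import "fmt"

// filter is a hypothetical stand-in for the package's Filter: it reads
// items from in and writes items to out. It must not close out.
type filter func(in <-chan string, out chan<- string)

// contents chains the filters with channels and returns the output of
// the last one. The first filter receives an empty, already closed input.
func contents(filters ...filter) []string {
	first := make(chan string)
	close(first) // the empty sequence
	var in <-chan string = first
	for _, f := range filters {
		out := make(chan string)
		go func(f filter, in <-chan string, out chan<- string) {
			f(in, out)
			for range in {
				// drain any input the filter did not consume
			}
			close(out) // the runner, not the filter, closes the output
		}(f, in, out)
		in = out
	}
	var result []string
	for s := range in {
		result = append(result, s)
	}
	return result
}

// items returns a filter that ignores its input and emits list.
func items(list ...string) filter {
	return func(in <-chan string, out chan<- string) {
		for _, s := range list {
			out <- s
		}
	}
}

// double emits every input item twice.
func double(in <-chan string, out chan<- string) {
	for s := range in {
		out <- s
		out <- s
	}
}

func main() {
	fmt.Println(contents(items("hello", "world"), double))
	// prints: [hello hello world world]
}
```

Each filter sees only its own pair of channels, which is why filters can be combined freely.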

Error handling

Filter execution can fail. Errors are returned as ordinary error values from functions such as stream.Run. For example, the following call returns a non-nil error because the regular expression passed to stream.Grep is invalid.

err := stream.Run(
	stream.Items("hello", "world"),
	stream.Grep("["), // Invalid regular expression
	stream.WriteLines(os.Stdout),
)
// err will be non-nil

User defined filters

Each filter takes as input a sequence of strings (read from a channel) and produces as output a sequence of strings (written to a channel). The stream package provides many useful filters, and applications can define their own. For example, here is a filter that repeats every input n times:

func Repeat(n int) stream.FilterFunc {
	return func(arg stream.Arg) error {
		for s := range arg.In {
			for i := 0; i < n; i++ {
				arg.Out <- s
			}
		}
		return nil
	}
}

stream.Run(
	stream.Items("hello", "world"),
	Repeat(2),
	stream.WriteLines(os.Stdout),
)

The output will be:

hello
hello
world
world

Note that Repeat returns a FilterFunc, a function type that implements the Filter interface. This is a common implementation pattern: many simple filters can be expressed as a single function of type FilterFunc.

Tunable Filters

FilterFunc is an appropriate type for most filters, such as Repeat above. Some filters, however, benefit from customization at the call site. Such filters provide their own implementation of the Filter interface, with extra methods. For example, stream.Sort provides extra methods that control how items are sorted:

stream.Run(
	stream.Command("ls", "-l"),
	stream.Sort().Num(5),  // Sort numerically by size (column 5)
	stream.WriteLines(os.Stdout),
)
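
The same pattern can be followed for a user-defined tunable filter. The sketch below uses only the standard library. Arg and Filter are local stand-ins that mirror the shapes documented for this package, and PrefixFilter, Prefix, Upper, and apply are hypothetical names invented for the illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Arg and Filter are local stand-ins that mirror the shapes documented
// for the stream package; they are not the package's own types.
type Arg struct {
	In  <-chan string
	Out chan<- string
}

type Filter interface {
	RunFilter(Arg) error
}

// PrefixFilter is a hypothetical tunable filter: it prefixes each item,
// and optional methods adjust its behavior before it runs.
type PrefixFilter struct {
	prefix string
	upper  bool
}

// Prefix returns a filter that prepends prefix to every item.
func Prefix(prefix string) *PrefixFilter {
	return &PrefixFilter{prefix: prefix}
}

// Upper adjusts f so that items are upper-cased before prefixing.
// It returns f so that calls can be chained.
func (f *PrefixFilter) Upper() *PrefixFilter {
	f.upper = true
	return f
}

// RunFilter implements Filter.
func (f *PrefixFilter) RunFilter(arg Arg) error {
	for s := range arg.In {
		if f.upper {
			s = strings.ToUpper(s)
		}
		arg.Out <- f.prefix + s
	}
	return nil
}

// apply runs a single filter over items and collects its output.
func apply(f Filter, items ...string) ([]string, error) {
	in := make(chan string, len(items))
	for _, s := range items {
		in <- s
	}
	close(in)
	out := make(chan string)
	errc := make(chan error, 1)
	go func() {
		errc <- f.RunFilter(Arg{In: in, Out: out})
		close(out)
	}()
	var result []string
	for s := range out {
		result = append(result, s)
	}
	return result, <-errc
}

func main() {
	out, err := apply(Prefix("> ").Upper(), "hello", "world")
	fmt.Println(out, err)
	// prints: [> HELLO > WORLD] <nil>
}
```

Returning the receiver from each adjusting method is what allows calls to be chained.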

Acknowledgments

The interface of this package is inspired by the http://labix.org/pipe package. Users may wish to consider that package in case it fits their needs better.

Functions

func Contents

func Contents(filters ...Filter) ([]string, error)

Contents returns a slice that contains all items that are the output of filters.

Example
package main

import (
	"github.com/ghemawat/stream"

	"fmt"
)

func main() {
	out, err := stream.Contents(stream.Numbers(1, 3))
	fmt.Println(out, err)
}
Output:

[1 2 3] <nil>

func ForEach

func ForEach(filter Filter, fn func(s string)) error

ForEach calls fn(s) for every item s in the output of filter and returns either nil, or any error reported by the execution of the filter.

Example
package main

import (
	"github.com/ghemawat/stream"

	"fmt"
)

func main() {
	err := stream.ForEach(stream.Numbers(1, 5), func(s string) {
		fmt.Print(s)
	})
	if err != nil {
		panic(err)
	}
}
Output:

12345

func Run

func Run(filters ...Filter) error

Run executes the sequence of filters and discards all output. It returns nil, or an error if any filter reported an error.

Example
package main

import (
	"github.com/ghemawat/stream"

	"fmt"
	"os"
)

func main() {
	err := stream.Run(
		stream.Items("line 1", "line 2"),
		stream.WriteLines(os.Stdout),
	)
	fmt.Println("error:", err)
}
Output:

line 1
line 2
error: <nil>

Types

type Arg

type Arg struct {
	In  <-chan string
	Out chan<- string
	// contains filtered or unexported fields
}

Arg contains the data passed to Filter.RunFilter. Arg.In is a channel that produces the input to the filter, and Arg.Out is a channel that receives the output from the filter.

type Filter

type Filter interface {
	// RunFilter reads a sequence of items from Arg.In and produces a
	// sequence of items on Arg.Out.  RunFilter returns nil on success,
	// an error otherwise.  RunFilter must *not* close the Arg.Out
	// channel.
	RunFilter(Arg) error
}

The Filter interface represents a process that takes as input a sequence of strings from a channel and produces a sequence on another channel.

func Cat

func Cat(filenames ...string) Filter

Cat emits each line from each named file in order. If no arguments are specified, Cat copies its input to its output.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Cat("stream_test.go"),
		stream.Grep("^func ExampleCat"),
		stream.WriteLines(os.Stdout),
	)
}
Output:

func ExampleCat() {

func Columns

func Columns(columns ...int) Filter

Columns splits each item into columns and yields the concatenation (separated by spaces) of the columns whose numbers are passed as arguments. Columns are numbered starting at 1. A column number larger than the number of columns in an item is skipped.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Items("hello world"),
		stream.Columns(2, 3, 1),
		stream.WriteLines(os.Stdout),
	)
}
Output:

world hello

func Command

func Command(command string, args ...string) Filter

Command executes "command args...".

The filter's input items are fed as standard input to the command, one line per input item. The standard output of the command is split into lines and the lines form the output of the filter (with trailing newlines removed).

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 100),
		stream.Command("wc", "-l"),
		stream.WriteLines(os.Stdout),
	)
}
Output:

100
Example (OutputOnly)
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Command("find", ".", "-type", "f", "-print"),
		stream.Grep(`^\./stream.*\.go$`),
		stream.Sort(),
		stream.WriteLines(os.Stdout),
	)

}
Output:

./stream.go
./stream_test.go
Example (WithError)
package main

import (
	"github.com/ghemawat/stream"

	"fmt"
)

func main() {
	err := stream.Run(stream.Command("no_such_command"))
	if err == nil {
		fmt.Println("execution of missing command succeeded unexpectedly")
	}
}
Output:

func DropFirst

func DropFirst(n int) Filter

DropFirst yields all items except for the first n items that it receives.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 10),
		stream.DropFirst(8),
		stream.WriteLines(os.Stdout),
	)
}
Output:

9
10

func DropLast

func DropLast(n int) Filter

DropLast yields all items except for the last n items that it receives.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 10),
		stream.DropLast(3),
		stream.WriteLines(os.Stdout),
	)
}
Output:

1
2
3
4
5
6
7

func First

func First(n int) Filter

First yields the first n items that it receives.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 10),
		stream.First(3),
		stream.WriteLines(os.Stdout),
	)
}
Output:

1
2
3

func Grep

func Grep(r string) Filter

Grep emits every input x that matches the regular expression r.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 12),
		stream.Grep(".."),
		stream.WriteLines(os.Stdout),
	)
}
Output:

10
11
12

func GrepNot

func GrepNot(r string) Filter

GrepNot emits every input x that does not match the regular expression r.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 12),
		stream.GrepNot("^.$"),
		stream.WriteLines(os.Stdout),
	)
}
Output:

10
11
12

func If

func If(fn func(string) bool) Filter

If emits every input x for which fn(x) is true.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 12),
		stream.If(func(s string) bool { return len(s) > 1 }),
		stream.WriteLines(os.Stdout),
	)
}
Output:

10
11
12

func Items

func Items(items ...string) Filter

Items emits each of its arguments in order.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Items("hello", "world"),
		stream.WriteLines(os.Stdout),
	)
}
Output:

hello
world

func Last

func Last(n int) Filter

Last yields the last n items that it receives.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 10),
		stream.Last(2),
		stream.WriteLines(os.Stdout),
	)
}
Output:

9
10

func Map

func Map(fn func(string) string) Filter

Map calls fn(x) for every item x and yields the outputs of the fn calls.

Example
package main

import (
	"github.com/ghemawat/stream"

	"fmt"
	"os"
)

func main() {
	stream.Run(
		stream.Items("hello", "there", "how", "are", "you?"),
		stream.Map(func(s string) string {
			return fmt.Sprintf("%d %s", len(s), s)
		}),
		stream.WriteLines(os.Stdout),
	)
}
Output:

5 hello
5 there
3 how
3 are
4 you?

func NumberLines

func NumberLines() Filter

NumberLines prefixes each item with its index in the input sequence (starting at 1) followed by a space.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Items("a", "b"),
		stream.NumberLines(),
		stream.WriteLines(os.Stdout),
	)
}
Output:

    1 a
    2 b

func Numbers

func Numbers(x, y int) Filter

Numbers emits the integers x through y, inclusive.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(2, 5),
		stream.WriteLines(os.Stdout),
	)
}
Output:

2
3
4
5

func Parallel

func Parallel(n int, f Filter) Filter

Parallel returns a Filter that runs n copies of f. The input to the Parallel Filter is divided up amongst the n copies. The output of the n copies is merged (in an unspecified order) and forms the output of the Parallel filter.

Example
package main

import (
	"github.com/ghemawat/stream"

	"fmt"
	"os"
)

func main() {
	stream.Run(
		stream.Items("hello", "there", "how", "are", "you?"),
		stream.Parallel(4,
			stream.Map(func(s string) string {
				return fmt.Sprintf("%d %s", len(s), s)
			}),
		),
		stream.Sort(),
		stream.WriteLines(os.Stdout),
	)
}
Output:

3 are
3 how
4 you?
5 hello
5 there
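
The fan-out and merge behavior described for Parallel can be sketched with the standard library alone. This is an illustration of the general technique, not the package's code. The helper parallelMap is a hypothetical name, and sharing one input channel among the workers is an assumption about how the input might be divided.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// parallelMap runs n workers that share one input channel, so each item
// is handled by exactly one worker. Outputs are merged onto one channel
// in whatever order the workers produce them.
func parallelMap(n int, fn func(string) string, items []string) []string {
	in := make(chan string)
	out := make(chan string)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range in {
				out <- fn(s)
			}
		}()
	}

	// Feed the input, then signal the workers that no more is coming.
	go func() {
		for _, s := range items {
			in <- s
		}
		close(in)
	}()

	// Close the merged output once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()

	var result []string
	for s := range out {
		result = append(result, s)
	}
	return result
}

func main() {
	got := parallelMap(4, strings.ToUpper, []string{"how", "are", "you?"})
	sort.Strings(got) // impose an order, since the merge order is unspecified
	fmt.Println(got)
	// prints: [ARE HOW YOU?]
}
```

Because the merge order is unspecified, callers that need deterministic output sort afterwards, as the example above does with stream.Sort.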

func ReadLines

func ReadLines(reader io.Reader) Filter

ReadLines emits each line found in reader.

Example
package main

import (
	"github.com/ghemawat/stream"

	"bytes"
	"os"
)

func main() {
	stream.Run(
		stream.ReadLines(bytes.NewBufferString("the\nquick\nbrown\nfox\n")),
		stream.Sort(),
		stream.WriteLines(os.Stdout),
	)
}
Output:

brown
fox
quick
the

func Repeat

func Repeat(s string, n int) Filter

Repeat emits n copies of s.

func Reverse

func Reverse() Filter

Reverse yields items in the reverse of the order it received them.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Items("a", "b"),
		stream.Reverse(),
		stream.WriteLines(os.Stdout),
	)
}
Output:

b
a

func Sample

func Sample(n int) Filter

Sample picks n pseudo-randomly chosen input items. Different executions of a Sample filter will choose different items.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(100, 200),
		stream.Sample(4),
		stream.WriteLines(os.Stdout),
	)
	// Output not checked since it is non-deterministic.
}
Output:

func SampleWithSeed

func SampleWithSeed(n int, seed int64) Filter

SampleWithSeed picks n pseudo-randomly chosen input items. It uses seed as the argument for its random number generation, so different executions of SampleWithSeed with the same arguments will choose the same items.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 100),
		stream.SampleWithSeed(2, 100),
		stream.Sort().Num(1),
		stream.WriteLines(os.Stdout),
	)
}
Output:

11
46

func Sequence

func Sequence(filters ...Filter) Filter

Sequence returns a filter that is the concatenation of all filter arguments. The output of a filter is fed as input to the next filter.

Example
package main

import (
	"github.com/ghemawat/stream"

	"fmt"
)

func main() {
	stream.ForEach(stream.Sequence(
		stream.Numbers(1, 25),
		stream.Grep("3"),
	), func(s string) { fmt.Println(s) })
}
Output:

3
13
23

func Substitute

func Substitute(r, replacement string) Filter

Substitute replaces all occurrences of the regular expression r in an input item with replacement. The replacement string can contain $1, $2, etc. which represent submatches of r.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 5),
		stream.Substitute("(3)", "$1$1"),
		stream.WriteLines(os.Stdout),
	)
}
Output:

1
2
33
4
5
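
The $1, $2 notation matches the template syntax of Go's standard regexp package. Assuming Substitute follows that syntax (an assumption, not something this page states), the standard-library sketch below shows one detail worth knowing: a submatch reference followed directly by letters or digits needs braces. The helper substitute is a hypothetical name.

```go
package main

import (
	"fmt"
	"regexp"
)

// substitute replaces every match of r in s with replacement, expanding
// $1, ${1}, and so on to submatches. It uses the standard regexp package
// directly; it is not the stream package's Substitute.
func substitute(r, replacement, s string) (string, error) {
	re, err := regexp.Compile(r)
	if err != nil {
		return "", err
	}
	return re.ReplaceAllString(s, replacement), nil
}

func main() {
	// Swap two submatches.
	a, _ := substitute(`(\d+)-(\d+)`, "$2-$1", "10-20")
	// ${1} delimits the reference, so "px" is kept as literal text.
	b, _ := substitute(`(\d+)`, "${1}px", "width 10")
	// $1px is read as a reference to a group named "1px", which does not
	// exist, so the match is replaced by the empty string.
	c, _ := substitute(`(\d+)`, "$1px", "width 10")
	fmt.Printf("%q %q %q\n", a, b, c)
	// prints: "20-10" "width 10px" "width "
}
```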

func Uniq

func Uniq() Filter

Uniq squashes adjacent identical items in arg.In into a single output.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Items("a", "b", "b", "c", "b"),
		stream.Uniq(),
		stream.WriteLines(os.Stdout),
	)
}
Output:

a
b
c
b

func UniqWithCount

func UniqWithCount() Filter

UniqWithCount squashes adjacent identical items in arg.In into a single output prefixed with the count of identical items followed by a space.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Items("a", "b", "b", "c"),
		stream.UniqWithCount(),
		stream.WriteLines(os.Stdout),
	)
}
Output:

1 a
2 b
1 c

func WriteLines

func WriteLines(writer io.Writer) Filter

WriteLines writes each input item s, followed by a newline, to writer, and also emits s. WriteLines can therefore be used like the "tee" command, which is often useful for debugging.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 3),
		stream.WriteLines(os.Stdout),
	)
}
Output:

1
2
3

type FilterFunc

type FilterFunc func(Arg) error

FilterFunc is an adapter type that allows the use of ordinary functions as Filters. If f is a function with the appropriate signature, FilterFunc(f) is a Filter that calls f.

func (FilterFunc) RunFilter

func (f FilterFunc) RunFilter(arg Arg) error

RunFilter calls this function. It implements the Filter interface.
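
The adapter idea can be seen in isolation in the standard-library sketch below. Arg, Filter, and FilterFunc are local stand-ins that mirror the declarations on this page, while rejectEmpty and runOnce are hypothetical helpers. The example also shows an error returned by a filter reaching the caller.

```go
package main

import (
	"errors"
	"fmt"
)

// Arg, Filter and FilterFunc are local stand-ins that mirror the
// declarations documented for the stream package.
type Arg struct {
	In  <-chan string
	Out chan<- string
}

type Filter interface {
	RunFilter(Arg) error
}

type FilterFunc func(Arg) error

// RunFilter calls f itself, which is what makes a plain function a Filter.
func (f FilterFunc) RunFilter(arg Arg) error { return f(arg) }

// rejectEmpty is an ordinary function with the FilterFunc signature. It
// copies items through and reports an error if it sees an empty item.
func rejectEmpty(arg Arg) error {
	for s := range arg.In {
		if s == "" {
			return errors.New("empty item")
		}
		arg.Out <- s
	}
	return nil
}

// runOnce runs one filter over items and collects its output and error.
func runOnce(f Filter, items ...string) ([]string, error) {
	in := make(chan string, len(items))
	for _, s := range items {
		in <- s
	}
	close(in)
	out := make(chan string)
	errc := make(chan error, 1)
	go func() {
		errc <- f.RunFilter(Arg{In: in, Out: out})
		close(out) // closed by the runner, never by the filter
	}()
	var result []string
	for s := range out {
		result = append(result, s)
	}
	return result, <-errc
}

func main() {
	out, err := runOnce(FilterFunc(rejectEmpty), "a", "", "b")
	fmt.Println(out, err)
	// prints: [a] empty item
}
```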

type FindFilter

type FindFilter struct {
	// contains filtered or unexported fields
}

FindFilter is a filter that produces matching nodes under a filesystem directory.

func Find

func Find(dir string) *FindFilter

Find returns a filter that produces matching nodes under a filesystem directory. The items yielded by the filter will be prefixed by dir. E.g., if dir contains subdir/file, the filter will yield dir/subdir/file. By default, the filter matches all types of files (regular files, directories, symbolic links, etc.). This behavior can be adjusted by calling FindFilter methods before executing the filter.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Find(".").IfMode(os.FileMode.IsRegular),
		stream.Grep("stream"),
		stream.WriteLines(os.Stdout),
	)
}
Output:

stream.go
stream_test.go
Example (Error)
package main

import (
	"github.com/ghemawat/stream"

	"fmt"
)

func main() {
	err := stream.Run(stream.Find("/no_such_dir"))
	if err == nil {
		fmt.Println("stream.Find did not return expected error")
	}
}
Output:

func (*FindFilter) IfMode

func (f *FindFilter) IfMode(fn func(os.FileMode) bool) *FindFilter

IfMode adjusts f so it only matches nodes for which fn(mode) returns true.

func (*FindFilter) RunFilter

func (f *FindFilter) RunFilter(arg Arg) error

RunFilter yields contents of a filesystem tree. It implements the Filter interface.

func (*FindFilter) SkipDirIf

func (f *FindFilter) SkipDirIf(fn func(d string) bool) *FindFilter

SkipDirIf adjusts f so that if fn(d) returns true for a directory d, d and all of d's descendants are skipped.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Find(".").SkipDirIf(func(d string) bool { return d == ".git" }),
		stream.Grep("x"),
		stream.WriteLines(os.Stdout),
	)
}
Output:

regexp.go
xargs.go

type SortFilter

type SortFilter struct {
	// contains filtered or unexported fields
}

SortFilter is a Filter that sorts its input items by a sequence of sort keys. If two items compare equal by all specified sort keys (this always happens if no sort keys are specified), the items are compared lexicographically.
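
The comparison rule above (try each sort key in turn, then fall back to lexicographic order) can be sketched with the standard sort package. This models the described behavior and is not the package's implementation. The names lessFunc, sortByKeys, and byLength are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// lessFunc reports whether a sorts before b under one sort key.
type lessFunc func(a, b string) bool

// sortByKeys sorts items by each key in turn; items that compare equal
// under every key fall back to plain lexicographic order.
func sortByKeys(items []string, keys ...lessFunc) {
	sort.SliceStable(items, func(i, j int) bool {
		a, b := items[i], items[j]
		for _, less := range keys {
			if less(a, b) {
				return true
			}
			if less(b, a) {
				return false
			}
			// a and b are equal under this key; try the next one.
		}
		return a < b
	})
}

// byLength is an example key: shorter strings sort first.
func byLength(a, b string) bool { return len(a) < len(b) }

func main() {
	items := []string{"pear", "fig", "kiwi", "apple", "date"}
	sortByKeys(items, byLength)
	fmt.Println(items)
	// prints: [fig date kiwi pear apple]
}
```

With no keys supplied, the loop body never runs and the result is a plain lexicographic sort, which matches the default described above.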

func Sort

func Sort() *SortFilter

Sort returns a filter that sorts its input items. By default, the filter sorts items lexicographically. This can be adjusted by calling methods such as Num and Text, which add sort keys to the filter.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Items("banana", "apple", "cheese", "apple"),
		stream.Sort(),
		stream.WriteLines(os.Stdout),
	)
}
Output:

apple
apple
banana
cheese
Example (MultipleColumns)
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	// Sort numerically by column 1. Break ties by sorting
	// lexicographically by column 2.
	stream.Run(
		stream.Items(
			"1970 march",
			"1970 feb",
			"1950 june",
			"1980 sep",
		),
		stream.Sort().Num(1).Text(2),
		stream.WriteLines(os.Stdout),
	)
}
Output:

1950 june
1970 feb
1970 march
1980 sep

func (*SortFilter) By

func (s *SortFilter) By(less func(a, b string) bool) *SortFilter

By adds a sort key to sort by the output of the specified less function.

func (*SortFilter) Num

func (s *SortFilter) Num(n int) *SortFilter

Num sets the next sort key to sort by column n in numeric order. Column 0 means the entire string. Items that do not have column n sort to the front. Items whose column n is not a number sort to the end.
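
The ordering rule for Num has three cases, which the standard-library sketch below makes explicit. It is a model of the documented behavior only. Splitting columns on whitespace and parsing numbers with strconv.ParseFloat are both assumptions, and rank and sortNum are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// rank classifies an item for numeric sorting by column n. It returns
// 0 if the column is missing (sorts first), 1 with the parsed value if
// the column is a number, and 2 if it is not a number (sorts last).
// Column 0 means the entire string.
func rank(item string, n int) (int, float64) {
	col := item
	if n > 0 {
		fields := strings.Fields(item) // assumption: whitespace-separated columns
		if n > len(fields) {
			return 0, 0
		}
		col = fields[n-1]
	}
	v, err := strconv.ParseFloat(col, 64) // assumption: floating-point parsing
	if err != nil {
		return 2, 0
	}
	return 1, v
}

// sortNum sorts items numerically by column n, breaking ties
// lexicographically on the whole item.
func sortNum(items []string, n int) {
	sort.SliceStable(items, func(i, j int) bool {
		ri, vi := rank(items[i], n)
		rj, vj := rank(items[j], n)
		if ri != rj {
			return ri < rj
		}
		if ri == 1 && vi != vj {
			return vi < vj
		}
		return items[i] < items[j]
	})
}

func main() {
	items := []string{"x 10", "y", "z abc", "w 2"}
	sortNum(items, 2)
	fmt.Println(strings.Join(items, ", "))
	// prints: y, w 2, x 10, z abc
}
```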

func (*SortFilter) NumDecreasing

func (s *SortFilter) NumDecreasing(n int) *SortFilter

NumDecreasing sets the next sort key to sort by column n in reverse numeric order. Column 0 means the entire string. Items that do not have column n sort to the end. Items whose column n is not a number sort to the front.

func (*SortFilter) RunFilter

func (s *SortFilter) RunFilter(arg Arg) error

RunFilter sorts items by the specified sorting keys. It implements the Filter interface.

func (*SortFilter) Text

func (s *SortFilter) Text(n int) *SortFilter

Text sets the next sort key to sort by column n in lexicographic order. Column 0 means the entire string. Items that do not have column n sort to the front.

func (*SortFilter) TextDecreasing

func (s *SortFilter) TextDecreasing(n int) *SortFilter

TextDecreasing sets the next sort key to sort by column n in reverse lexicographic order. Column 0 means the entire string. Items that do not have column n sort to the end.

type XargsFilter

type XargsFilter struct {
	// contains filtered or unexported fields
}

XargsFilter is a Filter that applies a command to every input item.

func Xargs

func Xargs(command string, args ...string) *XargsFilter

Xargs returns a filter that executes "command args... items..." where items are the input to the filter. The handling of items may be split across multiple executions of command (typically to meet command line length restrictions). The standard output of the execution(s) is split into lines and the lines form the output of the filter (with trailing newlines removed).

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 5),
		stream.Xargs("echo"),
		stream.WriteLines(os.Stdout),
	)
}
Output:

1 2 3 4 5
Example (SplitArguments)
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	// Xargs should split the long list of arguments into
	// three executions to keep command length below 4096.
	stream.Run(
		stream.Numbers(1, 2000),
		stream.Xargs("echo"),
		stream.Command("wc", "-l"),
		stream.WriteLines(os.Stdout),
	)
}
Output:

3

func (*XargsFilter) LimitArgs

func (x *XargsFilter) LimitArgs(n int) *XargsFilter

LimitArgs adjusts x so that no more than n input items are passed to a single command execution. If not called, an unspecified number of input items may be handled via a single command execution.

Example
package main

import (
	"github.com/ghemawat/stream"

	"os"
)

func main() {
	stream.Run(
		stream.Numbers(1, 5),
		stream.Xargs("echo").LimitArgs(2),
		stream.WriteLines(os.Stdout),
	)
}
Output:

1 2
3 4
5
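
The way input items are divided among command executions can be modeled with a small batching function. The standard-library sketch below is an illustration, not the package's code. The function batches and its maxArgs and maxLen parameters are hypothetical, and counting one separator byte per item is an assumption.

```go
package main

import "fmt"

// batches splits items into groups for separate command executions.
// The current group is closed when adding another item would exceed
// maxArgs items or maxLen bytes (counting one separator byte per item).
// An item longer than maxLen still gets a group of its own.
func batches(items []string, maxArgs, maxLen int) [][]string {
	var result [][]string
	var cur []string
	curLen := 0
	for _, s := range items {
		need := len(s) + 1
		if len(cur) > 0 && (len(cur) >= maxArgs || curLen+need > maxLen) {
			result = append(result, cur)
			cur, curLen = nil, 0
		}
		cur = append(cur, s)
		curLen += need
	}
	if len(cur) > 0 {
		result = append(result, cur)
	}
	return result
}

func main() {
	fmt.Println(batches([]string{"1", "2", "3", "4", "5"}, 2, 4096))
	// prints: [[1 2] [3 4] [5]]
}
```

Each inner slice corresponds to one execution of the command, which mirrors the three output lines in the LimitArgs example.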

func (*XargsFilter) RunFilter

func (x *XargsFilter) RunFilter(arg Arg) error

RunFilter implements the Filter interface: it reads a sequence of items from arg.In and passes them as arguments to "command args...".
