beam: github.com/apache/beam/sdks/go/pkg/beam Index | Examples | Files | Directories

package beam

import "github.com/apache/beam/sdks/go/pkg/beam"

Package beam is an implementation of the Apache Beam (https://beam.apache.org) programming model in Go. Beam provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.

For more on the Beam model see: https://beam.apache.org/documentation/programming-guide

For design choices this implementation makes see: https://s.apache.org/beam-go-sdk-design-rfc

Code:

// In order to start creating the pipeline for execution, a Pipeline object is needed.
p := beam.NewPipeline()
s := p.Root()

// The pipeline object encapsulates all the data and steps in your processing task.
// It is the basis for creating the pipeline's data sets as PCollections and its operations
// as transforms.

// The PCollection abstraction represents a potentially distributed,
// multi-element data set. You can think of a PCollection as “pipeline” data;
// Beam transforms use PCollection objects as inputs and outputs. As such, if
// you want to work with data in your pipeline, it must be in the form of a
// PCollection.

// Transformations are applied in a scoped fashion to the pipeline. The scope
// can be obtained from the pipeline object.

// Start by reading text from an input files, and receiving a PCollection.
lines := textio.Read(s, "protocol://path/file*.txt")

// Transforms are added to the pipeline so they are part of the work to be
// executed.  Since this transform has no PCollection as an input, it is
// considered a 'root transform'

// A pipeline can have multiple root transforms
moreLines := textio.Read(s, "protocol://other/path/file*.txt")

// Further transforms can be applied, creating an arbitrary, acyclic graph.
// Subsequent transforms (and the intermediate PCollections they produce) are
// attached to the same pipeline.
all := beam.Flatten(s, lines, moreLines)
wordRegexp := regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRegexp.FindAllString(line, -1) {
        emit(word)
    }
}, all)
formatted := beam.ParDo(s, strings.ToUpper, words)
textio.Write(s, "protocol://output/path", formatted)

// Applying a transform adds it to the pipeline, rather than executing it
// immediately.  Once the whole pipeline of transforms is constructed, the
// pipeline can be executed by a PipelineRunner.  The direct runner executes the
// transforms directly, sequentially, in this one process, which is useful for
// unit tests and simple experiments:
if err := direct.Execute(context.Background(), p); err != nil {
    fmt.Printf("Pipeline failed: %v", err)
}

Code:

// Metrics can be declared outside DoFns, and used inside..
outside := beam.NewCounter("example.namespace", "count")

extractWordsDofn := func(ctx context.Context, line string, emit func(string)) {
    // They can be defined at time of use within a DoFn, if necessary.
    inside := beam.NewDistribution("example.namespace", "characters")
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
        outside.Inc(ctx, 1)
        inside.Update(ctx, int64(len(word)))
    }
}
ctx := ctxWithPtransformID("example")
extractWordsDofn(ctx, "this has six words in it", func(string) {})
extractWordsDofn(ctx, "this has seven words in it, see?", func(string) {})

dumpAndClearMetrics()

Output:

Bundle: "exampleBundle" - PTransformID: "example"
	example.namespace.characters - count: 13 sum: 43 min: 2 max: 5
	example.namespace.count - value: 13

Code:

// Metrics can be used in multiple DoFns
c := beam.NewCounter("example.reusable", "count")

extractWordsDofn := func(ctx context.Context, line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
        c.Inc(ctx, 1)
    }
}

extractRunesDofn := func(ctx context.Context, line string, emit func(rune)) {
    for _, r := range line {
        emit(r)
        c.Inc(ctx, 1)
    }
}
extractWordsDofn(ctxWithPtransformID("extract1"), "this has six words in it", func(string) {})

extractRunesDofn(ctxWithPtransformID("extract2"), "seven thousand", func(rune) {})

dumpAndClearMetrics()

Output:

Bundle: "exampleBundle" - PTransformID: "extract1"
	example.reusable.count - value: 6
Bundle: "exampleBundle" - PTransformID: "extract2"
	example.reusable.count - value: 14

Index

Examples

Package Files

coder.go combine.go create.go doc.go encoding.go external.go flatten.go forward.go gbk.go impulse.go metrics.go option.go pardo.go partition.go pcollection.go pipeline.go runner.go util.go validate.go windowing.go

Variables

var (
    TType = typex.TType
    UType = typex.UType
    VType = typex.VType
    WType = typex.WType
    XType = typex.XType
    YType = typex.YType
    ZType = typex.ZType
)

These are the reflect.Type instances of the universal types, which are used when binding actual types to "generic" DoFns that use Universal Types.

var EventTimeType = typex.EventTimeType

EventTimeType is the reflect.Type of EventTime.

var PipelineOptions = runtime.GlobalOptions

PipelineOptions are global options for the active pipeline. Options can be defined any time before execution and are re-created by the harness on remote execution workers. Global options should be used sparingly.

func DecodeFunc Uses

func DecodeFunc(data string) (reflectx.Func, error)

DecodeFunc encodes a function as a string. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The parameter types must be encodable.

func DecodeType Uses

func DecodeType(data string) (reflect.Type, error)

DecodeType decodes a type. Unless registered, the decoded type is only guaranteed to the isomorphic to the input and with no methods.

func EncodeCoder Uses

func EncodeCoder(c Coder) (string, error)

EncodeCoder encodes a coder as a string. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func EncodeFunc Uses

func EncodeFunc(fn reflectx.Func) (string, error)

EncodeFunc encodes a function and parameter types as a string. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func EncodeType Uses

func EncodeType(t reflect.Type) (string, error)

EncodeType encodes a type as a string. Unless registered, the decoded type is only guaranteed to the isomorphic to the input and with no methods.

func Init Uses

func Init()

Init is the hook that all user code must call after flags processing and other static initialization, for now.

func Initialized Uses

func Initialized() bool

Initialized exposes the initialization status for runners.

func JSONEnc Uses

func JSONEnc(in T) ([]byte, error)

JSONEnc encodes the supplied value in JSON.

func Must2 Uses

func Must2(a, b PCollection, err error) (PCollection, PCollection)

Must2 returns the input, but panics if err != nil.

func NewPipelineWithRoot Uses

func NewPipelineWithRoot() (*Pipeline, Scope)

NewPipelineWithRoot creates a new empty pipeline and its root scope.

func ParDo0 Uses

func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option)

ParDo0 inserts a ParDo with zero output transform into the pipeline.

func ParDo2 Uses

func ParDo2(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection)

ParDo2 inserts a ParDo with 2 outputs into the pipeline.

func ParDo3 Uses

func ParDo3(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection)

ParDo3 inserts a ParDo with 3 outputs into the pipeline.

func ParDo4 Uses

func ParDo4(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection)

ParDo4 inserts a ParDo with 4 outputs into the pipeline.

func ParDo5 Uses

func ParDo5(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection)

ParDo5 inserts a ParDo with 5 outputs into the pipeline.

func ParDo6 Uses

func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection)

ParDo6 inserts a ParDo with 6 outputs into the pipeline.

func ParDo7 Uses

func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection)

ParDo7 inserts a ParDo with 7 outputs into the pipeline.

func ProtoEnc Uses

func ProtoEnc(in T) ([]byte, error)

ProtoEnc marshals the supplied proto.Message.

func RegisterFunction Uses

func RegisterFunction(fn interface{})

RegisterFunction allows function registration. It is beneficial for performance and is needed for functions -- such as custom coders -- serialized during unit tests, where the underlying symbol table is not available. It should be called in init() only. Returns the external key for the function.

func RegisterInit Uses

func RegisterInit(hook func())

RegisterInit registers an Init hook. Hooks are expected to be able to figure out whether they apply on their own, notably if invoked in a remote execution environment. They are all executed regardless of the runner.

func RegisterRunner Uses

func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline) error)

RegisterRunner associates the name with the supplied runner, making it available to execute a pipeline via Run.

func RegisterType Uses

func RegisterType(t reflect.Type)

RegisterType inserts "external" types into a global type registry to bypass serialization and preserve full method information. It should be called in init() only. TODO(wcn): the canonical definition of "external" is in v1.proto. We need user facing copy for this important concept.

func Run Uses

func Run(ctx context.Context, runner string, p *Pipeline) error

Run executes the pipeline using the selected registred runner. It is customary to define a "runner" with no default as a flag to let users control runner selection.

func UnwrapCoder Uses

func UnwrapCoder(c Coder) *coder.Coder

UnwrapCoder returns the internal coder.

func ValidateKVType Uses

func ValidateKVType(col PCollection) (typex.FullType, typex.FullType)

ValidateKVType panics if the type of the PCollection is not KV<A,B>. It returns (A,B).

func ValidateNonCompositeType Uses

func ValidateNonCompositeType(col PCollection) typex.FullType

ValidateNonCompositeType panics if the type of the PCollection is not a composite type. It returns the type.

type Coder Uses

type Coder struct {
    // contains filtered or unexported fields
}

Coder defines how to encode and decode values of type 'A' into byte streams. Coders are attached to PCollections of the same type. For PCollections consumed by GBK, the attached coders are required to be deterministic.

func DecodeCoder Uses

func DecodeCoder(data string) (Coder, error)

DecodeCoder decodes a coder. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func NewCoder Uses

func NewCoder(t FullType) Coder

NewCoder infers a Coder for any bound full type.

func (Coder) IsValid Uses

func (c Coder) IsValid() bool

IsValid returns true iff the Coder is valid. Any use of an invalid Coder will result in a panic.

func (Coder) String Uses

func (c Coder) String() string

func (Coder) Type Uses

func (c Coder) Type() FullType

Type returns the full type 'A' of elements the coder can encode and decode. 'A' must be a concrete full type, such as int or KV<int,string>.

type Counter Uses

type Counter struct {
    *metrics.Counter
}

Counter is a metric that can be incremented and decremented, and is aggregated by the sum.

func NewCounter Uses

func NewCounter(namespace, name string) Counter

NewCounter returns the Counter with the given namespace and name.

func (Counter) Dec Uses

func (c Counter) Dec(ctx context.Context, v int64)

Dec decrements the counter within by the given amount.

Code:

c := beam.NewCounter("example", "size")
c.Dec(ctx, int64(len("foobar")))

func (Counter) Inc Uses

func (c Counter) Inc(ctx context.Context, v int64)

Inc increments the counter within by the given amount.

Code:

c := beam.NewCounter("example", "size")
c.Inc(ctx, int64(len("foobar")))

type Distribution Uses

type Distribution struct {
    *metrics.Distribution
}

Distribution is a metric that records various statistics about the distribution of reported values.

func NewDistribution Uses

func NewDistribution(namespace, name string) Distribution

NewDistribution returns the Distribution with the given namespace and name.

func (Distribution) Update Uses

func (c Distribution) Update(ctx context.Context, v int64)

Update adds an observation to this distribution.

Code:

t := time.Millisecond * 42
d := beam.NewDistribution("example", "latency_micros")
d.Update(ctx, int64(t/time.Microsecond))

type EncodedCoder Uses

type EncodedCoder struct {
    // Coder is the coder to preserve across serialization.
    Coder Coder
}

EncodedCoder is a serialization wrapper around a coder for convenience.

func (EncodedCoder) MarshalJSON Uses

func (w EncodedCoder) MarshalJSON() ([]byte, error)

MarshalJSON returns the JSON encoding this value.

func (*EncodedCoder) UnmarshalJSON Uses

func (w *EncodedCoder) UnmarshalJSON(buf []byte) error

UnmarshalJSON sets the state of this instance from the passed in JSON.

type EncodedFunc Uses

type EncodedFunc struct {
    // Fn is the function to preserve across serialization.
    Fn reflectx.Func
}

EncodedFunc is a serialization wrapper around a function for convenience.

func (EncodedFunc) MarshalJSON Uses

func (w EncodedFunc) MarshalJSON() ([]byte, error)

MarshalJSON returns the JSON encoding this value.

func (*EncodedFunc) UnmarshalJSON Uses

func (w *EncodedFunc) UnmarshalJSON(buf []byte) error

UnmarshalJSON sets the state of this instance from the passed in JSON.

type EncodedType Uses

type EncodedType struct {
    // T is the type to preserve across serialization.
    T reflect.Type
}

EncodedType is a serialization wrapper around a type for convenience.

func (EncodedType) MarshalJSON Uses

func (w EncodedType) MarshalJSON() ([]byte, error)

MarshalJSON returns the JSON encoding this value.

func (*EncodedType) UnmarshalJSON Uses

func (w *EncodedType) UnmarshalJSON(buf []byte) error

UnmarshalJSON sets the state of this instance from the passed in JSON.

type EventTime Uses

type EventTime = typex.EventTime

EventTime represents the time of the event that generated an element. This is distinct from the time when an element is processed.

type FullType Uses

type FullType = typex.FullType

FullType represents the tree structure of data types processed by the graph. It allows representation of composite types, such as KV<int, string> or CoGBK<int, int>, as well as "generic" such types, KV<int,T> or CoGBK<X,Y>, where the free "type variables" are the fixed universal types: T, X, etc.

type Gauge Uses

type Gauge struct {
    *metrics.Gauge
}

Gauge is a metric that can have its new value set, and is aggregated by taking the last reported value.

func NewGauge Uses

func NewGauge(namespace, name string) Gauge

NewGauge returns the Gauge with the given namespace and name.

func (Gauge) Set Uses

func (c Gauge) Set(ctx context.Context, v int64)

Set sets the current value for this gauge.

Code:

g := beam.NewGauge("example", "progress")
g.Set(ctx, 42)

type Option Uses

type Option interface {
    // contains filtered or unexported methods
}

Option is an optional value or context to a transformation, used at pipeline construction time. The primary use case is providing side inputs.

type PCollection Uses

type PCollection struct {
    // contains filtered or unexported fields
}

PCollection is an immutable collection of values of type 'A', which must be a concrete type, such as int or KV<int,string>. A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like textio.Read), and can be passed as the inputs of other PTransforms. Some root transforms produce bounded PCollections and others produce unbounded ones.

Each element in a PCollection has an associated timestamp. Sources assign timestamps to elements when they create PCollections, and other PTransforms propagate these timestamps from their input to their output implicitly or explicitly.

Additionally, each element is assigned to a set of windows. By default, all elements are assigned into a single default window, GlobalWindow.

func AddFixedKey Uses

func AddFixedKey(s Scope, col PCollection) PCollection

AddFixedKey adds a fixed key (0) to every element.

func CoGroupByKey Uses

func CoGroupByKey(s Scope, cols ...PCollection) PCollection

CoGroupByKey inserts a CoGBK transform into the pipeline.

func Combine Uses

func Combine(s Scope, combinefn interface{}, col PCollection) PCollection

Combine inserts a global Combine transform into the pipeline. It expects a PCollection<T> as input where T is a concrete type.

func CombinePerKey Uses

func CombinePerKey(s Scope, combinefn interface{}, col PCollection) PCollection

CombinePerKey inserts a GBK and per-key Combine transform into the pipeline. It expects a PCollection<KV<K,T>>. The CombineFn may optionally take a key parameter.

func Create Uses

func Create(s Scope, values ...interface{}) PCollection

Create inserts a fixed set of values into the pipeline. The values must be of the same type 'A' and the returned PCollection is of type A.

The returned PCollections can be used as any other PCollections. The values are JSON-coded. Each runner may place limits on the sizes of the values and Create should generally only be used for small collections.

Code:

beam.Create(s, 5, 6, 7, 8, 9)               // PCollection<int>
beam.Create(s, []int{5, 6}, []int{7, 8, 9}) // PCollection<[]int>
beam.Create(s, []int{5, 6, 7, 8, 9})        // PCollection<[]int>
beam.Create(s, "a", "b", "c")               // PCollection<string>

func CreateList Uses

func CreateList(s Scope, list interface{}) PCollection

CreateList inserts a fixed set of values into the pipeline from a slice or array. It is a convenience wrapper over Create.

Code:

beam.CreateList(s, []int{5, 6, 7, 8, 9}) // PCollection<int>

func DropKey Uses

func DropKey(s Scope, col PCollection) PCollection

DropKey drops the key for an input PCollection<KV<A,B>>. It returns a PCollection<B>.

func DropValue Uses

func DropValue(s Scope, col PCollection) PCollection

DropValue drops the value for an input PCollection<KV<A,B>>. It returns a PCollection<A>.

func Explode Uses

func Explode(s Scope, col PCollection) PCollection

Explode is a PTransform that takes a single PCollection<[]A> and returns a PCollection<A> containing all the elements for each incoming slice.

Code:

d := beam.Create(s, []int{1, 2, 3, 4, 5}) // PCollection<[]int>
beam.Explode(s, d)                        // PCollection<int>

func External Uses

func External(s Scope, spec string, payload []byte, in []PCollection, out []FullType, bounded bool) []PCollection

External defines a Beam external transform. The interpretation of this primitive is runner specific. The runner is responsible for parsing the payload based on the spec provided to implement the behavior of the operation. Transform libraries should expose an API that captures the user's intent and serialize the payload as a byte slice that the runner will deserialize.

func Flatten Uses

func Flatten(s Scope, cols ...PCollection) PCollection

Flatten is a PTransform that takes either multiple PCollections of type 'A' and returns a single PCollection of type 'A' containing all the elements in all the input PCollections. The name "Flatten" suggests taking a list of lists and flattening them into a single list.

By default, the Coder of the output PCollection is the same as the Coder of the first PCollection.

Code:

a := textio.Read(s, "...some file path...") // PCollection<string>
b := textio.Read(s, "...some other file path...")
c := textio.Read(s, "...some third file path...")

beam.Flatten(s, a, b, c) // PCollection<String>

func GroupByKey Uses

func GroupByKey(s Scope, a PCollection) PCollection

GroupByKey is a PTransform that takes a PCollection of type KV<A,B>, groups the values by key and windows, and returns a PCollection of type GBK<A,B> representing a map from each distinct key and window of the input PCollection to an iterable over all the values associated with that key in the input per window. Each key in the output PCollection is unique within each window.

GroupByKey is analogous to converting a multi-map into a uni-map, and related to GROUP BY in SQL. It corresponds to the "shuffle" step between the Mapper and the Reducer in the MapReduce framework.

Two keys of type A are compared for equality by first encoding each of the keys using the Coder of the keys of the input PCollection, and then comparing the encoded bytes. This admits efficient parallel evaluation. Note that this requires that the Coder of the keys be deterministic.

By default, input and output PCollections share a key Coder and iterable values in the input and output PCollection share an element Coder.

GroupByKey is a key primitive in data-parallel processing, since it is the main way to efficiently bring associated data together into one location. It is also a key determiner of the performance of a data-parallel pipeline.

See CoGroupByKey for a way to group multiple input PCollections by a common key at once.

Code:

type Doc struct{}
var urlDocPairs beam.PCollection             // PCollection<KV<string, Doc>>
urlToDocs := beam.GroupByKey(s, urlDocPairs) // PCollection<CoGBK<string, Doc>>

// CoGBK parameters receive an iterator function with all values associated
// with the same key.
beam.ParDo0(s, func(key string, values func(*Doc) bool) {
    var cur Doc
    for values(&cur) {
        // ... process all docs having that url ...
    }
}, urlToDocs) // PCollection<KV<string, []Doc>>

func Impulse Uses

func Impulse(s Scope) PCollection

Impulse emits a single empty []byte into the global window. The resulting PCollection is a singleton of type []byte.

The purpose of Impulse is to trigger another transform, such as ones that take all information as side inputs.

Code:

beam.Impulse(s) // PCollection<[]byte>

func ImpulseValue Uses

func ImpulseValue(s Scope, value []byte) PCollection

ImpulseValue emits the supplied byte slice into the global window. The resulting PCollection is a singleton of type []byte.

Code:

beam.ImpulseValue(s, []byte{}) // PCollection<[]byte>

func Must Uses

func Must(a PCollection, err error) PCollection

Must returns the input, but panics if err != nil.

func MustN Uses

func MustN(list []PCollection, err error) []PCollection

MustN returns the input, but panics if err != nil.

func ParDo Uses

func ParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) PCollection

ParDo is the core element-wise PTransform in Apache Beam, invoking a user-specified function on each of the elements of the input PCollection to produce zero or more output elements, all of which are collected into the output PCollection. Use one of the ParDo variants for a different number of output PCollections. The PCollections do no need to have the same types.

Elements are processed independently, and possibly in parallel across distributed cloud resources. The ParDo processing style is similar to what happens inside the "Mapper" or "Reducer" class of a MapReduce-style algorithm.

DoFns

The function to use to process each element is specified by a DoFn, either as single function or as a struct with methods, notably ProcessElement. The struct may also define Setup, StartBundle, FinishBundle and Teardown methods. The struct is JSON-serialized and may contain construction-time values.

Conceptually, when a ParDo transform is executed, the elements of the input PCollection are first divided up into some number of "bundles". These are farmed off to distributed worker machines (or run locally, if using the direct runner). For each bundle of input elements processing proceeds as follows:

* If a struct, a fresh instance of the argument DoFn is created on a
  worker from json serialization, and the Setup method is called on this
  instance, if present. A runner may reuse DoFn instances for multiple
  bundles. A DoFn that has terminated abnormally (by returning an error)
  will never be reused.
* The DoFn's StartBundle method, if provided, is called to initialize it.
* The DoFn's ProcessElement method is called on each of the input elements
  in the bundle.
* The DoFn's FinishBundle method, if provided, is called to complete its
  work. After FinishBundle is called, the framework will not again invoke
  ProcessElement or FinishBundle until a new call to StartBundle has
  occurred.
* If any of Setup, StartBundle, ProcessElement or FinishBundle methods
  return an error, the Teardown method, if provided, will be called on the
  DoFn instance.
* If a runner will no longer use a DoFn, the Teardown method, if provided,
  will be called on the discarded instance.

Each of the calls to any of the DoFn's processing methods can produce zero or more output elements. All of the of output elements from all of the DoFn instances are included in an output PCollection.

For example:

words := beam.ParDo(s, &Foo{...}, ...)
lengths := beam.ParDo(s, func (word string) int) {
      return len(word)
}, works)

Each output element has the same timestamp and is in the same windows as its corresponding input element. The timestamp can be accessed and/or emitted by including a EventTime-typed parameter. The name of the function or struct is used as the DoFn name. Function literals do not have stable names and should thus not be used in production code.

Side Inputs

While a ParDo processes elements from a single "main input" PCollection, it can take additional "side input" PCollections. These SideInput along with the DoFn parameter form express styles of accessing PCollection computed by earlier pipeline operations, passed in to the ParDo transform using SideInput options, and their contents accessible to each of the DoFn operations. For example:

words := ...
cufoff := ...  // Singleton PCollection<int>
smallWords := beam.ParDo(s, func (word string, cutoff int, emit func(string)) {
      if len(word) < cutoff {
           emit(word)
      }
}, words, beam.SideInput{Input: cutoff})

Additional Outputs

Optionally, a ParDo transform can produce zero or multiple output PCollections. Note the use of ParDo2 to specfic 2 outputs. For example:

words := ...
cufoff := ...  // Singleton PCollection<int>
small, big := beam.ParDo2(s, func (word string, cutoff int, small, big func(string)) {
      if len(word) < cutoff {
           small(word)
      } else {
           big(word)
      }
}, words, beam.SideInput{Input: cutoff})

By default, the Coders for the elements of each output PCollections is inferred from the concrete type.

No Global Shared State

There are three main ways to initialize the state of a DoFn instance processing a bundle:

* Define public instance variable state. This state will be automatically
  JSON serialized and then deserialized in the DoFn instances created for
  bundles. This method is good for state known when the original DoFn is
  created in the main program, if it's not overly large. This is not
  suitable for any state which must only be used for a single bundle, as
  DoFn's may be used to process multiple bundles.

* Compute the state as a singleton PCollection and pass it in as a side
  input to the DoFn. This is good if the state needs to be computed by the
  pipeline, or if the state is very large and so is best read from file(s)
  rather than sent as part of the DoFn's serialized state.

* Initialize the state in each DoFn instance, in a StartBundle method.
  This is good if the initialization doesn't depend on any information
  known only by the main program or computed by earlier pipeline
  operations, but is the same for all instances of this DoFn for all
  program executions, say setting up empty caches or initializing constant
  data.

ParDo operations are intended to be able to run in parallel across multiple worker machines. This precludes easy sharing and updating mutable state across those machines. There is no support in the Beam model for communicating and synchronizing updates to shared state across worker machines, so programs should not access any mutable global variable state in their DoFn, without understanding that the Go processes for the main program and workers will each have its own independent copy of such state, and there won't be any automatic copying of that state across Java processes. All information should be communicated to DoFn instances via main and side inputs and serialized state, and all output should be communicated from a DoFn instance via output PCollections, in the absence of external communication mechanisms written by user code.

Fault Tolerance

In a distributed system, things can fail: machines can crash, machines can be unable to communicate across the network, etc. While individual failures are rare, the larger the job, the greater the chance that something, somewhere, will fail. Beam runners may strive to mask such failures by retrying failed DoFn bundles. This means that a DoFn instance might process a bundle partially, then crash for some reason, then be rerun (often as a new process) on that same bundle and on the same elements as before. Sometimes two or more DoFn instances will be running on the same bundle simultaneously, with the system taking the results of the first instance to complete successfully. Consequently, the code in a DoFn needs to be written such that these duplicate (sequential or concurrent) executions do not cause problems. If the outputs of a DoFn are a pure function of its inputs, then this requirement is satisfied. However, if a DoFn's execution has external side-effects, such as performing updates to external HTTP services, then the DoFn's code needs to take care to ensure that those updates are idempotent and that concurrent updates are acceptable. This property can be difficult to achieve, so it is advisable to strive to keep DoFns as pure functions as much as possible.

Optimization

Beam runners may choose to apply optimizations to a pipeline before it is executed. A key optimization, fusion, relates to ParDo operations. If one ParDo operation produces a PCollection that is then consumed as the main input of another ParDo operation, the two ParDo operations will be fused together into a single ParDo operation and run in a single pass; this is "producer-consumer fusion". Similarly, if two or more ParDo operations have the same PCollection main input, they will be fused into a single ParDo that makes just one pass over the input PCollection; this is "sibling fusion".

If after fusion there are no more unfused references to a PCollection (e.g., one between a producer ParDo and a consumer ParDo), the PCollection itself is "fused away" and won't ever be written to disk, saving all the I/O and space expense of constructing it.

When Beam runners apply fusion optimization, it is essentially "free" to write ParDo operations in a very modular, composable style, each ParDo operation doing one clear task, and stringing together sequences of ParDo operations to get the desired overall effect. Such programs can be easier to understand, easier to unit-test, easier to extend and evolve, and easier to reuse in new programs. The predefined library of PTransforms that come with Beam makes heavy use of this modular, composable style, trusting to the runner to "flatten out" all the compositions into highly optimized stages.

See https://beam.apache.org/documentation/programming-guide/#transforms-pardo" for the web documentation for ParDo

Optionally, a ParDo transform can produce zero or multiple output PCollections. Note the use of ParDo2 to specify 2 outputs.

Code:

var words beam.PCollection  // PCollection<string>
var cutoff beam.PCollection // Singleton PCollection<int>
small, big := beam.ParDo2(s, func(word string, cutoff int, small, big func(string)) {
    if len(word) < cutoff {
        small(word)
    } else {
        big(word)
    }
}, words, beam.SideInput{Input: cutoff})

_, _ = small, big

func ParDoN Uses

func ParDoN(s Scope, dofn interface{}, col PCollection, opts ...Option) []PCollection

ParDoN inserts a ParDo with any number of outputs into the pipeline.

func Partition Uses

func Partition(s Scope, n int, fn interface{}, col PCollection) []PCollection

Partition takes a PCollection<T> and a PartitionFn, uses the PartitionFn to split the elements of the input PCollection into N partitions, and returns a []PCollection<T> that bundles N PCollection<T>s containing the split elements.

func Seq Uses

func Seq(s Scope, col PCollection, dofns ...interface{}) PCollection

Seq is a convenience helper to chain single-input/single-output ParDos together in a sequence.

Code:

a := textio.Read(s, "...some file path...") // PCollection<string>

beam.Seq(s, a,
    strconv.Atoi, // string to int
    func(i int) float64 { return float64(i) }, // int to float64
    math.Signbit, // float64 to bool
)   // PCollection<bool>

func SwapKV Uses

func SwapKV(s Scope, col PCollection) PCollection

SwapKV swaps the key and value for an input PCollection<KV<A,B>>. It returns a PCollection<KV<B,A>>.

func TryCoGroupByKey Uses

func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error)

TryCoGroupByKey inserts a CoGBK transform into the pipeline. Returns an error on failure.

func TryCombine Uses

func TryCombine(s Scope, combinefn interface{}, col PCollection) (PCollection, error)

TryCombine attempts to insert a global Combine transform into the pipeline. It may fail for multiple reasons, notably that the combinefn is not valid or cannot be bound -- due to type mismatch, say -- to the incoming PCollections.

func TryCombinePerKey Uses

func TryCombinePerKey(s Scope, combinefn interface{}, col PCollection) (PCollection, error)

TryCombinePerKey attempts to insert a per-key Combine transform into the pipeline. It may fail for multiple reasons, notably that the combinefn is not valid or cannot be bound -- due to type mismatch, say -- to the incoming PCollection.

func TryCreate Uses

func TryCreate(s Scope, values ...interface{}) (PCollection, error)

TryCreate inserts a fixed set of values into the pipeline. The values must be of the same type.

func TryExternal Uses

func TryExternal(s Scope, spec string, payload []byte, in []PCollection, out []FullType, bounded bool) ([]PCollection, error)

TryExternal attempts to perform the work of External, returning an error indicating why the operation failed.

func TryFlatten Uses

func TryFlatten(s Scope, cols ...PCollection) (PCollection, error)

TryFlatten merges incoming PCollections of type 'A' to a single PCollection of type 'A'. Returns an error indicating the set of PCollections that could not be flattened.

func TryGroupByKey Uses

func TryGroupByKey(s Scope, a PCollection) (PCollection, error)

TryGroupByKey inserts a GBK transform into the pipeline. Returns an error on failure.

func TryParDo Uses

func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCollection, error)

TryParDo attempts to insert a ParDo transform into the pipeline. It may fail for multiple reasons, notably that the dofn is not valid or cannot be bound -- due to type mismatch, say -- to the incoming PCollections.

func TryWindowInto Uses

func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, error)

TryWindowInto attempts to insert a WindowInto transform.

func WindowInto Uses

func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection

WindowInto applies the windowing strategy to each element.

func (PCollection) Coder Uses

func (p PCollection) Coder() Coder

Coder returns the coder for the collection. The Coder is of type 'A'.

func (PCollection) IsValid Uses

func (p PCollection) IsValid() bool

IsValid returns true iff the PCollection is valid and part of a Pipeline. Any use of an invalid PCollection will result in a panic.

func (PCollection) SetCoder Uses

func (p PCollection) SetCoder(c Coder) error

SetCoder set the coder for the collection. The Coder must be of type 'A'.

func (PCollection) String Uses

func (p PCollection) String() string

func (PCollection) Type Uses

func (p PCollection) Type() FullType

Type returns the full type 'A' of the elements. 'A' must be a concrete type, such as int or KV<int,string>.

type Pipeline Uses

type Pipeline struct {
    // contains filtered or unexported fields
}

Pipeline manages a directed acyclic graph of primitive PTransforms, and the PCollections that the PTransforms consume and produce. Each Pipeline is self-contained and isolated from any other Pipeline. The Pipeline owns the PCollections and PTransforms and they can by used by that Pipeline only. Pipelines can safely be executed concurrently.

func NewPipeline Uses

func NewPipeline() *Pipeline

NewPipeline creates a new empty pipeline.

func (*Pipeline) Build Uses

func (p *Pipeline) Build() ([]*graph.MultiEdge, []*graph.Node, error)

Build validates the Pipeline and returns a lower-level representation for execution. It is called by runners only.

func (*Pipeline) Root Uses

func (p *Pipeline) Root() Scope

Root returns the root scope of the pipeline.

func (*Pipeline) String Uses

func (p *Pipeline) String() string

type Scope Uses

type Scope struct {
    // contains filtered or unexported fields
}

Scope is a hierarchical grouping for composite transforms. Scopes can be enclosed in other scopes and for a tree structure. For pipeline updates, the scope chain form a unique name. The scope chain can also be used for monitoring and visualization purposes.

func (Scope) IsValid Uses

func (s Scope) IsValid() bool

IsValid returns true iff the Scope is valid. Any use of an invalid Scope will result in a panic.

func (Scope) Scope Uses

func (s Scope) Scope(name string) Scope

Scope returns a sub-scope with the given name. The name provided may be augmented to ensure uniqueness.

func (Scope) String Uses

func (s Scope) String() string

type SideInput Uses

type SideInput struct {
    Input PCollection
}

SideInput provides a view of the given PCollection to the transformation.

Code:

// words and sample are PCollection<string>
var words, sample beam.PCollection
// analyzeFn emits values from the primary based on the singleton side input.
analyzeFn := func(primary string, side string, emit func(string)) {}
// Use beam.SideInput to declare that the sample PCollection is the side input.
beam.ParDo(s, analyzeFn, words, beam.SideInput{Input: sample})

type T Uses

type T = typex.T

T is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

func JSONDec Uses

func JSONDec(t reflect.Type, in []byte) (T, error)

JSONDec decodes the supplied JSON into an instance of the supplied type.

func ProtoDec Uses

func ProtoDec(t reflect.Type, in []byte) (T, error)

ProtoDec unmarshals the supplied bytes into an instance of the supplied proto.Message type.

type TypeDefinition Uses

type TypeDefinition struct {
    // Var is the universal type defined.
    Var reflect.Type
    // T is the type it is bound to.
    T   reflect.Type
}

TypeDefinition provides construction-time type information that the platform cannot infer, such as structured storage sources. These types are universal types that appear as output only. Types that are inferrable should not be conveyed via this mechanism.

type U Uses

type U = typex.U

U is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

type V Uses

type V = typex.V

V is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

type W Uses

type W = typex.W

W is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

type Window Uses

type Window = typex.Window

type X Uses

type X = typex.X

X is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

type Y Uses

type Y = typex.Y

Y is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

type Z Uses

type Z = typex.Z

Z is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

Directories

PathSynopsis
artifactPackage artifact contains utilities for staging and retrieving artifacts.
artifact/gcsproxyPackage gcsproxy contains artifact staging and retrieval servers backed by GCS.
core/funcxPackage funcx contains functions and types used to perform type analysis of Beam functions.
core/graphPackage graph is the internal representation of the Beam execution plan.
core/graph/coderPackage coder contains coder representation and utilities.
core/graph/mtimePackage mtime contains a millisecond representation of time.
core/graph/windowPackage window contains window representation, windowing strategies and utilities.
core/metricsPackage metrics implements the Beam metrics API, described at http://s.apache.org/beam-metrics-api
core/runtimePackage runtime contains runtime hooks and utilities for pipeline options and type registration.
core/runtime/coderxPackage coderx contains primitive coders.
core/runtime/execPackage exec contains runtime plan representation and execution.
core/runtime/exec/optimizedPackage optimized contains type-specialized shims for faster execution.
core/runtime/graphxPackage graphx provides facilities to help with the serialization of pipelines into a serializable graph structure suitable for the worker.
core/runtime/graphx/v1Package v1 is a generated protocol buffer package.
core/runtime/harnessPackage harness implements the SDK side of the Beam FnAPI.
core/runtime/harness/initPackage init contains the harness initialization code defined by the FnAPI.
core/runtime/harness/sessionPackage session is a generated protocol buffer package.
core/runtime/pipelinexPackage pipelinex contains utilities for manipulating Beam proto pipelines.
core/typexPackage typex contains full type representation and utilities for type checking.
core/util/dotPackage dot produces DOT graphs from Beam graph representations.
core/util/hooksPackage hooks allows runners to tailor execution of the worker harness.
core/util/ioutilxPackage ioutilx contains additional io utilities.
core/util/protoxPackage protox contains utilities for working with protobufs.
core/util/reflectxPackage reflectx contains a set of reflection utilities and well-known types.
core/util/stringxPackage stringx contains utilities for working with strings.
core/util/symtabPackage symtab allows reading low-level symbol information from the symbol table.
io/avroioPackage avroio contains transforms for reading and writing avro files.
io/bigqueryioPackage bigqueryio provides transformations and utilities to interact with Google BigQuery.
io/filesystemPackage filesystem contains an extensible file system abstraction.
io/filesystem/gcsPackage gcs contains a Google Cloud Storage (GCS) implementation of the Beam file system.
io/filesystem/localPackage local contains a local file implementation of the Beam file system.
io/filesystem/memfsPackage memfs contains a in-memory Beam filesystem.
io/pubsubioPackage pubsubio provides access to PubSub on Dataflow streaming.
io/pubsubio/v1Package v1 is a generated protocol buffer package.
io/textioPackage textio contains transforms for reading and writing text files.
logPackage log contains a re-targetable context-aware logging system.
modelPackage model contains the portable Beam model contracts.
model/fnexecution_v1
model/jobmanagement_v1
model/pipeline_v1
options/gcpoptsPackage gcpopts contains shared options for Google Cloud Platform.
options/joboptsPackage jobopts contains shared options for job submission.
provisionPackage provision contains utilities for obtaining runtime provision, information -- such as pipeline options.
runners/dataflowPackage dataflow contains the Dataflow runner for submitting pipelines to Google Cloud Dataflow.
runners/dataflow/dataflowlibPackage dataflowlib translates a Beam pipeline model to the Dataflow API job model, for submission to Google Cloud Dataflow.
runners/directPackage direct contains the direct runner for running single-bundle pipelines in the current process.
runners/dotPackage dot is a Beam runner that "runs" a pipeline by producing a DOT graph of the execution plan.
runners/flinkPackage flink contains the Flink runner.
runners/universalPackage universal contains a general-purpose runner that can submit jobs to any portable Beam runner.
runners/universal/runnerlibPackage runnerlib contains utilities for submitting Go pipelines to a Beam model runner.
testing/passertPackage passert contains verification transformations for testing pipelines.
testing/ptestPackage ptest contains utilities for pipeline unit testing.
transforms/filterPackage filter contains transformations for removing pipeline elements based on various conditions.
transforms/statsPacakge stats contains transforms for statistical processing.
transforms/topPackage top contains transformations for finding the smallest (or largest) N elements based on arbitrary orderings.
util/errorxPackage errorx contains utilities for handling errors.
util/execxPackage execx contains wrappers and utilities for the exec package.
util/gcsxPackage gcsx contains utilities for working with Google Cloud Storage (GCS).
util/grpcxPackage grpcx contains utilities for working with gRPC.
util/pubsubxPackage pubsubx contains utilities for working with Google PubSub.
util/shimxPackage shimx specifies the templates for generating type assertion shims for Apache Beam Go SDK pipelines.
util/starcgenxPackage starcgenx is a Static Analysis Type Assertion shim and Registration Code Generator which provides an extractor to extract types from a package, in order to generate approprate shimsr a package so code can be generated for it.
x/beamxPackage beamx is a convenience package for beam.
x/debug
x/hooks/perfPackage perf is to add performance measuring hooks to a runner, such as cpu, or trace profiles.

Package beam imports 18 packages (graph) and is imported by 35 packages. Updated 2018-12-12. Refresh now. Tools for package owners.