beam: github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec

package exec

import "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"

Package exec contains runtime plan representation and execution. A pipeline must be translated to a runtime plan to be executed.

Index

Package Files

coder.go cogbk.go combine.go data.go datasink.go datasource.go decode.go discard.go emit.go encode.go flatten.go fn.go fn_arity.go fullvalue.go hash.go input.go multiplex.go pardo.go plan.go reshuffle.go sdf.go sdf_invokers.go sideinput.go status.go translate.go unit.go util.go window.go

func Convert Uses

func Convert(v interface{}, to reflect.Type) interface{}

Convert converts the type of the runtime value to the desired one. It is needed to drop the universal type and convert Aggregate types.

func ConvertFn Uses

func ConvertFn(from, to reflect.Type) func(interface{}) interface{}

ConvertFn returns a function that converts the type of the runtime value to the desired one. It is needed to drop the universal type and convert Aggregate types.

func DecodeWindowedValueHeader Uses

func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, error)

DecodeWindowedValueHeader deserializes a windowed value header.

func EncodeElement Uses

func EncodeElement(c ElementEncoder, val interface{}) ([]byte, error)

EncodeElement is a convenience function for encoding a single element into a byte slice.

func EncodeWindow Uses

func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error)

EncodeWindow is a convenience function for encoding a single window into a byte slice.

func EncodeWindowedValueHeader Uses

func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.EventTime, w io.Writer) error

EncodeWindowedValueHeader serializes a windowed value header.

func IsEmitterRegistered Uses

func IsEmitterRegistered(t reflect.Type) bool

IsEmitterRegistered returns whether an emitter maker has already been registered.

func IsInputRegistered Uses

func IsInputRegistered(t reflect.Type) bool

IsInputRegistered returns whether an input maker has already been registered.

func MultiFinishBundle Uses

func MultiFinishBundle(ctx context.Context, list ...Node) error

MultiFinishBundle calls FinishBundle on multiple nodes. Convenience function.

func MultiStartBundle Uses

func MultiStartBundle(ctx context.Context, id string, data DataContext, list ...Node) error

MultiStartBundle calls StartBundle on multiple nodes. Convenience function.

func RegisterEmitter Uses

func RegisterEmitter(t reflect.Type, maker func(ElementProcessor) ReusableEmitter)

RegisterEmitter registers an emitter for the given type, such as "func(int)". If multiple emitters are registered for the same type, the last registration wins.

func RegisterInput Uses

func RegisterInput(t reflect.Type, maker func(ReStream) ReusableInput)

RegisterInput registers an input handler for the given type, such as "func(*int)bool". If multiple input handlers are registered for the same type, the last registration wins.
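
The "last registration wins" rule shared by RegisterEmitter and RegisterInput can be pictured as a map keyed by reflect.Type. The following is a minimal sketch of that idea only, not the package's implementation: the registry type and its register, isRegistered, and lookup methods are hypothetical names, and the maker is simplified to a function that returns a string.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// registry is a hypothetical per-type maker table. It is a stand-in for
// illustration and not the exec package's internal data structure.
type registry struct {
	mu     sync.Mutex
	makers map[reflect.Type]func() string
}

func newRegistry() *registry {
	return &registry{makers: make(map[reflect.Type]func() string)}
}

// register stores maker under t, overwriting any earlier entry, so the
// last registration for a type wins.
func (r *registry) register(t reflect.Type, maker func() string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.makers[t] = maker
}

// isRegistered reports whether a maker exists for t.
func (r *registry) isRegistered(t reflect.Type) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.makers[t]
	return ok
}

// lookup returns the maker registered for t, if any.
func (r *registry) lookup(t reflect.Type) (func() string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	m, ok := r.makers[t]
	return m, ok
}

// demo registers two makers for the type "func(int)" and reports whether
// the type is registered and which maker was kept.
func demo() (bool, string) {
	r := newRegistry()
	t := reflect.TypeOf((func(int))(nil))
	r.register(t, func() string { return "first" })
	r.register(t, func() string { return "second" })
	m, ok := r.lookup(t)
	if !ok {
		return false, ""
	}
	return r.isRegistered(t), m()
}

func main() {
	ok, which := demo()
	fmt.Println(ok, which)
}
```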

type Combine Uses

type Combine struct {
    UID     UnitID
    Fn      *graph.CombineFn
    UsesKey bool
    Out     Node

    PID string
    // contains filtered or unexported fields
}

Combine is a Combine executor. Combiners do not have side inputs (or output).

func (*Combine) Down Uses

func (n *Combine) Down(ctx context.Context) error

Down runs the CombineFn's TeardownFn.

func (*Combine) FinishBundle Uses

func (n *Combine) FinishBundle(ctx context.Context) error

FinishBundle completes this node's processing of a bundle.

func (*Combine) GetPID Uses

func (n *Combine) GetPID() string

GetPID returns the PTransformID for this CombineFn.

func (*Combine) ID Uses

func (n *Combine) ID() UnitID

ID returns the UnitID for this node.

func (*Combine) ProcessElement Uses

func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error

ProcessElement combines elements grouped by key using the CombineFn's AddInput, MergeAccumulators, and ExtractOutput functions.

func (*Combine) StartBundle Uses

func (n *Combine) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle initializes processing of this bundle for combines.

func (*Combine) String Uses

func (n *Combine) String() string

func (*Combine) Up Uses

func (n *Combine) Up(ctx context.Context) error

Up initializes this CombineFn and runs its SetupFn() method.

type ConvertToAccumulators Uses

type ConvertToAccumulators struct {
    *Combine
}

ConvertToAccumulators is an executor for converting an input value to an accumulator value.

func (*ConvertToAccumulators) ProcessElement Uses

func (n *ConvertToAccumulators) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error

ProcessElement accepts an input value and returns an accumulator containing that one value.

func (*ConvertToAccumulators) String Uses

func (n *ConvertToAccumulators) String() string

type DataContext Uses

type DataContext struct {
    Data  DataManager
    State StateReader
}

DataContext holds connectors to various data connections, incl. state and side input.

type DataManager Uses

type DataManager interface {
    // OpenRead opens a closable byte stream for reading.
    OpenRead(ctx context.Context, id StreamID) (io.ReadCloser, error)
    // OpenWrite opens a closable byte stream for writing.
    OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error)
}

DataManager manages external data byte streams. Each data stream can be opened by one consumer only.
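
As an illustration of the "one consumer only" rule, here is a small in-memory sketch with the same method shapes as DataManager. It rests on assumptions: streamID is a simplified string stand-in for StreamID, and memManager, memWriter, and roundTrip are hypothetical names that do not exist in the package.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"sync"
)

// streamID is a simplified, hypothetical stand-in for exec.StreamID.
type streamID string

// memManager keeps stream contents in memory and lets each stream be
// opened for reading only once.
type memManager struct {
	mu     sync.Mutex
	data   map[streamID][]byte
	opened map[streamID]bool
}

func newMemManager() *memManager {
	return &memManager{data: map[streamID][]byte{}, opened: map[streamID]bool{}}
}

// OpenRead returns the bytes written so far and rejects a second reader.
func (m *memManager) OpenRead(ctx context.Context, id streamID) (io.ReadCloser, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.opened[id] {
		return nil, fmt.Errorf("stream %q already opened for reading", id)
	}
	m.opened[id] = true
	return io.NopCloser(bytes.NewReader(m.data[id])), nil
}

// memWriter appends written bytes to the manager's buffer for one stream.
type memWriter struct {
	m  *memManager
	id streamID
}

func (w *memWriter) Write(p []byte) (int, error) {
	w.m.mu.Lock()
	defer w.m.mu.Unlock()
	w.m.data[w.id] = append(w.m.data[w.id], p...)
	return len(p), nil
}

func (w *memWriter) Close() error { return nil }

// OpenWrite returns a writer that appends to the named stream.
func (m *memManager) OpenWrite(ctx context.Context, id streamID) (io.WriteCloser, error) {
	return &memWriter{m: m, id: id}, nil
}

// roundTrip writes msg, reads it back, and reports whether a second
// OpenRead on the same stream is rejected.
func roundTrip(msg string) (string, bool) {
	ctx := context.Background()
	m := newMemManager()
	w, err := m.OpenWrite(ctx, "s1")
	if err != nil {
		return "", false
	}
	if _, err := io.WriteString(w, msg); err != nil {
		return "", false
	}
	w.Close()
	r, err := m.OpenRead(ctx, "s1")
	if err != nil {
		return "", false
	}
	defer r.Close()
	b, err := io.ReadAll(r)
	if err != nil {
		return "", false
	}
	_, err = m.OpenRead(ctx, "s1")
	return string(b), err != nil
}

func main() {
	fmt.Println(roundTrip("hello"))
}
```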

type DataSink Uses

type DataSink struct {
    UID   UnitID
    SID   StreamID
    Coder *coder.Coder
    // contains filtered or unexported fields
}

DataSink is a Node.

func (*DataSink) Down Uses

func (n *DataSink) Down(ctx context.Context) error

func (*DataSink) FinishBundle Uses

func (n *DataSink) FinishBundle(ctx context.Context) error

func (*DataSink) ID Uses

func (n *DataSink) ID() UnitID

func (*DataSink) ProcessElement Uses

func (n *DataSink) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error

func (*DataSink) StartBundle Uses

func (n *DataSink) StartBundle(ctx context.Context, id string, data DataContext) error

func (*DataSink) String Uses

func (n *DataSink) String() string

func (*DataSink) Up Uses

func (n *DataSink) Up(ctx context.Context) error

type DataSource Uses

type DataSource struct {
    UID   UnitID
    SID   StreamID
    Name  string
    Coder *coder.Coder
    Out   Node
    // contains filtered or unexported fields
}

DataSource is a Root execution unit.

func (*DataSource) Down Uses

func (n *DataSource) Down(ctx context.Context) error

Down resets the source.

func (*DataSource) FinishBundle Uses

func (n *DataSource) FinishBundle(ctx context.Context) error

FinishBundle resets the source.

func (*DataSource) ID Uses

func (n *DataSource) ID() UnitID

ID returns the UnitID for this node.

func (*DataSource) InitSplittable Uses

func (n *DataSource) InitSplittable()

InitSplittable initializes the SplittableUnit channel from the output unit, if it provides one.

func (*DataSource) Process Uses

func (n *DataSource) Process(ctx context.Context) error

Process opens the data source, reads and decodes data, kicking off element processing.

func (*DataSource) Progress Uses

func (n *DataSource) Progress() ProgressReportSnapshot

Progress returns a snapshot of the source's progress.

func (*DataSource) Split Uses

func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitResult, error)

Split takes a sorted set of potential split indices and a fraction of the remainder to split at, selects and actuates a split on an appropriate split index, and returns the selected split index in a SplitResult if successful or an error when unsuccessful.

If the following transform is splittable, and the split indices and fraction allow for splitting on the currently processing element, then a sub-element split is performed, and the appropriate information is returned in the SplitResult.

The bufSize param specifies the estimated number of elements that will be sent to this DataSource, and is used to perform accurate splits even if the DataSource has not yet received all its elements. A bufSize of 0 or less indicates that the size is unknown, in which case the currently known size is used.
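
To make the selection step concrete, the sketch below picks a split index from a list of candidates. It is a simplified illustration under stated assumptions, not the DataSource algorithm: chooseSplit and its parameters are hypothetical, an empty candidate list is assumed to mean that any index is allowed, sub-element splitting is ignored, and progress within the current element is not taken into account.

```go
package main

import (
	"errors"
	"fmt"
)

// chooseSplit is a hypothetical helper. current is the index of the
// element being processed, known is the number of elements seen so far,
// bufSize is the estimated total (0 or less means unknown), and frac is
// the fraction of the remaining elements to keep before the split.
func chooseSplit(current, known, bufSize int64, frac float64, splits []int64) (int64, error) {
	size := bufSize
	if size <= 0 {
		size = known // fall back to the currently known size
	}
	remaining := size - current - 1
	if remaining < 0 {
		return 0, errors.New("no elements left to split")
	}
	// Ideal split point: just past the current element, plus the
	// requested fraction of what remains.
	target := current + 1 + int64(frac*float64(remaining))
	if len(splits) == 0 {
		return target, nil // assumption: no restriction on split indices
	}
	best, found := int64(0), false
	for _, s := range splits {
		if s <= current || s > size {
			continue // cannot split at or before the current element
		}
		if !found || abs(s-target) < abs(best-target) {
			best, found = s, true
		}
	}
	if !found {
		return 0, errors.New("no allowed split index past the current element")
	}
	return best, nil
}

func abs(x int64) int64 {
	if x < 0 {
		return -x
	}
	return x
}

func main() {
	fmt.Println(chooseSplit(2, 10, 0, 0.5, nil))
	fmt.Println(chooseSplit(2, 10, 0, 0.5, []int64{4, 9}))
	fmt.Println(chooseSplit(2, 10, 20, 0.5, nil))
}
```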

func (*DataSource) StartBundle Uses

func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle initializes this datasource for the bundle.

func (*DataSource) String Uses

func (n *DataSource) String() string

func (*DataSource) Up Uses

func (n *DataSource) Up(ctx context.Context) error

Up initializes this datasource.

type Decoder Uses

type Decoder interface {
    // Decode decodes the []byte into a value of the given type.
    Decode(reflect.Type, []byte) (interface{}, error)
}

Decoder is a uniform custom decoder interface. It wraps various forms of reflectx.Funcs.

type Discard Uses

type Discard struct {
    // UID is the unit identifier.
    UID UnitID
}

Discard silently discards all elements. It is implicitly inserted for any loose ends in the pipeline.

func (*Discard) Down Uses

func (d *Discard) Down(ctx context.Context) error

func (*Discard) FinishBundle Uses

func (d *Discard) FinishBundle(ctx context.Context) error

func (*Discard) ID Uses

func (d *Discard) ID() UnitID

func (*Discard) ProcessElement Uses

func (d *Discard) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error

func (*Discard) StartBundle Uses

func (d *Discard) StartBundle(ctx context.Context, id string, data DataContext) error

func (*Discard) String Uses

func (d *Discard) String() string

func (*Discard) Up Uses

func (d *Discard) Up(ctx context.Context) error

type ElementDecoder Uses

type ElementDecoder interface {
    // Decode deserializes a value from the given reader.
    Decode(io.Reader) (*FullValue, error)
    // DecodeTo deserializes a value from the given reader into the provided FullValue.
    DecodeTo(io.Reader, *FullValue) error
}

ElementDecoder handles FullValue deserialization from a byte stream. The decoder can be reused, even if an error is encountered. Concurrency-safe.

func MakeElementDecoder Uses

func MakeElementDecoder(c *coder.Coder) ElementDecoder

MakeElementDecoder returns an ElementDecoder for the given coder. It panics if given a coder with stream types, such as GBK.

type ElementEncoder Uses

type ElementEncoder interface {
    // Encode serializes the given value to the writer.
    Encode(*FullValue, io.Writer) error
}

ElementEncoder handles FullValue serialization to a byte stream. The encoder can be reused, even if an error is encountered. Concurrency-safe.

func MakeElementEncoder Uses

func MakeElementEncoder(c *coder.Coder) ElementEncoder

MakeElementEncoder returns an ElementEncoder for the given coder. It panics if given a coder with stream types, such as GBK.

type ElementProcessor Uses

type ElementProcessor interface {
    // ProcessElement processes a single element. If the element is a GBK or
    // CoGBK result, the values are populated. Otherwise, they're empty.
    // The *FullValue is owned by the caller, and is not safe to cache.
    ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
}

ElementProcessor presents a component that can process an element.

type Encoder Uses

type Encoder interface {
    // Encode encodes the given value (of the given type).
    Encode(reflect.Type, interface{}) ([]byte, error)
}

Encoder is a uniform custom encoder interface. It wraps various forms of reflectx.Funcs.

type Expand Uses

type Expand struct {
    // UID is the unit identifier.
    UID UnitID

    ValueDecoders []ElementDecoder

    Out Node
}

func (*Expand) Down Uses

func (n *Expand) Down(ctx context.Context) error

func (*Expand) FinishBundle Uses

func (n *Expand) FinishBundle(ctx context.Context) error

func (*Expand) ID Uses

func (n *Expand) ID() UnitID

func (*Expand) ProcessElement Uses

func (n *Expand) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error

func (*Expand) StartBundle Uses

func (n *Expand) StartBundle(ctx context.Context, id string, data DataContext) error

func (*Expand) String Uses

func (n *Expand) String() string

func (*Expand) Up Uses

func (n *Expand) Up(ctx context.Context) error

type ExtractOutput Uses

type ExtractOutput struct {
    *Combine
}

ExtractOutput is an executor for extracting output from a lifted combine.

func (*ExtractOutput) ProcessElement Uses

func (n *ExtractOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error

ProcessElement accepts an accumulator value, and extracts the final return type from it.

func (*ExtractOutput) String Uses

func (n *ExtractOutput) String() string

type FixedKey Uses

type FixedKey struct {
    // UID is the unit identifier.
    UID UnitID
    // Key is the given key
    Key interface{}
    // Out is the successor node.
    Out Node
}

FixedKey transforms any value into KV<K, V> for a fixed K.

func (*FixedKey) Down Uses

func (n *FixedKey) Down(ctx context.Context) error

func (*FixedKey) FinishBundle Uses

func (n *FixedKey) FinishBundle(ctx context.Context) error

func (*FixedKey) ID Uses

func (n *FixedKey) ID() UnitID

func (*FixedKey) ProcessElement Uses

func (n *FixedKey) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error

func (*FixedKey) StartBundle Uses

func (n *FixedKey) StartBundle(ctx context.Context, id string, data DataContext) error

func (*FixedKey) String Uses

func (n *FixedKey) String() string

func (*FixedKey) Up Uses

func (n *FixedKey) Up(ctx context.Context) error

type FixedReStream Uses

type FixedReStream struct {
    Buf []FullValue
}

FixedReStream is a simple in-memory ReStream.

func (*FixedReStream) Open Uses

func (n *FixedReStream) Open() (Stream, error)

Open returns a Stream from the start of the in-memory ReStream.

type FixedStream Uses

type FixedStream struct {
    Buf []FullValue
    // contains filtered or unexported fields
}

FixedStream is a simple in-memory Stream from a fixed array.

func (*FixedStream) Close Uses

func (s *FixedStream) Close() error

Close releases the buffer, closing the stream.

func (*FixedStream) Read Uses

func (s *FixedStream) Read() (*FullValue, error)

Read produces the next value in the stream.
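
The relationship between a re-iterable stream and the streams it opens can be sketched with local stand-in types. This is an illustration, not the package source: fullValue, stream, reStream, fixedReStream, fixedStream, and readAll are hypothetical lowercase stand-ins, and signalling the end of a stream with io.EOF is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"io"
)

// fullValue is a reduced, hypothetical stand-in for exec.FullValue.
type fullValue struct{ Elm interface{} }

type stream interface {
	Read() (*fullValue, error)
	Close() error
}

type reStream interface {
	Open() (stream, error)
}

// fixedReStream hands out a fresh stream over the same buffer each time.
type fixedReStream struct{ Buf []fullValue }

func (r *fixedReStream) Open() (stream, error) {
	return &fixedStream{buf: r.Buf}, nil
}

// fixedStream walks the buffer once, from the start.
type fixedStream struct {
	buf  []fullValue
	next int
}

func (s *fixedStream) Read() (*fullValue, error) {
	if s.next >= len(s.buf) {
		return nil, io.EOF // assumption: io.EOF marks the end of the stream
	}
	v := &s.buf[s.next]
	s.next++
	return v, nil
}

// Close drops this stream's reference to the buffer. The re-iterable
// stream keeps its own reference, so it can be opened again.
func (s *fixedStream) Close() error {
	s.buf = nil
	return nil
}

// readAll opens the re-iterable stream and drains one pass over it.
func readAll(rs reStream) ([]fullValue, error) {
	s, err := rs.Open()
	if err != nil {
		return nil, err
	}
	defer s.Close()
	var out []fullValue
	for {
		v, err := s.Read()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, *v)
	}
}

// countTwice reads the same re-iterable stream two times and returns the
// number of values seen on each pass.
func countTwice() (int, int) {
	rs := &fixedReStream{Buf: []fullValue{{Elm: 1}, {Elm: 2}, {Elm: 3}}}
	a, _ := readAll(rs)
	b, _ := readAll(rs)
	return len(a), len(b)
}

func main() {
	fmt.Println(countTwice())
}
```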

type Flatten Uses

type Flatten struct {
    // UID is the unit identifier.
    UID UnitID
    // N is the number of incoming edges.
    N   int
    // Out is the output node.
    Out Node
    // contains filtered or unexported fields
}

Flatten is a fan-in node. It ensures that Start/FinishBundle are only called once downstream.
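
One way to guarantee that a fan-in node forwards StartBundle and FinishBundle only once is to track whether the bundle is active and how many inputs have finished. The sketch below illustrates that counting approach with hypothetical types (sink, flatten, runBundle); it should not be read as the package's exact implementation.

```go
package main

import "fmt"

// sink is a hypothetical downstream node that counts the calls it sees.
type sink struct{ starts, elems, finishes int }

// flatten is a hypothetical fan-in node with n incoming edges.
type flatten struct {
	n      int
	out    *sink
	active bool // true once the first StartBundle has been forwarded
	seen   int  // number of FinishBundle calls received so far
}

// startBundle forwards only the first call of a bundle.
func (f *flatten) startBundle() {
	if f.active {
		return
	}
	f.active = true
	f.seen = 0
	f.out.starts++
}

// processElement forwards every element unchanged.
func (f *flatten) processElement() { f.out.elems++ }

// finishBundle forwards only after all n inputs have finished.
func (f *flatten) finishBundle() {
	f.seen++
	if f.seen < f.n {
		return
	}
	f.active = false
	f.out.finishes++
}

// runBundle simulates one bundle flowing through the given number of
// upstream inputs, each producing perInput elements.
func runBundle(inputs, perInput int) sink {
	out := &sink{}
	f := &flatten{n: inputs, out: out}
	for i := 0; i < inputs; i++ {
		f.startBundle()
	}
	for i := 0; i < inputs; i++ {
		for j := 0; j < perInput; j++ {
			f.processElement()
		}
	}
	for i := 0; i < inputs; i++ {
		f.finishBundle()
	}
	return *out
}

func main() {
	fmt.Printf("%+v\n", runBundle(3, 2))
}
```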

func (*Flatten) Down Uses

func (m *Flatten) Down(ctx context.Context) error

func (*Flatten) FinishBundle Uses

func (m *Flatten) FinishBundle(ctx context.Context) error

func (*Flatten) ID Uses

func (m *Flatten) ID() UnitID

func (*Flatten) ProcessElement Uses

func (m *Flatten) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error

func (*Flatten) StartBundle Uses

func (m *Flatten) StartBundle(ctx context.Context, id string, data DataContext) error

func (*Flatten) String Uses

func (m *Flatten) String() string

func (*Flatten) Up Uses

func (m *Flatten) Up(ctx context.Context) error

type FullValue Uses

type FullValue struct {
    Elm  interface{} // Element or KV key.
    Elm2 interface{} // KV value, if not invalid

    Timestamp typex.EventTime
    Windows   []typex.Window
}

FullValue represents the full runtime value for a data element, incl. the implicit context. The result of a GBK or CoGBK is not a single FullValue. The consumer is responsible for converting the values to the correct type. To represent a nested KV with FullValues, assign a *FullValue to Elm/Elm2.

func Invoke Uses

func Invoke(ctx context.Context, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error)

Invoke invokes the fn with the given values. The extra values must match the non-main side input and emitters. It returns the direct output, if any.

func InvokeWithoutEventTime Uses

func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error)

InvokeWithoutEventTime runs the given function at time 0 in the global window.

func ReadAll Uses

func ReadAll(rs ReStream) ([]FullValue, error)

ReadAll reads a full ReStream and returns the result.

func (*FullValue) String Uses

func (v *FullValue) String() string

type GenID Uses

type GenID struct {
    // contains filtered or unexported fields
}

GenID is a simple UnitID generator.

func (*GenID) New Uses

func (g *GenID) New() UnitID

New returns a fresh ID.

type Inject Uses

type Inject struct {
    // UID is the unit identifier.
    UID UnitID
    // N is the index (tag) in the union.
    N   int
    // ValueEncoder is the encoder for the value part of the incoming KV<K,V>.
    ValueEncoder ElementEncoder
    // Out is the successor node.
    Out Node
}

Inject injects the predecessor index into each FullValue and encodes the value, effectively converting KV<X,Y> into KV<X,KV<int,[]byte>>. Used in combination with Expand.
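
The shape change from KV<X,Y> to KV<X,KV<int,[]byte>> can be sketched with a reduced value type. Everything here is illustrative and assumed: fullValue is a two-field stand-in for FullValue, inject and encodeString are hypothetical helpers, and the nested KV is represented by assigning a *fullValue to Elm2.

```go
package main

import "fmt"

// fullValue is a reduced, hypothetical stand-in for exec.FullValue.
type fullValue struct {
	Elm, Elm2 interface{}
}

// inject tags the value part of kv with the predecessor index n and
// replaces the value with its encoded bytes.
func inject(n int, encode func(interface{}) ([]byte, error), kv fullValue) (fullValue, error) {
	b, err := encode(kv.Elm2)
	if err != nil {
		return fullValue{}, err
	}
	// The nested KV<int,[]byte> is held as a *fullValue in Elm2.
	return fullValue{Elm: kv.Elm, Elm2: &fullValue{Elm: n, Elm2: b}}, nil
}

// encodeString is a toy encoder that only accepts strings.
func encodeString(v interface{}) ([]byte, error) {
	s, ok := v.(string)
	if !ok {
		return nil, fmt.Errorf("want string, got %T", v)
	}
	return []byte(s), nil
}

// demoInject injects index 1 into KV<"k","v"> and unpacks the result.
func demoInject() (string, int, string, error) {
	out, err := inject(1, encodeString, fullValue{Elm: "k", Elm2: "v"})
	if err != nil {
		return "", 0, "", err
	}
	inner := out.Elm2.(*fullValue)
	return out.Elm.(string), inner.Elm.(int), string(inner.Elm2.([]byte)), nil
}

func main() {
	fmt.Println(demoInject())
}
```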

func (*Inject) Down Uses

func (n *Inject) Down(ctx context.Context) error

func (*Inject) FinishBundle Uses

func (n *Inject) FinishBundle(ctx context.Context) error

func (*Inject) ID Uses

func (n *Inject) ID() UnitID

func (*Inject) ProcessElement Uses

func (n *Inject) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error

func (*Inject) StartBundle Uses

func (n *Inject) StartBundle(ctx context.Context, id string, data DataContext) error

func (*Inject) String Uses

func (n *Inject) String() string

func (*Inject) Up Uses

func (n *Inject) Up(ctx context.Context) error

type LiftedCombine Uses

type LiftedCombine struct {
    *Combine
    KeyCoder *coder.Coder
    // contains filtered or unexported fields
}

LiftedCombine is an executor for combining values before grouping by keys for a lifted combine. Partially groups values by key within a bundle, accumulating them in an in memory cache, before emitting them in the FinishBundle step.

func (*LiftedCombine) Down Uses

func (n *LiftedCombine) Down(ctx context.Context) error

Down tears down the cache.

func (*LiftedCombine) FinishBundle Uses

func (n *LiftedCombine) FinishBundle(ctx context.Context) error

FinishBundle iterates through the cached (key, accumulator) pairs, and then processes the value in the bundle as normal.

func (*LiftedCombine) ProcessElement Uses

func (n *LiftedCombine) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error

ProcessElement takes a KV pair and combines values with the same key into an accumulator, caching them until the bundle is complete. If the cache grows too large, a random eviction policy is used.

func (*LiftedCombine) StartBundle Uses

func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle initializes the in memory cache of keys to accumulators.

func (*LiftedCombine) String Uses

func (n *LiftedCombine) String() string

func (*LiftedCombine) Up Uses

func (n *LiftedCombine) Up(ctx context.Context) error

Up initializes the LiftedCombine.

type MainInput Uses

type MainInput struct {
    Key      FullValue
    Values   []ReStream
    RTracker sdf.RTracker
}

MainInput is the main input and is unfolded in the invocation, if present.

type MergeAccumulators Uses

type MergeAccumulators struct {
    *Combine
}

MergeAccumulators is an executor for merging accumulators from a lifted combine.

func (*MergeAccumulators) ProcessElement Uses

func (n *MergeAccumulators) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error

ProcessElement accepts a stream of accumulator values with the same key and runs the MergeAccumulatorsFn over them repeatedly.

func (*MergeAccumulators) String Uses

func (n *MergeAccumulators) String() string

func (*MergeAccumulators) Up Uses

func (n *MergeAccumulators) Up(ctx context.Context) error

Up eagerly gets the optimized binary merge function.

type Multiplex Uses

type Multiplex struct {
    // UID is the unit identifier.
    UID UnitID
    // Out is a list of output nodes.
    Out []Node
}

Multiplex is a fan-out node. It simply forwards any call to all downstream nodes.

func (*Multiplex) Down Uses

func (m *Multiplex) Down(ctx context.Context) error

func (*Multiplex) FinishBundle Uses

func (m *Multiplex) FinishBundle(ctx context.Context) error

func (*Multiplex) ID Uses

func (m *Multiplex) ID() UnitID

func (*Multiplex) ProcessElement Uses

func (m *Multiplex) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error

func (*Multiplex) StartBundle Uses

func (m *Multiplex) StartBundle(ctx context.Context, id string, data DataContext) error

func (*Multiplex) String Uses

func (m *Multiplex) String() string

func (*Multiplex) Up Uses

func (m *Multiplex) Up(ctx context.Context) error

type Node Uses

type Node interface {
    Unit
    ElementProcessor
}

Node represents a single-bundle processing unit. Each node contains its processing continuation, notably other nodes.

type PairWithRestriction Uses

type PairWithRestriction struct {
    UID UnitID
    Fn  *graph.DoFn
    Out Node
    // contains filtered or unexported fields
}

PairWithRestriction is an executor for the expanded SDF step of the same name. This is the first step of an expanded SDF. It pairs each main input element with a restriction via the SDF's associated sdf.RestrictionProvider. This step is followed by SplitAndSizeRestrictions.

func (*PairWithRestriction) Down Uses

func (n *PairWithRestriction) Down(_ context.Context) error

Down currently does nothing.

func (*PairWithRestriction) FinishBundle Uses

func (n *PairWithRestriction) FinishBundle(ctx context.Context) error

FinishBundle resets the invokers.

func (*PairWithRestriction) ID Uses

func (n *PairWithRestriction) ID() UnitID

ID returns the UnitID for this unit.

func (*PairWithRestriction) ProcessElement Uses

func (n *PairWithRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error

ProcessElement expects elm to be the main input to the ParDo. See exec.FullValue for more details on the expected input.

ProcessElement creates an initial restriction representing the entire input. The output is in the structure <elem, restriction>, where elem is the main input originally passed in (i.e. the parameter elm). Windows and Timestamp are copied to the outer *FullValue. They can be left within the original element, but won't be used by later SDF steps.

Output Diagram:

*FullValue {
  Elm: *FullValue (original input)
  Elm2: Restriction
  Windows
  Timestamp
}
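
The output structure in the diagram can be built as in the following sketch. The types are reduced stand-ins and the names are hypothetical: fullValue uses an int64 timestamp and string windows, offsetRange is an illustrative restriction type, and pairWithRestriction takes the restriction-creating function as a plain argument instead of going through a RestrictionProvider.

```go
package main

import "fmt"

// fullValue is a reduced, hypothetical stand-in for exec.FullValue.
type fullValue struct {
	Elm, Elm2 interface{}
	Timestamp int64
	Windows   []string
}

// offsetRange is an illustrative restriction covering [Start, End).
type offsetRange struct{ Start, End int64 }

// pairWithRestriction wraps the original input and its initial
// restriction in an outer value, copying windows and timestamp outward.
func pairWithRestriction(elm *fullValue, create func(interface{}) interface{}) *fullValue {
	return &fullValue{
		Elm:       elm,             // original input, kept intact
		Elm2:      create(elm.Elm), // restriction for the entire input
		Timestamp: elm.Timestamp,
		Windows:   elm.Windows,
	}
}

// demoPair pairs a string element with a restriction spanning its length
// and reports the restriction end and whether the outer value is intact.
func demoPair() (int64, bool) {
	in := &fullValue{Elm: "abcdef", Timestamp: 42, Windows: []string{"global"}}
	out := pairWithRestriction(in, func(e interface{}) interface{} {
		return offsetRange{Start: 0, End: int64(len(e.(string)))}
	})
	rest := out.Elm2.(offsetRange)
	intact := out.Elm.(*fullValue) == in && out.Timestamp == 42 && len(out.Windows) == 1
	return rest.End, intact
}

func main() {
	fmt.Println(demoPair())
}
```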

func (*PairWithRestriction) StartBundle Uses

func (n *PairWithRestriction) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle currently does nothing.

func (*PairWithRestriction) String Uses

func (n *PairWithRestriction) String() string

String outputs a human-readable description of this transform.

func (*PairWithRestriction) Up Uses

func (n *PairWithRestriction) Up(_ context.Context) error

Up performs one-time setup for this executor.

type ParDo Uses

type ParDo struct {
    UID     UnitID
    Fn      *graph.DoFn
    Inbound []*graph.Inbound
    Side    []SideInputAdapter
    Out     []Node

    PID string
    // contains filtered or unexported fields
}

ParDo is a DoFn executor.

func (*ParDo) Down Uses

func (n *ParDo) Down(ctx context.Context) error

Down performs best-effort teardown of DoFn resources. (May not run.)

func (*ParDo) FinishBundle Uses

func (n *ParDo) FinishBundle(ctx context.Context) error

FinishBundle does post-bundle processing operations for the DoFn. Note: This is not a "FinalizeBundle" operation. Data is not yet durably persisted at this point.

func (*ParDo) GetPID Uses

func (n *ParDo) GetPID() string

GetPID returns the PTransformID for this ParDo.

func (*ParDo) ID Uses

func (n *ParDo) ID() UnitID

ID returns the UnitID for this ParDo.

func (*ParDo) ProcessElement Uses

func (n *ParDo) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error

ProcessElement processes each parallel element with the DoFn.

func (*ParDo) StartBundle Uses

func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle does pre-bundle processing operations for the DoFn.

func (*ParDo) String Uses

func (n *ParDo) String() string

func (*ParDo) Up Uses

func (n *ParDo) Up(ctx context.Context) error

Up initializes this ParDo and does one-time DoFn setup.

type Plan Uses

type Plan struct {
    // contains filtered or unexported fields
}

Plan represents the bundle execution plan. It will generally be constructed from a part of a pipeline. A plan can be used to process multiple bundles serially.

func NewPlan Uses

func NewPlan(id string, units []Unit) (*Plan, error)

NewPlan returns a new bundle execution plan from the given units.

func UnmarshalPlan Uses

func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error)

UnmarshalPlan converts a model bundle descriptor into an execution Plan.

func (*Plan) Down Uses

func (p *Plan) Down(ctx context.Context) error

Down takes the plan and associated units down. Does not panic.

func (*Plan) Execute Uses

func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) error

Execute executes the plan with the given data context and bundle id. Units are brought up on the first execution. If a bundle fails, the plan cannot be reused for further bundles. Does not panic. Blocking.
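
The rules described for Execute (units come up on first use, a failed bundle makes the plan unusable, and panics are turned into errors) can be sketched as a small state machine. The plan type, its fields, and callNoPanic below are hypothetical illustrations written for this sketch, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// plan is a hypothetical, reduced model of a bundle execution plan.
type plan struct {
	up      bool // units have been brought up
	upCalls int  // how many times units were brought up
	broken  bool // a bundle failed; the plan must not be reused
}

// callNoPanic runs fn and converts a panic into an error.
func callNoPanic(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return fn()
}

// execute processes one bundle, bringing units up on the first call.
func (p *plan) execute(bundle func() error) error {
	if p.broken {
		return errors.New("plan cannot be reused after a failed bundle")
	}
	if !p.up {
		p.up = true
		p.upCalls++
	}
	if err := callNoPanic(bundle); err != nil {
		p.broken = true
		return err
	}
	return nil
}

// demoPlan runs two good bundles, one panicking bundle, and one more
// good bundle, and reports what happened at each stage.
func demoPlan() (int, bool, bool, bool) {
	p := &plan{}
	ok := func() error { return nil }
	succeeded := p.execute(ok) == nil && p.execute(ok) == nil
	failed := p.execute(func() error { panic("boom") }) != nil
	rejected := p.execute(ok) != nil
	return p.upCalls, succeeded, failed, rejected
}

func main() {
	fmt.Println(demoPlan())
}
```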

func (*Plan) ID Uses

func (p *Plan) ID() string

ID returns the plan identifier.

func (*Plan) Progress Uses

func (p *Plan) Progress() (ProgressReportSnapshot, bool)

Progress returns a snapshot of input progress of the plan, and associated metrics.

func (*Plan) SourcePTransformID Uses

func (p *Plan) SourcePTransformID() string

SourcePTransformID returns the ID of the data's origin PTransform.

func (*Plan) Split Uses

func (p *Plan) Split(s SplitPoints) (SplitResult, error)

Split takes a set of potential split indexes, and if successful returns the split result. Returns an error when unable to split.

func (*Plan) Store Uses

func (p *Plan) Store() *metrics.Store

Store returns the metric store for the last use of this plan.

func (*Plan) String Uses

func (p *Plan) String() string

type Port Uses

type Port struct {
    URL string
}

Port represents the connection port of external operations.

type ProcessSizedElementsAndRestrictions Uses

type ProcessSizedElementsAndRestrictions struct {
    PDo  *ParDo
    TfId string // Transform ID. Needed for splitting.

    // SU is a buffered channel for indicating when this unit is splittable.
    // When this unit is processing an element, it sends a SplittableUnit
    // interface through the channel. That interface can be received on other
    // threads and used to perform splitting or other related operations.
    //
    // This channel should be received on in a non-blocking manner, to avoid
    // hanging if no element is processing.
    //
    // Receiving the SplittableUnit prevents the current element from finishing
    // processing, so the element does not unexpectedly change during a split.
    // Therefore, receivers of the SplittableUnit must send it back through the
    // channel once finished with it, or it will block indefinitely.
    SU  chan SplittableUnit
    // contains filtered or unexported fields
}

ProcessSizedElementsAndRestrictions is an executor for the expanded SDF step of the same name. It is the final step of the expanded SDF. It sets up and invokes the user's SDF methods, similar to exec.ParDo but with slight changes to support the SDF's method signatures and the expected structure of the FullValue being received.
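
The receive-then-return protocol described for the SU channel can be sketched as follows. The code makes several assumptions: splittableUnit is reduced to a single GetProgress method, fakeUnit and tryProgress are hypothetical, and the channel is given a buffer of one so that handing the unit back never blocks.

```go
package main

import "fmt"

// splittableUnit is a reduced, hypothetical stand-in for the interface
// sent through the SU channel.
type splittableUnit interface {
	GetProgress() float64
}

// fakeUnit is a test double reporting a fixed progress.
type fakeUnit struct{ progress float64 }

func (f fakeUnit) GetProgress() float64 { return f.progress }

// tryProgress receives from su without blocking. If a unit is available
// it is used and then sent back, so the element can finish processing.
func tryProgress(su chan splittableUnit) (float64, bool) {
	select {
	case u := <-su:
		defer func() { su <- u }() // always return the unit
		return u.GetProgress(), true
	default:
		return 0, false // no element is processing; do not wait
	}
}

// demoSU queries an idle channel, then a channel holding a unit, and
// reports both results plus the channel length afterwards.
func demoSU() (bool, float64, int) {
	su := make(chan splittableUnit, 1)
	_, gotWhenIdle := tryProgress(su)
	su <- fakeUnit{progress: 0.25}
	p, _ := tryProgress(su)
	return gotWhenIdle, p, len(su)
}

func main() {
	fmt.Println(demoSU())
}
```

Using select with a default case keeps the caller from hanging when no element is in flight, and the deferred send restores the unit even if the work in between returns early.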

func (*ProcessSizedElementsAndRestrictions) Down Uses

func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error

Down calls the ParDo's Down method.

func (*ProcessSizedElementsAndRestrictions) FinishBundle Uses

func (n *ProcessSizedElementsAndRestrictions) FinishBundle(ctx context.Context) error

FinishBundle resets the invokers and then calls the ParDo's FinishBundle method.

func (*ProcessSizedElementsAndRestrictions) GetInputId Uses

func (n *ProcessSizedElementsAndRestrictions) GetInputId() string

GetInputId returns the main input ID, since main input elements are being split.

func (*ProcessSizedElementsAndRestrictions) GetProgress Uses

func (n *ProcessSizedElementsAndRestrictions) GetProgress() float64

GetProgress returns the current restriction tracker's progress as a fraction.

func (*ProcessSizedElementsAndRestrictions) GetTransformId Uses

func (n *ProcessSizedElementsAndRestrictions) GetTransformId() string

GetTransformId returns this transform's transform ID.

func (*ProcessSizedElementsAndRestrictions) ID Uses

func (n *ProcessSizedElementsAndRestrictions) ID() UnitID

ID calls the ParDo's ID method.

func (*ProcessSizedElementsAndRestrictions) ProcessElement Uses

func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context, elm *FullValue, values ...ReStream) error

ProcessElement expects the same structure as the output of SplitAndSizeRestrictions, approximately <<elem, restriction>, size>. The only difference is that if the input was decoded in between the two steps, then single-element inputs were lifted from the *FullValue they were stored in.

Input Diagram:

*FullValue {
  Elm: *FullValue {
    Elm:  *FullValue (KV input) or InputType (single-element input)
    Elm2: Restriction
  }
  Elm2: float64 (size)
  Windows
  Timestamp
}

ProcessElement then creates a restriction tracker from the stored restriction and processes each element using the underlying ParDo and adding the restriction tracker to the normal invocation. Sizing information is present but currently ignored. Output is forwarded to the underlying ParDo's outputs.

func (*ProcessSizedElementsAndRestrictions) Split Uses

func (n *ProcessSizedElementsAndRestrictions) Split(f float64) (*FullValue, *FullValue, error)

Split splits the currently processing element using its restriction tracker. Then it returns an element for primary and residual, following the expected input structure to this unit, including updating the size of the split elements.

func (*ProcessSizedElementsAndRestrictions) StartBundle Uses

func (n *ProcessSizedElementsAndRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle calls the ParDo's StartBundle method.

func (*ProcessSizedElementsAndRestrictions) String Uses

func (n *ProcessSizedElementsAndRestrictions) String() string

String outputs a human-readable description of this transform.

func (*ProcessSizedElementsAndRestrictions) Up Uses

func (n *ProcessSizedElementsAndRestrictions) Up(ctx context.Context) error

Up performs some one-time setup and then calls the ParDo's Up method.

type ProgressReportSnapshot Uses

type ProgressReportSnapshot struct {
    ID, Name, PID string
    Count         int64
}

ProgressReportSnapshot captures the progress reading an input source.

TODO(lostluck) 2020/02/06: Add a visitor pattern for collecting progress metrics from downstream Nodes.

type ReStream Uses

type ReStream interface {
    Open() (Stream, error)
}

ReStream is a re-iterable stream, i.e., a Stream factory.

type ReshuffleInput Uses

type ReshuffleInput struct {
    UID   UnitID
    SID   StreamID
    Coder *coder.Coder // Coder for the input PCollection.
    Seed  int64
    Out   Node
    // contains filtered or unexported fields
}

ReshuffleInput is a Node.

func (*ReshuffleInput) Down Uses

func (n *ReshuffleInput) Down(ctx context.Context) error

Down is a no-op.

func (*ReshuffleInput) FinishBundle Uses

func (n *ReshuffleInput) FinishBundle(ctx context.Context) error

FinishBundle propagates finish bundle, and clears cached state.

func (*ReshuffleInput) ID Uses

func (n *ReshuffleInput) ID() UnitID

ID returns the unit debug id.

func (*ReshuffleInput) ProcessElement Uses

func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error

func (*ReshuffleInput) StartBundle Uses

func (n *ReshuffleInput) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle is a no-op.

func (*ReshuffleInput) String Uses

func (n *ReshuffleInput) String() string

func (*ReshuffleInput) Up Uses

func (n *ReshuffleInput) Up(ctx context.Context) error

Up initializes the value and window encoders, and the random source.

type ReshuffleOutput Uses

type ReshuffleOutput struct {
    UID   UnitID
    SID   StreamID
    Coder *coder.Coder // Coder for the receiving PCollection.
    Out   Node
    // contains filtered or unexported fields
}

ReshuffleOutput is a Node.

func (*ReshuffleOutput) Down Uses

func (n *ReshuffleOutput) Down(ctx context.Context) error

Down is a no-op.

func (*ReshuffleOutput) FinishBundle Uses

func (n *ReshuffleOutput) FinishBundle(ctx context.Context) error

FinishBundle propagates finish bundle to downstream nodes.

func (*ReshuffleOutput) ID Uses

func (n *ReshuffleOutput) ID() UnitID

ID returns the unit debug id.

func (*ReshuffleOutput) ProcessElement Uses

func (n *ReshuffleOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error

func (*ReshuffleOutput) StartBundle Uses

func (n *ReshuffleOutput) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle is a no-op.

func (*ReshuffleOutput) String Uses

func (n *ReshuffleOutput) String() string

func (*ReshuffleOutput) Up Uses

func (n *ReshuffleOutput) Up(ctx context.Context) error

Up initializes the value and window encoders, and the random source.

type ReusableEmitter Uses

type ReusableEmitter interface {
    // Init resets the value. Can be called multiple times.
    Init(ctx context.Context, ws []typex.Window, t typex.EventTime) error
    // Value returns the emitter value. Constant value.
    Value() interface{}
}

ReusableEmitter is a resettable value needed to hold the implicit context and emit event time.

type ReusableInput Uses

type ReusableInput interface {
    // Init initializes the value before use.
    Init() error
    // Value returns the side input value.
    Value() interface{}
    // Reset resets the value after use.
    Reset() error
}

ReusableInput is a resettable value, notably used to unwind iterators cheaply and cache materialized side input across invocations.
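
The Init/Value/Reset cycle can be illustrated with a small sketch. It assumes a hypothetical iterator-style input (`sliceIter`) backed by an in-memory slice and a local copy of the interface; neither the type nor the `sum` helper is part of this package, and real inputs are created by the runtime rather than by hand.

```go
package main

import "fmt"

// ReusableInput mirrors the interface documented above (local copy for the sketch).
type ReusableInput interface {
	Init() error
	Value() interface{}
	Reset() error
}

// sliceIter is a hypothetical ReusableInput whose Value is an iterator
// function of type func(*int) bool over a fixed slice.
type sliceIter struct {
	data []int
	next int
	fn   func(*int) bool
}

func newSliceIter(data []int) *sliceIter {
	it := &sliceIter{data: data}
	// The closure is built once and reused across invocations.
	it.fn = func(out *int) bool {
		if it.next >= len(it.data) {
			return false
		}
		*out = it.data[it.next]
		it.next++
		return true
	}
	return it
}

// Init positions the iterator at the start before use.
func (it *sliceIter) Init() error { it.next = 0; return nil }

// Value returns the same iterator function every time.
func (it *sliceIter) Value() interface{} { return it.fn }

// Reset rewinds the iterator after use without reallocating anything.
func (it *sliceIter) Reset() error { it.next = 0; return nil }

// sum drives one Init/Value/Reset cycle and adds up the iterated values.
func sum(in ReusableInput) (int, error) {
	if err := in.Init(); err != nil {
		return 0, err
	}
	next := in.Value().(func(*int) bool)
	total := 0
	var v int
	for next(&v) {
		total += v
	}
	return total, in.Reset()
}

func main() {
	it := newSliceIter([]int{1, 2, 3})
	for i := 0; i < 2; i++ {
		total, err := sum(it)
		fmt.Println(total, err)
	}
}
```

Because the iterator closure is allocated once, repeated invocations only pay for rewinding an index, which is the kind of cheap unwinding the description refers to.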

type Root Uses

type Root interface {
    Unit

    // Process processes the entire source, notably emitting elements to
    // downstream nodes.
    Process(ctx context.Context) error
}

Root represents a root processing unit. It contains its processing continuation, notably other nodes.

type SdfFallback Uses

type SdfFallback struct {
    PDo *ParDo
    // contains filtered or unexported fields
}

SdfFallback is an executor used when an SDF isn't expanded into steps by the runner, indicating that the runner doesn't support splitting. It executes all the SDF steps together in one unit.

func (*SdfFallback) Down Uses

func (n *SdfFallback) Down(ctx context.Context) error

Down calls the ParDo's Down method.

func (*SdfFallback) FinishBundle Uses

func (n *SdfFallback) FinishBundle(ctx context.Context) error

FinishBundle resets the invokers and then calls the ParDo's FinishBundle method.

func (*SdfFallback) ID Uses

func (n *SdfFallback) ID() UnitID

ID calls the ParDo's ID method.

func (*SdfFallback) ProcessElement Uses

func (n *SdfFallback) ProcessElement(_ context.Context, elm *FullValue, values ...ReStream) error

ProcessElement performs all the work from the steps above in one transform. This means creating initial restrictions, performing initial splits on those restrictions, and then creating restriction trackers and processing each restriction with the underlying ParDo. This executor skips the sizing step because sizing information is unnecessary for unexpanded SDFs.

func (*SdfFallback) StartBundle Uses

func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle calls the ParDo's StartBundle method.

func (*SdfFallback) String Uses

func (n *SdfFallback) String() string

String outputs a human-readable description of this transform.

func (*SdfFallback) Up Uses

func (n *SdfFallback) Up(ctx context.Context) error

Up performs some one-time setup and then calls the ParDo's Up method.

type SideInputAdapter Uses

type SideInputAdapter interface {
    NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
}

SideInputAdapter provides a concrete ReStream from a low-level side input reader. It encapsulates StreamID and coding as needed.

func NewSideInputAdapter Uses

func NewSideInputAdapter(sid StreamID, sideInputID string, c *coder.Coder) SideInputAdapter

NewSideInputAdapter returns a side input adapter for the given StreamID and coder. It expects a W<KV<K,V>> coder, because the protocol supports MultiSet access only.

type SplitAndSizeRestrictions Uses

type SplitAndSizeRestrictions struct {
    UID UnitID
    Fn  *graph.DoFn
    Out Node
    // contains filtered or unexported fields
}

SplitAndSizeRestrictions is an executor for the expanded SDF step of the same name. It is the second step of the expanded SDF, occurring after CreateInitialRestriction. It performs initial splits on the initial restrictions and adds sizing information, producing one or more output elements per input element. This step is followed by ProcessSizedElementsAndRestrictions.

func (*SplitAndSizeRestrictions) Down Uses

func (n *SplitAndSizeRestrictions) Down(_ context.Context) error

Down currently does nothing.

func (*SplitAndSizeRestrictions) FinishBundle Uses

func (n *SplitAndSizeRestrictions) FinishBundle(ctx context.Context) error

FinishBundle resets the invokers.

func (*SplitAndSizeRestrictions) ID Uses

func (n *SplitAndSizeRestrictions) ID() UnitID

ID returns the UnitID for this unit.

func (*SplitAndSizeRestrictions) ProcessElement Uses

func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error

ProcessElement expects elm.Elm to hold the original input while elm.Elm2 contains the restriction.

Input Diagram:

*FullValue {
  Elm: *FullValue (original input)
  Elm2: Restriction
  Windows
  Timestamps
}

ProcessElement splits the given restriction into one or more restrictions and then sizes each. The outputs are in the structure <<elem, restriction>, size> where elem is the original main input to the unexpanded SDF. Windows and Timestamps are copied to each split output.

Output Diagram:

*FullValue {
  Elm: *FullValue {
    Elm:  *FullValue (original input)
    Elm2: Restriction
  }
  Elm2: float64 (size)
  Windows
  Timestamps
}
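
The input and output diagrams above can be sketched in code. The example below is illustrative only: `FullValue` is a simplified local stand-in (windows and timestamp are plain types), `offsetRange` is a hypothetical restriction, and `splitAndSize` uses a fixed chunk length and the chunk length as the size, whereas the real executor calls the DoFn's own split and sizing methods.

```go
package main

import "fmt"

// FullValue is a simplified local stand-in for exec.FullValue.
type FullValue struct {
	Elm, Elm2 interface{}
	Windows   []string // placeholder for []typex.Window
	Timestamp int64    // placeholder for typex.EventTime
}

// offsetRange is a hypothetical restriction type covering [Start, End).
type offsetRange struct{ Start, End int64 }

// splitAndSize turns one <elem, restriction> input into one or more
// <<elem, restriction>, size> outputs, copying windows and timestamp.
func splitAndSize(in *FullValue, chunk int64) []*FullValue {
	rest := in.Elm2.(offsetRange)
	if chunk <= 0 {
		chunk = rest.End - rest.Start // no splitting: keep the whole restriction
	}
	var out []*FullValue
	for s := rest.Start; s < rest.End; s += chunk {
		e := s + chunk
		if e > rest.End {
			e = rest.End
		}
		out = append(out, &FullValue{
			Elm:       &FullValue{Elm: in.Elm, Elm2: offsetRange{s, e}},
			Elm2:      float64(e - s), // size of this sub-restriction
			Windows:   in.Windows,
			Timestamp: in.Timestamp,
		})
	}
	return out
}

func main() {
	in := &FullValue{
		Elm:       &FullValue{Elm: "file.txt"}, // original input
		Elm2:      offsetRange{0, 10},
		Windows:   []string{"global"},
		Timestamp: 42,
	}
	for _, o := range splitAndSize(in, 4) {
		pair := o.Elm.(*FullValue)
		fmt.Printf("restriction=%v size=%v ts=%d\n", pair.Elm2, o.Elm2, o.Timestamp)
	}
}
```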

func (*SplitAndSizeRestrictions) StartBundle Uses

func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error

StartBundle currently does nothing.

func (*SplitAndSizeRestrictions) String Uses

func (n *SplitAndSizeRestrictions) String() string

String outputs a human-readable description of this transform.

func (*SplitAndSizeRestrictions) Up Uses

func (n *SplitAndSizeRestrictions) Up(_ context.Context) error

Up performs one-time setup for this executor.

type SplitPoints Uses

type SplitPoints struct {
    // Splits is a list of desired split indices.
    Splits []int64
    Frac   float64

    // Estimated total number of elements (including unsent) for the source.
    // A zero value indicates unknown; use the locally known size instead.
    BufSize int64
}

SplitPoints captures the split requested by the Runner.

type SplitResult Uses

type SplitResult struct {
    // Indices are always included, for both channel and sub-element splits.
    PI  int64 // Primary index, last element of the primary.
    RI  int64 // Residual index, first element of the residual.

    // Extra information included for sub-element splits. If PS and RS are
    // present then a sub-element split occurred.
    PS   []byte // Primary split. If an element is split, this is the encoded primary.
    RS   []byte // Residual split. If an element is split, this is the encoded residual.
    TId  string // Transform ID of the transform receiving the split elements.
    InId string // Input ID of the input the split elements are received from.
}

SplitResult contains the result of performing a split on a Plan.
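
A caller can distinguish the two kinds of split by checking the optional fields, roughly as sketched below. The struct is a local copy of the one above, and `isSubElementSplit` and `describe` are hypothetical helpers; treating "present" as "non-empty" is an assumption of this sketch.

```go
package main

import "fmt"

// SplitResult is a local copy of the struct documented above.
type SplitResult struct {
	PI, RI    int64
	PS, RS    []byte
	TId, InId string
}

// isSubElementSplit reports whether both encoded halves are present.
// Assumption: "present" is interpreted as non-empty byte slices.
func isSubElementSplit(r SplitResult) bool {
	return len(r.PS) > 0 && len(r.RS) > 0
}

// describe renders a short human-readable summary of the split.
func describe(r SplitResult) string {
	if isSubElementSplit(r) {
		return fmt.Sprintf("sub-element split: primary ends at %d, residual starts at %d, split element for transform %q input %q",
			r.PI, r.RI, r.TId, r.InId)
	}
	return fmt.Sprintf("channel split: primary ends at %d, residual starts at %d", r.PI, r.RI)
}

func main() {
	channel := SplitResult{PI: 4, RI: 5}
	sub := SplitResult{PI: 4, RI: 6, PS: []byte{0x01}, RS: []byte{0x02}, TId: "sdf", InId: "i0"}
	fmt.Println(describe(channel))
	fmt.Println(describe(sub))
}
```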

type SplittableUnit Uses

type SplittableUnit interface {
    // Split performs a split on a fraction of a currently processing element
    // and returns the primary and residual elements resulting from it, or an
    // error if the split failed.
    Split(fraction float64) (primary, residual *FullValue, err error)

    // GetProgress returns the fraction of progress the current element has
    // made in processing (e.g., 0.0 means no progress and 1.0 means fully
    // processed).
    GetProgress() float64

    // GetTransformId returns the transform ID of the splittable unit.
    GetTransformId() string

    // GetInputId returns the local input ID of the input that the element being
    // split was received from.
    GetInputId() string
}

SplittableUnit is an interface that defines sub-element splitting operations for a unit, and provides access to them on other threads.
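
As a rough illustration of the interface, the sketch below implements it for a hypothetical unit (`rangeUnit`) that works through an offset range. It assumes the fraction is the share of the remaining work to keep in the primary; that interpretation, the types, and the returned IDs are assumptions of the example, not statements about the real executors. A mutex guards the state because Split and GetProgress may be called from another goroutine while the element is being processed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// FullValue is a simplified local stand-in for exec.FullValue.
type FullValue struct{ Elm interface{} }

// SplittableUnit mirrors the interface documented above (local copy).
type SplittableUnit interface {
	Split(fraction float64) (primary, residual *FullValue, err error)
	GetProgress() float64
	GetTransformId() string
	GetInputId() string
}

// offsetRange is a hypothetical restriction covering [Start, End).
type offsetRange struct{ Start, End int64 }

// rangeUnit is a toy unit processing offsets of rest in order.
type rangeUnit struct {
	mu   sync.Mutex
	rest offsetRange
	pos  int64 // next unprocessed offset
}

var _ SplittableUnit = (*rangeUnit)(nil)

func (u *rangeUnit) GetProgress() float64 {
	u.mu.Lock()
	defer u.mu.Unlock()
	total := u.rest.End - u.rest.Start
	if total <= 0 {
		return 1.0
	}
	return float64(u.pos-u.rest.Start) / float64(total)
}

// Split keeps the given fraction of the remaining work in the primary
// (assumed semantics) and hands the rest back as the residual.
func (u *rangeUnit) Split(fraction float64) (primary, residual *FullValue, err error) {
	if fraction < 0 || fraction > 1 {
		return nil, nil, fmt.Errorf("fraction %v outside [0, 1]", fraction)
	}
	u.mu.Lock()
	defer u.mu.Unlock()
	remaining := u.rest.End - u.pos
	splitAt := u.pos + int64(fraction*float64(remaining))
	if splitAt >= u.rest.End {
		return nil, nil, errors.New("no work left to split off")
	}
	primary = &FullValue{Elm: offsetRange{u.rest.Start, splitAt}}
	residual = &FullValue{Elm: offsetRange{splitAt, u.rest.End}}
	u.rest.End = splitAt // the unit now only owns the primary
	return primary, residual, nil
}

func (u *rangeUnit) GetTransformId() string { return "example-transform" }
func (u *rangeUnit) GetInputId() string     { return "i0" }

func main() {
	u := &rangeUnit{rest: offsetRange{0, 100}, pos: 20}
	p, r, err := u.Split(0.5)
	fmt.Println(p.Elm, r.Elm, err)
	fmt.Println(u.GetProgress())
}
```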

type StateReader Uses

type StateReader interface {
    // OpenSideInput opens a byte stream for reading iterable side input.
    OpenSideInput(ctx context.Context, id StreamID, sideInputID string, key, w []byte) (io.ReadCloser, error)
    // OpenIterable opens a byte stream for reading unwindowed iterables from the runner.
    OpenIterable(ctx context.Context, id StreamID, key []byte) (io.ReadCloser, error)
}

StateReader is the interface for reading side input data.

type Status Uses

type Status int

Status is the status of a unit or plan.

const (
    Initializing Status = iota
    Up
    Active
    Broken
    Down
)

func (Status) String Uses

func (s Status) String() string

type Stream Uses

type Stream interface {
    io.Closer
    Read() (*FullValue, error)
}

Stream is a FullValue reader. It returns io.EOF when complete, but can be prematurely closed.
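
The relationship between Stream and ReStream can be sketched with an in-memory implementation. Everything below is a local, simplified model: `FullValue` is reduced to one field, and `sliceReStream`, `sliceStream`, and `countAll` are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// FullValue is a simplified local stand-in for exec.FullValue.
type FullValue struct{ Elm interface{} }

// Stream and ReStream mirror the documented interfaces (local copies).
type Stream interface {
	io.Closer
	Read() (*FullValue, error)
}

type ReStream interface {
	Open() (Stream, error)
}

// sliceReStream hands out a fresh Stream over the same values on every Open.
type sliceReStream struct{ values []FullValue }

func (r *sliceReStream) Open() (Stream, error) {
	return &sliceStream{values: r.values}, nil
}

// sliceStream reads the values once, then reports io.EOF.
type sliceStream struct {
	values []FullValue
	next   int
	closed bool
}

func (s *sliceStream) Read() (*FullValue, error) {
	if s.closed {
		return nil, errors.New("read on closed stream")
	}
	if s.next >= len(s.values) {
		return nil, io.EOF
	}
	v := &s.values[s.next]
	s.next++
	return v, nil
}

func (s *sliceStream) Close() error {
	s.closed = true
	return nil
}

// countAll opens the ReStream, reads until io.EOF, and returns the count.
func countAll(rs ReStream) (int, error) {
	s, err := rs.Open()
	if err != nil {
		return 0, err
	}
	defer s.Close()
	n := 0
	for {
		_, err := s.Read()
		if err == io.EOF {
			return n, nil
		}
		if err != nil {
			return n, err
		}
		n++
	}
}

func main() {
	rs := &sliceReStream{values: []FullValue{{Elm: 1}, {Elm: 2}, {Elm: 3}}}
	// Each pass opens its own Stream, so the data can be iterated repeatedly.
	for i := 0; i < 2; i++ {
		n, err := countAll(rs)
		fmt.Println(n, err)
	}
}
```

Deferring Close in the reader covers both the io.EOF path and early exits, which matches the note that a Stream can be closed before it is exhausted.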

type StreamID Uses

type StreamID struct {
    Port         Port
    PtransformID string
}

StreamID represents the static information needed to identify a data stream. Dynamic information, notably bundleID, is provided implicitly by the managers.

func (StreamID) String Uses

func (id StreamID) String() string

type Unit Uses

type Unit interface {
    // ID returns the unit ID.
    ID() UnitID

    // Up initializes the unit. It is separate from Unit construction to
    // make panic/error handling easier.
    Up(ctx context.Context) error

    // StartBundle signals that processing preconditions, such as availability
    // of side input, are met and starts the given bundle.
    StartBundle(ctx context.Context, id string, data DataContext) error

    // FinishBundle signals end of input and thus finishes the bundle. Any
    // data connections must be closed.
    FinishBundle(ctx context.Context) error

    // Down tears down the processing node. It is notably called if the unit
    // or plan encounters an error and must thus robustly handle cleanup of
    // unfinished bundles. If a unit itself (as opposed to downstream units)
    // is the cause of breakage, the error returned should indicate the root
    // cause.
    Down(ctx context.Context) error
}

Unit represents a processing unit capable of processing multiple bundles serially. Units are not required to be concurrency-safe. Each unit is responsible for propagating each data processing call downstream, i.e., all calls except Up/Down, as appropriate.
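
The call order implied by the interface (Up once, then StartBundle/FinishBundle per bundle, then Down) can be sketched as follows. The types are local placeholders, `traceUnit` and `runBundles` are hypothetical, and the status transitions it enforces are an assumption about one reasonable way to track the lifecycle rather than a description of how every unit in this package behaves.

```go
package main

import (
	"context"
	"fmt"
)

// Local placeholders for the package's types.
type UnitID int
type DataContext struct{}
type Status int

const (
	Initializing Status = iota
	Up
	Active
	Broken
	Down
)

// Unit mirrors the interface documented above (local copy).
type Unit interface {
	ID() UnitID
	Up(ctx context.Context) error
	StartBundle(ctx context.Context, id string, data DataContext) error
	FinishBundle(ctx context.Context) error
	Down(ctx context.Context) error
}

// traceUnit is a hypothetical unit that only tracks its lifecycle status.
type traceUnit struct {
	id      UnitID
	status  Status
	bundles int
}

func (u *traceUnit) ID() UnitID { return u.id }

func (u *traceUnit) Up(ctx context.Context) error {
	if u.status != Initializing {
		return fmt.Errorf("unit %d: invalid status %d for Up", u.id, u.status)
	}
	u.status = Up
	return nil
}

func (u *traceUnit) StartBundle(ctx context.Context, id string, data DataContext) error {
	if u.status != Up {
		return fmt.Errorf("unit %d: invalid status %d for StartBundle", u.id, u.status)
	}
	u.status = Active
	return nil
}

func (u *traceUnit) FinishBundle(ctx context.Context) error {
	if u.status != Active {
		return fmt.Errorf("unit %d: invalid status %d for FinishBundle", u.id, u.status)
	}
	u.status = Up // ready for the next bundle
	u.bundles++
	return nil
}

// Down must tolerate any prior status, since it is also called on failure.
func (u *traceUnit) Down(ctx context.Context) error {
	u.status = Down
	return nil
}

// runBundles drives a unit through several bundles serially.
func runBundles(ctx context.Context, u Unit, bundleIDs ...string) error {
	if err := u.Up(ctx); err != nil {
		return err
	}
	for _, id := range bundleIDs {
		if err := u.StartBundle(ctx, id, DataContext{}); err != nil {
			u.Down(ctx)
			return err
		}
		if err := u.FinishBundle(ctx); err != nil {
			u.Down(ctx)
			return err
		}
	}
	return u.Down(ctx)
}

// runDemo runs two bundles through a fresh traceUnit.
func runDemo() (*traceUnit, error) {
	u := &traceUnit{id: 1}
	return u, runBundles(context.Background(), u, "bundle-1", "bundle-2")
}

func main() {
	u, err := runDemo()
	fmt.Println(u.bundles, u.status == Down, err)
}
```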

type UnitID Uses

type UnitID int

UnitID is a unit identifier. Used for debugging.

func IDs Uses

func IDs(list ...Node) []UnitID

IDs returns the unit IDs of the given nodes.

type WindowDecoder Uses

type WindowDecoder interface {
    // Decode deserializes a value from the given reader.
    Decode(io.Reader) ([]typex.Window, error)
}

WindowDecoder handles Window deserialization from a byte stream. The decoder can be reused, even if an error is encountered. Concurrency-safe.

func MakeWindowDecoder Uses

func MakeWindowDecoder(c *coder.WindowCoder) WindowDecoder

MakeWindowDecoder returns a WindowDecoder for the given window coder.

type WindowEncoder Uses

type WindowEncoder interface {
    // Encode serializes the given value to the writer.
    Encode([]typex.Window, io.Writer) error
    EncodeSingle(typex.Window, io.Writer) error
}

WindowEncoder handles Window serialization to a byte stream. The encoder can be reused, even if an error is encountered. Concurrency-safe.

func MakeWindowEncoder Uses

func MakeWindowEncoder(c *coder.WindowCoder) WindowEncoder

MakeWindowEncoder returns a WindowEncoder for the given window coder.

type WindowInto Uses

type WindowInto struct {
    UID UnitID
    Fn  *window.Fn
    Out Node
}

WindowInto places each element in one or more windows.

func (*WindowInto) Down Uses

func (w *WindowInto) Down(ctx context.Context) error

func (*WindowInto) FinishBundle Uses

func (w *WindowInto) FinishBundle(ctx context.Context) error

func (*WindowInto) ID Uses

func (w *WindowInto) ID() UnitID

func (*WindowInto) ProcessElement Uses

func (w *WindowInto) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error

func (*WindowInto) StartBundle Uses

func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataContext) error

func (*WindowInto) String Uses

func (w *WindowInto) String() string

func (*WindowInto) Up Uses

func (w *WindowInto) Up(ctx context.Context) error

Directories

Path       Synopsis
optimized  Package optimized contains type-specialized shims for faster execution.

Package exec imports 38 packages and is imported by 11 packages. Updated 2020-09-01.