
package graphx

import "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"

Package graphx provides facilities to help serialize pipelines into a graph structure suitable for the worker.

The registry's Register function is used by transform authors to make their type's methods available for remote invocation. The runner then uses the registry's Key and Lookup methods to access information supplied by transform authors.

The Encode* and Decode* methods are used to handle serialization of both regular Go data and the specific Beam data types. The Encode* methods are used after pipeline construction to turn the plan into a serializable form that can be sent for remote execution. The Decode* methods are used by the runner to recover the execution plan from the serialized form.


Package Files

coder.go cogbk.go dataflow.go doc.go serialize.go translate.go tree.go user.go xlang.go

Constants

const (
    URNInject = "beam:go:transform:inject:v1"
    URNExpand = "beam:go:transform:expand:v1"
)
const (
    URNImpulse       = "beam:transform:impulse:v1"
    URNParDo         = "beam:transform:pardo:v1"
    URNFlatten       = "beam:transform:flatten:v1"
    URNGBK           = "beam:transform:group_by_key:v1"
    URNReshuffle     = "beam:transform:reshuffle:v1"
    URNCombinePerKey = "beam:transform:combine_per_key:v1"
    URNWindow        = "beam:transform:window:v1"

    // URNIterableSideInput = "beam:side_input:iterable:v1"
    URNMultimapSideInput = "beam:side_input:multimap:v1"

    URNGlobalWindowsWindowFn  = "beam:window_fn:global_windows:v1"
    URNFixedWindowsWindowFn   = "beam:window_fn:fixed_windows:v1"
    URNSlidingWindowsWindowFn = "beam:window_fn:sliding_windows:v1"
    URNSessionsWindowFn       = "beam:window_fn:session_windows:v1"

    // SDK constants
    URNDoFn = "beam:go:transform:dofn:v1"

    URNIterableSideInputKey = "beam:go:transform:iterablesideinputkey:v1"
    URNReshuffleInput       = "beam:go:transform:reshuffleinput:v1"
    URNReshuffleOutput      = "beam:go:transform:reshuffleoutput:v1"

    URNLegacyProgressReporting = "beam:protocol:progress_reporting:v0"
    URNMultiCore               = "beam:protocol:multi_core_bundle_processing:v1"

    URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1"

    URNArtifactGoWorker  = "beam:artifact:type:go_worker_binary:v1"
    URNArtifactStagingTo = "beam:artifact:role:staging_to:v1"
)

Model constants for interfacing with a Beam runner. TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto.
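Code that consumes these constants typically dispatches on the URN string. The standalone sketch below copies a few of the constants above. The `describe` dispatcher and its labels are hypothetical. The `goSpecific` check relies on an observation about the constants listed here (the Go SDK's own URNs use a `beam:go:` prefix), not a documented guarantee.

```go
package main

import (
	"fmt"
	"strings"
)

// A subset of the URN constants listed above, copied for a standalone sketch.
const (
	URNImpulse = "beam:transform:impulse:v1"
	URNParDo   = "beam:transform:pardo:v1"
	URNGBK     = "beam:transform:group_by_key:v1"
	URNDoFn    = "beam:go:transform:dofn:v1"
)

// describe is a hypothetical dispatcher that maps a transform URN to a label
// and rejects URNs it does not recognize.
func describe(urn string) (string, error) {
	switch urn {
	case URNImpulse:
		return "impulse", nil
	case URNParDo:
		return "pardo", nil
	case URNGBK:
		return "group by key", nil
	case URNDoFn:
		return "go dofn", nil
	}
	return "", fmt.Errorf("unknown URN %q", urn)
}

// goSpecific reports whether urn lives in the "beam:go:" namespace.
func goSpecific(urn string) bool {
	return strings.HasPrefix(urn, "beam:go:")
}

func main() {
	for _, urn := range []string{URNParDo, URNDoFn, "beam:transform:unknown:v1"} {
		label, err := describe(urn)
		fmt.Println(urn, label, err, goSpecific(urn))
	}
}
```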

func CreateEnvironment

func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) *pipepb.Environment

CreateEnvironment produces the appropriate payload for the type of environment.

func DecodeCoder

func DecodeCoder(data string) (*coder.Coder, error)

DecodeCoder decodes a coder. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func DecodeCoderRef

func DecodeCoderRef(c *CoderRef) (*coder.Coder, error)

DecodeCoderRef extracts a usable coder from the encoded runner form.

func DecodeCoderRefs

func DecodeCoderRefs(list []*CoderRef) ([]*coder.Coder, error)

DecodeCoderRefs extracts usable coders from the encoded runner form.

func DecodeFn

func DecodeFn(data string) (reflectx.Func, error)

DecodeFn decodes a function. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The parameter types must be encodable.

func DecodeMultiEdge

func DecodeMultiEdge(edge *v1pb.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, []*graph.Inbound, []*graph.Outbound, error)

DecodeMultiEdge converts the wire representation into the preprocessed components representing that edge. We deserialize to components to avoid inserting the edge into a graph or creating a detached edge.

func DecodeType

func DecodeType(data string) (reflect.Type, error)

DecodeType decodes a type. Unless registered, the decoded type is only guaranteed to be isomorphic to the input with regard to data members. The returned type will have no methods.

func EncodeCoder

func EncodeCoder(c *coder.Coder) (string, error)

EncodeCoder encodes a coder as a string. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func EncodeFn

func EncodeFn(fn reflectx.Func) (string, error)

EncodeFn encodes a function and parameter types as a string. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func EncodeMultiEdge

func EncodeMultiEdge(edge *graph.MultiEdge) (*v1pb.MultiEdge, error)

EncodeMultiEdge converts the preprocessed representation into the wire representation of the multiedge, capturing input and output type information.

func EncodeType

func EncodeType(t reflect.Type) (string, error)

EncodeType encodes a type as a string. Unless the type is registered, a type recovered from that string by DecodeType is only guaranteed to be isomorphic to the input with regard to data members, and it will have no methods.

func ExpandedComponents

func ExpandedComponents(exp *graph.ExpandedTransform) *pipepb.Components

ExpandedComponents type-asserts the interface{}-typed Components field of exp and returns it as its pipeline components proto representation.

func ExpandedTransform

func ExpandedTransform(exp *graph.ExpandedTransform) *pipepb.PTransform

ExpandedTransform type-asserts the interface{}-typed Transform field of exp and returns it as its pipeline PTransform proto representation.

func ExternalInputs

func ExternalInputs(e *graph.MultiEdge) map[string]*graph.Node

ExternalInputs returns a map from tag to the graph node representing each input PCollection, resolved through the edge's map of named inputs (tag -> index of the Inbound in MultiEdge.Input).

func ExternalOutputs

func ExternalOutputs(e *graph.MultiEdge) map[string]*graph.Node

ExternalOutputs returns a map from tag to the graph node representing each output PCollection, resolved through the edge's map of named outputs (tag -> index of the Outbound in MultiEdge.Output).
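The tag resolution used by ExternalInputs and ExternalOutputs amounts to following each tag's index into the edge's inbound (or outbound) list. In this sketch, `Node`, `Inbound`, `Edge`, and the `InputsMap` field are hypothetical simplifications of the graph package's types, written only to show the lookup.

```go
package main

import "fmt"

// Node is a hypothetical stand-in for a graph node (a PCollection).
type Node struct{ ID string }

// Inbound is a hypothetical stand-in for one input of an edge.
type Inbound struct{ From *Node }

// Edge is a simplified multi-edge with a tag -> input index map.
type Edge struct {
	Input     []*Inbound
	InputsMap map[string]int
}

// externalInputs resolves each named input tag to the node it reads from.
func externalInputs(e *Edge) map[string]*Node {
	out := make(map[string]*Node, len(e.InputsMap))
	for tag, idx := range e.InputsMap {
		out[tag] = e.Input[idx].From
	}
	return out
}

func main() {
	a, b := &Node{ID: "n1"}, &Node{ID: "n2"}
	e := &Edge{
		Input:     []*Inbound{{From: a}, {From: b}},
		InputsMap: map[string]int{"main": 0, "side": 1},
	}
	for tag, n := range externalInputs(e) {
		fmt.Println(tag, n.ID)
	}
}
```

The outputs side would be symmetric, indexing into the edge's outbound list instead.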

func MakeGBKUnionCoder

func MakeGBKUnionCoder(gbk *graph.MultiEdge) *coder.Coder

MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.

func MakeKVUnionCoder

func MakeKVUnionCoder(gbk *graph.MultiEdge) *coder.Coder

MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.
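Both union coders wrap each value as KV<int,[]byte>, where the int records which CoGBK input the already-encoded value came from. The sketch below shows that tagging, and how one key's grouped values can be split back into per-input lists after the GroupByKey. It works on in-memory structs rather than real coders, and `unionValue`, `tag`, and `split` are hypothetical names.

```go
package main

import "fmt"

// unionValue mirrors KV<int,[]byte>: the input index plus the encoded value.
type unionValue struct {
	Index int
	Data  []byte
}

// tag wraps encoded values from input i so that all inputs can share one GBK.
func tag(i int, encoded [][]byte) []unionValue {
	out := make([]unionValue, len(encoded))
	for j, d := range encoded {
		out[j] = unionValue{Index: i, Data: d}
	}
	return out
}

// split regroups one key's grouped union values into per-input lists.
func split(numInputs int, grouped []unionValue) [][][]byte {
	out := make([][][]byte, numInputs)
	for _, u := range grouped {
		out[u.Index] = append(out[u.Index], u.Data)
	}
	return out
}

func main() {
	// Values for a single key, coming from two inputs.
	var grouped []unionValue
	grouped = append(grouped, tag(0, [][]byte{[]byte("a1"), []byte("a2")})...)
	grouped = append(grouped, tag(1, [][]byte{[]byte("b1")})...)

	for i, vals := range split(2, grouped) {
		fmt.Println("input", i, "has", len(vals), "values")
	}
}
```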

func Marshal

func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error)

Marshal converts a graph to a model pipeline.

func MarshalCoders

func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder)

MarshalCoders marshals a list of coders into model coders.

func UnmarshalCoders

func UnmarshalCoders(ids []string, m map[string]*pipepb.Coder) ([]*coder.Coder, error)

UnmarshalCoders unmarshals the coders with the given ids from the model coder map m.

type CoderMarshaller

type CoderMarshaller struct {
    // contains filtered or unexported fields
}

CoderMarshaller incrementally builds a compact model representation of a set of coders. Identical coders are shared.

func NewCoderMarshaller

func NewCoderMarshaller() *CoderMarshaller

NewCoderMarshaller returns a new CoderMarshaller.

func (*CoderMarshaller) Add

func (b *CoderMarshaller) Add(c *coder.Coder) string

Add adds the given coder to the set and returns its id. Idempotent.

func (*CoderMarshaller) AddMulti

func (b *CoderMarshaller) AddMulti(list []*coder.Coder) []string

AddMulti adds the given coders to the set and returns their ids. Idempotent.

func (*CoderMarshaller) AddWindowCoder

func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) string

AddWindowCoder adds a window coder.

func (*CoderMarshaller) Build

func (b *CoderMarshaller) Build() map[string]*pipepb.Coder

Build returns the set of model coders. Note that the map may be larger than the number of coders added, because component coders are included.
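The sharing behavior of CoderMarshaller (identical coders get one id, Add is idempotent, and Build also returns component coders) can be sketched with a structural key per coder. The `coder` and `modelCoder` types, the "c0"-style ids, and the "example:" URNs are assumptions for illustration, not the coder.Coder or pipepb.Coder formats.

```go
package main

import (
	"fmt"
	"strings"
)

// coder is a simplified coder tree: a URN plus component coders.
type coder struct {
	URN        string
	Components []*coder
}

// modelCoder is a flattened coder that refers to its components by id.
type modelCoder struct {
	URN          string
	ComponentIDs []string
}

// marshaller assigns ids so that structurally identical coders share one.
type marshaller struct {
	ids    map[string]string // structural key -> id
	coders map[string]modelCoder
}

func newMarshaller() *marshaller {
	return &marshaller{ids: map[string]string{}, coders: map[string]modelCoder{}}
}

// add registers c (and, recursively, its components) and returns its id.
// Adding an identical coder again returns the same id.
func (m *marshaller) add(c *coder) string {
	compIDs := make([]string, len(c.Components))
	for i, comp := range c.Components {
		compIDs[i] = m.add(comp)
	}
	key := c.URN + "(" + strings.Join(compIDs, ",") + ")"
	if id, ok := m.ids[key]; ok {
		return id
	}
	id := fmt.Sprintf("c%d", len(m.ids))
	m.ids[key] = id
	m.coders[id] = modelCoder{URN: c.URN, ComponentIDs: compIDs}
	return id
}

// build returns every model coder, including components added implicitly.
func (m *marshaller) build() map[string]modelCoder {
	return m.coders
}

func main() {
	varint := &coder{URN: "example:varint"}
	bytes := &coder{URN: "example:bytes"}
	kv := &coder{URN: "example:kv", Components: []*coder{varint, bytes}}

	m := newMarshaller()
	kvID := m.add(kv)
	vID := m.add(&coder{URN: "example:varint"}) // identical to kv's key coder
	fmt.Println(kvID, vID, len(m.build()))
}
```

Keying on component ids rather than pointers is what lets two separately constructed but identical coders collapse into one entry.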

type CoderRef

type CoderRef struct {
    Type                 string      `json:"@type,omitempty"`
    Components           []*CoderRef `json:"component_encodings,omitempty"`
    IsWrapper            bool        `json:"is_wrapper,omitempty"`
    IsPairLike           bool        `json:"is_pair_like,omitempty"`
    IsStreamLike         bool        `json:"is_stream_like,omitempty"`
    PipelineProtoCoderID string      `json:"pipeline_proto_coder_id,omitempty"`
}

CoderRef defines the (structured) Coder in serializable form. It is an artifact of the CloudObject encoding.

func EncodeCoderRef

func EncodeCoderRef(c *coder.Coder) (*CoderRef, error)

EncodeCoderRef returns the encoded form understood by the runner.

func EncodeCoderRefs

func EncodeCoderRefs(list []*coder.Coder) ([]*CoderRef, error)

EncodeCoderRefs returns the encoded forms understood by the runner.

func WrapIterable

func WrapIterable(c *CoderRef) *CoderRef

WrapIterable adds an iterable (stream) coder for Dataflow side input.

func WrapWindowed

func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) *CoderRef

WrapWindowed adds a windowed coder for Dataflow collections.

type CoderUnmarshaller

type CoderUnmarshaller struct {
    // contains filtered or unexported fields
}

CoderUnmarshaller is an incremental unmarshaller of model coders. Identical coders are shared.

func NewCoderUnmarshaller

func NewCoderUnmarshaller(m map[string]*pipepb.Coder) *CoderUnmarshaller

NewCoderUnmarshaller returns a new CoderUnmarshaller.

func (*CoderUnmarshaller) Coder

func (b *CoderUnmarshaller) Coder(id string) (*coder.Coder, error)

Coder unmarshals a coder with the given id.

func (*CoderUnmarshaller) Coders

func (b *CoderUnmarshaller) Coders(ids []string) ([]*coder.Coder, error)

Coders unmarshals a list of coder ids.

func (*CoderUnmarshaller) WindowCoder

func (b *CoderUnmarshaller) WindowCoder(id string) (*coder.WindowCoder, error)

WindowCoder unmarshals a window coder with the given id.
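Incremental unmarshalling with sharing can be sketched as a memoized recursive lookup: each id is decoded once, so repeated requests for the same id return the same decoded coder. The `modelCoder`, `coder`, and `unmarshaller` types are simplified, hypothetical stand-ins for pipepb.Coder, coder.Coder, and CoderUnmarshaller.

```go
package main

import "fmt"

// modelCoder is a flattened coder that refers to its components by id.
type modelCoder struct {
	URN          string
	ComponentIDs []string
}

// coder is the decoded, in-memory coder tree.
type coder struct {
	URN        string
	Components []*coder
}

// unmarshaller decodes model coders by id, caching each result.
type unmarshaller struct {
	models map[string]modelCoder
	cache  map[string]*coder
}

func newUnmarshaller(m map[string]modelCoder) *unmarshaller {
	return &unmarshaller{models: m, cache: map[string]*coder{}}
}

// coderFor returns the decoded coder for id, decoding its components first.
func (u *unmarshaller) coderFor(id string) (*coder, error) {
	if c, ok := u.cache[id]; ok {
		return c, nil
	}
	mc, ok := u.models[id]
	if !ok {
		return nil, fmt.Errorf("coder %q not found", id)
	}
	c := &coder{URN: mc.URN}
	for _, cid := range mc.ComponentIDs {
		comp, err := u.coderFor(cid)
		if err != nil {
			return nil, fmt.Errorf("component of %q: %w", id, err)
		}
		c.Components = append(c.Components, comp)
	}
	u.cache[id] = c
	return c, nil
}

func main() {
	u := newUnmarshaller(map[string]modelCoder{
		"c0": {URN: "example:varint"},
		"c1": {URN: "example:bytes"},
		"c2": {URN: "example:kv", ComponentIDs: []string{"c0", "c1"}},
	})
	kv, err := u.coderFor("c2")
	if err != nil {
		panic(err)
	}
	key, _ := u.coderFor("c0")
	fmt.Println(kv.URN, kv.Components[0] == key)
}
```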

type NamedEdge

type NamedEdge struct {
    Name string
    Edge *graph.MultiEdge
}

NamedEdge is a named MultiEdge.

type NamedScope

type NamedScope struct {
    Name  string
    Scope *graph.Scope
}

NamedScope is a named Scope.

type Options

type Options struct {
    // Environment used to run the user code.
    Environment *pipepb.Environment
}

Options for marshalling a graph into a model pipeline.

type ScopeTree

type ScopeTree struct {
    // Scope is the named scope at the root of the (sub)tree.
    Scope NamedScope
    // Edges are the named edges directly under this scope.
    Edges []NamedEdge

    // Children are the scopes directly under this scope.
    Children []*ScopeTree
}

ScopeTree is a convenient representation of the Scope-structure as a tree. Each ScopeTree may also be a subtree of another ScopeTree. The tree structure respects the underlying Scope structure, i.e., if Scope 'a' has a parent 'b' then the ScopeTree for 'b' must have the ScopeTree for 'a' as a child.

func NewScopeTree

func NewScopeTree(edges []*graph.MultiEdge) *ScopeTree

NewScopeTree computes the ScopeTree for a set of edges.
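One way to compute such a tree is to walk each edge's scope up toward the root, creating a tree node per scope the first time it is seen, linking it under its parent's node, and attaching the edge to the node for its own scope. The `scope`, `edge`, and `tree` types are hypothetical simplifications of graph.Scope, graph.MultiEdge, and ScopeTree, and the sketch assumes a single root scope and a non-empty edge list.

```go
package main

import "fmt"

// scope is a simplified scope with an optional parent.
type scope struct {
	Label  string
	Parent *scope
}

// edge is a simplified multi-edge that lives in a scope.
type edge struct {
	Name  string
	Scope *scope
}

// tree mirrors ScopeTree: a scope, its direct edges, and child scopes.
type tree struct {
	Scope    *scope
	Edges    []*edge
	Children []*tree
}

// newTree builds the scope tree for edges; the root is the parentless scope.
func newTree(edges []*edge) *tree {
	nodes := map[*scope]*tree{}
	var root *tree
	// get returns the tree node for s, creating it and linking it under its
	// parent's node the first time s is seen.
	var get func(s *scope) *tree
	get = func(s *scope) *tree {
		if n, ok := nodes[s]; ok {
			return n
		}
		n := &tree{Scope: s}
		nodes[s] = n
		if s.Parent == nil {
			root = n
		} else {
			p := get(s.Parent)
			p.Children = append(p.Children, n)
		}
		return n
	}
	for _, e := range edges {
		n := get(e.Scope)
		n.Edges = append(n.Edges, e)
	}
	return root
}

// dump prints the tree, indenting each level by two spaces.
func dump(n *tree, indent string) {
	fmt.Println(indent + n.Scope.Label)
	for _, e := range n.Edges {
		fmt.Println(indent + "  - " + e.Name)
	}
	for _, c := range n.Children {
		dump(c, indent+"  ")
	}
}

func main() {
	root := &scope{Label: "root"}
	count := &scope{Label: "CountWords", Parent: root}
	edges := []*edge{
		{Name: "Read", Scope: root},
		{Name: "ExtractWords", Scope: count},
		{Name: "GroupByKey", Scope: count},
	}
	dump(newTree(edges), "")
}
```

Creating parent nodes on demand guarantees the property stated for ScopeTree: if scope 'a' has parent 'b', the node for 'b' holds the node for 'a' as a child.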

Directories

Path    Synopsis
schema  Package schema contains utility functions for relating Go types and Beam Schemas.
v1      Package v1 is a generated protocol buffer package.

Package graphx imports 22 packages and is imported by 7 packages. Updated 2020-09-20.