beam

Version: v0.0.0-...-498d591
Warning

This package is not in the latest version of its module.

Published: Nov 11, 2023 | License: Unlicense | Imports: 26 | Imported by: 0

Documentation

Overview

Package beam is an experimental mockup of an Apache Beam Go SDK API that leverages generics and a more opinionated construction method. It exists to explore the ergonomics and feasibility of such an approach.

This package in particular is a variant of allinone that avoids using separate goroutines and channels to pass elements around.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Expand

func Expand[I Composite[O], O any](parent *Scope, name string, comp I) O

func MakeCoder

func MakeCoder[E any]() coders.Coder[E]

func ParDo

func ParDo[E Element, DF Transform[E]](s *Scope, input Output[E], dofn DF, opts ...Options) DF

ParDo takes the user's DoFn and returns the same type for downstream pipeline construction.

The returned DoFn's emitter fields can then be used as inputs into other DoFns. What if we used Emitters as PCollections directly? Obviously, we'd rename the type PCollection or similar If only to also

Types

type AccumulatorCreator

type AccumulatorCreator[A Element] interface {
	CreateAccumulator() A
	AccumulatorMerger[A]
}

AccumulatorCreator is an interface that allows combiners to construct a more sophisticated initial accumulator when the zero value is inappropriate for accumulation.
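As a minimal sketch of why the zero value can be inappropriate, consider a minimum: a zero-valued `int` accumulator would wrongly report 0 when every input is positive. The interfaces below are local copies of the documented ones (with `any` in place of `Element`) so the example compiles without importing this package; `minCombiner` and the `fold` driver are hypothetical and only approximate what a runner might do.

```go
package main

import (
	"fmt"
	"math"
)

// Local copies of the documented interfaces (any stands in for Element).
type AccumulatorMerger[A any] interface {
	MergeAccumulators(A, A) A
}

type AccumulatorCreator[A any] interface {
	CreateAccumulator() A
	AccumulatorMerger[A]
}

// minCombiner is a hypothetical combiner whose accumulator is the running minimum.
type minCombiner struct{}

// CreateAccumulator starts at math.MaxInt so that any real value replaces it;
// the int zero value (0) would wrongly win against all-positive inputs.
func (minCombiner) CreateAccumulator() int { return math.MaxInt }

func (minCombiner) MergeAccumulators(a, b int) int {
	if b < a {
		return b
	}
	return a
}

// fold is a hypothetical driver: seed with CreateAccumulator, then merge each value.
func fold[A any, C AccumulatorCreator[A]](c C, vals []A) A {
	acc := c.CreateAccumulator()
	for _, v := range vals {
		acc = c.MergeAccumulators(acc, v)
	}
	return acc
}

func main() {
	fmt.Println(fold(minCombiner{}, []int{7, 3, 9})) // 3
}
```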

type AccumulatorMerger

type AccumulatorMerger[A Element] interface {
	MergeAccumulators(A, A) A
}

AccumulatorMerger is an interface for combiners that only need a binary merge, where the input, output, and accumulator types are all the same.
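A minimal sketch, assuming a hypothetical `sumMerger`: because input, accumulator, and output are all `int`, one binary merge combines both raw values and partial results from different bundles. Beam generally expects combiner merges to be associative and commutative, so partial sums merged in either order give the same total. `AccumulatorMerger` is copied locally, and `mergeAll` is an illustrative driver, not part of this package.

```go
package main

import "fmt"

// Local copy of the documented interface (any stands in for Element).
type AccumulatorMerger[A any] interface {
	MergeAccumulators(A, A) A
}

// sumMerger is a hypothetical combiner where input, accumulator, and output are all int.
type sumMerger struct{}

func (sumMerger) MergeAccumulators(a, b int) int { return a + b }

// mergeAll reduces values with the binary merge, starting from the zero value,
// which happens to be a valid identity for a sum.
func mergeAll[A any, M AccumulatorMerger[A]](m M, parts []A) A {
	var acc A
	for _, p := range parts {
		acc = m.MergeAccumulators(acc, p)
	}
	return acc
}

func main() {
	// Two "bundles" summed separately, then merged in both orders.
	left := mergeAll(sumMerger{}, []int{1, 2, 3})
	right := mergeAll(sumMerger{}, []int{4, 5})
	fmt.Println(sumMerger{}.MergeAccumulators(left, right)) // 15
	fmt.Println(sumMerger{}.MergeAccumulators(right, left)) // 15
}
```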

type AfterBundle

type AfterBundle struct {
	// contains filtered or unexported fields
}

AfterBundle allows a DoFn to register a function that runs after the bundle has been durably committed. Emitting elements here will fail.

TODO: consider moving this to a simple interface function. Upside: a user is less likely to try to emit incorrectly in the closure. Downside: anything that needs finalizing must be cached in the DoFn struct, which violates the potential of a ConfigOnly DoFn.

func (*AfterBundle) Do

func (*AfterBundle) Do(dfc bundleFinalizer, finalizeBundle func() error)

type Combiner

type Combiner[A, I, O Element, AM AccumulatorMerger[A]] struct {
	// contains filtered or unexported fields
}

Combiner represents an optimizable approach to aggregation that breaks the aggregation down into three component types: the accumulator (A), the input (I), and the output (O).
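For example, a mean splits naturally into an accumulator A (a running sum and count), an input I (a single `float64`), and an output O (the `float64` average). The sketch below copies the documented `InputAdder`, `AccumulatorMerger`, `OutputExtractor`, and `FullCombiner` interfaces locally; `meanAcc`, `meanFn`, and the `combineAll` driver are hypothetical and only approximate how a runner might split work across bundles.

```go
package main

import "fmt"

// Local copies of the documented combiner interfaces (any stands in for Element).
type AccumulatorMerger[A any] interface {
	MergeAccumulators(A, A) A
}

type InputAdder[A, I any] interface {
	AddInput(A, I) A
	AccumulatorMerger[A]
}

type OutputExtractor[A, O any] interface {
	AccumulatorMerger[A]
	ExtractOutput(A) O
}

type FullCombiner[A, I, O any] interface {
	InputAdder[A, I]
	AccumulatorMerger[A]
	OutputExtractor[A, O]
}

// meanAcc is the accumulator type A: a running sum and count.
type meanAcc struct {
	Sum   float64
	Count int
}

// meanFn is a hypothetical FullCombiner[meanAcc, float64, float64].
type meanFn struct{}

func (meanFn) AddInput(a meanAcc, x float64) meanAcc {
	return meanAcc{Sum: a.Sum + x, Count: a.Count + 1}
}

func (meanFn) MergeAccumulators(a, b meanAcc) meanAcc {
	return meanAcc{Sum: a.Sum + b.Sum, Count: a.Count + b.Count}
}

func (meanFn) ExtractOutput(a meanAcc) float64 {
	if a.Count == 0 {
		return 0 // avoid dividing by zero for empty input
	}
	return a.Sum / float64(a.Count)
}

// combineAll adds each bundle's inputs to its own accumulator, merges the
// partial accumulators, then extracts the output once at the end.
func combineAll[A, I, O any, C FullCombiner[A, I, O]](c C, bundles [][]I) O {
	var total A
	for _, b := range bundles {
		var acc A
		for _, x := range b {
			acc = c.AddInput(acc, x)
		}
		total = c.MergeAccumulators(total, acc)
	}
	return c.ExtractOutput(total)
}

func main() {
	bundles := [][]float64{{1, 2}, {3, 6}}
	fmt.Println(combineAll[meanAcc, float64, float64](meanFn{}, bundles)) // 3
}
```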

func AddMerge

func AddMerge[A, I Element, IA InputAdder[A, I]](c IA) Combiner[A, I, A, IA]

AddMerge produces a Combiner from an InputAdder.

func FullCombine

func FullCombine[A, I, O Element, C FullCombiner[A, I, O]](c C) Combiner[A, I, O, C]

FullCombine produces a Combiner from a FullCombiner.

func MergeExtract

func MergeExtract[A, O Element, OE OutputExtractor[A, O]](c OE) Combiner[A, A, O, OE]

MergeExtract produces a Combiner from an OutputExtractor.

func SimpleMerge

func SimpleMerge[A Element, AM AccumulatorMerger[A]](c AM) Combiner[A, A, A, AM]

SimpleMerge produces a Combiner from an AccumulatorMerger.

type Composite

type Composite[O any] interface {
	Expand(s *Scope) O
}

Composite transforms allow structural reuse of sub-pipelines.

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

func (*Counter) Inc

func (c *Counter) Inc(dfc metricSource, diff int64)

type DFC

type DFC[E Element] struct {
	// contains filtered or unexported fields
}

DFC is the DoFn Context for simple DoFns.

func (*DFC[E]) Process

func (c *DFC[E]) Process(perElm Process[E]) error

Process is where you set the per-element processing function that accepts elements. Process returns an error so that its result can be returned directly from a Transform's ProcessBundle method.

func (*DFC[E]) ToElmC

func (c *DFC[E]) ToElmC(eventTime time.Time) ElmC

ToElmC returns the appropriate element context for elements that are not directly derived from a specific input element.

It derives the windows for the element and sets a no-firing pane.

type Element

type Element interface {
	any // Sadly, can't really restrict this without breaking iterators in GBK results.
}

type ElmC

type ElmC struct {
	// contains filtered or unexported fields
}

ElmC is the catch-all context for the current element.

This includes:
  * Key (state and timers)
  * Windows
  * Timestamp
  * Pane

It also provides the downstream emission context, so that emitted elements are actually sent to the next DoFn.

func (*ElmC) EventTime

func (e *ElmC) EventTime() time.Time

type FullCombiner

type FullCombiner[A, I, O Element] interface {
	InputAdder[A, I]
	AccumulatorMerger[A]
	OutputExtractor[A, O]
}

type InputAdder

type InputAdder[A, I Element] interface {
	AddInput(A, I) A
	AccumulatorMerger[A]
}

InputAdder is an interface that allows combiners to incorporate an input type I that may differ from the accumulator type A.
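A minimal sketch of an input type that differs from the accumulator, assuming a hypothetical `charCount` combiner that adds string lengths (in bytes) into an `int` accumulator. Used with AddMerge, the accumulator also serves as the output. The interfaces are local copies of the documented ones; `addAll` is an illustrative driver, not part of this package.

```go
package main

import "fmt"

// Local copies of the documented interfaces (any stands in for Element).
type AccumulatorMerger[A any] interface {
	MergeAccumulators(A, A) A
}

type InputAdder[A, I any] interface {
	AddInput(A, I) A
	AccumulatorMerger[A]
}

// charCount is a hypothetical InputAdder[int, string]: inputs are strings,
// and the accumulator is a running total of their byte lengths.
type charCount struct{}

func (charCount) AddInput(acc int, s string) int { return acc + len(s) }

func (charCount) MergeAccumulators(a, b int) int { return a + b }

// addAll folds inputs into a zero-valued accumulator.
func addAll[A, I any, IA InputAdder[A, I]](c IA, in []I) A {
	var acc A
	for _, x := range in {
		acc = c.AddInput(acc, x)
	}
	return acc
}

func main() {
	a := addAll[int](charCount{}, []string{"beam", "go"})
	b := addAll[int](charCount{}, []string{"sdk"})
	fmt.Println(charCount{}.MergeAccumulators(a, b)) // 9
}
```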

type Iter

type Iter[V Element] struct {
	// contains filtered or unexported fields
}

func (*Iter[V]) All

func (it *Iter[V]) All() func(perElm func(elm V) bool)

All allows a single iteration of its stream of values.
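The returned function is a push-style iterator: it calls `perElm` for each value and stops early once `perElm` returns `false`. The sketch below uses a hypothetical `all` helper over a slice as a stand-in for `(*Iter[V]).All`; because the real iterator allows only a single pass, callers should not invoke it twice. On Go 1.23 and later this shape matches `iter.Seq[V]`, so it can also be consumed with a `range` loop.

```go
package main

import "fmt"

// all is a hypothetical stand-in for (*Iter[V]).All: it returns a function that
// pushes each value to perElm and stops as soon as perElm returns false.
func all[V any](vals []V) func(perElm func(elm V) bool) {
	return func(perElm func(elm V) bool) {
		for _, v := range vals {
			if !perElm(v) {
				return
			}
		}
	}
}

// firstN collects at most n values, using a false return to stop early.
func firstN[V any](seq func(func(V) bool), n int) []V {
	var out []V
	if n <= 0 {
		return out
	}
	seq(func(v V) bool {
		out = append(out, v)
		return len(out) < n
	})
	return out
}

func main() {
	fmt.Println(firstN(all([]int{10, 20, 30, 40}), 2)) // [10 20]
}
```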

type KV

type KV[K, V Element] struct {
	Key   K
	Value V
}

type Keys

type Keys interface {
	comparable
}

type ObserveWindow

type ObserveWindow struct{}

ObserveWindow indicates that this DoFn needs to be aware of windows explicitly. Typical use is to embed ObserveWindow as a field.

func (*ObserveWindow) Of

func (*ObserveWindow) Of(ec ElmC) any

type OnBundleFinish

type OnBundleFinish struct{}

OnBundleFinish allows a DoFn to register a function that runs just before a bundle finishes. Elements may be emitted downstream if an ElmC is retrieved from the DFC.

func (*OnBundleFinish) Do

func (*OnBundleFinish) Do(dfc bundleFinisher, finishBundle func() error)

Do registers a callback to execute after all bundle elements have been processed. Any resources that a DoFn needs cleaned up explicitly, rather than implicitly via garbage collection, should be released here.

Only a single callback may be registered, and it will be the last one passed to Do.

type Options

type Options = beamopts.Options

Options configure Run, ParDo, and Combine with specific features. Each function takes a variadic list of options, where properties set in later options override the value of previously set properties.
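The override rule can be illustrated with a conventional functional-options sketch. The internals of `beamopts.Options` are not documented here, so `config`, `option`, `withName`, and `withEndpoint` are hypothetical stand-ins: options are applied in order, so a later option overwrites any property an earlier one set, while properties it does not touch are kept.

```go
package main

import "fmt"

// config holds hypothetical properties; it is not the real beamopts type.
type config struct {
	Name     string
	Endpoint string
}

// option is a hypothetical functional option that sets one property.
type option func(*config)

func withName(n string) option     { return func(c *config) { c.Name = n } }
func withEndpoint(e string) option { return func(c *config) { c.Endpoint = e } }

// apply runs options in order, so a later option overwrites any property an
// earlier option already set, and unrelated properties are left alone.
func apply(opts ...option) config {
	var c config
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := apply(withName("first"), withEndpoint("localhost:1234"), withName("second"))
	fmt.Println(c.Name, c.Endpoint) // second localhost:1234
}
```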

func Endpoint

func Endpoint(endpoint string) Options

Endpoint sets the URL when applicable, such as the JobManagement endpoint for submitting jobs or for configuring a target for expansion services.

func Name

func Name(name string) Options

Name sets the name of the pipeline or transform in question, typically to make it easier to refer to.

type Output

type Output[E Element] struct {
	// contains filtered or unexported fields
}

Output represents an output of a DoFn.

At pipeline construction time, they represent an output PCollection, and can be connected as inputs to downstream DoFns.

At pipeline execution time, they are used in a ProcessBundle method to emit elements and pass along per element context, such as the EventTime and Window.

func CombinePerKey

func CombinePerKey[K Keys, A, I, O Element, AM AccumulatorMerger[A]](s *Scope, input Output[KV[K, I]], comb Combiner[A, I, O, AM]) Output[KV[K, O]]

func Flatten

func Flatten[E Element](s *Scope, inputs ...Output[E]) Output[E]

Flatten joins together multiple Emitters of the same type into a single Emitter for downstream consumption.

func GBK

func GBK[K Keys, V Element](s *Scope, input Output[KV[K, V]], opts ...Options) Output[KV[K, Iter[V]]]

GBK produces an output PCollection of grouped values.
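As an in-memory illustration of the result shape, assuming a slice in place of `Iter[V]`: each distinct key is paired with all of its values, and the `Keys` (`comparable`) constraint is what lets a key serve as a Go map key. `KV` and `Keys` are copied locally; `groupByKey` is hypothetical and returns keys in first-seen order, whereas a distributed GBK does not generally guarantee any ordering.

```go
package main

import "fmt"

// Local copies of the documented KV type and Keys constraint.
type KV[K, V any] struct {
	Key   K
	Value V
}

type Keys interface {
	comparable
}

// groupByKey pairs each distinct key with all of its values.
// Keys are returned in first-seen order for readability only.
func groupByKey[K Keys, V any](in []KV[K, V]) []KV[K, []V] {
	idx := map[K]int{} // key -> position in out
	var out []KV[K, []V]
	for _, kv := range in {
		i, ok := idx[kv.Key]
		if !ok {
			i = len(out)
			idx[kv.Key] = i
			out = append(out, KV[K, []V]{Key: kv.Key})
		}
		out[i].Value = append(out[i].Value, kv.Value)
	}
	return out
}

func main() {
	in := []KV[string, int]{{"a", 1}, {"b", 2}, {"a", 3}}
	for _, g := range groupByKey(in) {
		fmt.Println(g.Key, g.Value)
	}
	// a [1 3]
	// b [2]
}
```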

func Impulse

func Impulse(s *Scope) Output[[]byte]

Impulse adds an impulse transform to the graph, which emits a single element to downstream transforms, allowing processing to begin.

The element is a single byte slice in the global window, with an event timestamp at the start of the global window.

func Reshuffle

func Reshuffle[E Element](s *Scope, input Output[E], opts ...Options) Output[E]

Reshuffle inserts a fusion break in the pipeline, preventing a producer transform from being fused with the consuming transform.

func (*Output[E]) Emit

func (emt *Output[E]) Emit(ec ElmC, elm E)

Emit emits the element within the current element's context.

The ElmC value is sourced from the DFC.Process method.

type OutputExtractor

type OutputExtractor[A, O Element] interface {
	AccumulatorMerger[A]
	ExtractOutput(A) O
}

type Pipeline

type Pipeline struct {
	Counters map[string]int64
}

Pipeline is a handle to a running or terminated pipeline for programmatic access to the given runner.

func Run

func Run(ctx context.Context, expand func(*Scope) error, opts ...Options) (Pipeline, error)

Run executes the pipeline built in the construction function.

type Process

type Process[E Element] func(ElmC, E) error

Process is the function type for handling a single element in a bundle.

Typically, this is a closure created in a Transform's ProcessBundle method and passed to DFC.Process.

Errors returned from Process functions abort bundle processing, and may cause pipeline termination. A runner may retry a bundle that has failed.

type Scope

type Scope struct {
	// contains filtered or unexported fields
}

Scope is used for building pipeline graphs.

Scope is a hierarchical grouping for composite transforms. Scopes can be enclosed in other scopes and form a tree structure. For pipeline updates, the scope chain forms a unique name. The scope chain can also be used for monitoring and visualization purposes.
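A minimal sketch of how a scope chain can yield unique hierarchical names, assuming a hypothetical `scope` type with a parent pointer and `/` as the separator. The real `Scope` keeps its fields unexported, and `(*Scope).String` may format names differently.

```go
package main

import (
	"fmt"
	"strings"
)

// scope is a hypothetical model of Scope: each scope records its parent, so the
// chain from the root down to a scope yields a unique, hierarchical name.
type scope struct {
	name   string
	parent *scope
}

// child creates a nested scope under s.
func (s *scope) child(name string) *scope { return &scope{name: name, parent: s} }

// path walks up the parent chain, then reverses it so names are joined root-first.
func (s *scope) path() string {
	var parts []string
	for cur := s; cur != nil; cur = cur.parent {
		parts = append(parts, cur.name)
	}
	for i, j := 0, len(parts)-1; i < j; i, j = i+1, j-1 {
		parts[i], parts[j] = parts[j], parts[i]
	}
	return strings.Join(parts, "/")
}

func main() {
	root := &scope{name: "root"}
	wc := root.child("WordCount")
	fmt.Println(wc.child("Split").path()) // root/WordCount/Split
	fmt.Println(wc.child("Count").path()) // root/WordCount/Count
}
```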

func (*Scope) String

func (s *Scope) String() string

type SideInputIter

type SideInputIter[E Element] struct {
	// contains filtered or unexported fields
}

func AsSideIter

func AsSideIter[E Element](emt Output[E]) SideInputIter[E]

AsSideIter initializes a SideInputIter from a valid upstream Emitter. It allows access to the data of that Emitter's PCollection.

func (*SideInputIter[E]) All

func (si *SideInputIter[E]) All(ec ElmC) func(perElm func(elm E) bool)

type SideInputMap

type SideInputMap[K, V Element] struct {
	// contains filtered or unexported fields
}

SideInputMap allows a side input to be accessed via multimap key lookups.

func AsSideMap

func AsSideMap[K, V Element](emt Output[KV[K, V]]) SideInputMap[K, V]

AsSideMap initializes a SideInputMap from a valid upstream Emitter.

func (*SideInputMap[K, V]) Get

func (si *SideInputMap[K, V]) Get(ec ElmC, k K) func(perElm func(elm V) bool)

Get looks up an iterator of values associated with the key.
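A minimal in-memory sketch of a multimap lookup exposed through a push-style iterator, assuming a hypothetical `sideMap` type and `collect` helper. The real `Get` also takes an `ElmC`, presumably so the lookup can respect the current element's window; that parameter is omitted here. A missing key simply yields no values.

```go
package main

import "fmt"

// sideMap is a hypothetical in-memory stand-in for SideInputMap: a multimap
// from each key to every value seen for it.
type sideMap[K comparable, V any] map[K][]V

// get mirrors the shape of Get: it returns a push iterator over the values for k.
func (m sideMap[K, V]) get(k K) func(perElm func(elm V) bool) {
	return func(perElm func(elm V) bool) {
		for _, v := range m[k] { // a missing key ranges over a nil slice
			if !perElm(v) {
				return
			}
		}
	}
}

// collect drains an iterator into a slice.
func collect[V any](seq func(func(V) bool)) []V {
	var out []V
	seq(func(v V) bool {
		out = append(out, v)
		return true
	})
	return out
}

func main() {
	m := sideMap[string, int]{"a": {1, 2}, "b": {3}}
	fmt.Println(collect(m.get("a")))      // [1 2]
	fmt.Println(len(collect(m.get("z")))) // 0
}
```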

func (*SideInputMap[K, V]) Keys

func (si *SideInputMap[K, V]) Keys(ec ElmC) func(perElm func(elm K) bool)

Keys returns an iterator over the keys of the side input.

type StateBag

type StateBag[E Element] struct {
	// contains filtered or unexported fields
}

type StateCombining

type StateCombining[E Element] struct {
	// contains filtered or unexported fields
}

type StateMap

type StateMap[K, V Element] struct {
	// contains filtered or unexported fields
}

type StateSet

type StateSet[E Element] struct {
	// contains filtered or unexported fields
}

type StateValue

type StateValue[E Element] struct {
	// contains filtered or unexported fields
}

type TimerEvent

type TimerEvent struct {
	// contains filtered or unexported fields
}

type TimerProcessing

type TimerProcessing struct {
	// contains filtered or unexported fields
}

type Transform

type Transform[E Element] interface {
	ProcessBundle(ctx context.Context, dfc *DFC[E]) error
}

Transform is the only interface that needs to be implemented by most DoFns.

Directories

Path Synopsis
extworker
Package extworker provides an external worker service and related utilities.
pipelinex
Package pipelinex contains utilities for manipulating Beam proto pipelines.
