engine

package
v0.0.0-...-498d591
Warning

This package is not in the latest version of its module.

Published: Nov 11, 2023 License: Unlicense Imports: 12 Imported by: 0

Documentation

Overview

Package engine handles the operational components of a runner: tracking elements, watermarks, timers, triggers, etc.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// MaxBundleSize caps the number of elements permitted in a bundle.
	// 0 or less means this is ignored.
	MaxBundleSize int
}
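As a rough illustration of the MaxBundleSize semantics, the sketch below splits a stage's pending elements into bundles. The helper splitIntoBundles and the use of strings as elements are hypothetical; ElementManager may form bundles differently internally.

```go
package main

import "fmt"

// Config mirrors the documented struct.
type Config struct {
	// MaxBundleSize caps the number of elements permitted in a bundle.
	// 0 or less means this is ignored.
	MaxBundleSize int
}

// splitIntoBundles is a hypothetical helper (not part of the engine API).
// It groups pending elements into bundles of at most cfg.MaxBundleSize
// elements, or returns them as a single bundle when the cap is 0 or less.
func splitIntoBundles(cfg Config, pending []string) [][]string {
	if len(pending) == 0 {
		return nil
	}
	if cfg.MaxBundleSize <= 0 {
		return [][]string{pending}
	}
	var bundles [][]string
	for start := 0; start < len(pending); start += cfg.MaxBundleSize {
		end := start + cfg.MaxBundleSize
		if end > len(pending) {
			end = len(pending)
		}
		bundles = append(bundles, pending[start:end])
	}
	return bundles
}

func main() {
	elems := []string{"a", "b", "c", "d", "e"}
	fmt.Println(splitIntoBundles(Config{MaxBundleSize: 2}, elems)) // [[a b] [c d] [e]]
	fmt.Println(splitIntoBundles(Config{}, elems))                 // [[a b c d e]]
}
```

Returning sub-slices of the input avoids copying, at the cost of the bundles sharing memory with the pending slice.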

type ElementManager

type ElementManager struct {
	// contains filtered or unexported fields
}

There comes a time in every Beam runner's life when it develops a type called a Watermark Manager, which becomes the core of the runner. Given that handling the watermark is the core of the Beam model, this is not surprising.

Essentially, it needs to track the current watermarks for each PCollection and transform/stage. This is tricky, since a PCollection's watermark is always relative to the transforms/stages that produce and consume it.

Key parts:

  • The parallel input PCollection's watermark is relative to committed, consumed elements. That is, the input elements a transform consumes in a successful bundle can advance the watermark, based on the minimum timestamp among those elements.
  • An output PCollection's watermark is relative to its producing transform, which relates to *all of its outputs*.

This means that a PCollection's watermark is the minimum across all of its consuming transforms.

So, the watermark manager needs to track:

  • Pending elements for each stage, along with their windows and timestamps.
  • Each transform's view of the watermarks for the PCollections.

Watermarks are always advanced based on consumed input.
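A minimal sketch of these rules, assuming event times are plain int64 milliseconds and that each consuming stage is summarised only by the timestamps of its not-yet-committed elements. The names Time, MaxTime, stageView, commit, and pcollectionWatermark are hypothetical; the real ElementManager keeps its fields unexported. Here a PCollection's watermark is the minimum of its producer's watermark and every consumer's oldest pending element, so it only advances once consumed input is committed.

```go
package main

import (
	"fmt"
	"math"
)

// Time is a hypothetical stand-in for an event-time timestamp.
type Time int64

// MaxTime means "nothing pending" in this sketch.
const MaxTime Time = math.MaxInt64

// stageView holds the timestamps of elements a consuming stage has been
// sent but has not yet committed as consumed.
type stageView struct {
	pending []Time
}

// minPending returns the oldest pending timestamp, or MaxTime if none.
func (s *stageView) minPending() Time {
	m := MaxTime
	for _, t := range s.pending {
		if t < m {
			m = t
		}
	}
	return m
}

// commit records that a bundle of the first n pending elements succeeded.
func (s *stageView) commit(n int) {
	if n > len(s.pending) {
		n = len(s.pending)
	}
	s.pending = s.pending[n:]
}

// pcollectionWatermark is the minimum of the producer's watermark and every
// consuming stage's oldest uncommitted element.
func pcollectionWatermark(upstream Time, consumers []*stageView) Time {
	wm := upstream
	for _, c := range consumers {
		if m := c.minPending(); m < wm {
			wm = m
		}
	}
	return wm
}

func main() {
	a := &stageView{pending: []Time{5, 9}}
	b := &stageView{pending: []Time{7}}
	fmt.Println(pcollectionWatermark(20, []*stageView{a, b})) // 5
	a.commit(1)                                               // a's bundle holding 5 succeeds
	fmt.Println(pcollectionWatermark(20, []*stageView{a, b})) // 7
	b.commit(1)
	fmt.Println(pcollectionWatermark(20, []*stageView{a, b})) // 9
}
```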

func NewElementManager

func NewElementManager(config Config) *ElementManager

func (*ElementManager) AddStage

func (em *ElementManager) AddStage(ID string, inputIDs, sides, outputIDs []string)

AddStage adds a stage to this element manager, connecting its PCollections and nodes to the watermark propagation graph.
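The sketch below shows one plausible shape for the bookkeeping behind AddStage: indexing each PCollection by its producer and its parallel and side-input consumers, so a watermark change can be propagated to downstream stages. The graph type, its fields, and downstream are hypothetical and not the engine's actual (unexported) representation.

```go
package main

import "fmt"

// graph is a hypothetical watermark propagation graph.
type graph struct {
	consumers     map[string][]string // PCollection ID -> stages reading it as parallel input
	sideConsumers map[string][]string // PCollection ID -> stages reading it as a side input
	producer      map[string]string   // PCollection ID -> stage that outputs it
	outputs       map[string][]string // stage ID -> PCollection IDs it outputs
}

func newGraph() *graph {
	return &graph{
		consumers:     map[string][]string{},
		sideConsumers: map[string][]string{},
		producer:      map[string]string{},
		outputs:       map[string][]string{},
	}
}

// addStage mirrors AddStage's parameters and records every edge.
func (g *graph) addStage(id string, inputIDs, sides, outputIDs []string) {
	for _, in := range inputIDs {
		g.consumers[in] = append(g.consumers[in], id)
	}
	for _, s := range sides {
		g.sideConsumers[s] = append(g.sideConsumers[s], id)
	}
	for _, out := range outputIDs {
		g.producer[out] = id
	}
	g.outputs[id] = append(g.outputs[id], outputIDs...)
}

// downstream lists the stages whose watermarks may need recomputing when
// stage id's output watermark advances.
func (g *graph) downstream(id string) []string {
	var stages []string
	for _, out := range g.outputs[id] {
		stages = append(stages, g.consumers[out]...)
		stages = append(stages, g.sideConsumers[out]...)
	}
	return stages
}

func main() {
	g := newGraph()
	g.addStage("impulse", nil, nil, []string{"pc1"})
	g.addStage("parDo", []string{"pc1"}, nil, []string{"pc2"})
	g.addStage("sink", []string{"pc2"}, []string{"pc1"}, nil)
	fmt.Println(g.downstream("impulse")) // [parDo sink]
	fmt.Println(g.producer["pc2"])       // parDo
}
```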

func (*ElementManager) Bundles

func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle

func (*ElementManager) Impulse

func (em *ElementManager) Impulse(stageID string)

func (*ElementManager) InputForBundle

func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte

InputForBundle returns pre-allocated data for the given bundle, encoding the elements using the PCollection's coders.

func (*ElementManager) PersistBundle

func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals [][]byte)

PersistBundle updates the watermarks for the stage. Each stage has two monotonically increasing watermarks: the input watermark and the output watermark.

	InputWatermark  = MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
	OutputWatermark = MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))

PersistBundle takes in the stage ID and the ID of the bundle associated with the pending input elements (both carried by rb), and the committed output elements (d).
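A minimal sketch of the two watermark formulas, assuming timestamps are int64 values and that an empty set of pending elements, input watermarks, or holds contributes no constraint (MaxTime). The types and helpers below (Time, stageWatermarks, minOf, maxOf, update) are hypothetical. The outer MAX is what keeps both watermarks monotonic, even when a late element arrives.

```go
package main

import (
	"fmt"
	"math"
)

// Time is a hypothetical stand-in for an event-time timestamp.
type Time int64

// MaxTime stands in for "no constraint" in this sketch.
const MaxTime Time = math.MaxInt64

// stageWatermarks is a hypothetical record of one stage's watermarks.
type stageWatermarks struct {
	input, output Time
}

// minOf returns the smallest value, or MaxTime for no values.
func minOf(vals ...Time) Time {
	m := MaxTime
	for _, v := range vals {
		if v < m {
			m = v
		}
	}
	return m
}

func maxOf(a, b Time) Time {
	if a > b {
		return a
	}
	return b
}

// update applies:
//   input  = MAX(input,  MIN(pending, upstream input watermarks))
//   output = MAX(output, MIN(input, holds))
func (s *stageWatermarks) update(pending, upstream, holds []Time) {
	s.input = maxOf(s.input, minOf(minOf(pending...), minOf(upstream...)))
	s.output = maxOf(s.output, minOf(s.input, minOf(holds...)))
}

func main() {
	s := &stageWatermarks{}
	s.update([]Time{10, 12}, []Time{15}, nil)
	fmt.Println(s.input, s.output) // 10 10
	s.update(nil, []Time{15}, []Time{11})
	fmt.Println(s.input, s.output) // 15 11
	s.update([]Time{3}, []Time{15}, nil) // a late element cannot move watermarks back
	fmt.Println(s.input, s.output)       // 15 15
}
```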

type PColInfo

type PColInfo struct {
	GlobalID string
	WDec     exec.WindowDecoder
	WEnc     exec.WindowEncoder
	EDec     func(io.Reader) []byte
}

type RunBundle

type RunBundle struct {
	StageID   string
	BundleID  string
	Watermark mtime.Time
}

func (RunBundle) LogValue

func (rb RunBundle) LogValue() slog.Value

type TentativeData

type TentativeData struct {
	Raw map[string][][]byte
}

func (*TentativeData) WriteData

func (d *TentativeData) WriteData(colID string, data []byte)

WriteData adds data to the given global collection ID.
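The sketch below mirrors TentativeData to show how per-collection output could accumulate before a bundle is persisted. It assumes WriteData appends each encoded element to the slice for colID and lazily creates the map; that behaviour, and the collection IDs "n1" and "n2", are assumptions for illustration.

```go
package main

import "fmt"

// TentativeData mirrors the documented struct: raw encoded elements keyed
// by global PCollection ID.
type TentativeData struct {
	Raw map[string][][]byte
}

// WriteData appends one encoded element to colID's slice, creating the map
// on first use (assumed behaviour).
func (d *TentativeData) WriteData(colID string, data []byte) {
	if d.Raw == nil {
		d.Raw = map[string][][]byte{}
	}
	d.Raw[colID] = append(d.Raw[colID], data)
}

func main() {
	var d TentativeData
	d.WriteData("n1", []byte("a"))
	d.WriteData("n1", []byte("b"))
	d.WriteData("n2", []byte("c"))
	fmt.Println(len(d.Raw["n1"]), len(d.Raw["n2"])) // 2 1
}
```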
