Documentation ¶
Overview ¶
Package engine handles the operational components of a runner: tracking elements, watermarks, timers, triggers, etc.
Index ¶
- type Config
- type ElementManager
- func NewElementManager(config Config) *ElementManager
- func (em *ElementManager) AddStage(ID string, inputIDs, sides, outputIDs []string)
- func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle
- func (em *ElementManager) Impulse(stageID string)
- func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte
- func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, ...)
- type PColInfo
- type RunBundle
- type TentativeData
- func (d *TentativeData) WriteData(colID string, data []byte)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// MaxBundleSize caps the number of elements permitted in a bundle.
	// 0 or less means this is ignored.
	MaxBundleSize int
}
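As an illustration of the MaxBundleSize semantics, the sketch below applies such a cap to a slice of pending elements. The capBundle helper and the use of strings as elements are assumptions made for this example; the package does not document how the cap is enforced internally.

```go
package main

import "fmt"

// Config mirrors the documented struct so the example is self-contained.
type Config struct {
	// MaxBundleSize caps the number of elements permitted in a bundle.
	// 0 or less means this is ignored.
	MaxBundleSize int
}

// capBundle is a hypothetical helper (not part of the engine package).
// It returns the elements for the next bundle and the elements left over.
func capBundle(cfg Config, pending []string) (bundle, rest []string) {
	// A non-positive cap means "no limit", matching the documented behavior.
	if cfg.MaxBundleSize <= 0 || len(pending) <= cfg.MaxBundleSize {
		return pending, nil
	}
	return pending[:cfg.MaxBundleSize], pending[cfg.MaxBundleSize:]
}

func main() {
	pending := []string{"a", "b", "c"}

	bundle, rest := capBundle(Config{MaxBundleSize: 2}, pending)
	fmt.Println(len(bundle), len(rest))

	bundle, rest = capBundle(Config{}, pending)
	fmt.Println(len(bundle), len(rest))
}
```

With a cap of 2, the first call yields a two-element bundle and one leftover element; with the zero-value Config, all three elements go into a single bundle.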
type ElementManager ¶
type ElementManager struct {
// contains filtered or unexported fields
}
There comes a time in every Beam runner's life when it develops a type called a Watermark Manager, which becomes the core of the runner. Given that handling the watermark is the core of the Beam model, this is not surprising.
Essentially, it needs to track the current watermarks for each PCollection and transform/stage. This is tricky, because the watermarks for the PCollections are always relative to transforms/stages.
Key parts:
- The parallel input's PCollection's watermark is relative to committed consumed elements. That is, the input elements consumed by the transform after a successful bundle can advance the watermark, based on the minimum timestamp among those elements.
- An output PCollection's watermark is relative to its producing transform, which relates to *all of its outputs*.
This means that a PCollection's watermark is the minimum over all of its consuming transforms.
So, the watermark manager needs to track:
- Pending elements for each stage, along with their windows and timestamps.
- Each transform's view of the watermarks for the PCollections.
Watermarks are always advanced based on consumed input.
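The rule that a PCollection's watermark is the minimum over its consuming transforms can be sketched as follows. The Watermark type, the MaxWatermark sentinel, and the pcollectionWatermark function are hypothetical names introduced for this example, and treating a PCollection with no consumers as unconstrained is an assumption, not documented behavior.

```go
package main

import (
	"fmt"
	"math"
)

// Watermark is a hypothetical event-time stamp, e.g. in milliseconds.
type Watermark int64

// MaxWatermark is an assumed "end of time" sentinel for this sketch.
const MaxWatermark Watermark = math.MaxInt64

// pcollectionWatermark returns the minimum over each consuming transform's
// view of the PCollection's watermark. With no consumers, nothing holds the
// watermark back, so the sentinel is returned (an assumption of this sketch).
func pcollectionWatermark(consumerViews map[string]Watermark) Watermark {
	wm := MaxWatermark
	for _, view := range consumerViews {
		if view < wm {
			wm = view
		}
	}
	return wm
}

func main() {
	views := map[string]Watermark{"stageA": 30, "stageB": 12, "stageC": 45}
	fmt.Println(pcollectionWatermark(views)) // prints 12
}
```

The slowest consumer (stageB here) determines the PCollection's watermark, which is why each transform's view has to be tracked separately.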
func NewElementManager ¶
func NewElementManager(config Config) *ElementManager
func (*ElementManager) AddStage ¶
func (em *ElementManager) AddStage(ID string, inputIDs, sides, outputIDs []string)
AddStage adds a stage to this element manager, connecting its PCollections and nodes to the watermark propagation graph.
func (*ElementManager) Bundles ¶
func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle
func (*ElementManager) Impulse ¶
func (em *ElementManager) Impulse(stageID string)
func (*ElementManager) InputForBundle ¶
func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte
InputForBundle returns pre-allocated data for the given bundle, encoding the elements using the PCollection's coders.
func (*ElementManager) PersistBundle ¶
func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals [][]byte)
PersistBundle updates the watermarks for the stage. Each stage has two monotonically increasing watermarks: the input watermark and the output watermark.
InputWatermark = MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
OutputWatermark = MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
PersistBundle takes in the stage ID, ID of the bundle associated with the pending input elements, and the committed output elements.
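The two watermark updates described for PersistBundle can be sketched as plain functions. All names below (Watermark, minWM, maxWM, updateInput, updateOutput) are hypothetical, and each MIN operand is assumed to have been reduced to a single value beforehand; this illustrates the formulas only, not the package's implementation.

```go
package main

import "fmt"

// Watermark is a hypothetical event-time stamp, e.g. in milliseconds.
type Watermark int64

func minWM(a, b Watermark) Watermark {
	if a < b {
		return a
	}
	return b
}

func maxWM(a, b Watermark) Watermark {
	if a > b {
		return a
	}
	return b
}

// updateInput computes MAX(current, MIN(pending, upstream)), where
// minPending is the earliest pending element timestamp and minUpstream is
// the lowest input PCollection watermark (both assumed precomputed).
func updateInput(current, minPending, minUpstream Watermark) Watermark {
	return maxWM(current, minWM(minPending, minUpstream))
}

// updateOutput computes MAX(current, MIN(input, hold)), where minHold is
// the earliest watermark hold (assumed precomputed).
func updateOutput(current, input, minHold Watermark) Watermark {
	return maxWM(current, minWM(input, minHold))
}

func main() {
	in := updateInput(10, 25, 20)  // upstream watermark is the limiting term
	out := updateOutput(8, in, 15) // the hold keeps the output behind the input
	fmt.Println(in, out)           // prints 20 15

	// The outer MAX keeps the watermark monotonic even if inputs lag.
	fmt.Println(updateInput(20, 5, 5)) // prints 20
}
```

The outer MAX is what makes each watermark monotonically increasing: a late pending element or a lagging upstream can stall the watermark but cannot move it backwards.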
type PColInfo ¶
type PColInfo struct {
	GlobalID string
	WDec     exec.WindowDecoder
	WEnc     exec.WindowEncoder
	EDec     func(io.Reader) []byte
}
type TentativeData ¶
func (*TentativeData) WriteData ¶
func (d *TentativeData) WriteData(colID string, data []byte)
WriteData adds data to a given global collectionID.
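One way such a method could behave is sketched below with a stand-in type. The tentativeStore type and its raw field are assumptions made for illustration, since the fields of TentativeData are not shown in this documentation; only the WriteData signature is taken from it.

```go
package main

import "fmt"

// tentativeStore is a hypothetical stand-in for TentativeData. It buffers
// encoded outputs per global collection ID until a bundle is committed.
type tentativeStore struct {
	raw map[string][][]byte // assumed layout: collection ID -> data blocks
}

// WriteData appends data under the given global collection ID,
// lazily allocating the map on first use.
func (s *tentativeStore) WriteData(colID string, data []byte) {
	if s.raw == nil {
		s.raw = map[string][][]byte{}
	}
	s.raw[colID] = append(s.raw[colID], data)
}

func main() {
	var s tentativeStore
	s.WriteData("pcol1", []byte("a"))
	s.WriteData("pcol1", []byte("b"))
	s.WriteData("pcol2", []byte("c"))
	fmt.Println(len(s.raw["pcol1"]), len(s.raw["pcol2"])) // prints 2 1
}
```

Keeping the data keyed by collection ID lets a later commit step route each block to the consumers of that collection.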