harness

package
v0.0.0-...-498d591
Warning

This package is not in the latest version of its module.

Published: Nov 11, 2023 License: Unlicense Imports: 20 Imported by: 0

Documentation


Constants

const (
	StateWriteAppend writeTypeEnum = 0
	StateWriteClear  writeTypeEnum = 1
)

Variables

Dial is a convenience wrapper over grpc.Dial. It can be overridden to provide a customized dialing behavior.
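The override pattern described for Dial can be sketched without gRPC. This is a minimal illustration, not the package's implementation: conn, defaultDial, dial, connect and demo are hypothetical stand-ins, and the real variable deals in *grpc.ClientConn rather than the placeholder type used here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// conn is a hypothetical stand-in for *grpc.ClientConn.
type conn struct{ endpoint string }

// defaultDial is a placeholder for the default dialing behavior.
func defaultDial(ctx context.Context, endpoint string, timeout time.Duration) (*conn, error) {
	if endpoint == "" {
		return nil, errors.New("empty endpoint")
	}
	return &conn{endpoint: endpoint}, nil
}

// dial is a package-level function variable, so callers can replace it.
var dial = defaultDial

// connect always goes through the variable, never the default directly.
func connect(ctx context.Context, endpoint string) (*conn, error) {
	return dial(ctx, endpoint, 30*time.Second)
}

// demo overrides dial to record endpoints, then restores the previous value.
func demo() ([]string, error) {
	var seen []string
	prev := dial
	defer func() { dial = prev }()
	dial = func(ctx context.Context, endpoint string, timeout time.Duration) (*conn, error) {
		seen = append(seen, endpoint)
		return prev(ctx, endpoint, timeout)
	}
	_, err := connect(context.Background(), "localhost:12345")
	return seen, err
}

func main() {
	fmt.Println(demo())
}
```

Routing every connection through one variable keeps the customization point in a single place, which is convenient for tests that need to observe or fake connections.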

Functions

func DefaultDial

func DefaultDial(ctx context.Context, endpoint string, timeout time.Duration) (*grpc.ClientConn, error)

DefaultDial is a dialer that specifies an insecure blocking connection with a timeout.

func Main

func Main(ctx context.Context, controlEndpoint string, opts Options, exec ExecFunc) error

Types

type Control

type Control struct {
	// contains filtered or unexported fields
}

func (*Control) GetOrLookupPlan

func (ctrl *Control) GetOrLookupPlan(dc DataContext, unmarshal func(pbd *fnpb.ProcessBundleDescriptor) any) (any, error)

GetOrLookupPlan does a layered check to get an execution plan.

It first checks whether a cached plan is already available. If so, we're done. Otherwise a new plan must be built from a ProcessBundleDescriptor: the local descriptor cache is checked first, and only if the descriptor is missing there is it requested from the runner, which reduces duplicate requests.
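The layered check can be sketched as follows. This is an illustration under stated assumptions rather than the package's code: descriptor, planCache, getOrLookup, release and demo are hypothetical names, plans are plain values, and the lock is held during the fetch for simplicity.

```go
package main

import (
	"fmt"
	"sync"
)

// descriptor is a hypothetical stand-in for *fnpb.ProcessBundleDescriptor.
type descriptor struct{ id string }

// planCache sketches the layers: idle plans, cached descriptors, runner.
type planCache struct {
	mu          sync.Mutex
	plans       map[string][]any                     // idle plans by descriptor ID
	descriptors map[string]*descriptor               // descriptors already fetched
	fetch       func(id string) (*descriptor, error) // asks the runner
}

func (c *planCache) getOrLookup(id string, build func(*descriptor) any) (any, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Layer 1: reuse an idle plan if one is cached.
	if idle := c.plans[id]; len(idle) > 0 {
		p := idle[len(idle)-1]
		c.plans[id] = idle[:len(idle)-1]
		return p, nil
	}
	// Layer 2: reuse a cached descriptor. Layer 3: ask the runner once.
	d, ok := c.descriptors[id]
	if !ok {
		var err error
		if d, err = c.fetch(id); err != nil {
			return nil, err
		}
		c.descriptors[id] = d
	}
	return build(d), nil
}

// release returns a finished plan to the idle pool for later reuse.
func (c *planCache) release(id string, p any) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.plans[id] = append(c.plans[id], p)
}

// demo counts how often the runner is asked and how often a plan is built.
func demo() (fetches, builds int) {
	c := &planCache{plans: map[string][]any{}, descriptors: map[string]*descriptor{}}
	c.fetch = func(id string) (*descriptor, error) {
		fetches++
		return &descriptor{id: id}, nil
	}
	build := func(d *descriptor) any {
		builds++
		return "plan for " + d.id
	}
	p1, _ := c.getOrLookup("bundle-1", build) // runner fetch, then build
	c.getOrLookup("bundle-1", build)          // cached descriptor, build only
	c.release("bundle-1", p1)
	c.getOrLookup("bundle-1", build) // idle plan reused, no build
	return fetches, builds
}

func main() {
	fmt.Println(demo())
}
```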

func (*Control) RegisterMonitor

func (ctrl *Control) RegisterMonitor(dc DataContext, monFn Monitor)

func (*Control) RegisterSplitter

func (ctrl *Control) RegisterSplitter(dc DataContext, splitFn Splitter)

type DataChannel

type DataChannel struct {
	// contains filtered or unexported fields
}

DataChannel manages a single gRPC stream over the Data API. Data from multiple bundles can be multiplexed over this stream. Data is pushed over the channel, so data for a reader may arrive before the reader connects. Thread-safe.
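One way to tolerate data that arrives before its reader is to let either side create the per-stream buffer on first use. The sketch below assumes that approach; streamKey, mux, push, open and demo are hypothetical names, the buffer size is arbitrary, and the real channel works over a gRPC stream rather than in-memory calls.

```go
package main

import (
	"fmt"
	"sync"
)

// streamKey identifies one logical stream multiplexed over the channel.
type streamKey struct{ instID, ptransformID string }

// mux fans incoming data out to per-stream buffers.
type mux struct {
	mu      sync.Mutex
	streams map[streamKey]chan []byte
}

// stream returns the buffer for k, creating it on first use by either side.
func (m *mux) stream(k streamKey) chan []byte {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch, ok := m.streams[k]
	if !ok {
		ch = make(chan []byte, 16) // arbitrary size; push blocks when full
		m.streams[k] = ch
	}
	return ch
}

// push is called when data for k arrives, whether or not a reader exists.
func (m *mux) push(k streamKey, data []byte) { m.stream(k) <- data }

// open is called by a reader and sees anything pushed before it connected.
func (m *mux) open(k streamKey) <-chan []byte { return m.stream(k) }

// demo pushes interleaved data for two bundles, then connects one reader.
func demo() []string {
	m := &mux{streams: map[streamKey]chan []byte{}}
	a := streamKey{instID: "inst-1", ptransformID: "pt-A"}
	b := streamKey{instID: "inst-2", ptransformID: "pt-A"}
	m.push(a, []byte("a1"))
	m.push(b, []byte("b1"))
	m.push(a, []byte("a2"))
	in := m.open(a) // the reader connects after the data arrived
	return []string{string(<-in), string(<-in)}
}

func main() {
	fmt.Println(demo())
}
```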

func (*DataChannel) OpenElementChan

func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID string, instID instructionID, expectedTimerTransforms []string) (<-chan Elements, error)

OpenElementChan returns a channel of Elements for the given instruction and ptransform.

func (*DataChannel) OpenTimerWrite

func (c *DataChannel) OpenTimerWrite(ctx context.Context, ptransformID string, instID instructionID, family string) io.WriteCloser

OpenTimerWrite returns an io.WriteCloser for the given timer family, instruction, and ptransform.

func (*DataChannel) OpenWrite

func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser

OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.

type DataChannelManager

type DataChannelManager struct {
	// contains filtered or unexported fields
}

DataChannelManager manages data channels over the Data API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.

func (*DataChannelManager) Open

func (m *DataChannelManager) Open(ctx context.Context, port Port) (*DataChannel, error)

Open opens a R/W DataChannel over the given port.

type DataContext

type DataContext struct {
	Data  *ScopedDataManager
	State *ScopedStateManager
	// contains filtered or unexported fields
}

DataContext holds connectors to the various data connections, including state and side input.

type Elements

type Elements struct {
	Data, Timers                []byte
	TimerFamilyID, PtransformID string
}

Elements holds data or timers sent across the data channel. If TimerFamilyID is populated, it's a timer, otherwise it's data elements.
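A consumer can tell the two cases apart by checking TimerFamilyID, as in this minimal sketch. The struct mirrors the declaration above; isTimer and route are hypothetical helpers, and reading the timer payload from the Timers field is an assumption based on the field name.

```go
package main

import "fmt"

// Elements mirrors the struct documented above.
type Elements struct {
	Data, Timers                []byte
	TimerFamilyID, PtransformID string
}

// isTimer reports whether e carries timers rather than data elements.
func isTimer(e Elements) bool { return e.TimerFamilyID != "" }

// route describes where an Elements value would be dispatched.
func route(e Elements) string {
	if isTimer(e) {
		// Assumption: timer bytes live in the Timers field.
		return fmt.Sprintf("timer family %q for %s (%d bytes)", e.TimerFamilyID, e.PtransformID, len(e.Timers))
	}
	return fmt.Sprintf("data for %s (%d bytes)", e.PtransformID, len(e.Data))
}

func main() {
	fmt.Println(route(Elements{PtransformID: "pt-A", Data: []byte("abc")}))
	fmt.Println(route(Elements{PtransformID: "pt-A", TimerFamilyID: "fam", Timers: []byte("t")}))
}
```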

type Monitor

type Monitor func() (map[string]*pipepb.MonitoringInfo, map[string][]byte)

Monitor is a function that returns any new labels, and the set of payloads being returned to the runner.

type NextBuffer

type NextBuffer interface {
	NextBuf() ([]byte, error)
	Close() error
}

type Options

type Options struct {
	RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI.
	LoggingEndpoint    string   // Endpoint for remote logging.
	StatusEndpoint     string   // Endpoint for worker status reporting.
}

Options for harness.Main that affect execution of the harness, such as runner capabilities.

type Port

type Port struct {
	URL string
}

Port represents the connection port of external operations.

type ScopedDataManager

type ScopedDataManager struct {
	// contains filtered or unexported fields
}

ScopedDataManager scopes the global gRPC data manager to a single instruction. The indirection makes it easier to control access.

func NewScopedDataManager

func NewScopedDataManager(mgr *DataChannelManager, instID instructionID) *ScopedDataManager

NewScopedDataManager returns a ScopedDataManager for the given instruction.

func (*ScopedDataManager) Close

func (s *ScopedDataManager) Close() error

Close prevents new IO for this instruction.
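The scoping idea can be sketched as a thin wrapper that pins the instruction ID and refuses new IO once closed. Everything below is illustrative: globalManager, scoped, errClosed and demo are hypothetical names, and streams are represented by plain strings instead of io.WriteCloser values.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// globalManager is a hypothetical stand-in for the shared manager.
type globalManager struct{}

func (g *globalManager) openWrite(instID, ptransformID string) string {
	return instID + "/" + ptransformID
}

var errClosed = errors.New("instruction closed")

// scoped pins one instruction ID, so callers only name the ptransform.
type scoped struct {
	mu     sync.Mutex
	mgr    *globalManager
	instID string
	closed bool
}

// OpenWrite opens a stream for this instruction unless Close was called.
func (s *scoped) OpenWrite(ptransformID string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return "", errClosed
	}
	return s.mgr.openWrite(s.instID, ptransformID), nil
}

// Close prevents any further IO from being opened through this scope.
func (s *scoped) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	return nil
}

// demo opens one stream, closes the scope, then tries to open another.
func demo() (string, error) {
	s := &scoped{mgr: &globalManager{}, instID: "inst-1"}
	stream, err := s.OpenWrite("pt-A")
	if err != nil {
		return "", err
	}
	s.Close()
	_, err = s.OpenWrite("pt-B")
	return stream, err
}

func main() {
	fmt.Println(demo())
}
```

Because the wrapper is the only handle a bundle receives, it cannot open streams under another instruction's ID, and a late open after Close fails fast instead of leaking a stream.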

func (*ScopedDataManager) OpenElementChan

func (s *ScopedDataManager) OpenElementChan(ctx context.Context, id StreamID, expectedTimerTransforms []string) (<-chan Elements, error)

OpenElementChan returns a channel of Elements on the given stream.

func (*ScopedDataManager) OpenTimerWrite

func (s *ScopedDataManager) OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error)

OpenTimerWrite opens an io.WriteCloser on the given stream to write timers.

func (*ScopedDataManager) OpenWrite

func (s *ScopedDataManager) OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error)

OpenWrite opens an io.WriteCloser on the given stream.

type ScopedStateManager

type ScopedStateManager struct {
	// contains filtered or unexported fields
}

ScopedStateManager scopes the global gRPC state manager to a single instruction for side input use. The indirection makes it easier to control access.

func NewScopedStateManager

func NewScopedStateManager(mgr *StateChannelManager, instID instructionID) *ScopedStateManager

NewScopedStateManager returns a ScopedStateManager for the given instruction.

func (*ScopedStateManager) Close

func (s *ScopedStateManager) Close() error

Close closes all open readers.

func (*ScopedStateManager) OpenReader

func (s *ScopedStateManager) OpenReader(ctx context.Context, url string, key *fnpb.StateKey) (NextBuffer, error)

func (*ScopedStateManager) OpenWriter

func (s *ScopedStateManager) OpenWriter(ctx context.Context, url string, key *fnpb.StateKey, wt writeTypeEnum) (io.Writer, error)

type Splitter

Splitter is a function that requests a split from a given set of plans.

type StateChannel

type StateChannel struct {
	DoneCh <-chan struct{}
	// contains filtered or unexported fields
}

StateChannel manages state transactions over a single gRPC connection. It does not need to track readers and writers as carefully as the DataChannel, because the state protocol is request-based.
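A request-based protocol only needs to pair each response with the caller that issued the request. The sketch below shows one common way to do that, correlating by request ID; it is an assumption about the general technique, not a description of how StateChannel is implemented, and request, response, channel, send, deliver and demo are all hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

type request struct{ id, key string }
type response struct{ id, value string }

// channel pairs outgoing requests with incoming responses by ID.
type channel struct {
	mu      sync.Mutex
	next    int
	pending map[string]chan response
	out     chan request
}

func newChannel() *channel {
	return &channel{pending: map[string]chan response{}, out: make(chan request)}
}

// send registers a waiter under a fresh ID, emits the request, and blocks
// until the matching response is delivered.
func (c *channel) send(key string) string {
	c.mu.Lock()
	c.next++
	id := strconv.Itoa(c.next)
	wait := make(chan response, 1)
	c.pending[id] = wait
	c.mu.Unlock()

	c.out <- request{id: id, key: key}
	return (<-wait).value
}

// deliver hands a response to whichever caller is waiting on its ID.
func (c *channel) deliver(r response) {
	c.mu.Lock()
	wait, ok := c.pending[r.id]
	delete(c.pending, r.id)
	c.mu.Unlock()
	if ok {
		wait <- r
	}
}

// demo runs a fake runner that answers every request, then sends one.
func demo() string {
	c := newChannel()
	go func() {
		for req := range c.out {
			c.deliver(response{id: req.id, value: "value-of-" + req.key})
		}
	}()
	v := c.send("side-input-1")
	close(c.out)
	return v
}

func main() {
	fmt.Println(demo())
}
```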

func (*StateChannel) Send

Send sends a state request and returns the response.

type StateChannelManager

type StateChannelManager struct {
	// contains filtered or unexported fields
}

StateChannelManager manages state channels over the State API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.

func (*StateChannelManager) Open

Open opens a R/W StateChannel over the given port.

type StreamID

type StreamID struct {
	Port         Port
	PtransformID string
}

StreamID represents the static information needed to identify a data stream. Dynamic information, notably bundleID, is provided implicitly by the managers.
