cockroach: Index | Files

package flowinfra

import ""


Package Files

flow.go flow_registry.go flow_scheduler.go inbound.go outbox.go stats.pb.go stream_decoder.go stream_encoder.go testing_knobs.go


const (
    FlowNotStarted flowStatus = iota

Flow status indicators.

const PreferredEncoding = descpb.DatumEncoding_ASCENDING_KEY

PreferredEncoding is the encoding used for EncDatums that don't already have an encoding available.


var (
    ErrInvalidLengthStats = fmt.Errorf("proto: negative length found during unmarshaling")
    ErrIntOverflowStats   = fmt.Errorf("proto: integer overflow")
var SettingFlowStreamTimeout = settings.RegisterNonNegativeDurationSetting(
    "amount of time incoming streams wait for a flow to be set up before erroring out",

SettingFlowStreamTimeout is a cluster setting that sets the default flow stream timeout.

func IsFlowRetryableError Uses

func IsFlowRetryableError(e error) bool

IsFlowRetryableError returns true if an error represents a retryable flow error.

type Flow Uses

type Flow interface {
    // Setup sets up all the infrastructure for the flow as defined by the flow
    // spec. The flow will then need to be started and run. A new context (along
    // with a context cancellation function) is derived. The new context must be
    // used when running a flow so that all components running in their own
    // goroutines could listen for a cancellation on the same context.
    Setup(ctx context.Context, spec *execinfrapb.FlowSpec, opt FuseOpt) (context.Context, error)

    // SetTxn is used to provide the transaction in which the flow will run.
    // It needs to be called after Setup() and before Start/Run.

    // Start starts the flow. Processors run asynchronously in their own goroutines.
    // Wait() needs to be called to wait for the flow to finish.
    // See Run() for a synchronous version.
    // Generally if errors are encountered during the setup part, they're returned.
    // But if the flow is a synchronous one, then no error is returned; instead the
    // setup error is pushed to the syncFlowConsumer. In this case, a subsequent
    // call to f.Wait() will not block.
    Start(_ context.Context, doneFn func()) error

    // Run runs the flow to completion. The last processor is run in the current
    // goroutine; others may run in different goroutines depending on how the flow
    // was configured.
    // f.Wait() is called internally, so the call blocks until all the flow's
    // goroutines are done.
    // The caller needs to call f.Cleanup().
    Run(_ context.Context, doneFn func()) error

    // Wait waits for all the goroutines for this flow to exit. If the context gets
    // canceled before all goroutines exit, it calls f.cancel().

    // IsLocal returns whether this flow does not have any remote execution.
    IsLocal() bool

    // IsVectorized returns whether this flow will run with vectorized execution.
    IsVectorized() bool

    // GetFlowCtx returns the flow context of this flow.
    GetFlowCtx() *execinfra.FlowCtx

    // AddStartable accumulates a Startable object.

    // GetID returns the flow ID.
    GetID() execinfrapb.FlowID

    // Cleanup should be called when the flow completes (after all processors and
    // mailboxes exited).

    // ConcurrentExecution returns true if multiple processors/operators in the
    // flow will execute concurrently (i.e. if not all of them have been fused).
    // Can only be called after Setup().
    ConcurrentExecution() bool

Flow represents a flow which consists of processors and streams.

type FlowBase Uses

type FlowBase struct {
    // contains filtered or unexported fields

FlowBase is the shared logic between row based and vectorized flows. It implements Flow interface for convenience and for usage in tests, but if FlowBase.Setup is called, it'll panic.

func NewFlowBase Uses

func NewFlowBase(
    flowCtx execinfra.FlowCtx,
    flowReg *FlowRegistry,
    syncFlowConsumer execinfra.RowReceiver,
    localProcessors []execinfra.LocalProcessor,
) *FlowBase

NewFlowBase creates a new FlowBase.

func (*FlowBase) AddRemoteStream Uses

func (f *FlowBase) AddRemoteStream(streamID execinfrapb.StreamID, streamInfo *InboundStreamInfo)

AddRemoteStream adds a remote stream to this flow.

func (*FlowBase) AddStartable Uses

func (f *FlowBase) AddStartable(s Startable)

AddStartable is part of the Flow interface.

func (*FlowBase) CheckInboundStreamID Uses

func (f *FlowBase) CheckInboundStreamID(sid execinfrapb.StreamID) error

CheckInboundStreamID takes a stream ID and returns an error if an inbound stream already exists with that ID in the inbound streams map, creating the inbound streams map if it is nil.

func (*FlowBase) Cleanup Uses

func (f *FlowBase) Cleanup(ctx context.Context)

Cleanup is part of the Flow interface. NOTE: this implements only the shared clean up logic between row-based and vectorized flows.

func (*FlowBase) ConcurrentExecution Uses

func (f *FlowBase) ConcurrentExecution() bool

ConcurrentExecution is part of the Flow interface.

func (*FlowBase) GetCancelFlowFn Uses

func (f *FlowBase) GetCancelFlowFn() context.CancelFunc

GetCancelFlowFn returns the context cancellation function of the context of this flow.

func (*FlowBase) GetCtxDone Uses

func (f *FlowBase) GetCtxDone() <-chan struct{}

GetCtxDone returns done channel of the context of this flow.

func (*FlowBase) GetFlowCtx Uses

func (f *FlowBase) GetFlowCtx() *execinfra.FlowCtx

GetFlowCtx is part of the Flow interface.

func (*FlowBase) GetID Uses

func (f *FlowBase) GetID() execinfrapb.FlowID

GetID is part of the Flow interface.

func (*FlowBase) GetLocalProcessors Uses

func (f *FlowBase) GetLocalProcessors() []execinfra.LocalProcessor

GetLocalProcessors return the execinfra.LocalProcessors of this flow.

func (*FlowBase) GetSyncFlowConsumer Uses

func (f *FlowBase) GetSyncFlowConsumer() execinfra.RowReceiver

GetSyncFlowConsumer returns the special syncFlowConsumer outbox.

func (*FlowBase) GetWaitGroup Uses

func (f *FlowBase) GetWaitGroup() *sync.WaitGroup

GetWaitGroup returns the wait group of this flow.

func (*FlowBase) IsLocal Uses

func (f *FlowBase) IsLocal() bool

IsLocal returns whether this flow does not have any remote execution.

func (*FlowBase) IsVectorized Uses

func (f *FlowBase) IsVectorized() bool

IsVectorized returns whether this flow will run with vectorized execution.

func (*FlowBase) Run Uses

func (f *FlowBase) Run(ctx context.Context, doneFn func()) error

Run is part of the Flow interface.

func (*FlowBase) SetProcessors Uses

func (f *FlowBase) SetProcessors(processors []execinfra.Processor)

SetProcessors overrides the current f.processors with the provided processors. This is used to set up the vectorized flow.

func (*FlowBase) SetTxn Uses

func (f *FlowBase) SetTxn(txn *kv.Txn)

SetTxn is part of the Flow interface.

func (*FlowBase) Setup Uses

func (f *FlowBase) Setup(
    ctx context.Context, spec *execinfrapb.FlowSpec, _ FuseOpt,
) (context.Context, error)

Setup is part of the Flow interface.

func (*FlowBase) Start Uses

func (f *FlowBase) Start(ctx context.Context, doneFn func()) error

Start is part of the Flow interface.

func (*FlowBase) Wait Uses

func (f *FlowBase) Wait()

Wait is part of the Flow interface.

type FlowRegistry Uses

type FlowRegistry struct {
    // contains filtered or unexported fields

FlowRegistry allows clients to look up flows by ID and to wait for flows to be registered. Multiple clients can wait concurrently for the same flow.

func NewFlowRegistry Uses

func NewFlowRegistry(instID base.SQLInstanceID) *FlowRegistry

NewFlowRegistry creates a new FlowRegistry.

instID is the ID of the current node. Used for debugging; pass 0 if you don't care.

func (*FlowRegistry) ConnectInboundStream Uses

func (fr *FlowRegistry) ConnectInboundStream(
    ctx context.Context,
    flowID execinfrapb.FlowID,
    streamID execinfrapb.StreamID,
    stream execinfrapb.DistSQL_FlowStreamServer,
    timeout time.Duration,
) (_ *FlowBase, _ InboundStreamHandler, _ func(), retErr error)

ConnectInboundStream finds the InboundStreamInfo for the given <flowID,streamID> pair and marks it as connected. It waits up to timeout for the stream to be registered with the registry. It also sends the handshake messages to the producer of the stream.

stream is the inbound stream.

It returns the Flow that the stream is connecting to, the receiver that the stream must push data to and a cleanup function that must be called to unregister the flow from the registry after all the data has been pushed.

The cleanup function will decrement the flow's WaitGroup, so that Flow.Wait() is not blocked on this stream any more. In case an error is returned, the cleanup function is nil, the Flow is not considered connected and is not cleaned up.

func (*FlowRegistry) Drain Uses

func (fr *FlowRegistry) Drain(
    flowDrainWait time.Duration, minFlowDrainWait time.Duration, reporter func(int, string),

Drain waits at most flowDrainWait for currently running flows to finish and at least minFlowDrainWait for any incoming flows to be registered. If there are still flows active after flowDrainWait, Drain waits an extra expectedConnectionTime so that any flows that were registered at the end of the time window have a reasonable amount of time to connect to their consumers, thus unblocking them. The FlowRegistry rejects any new flows once it has finished draining.

Note that since local flows are not added to the registry, they are not waited for. However, this is fine since there should be no local flows running when the FlowRegistry drains as the draining logic starts with draining all client connections to a node.

The reporter callback, if non-nil, is called on a best effort basis to report work that needed to be done and which may or may not have been done by the time this call returns. See the explanation in pkg/server/drain.go for details.

func (*FlowRegistry) RegisterFlow Uses

func (fr *FlowRegistry) RegisterFlow(
    ctx context.Context,
    id execinfrapb.FlowID,
    f *FlowBase,
    inboundStreams map[execinfrapb.StreamID]*InboundStreamInfo,
    timeout time.Duration,
) (retErr error)

RegisterFlow makes a flow accessible to ConnectInboundStream. Any concurrent ConnectInboundStream calls that are waiting for this flow are woken up.

It is expected that UnregisterFlow will be called at some point to remove the flow from the registry.

inboundStreams are all the remote streams that will be connected into this flow. If any of them is not connected within timeout, errors are propagated. The inboundStreams are expected to have been initialized with their WaitGroups (the group should have been incremented). RegisterFlow takes responsibility for calling Done() on that WaitGroup; this responsibility will be forwarded forward by ConnectInboundStream. In case this method returns an error, the WaitGroup will be decremented.

func (*FlowRegistry) Undrain Uses

func (fr *FlowRegistry) Undrain()

Undrain causes the FlowRegistry to start accepting flows again.

func (*FlowRegistry) UnregisterFlow Uses

func (fr *FlowRegistry) UnregisterFlow(id execinfrapb.FlowID)

UnregisterFlow removes a flow from the registry. Any subsequent ConnectInboundStream calls for the flow will fail to find it and time out.

type FlowScheduler Uses

type FlowScheduler struct {
    // contains filtered or unexported fields

FlowScheduler manages running flows and decides when to queue and when to start flows. The main interface it presents is ScheduleFlows, which passes a flow to be run.

func NewFlowScheduler Uses

func NewFlowScheduler(
    ambient log.AmbientContext,
    stopper *stop.Stopper,
    settings *cluster.Settings,
    metrics *execinfra.DistSQLMetrics,
) *FlowScheduler

NewFlowScheduler creates a new FlowScheduler.

func (*FlowScheduler) ScheduleFlow Uses

func (fs *FlowScheduler) ScheduleFlow(ctx context.Context, f Flow) error

ScheduleFlow is the main interface of the flow scheduler: it runs or enqueues the given flow.

If the flow can start immediately, errors encountered when starting the flow are returned. If the flow is enqueued, these error will be later ignored.

func (*FlowScheduler) Start Uses

func (fs *FlowScheduler) Start()

Start launches the main loop of the scheduler.

type FuseOpt Uses

type FuseOpt bool

FuseOpt specifies options for processor fusing at Flow.Setup() time.

const (
    // FuseNormally means fuse what you can, but don't serialize unordered input
    // synchronizers.
    FuseNormally FuseOpt = false
    // FuseAggressively means serialize unordered input synchronizers.
    // This is useful for flows that might have mutations which can't have any
    // concurrency.
    FuseAggressively = true

type InboundStreamHandler Uses

type InboundStreamHandler interface {
    // run is called once a FlowStream RPC is handled and a stream is obtained to
    // make this stream accessible to the rest of the flow.
        ctx context.Context, stream execinfrapb.DistSQL_FlowStreamServer, firstMsg *execinfrapb.ProducerMessage, f *FlowBase,
    ) error
    // timeout is called with an error, which results in the teardown of the
    // stream strategy with the given error.
    // WARNING: timeout may block.
    Timeout(err error)

InboundStreamHandler is a handler of an inbound stream.

type InboundStreamInfo Uses

type InboundStreamInfo struct {
    // contains filtered or unexported fields

InboundStreamInfo represents the endpoint where a data stream from another node connects to a flow. The external node initiates this process through a FlowStream RPC, which uses (*Flow).connectInboundStream() to associate the stream to a receiver to push rows to.

func NewInboundStreamInfo Uses

func NewInboundStreamInfo(
    receiver InboundStreamHandler, waitGroup *sync.WaitGroup,
) *InboundStreamInfo

NewInboundStreamInfo returns a new InboundStreamInfo.

type Outbox Uses

type Outbox struct {
    // RowChannel implements the RowReceiver interface.
    // contains filtered or unexported fields

Outbox implements an outgoing mailbox as a RowReceiver that receives rows and sends them to a gRPC stream. Its core logic runs in a goroutine. We send rows when we accumulate outboxBufRows or every outboxFlushPeriod (whichever comes first).

func NewOutbox Uses

func NewOutbox(
    flowCtx *execinfra.FlowCtx,
    nodeID roachpb.NodeID,
    flowID execinfrapb.FlowID,
    streamID execinfrapb.StreamID,
) *Outbox

NewOutbox creates a new Outbox.

func NewOutboxSyncFlowStream Uses

func NewOutboxSyncFlowStream(stream execinfrapb.DistSQL_RunSyncFlowServer) *Outbox

NewOutboxSyncFlowStream sets up an outbox for the special "sync flow" stream. The flow context should be provided via SetFlowCtx when it is available.

func (*Outbox) Err Uses

func (m *Outbox) Err() error

Err returns the error (if any occurred) while Outbox was running.

func (*Outbox) Init Uses

func (m *Outbox) Init(typs []*types.T)

Init initializes the Outbox.

func (*Outbox) SetFlowCtx Uses

func (m *Outbox) SetFlowCtx(flowCtx *execinfra.FlowCtx)

SetFlowCtx sets the flow context for the Outbox.

func (*Outbox) Start Uses

func (m *Outbox) Start(ctx context.Context, wg *sync.WaitGroup, flowCtxCancel context.CancelFunc)

Start starts the outbox.

type OutboxStats Uses

type OutboxStats struct {
    BytesSent int64 `protobuf:"varint,1,opt,name=bytes_sent,json=bytesSent,proto3" json:"bytes_sent,omitempty"`

OutboxStats are the stats collected by an Outbox.

func (*OutboxStats) Descriptor Uses

func (*OutboxStats) Descriptor() ([]byte, []int)

func (*OutboxStats) Marshal Uses

func (m *OutboxStats) Marshal() (dAtA []byte, err error)

func (*OutboxStats) MarshalTo Uses

func (m *OutboxStats) MarshalTo(dAtA []byte) (int, error)

func (*OutboxStats) ProtoMessage Uses

func (*OutboxStats) ProtoMessage()

func (*OutboxStats) Reset Uses

func (m *OutboxStats) Reset()

func (*OutboxStats) Size Uses

func (m *OutboxStats) Size() (n int)

func (*OutboxStats) Stats Uses

func (os *OutboxStats) Stats() map[string]string

Stats implements the SpanStats interface.

func (*OutboxStats) StatsForQueryPlan Uses

func (os *OutboxStats) StatsForQueryPlan() []string

StatsForQueryPlan implements the DistSQLSpanStats interface.

func (*OutboxStats) String Uses

func (m *OutboxStats) String() string

func (*OutboxStats) Unmarshal Uses

func (m *OutboxStats) Unmarshal(dAtA []byte) error

func (*OutboxStats) XXX_DiscardUnknown Uses

func (m *OutboxStats) XXX_DiscardUnknown()

func (*OutboxStats) XXX_Marshal Uses

func (m *OutboxStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*OutboxStats) XXX_Merge Uses

func (dst *OutboxStats) XXX_Merge(src proto.Message)

func (*OutboxStats) XXX_Size Uses

func (m *OutboxStats) XXX_Size() int

func (*OutboxStats) XXX_Unmarshal Uses

func (m *OutboxStats) XXX_Unmarshal(b []byte) error

type Releasable Uses

type Releasable interface {
    // Release allows this object to be returned to a memory pool. Objects must
    // not be used after Release is called.

Releasable is an interface for objects than can be Released back into a memory pool when finished.

type RowInboundStreamHandler Uses

type RowInboundStreamHandler struct {

RowInboundStreamHandler is an InboundStreamHandler for the row based flow. It is exported since it is the default for the flow infrastructure.

func (RowInboundStreamHandler) Run Uses

func (s RowInboundStreamHandler) Run(
    ctx context.Context,
    stream execinfrapb.DistSQL_FlowStreamServer,
    firstMsg *execinfrapb.ProducerMessage,
    f *FlowBase,
) error

Run is part of the InboundStreamHandler interface.

func (RowInboundStreamHandler) Timeout Uses

func (s RowInboundStreamHandler) Timeout(err error)

Timeout is part of the InboundStreamHandler interface.

type Startable Uses

type Startable interface {
    Start(ctx context.Context, wg *sync.WaitGroup, ctxCancel context.CancelFunc)

Startable is any component that can be started (a router or an outbox).

type StartableFn Uses

type StartableFn func(context.Context, *sync.WaitGroup, context.CancelFunc)

StartableFn is an adapter when a customer function (i.e. a custom goroutine) needs to become Startable.

func (StartableFn) Start Uses

func (f StartableFn) Start(ctx context.Context, wg *sync.WaitGroup, ctxCancel context.CancelFunc)

Start is a part of the Startable interface.

type StreamDecoder Uses

type StreamDecoder struct {
    // contains filtered or unexported fields

StreamDecoder converts a sequence of ProducerMessage to rows and metadata records.

Sample usage:

sd := StreamDecoder{}
var row sqlbase.EncDatumRow
for each message in stream {
    err := sd.AddMessage(msg)
    if err != nil { ... }
    for {
        row, meta, err := sd.GetRow(row)
        if err != nil { ... }
        if row == nil && meta.Empty() {
            // No more rows in this message.
        // Use <row>

AddMessage can be called multiple times before getting the rows, but this will cause data to accumulate internally.

func (*StreamDecoder) AddMessage Uses

func (sd *StreamDecoder) AddMessage(ctx context.Context, msg *execinfrapb.ProducerMessage) error

AddMessage adds the data in a ProducerMessage to the decoder.

The StreamDecoder may keep a reference to msg.Data.RawBytes and msg.Data.Metadata until all the rows in the message are retrieved with GetRow.

If an error is returned, no records have been buffered in the StreamDecoder.

func (*StreamDecoder) GetRow Uses

func (sd *StreamDecoder) GetRow(
    rowBuf sqlbase.EncDatumRow,
) (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata, error)

GetRow returns a row received in the stream. A row buffer can be provided optionally.

Returns an empty row if there are no more rows received so far.

A decoding error may be returned. Note that these are separate from error coming from the upstream (through ProducerMetadata.Err).

func (*StreamDecoder) Types Uses

func (sd *StreamDecoder) Types() []*types.T

Types returns the types of the columns; can only be used after we received at least one row.

type StreamEncoder Uses

type StreamEncoder struct {
    // contains filtered or unexported fields

StreamEncoder converts EncDatum rows into a sequence of ProducerMessage.

Sample usage:

se := StreamEncoder{}

for {
    for ... {
       err := se.AddRow(...)
    msg := se.FormMessage(nil)
    // Send out message.

func (*StreamEncoder) AddMetadata Uses

func (se *StreamEncoder) AddMetadata(ctx context.Context, meta execinfrapb.ProducerMetadata)

AddMetadata encodes a metadata message. Unlike AddRow(), it cannot fail. This is important for the caller because a failure to encode a piece of metadata (particularly one that contains an error) would not be recoverable.

Metadata records lose their ordering wrt the data rows. The convention is that the StreamDecoder will return them first, before the data rows, thus ensuring that rows produced _after_ an error are not received _before_ the error.

func (*StreamEncoder) AddRow Uses

func (se *StreamEncoder) AddRow(row sqlbase.EncDatumRow) error

AddRow encodes a message.

func (*StreamEncoder) FormMessage Uses

func (se *StreamEncoder) FormMessage(ctx context.Context) *execinfrapb.ProducerMessage

FormMessage populates a message containing the rows added since the last call to FormMessage. The returned ProducerMessage should be treated as immutable.

func (*StreamEncoder) HasHeaderBeenSent Uses

func (se *StreamEncoder) HasHeaderBeenSent() bool

HasHeaderBeenSent returns whether the header has been sent.

func (*StreamEncoder) Init Uses

func (se *StreamEncoder) Init(types []*types.T)

Init initializes the encoder.

func (*StreamEncoder) SetHeaderFields Uses

func (se *StreamEncoder) SetHeaderFields(flowID execinfrapb.FlowID, streamID execinfrapb.StreamID)

SetHeaderFields sets the header fields.

type TestingKnobs Uses

type TestingKnobs struct {
    // FlowRegistryDraining overrides draining state of the registry.
    FlowRegistryDraining func() bool

TestingKnobs are the testing knobs for flowinfra.

func (*TestingKnobs) ModuleTestingKnobs Uses

func (*TestingKnobs) ModuleTestingKnobs()

ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.

Package flowinfra imports 32 packages (graph) and is imported by 13 packages. Updated 2020-08-12. Refresh now. Tools for package owners.