cockroach: github.com/cockroachdb/cockroach/pkg/sql/execinfra

package execinfra

import "github.com/cockroachdb/cockroach/pkg/sql/execinfra"

Package Files

base.go consumerstatus_string.go flow_context.go metadata_test_receiver.go metadata_test_sender.go metrics.go operator.go processorsbase.go procstate_string.go readerbase.go scanbase.go server_config.go testutils.go

Constants

const (
    // StateRunning is the common state of a processor: it's producing rows for
    // its consumer and forwarding metadata from its input. Different processors
    // might have sub-states internally.
    //
    // If the consumer calls ConsumerDone or if the ProcOutputHelper.maxRowIdx is
    // reached, then the processor will transition to StateDraining. If the input
    // is exhausted, then the processor can transition to StateTrailingMeta
    // directly, although most processors go through StateDraining first.
    StateRunning procState = iota

    // StateDraining is the state in which the processor is forwarding metadata
    // from its input and otherwise ignoring all rows. Once the input is
    // exhausted, the processor will transition to StateTrailingMeta.
    //
    // In StateDraining, processors are required to swallow
    // ReadWithinUncertaintyIntervalErrors received from their sources. We're
    // already draining, so we don't care about whatever data generated this
    // uncertainty error. Besides generally seeming like a good idea, doing this
    // allows us to offer a nice guarantee to SQL clients: a read-only query that
    // produces at most one row, run as an implicit txn, never produces retriable
    // errors, regardless of the size of the row being returned (in relation to
    // the size of the result buffer on the connection). One would naively expect
    // that to be true: either the error happens before any rows have been
    // delivered to the client, in which case the auto-retries kick in, or, if a
    // row has been delivered, then the query is done and so how can there be an
    // error? What our naive friend is ignoring is that, if it weren't for this
    // code, it'd be possible for a retriable error to sneak in after the query's
    // limit has been satisfied but while processors are still draining. Note
    // that uncertainty errors are not retried automatically by the leaf
    // TxnCoordSenders (i.e. by the refresh txn interceptor).
    //
    // Other categories of errors might be safe to ignore too; however we
    // can't ignore all of them. Generally, we need to ensure that all the
    // trailing metadata (e.g. LeafTxnFinalState's) make it to the gateway for
    // successful flows. If an error is telling us that some metadata might
    // have been dropped, we can't ignore that.
    StateDraining

    // StateTrailingMeta is the state in which the processor is outputting final
    // metadata such as the tracing information or the LeafTxnFinalState. Once all the
    // trailing metadata has been produced, the processor transitions to
    // StateExhausted.
    StateTrailingMeta

    // StateExhausted is the state of a processor that has no more rows or
    // metadata to produce.
    StateExhausted
)

go:generate stringer -type=procState

const (
    ScanVisibilityPublic             = execinfrapb.ScanVisibility_PUBLIC
    ScanVisibilityPublicAndNotPublic = execinfrapb.ScanVisibility_PUBLIC_AND_NOT_PUBLIC
)

Prettier aliases for execinfrapb.ScanVisibility values.

const MinAcceptedVersion execinfrapb.DistSQLVersion = 30

MinAcceptedVersion is the oldest version that the server is compatible with; see the comment on Version below.

const RowChannelBufSize = 16

RowChannelBufSize is the default buffer size of a RowChannel.

const StaticNodeID = roachpb.NodeID(3)

StaticNodeID is the default Node ID to be used in tests.

const Version execinfrapb.DistSQLVersion = 31

Version identifies the distsql protocol version.

This version is separate from the main CockroachDB version numbering; it is only changed when the distsql API changes.

The planner populates the version in SetupFlowRequest. A server only accepts requests with versions in the range MinAcceptedVersion to Version.

This can be used to provide a "window" of compatibility when new features are added. Example:

- we start with Version=1; distsql servers with version 1 only accept
  requests with version 1.
- a new distsql feature is added; Version is bumped to 2. The
  planner does not yet use this feature by default; it still issues
  requests with version 1.
- MinAcceptedVersion is still 1, i.e. servers with version 2
  accept both versions 1 and 2.
- after an upgrade cycle, we can enable the feature in the planner,
  requiring version 2.
- at some later point, we can choose to deprecate version 1 and have
  servers only accept versions >= 2 (by setting
  MinAcceptedVersion to 2).

ATTENTION: When updating these fields, add to version_history.txt explaining what changed.
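
For illustration, a minimal sketch of the acceptance check implied by the above (acceptsVersion is a hypothetical helper; the real handshake lives in the flow-setup code):

    // acceptsVersion reports whether a SetupFlowRequest carrying the given
    // version can be serviced by this server: the version must fall in the
    // window [MinAcceptedVersion, Version].
    func acceptsVersion(v execinfrapb.DistSQLVersion) bool {
        return v >= MinAcceptedVersion && v <= Version
    }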

Variables

var SettingWorkMemBytes = settings.RegisterByteSizeSetting(
    "sql.distsql.temp_storage.workmem",
    "maximum amount of memory in bytes a processor can use before falling back to temp storage",
    64*1024*1024,
)

SettingWorkMemBytes is a cluster setting that determines the maximum amount of RAM that a processor can use.

func DrainAndClose

func DrainAndClose(
    ctx context.Context,
    dst RowReceiver,
    cause error,
    pushTrailingMeta func(context.Context),
    srcs ...RowSource,
)

DrainAndClose is a version of DrainAndForwardMetadata that drains multiple sources. These sources are assumed to be the only producers left for dst, so dst is closed once they're all exhausted (this is different from DrainAndForwardMetadata).

If cause is specified, it is forwarded to the consumer before all the drain metadata. This is intended to have been the error, if any, that caused the draining.

pushTrailingMeta is called after draining the sources and before calling dst.ProducerDone(). It gives the caller the opportunity to push some trailing metadata (e.g. tracing information and txn updates, if applicable).

srcs can be nil.

All errors are forwarded to the consumer.

func DrainAndForwardMetadata

func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver)

DrainAndForwardMetadata calls src.ConsumerDone() (thus asking src for draining metadata) and then forwards all the metadata to dst.

When this returns, src has been properly closed (regardless of the presence or absence of an error). dst, however, has not been closed; someone else must call dst.ProducerDone() when all producers have finished draining.

It is OK to call DrainAndForwardMetadata() multiple times concurrently on the same dst (as RowReceiver.Push() is guaranteed to be thread safe).
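
As a sketch of that contract, two sources feeding the same receiver can be drained from separate goroutines using a sync.WaitGroup; only the final ProducerDone() call needs to be coordinated (left, right, ctx, and dst are assumed to be in scope):

    var wg sync.WaitGroup
    for _, src := range []RowSource{left, right} {
        src := src
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Push() is thread-safe, so draining concurrently into dst is fine.
            DrainAndForwardMetadata(ctx, src, dst)
        }()
    }
    wg.Wait()
    // Close dst exactly once, after all producers have finished draining.
    dst.ProducerDone()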

func GenerateValuesSpec

func GenerateValuesSpec(
    colTypes []*types.T, rows sqlbase.EncDatumRows, rowsPerChunk int,
) (execinfrapb.ValuesCoreSpec, error)

GenerateValuesSpec generates a ValuesCoreSpec that encodes the given rows. We pass the types as well because zero rows are allowed.

func GetLeafTxnFinalState

func GetLeafTxnFinalState(ctx context.Context, txn *kv.Txn) *roachpb.LeafTxnFinalState

GetLeafTxnFinalState returns the txn metadata from a transaction if it is present and the transaction is a leaf transaction. It returns nil when called on a Root. This is done as a convenience allowing DistSQL processors to be oblivious about whether they're running in a Leaf or a Root.

NOTE(andrei): As of 04/2018, the txn is shared by all processors scheduled on a node, and so it's possible for multiple processors to send the same LeafTxnFinalState. The root TxnCoordSender doesn't care if it receives the same thing multiple times.

func GetTraceData

func GetTraceData(ctx context.Context) []tracing.RecordedSpan

GetTraceData returns the trace data.

func GetWorkMemLimit

func GetWorkMemLimit(config *ServerConfig) int64

GetWorkMemLimit returns the number of bytes determining the amount of RAM available to a single processor or operator.

func LimitHint

func LimitHint(specLimitHint int64, post *execinfrapb.PostProcessSpec) (limitHint int64)

LimitHint returns the limit hint to set for a KVFetcher based on the spec's limit hint and the PostProcessSpec.

func MisplannedRanges

func MisplannedRanges(
    ctx context.Context,
    spans []roachpb.Span,
    nodeID roachpb.NodeID,
    rdc *kvcoord.RangeDescriptorCache,
) (misplannedRanges []roachpb.RangeInfo)

MisplannedRanges queries the range cache for all the passed-in spans and returns the list of ranges whose leaseholder is not on the indicated node. Ranges with unknown leases are not included in the result.

func NewLimitedMonitor

func NewLimitedMonitor(
    ctx context.Context, parent *mon.BytesMonitor, config *ServerConfig, name string,
) *mon.BytesMonitor

NewLimitedMonitor is a utility function used by processors to create a new limited memory monitor with the given name and start it. The returned monitor must be closed. The limit is determined by SettingWorkMemBytes, but it is overridden to 1 if config.TestingKnobs.ForceDiskSpill is set, or to config.TestingKnobs.MemoryLimitBytes if that is set.
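
A rough sketch of how a processor might use this during initialization, assuming the flow's monitor is reachable as flowCtx.EvalCtx.Mon (the monitor name and rowSize are illustrative; a BoundAccount is the usual way to charge allocations against a monitor):

    // Create a limited monitor as a child of the flow's monitor and charge
    // per-row allocations to an account bound to it.
    limitedMon := NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, "example-limited")
    acc := limitedMon.MakeBoundAccount()
    defer limitedMon.Stop(ctx)
    defer acc.Close(ctx)
    if err := acc.Grow(ctx, rowSize); err != nil {
        // The work-mem budget is exhausted: spill to disk or return the error.
    }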

func NewMonitor

func NewMonitor(ctx context.Context, parent *mon.BytesMonitor, name string) *mon.BytesMonitor

NewMonitor is a utility function used by processors to create a new memory monitor with the given name and start it. The returned monitor must be closed.

func NewTestDiskMonitor

func NewTestDiskMonitor(ctx context.Context, st *cluster.Settings) *mon.BytesMonitor

NewTestDiskMonitor creates and starts a new disk monitor to be used in tests.

func NewTestMemMonitor

func NewTestMemMonitor(ctx context.Context, st *cluster.Settings) *mon.BytesMonitor

NewTestMemMonitor creates and starts a new memory monitor to be used in tests. TODO(yuzefovich): consider reusing this in tree.MakeTestingEvalContext (currently it would create an import cycle, so this code will need to be moved).

func ProcessorSpan

func ProcessorSpan(ctx context.Context, name string) (context.Context, opentracing.Span)

ProcessorSpan creates a child span for a processor (if we are doing any tracing). The returned span needs to be finished using tracing.FinishSpan.

func Run

func Run(ctx context.Context, src RowSource, dst RowReceiver)

Run reads records from the source and outputs them to the receiver, properly draining the source of metadata and closing both the source and receiver.

src needs to have been Start()ed before calling this.
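
A minimal usage sketch, assuming src and dst are an already-constructed RowSource and RowReceiver:

    ctx = src.Start(ctx)
    // Run consumes src until it is exhausted, forwarding rows and metadata
    // to dst, and finally closes both ends.
    Run(ctx, src, dst)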

func SendTraceData

func SendTraceData(ctx context.Context, dst RowReceiver)

SendTraceData collects the tracing information from the ctx and pushes it to dst. The ConsumerStatus returned by dst is ignored.

Note that the tracing data is distinct between different processors, since each one gets its own trace "recording group".

type ConsumerStatus

type ConsumerStatus uint32

ConsumerStatus is the type returned by RowReceiver.Push(), informing a producer of a consumer's state.

const (
    // NeedMoreRows indicates that the consumer is still expecting more rows.
    NeedMoreRows ConsumerStatus = iota
    // DrainRequested indicates that the consumer will not process any more data
    // rows, but will accept trailing metadata from the producer.
    DrainRequested
    // ConsumerClosed indicates that the consumer will not process any more data
    // rows or metadata. This is also commonly returned in case the consumer has
    // encountered an error.
    ConsumerClosed
)
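
To illustrate, a canonical producer loop reacting to these statuses might look like the following sketch (rows and drainMeta are hypothetical sources of data rows and trailing metadata):

    for _, row := range rows {
        status := dst.Push(row, nil /* meta */)
        if status == NeedMoreRows {
            continue
        }
        if status == DrainRequested {
            // Stop producing data rows; only trailing metadata follows.
            for _, meta := range drainMeta() {
                dst.Push(nil /* row */, meta)
            }
        }
        // On ConsumerClosed, the consumer wants nothing further, not even
        // metadata.
        break
    }
    // ProducerDone() must be called in all cases once pushing is finished.
    dst.ProducerDone()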

func (ConsumerStatus) String

func (i ConsumerStatus) String() string

type DistSQLMetrics

type DistSQLMetrics struct {
    QueriesActive     *metric.Gauge
    QueriesTotal      *metric.Counter
    FlowsActive       *metric.Gauge
    FlowsTotal        *metric.Counter
    FlowsQueued       *metric.Gauge
    QueueWaitHist     *metric.Histogram
    MaxBytesHist      *metric.Histogram
    CurBytesCount     *metric.Gauge
    VecOpenFDs        *metric.Gauge
    CurDiskBytesCount *metric.Gauge
    MaxDiskBytesHist  *metric.Histogram
}

DistSQLMetrics contains pointers to the metrics for monitoring DistSQL processing.

func MakeDistSQLMetrics

func MakeDistSQLMetrics(histogramWindow time.Duration) DistSQLMetrics

MakeDistSQLMetrics instantiates the metrics holder for DistSQL monitoring.

func (*DistSQLMetrics) FlowStart

func (m *DistSQLMetrics) FlowStart()

FlowStart registers the start of a new DistSQL flow.

func (*DistSQLMetrics) FlowStop

func (m *DistSQLMetrics) FlowStop()

FlowStop registers the end of a DistSQL flow.

func (DistSQLMetrics) MetricStruct

func (DistSQLMetrics) MetricStruct()

MetricStruct implements the metrics.Struct interface.

func (*DistSQLMetrics) QueryStart

func (m *DistSQLMetrics) QueryStart()

QueryStart registers the start of a new DistSQL query.

func (*DistSQLMetrics) QueryStop

func (m *DistSQLMetrics) QueryStop()

QueryStop registers the end of a DistSQL query.

type FlowCtx

type FlowCtx struct {
    log.AmbientContext

    Cfg *ServerConfig

    // ID is a unique identifier for a remote flow. It is mainly used as a key
    // into the flowRegistry. Since local flows do not need to exist in the flow
    // registry (no inbound stream connections need to be performed), they are not
    // assigned ids. This is done for performance reasons, as local flows are
    // more likely to be dominated by setup time.
    ID  execinfrapb.FlowID

    // EvalCtx is used by all the processors in the flow to evaluate expressions.
    // Processors that intend to evaluate expressions with this EvalCtx should
    // get a copy with NewEvalCtx instead of storing a pointer to this one
    // directly (since some processors mutate the EvalContext they use).
    //
    // TODO(andrei): Get rid of this field and pass a non-shared EvalContext to
    // cores of the processors that need it.
    EvalCtx *tree.EvalContext

    // The transaction in which kv operations performed by processors in the flow
    // must be performed. Processors in the Flow will use this txn concurrently.
    // This field is generally not nil, except for flows that don't run in a
    // higher-level txn (like backfills).
    Txn *kv.Txn

    // NodeID is the ID of the node on which the processors using this FlowCtx
    // run.
    NodeID *base.SQLIDContainer

    // TraceKV is true if KV tracing was requested by the session.
    TraceKV bool

    // Local is true if this flow is being run as part of a local-only query.
    Local bool

    // TypeResolverFactory is used to construct transaction bound TypeResolvers
    // to resolve type references during flow setup. It is not safe for concurrent
    // use and is intended to be used only during flow setup and initialization.
    // The TypeResolverFactory is initialized when the FlowContext is created
    // on the gateway node using the planner's descs.Collection and is created
    // on remote nodes with a new descs.Collection. After the flow is complete,
    // all descriptors leased from the factory must be released.
    TypeResolverFactory *descs.DistSQLTypeResolverFactory
}

FlowCtx encompasses the configuration parameters needed for various flow components.

func (*FlowCtx) Codec

func (ctx *FlowCtx) Codec() keys.SQLCodec

Codec returns the SQL codec for this flowCtx.

func (*FlowCtx) NewEvalCtx

func (ctx *FlowCtx) NewEvalCtx() *tree.EvalContext

NewEvalCtx returns a modifiable copy of the FlowCtx's EvalContext. Processors should use this method any time they need to store a pointer to the EvalContext, since processors may mutate the EvalContext. Specifically, every processor that runs ProcOutputHelper.Init must pass in a modifiable EvalContext, since it stores that EvalContext in its exprHelpers and mutates them at runtime to ensure expressions are evaluated with the correct indexed var context.

func (*FlowCtx) Stopper

func (ctx *FlowCtx) Stopper() *stop.Stopper

Stopper returns the stopper for this flowCtx.

func (*FlowCtx) TestingKnobs

func (ctx *FlowCtx) TestingKnobs() TestingKnobs

TestingKnobs returns the distsql testing knobs for this flow context.

type LocalProcessor

type LocalProcessor interface {
    RowSourcedProcessor
    // InitWithOutput initializes this processor.
    InitWithOutput(flowCtx *FlowCtx, post *execinfrapb.PostProcessSpec, output RowReceiver) error
    // SetInput initializes this LocalProcessor with an input RowSource. Not all
    // LocalProcessors need inputs, but this needs to be called if a
    // LocalProcessor expects to get its data from another RowSource.
    SetInput(ctx context.Context, input RowSource) error
}

LocalProcessor is a RowSourcedProcessor that needs to be initialized with its post processing spec and output row receiver. Most processors can accept these objects at creation time.

type MetadataTestLevel

type MetadataTestLevel int

MetadataTestLevel represents the types of queries where metadata test processors are planned.

const (
    // Off represents that no metadata test processors are planned.
    Off MetadataTestLevel = iota
    // NoExplain represents that metadata test processors are planned for all
    // queries except EXPLAIN (DISTSQL) statements.
    NoExplain
    // On represents that metadata test processors are planned for all queries.
    On
)

type MetadataTestReceiver

type MetadataTestReceiver struct {
    ProcessorBase
    // contains filtered or unexported fields
}

MetadataTestReceiver is a processor complementary to MetadataTestSender; it checks that all metadata emitted by the latter is received.

func NewMetadataTestReceiver

func NewMetadataTestReceiver(
    flowCtx *FlowCtx,
    processorID int32,
    input RowSource,
    post *execinfrapb.PostProcessSpec,
    output RowReceiver,
    senders []string,
) (*MetadataTestReceiver, error)

NewMetadataTestReceiver creates a new MetadataTestReceiver.

func (*MetadataTestReceiver) ConsumerClosed

func (mtr *MetadataTestReceiver) ConsumerClosed()

ConsumerClosed is part of the RowSource interface.

func (*MetadataTestReceiver) ConsumerDone

func (mtr *MetadataTestReceiver) ConsumerDone()

ConsumerDone is part of the RowSource interface.

func (*MetadataTestReceiver) Next

func (mtr *MetadataTestReceiver) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata)

Next is part of the RowSource interface.

This implementation doesn't follow the usual patterns of other processors; it makes more limited use of the ProcessorBase's facilities because it needs to inspect metadata while draining.

func (*MetadataTestReceiver) Start

func (mtr *MetadataTestReceiver) Start(ctx context.Context) context.Context

Start is part of the RowSource interface.

type MetadataTestSender

type MetadataTestSender struct {
    ProcessorBase
    // contains filtered or unexported fields
}

MetadataTestSender intersperses a metadata record after every row.

func NewMetadataTestSender

func NewMetadataTestSender(
    flowCtx *FlowCtx,
    processorID int32,
    input RowSource,
    post *execinfrapb.PostProcessSpec,
    output RowReceiver,
    id string,
) (*MetadataTestSender, error)

NewMetadataTestSender creates a new MetadataTestSender.

func (*MetadataTestSender) ConsumerClosed

func (mts *MetadataTestSender) ConsumerClosed()

ConsumerClosed is part of the RowSource interface.

func (*MetadataTestSender) Next

func (mts *MetadataTestSender) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata)

Next is part of the RowSource interface.

func (*MetadataTestSender) Start

func (mts *MetadataTestSender) Start(ctx context.Context) context.Context

Start is part of the RowSource interface.

type NoMetadataRowSource

type NoMetadataRowSource struct {
    // contains filtered or unexported fields
}

NoMetadataRowSource is a wrapper on top of a RowSource that automatically forwards metadata to a RowReceiver. Data rows are returned through an interface similar to RowSource, except that, since metadata is taken care of, only the data rows are returned.

The point of this struct is that it'd be burdensome for some row consumers to have to deal with metadata.

func MakeNoMetadataRowSource

func MakeNoMetadataRowSource(src RowSource, sink RowReceiver) NoMetadataRowSource

MakeNoMetadataRowSource builds a NoMetadataRowSource.

func (*NoMetadataRowSource) NextRow

func (rs *NoMetadataRowSource) NextRow() (sqlbase.EncDatumRow, error)

NextRow is analogous to RowSource.Next. If the producer sends an error, we can't just forward it to metadataSink. We need to let the consumer know so that it's not under the impression that everything is hunky-dory and it can continue consuming rows. So, this interface returns the error. Just like with a raw RowSource, the consumer should generally call ConsumerDone() and drain.
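
A sketch of the consumer side, assuming src's metadata should flow to sink:

    rs := MakeNoMetadataRowSource(src, sink)
    for {
        row, err := rs.NextRow()
        if err != nil {
            // Ask the source to drain; a real consumer would then keep
            // reading until exhaustion.
            src.ConsumerDone()
            break
        }
        if row == nil {
            break // Input exhausted.
        }
        // Consume row.
    }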

type OpNode

type OpNode interface {
    // ChildCount returns the number of children (inputs) of the operator.
    ChildCount(verbose bool) int

    // Child returns the nth child (input) of the operator.
    Child(nth int, verbose bool) OpNode
}

OpNode is an interface to operator-like structures with children.
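
For example, the interface supports a generic pre-order walk, as in this sketch:

    // walkOpNodes visits op and all of its children in pre-order.
    func walkOpNodes(op OpNode, verbose bool, visit func(OpNode)) {
        visit(op)
        for i := 0; i < op.ChildCount(verbose); i++ {
            walkOpNodes(op.Child(i, verbose), verbose, visit)
        }
    }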

type ProcOutputHelper

type ProcOutputHelper struct {
    RowAlloc sqlbase.EncDatumRowAlloc

    // OutputTypes is the schema of the rows produced by the processor after
    // post-processing (i.e. the rows that are pushed through a router).
    //
    // If renderExprs is set, these types correspond to the types of those
    // expressions.
    // If outputCols is set, these types correspond to the types of
    // those columns.
    // If neither is set, this is the internal schema of the processor.
    OutputTypes []*types.T
    // contains filtered or unexported fields
}

ProcOutputHelper is a helper type that performs filtering and projection on the output of a processor.

func (*ProcOutputHelper) Close

func (h *ProcOutputHelper) Close()

Close signals to the output that there will be no more rows.

func (*ProcOutputHelper) EmitRow

func (h *ProcOutputHelper) EmitRow(
    ctx context.Context, row sqlbase.EncDatumRow,
) (ConsumerStatus, error)

EmitRow sends a row through the post-processing stage. The same row can be reused.

It returns the consumer's status that was observed when pushing this row. If an error is returned, it's coming from the ProcOutputHelper's filtering or rendering processing; the output has not been closed and it's the caller's responsibility to push the error to the output.

Note: check out rowexec.emitHelper() for a useful wrapper.

func (*ProcOutputHelper) Init

func (h *ProcOutputHelper) Init(
    post *execinfrapb.PostProcessSpec,
    typs []*types.T,
    semaCtx *tree.SemaContext,
    evalCtx *tree.EvalContext,
    output RowReceiver,
) error

Init sets up a ProcOutputHelper. The types describe the internal schema of the processor (as described for each processor core spec); they can be omitted if there is no filtering expression. Note that the types slice may be stored directly; the caller should not modify it.

func (*ProcOutputHelper) NeededColumns

func (h *ProcOutputHelper) NeededColumns() (colIdxs util.FastIntSet)

NeededColumns calculates the set of internal processor columns that are actually used by the post-processing stage.

func (*ProcOutputHelper) Output

func (h *ProcOutputHelper) Output() RowReceiver

Output returns the output of the ProcOutputHelper.

func (*ProcOutputHelper) ProcessRow

func (h *ProcOutputHelper) ProcessRow(
    ctx context.Context, row sqlbase.EncDatumRow,
) (_ sqlbase.EncDatumRow, moreRowsOK bool, _ error)

ProcessRow sends the given row through the post-processing stage and returns the post-processed row. Results from ProcessRow aren't safe past the next call to ProcessRow.

The moreRowsOK retval is true if more rows can be processed, false if the limit has been reached (if there's a limit). Upon seeing a false value, the caller is expected to start draining. Note that both a row and moreRowsOK=false can be returned at the same time: the row that satisfies the limit is returned at the same time as a DrainRequested status. In that case, the caller is supposed to both deal with the row and start draining.
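
A sketch of the expected calling pattern (nextInputRow and emit are hypothetical stand-ins for the caller's input and output):

    for {
        row := nextInputRow()
        if row == nil {
            break // Input exhausted.
        }
        outRow, moreRowsOK, err := h.ProcessRow(ctx, row)
        if err != nil {
            // ProcessRow does not push errors; that's on the caller.
            break
        }
        if outRow != nil {
            emit(outRow)
        }
        if !moreRowsOK {
            // Limit reached: the row above (if any) still counts, but the
            // caller must now start draining.
            break
        }
    }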

func (*ProcOutputHelper) Reset

func (h *ProcOutputHelper) Reset()

Reset resets this ProcOutputHelper, retaining allocated memory in its slices.

type ProcStateOpts

type ProcStateOpts struct {
    // TrailingMetaCallback, if specified, is a callback to be called by
    // moveToTrailingMeta(). See ProcessorBase.TrailingMetaCallback.
    TrailingMetaCallback func(context.Context) []execinfrapb.ProducerMetadata
    // InputsToDrain, if specified, will be drained by DrainHelper().
    // MoveToDraining() calls ConsumerDone() on them, InternalClose() calls
    // ConsumerClosed() on them.
    InputsToDrain []RowSource
}

ProcStateOpts contains fields used by the ProcessorBase's family of functions that deal with draining and trailing metadata: the ProcessorBase implements generic useful functionality that needs to call back into the Processor.
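
As a sketch, a processor that needs to release a resource and attach extra trailing metadata could pass options like these to Init (closeScratchFile and trailingMeta are hypothetical members of the embedding processor):

    opts := ProcStateOpts{
        InputsToDrain: []RowSource{input},
        TrailingMetaCallback: func(ctx context.Context) []execinfrapb.ProducerMetadata {
            // Clean up processor-specific resources, then hand back whatever
            // final metadata this processor owes its consumer.
            p.closeScratchFile()
            return p.trailingMeta
        },
    }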

type Processor

type Processor interface {
    // OutputTypes returns the column types of the results (that are to be fed
    // through an output router).
    OutputTypes() []*types.T

    // Run is the main loop of the processor.
    Run(context.Context)
}

Processor is a common interface implemented by all processors, used by the higher-level flow orchestration code.

type ProcessorBase

type ProcessorBase struct {
    Out     ProcOutputHelper
    FlowCtx *FlowCtx

    // EvalCtx is used for expression evaluation. It overrides the one in flowCtx.
    EvalCtx *tree.EvalContext

    // MemMonitor is the processor's memory monitor.
    MemMonitor *mon.BytesMonitor

    // Closed is set by InternalClose(). Once set, the processor's tracing span
    // has been closed.
    Closed bool

    // Ctx and span contain the tracing state while the processor is active
    // (i.e. hasn't been closed). Initialized using flowCtx.Ctx (which should not be otherwise
    // used).
    Ctx context.Context

    State procState

    // FinishTrace, if set, will be called before getting the trace data from
    // the span and adding the recording to the trailing metadata. Useful for
    // adding any extra information (e.g. stats) that should be captured in a
    // trace.
    FinishTrace func()
    // contains filtered or unexported fields
}

ProcessorBase is supposed to be embedded by Processors. It provides facilities for dealing with filtering and projection (through a ProcOutputHelper) and for implementing the RowSource interface (draining, trailing metadata).

If a Processor implements the RowSource interface, its implementation is expected to look something like this:

// concatProcessor concatenates rows from two sources (first returns rows
// from the left, then from the right).
type concatProcessor struct {
  ProcessorBase
  l, r RowSource

  // leftConsumed is set once we've exhausted the left input; once set, we start
  // consuming the right input.
  leftConsumed bool
}

func newConcatProcessor(
  flowCtx *FlowCtx, processorID int32, l, r RowSource,
  post *execinfrapb.PostProcessSpec, output RowReceiver, memMonitor *mon.BytesMonitor,
) (*concatProcessor, error) {
  p := &concatProcessor{l: l, r: r}
  if err := p.Init(
    p, post, l.OutputTypes(), flowCtx, processorID, output, memMonitor,
    // We pass the inputs to the helper, to be consumed by DrainHelper() later.
    ProcStateOpts{
      InputsToDrain: []RowSource{l, r},
      // If the proc needed to return any metadata at the end other than the
      // tracing info, or if it needed to clean up any resources other than
      // those handled by InternalClose() (say, close some memory account),
      // it'd pass a TrailingMetaCallback here.
    },
  ); err != nil {
    return nil, err
  }
  return p, nil
}

// Start is part of the RowSource interface.
func (p *concatProcessor) Start(ctx context.Context) context.Context {
  p.l.Start(ctx)
  p.r.Start(ctx)
  return p.StartInternal(ctx, concatProcName)
}

// Next is part of the RowSource interface.
func (p *concatProcessor) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
  // Loop while we haven't produced a row or a metadata record. We loop
  // around in several cases, including when post-processing filters out an
  // input row.
  for p.State == StateRunning {
    var row sqlbase.EncDatumRow
    var meta *execinfrapb.ProducerMetadata
    if !p.leftConsumed {
      row, meta = p.l.Next()
    } else {
      row, meta = p.r.Next()
    }

    if meta != nil {
      // If we got an error, we need to forward it along and remember that we're
      // draining.
      if meta.Err != nil {
        p.MoveToDraining(nil /* err */)
      }
      return nil, meta
    }
    if row == nil {
      if !p.leftConsumed {
        p.leftConsumed = true
      } else {
        // In this case we know that both inputs are consumed, so we could
        // transition directly to StateTrailingMeta, but implementations are
        // encouraged to just use MoveToDraining() for uniformity; DrainHelper()
        // will transition to StateTrailingMeta quickly.
        p.MoveToDraining(nil /* err */)
        break
      }
      continue
    }

    if outRow := p.ProcessRowHelper(row); outRow != nil {
      return outRow, nil
    }
  }
  return nil, p.DrainHelper()
}

// ConsumerDone is part of the RowSource interface.
func (p *concatProcessor) ConsumerDone() {
  p.MoveToDraining(nil /* err */)
}

// ConsumerClosed is part of the RowSource interface.
func (p *concatProcessor) ConsumerClosed() {
  // The consumer is done, Next() will not be called again.
  p.InternalClose()
}

func (*ProcessorBase) AddInputToDrain

func (pb *ProcessorBase) AddInputToDrain(input RowSource)

AddInputToDrain adds an input to drain when moving the processor to a draining state.

func (*ProcessorBase) AppendTrailingMeta

func (pb *ProcessorBase) AppendTrailingMeta(meta execinfrapb.ProducerMetadata)

AppendTrailingMeta appends metadata to the trailing metadata without changing the state to draining (as opposed to MoveToDraining).

func (*ProcessorBase) ConsumerDone

func (pb *ProcessorBase) ConsumerDone()

ConsumerDone is part of the RowSource interface.

func (*ProcessorBase) DrainHelper

func (pb *ProcessorBase) DrainHelper() *execinfrapb.ProducerMetadata

DrainHelper is supposed to be used in states draining and trailingMetadata. It deals with optionally draining an input and returning trailing meta. It also moves from StateDraining to StateTrailingMeta when appropriate.

func (*ProcessorBase) Init

func (pb *ProcessorBase) Init(
    self RowSource,
    post *execinfrapb.PostProcessSpec,
    types []*types.T,
    flowCtx *FlowCtx,
    processorID int32,
    output RowReceiver,
    memMonitor *mon.BytesMonitor,
    opts ProcStateOpts,
) error

Init initializes the ProcessorBase.

func (*ProcessorBase) InitWithEvalCtx

func (pb *ProcessorBase) InitWithEvalCtx(
    self RowSource,
    post *execinfrapb.PostProcessSpec,
    types []*types.T,
    flowCtx *FlowCtx,
    evalCtx *tree.EvalContext,
    processorID int32,
    output RowReceiver,
    memMonitor *mon.BytesMonitor,
    opts ProcStateOpts,
) error

InitWithEvalCtx initializes the ProcessorBase with a given EvalContext.

func (*ProcessorBase) InternalClose

func (pb *ProcessorBase) InternalClose() bool

InternalClose helps processors implement the RowSource interface, performing common close functionality. Returns true iff the processor was not already closed.

Notably, it calls ConsumerClosed() on all the inputsToDrain.

if pb.InternalClose() {
  // Perform processor specific close work.
}

func (*ProcessorBase) MoveToDraining

func (pb *ProcessorBase) MoveToDraining(err error)

MoveToDraining switches the processor to StateDraining. Only metadata is returned from now on. In this state, the processor is expected to drain its inputs (commonly by using DrainHelper()).

If the processor has no inputs (ProcStateOpts.InputsToDrain was not specified at Init() time), then we move straight to StateTrailingMeta.

An error can be optionally passed. It will be the first piece of metadata returned by DrainHelper().

func (*ProcessorBase) OutputTypes

func (pb *ProcessorBase) OutputTypes() []*types.T

OutputTypes is part of the processor interface.

func (*ProcessorBase) ProcessRowHelper

func (pb *ProcessorBase) ProcessRowHelper(row sqlbase.EncDatumRow) sqlbase.EncDatumRow

ProcessRowHelper is a wrapper on top of ProcOutputHelper.ProcessRow(). It takes care of handling errors and drain requests by moving the processor to StateDraining.

It takes a row and returns the row after processing. The return value can be nil, in which case the caller shouldn't return anything to its consumer; it should continue processing other rows, with the awareness that the processor might have been transitioned to the draining phase.

func (*ProcessorBase) Reset

func (pb *ProcessorBase) Reset()

Reset resets this ProcessorBase, retaining allocated memory in slices.

func (*ProcessorBase) Run

func (pb *ProcessorBase) Run(ctx context.Context)

Run is part of the processor interface.

func (*ProcessorBase) StartInternal

func (pb *ProcessorBase) StartInternal(ctx context.Context, name string) context.Context

StartInternal prepares the ProcessorBase for execution. It returns the annotated context that's also stored in pb.Ctx.

type ProcessorConstructor

type ProcessorConstructor func(
    ctx context.Context,
    flowCtx *FlowCtx,
    processorID int32,
    core *execinfrapb.ProcessorCoreUnion,
    post *execinfrapb.PostProcessSpec,
    inputs []RowSource,
    outputs []RowReceiver,
    localProcessors []LocalProcessor,
) (Processor, error)

ProcessorConstructor is a function that creates a Processor. It is abstracted away so that we can create mixed flows (i.e. a vectorized flow with wrapped processors) without bringing a dependency on the sql/rowexec package into the sql/colexec package.

type Releasable

type Releasable interface {
    // Release allows this object to be returned to a memory pool. Objects must
    // not be used after Release is called.
    Release()
}

Releasable is an interface for objects that can be Released back into a memory pool when finished.

type RepeatableRowSource

type RepeatableRowSource struct {
    // contains filtered or unexported fields
}

RepeatableRowSource is a RowSource used in benchmarks to avoid having to reinitialize a new RowSource every time during multiple passes of the input. It is intended to be initialized with all rows.

func NewRepeatableRowSource

func NewRepeatableRowSource(types []*types.T, rows sqlbase.EncDatumRows) *RepeatableRowSource

NewRepeatableRowSource creates a RepeatableRowSource with the given schema and rows. types is optional if at least one row is provided.

func (*RepeatableRowSource) ConsumerClosed

func (r *RepeatableRowSource) ConsumerClosed()

ConsumerClosed is part of the RowSource interface.

func (*RepeatableRowSource) ConsumerDone

func (r *RepeatableRowSource) ConsumerDone()

ConsumerDone is part of the RowSource interface.

func (*RepeatableRowSource) Next

func (r *RepeatableRowSource) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata)

Next is part of the RowSource interface.

func (*RepeatableRowSource) OutputTypes

func (r *RepeatableRowSource) OutputTypes() []*types.T

OutputTypes is part of the RowSource interface.

func (*RepeatableRowSource) Reset

func (r *RepeatableRowSource) Reset()

Reset resets the RepeatableRowSource such that a subsequent call to Next() returns the first row.
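
A typical benchmark skeleton using it might look like this sketch (runProcessorUnderTest stands in for constructing and running the processor being measured):

    source := NewRepeatableRowSource(colTypes, rows)
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        source.Reset()
        // Each pass re-reads the same rows from the first one onward.
        runProcessorUnderTest(ctx, source)
    }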

func (*RepeatableRowSource) Start

func (r *RepeatableRowSource) Start(ctx context.Context) context.Context

Start is part of the RowSource interface.

type RowChannel

type RowChannel struct {

    // The channel on which rows are delivered.
    C <-chan RowChannelMsg
    // contains filtered or unexported fields
}

RowChannel is a thin layer over a RowChannelMsg channel, which can be used to transfer rows between goroutines.

func (*RowChannel) ConsumerClosed

func (rc *RowChannel) ConsumerClosed()

ConsumerClosed is part of the RowSource interface.

func (*RowChannel) ConsumerDone

func (rc *RowChannel) ConsumerDone()

ConsumerDone is part of the RowSource interface.

func (*RowChannel) InitWithBufSizeAndNumSenders

func (rc *RowChannel) InitWithBufSizeAndNumSenders(types []*types.T, chanBufSize, numSenders int)

InitWithBufSizeAndNumSenders initializes the RowChannel with a given buffer size and number of senders.

func (*RowChannel) InitWithNumSenders

func (rc *RowChannel) InitWithNumSenders(types []*types.T, numSenders int)

InitWithNumSenders initializes the RowChannel with the default buffer size. numSenders is the number of producers that will be pushing to this channel. RowChannel will not be closed until it receives numSenders calls to ProducerDone().
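
A minimal sketch wiring two producer goroutines to one consumer (colTypes and the rowsFor helper are hypothetical):

    var rc RowChannel
    rc.InitWithNumSenders(colTypes, 2 /* numSenders */)
    for i := 0; i < 2; i++ {
        i := i
        go func() {
            // Each producer pushes its rows and then signals completion; the
            // channel is closed only after both ProducerDone() calls. A real
            // producer would keep forwarding metadata on DrainRequested;
            // this sketch just stops.
            for _, row := range rowsFor(i) {
                if rc.Push(row, nil /* meta */) != NeedMoreRows {
                    break
                }
            }
            rc.ProducerDone()
        }()
    }
    for {
        row, meta := rc.Next()
        if row == nil && meta == nil {
            break // Exhausted.
        }
        // Consume row or meta.
    }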

func (*RowChannel) Next

func (rc *RowChannel) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata)

Next is part of the RowSource interface.

func (*RowChannel) OutputTypes

func (rc *RowChannel) OutputTypes() []*types.T

OutputTypes is part of the RowSource interface.

func (*RowChannel) ProducerDone

func (rc *RowChannel) ProducerDone()

ProducerDone is part of the RowReceiver interface.

func (*RowChannel) Push

func (rc *RowChannel) Push(
    row sqlbase.EncDatumRow, meta *execinfrapb.ProducerMetadata,
) ConsumerStatus

Push is part of the RowReceiver interface.

func (*RowChannel) Start

func (rc *RowChannel) Start(ctx context.Context) context.Context

Start is part of the RowSource interface.

func (*RowChannel) Types

func (rc *RowChannel) Types() []*types.T

Types is part of the RowReceiver interface.

type RowChannelMsg

type RowChannelMsg struct {
    // Only one of these fields will be set.
    Row  sqlbase.EncDatumRow
    Meta *execinfrapb.ProducerMetadata
}

RowChannelMsg is the message used in the channels that implement local physical streams (i.e. the RowChannel's).

type RowReceiver

type RowReceiver interface {
    // Push sends a record to the consumer of this RowReceiver. Exactly one of the
    // row/meta must be specified (i.e. either row needs to be non-nil or meta
    // needs to be non-Empty()). May block.
    //
    // The return value indicates the current status of the consumer. Depending on
    // it, producers are expected to drain or shut down. In all cases,
    // ProducerDone() needs to be called (after draining is done, if draining was
    // requested).
    //
    // Unless specifically permitted by the underlying implementation, (see
    // copyingRowReceiver, for example), the sender must not modify the row
    // and the metadata after calling this function.
    //
    // After DrainRequested is returned, it is expected that all future calls only
    // carry metadata (however that is not enforced and implementations should be
    // prepared to discard non-metadata rows). If ConsumerClosed is returned,
    // implementations have to ignore further calls to Push() (such calls are
    // allowed because there might be multiple producers for a single RowReceiver
    // and they might not all be aware of the last status returned).
    //
    // Implementations of Push() must be thread-safe.
    Push(row sqlbase.EncDatumRow, meta *execinfrapb.ProducerMetadata) ConsumerStatus

    // Types returns the types of the EncDatumRow that this RowReceiver expects
    // to be pushed.
    Types() []*types.T

    // ProducerDone is called when the producer has pushed all the rows and
    // metadata; it causes the RowReceiver to process all rows and clean up.
    //
    // ProducerDone() cannot be called concurrently with Push(), and after it
    // is called, no other method can be called.
    ProducerDone()
}

RowReceiver is any component of a flow that receives rows from another component. It can be an input synchronizer, a router, or a mailbox.

type RowSource

type RowSource interface {
    // OutputTypes returns the schema for the rows in this source.
    OutputTypes() []*types.T

    // Start prepares the RowSource for future Next() calls and takes in the
    // context in which these future calls should operate. Start needs to be
    // called before Next/ConsumerDone/ConsumerClosed.
    //
    // RowSources that consume other RowSources are expected to Start() their
    // inputs.
    //
    // Implementations are expected to hold on to the provided context. They may
    // choose to derive and annotate it (Processors generally do). For convenience,
    // the possibly updated context is returned.
    Start(context.Context) context.Context

    // Next returns the next record from the source. At most one of the return
    // values will be non-empty. Both of them can be empty when the RowSource has
    // been exhausted - no more records are coming and any further method calls
    // will be no-ops.
    //
    // EncDatumRows returned by Next() are only valid until the next call to
    // Next(), although the EncDatums inside them stay valid forever.
    //
    // A ProducerMetadata record may contain an error. In that case, this
    // interface is oblivious about the semantics: implementers may continue
    // returning different rows on future calls, or may return an empty record
    // (thus asking the consumer to stop asking for rows). In particular,
    // implementers are not required to only return metadata records from this
    // point on (which means, for example, that they're not required to
    // automatically ask every producer to drain, in case there's multiple
    // producers). Therefore, consumers need to be aware that some rows might have
    // been skipped in case they continue to consume rows. Usually a consumer
    // should react to an error by calling ConsumerDone(), thus asking the
    // RowSource to drain, and separately discard any future data rows. A consumer
    // receiving an error should also call ConsumerDone() on any other input it
    // has.
    Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata)

    // ConsumerDone lets the source know that we will not need any more data
    // rows. The source is expected to start draining and only send metadata
    // rows. May be called multiple times on a RowSource, even after
    // ConsumerClosed has been called.
    //
    // May block. If the consumer of the source stops consuming rows before
    // Next indicates that there are no more rows, ConsumerDone() and/or
    // ConsumerClosed() must be called; it is a no-op to call these methods after
    // all the rows were consumed (i.e. after Next() returned an empty row).
    ConsumerDone()

    // ConsumerClosed informs the source that the consumer is done and will not
    // make any more calls to Next(). Must only be called once on a given
    // RowSource.
    //
    // Like ConsumerDone(), if the consumer of the source stops consuming rows
    // before Next indicates that there are no more rows, ConsumerDone() and/or
    // ConsumerClosed() must be called; it is a no-op to call these methods after
    // all the rows were consumed (i.e. after Next() returned an empty row).
    ConsumerClosed()
}

RowSource is any component of a flow that produces rows that can be consumed by another component.

Communication components generally (e.g. RowBuffer, RowChannel) implement this interface. Some processors also implement it (in addition to implementing the Processor interface) - in which case those processors can be "fused" with their consumer (i.e. run in the consumer's goroutine).

type RowSourcedProcessor

type RowSourcedProcessor interface {
    RowSource
    Run(context.Context)
}

RowSourcedProcessor is the union of RowSource and Processor.

type RuntimeStats

type RuntimeStats interface {
    // GetCPUCombinedPercentNorm returns the recent user+system cpu usage,
    // normalized to 0-1 by number of cores.
    GetCPUCombinedPercentNorm() float64
}

RuntimeStats is an interface through which the rowexec layer can get information about runtime statistics.

type ServerConfig

type ServerConfig struct {
    log.AmbientContext

    Settings     *cluster.Settings
    RuntimeStats RuntimeStats

    ClusterID   *base.ClusterIDContainer
    ClusterName string

    // NodeID is the id of the node on which this Server is running.
    NodeID *base.SQLIDContainer

    // Codec is capable of encoding and decoding sql table keys.
    Codec keys.SQLCodec

    // DB is a handle to the cluster.
    DB  *kv.DB
    // Executor can be used to run "internal queries". Note that Flows also have
    // access to an executor in the EvalContext. That one is "session bound"
    // whereas this one isn't.
    Executor sqlutil.InternalExecutor

    RPCContext   *rpc.Context
    Stopper      *stop.Stopper
    TestingKnobs TestingKnobs

    // ParentMemoryMonitor is normally the root SQL monitor. It should only be
    // used when setting up a server, or in tests.
    ParentMemoryMonitor *mon.BytesMonitor

    // TempStorage is used by some DistSQL processors to store rows when the
    // working set is larger than can be stored in memory.
    TempStorage diskmap.Factory

    // TempStoragePath is the path where the vectorized execution engine should
    // create files using TempFS.
    TempStoragePath string

    // TempFS is used by the vectorized execution engine to store columns when the
    // working set is larger than can be stored in memory.
    TempFS fs.FS

    // VecFDSemaphore is a weighted semaphore that restricts the number of open
    // file descriptors in the vectorized engine.
    VecFDSemaphore semaphore.Semaphore

    // BulkAdder is used by some processors to bulk-ingest data as SSTs.
    BulkAdder kvserverbase.BulkAdderFactory

    // DiskMonitor is used to monitor temporary storage disk usage. Actual disk
    // space used will be a small multiple (~1.1) of this because of RocksDB
    // space amplification.
    DiskMonitor *mon.BytesMonitor

    Metrics *DistSQLMetrics

    // JobRegistry manages jobs being used by this Server.
    JobRegistry *jobs.Registry

    // LeaseManager is a *sql.LeaseManager. It's stored as an `interface{}` due
    // to package dependency cycles.
    LeaseManager interface{}

    // A handle to gossip used to broadcast the node's DistSQL version and
    // draining state.
    Gossip gossip.OptionalGossip

    NodeDialer *nodedialer.Dialer

    // SessionBoundInternalExecutorFactory is used to construct session-bound
    // executors. The idea is that a higher-layer binds some of the arguments
    // required, so that users of ServerConfig don't have to care about them.
    SessionBoundInternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory

    ExternalStorage        cloud.ExternalStorageFactory
    ExternalStorageFromURI cloud.ExternalStorageFromURIFactory

    // ProtectedTimestampProvider maintains the state of the protected timestamp
    // subsystem. It is queried during the GC process and in the handling of
    // AdminVerifyProtectedTimestampRequest.
    ProtectedTimestampProvider protectedts.Provider

    // RangeCache is used by processors that were supposed to have been planned on
    // the leaseholders of the data ranges that they're consuming. These
    // processors query the cache to see if they should communicate updates to the
    // gateway.
    RangeCache *kvcoord.RangeDescriptorCache
}

ServerConfig encompasses the configuration required to create a DistSQLServer.

type TestingKnobs

type TestingKnobs struct {
    // RunBeforeBackfillChunk is called before executing each chunk of a
    // backfill during a schema change operation. It is called with the
    // current span and returns an error which eventually is returned to the
    // caller of SchemaChanger.exec(). It is called at the start of the
    // backfill function passed into the transaction executing the chunk.
    RunBeforeBackfillChunk func(sp roachpb.Span) error

    // RunAfterBackfillChunk is called after executing each chunk of a
    // backfill during a schema change operation. It is called just before
    // returning from the backfill function passed into the transaction
    // executing the chunk. It is always called even when the backfill
    // function returns an error, or if the table has already been dropped.
    RunAfterBackfillChunk func()

    // ForceDiskSpill forces any processors/operators that can fall back to disk
    // to fall back to disk immediately.
    ForceDiskSpill bool

    // MemoryLimitBytes specifies a maximum amount of working memory that a
    // processor that supports falling back to disk can use. Must be >= 1 to
    // enable. This is a more fine-grained knob than ForceDiskSpill when the
    // available memory needs to be controlled. Once this limit is hit, processors
    // employ their on-disk implementation regardless of applicable cluster
    // settings.
    MemoryLimitBytes int64

    // DrainFast, if enabled, causes the server to not wait for any currently
    // running flows to complete or give a grace period of minFlowDrainWait
    // to incoming flows to register.
    DrainFast bool

    // MetadataTestLevel controls whether or not additional metadata test
    // processors are planned, which send additional "RowNum" metadata that is
    // checked by a test receiver on the gateway.
    MetadataTestLevel MetadataTestLevel

    // DeterministicStats overrides stats which don't have reliable values, like
    // stall time and bytes sent. It replaces them with a zero value.
    DeterministicStats bool

    // CheckVectorizedFlowIsClosedCorrectly checks that all components in a flow
    // were closed explicitly in flow.Cleanup.
    CheckVectorizedFlowIsClosedCorrectly bool

    // Changefeed contains testing knobs specific to the changefeed system.
    Changefeed base.ModuleTestingKnobs

    // Flowinfra contains testing knobs specific to the flowinfra system.
    Flowinfra base.ModuleTestingKnobs

    // EnableVectorizedInvariantsChecker, if enabled, will allow for planning
    // the invariant checkers between all columnar operators.
    EnableVectorizedInvariantsChecker bool

    // BulkAdderFlushesEveryBatch forces the bulk adder to flush every time a
    // KV batch is processed.
    BulkAdderFlushesEveryBatch bool

    // JobsTestingKnobs is jobs infra specific testing knobs.
    JobsTestingKnobs base.ModuleTestingKnobs
}

TestingKnobs are the testing knobs.

func (*TestingKnobs) ModuleTestingKnobs

func (*TestingKnobs) ModuleTestingKnobs()

ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.

Package execinfra imports 40 packages and is imported by 40 packages. Updated 2020-08-12.