cockroach: github.com/cockroachdb/cockroach/pkg/storage/rangefeed Index | Files

package rangefeed

import "github.com/cockroachdb/cockroach/pkg/storage/rangefeed"

Index

Package Files

metrics.go processor.go registry.go resolved_timestamp.go task.go

type Config Uses

type Config struct {
    log.AmbientContext
    Clock *hlc.Clock
    Span  roachpb.RSpan

    TxnPusher TxnPusher
    // PushTxnsInterval specifies the interval at which a Processor will push
    // all transactions in the unresolvedIntentQueue that are above the age
    // specified by PushTxnsAge.
    PushTxnsInterval time.Duration
    // PushTxnsAge specifies the age at which a Processor will begin to consider
    // a transaction old enough to push.
    PushTxnsAge time.Duration

    // EventChanCap specifies the capacity to give to the Processor's input
    // channel.
    EventChanCap int
    // EventChanTimeout specifies the maximum duration that methods will
    // wait to send on the Processor's input channel before giving up and
    // shutting down the Processor. 0 for no timeout.
    EventChanTimeout time.Duration

    // CheckStreamsInterval specifies interval at which a Processor will check
    // all streams to make sure they have not been canceled.
    CheckStreamsInterval time.Duration

    // Metrics is for production monitoring of RangeFeeds.
    Metrics *Metrics
}

Config encompasses the configuration required to create a Processor.

func (*Config) SetDefaults Uses

func (sc *Config) SetDefaults()

SetDefaults initializes unset fields in Config to values suitable for use by a Processor.

type Metrics Uses

type Metrics struct {
    RangeFeedCatchupScanNanos *metric.Counter

    RangeFeedSlowClosedTimestampLogN  log.EveryN
    RangeFeedSlowClosedTimestampNudge singleflight.Group
    // RangeFeedSlowClosedTimestampNudgeSem bounds the amount of work that can be
    // spun up on behalf of the RangeFeed nudger. We don't expect to hit this
    // limit, but it's here to limit the effect on stability in case something
    // unexpected happens.
    RangeFeedSlowClosedTimestampNudgeSem chan struct{}
}

Metrics are for production monitoring of RangeFeeds.

func NewMetrics Uses

func NewMetrics() *Metrics

NewMetrics makes the metrics for RangeFeeds monitoring.

func (*Metrics) MetricStruct Uses

func (*Metrics) MetricStruct()

MetricStruct implements the metric.Struct interface.

type Processor Uses

type Processor struct {
    Config
    // contains filtered or unexported fields
}

Processor manages a set of rangefeed registrations and handles the routing of logical updates to these registrations. While routing logical updates to rangefeed registrations, the processor performs two important tasks: 1. it translates logical updates into rangefeed events. 2. it transforms a range-level closed timestamp to a rangefeed-level resolved

timestamp.

func NewProcessor Uses

func NewProcessor(cfg Config) *Processor

NewProcessor creates a new rangefeed Processor. The corresponding goroutine should be launched using the Start method.

func (*Processor) ConsumeLogicalOps Uses

func (p *Processor) ConsumeLogicalOps(ops ...enginepb.MVCCLogicalOp) bool

ConsumeLogicalOps informs the rangefeed processor of the set of logical operations. It returns false if consuming the operations hit a timeout, as specified by the EventChanTimeout configuration. If the method returns false, the processor will have been stopped, so calling Stop is not necessary. Safe to call on nil Processor.

func (*Processor) ForwardClosedTS Uses

func (p *Processor) ForwardClosedTS(closedTS hlc.Timestamp) bool

ForwardClosedTS indicates that the closed timestamp that serves as the basis for the rangefeed processor's resolved timestamp has advanced. It returns false if forwarding the closed timestamp hit a timeout, as specified by the EventChanTimeout configuration. If the method returns false, the processor will have been stopped, so calling Stop is not necessary. Safe to call on nil Processor.

func (*Processor) Len Uses

func (p *Processor) Len() int

Len returns the number of registrations attached to the processor.

func (*Processor) Register Uses

func (p *Processor) Register(
    span roachpb.RSpan,
    startTS hlc.Timestamp,
    catchupIter engine.SimpleIterator,
    stream Stream,
    errC chan<- *roachpb.Error,
) bool

Register registers the stream over the specified span of keys.

The registration will not observe any events that were consumed before this method was called. It is undefined whether the registration will observe events that are consumed concurrently with this call. The channel will be provided an error when the registration closes.

The optionally provided "catch-up" iterator is used to read changes from the engine which occurred after the provided start timestamp.

If the method returns false, the processor will have been stopped, so calling Stop is not necessary.

NOT safe to call on nil Processor.

func (*Processor) Start Uses

func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator)

Start launches a goroutine to process rangefeed events and send them to registrations.

The provided iterator is used to initialize the rangefeed's resolved timestamp. It must obey the contract of an iterator used for an initResolvedTSScan. The Processor promises to clean up the iterator by calling its Close method when it is finished. If the iterator is nil then no initialization scan will be performed and the resolved timestamp will immediately be considered initialized.

func (*Processor) Stop Uses

func (p *Processor) Stop()

Stop shuts down the processor and closes all registrations. Safe to call on nil Processor. It is not valid to restart a processor after it has been stopped.

func (*Processor) StopWithErr Uses

func (p *Processor) StopWithErr(pErr *roachpb.Error)

StopWithErr shuts down the processor and closes all registrations with the specified error. Safe to call on nil Processor. It is not valid to restart a processor after it has been stopped.

type Stream Uses

type Stream interface {
    // Context returns the context for this stream.
    Context() context.Context
    // Send blocks until it sends m, the stream is done, or the stream breaks.
    // Send must be safe to call on the same stream in different goroutines.
    Send(*roachpb.RangeFeedEvent) error
}

Stream is a object capable of transmitting RangeFeedEvents.

type TxnPusher Uses

type TxnPusher interface {
    // PushTxns attempts to push the specified transactions to a new
    // timestamp. It returns the resulting transaction protos.
    PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]roachpb.Transaction, error)
    // CleanupTxnIntentsAsync asynchronously cleans up intents owned
    // by the specified transactions.
    CleanupTxnIntentsAsync(context.Context, []roachpb.Transaction) error
}

TxnPusher is capable of pushing transactions to a new timestamp and cleaning up the intents of transactions that are found to be committed.

Package rangefeed imports 22 packages (graph) and is imported by 1 packages. Updated 2019-07-13. Refresh now. Tools for package owners.