
package sweep

import ""


Package Files

batching.go distributed.go helpers.go scan.go

func Scan

func Scan(ctx context.Context, p *ScanParams) ([]*reminder.Reminder, partition.SortedPartitions)

Scan scans the given partition of the Reminders' keyspace.

Returns a list of stale reminders which likely match crashed AddTask calls. The caller is expected to eventually execute the corresponding Cloud Tasks calls and delete these reminders; otherwise they will be rediscovered during the next scan.

If it is unable to complete the scan of the given part of the keyspace and Level is less than 2, it splits the not-yet-scanned keyspace into several partitions for follow-up scans and returns them as well.

Logs errors inside, but doesn't return them.

type BatchProcessor

type BatchProcessor struct {
    Context   context.Context    // the context to use for processing
    DB        db.DB              // DB to use to fetch reminders from
    Submitter internal.Submitter // knows how to submit tasks

    BatchSize         int // max size of a single reminder batch
    ConcurrentBatches int // how many concurrent batches to process
    // contains filtered or unexported fields
}

BatchProcessor handles reminders in batches.

func (*BatchProcessor) Enqueue

func (p *BatchProcessor) Enqueue(ctx context.Context, r []*reminder.Reminder)

Enqueue adds reminders to the to-be-processed queue.

Must be called only between Start and Stop. Drops reminders on the floor if the context is canceled.

func (*BatchProcessor) Start

func (p *BatchProcessor) Start() error

Start launches background processor goroutines.

func (*BatchProcessor) Stop

func (p *BatchProcessor) Stop() int

Stop waits until all enqueued reminders are processed and then stops the processor.

Returns the total number of successfully processed reminders.

type Distributed

type Distributed struct {
    // EnqueueSweepTask submits the task for execution somewhere in the fleet.
    EnqueueSweepTask func(ctx context.Context, task *tqpb.SweepTask) error
    // Submitter is used to submit Cloud Tasks requests.
    Submitter internal.Submitter
}

Distributed implements distributed sweeping.

Requires its EnqueueSweepTask callback to be configured in a way that enqueued tasks eventually result in an ExecSweepTask call (perhaps in a different process).

func (*Distributed) ExecSweepTask

func (d *Distributed) ExecSweepTask(ctx context.Context, task *tqpb.SweepTask) error

ExecSweepTask executes a previously enqueued sweep task.

Note: we never want to retry a failed ExecSweepTask. These tasks fork. If we retry on transient errors that are not really transient, we may accidentally blow up with an exponential number of tasks. It is better just to wait for the next fresh sweep. For that reason the implementation is careful not to return errors marked with transient.Tag.

type ScanParams

type ScanParams struct {
    DB            db.DB                // DB to use to fetch reminders
    Partition     *partition.Partition // the keyspace partition to scan
    KeySpaceBytes int                  // length of the reminder keys (usually 16)

    TasksPerScan        int // caps maximum number of reminders to process
    SecondaryScanShards int // caps the number of follow-up scans

    Level int // recursion level (0 == the root task)
}

ScanParams contains parameters for the Scan call.

Package sweep imports 18 packages and is imported by 2 packages. Updated 2021-01-23.