luci: Index | Files

package tqtesting

import ""

Package tqtesting contains helpers for running server/tq in tests and on localhost.


Package Files

doc.go loopback.go options.go scheduler.go


const ClockTag = "tq-scheduler-sleep"

ClockTag tags the clock used in scheduler's sleep.

func TasksCollector Uses

func TasksCollector(tl *TaskList) func(context.Context, *Task)

TasksCollector returns a callback that adds tasks to the given list.

Can be passed as TaskSucceeded or TaskFailed callback to the Scheduler.

Synchronizes access to the list internally, but the list should be read from only when the Scheduler is paused.

type Executor Uses

type Executor interface {
    // Execute is called from Run to execute the task.
    // The executor may execute the task right away in a blocking way or dispatch
    // it to some other goroutine. Either way it must call `done` callback when it
    // is done executing the task, indicating whether the task should be
    // reenqueued for a retry.
    // It is safe to call Scheduler's Submit from inside Execute.
    // Receives the exact same context as Run(...), in particular this context
    // is canceled when Run is done.
    Execute(ctx context.Context, t *Task, done func(retry bool))

Executor knows how to execute tasks when their ETA arrives.

type LoopbackHTTPExecutor Uses

type LoopbackHTTPExecutor struct {
    Handler http.Handler

LoopbackHTTPExecutor is an Executor that executes tasks by calling the given HTTP handler.

func (*LoopbackHTTPExecutor) Execute Uses

func (e *LoopbackHTTPExecutor) Execute(ctx context.Context, t *Task, done func(retry bool))

Execute dispatches the task to the HTTP handler in a dedicated goroutine.

Marks the task as failed if the response status code is outside of range [200-299].

type RunOption Uses

type RunOption interface {
    // contains filtered or unexported methods

RunOption influences behavior of Run call.

func ParallelExecute Uses

func ParallelExecute() RunOption

ParallelExecute instructs the scheduler to call executor's Execute method in a separate goroutine instead of serially in Run.

This more closely resembles real-life behavior but may introduce more unpredictability into tests due to races.

func StopAfterTask Uses

func StopAfterTask(taskClassID string) RunOption

StopAfterTask will stop the scheduler after it finishes executing the task of matching task class ID.

func StopWhenDrained Uses

func StopWhenDrained() RunOption

StopWhenDrained will stop the scheduler after it finishes executing the last task and there are no more tasks scheduled.

It is naturally racy if there are other goroutines that submit tasks concurrently. In this situation there may be a pending queue of tasks even if Run stops.

type Scheduler Uses

type Scheduler struct {
    // Executor knows how to execute tasks when their ETA arrives.
    Executor Executor

    // MaxAttempts is the maximum number of attempts for a task, including the
    // first attempt.
    // If negative the number of attempts is unlimited.
    // Default is 20.
    MaxAttempts int

    // MinBackoff is an initial retry delay for failed tasks.
    // It is doubled after each failed attempt until it reaches MaxBackoff after
    // which it stays constant.
    // Default is 1 sec.
    MinBackoff time.Duration

    // MaxBackoff is an upper limit on a retry delay.
    // Default is 5 min.
    MaxBackoff time.Duration

    // TaskSucceeded is called from within the executor's `done` callback whenever
    // a task finishes successfully, perhaps after a bunch of retries.
    // Receives the same context as passed to Run.
    TaskSucceeded func(ctx context.Context, task *Task)

    // TaskFailed is called from within the executor's `done` callback whenever
    // a task fails after being attempted MaxAttempts times.
    // Receives the same context as passed to Run.
    TaskFailed func(ctx context.Context, task *Task)
    // contains filtered or unexported fields

Scheduler knows how to execute submitted tasks when they are due.

This is a very primitive in-memory unholy hybrid of Cloud Tasks and PubSub services that can be used in tests and on localhost.

Must be configured before the first Run call.Can be reconfigured between Run calls, but changing the configuration while Run is running is not allowed.

Scheduler implements tq.Submitter interface.

func (*Scheduler) Run Uses

func (s *Scheduler) Run(ctx context.Context, opts ...RunOption)

Run executes the scheduler's loop until the context is canceled or one of the stop conditions are hit.

By default executes tasks serially. Pass ParallelExecute() option to execute them asynchronously.

Upon exit all executing tasks has finished, there still may be pending tasks.

Panics if Run is already running (perhaps in another goroutine).

func (*Scheduler) Submit Uses

func (s *Scheduler) Submit(ctx context.Context, p *reminder.Payload) error

Submit schedules a task for later execution.

func (*Scheduler) Tasks Uses

func (s *Scheduler) Tasks() TaskList

Tasks returns a snapshot of the scheduler state.

Recalculates it from scratch, so it is a pretty expensive call.

Tasks are ordered by ETA: currently executing tasks first, then scheduled tasks.

type Task Uses

type Task struct {
    Payload proto.Message // a clone of the original AddTask payload, if available

    Task    *taskspb.Task           // a clone of the Cloud Tasks task as passed to Submit
    Message *pubsubpb.PubsubMessage // a clone of the PubSub message as passed to Submit

    Name  string    // full task name (perhaps generated)
    Class string    // TaskClass.ID passed in RegisterTaskClass.
    ETA   time.Time // when the task is due, always set at now or in future

    Finished  time.Time // when the task finished last execution attempt
    Attempts  int       // 0 initially, incremented before each execution attempt
    Executing bool      // true if executing right now
    // contains filtered or unexported fields

Task represents an enqueued or executing task.

func (*Task) Copy Uses

func (t *Task) Copy() *Task

Copy makes a shallow copy of the task.

type TaskList Uses

type TaskList []*Task

TaskList is a collection of tasks.

func (TaskList) Executing Uses

func (tl TaskList) Executing() TaskList

Executing returns a list of tasks executing right now.

func (TaskList) Filter Uses

func (tl TaskList) Filter(cb func(*Task) bool) TaskList

Filter returns a new task list with tasks matching the filter.

func (TaskList) Payloads Uses

func (tl TaskList) Payloads() []proto.Message

Payloads returns a list with individual task payloads.

func (TaskList) Pending Uses

func (tl TaskList) Pending() TaskList

Pending returns a list of tasks waiting execution.

func (TaskList) SortByETA Uses

func (tl TaskList) SortByETA() TaskList

SortByETA sorts the list in-place by ETA.

Returns it to allow chaining calls.

Package tqtesting imports 22 packages (graph) and is imported by 6 packages. Updated 2021-01-26. Refresh now. Tools for package owners.