luci: go.chromium.org/luci/tumble Index | Files

package tumble

import "go.chromium.org/luci/tumble"

Package tumble is a distributed multi-stage transaction processor for appengine.

What it is

Tumble allows you to make multi-entity-group transaction chains, even when you need to affect more than the number of entities allowed by appengine (currently capped at 25 entity groups). These chains can be transactionally started from a single entity group, and will process 'in the background'. Tumble guarantees that once a transaction chain starts, it will eventually complete, though it makes no guarantees of how long that might take.

This can be used for doing very large-scale fan-out, and also for large-scale fan-in.

How it works

An app using tumble declares one or more Mutation object. These objects are responsible for enacting a single link in the transaction chain, and may affect entities within a single entity group. Mutations must be idempotent (they will occasionally be run more than once). Mutations primarially implement a RollForward method which transactionally manipulates an entity, and then returns zero or more Mutations (which may be for other entities). Tumble's task queues and/or cron job (see Setup), will eventually pick up these new Mutations and process them, possibly introducing more Mutations, etc.

When the app wants to begin a transaction chain, it uses tumble.EnterTransaction, allows the app to transactionally manipulate the starting entity, and also return one or more Mutation objects. If the transaction is successful, EnterTransaction will also fire off any necessary taskqueue tasks to process the new mutations in the background.

When the transaction is committed, it's committed along with all the Mutations it produced. Either they're all committed successfully (and so the tumble transaction chain is started), or none of them are committed.

Required Setup

There are a couple prerequisites for using tumble.

1. You must register the tumble routes in your appengine module. You can do this like:

import (
  "net/http"

  "github.com/julienschmidt/httprouter"
  "go.chromium.org/luci/appengine/gaemiddleware"
  "go.chromium.org/luci/tumble"
)

var tumbleService = tumble.Service{}

def init() {
  router := httprouter.New()
  tumbleService.InstallHandlers(router, gaemiddleware.BaseProd())
  http.Handle("/", router)
}

Make sure /internal/tumble routes in app.yaml (and/or dispatch.yaml) point to the module with the Tumble. Additionally, make sure Tumble routes are protected with `login: admin`, as they should never be accessed from non-backend processes.

For example:

handlers:
- url: /internal/tumble/.*
  script: _go_app
  secure: always
  login: admin

2. You must add the following index to your index.yaml:

- kind: tumble.Mutation
  properties:
  - name: ExpandedShard
  - name: TargetRoot

2a. If you enable DelayedMutations in your configuration, you must also add

- kind: tumble.Mutation
  properties:
  - name: TargetRoot
  - name: ProcessAfter

3. You must add a new taskqueue for tumble (example parameters):

- name: tumble
  rate: 32/s
  bucket_size: 32
  retry_parameters:
    task_age_limit: 2m   # aggressive task age pruning is desirable
    min_backoff_seconds: 2
    max_backoff_seconds: 6
    max_doublings: 7     # tops out at 2**(6 - 1) * 2 == 128 sec

4. All Mutation implementations must be registered at init() time using tumble.Register((*MyMutation)(nil)).

Optional Setup

You may choose to add a new cron entry. This prevents work from slipping through the cracks. If your app has constant tumble throughput and good key distribution, this is not necessary.

- description: tumble fire_all_tasks invocation
  url: /internal/tumble/fire_all_tasks
  schedule: every 5 minutes  # maximum task latency you can tolerate.

Index

Package Files

add_to_journal.go config.go doc.go fire_tasks.go model_mutation.go process.go service.go settings.go tumble.go tumbletest.go

func AddToJournal Uses

func AddToJournal(c context.Context, m ...Mutation) error

AddToJournal records one or more Mutation to the tumble journal, but does not execute any of them. This does so by running a transaction on a pseudo-random entity group, and journaling the mutations there.

func CancelNamedMutations Uses

func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) error

CancelNamedMutations does a best-effort cancellation of the named mutations.

func PutNamedMutations Uses

func PutNamedMutations(c context.Context, parent *ds.Key, muts map[string]Mutation) error

PutNamedMutations writes the provided named mutations to the datastore.

Named mutations are singletons children of a given `parent`. So for any given `parent`, there can be only one mutation with any given name. Named mutations, by design, cannot collide with unnamed (anonymous) mutations. `parent` does NOT need to be a root Key; the mutations will be created as direct descendants of the provided parent entity. The names of the various mutations should be valid datastore key StringIDs, which generally means that they should be UTF-8 strings.

The current implementation reserves 2 characters of the StringID (as of writing this means that a named mutation name may only be 498 bytes long).

This can be used to leverage tumble for e.g. cancellable, delayed cleanup tasks (like timeouts).

This may be called from within an existing datastore transaction which includes `parent` to make this Put atomic with the remainder of the transaction.

If called multiple times with the same name, the newly named mutation will overwrite the existing mutation (assuming it hasn't run already).

func Register Uses

func Register(mut Mutation)

Register allows |mut| to be played by the tumble backend. This should be called at init() time once for every Mutation implementation.

This will also gob.Register your mutation implementation.

Example:

Register((*MyMutationImpl)(nil))

func RunMutation Uses

func RunMutation(c context.Context, m Mutation) error

RunMutation immediately runs the Mutation `m` in a transaction. This method should be used to start a tumble chain when you have transactional checks to do (e.g. `m` implements the first transactional link in the chain).

Usually this is called from your application's handlers to begin a tumble state machine as a result of some API interaction.

func RunUnbuffered Uses

func RunUnbuffered(c context.Context, root *ds.Key, fn func(context.Context) ([]Mutation, error)) error

RunUnbuffered opens a lightweight unbuffered transaction on "root" runs "fn" inside of it. Any mutations returned by "fn" will be registered at the end of the transaction if "fn" doesn't return an error.

This is useful as an initial starting point without incurring any of the overhead of spinning up a new buffered transaction.

During "fn"'s execution, standard Tumble operations such as PutNamedMutation and CancelNamedMutation may be performed.

type Config Uses

type Config struct {
    // NumShards is the number of tumble shards that will process concurrently.
    // It defaults to 32.
    NumShards uint64 `json:"numShards,omitempty"`

    // NumGoroutines is the number of gorountines that will process in parallel
    // in a single shard. Each goroutine will process exactly one root entity.
    // It defaults to 16.
    NumGoroutines int `json:"numGoroutines,omitempty"`

    // TemporalMinDelay is the minimum number of seconds to wait before the
    // task queue entry for a given shard will run. It defaults to 1 second.
    TemporalMinDelay clockflag.Duration `json:"temporalMinDelay,omitempty"`

    // TemporalRoundFactor is the number of seconds to batch together in task
    // queue tasks. It defaults to 4 seconds.
    TemporalRoundFactor clockflag.Duration `json:"temporalRoundFactor,omitempty"`

    // ProcessLoopDuration is the maximum lifetime of a process loop. A process
    // batch will refrain from re-entering its loop after this much time has
    // elapsed.
    //
    // This is not a hard termination boundary. If a loop round starts before
    // ProcessLoopDuration has been reached, it will be permitted to continue past
    // the duration.
    //
    // If this is <= 0, the process will loop at most once.
    ProcessLoopDuration clockflag.Duration `json:"processLoopDuration,omitempty"`

    // DustSettleTimeout is the amount of time to wait in between mutation
    // processing iterations.
    //
    // This should be chosen as a compromise between higher expectations of the
    // eventually-consistent datastore and task processing latency.
    DustSettleTimeout clockflag.Duration `json:"dustSettleTimeout,omitempty"`

    // MaxNoWorkDelay is the maximum amount of time to wait in between mutation
    // processing iterations when there was no work the previous iteration.
    //
    // When no work has been done, each round will begin by waiting
    // DustSettleTimeout seconds (minimum of 1 second). If no work was done that
    // round, this will continue to exponentially grow each successive no-work
    // round until capped at MaxNoWorkDelay. If work is encountered during any
    // round, the delay is reset.
    //
    // If MaxNoWorkDelay is <= 0, the delay will continue exponentially growing
    // until the shard terminates.
    //
    // This should be chosen as a compromise between higher expectations of the
    // eventually-consistent datastore and task processing latency.
    MaxNoWorkDelay clockflag.Duration `json:"MaxNoWorkDelay,omitempty"`

    // NoWorkDelayGrowth is the exponential growth factor for the delay in
    // between processing loop rounds when no work was done.
    //
    // If NoWorkDelayGrowth is <= 1, a growth factor of 1 will be used.
    NoWorkDelayGrowth int `json:"NoWorkDelayGrowth,omitempty"`

    // ProcessMaxBatchSize is the number of mutations that each processor
    // goroutine will attempt to include in each commit.
    //
    // It defaults to 128. A negative value means no limit.
    ProcessMaxBatchSize int `json:"processMaxBatchSize,omitempty"`

    // DelayedMutations enables the 'DelayedMutation' mutation subtype.
    //
    // If you set this to true, you MUST also add the second index mentioned
    // in the package docs.
    DelayedMutations bool `json:"delayedMutations,omitempty"`
}

Config is the set of tweakable things for tumble. If you use something other than the defaults (e.g. unset values), you must ensure that all aspects of your application use the same config.

The JSON annotations are for settings module storage (see settings.go).

type DelayedMutation Uses

type DelayedMutation interface {
    Mutation

    // ProcessAfter will be called once when scheduling this Mutation. The
    // mutation will be recorded to datastore immediately, but tumble will skip it
    // for processing until at least the time that's returned here. Multiple calls
    // to this method should always return the same time.
    //
    // A Time value in the past will get reset to "next available time slot",
    // unless HighPriority() returns true.
    ProcessAfter() time.Time

    // HighPriority indicates that this mutation should be processed before
    // others, if possible, and must be set in conjunction with a ProcessAfter
    // timestamp that occurs in the past.
    //
    // Tumble works by processing Mutations in the order of their creation, or
    // ProcessAfter times, whichever is later. If HighPriority is true, then a
    // ProcessAfter time in the past will take precedence over Mutations which
    // may actually have been recorded after this one, in the event that tumble
    // is processing tasks slower than they're being created.
    HighPriority() bool
}

DelayedMutation is a Mutation which allows you to defer its processing until a certain absolute time.

As a side effect, tumble will /mostly/ process items in their chronological ProcessAfter order, instead of the undefined order.

Your tumble configuration must have DelayedMutations set, and you must have added the appropriate index to use these. If DelayedMutations is not set, then tumble will ignore the ProcessAfter and HighPriorty values here, and process mutations as quickly as possible in no particular order.

type Mutation Uses

type Mutation interface {
    // Root returns a datastore.Key which will be used to derive the Key for the
    // entity group which this Mutation will operate on. This is used to batch
    // together Entries for more efficient processing.
    //
    // Returning nil is an error.
    Root(c context.Context) *ds.Key

    // RollForward performs the action of the Mutation.
    //
    // It is only considered successful if it returns nil. If it returns non-nil,
    // then it will be retried at a later time. If it never returns nil, then it
    // will never be flushed from tumble's queue, and you'll have to manually
    // delete it or fix the code so that it can be handled without error.
    //
    // This method runs inside of a single-group transaction. It must modify only
    // the entity group specified by Root().
    //
    // As a side effect, RollForward may return new arbitrary Mutations. These
    // will be committed in the same transaction as RollForward.
    //
    // The context contains an implementation of "luci/gae/service/datastore",
    // using the "luci/gae/filter/txnBuf" transaction buffer. This means that
    // all functionality (including additional transactions) is available, with
    // the limitations mentioned by that package (notably, no cursors are
    // allowed).
    RollForward(c context.Context) ([]Mutation, error)
}

Mutation is the interface that your tumble mutations must implement.

Mutation implementations can be registered with the Register function.

type Service Uses

type Service struct {
    // Namespaces is a function that returns the datastore namespaces that Tumble
    // will poll.
    //
    // If nil, Tumble will be executed against all namespaces registered in the
    // datastore.
    Namespaces func(context.Context) ([]string, error)
}

Service is an instance of a Tumble service. It installs its handlers into an HTTP router and services Tumble request tasks.

func (*Service) FireAllTasks Uses

func (s *Service) FireAllTasks(c context.Context) error

FireAllTasks searches for work in all namespaces, and fires off a process task for any shards it finds that have at least one Mutation present to ensure that no work languishes forever. This may not be needed in a constantly-loaded system with good tumble key distribution.

func (*Service) FireAllTasksHandler Uses

func (s *Service) FireAllTasksHandler(c *router.Context)

FireAllTasksHandler is an HTTP handler that expects `logging` and `luci/gae` services to be installed into the context.

FireAllTasksHandler verifies that it was called within an Appengine Cron request, and then invokes the FireAllTasks function.

func (*Service) InstallHandlers Uses

func (s *Service) InstallHandlers(r *router.Router, base router.MiddlewareChain)

InstallHandlers installs http handlers.

'base' is usually gaemiddleware.BaseProd(), but can also be its derivative if something else it needed in the context.

func (*Service) ProcessShardHandler Uses

func (s *Service) ProcessShardHandler(ctx *router.Context, loop bool)

ProcessShardHandler is an HTTP handler that expects `logging` and `luci/gae` services to be installed into the context.

ProcessShardHandler verifies that its being run as a taskqueue task and that the following parameters exist and are well-formed:

* timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
* shard_id: decimal-encoded shard identifier.

ProcessShardHandler then invokes ProcessShard with the parsed parameters. It runs in the namespace of the task which scheduled it and processes mutations for that namespace.

type Testing Uses

type Testing struct {
    Service
}

Testing is a high-level testing object for testing applications that use tumble.

func (*Testing) AdvanceTime Uses

func (t *Testing) AdvanceTime(c context.Context)

AdvanceTime advances the test clock enough so that Iterate will be able to pick up tasks in the task queue.

func (*Testing) Context Uses

func (t *Testing) Context() context.Context

Context generates a correctly configured context with:

* luci/gae/impl/memory
* luci/luci-go/common/clock/testclock
* luci/luci-go/common/logging/memlogger
* luci/luci-go/server/settings (MemoryStorage)

It also correctly configures the "tumble.Mutation" indexes and taskqueue named in this Testing config.

func (*Testing) Drain Uses

func (t *Testing) Drain(c context.Context) int

Drain will run a loop, advancing time and iterating through tumble mutations until tumble's queue is empty. It returns the total number of processed shards.

func (*Testing) DrainAll Uses

func (t *Testing) DrainAll(c context.Context) int

DrainAll iterates over all namespaces and drains each independently.

func (*Testing) DumpLog Uses

func (t *Testing) DumpLog(c context.Context)

DumpLog dumps the current memory logger to stdout to help with debugging.

func (*Testing) EnableDelayedMutations Uses

func (t *Testing) EnableDelayedMutations(c context.Context)

EnableDelayedMutations turns on delayed mutations for this context.

func (*Testing) FireAllTasks Uses

func (t *Testing) FireAllTasks(c context.Context)

FireAllTasks will force all tumble shards to run in the future.

func (*Testing) GetConfig Uses

func (t *Testing) GetConfig(c context.Context) *Config

GetConfig retrieves the current tumble settings

func (*Testing) Iterate Uses

func (t *Testing) Iterate(c context.Context) int

Iterate makes a single iteration of the tumble service worker, and returns the number of shards that were processed. Iterate operates on the Context's current namespace.

It will skip all work items if the test clock hasn't advanced in time enough.

func (*Testing) IterateAll Uses

func (t *Testing) IterateAll(c context.Context) int

IterateAll iterates over all namespaces and calls Iterate on each.

func (*Testing) MustGetNamespaces Uses

func (t *Testing) MustGetNamespaces(c context.Context) []string

MustGetNamespaces returns all active namespaces in t's Service.

If the namespace function returns an error, MustGetNamespaces will panic.

func (*Testing) ResetLog Uses

func (t *Testing) ResetLog(c context.Context)

ResetLog resets the current memory logger to the empty state.

func (*Testing) UpdateSettings Uses

func (t *Testing) UpdateSettings(c context.Context, cfg *Config)

UpdateSettings changes the tumble settings in the context to match cfg.

If cfg == nil, this resets the settings to their default values.

Package tumble imports 45 packages (graph) and is imported by 16 packages. Updated 2018-12-16. Refresh now. Tools for package owners.