locker

v0.0.0-...-4eddc46
Published: Mar 8, 2017 License: Apache-2.0 Imports: 12 Imported by: 1

README

Datastore Locker

Provides a lock / lease mechanism to prevent duplicate execution of appengine tasks and an easy way to continue long-running processes. Used by datastore-mapper.

Sometimes it's extremely difficult, if not impossible, to make tasks truly idempotent. For example, if your task sends an email or charges a credit card, executing it twice could be 'a bad thing'. Actually preventing a task from running more than once can be challenging, though.

Although named tasks can be used to prevent duplicate tasks being enqueued, they cannot be used together with datastore transactions. That leaves you with the possibility that either the task scheduling or the datastore write could fail while the other succeeds.

An unnamed task can be enqueued within a transaction to ensure that either both happen or neither happens, but that leaves the possibility of the enqueuing operation itself being repeated (if it is running in its own task), which could cause duplicate tasks.

Whichever approach is used, the taskqueue only promises at-least-once delivery so there is also the chance that appengine will execute a task more than once.

The solution is to coordinate execution using information in the task with information in a datastore entity. This package aims to make it easy to restrict task execution by using a lease / lock mechanism.

By obtaining a lock on an entity within a datastore transaction we ensure that only a single instance of any task will be executed at once and, once processed, that duplicate execution will be prevented.

It can be used with single tasks or to chain a series of tasks in sequence with the sequence number used to prevent any old tasks being re-executed.
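The sequence check described above can be sketched in a few lines. This is a minimal in-memory illustration, not the package's implementation: the `lock` struct, the `decision` values and the `decide` function are hypothetical names, and the real package performs the comparison inside a datastore transaction.

```go
package main

import "fmt"

// decision is the outcome of comparing an incoming task with the stored lock.
type decision string

const (
	run   decision = "run"   // the task may take the lock and execute
	retry decision = "retry" // another request holds the lock; try again later
	drop  decision = "drop"  // the task is stale; processing has already moved on
)

// lock is a simplified, hypothetical stand-in for the lock stored on the entity.
type lock struct {
	Sequence  int    // sequence number of the task that should run next
	RequestID string // request currently holding the lock, "" if unlocked
}

// decide compares the sequence number carried by a task with the stored lock.
// Tasks ahead of the stored sequence are not modelled in this sketch.
func decide(stored lock, taskSequence int) decision {
	switch {
	case taskSequence < stored.Sequence:
		return drop
	case stored.RequestID != "":
		return retry
	default:
		return run
	}
}

func main() {
	fmt.Println(decide(lock{Sequence: 2}, 1))                     // old task delivered again
	fmt.Println(decide(lock{Sequence: 2, RequestID: "req-a"}, 2)) // lock currently held
	fmt.Println(decide(lock{Sequence: 2}, 2))                     // expected task, lock free
}
```

Because the decision depends only on data stored with the entity, a repeated delivery of an old task is harmless: it is recognised as stale and dropped.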

An exceptional situation can occur if a failure happens during processing of a task and the result cannot be communicated back to appengine (this is a platform issue). In this case the lock / lease is still held, but the system cannot tell whether the task completed and merely failed to clear the lock, or failed part-way through. The locker waits for the lease duration to elapse and then queries the appengine logs to determine the task status. If no log information is available at all, a second, longer timeout prevents deadlock by allowing the expired lock / lease to be overwritten.
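The two-stage timeout can be pictured as a function of how long the lock has been held. The sketch below is illustrative only: the `action` values and `heldLockAction` are hypothetical names, and the 5 and 15 minute values are borrowed from the configuration example in the Usage section rather than being known defaults.

```go
package main

import (
	"fmt"
	"time"
)

// Values taken from the README configuration example, not package defaults.
const (
	leaseDuration = 5 * time.Minute
	leaseTimeout  = 15 * time.Minute
)

// action describes what to do when a task finds the lock already held.
type action string

const (
	wait      action = "wait"       // lease still valid: fail the attempt so the task is retried
	checkLogs action = "check logs" // lease expired: look up the holding request in the logs
	overwrite action = "overwrite"  // timeout passed: assume the holder died and take the lock
)

// heldLockAction picks an action from how long the current lock has been held.
func heldLockAction(held time.Duration) action {
	switch {
	case held > leaseTimeout:
		return overwrite
	case held > leaseDuration:
		return checkLogs
	default:
		return wait
	}
}

func main() {
	for _, held := range []time.Duration{time.Minute, 6 * time.Minute, 20 * time.Minute} {
		fmt.Printf("held %v: %s\n", held, heldLockAction(held))
	}
}
```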

Both overwritten locks and permanently failing tasks (those past a configurable number of retries) can trigger an alert email so that they can be investigated further.

Usage

See the example project for a simple demonstration of locker being used.

Embed the locker.Lock struct within the struct you want to lock:

type Foo struct {
    locker.Lock
    Value string `datastore:"value"`
}

Create an instance of the locker and configure as required:

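// default configuration (NewLocker returns an error if an option fails)
l, err := locker.NewLocker()

// or: custom lease settings, with an alert when a lock is overwritten
l, err := locker.NewLocker(
  locker.LeaseDuration(5*time.Minute),
  locker.LeaseTimeout(15*time.Minute),
  locker.AlertOnOverwrite,
)

// or: verbose logging of lock operations
l, err := locker.NewLocker(locker.LogVerbose)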

Schedule a task to be executed once:

key := datastore.NewKey(c, "foo", "", 1, nil)
entity := &Foo{Value:"bar"}
err := l.Schedule(c, key, entity, "/task/handler/url", nil)
if err != nil {
  // operation failed (entity not saved and task not enqueued)
}

Handle the task execution:

func init() {
  // l is the *locker.Locker created above
  http.Handle("/task/handler/url", l.Handle(fooHandler, fooFactory))
}

// the task handler needs a factory to construct an instance of our entity
func fooFactory() locker.Lockable {
  return new(Foo)
}

// the handler for the task will be passed the appengine context, request, datastore key and entity
func fooHandler(c context.Context, r *http.Request, key *datastore.Key, entity locker.Lockable) error {
  foo := entity.(*Foo)

  switch foo.Sequence {
    case 1:
      // step 1 processing, e.g. charge credit card
      // schedule another task to follow this one:
      return l.Schedule(c, key, entity, "/task/handler/url", nil)
    case 2:
      // step 2 processing, e.g. send confirmation email
      // mark the task as completed (to prevent the last task re-executing)
      return l.Complete(c, key, entity)
  }

  // returning an error would cause the task to be failed and retried (normal task semantics)
  // a configurable number of retries can be set to prevent endless attempts from happening
  return nil
}

Documentation

Variables

var (
	// ErrLockFailed signals that a lock attempt failed and the task trying to
	// acquire it should be retried. Using ServiceUnavailable (503) causes a task
	// retry without flooding the logs with visible errors.
	ErrLockFailed = Error{http.StatusServiceUnavailable, "lock failed (retry)"}

	// ErrTaskExpired signals that the task's sequence number is behind the sequence
	// stored on the entity. This means processing has already moved on and this
	// task should be dropped. This is likely caused by a spurious task re-execution.
	// Using OK (200) causes a task to be marked as successful so it won't be retried.
	ErrTaskExpired = Error{http.StatusOK, "task expired (abandon)"}

	// ErrTaskFailed signals that the task has failed permanently (tried more than
	// the MaxRetries allowed) so should be abandoned.
	// Using OK (200) causes a task to be marked as successful so it won't be retried.
	ErrTaskFailed = Error{http.StatusOK, "task failed permanently (abandon)"}
)
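Each of these errors carries the HTTP status that tells the task queue whether to retry. The sketch below shows how a wrapper might translate a handler's error into a response code. It uses a local `taskError` stand-in rather than the package's `Error` type, and the 500 fallback for ordinary errors is an assumption, not documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// taskError is a local stand-in for the package's Error type.
type taskError struct {
	Response int // HTTP status to return to the task queue
	text     string
}

func (e taskError) Error() string { return e.text }

var (
	errLockFailed  = taskError{http.StatusServiceUnavailable, "lock failed (retry)"}
	errTaskExpired = taskError{http.StatusOK, "task expired (abandon)"}
	errOther       = errors.New("handler failed") // an ordinary error from a task handler
)

// statusFor chooses the HTTP status to send back to the task queue.
// A non-2xx status causes a retry; 200 marks the task as done.
func statusFor(err error) int {
	if err == nil {
		return http.StatusOK
	}
	var te taskError
	if errors.As(err, &te) {
		return te.Response
	}
	// Assumption for this sketch: any other error is reported as a 500.
	return http.StatusInternalServerError
}

func main() {
	for _, err := range []error{nil, errLockFailed, errTaskExpired, errOther} {
		fmt.Println(statusFor(err), err)
	}
}
```

Returning 200 for an expired or permanently failed task is what stops the queue from retrying it, even though the handler did not do any work.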

Functions

func AlertOnFailure

func AlertOnFailure(l *Locker) error

AlertOnFailure is an Option that enables an alert email to admins when a task fails permanently (its retries exceed MaxRetries).

func AlertOnOverwrite

func AlertOnOverwrite(l *Locker) error

AlertOnOverwrite is an Option that enables an alert email to admins when a lock is overwritten.

func DefaultQueue

func DefaultQueue(queue string) func(*Locker) error

DefaultQueue returns an Option that sets the name of the task queue to schedule tasks on. An empty string selects the default task queue.

func Host

func Host(host string) func(*Locker) error

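Host returns an Option that sets the host that tasks are scheduled to run on. It is mainly useful in the dev environment, where the taskqueue target property does not work.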

func LeaseDuration

func LeaseDuration(duration time.Duration) func(*Locker) error

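LeaseDuration returns an Option that sets how long a lock may be held before the logs API is checked to determine whether the holding request has completed.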

func LeaseTimeout

func LeaseTimeout(duration time.Duration) func(*Locker) error

LeaseTimeout returns an Option that sets how long a lock may be held before the task is assumed to have died.

func LogVerbose

func LogVerbose(l *Locker) error

LogVerbose is an Option that enables verbose logging of lock operations.

func MaxRetries

func MaxRetries(retries int) func(*Locker) error

MaxRetries returns an Option that sets the maximum number of retries to allow before a task is treated as permanently failed.

func QueueFromContext

func QueueFromContext(c context.Context) (string, bool)

QueueFromContext retrieves the per-request queue from context

func WithQueue

func WithQueue(c context.Context, queue string) context.Context

WithQueue creates a new context with the queue value
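A pair of functions like these is usually built on context.WithValue with an unexported key type. The following sketch shows that general pattern with local lowercase names. It is not the package's source, and the fallback to a default queue in `resolveQueue` (as well as the "emails" queue name) is an assumption made for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// queueKey is an unexported key type, so no other package can collide with it.
type queueKey struct{}

// withQueue returns a copy of c that carries the queue name.
func withQueue(c context.Context, queue string) context.Context {
	return context.WithValue(c, queueKey{}, queue)
}

// queueFromContext returns the queue name stored in c, if any.
func queueFromContext(c context.Context) (string, bool) {
	queue, ok := c.Value(queueKey{}).(string)
	return queue, ok
}

// resolveQueue prefers the per-request queue and otherwise uses the fallback.
// The fallback behaviour is an assumption made for this sketch.
func resolveQueue(c context.Context, fallback string) string {
	if queue, ok := queueFromContext(c); ok {
		return queue
	}
	return fallback
}

// demo resolves the queue for a context with and without a per-request queue.
func demo() (tagged, plain string) {
	base := context.Background()
	return resolveQueue(withQueue(base, "emails"), "default"), resolveQueue(base, "default")
}

func main() {
	tagged, plain := demo()
	fmt.Println(tagged, plain)
}
```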

Types

type EntityFactory

type EntityFactory func() Lockable

EntityFactory creates a new entity instance for use by the lock handler

type Error

type Error struct {
	Response int
	// contains filtered or unexported fields
}

Error is a custom error that includes the recommended http response to return to control task completion / re-attempts.

func (Error) Error

func (e Error) Error() string

type Lock

type Lock struct {
	// Timestamp is the time that this lock was written
	Timestamp time.Time `datastore:"lock_ts"`

	// RequestID is the ID of the request that obtained the current lock
	RequestID string `datastore:"lock_req,noindex"`

	// Sequence is the task sequence number
	Sequence int `datastore:"lock_seq,noindex"`

	// Retries is the number of retries that have been attempted
	Retries int `datastore:"lock_try,noindex"`
}

Lock adds additional information to a datastore entity used to ensure that only a single instance of a task can execute and a sequence of tasks will execute in the correct order.

The appengine taskqueue guarantees at-least-once task execution, so we need to try to detect and prevent spurious re-execution. At the same time, these repeat executions may be necessary if a task has died or disconnected and left things in an unknown state; the lock information can be used to avoid deadlocks by allowing a lease to time out in a controlled manner.

This lock struct should be embedded within the entity:

type MyEntity struct {
    locker.Lock
    Value string `datastore:"value"`
}

func (*Lock) Complete

func (l *Lock) Complete()

Complete sets the lock properties to indicate it is complete, as a safeguard in case the last task is repeated. It should be called if writing the entity status within the app.

type Lockable

type Lockable interface {

	// Complete marks the current lock as complete
	Complete()
	// contains filtered or unexported methods
}

Lockable is the interface that lockable entities must implement. They do this automatically simply by embedding Lock in the struct. It is used to ensure that the entities we deal with have our Lock struct embedded and gives us a way to access it.
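Embedding works here because Go promotes the methods of an embedded struct to the outer type. The sketch below reproduces the idea with local types. The `getLock` method is a hypothetical stand-in for the unexported methods that the documentation hides, and `order` is an invented example entity.

```go
package main

import "fmt"

// lock is a local stand-in for the embedded Lock struct.
type lock struct {
	Sequence int
	done     bool
}

// complete marks the lock as complete.
func (l *lock) complete() { l.done = true }

// getLock gives access to the embedded lock (hypothetical unexported method).
func (l *lock) getLock() *lock { return l }

// lockable can only be satisfied by types that embed lock, because
// other packages cannot declare its unexported methods themselves.
type lockable interface {
	complete()
	getLock() *lock
}

// order is an example entity; it gains both methods by embedding lock.
type order struct {
	lock
	Value string
}

// Compile-time check that *order satisfies lockable.
var _ lockable = (*order)(nil)

// factory plays the role of an EntityFactory.
func factory() lockable { return new(order) }

func main() {
	entity := factory()
	entity.getLock().Sequence = 2
	entity.complete()

	o := entity.(*order) // recover the concrete type, as a task handler would
	fmt.Println(o.Sequence, o.done)
}
```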

type Locker

type Locker struct {
	// Once a lock has been held longer than this duration the logs API
	// will be checked to determine if the request has completed or not
	LeaseDuration time.Duration

	// On rare occasions entries may be missing from the logs so if a
	// lock has been held for more than this duration we assume that the
	// task has died. 10 mins is the task timeout on a frontend instance.
	LeaseTimeout time.Duration

	// MaxRetries is the maximum number of retries to allow
	MaxRetries int

	// LogVerbose sets verbose logging of lock operations
	LogVerbose bool

	// AlertOnFailure will set an alert email to be sent to admins if
	// a task fails permanently (more than the MaxRetries reached)
	AlertOnFailure bool

	// AlertOnOverwrite will set an alert email to be sent to admins
	// if a lock is being overwritten. This is normally an exceptional
	// situation but may need investigation to ensure correct operation of
	// the system
	AlertOnOverwrite bool

	// DefaultQueue is the name of the task-queue to schedule tasks on.
	// The default (empty string) is to use the default task queue.
	DefaultQueue string

	// Host is the name of the host to schedule tasks to run on.
	// This is better controlled using the target property of the taskqueue
	// but when running in the dev environment that doesn't work so this
	// provides a way to override it
	Host string
}

Locker is the instance that stores configuration

func NewLocker

func NewLocker(options ...Option) (*Locker, error)

NewLocker creates a new configured Locker instance

func (*Locker) Aquire

func (l *Locker) Aquire(c context.Context, key *datastore.Key, entity Lockable, sequence int) error

Aquire attempts to get and lock the entity with the given key. If successful, it writes the new lock to the datastore and returns nil; otherwise it returns an error indicating the reason for failure.

func (*Locker) Complete

func (l *Locker) Complete(c context.Context, key *datastore.Key, entity Lockable) error

Complete marks a task as completed

func (*Locker) Handle

func (l *Locker) Handle(handler TaskHandler, factory EntityFactory) http.Handler

Handle wraps a task handler with task / lock processing

func (*Locker) NewTask

func (l *Locker) NewTask(key *datastore.Key, entity Lockable, path string, params url.Values) *taskqueue.Task

NewTask creates a new taskqueue.Task for the entity with the correct headers set to match those on the entity

func (*Locker) Parse

func (l *Locker) Parse(c context.Context, r *http.Request) (*datastore.Key, int, error)

Parse returns the datastore.Key and sequence number from a task request, or an error if they cannot be read.

func (*Locker) Schedule

func (l *Locker) Schedule(c context.Context, key *datastore.Key, entity Lockable, path string, params url.Values) error

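Schedule saves the entity with its lock and enqueues a task for it. If it returns an error, the entity was not saved and the task was not enqueued.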

type Option

type Option func(*Locker) error

Option is the signature for locker configuration options
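This signature is the functional options pattern: each option is a function that adjusts the configuration and may reject a bad value. The sketch below shows the pattern with local lowercase stand-ins (`config`, `option`, `newConfig`). The default values and the validation rule are assumptions for illustration, not the package's behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config is a local stand-in for the Locker struct (only a few fields shown).
type config struct {
	LeaseDuration time.Duration
	MaxRetries    int
	LogVerbose    bool
}

// option has the same shape as Option: a function that mutates the config.
type option func(*config) error

// logVerbose takes no argument, so the function itself is the option.
func logVerbose(c *config) error {
	c.LogVerbose = true
	return nil
}

// leaseDuration takes an argument, so it returns an option that captures it.
func leaseDuration(d time.Duration) option {
	return func(c *config) error {
		c.LeaseDuration = d
		return nil
	}
}

// maxRetries returns an option that sets the retry limit.
func maxRetries(n int) option {
	return func(c *config) error {
		if n < 0 { // validation is an assumption made for this sketch
			return errors.New("retries must not be negative")
		}
		c.MaxRetries = n
		return nil
	}
}

// newConfig applies the options in order on top of placeholder defaults.
func newConfig(options ...option) (*config, error) {
	c := &config{LeaseDuration: time.Minute, MaxRetries: 3} // placeholder defaults
	for _, apply := range options {
		if err := apply(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := newConfig(leaseDuration(5*time.Minute), maxRetries(5), logVerbose)
	fmt.Println(c.LeaseDuration, c.MaxRetries, c.LogVerbose, err)
}
```

This explains why some options in the package are passed bare (locker.LogVerbose) while others are called with a value (locker.MaxRetries(5)): both forms produce a function with the Option signature.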

type TaskHandler

type TaskHandler func(c context.Context, r *http.Request, key *datastore.Key, entity Lockable) error

TaskHandler is the signature of the task handler
