taskrunner

package module
v0.0.0-...-043c1d6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2017 License: MIT Imports: 7 Imported by: 0

README

Taskrunner GoDoc Build Status Go Report Card

Background

Package taskrunner provides an API for running concurrent tasks in a promise-like style without having to deal with concurrency directly.

The library provides an interface that must be implemented to run a Task concurrently and a concurrency-safe API for running tasks concurrently without managing the channels, goroutines and waitgroups yourself.

Note that using the library is generally less efficient than using Go's concurrency primitives directly. This library hopes to provide code reuse with access to concurrency without having to reimplement the same channel patterns over and over again.

Using TaskRunner

To use Taskrunner, you'll need some setup. Define a struct that implements the Task interface to use as the payload to Run.

One example of a naive Task could be something like sending emails.

type EmailPayload struct {
    Email string
    Message string
}

func (p *EmailPayload) Task(ctx context.Context) (interface{}, error) {
    if err := p.sendEmail(); err != nil {
        return nil, err
    }

    return nil, nil
}

func (p *EmailPayload) sendEmail() error {
    // send email
    // ...
    // check and return errors
}

Once you've implemented the Task, running the Task is as simple as creating a new Taskrunner, starting it and running your Task.

Run returns a function closure over the result of your Task so that you can retrieve it at a later time. Run accepts a context and passes it to your task so that you can control the deadline of your Task and even pass request-scoped items via the context.

// Configure the number of workers using a functional option.
runner, err := taskrunner.NewTaskRunner(taskrunner.OptionMaxGoroutines(runtime.NUMCPU + 1))
if err != nil {
    panic(err.Error())
}

// Start the runner.
if err := runner.Start(); err != nil {
    panic(err.Error())
}

// Get your promise.
promise := runner.Run(context.TODO(), &EmailPayload{
    Email: "...",
    Message: "...",
}})

// Check the result of your promise.
if _, err := promise(); err != nil {
    log.Errorf("sending email failed - err=%+v", err)
}

Examples

See the examples directory for some codified example usages of taskrunner.

  • helloworld shows how to run simple Tasks with no return value.
  • helloworld shows how to run simple Tasks with a return value and possible error state. Shows how to use the promises and how to perform a type assertion to safely check the return value.
  • expvar includes an example of accessing the metric hooks available via the Option functions and uses expvar to demonstrate.

Documentation

Overview

Package taskrunner provides an API for running concurrent tasks in a promise-like style without having to deal with concurrency directly.

The library provides an interface that must be implemented to run a Task concurrently and a concurrency-safe API for running tasks concurrently without managing the channels, goroutines and waitgroups yourself.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*TaskRunner) error

Option is a function that allows configuration of unexported TaskRunner fields. Allows the package to validate input parameters so that a TaskRunner cannot be incorrectly created.

func OptionMaxGoroutines

func OptionMaxGoroutines(n int) Option

OptionMaxGoroutines is a functional option for configuring the number of workers in a TaskRunner.

func OptionTaskCounter

func OptionTaskCounter(ctr metrics.Counter) Option

OptionTaskCounter allows access to the a metrics.Counter which aggregates the number of tasks processed.

func OptionTaskTimeHistogram

func OptionTaskTimeHistogram(histogram metrics.Histogram) Option

OptionTaskTimeHistogram allows access to customize a histogram for sampling average task times.

func OptionUnhandledPromisesGauge

func OptionUnhandledPromisesGauge(gauge metrics.Gauge) Option

OptionUnhandledPromisesGauge allows a go-kit metrics.Gauge to be passed-in collect the number of unhandled promises. Useful to discover if there is a leak of unhandled promises in-memory.

func OptionWorkersGauge

func OptionWorkersGauge(gauge metrics.Gauge) Option

OptionWorkersGauge allows access to the current number of workers via a go-kit metrics.Gauge.

type Promise

type Promise func() (interface{}, error)

Promise is a function that returns the result of an asynchronous task.

type Task

type Task interface {
	Task(context.Context) (interface{}, error)
}

Task is an interface for performing a given task. Task self-describes how the job is to be handled by the Task. Returns an error to report task completion.

type TaskRunner

type TaskRunner struct {
	// contains filtered or unexported fields
}

TaskRunner is a Runner capable of concurrently running Tasks. Runs multiple goroutines to process Tasks concurrently.

func NewTaskRunner

func NewTaskRunner(options ...func(*TaskRunner) error) (*TaskRunner, error)

NewTaskRunner creates an TaskRunner. Provides functional options for configuring the TaskRunner while also validating the input configurations. Returns an error if the TaskRunner is improperly configured.

func (*TaskRunner) Run

func (p *TaskRunner) Run(ctx context.Context, w Task) Promise

Run gives the Task to an available worker. The given context is used as a hook to cancel a running worker task. Run returns a closure over the result of a Task. When the result of a Task is desired, you can call the function to retrieve the result.

func (*TaskRunner) Start

func (p *TaskRunner) Start() error

Start prepares the TaskRunner for processing tasks. Once started, a TaskRunner is ready to accept Tasks.

func (*TaskRunner) Stop

func (p *TaskRunner) Stop() error

Stop performs a graceful shutdown of the Runner. In-progress Tasks are given some time to finish before exitting.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL