task

Version: v1.0.0
Published: May 26, 2015 | License: MIT | Imports: 5 | Imported by: 1

README

task

task is a Go utility package for simplifying execution of tasks.

Why

Go makes it easy to run your code concurrently. Many times you just want to run a group of tasks in series, concurrently, or even compose them in order to achieve more complex tasks. You also want to be able to cancel individual or a group of tasks, wait for them to finish executing, or store and share data with them.

This package draws inspiration from golang.org/x/net/context and github.com/gorilla/context.

Install

First, you need to install the package:

go get -u github.com/imkira/go-task

Then, you need to include it in your source:

import "github.com/imkira/go-task"

The package is imported under the name task.

Documentation

For advanced usage, make sure to check the package documentation.

How to Use

Concepts
  • Task: Task is an interface of something you want to run. It can only be run once.
  • Group: Group is a specialized type of Task that runs a group of tasks in a certain way. There are two types of Groups available: ConcurrentGroup and SerialGroup. They run tasks concurrently or in series, respectively.
  • Context: Context is an interface that allows you to store/retrieve data and share it with tasks.
Example: Creating a Task

You can create a Task by implementing the Task interface.

The following are the two recommended ways to do it.

Using NewTaskWithFunc (easier)
import "github.com/imkira/go-task"

func main() {
  // create the function to be run
  run := func(t task.Task, ctx task.Context) {
    // write your task below
    // ...
  }

  // create task passing it the function you want to run.
  t := task.NewTaskWithFunc(run)

  // run task (without context)
  t.Run(nil)
}
Embedding StandardTask in your struct
import "github.com/imkira/go-task"

type fooTask struct {
  // embed StandardTask which includes the basic framework for Tasks.
  *task.StandardTask

  // add fields below
  // ...
}

// You just need to implement the Run function like this.
func (ft *fooTask) Run(ctx task.Context) {
  // In case you embed StandardTask, like in this example, the following
  // Start and Finish logic is required.
  if err := ft.Start(); err != nil {
    return
  }
  // defer here is absolutely required. If this task panics, the parent tasks
  // will wait for this task forever.
  defer ft.Finish()

  // finally, write your task below
  // ...
}

func main() {
  t := &fooTask{
    StandardTask: task.NewStandardTask(),
  }

  // run task (without context)
  t.Run(nil)
}
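
The comment about defer in the example above can be checked with a small standalone sketch. It uses only the standard library, not go-task: the hypothetical runAndSignal helper closes a done channel where a real task would call Finish, and the work function panics on purpose.

```go
package main

import "fmt"

// runAndSignal runs work and closes done when work returns or panics.
// done plays the role that Finish plays in the example above.
func runAndSignal(work func(), done chan<- struct{}) {
	defer close(done) // runs even if work panics
	work()
}

// finishedDespitePanic reports whether done was closed although work panicked.
func finishedDespitePanic() bool {
	done := make(chan struct{})
	func() {
		defer func() { _ = recover() }() // keep the sketch from crashing
		runAndSignal(func() { panic("boom") }, done)
	}()
	select {
	case <-done:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(finishedDespitePanic()) // true
}
```

Without the defer, the close would be skipped when work panics, and anything waiting on done would block forever.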
Example: Creating a Task Group
Example: ConcurrentGroup

If you want to run tasks concurrently, you can do it like:

import (
  "fmt"
  "time"

  "github.com/imkira/go-task"
)

func main() {
  // Create a ConcurrentGroup. Don't forget, Groups are also Tasks.
  g1 := task.NewConcurrentGroup()

  // Calling AddChild with a Task adds it to the group.
  // Here, we add 3 tasks. Each prints its name.
  g1.AddChild(newPrintTask("task1"))
  g1.AddChild(newPrintTask("task2"))
  g1.AddChild(newPrintTask("task3"))

  // Run all 3 tasks concurrently.
  // Although each task takes about 1 second to run, the whole group should
  // still take about 1 second, because the tasks run concurrently (and spend
  // most of their time just sleeping).
  g1.Run(nil)
}

func newPrintTask(str string) task.Task {
  run := func(t task.Task, ctx task.Context) {
    time.Sleep(1 * time.Second)
    fmt.Println(str)
  }
  return task.NewTaskWithFunc(run)
}

You can also control, for a given ConcurrentGroup, the maximum number of tasks you want to be executing simultaneously at any given moment. You just need to set the MaxConcurrency property of the ConcurrentGroup before running it.

  g1 := task.NewConcurrentGroup()
  g1.MaxConcurrency = 100

For a more elaborate example please check examples/maxconcurrency.go.
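
To give a feel for what a concurrency limit does, here is a minimal sketch that uses only the standard library. It is not how go-task implements MaxConcurrency; it is one common technique (a buffered channel used as a semaphore), and runLimited is a hypothetical helper. Unlike MaxConcurrency, where 0 means unlimited, this sketch requires a limit of at least 1.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited runs n jobs with at most limit jobs in flight at once and
// returns the highest number of jobs that were observed running together.
// limit must be at least 1.
func runLimited(n, limit int, job func(i int)) int {
	sem := make(chan struct{}, limit) // one slot per allowed concurrent job
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit jobs are already running
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the job ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			job(i)

			mu.Lock()
			running--
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runLimited(20, 5, func(int) { time.Sleep(10 * time.Millisecond) })
	fmt.Println("peak concurrency:", peak)
}
```

The reported peak never exceeds the limit, because a job can only start after it has taken a slot from the channel.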

Example: SerialGroup

If you want to run tasks in series, you can do it like:

import (
  "fmt"
  "time"

  "github.com/imkira/go-task"
)

func main() {
  // Create a SerialGroup. Don't forget, Groups are also Tasks.
  g1 := task.NewSerialGroup()

  // Calling AddChild with a Task adds it to the group.
  // Here, we add 3 tasks. Each prints its name.
  g1.AddChild(newPrintTask("task1"))
  g1.AddChild(newPrintTask("task2"))
  g1.AddChild(newPrintTask("task3"))

  // Run all 3 tasks in series.
  // Since each task takes about 1 second to execute, it should all take
  // about 3 seconds.
  g1.Run(nil)
}

func newPrintTask(str string) task.Task {
  run := func(t task.Task, ctx task.Context) {
    time.Sleep(1 * time.Second)
    fmt.Println(str)
  }
  return task.NewTaskWithFunc(run)
}
Example: Task Trees

Because Groups are also Tasks, you can compose tasks into trees of tasks. Please check examples/tree.go for an example on how to do it.

Example: Waiting for Tasks

Suppose you want to know when a particular task finished executing.

You can do it for individual Tasks or for task Groups.

  // create t1, t2, t3 tasks
  // ...

  // here we create a ConcurrentGroup and we add 3 tasks: t1, t2, and t3.
  g1 := task.NewConcurrentGroup()
  g1.AddChild(t1)
  g1.AddChild(t2)
  g1.AddChild(t3)

  // run the group in a separate goroutine so it doesn't block here
  go g1.Run(nil)

  // wait for Task t2 to finish
  err1 := t2.Wait()
  // err1 is non-nil if Task t2 was cancelled.

  // wait for the whole Group g1 to finish
  err2 := g1.Wait()
  // err2 is non-nil if Group g1 was cancelled (for example, because one of
  // its tasks was cancelled).
Example: Cancelling Tasks

Sometimes you want to abort execution of a task. In a group, if a child task is cancelled, the group is cancelled too, as soon as possible.

Example: Cancelling Tasks Explicitly
  // create longTask
  // ...

  // run a long task
  go longTask.Run(nil)

  // cancel task 1 second later
  time.AfterFunc(time.Second, func() {
    // Cancel task with custom error.
    // You can also pass nil (it will become task.ErrTaskCancelled).
    longTask.Cancel(fooError)
  })

  // wait for longTask to finish executing
  err := longTask.Wait()
  // err will be the same as fooError if the task was cancelled
Example: Cancelling Tasks with Timeouts
  // create longTask
  // ...

  // set a maximum duration of 3 seconds for this task.
  longTask.SetTimeout(3 * time.Second)

  // run and wait for a long task
  longTask.Run(nil)

  err := longTask.Err()
  // err will be task.ErrTaskDeadlineExceeded if the task took more than 3
  // seconds to execute.
Example: Cancelling Tasks with Deadlines
  // create longTask
  // ...

  // set a deadline 3 seconds from now for this task.
  longTask.SetDeadline(time.Now().Add(3 * time.Second))

  // run and wait for a long task
  longTask.Run(nil)

  err := longTask.Err()
  // err will be task.ErrTaskDeadlineExceeded if the task was still running
  // when the deadline passed.

Please note this example is "almost" equivalent to the previous. But there is actually a big difference: With SetTimeout, the deadline is automatically set right when the task is about to run. With SetDeadline, the deadline is set the moment you call SetDeadline. For this reason, you should use SetTimeout for task duration restrictions (e.g., the task should not take more than X time), and SetDeadline for absolute date/time restrictions (e.g., the task should finish before midnight).
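
The difference can be made concrete with a little arithmetic. The sketch below uses only the standard library and hypothetical helper names; it models a task that is given a 3 second limit but only starts running 2 seconds after it was configured.

```go
package main

import (
	"fmt"
	"time"
)

// budgetWithTimeout returns how long a task may run when it is limited by a
// timeout: the clock starts when the task starts, so the budget is always d.
func budgetWithTimeout(d time.Duration) time.Duration {
	return d
}

// budgetWithDeadline returns how long a task may run when it is limited by an
// absolute deadline: time spent before the task starts is lost.
func budgetWithDeadline(deadline, startedAt time.Time) time.Duration {
	if left := deadline.Sub(startedAt); left > 0 {
		return left
	}
	return 0 // the deadline passed before the task even started
}

// lateStartBudgets models a task configured with a 3s limit that starts
// running 2s after it was configured.
func lateStartBudgets() (withTimeout, withDeadline time.Duration) {
	configuredAt := time.Date(2015, time.May, 26, 12, 0, 0, 0, time.UTC)
	deadline := configuredAt.Add(3 * time.Second)
	startedAt := configuredAt.Add(2 * time.Second)
	return budgetWithTimeout(3 * time.Second), budgetWithDeadline(deadline, startedAt)
}

func main() {
	timeoutBudget, deadlineBudget := lateStartBudgets()
	fmt.Println("timeout budget:", timeoutBudget)   // timeout budget: 3s
	fmt.Println("deadline budget:", deadlineBudget) // deadline budget: 1s
}
```

With a timeout the task still gets its full 3 seconds. With a deadline it only gets the 1 second that is left.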

Example: Cancelling Tasks with Signals
  // create longTask
  // ...

  // cancel the task if SIGINT (ctrl-c) or SIGTERM is received
  longTask.SetSignals(syscall.SIGINT, syscall.SIGTERM)

  // run and wait for a long task
  longTask.Run(nil)

  err := longTask.Err()
  // err will be task.ErrTaskSignalReceived if a signal is received while the
  // task is running.
Example: Sharing Data with Contexts

Contexts are useful for sharing data with tasks.

import (
  "fmt"

  "github.com/imkira/go-task"
)

func main() {
  // create a Context object
  ctx := task.NewContext()

  // put what you want inside
  ctx.Set("foo", "bar")
  ctx.Set("true", true)
  ctx.Set("one", 1)
  ctx.Set("pi", 3.14)

  // create task that prints the context
  t := newPrintContextTask()

  // run it using the context
  t.Run(ctx)
}

func newPrintContextTask() task.Task {
  run := func(t task.Task, ctx task.Context) {
    fmt.Println(ctx.Get("foo"))
    fmt.Println(ctx.Get("true"))
    fmt.Println(ctx.Get("one"))
    fmt.Println(ctx.Get("pi"))
  }
  return task.NewTaskWithFunc(run)
}
More Examples

For more elaborate examples please check examples.

Contribute

Found a bug? Want to contribute and add a new feature?

Please fork this project and send me a pull request!

License

go-task is licensed under the MIT license:

www.opensource.org/licenses/MIT

Copyright (c) 2015 Mario Freitas. See LICENSE.txt for further details.

Documentation

Overview

Package task is a utility package for simplifying execution of tasks.

Index

Constants

This section is empty.

Variables

var (
	// ErrTaskCancelled happens when the task can be cancelled (it is in a
	// cancellable state) and is cancelled.
	ErrTaskCancelled = errors.New("task cancelled")

	// ErrTaskDeadlineExceeded happens when a timeout or deadline is defined for
	// the task, and it is triggered before the task finishes executing.
	ErrTaskDeadlineExceeded = errors.New("task deadline exceeded")

	// ErrTaskSignalReceived happens when one or more signals defined in the task
	// are triggered by the OS, before the task finishes executing.
	ErrTaskSignalReceived = errors.New("task received signal")

	// ErrInvalidTaskState happens when the task tries to make an invalid state
	// transition (e.g., starting or finishing the same task twice).
	ErrInvalidTaskState = errors.New("invalid task state")
)
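
One way to act on these errors is to compare the value a task ended with against each of them. The sketch below is self-contained, so it declares local stand-ins with the same messages instead of importing go-task. With the real package you would compare against task.ErrTaskCancelled and friends. The describe helper is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the error variables documented above, so the
// sketch compiles without importing go-task.
var (
	errTaskCancelled        = errors.New("task cancelled")
	errTaskDeadlineExceeded = errors.New("task deadline exceeded")
	errTaskSignalReceived   = errors.New("task received signal")
)

// describe maps the error a task ended with to a short human-readable reason.
func describe(err error) string {
	switch {
	case err == nil:
		return "not cancelled"
	case errors.Is(err, errTaskCancelled):
		return "cancelled"
	case errors.Is(err, errTaskDeadlineExceeded):
		return "timed out"
	case errors.Is(err, errTaskSignalReceived):
		return "interrupted by a signal"
	default:
		return "cancelled with a custom error: " + err.Error()
	}
}

func main() {
	fmt.Println(describe(nil))                     // not cancelled
	fmt.Println(describe(errTaskDeadlineExceeded)) // timed out
	fmt.Println(describe(errors.New("disk full"))) // cancelled with a custom error: disk full
}
```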

Functions

This section is empty.

Types

type ConcurrentGroup

type ConcurrentGroup struct {
	*StandardTask

	// MaxConcurrency is a limit to the maximum number of goroutines that can
	// execute simultaneously. If you set it above 0 this limit is enforced,
	// otherwise it is unlimited. By default, it is set to 0 (unlimited).
	MaxConcurrency int
}

ConcurrentGroup is a task that runs its sub-tasks concurrently. ConcurrentGroup implements the Task interface so you can compose it with other tasks.

func NewConcurrentGroup

func NewConcurrentGroup() *ConcurrentGroup

NewConcurrentGroup creates a new ConcurrentGroup.

func (*ConcurrentGroup) Run

func (g *ConcurrentGroup) Run(ctx Context)

Run runs all tasks concurrently, according to the maximum concurrency setting, and waits until all of them are done. On the first error returned by one of the child tasks, the group will be cancelled. If, in any way, the group is cancelled, no more child tasks will be run. This function will always wait until no more child tasks in the group are running.

type Context

type Context interface {
	// Get gets the value associated to the given key, or nil if there is none.
	Get(key string) interface{}

	// GetOk gets the value, and a boolean indicating the existence
	// of such key. When the key exists (value, true) will be returned, otherwise
	// (nil, false) will be returned.
	GetOk(key string) (interface{}, bool)

	// Set sets the given value in the given key.
	Set(key string, value interface{})

	// Delete deletes the given key.
	Delete(key string)
}

Context is an interface for helping you share data between tasks.
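
To illustrate the difference between Get and GetOk, here is a minimal sketch of a type that satisfies the interface above. It is an assumption for illustration only, not the implementation behind NewContext: mapContext and newMapContext are hypothetical names, and the interface is repeated locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Context repeats the interface documented above.
type Context interface {
	Get(key string) interface{}
	GetOk(key string) (interface{}, bool)
	Set(key string, value interface{})
	Delete(key string)
}

// mapContext is a hypothetical Context backed by a mutex-guarded map.
type mapContext struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newMapContext() Context {
	return &mapContext{data: make(map[string]interface{})}
}

// Get returns the stored value, or nil if the key is missing.
func (c *mapContext) Get(key string) interface{} {
	v, _ := c.GetOk(key)
	return v
}

// GetOk returns the stored value and whether the key exists.
func (c *mapContext) GetOk(key string) (interface{}, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.data[key]
	return v, ok
}

// Set stores value under key, replacing any previous value.
func (c *mapContext) Set(key string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = value
}

// Delete removes key; deleting a missing key is a no-op.
func (c *mapContext) Delete(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.data, key)
}

func main() {
	ctx := newMapContext()
	ctx.Set("nothing", nil) // a key may hold nil on purpose

	fmt.Println(ctx.Get("nothing"), ctx.Get("missing")) // <nil> <nil>
	_, ok1 := ctx.GetOk("nothing")
	_, ok2 := ctx.GetOk("missing")
	fmt.Println(ok1, ok2) // true false
}
```

Get cannot tell a stored nil from a missing key, which is the case GetOk exists for.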

func NewContext

func NewContext() Context

NewContext creates a new Context.

type Func

type Func func(task Task, ctx Context)

Func is a function that implements the Run function of a task.

type SerialGroup

type SerialGroup struct {
	*StandardTask
}

SerialGroup is a task that runs its sub-tasks serially (in sequence). SerialGroup implements the Task interface so you can compose it with other tasks.

func NewSerialGroup

func NewSerialGroup() *SerialGroup

NewSerialGroup creates a new SerialGroup.

func (*SerialGroup) Run

func (g *SerialGroup) Run(ctx Context)

Run runs all tasks serially and waits until all of them are done. On the first error returned by one of the tasks, no more child tasks are run, and the group is cancelled allowing currently executing tasks to cleanup and finish.
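
The stop-on-first-error behaviour can be sketched with plain functions. This is a simplification that does not use go-task: here each step returns its error directly, whereas a Task reports it through Err, and runSerial is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is the error the second step fails with in this sketch.
var errBoom = errors.New("boom")

// runSerial calls steps one after another and stops at the first error.
// It returns how many steps were started and the error that stopped the run.
func runSerial(steps []func() error) (int, error) {
	for i, step := range steps {
		if err := step(); err != nil {
			return i + 1, err
		}
	}
	return len(steps), nil
}

func main() {
	steps := []func() error{
		func() error { fmt.Println("step 1"); return nil },
		func() error { fmt.Println("step 2"); return errBoom },
		func() error { fmt.Println("step 3"); return nil }, // never runs
	}
	started, err := runSerial(steps)
	fmt.Println(started, err) // 2 boom
}
```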

type StandardTask

type StandardTask struct {
	// contains filtered or unexported fields
}

StandardTask is a struct that helps you implement Tasks more easily. StandardTask implements all functionality required by the Task interface, with the exception of the Run function, which is not included. This makes it ideal for embedding StandardTask in your struct and then implementing the required Run function.

func NewStandardTask

func NewStandardTask() *StandardTask

NewStandardTask creates a new StandardTask object.

func (*StandardTask) AddChild

func (st *StandardTask) AddChild(child Task)

AddChild adds a child task to this task.

func (*StandardTask) Cancel

func (st *StandardTask) Cancel(err error)

Cancel cancels the task with an error. If nil is passed, the default ErrTaskCancelled is used instead.

func (*StandardTask) Cancelled

func (st *StandardTask) Cancelled() <-chan struct{}

Cancelled returns a channel that's closed when the task is cancelled.

func (*StandardTask) Err

func (st *StandardTask) Err() error

Err returns the error (if any) the task was cancelled with.

func (*StandardTask) Finish

func (st *StandardTask) Finish() error

Finish finishes the task after being used.

func (*StandardTask) Finished

func (st *StandardTask) Finished() <-chan struct{}

Finished returns a channel that's closed when the task is finished.
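
Channels that are closed to signal an event, such as the ones returned by Cancelled and Finished, are typically consumed with select. The following standalone sketch uses plain channels in place of a real task, and the outcome helper is hypothetical.

```go
package main

import "fmt"

// outcome blocks until one of the two channels is closed and reports which.
// The parameters stand in for the channels returned by Cancelled() and
// Finished(); no go-task types are used here.
func outcome(cancelled, finished <-chan struct{}) string {
	select {
	case <-cancelled:
		return "cancelled"
	case <-finished:
		return "finished"
	}
}

func main() {
	cancelled := make(chan struct{})
	finished := make(chan struct{})

	// Closing a channel wakes up every receiver at once, which is why a
	// closed channel works well as a one-shot broadcast.
	close(finished)

	fmt.Println(outcome(cancelled, finished)) // finished
	fmt.Println(outcome(cancelled, finished)) // finished (a closed channel stays readable)
}
```

If both channels are already closed, select picks one of the ready cases at random, so check the one you care about first when the order matters.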

func (*StandardTask) SetDeadline

func (st *StandardTask) SetDeadline(t time.Time)

SetDeadline sets the time after which the task should be automatically cancelled.

func (*StandardTask) SetSignals

func (st *StandardTask) SetSignals(s ...os.Signal)

SetSignals sets the list of signals that should cancel this task should they be triggered by the OS. Please note that an additional goroutine will be created in order to detect the signals and cancel the task.
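
As a rough picture of what such a signal-watching goroutine can look like, here is a sketch built on the standard os/signal package. It is an assumption about the general technique, not go-task's actual code: watch and simulateInterrupt are hypothetical helpers, and the interrupt is injected by hand so the example terminates on its own.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
)

// watch starts a goroutine that waits for one signal on sigs, passes it to
// onSignal, and then closes the returned channel.
func watch(sigs <-chan os.Signal, onSignal func(os.Signal)) <-chan struct{} {
	handled := make(chan struct{})
	go func() {
		defer close(handled)
		onSignal(<-sigs)
	}()
	return handled
}

// simulateInterrupt wires watch to os/signal and injects an interrupt by
// hand, returning the name of the signal that was handled.
func simulateInterrupt() string {
	sigs := make(chan os.Signal, 1) // buffered, so a delivery is not dropped
	signal.Notify(sigs, os.Interrupt)
	defer signal.Stop(sigs)

	var got string
	handled := watch(sigs, func(s os.Signal) { got = s.String() })

	sigs <- os.Interrupt // stand-in for the OS delivering ctrl-c
	<-handled
	return got
}

func main() {
	fmt.Println("handled signal:", simulateInterrupt())
}
```

In a real task, the onSignal callback is where the task would be cancelled.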

func (*StandardTask) SetTimeout

func (st *StandardTask) SetTimeout(d time.Duration)

SetTimeout sets the duration after which the task should be automatically cancelled.

func (*StandardTask) Start

func (st *StandardTask) Start() error

Start prepares the task before being used.

func (*StandardTask) Started

func (st *StandardTask) Started() <-chan struct{}

Started returns a channel that's closed when the task is started.

func (*StandardTask) Wait

func (st *StandardTask) Wait() error

Wait waits for the task to finish executing, and returns the resulting error (if any). Wait also returns immediately if the task is cancelled before even starting execution.

type Task

type Task interface {
	// AddChild adds a child task to this task.
	AddChild(child Task)

	// SetDeadline sets the time after which the task should be
	// automatically cancelled.
	SetDeadline(t time.Time)

	// SetTimeout sets the duration after which the task should be
	// automatically cancelled.
	SetTimeout(d time.Duration)

	// SetSignals sets the list of signals that should cancel this task should
	// they be triggered by the OS. Please note that an additional goroutine will
	// be created in order to detect the signals and cancel the task.
	SetSignals(s ...os.Signal)

	// Run runs the task.
	Run(ctx Context)

	// Start prepares the task before being used.
	Start() error

	// Started returns a channel that's closed when the task is started.
	Started() <-chan struct{}

	// Cancel cancels the task with an error. If nil is passed, the default
	// ErrTaskCancelled is used instead.
	Cancel(error)

	// Cancelled returns a channel that's closed when the task is cancelled.
	Cancelled() <-chan struct{}

	// Finish finishes the task after being used.
	Finish() error

	// Finished returns a channel that's closed when the task is finished.
	Finished() <-chan struct{}

	// Wait waits for the task to finish executing, and returns the resulting
	// error (if any). Wait also returns immediately if the task is cancelled
	// before even starting execution.
	Wait() error

	// Err returns the error (if any) the task was cancelled with.
	Err() error
}

Task is an interface that helps you define, run and control a task. You can set deadlines and timeouts, cancel it, or wait until it finishes executing or is cancelled before starting.

func NewTaskWithFunc

func NewTaskWithFunc(run Func) Task

NewTaskWithFunc takes a function and wraps it in a Task.

Directories

Path Synopsis
Execute 1000 tasks concurrently but limit maximum concurrency to 100 simultaneous goroutines.
