tasks

Version: v1.2.0
Published: Mar 16, 2024 License: MIT Imports: 5 Imported by: 22

README

Tasks

Package tasks is an easy-to-use in-process scheduler for recurring tasks in Go. Tasks is focused on high-frequency tasks that run quickly and often. The goal of Tasks is to support concurrently running tasks at scale without scheduler-induced jitter.

Tasks is focused on accuracy of task execution. To do this, each task is called within its own goroutine. This ensures that a long execution of a single invocation does not throw the schedule as a whole off track.

For simplicity, this task scheduler uses the time.Duration type to specify intervals. This allows for a simple interface and flexible control over when tasks are executed.

Key Features

  • Concurrent Execution: Tasks are executed in their own goroutines, ensuring accurate scheduling even when individual tasks take longer to complete.
  • Optimized Goroutine Scheduling: Tasks leverages Go's time.AfterFunc() function to reduce sleeping goroutines and optimize CPU scheduling.
  • Flexible Task Intervals: Tasks uses the time.Duration type to specify intervals, offering a simple interface and flexible control over task execution timing.
  • Delayed Task Start: Schedule tasks to start at a later time by specifying a start time, allowing for greater control over task execution.
  • One-Time Tasks: Schedule tasks to run only once by setting the RunOnce flag, ideal for single-use tasks or one-time actions.
  • Custom Error Handling: Define a custom error handling function to handle errors returned by tasks, enabling tailored error handling logic.

Usage

Here are some examples to help you get started with Tasks:

Basic Usage

// Start the scheduler
scheduler := tasks.New()
defer scheduler.Stop()

// Add a task
id, err := scheduler.Add(&tasks.Task{
  Interval: 30 * time.Second,
  TaskFunc: func() error {
    // Put your logic here
    return nil
  },
})
if err != nil {
  // Handle the error
}

Delayed Scheduling

Sometimes a schedule needs to be started at a later time. This package provides the ability to start a task only after a certain time. The example below shows this in practice.

// Add a recurring task for every 30 days, starting 30 days from now
id, err := scheduler.Add(&tasks.Task{
  Interval:   30 * (24 * time.Hour),
  StartAfter: time.Now().Add(30 * (24 * time.Hour)),
  TaskFunc: func() error {
    // Put your logic here
    return nil
  },
})
if err != nil {
  // Handle the error
}

One-Time Tasks

It is also common for applications to run a task only once. The below example shows scheduling a task to run only once after waiting for 60 seconds.

// Add a one-time task that runs 60 seconds from now
id, err := scheduler.Add(&tasks.Task{
  Interval: 60 * time.Second,
  RunOnce:  true,
  TaskFunc: func() error {
    // Put your logic here
    return nil
  },
})
if err != nil {
  // Handle the error
}

Custom Error Handling

One powerful feature of Tasks is that it allows users to specify custom error handling. This is done by allowing users to define a function that is called when a task returns an error. The below example shows scheduling a task that logs when an error occurs.

// Add a task with custom error handling
id, err := scheduler.Add(&tasks.Task{
  Interval: 30 * time.Second,
  TaskFunc: func() error {
    // Put your logic here
    return nil
  },
  ErrFunc: func(e error) {
    // id is not in scope inside the Add call, so only the error is logged
    log.Printf("An error occurred when executing task - %s", e)
  },
})
if err != nil {
  // Handle the error
}

For more details on usage, see the GoDoc.

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for more details.

Documentation

Overview

Package tasks is an easy-to-use in-process scheduler for recurring tasks in Go. Tasks is focused on high-frequency tasks that run quickly and often. The goal of Tasks is to support concurrently running tasks at scale without scheduler-induced jitter.

Tasks is focused on accuracy of task execution. To do this, each task is called within its own goroutine. This ensures that a long execution of a single invocation does not throw the schedule as a whole off track.

As usage of this scheduler scales, expect a larger number of sleeping goroutines. This is intentional: the scheduler is designed to leverage Go's ability to optimize goroutine CPU scheduling.

For simplicity, this task scheduler uses the time.Duration type to specify intervals. This allows for a simple interface and flexible control over when tasks are executed.

Below is an example of starting the scheduler and registering a new task that runs every 30 seconds.

// Start the Scheduler
scheduler := tasks.New()
defer scheduler.Stop()

// Add a task
id, err := scheduler.Add(&tasks.Task{
	Interval: 30 * time.Second,
	TaskFunc: func() error {
		// Put your logic here
		return nil
	},
})
if err != nil {
	// Do Stuff
}

Sometimes a schedule needs to be started at a later time. This package provides the ability to start a task only after a certain time. The example below shows this in practice.

// Add a recurring task for every 30 days, starting 30 days from now
id, err := scheduler.Add(&tasks.Task{
	Interval:   30 * (24 * time.Hour),
	StartAfter: time.Now().Add(30 * (24 * time.Hour)),
	TaskFunc: func() error {
		// Put your logic here
		return nil
	},
})
if err != nil {
	// Do Stuff
}

It is also common for applications to run a task only once. The below example shows scheduling a task to run only once after waiting for 60 seconds.

// Add a one time only task for 60 seconds from now
id, err := scheduler.Add(&tasks.Task{
	Interval: 60 * time.Second,
	RunOnce:  true,
	TaskFunc: func() error {
		// Put your logic here
		return nil
	},
})
if err != nil {
	// Do Stuff
}

One powerful feature of Tasks is that it allows users to specify custom error handling. This is done by allowing users to define a function that is called when a task returns an error. The below example shows scheduling a task that logs when an error occurs.

// Add a task with custom error handling
id, err := scheduler.Add(&tasks.Task{
	Interval: 30 * time.Second,
	TaskFunc: func() error {
		// Put your logic here
		return nil
	},
	ErrFunc: func(e error) {
		// id is not in scope inside the Add call, so only the error is logged
		log.Printf("An error occurred when executing task - %s", e)
	},
})
if err != nil {
	// Do Stuff
}

Constants

This section is empty.

Variables

var (
	// ErrIDInUse is returned when a Task ID is specified but already used.
	ErrIDInUse = fmt.Errorf("ID already used")
)

Functions

This section is empty.

Types

type Scheduler

type Scheduler struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Scheduler stores the internal task list and provides an interface for task management.

func New

func New() *Scheduler

New will create a new scheduler instance that allows users to create and manage tasks.

func (*Scheduler) Add

func (schd *Scheduler) Add(t *Task) (string, error)

Add will add a task to the task list and schedule it. Once added, a task waits the defined time interval and then executes. This means a task with a 15-second interval will be triggered 15 seconds after Add completes, not before or after (excluding typical machine time jitter).

// Add a task
id, err := scheduler.Add(&tasks.Task{
	Interval: 30 * time.Second,
	TaskFunc: func() error {
		// Put your logic here
		return nil
	},
	ErrFunc: func(err error) {
		// Put custom error handling here
	},
if err != nil {
	// Do stuff
}

func (*Scheduler) AddWithID added in v1.0.1

func (schd *Scheduler) AddWithID(id string, t *Task) error

AddWithID will add a task with an ID to the task list and schedule it. It will return an error if the ID is in use. Once added, a task waits the defined time interval and then executes. This means a task with a 15-second interval will be triggered 15 seconds after AddWithID completes, not before or after (excluding typical machine time jitter).

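// Add a task with a caller-supplied ID.
// xid here is assumed to be github.com/rs/xid; AddWithID takes a string,
// so the generated ID is converted with String().
id := xid.New().String()
err := scheduler.AddWithID(id, &tasks.Task{
	Interval: 30 * time.Second,
	TaskFunc: func() error {
		// Put your logic here
		return nil
	},
	ErrFunc: func(err error) {
		// Put custom error handling here
	},
})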
if err != nil {
	// Do stuff
}

func (*Scheduler) Del

func (schd *Scheduler) Del(name string)

Del will unschedule the specified task and remove it from the task list. Deletion prevents future invocations of a task, but does not interrupt an invocation that has already been triggered.

func (*Scheduler) Lookup

func (schd *Scheduler) Lookup(name string) (*Task, error)

Lookup will find the specified task from the internal task list using the task ID provided.

The returned task should be treated as read-only and not modified outside of this package. Doing so may cause panics.

func (*Scheduler) Stop

func (schd *Scheduler) Stop()

Stop is used to unschedule and delete all tasks owned by the scheduler instance.

func (*Scheduler) Tasks

func (schd *Scheduler) Tasks() map[string]*Task

Tasks is used to return a copy of the internal tasks map.

The tasks in the returned map should be treated as read-only and not modified outside of this package. Doing so may cause panics.

type Task

type Task struct {
	sync.RWMutex

	// TaskContext allows for user-defined context that is passed to task functions.
	TaskContext TaskContext

	// Interval is how often the task executes. Setting this to 30 seconds results in a task that
	// runs every 30 seconds.
	//
	// The below are common examples to get started with.
	//
	//  // Every 30 seconds
	//  time.Duration(30 * time.Second)
	//  // Every 5 minutes
	//  time.Duration(5 * time.Minute)
	//  // Every 12 hours
	//  time.Duration(12 * time.Hour)
	//  // Every 30 days
	//  time.Duration(30 * (24 * time.Hour))
	//
	Interval time.Duration

	// RunOnce is used to set this task as a single execution task. By default, tasks will continue executing at
	// the interval specified until deleted. With RunOnce enabled the first execution of the task will result in
	// the task self deleting.
	RunOnce bool

	// RunSingleInstance is used to set a task as a single instance task. By default, tasks will continue executing at
	// the interval specified until deleted. With RunSingleInstance enabled a subsequent task execution will be skipped
	// if the previous task execution is still running.
	//
	// This is useful for tasks that may take longer than the interval to execute. This will prevent multiple instances
	// of the same task from running concurrently.
	RunSingleInstance bool

	// StartAfter is used to specify a start time for the scheduler. When set, tasks will wait for the specified
	// time to start the schedule timer.
	StartAfter time.Time

	// TaskFunc is the user defined function to execute as part of this task.
	//
	// Either TaskFunc or FuncWithTaskContext must be defined. If both are defined, FuncWithTaskContext will be used.
	TaskFunc func() error

	// ErrFunc allows users to define a function that is called when tasks return an error. If ErrFunc is nil,
	// errors from tasks will be ignored.
	//
	// Either ErrFunc or ErrFuncWithTaskContext must be defined. If both are defined, ErrFuncWithTaskContext will be used.
	ErrFunc func(error)

	// FuncWithTaskContext is a user defined function to execute as part of this task. This function is used in
	// place of TaskFunc with the difference in that it will pass the user defined context from the Task configurations.
	//
	// Either TaskFunc or FuncWithTaskContext must be defined. If both are defined, FuncWithTaskContext will be used.
	FuncWithTaskContext func(TaskContext) error

	// ErrFuncWithTaskContext allows users to define a function that is called when tasks return an error.
	// If ErrFunc is nil, errors from tasks will be ignored. This function is used in place of ErrFunc with
	// the difference in that it will pass the user defined context from the Task configurations.
	//
	// Either ErrFunc or ErrFuncWithTaskContext must be defined. If both are defined, ErrFuncWithTaskContext will be used.
	ErrFuncWithTaskContext func(TaskContext, error)
	// contains filtered or unexported fields
}

Task contains the scheduled task details and control mechanisms. This struct is used during the creation of tasks. It allows users to control how and when tasks are executed.
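
The RunSingleInstance behavior described in the struct comments (skip an execution while the previous one is still running) can be illustrated with an atomic flag. This is one common way to build such a guard, not necessarily how the package does it. The singleInstance wrapper is hypothetical, and atomic.Bool requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// singleInstance wraps fn so that a call made while a previous call is still
// running returns immediately and reports false.
func singleInstance(fn func()) func() bool {
	var running atomic.Bool
	return func() bool {
		if !running.CompareAndSwap(false, true) {
			return false // previous invocation still in flight: skip this one
		}
		defer running.Store(false)
		fn()
		return true
	}
}

func main() {
	started := make(chan struct{}, 1)
	release := make(chan struct{})
	guarded := singleInstance(func() {
		started <- struct{}{}
		<-release // simulate work that outlasts the interval
	})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		guarded() // long-running first invocation
	}()

	<-started // the first invocation is now in flight
	fmt.Println("second invocation ran:", guarded())

	close(release)
	wg.Wait()
	fmt.Println("third invocation ran:", guarded())
}
```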

func (*Task) Clone added in v1.1.0

func (t *Task) Clone() *Task

Clone will create a copy of the existing task. This is useful for creating a new task with the same properties as an existing task. It is also used internally when creating a new task.

type TaskContext added in v1.1.0

type TaskContext struct {
	// Context is a user-defined context.
	Context context.Context
	// contains filtered or unexported fields
}

func (TaskContext) ID added in v1.1.0

func (ctx TaskContext) ID() string

ID will return the task ID. This is the same as the ID generated by the scheduler when adding a task. If the task was added with AddWithID, this will be the same as the ID provided.
