worker

package module
v0.0.8
Published: May 19, 2023 License: MPL-2.0 Imports: 11 Imported by: 0

README

go-worker

go-worker provides a simple way to run tasks concurrently and in priority order, using a TaskManager that spawns a pool of workers. Each Task wraps a function that is scheduled according to its priority.

Features

  • Task prioritization: You can register tasks with a priority level influencing the execution order.
  • Concurrent execution: Tasks are executed concurrently by a pool of workers.
  • Middleware: You can apply middleware to the TaskManager to add additional functionality.
  • Results: You can access the results of the tasks via the Results channel.
  • Rate limiting: You can rate-limit task scheduling by setting a maximum number of tasks per second.
  • Cancellation: You can cancel Tasks before or while they are running.
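
To make the prioritization feature concrete, here is a minimal, standard-library-only sketch of a priority queue built on container/heap. It is not the package's scheduler: the item type, the drain helper, and the rule that lower priority values run first are all assumptions made for illustration.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical stand-in for a scheduled task.
type item struct {
	name     string
	priority int
}

// queue implements heap.Interface.
// Assumption: a lower priority value is popped first.
type queue []item

func (q queue) Len() int           { return len(q) }
func (q queue) Less(i, j int) bool { return q[i].priority < q[j].priority }
func (q queue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *queue) Push(x any)        { *q = append(*q, x.(item)) }
func (q *queue) Pop() any {
	old := *q
	n := len(old)
	it := old[n-1]
	*q = old[:n-1]
	return it
}

// drain pushes all items and returns their names in pop order.
func drain(items ...item) []string {
	q := &queue{}
	for _, it := range items {
		heap.Push(q, it)
	}
	order := make([]string, 0, len(items))
	for q.Len() > 0 {
		order = append(order, heap.Pop(q).(item).name)
	}
	return order
}

func main() {
	fmt.Println(drain(item{"c", 90}, item{"a", 1}, item{"b", 5})) // [a b c]
}
```

A heap keeps insertion and removal at O(log n), which is why it is a common choice when tasks arrive continuously rather than in one sorted batch.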

API

Initialization

Create a new TaskManager by calling the NewTaskManager() function with the following parameters:

  • ctx is the context for the task manager
  • maxWorkers is the number of workers to start. If 0 is specified, it defaults to the number of available CPUs
  • maxTasks is the maximum number of tasks that can be executed at once, defaults to 10
  • tasksPerSecond is the rate limit of tasks that can be executed per second, defaults to 5
  • timeout is the default timeout for tasks, defaults to 5 minutes
  • retryDelay is the default delay between retries, defaults to 1 second
  • maxRetries is the default maximum number of retries, defaults to 3

tm := worker.NewTaskManager(context.Background(), 4, 10, 5, time.Second*30, time.Second*30, 3)

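
The fallback behaviour described in the parameter list can be sketched as follows. This is an illustration only: the settings type and withDefaults helper are hypothetical, the numbers are taken from the package's Default* constants and the documented defaults, and treating every zero or negative value as "not specified" is an assumption.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// settings is a hypothetical container for the documented parameters.
type settings struct {
	maxWorkers     int
	maxTasks       int
	tasksPerSecond float64
	timeout        time.Duration
	retryDelay     time.Duration
	maxRetries     int
}

// withDefaults replaces zero or negative values with the documented defaults.
// Assumption: any non-positive value counts as "not specified".
func withDefaults(s settings) settings {
	if s.maxWorkers <= 0 {
		s.maxWorkers = runtime.NumCPU()
	}
	if s.maxTasks <= 0 {
		s.maxTasks = 10
	}
	if s.tasksPerSecond <= 0 {
		s.tasksPerSecond = 5
	}
	if s.timeout <= 0 {
		s.timeout = 5 * time.Minute
	}
	if s.retryDelay <= 0 {
		s.retryDelay = time.Second
	}
	if s.maxRetries <= 0 {
		s.maxRetries = 3
	}
	return s
}

func main() {
	// Only maxWorkers and tasksPerSecond are set; the rest fall back.
	fmt.Printf("%+v\n", withDefaults(settings{maxWorkers: 4, tasksPerSecond: 2}))
}
```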
Registering Tasks

Register new tasks by calling the RegisterTasks() method of the TaskManager struct and passing in a variadic number of tasks.

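j := 1 // illustrative task index, used only to label the file and the result
id := uuid.New()
task := worker.Task{
    ID:          id,
    Name:        "Some task",
    Description: "Here goes the description of the task",
    Priority:    10,
    // Execute is the function run by the worker
    Execute: func() (interface{}, error) {
        emptyFile, err := os.Create(path.Join("examples", "test", "res", fmt.Sprintf("1st__EmptyFile___%v.txt", j)))
        if err != nil {
            // return the error instead of exiting, so the task can be retried
            return nil, err
        }
        defer emptyFile.Close()
        time.Sleep(time.Second)
        return fmt.Sprintf("** task number %v with id %s executed", j, id), nil
    },
    Retries:    10,
    RetryDelay: 3 * time.Second, // RetryDelay is a time.Duration; a bare 3 would mean 3 nanoseconds
}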

task2 := worker.Task{
    ID:       uuid.New(),
    Priority: 10,
    Execute:  func() (val interface{}, err error) { return "Hello, World!", err },
}

tm.RegisterTasks(context.Background(), task, task2)
Stopping the Task Manager

You can stop the task manager and its goroutines by calling the Stop() method of the TaskManager struct.

tm.Stop()
Results

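The results of the tasks can be retrieved by calling the GetResults() method of the TaskManager, which returns them as a []Result slice. To receive results as they are produced, read from the channel returned by StreamResults().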

for result := range tm.GetResults() {
   // Do something with the result
}

Cancellation

You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing in the task ID as a parameter.

tm.CancelTask(task.ID)

You can cancel all tasks by calling the CancelAll() method of the TaskManager struct.

tm.CancelAll()
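
Cancellation by ID is commonly built on per-task contexts, and the Task struct does expose Ctx and CancelFunc fields. The following sketch shows that general pattern with the standard library only; the registry type and its methods are hypothetical and string IDs stand in for uuid.UUID, so it should not be read as the package's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// registry maps a task ID to the cancel function of that task's context.
type registry struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

func newRegistry() *registry {
	return &registry{cancels: map[string]context.CancelFunc{}}
}

// add derives a cancellable context for the task and remembers its cancel function.
func (r *registry) add(id string) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	r.mu.Lock()
	r.cancels[id] = cancel
	r.mu.Unlock()
	return ctx
}

// cancel cancels one task and reports whether the ID was known.
func (r *registry) cancel(id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	cancel, ok := r.cancels[id]
	if ok {
		cancel()
		delete(r.cancels, id)
	}
	return ok
}

// cancelAll cancels every registered task.
func (r *registry) cancelAll() {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id, cancel := range r.cancels {
		cancel()
		delete(r.cancels, id)
	}
}

func main() {
	r := newRegistry()
	ctx := r.add("task-1")
	r.cancel("task-1")
	<-ctx.Done() // returns immediately because the context is cancelled
	fmt.Println(errors.Is(ctx.Err(), context.Canceled)) // true
}
```

A task function that wants to stop early has to observe its context (for example by selecting on ctx.Done()); cancelling the context alone does not interrupt code that never checks it.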
Middleware

You can apply middleware to the TaskManager by calling the RegisterMiddleware() function and passing in the TaskManager and the middleware functions.

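// RegisterMiddleware returns a worker.Service, so keep the result in a new variable
srv := worker.RegisterMiddleware(tm,
    //middleware.YourMiddleware,
    func(next worker.Service) worker.Service {
        return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
    },
)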
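
The decorator pattern behind `func(Service) Service` middleware can be shown in isolation. In this sketch the one-method service interface, the tag helper, and the chain function are hypothetical stand-ins; in particular, the order in which RegisterMiddleware wraps its arguments is assumed here (first listed is outermost), not confirmed.

```go
package main

import "fmt"

// service is a one-method stand-in for the much larger worker.Service interface.
type service interface {
	Run(name string) string
}

// middleware mirrors the shape of `type Middleware func(Service) Service`.
type middleware func(service) service

// base is the innermost service that does the actual work.
type base struct{}

func (base) Run(name string) string { return "run(" + name + ")" }

// tagged wraps another service and records that the call passed through it.
type tagged struct {
	tag  string
	next service
}

func (t tagged) Run(name string) string { return t.tag + ">" + t.next.Run(name) }

// tag builds a middleware that prefixes the result with a label.
func tag(label string) middleware {
	return func(next service) service { return tagged{tag: label, next: next} }
}

// chain applies middleware so that the first one listed is the outermost wrapper.
func chain(svc service, mw ...middleware) service {
	for i := len(mw) - 1; i >= 0; i-- {
		svc = mw[i](svc)
	}
	return svc
}

func main() {
	svc := chain(base{}, tag("log"), tag("metrics"))
	fmt.Println(svc.Run("task")) // log>metrics>run(task)
}
```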
Example
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/google/uuid"
    worker "github.com/hyp3rd/go-worker"
    "github.com/hyp3rd/go-worker/middleware"
)

func main() {
    tm := worker.NewTaskManager(context.Background(), 4, 10, 5, time.Second*3, time.Second*30, 3)

    // apply middleware in the same order as you want to execute them
    srv := worker.RegisterMiddleware(tm,
        // middleware.YourMiddleware,
        func(next worker.Service) worker.Service {
            return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
        },
    )

    // stop the task manager and its goroutines on exit
    defer srv.Stop()

    task := worker.Task{
        ID:       uuid.New(),
        Priority: 1,
        Execute: func() (val interface{}, err error) {
            return func(a int, b int) (val interface{}, err error) {
                return a + b, err
            }(2, 5)
        },
    }

    // Invalid task, it doesn't have a function
    task1 := worker.Task{
        ID:       uuid.New(),
        Priority: 10,
        // Execute:  func() (val interface{}, err error) { return "Hello, World from Task 1!", err },
    }

    task2 := worker.Task{
        ID:       uuid.New(),
        Priority: 5,
        Execute: func() (val interface{}, err error) {
            time.Sleep(time.Second * 2)
            return "Hello, World from Task 2!", err
        },
    }

    task3 := worker.Task{
        ID:       uuid.New(),
        Priority: 90,
        Execute: func() (val interface{}, err error) {
            // Simulate a long running task
            // time.Sleep(3 * time.Second)
            return "Hello, World from Task 3!", err
        },
    }

    task4 := worker.Task{
        ID:       uuid.New(),
        Priority: 150,
        Execute: func() (val interface{}, err error) {
            // Simulate a long running task
            time.Sleep(1 * time.Second)
            return "Hello, World from Task 4!", err
        },
    }

    srv.RegisterTasks(context.Background(), task, task1, task2, task3)

    srv.CancelTask(task3.ID)

    // RegisterTask returns an error, so check it
    if err := srv.RegisterTask(context.Background(), task4); err != nil {
        fmt.Println(err)
    }

    // Print results
    for result := range srv.GetResults() {
        fmt.Println(result)
    }

    tasks := srv.GetTasks()
    for _, task := range tasks {
        fmt.Println(task)
    }
}

Conclusion

The worker package provides a way to manage and execute tasks concurrently and in priority order. The worker count, queue size, rate limit, timeout, and retry behavior are all configurable.

License

This project is licensed under the Mozilla Public License 2.0 (MPL-2.0) - see the LICENSE file for details.

Documentation

Index

Constants

const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = TaskStatus(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = TaskStatus(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = TaskStatus(3)
	// Failed means the `Task` failed.
	Failed = TaskStatus(4)
	// Queued means the `Task` is queued.
	Queued = TaskStatus(5)
	// Running means the `Task` is running.
	Running = TaskStatus(6)
	// Invalid means the `Task` is invalid.
	Invalid = TaskStatus(7)
	// Completed means the `Task` is completed.
	Completed = TaskStatus(8)
)

TaskStatus values

  • 1: `ContextDeadlineReached`
  • 2: `RateLimited`
  • 3: `Cancelled`
  • 4: `Failed`
  • 5: `Queued`
  • 6: `Running`
  • 7: `Invalid`
  • 8: `Completed`
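
The mapping between these numeric values and readable names can be sketched with a Stringer. The constants below are local copies of the ones listed above; the labels returned by String() and the "Unknown" fallback are assumptions, since the text the package's own String() method returns is not shown here.

```go
package main

import "fmt"

// TaskStatus is a local copy of the status type, for illustration only.
type TaskStatus uint8

const (
	ContextDeadlineReached = TaskStatus(1)
	RateLimited            = TaskStatus(2)
	Cancelled              = TaskStatus(3)
	Failed                 = TaskStatus(4)
	Queued                 = TaskStatus(5)
	Running                = TaskStatus(6)
	Invalid                = TaskStatus(7)
	Completed              = TaskStatus(8)
)

// String returns a readable label; the exact labels are assumptions.
func (ts TaskStatus) String() string {
	switch ts {
	case ContextDeadlineReached:
		return "ContextDeadlineReached"
	case RateLimited:
		return "RateLimited"
	case Cancelled:
		return "Cancelled"
	case Failed:
		return "Failed"
	case Queued:
		return "Queued"
	case Running:
		return "Running"
	case Invalid:
		return "Invalid"
	case Completed:
		return "Completed"
	default:
		return "Unknown"
	}
}

func main() {
	// fmt uses the String method automatically for Stringer values.
	fmt.Println(Queued, TaskStatus(42)) // Queued Unknown
}
```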
const (
	// DefaultMaxTasks is the default maximum number of tasks that can be executed at once
	DefaultMaxTasks = 10
	// DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second
	DefaultTasksPerSecond = 5
	// DefaultTimeout is the default timeout for tasks
	DefaultTimeout = 5
	// DefaultRetryDelay is the default delay between retries
	DefaultRetryDelay = 1
	// DefaultMaxRetries is the default maximum number of retries
	DefaultMaxRetries = 3
)

Variables

var (
	// ErrInvalidTaskID is returned when a task has an invalid ID
	ErrInvalidTaskID = errors.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function
	ErrInvalidTaskFunc = errors.New("invalid task function")
	// ErrInvalidTaskContext is returned when a task has an invalid context
	ErrInvalidTaskContext = errors.New("invalid task context")
	// ErrTaskNotFound is returned when a task is not found
	ErrTaskNotFound = errors.New("task not found")
	// ErrTaskTimeout is returned when a task times out
	ErrTaskTimeout = errors.New("task timeout")
	// ErrTaskCancelled is returned when a task is cancelled
	ErrTaskCancelled = errors.New("task cancelled")
	// ErrTaskAlreadyStarted is returned when a task is already started
	ErrTaskAlreadyStarted = errors.New("task already started")
	// ErrTaskCompleted is returned when a task is already completed
	ErrTaskCompleted = errors.New("task completed")
)

Errors returned by the TaskManager
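
Because these are sentinel errors created with errors.New, callers would normally match them with errors.Is rather than by comparing strings. The sketch below redeclares two of them locally so it runs on its own; the lookup and describe helpers are hypothetical, and whether the package wraps its errors before returning them is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two of the sentinel errors listed above.
var (
	ErrTaskNotFound = errors.New("task not found")
	ErrTaskTimeout  = errors.New("task timeout")
)

// lookup is a hypothetical helper that wraps a sentinel error with context.
func lookup(id string) error {
	return fmt.Errorf("lookup %q: %w", id, ErrTaskNotFound)
}

// describe maps an error to a short action by matching sentinels with errors.Is,
// which also sees through wrapping done with %w.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrTaskNotFound):
		return "skip"
	case errors.Is(err, ErrTaskTimeout):
		return "retry"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(describe(lookup("42")))     // skip
	fmt.Println(describe(ErrTaskTimeout))   // retry
	fmt.Println(describe(nil))              // ok
}
```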

Functions

This section is empty.

Types

type Middleware

type Middleware func(Service) Service

Middleware describes a `Service` middleware.

type Result added in v0.0.4

type Result struct {
	Task   *Task       // the task that produced the result
	Result interface{} // the result of the task
	Error  error       // the error returned by the task
}

Result is a task result

func (*Result) String added in v0.0.4

func (r *Result) String() string

String returns a string representation of the result

type Service

type Service interface {
	// RegisterTask registers a new task to the worker
	RegisterTask(ctx context.Context, task Task) error
	// RegisterTasks registers multiple tasks to the worker
	RegisterTasks(ctx context.Context, tasks ...Task)
	// StartWorkers starts the task manager's workers
	StartWorkers()
	// Wait for all tasks to finish
	Wait(timeout time.Duration)
	// Stop the task manager
	Stop()
	// CancelAll cancels all tasks
	CancelAll()
	// CancelTask cancels a task by its ID
	CancelTask(id uuid.UUID)
	// GetActiveTasks returns the number of active tasks
	GetActiveTasks() int
	// StreamResults streams the `Result` channel
	StreamResults() <-chan Result
	// GetResults returns the collected `Result` values
	GetResults() []Result
	// GetCancelledTasks gets the cancelled tasks channel
	GetCancelledTasks() <-chan Task
	// GetTask gets a task by its ID
	GetTask(id uuid.UUID) (task *Task, err error)
	// GetTasks gets all tasks
	GetTasks() []Task
	// ExecuteTask executes a task given its ID and returns the result
	ExecuteTask(id uuid.UUID, timeout time.Duration) (interface{}, error)
}

Service is an interface for a task manager

func RegisterMiddleware

func RegisterMiddleware(svc Service, mw ...Middleware) Service

RegisterMiddleware registers middlewares to the `Service`.

type Task

type Task struct {
	ID          uuid.UUID          `json:"id"`          // ID is the id of the task
	Name        string             `json:"name"`        // Name is the name of the task
	Description string             `json:"description"` // Description is the description of the task
	Priority    int                `json:"priority"`    // Priority is the priority of the task
	Execute     TaskFunc           `json:"-"`           // Execute is the function that will be executed by the task
	Ctx         context.Context    `json:"context"`     // Ctx is the context of the task
	CancelFunc  context.CancelFunc `json:"-"`           // CancelFunc is the cancel function of the task
	Status      TaskStatus         `json:"task_status"` // Status stores the status of the task
	Result      atomic.Value       `json:"result"`      // Result is the result of the task
	Error       atomic.Value       `json:"error"`       // Error is the error of the task
	Started     atomic.Int64       `json:"started"`     // Started is the time the task started
	Completed   atomic.Int64       `json:"completed"`   // Completed is the time the task completed
	Cancelled   atomic.Int64       `json:"cancelled"`   // Cancelled is the time the task was cancelled
	Retries     int                `json:"retries"`     // Retries is the maximum number of retries for failed tasks
	RetryDelay  time.Duration      `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
	// contains filtered or unexported fields
}

Task represents a function that can be executed by the task manager

func NewTask added in v0.0.7

func NewTask(ctx context.Context, fn TaskFunc) (*Task, error)

NewTask creates a new task with the provided function and context

func (*Task) CancelledChan added in v0.0.4

func (task *Task) CancelledChan() <-chan struct{}

CancelledChan returns a channel which gets closed when the task is cancelled.

func (*Task) IsValid added in v0.0.2

func (task *Task) IsValid() (err error)

IsValid returns an error if the task is invalid

func (*Task) ShouldSchedule added in v0.0.5

func (task *Task) ShouldSchedule() error

ShouldSchedule returns an error if the task should not be scheduled

func (*Task) WaitCancelled added in v0.0.4

func (task *Task) WaitCancelled()

WaitCancelled waits for the task to be cancelled

type TaskFunc added in v0.0.2

type TaskFunc func() (interface{}, error)

TaskFunc signature of `Task` function

type TaskManager

type TaskManager struct {
	Registry   sync.Map      // Registry is a map of registered tasks
	Results    chan Result   // Results is the channel of results
	Tasks      chan Task     // Tasks is the channel of tasks
	Cancelled  chan Task     // Cancelled is the channel of cancelled tasks
	Timeout    time.Duration // Timeout is the default timeout for tasks
	MaxWorkers int           // MaxWorkers is the maximum number of workers that can be started
	MaxTasks   int           // MaxTasks is the maximum number of tasks that can be executed at once
	RetryDelay time.Duration // RetryDelay is the delay between retries
	MaxRetries int           // MaxRetries is the maximum number of retries
	// contains filtered or unexported fields
}

TaskManager is a struct that manages a pool of goroutines that can execute tasks

func NewTaskManager

func NewTaskManager(ctx context.Context, maxWorkers int, maxTasks int, tasksPerSecond float64, timeout time.Duration, retryDelay time.Duration, maxRetries int) *TaskManager

NewTaskManager creates a new task manager

  • `ctx` is the context for the task manager
  • `maxWorkers` is the number of workers to start, if not specified, the number of CPUs will be used
  • `maxTasks` is the maximum number of tasks that can be executed at once, defaults to 10
  • `tasksPerSecond` is the rate limit of tasks that can be executed per second, defaults to 5
  • `timeout` is the default timeout for tasks, defaults to 5 minutes
  • `retryDelay` is the default delay between retries, defaults to 1 second
  • `maxRetries` is the default maximum number of retries, defaults to 3
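
To give a feel for what a tasksPerSecond value means in practice, the sketch below converts the rate into a minimum spacing between task starts and paces calls with a time.Ticker. This is a simplified stand-in: the package's documentation mentions reserving tokens from a limiter, which this ticker-based approach does not reproduce, and the interval and runPaced helpers are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// interval converts a rate in tasks per second into the spacing between starts.
// A non-positive rate is treated as "no limit" and yields 0.
func interval(tasksPerSecond float64) time.Duration {
	if tasksPerSecond <= 0 {
		return 0
	}
	return time.Duration(float64(time.Second) / tasksPerSecond)
}

// runPaced runs the functions one after another, starting at most
// tasksPerSecond of them per second.
func runPaced(tasksPerSecond float64, fns ...func()) {
	d := interval(tasksPerSecond)
	if d <= 0 {
		// no limit: run everything back to back
		for _, fn := range fns {
			fn()
		}
		return
	}
	ticker := time.NewTicker(d)
	defer ticker.Stop()
	for _, fn := range fns {
		<-ticker.C // wait for the next slot
		fn()
	}
}

func main() {
	fmt.Println(interval(5)) // 200ms
	runPaced(100,
		func() { fmt.Println("first") },
		func() { fmt.Println("second") },
	)
}
```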

func NewTaskManagerWithDefaults added in v0.0.5

func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager

NewTaskManagerWithDefaults creates a new task manager with default values

  • `maxWorkers`: `runtime.NumCPU()`
  • `maxTasks`: 10
  • `tasksPerSecond`: 5
  • `timeout`: 5 minutes
  • `retryDelay`: 1 second
  • `maxRetries`: 3

func (*TaskManager) CancelAll

func (tm *TaskManager) CancelAll()

CancelAll cancels all tasks

func (*TaskManager) CancelTask

func (tm *TaskManager) CancelTask(id uuid.UUID)

CancelTask cancels a task by its ID

func (*TaskManager) ExecuteTask added in v0.0.2

func (tm *TaskManager) ExecuteTask(id uuid.UUID, timeout time.Duration) (interface{}, error)

ExecuteTask executes a task given its ID and returns the result

  • It gets the task by ID and locks the mutex to access the task data.
  • If the task has already been started, it cancels it and returns an error.
  • If the task is invalid, it sends it to the cancelled channel and returns an error.
  • If the task is already running, it returns an error.
  • It creates a new context for this task, waits for the result to become available, and returns it.
  • It reserves a token from the limiter and waits for the task execution.
  • If the token reservation fails, it waits for a delay and tries again.
  • It executes the task and sends the result to the results channel.
  • If the task execution fails, it retries the task up to max retries with a delay between retries.
  • If the task fails with all retries exhausted, it cancels the task and returns an error.
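
The retry steps in the list above can be sketched as a small loop. This is a simplified illustration under stated assumptions: runWithRetry and flaky are hypothetical helpers, maxRetries is interpreted as the number of additional attempts after the first, and context handling, rate limiting, and the results channel are left out.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTemporary = errors.New("temporary failure")

// runWithRetry calls fn until it succeeds or maxRetries additional attempts
// are used up, sleeping for delay between attempts. It returns the value,
// the number of attempts made, and the final error.
func runWithRetry(fn func() (interface{}, error), maxRetries int, delay time.Duration) (interface{}, int, error) {
	if maxRetries < 0 {
		maxRetries = 0
	}
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			time.Sleep(delay)
		}
		val, err := fn()
		if err == nil {
			return val, attempt + 1, nil
		}
		lastErr = err
	}
	return nil, maxRetries + 1, fmt.Errorf("all retries exhausted: %w", lastErr)
}

// flaky returns a task function that fails the first `failures` times it is called.
func flaky(failures int) func() (interface{}, error) {
	calls := 0
	return func() (interface{}, error) {
		calls++
		if calls <= failures {
			return nil, errTemporary
		}
		return "done", nil
	}
}

func main() {
	val, attempts, err := runWithRetry(flaky(2), 3, 10*time.Millisecond)
	fmt.Println(val, attempts, err) // done 3 <nil>
}
```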

func (*TaskManager) GetActiveTasks added in v0.0.4

func (tm *TaskManager) GetActiveTasks() int

GetActiveTasks returns the number of active tasks

func (*TaskManager) GetCancelledTasks added in v0.0.7

func (tm *TaskManager) GetCancelledTasks() <-chan Task

GetCancelledTasks gets the cancelled tasks channel.

Example usage:

	// get the cancelled tasks
	cancelledTasks := tm.GetCancelledTasks()

	select {
	case task := <-cancelledTasks:
		fmt.Printf("Task %s was cancelled\n", task.ID.String())
	default:
		fmt.Println("No tasks have been cancelled yet")
	}

func (*TaskManager) GetResults

func (tm *TaskManager) GetResults() []Result

GetResults returns the results as a slice of Result values

func (*TaskManager) GetTask

func (tm *TaskManager) GetTask(id uuid.UUID) (task *Task, err error)

GetTask gets a task by its ID

func (*TaskManager) GetTasks

func (tm *TaskManager) GetTasks() []Task

GetTasks gets all tasks

func (*TaskManager) IsEmpty added in v0.0.7

func (tm *TaskManager) IsEmpty() bool

IsEmpty checks if the task scheduler queue is empty

func (*TaskManager) RegisterTask

func (tm *TaskManager) RegisterTask(ctx context.Context, task Task) error

RegisterTask registers a new task to the task manager

func (*TaskManager) RegisterTasks added in v0.0.4

func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...Task)

RegisterTasks registers multiple tasks to the task manager at once

func (*TaskManager) StartWorkers added in v0.0.4

func (tm *TaskManager) StartWorkers()

StartWorkers starts the task manager and its goroutines

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop stops the task manager and waits for all tasks to finish

func (*TaskManager) StreamResults added in v0.0.5

func (tm *TaskManager) StreamResults() <-chan Result

StreamResults streams the results channel

func (*TaskManager) Wait added in v0.0.4

func (tm *TaskManager) Wait(timeout time.Duration)

Wait waits for all tasks to complete or for the timeout to elapse
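
Waiting for a group of goroutines with an upper time bound is usually done by combining sync.WaitGroup with a select on a timer. The sketch below shows that general pattern; waitTimeout and runAll are hypothetical helpers, and nothing here is taken from the package's own Wait implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for wg to finish or for timeout to elapse and reports
// whether the group finished in time.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

// runAll starts each function in its own goroutine and waits up to timeout.
func runAll(timeout time.Duration, fns ...func()) bool {
	var wg sync.WaitGroup
	for _, fn := range fns {
		wg.Add(1)
		go func(f func()) {
			defer wg.Done()
			f()
		}(fn)
	}
	return waitTimeout(&wg, timeout)
}

func main() {
	ok := runAll(time.Second, func() { time.Sleep(10 * time.Millisecond) })
	fmt.Println(ok) // true
}
```

When the timeout wins, the helper goroutine inside waitTimeout keeps waiting until the group eventually finishes, so a timeout stops the caller from blocking but does not stop the tasks themselves.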

type TaskStatus added in v0.0.4

type TaskStatus uint8

TaskStatus is a value used to represent the task status.

func (TaskStatus) String added in v0.0.4

func (ts TaskStatus) String() string

String returns the string representation of the task status.

Directories

Path Synopsis
examples
