Documentation ¶
Index ¶
- Constants
- Variables
- type Middleware
- type Result
- type Service
- type Task
- type TaskFunc
- type TaskManager
- func (tm *TaskManager) CancelAll()
- func (tm *TaskManager) CancelTask(id uuid.UUID)
- func (tm *TaskManager) ExecuteTask(id uuid.UUID, timeout time.Duration) (interface{}, error)
- func (tm *TaskManager) GetActiveTasks() int
- func (tm *TaskManager) GetCancelledTasks() <-chan Task
- func (tm *TaskManager) GetResults() []Result
- func (tm *TaskManager) GetTask(id uuid.UUID) (task *Task, err error)
- func (tm *TaskManager) GetTasks() []Task
- func (tm *TaskManager) IsEmpty() bool
- func (tm *TaskManager) RegisterTask(ctx context.Context, task Task) error
- func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...Task)
- func (tm *TaskManager) StartWorkers()
- func (tm *TaskManager) Stop()
- func (tm *TaskManager) StreamResults() <-chan Result
- func (tm *TaskManager) Wait(timeout time.Duration)
- type TaskStatus
Constants ¶
const ( // ContextDeadlineReached means the context is past its deadline. ContextDeadlineReached = TaskStatus(1) // RateLimited means the number of concurrent tasks per second exceeded the maximum allowed. RateLimited = TaskStatus(2) // Cancelled means `CancelTask` was invked and the `Task` was cancelled. Cancelled = TaskStatus(3) // Failed means the `Task` failed. Failed = TaskStatus(4) // Queued means the `Task` is queued. Queued = TaskStatus(5) // Running means the `Task` is running. Running = TaskStatus(6) // Invalid means the `Task` is invalid. Invalid = TaskStatus(7) // Completed means the `Task` is completed. Completed = TaskStatus(8) )
CancelReason values
- 1: `ContextDeadlineReached`
- 2: `RateLimited`
- 3: `Cancelled`
- 4: `Failed`
- 5: `Queued`
- 6: `Running`
- 7: `Invalid`
- 8: `Completed`
const ( // DefaultMaxTasks is the default maximum number of tasks that can be executed at once DefaultMaxTasks = 10 // DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second DefaultTasksPerSecond = 5 // DefaultTimeout is the default timeout for tasks DefaultTimeout = 5 // DefaultRetryDelay is the default delay between retries DefaultRetryDelay = 1 // DefaultMaxRetries is the default maximum number of retries DefaultMaxRetries = 3 )
Variables ¶
var ( // ErrInvalidTaskID is returned when a task has an invalid ID ErrInvalidTaskID = errors.New("invalid task id") // ErrInvalidTaskFunc is returned when a task has an invalid function ErrInvalidTaskFunc = errors.New("invalid task function") // ErrInvalidTaskContext is returned when a task has an invalid context ErrInvalidTaskContext = errors.New("invalid task context") // ErrTaskNotFound is returned when a task is not found ErrTaskNotFound = errors.New("task not found") // ErrTaskTimeout is returned when a task times out ErrTaskTimeout = errors.New("task timeout") // ErrTaskCancelled is returned when a task is cancelled ErrTaskCancelled = errors.New("task cancelled") // ErrTaskAlreadyStarted is returned when a task is already started ErrTaskAlreadyStarted = errors.New("task already started") // ErrTaskCompleted is returned when a task is already completed ErrTaskCompleted = errors.New("task completed") )
Errors returned by the TaskManager
Functions ¶
This section is empty.
Types ¶
type Middleware ¶
Middleware describes a `Service` middleware.
type Result ¶ added in v0.0.4
type Result struct { Task *Task // the task that produced the result Result interface{} // the result of the task Error error // the error returned by the task }
Result is a task result
type Service ¶
type Service interface { // RegisterTask registers a new task to the worker RegisterTask(ctx context.Context, task Task) error // RegisterTasks registers multiple tasks to the worker RegisterTasks(ctx context.Context, tasks ...Task) // StartWorkers starts the task manager's workers StartWorkers() // Wait for all tasks to finish Wait(timeout time.Duration) // Stop the task manage Stop() // CancelAll cancels all tasks CancelAll() // CancelTask cancels a task by its ID CancelTask(id uuid.UUID) // GetActiveTasks returns the number of active tasks GetActiveTasks() int // StreamResults streams the `Result` channel StreamResults() <-chan Result // GetResults retruns the `Result` channel GetResults() []Result // GetCancelledTasks gets the cancelled tasks channel GetCancelledTasks() <-chan Task // GetTask gets a task by its ID GetTask(id uuid.UUID) (task *Task, err error) // GetTasks gets all tasks GetTasks() []Task // ExecuteTask executes a task given its ID and returns the result ExecuteTask(id uuid.UUID, timeout time.Duration) (interface{}, error) }
Service is an interface for a task manager
func RegisterMiddleware ¶
func RegisterMiddleware(svc Service, mw ...Middleware) Service
RegisterMiddleware registers middlewares to the `Service`.
type Task ¶
type Task struct { ID uuid.UUID `json:"id"` // ID is the id of the task Name string `json:"name"` // Name is the name of the task Description string `json:"description"` // Description is the description of the task Priority int `json:"priority"` // Priority is the priority of the task Execute TaskFunc `json:"-"` // Execute is the function that will be executed by the task Ctx context.Context `json:"context"` // Ctx is the context of the task CancelFunc context.CancelFunc `json:"-"` // CancelFunc is the cancel function of the task Status TaskStatus `json:"task_status"` // TaskStatus is stores the status of the task Result atomic.Value `json:"result"` // Result is the result of the task Error atomic.Value `json:"error"` // Error is the error of the task Started atomic.Int64 `json:"started"` // Started is the time the task started Completed atomic.Int64 `json:"completed"` // Completed is the time the task completed Cancelled atomic.Int64 `json:"cancelled"` // Cancelled is the time the task was cancelled Retries int `json:"retries"` // Retries is the maximum number of retries for failed tasks RetryDelay time.Duration `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks // contains filtered or unexported fields }
Task represents a function that can be executed by the task manager
func (*Task) CancelledChan ¶ added in v0.0.4
func (task *Task) CancelledChan() <-chan struct{}
CancelledChan returns a channel which gets closed when the task is cancelled.
func (*Task) ShouldSchedule ¶ added in v0.0.5
ShouldSchedule returns an error if the task should not be scheduled
func (*Task) WaitCancelled ¶ added in v0.0.4
func (task *Task) WaitCancelled()
WaitCancelled waits for the task to be cancelled
type TaskFunc ¶ added in v0.0.2
type TaskFunc func() (interface{}, error)
TaskFunc signature of `Task` function
type TaskManager ¶
type TaskManager struct { Registry sync.Map // Registry is a map of registered tasks Results chan Result // Results is the channel of results Tasks chan Task // Tasks is the channel of tasks Cancelled chan Task // Cancelled is the channel of cancelled tasks Timeout time.Duration // Timeout is the default timeout for tasks MaxWorkers int // MaxWorkers is the maximum number of workers that can be started MaxTasks int // MaxTasks is the maximum number of tasks that can be executed at once RetryDelay time.Duration // RetryDelay is the delay between retries MaxRetries int // MaxRetries is the maximum number of retries // contains filtered or unexported fields }
TaskManager is a struct that manages a pool of goroutines that can execute tasks
func NewTaskManager ¶
func NewTaskManager(ctx context.Context, maxWorkers int, maxTasks int, tasksPerSecond float64, timeout time.Duration, retryDelay time.Duration, maxRetries int) *TaskManager
NewTaskManager creates a new task manager
- `ctx` is the context for the task manager
- `maxWorkers` is the number of workers to start, if not specified, the number of CPUs will be used
- `maxTasks` is the maximum number of tasks that can be executed at once, defaults to 10
- `tasksPerSecond` is the rate limit of tasks that can be executed per second, defaults to 1
- `timeout` is the default timeout for tasks, defaults to 5 minute
- `retryDelay` is the default delay between retries, defaults to 1 second
- `maxRetries` is the default maximum number of retries, defaults to 3
func NewTaskManagerWithDefaults ¶ added in v0.0.5
func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager
NewTaskManagerWithDefaults creates a new task manager with default values
- `maxWorkers`: `runtime.NumCPU()`
- `maxTasks`: 10
- `tasksPerSecond`: 5
- `timeout`: 5 minute
- `retryDelay`: 1 second
- `maxRetries`: 3
func (*TaskManager) CancelTask ¶
func (tm *TaskManager) CancelTask(id uuid.UUID)
CancelTask cancels a task by its ID
func (*TaskManager) ExecuteTask ¶ added in v0.0.2
ExecuteTask executes a task given its ID and returns the result
- It gets the task by ID and locks the mutex to access the task data.
- If the task has already been started, it cancels it and returns an error.
- If the task is invalid, it sends it to the cancelled channel and returns an error.
- If the task is already running, it returns an error.
- It creates a new context for this task and waits for the result to be available and return it.
- It reserves a token from the limiter and waits for the task execution.
- If the token reservation fails, it waits for a delay and tries again.
- It executes the task and sends the result to the results channel.
- If the task execution fails, it retries the task up to max retries with a delay between retries.
- If the task fails with all retries exhausted, it cancels the task and returns an error.
func (*TaskManager) GetActiveTasks ¶ added in v0.0.4
func (tm *TaskManager) GetActiveTasks() int
GetActiveTasks returns the number of active tasks
func (*TaskManager) GetCancelledTasks ¶ added in v0.0.7
func (tm *TaskManager) GetCancelledTasks() <-chan Task
GetCancelledTasks gets the cancelled tasks channel Example usage:
get the cancelled tasks cancelledTasks := tm.GetCancelledTasks()
select { case task := <-cancelledTasks:
fmt.Printf("Task %s was cancelled\n", task.ID.String())
default:
fmt.Println("No tasks have been cancelled yet") }
func (*TaskManager) GetResults ¶
func (tm *TaskManager) GetResults() []Result
GetResults gets the results channel
func (*TaskManager) GetTask ¶
func (tm *TaskManager) GetTask(id uuid.UUID) (task *Task, err error)
GetTask gets a task by its ID
func (*TaskManager) IsEmpty ¶ added in v0.0.7
func (tm *TaskManager) IsEmpty() bool
IsEmpty checks if the task scheduler queue is empty
func (*TaskManager) RegisterTask ¶
func (tm *TaskManager) RegisterTask(ctx context.Context, task Task) error
RegisterTask registers a new task to the task manager
func (*TaskManager) RegisterTasks ¶ added in v0.0.4
func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...Task)
RegisterTasks registers multiple tasks to the task manager at once
func (*TaskManager) StartWorkers ¶ added in v0.0.4
func (tm *TaskManager) StartWorkers()
StartWorkers starts the task manager and its goroutines
func (*TaskManager) Stop ¶
func (tm *TaskManager) Stop()
Stop stops the task manager and waits for all tasks to finish
func (*TaskManager) StreamResults ¶ added in v0.0.5
func (tm *TaskManager) StreamResults() <-chan Result
StreamResults streams the results channel
func (*TaskManager) Wait ¶ added in v0.0.4
func (tm *TaskManager) Wait(timeout time.Duration)
Wait waits for all tasks to complete or for the timeout to elapse
type TaskStatus ¶ added in v0.0.4
type TaskStatus uint8
TaskStatus is a value used to represent the task status.
func (TaskStatus) String ¶ added in v0.0.4
func (ts TaskStatus) String() string
String returns the string representation of the task status.