stepper

package module
v0.0.9
Published: Sep 20, 2023 License: MIT Imports: 10 Imported by: 0

README

Stepper

A simple, efficient, concurrent task runner.

  • Simple. Run tasks and schedule jobs with Go.
  • Database agnostic. Stepper supports MongoDB and PostgreSQL (beta).
  • Concurrent. Stepper can be used in an unlimited number of instances.
  • Scalable. Split one task into small subtasks which will run on different nodes.

Install

go get github.com/matroskin13/stepper
go get github.com/matroskin13/stepper/engines/mongo

or

go get github.com/matroskin13/stepper/engines/pg

Getting started

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/matroskin13/stepper"
    "github.com/matroskin13/stepper/engines/mongo"
)

func main() {
    mongoEngine, err := mongo.NewMongo("mongodb://localhost:27017", "example_database")
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()

    service := stepper.NewService(mongoEngine)

    // Will publish a task on startup
    if err := service.Publish(ctx, "example-task", []byte("Hello world")); err != nil {
        log.Fatal(err)
    }

    service.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
        fmt.Println(string(data))

        return nil
    })

    if err := service.Listen(ctx); err != nil {
        log.Fatal(err)
    }
}

Or you can use PostgreSQL:

import "github.com/matroskin13/stepper/engines/pg"
engine, err := pg.NewPG("postgres://postgres:test@localhost:5432/postgres")
if err != nil {
    log.Fatal(err)
}

Publish task

With Stepper you will mostly publish and execute tasks. This section covers how to publish them.

Simple way

service.Publish(context.Background(), "example-task", []byte("hello"))

This is the simplest way to publish a task: it publishes a task named example-task with the content hello.

Stepper also accepts additional options when publishing.

Publish with delay

If you don't want to execute a task immediately you can set up a delay.

service.Publish(
    context.Background(),
    "example-task",
    []byte("hello"),
    stepper.SetDelay(time.Minute * 1),
)

Or you can schedule the task for a particular time:

service.Publish(
    context.Background(),
    "example-task",
    []byte("hello"),
    stepper.LaunchAt(time.Now().Add(time.Minute * 10)),
)
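
Both options follow the functional-options pattern: PublishOption is declared as func(c *CreateTask), and CreateTask has LaunchAfter and LaunchAt fields. The sketch below uses local stand-in types to show how such options might be applied. The names createTask, setDelay, launchAt and resolveLaunchTime are hypothetical, and the rule that an explicit launch time wins over a delay is an assumption, not confirmed library behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// createTask is a local stand-in that mirrors two fields of stepper.CreateTask.
type createTask struct {
	LaunchAfter time.Duration
	LaunchAt    time.Time
}

// publishOption mirrors the shape of stepper.PublishOption.
type publishOption func(c *createTask)

// setDelay is a hypothetical re-implementation of a delay option.
func setDelay(d time.Duration) publishOption {
	return func(c *createTask) { c.LaunchAfter = d }
}

// launchAt is a hypothetical re-implementation of an absolute-time option.
func launchAt(t time.Time) publishOption {
	return func(c *createTask) { c.LaunchAt = t }
}

// resolveLaunchTime applies the options in order and picks the launch time.
// Assumption: an explicit LaunchAt wins over a relative delay.
func resolveLaunchTime(now time.Time, opts ...publishOption) time.Time {
	task := &createTask{}
	for _, opt := range opts {
		opt(task)
	}
	if !task.LaunchAt.IsZero() {
		return task.LaunchAt
	}
	return now.Add(task.LaunchAfter)
}

func main() {
	now := time.Date(2023, 9, 20, 12, 0, 0, 0, time.UTC)

	fmt.Println(resolveLaunchTime(now))                                    // no options: launch immediately
	fmt.Println(resolveLaunchTime(now, setDelay(time.Minute)))             // one minute after now
	fmt.Println(resolveLaunchTime(now, launchAt(now.Add(10*time.Minute)))) // ten minutes after now
}
```

Because each option only mutates the task description, new scheduling options can be added without changing the Publish signature.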

Execute a task

The second part of Stepper is executing the tasks in the queue.

Simple way

s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
    fmt.Println(string(data))

    return nil
})

This is the simplest way to execute a task: the handler receives the task data and returns nil on success.

Error handling

If your handler returns an error, the task is returned to the queue and held there for 10 seconds before it is retried. You can also set the retry delay manually:

s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
    ctx.SetRetryAfter(time.Minute) // will be returned in 1 minute
    return fmt.Errorf("some error")
})
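
The retry rule described above can be sketched without the library: a failed run waits 10 seconds by default, unless the handler asked for a different delay. The attempt type and the run function are hypothetical stand-ins, and how Stepper stores the delay internally is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// defaultRetryDelay is the 10-second hold described in the text.
const defaultRetryDelay = 10 * time.Second

// errFailed is a sample handler error.
var errFailed = errors.New("some error")

// attempt is a hypothetical stand-in for the per-execution context.
type attempt struct {
	retryAfter time.Duration
}

// SetRetryAfter mirrors the name of the stepper.Context method.
func (a *attempt) SetRetryAfter(d time.Duration) { a.retryAfter = d }

// run executes handler and reports how long a failed task should wait
// before it is retried; ok is true when the handler succeeded.
func run(handler func(a *attempt) error) (delay time.Duration, ok bool) {
	a := &attempt{}
	if err := handler(a); err == nil {
		return 0, true
	}
	if a.retryAfter > 0 {
		return a.retryAfter, false
	}
	return defaultRetryDelay, false
}

func main() {
	delay, _ := run(func(a *attempt) error { return errFailed })
	fmt.Println(delay) // 10s

	delay, _ = run(func(a *attempt) error {
		a.SetRetryAfter(time.Minute)
		return errFailed
	})
	fmt.Println(delay) // 1m0s
}
```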

Bind a state

If you have a long-running task, you can bind a state to it (a cursor, for example). If the task fails, it can then continue from the last saved state:

s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
    var lastId int

    if err := ctx.BindState(&lastId); err != nil {
        return err
    }

    iter := getSomethingFromId(lastId) // something like a mongodb iterator or anything else

    for iter.Next() {
        lastId = ... // do something

        if err := ctx.SetState(lastId); err != nil {
            return err
        }
    }

    return nil
})
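
To make the checkpointing idea concrete, here is a self-contained sketch in which a run crashes halfway and the retry resumes from the saved cursor. The taskState type is a hypothetical stand-in for the context, and storing the state as JSON bytes is an assumption (the Task type only shows that State is a []byte).

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// taskState is a hypothetical stand-in for the stored state of a task.
// Assumption: the state is serialized as JSON into a byte slice.
type taskState struct {
	raw []byte
}

// BindState decodes the stored state into dst; with no stored state, dst is left untouched.
func (s *taskState) BindState(dst any) error {
	if len(s.raw) == 0 {
		return nil
	}
	return json.Unmarshal(s.raw, dst)
}

// SetState encodes v and stores it as the latest checkpoint.
func (s *taskState) SetState(v any) error {
	raw, err := json.Marshal(v)
	if err != nil {
		return err
	}
	s.raw = raw
	return nil
}

var errCrash = errors.New("simulated crash")

// process handles the items after the stored cursor and fails when it
// reaches index failAt (pass -1 to never fail).
func process(s *taskState, items []int, failAt int, seen *[]int) error {
	lastID := -1
	if err := s.BindState(&lastID); err != nil {
		return err
	}
	for i := lastID + 1; i < len(items); i++ {
		if i == failAt {
			return errCrash
		}
		*seen = append(*seen, items[i])
		if err := s.SetState(i); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	state := &taskState{}
	items := []int{10, 20, 30, 40}
	var seen []int

	fmt.Println(process(state, items, 2, &seen))  // simulated crash
	fmt.Println(process(state, items, -1, &seen)) // <nil>
	fmt.Println(seen)                             // [10 20 30 40]
}
```

Saving the cursor after each item means a retry repeats at most the item that was in flight when the failure happened.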

Subtasks

Stepper's most powerful feature is creating subtasks. It lets you split a long-running task into separate tasks that run on different nodes. When all subtasks have completed, Stepper calls the OnFinish hook of the parent task.

Create a subtask

The following example shows how to spawn subtasks within a main task.

s.TaskHandler("task-with-threads", func(ctx stepper.Context, data []byte) error {
    fmt.Println("have received the word for splitting: ", string(data))

    for _, symbol := range strings.Split(string(data), "") {
        ctx.CreateSubtask(stepper.CreateTask{
            Data: []byte(symbol),
        })
    }

    return nil
}).Subtask(func(ctx stepper.Context, data []byte) error {
    fmt.Printf("[letter-subtask]: have received symbol: %s\r\n", data)
    return nil
}).OnFinish(func(ctx stepper.Context, data []byte) error {
    fmt.Println("subtasks are over")
    return nil
})

Or you can use an existing task as a subtask:

ctx.CreateSubtask(stepper.CreateTask{
    Name: "some-task",
    Data: []byte(symbol),
})
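
The control flow of subtasks is a fan-out followed by a fan-in. The sketch below reproduces that flow inside a single process with goroutines. It is only an analogue: Stepper distributes subtasks through the database across nodes, and the function runWithSubtasks is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runWithSubtasks is a hypothetical in-process analogue of a parent task:
// it splits data into symbols, runs subtask for each one concurrently,
// and calls onFinish only after every subtask has returned.
// It returns the number of subtasks that were started.
func runWithSubtasks(data string, subtask func(symbol string), onFinish func()) int {
	symbols := strings.Split(data, "")

	var wg sync.WaitGroup
	for _, symbol := range symbols {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			subtask(s)
		}(symbol)
	}

	wg.Wait() // fan-in: block until all subtasks are done
	onFinish()
	return len(symbols)
}

func main() {
	var mu sync.Mutex
	seen := map[string]int{}

	n := runWithSubtasks("hello", func(symbol string) {
		mu.Lock()
		defer mu.Unlock()
		seen[symbol]++
	}, func() {
		fmt.Println("subtasks are over")
	})

	fmt.Println(n, seen["l"]) // 5 2
}
```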

Repeated tasks

If you want to run a repeated task (cron), you can use jobs:

s.RegisterJob(context.Background(), &stepper.JobConfig{
    Name:    "log-job",
    Pattern: "@every 10s",
}, func(ctx stepper.Context) error {
    fmt.Println("wake up the log-job")
    return nil
})

Read https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format for more information about the pattern format.
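
As a rough illustration of what a pattern such as "@every 10s" means, the sketch below computes the next launch time for that one form using only the standard library. The helper nextEvery is hypothetical and is not part of Stepper or robfig/cron. It does not handle full cron expressions, and it ignores any rounding the real parser may apply.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

const everyPrefix = "@every "

// demoStart is a fixed reference time used by the example.
var demoStart = time.Date(2023, 9, 20, 12, 0, 0, 0, time.UTC)

// nextEvery handles only the "@every <duration>" form and returns
// the next launch time counted from the given moment.
func nextEvery(pattern string, from time.Time) (time.Time, error) {
	if !strings.HasPrefix(pattern, everyPrefix) {
		return time.Time{}, fmt.Errorf("unsupported pattern %q", pattern)
	}
	interval, err := time.ParseDuration(strings.TrimPrefix(pattern, everyPrefix))
	if err != nil {
		return time.Time{}, err
	}
	if interval <= 0 {
		return time.Time{}, fmt.Errorf("interval must be positive, got %s", interval)
	}
	return from.Add(interval), nil
}

func main() {
	next, err := nextEvery("@every 10s", demoStart)
	fmt.Println(next.Sub(demoStart), err) // 10s <nil>
}
```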

You can also create subtasks from a job:

s.RegisterJob(context.Background(), &stepper.JobConfig{
    Name:    "log-job",
    Pattern: "@every 10s",
}, func(ctx stepper.Context) error {
    fmt.Println("wake up the log-job")

    ctx.CreateSubtask(stepper.CreateTask{
        Name: "log-subtask",
        Data: []byte("Hello 1 subtask"),
    })

    return nil
}).OnFinish(func(ctx stepper.Context, data []byte) error {
    fmt.Println("success job log-job")

    return nil
})

Middlewares

Retry

The retry middleware allows you to limit the number of retries.

s := stepper.NewService(db)

s.UseMiddleware(middlewares.Retry(middlewares.RetryOptions{
    Interval:   time.Second * 5,
    MaxRetries: 3,
}))

Prometheus

s := stepper.NewService(db)

prometheusMiddleware := middlewares.NewPrometheus()

s.UseMiddleware(prometheusMiddleware.GetMiddleware())

go func() {
    http.ListenAndServe(":3999", promhttp.HandlerFor(prometheusMiddleware.GetRegistry(), promhttp.HandlerOpts{}))
}()

if err := s.Listen(context.Background()); err != nil {
    log.Fatal(err)
}

The Prometheus middleware provides the following metrics:

prometheus.NewCounterVec(prometheus.CounterOpts{
    Name: "stepper_task_execution",
    Help: "Count of all task executions",
}, []string{"task", "status"})

prometheus.NewHistogramVec(prometheus.HistogramOpts{
    Name:    "stepper_task_duration_seconds",
    Help:    "Duration of all executions",
    Buckets: []float64{.025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30},
}, []string{"task", "status"})

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Apply

func Apply[T any](initial *T, callbacks []func(*T)) *T

func Or

func Or[T comparable](first, second T) T

func Pool

func Pool[T any](ctx context.Context, count int, consumer func(*T)) chan *T
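
The page lists only signatures for these helpers. As one plausible reading of Apply and Or, the sketch below uses local functions named applyAll and orDefault. Both names and both behaviours are guesses based on the signatures alone, not the library's confirmed semantics. Pool is left out because its signature says less about its behaviour.

```go
package main

import "fmt"

// applyAll is a guess at the behaviour behind the Apply signature:
// run every callback against initial, in order, and return it.
func applyAll[T any](initial *T, callbacks []func(*T)) *T {
	for _, cb := range callbacks {
		cb(initial)
	}
	return initial
}

// orDefault is a guess at the behaviour behind the Or signature:
// return first unless it is the zero value of T, otherwise second.
func orDefault[T comparable](first, second T) T {
	var zero T
	if first != zero {
		return first
	}
	return second
}

// config is a sample type for the demonstration.
type config struct {
	Name    string
	Retries int
}

func main() {
	cfg := applyAll(&config{}, []func(*config){
		func(c *config) { c.Name = "example-task" },
		func(c *config) { c.Retries = 3 },
	})

	fmt.Println(cfg.Name, cfg.Retries)     // example-task 3
	fmt.Println(orDefault("", "fallback")) // fallback
	fmt.Println(orDefault(5, 10))          // 5
}
```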

Types

type Context

type Context interface {
	Task() *Task
	Context() context.Context
	CreateSubtask(sub CreateTask)
	BindState(state any) error
	SetState(state any) error
	SetRetryAfter(timeout time.Duration)
	SetContext(ctx context.Context)
}

type CreateTask

type CreateTask struct {
	Name        string
	Data        []byte
	CustomId    string
	LaunchAfter time.Duration
	LaunchAt    time.Time
}

type Engine

type Engine interface {
	TaskEngine
	JobEngine
}

type Handler

type Handler func(ctx Context, data []byte) error

type HandlerStruct

type HandlerStruct interface {
	OnFinish(h Handler) HandlerStruct
	Subtask(handler Handler) HandlerStruct
	UseMiddleware(middlewares ...MiddlewareHandler)
	DependOnCustomId() HandlerStruct
}

type Job

type Job struct {
	Status        string          `json:"status"`
	Name          string          `json:"name"`
	Pattern       string          `json:"pattern"`
	NextLaunchAt  time.Time       `json:"naxtLaunchAt"`
	EngineContext context.Context `json:"-"`
}

func (*Job) CalculateNextLaunch

func (j *Job) CalculateNextLaunch() error

type JobConfig

type JobConfig struct {
	Tags    []string
	Name    string
	Pattern string
}

func (*JobConfig) NextLaunch

func (c *JobConfig) NextLaunch() (time.Time, error)

type JobEngine

type JobEngine interface {
	FindNextJob(ctx context.Context, statuses []string) (*Job, error)
	GetUnreleasedJobChildren(ctx context.Context, name string) (*Task, error)
	Release(ctx context.Context, job *Job, nextLaunchAt time.Time) error
	WaitForSubtasks(ctx context.Context, job *Job) error
	RegisterJob(ctx context.Context, cfg *JobConfig) error
	Init(ctx context.Context) error
}

type JobHandler

type JobHandler func(ctx Context) error

type MiddlewareFunc

type MiddlewareFunc func(ctx Context, t *Task) error

type MiddlewareHandler

type MiddlewareHandler func(t MiddlewareFunc) MiddlewareFunc
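
A MiddlewareHandler takes the next MiddlewareFunc and returns a new one that wraps it. The sketch below shows how a custom middleware of that shape could be written and composed. The types task, taskContext, middlewareFunc and middlewareHandler are local stand-ins that mirror the declarations above, and the helpers logging and chain are hypothetical. The order in which Stepper itself applies middlewares is not stated in this document.

```go
package main

import "fmt"

// Local stand-ins that mirror the shapes of the stepper types.
type task struct{ Name string }
type taskContext struct{}

type middlewareFunc func(ctx *taskContext, t *task) error
type middlewareHandler func(next middlewareFunc) middlewareFunc

// logging is a hypothetical middleware that records a line before and after next runs.
func logging(label string, log *[]string) middlewareHandler {
	return func(next middlewareFunc) middlewareFunc {
		return func(ctx *taskContext, t *task) error {
			*log = append(*log, label+" before "+t.Name)
			err := next(ctx, t)
			*log = append(*log, label+" after "+t.Name)
			return err
		}
	}
}

// chain wraps final so that the first handler in the list runs outermost.
// Assumption: this ordering is chosen for the example only.
func chain(final middlewareFunc, handlers ...middlewareHandler) middlewareFunc {
	for i := len(handlers) - 1; i >= 0; i-- {
		final = handlers[i](final)
	}
	return final
}

func main() {
	var log []string

	run := chain(func(ctx *taskContext, t *task) error {
		log = append(log, "handler "+t.Name)
		return nil
	}, logging("outer", &log), logging("inner", &log))

	if err := run(&taskContext{}, &task{Name: "example-task"}); err != nil {
		fmt.Println("error:", err)
	}
	for _, line := range log {
		fmt.Println(line)
	}
}
```

Each middleware decides whether and when to call next, which is what lets a retry middleware stop the chain or a metrics middleware time it.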

type PublishOption

type PublishOption func(c *CreateTask)

func LaunchAt

func LaunchAt(t time.Time) PublishOption

func SetDelay

func SetDelay(d time.Duration) PublishOption

type Service

type Service struct {
	// contains filtered or unexported fields
}

func (*Service) Listen

func (s *Service) Listen(ctx context.Context) error

func (*Service) ListenJobs

func (s *Service) ListenJobs(ctx context.Context) error

func (*Service) ListenTasks

func (s *Service) ListenTasks(ctx context.Context) error

func (*Service) ListenWaitingJobs

func (s *Service) ListenWaitingJobs(ctx context.Context) error

func (*Service) ListenWaitingTasks

func (s *Service) ListenWaitingTasks(ctx context.Context) error

func (*Service) Publish

func (s *Service) Publish(ctx context.Context, name string, data []byte, options ...PublishOption) error

func (*Service) RegisterJob

func (s *Service) RegisterJob(ctx context.Context, config *JobConfig, h JobHandler) HandlerStruct

func (*Service) TaskHandler

func (s *Service) TaskHandler(name string, h Handler, middlewares ...MiddlewareHandler) HandlerStruct

func (*Service) UseMiddleware

func (s *Service) UseMiddleware(h MiddlewareHandler)

type Stepper

type Stepper interface {
	TaskHandler(name string, handler Handler, middlewares ...MiddlewareHandler) HandlerStruct
	Listen(ctx context.Context) error
	Publish(ctx context.Context, name string, data []byte, options ...PublishOption) error
	RegisterJob(ctx context.Context, config *JobConfig, h JobHandler) HandlerStruct
	UseMiddleware(h MiddlewareHandler)
}

func NewService

func NewService(engine Engine) Stepper

type Task

type Task struct {
	ID               string            `json:"_id"`
	CustomId         string            `bson:"custom_id"`
	Name             string            `json:"name"`
	Data             []byte            `json:"data"`
	JobId            string            `json:"jobId"`
	Parent           string            `json:"parent"`
	LaunchAt         time.Time         `json:"launchAt"`
	Status           string            `json:"status"`
	LockAt           *time.Time        `json:"lock_at"`
	State            []byte            `json:"state"`
	MiddlewaresState map[string][]byte `json:"middlewares_state"`
	EngineContext    context.Context   `json:"-"`
}

func (*Task) IsWaiting

func (t *Task) IsWaiting() bool

type TaskEngine

type TaskEngine interface {
	GetRelatedTask(ctx context.Context, task *Task) (*Task, error)
	FindNextTask(ctx context.Context, statuses []string) (*Task, error)
	ReleaseTask(ctx context.Context, task *Task) error
	WaitTaskForSubtasks(ctx context.Context, task *Task) error
	FailTask(ctx context.Context, task *Task, err error, timeout time.Duration) error
	CreateTask(ctx context.Context, task *Task) error
	GetUnreleasedTaskChildren(ctx context.Context, task *Task) (*Task, error)
	SetState(ctx context.Context, task *Task, state []byte) error
	CollectMetrics(ctx context.Context) error
}

Directories

Path          Synopsis
engines/pg
