conveyor

package module
v0.0.0-...-821d495 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2024 License: AGPL-3.0 Imports: 13 Imported by: 0

README

Conveyor Gopher

Chain-able background job runner

Conveyor

Go Report Card Go Reference

  • ⛓️ Chain-able background jobs
  • 📠 Typed payloads using protobuf
  • 🛠️ Built in admin, that can be embedded within your app

An asynchronous job runner, enabling developers to efficiently chain jobs together in a highly structured manner. This capability allows for the creation of complex job hierarchies, where a single job can trigger multiple child jobs, followed by an heir job that executes upon the completion of all child jobs.

Conveyor can manage multiple layers of chained jobs, streamlining the execution process and enhancing code performance. It's not just about running tasks; it's about doing so in a way that's both logical and efficient, tailored for developers looking for precision and control in their asynchronous operations.

Leveraging protobuf, Conveyor ensures all jobs are precisely typed. Using protobufs allows for structured data that evolves with your application. It also supports a language-neutral ecosystem. This versatility opens the door for other programming languages to initiate jobs and serve as Conveyor workers, enhancing interoperability and flexibility across different coding environments.

Conveyor incorporates a built-in admin package, designed to enhance operational efficiency and oversight. This package integrates seamlessly, providing a http.Handler that can be served where you need it, whether that's within an existing admin service or protected by simple HTTP basic authentication. The admin interface offers comprehensive visibility into the queue, enabling admins to view and cancel jobs as needed.

Requirements

Example

Producer
package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/jpoz/conveyor/_examples/basic"
	conveyor "github.com/jpoz/conveyor"
	"github.com/jpoz/conveyor/config"
)

func main() {
	cfg := config.Project{
		RedisURL: "redis://localhost:6379",
		Logger:   slog.Default(),
	}

	client, err := conveyor.NewClient(&cfg)
	if err != nil {
		slog.Error("failed to create client", slog.String("error", err.Error()))
		os.Exit(1)
	}

	result, err := client.Enqueue(context.Background(), &basic.BasicJob{
		Name:  "Bob",
		Count: 123,
	})
	if err != nil {
		slog.Error("failed to enqueue job", slog.String("error", err.Error()))
		os.Exit(1)
	}

	slog.Info("job enqueued", slog.String("uuid", result.Uuid))
}
Worker

Run basic.RunBasicJob when a basic.BasicJob is received.

package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/jpoz/conveyor/_examples/basic"
	conveyor "github.com/jpoz/conveyor"
	"github.com/jpoz/conveyor/config"
)

func main() {
	cfg := config.Project{
		RedisURL: "redis://localhost:6379",
		Logger:   slog.Default(),
	}

	w, err := conveyor.NewWorker(&cfg)
	if err != nil {
		slog.Error("failed to create worker", slog.String("error", err.Error()))
		os.Exit(1)
	}

	err = w.RegisterJobs(basic.RunBasicJob)
	if err != nil {
		slog.Error("failed to register jobs", slog.String("error", err.Error()))
		os.Exit(1)
	}

	w.Run(context.Background())
}
TODO
  • Move pkg out of pkg dir
  • Fix error when hub is used outside of the project
  • Add Addr to Project config
  • Fail worker if Redis doesn't connect
  • s/RegisterJobs/Register

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ClientContextKey = &ContextKey{"client"}
View Source
var ErrJobNotRegistered = errors.New("job not registered")
View Source
var ErrNoRegisteredJobs = errors.New("no registered jobs")
View Source
var JobContextKey = &ContextKey{"job"}

Functions

func AddClientToContext

func AddClientToContext(ctx context.Context, obj *Client) context.Context

func CurrentJob

func CurrentJob(ctx context.Context) *wire.Job

func JobContext

func JobContext(ctx context.Context, obj *wire.Job) context.Context

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func CurrentClient

func CurrentClient(ctx context.Context) *Client

func NewClient

func NewClient(cfg config.ClientConfig) (*Client, error)

func (*Client) Close

func (c *Client) Close() error

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, msg proto.Message, opts ...JobOption) (*Result, error)

func (*Client) EnqueueChild

func (c *Client) EnqueueChild(ctx context.Context, msg proto.Message) (*Result, error)

func (*Client) EnqueueHeir

func (c *Client) EnqueueHeir(ctx context.Context, msg proto.Message) (*Result, error)

type ContextKey

type ContextKey struct{ Name string }

type JobOption

type JobOption func(job *wire.Job) error

func Delay

func Delay(delay time.Duration) JobOption

func RunAt

func RunAt(t time.Time) JobOption

type Result

type Result struct {
	Uuid string
}

type Worker

type Worker struct {
	ID string
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(cfg config.WorkerConfig) (*Worker, error)

func (*Worker) CallJob

func (w *Worker) CallJob(ctx context.Context, job *wire.Job) error

func (*Worker) Close

func (w *Worker) Close() error

func (*Worker) ContextFor

func (w *Worker) ContextFor(job *wire.Job) context.Context

func (*Worker) Heartbeat

func (w *Worker) Heartbeat(ctx context.Context) error

func (*Worker) Periodic

func (w *Worker) Periodic(ctx context.Context, duration time.Duration, fn func(ctx context.Context) error) error

func (*Worker) RegisterJob

func (w *Worker) RegisterJob(fn any) error

func (*Worker) RegisterJobs

func (w *Worker) RegisterJobs(fn ...any) error

func (*Worker) Run

func (w *Worker) Run(ctx context.Context) error

func (*Worker) Use

func (w *Worker) Use(fn ...WorkerMiddleware)

type WorkerMiddleware

type WorkerMiddleware func(ctx context.Context, job *wire.Job) error

Directories

Path Synopsis
_examples
hub
views
templ: version: v0.2.598
templ: version: v0.2.598

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL