fastjob

package module
v0.0.0-...-f410e65 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2019 License: MIT Imports: 8 Imported by: 0

README

fastjob

GoDoc Go Report Card CircleCI codecov

Fastjob is a fast and robust job queue using GoogleCloud PubSub 🛰

Work In Progress

Design objectives:

  • Robustness: never lose a job.
  • Reliability: never let the main queue be blocked by failing jobs.

Strategies:

  • Robustness: only one external dependency: PubSub.
  • Robustness: durability is guaranteed by PubSub.
  • Reliability: the core features rely almost entirely on PubSub's own semantics and features (but remain extensible).
  • Reliability: route failing jobs to a dead letter queue.

Usage

Define a job:
// PingHTTP is a job that sends an empty HTTP POST request to Url.
// The job is JSON encoded, so its input must be an exported field.
type PingHTTP struct {
	Url string
}

// Name returns the job name, "PingHTTP".
func (m *PingHTTP) Name() string {
	return "PingHTTP"
}

// Perform sends a body-less POST request to m.Url. The request is bound to
// ctx, so cancelling ctx aborts it. Perform returns any error from building
// or sending the request; the response status code is not inspected.
func (m *PingHTTP) Perform(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, m.Url, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// The body must always be closed, otherwise the connection leaks.
	return resp.Body.Close()
}

Note: the job will be JSON encoded, only public fields should be used to define the job inputs.

Register the job:
registry := fastjob.NewRegistry().WithJob(&PingHTTP{})
Run the worker:
client, err := pubsub.NewClient(ctx, "my-gcp-project-id")
sub := client.Subscription("sub-test")
config := fastjob.NewConfig(registry)

worker := fastjob.NewPubsubWorker(config, sub)
worker.Run(ctx)
Enqueue a job:
runner := fastjob.NewPubSubRunner(client, topicName)

job := &PingHTTP{Url: "http://example.org/hello"}
err = runner.Enqueue(ctx, job)
Use a local runner for testing:
// NewLocalRunner requires a config, e.g. one built with fastjob.NewConfig(registry).
runner := fastjob.NewLocalRunner(config)

err = runner.Enqueue(ctx, job)

License

MIT

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewConfig

func NewConfig(registry *JobRegistry) *config

Types

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(config *config) *Executor

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context, request *JobRequest) error

Execute runs a job and recovers from panics.

type Job

type Job interface {
	Name() string
	Perform(context.Context) error
}

Job is the interface that jobs must implement

type JobRegistry

type JobRegistry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry() *JobRegistry

func (*JobRegistry) Get

func (r *JobRegistry) Get(name string) (JobType, error)

func (*JobRegistry) WithJob

func (r *JobRegistry) WithJob(job Job) *JobRegistry

func (*JobRegistry) WithJobs

func (r *JobRegistry) WithJobs(jobs ...Job) *JobRegistry

type JobRequest

type JobRequest struct {
	RequestID   string
	RequestTime int64

	JobName string
	JobData []byte
}

func NewJobRequest

func NewJobRequest(job Job) (*JobRequest, error)

func (*JobRequest) String

func (r *JobRequest) String() string

type JobType

type JobType func() Job

JobType is the job factory. Used to allocate a job struct when receiving a job request to process.

type LocalRunner

type LocalRunner struct {
	// contains filtered or unexported fields
}

LocalRunner runs jobs synchronously

func (*LocalRunner) Enqueue

func (r *LocalRunner) Enqueue(ctx context.Context, job Job) error

type Logger

type Logger interface {
	Debugf(ctx context.Context, format string, args ...interface{})
	Infof(ctx context.Context, format string, args ...interface{})
	Errorf(ctx context.Context, err error, format string, args ...interface{})
}

type PubSubRunner

type PubSubRunner struct {
	// contains filtered or unexported fields
}

PubSubRunner enqueues jobs through GCP pubsub

func (*PubSubRunner) Enqueue

func (r *PubSubRunner) Enqueue(ctx context.Context, job Job) error

type PubsubWorker

type PubsubWorker struct {
	// contains filtered or unexported fields
}

PubsubWorker executes jobs enqueued in a PubSub topic, receiving them through a PubSub subscription.

func NewPubsubWorker

func NewPubsubWorker(config *config, subscription *pubsub.Subscription) *PubsubWorker

NewPubsubWorker creates a new PubSub worker to execute jobs enqueued in a PubSub Topic.

Example
ctx := context.Background()

client, err := pubsub.NewClient(ctx, "my-gcp-project-id")
if err != nil {
	// TODO: Handle error.
}

sub := client.Subscription("subscription-test")

// https://github.com/googleapis/google-cloud-go/wiki/Fine-Tuning-PubSub-Receive-Performance
// Defaults:
//   MaxExtension:           10 * time.Minute,
//   MaxOutstandingMessages: 1000,
//   MaxOutstandingBytes:    1e9,
//   NumGoroutines:          1,
sub.ReceiveSettings.MaxExtension = 1 * time.Minute
sub.ReceiveSettings.MaxOutstandingMessages = 100
sub.ReceiveSettings.MaxOutstandingBytes = 100e6
sub.ReceiveSettings.NumGoroutines = 1

registry := fastjob.NewRegistry().WithJobs(&MockJob{})
config := fastjob.NewConfig(registry)
worker := fastjob.NewPubsubWorker(config, sub)

err = worker.Run(ctx)
if err != nil {
	// TODO: Handle error.
}
Output:

func (*PubsubWorker) Run

func (w *PubsubWorker) Run(ctx context.Context) error

Run runs the Pubsub worker until an error occurs or the context is cancelled

type Runner

type Runner interface {
	Enqueue(context.Context, Job) error
}

func NewLocalRunner

func NewLocalRunner(config *config) Runner

func NewPubSubRunner

func NewPubSubRunner(client *pubsub.Client, topicName string) Runner

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL