sqlq

package module
v0.0.0-...-3352087 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2023 License: MIT Imports: 16 Imported by: 0

README

sqlq

⚠ still very experimental! proceed with care!

sqlq is a SQL-backed, persistent, job queuing solution.

Usage

To start using sqlq add it to your dependencies with:

go get -u github.com/mergestat/sqlq
Enqueing new jobs

Using the sqlq.Enqueue() function, push a job to a queue:

func NewSendMail(args ...string) *sqlq.JobDescription {
    var params = args // somehow serialize args ... using json.Encode() probably
    return sqlq.NewJobDesc("send-email", WithParameters(params));
}


var conn *sql.DB // handle to an open database connection

job, err := sqlq.Enqueue(conn, "default", NewSendMail(from, to, body))
if err != nil {
    // handle error here
}
De-queuing and processing jobs

To de-queue and process a job, you need to use one of the available runtimes. Currently, the only available runtime is the embed runtime available under github.com/mergestat/sqlq/runtime/embed package. The following example demonstrates how to implement a job handler for that runtime.

func SendMail() sqlq.Handlerfunc {
    return func(ctx context.Context, job *sqlq.Job) error { 
        // send mail's implementation
        return nil
    }
}

func main() {
    var upstream *sql.DB // handle to an open database connection
    var worker, _ = embed.NewWorker(upstream, embed.WorkerConfig{
		    Queues: []sqlq.Queue{"embed/default"}, // queues to watch
    })
    
    _ = worker.Register("send-mail", SendMail())
    
    // starting the worker will start the processing routine in background
    if err := worker.Start(); err != nil {
        // handle error
    }
    
    // wait here .. probably listening for os.Signal
    
    // make a clear / graceful exit
    if err := worker.Shutdown(5 * time.Second); err != nil {
        // graceful exit failed / didn't complete
        return nil
    }
}

These basic examples are just to give an idea. Refer to tests to see more varied use cases.

Developing

To start a postgres database in Docker and execute tests against it, run:

docker-compose up

and then:

go test ./... -postgres-url postgres://postgres:password@localhost:5432 -v

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrJobStateMismatch is returned if an optimistic locking failure occurs when transitioning
	// a job between states. It usually indicates that some other has updated the state of the job
	// after we have read it from the store.
	ErrJobStateMismatch = errors.New("job state mismatch")
)
View Source
var (
	// ErrSkipRetry must be used by the job handler routine to signal
	// that the job must not be retried (even when there are attempts left)
	ErrSkipRetry = errors.New("sqlq: skip retry")
)

Functions

func Cancelled

func Cancelled(cx Connection, job *Job) (err error)

Cancelled transitions the job to CANCELLED state and mark it as completed

func Error

func Error(cx Connection, job *Job, userError error) (err error)

Error transitions the job to ERROR state and mark it as completed. If the error is retryable (and there are still attempts left), we bump the attempt count and transition it to PENDING to be picked up again by a worker.

func IsCancelled

func IsCancelled(cx Connection, job *Job) (b bool, err error)

IsCancelled checks the current job state searching for a cancelling state, if so returns true

func Reap

func Reap(cx Connection, queues []Queue) (_ int64, err error)

Reap reaps any zombie process, processes where state is 'running' but the job hasn't pinged in a while, in the given queues. It moves any job with remaining attempts back to the queue while dumping all others in to the errored state.

func Success

func Success(cx Connection, job *Job) (err error)

Success transitions the job to SUCCESS state and mark it as completed.

func WithKeepAlive

func WithKeepAlive(n time.Duration) func(*JobDescription)

WithKeepAlive sets the keepalive ping duration for the job.

func WithMaxRetries

func WithMaxRetries(n int) func(*JobDescription)

WithMaxRetries sets the maximum retry limit for the job.

func WithParameters

func WithParameters(params []byte) func(*JobDescription)

WithParameters is used to pass additional parameters / arguments to the job. Note that params must be a JSON-encoded value.

func WithPriority

func WithPriority(p int) func(*JobDescription)

WithPriority sets the job's priority.

func WithRetention

func WithRetention(dur time.Duration) func(*JobDescription)

WithRetention sets the retention policy for a job. A completed job will be cleaned up after its retention policy expires.

func WithTypeName

func WithTypeName(names []string) func(*DequeueFilters)

Types

type Connection

type Connection interface {
	// QueryContext executes a query that returns rows.
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

	// ExecContext executes a query that doesn't return rows.
	ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
}

Connection is a utility abstraction over sql connection / pool / transaction objects. This is so that the semantics of the operation (should we roll back on error?) are defined by the caller of the routine and not decided by the routine itself.

This allows user to run Enqueue() (and other) operations in, say, a common transaction, with the rest of the business logic. It ensures that the job is only queued if the business logic finishes successfully and commits, else the Enqueue() is also rolled back.

type DequeueFilters

type DequeueFilters struct {
	// contains filtered or unexported fields
}

DequeueFilters are a set of optional filters that can be used with Dequeue()

type Handler

type Handler interface {

	// Process implements the job's logic to process the given job.
	//
	// It should return nil if the processing of the job is successful.
	//
	// If Process() returns an error or panics, the job is marked as failed.
	Process(ctx context.Context, job *Job) error
}

Handler is the user-provided implementation of the job's business logic.

type HandlerFunc

type HandlerFunc func(ctx context.Context, job *Job) error

func (HandlerFunc) Process

func (fn HandlerFunc) Process(ctx context.Context, job *Job) error

type Job

type Job struct {
	ID       uuid.UUID `db:"id"`
	Queue    Queue     `db:"queue"`
	TypeName string    `db:"typename"`
	Priority int       `db:"priority"`
	Status   JobState  `db:"status"`

	Parameters []byte `db:"parameters"`
	Result     []byte `db:"result"`

	CreatedAt   time.Time    `db:"created_at"`
	StartedAt   sql.NullTime `db:"started_at"`
	CompletedAt sql.NullTime `db:"completed_at"`

	RunAfter     time.Duration `db:"run_after"`
	RetentionTTL time.Duration `db:"retention_ttl"`
	LastQueuedAt sql.NullTime  `db:"last_queued_at"`

	KeepAlive     time.Duration `db:"keepalive_interval"`
	LastKeepAlive sql.NullTime  `db:"last_keepalive"`

	MaxRetries int `db:"max_retries"`
	Attempt    int `db:"attempt"`
	// contains filtered or unexported fields
}

Job represents an instance of a task / job in a queue.

func AttachLogger

func AttachLogger(be LogBackend, job *Job) *Job

AttachLogger attaches a new Logger to the given job, that logs to the provided backend.

func AttachPinger

func AttachPinger(cx Connection, job *Job) *Job

AttachPinger attaches a new Pinger to the given job.

func AttachResultWriter

func AttachResultWriter(cx Connection, job *Job) *Job

AttachResultWriter attaches a result writer to the given job, using the provided Connection as the backend to write to.

func Dequeue

func Dequeue(db *sql.DB, queues []Queue, filterFuncs ...func(*DequeueFilters)) (_ *Job, err error)

Dequeue dequeues a single job from one of the provided queues.

It takes into account several factors such as a queue's concurrency and priority settings as well a job's priority. It ensures that by executing this job, the queue's concurrency setting would not be violated.

func Enqueue

func Enqueue(cx Connection, queue Queue, desc *JobDescription) (_ *Job, err error)

Enqueue enqueues a new job in the given queue based on provided description.

The queue is created if it doesn't already exist. By default, a job is created in 'pending' state. It returns the newly created job instance.

func (*Job) Logger

func (job *Job) Logger() *Logger

Logger returns an instance of sqlq.Logger service that manages user-emitted logs for this job. A logger is only available when running in the context of a runtime. Trying to call Logger() in any other context would cause a panic().

func (*Job) Pinger

func (job *Job) Pinger() Pinger

Pinger returns an instance of sqlq.Pinger service that sends keepalive pings for the job. A pinger is only available when running in the context of a runtime. Trying to call Pinger() in any other context would cause a panic().

func (*Job) ResultWriter

func (job *Job) ResultWriter() io.WriteCloser

ResultWriter returns an io.WriteCloser that can be used to save the result of a job. User must close the writer to actually save the data. A result writer is only available when running in the context of a runtime. Trying to call ResultWriter() in any other context would cause a panic().

func (*Job) SendKeepAlive

func (job *Job) SendKeepAlive(ctx context.Context, d time.Duration) error

SendKeepAlive is a utility method that sends periodic keep-alive pings, every d duration.

This function starts an infinite for-loop that periodically sends ping using job.Pinger(). Call this function in a background goroutine, like:

go job.SendKeepAlive(ctx, job.KeepAlive)

To stop sending pings, cancel the context and the function will return.

type JobDescription

type JobDescription struct {
	// contains filtered or unexported fields
}

JobDescription describes a job to be enqueued. Note that it is just a set of options (that closely resembles) for a job, and not an actual instance of a job. It is used by Enqueue() to create a new Job.

func NewJobDesc

func NewJobDesc(typeName string, opts ...func(*JobDescription)) *JobDescription

NewJobDesc creates a new JobDescription for a job with given typename and with the provided opts.

type JobState

type JobState uint

JobState represents the status a job is in and serves as the basis of a job's state machine.

                   +-----------+
Enqueue()----------|  PENDING  |----------------+
                   +-----------+                |
                         |                      |
                         |                      |
                   +-----|-----+            +---|----+
                   |  RUNNING  |------------| Retry? |
                   +-----------+            +--------+
                         |                      |
                         |                      |
                         |                      |
                   +-----|-----+            +---|-----+
                   |  SUCCESS  |            | ERRORED |
                   +-----------+            +---------+

A job starts in the PENDING state when it is Enqueue()'d. A worker would than pick it up and transition it to RUNNING. If the job finishes without any error, it is moved to SUCCESS. If an error occurs and runtime determines that the job can be retried, it will move it back to the PENDING state, else it will move it to the ERRORED state.

const (
	StateInvalid    JobState = iota // invalid
	StatePending                    // pending
	StateRunning                    // running
	StateSuccess                    // success
	StateErrored                    // errored
	StateCancelling                 // cancelling
	StateCancelled                  // cancelled
)

func (*JobState) Scan

func (i *JobState) Scan(src interface{}) error

func (JobState) String

func (i JobState) String() string

func (JobState) Value

func (i JobState) Value() (driver.Value, error)

type LogBackend

type LogBackend interface {
	// Write writes the message at given level for the given job
	Write(job *Job, level LogLevel, msg string) (int, error)
}

LogBackend service provides the backend / sink implementation where the log messages are displayed and / or persisted.

type LogBackendAdapter

type LogBackendAdapter func(*Job, LogLevel, string) (int, error)

LogBackendAdapter is a utility type to use complying functions are log backends.

func (LogBackendAdapter) Write

func (fn LogBackendAdapter) Write(job *Job, level LogLevel, msg string) (int, error)

type LogLevel

type LogLevel int

LogLevel defines the logging levels used when sending log messages

const (
	// DebugLevel is used to log messages at debug level
	DebugLevel LogLevel = iota

	// InfoLevel is used to log messages at info level
	InfoLevel

	// WarnLevel is used to log messages at warn level
	WarnLevel

	// ErrorLevel is used to log messages at error level
	ErrorLevel
)

func (LogLevel) String

func (i LogLevel) String() string

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

Logger is used to log messages from a job's handler to a given backend

func NewLogger

func NewLogger(job *Job, be LogBackend) *Logger

NewLogger returns a new Logger for the given Job, which write logs to the provided backend.

func (*Logger) Debug

func (log *Logger) Debug(msg string)

func (*Logger) Debugf

func (log *Logger) Debugf(msg string, args ...interface{})

func (*Logger) Error

func (log *Logger) Error(msg string)

func (*Logger) Errorf

func (log *Logger) Errorf(msg string, args ...interface{})

func (*Logger) Info

func (log *Logger) Info(msg string)

func (*Logger) Infof

func (log *Logger) Infof(msg string, args ...interface{})

func (*Logger) Warn

func (log *Logger) Warn(msg string)

func (*Logger) Warnf

func (log *Logger) Warnf(msg string, args ...interface{})

type Pinger

type Pinger interface {
	Ping(context.Context) error
}

type Queue

type Queue string

Queue represents a named group / queue where jobs can be pushed / enqueued.

Directories

Path Synopsis
runtime
embed
Package embed provides an embeddable runtime for sqlq workers.
Package embed provides an embeddable runtime for sqlq workers.
Package schema provides sql schema migrations for sqlq.
Package schema provides sql schema migrations for sqlq.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL