river

v0.0.0-...-862f905
Published: Jan 18, 2024 License: MPL-2.0 Imports: 37 Imported by: 0

Documentation

Overview

Package river is a robust high-performance job processing system for Go and Postgres.

See homepage, docs, and godoc.

Being built for Postgres, River encourages the use of the same database for application data and job queue. By enqueueing jobs transactionally along with other database changes, whole classes of distributed systems problems are avoided. Jobs are guaranteed to be enqueued if their transaction commits, are removed if their transaction rolls back, and aren't visible for work _until_ commit. See transactional enqueueing for more background on this philosophy.

Job args and workers

Jobs are defined in struct pairs, with an implementation of `JobArgs` and one of `Worker`.

Job args contain `json` annotations and define how jobs are serialized to and from the database, along with a "kind", a stable string that uniquely identifies the job.

type SortArgs struct {
	// Strings is a slice of strings to sort.
	Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }
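
To make the role of the `json` annotations concrete, here is a minimal standard-library sketch of a round trip for `SortArgs`. The `encodeArgs` and `decodeArgs` helpers are hypothetical names used for illustration only; River performs its own serialization internally, and this only shows what the annotations imply for the encoded form.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SortArgs mirrors the job args struct shown above.
type SortArgs struct {
	// Strings is a slice of strings to sort.
	Strings []string `json:"strings"`
}

// Kind returns the stable string that identifies this type of job.
func (SortArgs) Kind() string { return "sort" }

// encodeArgs (hypothetical helper) serializes job args as dictated by the
// struct's json annotations.
func encodeArgs(args SortArgs) (string, error) {
	b, err := json.Marshal(args)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeArgs (hypothetical helper) restores job args from their JSON form.
func decodeArgs(encoded string) (SortArgs, error) {
	var args SortArgs
	err := json.Unmarshal([]byte(encoded), &args)
	return args, err
}

func main() {
	encoded, err := encodeArgs(SortArgs{Strings: []string{"whale", "tiger", "bear"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(SortArgs{}.Kind(), encoded)

	decoded, err := decodeArgs(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.Strings)
}
```

Because the kind and the field names end up in stored data, renaming either one after jobs have been enqueued would likely leave existing rows undecodable or unroutable, which is why both are described as stable.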

Workers expose a `Work` function that dictates how jobs run.

type SortWorker struct {
    // An embedded WorkerDefaults sets up default methods to fulfill the rest of
    // the Worker interface:
    river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    sort.Strings(job.Args.Strings)
    fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
    return nil
}

Registering workers

Jobs are uniquely identified by their "kind" string. Workers are registered at startup so that River knows how to assign jobs to workers:

workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &SortWorker{})
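
The idea of a registry keyed by kind can be sketched with the standard library alone. The `registry` type and its methods below are hypothetical and are not River's implementation; they only illustrate why a duplicate or empty kind has to be rejected, and how a panicking variant can be layered on an error-returning one.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for a workers bundle: it maps a job
// kind to the name of the worker that handles it.
type registry struct {
	byKind map[string]string
}

func newRegistry() *registry {
	return &registry{byKind: make(map[string]string)}
}

// add returns an error when the kind is empty or already registered.
func (r *registry) add(kind, worker string) error {
	if kind == "" {
		return fmt.Errorf("worker %q has an empty kind", worker)
	}
	if existing, ok := r.byKind[kind]; ok {
		return fmt.Errorf("kind %q is already registered to %q", kind, existing)
	}
	r.byKind[kind] = worker
	return nil
}

// mustAdd panics on an invalid or duplicate registration, which suits
// registrations performed once at program startup.
func (r *registry) mustAdd(kind, worker string) {
	if err := r.add(kind, worker); err != nil {
		panic(err)
	}
}

// lookup finds the worker responsible for a job kind.
func (r *registry) lookup(kind string) (string, bool) {
	w, ok := r.byKind[kind]
	return w, ok
}

func main() {
	r := newRegistry()
	r.mustAdd("sort", "SortWorker")

	worker, ok := r.lookup("sort")
	fmt.Println(worker, ok)

	// A second registration for the same kind is rejected.
	fmt.Println(r.add("sort", "OtherWorker"))
}
```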

Starting a client

A River `Client` provides an interface for job insertion and manages job processing and maintenance services. A client is created with a database pool, driver, and config struct containing a `Workers` bundle and other settings. Here's a `Client` working one queue (`"default"`) with up to 100 worker goroutines at a time:

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    Queues: map[string]river.QueueConfig{
        river.QueueDefault: {MaxWorkers: 100},
    },
    Workers: workers,
})

if err != nil {
    panic(err)
}

// Run the client inline. All executed jobs will inherit from ctx:
if err := riverClient.Start(ctx); err != nil {
    panic(err)
}

Stopping

The client should also be stopped on program shutdown:

// Stop fetching new work and wait for active jobs to finish.
if err := riverClient.Stop(ctx); err != nil {
    panic(err)
}

There are some complexities around ensuring clients stop cleanly, but also in a timely manner. See graceful shutdown for more details on River's stop modes.
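
The trade-off between stopping cleanly and stopping promptly can be illustrated without a database. The sketch below uses a hypothetical `pool` type (not part of River) whose jobs only finish when their context is cancelled: a soft stop waits for them within a timeout, and a hard stop cancels their contexts before waiting again.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// pool is a hypothetical stand-in for a job client. Its jobs only finish
// once the shared job context is cancelled.
type pool struct {
	wg     sync.WaitGroup
	cancel context.CancelFunc
}

// start launches n jobs that block until the job context is cancelled.
func start(n int) *pool {
	jobCtx, cancel := context.WithCancel(context.Background())
	p := &pool{cancel: cancel}
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			<-jobCtx.Done()
		}()
	}
	return p
}

// stop waits for jobs to finish on their own and gives up when ctx ends.
func (p *pool) stop(ctx context.Context) error {
	done := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// stopAndCancel cancels every job's context, then waits as stop does.
func (p *pool) stopAndCancel(ctx context.Context) error {
	p.cancel()
	return p.stop(ctx)
}

// shutdown tries a soft stop first and escalates to a hard stop on timeout.
// It reports which mode ended up stopping the pool.
func shutdown(p *pool, softTimeout, hardTimeout time.Duration) (string, error) {
	softCtx, softCancel := context.WithTimeout(context.Background(), softTimeout)
	defer softCancel()

	err := p.stop(softCtx)
	if err == nil {
		return "soft", nil
	}
	if !errors.Is(err, context.DeadlineExceeded) {
		return "", err
	}

	hardCtx, hardCancel := context.WithTimeout(context.Background(), hardTimeout)
	defer hardCancel()
	if err := p.stopAndCancel(hardCtx); err != nil {
		return "", err
	}
	return "hard", nil
}

func main() {
	mode, err := shutdown(start(3), 50*time.Millisecond, time.Second)
	fmt.Println(mode, err)
}
```

The hard stop only works here because every job respects context cancellation; a job that ignores its context would make the second wait time out as well.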

Inserting jobs

`Client.InsertTx` takes an instance of job args and inserts a job to be worked as part of a transaction:

_, err = riverClient.InsertTx(ctx, tx, SortArgs{
    Strings: []string{
        "whale", "tiger", "bear",
    },
}, nil)

if err != nil {
    panic(err)
}

See the `InsertAndWork` example for complete code.

Other features

Development

See developing River.

Example (BatchInsert)

Example_batchInsert demonstrates how many jobs can be inserted for work as part of a single operation.

package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type BatchInsertArgs struct{}

func (BatchInsertArgs) Kind() string { return "batch_insert" }

// BatchInsertWorker is a job worker used to demonstrate inserting many jobs
// in a single operation.
type BatchInsertWorker struct {
	river.WorkerDefaults[BatchInsertArgs]
}

func (w *BatchInsertWorker) Work(ctx context.Context, job *river.Job[BatchInsertArgs]) error {
	fmt.Printf("Worked a job\n")
	return nil
}

// Example_batchInsert demonstrates how many jobs can be inserted for work as
// part of a single operation.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &BatchInsertWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	count, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
		{Args: BatchInsertArgs{}},
		{Args: BatchInsertArgs{}},
		{Args: BatchInsertArgs{}},
		{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
		{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 4}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Inserted %d jobs\n", count)

	waitForNJobs(subscribeChan, 5)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

Inserted 5 jobs
Worked a job
Worked a job
Worked a job
Worked a job
Worked a job
Example (CompleteJobWithinTx)

Example_completeJobWithinTx demonstrates how to transactionally complete a job alongside other database changes being made.

package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type TransactionalArgs struct{}

func (TransactionalArgs) Kind() string { return "transactional_worker" }

// TransactionalWorker is a job worker which runs an operation on the database
// and transactionally completes the current job.
//
// While this example is simplified, any operations could be performed within
// the transaction such as inserting additional jobs or manipulating other data.
type TransactionalWorker struct {
	river.WorkerDefaults[TransactionalArgs]
	dbPool *pgxpool.Pool
}

func (w *TransactionalWorker) Work(ctx context.Context, job *river.Job[TransactionalArgs]) error {
	tx, err := w.dbPool.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)

	var result int
	if err := tx.QueryRow(ctx, "SELECT 1").Scan(&result); err != nil {
		return err
	}

	// The function needs to know the type of the database driver in use by the
	// Client, but the other generic parameters can be inferred.
	jobAfter, err := river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
	if err != nil {
		return err
	}
	fmt.Printf("Transitioned TransactionalWorker job from %q to %q\n", job.State, jobAfter.State)

	if err = tx.Commit(ctx); err != nil {
		return err
	}
	return nil
}

// Example_completeJobWithinTx demonstrates how to transactionally complete
// a job alongside other database changes being made.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &TransactionalWorker{dbPool: dbPool})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Not strictly needed, but used to help this test wait until job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	if _, err = riverClient.Insert(ctx, TransactionalArgs{}, nil); err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

Transitioned TransactionalWorker job from "running" to "completed"
Example (CronJob)

Example_cronJob demonstrates how to create a cron job with a more complex schedule using a third party cron package to parse more elaborate crontab syntax.

package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/robfig/cron/v3"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type CronJobArgs struct{}

// Kind is the unique string name for this job.
func (CronJobArgs) Kind() string { return "cron" }

// CronJobWorker is a job worker that runs on a cron schedule.
type CronJobWorker struct {
	river.WorkerDefaults[CronJobArgs]
}

func (w *CronJobWorker) Work(ctx context.Context, job *river.Job[CronJobArgs]) error {
	fmt.Printf("This job will run once immediately then every hour on the half hour\n")
	return nil
}

// Example_cronJob demonstrates how to create a cron job with a more complex
// schedule using a third party cron package to parse more elaborate crontab
// syntax.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &CronJobWorker{})

	schedule, err := cron.ParseStandard("30 * * * *") // every hour on the half hour
	if err != nil {
		panic(err)
	}

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		PeriodicJobs: []*river.PeriodicJob{
			river.NewPeriodicJob(
				schedule,
				func() (river.JobArgs, *river.InsertOpts) {
					return CronJobArgs{}, nil
				},
				&river.PeriodicJobOpts{RunOnStart: true},
			),
		},
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	// There's no need to explicitly insert a periodic job. One will be inserted
	// (and worked soon after) as the client starts up.
	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

This job will run once immediately then every hour on the half hour
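
For intuition about what the `"30 * * * *"` schedule computes, here is a hand-written equivalent using only the standard library. The `halfHourly` type and the `nextRun` helper are hypothetical. The sketch assumes a schedule only needs a `Next(time.Time) time.Time` method, which is the shape of the value returned by `cron.ParseStandard`, and it ignores daylight saving transitions.

```go
package main

import (
	"fmt"
	"time"
)

// halfHourly is a hypothetical hand-written schedule equivalent to the
// crontab expression "30 * * * *".
type halfHourly struct{}

// Next returns the first instant strictly after t that falls on minute 30
// of an hour, with seconds and nanoseconds set to zero.
func (halfHourly) Next(t time.Time) time.Time {
	next := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 30, 0, 0, t.Location())
	if !next.After(t) {
		next = next.Add(time.Hour)
	}
	return next
}

// nextRun parses an RFC 3339 timestamp and formats the following run time.
func nextRun(current string) (string, error) {
	t, err := time.Parse(time.RFC3339, current)
	if err != nil {
		return "", err
	}
	return halfHourly{}.Next(t).Format(time.RFC3339), nil
}

func main() {
	for _, current := range []string{
		"2024-01-18T10:15:00Z",
		"2024-01-18T10:30:00Z",
		"2024-01-18T23:45:00Z",
	} {
		next, err := nextRun(current)
		if err != nil {
			panic(err)
		}
		fmt.Println(current, "->", next)
	}
}
```

A time that is already on the half hour maps to the following hour's half hour, so a job is never scheduled for the instant it just ran at.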
Example (CustomInsertOpts)

Example_customInsertOpts demonstrates the use of a job with custom job-specific insertion options.

package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type AlwaysHighPriorityArgs struct{}

func (AlwaysHighPriorityArgs) Kind() string { return "always_high_priority" }

// InsertOpts returns custom insert options that every job of this type will
// inherit by default.
func (AlwaysHighPriorityArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{
		Queue: "high_priority",
	}
}

// AlwaysHighPriorityWorker is a job worker demonstrating use of custom
// job-specific insertion options.
type AlwaysHighPriorityWorker struct {
	river.WorkerDefaults[AlwaysHighPriorityArgs]
}

func (w *AlwaysHighPriorityWorker) Work(ctx context.Context, job *river.Job[AlwaysHighPriorityArgs]) error {
	fmt.Printf("Ran in queue: %s\n", job.Queue)
	return nil
}

type SometimesHighPriorityArgs struct{}

func (SometimesHighPriorityArgs) Kind() string { return "sometimes_high_priority" }

// SometimesHighPriorityWorker is a job worker that's made high-priority
// sometimes through the use of options at insertion time.
type SometimesHighPriorityWorker struct {
	river.WorkerDefaults[SometimesHighPriorityArgs]
}

func (w *SometimesHighPriorityWorker) Work(ctx context.Context, job *river.Job[SometimesHighPriorityArgs]) error {
	fmt.Printf("Ran in queue: %s\n", job.Queue)
	return nil
}

// Example_customInsertOpts demonstrates the use of a job with custom
// job-specific insertion options.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &AlwaysHighPriorityWorker{})
	river.AddWorker(workers, &SometimesHighPriorityWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
			"high_priority":    {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// This job always runs in the high-priority queue because its job-specific
	// options on the struct above dictate that it will.
	_, err = riverClient.Insert(ctx, AlwaysHighPriorityArgs{}, nil)
	if err != nil {
		panic(err)
	}

	// This job will run in the high-priority queue because of the options given
	// at insertion time.
	_, err = riverClient.Insert(ctx, SometimesHighPriorityArgs{}, &river.InsertOpts{
		Queue: "high_priority",
	})
	if err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 2)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

Ran in queue: high_priority
Ran in queue: high_priority
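
Both jobs end up in the same queue by different routes. One way to picture how the two sources of options combine is a first-non-zero merge, sketched below with a hypothetical `opts` type. The precedence shown (insertion-time options, then options from the job args, then defaults) and the default priority of 1 are assumptions made for this illustration and are not taken from the example above.

```go
package main

import "fmt"

// opts is a hypothetical, trimmed-down stand-in for insert options.
type opts struct {
	Queue    string
	Priority int
}

// firstNonZero returns the first value that differs from its zero value,
// or the zero value when all of them are zero.
func firstNonZero[T comparable](vals ...T) T {
	var zero T
	for _, v := range vals {
		if v != zero {
			return v
		}
	}
	return zero
}

// resolve merges options field by field. Assumed precedence: options given
// at insertion time, then options declared on the job args, then defaults.
func resolve(insertTime *opts, argsLevel opts) opts {
	if insertTime == nil {
		insertTime = &opts{}
	}
	return opts{
		Queue:    firstNonZero(insertTime.Queue, argsLevel.Queue, "default"),
		Priority: firstNonZero(insertTime.Priority, argsLevel.Priority, 1),
	}
}

func main() {
	// Options declared on the job args, none at insertion time.
	fmt.Println(resolve(nil, opts{Queue: "high_priority"}))

	// Options given only at insertion time.
	fmt.Println(resolve(&opts{Queue: "high_priority"}, opts{}))

	// No options anywhere, so the defaults apply.
	fmt.Println(resolve(nil, opts{}))
}
```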
Example (ErrorHandler)

Example_errorHandler demonstrates how to use the ErrorHandler interface for custom application telemetry.

package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
	"weavelab.xyz/river/rivertype"
)

type CustomErrorHandler struct{}

func (*CustomErrorHandler) HandleError(ctx context.Context, job *rivertype.JobRow, err error) *river.ErrorHandlerResult {
	fmt.Printf("Job errored with: %s\n", err)
	return nil
}

func (*CustomErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any) *river.ErrorHandlerResult {
	fmt.Printf("Job panicked with: %v\n", panicVal)

	// Either function can also set the job to be immediately cancelled.
	return &river.ErrorHandlerResult{SetCancelled: true}
}

type ErroringArgs struct {
	ShouldError bool
	ShouldPanic bool
}

func (ErroringArgs) Kind() string { return "erroring" }

// Here to make sure our jobs are never accidentally retried which would add
// additional output and fail the example.
func (ErroringArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{MaxAttempts: 1}
}

type ErroringWorker struct {
	river.WorkerDefaults[ErroringArgs]
}

func (w *ErroringWorker) Work(ctx context.Context, job *river.Job[ErroringArgs]) error {
	switch {
	case job.Args.ShouldError:
		return errors.New("this job errored")
	case job.Args.ShouldPanic:
		panic("this job panicked")
	}
	return nil
}

// Example_errorHandler demonstrates how to use the ErrorHandler interface for
// custom application telemetry.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &ErroringWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		ErrorHandler: &CustomErrorHandler{},
		Logger:       slog.New(&slogutil.SlogMessageOnlyHandler{Level: 9}), // Suppress logging so example output is cleaner (9 > slog.LevelError).
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 10},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Not strictly needed, but used to help this test wait until job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled, river.EventKindJobFailed)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	if _, err = riverClient.Insert(ctx, ErroringArgs{ShouldError: true}, nil); err != nil {
		panic(err)
	}

	// Wait for the first job before inserting another to guarantee test output
	// is ordered correctly.
	waitForNJobs(subscribeChan, 1)

	if _, err = riverClient.Insert(ctx, ErroringArgs{ShouldPanic: true}, nil); err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

Job errored with: this job errored
Job panicked with: this job panicked
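
The split between `HandleError` and `HandlePanic` suggests an executor that recovers from panics and routes each outcome to the matching callback. The standard-library sketch below shows one way such dispatch could look. The `handler` interface, `execute`, and the state names are hypothetical simplifications, not River's executor.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical, simplified error handler. Each method returns
// true to ask for the job to be cancelled rather than left as failed.
type handler interface {
	HandleError(err error) (cancel bool)
	HandlePanic(panicVal any) (cancel bool)
}

// recordingHandler keeps a log of what it was told, standing in for telemetry.
type recordingHandler struct{ log []string }

func (h *recordingHandler) HandleError(err error) bool {
	h.log = append(h.log, "errored: "+err.Error())
	return false
}

func (h *recordingHandler) HandlePanic(panicVal any) bool {
	h.log = append(h.log, fmt.Sprintf("panicked: %v", panicVal))
	return true // cancel the job immediately
}

// outcome maps a handler's decision to a job state name.
func outcome(cancel bool) string {
	if cancel {
		return "cancelled"
	}
	return "failed"
}

// execute runs work, recovers from any panic, and reports the job's state.
func execute(h handler, work func() error) (state string) {
	defer func() {
		if r := recover(); r != nil {
			state = outcome(h.HandlePanic(r))
		}
	}()
	if err := work(); err != nil {
		return outcome(h.HandleError(err))
	}
	return "completed"
}

// erroringWork and panickingWork mimic the two failure modes shown above.
func erroringWork() error  { return errors.New("this job errored") }
func panickingWork() error { panic("this job panicked") }

func main() {
	h := &recordingHandler{}
	fmt.Println(execute(h, erroringWork))
	fmt.Println(execute(h, panickingWork))
	fmt.Println(h.log)
}
```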
Example (GracefulShutdown)

Example_gracefulShutdown demonstrates a realistic-looking stop loop for River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C locally or on a platform like Heroku to stop a process) and when received, tries a soft stop that waits for work to finish. If it doesn't finish in time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs using context cancellation. A third will give up on the stop procedure and exit uncleanly.

package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type WaitsForCancelOnlyArgs struct{}

func (WaitsForCancelOnlyArgs) Kind() string { return "waits_for_cancel_only" }

// WaitsForCancelOnlyWorker is a worker that will never finish jobs until its
// context is cancelled.
type WaitsForCancelOnlyWorker struct {
	river.WorkerDefaults[WaitsForCancelOnlyArgs]
	jobStarted chan struct{}
}

func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[WaitsForCancelOnlyArgs]) error {
	fmt.Printf("Working job that doesn't finish until cancelled\n")
	close(w.jobStarted)

	<-ctx.Done()
	fmt.Printf("Job cancelled\n")

	// In the event of cancellation, an error should be returned so that the job
	// goes back in the retry queue.
	return ctx.Err()
}

// Example_gracefulShutdown demonstrates a realistic-looking stop loop for
// River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C
// locally or on a platform like Heroku to stop a process) and when received,
// tries a soft stop that waits for work to finish. If it doesn't finish in
// time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs
// using context cancellation. A third will give up on the stop procedure and
// exit uncleanly.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	jobStarted := make(chan struct{})

	workers := river.NewWorkers()
	river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	_, err = riverClient.Insert(ctx, WaitsForCancelOnlyArgs{}, nil)
	if err != nil {
		panic(err)
	}

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	sigintOrTerm := make(chan os.Signal, 1)
	signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM)

	// This is meant to be a realistic-looking stop goroutine that might go in a
	// real program. It waits for SIGINT/SIGTERM and when received, tries to stop
	// gracefully by allowing a chance for jobs to finish. But if that isn't
	// working, a second SIGINT/SIGTERM will tell it to terminate with prejudice and
	// it'll issue a hard stop that cancels the context of all active jobs. In
	// case that doesn't work, a third SIGINT/SIGTERM ignores River's stop procedure
	// completely and exits uncleanly.
	go func() {
		<-sigintOrTerm
		fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n")

		softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
		defer softStopCtxCancel()

		go func() {
			select {
			case <-sigintOrTerm:
				fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n")
				softStopCtxCancel()
			case <-softStopCtx.Done():
				fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n")
			}
		}()

		err := riverClient.Stop(softStopCtx)
		if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
			panic(err)
		}
		if err == nil {
			fmt.Printf("Soft stop succeeded\n")
			return
		}

		hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
		defer hardStopCtxCancel()

		// As long as all jobs respect context cancellation, StopAndCancel will
		// always work. However, in the case of a bug where a job blocks despite
		// being cancelled, it may be necessary to either ignore River's stop
		// result (what's shown here) or have a supervisor kill the process.
		err = riverClient.StopAndCancel(hardStopCtx)
		if err != nil && errors.Is(err, context.DeadlineExceeded) {
			fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n")
		} else if err != nil {
			panic(err)
		}

		// hard stop succeeded
	}()

	// Make sure our job starts being worked before doing anything else.
	<-jobStarted

	// Cheat a little by sending a SIGTERM manually for the purpose of this
	// example (normally this will be sent by user or supervisory process). The
	// first SIGTERM tries a soft stop in which jobs are given a chance to
	// finish up.
	sigintOrTerm <- syscall.SIGTERM

	// The soft stop will never work in this example because our job only
	// respects context cancellation, but wait a short amount of time to give it
	// a chance. After it elapses, send another SIGTERM to initiate a hard stop.
	select {
	case <-riverClient.Stopped():
		// Will never be reached in this example because our job will only ever
		// finish on context cancellation.
		fmt.Printf("Soft stop succeeded\n")

	case <-time.After(100 * time.Millisecond):
		sigintOrTerm <- syscall.SIGTERM
		<-riverClient.Stopped()
	}

}
Output:

Working job that doesn't finish until cancelled
Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)
Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)
Job cancelled
jobExecutor: Job errored
Example (InsertAndWork)

Example_insertAndWork demonstrates how to register job workers, start a client, and insert a job on it to be worked.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"sort"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type SortArgs struct {
	// Strings is a slice of strings to sort.
	Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

type SortWorker struct {
	river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
	sort.Strings(job.Args.Strings)
	fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
	return nil
}

// Example_insertAndWork demonstrates how to register job workers, start a
// client, and insert a job on it to be worked.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &SortWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Start a transaction to insert a job. It's also possible to insert a job
	// outside a transaction, but this usage is recommended to ensure that all
	// data a job needs to run is available by the time it starts. Because of
	// snapshot visibility guarantees across transactions, the job will not be
	// worked until the transaction has committed.
	tx, err := dbPool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	defer tx.Rollback(ctx)

	_, err = riverClient.InsertTx(ctx, tx, SortArgs{
		Strings: []string{
			"whale", "tiger", "bear",
		},
	}, nil)
	if err != nil {
		panic(err)
	}

	if err := tx.Commit(ctx); err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

Sorted strings: [bear tiger whale]
Example (JobCancel)

Example_jobCancel demonstrates how to permanently cancel a job from within Work using JobCancel.

package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type CancellingArgs struct {
	ShouldCancel bool
}

func (args CancellingArgs) Kind() string { return "Cancelling" }

type CancellingWorker struct {
	river.WorkerDefaults[CancellingArgs]
}

func (w *CancellingWorker) Work(ctx context.Context, job *river.Job[CancellingArgs]) error {
	if job.Args.ShouldCancel {
		fmt.Println("cancelling job")
		return river.JobCancel(errors.New("this wrapped error message will be persisted to DB"))
	}
	return nil
}

// Example_jobCancel demonstrates how to permanently cancel a job from within
// Work using JobCancel.
func main() { //nolint:dupl
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &CancellingWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 10},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Not strictly needed, but used to help this test wait until job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	if _, err = riverClient.Insert(ctx, CancellingArgs{ShouldCancel: true}, nil); err != nil {
		panic(err)
	}
	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

cancelling job
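
Returning a specially wrapped error is a common Go pattern for letting a caller distinguish "give up on this job" from "this attempt failed". The sketch below shows the pattern with a hypothetical `cancelError` type and `classify` function; River's actual types may differ, and only the wrapping technique is being illustrated.

```go
package main

import (
	"errors"
	"fmt"
)

// cancelError is a hypothetical marker type that wraps the reason a job
// was cancelled.
type cancelError struct{ err error }

func (e *cancelError) Error() string { return "job cancelled: " + e.err.Error() }
func (e *cancelError) Unwrap() error { return e.err }

// cancelJob wraps err so that an executor can tell a cancellation apart
// from an ordinary failure.
func cancelJob(err error) error { return &cancelError{err: err} }

// errReason is a sample underlying error used by main.
var errReason = errors.New("this wrapped error message will be persisted to DB")

// classify decides what happens to a job based on the error its worker
// returned. errors.As also finds a cancelError nested in other wrappers.
func classify(err error) string {
	var ce *cancelError
	switch {
	case err == nil:
		return "completed"
	case errors.As(err, &ce):
		return "cancelled"
	default:
		return "retryable"
	}
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(errReason))
	fmt.Println(classify(cancelJob(errReason)))
	fmt.Println(classify(fmt.Errorf("extra context: %w", cancelJob(errReason))))
}
```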
Example (JobCancelFromClient)

Example_jobCancelFromClient demonstrates how to permanently cancel a job from any Client using JobCancel.

package main

import (
	"context"
	"errors"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type SleepingArgs struct{}

func (args SleepingArgs) Kind() string { return "SleepingWorker" }

type SleepingWorker struct {
	river.WorkerDefaults[SleepingArgs]
	jobChan chan int64
}

func (w *SleepingWorker) Work(ctx context.Context, job *river.Job[SleepingArgs]) error {
	w.jobChan <- job.ID
	select {
	case <-ctx.Done():
	case <-time.After(5 * time.Second):
		return errors.New("sleeping worker timed out")
	}
	return ctx.Err()
}

// Example_jobCancelFromClient demonstrates how to permanently cancel a job from
// any Client using JobCancel.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	jobChan := make(chan int64)

	workers := river.NewWorkers()
	river.AddWorker(workers, &SleepingWorker{jobChan: jobChan})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 10},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Not strictly needed, but used to help this test wait until job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	job, err := riverClient.Insert(ctx, SleepingArgs{}, nil)
	if err != nil {
		panic(err)
	}
	select {
	case <-jobChan:
	case <-time.After(2 * time.Second):
		panic("no jobChan signal received")
	}

	// There is presently no way to wait for the client to be 100% ready, so we
	// sleep for a bit to give it time to start up. This is only needed in this
	// example because we need the notifier to be ready for it to receive the
	// cancellation signal.
	time.Sleep(500 * time.Millisecond)

	if _, err = riverClient.JobCancel(ctx, job.ID); err != nil {
		panic(err)
	}
	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

jobExecutor: job cancelled remotely
Example (JobSnooze)

Example_jobSnooze demonstrates how to snooze a job from within Work using JobSnooze. The job will be run again after 5 minutes and the snooze attempt will increment the job's max attempts, ensuring that one can snooze as many times as desired.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type SnoozingArgs struct {
	ShouldSnooze bool
}

func (args SnoozingArgs) Kind() string { return "Snoozing" }

type SnoozingWorker struct {
	river.WorkerDefaults[SnoozingArgs]
}

func (w *SnoozingWorker) Work(ctx context.Context, job *river.Job[SnoozingArgs]) error {
	if job.Args.ShouldSnooze {
		fmt.Println("snoozing job for 5 minutes")
		return river.JobSnooze(5 * time.Minute)
	}
	return nil
}

// Example_jobSnooze demonstrates how to snooze a job from within Work using
// JobSnooze. The job will be run again after 5 minutes and the snooze attempt
// will increment the job's max attempts, ensuring that one can snooze as many
// times as desired.
func main() { //nolint:dupl
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &SnoozingWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 10},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// The subscription bits are not needed in real usage, but are used to make
	// sure the test waits until the job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobSnoozed)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	if _, err = riverClient.Insert(ctx, SnoozingArgs{ShouldSnooze: true}, nil); err != nil {
		panic(err)
	}
	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

snoozing job for 5 minutes
Example (PeriodicJob)

Example_periodicJob demonstrates the use of a periodic job.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type PeriodicJobArgs struct{}

// Kind is the unique string name for this job.
func (PeriodicJobArgs) Kind() string { return "periodic" }

// PeriodicJobWorker is a job worker that prints a message each time it runs.
type PeriodicJobWorker struct {
	river.WorkerDefaults[PeriodicJobArgs]
}

func (w *PeriodicJobWorker) Work(ctx context.Context, job *river.Job[PeriodicJobArgs]) error {
	fmt.Printf("This job will run once immediately then approximately once every 15 minutes\n")
	return nil
}

// Example_periodicJob demonstrates the use of a periodic job.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &PeriodicJobWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		PeriodicJobs: []*river.PeriodicJob{
			river.NewPeriodicJob(
				river.PeriodicInterval(15*time.Minute),
				func() (river.JobArgs, *river.InsertOpts) {
					return PeriodicJobArgs{}, nil
				},
				&river.PeriodicJobOpts{RunOnStart: true},
			),
		},
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	// There's no need to explicitly insert a periodic job. One will be inserted
	// (and worked soon after) as the client starts up.
	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

This job will run once immediately then approximately once every 15 minutes
Example (ScheduledJob)

Example_scheduledJob demonstrates how to schedule a job to be worked in the future.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type ScheduledArgs struct {
	Message string `json:"message"`
}

func (ScheduledArgs) Kind() string { return "scheduled" }

type ScheduledWorker struct {
	river.WorkerDefaults[ScheduledArgs]
}

func (w *ScheduledWorker) Work(ctx context.Context, job *river.Job[ScheduledArgs]) error {
	fmt.Printf("Message: %s\n", job.Args.Message)
	return nil
}

// Example_scheduledJob demonstrates how to schedule a job to be worked in the
// future.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &ScheduledWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	_, err = riverClient.Insert(ctx,
		ScheduledArgs{
			Message: "hello from the future",
		},
		&river.InsertOpts{
			// Schedule the job to be worked in three hours.
			ScheduledAt: time.Now().Add(3 * time.Hour),
		})
	if err != nil {
		panic(err)
	}

	// Unlike most other examples, we don't wait for the job to be worked since
	// doing so would require making the job's scheduled time contrived, and the
	// example therefore less realistic/useful.

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

Example (Subscription)

Example_subscription demonstrates the use of client subscriptions to receive events containing information about worked jobs.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/monorail/shared/wlib/wlog"
	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/rivercommon"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type SubscriptionArgs struct {
	Cancel bool `json:"cancel"`
	Fail   bool `json:"fail"`
}

func (SubscriptionArgs) Kind() string { return "subscription" }

type SubscriptionWorker struct {
	river.WorkerDefaults[SubscriptionArgs]
}

func (w *SubscriptionWorker) Work(ctx context.Context, job *river.Job[SubscriptionArgs]) error {
	switch {
	case job.Args.Cancel:
		return river.JobCancel(errors.New("cancelling job"))
	case job.Args.Fail:
		return errors.New("failing job")
	}
	return nil
}

// Example_subscription demonstrates the use of client subscriptions to receive
// events containing information about worked jobs.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &SubscriptionWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: wlog.Logger(), // Suppress logging so example output is cleaner.
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Subscribers tell the River client the kinds of events they'd like to receive.
	completedChan, completedSubscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer completedSubscribeCancel()

	// Multiple simultaneous subscriptions are allowed.
	failedChan, failedSubscribeCancel := riverClient.Subscribe(river.EventKindJobFailed)
	defer failedSubscribeCancel()

	otherChan, otherSubscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled, river.EventKindJobSnoozed)
	defer otherSubscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Insert one job for each subscription above: one to succeed, one to fail,
	// and one that's cancelled that'll arrive on the "other" channel.
	_, err = riverClient.Insert(ctx, SubscriptionArgs{}, nil)
	if err != nil {
		panic(err)
	}
	_, err = riverClient.Insert(ctx, SubscriptionArgs{Fail: true}, nil)
	if err != nil {
		panic(err)
	}
	_, err = riverClient.Insert(ctx, SubscriptionArgs{Cancel: true}, nil)
	if err != nil {
		panic(err)
	}

	waitForJob := func(subscribeChan <-chan *river.Event) {
		select {
		case event := <-subscribeChan:
			if event == nil {
				fmt.Printf("Channel is closed\n")
				return
			}

			fmt.Printf("Got job with state: %s\n", event.Job.State)
		case <-time.After(rivercommon.WaitTimeout()):
			panic("timed out waiting for job")
		}
	}

	waitForJob(completedChan)
	waitForJob(failedChan)
	waitForJob(otherChan)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

	fmt.Printf("Client stopped\n")

	// Try waiting again, but none of these work because stopping the client
	// closed all subscription channels automatically.
	waitForJob(completedChan)
	waitForJob(failedChan)
	waitForJob(otherChan)

}
Output:

Got job with state: completed
Got job with state: available
Got job with state: cancelled
Client stopped
Channel is closed
Channel is closed
Channel is closed
Example (UniqueJob)

Example_uniqueJob demonstrates the use of a job with custom job-specific insertion options.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

// Account represents a minimal account including recent expenditures and a
// remaining total.
type Account struct {
	RecentExpenditures int
	AccountTotal       int
}

// Map of account ID -> account.
var allAccounts = map[int]Account{ //nolint:gochecknoglobals
	1: {RecentExpenditures: 100, AccountTotal: 1_000},
	2: {RecentExpenditures: 999, AccountTotal: 1_000},
}

type ReconcileAccountArgs struct {
	AccountID int `json:"account_id"`
}

func (ReconcileAccountArgs) Kind() string { return "reconcile_account" }

// InsertOpts returns custom insert options that every job of this type will
// inherit, including unique options.
func (ReconcileAccountArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{
		UniqueOpts: river.UniqueOpts{
			ByArgs:   true,
			ByPeriod: 24 * time.Hour,
		},
	}
}

type ReconcileAccountWorker struct {
	river.WorkerDefaults[ReconcileAccountArgs]
}

func (w *ReconcileAccountWorker) Work(ctx context.Context, job *river.Job[ReconcileAccountArgs]) error {
	account := allAccounts[job.Args.AccountID]

	account.AccountTotal -= account.RecentExpenditures
	account.RecentExpenditures = 0

	fmt.Printf("Reconciled account %d; new total: %d\n", job.Args.AccountID, account.AccountTotal)

	return nil
}

// Example_uniqueJob demonstrates the use of a job with custom
// job-specific insertion options.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &ReconcileAccountWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// First job insertion for account 1.
	_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
	if err != nil {
		panic(err)
	}

	// The job is inserted a second time, but the insertion is skipped because
	// its unique options allow only one run per account per 24 hour period.
	_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
	if err != nil {
		panic(err)
	}

	// Cheat a little by waiting for the first job to come back so we can
	// guarantee that this example's output comes out in order.
	waitForNJobs(subscribeChan, 1)

	// Because the job is unique ByArgs, another job for account 2 is allowed.
	_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 2}, nil)
	if err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

Reconciled account 1; new total: 900
Reconciled account 2; new total: 1
Example (WorkFunc)

Example_workFunc demonstrates the use of river.WorkFunc, which can be used to easily add a worker with only a function instead of having to implement a full worker struct.

package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"

	"weavelab.xyz/river"
	"weavelab.xyz/river/internal/riverinternaltest"
	"weavelab.xyz/river/internal/util/slogutil"
	"weavelab.xyz/river/riverdriver/riverpgxv5"
)

type WorkFuncArgs struct {
	Message string `json:"message"`
}

func (WorkFuncArgs) Kind() string { return "work_func" }

// Example_workFunc demonstrates the use of river.WorkFunc, which can be used to
// easily add a worker with only a function instead of having to implement a
// full worker struct.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, job *river.Job[WorkFuncArgs]) error {
		fmt.Printf("Message: %s", job.Args.Message)
		return nil
	}))

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	_, err = riverClient.Insert(ctx, WorkFuncArgs{
		Message: "hello from a function!",
	}, nil)
	if err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}
Output:

Message: hello from a function!

Index

Examples

Constants

View Source
const (
	FetchCooldownDefault = 100 * time.Millisecond
	FetchCooldownMin     = 1 * time.Millisecond

	FetchPollIntervalDefault = 1 * time.Second
	FetchPollIntervalMin     = 1 * time.Millisecond

	JobTimeoutDefault  = 1 * time.Minute
	MaxAttemptsDefault = rivercommon.MaxAttemptsDefault
	PriorityDefault    = rivercommon.PriorityDefault
	QueueDefault       = rivercommon.QueueDefault
	QueueNumWorkersMax = 10_000
)
View Source
const (
	JobStateAvailable = rivertype.JobStateAvailable
	JobStateCancelled = rivertype.JobStateCancelled
	JobStateCompleted = rivertype.JobStateCompleted
	JobStateDiscarded = rivertype.JobStateDiscarded
	JobStateRetryable = rivertype.JobStateRetryable
	JobStateRunning   = rivertype.JobStateRunning
	JobStateScheduled = rivertype.JobStateScheduled
)

Variables

View Source
var ErrJobCancelledRemotely = JobCancel(errors.New("job cancelled remotely"))
View Source
var (
	// ErrNotFound is returned when a query by ID does not match any existing
	// rows. For example, attempting to cancel a job that doesn't exist will
	// return this error.
	ErrNotFound = errors.New("not found")
)

Functions

func AddWorker

func AddWorker[T JobArgs](workers *Workers, worker Worker[T])

AddWorker registers a Worker on the provided Workers bundle. Each Worker must be registered so that the Client knows it should handle a specific kind of job (as returned by its `Kind()` method).

Use by passing an instance of a worker; the JobArgs type parameter is inferred from it and doesn't need to be specified explicitly:

river.AddWorker(workers, &SortWorker{})

Note that AddWorker can panic in some situations, such as if the worker is already registered or if its configuration is otherwise invalid. This default probably makes sense for most applications because you wouldn't want to start an application with invalid hardcoded runtime configuration. If you want to avoid panics, use AddWorkerSafely instead.

func AddWorkerSafely

func AddWorkerSafely[T JobArgs](workers *Workers, worker Worker[T]) error

AddWorkerSafely registers a worker on the provided Workers bundle. Unlike AddWorker, AddWorkerSafely does not panic and instead returns an error if the worker is already registered or if its configuration is invalid.

Use by explicitly specifying a JobArgs type and then passing an instance of a worker for the same type:

river.AddWorkerSafely[SortArgs](workers, &SortWorker{})
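
The difference between the panicking and error-returning variants can be sketched with a toy registry. Everything below is hypothetical and standard-library only: `registry`, `addSafely`, and `add` are illustrative names, not River's implementation. The sketch only mirrors the documented contract that a duplicate or invalid registration either returns an error or panics.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a hypothetical stand-in for a workers bundle keyed by job kind.
type registry struct{ kinds map[string]struct{} }

func newRegistry() *registry { return &registry{kinds: make(map[string]struct{})} }

// addSafely returns an error instead of panicking when kind is empty or has
// already been registered.
func (r *registry) addSafely(kind string) error {
	if kind == "" {
		return errors.New("kind must not be empty")
	}
	if _, ok := r.kinds[kind]; ok {
		return fmt.Errorf("kind %q is already registered", kind)
	}
	r.kinds[kind] = struct{}{}
	return nil
}

// add wraps addSafely and panics on error, which suits hardcoded
// configuration that should fail loudly at startup.
func (r *registry) add(kind string) {
	if err := r.addSafely(kind); err != nil {
		panic(err)
	}
}

// duplicatePanics reports whether registering the same kind twice through
// add results in a panic.
func duplicatePanics() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	r := newRegistry()
	r.add("sort")
	r.add("sort")
	return false
}

func main() {
	r := newRegistry()
	fmt.Println(r.addSafely("sort")) // first registration succeeds
	fmt.Println(r.addSafely("sort")) // duplicate is reported as an error
	fmt.Println(duplicatePanics())
}
```

The error-returning form is the better fit when registrations come from dynamic input, where a caller may want to log the problem and continue.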

func JobCancel

func JobCancel(err error) error

JobCancel wraps err and can be returned from a Worker's Work method to cancel the job at the end of execution. Regardless of whether or not the job has any remaining attempts, this will ensure the job does not execute again.

func JobSnooze

func JobSnooze(duration time.Duration) error

JobSnooze can be returned from a Worker's Work method to cause the job to be tried again after the specified duration. This also has the effect of incrementing the job's MaxAttempts by 1, meaning that jobs can be repeatedly snoozed without ever being discarded.

Panics if duration is < 0.
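
The effect of snoozing on the attempt budget can be illustrated with a toy model. The `job` struct and `runOnce` function below are hypothetical and say nothing about River's internals; they assume only what is stated above, namely that each run consumes an attempt and each snooze raises MaxAttempts by 1.

```go
package main

import "fmt"

// job is a hypothetical model of the two counters involved in snoozing.
type job struct {
	Attempt     int
	MaxAttempts int
}

// runOnce models a single execution: the run consumes one attempt, and if the
// worker snoozes, MaxAttempts grows by one to compensate.
func runOnce(j *job, snoozed bool) {
	j.Attempt++
	if snoozed {
		j.MaxAttempts++
	}
}

// remainingAfterSnoozes returns how many attempts are left after a job that
// starts with maxAttempts is snoozed n times in a row.
func remainingAfterSnoozes(maxAttempts, n int) int {
	j := &job{MaxAttempts: maxAttempts}
	for i := 0; i < n; i++ {
		runOnce(j, true)
	}
	return j.MaxAttempts - j.Attempt
}

func main() {
	// However many times the job snoozes, the remaining budget is unchanged.
	fmt.Println(remainingAfterSnoozes(3, 1), remainingAfterSnoozes(3, 100))
}
```

In this model a snoozed run never reduces the number of attempts left for genuine failures, which is why repeated snoozing does not lead to the job being discarded.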

Types

type Client

type Client[TTx any] struct {
	// contains filtered or unexported fields
}

Client is a single isolated instance of River. Your application may use multiple instances operating on different databases or Postgres schemas within a single database.

func ClientFromContext

func ClientFromContext[TTx any](ctx context.Context) *Client[TTx]

ClientFromContext returns the Client from the context. This function can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.

It panics if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.

func ClientFromContextSafely

func ClientFromContextSafely[TTx any](ctx context.Context) (*Client[TTx], error)

ClientFromContextSafely returns the Client from the context. This function can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.

It returns an error if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.

func NewClient

func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client[TTx], error)

NewClient creates a new Client with the given database driver and configuration.

Currently only one driver is supported, which is Pgx v5. See package riverpgxv5.

The function takes a generic parameter TTx representing a transaction type, but it can be omitted because it's generally inferred from the driver. For example:

import "weavelab.xyz/river"
import "weavelab.xyz/river/riverdriver/riverpgxv5"

...

dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
	// handle error
}
defer dbPool.Close()

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
	...
})
if err != nil {
	// handle error
}

func (*Client[TTx]) Insert

func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*rivertype.JobRow, error)

Insert inserts a new job with the provided args. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts, as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout.

jobRow, err := client.Insert(insertCtx, MyArgs{}, nil)
if err != nil {
	// handle error
}

func (*Client[TTx]) InsertMany

func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int64, error)

InsertMany inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options. These options override insert options provided by a JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.

count, err := client.InsertMany(ctx, []river.InsertManyParams{
	{Args: BatchInsertArgs{}},
	{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
})
if err != nil {
	// handle error
}

func (*Client[TTx]) InsertManyTx

func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int64, error)

InsertManyTx inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options. These options override insert options provided by a JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.

count, err := client.InsertManyTx(ctx, tx, []river.InsertManyParams{
	{Args: BatchInsertArgs{}},
	{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
})
if err != nil {
	// handle error
}

This variant lets a caller insert jobs atomically alongside other database changes. An inserted job isn't visible to be worked until the transaction commits, and if the transaction rolls back, so too is the inserted job.

func (*Client[TTx]) InsertTx

func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobRow, error)

InsertTx inserts a new job with the provided args on the given transaction. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts, as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout.

jobRow, err := client.InsertTx(insertCtx, tx, MyArgs{}, nil)
if err != nil {
	// handle error
}

This variant lets a caller insert jobs atomically alongside other database changes. An inserted job isn't visible to be worked until the transaction commits, and if the transaction rolls back, so too is the inserted job.

func (*Client[TTx]) JobCancel

func (c *Client[TTx]) JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error)

JobCancel cancels the job with the given ID. If possible, the job is cancelled immediately and will not be retried. The provided context is used for the underlying Postgres update and can be used to cancel the operation or apply a timeout.

If the job is still in the queue (available, scheduled, or retryable), it is immediately marked as cancelled and will not be retried.

If the job is already finalized (cancelled, completed, or discarded), no changes are made.

If the job is currently running, it is not immediately cancelled, but is instead marked for cancellation. The client running the job will also be notified (via LISTEN/NOTIFY) to cancel the running job's context. Although the job's context will be cancelled, since Go does not provide a mechanism to interrupt a running goroutine the job will continue running until it returns. As always, it is important for workers to respect context cancellation and return promptly when the job context is done.

Once the cancellation signal is received by the client running the job, any error returned by that job will result in it being cancelled permanently and not retried. However if the job returns no error, it will be completed as usual.

In the event the running job finishes executing _before_ the cancellation signal is received but _after_ this update was made, the behavior depends on which state the job is being transitioned into (based on its return error):

  • If the job completed successfully, was cancelled from within, or was discarded due to exceeding its max attempts, the job will be updated as usual.
  • If the job was snoozed to run again later or encountered a retryable error, the job will be marked as cancelled and will not be attempted again.

Returns the up-to-date JobRow for the specified jobID if it exists. Returns ErrNotFound if the job doesn't exist.
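
The advice about respecting context cancellation can be sketched with the standard library alone. The `processItems` function below is a hypothetical stand-in for the body of a Work method and is not part of River. It checks `ctx.Done()` before each unit of work so that it returns promptly once the job's context is cancelled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// processItems is a hypothetical stand-in for the body of a Work method. It
// handles up to n items, checking for cancellation while waiting on each one,
// and returns how many items it finished plus ctx.Err() if it stopped early.
func processItems(ctx context.Context, n int, perItem time.Duration) (int, error) {
	for i := 0; i < n; i++ {
		select {
		case <-ctx.Done():
			// Return promptly rather than finishing the remaining items.
			return i, ctx.Err()
		case <-time.After(perItem):
			// Simulated unit of work finished.
		}
	}
	return n, nil
}

// runCancelDemo cancels the context up front, simulating a cancellation
// signal that arrives while the job is running.
func runCancelDemo() (done int, cancelled bool) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	done, err := processItems(ctx, 3, time.Hour)
	return done, errors.Is(err, context.Canceled)
}

func main() {
	done, cancelled := runCancelDemo()
	fmt.Printf("items finished: %d, stopped by cancellation: %t\n", done, cancelled)
}
```

Returning `ctx.Err()` matters here: as described above, a job that returns an error after the cancellation signal is received is cancelled permanently, while one that returns nil is completed as usual.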

func (*Client[TTx]) JobCancelTx

func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobRow, error)

JobCancelTx cancels the job with the given ID within the specified transaction. This variant lets a caller cancel a job atomically alongside other database changes. A cancellation doesn't take effect until the transaction commits, and if the transaction rolls back, so too does the cancellation.

If possible, the job is cancelled immediately and will not be retried. The provided context is used for the underlying Postgres update and can be used to cancel the operation or apply a timeout.

If the job is still in the queue (available, scheduled, or retryable), it is immediately marked as cancelled and will not be retried.

If the job is already finalized (cancelled, completed, or discarded), no changes are made.

If the job is currently running, it is not immediately cancelled, but is instead marked for cancellation. The client running the job will also be notified (via LISTEN/NOTIFY) to cancel the running job's context. Although the job's context will be cancelled, since Go does not provide a mechanism to interrupt a running goroutine the job will continue running until it returns. As always, it is important for workers to respect context cancellation and return promptly when the job context is done.

Once the cancellation signal is received by the client running the job, any error returned by that job will result in it being cancelled permanently and not retried. However if the job returns no error, it will be completed as usual.

In the event the running job finishes executing _before_ the cancellation signal is received but _after_ this update was made, the behavior depends on which state the job is being transitioned into (based on its return error):

  • If the job completed successfully, was cancelled from within, or was discarded due to exceeding its max attempts, the job will be updated as usual.
  • If the job was snoozed to run again later or encountered a retryable error, the job will be marked as cancelled and will not be attempted again.

Returns the up-to-date JobRow for the specified jobID if it exists. Returns ErrNotFound if the job doesn't exist.

func (*Client[TTx]) JobList

func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) ([]*rivertype.JobRow, error)

JobList returns a paginated list of jobs matching the provided filters. The provided context is used for the underlying Postgres query and can be used to cancel the operation or apply a timeout.

params := river.NewJobListParams().WithLimit(10).State(river.JobStateCompleted)
jobRows, err := client.JobList(ctx, params)
if err != nil {
	// handle error
}

func (*Client[TTx]) JobListTx

func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListParams) ([]*rivertype.JobRow, error)

JobListTx returns a paginated list of jobs matching the provided filters. The provided context is used for the underlying Postgres query and can be used to cancel the operation or apply a timeout.

params := river.NewJobListParams().WithLimit(10).State(river.JobStateCompleted)
jobRows, err := client.JobListTx(ctx, tx, params)
if err != nil {
	// handle error
}

func (*Client[TTx]) Start

func (c *Client[TTx]) Start(ctx context.Context) error

Start starts the client's job fetching and working loops. Once this is called, the client will run in a background goroutine until stopped. All jobs are run with a context inheriting from the provided context, but with a timeout deadline applied based on the job's settings.

A graceful shutdown stops fetching new jobs but allows any previously fetched jobs to complete. This can be initiated with the Stop method.

A more abrupt shutdown can be achieved by either cancelling the provided context or by calling StopAndCancel. This will not only stop fetching new jobs, but will also cancel the context for any currently-running jobs. If using StopAndCancel, there's no need to also call Stop.

func (*Client[TTx]) Stop

func (c *Client[TTx]) Stop(ctx context.Context) error

Stop performs a graceful shutdown of the Client. It signals all producers to stop fetching new jobs and waits for any fetched or in-progress jobs to complete before exiting. If the provided context is done before shutdown has completed, Stop will return immediately with the context's error.

There's no need to call this method if a hard stop has already been initiated by cancelling the context passed to Start or by calling StopAndCancel.

func (*Client[TTx]) StopAndCancel

func (c *Client[TTx]) StopAndCancel(ctx context.Context) error

StopAndCancel shuts down the client and cancels all work in progress. It is a more aggressive stop than Stop because the contexts for any in-progress jobs are cancelled. However, it still waits for jobs to complete before returning, even though their contexts are cancelled. If the provided context is done before shutdown has completed, Stop will return immediately with the context's error.

This can also be initiated by cancelling the context passed to Start. There is no need to call this method if the context passed to Start is cancelled instead.
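The two shutdown modes can be combined: try a graceful Stop first and escalate to StopAndCancel only if the grace period runs out. The following is a minimal sketch of that pattern, not River's own code. The `stopper` interface, the `shutdown` helper, and the `slowClient` fake are hypothetical names introduced for illustration; a real Client is assumed to satisfy `stopper` because it exposes both methods with these signatures.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stopper is a hypothetical local interface covering the two shutdown
// methods documented above.
type stopper interface {
	Stop(ctx context.Context) error
	StopAndCancel(ctx context.Context) error
}

// shutdown tries a graceful Stop for up to grace, then escalates to
// StopAndCancel for up to hard. It reports which path completed.
func shutdown(c stopper, grace, hard time.Duration) (string, error) {
	softCtx, cancelSoft := context.WithTimeout(context.Background(), grace)
	defer cancelSoft()

	err := c.Stop(softCtx)
	if err == nil {
		return "graceful", nil
	}
	if !errors.Is(err, context.DeadlineExceeded) {
		return "", err
	}

	// The grace period elapsed, so cancel the contexts of running jobs too.
	hardCtx, cancelHard := context.WithTimeout(context.Background(), hard)
	defer cancelHard()
	if err := c.StopAndCancel(hardCtx); err != nil {
		return "", err
	}
	return "hard", nil
}

// slowClient is a fake used only for this demonstration: Stop never finishes
// before its context is done, while StopAndCancel returns right away.
type slowClient struct{}

func (slowClient) Stop(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func (slowClient) StopAndCancel(ctx context.Context) error { return nil }

func main() {
	how, err := shutdown(slowClient{}, 10*time.Millisecond, time.Second)
	fmt.Println(how, err)
}
```

Because Stop returns the context's error when its context is done first, checking for `context.DeadlineExceeded` is enough to tell an expired grace period apart from other failures.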

func (*Client[TTx]) Stopped

func (c *Client[TTx]) Stopped() <-chan struct{}

Stopped returns a channel that will be closed when the Client has stopped. It can be used to wait for a graceful shutdown to complete.

It is not affected by any contexts passed to Stop or StopAndCancel.

func (*Client[TTx]) Subscribe

func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func())

Subscribe subscribes to the provided kinds of events that occur within the client, like EventKindJobCompleted for when a job completes.

Returns a channel over which to receive events along with a cancel function that can be used to cancel and tear down resources associated with the subscription. It's recommended but not necessary to invoke the cancel function. If it isn't invoked, resources are freed when the client stops.

The event channel is buffered and sends on it are non-blocking. Consumers must process events in a timely manner or it's possible for events to be dropped. Any slow operations performed in response to a received event (e.g. persisting to a database) should be made asynchronous to avoid event loss.

Callers must specify the kinds of events they're interested in. This allows for forward compatibility in case new kinds of events are added in future versions. If new event kinds are added, callers will have to explicitly add them to their requested list and ensure they can be handled correctly.
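To see why a slow consumer loses events, it helps to look at what a non-blocking send on a buffered channel does in general. The sketch below illustrates those Go channel semantics only; it is not River's implementation, and the `event` type, `publish`, and `countDropped` are hypothetical names. The buffer size River actually uses is not stated here, so the sizes below are arbitrary.

```go
package main

import "fmt"

// event is a hypothetical stand-in for *river.Event.
type event struct{ id int }

// publish performs a non-blocking send: if the buffer is full the event is
// dropped instead of stalling the sender. It reports whether the send
// succeeded.
func publish(ch chan<- event, ev event) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

// countDropped sends n events into a channel with the given buffer size
// while nothing is receiving, and returns how many were dropped.
func countDropped(buffer, n int) int {
	ch := make(chan event, buffer)
	dropped := 0
	for i := 0; i < n; i++ {
		if !publish(ch, event{id: i}) {
			dropped++
		}
	}
	return dropped
}

func main() {
	fmt.Println(countDropped(4, 10)) // prints 6
}
```

A consumer that hands each received event to another goroutine or queue keeps the subscription channel drained, which is the reason for the advice above to make slow work asynchronous.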

type ClientRetryPolicy

type ClientRetryPolicy interface {
	// NextRetry calculates when the next retry for a failed job should take place
	// given when it was last attempted and its number of attempts, or any other
	// of the job's properties a user-configured retry policy might want to
	// consider.
	NextRetry(job *rivertype.JobRow) time.Time
}

ClientRetryPolicy is an interface that can be implemented to provide a retry policy for how River deals with failed jobs at the client level (when a worker does not define an override for `NextRetry`). Jobs are scheduled to be retried in the future up until they've reached the job's max attempts, at which point they're set as discarded.

The ClientRetryPolicy does not have access to generics and operates on the raw JobRow struct with encoded args.
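As an illustration of what a custom policy might look like, here is a minimal sketch of a linear backoff. To stay runnable without the River module it uses a local `jobRow` stand-in rather than `rivertype.JobRow`; the stand-in, its `Attempt` and `AttemptedAt` fields (assumed to mirror the real struct), `linearRetryPolicy`, and `delaySeconds` are all introduced here for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// jobRow is a hypothetical stand-in for rivertype.JobRow carrying only the
// fields this sketch reads.
type jobRow struct {
	Attempt     int
	AttemptedAt *time.Time
}

// linearRetryPolicy retries after Attempt * Step, measured from the last
// attempt.
type linearRetryPolicy struct {
	Step time.Duration
}

// NextRetry mirrors the shape of ClientRetryPolicy.NextRetry, but takes the
// local jobRow stand-in instead of *rivertype.JobRow.
func (p linearRetryPolicy) NextRetry(job *jobRow) time.Time {
	last := time.Now()
	if job.AttemptedAt != nil {
		last = *job.AttemptedAt
	}
	return last.Add(time.Duration(job.Attempt) * p.Step)
}

// delaySeconds reports, in whole seconds, the delay the policy picks for a
// job on the given attempt with the given step.
func delaySeconds(attempt, stepSeconds int) int {
	attemptedAt := time.Date(2024, 1, 18, 12, 0, 0, 0, time.UTC)
	p := linearRetryPolicy{Step: time.Duration(stepSeconds) * time.Second}
	next := p.NextRetry(&jobRow{Attempt: attempt, AttemptedAt: &attemptedAt})
	return int(next.Sub(attemptedAt) / time.Second)
}

func main() {
	fmt.Println(delaySeconds(3, 30)) // prints 90
}
```

In a real program the method would take `*rivertype.JobRow` and the policy would be assigned to the RetryPolicy field of Config.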

type Config

type Config struct {
	// AdvisoryLockPrefix is a configurable 32-bit prefix that River will use
	// when generating any key to acquire a Postgres advisory lock. All advisory
	// locks share the same 64-bit number space, so this allows a calling
	// application to guarantee that a River advisory lock will never conflict
	// with one of its own by cordoning each type to its own prefix.
	//
	// If this value isn't set, River defaults to generating key hashes across
	// the entire 64-bit advisory lock number space, which is large enough that
	// conflicts are exceedingly unlikely. If callers don't strictly need this
	// option then it's recommended to leave it unset because the prefix leaves
	// only 32 bits of number space for advisory lock hashes, so it makes
	// internally conflicting River-generated keys more likely.
	AdvisoryLockPrefix int32

	// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
	// around before they're removed permanently.
	//
	// Defaults to 24 hours.
	CancelledJobRetentionPeriod time.Duration

	// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
	// around before they're removed permanently.
	//
	// Defaults to 24 hours.
	CompletedJobRetentionPeriod time.Duration

	// DiscardedJobRetentionPeriod is the amount of time to keep discarded jobs
	// around before they're removed permanently.
	//
	// Defaults to 7 days.
	DiscardedJobRetentionPeriod time.Duration

	// ErrorHandler can be configured to be invoked in case of an error or panic
	// occurring in a job. This is often useful for logging and exception
	// tracking, but can also be used to customize retry behavior.
	ErrorHandler ErrorHandler

	// FetchCooldown is the minimum amount of time to wait between fetches of new
	// jobs. Jobs will only be fetched *at most* this often, but if no new jobs
	// are coming in via LISTEN/NOTIFY then fetches may be delayed as long as
	// FetchPollInterval.
	//
	// Throughput is limited by this value.
	//
	// Defaults to 100 ms.
	FetchCooldown time.Duration

	// FetchPollInterval is the amount of time between periodic fetches for new
	// jobs. Typically new jobs will be picked up ~immediately after insert via
	// LISTEN/NOTIFY, but this provides a fallback.
	//
	// Defaults to 1 second.
	FetchPollInterval time.Duration

	// JobTimeout is the maximum amount of time a job is allowed to run before its
	// context is cancelled. A timeout of zero means JobTimeoutDefault will be
	// used, whereas a value of -1 means the job's context will not be cancelled
	// unless the Client is shutting down.
	//
	// Defaults to 1 minute.
	JobTimeout time.Duration

	// Logger is the structured logger to use for logging purposes. If none is
	// specified, logs will be emitted to STDOUT with messages at warn level
	// or higher.
	Logger *wlog.WLogger

	// PeriodicJobs are a set of periodic jobs to run at the specified intervals
	// in the client.
	PeriodicJobs []*PeriodicJob

	// Queues is a list of queue names for this client to operate on along with
	// configuration for the queue like the maximum number of workers to run for
	// each queue.
	//
	// This field may be omitted for a program that's only queueing jobs rather
	// than working them. If it's specified, then Workers must also be given.
	Queues map[string]QueueConfig

	// ReindexerSchedule is the schedule for running the reindexer. If nil, the
	// reindexer will run at midnight UTC every day.
	ReindexerSchedule PeriodicSchedule

	// RescueStuckJobsAfter is the amount of time a job can be running before it
	// is considered stuck. A stuck job which has not yet reached its max attempts
	// will be scheduled for a retry, while one which has exhausted its attempts
	// will be discarded. This prevents jobs from being stuck forever if a worker
	// crashes or is killed.
	//
	// Note that this can result in repeat or duplicate execution of a job that is
	// not actually stuck but is still working. The value should be set higher
	// than the maximum duration you expect your jobs to run. Setting a value too
	// low will result in more duplicate executions, whereas too high of a value
	// will result in jobs being stuck for longer than necessary before they are
	// retried.
	//
	// RescueStuckJobsAfter must be greater than JobTimeout. Otherwise, jobs
	// would become eligible for rescue while they're still running.
	//
	// Defaults to 1 hour, or in cases where JobTimeout has been configured and
	// is greater than 1 hour, JobTimeout + 1 hour.
	RescueStuckJobsAfter time.Duration

	// RetryPolicy is a configurable retry policy for the client.
	//
	// Defaults to DefaultRetryPolicy.
	RetryPolicy ClientRetryPolicy

	// Workers is a bundle of registered job workers.
	//
	// This field may be omitted for a program that's only enqueueing jobs
	// rather than working them, but if it is configured the client can validate
	// ahead of time that a worker is properly registered for an inserted job.
	// (i.e. that it wasn't forgotten by accident.)
	Workers *Workers
	// contains filtered or unexported fields
}

Config is the configuration for a Client.
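The default for RescueStuckJobsAfter depends on JobTimeout, which is easy to misread. The following sketch restates the documented rule as a small function; `defaultRescueAfter` is a hypothetical helper written for this example, not part of River's API, and it only covers the cases the comment above describes.

```go
package main

import (
	"fmt"
	"time"
)

// defaultRescueAfter restates the documented default: 1 hour, or
// JobTimeout + 1 hour when JobTimeout is configured above 1 hour.
func defaultRescueAfter(jobTimeout time.Duration) time.Duration {
	if jobTimeout > time.Hour {
		return jobTimeout + time.Hour
	}
	return time.Hour
}

func main() {
	fmt.Println(defaultRescueAfter(0))             // prints 1h0m0s
	fmt.Println(defaultRescueAfter(3 * time.Hour)) // prints 4h0m0s
}
```

Either way the result stays above JobTimeout, which matches the requirement that RescueStuckJobsAfter be greater than JobTimeout.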

type DefaultClientRetryPolicy

type DefaultClientRetryPolicy struct {
	// contains filtered or unexported fields
}

River's default retry policy.

func (*DefaultClientRetryPolicy) NextRetry

func (p *DefaultClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time

NextRetry returns the next retry time for the given job, accounting for when it was last attempted and what attempt number that was. Reschedules using a basic exponential backoff of `ATTEMPT^4`, so after the first failure a new try will be scheduled in 1 second, 16 seconds after the second, 1 minute and 21 seconds after the third, etc.

In order to avoid penalizing jobs that are snoozed, the number of errors is used instead of the attempt count. This means that snoozing a job (even repeatedly) will not lead to a future error having a longer than expected retry delay.
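The delays quoted above follow directly from raising the error count to the fourth power and reading the result as seconds. This sketch reproduces only that arithmetic; `backoff` is a hypothetical helper, and any jitter or cap the real policy may apply is deliberately left out.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns errorCount^4 seconds, the base delay described above.
func backoff(errorCount int) time.Duration {
	n := int64(errorCount)
	return time.Duration(n*n*n*n) * time.Second
}

func main() {
	for errorCount := 1; errorCount <= 4; errorCount++ {
		fmt.Printf("after failure %d: %s\n", errorCount, backoff(errorCount))
	}
}
```

For error counts 1 through 4 this gives 1s, 16s, 1m21s, and 4m16s, matching the first three figures in the description.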

type ErrorHandler

type ErrorHandler interface {
	// HandleError is invoked in case of an error occurring in a job.
	//
	// Context is descended from the one used to start the River client that
	// worked the job.
	HandleError(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult

	// HandlePanic is invoked in case of a panic occurring in a job.
	//
	// Context is descended from the one used to start the River client that
	// worked the job.
	HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult
}

ErrorHandler provides an interface that will be invoked in case of an error or panic occurring in the job. This is often useful for logging and exception tracking, but can also be used to customize retry behavior.

type ErrorHandlerResult

type ErrorHandlerResult struct {
	// SetCancelled can be set to true to fail the job immediately and
	// permanently. By default it'll continue to follow the configured retry
	// schedule.
	SetCancelled bool
}

type Event

type Event struct {
	// Kind is the kind of event. Receivers should read this field and respond
	// accordingly. Subscriptions will only receive event kinds that they
	// requested when creating a subscription with Subscribe.
	Kind EventKind

	// Job contains job-related information.
	Job *rivertype.JobRow

	// JobStats are statistics about the run of a job.
	JobStats *JobStatistics
}

Event wraps an event that occurred within a River client, like a job being completed.

type EventKind

type EventKind string

EventKind is a kind of event to subscribe to from a client.

const (
	// EventKindJobCancelled occurs when a job is cancelled.
	EventKindJobCancelled EventKind = "job_cancelled"

	// EventKindJobCompleted occurs when a job is completed.
	EventKindJobCompleted EventKind = "job_completed"

	// EventKindJobFailed occurs when a job fails. Occurs both when a job fails
	// and will be retried and when a job fails for the last time and will be
	// discarded. Callers can use job fields like `Attempt` and `State` to
	// differentiate each type of occurrence.
	EventKindJobFailed EventKind = "job_failed"

	// EventKindJobSnoozed occurs when a job is snoozed.
	EventKindJobSnoozed EventKind = "job_snoozed"
)

type InsertManyParams

type InsertManyParams struct {
	// Args are the arguments of the job to insert.
	Args JobArgs

	// InsertOpts are insertion options for this job.
	InsertOpts *InsertOpts
}

InsertManyParams encapsulates a single job combined with insert options for use with batch insertion.

type InsertOpts

type InsertOpts struct {
	// MaxAttempts is the maximum number of total attempts (including both the
	// original run and all retries) before a job is abandoned and set as
	// discarded.
	MaxAttempts int

	// Metadata is a JSON object blob of arbitrary data that will be stored with
	// the job. Users should not overwrite or remove anything stored in this
	// field by River.
	Metadata []byte

	// Priority is the priority of the job, with 1 being the highest priority and
	// 4 being the lowest. When fetching available jobs to work, the highest
	// priority jobs will always be fetched before any lower priority jobs are
	// fetched. Note that if your workers are swamped with more high-priority jobs
	// than they can handle, lower priority jobs may not be fetched.
	//
	// Defaults to PriorityDefault.
	Priority int

	// Queue is the name of the job queue in which to insert the job.
	//
	// Defaults to QueueDefault.
	Queue string

	// ScheduledAt is a time in the future at which to schedule the job (i.e. in
	// cases where it shouldn't be run immediately). The job is guaranteed not
	// to run before this time, but may run slightly after depending on the
	// number of other scheduled jobs and how busy the queue is.
	//
	// Use of this option generally only makes sense when passing options into
	// Insert rather than when a job args struct is implementing
	// JobArgsWithInsertOpts, however, it will work in both cases.
	ScheduledAt time.Time

	// Tags are an arbitrary list of keywords to add to the job. They have no
	// functional behavior and are meant entirely as a user-specified construct
	// to help group and categorize jobs.
	//
	// If tags are specified from both a job args override and from options on
	// Insert, the latter takes precedence. Tags are not merged.
	Tags []string

	// UniqueOpts returns options relating to job uniqueness. An empty struct
	// avoids setting any worker-level unique options.
	UniqueOpts UniqueOpts
}

InsertOpts are optional settings for a new job which can be provided at job insertion time. These will override any default InsertOpts settings provided by JobArgsWithInsertOpts, as well as any global defaults.
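The override order described above (insert-time options, then options from JobArgsWithInsertOpts, then global defaults) can be pictured as a per-field fallback. The sketch below is an illustration under stated assumptions, not River's implementation: `insertOpts` is a trimmed-down local stand-in, the `assumed*` constants are placeholder defaults chosen for the example, and treating an empty Tags slice as "not set" is also an assumption.

```go
package main

import "fmt"

// insertOpts is a hypothetical, trimmed-down stand-in for river.InsertOpts.
type insertOpts struct {
	MaxAttempts int
	Priority    int
	Queue       string
	Tags        []string
}

// Placeholder global defaults, assumed for illustration only.
const (
	assumedMaxAttempts = 25
	assumedPriority    = 1
	assumedQueue       = "default"
)

// firstInt returns the first non-zero value, or zero if there is none.
func firstInt(vals ...int) int {
	for _, v := range vals {
		if v != 0 {
			return v
		}
	}
	return 0
}

// firstString returns the first non-empty value, or "" if there is none.
func firstString(vals ...string) string {
	for _, v := range vals {
		if v != "" {
			return v
		}
	}
	return ""
}

// resolve picks each setting from the insert-time options first, then from
// the job args' own options, then from the assumed global defaults.
func resolve(atInsert *insertOpts, fromArgs insertOpts) insertOpts {
	if atInsert == nil {
		atInsert = &insertOpts{}
	}
	tags := fromArgs.Tags
	if len(atInsert.Tags) > 0 {
		tags = atInsert.Tags // replaced wholesale, never merged
	}
	return insertOpts{
		MaxAttempts: firstInt(atInsert.MaxAttempts, fromArgs.MaxAttempts, assumedMaxAttempts),
		Priority:    firstInt(atInsert.Priority, fromArgs.Priority, assumedPriority),
		Queue:       firstString(atInsert.Queue, fromArgs.Queue, assumedQueue),
		Tags:        tags,
	}
}

func main() {
	got := resolve(
		&insertOpts{Queue: "high", Tags: []string{"urgent"}},
		insertOpts{Queue: "low", Priority: 3, Tags: []string{"batch", "nightly"}},
	)
	fmt.Printf("%+v\n", got)
}
```

In the usage shown, the queue and tags come from the insert-time options, the priority from the job args, and the max attempts from the assumed default.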

type Job

type Job[T JobArgs] struct {
	*rivertype.JobRow

	// Args are the arguments for the job.
	Args T
}

Job represents a single unit of work, holding both the arguments and information for a job with args of type T.

func JobCompleteTx

func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs]) (*Job[TArgs], error)

JobCompleteTx marks the job as completed as part of transaction tx. If tx is rolled back, the completion will be as well.

The function needs to know the type of the River database driver, which is the same as the one in use by Client, but the other generic parameters can be inferred. An invocation should generally look like:

_, err := river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
if err != nil {
	// handle error
}

Returns the updated, completed job.

type JobArgs

type JobArgs interface {
	// Kind is a string that uniquely identifies the type of job. This must be
	// provided on your job arguments struct.
	Kind() string
}

JobArgs is an interface that represents the arguments for a job of type T. These arguments are serialized into JSON and stored in the database.

type JobArgsWithInsertOpts

type JobArgsWithInsertOpts interface {
	// InsertOpts returns options for all jobs of this job type, overriding any
	// system defaults. These can also be overridden at insertion time.
	InsertOpts() InsertOpts
}

JobArgsWithInsertOpts is an extra interface that a job may implement on top of JobArgs to provide insertion-time options for all jobs of this type.

type JobListCursor

type JobListCursor struct {
	// contains filtered or unexported fields
}

JobListCursor is used to specify a starting point for a paginated job list query.

func JobListCursorFromJob

func JobListCursorFromJob(job *rivertype.JobRow) *JobListCursor

JobListCursorFromJob creates a JobListCursor from a JobRow.

func (JobListCursor) MarshalText

func (c JobListCursor) MarshalText() ([]byte, error)

MarshalText implements encoding.TextMarshaler to encode the cursor as an opaque string.

func (*JobListCursor) UnmarshalText

func (c *JobListCursor) UnmarshalText(text []byte) error

UnmarshalText implements encoding.TextUnmarshaler to decode the cursor from a previously marshaled string.

type JobListOrderByField

type JobListOrderByField int

JobListOrderByField specifies the field to sort by.

const (
	// JobListOrderByTime specifies that the sort should be by time. The specific
	// time field used will vary by job state.
	JobListOrderByTime JobListOrderByField = iota
)

type JobListParams

type JobListParams struct {
	// contains filtered or unexported fields
}

JobListParams specifies the parameters for a JobList query. It must be initialized with NewJobListParams. Params can be built by chaining methods on the JobListParams object:

params := NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).First(100)

func NewJobListParams

func NewJobListParams() *JobListParams

NewJobListParams creates a new JobListParams to return available jobs sorted by time in ascending order, returning 100 jobs at most.

func (*JobListParams) After

func (p *JobListParams) After(cursor *JobListCursor) *JobListParams

After returns an updated filter set that will only return jobs after the given cursor.

func (*JobListParams) First

func (p *JobListParams) First(count int) *JobListParams

First returns an updated filter set that will only return the first count jobs.

Count must be between 1 and 10000, inclusive, or this will panic.

func (*JobListParams) Metadata

func (p *JobListParams) Metadata(json string) *JobListParams

func (*JobListParams) OrderBy

func (p *JobListParams) OrderBy(field JobListOrderByField, direction SortOrder) *JobListParams

OrderBy returns an updated filter set that will sort the results using the specified field and direction.

func (*JobListParams) Queues

func (p *JobListParams) Queues(queues ...string) *JobListParams

Queues returns an updated filter set that will only return jobs from the given queues.

func (*JobListParams) State

func (p *JobListParams) State(state rivertype.JobState) *JobListParams

State returns an updated filter set that will only return jobs in the given state.

type JobStatistics

type JobStatistics struct {
	CompleteDuration  time.Duration // Time it took to set the job completed, discarded, or errored.
	QueueWaitDuration time.Duration // Time the job spent waiting in available state before starting execution.
	RunDuration       time.Duration // Time job spent running (measured around job worker.)
}

JobStatistics contains information about a single execution of a job.

type PeriodicJob

type PeriodicJob struct {
	// contains filtered or unexported fields
}

PeriodicJob is a configuration for a periodic job.

func NewPeriodicJob

func NewPeriodicJob(scheduleFunc PeriodicSchedule, constructorFunc PeriodicJobConstructor, opts *PeriodicJobOpts) *PeriodicJob

NewPeriodicJob returns a new PeriodicJob given a schedule and a constructor function.

The schedule returns the next time at which the periodic job should run. The helper PeriodicInterval is available for jobs that should run on simple, fixed intervals (e.g. every 15 minutes), and a custom schedule or third party cron package can be used for more complex scheduling (see the cron example). The constructor function is invoked each time a periodic job's schedule elapses, returning job arguments to insert along with optional insertion options.

The periodic job scheduler is approximate and doesn't guarantee strong durability. It's started by the elected leader in a River cluster, and each periodic job is assigned an initial run time when that occurs. New run times are scheduled each time a job's target run time is reached and a new job inserted. However, each scheduler only retains in-memory state, so anytime a process quits or a new leader is elected, the whole process starts over without regard for the state of the last scheduler. The RunOnStart option can be used as a hedge to make sure that jobs with long run durations are guaranteed to occasionally run.

type PeriodicJobConstructor

type PeriodicJobConstructor func() (JobArgs, *InsertOpts)

PeriodicJobConstructor is a function that gets called each time the paired PeriodicSchedule is triggered.

A constructor must never block. It may return nil to indicate that no job should be inserted.

type PeriodicJobOpts

type PeriodicJobOpts struct {
	// RunOnStart can be used to indicate that a periodic job should insert an
	// initial job as a new scheduler is started. This can be used as a hedge
	// for jobs with longer scheduled durations that may not get to expiry
	// before a new scheduler is elected.
	RunOnStart bool
}

PeriodicJobOpts are options for a periodic job.

type PeriodicSchedule

type PeriodicSchedule interface {
	// Next returns the next time at which the job should be run given the
	// current time.
	Next(current time.Time) time.Time
}

PeriodicSchedule is a schedule for a periodic job. Periodic jobs should generally have an interval of at least 1 minute, and never less than one second.
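A custom schedule only needs the single Next method. As a minimal sketch, the type below schedules a run at the start of every hour in UTC. The `periodicSchedule` interface is a local copy of the documented shape so the example compiles on its own, and `topOfHour` and `nextRunRFC3339` are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// periodicSchedule copies the single-method shape of PeriodicSchedule so the
// sketch compiles without importing river.
type periodicSchedule interface {
	Next(current time.Time) time.Time
}

// topOfHour schedules a run at the start of every hour (aligned to UTC).
type topOfHour struct{}

// Next truncates the current time to the hour and moves one hour forward, so
// the result is always strictly after current.
func (topOfHour) Next(current time.Time) time.Time {
	return current.Truncate(time.Hour).Add(time.Hour)
}

// nextRunRFC3339 parses an RFC 3339 timestamp and returns the next run time
// in the same format, or "" if the input can't be parsed.
func nextRunRFC3339(current string) string {
	t, err := time.Parse(time.RFC3339, current)
	if err != nil {
		return ""
	}
	var schedule periodicSchedule = topOfHour{}
	return schedule.Next(t).Format(time.RFC3339)
}

func main() {
	fmt.Println(nextRunRFC3339("2024-01-18T10:15:30Z")) // prints 2024-01-18T11:00:00Z
}
```

A value of such a type would be passed as the first argument to NewPeriodicJob in place of PeriodicInterval.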

func PeriodicInterval

func PeriodicInterval(interval time.Duration) PeriodicSchedule

PeriodicInterval returns a simple PeriodicSchedule that runs at the given interval.

type QueueConfig

type QueueConfig struct {
	// MaxWorkers is the maximum number of workers to run for the queue, or put
	// otherwise, the maximum parallelism to run.
	//
	// This is the maximum number of workers within this particular client
	// instance, but note that it doesn't control the total number of workers
	// across parallel processes. Installations will want to calculate their
	// total number by multiplying this number by the number of parallel nodes
	// running River clients configured to the same database and queue.
	//
	// Requires a minimum of 1, and a maximum of 10,000.
	MaxWorkers int
}

QueueConfig contains queue-specific configuration.

type SortOrder

type SortOrder int

SortOrder specifies the direction of a sort.

const (
	// SortOrderAsc specifies that the sort should be in ascending order.
	SortOrderAsc SortOrder = iota
	// SortOrderDesc specifies that the sort should be in descending order.
	SortOrderDesc
)

type UniqueOpts

type UniqueOpts struct {
	// ByArgs indicates that uniqueness should be enforced for any specific
	// instance of encoded args for a job.
	//
	// Default is false, meaning that as long as any other unique property is
	// enabled, uniqueness will be enforced for a kind regardless of input args.
	ByArgs bool

	// ByPeriod defines uniqueness within a given period. On an insert time is
	// rounded down to the nearest multiple of the given period, and a job is
	// only inserted if there isn't an existing job that will run between then
	// and the next multiple of the period.
	//
	// Default is no unique period, meaning that as long as any other unique
	// property is enabled, uniqueness will be enforced across all jobs of the
	// kind in the database, regardless of when they were scheduled.
	ByPeriod time.Duration

	// ByQueue indicates that uniqueness should be enforced within each queue.
	//
	// Default is false, meaning that as long as any other unique property is
	// enabled, uniqueness will be enforced for a kind across all queues.
	ByQueue bool

	// ByState indicates that uniqueness should be enforced across any of the
	// states in the given set. For example, if the given states were
	// `(scheduled, running)` then a new job could be inserted even if one of
	// the same kind was already being worked by the queue (new jobs are
	// inserted as `available`).
	//
	// Unlike other unique options, ByState gets a default when it's not set for
	// user convenience. The default is equivalent to:
	//
	// 	ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted, rivertype.JobStateRunning, rivertype.JobStateRetryable, rivertype.JobStateScheduled}
	//
	// With this setting, any jobs of the same kind that have been cancelled or
	// discarded, but not yet cleaned out by the system, won't count towards the
	// uniqueness of a new insert.
	ByState []rivertype.JobState
}

UniqueOpts contains parameters for uniqueness for a job.

When the options struct is uninitialized (its zero value) no uniqueness at all is enforced. As each property is initialized, it's added as a dimension on the uniqueness matrix, and with any property on, the job's kind always counts toward uniqueness.

So for example, if only ByQueue is on, then for the given job kind, only a single instance is allowed in any given queue, regardless of other properties on the job. If both ByArgs and ByQueue are on, then for the given job kind, a single instance is allowed for each combination of args and queues. If either args or queue is changed on a new job, it's allowed to be inserted as a new job.

Uniqueness is checked at insert time by taking a Postgres advisory lock, doing a look up for an equivalent row, and inserting only if none was found. There's no database-level mechanism that guarantees jobs stay unique, so if an equivalent row is inserted out of band (or batch inserted, where a unique check doesn't occur), it's conceivable that duplicates could coexist.
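The ByPeriod rounding described in the struct comments can be worked through with a concrete time. The sketch below computes the window that an insert time falls into by rounding down to a multiple of the period; it assumes the rounding behaves like Go's `time.Time.Truncate`, which may differ in detail from River's implementation. `uniqueWindow` and `windowRFC3339` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// uniqueWindow rounds at down to a multiple of period and returns that lower
// bound together with the next multiple of the period.
func uniqueWindow(at time.Time, period time.Duration) (lower, upper time.Time) {
	lower = at.Truncate(period)
	return lower, lower.Add(period)
}

// windowRFC3339 formats the window for an RFC 3339 timestamp and a period in
// minutes as "lower/upper", or returns "" if the input can't be parsed.
func windowRFC3339(at string, periodMinutes int) string {
	t, err := time.Parse(time.RFC3339, at)
	if err != nil {
		return ""
	}
	lower, upper := uniqueWindow(t, time.Duration(periodMinutes)*time.Minute)
	return lower.Format(time.RFC3339) + "/" + upper.Format(time.RFC3339)
}

func main() {
	// With a 15 minute period, an insert at 10:37:12 falls in 10:30 to 10:45.
	fmt.Println(windowRFC3339("2024-01-18T10:37:12Z", 15))
}
```

Under this reading, a second insert of the same kind at 10:44 would be treated as a duplicate, while one at 10:45 would start a new window.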

type UnknownJobKindError

type UnknownJobKindError struct {
	// Kind is the string that was returned by the JobArgs Kind method.
	Kind string
}

UnknownJobKindError is returned when a Client fetches and attempts to work a job that has not been registered on the Client's Workers bundle (using AddWorker).

func (*UnknownJobKindError) Error

func (e *UnknownJobKindError) Error() string

Error returns the error string.

func (*UnknownJobKindError) Is

func (e *UnknownJobKindError) Is(target error) bool

Is implements the interface used by errors.Is to determine if errors are equivalent. It returns true for any other UnknownJobKindError without regard to the Kind string so it is possible to detect this type of error with:

errors.Is(err, &UnknownJobKindError{})
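The reason this works even when the error is wrapped, and regardless of Kind, is the custom Is method. The sketch below reproduces the mechanism with a local `unknownJobKindError` stand-in so it runs without importing river; the stand-in, its error message, and `isUnknownKind` are hypothetical and written only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// unknownJobKindError is a local stand-in mirroring the documented
// UnknownJobKindError.
type unknownJobKindError struct {
	Kind string
}

// Error returns an illustrative message; the real wording may differ.
func (e *unknownJobKindError) Error() string {
	return "unknown job kind: " + e.Kind
}

// Is matches any *unknownJobKindError target regardless of its Kind.
func (e *unknownJobKindError) Is(target error) bool {
	_, ok := target.(*unknownJobKindError)
	return ok
}

// isUnknownKind reports whether err is, or wraps, an unknownJobKindError.
func isUnknownKind(err error) bool {
	return errors.Is(err, &unknownJobKindError{})
}

func main() {
	wrapped := fmt.Errorf("working job: %w", &unknownJobKindError{Kind: "sort"})
	fmt.Println(isUnknownKind(wrapped))               // prints true
	fmt.Println(isUnknownKind(errors.New("timeout"))) // prints false
}
```

When the Kind value itself is needed, `errors.As` with a pointer to the error type is the usual companion to this check.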

type Worker

type Worker[T JobArgs] interface {
	// NextRetry calculates when the next retry for a failed job should take
	// place given when it was last attempted and its number of attempts, or any
	// other of the job's properties a user-configured retry policy might want
	// to consider.
	//
	// Note that this method on a worker overrides any client-level retry policy.
	// To use the client-level retry policy, return an empty `time.Time{}` or
	// include WorkerDefaults to do this for you.
	NextRetry(job *Job[T]) time.Time

	// Timeout is the maximum amount of time the job is allowed to run before
	// its context is cancelled. A timeout of zero (the default) means the job
	// will inherit the Client-level timeout. A timeout of -1 means the job's
	// context will never time out.
	Timeout(job *Job[T]) time.Duration

	// Work performs the job and returns an error if the job failed. The context
	// will be configured with a timeout according to the worker settings and may
	// be cancelled for other reasons.
	//
	// If no error is returned, the job is assumed to have succeeded and will be
	// marked completed.
	//
	// It is important for any worker to respect context cancellation to enable
	// the client to respond to shutdown requests; there is no way to cancel a
	// running job that does not respect context cancellation, other than
	// terminating the process.
	Work(ctx context.Context, job *Job[T]) error
}

Worker is an interface that can perform a job with args of type T. A typical Worker implementation will be a struct that embeds WorkerDefaults, implements `Work()`, and optionally overrides other methods to provide job-specific configuration for all jobs of that type. The paired job args struct implements `Kind()`:

type SleepArgs struct {
	Duration time.Duration `json:"duration"`
}

func (SleepArgs) Kind() string { return "sleep" }

type SleepWorker struct {
	WorkerDefaults[SleepArgs]
}

func (w *SleepWorker) Work(ctx context.Context, job *Job[SleepArgs]) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(job.Args.Duration):
		return nil
	}
}

In addition to fulfilling the Worker interface, workers must be registered with the client using the AddWorker function.

func WorkFunc

func WorkFunc[T JobArgs](f func(context.Context, *Job[T]) error) Worker[T]

WorkFunc wraps a function to implement the Worker interface. A job args struct implementing JobArgs will still be required to specify a Kind.

For example:

river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, job *river.Job[WorkFuncArgs]) error {
	fmt.Printf("Message: %s", job.Args.Message)
	return nil
}))

type WorkerDefaults

type WorkerDefaults[T JobArgs] struct{}

WorkerDefaults is an empty struct that can be embedded in your worker struct to make it fulfill the Worker interface with default values.

func (WorkerDefaults[T]) NextRetry

func (w WorkerDefaults[T]) NextRetry(*Job[T]) time.Time

NextRetry returns an empty time.Time{} to avoid setting any job or Worker-specific overrides on the next retry time. This means that the Client-level retry policy schedule will be used instead.

func (WorkerDefaults[T]) Timeout

func (w WorkerDefaults[T]) Timeout(*Job[T]) time.Duration

Timeout returns the job-specific timeout. Override this method to set a job-specific timeout, otherwise the Client-level timeout will be applied.

type Workers

type Workers struct {
	// contains filtered or unexported fields
}

Workers is a list of available job workers. A Worker must be registered for each type of Job to be handled.

Use the top-level AddWorker function combined with a Workers to register a worker.

func NewWorkers

func NewWorkers() *Workers

NewWorkers initializes a new registry of available job workers.

Use the top-level AddWorker function combined with a Workers registry to register each available worker.

Directories

Path Synopsis
cmd
river Module
internal
baseservice
Package baseservice contains structs and initialization functions for "service-like" objects that provide commonly needed facilities so that they don't have to be redefined on every struct.
cmd/update-submodule-versions
Package main provides a command to help bump the versions of River's internal dependencies in the `go.mod` files of submodules across the project.
riverinternaltest
Package riverinternaltest contains shared testing utilities for tests throughout the rest of the project.
util/maputil
Package maputil contains helpers related to maps, usually ones that are generic-related.
util/sliceutil
Package sliceutil contains helpers related to slices, usually ones that are generic-related, and are broadly useful, but which the Go core team, in its infinite wisdom, has decided are too much power for the unwashed masses, and therefore omitted from the utilities in `slices`.
riverdriver module
riverpgxv5 Module
Package rivermigrate provides a Go API for running migrations as alternative to migrating via the bundled CLI.
Package rivertest contains test assertions that can be used in a project's tests to verify that certain actions occurred from the main river package.
Package rivertype stores some of the lowest level River primitives so they can be shared amongst a number of packages including the top-level river package, database drivers, and internal utilities.
