amqpw

package module
v0.0.0-...-f5c3624 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2022 License: MIT Imports: 10 Imported by: 0

README

AMQP adapter for Go Worker

This package implements the github.com/purposeinplay/go-worker/ Worker interface using the github.com/streadway/amqp package.

Setup

import "github.com/purposeinplay/go-worker/amqpw"
import "github.com/streadway/amqp"

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
if err != nil {
    log.Fatal(err)
}

amqpWorker, err := amqpw.New(
    WithConnectionOption(conn),
    WithNameOption("myapp"),
    WithMaxConcurrency(25),
)

if err != nil {
    log.Fatal(err)
}

// Start the worker
go func() {
    err = amqpWorker.Start()
}()

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidConnection = errors.New("invalid connection")

ErrInvalidConnection is returned when the Connection opt is not defined.

Functions

func Dial

func Dial(url string) (*amqp.Connection, error)

Dial returns a new AMQP client conn.

Types

type Dialer

type Dialer interface {
	Dial(url string) (*amqp.Connection, error)
}

Dialer is the dialer interface.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option describes how options should be implemented.

func WithConnectionOption

func WithConnectionOption(conn *amqp.Connection) Option

WithConnectionOption configures the connection option.

func WithExchangeOption

func WithExchangeOption(exchange string) Option

WithExchangeOption configures the exchange option.

func WithLogger

func WithLogger(l *zap.Logger) Option

WithLogger configures the logger option.

func WithMaxConcurrency

func WithMaxConcurrency(v int) Option

WithMaxConcurrency configures the maxConcurrency option.

func WithNameOption

func WithNameOption(name string) Option

WithNameOption configures the name option.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options are used to configure the AMQP worker adapter.

type Worker

type Worker struct {
	Connection *amqp.Connection
	Channel    *amqp.Channel
	Logger     *zap.Logger
	// contains filtered or unexported fields
}

Worker implements the Worker interface.

func New

func New(opts ...Option) (*Worker, error)

New creates a new AMQP adapter.

func (*Worker) DeleteJob

func (*Worker) DeleteJob(_, _ string) error

DeleteJob removes a job from the queue.

func (*Worker) Perform

func (w *Worker) Perform(job worker.Job) (*worker.JobInfo, error)

Perform enqueues a new job.

func (*Worker) PerformAt

func (w *Worker) PerformAt(
	job worker.Job,
	t time.Time,
) (*worker.JobInfo, error)

PerformAt performs a job at the given time.

func (*Worker) PerformIn

func (w *Worker) PerformIn(job worker.Job, t time.Duration) (*worker.JobInfo, error)

PerformIn performs a job delayed by the given duration.

func (*Worker) Register

func (w *Worker) Register(name string, handler worker.Handler) error

Register consumes a task, using the declared worker.Handler.

func (*Worker) Start

func (w *Worker) Start() error

Start connects to the broker.

func (*Worker) Stop

func (w *Worker) Stop() error

Stop closes the connection to the broker.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL