redisqueue: github.com/robinjoseph08/redisqueue Index | Files

package redisqueue

import "github.com/robinjoseph08/redisqueue"

Package redisqueue provides a producer and consumer of a queue that uses Redis streams (https://redis.io/topics/streams-intro).

Features

The features of this package include:

- A `Producer` struct to make enqueuing messages easy.
- A `Consumer` struct to make processing messages concurrenly.
- Claiming and acknowledging messages if there's no error, so that if a consumer
  dies while processing, the message it was working on isn't lost. This
  guarantees at least once delivery.
- A "visibility timeout" so that if a message isn't processed in a designated
  time frame, it will be be processed by another consumer.
- A max length on the stream so that it doesn't store the messages indefinitely
  and run out of memory.
- Graceful handling of Unix signals (`SIGINT` and `SIGTERM`) to let in-flight
  messages complete.
- A channel that will surface any errors so you can handle them centrally.
- Graceful handling of panics to avoid crashing the whole process.
- A concurrency setting to control how many goroutines are spawned to process
  messages.
- A batch size setting to limit the total messages in flight.
- Support for multiple streams.

Example

Here's an example of a producer that inserts 1000 messages into a queue:

package main

import (
	"fmt"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
		StreamMaxLength:      10000,
		ApproximateMaxLength: true,
	})
	if err != nil {
		panic(err)
	}

	for i := 0; i < 1000; i++ {
		err := p.Enqueue(&redisqueue.Message{
			Stream: "redisqueue:test",
			Values: map[string]interface{}{
				"index": i,
			},
		})
		if err != nil {
			panic(err)
		}

		if i%100 == 0 {
			fmt.Printf("enqueued %d\n", i)
		}
	}
}

And here's an example of a consumer that reads the messages off of that queue:

package main

import (
	"fmt"
	"time"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
		VisibilityTimeout: 60 * time.Second,
		BlockingTimeout:   5 * time.Second,
		ReclaimInterval:   1 * time.Second,
		BufferSize:        100,
		Concurrency:       10,
	})
	if err != nil {
		panic(err)
	}

	c.Register("redisqueue:test", process)

	go func() {
		for err := range c.Errors {
			// handle errors accordingly
			fmt.Printf("err: %+v\n", err)
		}
	}()

	fmt.Println("starting")
	c.Run()
	fmt.Println("stopped")
}

func process(msg *redisqueue.Message) error {
	fmt.Printf("processing message: %v\n", msg.Values["index"])
	return nil
}

Index

Package Files

consumer.go doc.go message.go producer.go redis.go signals.go

type Consumer Uses

type Consumer struct {
    // Errors is a channel that you can receive from to centrally handle any
    // errors that may occur either by your ConsumerFuncs or by internal
    // processing functions. Because this is an unbuffered channel, you must
    // have a listener on it. If you don't parts of the consumer could stop
    // functioning when errors occur due to the blocking nature of unbuffered
    // channels.
    Errors chan error
    // contains filtered or unexported fields
}

Consumer adds a convenient wrapper around dequeuing and managing concurrency.

func NewConsumer Uses

func NewConsumer() (*Consumer, error)

NewConsumer uses a default set of options to create a Consumer. It sets Name to the hostname, GroupName to "redisqueue", VisibilityTimeout to 60 seconds, BufferSize to 100, and Concurrency to 10. In most production environments, you'll want to use NewConsumerWithOptions.

func NewConsumerWithOptions Uses

func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error)

NewConsumerWithOptions creates a Consumer with custom ConsumerOptions. If Name is left empty, it defaults to the hostname; if GroupName is left empty, it defaults to "redisqueue"; if BlockingTimeout is 0, it defaults to 5 seconds; if ReclaimInterval is 0, it defaults to 1 second.

func (*Consumer) Register Uses

func (c *Consumer) Register(stream string, fn ConsumerFunc)

Register takes in a stream name and a ConsumerFunc that will be called when a message comes in from that stream. Register must be called at least once before Run is called. If the same stream name is passed in twice, the first ConsumerFunc is overwritten by the second.

func (*Consumer) RegisterWithLastID Uses

func (c *Consumer) RegisterWithLastID(stream string, id string, fn ConsumerFunc)

RegisterWithLastID is the same as Register, except that it also lets you specify the oldest message to receive when first creating the consumer group. This can be any valid message ID, "0" for all messages in the stream, or "$" for only new messages.

If the consumer group already exists the id field is ignored, meaning you'll receive unprocessed messages.

func (*Consumer) Run Uses

func (c *Consumer) Run()

Run starts all of the worker goroutines and starts processing from the streams that have been registered with Register. All errors will be sent to the Errors channel. If Register was never called, an error will be sent and Run will terminate early. The same will happen if an error occurs when creating the consumer group in Redis. Run will block until Shutdown is called and all of the in-flight messages have been processed.

func (*Consumer) Shutdown Uses

func (c *Consumer) Shutdown()

Shutdown stops new messages from being processed and tells the workers to wait until all in-flight messages have been processed, and then they exit. The order that things stop is 1) the reclaim process (if it's running), 2) the polling process, and 3) the worker processes.

type ConsumerFunc Uses

type ConsumerFunc func(*Message) error

ConsumerFunc is a type alias for the functions that will be used to handle and process Messages.

type ConsumerOptions Uses

type ConsumerOptions struct {
    // Name sets the name of this consumer. This will be used when fetching from
    // Redis. If empty, the hostname will be used.
    Name string
    // GroupName sets the name of the consumer group. This will be used when
    // coordinating in Redis. If empty, the hostname will be used.
    GroupName string
    // VisibilityTimeout dictates the maximum amount of time a message should
    // stay in pending. If there is a message that has been idle for more than
    // this duration, the consumer will attempt to claim it.
    VisibilityTimeout time.Duration
    // BlockingTimeout designates how long the XREADGROUP call blocks for. If
    // this is 0, it will block indefinitely. While this is the most efficient
    // from a polling perspective, if this call never times out, there is no
    // opportunity to yield back to Go at a regular interval. This means it's
    // possible that if no messages are coming in, the consumer cannot
    // gracefully shutdown. Instead, it's recommended to set this to 1-5
    // seconds, or even longer, depending on how long your application can wait
    // to shutdown.
    BlockingTimeout time.Duration
    // ReclaimInterval is the amount of time in between calls to XPENDING to
    // attempt to reclaim jobs that have been idle for more than the visibility
    // timeout. A smaller duration will result in more frequent checks. This
    // will allow messages to be reaped faster, but it will put more load on
    // Redis.
    ReclaimInterval time.Duration
    // BufferSize determines the size of the channel uses to coordinate the
    // processing of the messages. This determines the maximum number of
    // in-flight messages.
    BufferSize int
    // Concurrency dictates how many goroutines to spawn to handle the messages.
    Concurrency int
    // RedisClient supersedes the RedisOptions field, and allows you to inject
    // an already-made *redis.Client for use in the consumer.
    RedisClient *redis.Client
    // RedisOptions allows you to configure the underlying Redis connection.
    // More info here:
    // https://pkg.go.dev/github.com/go-redis/redis/v7?tab=doc#Options.
    //
    // This field is used if RedisClient field is nil.
    RedisOptions *RedisOptions
}

ConsumerOptions provide options to configure the Consumer.

type Message Uses

type Message struct {
    ID     string
    Stream string
    Values map[string]interface{}
}

Message constitutes a message that will be enqueued and dequeued from Redis. When enqueuing, it's recommended to leave ID empty and let Redis generate it, unless you know what you're doing.

type Producer Uses

type Producer struct {
    // contains filtered or unexported fields
}

Producer adds a convenient wrapper around enqueuing messages that will be processed later by a Consumer.

func NewProducer Uses

func NewProducer() (*Producer, error)

NewProducer uses a default set of options to create a Producer. It sets StreamMaxLength to 1000 and ApproximateMaxLength to true. In most production environments, you'll want to use NewProducerWithOptions.

func NewProducerWithOptions Uses

func NewProducerWithOptions(options *ProducerOptions) (*Producer, error)

NewProducerWithOptions creates a Producer using custom ProducerOptions.

func (*Producer) Enqueue Uses

func (p *Producer) Enqueue(msg *Message) error

Enqueue takes in a pointer to Message and enqueues it into the stream set at msg.Stream. While you can set msg.ID, unless you know what you're doing, you should let Redis auto-generate the ID. If an ID is auto-generated, it will be set on msg.ID for your reference. msg.Values is also required.

type ProducerOptions Uses

type ProducerOptions struct {
    // StreamMaxLength sets the MAXLEN option when calling XADD. This creates a
    // capped stream to prevent the stream from taking up memory indefinitely.
    // It's important to note though that this isn't the maximum number of
    // _completed_ messages, but the maximum number of _total_ messages. This
    // means that if all consumers are down, but producers are still enqueuing,
    // and the maximum is reached, unprocessed message will start to be dropped.
    // So ideally, you'll set this number to be as high as you can makee it.
    // More info here: https://redis.io/commands/xadd#capped-streams.
    StreamMaxLength int64
    // ApproximateMaxLength determines whether to use the ~ with the MAXLEN
    // option. This allows the stream trimming to done in a more efficient
    // manner. More info here: https://redis.io/commands/xadd#capped-streams.
    ApproximateMaxLength bool
    // RedisClient supersedes the RedisOptions field, and allows you to inject
    // an already-made *redis.Client for use in the consumer.
    RedisClient *redis.Client
    // RedisOptions allows you to configure the underlying Redis connection.
    // More info here:
    // https://pkg.go.dev/github.com/go-redis/redis/v7?tab=doc#Options.
    //
    // This field is used if RedisClient field is nil.
    RedisOptions *RedisOptions
}

ProducerOptions provide options to configure the Producer.

type RedisOptions Uses

type RedisOptions = redis.Options

RedisOptions is an alias to redis.Options so that users can this instead of having to import go-redis directly.

Package redisqueue imports 12 packages (graph). Updated 2020-05-27. Refresh now. Tools for package owners.