redispubsub

package module
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2022 License: MIT Imports: 16 Imported by: 0

README

redispubsub

Redis driver for https://godoc.org/gocloud.dev/pubsub package.

A great alternative to using Kafka, with the ability to quickly switch to it. You can use this driver for MVP, local, small or medium projects. When your project grows you can simply switch to another driver from https://pkg.go.dev/gocloud.dev/pubsub#section-directories.

Using Redis Streams, this driver supports at-least-once delivery.

The driver uses these Redis commands:

  • XADD
  • XGROUP CREATE
  • XREADGROUP (with pending and then new messages - only this library actually supports it)
  • XACK
  • XAUTOCLAIM

Many other queuing implementations with Redis Streams contain a big bug. They incorrectly support reconnecting a consumer to a topic if a message has been received but not acknowledged. They use ">" streaming strategy, which does not deliver unacknowledged messages more than once. And you miss messages when microservices are restarted. This library does not have this disadvantage.

Connection to Redis

The connection string must be defined in the REDIS_URL environment value.

Warning about creating a topic consumer group for the first time

All consumers already have a group, even if there is only one consumer in the group.

Consumer groups receive the same messages from the topic, and consumers within the group receive these messages exclusively.

Messages flow

This driver supports new consumers joining with a new group name after the publisher has sent multiple messages to a topic before the group was created. These consumers will receive all previous non-ACK-ed messages from the beginning of the topic.

How to open topic and send message

import (
    _ "github.com/covrom/redispubsub"
    "gocloud.dev/pubsub"
)

ctx := context.Background()
topic, err := pubsub.OpenTopic(ctx, "redis://topics/1")
if err != nil {
    return fmt.Errorf("could not open topic: %v", err)
}
defer topic.Shutdown(ctx)

m := &pubsub.Message{
    Body: []byte("Hello, World!\n"),
    // Metadata is optional and can be nil.
    Metadata: map[string]string{
        // These are examples of metadata.
        // There is nothing special about the key names.
        "language":   "en",
        "importance": "high",
    },
}

err = topic.Send(ctx, m)
if err != nil {
    return err
}

OpenTopic connection string host/path is required and must contain the topic name.

Connection string query parameters:

  • maxlen is MAXLEN parameter for XADD (limit queue length), unlimited if not set.

How to subscribe on topic

import (
    _ "github.com/covrom/redispubsub"
    "gocloud.dev/pubsub"
)

subs, err := pubsub.OpenSubscription(ctx, "redis://group1?consumer=cons1&topic=topics/1")
if err != nil {
    return err
}
defer subs.Shutdown(ctx)

msg, err := subs.Receive(ctx)
if err != nil {
    // Errors from Receive indicate that Receive will no longer succeed.
    return fmt.Errorf("Receiving message: %v", err)
}
// Do work based on the message, for example:
fmt.Printf("Got message: %q\n", msg.Body)
// Messages must always be acknowledged with Ack.
msg.Ack()

OpenSubscription connection string host(path) is required and must contain the consumer group name.

Connection string query parameters:

  • topic is topic name, required
  • consumer is unique consumer name, required
  • from is FROM option for creating a consumer group (if not exists) with XGROUP CREATE, default is '0'
  • autoclaim is min-idle-time option for XAUTOCLAIM, 30 min by default
  • noack is NOACK option for XREADGROUP, not used by default

See basic_test.go for full usage example.

Monitoring with Prometheus & Grafana

Use redis-exporter prometheus exporter with check-streams option.

See streams.go for details.

Documentation

Overview

Package redispubsub provides an implementation of pubsub for Redis. It requires a minimum Redis version of 6.x for Streams support.

redispubsub does not support Message.Nack; Message.Nackable will return false, and Message.Nack will panic if called.

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, redispubsub registers for the scheme "redis". The default URL opener will connect to a Redis Server based on the environment variable "REDIS_URL", expected to server address like "redis://<user>:<pass>@localhost:6379/<db>". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.

Escaping

Go CDK supports all UTF-8 strings. No escaping is required for Redis.

Index

Constants

View Source
const Scheme = "redis"

Scheme is the URL scheme that redispubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func OpenSubscription

func OpenSubscription(broker *redis.Client, group, topic string, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription creates a pubsub.Subscription that joins group, receiving messages from topics.

func OpenTopic

func OpenTopic(broker *redis.Client, topicName string, opts *TopicOptions) (*pubsub.Topic, error)

OpenTopic creates a pubsub.Topic that sends to a Redis topic.

Types

type SubscriptionOptions

type SubscriptionOptions struct {
	From              string // starting id ($ after tail of stream), 0 by default (from head of stream)
	Consumer          string // unique consumer name
	NoAck             bool
	AutoClaimIdleTime time.Duration
}

SubscriptionOptions contains configuration for subscriptions.

type TopicOptions

type TopicOptions struct {
	// BatcherOptions adds constraints to the default batching done for sends.
	BatcherOptions batcher.Options
	MaxLen         int64
}

TopicOptions contains configuration options for topics.

type URLOpener

type URLOpener struct {
	// Broker is the Redis parsed URL like "redis://<user>:<pass>@localhost:6379/<db>" with options.
	Broker *redis.Client

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens Redis URLs like "redis://mytopic" for topics and "redis://group?topic=mytopic" for subscriptions.

For topics, the URL's host+path is used as the topic name.

For subscriptions, the URL's host+path is used as the group name, and the "topic" query parameter(s) are used as the set of topics to subscribe to. The "offset" parameter is available to subscribers to set the Redis Streams consumer's initial offset. Where "oldest" starts consuming from the oldest offset of the consumer group and "newest" starts consuming from the most recent offset on the topic.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL