redis

Version: v1.1.14

This package is not in the latest version of its module.

Published: Apr 2, 2024 | License: Apache-2.0, MIT | Imports: 10 | Imported by: 0

README

Watermill Redis Stream Pub/Sub

This is the Redis Stream Pub/Sub for the Watermill project.

All Pub/Sub implementations can be found at https://watermill.io/pubsubs/.

Watermill is a Go library for working efficiently with message streams. It is intended for building event-driven applications, enabling event sourcing, RPC over messages, sagas, and whatever else comes to mind. You can use conventional Pub/Sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

Documentation: https://watermill.io/

Getting started guide: https://watermill.io/docs/getting-started/

Issues: https://github.com/ThreeDotsLabs/watermill/issues

Contributing

All contributions are very much welcome. If you'd like to help with Watermill development, please see open issues and submit your pull request via GitHub.

Support

If you didn't find the answer to your question in the documentation, feel free to ask us directly!

Please join us on the #watermill channel on the Gophers Slack. You can get an invite here.

License

MIT License

Documentation

Overview

Package redis is a fork of github.com/ThreeDotsLabs/watermill-redisstream@v1.2.2.

Index

Constants

const (
	// NoSleep can be set to SubscriberConfig.NackResendSleep
	NoSleep time.Duration = -1

	DefaultBlockTime = time.Millisecond * 100

	DefaultClaimInterval = time.Second * 5

	DefaultClaimBatchSize = int64(100)

	DefaultMaxIdleTime = time.Second * 60

	DefaultCheckConsumersInterval = time.Second * 300
	DefaultConsumerTimeout        = time.Second * 600
)
const UUIDHeaderKey = "_watermill_message_uuid"
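
The constants above suggest two conventions: zero-valued timing fields fall back to a default, and NoSleep is a negative sentinel that disables the pause after a Nack. The following is a minimal sketch of that idea, not the package's implementation. The `timings` struct, `withDefaults`, and `nackDelay` are hypothetical names introduced for illustration, and the assumption that zero values are replaced by defaults is not confirmed by this page.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the constants documented above.
const (
	NoSleep time.Duration = -1

	DefaultBlockTime     = time.Millisecond * 100
	DefaultClaimInterval = time.Second * 5
	DefaultMaxIdleTime   = time.Second * 60
)

// timings is a hypothetical stand-in for some duration fields of SubscriberConfig.
type timings struct {
	NackResendSleep time.Duration
	BlockTime       time.Duration
	ClaimInterval   time.Duration
	MaxIdleTime     time.Duration
}

// withDefaults replaces zero-valued fields with the documented defaults.
// Assumption: the package behaves similarly; this page does not confirm it.
func withDefaults(t timings) timings {
	if t.BlockTime == 0 {
		t.BlockTime = DefaultBlockTime
	}
	if t.ClaimInterval == 0 {
		t.ClaimInterval = DefaultClaimInterval
	}
	if t.MaxIdleTime == 0 {
		t.MaxIdleTime = DefaultMaxIdleTime
	}
	return t
}

// nackDelay returns the pause before redelivery and whether to pause at all.
// NoSleep is negative, so it cannot be confused with an unset (zero) field.
func nackDelay(t timings) (time.Duration, bool) {
	if t.NackResendSleep == NoSleep {
		return 0, false
	}
	return t.NackResendSleep, true
}

func main() {
	cfg := withDefaults(timings{NackResendSleep: NoSleep, ClaimInterval: time.Second})
	fmt.Println("block time:", cfg.BlockTime)
	fmt.Println("claim interval:", cfg.ClaimInterval)
	fmt.Println("max idle time:", cfg.MaxIdleTime)

	_, sleep := nackDelay(cfg)
	fmt.Println("sleep after nack:", sleep)
}
```

Using a negative sentinel rather than zero keeps "not configured" and "explicitly no sleep" distinguishable in a plain time.Duration field.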

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultMarshallerUnmarshaller

type DefaultMarshallerUnmarshaller struct {
	AppID string
}

func (DefaultMarshallerUnmarshaller) Marshal

func (DefaultMarshallerUnmarshaller) Unmarshal

func (DefaultMarshallerUnmarshaller) Unmarshal(values map[string]any) (msg *message.Message, err error)

type Marshaller

type Marshaller interface {
	Marshal(topic string, msg *message.Message) (map[string]any, error)
}

type MarshallerUnmarshaller

type MarshallerUnmarshaller interface {
	Marshaller
	Unmarshaller
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisher creates a new redis stream Publisher.

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, topic string, msgs ...*message.Message) error

Publish publishes messages to a Redis stream.

Publish is blocking and waits for the Redis response. If delivery of any message fails, publishing stops and the error is returned.
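
Because publishing stops at the first failed message, messages earlier in the same call may already have been delivered, so retrying the whole batch can produce duplicates. The sketch below illustrates that stop-on-first-error behaviour with stand-in types. It is not the package's code: `addFunc`, `publishAll`, and `errUnavailable` are hypothetical, and the real Publish returns only an error, not a count.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnavailable simulates a failed Redis call in this sketch.
var errUnavailable = errors.New("redis unavailable")

// addFunc is a hypothetical stand-in for the call that appends one message to a stream.
type addFunc func(topic, payload string) error

// publishAll sends payloads in order and stops at the first failure.
// It returns how many were sent so the caller can see a partial publish.
func publishAll(add addFunc, topic string, payloads ...string) (int, error) {
	for i, p := range payloads {
		if err := add(topic, p); err != nil {
			return i, fmt.Errorf("publish message %d to %q: %w", i, topic, err)
		}
	}
	return len(payloads), nil
}

func main() {
	var delivered []string
	// Fake backend that rejects the payload "b".
	add := func(topic, payload string) error {
		if payload == "b" {
			return errUnavailable
		}
		delivered = append(delivered, payload)
		return nil
	}

	sent, err := publishAll(add, "orders", "a", "b", "c")
	fmt.Println("sent:", sent, "delivered:", delivered)
	fmt.Println("error:", err)
}
```

Consumers that must tolerate such retries can deduplicate on the message UUID.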

type PublisherConfig

type PublisherConfig struct {
	Client                redis.UniversalClient
	Marshaller            Marshaller
	Maxlens               map[string]int64
	DisableRedisConnClose bool
}

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriber creates a new redis stream Subscriber.

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

type SubscriberConfig

type SubscriberConfig struct {
	Client redis.UniversalClient

	Unmarshaller Unmarshaller

	// Redis stream consumer id, paired with ConsumerGroup.
	Consumer string
	// When empty, fan-out mode will be used.
	ConsumerGroup string

	// How long to wait after a Nack before the message is redelivered.
	NackResendSleep time.Duration

	// How long to block while waiting for the next Redis stream message.
	BlockTime time.Duration

	// Interval between attempts to claim idle pending messages.
	ClaimInterval time.Duration

	// Maximum number of pending messages claimed per claim interval.
	ClaimBatchSize int64

	// How long a pending message must be idle before it is treated as claimable.
	MaxIdleTime time.Duration

	// Interval between consumer status checks.
	CheckConsumersInterval time.Duration

	// After this timeout an idle consumer with no pending messages will be removed from the consumer group.
	ConsumerTimeout time.Duration

	// Start consumption from the specified message ID.
	// When using "0", the consumer group will consume from the very first message.
	// When using "$", the consumer group will consume from the latest message.
	OldestId string

	// If this is set, it will be called to decide whether a pending message that
	// has been idle for more than MaxIdleTime should actually be claimed.
	// If this is not set, then all pending messages that have been idle for more than MaxIdleTime will be claimed.
	// This can be useful e.g. for tasks where the processing time can be very variable -
	// so we can't just use a short MaxIdleTime; but at the same time dead
	// consumers should be spotted quickly - so we can't just use a long MaxIdleTime either.
	// In such cases, if we have another way for checking consumers' health, then we can
	// leverage that in this callback.
	ShouldClaimPendingMessage func(redis.XPendingExt) bool

	DisableRedisConnClose bool
}
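
The ShouldClaimPendingMessage comment describes combining MaxIdleTime with an external health signal. A minimal sketch of such a callback follows, under stated assumptions: `pendingEntry` is a hypothetical stand-in that mirrors the ID, Consumer, Idle, and RetryCount fields of go-redis's redis.XPendingExt, and `heartbeats`, `staleAfter`, and `newClaimFilter` are invented for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// pendingEntry is a hypothetical stand-in for go-redis's redis.XPendingExt.
type pendingEntry struct {
	ID         string
	Consumer   string
	Idle       time.Duration
	RetryCount int64
}

// staleAfter is an assumed heartbeat threshold for this sketch.
const staleAfter = 30 * time.Second

// heartbeats is a hypothetical health table: consumer name to time since its
// last heartbeat. A separate health checker would keep it up to date, and a
// concurrent version would need a lock around it.
type heartbeats map[string]time.Duration

// newClaimFilter builds a callback that allows a claim only when the
// consumer that owns the pending message looks dead.
func newClaimFilter(hb heartbeats) func(pendingEntry) bool {
	return func(p pendingEntry) bool {
		age, known := hb[p.Consumer]
		if !known {
			return true // never seen: treat the consumer as dead
		}
		return age > staleAfter
	}
}

func main() {
	shouldClaim := newClaimFilter(heartbeats{
		"worker-a": 2 * time.Second, // healthy, just busy with a long task
		"worker-b": 5 * time.Minute, // silent for too long
	})

	pending := []pendingEntry{
		{ID: "1-0", Consumer: "worker-a", Idle: 90 * time.Second, RetryCount: 1},
		{ID: "2-0", Consumer: "worker-b", Idle: 90 * time.Second, RetryCount: 1},
	}
	for _, p := range pending {
		fmt.Printf("%s owned by %s: claim=%v\n", p.ID, p.Consumer, shouldClaim(p))
	}
}
```

With the real package, the callback would take a redis.XPendingExt and be assigned to SubscriberConfig.ShouldClaimPendingMessage. MaxIdleTime can then stay short, since a slow but healthy consumer keeps its messages.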

func (*SubscriberConfig) Validate

func (sc *SubscriberConfig) Validate() error

type Unmarshaller

type Unmarshaller interface {
	Unmarshal(values map[string]any) (msg *message.Message, err error)
}
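
A custom implementation of the Marshaller and Unmarshaller interfaces converts a message to and from the field map stored in a stream entry. The sketch below shows one possible mapping and is not the DefaultMarshallerUnmarshaller. `Message` is a hypothetical stand-in for watermill's *message.Message, and the `payload` field name and `meta.` prefix are assumptions chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// UUIDHeaderKey mirrors the constant documented above.
const UUIDHeaderKey = "_watermill_message_uuid"

const (
	payloadKey     = "payload" // assumed field name
	metadataPrefix = "meta."   // assumed prefix for metadata fields
)

// Message is a hypothetical stand-in for watermill's *message.Message.
type Message struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// fieldMarshaller stores each metadata entry as its own stream field.
type fieldMarshaller struct{}

func (fieldMarshaller) Marshal(topic string, msg *Message) (map[string]any, error) {
	if msg == nil {
		return nil, errors.New("nil message")
	}
	values := map[string]any{
		UUIDHeaderKey: msg.UUID,
		payloadKey:    string(msg.Payload),
	}
	for k, v := range msg.Metadata {
		values[metadataPrefix+k] = v
	}
	return values, nil
}

func (fieldMarshaller) Unmarshal(values map[string]any) (*Message, error) {
	uuid, ok := values[UUIDHeaderKey].(string)
	if !ok {
		return nil, fmt.Errorf("field %q is missing or not a string", UUIDHeaderKey)
	}
	payload, ok := values[payloadKey].(string)
	if !ok {
		return nil, fmt.Errorf("field %q is missing or not a string", payloadKey)
	}
	msg := &Message{UUID: uuid, Metadata: map[string]string{}, Payload: []byte(payload)}
	for k, v := range values {
		if !strings.HasPrefix(k, metadataPrefix) {
			continue
		}
		s, ok := v.(string)
		if !ok {
			return nil, fmt.Errorf("field %q is not a string", k)
		}
		msg.Metadata[strings.TrimPrefix(k, metadataPrefix)] = s
	}
	return msg, nil
}

func main() {
	m := fieldMarshaller{}
	in := &Message{UUID: "u-1", Metadata: map[string]string{"trace": "t-9"}, Payload: []byte("hello")}

	values, err := m.Marshal("orders", in)
	if err != nil {
		panic(err)
	}
	out, err := m.Unmarshal(values)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.UUID, out.Metadata["trace"], string(out.Payload))
}
```

Prefixing metadata keys keeps user-supplied metadata from colliding with the UUID and payload fields.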
