Documentation ¶
Overview ¶
Package mpx implements a Redis Pub/Sub multiplexer.
Important ¶
All main types implemented by this package must not be copied.
Index ¶
- Variables
- type ChannelSubscription
- type Multiplexer
- func (mpx *Multiplexer) NewChannelSubscription(onMessage OnMessageFunc, onDisconnect OnDisconnectFunc, ...) *ChannelSubscription
- func (mpx *Multiplexer) NewPatternSubscription(pattern string, onMessage OnMessageFunc, onDisconnect OnDisconnectFunc, ...) *PatternSubscription
- func (mpx *Multiplexer) NewPromiseSubscription(prefix string) *PromiseSubscription
- func (mpx *Multiplexer) Restart()
- func (mpx *Multiplexer) Stop()
- type OnActivationFunc
- type OnDisconnectFunc
- type OnMessageFunc
- type PatternSubscription
- type Promise
- type PromiseSubscription
Variables ¶
var InactiveSubscriptionError = errors.New("the subscription is currently inactive")
Error returned by NewPromise when the subscription is not active. See WaitForActivation and WaitForNewPromise for alternative solutions.
Types ¶
type ChannelSubscription ¶ added in v0.1.0
type ChannelSubscription struct {
	// Map that contains the Redis Pub/Sub channels
	// added to the subscription. Useful for testing
	// membership and obtaining a list of names.
	// Do not modify directly.
	Channels map[string]*list.Element
	// contains filtered or unexported fields
}
A ChannelSubscription ties an OnMessageFunc to zero or more Redis Pub/Sub channels through a single multiplexed connection. Use NewChannelSubscription from Multiplexer to create a new ChannelSubscription. Before disposing of a ChannelSubscription you must call Close on it.
ChannelSubscription instances are not safe for concurrent use.
func (*ChannelSubscription) Add ¶ added in v0.1.0
func (s *ChannelSubscription) Add(channels ...string)
Adds one or more Redis Pub/Sub channels to the ChannelSubscription.
func (*ChannelSubscription) Clear ¶ added in v0.1.0
func (s *ChannelSubscription) Clear()
Removes all Redis Pub/Sub channels from the ChannelSubscription.
func (*ChannelSubscription) Close ¶ added in v0.1.0
func (s *ChannelSubscription) Close()
Calls Clear and frees all references from the Multiplexer. After calling this method the ChannelSubscription instance can no longer be used. There is no need to call Close if you are also disposing of the whole Multiplexer.
func (*ChannelSubscription) Remove ¶ added in v0.1.0
func (s *ChannelSubscription) Remove(channels ...string)
Removes one or more previously added Redis Pub/Sub channels from the ChannelSubscription.
type Multiplexer ¶
type Multiplexer struct {
// contains filtered or unexported fields
}
A Multiplexer instance corresponds to one Redis Pub/Sub connection that will be shared by multiple subscription instances. A Multiplexer must be created with New. Multiplexer instances are safe for concurrent use.
func New ¶
func New(createConn func() (redis.Conn, error)) *Multiplexer
Creates a new Multiplexer. The createConn function must return a new connection every time it is called. The Multiplexer calls it again only to replace a connection after an error, so a Multiplexer has only one active connection to Redis at a time. Multiplexers automatically try to reconnect using an exponential backoff (plus jitter) algorithm. New also applies a few default options; look at the source code to see the defaults.
func NewWithOpts ¶ added in v0.2.0
func NewWithOpts(
	createConn func() (redis.Conn, error),
	pingTimeout time.Duration,
	minBackoff time.Duration,
	maxBackoff time.Duration,
	messagesBufSize uint,
	pipeliningBufSize uint,
) *Multiplexer
Like New, but allows customizing a few options.
- pingTimeout: time of inactivity before we trigger a PING request
- min/maxBackoff: parameters for the exponential backoff during reconnection events
- messagesBufSize: buffer size for internal Pub/Sub messages channel
- pipeliningBufSize: buffer size for pipelining Pub/Sub commands
func (*Multiplexer) NewChannelSubscription ¶ added in v0.1.0
func (mpx *Multiplexer) NewChannelSubscription(
	onMessage OnMessageFunc,
	onDisconnect OnDisconnectFunc,
	onActivation OnActivationFunc,
) *ChannelSubscription
Creates a new ChannelSubscription tied to the Multiplexer. Before disposing of a ChannelSubscription you must call its Close method. The arguments onDisconnect and onActivation can be nil if you're not interested in the corresponding types of event. All event listeners will be called sequentially from a single goroutine. Depending on the workload, consider keeping all functions lean and offloading slow operations to other goroutines whenever possible. ChannelSubscription instances are not safe for concurrent use.
func (*Multiplexer) NewPatternSubscription ¶ added in v0.1.0
func (mpx *Multiplexer) NewPatternSubscription(
	pattern string,
	onMessage OnMessageFunc,
	onDisconnect OnDisconnectFunc,
	onActivation OnActivationFunc,
) *PatternSubscription
Creates a new PatternSubscription tied to the Multiplexer. Before disposing of a PatternSubscription you must call its Close method. The arguments onDisconnect and onActivation can be nil if you're not interested in the corresponding types of event. All event listeners will be called sequentially from a single goroutine. Depending on the workload, consider keeping all functions lean and offloading slow operations to other goroutines whenever possible. PatternSubscription instances are not safe for concurrent use.
For more information about the pattern syntax: https://redis.io/topics/pubsub#pattern-matching-subscriptions
func (*Multiplexer) NewPromiseSubscription ¶ added in v0.1.0
func (mpx *Multiplexer) NewPromiseSubscription(prefix string) *PromiseSubscription
Creates a new PromiseSubscription. PromiseSubscriptions are safe for concurrent use. The prefix argument is used to create a PatternSubscription that will match all channels that start with the provided prefix.
func (*Multiplexer) Restart ¶ added in v0.1.0
func (mpx *Multiplexer) Restart()
Restarts a stopped Multiplexer. Calling Restart on a Multiplexer that was not stopped will trigger a panic.
func (*Multiplexer) Stop ¶ added in v0.1.0
func (mpx *Multiplexer) Stop()
Stops all service goroutines and closes the underlying Redis Pub/Sub connection.
type OnActivationFunc ¶ added in v0.1.0
type OnActivationFunc = func(name string)
A function that gets triggered whenever a subscription goes into effect.
- ChannelSubscription: name is a Redis Pub/Sub channel
- PatternSubscription: name is a Redis Pub/Sub pattern
type OnDisconnectFunc ¶ added in v0.1.0
type OnDisconnectFunc = func(error)
A function that gets triggered every time the Redis Pub/Sub connection has been lost. The error argument will be the error that caused the reconnection event. This function will be triggered *after* all pending messages have been dispatched.
type OnMessageFunc ¶ added in v0.1.0
A function that gets triggered whenever a message is received on a given Redis Pub/Sub channel.
type PatternSubscription ¶ added in v0.1.0
type PatternSubscription struct {
// contains filtered or unexported fields
}
A PatternSubscription ties an OnMessageFunc to one Redis Pub/Sub pattern through a single multiplexed connection. Use NewPatternSubscription from Multiplexer to create a new PatternSubscription. Before disposing of a PatternSubscription you must call Close on it. PatternSubscription instances are not safe for concurrent use.
For more information about the pattern syntax: https://redis.io/topics/pubsub#pattern-matching-subscriptions
func (*PatternSubscription) Close ¶ added in v0.1.0
func (p *PatternSubscription) Close()
Closes the PatternSubscription and frees all allocated resources. You don't need to call Close if you're also disposing of the whole Multiplexer.
func (PatternSubscription) GetPattern ¶ added in v0.1.0
func (p PatternSubscription) GetPattern() string
Returns the pattern that this PatternSubscription is subscribed to.
type Promise ¶ added in v0.1.0
type Promise struct {
	// Possible outcomes:
	// - promise fulfilled: one message will be sent, then the channel will be closed
	// - promise timed out: channel will be closed
	// - promise canceled: channel will be closed
	// - network error: channel will be closed
	C <-chan []byte
	// contains filtered or unexported fields
}
A Promise represents a timed, uninterrupted, single-message subscription to a Redis Pub/Sub channel. If network connectivity is lost, thus causing an interruption, the Promise fails (unless it has already been fulfilled). Use NewPromise from PromiseSubscription to create a new Promise.
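The outcomes listed for the C field map directly onto Go's two-value channel receive: a fulfilled Promise yields one message, while every other outcome only closes the channel. The sketch below shows that pattern on plain channels built by hand as stand-ins for Promise.C; the helper name await is hypothetical.

```go
package main

import "fmt"

// await blocks until the channel yields a message or is closed.
// ok is true only when a message was delivered (promise fulfilled).
// A closed channel without a message means the promise timed out, was
// canceled, or failed because of a network error; the channel alone
// does not say which. Hypothetical helper.
func await(c <-chan []byte) (msg []byte, ok bool) {
	msg, ok = <-c
	return msg, ok
}

func main() {
	// Stand-in for a fulfilled promise: one message, then closed.
	fulfilled := make(chan []byte, 1)
	fulfilled <- []byte("hello")
	close(fulfilled)

	// Stand-in for a failed promise: closed without a message.
	failed := make(chan []byte)
	close(failed)

	if msg, ok := await(fulfilled); ok {
		fmt.Printf("fulfilled: %s\n", msg)
	}
	if _, ok := await(failed); !ok {
		fmt.Println("not fulfilled: timed out, canceled, or connection lost")
	}
}
```

With a real Promise the same receive would be written against its C field, for example inside a select statement alongside other channels.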
type PromiseSubscription ¶ added in v0.1.0
type PromiseSubscription struct {
// contains filtered or unexported fields
}
A PromiseSubscription allows you to wait for individual Redis Pub/Sub messages with support for timeouts. This effectively creates a networked promise system. It makes use of a PatternSubscription internally to make creating new promises as lightweight as possible (no subscribe/unsubscribe command is sent to Redis to fulfill or expire a Promise).
Unlike other types of subscriptions, PromiseSubscriptions *are* safe for concurrent use. You will probably want to call WaitForActivation after creating a new PromiseSubscription.
func (*PromiseSubscription) Close ¶ added in v0.1.0
func (p *PromiseSubscription) Close()
Fails all outstanding Promises and closes the subscription. Calling Close on a closed subscription will trigger a panic.
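Because a second call to Close panics, code with more than one shutdown path (for example a deferred cleanup plus a signal handler) may want to guard the call. The following is a minimal sketch of such a guard built on sync.Once. The closer interface, onceCloser and strictCloser are hypothetical names introduced here; strictCloser only imitates the documented panic so that the example runs without a Redis connection.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is the only behavior the guard needs: a Close method with no
// arguments and no results, the same shape as the Close signature above.
type closer interface {
	Close()
}

// onceCloser forwards the first Close call to inner and ignores the rest.
// Hypothetical wrapper; must not be copied because it embeds a sync.Once.
type onceCloser struct {
	once  sync.Once
	inner closer
}

func (o *onceCloser) Close() {
	o.once.Do(o.inner.Close)
}

// strictCloser is a stand-in that panics on a second Close, imitating
// the documented behavior of a closed subscription.
type strictCloser struct {
	closed bool
}

func (s *strictCloser) Close() {
	if s.closed {
		panic("Close called on a closed subscription")
	}
	s.closed = true
}

func main() {
	inner := &strictCloser{}
	guarded := &onceCloser{inner: inner}

	guarded.Close()
	guarded.Close() // ignored instead of panicking
	fmt.Println("closed:", inner.closed)
}
```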
func (*PromiseSubscription) NewPromise ¶ added in v0.1.0
Creates a new Promise for the given suffix. The suffix is combined with the prefix specified when creating the PromiseSubscription to form the final Redis Pub/Sub channel name. The underlying PatternSubscription will receive all messages sent under the given prefix, thus ensuring that new promises take effect as soon as this method returns. Trying to create a new Promise while the PromiseSubscription is not active will cause this method to return InactiveSubscriptionError.
func (*PromiseSubscription) WaitForActivation ¶ added in v0.1.0
func (p *PromiseSubscription) WaitForActivation() (ok bool)
When a PromiseSubscription is first created (and after a disconnection event) it is not immediately able to create new Promises because it first needs to wait for the underlying PatternSubscription to become active. This function blocks the caller until that condition is fulfilled. All waiters will also be unblocked when the subscription gets closed, so it's important to check the return value before attempting to use the subscription.
if sub.WaitForActivation() {
	// make use of the subscription
}
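The blocking behavior described for WaitForActivation (wait until active, but also wake up on close and report failure) can be modeled with a condition variable. The sketch below is a self-contained model and not the package's implementation; the gate type and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// gate models the activation state of a subscription.
// Hypothetical type; it must not be copied because it holds a sync.Mutex.
type gate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	active bool
	closed bool
}

func newGate() *gate {
	g := &gate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// waitForActivation blocks until the gate is active or closed.
// It returns true only if the gate has not been closed.
func (g *gate) waitForActivation() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	for !g.active && !g.closed {
		g.cond.Wait()
	}
	return !g.closed
}

// setActive flips the activation state and wakes every waiter.
func (g *gate) setActive(active bool) {
	g.mu.Lock()
	g.active = active
	g.mu.Unlock()
	g.cond.Broadcast()
}

// close marks the gate as closed and wakes every waiter.
func (g *gate) close() {
	g.mu.Lock()
	g.closed = true
	g.mu.Unlock()
	g.cond.Broadcast()
}

func main() {
	g := newGate()
	done := make(chan bool)
	go func() { done <- g.waitForActivation() }()

	g.setActive(true)
	fmt.Println("usable:", <-done)

	g.close()
	fmt.Println("usable after close:", g.waitForActivation())
}
```

The loop around cond.Wait is what makes the returned value meaningful: a waiter woken by close sees closed set and reports false, which is why callers are asked to check the result.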
func (*PromiseSubscription) WaitForNewPromise ¶ added in v0.1.0
func (p *PromiseSubscription) WaitForNewPromise(suffix string, timeout time.Duration) (*Promise, bool)
Like NewPromise, but it will wait for the PromiseSubscription to become active instead of returning an error. The timeout will start only *after* the function returns. All waiters will also be unblocked if the subscription gets closed, so it's important to check the second return value before attempting to use the returned Promise. Closing the subscription is the only way to make this function fail.
if promise, ok := sub.WaitForNewPromise(suffix, timeout); ok {
	// make use of the promise
}