mpx

package module
v0.3.0 Latest
Published: Mar 29, 2020 License: MIT Imports: 7 Imported by: 0

README

go-mpx

RedisMPX is a Redis Pub/Sub multiplexer library written in multiple languages and live coded on Twitch.

The live coding of this implementation is archived on YouTube.

Abstract

When bridging multiple application instances through Redis Pub/Sub, it's easy to end up needing support for multiplexing. RedisMPX streamlines this process across multiple languages by offering a consistent set of features that cover the most common use cases.

The library works under the assumption that you are going to create a separate subscription for each client connected to your service (e.g. WebSocket clients):

  • ChannelSubscription allows you to add and remove individual Redis Pub/Sub channels, similarly to what a multi-room chat application would need.
  • PatternSubscription allows you to subscribe to a single Redis Pub/Sub pattern.
  • PromiseSubscription allows you to create a networked promise system.
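
To make the idea of multiplexing concrete, here is a minimal, standard-library-only sketch of fanning out messages from one upstream connection to many per-client handlers. It is not the library's implementation: the fanOut type and its add, remove and dispatch methods are hypothetical names chosen for illustration, and the sketch is single-goroutine only.

```go
package main

import "fmt"

// handler has the same shape as the package's OnMessageFunc.
type handler func(channel string, message []byte)

// fanOut is a hypothetical registry: one upstream connection,
// many per-client handlers for each Redis Pub/Sub channel.
type fanOut struct {
	nextID int
	subs   map[string]map[int]handler
}

func newFanOut() *fanOut {
	return &fanOut{subs: make(map[string]map[int]handler)}
}

// add registers h for channel. first is true when h is the first handler
// for that channel, which is when a real multiplexer would send SUBSCRIBE.
func (f *fanOut) add(channel string, h handler) (id int, first bool) {
	if f.subs[channel] == nil {
		f.subs[channel] = make(map[int]handler)
		first = true
	}
	f.nextID++
	f.subs[channel][f.nextID] = h
	return f.nextID, first
}

// remove drops a handler. last is true when no handlers remain for the
// channel, which is when a real multiplexer would send UNSUBSCRIBE.
func (f *fanOut) remove(channel string, id int) (last bool) {
	handlers, ok := f.subs[channel]
	if !ok {
		return false
	}
	delete(handlers, id)
	if len(handlers) == 0 {
		delete(f.subs, channel)
		return true
	}
	return false
}

// dispatch delivers one upstream message to every handler registered on
// the channel and returns how many handlers were called.
func (f *fanOut) dispatch(channel string, message []byte) int {
	for _, h := range f.subs[channel] {
		h(channel, message)
	}
	return len(f.subs[channel])
}

func main() {
	f := newFanOut()
	logAs := func(client string) handler {
		return func(ch string, msg []byte) {
			fmt.Printf("%s got %q on %s\n", client, msg, ch)
		}
	}

	alice, _ := f.add("room:1", logAs("alice"))
	f.add("room:1", logAs("bob"))

	f.dispatch("room:1", []byte("hi")) // both handlers run
	f.remove("room:1", alice)
	f.dispatch("room:1", []byte("bye")) // only bob's handler runs
}
```

The point of the design is that Redis only sees one SUBSCRIBE per channel, no matter how many clients are interested in it.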

Features

  • Simple channel subscriptions
  • Pattern subscriptions
  • Networked promise system
  • Connection retry with exponential backoff + jitter
  • Aggressive pipelining over the Redis Pub/Sub connection

Documentation

Quickstart

package main

import (
	"fmt"
	"github.com/RedisMPX/go-mpx"
	"github.com/gomodule/redigo/redis"
	"time"
)

func main() {
	connBuilder := func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	}

	// Create a Multiplexer
	multiplexer := mpx.New(connBuilder)

	// Define an onMessage callback
	onMessage := func(ch string, msg []byte) {
		fmt.Printf("channel: [%v] msg: [%v]\n", ch, string(msg))
	}

	// Create a Subscription
	sub := multiplexer.NewChannelSubscription(onMessage, nil, nil)

	// Add a Redis Pub/Sub channel to the Subscription
	sub.Add("mychannel")

	// Create a second connection to Redis
	conn, err := redis.Dial("tcp", ":6379")
	if err != nil {
		panic(err)
	}

	// Publish Messages over Redis Pub/Sub forever.
	for {
		_, err := conn.Do("PUBLISH", "mychannel", "Hello World!")
		if err != nil {
			panic(err)
		}

		time.Sleep(3 * time.Second)
	}
}

Documentation

Overview

Package mpx implements a Redis Pub/Sub multiplexer.

Important

All main types implemented by this package must not be copied.

Index

Constants

This section is empty.

Variables

var InactiveSubscriptionError = errors.New("the subscription is currently inactive")

Error returned by NewPromise when the subscription is not active. See WaitForActivation and WaitForNewPromise for alternative solutions.

Functions

This section is empty.

Types

type ChannelSubscription added in v0.1.0

type ChannelSubscription struct {
	// Map that contains the Redis Pub/Sub channels
	// added to the subscription. Useful for testing
	// membership and obtaining a list of names.
	// Do not modify directly.
	Channels map[string]*list.Element
	// contains filtered or unexported fields
}

A ChannelSubscription ties an OnMessageFunc to zero or more Redis Pub/Sub channels through a single multiplexed connection. Use NewChannelSubscription from Multiplexer to create a new ChannelSubscription. Before disposing of a ChannelSubscription you must call Close on it.

ChannelSubscription instances are not safe for concurrent use.
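
As a small illustration of the membership and listing uses mentioned for the Channels field, the sketch below works on a plain map with string keys. The sortedNames helper is a hypothetical name, and the map in main is a stand-in for sub.Channels rather than a real subscription; since subscriptions are not safe for concurrent use, reads like these would presumably need to happen from the same goroutine that calls Add and Remove.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedNames returns the keys of a channel map in sorted order.
// V stands in for the map's value type, which is treated as opaque.
func sortedNames[V any](channels map[string]V) []string {
	names := make([]string, 0, len(channels))
	for name := range channels {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	// Stand-in for sub.Channels; the values are irrelevant here.
	channels := map[string]struct{}{"room:2": {}, "room:1": {}}

	// Membership test: read only, never write to the map directly.
	if _, ok := channels["room:1"]; ok {
		fmt.Println("subscribed to room:1")
	}

	fmt.Println(sortedNames(channels))
}
```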

func (*ChannelSubscription) Add added in v0.1.0

func (s *ChannelSubscription) Add(channels ...string)

Adds one or more Redis Pub/Sub channels to the ChannelSubscription.

func (*ChannelSubscription) Clear added in v0.1.0

func (s *ChannelSubscription) Clear()

Removes all Redis Pub/Sub channels from the ChannelSubscription.

func (*ChannelSubscription) Close added in v0.1.0

func (s *ChannelSubscription) Close()

Calls Clear and frees all references from the Multiplexer. After calling this method the ChannelSubscription instance cannot be used any more. There is no need to call Close if you are also disposing of the whole Multiplexer.

func (*ChannelSubscription) Remove added in v0.1.0

func (s *ChannelSubscription) Remove(channels ...string)

Removes one or more previously added Redis Pub/Sub channels from the ChannelSubscription.

type Multiplexer

type Multiplexer struct {
	// contains filtered or unexported fields
}

A Multiplexer instance corresponds to one Redis Pub/Sub connection that will be shared by multiple subscription instances. A Multiplexer must be created with New. Multiplexer instances are safe for concurrent use.

func New

func New(createConn func() (redis.Conn, error)) *Multiplexer

Creates a new Multiplexer. The input function must provide a new connection whenever called. The Multiplexer will only use it to create a new connection in case of errors, meaning that a Multiplexer will only have one active connection to Redis at a time. Multiplexers will automatically try to reconnect using an exponential backoff (plus jitter) algorithm. It also provides a few default options. Look at the source code to see the defaults.

func NewWithOpts added in v0.2.0

func NewWithOpts(
	createConn func() (redis.Conn, error),
	pingTimeout time.Duration,
	minBackoff time.Duration,
	maxBackoff time.Duration,
	messagesBufSize uint,
	pipeliningBufSize uint,
) *Multiplexer

Like New, but allows customizing a few options.

  • pingTimeout: time of inactivity before we trigger a PING request
  • min/maxBackoff: parameters for exponential backoff during reconnection events
  • messagesBufSize: buffer size for internal Pub/Sub messages channel
  • pipeliningBufSize: buffer size for pipelining Pub/Sub commands

func (*Multiplexer) NewChannelSubscription added in v0.1.0

func (mpx *Multiplexer) NewChannelSubscription(
	onMessage OnMessageFunc,
	onDisconnect OnDisconnectFunc,
	onActivation OnActivationFunc,
) *ChannelSubscription

Creates a new ChannelSubscription tied to the Multiplexer. Before disposing of a ChannelSubscription you must call its Close method. The arguments onDisconnect and onActivation can be nil if you're not interested in the corresponding types of event. All event listeners will be called sequentially from a single goroutine. Depending on the workload, consider keeping all functions lean and offloading slow operations to other goroutines whenever possible. ChannelSubscription instances are not safe for concurrent use.
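
One way to keep a listener lean, sketched here with the standard library only, is to have it hand each message to a buffered queue that a separate worker goroutine drains. The job type and newOffloadingHandler are hypothetical names; copying the payload and dropping messages when the queue is full are defensive assumptions of this sketch, not documented behavior of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// job carries one message from the listener to the worker.
type job struct {
	channel string
	payload []byte
}

// newOffloadingHandler returns a function with the same shape as
// OnMessageFunc that never blocks the goroutine calling it.
func newOffloadingHandler(queue chan<- job) func(channel string, message []byte) {
	return func(channel string, message []byte) {
		// Defensive copy, in case the caller reuses the buffer (assumption).
		payload := append([]byte(nil), message...)
		select {
		case queue <- job{channel: channel, payload: payload}:
		default:
			// Queue full: drop the message instead of stalling the listener.
		}
	}
}

func main() {
	queue := make(chan job, 64)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for j := range queue {
			// Slow work (database writes, HTTP calls, ...) belongs here.
			fmt.Printf("processing %q from %s\n", j.payload, j.channel)
		}
	}()

	// In a real program this function would be passed as onMessage.
	onMessage := newOffloadingHandler(queue)
	onMessage("mychannel", []byte("Hello World!"))

	close(queue)
	wg.Wait()
}
```

Whether dropping or blocking is the right choice when the queue fills up depends on the workload; dropping keeps the shared listener goroutine responsive at the cost of losing messages.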

func (*Multiplexer) NewPatternSubscription added in v0.1.0

func (mpx *Multiplexer) NewPatternSubscription(
	pattern string,
	onMessage OnMessageFunc,
	onDisconnect OnDisconnectFunc,
	onActivation OnActivationFunc,
) *PatternSubscription

Creates a new PatternSubscription tied to the Multiplexer. Before disposing of a PatternSubscription you must call its Close method. The arguments onDisconnect and onActivation can be nil if you're not interested in the corresponding types of event. All event listeners will be called sequentially from a single goroutine. Depending on the workload, consider keeping all functions lean and offloading slow operations to other goroutines whenever possible. PatternSubscription instances are not safe for concurrent use.

For more information about the pattern syntax: https://redis.io/topics/pubsub#pattern-matching-subscriptions

func (*Multiplexer) NewPromiseSubscription added in v0.1.0

func (mpx *Multiplexer) NewPromiseSubscription(prefix string) *PromiseSubscription

Creates a new PromiseSubscription. PromiseSubscriptions are safe for concurrent use. The prefix argument is used to create a PatternSubscription that will match all channels that start with the provided prefix.

func (*Multiplexer) Restart added in v0.1.0

func (mpx *Multiplexer) Restart()

Restarts a stopped Multiplexer. Calling Restart on a Multiplexer that was not stopped will trigger a panic.

func (*Multiplexer) Stop added in v0.1.0

func (mpx *Multiplexer) Stop()

Stops all service goroutines and closes the underlying Redis Pub/Sub connection.

type OnActivationFunc added in v0.1.0

type OnActivationFunc = func(name string)

A function that gets triggered whenever a subscription goes into effect.

  • ChannelSubscription: name is a Redis Pub/Sub channel
  • PatternSubscription: name is a Redis Pub/Sub pattern

type OnDisconnectFunc added in v0.1.0

type OnDisconnectFunc = func(error)

A function that gets triggered every time the Redis Pub/Sub connection has been lost. The error argument will be the error that caused the reconnection event. This function will be triggered *after* all pending messages have been dispatched.

type OnMessageFunc added in v0.1.0

type OnMessageFunc = func(channel string, message []byte)

A function that gets triggered whenever a message is received on a given Redis Pub/Sub channel.

type PatternSubscription added in v0.1.0

type PatternSubscription struct {
	// contains filtered or unexported fields
}

A PatternSubscription ties an OnMessageFunc to one Redis Pub/Sub pattern through a single multiplexed connection. Use NewPatternSubscription from Multiplexer to create a new PatternSubscription. Before disposing of a PatternSubscription you must call Close on it. PatternSubscription instances are not safe for concurrent use.

For more information about the pattern syntax: https://redis.io/topics/pubsub#pattern-matching-subscriptions

func (*PatternSubscription) Close added in v0.1.0

func (p *PatternSubscription) Close()

Closes the PatternSubscription and frees all allocated resources. You don't need to call Close if you're also disposing of the whole Multiplexer.

func (PatternSubscription) GetPattern added in v0.1.0

func (p PatternSubscription) GetPattern() string

Returns the pattern that this PatternSubscription is subscribed to.

type Promise added in v0.1.0

type Promise struct {
	// Possible outcomes:
	// - promise fulfilled: one message will be sent, then the channel will be closed
	// - promise timed out: channel will be closed
	// - promise canceled: channel will be closed
	// - network error: channel will be closed
	C <-chan []byte
	// contains filtered or unexported fields
}

A Promise represents a timed, uninterrupted, single-message subscription to a Redis Pub/Sub channel. If network connectivity gets lost, thus causing an interruption, the Promise will be failed (unless already fulfilled). Use NewPromise from PromiseSubscription to create a new Promise.
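
The outcomes listed for the C field map directly onto Go's two-value channel receive: a received message means the promise was fulfilled, while a closed channel with no message covers timeout, cancellation and network errors alike. The sketch below demonstrates this with plain channels standing in for Promise.C; await is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// await blocks until the channel yields a message or is closed.
// fulfilled is false when the channel was closed without a message.
func await(c <-chan []byte) (msg []byte, fulfilled bool) {
	msg, fulfilled = <-c
	return msg, fulfilled
}

func main() {
	// Stand-in for a fulfilled promise: one message, then closed.
	fulfilled := make(chan []byte, 1)
	fulfilled <- []byte("result")
	close(fulfilled)

	// Stand-in for a failed promise: closed without a message.
	failed := make(chan []byte)
	close(failed)

	for _, c := range []<-chan []byte{fulfilled, failed} {
		if msg, ok := await(c); ok {
			fmt.Printf("fulfilled with %q\n", msg)
		} else {
			fmt.Println("timed out, canceled, or connection lost")
		}
	}
}
```

Note that the channel alone does not say which of the failure cases occurred.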

func (*Promise) Cancel added in v0.1.0

func (p *Promise) Cancel()

Cancels a Promise. Calling Cancel more than once, or on an already-fulfilled Promise, is a no-op.

type PromiseSubscription added in v0.1.0

type PromiseSubscription struct {
	// contains filtered or unexported fields
}

A PromiseSubscription allows you to wait for individual Redis Pub/Sub messages with support for timeouts. This effectively creates a networked promise system. It makes use of a PatternSubscription internally to make creating new promises as lightweight as possible (no subscribe/unsubscribe command is sent to Redis to fulfill or expire a Promise).

Unlike other types of subscriptions, PromiseSubscriptions *are* safe for concurrent use. You will probably want to call WaitForActivation after creating a new PromiseSubscription.

func (*PromiseSubscription) Close added in v0.1.0

func (p *PromiseSubscription) Close()

Fails all outstanding Promises and closes the subscription. Calling Close on a closed subscription will trigger a panic.

func (*PromiseSubscription) NewPromise added in v0.1.0

func (p *PromiseSubscription) NewPromise(suffix string, timeout time.Duration) (*Promise, error)

Creates a new Promise for the given suffix. The suffix gets composed with the prefix specified when creating the PromiseSubscription to create the final Redis Pub/Sub channel name. The underlying PatternSubscription will receive all messages sent under the given prefix, thus ensuring that new promises take effect as soon as this method returns. Trying to create a new Promise while the PromiseSubscription is not active will cause this method to return InactiveSubscriptionError.
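
The following standard-library sketch illustrates why a single pattern subscription makes promises cheap: registering a promise only touches local state, and incoming messages are routed by stripping the prefix from the channel name. It is not the package's implementation. The registry type is hypothetical, plain concatenation of prefix and suffix and the prefix + "*" pattern are assumptions, and timeouts, cancellation and locking are left out.

```go
package main

import (
	"fmt"
	"strings"
)

// registry tracks pending promises by suffix for one prefix.
type registry struct {
	prefix  string
	pending map[string][]chan []byte
}

func newRegistry(prefix string) *registry {
	return &registry{prefix: prefix, pending: make(map[string][]chan []byte)}
}

// pattern is the single Redis Pub/Sub pattern that would cover every
// channel under the prefix (assumed form).
func (r *registry) pattern() string { return r.prefix + "*" }

// newPromise registers a waiter locally; nothing is sent to Redis.
func (r *registry) newPromise(suffix string) <-chan []byte {
	c := make(chan []byte, 1) // buffered so onMessage never blocks
	r.pending[suffix] = append(r.pending[suffix], c)
	return c
}

// onMessage fulfills every promise waiting on the channel's suffix and
// returns how many were fulfilled.
func (r *registry) onMessage(channel string, message []byte) int {
	if !strings.HasPrefix(channel, r.prefix) {
		return 0
	}
	suffix := strings.TrimPrefix(channel, r.prefix)
	waiters := r.pending[suffix]
	delete(r.pending, suffix)
	for _, c := range waiters {
		c <- message // one message...
		close(c)     // ...then the channel is closed
	}
	return len(waiters)
}

func main() {
	r := newRegistry("job:")
	fmt.Println("pattern:", r.pattern())

	promise := r.newPromise("42")

	// Simulate a message arriving through the pattern subscription.
	r.onMessage("job:42", []byte("done"))

	if msg, ok := <-promise; ok {
		fmt.Printf("promise fulfilled with %q\n", msg)
	}
}
```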

func (*PromiseSubscription) WaitForActivation added in v0.1.0

func (p *PromiseSubscription) WaitForActivation() (ok bool)

When a PromiseSubscription is first created (and after a disconnection event) it is not immediately able to create new Promises because it first needs to wait for the underlying PatternSubscription to become active. This function will block the caller until that condition is fulfilled. All waiters will also be unblocked when the subscription gets closed, so it's important to check the return value before attempting to use it.

if sub.WaitForActivation() {
   // make use of the subscription
}

func (*PromiseSubscription) WaitForNewPromise added in v0.1.0

func (p *PromiseSubscription) WaitForNewPromise(suffix string, timeout time.Duration) (*Promise, bool)

Like NewPromise, but it will wait for the PromiseSubscription to become active instead of returning an error. The timeout will start only *after* the function returns. All waiters will also be unblocked if the subscription gets closed, so it's important to check the second return value before attempting to use the returned Promise. Closing the subscription is the only way to make this function fail.

if promise, ok := sub.WaitForNewPromise(suffix, timeout); ok {
   // make use of the promise
}

Directories

Path Synopsis
list
Package list implements a doubly linked list.
