rmux

package module
v0.2.2-0...-bca02f7
Published: Oct 12, 2021 License: BSD-3-Clause Imports: 17 Imported by: 1

README

Rmux

Rmux is a Redis connection pooler and multiplexer, written in Go. It is meant for LAMP stacks, or other applications built on short-lived processes, with high request volume. It should be run as a client on every server that connects to redis, to reduce the total inbound connection count to the redis servers while handling consistent multiplexing.

Motivation

At Pardot, we use redis (among other things) for our cache layer. Early on, we saw occasional latency spikes. After tuning our redis servers' net.ipv4.tcp.. settings, everything settled down, but as we grew, we began to see issues pop up again.

While our memory usage remained remarkably low, we saw occasional CPU spikes during peak access times. Adding more redis boxes, with key-based hashing in our application, surprisingly did not help. Pardot application servers run on a LAMP stack, which means that each request has to create its own connection to Redis. Since each application request hits multiple cache keys, destination redis boxes were receiving the same number of connections, but fewer commands.

Since the issue seemed to be purely connection rates, and not command count, we started looking for a connection pooler. After finding none that were designed for redis, we built our own. Along the way, we built in key-based multiplexing, with a failover strategy in place.

With rmux, our application servers all connect to a local unix socket, instead of the target destination redis port. Rmux then parses the incoming request, reads the first key, and hashes it to find which server to execute the command on. If a server is down, the command will instead be sent to a backup-hashed server. Since rmux understands the redis protocol, it also handles connection pooling/recycling for you, and handles server id management for the connections.
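The routing described above can be sketched roughly as follows. This is a minimal illustration, not rmux's actual implementation: the `server` type, the `pick` function, and the choice of FNV-1a as the hash are all assumptions made for the example, since the README does not say which hash function or ring layout rmux uses.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// server is a hypothetical stand-in for one destination redis endpoint.
type server struct {
	addr string
	up   bool
}

// hashKey maps a key to a 32-bit value. FNV-1a is an assumption here;
// the README does not name the hash function rmux uses.
func hashKey(key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32()
}

// pick hashes the key over all servers. If the chosen server is down,
// it hashes again over only the servers that are currently up.
func pick(servers []server, key string) (string, error) {
	if len(servers) == 0 {
		return "", errors.New("no servers configured")
	}
	h := hashKey(key)
	primary := servers[h%uint32(len(servers))]
	if primary.up {
		return primary.addr, nil
	}
	var alive []server
	for _, s := range servers {
		if s.up {
			alive = append(alive, s)
		}
	}
	if len(alive) == 0 {
		return "", errors.New("connection down")
	}
	return alive[h%uint32(len(alive))].addr, nil
}

func main() {
	servers := []server{
		{"localhost:6379", true},
		{"localhost:6380", true},
		{"localhost:6381", false},
	}
	addr, err := pick(servers, "user:42")
	fmt.Println(addr, err)
}
```

The same key always lands on the same server while the set of live servers is unchanged, which is what lets every application server agree on where a cache entry lives.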

When rmux hit production, we saw immediate gains in our 90th-percentile and upper-bound response times.

Installing

  • Install Go
  • go get -u github.com/salesforce/rmux
  • go build -o /usr/local/bin/rmux github.com/salesforce/rmux/main

Usage

Usage of rmux:
  -host="localhost": The host to listen for incoming connections on
  -localReadTimeout=0: Timeout to set locally (read)
  -localTimeout=0: Timeout to set locally (read+write)
  -localWriteTimeout=0: Timeout to set locally (write)
  -maxProcesses=0: The number of processes to use.  If this is not defined, go's default is used.
  -poolSize=50: The size of the connection pools to use
  -port="6379": The port to listen for incoming connections on
  -remoteConnectTimeout=0: Timeout to set for remote redises (connect)
  -remoteReadTimeout=0: Timeout to set for remote redises (read)
  -remoteTimeout=0: Timeout to set for remote redises (connect+read+write)
  -remoteWriteTimeout=0: Timeout to set for remote redises (write)
  -socket="": The socket to listen for incoming connections on.  If this is provided, host and port are ignored
  -tcpConnections="localhost:6380 localhost:6381": TCP connections (destination redis servers) to multiplex over
  -unixConnections="": Unix connections (destination redis servers) to multiplex over
  -config="": Path to configuration file

For more details about rmux configuration see Configuration

Localhost example:

redis-server --port 6379 &
redis-server --port 6380 &
redis-server --port 6381 &
redis-server --port 6382 &
rmux -socket=/tmp/rmux.sock -tcpConnections="localhost:6379 localhost:6380 localhost:6381 localhost:6382" &
redis-cli -s /tmp/rmux.sock
  • In the above example, all key-based commands will hash over ports 6379->6382 on localhost
  • If the server that a key hashes to is down, a backup server is automatically used (hashed over the servers that are currently up)
  • All servers running production code should be running the same version (and destination flags) of rmux, and should be connecting over the rmux socket
  • Select will always return +OK, even if the server id is invalid
  • Ping will always return +PONG
  • Quit will always return +OK
  • Info will return an abbreviated response:
rmux_version: 1.0
go_version: go1.1.2
process_id: 48885
connected_clients: 0
active_endpoints: 4
total_endpoints: 4
role: master
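
For clients that do not use redis-cli, talking to the rmux socket looks the same as talking to redis itself. The sketch below encodes a command in the standard redis wire format (an array of bulk strings) and sends it over a unix socket. The helper names `encodeCommand` and `sendCommand` are made up for this example, and the socket path is whatever was passed to `-socket`.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
	"strconv"
	"strings"
)

// encodeCommand renders a command as a RESP array of bulk strings,
// which is how redis clients send requests on the wire.
func encodeCommand(args ...string) []byte {
	var b strings.Builder
	b.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, a := range args {
		b.WriteString("$" + strconv.Itoa(len(a)) + "\r\n" + a + "\r\n")
	}
	return []byte(b.String())
}

// sendCommand dials the unix socket, writes one command, and returns
// the first line of the reply with the trailing CRLF removed.
func sendCommand(socketPath string, args ...string) (string, error) {
	conn, err := net.Dial("unix", socketPath)
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := conn.Write(encodeCommand(args...)); err != nil {
		return "", err
	}
	line, err := bufio.NewReader(conn).ReadString('\n')
	return strings.TrimRight(line, "\r\n"), err
}

func main() {
	// Always show the encoded bytes; only dial if a socket path is given.
	fmt.Printf("%q\n", encodeCommand("PING"))
	if len(os.Args) > 1 {
		reply, err := sendCommand(os.Args[1], "PING")
		fmt.Println(reply, err)
	}
}
```

Run against the localhost example with `/tmp/rmux.sock` as the argument, the reply to PING should be the `+PONG` line described in the list above.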

Production equivalent:

rmux -socket=/tmp/rmux.sock -tcpConnections="redis1:6379 redis1:6380 redis2:6379 redis2:6380"

Disabled commands

Redis commands that should only be run directly on a redis server are disabled. Commands that operate on more than one key (or have the potential to) are disabled if multiplexing is enabled.

PubSub support is currently experimental, and only publish and subscribe are supported. Disabled:

psubscribe
pubsub
punsubscribe
unsubscribe

Full list of disabled commands
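
The multi-key rule can be pictured as a simple gate in front of the router. The sketch below is illustrative only: the `multiKey` set is a small hand-picked sample, not rmux's real list (see the full list linked above), and `checkCommand` is a hypothetical name. The error text reuses the message of MULTIPLEX_OPERATION_UNSUPPORTED_RESPONSE from the package variables.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errMultiplexUnsupported carries the same message text as the package's
// MULTIPLEX_OPERATION_UNSUPPORTED_RESPONSE variable.
var errMultiplexUnsupported = errors.New("This command is not supported for multiplexing servers")

// multiKey is an illustrative sample of commands that can touch more
// than one key. It is an assumption, not rmux's actual list.
var multiKey = map[string]bool{"mget": true, "mset": true, "rename": true}

// checkCommand rejects multi-key commands only when multiplexing is on,
// because their keys could hash to different servers.
func checkCommand(name string, multiplexing bool) error {
	if multiplexing && multiKey[strings.ToLower(name)] {
		return errMultiplexUnsupported
	}
	return nil
}

func main() {
	for _, name := range []string{"GET", "MGET"} {
		fmt.Println(name, checkCommand(name, true))
	}
}
```

With a single destination server there is nothing to hash across, so the same commands pass through unchanged.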

Benchmarks

Benchmarks with keep-alive off (simulating a LAMP stack) show rmux being ~10x as fast as a direct connection, under heavy load.

Benchmarks with keep-alive on (simulating how a Java server would operate) show rmux being ~70% as fast as a direct connection.

Benchmark results here

Rmux is currently used in production by Pardot. We have seen a reduction in our upper and 90th percentile connection and command times. The 90th percentile times are slightly improved, and the upper times are drastically improved.

Production graphite data is here

Documentation

Overview

Package rmux provides a connection-pooling, multiplexing redis server. Commands are parsed, and multiplexed out based on their arguments. Package rmux/main includes a working server implementation, if no customization is needed.

Index

Constants

This section is empty.

Variables

var (
	ERR_QUIT            = errors.New("Client asked to quit")
	ERR_CONNECTION_DOWN = errors.New(string(CONNECTION_DOWN_RESPONSE))
	ERR_TIMEOUT         = errors.New("Proxy timeout")
)
var (
	//Response code for when a command (that operates on multiple keys) is used on a server that is multiplexing
	MULTIPLEX_OPERATION_UNSUPPORTED_RESPONSE = []byte("This command is not supported for multiplexing servers")
	//Response code for when a client can't connect to any target servers
	CONNECTION_DOWN_RESPONSE = []byte("Connection down")
)

Functions

This section is empty.

Types

type Client

type Client struct {
	//The underlying ReadWriter for this connection
	Writer *FlexibleWriter
	//Whether or not this client needs to consider multiplexing
	Multiplexing bool
	Connection   net.Conn
	//The Database that our client thinks we're connected to
	DatabaseId int
	//Whether or not this client connection is active or not
	//Upon QUIT command, this gets toggled off
	Active      bool
	ReadChannel chan readItem
	HashRing    *connection.HashRing

	Scanner *protocol.RespScanner
	// contains filtered or unexported fields
}

Represents a redis client that is connected to our rmux server

func NewClient

func NewClient(connection net.Conn, readTimeout, writeTimeout time.Duration, isMuliplexing bool, hashRing *connection.HashRing) (newClient *Client)

Initializes a new client, for the given established net connection, with the specified read/write timeouts

func (*Client) FlushError

func (this *Client) FlushError(err error) error

func (*Client) FlushLine

func (this *Client) FlushLine(line []byte) (err error)

func (*Client) FlushRedisAndRespond

func (this *Client) FlushRedisAndRespond() error

Performs the query against the redis server and responds to the connected client with the response from redis.

func (*Client) HasBufferedOutput

func (this *Client) HasBufferedOutput() bool

func (*Client) HasQueued

func (this *Client) HasQueued() bool

func (*Client) ParseCommand

func (this *Client) ParseCommand(command protocol.Command) ([]byte, error)

Parses the given command

func (*Client) Queue

func (this *Client) Queue(command protocol.Command)

func (*Client) ReadLoop

func (this *Client) ReadLoop(rmux *RedisMultiplexer)

Read loop for this client - moves commands and channels to the worker loop

func (*Client) WriteError

func (this *Client) WriteError(err error, flush bool) error

func (*Client) WriteLine

func (this *Client) WriteLine(line []byte) (err error)

type RedisMultiplexer

type RedisMultiplexer struct {
	HashRing *connection.HashRing
	//hashmap of [connection endpoint] -> connectionPools
	ConnectionCluster []*connection.ConnectionPool
	//The net.listener for our server
	Listener net.Listener
	//The amount of connections to store, in each of our connectionpools
	PoolSize int
	//The primary connection key to use.  If we're not operating on a key-based operation, it will go here
	PrimaryConnectionPool *connection.ConnectionPool
	//An overridable connect timeout.  Defaults to EXTERN_CONNECT_TIMEOUT
	EndpointConnectTimeout time.Duration
	//An overridable read timeout.  Defaults to EXTERN_READ_TIMEOUT
	EndpointReadTimeout time.Duration
	//An overridable write timeout.  Defaults to EXTERN_WRITE_TIMEOUT
	EndpointWriteTimeout time.Duration
	//An overridable read timeout.  Defaults to EXTERN_READ_TIMEOUT
	ClientReadTimeout time.Duration
	//An overridable write timeout.  Defaults to EXTERN_WRITE_TIMEOUT
	ClientWriteTimeout time.Duration
	// The graphite statsd server to ping with metrics
	GraphiteServer *string

	// Whether to failover to another connection pool if the target connection pool is down (in multiplexing mode)
	Failover bool
	// contains filtered or unexported fields
}

The main RedisMultiplexer. Listens on a specified socket or port, and assigns out queries to any number of connection pools. If more than one connection pool is given, multi-key operations are blocked.

func NewRedisMultiplexer

func NewRedisMultiplexer(listenProtocol, listenEndpoint string, poolSize int) (newRedisMultiplexer *RedisMultiplexer, err error)

Initializes a new redis multiplexer, listening on the given protocol/endpoint, with a set connectionPool size. Example: "unix", "/tmp/myAwesomeSocket", 50

func (*RedisMultiplexer) AddConnection

func (this *RedisMultiplexer) AddConnection(remoteProtocol, remoteEndpoint string)

Adds a connection to the redis multiplexer, for the given protocol and endpoint

func (*RedisMultiplexer) GraphiteCheckin

func (this *RedisMultiplexer) GraphiteCheckin()

func (*RedisMultiplexer) HandleClientRequests

func (this *RedisMultiplexer) HandleClientRequests(client *Client)

Handles requests for a client. Inspects all incoming commands, to find if they are key-driven or not. If they are, finds the appropriate connection pool, and passes the request off to it.

func (*RedisMultiplexer) HandleCommand

func (this *RedisMultiplexer) HandleCommand(client *Client, command protocol.Command)

func (*RedisMultiplexer) HandleCommandChunk

func (this *RedisMultiplexer) HandleCommandChunk(client *Client, command protocol.Command)

This looks a lot like HandleClientRequests above, but will break and flush to redis if there is nothing to read. Will allow it to handle a pipeline of commands without spinning indefinitely.

func (*RedisMultiplexer) HandleError

func (this *RedisMultiplexer) HandleError(client *Client, err error)

func (*RedisMultiplexer) SetAllTimeouts

func (rm *RedisMultiplexer) SetAllTimeouts(t time.Duration)

func (*RedisMultiplexer) Start

func (this *RedisMultiplexer) Start() (err error)

Called when a rmux server is ready to begin accepting connections
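
When a custom server is wanted instead of rmux/main, the methods above suggest a flow of construct, add destinations, then start. The sketch below shows only the middle step and stays self-contained by declaring a local `multiplexer` interface with the two documented method signatures, which a *RedisMultiplexer should satisfy. The `configure` helper and the `recorder` fake are hypothetical, and passing "tcp" as the remote protocol is an assumption modeled on the "unix" example given for NewRedisMultiplexer.

```go
package main

import (
	"fmt"
	"strings"
)

// multiplexer lists the two documented methods this sketch relies on.
type multiplexer interface {
	AddConnection(remoteProtocol, remoteEndpoint string)
	Start() error
}

// configure registers every space-separated endpoint (the same format
// the -tcpConnections flag uses) and returns how many were added.
func configure(m multiplexer, tcpConnections string) int {
	endpoints := strings.Fields(tcpConnections)
	for _, ep := range endpoints {
		m.AddConnection("tcp", ep) // "tcp" is an assumed protocol name
	}
	return len(endpoints)
}

// recorder is a fake multiplexer so the example runs without rmux.
type recorder struct{ added []string }

func (r *recorder) AddConnection(remoteProtocol, remoteEndpoint string) {
	r.added = append(r.added, remoteProtocol+"://"+remoteEndpoint)
}

func (r *recorder) Start() error { return nil }

func main() {
	r := &recorder{}
	n := configure(r, "localhost:6379 localhost:6380")
	fmt.Println(n, r.added)
	// With the real package, the value would instead come from
	// rmux.NewRedisMultiplexer("unix", "/tmp/rmux.sock", 50),
	// and Start() would be called once all connections are added.
}
```

Keeping the destination list identical on every application server matters here, since the README notes that all servers should run rmux with the same destination flags.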

Directories

Path Synopsis
Package rmux/connection provides a way to open outbound connections to redis servers.
Package rmux/protocol provides a standard way to listen in on the redis protocol, look ahead to what commands are about to be executed, and ignore them or pass them on to another buffer, as desired