redplex

v0.0.0-...-248ac9a
Published: Jun 27, 2023 License: MIT Imports: 15 Imported by: 0

README

redplex

Project status: redplex is actively maintained and was running in production at Mixer from 2017 to 2021.

redplex is a tool to multiplex Redis pubsub. It implements the Redis protocol and is a drop-in replacement for existing Redis pubsub servers: boot redplex and change your port number. It is useful when you have very many readers for pubsub events, because Redis pubsub throughput is inversely proportional to the number of subscribers for the event.

Note: some Redis clients have health checks that call commands like INFO on boot. You'll want to turn these off, as redplex does not implement any commands except SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, and EXIT.

Usage
➜  redplex git:(master) ./redplex --help
usage: redplex [<flags>]

Flags:
      --help                     Show context-sensitive help (also try --help-long and --help-man).
  -l, --listen="127.0.0.1:3000"  Address to listen on
  -n, --network="tcp"            Network to listen on
      --remote="127.0.0.1:6379"  Remote address of the Redis server
      --remote-network="tcp"     Remote network to dial through (usually tcp or tcp6)
      --sentinels=SENTINELS ...  A list of Redis sentinel addresses
      --sentinel-name=SENTINEL-NAME
                                 The name of the sentinel master
      --log-level="info"         Log level (one of debug, info, warn, error)
      --dial-timeout=10s         Timeout connecting to Redis
      --write-timeout=2s         Timeout during write operations
      --pprof-server=PPROF-SERVER
                                 Address to bind a pprof server on. Not bound if empty.

Documentation

Constants

const (
	// MessageError is the prefix for Redis line errors in the protocol.
	MessageError = '-'
	// MessageStatus is the prefix for Redis line statuses in the protocol.
	MessageStatus = '+'
	// MessageInt is the prefix for Redis line integers in the protocol.
	// It's followed by the plain text number.
	MessageInt = ':'
	// MessageBulk is the prefix for Redis bulk messages. It's followed by the
	// bulk message size, and CRLF, and then the full bulk message bytes.
	MessageBulk = '$'
	// MessageMutli is the prefix for Redis "multi" messages (arrays).
	// It's followed by the array length, and CRLF, and then the next N messages
	// as elements of the array.
	MessageMutli = '*'
)
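
To make the prefix bytes concrete, here is a minimal sketch that names the type of a raw protocol line from its first byte. The constants are local copies so the example compiles without importing redplex, and `describe` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Local copies of the prefix bytes documented above, so this sketch
// compiles on its own. They are not imported from redplex.
const (
	prefixError  = '-'
	prefixStatus = '+'
	prefixInt    = ':'
	prefixBulk   = '$'
	prefixMulti  = '*'
)

// describe is a hypothetical helper that names the message type of a
// raw protocol line by looking only at its first byte.
func describe(line []byte) string {
	if len(line) == 0 {
		return "empty"
	}
	switch line[0] {
	case prefixError:
		return "error"
	case prefixStatus:
		return "status"
	case prefixInt:
		return "integer"
	case prefixBulk:
		return "bulk"
	case prefixMulti:
		return "multi"
	default:
		return "unknown"
	}
}

func main() {
	lines := []string{"+OK\r\n", "-ERR unknown command\r\n", ":42\r\n", "$5\r\nhello\r\n", "*2\r\n"}
	for _, l := range lines {
		fmt.Printf("%q is a %s message\n", l, describe([]byte(l)))
	}
}
```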

Variables

View Source
var (

	// ErrWrongMessage is returned in Parse commands if the command
	// is not a pubsub command.
	ErrWrongMessage = errors.New("redplex/protocol: unexpected message type")
)

Functions

func ParseBulkMessage

func ParseBulkMessage(line []byte) ([]byte, error)

ParseBulkMessage expects that the byte slice starts with the length delimiter, and returns the contained message. Does not include the trailing delimiter.
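
As an illustration of the bulk framing, the sketch below extracts the payload from a bulk message. It is a standalone approximation, not the package's implementation: `parseBulk` and `errMalformed` are hypothetical names, and it assumes the input begins with the `$` prefix followed by the length.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"strconv"
)

// errMalformed is a hypothetical error for this sketch only.
var errMalformed = errors.New("malformed bulk message")

// parseBulk assumes line looks like "$<len>\r\n<payload>\r\n" and returns
// the payload without the trailing CRLF.
func parseBulk(line []byte) ([]byte, error) {
	if len(line) == 0 || line[0] != '$' {
		return nil, errMalformed
	}
	end := bytes.Index(line, []byte("\r\n"))
	if end < 0 {
		return nil, errMalformed
	}
	// The bytes between '$' and the first CRLF are the decimal length.
	n, err := strconv.Atoi(string(line[1:end]))
	if err != nil || n < 0 {
		return nil, errMalformed
	}
	start := end + 2
	if len(line) < start+n {
		return nil, errMalformed
	}
	return line[start : start+n], nil
}

func main() {
	msg, err := parseBulk([]byte("$5\r\nhello\r\n"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("payload: %q\n", msg)
}
```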

func ParseRequest

func ParseRequest(r *bufio.Reader) (method string, args [][]byte, err error)

ParseRequest parses a method and arguments from the reader.
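
A client request arrives as an array of bulk strings, with the command name first. The following sketch shows one way to read such a request from a bufio.Reader. It is written from the Redis protocol rather than from the package source, so `parseRequest`, `readLine`, `readCount`, and `parseRequestString` are hypothetical names and details such as case handling of the method may differ from ParseRequest.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
)

var errProtocol = errors.New("protocol error") // hypothetical, sketch only

// readLine reads one CRLF-terminated line and strips the terminator.
func readLine(r *bufio.Reader) (string, error) {
	line, err := r.ReadString('\n')
	if err != nil {
		return "", err
	}
	if !strings.HasSuffix(line, "\r\n") {
		return "", errProtocol
	}
	return line[:len(line)-2], nil
}

// readCount reads a line such as "*2" or "$9" and returns the number
// that follows the expected prefix byte.
func readCount(r *bufio.Reader, prefix byte) (int, error) {
	line, err := readLine(r)
	if err != nil {
		return 0, err
	}
	if len(line) < 2 || line[0] != prefix {
		return 0, errProtocol
	}
	n, err := strconv.Atoi(line[1:])
	if err != nil || n < 0 {
		return 0, errProtocol
	}
	return n, nil
}

// parseRequest reads an array of bulk strings; the first element is the
// method and the rest are its arguments.
func parseRequest(r *bufio.Reader) (method string, args [][]byte, err error) {
	count, err := readCount(r, '*')
	if err != nil {
		return "", nil, err
	}
	if count == 0 {
		return "", nil, errProtocol
	}
	parts := make([][]byte, 0, count)
	for i := 0; i < count; i++ {
		size, err := readCount(r, '$')
		if err != nil {
			return "", nil, err
		}
		buf := make([]byte, size+2) // payload plus trailing CRLF
		if _, err := io.ReadFull(r, buf); err != nil {
			return "", nil, err
		}
		parts = append(parts, buf[:size])
	}
	return string(parts[0]), parts[1:], nil
}

// parseRequestString is a convenience wrapper for string input.
func parseRequestString(s string) (string, [][]byte, error) {
	return parseRequest(bufio.NewReader(strings.NewReader(s)))
}

func main() {
	method, args, err := parseRequestString("*2\r\n$9\r\nSUBSCRIBE\r\n$4\r\nnews\r\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s %q\n", method, args)
}
```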

func ReadNextFull

func ReadNextFull(writeTo *bytes.Buffer, r *bufio.Reader) error

ReadNextFull copies the next full command from the reader to the buffer.

func SubscribeResponse

func SubscribeResponse(command string, channel []byte) []byte

SubscribeResponse returns an appropriate response to the given subscribe or unsubscribe command.
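
For reference, a Redis server acknowledges a subscribe or unsubscribe with a three-element array: the command name, the channel, and a subscription count. The sketch below builds that reply. `subscribeReply` is a hypothetical function, and its explicit `count` parameter is an assumption, since SubscribeResponse takes no count and the value it writes is not documented here.

```go
package main

import (
	"bytes"
	"fmt"
)

// subscribeReply builds the three-element array a Redis server sends to
// acknowledge a (p)subscribe or (p)unsubscribe command.
func subscribeReply(command string, channel []byte, count int) []byte {
	var b bytes.Buffer
	b.WriteString("*3\r\n")                                 // array of three elements
	fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(command), command) // command name as bulk
	fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(channel), channel) // channel as bulk
	fmt.Fprintf(&b, ":%d\r\n", count)                       // subscription count as integer
	return b.Bytes()
}

func main() {
	fmt.Printf("%q\n", subscribeReply("subscribe", []byte("news"), 1))
}
```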

Types

type Dialer

type Dialer interface {
	// Dial attempts to create a connection to the server.
	Dial() (net.Conn, error)
}

The Dialer is a type that can create a TCP connection to the Redis server.
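
Because Dialer is a one-method interface, it is easy to substitute in tests. The sketch below declares a local copy of the interface and a hypothetical `DialerFunc` adapter, then uses net.Pipe to hand back an in-memory connection. None of these helpers exist in redplex; they only illustrate the shape a custom Dialer could take.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// Dialer is a local copy of the interface above, so the sketch compiles
// without importing redplex.
type Dialer interface {
	Dial() (net.Conn, error)
}

// DialerFunc is a hypothetical adapter that lets a plain function
// satisfy Dialer.
type DialerFunc func() (net.Conn, error)

// Dial calls the wrapped function.
func (f DialerFunc) Dial() (net.Conn, error) { return f() }

// newPipeDialer returns a Dialer whose connections are in-memory pipes.
// The fake server side writes reply once and then closes.
func newPipeDialer(reply string) Dialer {
	return DialerFunc(func() (net.Conn, error) {
		client, server := net.Pipe()
		go func() {
			defer server.Close()
			io.WriteString(server, reply)
		}()
		return client, nil
	})
}

// readAll dials and reads until the remote side closes the connection.
func readAll(d Dialer) (string, error) {
	conn, err := d.Dial()
	if err != nil {
		return "", err
	}
	defer conn.Close()
	b, err := io.ReadAll(conn)
	return string(b), err
}

func main() {
	got, err := readAll(newPipeDialer("+OK\r\n"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("server said %q\n", got)
}
```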

type DirectDialer

type DirectDialer struct {
	// contains filtered or unexported fields
}

DirectDialer creates a direct connection to a single Redis server or IP.

func NewDirectDialer

func NewDirectDialer(network string, address string, password string, useTLS bool, timeout time.Duration) DirectDialer

NewDirectDialer creates a DirectDialer that dials to the given address. If the timeout is 0, a default value of 5 seconds will be used.

func (DirectDialer) Dial

func (d DirectDialer) Dial() (cnx net.Conn, err error)

Dial implements Dialer.Dial.

type Listener

type Listener struct {
	IsPattern bool
	Channel   string
	Conn      Writable
}

The Listener wraps a function that's called when a pubsub message is sent.

type PublishCommand

type PublishCommand struct {
	IsPattern        bool
	ChannelOrPattern []byte
}

PublishCommand is returned from ParsePublishCommand.

func ParsePublishCommand

func ParsePublishCommand(b []byte) (cmd PublishCommand, err error)

ParsePublishCommand parses the given pubsub command efficiently. Returns a NotPubsubError if the command isn't a pubsub command.
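
Redis pushes published events to subscribers as arrays whose first element is "message" (followed by the channel) or "pmessage" (followed by the matching pattern). The sketch below recognizes both forms by their fixed headers. It is an approximation built from the Redis protocol, not the package's parser: `parsePublish`, `publishCommand`, and `errNotPubsub` are hypothetical names.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"strconv"
)

// publishCommand mirrors the shape of PublishCommand for this sketch.
type publishCommand struct {
	IsPattern        bool
	ChannelOrPattern []byte
}

var errNotPubsub = errors.New("not a pubsub message") // hypothetical

// Fixed headers that open the two kinds of pushed pubsub events.
var (
	messageHeader  = []byte("*3\r\n$7\r\nmessage\r\n")
	pmessageHeader = []byte("*4\r\n$8\r\npmessage\r\n")
)

// parsePublish reports whether b is a message or pmessage event and
// returns the channel (or pattern) that follows the header.
func parsePublish(b []byte) (publishCommand, error) {
	var cmd publishCommand
	switch {
	case bytes.HasPrefix(b, messageHeader):
		b = b[len(messageHeader):]
	case bytes.HasPrefix(b, pmessageHeader):
		cmd.IsPattern = true
		b = b[len(pmessageHeader):]
	default:
		return cmd, errNotPubsub
	}
	// The next element is a bulk string: "$<len>\r\n<name>\r\n".
	end := bytes.Index(b, []byte("\r\n"))
	if end < 2 || b[0] != '$' {
		return publishCommand{}, errNotPubsub
	}
	n, err := strconv.Atoi(string(b[1:end]))
	if err != nil || n < 0 || len(b) < end+2+n {
		return publishCommand{}, errNotPubsub
	}
	cmd.ChannelOrPattern = b[end+2 : end+2+n]
	return cmd, nil
}

func main() {
	raw := []byte("*3\r\n$7\r\nmessage\r\n$4\r\nnews\r\n$5\r\nhello\r\n")
	cmd, err := parsePublish(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("pattern=%v name=%q\n", cmd.IsPattern, cmd.ChannelOrPattern)
}
```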

type Pubsub

type Pubsub struct {
	// contains filtered or unexported fields
}

Pubsub manages the connection of redplex to the remote pubsub server.

func NewPubsub

func NewPubsub(dialer Dialer, writeTimeout time.Duration) *Pubsub

NewPubsub creates a new Pubsub instance.

func (*Pubsub) Close

func (p *Pubsub) Close()

Close frees resources associated with the pubsub server.

func (*Pubsub) Start

func (p *Pubsub) Start()

Start creates a pubsub listener to proxy connection data.

func (*Pubsub) Subscribe

func (p *Pubsub) Subscribe(listener Listener)

Subscribe adds the listener to the channel.

func (*Pubsub) Unsubscribe

func (p *Pubsub) Unsubscribe(listener Listener)

Unsubscribe removes the listener from the channel.

func (*Pubsub) UnsubscribeAll

func (p *Pubsub) UnsubscribeAll(c Writable)

UnsubscribeAll removes all channels the writer is subscribed to.
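
The multiplexing idea behind Subscribe, Unsubscribe, and UnsubscribeAll can be sketched as a registry that maps each channel to its set of local listeners, so that only the first local subscriber and the last local unsubscriber need to touch the remote server. This is an illustration of the technique under that assumption, not the package's internals: `registry`, `recorder`, and all method names are hypothetical, and pattern subscriptions are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// writable is a local copy of the Writable interface.
type writable interface {
	Write(b []byte)
}

// recorder is a hypothetical writable that remembers what it was sent.
type recorder struct{ got [][]byte }

func (r *recorder) Write(b []byte) { r.got = append(r.got, b) }

// registry tracks which writers listen on which channel.
type registry struct {
	mu   sync.Mutex
	subs map[string]map[writable]struct{}
}

func newRegistry() *registry {
	return &registry{subs: make(map[string]map[writable]struct{})}
}

// subscribe adds w to channel. It returns true for the first listener,
// which is when an upstream SUBSCRIBE would be needed.
func (r *registry) subscribe(channel string, w writable) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	set, ok := r.subs[channel]
	if !ok {
		set = make(map[writable]struct{})
		r.subs[channel] = set
	}
	set[w] = struct{}{}
	return !ok
}

// unsubscribe removes w from channel. It returns true when the channel
// has no listeners left, which is when an upstream UNSUBSCRIBE is due.
func (r *registry) unsubscribe(channel string, w writable) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	set, ok := r.subs[channel]
	if !ok {
		return false
	}
	delete(set, w)
	if len(set) > 0 {
		return false
	}
	delete(r.subs, channel)
	return true
}

// unsubscribeAll removes w everywhere and returns the channels that
// became empty as a result.
func (r *registry) unsubscribeAll(w writable) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	var emptied []string
	for channel, set := range r.subs {
		delete(set, w)
		if len(set) == 0 {
			delete(r.subs, channel)
			emptied = append(emptied, channel)
		}
	}
	return emptied
}

// publish fans payload out to every listener and returns how many got it.
func (r *registry) publish(channel string, payload []byte) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	for w := range r.subs[channel] {
		w.Write(payload)
	}
	return len(r.subs[channel])
}

func main() {
	reg := newRegistry()
	a, b := &recorder{}, &recorder{}
	fmt.Println("first subscriber:", reg.subscribe("news", a))
	fmt.Println("first subscriber:", reg.subscribe("news", b))
	fmt.Println("delivered to:", reg.publish("news", []byte("hello")))
}
```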

type Request

type Request []byte

Request is a byte slice with utility methods for building up Redis commands.

func NewRequest

func NewRequest(name string, argCount int) *Request

NewRequest creates a new request to send to the Redis server.

func (*Request) Bulk

func (r *Request) Bulk(arg []byte) *Request

Bulk adds a new bulk argument value to the request.

func (*Request) Bytes

func (r *Request) Bytes() []byte

Bytes returns the request bytes.

func (*Request) Int

func (r *Request) Int(n int) *Request

Int adds a new integer argument value to the request.
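
For context, a command sent to a Redis server is an array whose elements are bulk strings, with the command name first. The sketch below encodes that wire format in a single function. `encodeCommand` is a hypothetical stand-in and does not reproduce the Request builder; in particular, how NewRequest counts arguments and how Int encodes its value are not specified here.

```go
package main

import (
	"bytes"
	"fmt"
)

// encodeCommand writes name and args as a protocol array of bulk strings.
func encodeCommand(name string, args ...[]byte) []byte {
	var b bytes.Buffer
	// Array header: the command name counts as one element.
	fmt.Fprintf(&b, "*%d\r\n", len(args)+1)
	fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(name), name)
	for _, a := range args {
		fmt.Fprintf(&b, "$%d\r\n", len(a))
		b.Write(a) // raw bytes, so binary-safe arguments survive intact
		b.WriteString("\r\n")
	}
	return b.Bytes()
}

func main() {
	fmt.Printf("%q\n", encodeCommand("SUBSCRIBE", []byte("news")))
}
```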

type SentinelDialer

type SentinelDialer struct {
	// contains filtered or unexported fields
}

The SentinelDialer dials into the Redis cluster master as defined by the assortment of sentinel servers.

func NewSentinelDialer

func NewSentinelDialer(network string, sentinels []string, masterName string, password string, sentinelPassword string, useTLS bool, timeout time.Duration) *SentinelDialer

NewSentinelDialer creates a SentinelDialer.

func (*SentinelDialer) Dial

func (s *SentinelDialer) Dial() (net.Conn, error)

Dial implements Dialer.Dial. It looks up and connects to the Redis master dictated by the sentinels. The connection will be closed if we detect that the master changes.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the redplex server which accepts connections and talks to the underlying Pubsub implementation.

func NewServer

func NewServer(l net.Listener, pubsub *Pubsub) *Server

NewServer creates a new redplex server. It listens for connections from the listener and uses the given Pubsub to proxy to the remote server.

func (*Server) Close

func (s *Server) Close()

Close frees resources associated with the server.

func (*Server) Listen

func (s *Server) Listen() error

Listen accepts and serves incoming connections.

type Writable

type Writable interface {
	Write(b []byte)
}

Writable is an interface passed into Pubsub. It's called when we want to publish data.
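
Since Write returns no error, an implementation has to decide for itself what to do when its consumer falls behind. The sketch below shows one possible policy, a buffered channel that drops messages when full. `chanWriter` is hypothetical, and copying the slice assumes the caller may reuse its buffer, which the documentation above does not state.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Writable is a local copy of the interface above.
type Writable interface {
	Write(b []byte)
}

// chanWriter is a hypothetical Writable that queues messages on a
// buffered channel and counts the ones it had to drop.
type chanWriter struct {
	out     chan []byte
	dropped int64
}

// Compile-time check that chanWriter satisfies Writable.
var _ Writable = (*chanWriter)(nil)

func newChanWriter(size int) *chanWriter {
	return &chanWriter{out: make(chan []byte, size)}
}

// Write never blocks: if the queue is full the message is dropped.
func (w *chanWriter) Write(b []byte) {
	cp := append([]byte(nil), b...) // copy in case the caller reuses b
	select {
	case w.out <- cp:
	default:
		atomic.AddInt64(&w.dropped, 1)
	}
}

// droppedCount reports how many messages were discarded so far.
func (w *chanWriter) droppedCount() int64 {
	return atomic.LoadInt64(&w.dropped)
}

func main() {
	w := newChanWriter(1)
	w.Write([]byte("first"))
	w.Write([]byte("second")) // queue is full, so this one is dropped
	fmt.Printf("queued %q, dropped %d\n", <-w.out, w.droppedCount())
}
```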

Directories

Path Synopsis
cmd
