protocol

package
v0.2.2-0...-bca02f7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2021 License: BSD-3-Clause Imports: 7 Imported by: 2

Documentation

Overview

Package rmux/protocol provides a standard way to listen in on the redis protocol, look ahead to what commands are about to be executed, and ignore them or pass them on to another buffer, as desired.

Index

Constants

View Source
//BUFFER_SIZE is set to match bufio's default buffer size, so that we can safely read & ignore large chunks of data when necessary
const BUFFER_SIZE = 4096

Variables

View Source
var (
	//Used when we are trying to parse the size of a bulk or multibulk message, and do not receive a valid number
	ERROR_INVALID_INT            = &RecoverableError{"Did not receive valid int value"}
	ERROR_INVALID_COMMAND_FORMAT = &RecoverableError{"Bad command format provided"} //Signals a malformed command; NOTE(review): call sites are not visible here
	//Used when we expect a redis bulk-format payload, and do not receive one
	ERROR_BAD_BULK_FORMAT = &RecoverableError{"Bad bulk format supplied"}
	ERROR_COMMAND_PARSE   = &RecoverableError{"Command parse error"} //Generic command parse failure; NOTE(review): call sites are not visible here

	//Error for unsupported (deemed unsafe for multiplexing) commands
	ERR_COMMAND_UNSUPPORTED = &RecoverableError{"This command is not supported"}

	//Error for when we receive bad arguments (for multiplexing) accompanying a command
	ERR_BAD_ARGUMENTS = &RecoverableError{"Bad arguments for command"}

	//Commands declared once for convenience
	DEL_COMMAND         = []byte("del")
	SUBSCRIBE_COMMAND   = []byte("subscribe")
	UNSUBSCRIBE_COMMAND = []byte("unsubscribe")
	PING_COMMAND        = []byte("ping")
	INFO_COMMAND        = []byte("info")
	SHORT_PING_COMMAND  = []byte("PING") //Upper-case spelling of ping; NOTE(review): the only upper-case command here -- confirm why it is needed
	SELECT_COMMAND      = []byte("select")
	QUIT_COMMAND        = []byte("quit")

	//Responses declared once for convenience
	//None of these include the trailing newline (see REDIS_NEWLINE)
	OK_RESPONSE   = []byte("+OK")
	PONG_RESPONSE = []byte("+PONG")
	ERR_RESPONSE  = []byte("$-1") //NOTE(review): "$-1" looks like the RESP null bulk string rather than a '-' error reply -- confirm the name matches its use

	//Redis expects \r\n newlines.  Using this means we can stop remembering that
	REDIS_NEWLINE = []byte("\r\n")

	//These functions should not be executed through a proxy.
	//If you know what you're doing, you are welcome to execute them directly on your server
	//Keys are lower-case command names
	UNSAFE_FUNCTIONS = map[string]bool{
		"auth":         true,
		"bgrewriteaof": true,
		"bgsave":       true,
		"client":       true,
		"config":       true,
		"dbsize":       true,
		"discard":      true,
		"debug":        true,
		"exec":         true,
		"lastsave":     true,
		"move":         true,
		"monitor":      true,
		"migrate":      true,
		"multi":        true,
		"object":       true,
		"punsubscribe": true,
		"psubscribe":   true,
		"pubsub":       true,
		"randomkey":    true,
		"save":         true,
		"shutdown":     true,
		"slaveof":      true,
		"slowlog":      true,
		"subscribe":    true,
		"sync":         true,
		"time":         true,
		"unsubscribe":  true,
		"unwatch":      true,
		"watch":        true,
	}

	//These functions will only work if multiplexing is disabled.
	//It would be rather worthless to watch on one server, multi on another, and increment on a third
	//Keys are lower-case command names
	SINGLE_DB_FUNCTIONS = map[string]bool{
		"bitop":       true,
		"brpoplpush":  true,
		"eval":        true,
		"keys":        true,
		"flushall":    true,
		"flushdb":     true,
		"mget":        true,
		"mset":        true,
		"msetnx":      true,
		"rename":      true,
		"renamenx":    true,
		"rpoplpush":   true,
		"script":      true,
		"sdiff":       true,
		"sdiffstore":  true,
		"sinter":      true,
		"sinterstore": true,
		"smove":       true,
		"sunion":      true,
		"sunionstore": true,
		"zinterstore": true,
		"zunionstore": true,
	}
)
View Source
//NIL_STRING is a nil byte slice; the zero value of []byte is already nil, so no explicit initializer is written
var NIL_STRING []byte

Functions

func CopyServerResponses

func CopyServerResponses(reader *bufio.Reader, localBuffer *FlexibleWriter, numResponses int) (err error)

Copies a server response from the remoteBuffer (passed as reader) into your localBuffer. If a protocol or buffer error is encountered, it is bubbled up.

func IsSupportedFunction

func IsSupportedFunction(command []byte, isMultiplexing, isMultipleArgument bool) bool

func ParseInt

func ParseInt(response []byte) (value int, err error)

Parses a string into an int. Differs from atoi in that this only parses positive decimal ints -- hex, octal, and negatives are not allowed. Upon an invalid character being received, a PANIC_INVALID_INT is caught and returned as err.

func ScanArray

func ScanArray(data []byte, atEOF bool) (advance int, token []byte, err error)

=============== Array ==============

func ScanBulkString

func ScanBulkString(data []byte, atEOF bool) (advance int, token []byte, err error)

=============== Bulk String ==============

func ScanError

func ScanError(data []byte, atEOF bool) (advance int, token []byte, err error)

=============== Errors ==============

func ScanInlineString

func ScanInlineString(data []byte, atEOF bool) (advance int, token []byte, err error)

=============== Inline String ==============

func ScanInteger

func ScanInteger(data []byte, atEOF bool) (advance int, token []byte, err error)

=============== Integer ==============

func ScanResp

func ScanResp(data []byte, atEOF bool) (advance int, token []byte, err error)

func ScanSimpleString

func ScanSimpleString(data []byte, atEOF bool) (advance int, token []byte, err error)

=============== Simple String ==============

func WriteError

func WriteError(line []byte, dest *FlexibleWriter, flush bool) (err error)

Writes the given error to the buffer, preceded by a '-' and followed by a GO_NEWLINE. Bubbles any errors from the underlying writer.

func WriteLine

func WriteLine(line []byte, destination *FlexibleWriter, flush bool) (err error)

Writes the given line to the buffer, followed by a GO_NEWLINE. Does not explicitly flush the buffer. Final lines in a sequence should be followed by FlushLine.

Types

type Command

//Command is the accessor interface provided by the command types in this package
//(InlineCommand, MultibulkCommand, SimpleCommand and StringCommand each declare all four methods)
type Command interface {
	//The command name; presumably the Command field of the concrete type -- confirm against the implementations
	GetCommand() []byte
	//Presumably the Buffer field of the concrete type -- confirm against the implementations
	GetBuffer() []byte
	//The first argument, which usually denotes the key.
	//NOTE(review): SimpleCommand and StringCommand have no FirstArg field; what they return is not visible here
	GetFirstArg() []byte
	//The argument count.
	//NOTE(review): SimpleCommand and StringCommand have no ArgCount field; what they return is not visible here
	GetArgCount() int
}

Represents a redis command sent by a client that is connected to our rmux server

func ParseCommand

func ParseCommand(b []byte) (command Command, err error)

type InlineCommand

//InlineCommand is the command type returned by NewInlineCommand and ParseInlineCommand
type InlineCommand struct {
	Buffer  []byte //NOTE(review): presumably the raw bytes the command was parsed from -- confirm against ParseInlineCommand
	Command []byte //The command name
	// Usually denotes the key
	FirstArg []byte
	ArgCount int //Number of arguments; NOTE(review): whether the command name itself is counted is not visible here
}

func NewInlineCommand

func NewInlineCommand() *InlineCommand

func ParseInlineCommand

func ParseInlineCommand(b []byte) (*InlineCommand, error)

func (*InlineCommand) GetArgCount

func (this *InlineCommand) GetArgCount() int

func (*InlineCommand) GetBuffer

func (this *InlineCommand) GetBuffer() []byte

func (*InlineCommand) GetCommand

func (this *InlineCommand) GetCommand() []byte

Satisfy Command Interface

func (*InlineCommand) GetFirstArg

func (this *InlineCommand) GetFirstArg() []byte

type MultibulkCommand

//MultibulkCommand is the command type returned by ParseMultibulkCommand
type MultibulkCommand struct {
	Buffer  []byte //NOTE(review): presumably the raw bytes the command was parsed from -- confirm against ParseMultibulkCommand
	Command []byte //The command name
	// Usually denotes the key
	FirstArg []byte
	ArgCount int //Number of arguments; NOTE(review): whether the command name itself is counted is not visible here
}

func ParseMultibulkCommand

func ParseMultibulkCommand(b []byte) (*MultibulkCommand, error)

func (*MultibulkCommand) GetArgCount

func (this *MultibulkCommand) GetArgCount() int

func (*MultibulkCommand) GetBuffer

func (this *MultibulkCommand) GetBuffer() []byte

func (*MultibulkCommand) GetCommand

func (this *MultibulkCommand) GetCommand() []byte

Satisfy Command Interface

func (*MultibulkCommand) GetFirstArg

func (this *MultibulkCommand) GetFirstArg() []byte

type RecoverableError

//RecoverableError is the error type behind this package's ERROR_* and ERR_* values.
//Those values are built from a single message string, and the type satisfies error through its Error method
type RecoverableError struct {
	// contains filtered or unexported fields
}

func (*RecoverableError) Error

func (e *RecoverableError) Error() string

type RespScanner

//RespScanner is created by NewRespScanner from an io.Reader.
//Scan reports whether a token was produced, Bytes returns the most recent token, and Err returns the scanner's error.
//NOTE(review): the Scan/Bytes/Err shape resembles bufio.Scanner -- confirm the exact end-of-input semantics
type RespScanner struct {
	// contains filtered or unexported fields
}

A partially-built scanner that can handle >64kb

func NewRespScanner

func NewRespScanner(r io.Reader) *RespScanner

func (*RespScanner) Bytes

func (s *RespScanner) Bytes() []byte

Returns the most recent token generated by a successful call to Scan(). The array's contents may be invalid on the next call to Scan, so make sure to copy it somewhere safe.

func (*RespScanner) Err

func (s *RespScanner) Err() error

func (*RespScanner) Scan

func (s *RespScanner) Scan() bool

type SimpleCommand

//SimpleCommand is the command type returned by ParseSimpleCommand.
//Unlike InlineCommand and MultibulkCommand it carries no FirstArg or ArgCount fields
type SimpleCommand struct {
	Buffer  []byte //NOTE(review): presumably the raw bytes the command was parsed from -- confirm against ParseSimpleCommand
	Command []byte //The command name
}

func ParseSimpleCommand

func ParseSimpleCommand(b []byte) (*SimpleCommand, error)

func (*SimpleCommand) GetArgCount

func (this *SimpleCommand) GetArgCount() int

func (*SimpleCommand) GetBuffer

func (this *SimpleCommand) GetBuffer() []byte

func (*SimpleCommand) GetCommand

func (this *SimpleCommand) GetCommand() []byte

Satisfy Command Interface

func (*SimpleCommand) GetFirstArg

func (this *SimpleCommand) GetFirstArg() []byte

type StringCommand

//StringCommand is the command type returned by ParseStringCommand.
//Unlike InlineCommand and MultibulkCommand it carries no FirstArg or ArgCount fields
type StringCommand struct {
	Buffer  []byte //NOTE(review): presumably the raw bytes the command was parsed from -- confirm against ParseStringCommand
	Command []byte //The command name
}

func ParseStringCommand

func ParseStringCommand(b []byte) (*StringCommand, error)

func (*StringCommand) GetArgCount

func (this *StringCommand) GetArgCount() int

func (*StringCommand) GetBuffer

func (this *StringCommand) GetBuffer() []byte

func (*StringCommand) GetCommand

func (this *StringCommand) GetCommand() []byte

Satisfy Command Interface

func (*StringCommand) GetFirstArg

func (this *StringCommand) GetFirstArg() []byte

type TimedNetReadWriter

//TimedNetReadWriter pairs a net.Conn with separate read and write timeouts.
//Its Read and Write methods wrap the connection's read and write with a deadline
type TimedNetReadWriter struct {
	//The underlying connection used by our remote (redis) connection
	NetConnection net.Conn
	//Timeout to use for read operations
	ReadTimeout time.Duration
	//Timeout to use for write operations
	WriteTimeout time.Duration
}

A ReadWriter for a NetConnection's read/writer, that allows for sane & reliable timeouts applied to all of its operations

func NewTimedNetReadWriter

func NewTimedNetReadWriter(connection net.Conn, readTimeout, writeTimeout time.Duration) (newReadWriter *TimedNetReadWriter)

Initializes a TimedNetReadWriter, with the given timeouts

func (*TimedNetReadWriter) Read

func (myReadWriter *TimedNetReadWriter) Read(line []byte) (n int, err error)

Wraps the net.connection's read function with a ReadDeadline

func (*TimedNetReadWriter) Write

func (myReadWriter *TimedNetReadWriter) Write(line []byte) (n int, err error)

Wraps the net.connection's write function with a WriteDeadline

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL