redjet

Version: v0.7.3
Published: Feb 13, 2024 License: CC0-1.0 Imports: 15 Imported by: 1

README

redjet is a high-performance Go library for Redis. Its hallmark feature is a low-allocation, streaming API. See the benchmarks section for more details.

Unlike redigo and go-redis, redjet does not provide a function for every Redis command. Instead, it offers a generic interface that supports all commands and options. While this approach has less type-safety, it provides forward compatibility with new Redis features.

In pursuit of both performance and ease of use, redjet tries to provide an API that closely mirrors the Redis protocol. For example, the Command method is really a Pipeline of size 1.

Basic Usage

Install:

go get github.com/coder/redjet@latest

For the most part, you can interact with Redis using a familiar interface:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/coder/redjet"
)

func main() {
    client := redjet.New("localhost:6379")
    ctx := context.Background()

    err := client.Command(ctx, "SET", "foo", "bar").Ok()
    if err != nil {
        log.Fatal(err)
    }

    got, err := client.Command(ctx, "GET", "foo").Bytes()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%s\n", got) // got == []byte("bar")
}

Streaming

To minimize allocations, call (*Pipeline).WriteTo instead of (*Pipeline).Bytes. WriteTo streams the response directly to an io.Writer such as a file or HTTP response.

For example:

_, err := client.Command(ctx, "GET", "big-object").WriteTo(os.Stdout)
// check error

Similarly, you can pass in a value that implements redjet.LenReader to Command to stream larger values into Redis. Unfortunately, the API cannot accept a regular io.Reader because bulk string messages in the Redis protocol are length-prefixed.

Here's an example of streaming a large file into Redis:

bigFile, err := os.Open("bigfile.txt")
// check error
defer bigFile.Close()

stat, err := bigFile.Stat()
// check error

err = client.Command(
    ctx, "SET", "bigfile",
    // NewLenReader takes an int size, so convert the int64 from Stat.
    redjet.NewLenReader(bigFile, int(stat.Size())),
).Ok()
// check error
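
To illustrate why the length must be known up front, here is a minimal standard-library sketch of how a RESP bulk string is framed on the wire. The helper names writeBulk and frame are hypothetical and are not part of redjet; the sketch only shows the framing, not how redjet implements it.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// writeBulk frames exactly n bytes from r as a RESP bulk string:
// "$<n>\r\n<payload>\r\n". The length is written before the payload,
// which is why the size has to be known before streaming starts.
func writeBulk(w io.Writer, r io.Reader, n int64) error {
	if _, err := fmt.Fprintf(w, "$%d\r\n", n); err != nil {
		return err
	}
	// CopyN fails with io.EOF if r yields fewer than n bytes.
	if _, err := io.CopyN(w, r, n); err != nil {
		return err
	}
	_, err := io.WriteString(w, "\r\n")
	return err
}

// frame is a demonstration helper that frames an in-memory string.
func frame(s string) (string, error) {
	var buf bytes.Buffer
	err := writeBulk(&buf, strings.NewReader(s), int64(len(s)))
	return buf.String(), err
}

func main() {
	out, err := frame("bar")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", out) // "$3\r\nbar\r\n"
}
```

A plain io.Reader gives no way to produce the "$<n>" header without first buffering the whole value, which is the allocation the streaming API is meant to avoid.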

If you have no way of knowing the size of your blob in advance and still want to avoid large allocations, you may chunk a stream into Redis using repeated APPEND commands.
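
The chunking idea can be sketched with the standard library alone. In the sketch below, chunkStream and collectChunks are hypothetical helpers, and the send callback stands in for whatever issues the APPEND command (for example a call along the lines of client.Command(ctx, "APPEND", key, chunk), which is an assumption and is not exercised here).

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// chunkStream reads r in chunks of at most chunkSize bytes and hands each
// chunk to send. Only one chunk is held in memory at a time.
func chunkStream(r io.Reader, chunkSize int, send func(chunk []byte) error) error {
	if chunkSize <= 0 {
		return errors.New("chunkSize must be positive")
	}
	buf := make([]byte, chunkSize)
	for {
		n, err := io.ReadFull(r, buf)
		if n > 0 {
			if sendErr := send(buf[:n]); sendErr != nil {
				return sendErr
			}
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return nil // stream exhausted; a short final chunk was already sent
		}
		if err != nil {
			return err
		}
	}
}

// collectChunks is a demonstration helper that records each chunk as a string.
func collectChunks(s string, chunkSize int) ([]string, error) {
	var chunks []string
	err := chunkStream(strings.NewReader(s), chunkSize, func(c []byte) error {
		chunks = append(chunks, string(c)) // copy, because the buffer is reused
		return nil
	})
	return chunks, err
}

func main() {
	chunks, err := collectChunks("hello world", 4)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", chunks) // ["hell" "o wo" "rld"]
}
```

Because APPEND adds to whatever is already stored, a real caller would likely want to start from an empty or freshly deleted key.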

Pipelining

redjet supports pipelining via the (*Client).Pipeline method. This method accepts a Pipeline, potentially that of a previous, open command.

// Set foo0, foo1, ..., foo99 to "bar", and confirm that each succeeded.
//
// This entire example only takes one round-trip to Redis!
var p *redjet.Pipeline
for i := 0; i < 100; i++ {
    p = client.Pipeline(ctx, p, "SET", fmt.Sprintf("foo%d", i), "bar")
}

for p.Next() {
    if err := p.Ok(); err != nil {
        log.Fatal(err)
    }
}
p.Close() // allow the underlying connection to be reused.
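
At the protocol level, pipelining means writing several commands back to back and then reading the replies in the same order. The following standard-library sketch shows that shape using RESP framing; encodeCommand, buildBatch, and countOK are hypothetical helpers for illustration and do not reflect redjet's internals.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// encodeCommand renders one command as a RESP array of bulk strings.
func encodeCommand(args ...string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "*%d\r\n", len(args))
	for _, a := range args {
		fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(a), a)
	}
	return b.String()
}

// buildBatch concatenates n SET commands into a single payload that could be
// sent to the server in one write.
func buildBatch(n int) string {
	var b strings.Builder
	for i := 0; i < n; i++ {
		b.WriteString(encodeCommand("SET", fmt.Sprintf("foo%d", i), "bar"))
	}
	return b.String()
}

// countOK reads simple-string replies in order and counts the "+OK" lines,
// stopping at the first reply that is anything else.
func countOK(replies string) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(replies))
	n := 0
	for sc.Scan() {
		if line := sc.Text(); line != "+OK" {
			return n, fmt.Errorf("unexpected reply %q", line)
		}
		n++
	}
	return n, sc.Err()
}

func main() {
	batch := buildBatch(100)
	fmt.Printf("100 commands batched into %d bytes\n", len(batch))

	n, err := countOK(strings.Repeat("+OK\r\n", 100))
	fmt.Println(n, err) // 100 <nil>
}
```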

PubSub

redjet supports PubSub via the NextSubMessage method. For example:

// Subscribe to a channel
sub := client.Command(ctx, "SUBSCRIBE", "my-channel")
// Ignore the first message, which is a confirmation of the subscription.
_, err := sub.NextSubMessage()
// check error

// Publish a message to the channel
n, err := client.Command(ctx, "PUBLISH", "my-channel", "hello world").Int()
// check error
// n == 1, since there is one subscriber

// Receive the message
msg, err := sub.NextSubMessage()
// check error
// msg.Payload == "hello world"
// msg.Channel == "my-channel"
// msg.Type == "message"

Note that NextSubMessage will block until a message is received. To interrupt the subscription, cancel the context passed to Command.

Once a connection enters subscribe mode, the internal pool does not re-use it.

It is possible to subscribe to a channel in a performant, low-allocation way via the public API. NextSubMessage is just a convenience method.

JSON

redjet supports convenient JSON encoding and decoding via the (*Pipeline).JSON method. For example:

type Person struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

// Set a person
// Unknown argument types are automatically encoded to JSON.
err := client.Command(ctx, "SET", "person", Person{
    Name: "Alice",
    Age:  30,
}).Ok()
// check error

// Get a person
var p Person
err = client.Command(ctx, "GET", "person").JSON(&p)
// check error

// p == Person{Name: "Alice", Age: 30}
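
For reference, this is what the stored value would look like if the encoding matches Go's encoding/json package. That is an assumption, since the text above only says the value is encoded to JSON. The helpers encodePerson and decodePerson are hypothetical and use only the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Person struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

// encodePerson returns the JSON text that would be sent as the SET value.
func encodePerson(p Person) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

// decodePerson parses JSON text such as the bytes returned by GET.
func decodePerson(s string) (Person, error) {
	var p Person
	err := json.Unmarshal([]byte(s), &p)
	return p, err
}

func main() {
	s, err := encodePerson(Person{Name: "Alice", Age: 30})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"name":"Alice","age":30}

	p, err := decodePerson(s)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p) // {Name:Alice Age:30}
}
```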

Connection Pooling

redjet provides automatic connection pooling. Configuration knobs exist within the Client struct and may be changed before any commands are issued.

If you want synchronous command execution over the same connection, use the Pipeline method and consume the Pipeline after each call to Pipeline. Storing a long-lived Pipeline offers the same functionality as storing a long-lived connection.

Benchmarks

On a pure throughput basis, redjet performs similarly to redigo and go-redis. However, because redjet doesn't allocate memory for the entire response object, it consumes far fewer resources when handling large responses.

Here are some benchmarks (reproducible via make gen-bench) to illustrate:

.fullname: Get/1_B-10
 │   redjet    │               redigo               │           go-redis            │               rueidis                │
 │   sec/op    │   sec/op     vs base               │   sec/op     vs base          │    sec/op     vs base                │
   908.2n ± 2%   962.4n ± 1%  +5.97% (p=0.000 n=10)   913.8n ± 3%  ~ (p=0.280 n=10)   1045.0n ± 1%  +15.06% (p=0.000 n=10)

 │    redjet     │                redigo                │            go-redis             │               rueidis                │
 │      B/s      │      B/s       vs base               │      B/s       vs base          │     B/s       vs base                │
   1074.2Ki ± 2%   1015.6Ki ± 1%  -5.45% (p=0.000 n=10)   1069.3Ki ± 2%  ~ (p=0.413 n=10)   937.5Ki ± 1%  -12.73% (p=0.000 n=10)

 │  redjet   │            redigo            │           go-redis            │            rueidis            │
 │   B/op    │    B/op     vs base          │    B/op      vs base          │    B/op      vs base          │
   0.00 ± 0%   41.00 ± 0%  ? (p=0.000 n=10)   275.50 ± 2%  ? (p=0.000 n=10)   249.00 ± 0%  ? (p=0.000 n=10)

 │   redjet   │            redigo            │           go-redis           │           rueidis            │
 │ allocs/op  │ allocs/op   vs base          │ allocs/op   vs base          │ allocs/op   vs base          │
   0.000 ± 0%   3.000 ± 0%  ? (p=0.000 n=10)   4.000 ± 0%  ? (p=0.000 n=10)   2.000 ± 0%  ? (p=0.000 n=10)

.fullname: Get/1.0_kB-10
 │   redjet    │               redigo                │              go-redis               │               rueidis               │
 │   sec/op    │   sec/op     vs base                │   sec/op     vs base                │   sec/op     vs base                │
   1.302µ ± 2%   1.802µ ± 1%  +38.42% (p=0.000 n=10)   1.713µ ± 3%  +31.58% (p=0.000 n=10)   1.645µ ± 1%  +26.35% (p=0.000 n=10)

 │    redjet    │                redigo                │               go-redis               │               rueidis                │
 │     B/s      │     B/s       vs base                │     B/s       vs base                │     B/s       vs base                │
   750.4Mi ± 2%   542.1Mi ± 1%  -27.76% (p=0.000 n=10)   570.3Mi ± 3%  -24.01% (p=0.000 n=10)   593.8Mi ± 1%  -20.87% (p=0.000 n=10)

 │    redjet    │             redigo             │            go-redis            │            rueidis             │
 │     B/op     │     B/op      vs base          │     B/op      vs base          │     B/op      vs base          │
   0.000Ki ± 0%   1.039Ki ± 0%  ? (p=0.000 n=10)   1.392Ki ± 0%  ? (p=0.000 n=10)   1.248Ki ± 1%  ? (p=0.000 n=10)

 │   redjet   │            redigo            │           go-redis           │           rueidis            │
 │ allocs/op  │ allocs/op   vs base          │ allocs/op   vs base          │ allocs/op   vs base          │
   0.000 ± 0%   3.000 ± 0%  ? (p=0.000 n=10)   4.000 ± 0%  ? (p=0.000 n=10)   2.000 ± 0%  ? (p=0.000 n=10)

.fullname: Get/1.0_MB-10
 │   redjet    │            redigo             │              go-redis               │            rueidis            │
 │   sec/op    │   sec/op     vs base          │   sec/op     vs base                │   sec/op     vs base          │
   472.5µ ± 7%   477.3µ ± 2%  ~ (p=0.190 n=10)   536.8µ ± 6%  +13.61% (p=0.000 n=10)   475.3µ ± 6%  ~ (p=0.684 n=10)

 │    redjet    │             redigo             │               go-redis               │            rueidis             │
 │     B/s      │     B/s       vs base          │     B/s       vs base                │     B/s       vs base          │
   2.067Gi ± 8%   2.046Gi ± 2%  ~ (p=0.190 n=10)   1.819Gi ± 6%  -11.98% (p=0.000 n=10)   2.055Gi ± 6%  ~ (p=0.684 n=10)

 │   redjet    │                    redigo                    │                   go-redis                   │                   rueidis                    │
 │    B/op     │      B/op        vs base                     │      B/op        vs base                     │      B/op        vs base                     │
   51.00 ± 12%   1047849.50 ± 0%  +2054506.86% (p=0.000 n=10)   1057005.00 ± 0%  +2072458.82% (p=0.000 n=10)   1048808.50 ± 0%  +2056387.25% (p=0.000 n=10)

 │   redjet   │               redigo                │              go-redis               │               rueidis               │
 │ allocs/op  │ allocs/op   vs base                 │ allocs/op   vs base                 │ allocs/op   vs base                 │
   1.000 ± 0%   3.000 ± 0%  +200.00% (p=0.000 n=10)   4.000 ± 0%  +300.00% (p=0.000 n=10)   2.000 ± 0%  +100.00% (p=0.000 n=10)
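
The "vs base" columns report the relative change against redjet, the first column. A small sketch of that arithmetic follows; delta is a hypothetical helper. The tables show rounded medians, so recomputing from the printed numbers can differ from the printed percentage in the last digit for some rows.

```go
package main

import "fmt"

// delta returns the relative change of x versus base, formatted like the
// "vs base" column: (x - base) / base, as a signed percentage.
func delta(base, x float64) string {
	return fmt.Sprintf("%+.2f%%", (x-base)/base*100)
}

func main() {
	// Get/1_B, sec/op: redigo (962.4n) versus redjet (908.2n).
	fmt.Println(delta(908.2, 962.4)) // +5.97%

	// Get/1.0_MB, B/op: redigo (1047849.50) versus redjet (51.00).
	fmt.Println(delta(51.00, 1047849.50)) // +2054506.86%
}
```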

Limitations

  • redjet does not have convenient support for client side caching. But, the redjet API is flexible enough that a client could implement it themselves by following the instructions here.
  • RESP3 is not supported. Practically, this means that connections aren't multiplexed, and other Redis libraries may perform better in high-concurrency scenarios.
  • Certain features have not been tested but may still work:
    • Redis Streams
    • Monitor

Documentation

Functions

func IsAuthError

func IsAuthError(err error) bool

func IsUnknownCommand

func IsUnknownCommand(err error) bool

IsUnknownCommand returns whether err is an "unknown command" error.

func SetupAuth added in v0.7.0

func SetupAuth(
	username string,
	password string,
) func(ctx context.Context, client *Client, pipe *Pipeline) error

SetupAuth returns a Setup function that authenticates with the given username and password.

The username is used for authentication. If it is set, the password must also be set. If you are not using Redis ACLs, set only the password.

Authentication must be set up before any other commands are sent, and must not change during the lifetime of the client.

See more: https://redis.io/commands/auth/

Types

type Client

type Client struct {
	// ConnectionPoolSize limits the size of the connection pool. If 0, connections
	// are not pooled.
	ConnectionPoolSize int

	// IdleTimeout is the amount of time after which an idle connection will
	// be closed.
	IdleTimeout time.Duration

	// Dial is the function used to create new connections.
	Dial func(ctx context.Context) (net.Conn, error)

	// Setup is called after a new connection is established, but before any
	// commands are sent. It is useful for selecting a database or authenticating.
	//
	// See SetupAuth for authenticating with a username and password.
	Setup func(ctx context.Context, client *Client, pipe *Pipeline) error
	// contains filtered or unexported fields
}

func New

func New(addr string) *Client

New returns a new client that connects to addr with default settings.

func NewFromURL added in v0.6.0

func NewFromURL(rawURL string) (*Client, error)

func (*Client) Close

func (c *Client) Close() error

func (*Client) Command

func (c *Client) Command(ctx context.Context, cmd string, args ...any) *Pipeline

Command sends a command to the server and returns the result. The error is encoded into the result for ergonomics.

See Pipeline for more information on argument types.

The caller should call Close on the result when finished with it.

func (*Client) Pipeline

func (c *Client) Pipeline(ctx context.Context, r *Pipeline, cmd string, args ...any) *Pipeline

Pipeline sends a command to the server and returns the promise of a result. r may be nil, as in the case of the first command in a pipeline. Each successive call to Pipeline should re-use the last returned Pipeline.

Known arg types are strings, []byte, LenReader, and fmt.Stringer. All other types will be converted to JSON.
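
The argument handling described above can be pictured as a type switch with a JSON fallback. The sketch below is an assumption about the general shape, not redjet's actual implementation: encodeArg and the celsius type are hypothetical, and LenReader is left out because it is streamed rather than converted to bytes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// celsius is a hypothetical type used to demonstrate the fmt.Stringer case.
type celsius int

func (c celsius) String() string { return fmt.Sprintf("%dC", int(c)) }

// encodeArg converts one command argument to bytes: strings, byte slices,
// and fmt.Stringer values are used directly, everything else becomes JSON.
func encodeArg(arg any) ([]byte, error) {
	switch v := arg.(type) {
	case string:
		return []byte(v), nil
	case []byte:
		return v, nil
	case fmt.Stringer:
		return []byte(v.String()), nil
	default:
		return json.Marshal(v)
	}
}

func main() {
	for _, arg := range []any{"bar", []byte("raw"), celsius(21), map[string]int{"a": 1}} {
		b, err := encodeArg(arg)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%T -> %s\n", arg, b)
	}
}
```

Note that under this scheme a plain int is not a known type, so it would go through the JSON branch and be sent as its decimal text.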

It is safe to keep a pipeline running for a long time, with many send and receive cycles.

Example:

p := client.Pipeline(ctx, nil, "SET", "foo", "bar")
defer p.Close()

p = client.Pipeline(ctx, p, "GET", "foo")
// Read the result of SET first.
err := p.Ok()
if err != nil {
	// handle error
}

got, err := p.Bytes()
if err != nil {
	// handle error
}
fmt.Println(string(got))

func (*Client) PoolStats

func (c *Client) PoolStats() PoolStats

PoolStats returns statistics about the connection pool.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error represents an error returned by the Redis server, as opposed to a protocol or network error.

func (*Error) Error

func (e *Error) Error() string

type LenReader

type LenReader interface {
	Len() int
	io.Reader
}

LenReader is an io.Reader that also knows its length. A new one may be created with NewLenReader.
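
Several standard-library readers already have a Len() int method alongside Read, so they fit this interface shape without a wrapper. The sketch below uses a local lenReader interface that mirrors the definition above, so it compiles without redjet; fixedLenReader, fromString, and describe are hypothetical helpers showing how a reader with an externally known size could be adapted.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// lenReader is a local stand-in with the same method set as the LenReader
// interface documented above.
type lenReader interface {
	Len() int
	io.Reader
}

// Compile-time checks: these standard-library types satisfy the interface.
var (
	_ lenReader = (*bytes.Reader)(nil)
	_ lenReader = (*bytes.Buffer)(nil)
	_ lenReader = (*strings.Reader)(nil)
)

// fixedLenReader pairs any io.Reader with a size known out of band,
// for example a file size obtained from Stat.
type fixedLenReader struct {
	io.Reader
	size int
}

func (f fixedLenReader) Len() int { return f.size }

// fromString wraps a string in a fixedLenReader for demonstration.
func fromString(s string) lenReader {
	return fixedLenReader{Reader: strings.NewReader(s), size: len(s)}
}

// describe reports the declared length followed by the content read.
func describe(r lenReader) string {
	n := r.Len()
	b, _ := io.ReadAll(r)
	return fmt.Sprintf("%d:%s", n, b)
}

func main() {
	fmt.Println(describe(fromString("hello")))        // 5:hello
	fmt.Println(describe(bytes.NewReader([]byte("hi")))) // 2:hi
}
```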

func NewLenReader

func NewLenReader(r io.Reader, size int) LenReader

type Pipeline

type Pipeline struct {
	// CloseOnRead determines whether the Pipeline is closed after the first read.
	//
	// It is set to True when the result is returned from Command, and
	// False when it is returned from Pipeline.
	//
	// It is ignored if in the middle of reading an array, or if the result
	// is of a subscribe command.
	CloseOnRead bool
	// contains filtered or unexported fields
}

Pipeline is the result of a command.

Its methods are not safe for concurrent use.

func (*Pipeline) ArrayLength

func (r *Pipeline) ArrayLength() (int, error)

ArrayLength reads the next message as an array length. It does not close the Pipeline even if CloseOnRead is true.

func (*Pipeline) ArrayStack

func (r *Pipeline) ArrayStack() []int

ArrayStack returns the position of the result within an array.

The returned slice must not be modified.

func (*Pipeline) Bytes

func (r *Pipeline) Bytes() ([]byte, error)

Bytes returns the result as a byte slice.

Refer to r.CloseOnRead for whether the result is closed after the first read.

func (*Pipeline) Close

func (r *Pipeline) Close() error

Close releases all resources associated with the result.

It is safe to call Close multiple times.

func (*Pipeline) Error

func (r *Pipeline) Error() string

func (*Pipeline) HasMore added in v0.7.2

func (r *Pipeline) HasMore() bool

HasMore returns true if there are more results to read.

func (*Pipeline) Int

func (r *Pipeline) Int() (int, error)

Int returns the result as an integer.

Refer to r.CloseOnRead for whether the result is closed after the first read.

func (*Pipeline) JSON

func (r *Pipeline) JSON(v interface{}) error

JSON unmarshals the result into v.

func (*Pipeline) Next

func (r *Pipeline) Next() bool

Next returns true if there are more results to read.

func (*Pipeline) NextSubMessage

func (r *Pipeline) NextSubMessage() (*SubMessage, error)

NextSubMessage reads the next subscription message from the pipeline. Read more: https://redis.io/docs/manual/pubsub/.

It does not close the Pipeline even if CloseOnRead is true.

func (*Pipeline) Ok

func (r *Pipeline) Ok() error

Ok returns whether the result is "OK". Note that it may fail even if the command succeeded. For example, a successful GET will return a value.

func (*Pipeline) String

func (r *Pipeline) String() (string, error)

String returns the result as a string.

Refer to r.CloseOnRead for whether the result is closed after the first read.

func (*Pipeline) Strings

func (r *Pipeline) Strings() ([]string, error)

Strings returns the array result as a slice of strings.

func (*Pipeline) WriteTo

func (r *Pipeline) WriteTo(w io.Writer) (int64, error)

WriteTo writes the result to w.

r.CloseOnRead sets whether the result is closed after the first read.

The result is never automatically closed if in progress of reading an array.

type PoolStats

type PoolStats struct {
	FreeConns      int
	Returns        int64
	FullPoolCloses int64
	CleanCycles    int64
}

type SubMessage

type SubMessage struct {
	// Type is either "subscribe" (acknowledgement of subscription) or "message"
	Type    SubMessageType
	Channel string

	// Payload is the number of channels subscribed to if Type is "subscribe",
	// or the published message if Type is "message".
	Payload string
}

SubMessage is a message received from a pubsub subscription.

type SubMessageType

type SubMessageType string
const (
	SubMessageSubscribe SubMessageType = "subscribe"
	SubMessageMessage   SubMessageType = "message"
)

Directories

Path Synopsis
Package redcache provides a simple cache implementation using Redis as a backend.
