valkey

v1.0.35
Published: Apr 22, 2024 License: Apache-2.0 Imports: 27 Imported by: 0

README

valkey-go

A fast Golang Valkey client that does auto pipelining and supports server-assisted client-side caching.

Features


Getting Started

package main

import (
	"context"
	"github.com/rueian/valkey-go"
)

func main() {
	client, err := valkey.NewClient(valkey.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()
	// SET key val NX
	err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error()
	// HGETALL hm
	hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap()
	_, _ = hm, err // placeholder: handle hm and err here (unused variables do not compile)
}

Check out more examples: Command Response Cheatsheet

Developer Friendly Command Builder

client.B() is the builder entrypoint to construct a valkey command:

Recorded by @FZambia Improving Centrifugo Redis Engine throughput and allocation efficiency with Rueidis Go library

Once a command is built, use either client.Do() or client.DoMulti() to send it to valkey.

You ❗️SHOULD NOT❗️ reuse the command in another client.Do() or client.DoMulti() call, because by default it has already been recycled to the underlying sync.Pool.

To reuse a command, use Pin() after Build() and it will prevent the command being recycled.

Pipelining

Auto Pipelining

All concurrent non-blocking valkey commands (such as GET, SET) are automatically pipelined, which reduces round trips and system calls and yields higher throughput. You can benefit from pipelining simply by calling client.Do() from multiple goroutines concurrently. For example:

func BenchmarkPipelining(b *testing.B, client valkey.Client) {
	// the below client.Do() operations will be issued from
	// multiple goroutines and thus will be pipelined automatically.
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
		}
	})
}

Benchmark comparison with go-redis v9

Compared to go-redis, valkey-go has higher throughput across 1, 8, and 64 parallelism settings.

It even achieves ~14x the throughput of go-redis in a local benchmark on a MacBook Pro 16" M1 Pro 2021. (see parallelism(64)-key(16)-value(64)-10)

Benchmark source code: https://github.com/rueian/rueidis-benchmark

A benchmark result performed on two GCP n2-highcpu-2 machines also shows that valkey can achieve higher throughput with lower latencies: https://github.com/redis/rueidis/pull/93

Manual Pipelining

Besides auto pipelining, you can also pipeline commands manually with DoMulti():

cmds := make(valkey.Commands, 0, 10)
for i := 0; i < 10; i++ {
    cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
}
for _, resp := range client.DoMulti(ctx, cmds...) {
    if err := resp.Error(); err != nil {
        panic(err)
    }
}

Server-Assisted Client-Side Caching

The opt-in mode of server-assisted client-side caching is enabled by default, and can be used by calling DoCache() or DoMultiCache() with client-side TTLs specified.

client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray()
client.DoMultiCache(ctx,
    valkey.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
    valkey.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))

Cached responses will be invalidated either when being notified by valkey servers or when their client side TTLs are reached.
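The relationship between the two expiry sources can be pictured with a small standalone sketch. It assumes the capping rule stated in the DoCache documentation (the client-side TTL is capped when the key's TTL on the server is smaller). effectiveTTL is a hypothetical helper, not a valkey-go API, and treating a non-positive server TTL as "no expiry known" is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// effectiveTTL is a hypothetical helper (not part of valkey-go). It returns
// how long a cached response may live on the client: the requested
// client-side TTL, capped by the key's remaining server-side TTL when the key
// has one. A non-positive serverPTTL is treated as "no expiry known", which
// is an assumption of this sketch.
func effectiveTTL(clientTTL, serverPTTL time.Duration) time.Duration {
	if serverPTTL > 0 && serverPTTL < clientTTL {
		return serverPTTL
	}
	return clientTTL
}

func main() {
	// The key expires on the server before the client-side TTL elapses.
	fmt.Println(effectiveTTL(time.Minute, 10*time.Second))
	// The key has no server-side expiry, so the client-side TTL applies.
	fmt.Println(effectiveTTL(time.Minute, -1))
}
```

Server-side invalidation notifications are independent of this calculation: they can evict an entry at any time before the effective TTL is reached.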

Benchmark

Server-assisted client-side caching can dramatically improve latency and throughput, much like having a valkey replica right inside your application. For example:

Benchmark source code: https://github.com/rueian/rueidis-benchmark

Client Side Caching Helpers

Use CacheTTL() to check the remaining client side TTL in seconds:

client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).CacheTTL() == 60

Use IsCacheHit() to verify whether the response came from client-side memory:

client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).IsCacheHit() == true

If OpenTelemetry is enabled via valkeyotel.NewClient(option), two metrics are also instrumented:

  • valkey_do_cache_miss
  • valkey_do_cache_hits

MGET/JSON.MGET Client Side Caching Helpers

valkey.MGetCache and valkey.JsonMGetCache are handy helpers fetching multiple keys across different slots through the client side caching. They will first group keys by slot to build MGET or JSON.MGET commands respectively and then send requests with only cache missed keys to valkey nodes.
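The "group keys by slot" step can be sketched without the library. The example below assumes the standard cluster key-slot rule (CRC16/XMODEM of the key, or of its non-empty {hash tag}, modulo 16384). The names crc16, keySlot and groupBySlot are hypothetical and are not the helpers' actual internals.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 computes CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0.
func crc16(data string) uint16 {
	var crc uint16
	for i := 0; i < len(data); i++ {
		crc ^= uint16(data[i]) << 8
		for bit := 0; bit < 8; bit++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// keySlot maps a key to one of 16384 slots. If the key contains a non-empty
// {hash tag}, only the text between the first '{' and the next '}' is hashed.
func keySlot(key string) uint16 {
	if open := strings.IndexByte(key, '{'); open >= 0 {
		if end := strings.IndexByte(key[open+1:], '}'); end > 0 {
			key = key[open+1 : open+1+end]
		}
	}
	return crc16(key) % 16384
}

// groupBySlot buckets keys by slot; each bucket could be sent as one MGET.
func groupBySlot(keys []string) map[uint16][]string {
	groups := make(map[uint16][]string)
	for _, k := range keys {
		s := keySlot(k)
		groups[s] = append(groups[s], k)
	}
	return groups
}

func main() {
	for slot, keys := range groupBySlot([]string{"a{user1}", "b{user1}", "other"}) {
		fmt.Println(slot, keys)
	}
}
```

Keys sharing a hash tag always land in the same bucket, which is why the streaming example elsewhere in this README uses keys such as a{slot1} and b{slot1}.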

Broadcast Mode Client Side Caching

Although the default is opt-in mode, you can use broadcast mode by specifying your prefixes in ClientOption.ClientTrackingOptions:

client, err := valkey.NewClient(valkey.ClientOption{
	InitAddress:           []string{"127.0.0.1:6379"},
	ClientTrackingOptions: []string{"PREFIX", "prefix1:", "PREFIX", "prefix2:", "BCAST"},
})
if err != nil {
	panic(err)
}
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == false
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == true

Please make sure that commands passed to DoCache() and DoMultiCache() are covered by your prefixes. Otherwise, their client-side cache will not be invalidated by valkey.

Client Side Caching with Cache Aside Pattern

Cache-Aside is a widely used caching strategy. valkeyaside can help you cache data into your client-side cache backed by Valkey. For example:

client, err := valkeyaside.NewClient(valkeyaside.ClientOption{
    ClientOption: valkey.ClientOption{InitAddress: []string{"127.0.0.1:6379"}},
})
if err != nil {
    panic(err)
}
val, err := client.Get(context.Background(), time.Minute, "mykey", func(ctx context.Context, key string) (val string, err error) {
    if err = db.QueryRowContext(ctx, "SELECT val FROM mytab WHERE id = ?", key).Scan(&val); err == sql.ErrNoRows {
        val = "_nil_" // cache nil to avoid penetration.
        err = nil     // clear err in case of sql.ErrNoRows.
    }
    return
})
// ...

Please refer to the full example at valkeyaside.

Disable Client Side Caching

Some Valkey providers don't support client-side caching, e.g. Google Cloud Memorystore. You can disable client-side caching by setting ClientOption.DisableCache to true. This also makes client.DoCache() and client.DoMultiCache() fall back to client.Do() and client.DoMulti().

Context Cancellation

client.Do(), client.DoMulti(), client.DoCache() and client.DoMultiCache() can return early if the context is canceled or the deadline is reached.

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded

Please note that although operations can return early, the command has likely been sent already.

Pub/Sub

To receive messages from channels, client.Receive() should be used. It supports SUBSCRIBE, PSUBSCRIBE and Valkey 7.0's SSUBSCRIBE:

err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg valkey.PubSubMessage) {
    // handle the msg
})

The provided handler will be called with each received message.

It is important to note that client.Receive() blocks until it returns in one of the following cases:

  1. It returns nil when it receives any unsubscribe/punsubscribe message related to the provided subscribe command.
  2. It returns valkey.ErrClosing when the client is closed manually.
  3. It returns ctx.Err() when the ctx is done.
  4. It returns a non-nil err when the provided subscribe command fails.

While the client.Receive() call is blocking, the Client is still able to accept other concurrent requests, and they are sharing the same tcp connection. If your message handler may take some time to complete, it is recommended to use the client.Receive() inside a client.Dedicated() for not blocking other concurrent requests.

Alternative PubSub Hooks

client.Receive() requires users to provide a subscription command in advance. An alternative, DedicatedClient.SetPubSubHooks(), allows users to subscribe to or unsubscribe from channels later.

c, cancel := client.Dedicate()
defer cancel()

wait := c.SetPubSubHooks(valkey.PubSubHooks{
	OnMessage: func(m valkey.PubSubMessage) {
		// Handle message. This callback will be called sequentially, but in another goroutine.
	},
})
c.Do(ctx, c.B().Subscribe().Channel("ch").Build())
err := <-wait // disconnected with err

If the hooks are not nil, the above wait channel is guaranteed to be closed once the hooks will no longer be called, and it produces at most one error describing the reason. Users can use this channel to detect disconnection.

CAS Transaction

To do a CAS Transaction (WATCH + MULTI + EXEC), a dedicated connection should be used because there should be no unintentional write commands between WATCH and EXEC. Otherwise, the EXEC may not fail as expected.

client.Dedicated(func(c valkey.DedicatedClient) error {
    // watch keys first
    c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
    // perform read here
    c.Do(ctx, c.B().Mget().Key("k1", "k2").Build())
    // perform write with MULTI EXEC
    c.DoMulti(
        ctx,
        c.B().Multi().Build(),
        c.B().Set().Key("k1").Value("1").Build(),
        c.B().Set().Key("k2").Value("2").Build(),
        c.B().Exec().Build(),
    )
    return nil
})

Or use Dedicate() and invoke cancel() when finished to put the connection back to the pool.

c, cancel := client.Dedicate()
defer cancel()

c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
// perform the remaining CAS operations with `c`, which occupies a connection

However, occupying a connection is not good in terms of throughput. It is better to use Lua script to perform optimistic locking instead.

Lua Script

The NewLuaScript or NewLuaScriptReadOnly will create a script which is safe for concurrent usage.

When script.Exec is called, it tries sending EVALSHA first and falls back to EVAL if the server returns NOSCRIPT.

script := valkey.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
// the script.Exec is safe for concurrent call
list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()
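EVALSHA identifies a script by the SHA-1 digest of its source rather than by the source itself. The sketch below shows how such a digest can be computed with the standard library. scriptSHA1 is a hypothetical helper, and how the library caches or computes the digest internally is not described here.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// scriptSHA1 returns the lowercase hex SHA-1 digest of a script body, which
// is the identifier EVALSHA takes in place of the script source.
func scriptSHA1(body string) string {
	sum := sha1.Sum([]byte(body))
	return hex.EncodeToString(sum[:])
}

func main() {
	// Digest of the script used in the example above.
	fmt.Println(scriptSHA1("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}"))
}
```

Because the digest is much shorter than most scripts, trying EVALSHA first saves bandwidth on every call after the server has seen the script once.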

Streaming Read

client.DoStream() and client.DoMultiStream() can be used to send large valkey responses to an io.Writer directly without allocating them in the memory. They work by first sending commands to a dedicated connection acquired from a pool, then directly copying the response values to the given io.Writer, and finally recycling the connection.

s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build())
for s.HasNext() {
    n, err := s.WriteTo(io.Discard)
    if valkey.IsValkeyNil(err) {
        // ...
    }
}

Note that these two methods will occupy connections until all responses are written to the given io.Writer. This can take a long time and hurt performance. Use the normal Do() and DoMulti() instead unless you want to avoid allocating memory for large valkey response.

Also note that these two methods only work with string, integer, and float valkey responses. And DoMultiStream currently does not support pipelining keys across multiple slots when connecting to a valkey cluster.

Memory Consumption Consideration

Each underlying connection in valkey allocates a ring buffer for pipelining. Its size is controlled by ClientOption.RingScaleEachConn, and the default value is 10, which results in each ring having size 2^10.

If you have many valkey connections, you may find that they occupy quite a lot of memory. In that case, you may consider reducing ClientOption.RingScaleEachConn to 8 or 9 at the cost of potential throughput degradation.

You may also consider setting the value of ClientOption.PipelineMultiplex to -1, which will let valkey use only 1 connection for pipelining to each valkey node.
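As a rough illustration of the arithmetic above, the following sketch computes the number of ring slots per connection and in total for a given scale. ringSlots and totalSlots are hypothetical helpers. The memory used per slot is not stated in this document, so the sketch counts slots only.

```go
package main

import "fmt"

// ringSlots returns the number of slots in one connection's ring for a given
// RingScaleEachConn value, following the 2^scale rule.
func ringSlots(scale int) int {
	return 1 << scale
}

// totalSlots returns the number of ring slots across conns connections.
func totalSlots(scale, conns int) int {
	return ringSlots(scale) * conns
}

func main() {
	for _, scale := range []int{8, 9, 10} {
		fmt.Printf("scale %d: %d slots per connection, %d slots over 100 connections\n",
			scale, ringSlots(scale), totalSlots(scale, 100))
	}
}
```

Dropping the scale from 10 to 8 cuts the slot count to a quarter, which is where the memory saving comes from.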

Instantiating a new Valkey Client

You can create a new valkey client using NewClient and provide several options.

// Connect to a single valkey node:
client, err := valkey.NewClient(valkey.ClientOption{
    InitAddress: []string{"127.0.0.1:6379"},
})

// Connect to a valkey cluster
client, err := valkey.NewClient(valkey.ClientOption{
    InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
    ShuffleInit: true,
})

// Connect to a valkey cluster and use replicas for read operations
client, err := valkey.NewClient(valkey.ClientOption{
    InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
    SendToReplicas: func(cmd valkey.Completed) bool {
        return cmd.IsReadOnly()
    },
})

// Connect to sentinels
client, err := valkey.NewClient(valkey.ClientOption{
    InitAddress: []string{"127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"},
    Sentinel: valkey.SentinelOption{
        MasterSet: "my_master",
    },
})

Valkey URL

You can use ParseURL or MustParseURL to construct a ClientOption.

The provided url must start with either redis://, rediss:// or unix://.

Currently supported url parameters are db, dial_timeout, write_timeout, addr, protocol, client_cache, client_name, max_retries, and master_set.

// connect to a valkey cluster
client, err = valkey.NewClient(valkey.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003"))
// connect to a valkey node
client, err = valkey.NewClient(valkey.MustParseURL("redis://127.0.0.1:6379/0"))
// connect to a valkey sentinel
client, err = valkey.NewClient(valkey.MustParseURL("redis://127.0.0.1:26379/0?master_set=my_master"))
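To see how the repeated addr parameter and master_set appear inside such a URL, here is a standard-library sketch using net/url. The endpoints function is a hypothetical helper for illustration and is not how ParseURL is implemented.

```go
package main

import (
	"fmt"
	"net/url"
)

// endpoints is a hypothetical helper: it collects the host in the URL plus
// every repeated addr query parameter, and returns master_set if present.
func endpoints(raw string) (addrs []string, masterSet string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, "", err
	}
	q := u.Query()
	addrs = append([]string{u.Host}, q["addr"]...)
	return addrs, q.Get("master_set"), nil
}

func main() {
	addrs, _, err := endpoints("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003")
	if err != nil {
		panic(err)
	}
	fmt.Println(addrs)

	_, masterSet, err := endpoints("redis://127.0.0.1:26379/0?master_set=my_master")
	if err != nil {
		panic(err)
	}
	fmt.Println(masterSet)
}
```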

Arbitrary Command

If you want to construct commands that are absent from the command builder, you can use client.B().Arbitrary():

// This will result in [ANY CMD k1 k2 a1 a2]
client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()

The command builder treats all the parameters as Valkey strings, which are binary safe. This means that users can store []byte directly into Valkey without conversion. And the valkey.BinaryString helper can convert []byte to string without copy. For example:

client.B().Set().Key("b").Value(valkey.BinaryString([]byte{...})).Build()

Treating all the parameters as Valkey strings also means that the command builder doesn't do any quoting, conversion automatically for users.

When working with RedisJSON, users frequently need to prepare JSON strings as Valkey strings. valkey.JSON can help:

client.B().JsonSet().Key("j").Path("$.myStrField").Value(valkey.JSON("str")).Build()
// equivalent to
client.B().JsonSet().Key("j").Path("$.myStrField").Value(`"str"`).Build()
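The equivalence above is ordinary JSON encoding: a Go string becomes a quoted JSON string. The sketch below reproduces the effect with encoding/json. toJSON is a hypothetical stand-in, and panicking on an encoding failure is an assumption of this sketch rather than documented behavior of valkey.JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toJSON is a hypothetical stand-in for a JSON-quoting helper: it marshals
// any value to its JSON text and panics if the value cannot be encoded
// (the panic is an assumption of this sketch).
func toJSON(in any) string {
	b, err := json.Marshal(in)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(toJSON("str"))               // a quoted JSON string
	fmt.Println(toJSON(map[string]int{"a": 1})) // a JSON object
}
```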

When working with vector similarity search, users can use valkey.VectorString32 and valkey.VectorString64 to build queries:

cmd := client.B().FtSearch().Index("idx").Query("*=>[KNN 5 @vec $V]").
    Params().Nargs(2).NameValue().NameValue("V", valkey.VectorString64([]float64{...})).
    Dialect(2).Build()
n, resp, err := client.Do(ctx, cmd).AsFtSearch()

Command Response Cheatsheet

While the command builder is developer friendly, the response parser is a little unfriendly. Developers must know what type of Valkey response will be returned from the server beforehand and which parser they should use. Otherwise, it panics.

It is hard to remember what type of message will be returned and which parser to use. So, here are some common examples:

// GET
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
// MGET
client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray()
// SET
client.Do(ctx, client.B().Set().Key("k").Value("v").Build()).Error()
// INCR
client.Do(ctx, client.B().Incr().Key("k").Build()).AsInt64()
// HGET
client.Do(ctx, client.B().Hget().Key("k").Field("f").Build()).ToString()
// HMGET
client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray()
// HGETALL
client.Do(ctx, client.B().Hgetall().Key("h").Build()).AsStrMap()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("1").Max("2").Build()).AsStrSlice()
// ZRANK
client.Do(ctx, client.B().Zrank().Key("k").Member("m").Build()).AsInt64()
// ZSCORE
client.Do(ctx, client.B().Zscore().Key("k").Member("m").Build()).AsFloat64()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Build()).AsStrSlice()
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Withscores().Build()).AsZScores()
// ZPOPMIN
client.Do(ctx, client.B().Zpopmin().Key("k").Build()).AsZScore()
client.Do(ctx, client.B().Zpopmin().Key("myzset").Count(2).Build()).AsZScores()
// SCARD
client.Do(ctx, client.B().Scard().Key("k").Build()).AsInt64()
// SMEMBERS
client.Do(ctx, client.B().Smembers().Key("k").Build()).AsStrSlice()
// LINDEX
client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString()
// LPOP
client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString()
client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice()
// SCAN
client.Do(ctx, client.B().Scan().Cursor(0).Build()).AsScanEntry()
// FT.SEARCH
client.Do(ctx, client.B().FtSearch().Index("idx").Query("@f:v").Build()).AsFtSearch()
// GEOSEARCH
client.Do(ctx, client.B().Geosearch().Key("k").Fromlonlat(1, 1).Bybox(1).Height(1).Km().Build()).AsGeosearch()

Use DecodeSliceOfJSON to scan array result

DecodeSliceOfJSON is useful when you would like to scan the results of an array into a slice of a specific struct.

type User struct {
	Name string `json:"name"`
}

// Set some values
if err = client.Do(ctx, client.B().Set().Key("user1").Value(`{"name": "name1"}`).Build()).Error(); err != nil {
	return err
}
if err = client.Do(ctx, client.B().Set().Key("user2").Value(`{"name": "name2"}`).Build()).Error(); err != nil {
	return err
}

// Scan MGET results into []*User
var users []*User // or []User is also scannable
if err := valkey.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1", "user2").Build()), &users); err != nil {
	return err
}

for _, user := range users {
	fmt.Printf("%+v\n", user)
}
/*
&{Name:name1}
&{Name:name2}
*/

!!!!!! DO NOT DO THIS !!!!!!

Please make sure that all values in the result have the same JSON structure.

// Set a pure string value
if err = client.Do(ctx, client.B().Set().Key("user1").Value("userName1").Build()).Error(); err != nil {
	return err
}

// Bad
users := make([]*User, 0)
if err := valkey.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1").Build()), &users); err != nil {
	return err
}
// -> Error: invalid character 'u' looking for beginning of value
// in this case, use client.Do(ctx, client.B().Mget().Key("user1").Build()).AsStrSlice()

Contributing

Contributions are welcome, including issues, pull requests, and discussions. Contributions mean a lot to us and help us improve this library and the community!

Generate command builders

Command builders are generated based on the definitions in ./hack/cmds by running:

go generate

Testing

Please use the ./dockertest.sh script for running test cases locally. And please try your best to have 100% test coverage on code changes.

Documentation

Overview

Package valkey is a fast Golang Valkey RESP3 client that does auto pipelining and supports client side caching.

Constants

const (
	// DefaultCacheBytes is the default value of ClientOption.CacheSizeEachConn, which is 128 MiB
	DefaultCacheBytes = 128 * (1 << 20)
	// DefaultRingScale is the default value of ClientOption.RingScaleEachConn, which results into having a ring of size 2^10 for each connection
	DefaultRingScale = 10
	// DefaultPoolSize is the default value of ClientOption.BlockingPoolSize
	DefaultPoolSize = 1000
	// DefaultDialTimeout is the default value of ClientOption.Dialer.Timeout
	DefaultDialTimeout = 5 * time.Second
	// DefaultTCPKeepAlive is the default value of ClientOption.Dialer.KeepAlive
	DefaultTCPKeepAlive = 1 * time.Second
	// DefaultReadBuffer is the default value of bufio.NewReaderSize for each connection, which is 0.5MiB
	DefaultReadBuffer = 1 << 19
	// DefaultWriteBuffer is the default value of bufio.NewWriterSize for each connection, which is 0.5MiB
	DefaultWriteBuffer = 1 << 19
	// MaxPipelineMultiplex is the maximum meaningful value for ClientOption.PipelineMultiplex
	MaxPipelineMultiplex = 8
)
const LibName = "valkey"
const LibVer = "1.0.35"

Variables

var (
	// ErrClosing means the Client.Close had been called
	ErrClosing = errors.New("valkey client is closing or unable to connect valkey")
	// ErrNoAddr means the ClientOption.InitAddress is empty
	ErrNoAddr = errors.New("no alive address in InitAddress")
	// ErrNoCache means your valkey does not support client-side caching and must set ClientOption.DisableCache to true
	ErrNoCache = errors.New("ClientOption.DisableCache must be true for valkey not supporting client-side caching or not supporting RESP3")
	// ErrRESP2PubSubMixed means your valkey does not support RESP3 and valkey can't handle SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE in mixed case
	ErrRESP2PubSubMixed = errors.New("valkey does not support SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other commands in RESP2")
	// ErrDoCacheAborted means valkey abort EXEC request or connection closed
	ErrDoCacheAborted = errors.New("failed to fetch the cache because EXEC was aborted by valkey or connection closed")
	// ErrReplicaOnlyNotSupported means ReplicaOnly flag is not supported by
	// current client
	ErrReplicaOnlyNotSupported = errors.New("ReplicaOnly is not supported for single client")
	// ErrWrongPipelineMultiplex means wrong value for ClientOption.PipelineMultiplex
	ErrWrongPipelineMultiplex = errors.New("ClientOption.PipelineMultiplex must not be bigger than MaxPipelineMultiplex")
)
var ErrMSetNXNotSet = errors.New("MSETNX: no key was set")

ErrMSetNXNotSet is used in the MSetNX helper when the underlying MSETNX response is 0. Ref: https://redis.io/commands/msetnx/

var ErrNoSlot = errors.New("the slot has no valkey node")

ErrNoSlot indicates that no valkey node owns the key slot.

var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
var Nil = &ValkeyError{typ: typeNull}

Nil represents a Valkey Nil message

Functions

func BinaryString

func BinaryString(bs []byte) string

BinaryString converts the provided []byte into a string without copying. It does what strings.Builder.String() does. Valkey strings are binary safe, which means it is safe to store any []byte in Valkey directly. Users can use this BinaryString helper to insert a []byte as part of a valkey command. For example:

client.B().Set().Key(valkey.BinaryString([]byte{0})).Value(valkey.BinaryString([]byte{0})).Build()

To read back the []byte of the string returned from the Valkey, it is recommended to use the ValkeyMessage.AsReader.
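A zero-copy []byte to string conversion of this kind can be written with the unsafe package on Go 1.20 or later. The sketch below is one way to do it. bytesToString is a hypothetical name, and whether the library uses exactly these calls is an assumption.

```go
package main

import (
	"fmt"
	"unsafe"
)

// bytesToString reinterprets bs as a string without copying (Go 1.20+).
// The caller must not modify bs afterwards, because Go code assumes that
// string contents never change.
func bytesToString(bs []byte) string {
	return unsafe.String(unsafe.SliceData(bs), len(bs))
}

func main() {
	raw := []byte{0, 'a', 0xff}
	s := bytesToString(raw)
	// %q shows that non-printable bytes survive the conversion unchanged.
	fmt.Printf("%q has %d bytes\n", s, len(s))
}
```

The trade-off is safety for speed: the returned string shares memory with the slice, so the slice must be treated as read-only from then on.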

func DecodeSliceOfJSON

func DecodeSliceOfJSON[T any](result ValkeyResult, dest *[]T) error

DecodeSliceOfJSON is a helper that struct-scans each ValkeyMessage into dest, which must be a pointer to a slice.
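The decoding pattern can be illustrated on plain strings with encoding/json and generics. decodeAll below is a hypothetical stand-in that takes []string instead of a ValkeyResult. It shows why both []User and []*User work as destinations and why a non-JSON value fails.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type User struct {
	Name string `json:"name"`
}

// decodeAll is a hypothetical stand-in for a slice-of-JSON decoder. It
// unmarshals each value into one element of a new slice and stores the slice
// in dest only if every value decodes successfully.
func decodeAll[T any](values []string, dest *[]T) error {
	out := make([]T, len(values))
	for i, v := range values {
		// When T is a pointer type, json.Unmarshal allocates the pointee.
		if err := json.Unmarshal([]byte(v), &out[i]); err != nil {
			return err
		}
	}
	*dest = out
	return nil
}

func main() {
	var users []*User
	if err := decodeAll([]string{`{"name": "name1"}`, `{"name": "name2"}`}, &users); err != nil {
		panic(err)
	}
	for _, u := range users {
		fmt.Printf("%+v\n", u)
	}

	// A plain, non-JSON string cannot be decoded into a struct.
	var bad []User
	fmt.Println(decodeAll([]string{"userName1"}, &bad) != nil)
}
```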

func IsValkeyBusyGroup

func IsValkeyBusyGroup(err error) bool

IsValkeyBusyGroup checks if it is a valkey BUSYGROUP message.

func IsValkeyNil

func IsValkeyNil(err error) bool

IsValkeyNil is a handy method to check if an error is a valkey nil response. All valkey nil responses are returned as errors.

Example
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
	panic(err)
}
defer client.Close()

_, err = client.Do(context.Background(), client.B().Get().Key("not_exists").Build()).ToString()
if err != nil && IsValkeyNil(err) {
	fmt.Printf("it is a nil response")
}

func JSON

func JSON(in any) string

JSON converts the provided parameter into a JSON string. Users can use this JSON helper to work with RedisJSON commands. For example:

client.B().JsonSet().Key("a").Path("$.myField").Value(valkey.JSON("str")).Build()

func JsonMGet

func JsonMGet(client Client, ctx context.Context, keys []string, path string) (ret map[string]ValkeyMessage, err error)

JsonMGet is a helper that consults valkey directly with multiple keys by grouping keys within same slot into JSON.MGETs or multiple JSON.GETs

func JsonMGetCache

func JsonMGetCache(client Client, ctx context.Context, ttl time.Duration, keys []string, path string) (ret map[string]ValkeyMessage, err error)

JsonMGetCache is a helper that consults the client-side caches with multiple keys by grouping keys within same slot into multiple JSON.GETs

func JsonMSet

func JsonMSet(client Client, ctx context.Context, kvs map[string]string, path string) map[string]error

JsonMSet is a helper that consults valkey directly with multiple keys by grouping keys within same slot into JSON.MSETs or multiple JSON.SETs

func MDel

func MDel(client Client, ctx context.Context, keys []string) map[string]error

MDel is a helper that consults the valkey directly with multiple keys by grouping keys within same slot into DELs

func MGet

func MGet(client Client, ctx context.Context, keys []string) (ret map[string]ValkeyMessage, err error)

MGet is a helper that consults the valkey directly with multiple keys by grouping keys within same slot into MGET or multiple GETs

func MGetCache

func MGetCache(client Client, ctx context.Context, ttl time.Duration, keys []string) (ret map[string]ValkeyMessage, err error)

MGetCache is a helper that consults the client-side caches with multiple keys by grouping keys within same slot into multiple GETs

func MSet

func MSet(client Client, ctx context.Context, kvs map[string]string) map[string]error

MSet is a helper that consults the valkey directly with multiple keys by grouping keys within same slot into MSETs or multiple SETs

func MSetNX

func MSetNX(client Client, ctx context.Context, kvs map[string]string) map[string]error

MSetNX is a helper that consults the valkey directly with multiple keys by grouping keys within same slot into MSETNXs or multiple SETNXs

func ToVector32

func ToVector32(s string) []float32

ToVector32 reverses VectorString32. Users can use this to convert a valkey response back to []float32.

func ToVector64

func ToVector64(s string) []float64

ToVector64 reverses VectorString64. Users can use this to convert a valkey response back to []float64.

func VectorString32

func VectorString32(v []float32) string

VectorString32 converts the provided []float32 into a string. Users can use this to build vector search queries:

client.B().FtSearch().Index("idx").Query("*=>[KNN 5 @vec $V]").
    Params().Nargs(2).NameValue().NameValue("V", valkey.VectorString32([]float32{1})).
    Dialect(2).Build()
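Vector search modules commonly expect a vector as a raw byte blob of fixed-width floats. The sketch below assumes a little-endian IEEE 754 layout with 4 bytes per float32. The exact encoding the library uses is not stated in this document, and encodeVector32 and decodeVector32 are hypothetical names standing in for the VectorString32 and ToVector32 pair.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// encodeVector32 packs each float32 into 4 little-endian bytes (an assumed
// layout) and returns the bytes as a binary-safe string.
func encodeVector32(v []float32) string {
	b := make([]byte, 4*len(v))
	for i, f := range v {
		binary.LittleEndian.PutUint32(b[i*4:], math.Float32bits(f))
	}
	return string(b)
}

// decodeVector32 reverses encodeVector32. Trailing bytes that do not form a
// full 4-byte group are ignored.
func decodeVector32(s string) []float32 {
	v := make([]float32, len(s)/4)
	for i := range v {
		bits := binary.LittleEndian.Uint32([]byte(s[i*4 : i*4+4]))
		v[i] = math.Float32frombits(bits)
	}
	return v
}

func main() {
	blob := encodeVector32([]float32{1.5, -2})
	fmt.Println(len(blob), decodeVector32(blob))
}
```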

func VectorString64

func VectorString64(v []float64) string

VectorString64 converts the provided []float64 into a string. Users can use this to build vector search queries:

client.B().FtSearch().Index("idx").Query("*=>[KNN 5 @vec $V]").
    Params().Nargs(2).NameValue().NameValue("V", valkey.VectorString64([]float64{1})).
    Dialect(2).Build()

Types

type AuthCredentials

type AuthCredentials struct {
	Username string
	Password string
}

AuthCredentials is the output of AuthCredentialsFn

type AuthCredentialsContext

type AuthCredentialsContext struct {
	Address net.Addr
}

AuthCredentialsContext is the parameter container of AuthCredentialsFn

type Builder

type Builder = cmds.Builder

Builder represents a command builder. It should only be created from the client.B() method.

type CacheEntry

type CacheEntry interface {
	Wait(ctx context.Context) (ValkeyMessage, error)
}

CacheEntry should be used to wait for single-flight response when cache missed.

type CacheStore

type CacheStore interface {
	// Flight is called when DoCache and DoMultiCache, with the requested client side ttl and the current time.
	// It should look up the store in single-flight manner and return one of the following three combinations:
	// Case 1: (empty ValkeyMessage, nil CacheEntry)     <- when cache missed, and valkey will send the request to valkey.
	// Case 2: (empty ValkeyMessage, non-nil CacheEntry) <- when cache missed, and valkey will use CacheEntry.Wait to wait for response.
	// Case 3: (non-empty ValkeyMessage, nil CacheEntry) <- when cache hit
	Flight(key, cmd string, ttl time.Duration, now time.Time) (v ValkeyMessage, e CacheEntry)
	// Update is called when receiving the response of the request sent by the above Flight Case 1 from valkey.
	// It should not only update the store but also deliver the response to all CacheEntry.Wait and return a desired client side PXAT of the response.
	// Note that the server side expire time can be retrieved from ValkeyMessage.CachePXAT.
	Update(key, cmd string, val ValkeyMessage) (pxat int64)
	// Cancel is called when the request sent by the above Flight Case 1 failed.
	// It should not only deliver the error to all CacheEntry.Wait but also remove the CacheEntry from the store.
	Cancel(key, cmd string, err error)
	// Delete is called when receiving invalidation notifications from valkey.
	// If the keys is nil then it should delete all non-pending cached entries under all keys.
	// If the keys is not nil then it should delete all non-pending cached entries under those keys.
	Delete(keys []ValkeyMessage)
	// Close is called when connection between valkey is broken.
	// It should flush all cached entries and deliver the error to all pending CacheEntry.Wait.
	Close(err error)
}

CacheStore is the store interface for client-side caching. More detailed interface requirements can be found in cache_test.go.

func NewSimpleCacheAdapter

func NewSimpleCacheAdapter(store SimpleCache) CacheStore

NewSimpleCacheAdapter converts a SimpleCache into CacheStore

type CacheStoreOption

type CacheStoreOption struct {
	// CacheSizeEachConn is valkey client side cache size that bind to each TCP connection to a single valkey instance.
	// The default is DefaultCacheBytes.
	CacheSizeEachConn int
}

CacheStoreOption will be passed to NewCacheStoreFn

type Cacheable

type Cacheable = cmds.Cacheable

Cacheable represents a completed Valkey command which supports server-assisted client side caching, and it should be created by the Cache() of command builder.

type CacheableTTL

type CacheableTTL struct {
	Cmd Cacheable
	TTL time.Duration
}

CacheableTTL is parameter container of DoMultiCache

func CT

func CT(cmd Cacheable, ttl time.Duration) CacheableTTL

CT is a shorthand constructor for CacheableTTL

type Client

type Client interface {
	CoreClient

	// DoCache is similar to Do, but it uses opt-in client side caching and requires a client side TTL.
	// The explicit client side TTL specifies the maximum TTL on the client side.
	// If the key's TTL on the server is smaller than the client side TTL, the client side TTL will be capped.
	//  client.DoCache(ctx, client.B().Get().Key("k").Cache(), time.Minute).ToString()
	// The above example will send the following command to valkey if cache miss:
	//  CLIENT CACHING YES
	//  PTTL k
	//  GET k
	// The in-memory cache size is configured by ClientOption.CacheSizeEachConn.
	// The cmd parameter is recycled after passing into DoCache() and should not be reused.
	DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp ValkeyResult)

	// DoMultiCache is similar to DoCache, but works with multiple cacheable commands across different slots.
	// It will first group commands by slots and will send only cache missed commands to valkey.
	DoMultiCache(ctx context.Context, multi ...CacheableTTL) (resp []ValkeyResult)

	// DoStream send a command to valkey through a dedicated connection acquired from a connection pool.
	// It returns a ValkeyResultStream, but it does not read the command response until the ValkeyResultStream.WriteTo is called.
	// After the ValkeyResultStream.WriteTo is called, the underlying connection is then recycled.
	// DoStream should only be used when you want to stream valkey response directly to an io.Writer without additional allocation,
	// otherwise, the normal Do() should be used instead.
	// Also note that DoStream can only work with commands returning string, integer, or float response.
	DoStream(ctx context.Context, cmd Completed) ValkeyResultStream

	// DoMultiStream is similar to DoStream, but pipelines multiple commands to valkey.
	// It returns a MultiValkeyResultStream, and users should call MultiValkeyResultStream.WriteTo as many times as the number of commands sequentially
	// to read each command response from valkey. After all responses are read, the underlying connection is then recycled.
	// DoMultiStream should only be used when you want to stream valkey responses directly to an io.Writer without additional allocation,
	// otherwise, the normal DoMulti() should be used instead.
	// DoMultiStream does not support multiple key slots when connecting to a valkey cluster.
	DoMultiStream(ctx context.Context, multi ...Completed) MultiValkeyResultStream

	// Dedicated acquires a connection from the blocking connection pool; no one else can use the connection
	// during Dedicated. The main usage of Dedicated is the CAS operation, which is WATCH + MULTI + EXEC.
	// However, one should try to avoid CAS operations and use a Lua script instead, because occupying a connection
	// is not good for performance.
	Dedicated(fn func(DedicatedClient) error) (err error)

	// Dedicate does the same as Dedicated, but it exposes the DedicatedClient directly
	// and requires the user to invoke cancel() manually to put the connection back to the pool.
	Dedicate() (client DedicatedClient, cancel func())

	// Nodes returns each valkey node this client knows as a valkey.Client. This is useful if you want to
	// send commands to some specific valkey nodes in the cluster.
	Nodes() map[string]Client
}

Client is the valkey client interface for both a single valkey instance and a valkey cluster. It should be created by NewClient()
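The DoCache comment states that the client side TTL is capped by the key's TTL on the server. The following is a minimal stand-alone sketch of that capping rule, not the package's implementation: effectiveTTL is a hypothetical helper, and the handling of negative PTTL values (no expiry, missing key) is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// effectiveTTL is a hypothetical helper, not part of the package.
// serverPTTL follows the PTTL convention: remaining milliseconds,
// or a negative value when there is no expiry to report.
func effectiveTTL(clientTTL time.Duration, serverPTTL int64) time.Duration {
	if serverPTTL < 0 {
		// Assumption: nothing to cap against, keep the client side TTL.
		return clientTTL
	}
	if server := time.Duration(serverPTTL) * time.Millisecond; server < clientTTL {
		return server
	}
	return clientTTL
}

func main() {
	fmt.Println(effectiveTTL(time.Minute, 5000))   // key expires sooner on the server
	fmt.Println(effectiveTTL(time.Minute, 120000)) // client side TTL is the smaller one
	fmt.Println(effectiveTTL(time.Minute, -1))     // key has no expiry on the server
}
```

The smaller of the two durations wins, so a cached entry should never outlive the key it mirrors.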

Example (DedicateCAS)
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
	panic(err)
}
defer client.Close()

c, cancel := client.Dedicate()
defer cancel()

ctx := context.Background()

// watch keys first
if err := c.Do(ctx, c.B().Watch().Key("k1", "k2").Build()).Error(); err != nil {
	panic(err)
}
// perform read here
values, err := c.Do(ctx, c.B().Mget().Key("k1", "k2").Build()).ToArray()
if err != nil {
	panic(err)
}
v1, _ := values[0].ToString()
v2, _ := values[1].ToString()
// perform write with MULTI EXEC
for _, resp := range c.DoMulti(
	ctx,
	c.B().Multi().Build(),
	c.B().Set().Key("k1").Value(v1+"1").Build(),
	c.B().Set().Key("k2").Value(v2+"2").Build(),
	c.B().Exec().Build(),
) {
	if err := resp.Error(); err != nil {
		panic(err)
	}
}
Output:

Example (DedicatedCAS)
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
	panic(err)
}
defer client.Close()

ctx := context.Background()

client.Dedicated(func(client DedicatedClient) error {
	// watch keys first
	if err := client.Do(ctx, client.B().Watch().Key("k1", "k2").Build()).Error(); err != nil {
		return err
	}
	// perform read here
	values, err := client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray()
	if err != nil {
		return err
	}
	v1, _ := values[0].ToString()
	v2, _ := values[1].ToString()
	// perform write with MULTI EXEC
	for _, resp := range client.DoMulti(
		ctx,
		client.B().Multi().Build(),
		client.B().Set().Key("k1").Value(v1+"1").Build(),
		client.B().Set().Key("k2").Value(v2+"2").Build(),
		client.B().Exec().Build(),
	) {
		if err := resp.Error(); err != nil {
			return err
		}
	}
	return nil
})
Output:

Example (Do)
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
	panic(err)
}
defer client.Close()

ctx := context.Background()

client.Do(ctx, client.B().Set().Key("k").Value("1").Build()).Error()

client.Do(ctx, client.B().Get().Key("k").Build()).ToString()

client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()

client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray()

client.Do(ctx, client.B().Scard().Key("s").Build()).ToInt64()

client.Do(ctx, client.B().Smembers().Key("s").Build()).AsStrSlice()
Output:

Example (DoCache)
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
	panic(err)
}
defer client.Close()

ctx := context.Background()

client.DoCache(ctx, client.B().Get().Key("k").Cache(), time.Minute).ToString()

client.DoCache(ctx, client.B().Get().Key("k").Cache(), time.Minute).AsInt64()

client.DoCache(ctx, client.B().Hmget().Key("h").Field("a", "b").Cache(), time.Minute).ToArray()

client.DoCache(ctx, client.B().Scard().Key("s").Cache(), time.Minute).ToInt64()

client.DoCache(ctx, client.B().Smembers().Key("s").Cache(), time.Minute).AsStrSlice()
Output:

Example (Scan)
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
	panic(err)
}
defer client.Close()

for _, c := range client.Nodes() { // loop over all your valkey nodes
	var scan ScanEntry
	for more := true; more; more = scan.Cursor != 0 {
		if scan, err = c.Do(context.Background(), c.B().Scan().Cursor(scan.Cursor).Build()).AsScanEntry(); err != nil {
			panic(err)
		}
		fmt.Println(scan.Elements)
	}
}
Output:

func NewClient

func NewClient(option ClientOption) (client Client, err error)

NewClient uses ClientOption to initialize the Client, either as a cluster client or as a single client. It will first try to connect as a cluster client. If len(ClientOption.InitAddress) == 1 and the address does not have cluster mode enabled, NewClient() will use a single client instead.
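The selection rules described for NewClient and for the InitAddress, Sentinel.MasterSet and ForceSingleClient options can be summarized as a small decision function. This is only a sketch: the options type, pickMode, and the precedence between the checks are assumptions made for illustration, not the package's code.

```go
package main

import "fmt"

// options mirrors only the ClientOption fields that influence the choice.
type options struct {
	InitAddress       []string
	SentinelMasterSet string // stands in for ClientOption.Sentinel.MasterSet
	ForceSingleClient bool
}

// pickMode is a hypothetical helper. clusterEnabled reports whether the
// probed address answered as a cluster node.
func pickMode(o options, clusterEnabled bool) string {
	switch {
	case o.SentinelMasterSet != "":
		return "sentinel" // InitAddress is used to reach the sentinels
	case o.ForceSingleClient:
		return "single" // skip cluster detection entirely
	case clusterEnabled:
		return "cluster"
	case len(o.InitAddress) == 1:
		return "single" // fallback for a lone non-cluster address
	default:
		return "error"
	}
}

func main() {
	one := []string{"127.0.0.1:6379"}
	fmt.Println(pickMode(options{InitAddress: one}, false))
	fmt.Println(pickMode(options{InitAddress: one}, true))
	fmt.Println(pickMode(options{InitAddress: one, SentinelMasterSet: "my_master"}, false))
}
```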

Example (Cluster)
client, _ := NewClient(ClientOption{
	InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
	ShuffleInit: true,
})
defer client.Close()
Output:

Example (Sentinel)
client, _ := NewClient(ClientOption{
	InitAddress: []string{"127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"},
	Sentinel: SentinelOption{
		MasterSet: "my_master",
	},
})
defer client.Close()
Output:

Example (Single)
client, _ := NewClient(ClientOption{
	InitAddress: []string{"127.0.0.1:6379"},
})
defer client.Close()
Output:

type ClientOption

type ClientOption struct {
	// TCP & TLS
	// Dialer can be used to customize how valkey connects to a valkey instance via TCP, including:
	// - Timeout, the default is DefaultDialTimeout
	// - KeepAlive, the default is DefaultTCPKeepAlive
	// The Dialer.KeepAlive interval is used to detect an unresponsive idle tcp connection.
	// OS takes at least (tcp_keepalive_probes+1)*Dialer.KeepAlive time to conclude an idle connection to be unresponsive.
	// For example: DefaultTCPKeepAlive = 1s and the default of tcp_keepalive_probes on Linux is 9.
	// Therefore, it takes at least 10s to kill an idle and unresponsive tcp connection on Linux by default.
	Dialer    net.Dialer
	TLSConfig *tls.Config

	// DialFn allows for a custom function to be used to create net.Conn connections
	DialFn func(string, *net.Dialer, *tls.Config) (conn net.Conn, err error)

	// NewCacheStoreFn allows a custom client side caching store for each connection
	NewCacheStoreFn NewCacheStoreFn

	// OnInvalidations is a callback function invoked when a client-side caching invalidation is received.
	// Note that this function must be fast, otherwise other valkey messages will be blocked.
	OnInvalidations func([]ValkeyMessage)

	// SendToReplicas is a function that returns true if the command should be sent to replicas.
	// currently only used for cluster client.
	// NOTE: This function can't be used with ReplicaOnly option.
	SendToReplicas func(cmd Completed) bool

	// Sentinel options, including MasterSet and Auth options
	Sentinel SentinelOption

	// Valkey AUTH parameters
	Username   string
	Password   string
	ClientName string

	// AuthCredentialsFn allows for setting the AUTH username and password dynamically on each connection attempt to
	// support rotating credentials
	AuthCredentialsFn func(AuthCredentialsContext) (AuthCredentials, error)

	// ClientSetInfo will assign various info attributes to the current connection.
	// Note that ClientSetInfo should have exactly 2 values, the lib name and the lib version respectively.
	ClientSetInfo []string

	// InitAddress points to valkey nodes.
	// Valkey will connect to them one by one and issue the CLUSTER SLOTS command to initialize the cluster client until success.
	// If len(InitAddress) == 1 and the address is not running in cluster mode, valkey will fall back to the single client mode.
	// If ClientOption.Sentinel.MasterSet is set, then InitAddress will be used to connect to sentinels.
	// You can bypass this behaviour by using ClientOption.ForceSingleClient.
	InitAddress []string

	// ClientTrackingOptions will be appended to CLIENT TRACKING ON command when the connection is established.
	// The default is []string{"OPTIN"}
	ClientTrackingOptions []string

	SelectDB int

	// CacheSizeEachConn is the size of the valkey client side cache bound to each TCP connection to a single valkey instance.
	// The default is DefaultCacheBytes.
	CacheSizeEachConn int

	// RingScaleEachConn sets the size of the ring buffer in each connection to (2 ^ RingScaleEachConn).
	// The default is DefaultRingScale, which results in a ring of size 2^10 for each connection.
	// Reducing this value can reduce the memory consumption of each connection at the cost of potential throughput degradation.
	// Values smaller than 8 are typically not recommended.
	RingScaleEachConn int

	// ReadBufferEachConn is the size of the bufio.NewReaderSize for each connection, default to DefaultReadBuffer (0.5 MiB).
	ReadBufferEachConn int
	// WriteBufferEachConn is the size of the bufio.NewWriterSize for each connection, default to DefaultWriteBuffer (0.5 MiB).
	WriteBufferEachConn int

	// BlockingPoolSize is the size of the connection pool shared by blocking commands (e.g. BLPOP, XREAD with BLOCK).
	// The default is DefaultPoolSize.
	BlockingPoolSize int

	// PipelineMultiplex determines how many tcp connections are used to pipeline commands to one valkey instance.
	// The default for single and sentinel clients is 2, which means 4 connections (2^2).
	// The default for cluster clients is 0, which means 1 connection (2^0).
	PipelineMultiplex int

	// ConnWriteTimeout is the read/write timeout for each connection. If specified,
	// it is used to control the maximum duration to wait for responses to pipelined commands.
	// Also, ConnWriteTimeout is applied to net.Conn.SetDeadline and the periodic PING to valkey.
	// Since the Dialer.KeepAlive will not be triggered if there is data in the outgoing buffer,
	// ConnWriteTimeout should be set in order to detect local congestion or an unresponsive valkey server.
	// The default is ClientOption.Dialer.KeepAlive * (9+1), where 9 is the default of tcp_keepalive_probes on Linux.
	ConnWriteTimeout time.Duration

	// MaxFlushDelay, when greater than zero, pauses the pipeline write loop for some time (not larger than MaxFlushDelay)
	// after each flushing of data to the connection. This gives the pipeline a chance to collect more commands to send
	// to Valkey. Adding this delay increases latency and reduces throughput, but in most cases may significantly reduce
	// application and Valkey CPU utilization due to fewer executed system calls. By default, Valkey flushes data to the
	// connection without extra delays. Depending on network latency and application-specific conditions the value
	// of MaxFlushDelay may vary; something like 20 microseconds should not affect latency/throughput a lot but still
	// produce a notable CPU usage reduction under load. Ref: https://github.com/redis/rueidis/issues/156
	MaxFlushDelay time.Duration

	// ShuffleInit is a handy flag that shuffles the InitAddress after passing to the NewClient() if it is true
	ShuffleInit bool
	// ClientNoTouch controls whether commands alter LRU/LFU stats
	ClientNoTouch bool
	// DisableRetry disables retrying read-only commands under network errors
	DisableRetry bool
	// DisableCache falls back Client.DoCache/Client.DoMultiCache to Client.Do/Client.DoMulti
	DisableCache bool
	// AlwaysPipelining makes valkey.Client always pipeline valkey commands even if they are not issued concurrently.
	AlwaysPipelining bool
	// AlwaysRESP2 makes valkey.Client always use RESP2, otherwise it will try using RESP3 first.
	AlwaysRESP2 bool
	// ForceSingleClient forces the use of a single client connection, without letting the library guess
	// whether the valkey instance is a cluster or a single valkey instance.
	ForceSingleClient bool

	// ReplicaOnly indicates that this client will only try to connect to the read-only replicas of the valkey setup.
	ReplicaOnly bool

	// ClientNoEvict sets the client eviction mode for the current connection.
	// When turned on and client eviction is configured,
	// the current connection will be excluded from the client eviction process
	// even if we're above the configured client eviction threshold.
	ClientNoEvict bool
}

ClientOption should be passed to NewClient to construct a Client
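Several ClientOption fields are exponents or multipliers rather than direct sizes. The sketch below works through the arithmetic stated in the field comments (ring size 2^RingScaleEachConn, 2^PipelineMultiplex connections, and the (tcp_keepalive_probes+1)*KeepAlive detection time). The helper names are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// ringSize returns the number of ring slots per connection for a RingScaleEachConn value.
func ringSize(scale int) int { return 1 << scale }

// pipelineConns returns the number of TCP connections for a PipelineMultiplex value.
func pipelineConns(multiplex int) int { return 1 << multiplex }

// idleDetection returns the minimum time the OS needs to declare an idle
// connection unresponsive, given the keepalive interval and probe count.
func idleDetection(keepAlive time.Duration, probes int) time.Duration {
	return time.Duration(probes+1) * keepAlive
}

func main() {
	fmt.Println(ringSize(10))                  // 1024 slots
	fmt.Println(pipelineConns(2))              // 4 connections (single and sentinel default)
	fmt.Println(pipelineConns(0))              // 1 connection (cluster default)
	fmt.Println(idleDetection(time.Second, 9)) // 10s with KeepAlive = 1s on Linux
}
```

The last value also matches the documented default of ConnWriteTimeout, which is Dialer.KeepAlive * (9+1).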

func MustParseURL

func MustParseURL(str string) ClientOption

func ParseURL

func ParseURL(str string) (opt ClientOption, err error)

ParseURL parses a valkey URL into ClientOption. See https://github.com/redis/redis-specifications/blob/master/uri/redis.txt for the URI scheme. Examples:

redis://<user>:<password>@<host>:<port>/<db_number>
redis://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
unix://<user>:<password>@</path/to/redis.sock>?db=<db_number>
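To make the URL layout concrete, here is a stand-alone sketch that splits a redis:// URL into its parts with the standard net/url package. It is not ParseURL itself: the parsed type and splitURL are hypothetical, only the redis:// form is handled, and how the real function maps these parts onto ClientOption is not shown here.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parsed holds the pieces of a redis:// URL (illustrative type only).
type parsed struct {
	User, Pass string
	Addrs      []string // host:port from the authority plus every addr= parameter
	DB         string   // database number as written in the path, if any
}

// splitURL is a hypothetical helper that breaks the URL apart.
func splitURL(raw string) (parsed, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return parsed{}, err
	}
	var p parsed
	if u.User != nil {
		p.User = u.User.Username()
		p.Pass, _ = u.User.Password()
	}
	p.Addrs = append([]string{u.Host}, u.Query()["addr"]...)
	p.DB = strings.TrimPrefix(u.Path, "/")
	return p, nil
}

func main() {
	p, err := splitURL("redis://alice:secret@10.0.0.1:6379/2?addr=10.0.0.2:6379")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```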

type Commands

type Commands []Completed

Commands is an exported alias to []Completed. This allows users to store commands for later usage, for example:

c, release := client.Dedicate()
defer release()

cmds := make(valkey.Commands, 0, 10)
for i := 0; i < 10; i++ {
    cmds = append(cmds, c.B().Set().Key(strconv.Itoa(i)).Value(strconv.Itoa(i)).Build())
}
for _, resp := range c.DoMulti(ctx, cmds...) {
    if err := resp.Error(); err != nil {
        panic(err)
    }
}

However, note that once commands are processed by Do() or DoMulti(), they are recycled and should not be reused.

type Completed

type Completed = cmds.Completed

Completed represents a completed Valkey command. It should only be created from the Build() of a command builder.

type CoreClient

type CoreClient interface {
	// B is the getter function to the command builder for the client
	// If the client is a cluster client, the command builder also prohibits cross key slots in one command.
	B() Builder
	// Do sends a valkey command built from B() to a valkey node.
	//  client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
	// All concurrent non-blocking commands will be pipelined automatically and have better throughput.
	// Blocking commands will use a separate connection pool.
	// The cmd parameter is recycled after passing into Do() and should not be reused.
	Do(ctx context.Context, cmd Completed) (resp ValkeyResult)
	// DoMulti takes multiple valkey commands and sends them together, reducing RTT from the user code.
	// The multi parameters are recycled after passing into DoMulti() and should not be reused.
	DoMulti(ctx context.Context, multi ...Completed) (resp []ValkeyResult)
	// Receive accepts a SUBSCRIBE, SSUBSCRIBE, or PSUBSCRIBE command and a message handler.
	// Receive will block and return only in the following cases:
	//   1. return nil when it receives any unsubscribe/punsubscribe message related to the provided `subscribe` command.
	//   2. return ErrClosing when the client is closed manually.
	//   3. return ctx.Err() when the `ctx` is done.
	//   4. return non-nil err when the provided `subscribe` command failed.
	Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) error
	// Close makes further calls to the client fail with ErrClosing,
	// and waits until all pending calls have finished.
	Close()
}

CoreClient is the minimum interface shared by the Client and the DedicatedClient.
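The B() comment notes that a cluster client prohibits keys from different slots in one command. As background, the sketch below computes a key's hash slot the way the cluster specification describes it: CRC16 (XMODEM variant) of the key modulo 16384, hashing only the hash tag between the first "{" and the following "}" when that tag is non-empty. crc16 and hashSlot are written for this illustration and are not the package's functions.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 implements CRC-16/XMODEM (polynomial 0x1021, initial value 0) bit by bit.
func crc16(s string) uint16 {
	var crc uint16
	for i := 0; i < len(s); i++ {
		crc ^= uint16(s[i]) << 8
		for b := 0; b < 8; b++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot maps a key to one of the 16384 cluster slots, honoring hash tags.
func hashSlot(key string) uint16 {
	if s := strings.IndexByte(key, '{'); s >= 0 {
		// Only a non-empty tag replaces the key as the hashed content.
		if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
			key = key[s+1 : s+1+e]
		}
	}
	return crc16(key) % 16384
}

func main() {
	fmt.Printf("%#x\n", crc16("123456789")) // 0x31c3, the standard check value
	// Keys sharing a hash tag land in the same slot and can be used together.
	fmt.Println(hashSlot("{user1000}.following") == hashSlot("{user1000}.followers")) // true
}
```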

type DedicatedClient

type DedicatedClient interface {
	CoreClient

	// SetPubSubHooks is an alternative way to process Pub/Sub messages instead of using Receive.
	// SetPubSubHooks is non-blocking and allows users to subscribe/unsubscribe channels later.
	// Note that the hooks will be called sequentially but in another goroutine.
	// The return value will be either:
	//   1. an error channel, if the hooks passed in are not zero, or
	//   2. nil, if the hooks passed in are zero. (used to reset hooks)
	// In the former case, the error channel is guaranteed to be closed when the hooks will not be called anymore,
	// and has at most one error describing the reason why the hooks will not be called anymore.
	// Users can use the error channel to detect disconnection.
	SetPubSubHooks(hooks PubSubHooks) <-chan error
}

DedicatedClient is obtained from Client.Dedicated() and is bound to a single valkey connection; no other commands can be pipelined into this connection during Client.Dedicated(). If the DedicatedClient is obtained from a cluster client, the first command sent to it must have a Key() to identify the valkey node.

type FtSearchDoc

type FtSearchDoc struct {
	Doc   map[string]string
	Key   string
	Score float64
}

type GeoLocation

type GeoLocation struct {
	Name                      string
	Longitude, Latitude, Dist float64
	GeoHash                   int64
}

type Incomplete

type Incomplete = cmds.Incomplete

Incomplete represents an incomplete Valkey command. It should then be completed by calling the Build().

type KeyValues

type KeyValues struct {
	Key    string
	Values []string
}

type KeyZScores

type KeyZScores struct {
	Key    string
	Values []ZScore
}

type Lua

type Lua struct {
	// contains filtered or unexported fields
}

Lua represents a valkey Lua script. It should be created by NewLuaScript() or NewLuaScriptReadOnly()

Example (Exec)
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
	panic(err)
}
defer client.Close()

ctx := context.Background()

script := NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")

script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()
Output:

func NewLuaScript

func NewLuaScript(script string) *Lua

NewLuaScript creates a Lua instance whose Lua.Exec uses EVALSHA and EVAL.

func NewLuaScriptReadOnly

func NewLuaScriptReadOnly(script string) *Lua

NewLuaScriptReadOnly creates a Lua instance whose Lua.Exec uses EVALSHA_RO and EVAL_RO.

func (*Lua) Exec

func (s *Lua) Exec(ctx context.Context, c Client, keys, args []string) (resp ValkeyResult)

Exec runs the script on the given Client. It first tries EVALSHA/EVALSHA_RO and then falls back to EVAL/EVAL_RO if the first attempt fails. Cross slot keys are prohibited if the Client is a cluster client.
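The EVALSHA-then-EVAL fallback can be illustrated without a server. In the sketch below, fakeServer is a stand-in for a node's script cache and execScript is a hypothetical helper; it assumes the fallback is triggered by a NOSCRIPT error and relies on the fact that EVAL also caches the script body on the server.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"errors"
	"fmt"
)

var errNoScript = errors.New("NOSCRIPT No matching script")

// fakeServer is a stand-in for a valkey node's script cache.
type fakeServer struct {
	scripts map[string]string // sha1 hex digest -> script body
	calls   []string          // commands received, in order
}

func (s *fakeServer) evalsha(sha string) (string, error) {
	s.calls = append(s.calls, "EVALSHA")
	body, ok := s.scripts[sha]
	if !ok {
		return "", errNoScript
	}
	return "ran:" + body, nil
}

func (s *fakeServer) eval(body string) (string, error) {
	s.calls = append(s.calls, "EVAL")
	sum := sha1.Sum([]byte(body))
	s.scripts[hex.EncodeToString(sum[:])] = body // EVAL caches the script as a side effect
	return "ran:" + body, nil
}

// execScript tries the cheap EVALSHA first and sends the full body only on NOSCRIPT.
func execScript(s *fakeServer, body string) (string, error) {
	sum := sha1.Sum([]byte(body))
	out, err := s.evalsha(hex.EncodeToString(sum[:]))
	if errors.Is(err, errNoScript) {
		return s.eval(body)
	}
	return out, err
}

func main() {
	s := &fakeServer{scripts: map[string]string{}}
	execScript(s, "return 1") // cache miss: EVALSHA, then EVAL
	execScript(s, "return 1") // cache hit: EVALSHA only
	fmt.Println(s.calls)
}
```

After the first call the script body no longer needs to travel over the wire, which is the point of trying EVALSHA first.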

func (*Lua) ExecMulti

func (s *Lua) ExecMulti(ctx context.Context, c Client, multi ...LuaExec) (resp []ValkeyResult)

ExecMulti runs the script multiple times, once per provided LuaExec, on the given Client. It first tries to SCRIPT LOAD the script to all valkey nodes and then runs it with EVALSHA/EVALSHA_RO. Cross slot keys within a single LuaExec are prohibited if the Client is a cluster client.

type LuaExec

type LuaExec struct {
	Keys []string
	Args []string
}

LuaExec is a single execution unit of Lua.ExecMulti

type MultiValkeyResultStream

type MultiValkeyResultStream = ValkeyResultStream

type NewCacheStoreFn

type NewCacheStoreFn func(CacheStoreOption) CacheStore

NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation

type PubSubHooks

type PubSubHooks struct {
	// OnMessage will be called when receiving "message" and "pmessage" events.
	OnMessage func(m PubSubMessage)
	// OnSubscription will be called when receiving "subscribe", "unsubscribe", "psubscribe" and "punsubscribe" events.
	OnSubscription func(s PubSubSubscription)
}

PubSubHooks can be registered into DedicatedClient to process pubsub messages without using Client.Receive

type PubSubMessage

type PubSubMessage struct {
	// Pattern is only available with pmessage.
	Pattern string
	// Channel is the channel the message belongs to
	Channel string
	// Message is the message content
	Message string
}

PubSubMessage represents a pubsub message from valkey

type PubSubSubscription

type PubSubSubscription struct {
	// Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
	Kind string
	// Channel is the event subject.
	Channel string
	// Count is the current number of subscriptions for the connection.
	Count int64
}

PubSubSubscription represents a pubsub "subscribe", "unsubscribe", "psubscribe" or "punsubscribe" event.

type RedirectMode

type RedirectMode int
const (
	RedirectNone RedirectMode = iota
	RedirectMove
	RedirectAsk
	RedirectRetry
)

type ScanEntry

type ScanEntry struct {
	Elements []string
	Cursor   uint64
}

ScanEntry is the element type of the SCAN, SSCAN, HSCAN and ZSCAN command responses.
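A scan is complete when the returned Cursor is 0, which is why the loop in the Scan example runs at least once and then continues while the cursor is non-zero. The stand-alone sketch below shows the same loop shape against an in-memory fake. scanEntry mirrors ScanEntry, and fakeScan is hypothetical: it uses slice offsets as cursors, whereas real cursors are opaque values.

```go
package main

import "fmt"

// scanEntry mirrors the shape of ScanEntry for this illustration.
type scanEntry struct {
	Elements []string
	Cursor   uint64
}

// fakeScan returns up to two keys per call and a cursor for the next call.
// A returned cursor of 0 signals the end of the iteration.
func fakeScan(keys []string, cursor uint64) scanEntry {
	end := cursor + 2
	if end >= uint64(len(keys)) {
		return scanEntry{Elements: keys[cursor:], Cursor: 0}
	}
	return scanEntry{Elements: keys[cursor:end], Cursor: end}
}

// scanAll drains the iteration: start with cursor 0, stop when 0 comes back.
func scanAll(keys []string) []string {
	var all []string
	var e scanEntry
	for more := true; more; more = e.Cursor != 0 {
		e = fakeScan(keys, e.Cursor)
		all = append(all, e.Elements...)
	}
	return all
}

func main() {
	fmt.Println(scanAll([]string{"a", "b", "c", "d", "e"}))
}
```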

type SentinelOption

type SentinelOption struct {
	// TCP & TLS, same as ClientOption but for connecting sentinel
	Dialer    net.Dialer
	TLSConfig *tls.Config

	// MasterSet is the valkey master set name monitored by the sentinel cluster.
	// If this field is set, then ClientOption.InitAddress will be used to connect to the sentinel cluster.
	MasterSet string

	// Valkey AUTH parameters for sentinel
	Username   string
	Password   string
	ClientName string
}

SentinelOption contains MasterSet, plus the dialer, TLS and AUTH settings used when connecting to sentinels.

type SimpleCache

type SimpleCache interface {
	Get(key string) ValkeyMessage
	Set(key string, val ValkeyMessage)
	Del(key string)
	Flush()
}

SimpleCache is an alternative interface that should be paired with NewSimpleCacheAdapter to construct a CacheStore
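A minimal in-memory store with the same four-method shape as SimpleCache might look like the sketch below. To stay self-contained it uses a local message type in place of ValkeyMessage, so it does not satisfy the real interface as written. The mutex and the choice to return a zero value for missing keys are assumptions, not documented requirements.

```go
package main

import (
	"fmt"
	"sync"
)

// message stands in for ValkeyMessage so the sketch compiles on its own.
type message struct{ value string }

// mapCache is a hypothetical map-backed cache with Get/Set/Del/Flush.
type mapCache struct {
	mu sync.Mutex
	m  map[string]message
}

func newMapCache() *mapCache { return &mapCache{m: make(map[string]message)} }

// Get returns the stored message, or the zero message when the key is absent.
func (c *mapCache) Get(key string) message {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.m[key]
}

func (c *mapCache) Set(key string, val message) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[key] = val
}

func (c *mapCache) Del(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.m, key)
}

// Flush drops every entry, for example after a connection is lost.
func (c *mapCache) Flush() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m = make(map[string]message)
}

func main() {
	c := newMapCache()
	c.Set("k", message{value: "v"})
	fmt.Println(c.Get("k").value)
	c.Del("k")
	fmt.Println(c.Get("k").value == "")
}
```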

type ValkeyError

type ValkeyError ValkeyMessage

ValkeyError is an error response or a nil message from a valkey instance

func IsValkeyErr

func IsValkeyErr(err error) (ret *ValkeyError, ok bool)

IsValkeyErr is a handy method to check if error is a valkey ERR response.

func (*ValkeyError) Error

func (r *ValkeyError) Error() string

func (*ValkeyError) IsAsk

func (r *ValkeyError) IsAsk() (addr string, ok bool)

IsAsk checks if it is a valkey ASK message and returns ask address.

func (*ValkeyError) IsBusyGroup

func (r *ValkeyError) IsBusyGroup() bool

IsBusyGroup checks if it is a valkey BUSYGROUP message.

func (*ValkeyError) IsClusterDown

func (r *ValkeyError) IsClusterDown() bool

IsClusterDown checks if it is a valkey CLUSTERDOWN message.

func (*ValkeyError) IsMoved

func (r *ValkeyError) IsMoved() (addr string, ok bool)

IsMoved checks if it is a valkey MOVED message and returns moved address.
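Cluster redirection errors carry the target node in the error text, in the form `MOVED <slot> <host:port>` or `ASK <slot> <host:port>`. The sketch below extracts the address from such a message. redirectAddr is a hypothetical helper for illustration and does not reflect how IsMoved and IsAsk are implemented.

```go
package main

import (
	"fmt"
	"strings"
)

// redirectAddr returns the address from a "<kind> <slot> <addr>" error
// message, where kind is "MOVED" or "ASK".
func redirectAddr(msg, kind string) (addr string, ok bool) {
	fields := strings.Fields(msg)
	if len(fields) == 3 && fields[0] == kind {
		return fields[2], true
	}
	return "", false
}

func main() {
	fmt.Println(redirectAddr("MOVED 3999 127.0.0.1:6381", "MOVED"))
	fmt.Println(redirectAddr("ASK 3999 127.0.0.1:6381", "MOVED"))
}
```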

func (*ValkeyError) IsNil

func (r *ValkeyError) IsNil() bool

IsNil checks if it is a valkey nil message.

func (*ValkeyError) IsNoScript

func (r *ValkeyError) IsNoScript() bool

IsNoScript checks if it is a valkey NOSCRIPT message.

func (*ValkeyError) IsTryAgain

func (r *ValkeyError) IsTryAgain() bool

IsTryAgain checks if it is a valkey TRYAGAIN message.

type ValkeyMessage

type ValkeyMessage struct {
	// contains filtered or unexported fields
}

ValkeyMessage is a valkey response message, it may be a nil response

func (*ValkeyMessage) AsBool

func (m *ValkeyMessage) AsBool() (val bool, err error)

AsBool checks if message is a non-nil valkey response, and parses it as bool

func (*ValkeyMessage) AsBoolSlice

func (m *ValkeyMessage) AsBoolSlice() ([]bool, error)

AsBoolSlice checks if message is a valkey array/set response, and converts it to []bool. Valkey nil elements and other non-boolean elements will be represented as false.

func (*ValkeyMessage) AsBytes

func (m *ValkeyMessage) AsBytes() (bs []byte, err error)

AsBytes checks if message is a valkey string response and returns it as an immutable []byte

func (*ValkeyMessage) AsFloat64

func (m *ValkeyMessage) AsFloat64() (val float64, err error)

AsFloat64 checks if message is a valkey string response, and parses it as float64

func (*ValkeyMessage) AsFloatSlice

func (m *ValkeyMessage) AsFloatSlice() ([]float64, error)

AsFloatSlice checks if message is a valkey array/set response, and converts it to []float64. Valkey nil elements and other non-float elements will be represented as zero.

func (*ValkeyMessage) AsFtAggregate

func (m *ValkeyMessage) AsFtAggregate() (total int64, docs []map[string]string, err error)

func (*ValkeyMessage) AsFtAggregateCursor

func (m *ValkeyMessage) AsFtAggregateCursor() (cursor, total int64, docs []map[string]string, err error)

func (*ValkeyMessage) AsFtSearch

func (m *ValkeyMessage) AsFtSearch() (total int64, docs []FtSearchDoc, err error)

func (*ValkeyMessage) AsGeosearch

func (m *ValkeyMessage) AsGeosearch() ([]GeoLocation, error)

func (*ValkeyMessage) AsInt64

func (m *ValkeyMessage) AsInt64() (val int64, err error)

AsInt64 checks if message is a valkey string response, and parses it as int64

func (*ValkeyMessage) AsIntMap

func (m *ValkeyMessage) AsIntMap() (map[string]int64, error)

AsIntMap checks if message is a valkey map/array/set response, and converts it to map[string]int64. Valkey nil elements and other non-integer elements will be represented as zero.

func (*ValkeyMessage) AsIntSlice

func (m *ValkeyMessage) AsIntSlice() ([]int64, error)

AsIntSlice checks if message is a valkey array/set response, and converts it to []int64. Valkey nil elements and other non-integer elements will be represented as zero.

func (*ValkeyMessage) AsLMPop

func (m *ValkeyMessage) AsLMPop() (kvs KeyValues, err error)

func (*ValkeyMessage) AsMap

func (m *ValkeyMessage) AsMap() (map[string]ValkeyMessage, error)

AsMap checks if message is a valkey array/set response, and converts it to map[string]ValkeyMessage

func (*ValkeyMessage) AsReader

func (m *ValkeyMessage) AsReader() (reader io.Reader, err error)

AsReader checks if message is a valkey string response and wraps it with strings.NewReader

func (*ValkeyMessage) AsScanEntry

func (m *ValkeyMessage) AsScanEntry() (e ScanEntry, err error)

AsScanEntry checks if message is a valkey array/set response of length 2, and converts it to ScanEntry.

func (*ValkeyMessage) AsStrMap

func (m *ValkeyMessage) AsStrMap() (map[string]string, error)

AsStrMap checks if message is a valkey map/array/set response, and converts it to map[string]string. Valkey nil elements and other non-string elements will be represented as empty strings.
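Under RESP2, commands such as HGETALL reply with a flat array of alternating fields and values rather than a map, which is why the As*Map helpers accept array responses too. The sketch below shows the pairing step on plain strings. pairsToMap is hypothetical, and rejecting odd-length input is an assumption of this example rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// pairsToMap folds a flat [k1, v1, k2, v2, ...] reply into a map.
func pairsToMap(flat []string) (map[string]string, error) {
	if len(flat)%2 != 0 {
		// Assumption: an odd number of elements is treated as malformed.
		return nil, errors.New("flat reply has an odd number of elements")
	}
	m := make(map[string]string, len(flat)/2)
	for i := 0; i < len(flat); i += 2 {
		m[flat[i]] = flat[i+1]
	}
	return m, nil
}

func main() {
	m, err := pairsToMap([]string{"name", "valkey", "lang", "go"})
	if err != nil {
		panic(err)
	}
	fmt.Println(m["name"], m["lang"])
}
```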

func (*ValkeyMessage) AsStrSlice

func (m *ValkeyMessage) AsStrSlice() ([]string, error)

AsStrSlice checks if message is a valkey array/set response, and converts it to []string. Valkey nil elements and other non-string elements will be represented as empty strings.

func (*ValkeyMessage) AsUint64

func (m *ValkeyMessage) AsUint64() (val uint64, err error)

AsUint64 checks if message is a valkey string response, and parses it as uint64

func (*ValkeyMessage) AsXRange

func (m *ValkeyMessage) AsXRange() ([]XRangeEntry, error)

AsXRange checks if message is a valkey array/set response, and converts it to []XRangeEntry

func (*ValkeyMessage) AsXRangeEntry

func (m *ValkeyMessage) AsXRangeEntry() (XRangeEntry, error)

AsXRangeEntry checks if message is a valkey array/set response of length 2, and converts it to XRangeEntry

func (*ValkeyMessage) AsXRead

func (m *ValkeyMessage) AsXRead() (ret map[string][]XRangeEntry, err error)

AsXRead converts an XREAD/XREADGROUP response to map[string][]XRangeEntry

func (*ValkeyMessage) AsZMPop

func (m *ValkeyMessage) AsZMPop() (kvs KeyZScores, err error)

func (*ValkeyMessage) AsZScore

func (m *ValkeyMessage) AsZScore() (s ZScore, err error)

AsZScore converts the response of a ZPOPMAX or ZPOPMIN command with count 1 to a single ZScore

func (*ValkeyMessage) AsZScores

func (m *ValkeyMessage) AsZScores() ([]ZScore, error)

AsZScores converts the responses of ZRANGE WITHSCORES, ZDIFF WITHSCORES and ZPOPMAX/ZPOPMIN commands with count > 1 to []ZScore

func (*ValkeyMessage) CachePTTL

func (m *ValkeyMessage) CachePTTL() int64

CachePTTL returns the remaining PTTL in milliseconds of client side cache

func (*ValkeyMessage) CachePXAT

func (m *ValkeyMessage) CachePXAT() int64

CachePXAT returns the expiration time of client side cache as a Unix timestamp in milliseconds (PXAT)

func (*ValkeyMessage) CacheTTL

func (m *ValkeyMessage) CacheTTL() (ttl int64)

CacheTTL returns the remaining TTL in seconds of client side cache

func (*ValkeyMessage) DecodeJSON

func (m *ValkeyMessage) DecodeJSON(v any) (err error)

DecodeJSON checks if message is a valkey string response and treats it as JSON, then unmarshals it into the provided value

func (*ValkeyMessage) Error

func (m *ValkeyMessage) Error() error

Error checks if message is a valkey error response, including nil response

func (*ValkeyMessage) IsArray

func (m *ValkeyMessage) IsArray() bool

IsArray checks if message is a valkey array response

func (*ValkeyMessage) IsBool

func (m *ValkeyMessage) IsBool() bool

IsBool checks if message is a valkey RESP3 bool response

func (*ValkeyMessage) IsCacheHit

func (m *ValkeyMessage) IsCacheHit() bool

IsCacheHit checks if message is from client side cache

func (*ValkeyMessage) IsFloat64

func (m *ValkeyMessage) IsFloat64() bool

IsFloat64 checks if message is a valkey RESP3 double response

func (*ValkeyMessage) IsInt64

func (m *ValkeyMessage) IsInt64() bool

IsInt64 checks if message is a valkey RESP3 int response

func (*ValkeyMessage) IsMap

func (m *ValkeyMessage) IsMap() bool

IsMap checks if message is a valkey RESP3 map response

func (*ValkeyMessage) IsNil

func (m *ValkeyMessage) IsNil() bool

IsNil checks if message is a valkey nil response

func (*ValkeyMessage) IsString

func (m *ValkeyMessage) IsString() bool

IsString checks if message is a valkey string response

func (*ValkeyMessage) String

func (m *ValkeyMessage) String() string

String returns human-readable representation of ValkeyMessage

func (*ValkeyMessage) ToAny

func (m *ValkeyMessage) ToAny() (any, error)

ToAny turns message into a Go any value

func (*ValkeyMessage) ToArray

func (m *ValkeyMessage) ToArray() ([]ValkeyMessage, error)

ToArray checks if message is a valkey array/set response, and returns it

func (*ValkeyMessage) ToBool

func (m *ValkeyMessage) ToBool() (val bool, err error)

ToBool checks if message is a valkey RESP3 bool response, and returns it

func (*ValkeyMessage) ToFloat64

func (m *ValkeyMessage) ToFloat64() (val float64, err error)

ToFloat64 checks if message is a valkey RESP3 double response, and returns it

func (*ValkeyMessage) ToInt64

func (m *ValkeyMessage) ToInt64() (val int64, err error)

ToInt64 checks if message is a valkey RESP3 int response, and returns it

func (*ValkeyMessage) ToMap

func (m *ValkeyMessage) ToMap() (map[string]ValkeyMessage, error)

ToMap checks if message is a valkey RESP3 map response, and returns it

func (*ValkeyMessage) ToString

func (m *ValkeyMessage) ToString() (val string, err error)

ToString checks if message is a valkey string response, and returns it

type ValkeyResult

type ValkeyResult struct {
	// contains filtered or unexported fields
}

ValkeyResult is the struct returned from Client.Do or Client.DoCache. It contains either a valkey response or an underlying error (e.g. a network timeout).

func (ValkeyResult) AsBool

func (r ValkeyResult) AsBool() (v bool, err error)

AsBool delegates to ValkeyMessage.AsBool

func (ValkeyResult) AsBoolSlice

func (r ValkeyResult) AsBoolSlice() (v []bool, err error)

AsBoolSlice delegates to ValkeyMessage.AsBoolSlice

func (ValkeyResult) AsBytes

func (r ValkeyResult) AsBytes() (v []byte, err error)

AsBytes delegates to ValkeyMessage.AsBytes

func (ValkeyResult) AsFloat64

func (r ValkeyResult) AsFloat64() (v float64, err error)

AsFloat64 delegates to ValkeyMessage.AsFloat64

func (ValkeyResult) AsFloatSlice

func (r ValkeyResult) AsFloatSlice() (v []float64, err error)

AsFloatSlice delegates to ValkeyMessage.AsFloatSlice

func (ValkeyResult) AsFtAggregate

func (r ValkeyResult) AsFtAggregate() (total int64, docs []map[string]string, err error)

func (ValkeyResult) AsFtAggregateCursor

func (r ValkeyResult) AsFtAggregateCursor() (cursor, total int64, docs []map[string]string, err error)

func (ValkeyResult) AsFtSearch

func (r ValkeyResult) AsFtSearch() (total int64, docs []FtSearchDoc, err error)

func (ValkeyResult) AsGeosearch

func (r ValkeyResult) AsGeosearch() (locations []GeoLocation, err error)

func (ValkeyResult) AsInt64

func (r ValkeyResult) AsInt64() (v int64, err error)

AsInt64 delegates to ValkeyMessage.AsInt64

func (ValkeyResult) AsIntMap

func (r ValkeyResult) AsIntMap() (v map[string]int64, err error)

AsIntMap delegates to ValkeyMessage.AsIntMap

func (ValkeyResult) AsIntSlice

func (r ValkeyResult) AsIntSlice() (v []int64, err error)

AsIntSlice delegates to ValkeyMessage.AsIntSlice

func (ValkeyResult) AsLMPop

func (r ValkeyResult) AsLMPop() (v KeyValues, err error)

func (ValkeyResult) AsMap

func (r ValkeyResult) AsMap() (v map[string]ValkeyMessage, err error)

AsMap delegates to ValkeyMessage.AsMap

func (ValkeyResult) AsReader

func (r ValkeyResult) AsReader() (v io.Reader, err error)

AsReader delegates to ValkeyMessage.AsReader

func (ValkeyResult) AsScanEntry

func (r ValkeyResult) AsScanEntry() (v ScanEntry, err error)

AsScanEntry delegates to ValkeyMessage.AsScanEntry.

func (ValkeyResult) AsStrMap

func (r ValkeyResult) AsStrMap() (v map[string]string, err error)

AsStrMap delegates to ValkeyMessage.AsStrMap

func (ValkeyResult) AsStrSlice

func (r ValkeyResult) AsStrSlice() (v []string, err error)

AsStrSlice delegates to ValkeyMessage.AsStrSlice

func (ValkeyResult) AsUint64

func (r ValkeyResult) AsUint64() (v uint64, err error)

AsUint64 delegates to ValkeyMessage.AsUint64

func (ValkeyResult) AsXRange

func (r ValkeyResult) AsXRange() (v []XRangeEntry, err error)

AsXRange delegates to ValkeyMessage.AsXRange

func (ValkeyResult) AsXRangeEntry

func (r ValkeyResult) AsXRangeEntry() (v XRangeEntry, err error)

AsXRangeEntry delegates to ValkeyMessage.AsXRangeEntry

func (ValkeyResult) AsXRead

func (r ValkeyResult) AsXRead() (v map[string][]XRangeEntry, err error)

AsXRead delegates to ValkeyMessage.AsXRead

func (ValkeyResult) AsZMPop

func (r ValkeyResult) AsZMPop() (v KeyZScores, err error)

func (ValkeyResult) AsZScore

func (r ValkeyResult) AsZScore() (v ZScore, err error)

AsZScore delegates to ValkeyMessage.AsZScore

func (ValkeyResult) AsZScores

func (r ValkeyResult) AsZScores() (v []ZScore, err error)

AsZScores delegates to ValkeyMessage.AsZScores

func (ValkeyResult) CachePTTL

func (r ValkeyResult) CachePTTL() int64

CachePTTL delegates to ValkeyMessage.CachePTTL

func (ValkeyResult) CachePXAT

func (r ValkeyResult) CachePXAT() int64

CachePXAT delegates to ValkeyMessage.CachePXAT

func (ValkeyResult) CacheTTL

func (r ValkeyResult) CacheTTL() int64

CacheTTL delegates to ValkeyMessage.CacheTTL

func (ValkeyResult) DecodeJSON

func (r ValkeyResult) DecodeJSON(v any) (err error)

DecodeJSON delegates to ValkeyMessage.DecodeJSON

func (ValkeyResult) Error

func (r ValkeyResult) Error() (err error)

Error returns the underlying error, the valkey error, or nil.

func (ValkeyResult) IsCacheHit

func (r ValkeyResult) IsCacheHit() bool

IsCacheHit delegates to ValkeyMessage.IsCacheHit

func (ValkeyResult) NonValkeyError

func (r ValkeyResult) NonValkeyError() error

NonValkeyError can be used to check whether there is an underlying error (e.g. a network timeout).

func (*ValkeyResult) String

func (r *ValkeyResult) String() string

String returns a human-readable representation of ValkeyResult.

func (ValkeyResult) ToAny

func (r ValkeyResult) ToAny() (v any, err error)

ToAny delegates to ValkeyMessage.ToAny

func (ValkeyResult) ToArray

func (r ValkeyResult) ToArray() (v []ValkeyMessage, err error)

ToArray delegates to ValkeyMessage.ToArray

func (ValkeyResult) ToBool

func (r ValkeyResult) ToBool() (v bool, err error)

ToBool delegates to ValkeyMessage.ToBool

func (ValkeyResult) ToFloat64

func (r ValkeyResult) ToFloat64() (v float64, err error)

ToFloat64 delegates to ValkeyMessage.ToFloat64

func (ValkeyResult) ToInt64

func (r ValkeyResult) ToInt64() (v int64, err error)

ToInt64 delegates to ValkeyMessage.ToInt64

func (ValkeyResult) ToMap

func (r ValkeyResult) ToMap() (v map[string]ValkeyMessage, err error)

ToMap delegates to ValkeyMessage.ToMap

func (ValkeyResult) ToMessage

func (r ValkeyResult) ToMessage() (v ValkeyMessage, err error)

ToMessage retrieves the ValkeyMessage

func (ValkeyResult) ToString

func (r ValkeyResult) ToString() (v string, err error)

ToString delegates to ValkeyMessage.ToString

type ValkeyResultStream

type ValkeyResultStream struct {
	// contains filtered or unexported fields
}

func (*ValkeyResultStream) Error

func (s *ValkeyResultStream) Error() error

Error returns the error that occurred while sending commands to valkey or reading responses from valkey. Callers usually do not need this function because the same error is also reported by WriteTo.

func (*ValkeyResultStream) HasNext

func (s *ValkeyResultStream) HasNext() bool

HasNext can be used in a for loop condition to check if a further WriteTo call is needed.

func (*ValkeyResultStream) WriteTo

func (s *ValkeyResultStream) WriteTo(w io.Writer) (n int64, err error)

WriteTo reads a valkey response from valkey and then writes it to the given writer. This function is not thread safe and should be called sequentially to read multiple responses. An io.EOF error is reported once all responses have been read.

type XRangeEntry

type XRangeEntry struct {
	FieldValues map[string]string
	ID          string
}

XRangeEntry is the element type of the response array of both the XRANGE and XREVRANGE commands.
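As an illustration of working with a slice of such entries, the sketch below redeclares the struct locally (an assumption made so the example is self-contained; real code would use the package's own type, for instance the slice returned by AsXRange). The `fieldValues` helper and the sample IDs and fields are hypothetical.

```go
package main

import "fmt"

// XRangeEntry is a local copy of the struct shown above, redeclared here
// only so the sketch compiles without importing the library.
type XRangeEntry struct {
	FieldValues map[string]string
	ID          string
}

// fieldValues collects the value of one field from every entry that has it,
// preserving the order of the entries.
func fieldValues(entries []XRangeEntry, field string) []string {
	out := make([]string, 0, len(entries))
	for _, e := range entries {
		if v, ok := e.FieldValues[field]; ok {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	// Hypothetical sample data standing in for a decoded stream range.
	entries := []XRangeEntry{
		{ID: "1-0", FieldValues: map[string]string{"temp": "21.5"}},
		{ID: "2-0", FieldValues: map[string]string{"humidity": "40"}},
		{ID: "3-0", FieldValues: map[string]string{"temp": "22.0"}},
	}
	fmt.Println(fieldValues(entries, "temp"))
}
```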

type ZScore

type ZScore struct {
	Member string
	Score  float64
}

ZScore is the element type of the ZRANGE WITHSCORES, ZDIFF WITHSCORES and ZPOPMAX command responses.
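A slice of ZScore values can be post-processed like any other Go slice. The sketch below redeclares the struct locally (an assumption for self-containment; real code would use the package's type, for instance the slice returned by AsZScores). The `topN` helper is hypothetical and shows one way to pick the highest-scoring members on the client side.

```go
package main

import (
	"fmt"
	"sort"
)

// ZScore is a local copy of the struct shown above, redeclared here
// only so the sketch compiles without importing the library.
type ZScore struct {
	Member string
	Score  float64
}

// topN returns up to n entries with the highest scores. The input slice is
// left unmodified, and entries with equal scores keep their input order.
func topN(scores []ZScore, n int) []ZScore {
	sorted := make([]ZScore, len(scores))
	copy(sorted, scores)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Score > sorted[j].Score
	})
	if n < 0 {
		n = 0
	}
	if n > len(sorted) {
		n = len(sorted)
	}
	return sorted[:n]
}

func main() {
	// Hypothetical sample data standing in for a decoded sorted-set response.
	scores := []ZScore{{"a", 1}, {"b", 3}, {"c", 2}}
	for _, z := range topN(scores, 2) {
		fmt.Println(z.Member, z.Score)
	}
}
```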

Directories

Path Synopsis
hack
internal
