redicluster

Version: v0.1.0-alpha
Published: Dec 22, 2022 License: Apache-2.0 Imports: 9 Imported by: 0

README

Redicluster

Redicluster is a lightweight Go client driver for Redis Cluster, built on top of gomodule/redigo. In addition to most Redis commands, it aims to support pipelines and multi-key commands against a Redis Cluster without a proxy.

Project Background

Compared with standalone mode, Redis Cluster is complicated on the client side: users must keep track of slot distribution and handle redirection. A few libraries help with this, but most of them do not support every feature well, pipelines and scripting in particular. A Redis proxy (such as twitter/twemproxy or CodisLabs/codis) is a good way to solve this problem. Unfortunately, not all cloud providers support this kind of proxy.

Project Goal

Redicluster aims to support all Redis Cluster features on the client side in a simple way. With this library, users do not need to deal with clustering details such as slot distribution and redirection handling; they access Redis Cluster as if it were a standalone instance or a cluster behind a proxy.

Redicluster is built on gomodule/redigo, a lightweight but powerful Redis client driver. You access Redis through Redicluster just as you would with Redigo directly. In particular, if your project already uses Redigo, migrating from standalone Redis to a cluster requires only trivial changes.

Main Features

1. Pool and connection management

The pool and connection interfaces defined in Redigo are exposed as drop-in replacements.

2. Slots mapping and routing

The slot mapping is stored in the pool object. It is refreshed automatically whenever a redirection occurs, and callers can also update it manually.
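As an illustration of what a slot mapping can look like, here is a minimal sketch of a slot-range table with binary-search lookup. The slotRange and slotTable types and the node addresses are hypothetical and are not taken from Redicluster's internals; the three ranges only mimic a typical three-node layout.

```go
package main

import (
	"fmt"
	"sort"
)

// slotRange is a hypothetical record: slots Start..End (inclusive) are served by Addr.
type slotRange struct {
	Start, End int
	Addr       string
}

// slotTable keeps non-overlapping ranges sorted by Start so lookups can use binary search.
type slotTable struct {
	ranges []slotRange
}

// newSlotTable copies and sorts the given ranges.
func newSlotTable(ranges []slotRange) *slotTable {
	sorted := append([]slotRange(nil), ranges...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Start < sorted[j].Start })
	return &slotTable{ranges: sorted}
}

// addrFor returns the address serving slot, or false if no range covers it.
func (t *slotTable) addrFor(slot int) (string, bool) {
	// Find the first range whose End is not below the slot.
	i := sort.Search(len(t.ranges), func(i int) bool { return t.ranges[i].End >= slot })
	if i < len(t.ranges) && t.ranges[i].Start <= slot {
		return t.ranges[i].Addr, true
	}
	return "", false
}

func main() {
	table := newSlotTable([]slotRange{
		{0, 5460, "127.0.0.1:7000"},
		{5461, 10922, "127.0.0.1:7001"},
		{10923, 16383, "127.0.0.1:7002"},
	})
	addr, ok := table.addrFor(12182)
	fmt.Println(addr, ok)
}
```

A refresh after a redirection would amount to rebuilding such a table from the cluster's current layout and swapping it in.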

3. Redirecting handling

After a redirection occurs, the Conn automatically re-sends the request to the node indicated by the MOVED response at the lower layer, so callers do not need to handle it in the application layer. Optionally, callers can turn this mechanism off and handle redirection themselves.
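For callers who handle redirection themselves, the first step is to read the target out of the error text. The following is a minimal sketch that parses a message of the form "MOVED <slot> <host:port>" or "ASK <slot> <host:port>". The redirect type and parseRedirect function are hypothetical names for illustration; the package's own ParseRedirInfo and RedirInfo serve this purpose and may behave differently.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// redirect is a hypothetical holder for the parts of a redirection error.
type redirect struct {
	Kind string // "MOVED" or "ASK"
	Slot int
	Addr string
}

// parseRedirect splits a redirection error message into its three fields.
// It returns false when the message is not a well-formed MOVED or ASK error.
func parseRedirect(msg string) (redirect, bool) {
	fields := strings.Fields(msg)
	if len(fields) != 3 || (fields[0] != "MOVED" && fields[0] != "ASK") {
		return redirect{}, false
	}
	slot, err := strconv.Atoi(fields[1])
	if err != nil || slot < 0 || slot >= 16384 {
		return redirect{}, false
	}
	return redirect{Kind: fields[0], Slot: slot, Addr: fields[2]}, true
}

func main() {
	r, ok := parseRedirect("MOVED 3999 127.0.0.1:6381")
	fmt.Printf("%+v %v\n", r, ok)
}
```

A MOVED target is typically followed by a slot-mapping refresh, while an ASK target is a one-off retry preceded by an ASKING command.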

4. Pipeline

A pipeline request usually contains multiple keys. Unlike in standalone Redis, those keys are very likely located on different nodes of the Redis Cluster. The keys must be extracted and mapped to the right nodes, and the sub-requests are sent to those nodes concurrently. Once all sub-responses have arrived, they are composed in the original order into a final response that is returned to the caller. Redirection of each sub-request is handled automatically, as described above.
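The fan-out and reassembly described above can be sketched as follows. This is not Redicluster's pipeLiner: the command type, the nodeOf callback, and the execFunc stand-in for "send this batch to one node" are all assumptions made for illustration, and redirection retries are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// command is a hypothetical single-key command in a pipeline.
type command struct {
	Name, Key string
}

// execFunc stands in for sending one batch to one node and returning one reply per command.
type execFunc func(node string, batch []command) []string

// runPipeline groups commands per node, runs the batches concurrently,
// and returns the replies in the original command order.
func runPipeline(cmds []command, nodeOf func(key string) string, exec execFunc) []string {
	batches := map[string][]command{}
	positions := map[string][]int{} // original index of each batched command
	for i, c := range cmds {
		node := nodeOf(c.Key)
		batches[node] = append(batches[node], c)
		positions[node] = append(positions[node], i)
	}

	replies := make([]string, len(cmds))
	var wg sync.WaitGroup
	for node, batch := range batches {
		wg.Add(1)
		go func(node string, batch []command, pos []int) {
			defer wg.Done()
			// Each goroutine writes to distinct indexes, so no lock is needed.
			for j, reply := range exec(node, batch) {
				if j < len(pos) {
					replies[pos[j]] = reply
				}
			}
		}(node, batch, positions[node])
	}
	wg.Wait()
	return replies
}

func main() {
	// Toy routing: the first letter of the key picks the node.
	nodeOf := func(key string) string { return "node-" + key[:1] }
	exec := func(node string, batch []command) []string {
		out := make([]string, len(batch))
		for i, c := range batch {
			out[i] = node + " " + c.Name + " " + c.Key
		}
		return out
	}
	cmds := []command{{"GET", "a1"}, {"GET", "b1"}, {"GET", "a2"}}
	for _, r := range runPipeline(cmds, nodeOf, exec) {
		fmt.Println(r)
	}
}
```

Recording the original index of every command at grouping time is what lets the replies be placed back in order regardless of which node answers first.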

5. Multiple keys commands

Under the hood, multi-key commands (MGET, MSET, etc.) are transformed into a pipeline request according to the slots of the keys. All keys located on the same node are put into the same command in the pipeline.

NOTE: Unlike in standalone Redis, the atomicity of multi-key commands cannot be guaranteed in Redis Cluster, since the keys are probably located on different nodes and the request is processed concurrently. Callers who do not want these commands handled automatically have to handle them themselves, so Redicluster needs to leave that flexibility to the caller.
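A caller who prefers to split a multi-key command by hand could start from a sketch like the one below. The subCommand type and splitBySlot function are hypothetical, and slotOf is a caller-supplied function (a toy one is used in main). The sketch groups keys by slot rather than by node, which is a conservative assumption: Redis Cluster answers with a CROSSSLOT error when the keys of one command hash to different slots.

```go
package main

import (
	"fmt"
	"sort"
)

// subCommand is a hypothetical sub-request: one command whose keys share a slot.
type subCommand struct {
	Slot int
	Args []string // command name followed by its keys
}

// splitBySlot groups keys by slot and builds one sub-command per slot.
// Keys keep their relative order inside each group; groups are sorted by slot.
func splitBySlot(name string, keys []string, slotOf func(key string) int) []subCommand {
	groups := map[int][]string{}
	for _, k := range keys {
		s := slotOf(k)
		groups[s] = append(groups[s], k)
	}
	slots := make([]int, 0, len(groups))
	for s := range groups {
		slots = append(slots, s)
	}
	sort.Ints(slots)

	subs := make([]subCommand, 0, len(slots))
	for _, s := range slots {
		args := append([]string{name}, groups[s]...)
		subs = append(subs, subCommand{Slot: s, Args: args})
	}
	return subs
}

func main() {
	// Toy slot function for the demo only; a real one hashes the key with CRC16.
	slotOf := func(key string) int { return len(key) }
	for _, sub := range splitBySlot("MGET", []string{"a", "bb", "c", "dd"}, slotOf) {
		fmt.Println(sub.Slot, sub.Args)
	}
}
```

Because each sub-command runs on its own, the combined result is not atomic, which is the trade-off the note above describes.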

6. Lua script

Lua script execution is an atomic operation in Redis, so a script cannot process keys that are located on different nodes of a cluster. Redicluster therefore assumes that all keys in a Lua script belong to the same slot. For simplicity, the request is sent to the node indicated by the first key. Callers can use a hash tag to guarantee that all keys are located on the same node.
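To see why a hash tag keeps keys together, here is a minimal sketch of the key-to-slot rule from the Redis Cluster specification: CRC16 (XMODEM variant) of the key, or of the text between the first "{" and the following "}" when that text is non-empty, modulo 16384. The function names are hypothetical; the package exposes its own Slot function, whose implementation is not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 computes CRC-16/XMODEM (polynomial 0x1021, initial value 0) bit by bit.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashPart returns the non-empty text inside the first {...}, or the whole key.
func hashPart(key string) string {
	if open := strings.IndexByte(key, '{'); open >= 0 {
		if end := strings.IndexByte(key[open+1:], '}'); end > 0 {
			return key[open+1 : open+1+end]
		}
	}
	return key
}

// keySlot maps a key to one of the 16384 hash slots.
func keySlot(key string) int {
	return int(crc16([]byte(hashPart(key))) % 16384)
}

// sameSlot reports the shared slot of keys, or false if they differ or keys is empty.
func sameSlot(keys ...string) (int, bool) {
	if len(keys) == 0 {
		return -1, false
	}
	slot := keySlot(keys[0])
	for _, k := range keys[1:] {
		if keySlot(k) != slot {
			return -1, false
		}
	}
	return slot, true
}

func main() {
	fmt.Println(keySlot("foo"))
	fmt.Println(sameSlot("{user1000}.following", "{user1000}.followers"))
	fmt.Println(sameSlot("foo", "bar"))
}
```

Checking sameSlot before running a script is a cheap way to catch keys that were not tagged consistently.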

7. Pub/Sub

A Pub/Sub message is propagated across the cluster to all subscribers, and any node can receive the message, so nothing special needs to be done. Since Redis 7.0, however, sharded Pub/Sub channels assign messages to shards based on the channel's key slot. The relevant commands (SSUBSCRIBE, SUNSUBSCRIBE, SPUBLISH, etc.) are sent to the right node based on the channel key.

Hierarchy

  1. ClusterPool: the struct that manages the cluster connection pool. By calling its Getxx API, users obtain a redis.Conn that can be used to access Redis Cluster directly. It also manages the underlying Conns for the nodes in the cluster through an internal list of pools.

  2. pipeLiner: the struct that implements the redis.Conn interface and handles Redis pipeline commands directly. It scans all commands in the pipeline and sorts them into sub-pipelines according to their slots. All sub-pipelines are sent concurrently. If some commands are redirected, it re-sends them to the nodes indicated by the redirection responses. Once all sub-commands have returned, it composes their replies into a single response to the caller, in the original command order.

  3. redirconn: the struct that implements the redis.Conn interface, handles redirection automatically when MOVED or ASK occurs, and passes send/receive/flush to an underlying pipeLiner to support pipelines on Redis Cluster. ClusterPool.Get() returns a pointer to it.

How to use

  1. Create a ClusterPool for your project.
  2. Call ClusterPool.Get() to get a redis.Conn for accessing the cluster.
  3. Call Close() on the redis.Conn to return it to the ClusterPool once all operations are done.

A simple example:

import (
   "context"
   "fmt"
   "time"

   "github.com/gomodule/redigo/redis"
)

// create a cluster pool
func CreateClusterPool() *ClusterPool {
   createConnPool := func(ctx context.Context, addr string) (*redis.Pool, error) {
      return &redis.Pool{
         Dial: func() (redis.Conn, error) {
            return redis.Dial(
               "tcp",
               addr,
               redis.DialWriteTimeout(time.Second*3),
               redis.DialConnectTimeout(time.Second*3),
               redis.DialReadTimeout(time.Second*3))
         },
         DialContext: func(ctx context.Context) (redis.Conn, error) {
            return redis.DialContext(
               ctx,
               "tcp",
               addr,
               redis.DialWriteTimeout(time.Second*3),
               redis.DialConnectTimeout(time.Second*3),
               redis.DialReadTimeout(time.Second*3))
         },
         TestOnBorrow: func(c redis.Conn, t time.Time) error {
            if time.Since(t) > time.Minute {
               _, err := c.Do("PING")
               return err
            }
            return nil
         },
         MaxIdle:     10,
         MaxActive:   10,
         IdleTimeout: time.Minute * 10,
      }, nil
   }
   cp := &ClusterPool{
      EntryAddrs:     []string{"127.0.0.1:6379"},
      CreateConnPool: createConnPool,
   }

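   // reload slot mapping in advance; on failure it can be reloaded later
   if err := cp.ReloadSlotMapping(); err != nil {
      fmt.Printf("reload slot mapping failed: %v\n", err)
   }
   return cp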
}

// printPubSubReceive prints a value returned by Receive according to its type
func printPubSubReceive(prefix string, r interface{}) {
   switch v := r.(type) {
   case redis.Subscription:
      fmt.Printf("%s redis.Subscription: %v\n", prefix, v)
   case redis.Message:
      fmt.Printf("%s redis.Message: %v\n", prefix, v)
   case redis.Pong:
      fmt.Printf("%s redis.Pong: %v\n", prefix, v)
   case error:
      fmt.Printf("%s error: %v\n", prefix, v)
   default:
      fmt.Printf("%s unknown value: %v\n", prefix, r)
   }
}

cp := CreateClusterPool()
conn := cp.Get()
defer conn.Close()

// SET and GET
rep, err := conn.Do("SET", "abc", "123")
fmt.Printf("SET result:%s, err=%v\n", rep, err)

rep, err = conn.Do("GET", "abc")
fmt.Printf("GET result:%s, err=%v\n", rep, err)

// MSET
rep, err = conn.Do("MSET", "abc", "123", "efg", "456")
fmt.Printf("MSET result:%s, err=%v\n", rep, err)

// pipeline
conn.Send("GET", "abc")
conn.Send("GET", "efg")
conn.Flush()

rep, err = conn.Receive()
fmt.Printf("pipeline result1:%s, err=%v\n", rep, err)

rep, err = conn.Receive()
fmt.Printf("pipeline result2:%s, err=%v\n", rep, err)

// PubSub
s1, err := cp.GetPubSubConn()
s1.Subscribe("ChnA")
printPubSubReceive("s1.Subscribe", s1.Receive())

s2, err := cp.GetPubSubConn()
s2.Subscribe("ChnA")
printPubSubReceive("s2.Subscribe", s2.Receive())

conn.Do("PUBLISH", "ChnA", "I'm msg")

printPubSubReceive("push to s1", s1.Receive())
printPubSubReceive("push to s2", s2.Receive())

// ShardedPubSub
ss1, err := cp.GetShardedPubSubConn()
ss1.SSubscribe("ShardedChnA")
printPubSubReceive("ss1.Subscribe", ss1.Receive())

ss2, err := cp.GetShardedPubSubConn()
ss2.SSubscribe("ShardedChnA")
printPubSubReceive("ss2.Subscribe", ss2.Receive())

conn.Do("SPUBLISH", "ShardedChnA", "I'm sharded msg")

printPubSubReceive("push to ss1", ss1.Receive())
printPubSubReceive("push to ss2", ss2.Receive())

Reference

  1. Redis Cluster Spec: https://redis.io/docs/reference/cluster-spec/
  2. Redigo: https://github.com/gomodule/redigo
  3. Redisc: https://github.com/mna/redisc

LICENSE

Redicluster is available under the Apache License, Version 2.0.

Documentation

Constants

const (
	OpNil      = 0
	OpDO       = 1
	OpPipeLine = 2
)
const (
	TotalSlots = 16384
)

Variables

This section is empty.

Functions

func ChnSlot

func ChnSlot(channel ...interface{}) (int, error)

ChnSlot returns the slot of the channels if all of them are in the same slot; otherwise it returns slot -1 and an error

func CmdSlot

func CmdSlot(cmd string, args ...interface{}) int

CmdSlot returns the hash slot of the command

func Slot

func Slot(key string) int

Slot returns the hash Slot of the key

Types

type ClusterPool

type ClusterPool struct {
	// The entry addresses for cluster, which can be any node address in cluster
	EntryAddrs []string

	// Dial options for the case without a pool (CreateConnPool is nil)
	DialOptionsWithoutPool []redis.DialOption

	// Default timeout for the connection pool
	DefaultPoolTimeout time.Duration

	// Function for creating a connection pool, which is invoked when the caller acquires a conn through a Getxx func
	// and the node has no pool in connPools. With this func, you can control the pool behavior as needed
	CreateConnPool func(ctx context.Context, addr string) (*redis.Pool, error)
	// contains filtered or unexported fields
}

func (*ClusterPool) ActiveCount

func (cp *ClusterPool) ActiveCount() int

ActiveCount returns the total active connection count in the cluster pool

func (*ClusterPool) Close

func (cp *ClusterPool) Close()

Close closes the connections and clears the slot mapping of the cluster pool

func (*ClusterPool) Get

func (cp *ClusterPool) Get() redis.Conn

Get gets the redis.Conn interface that handles the redirecting automatically

func (*ClusterPool) GetAddrsBySlots

func (cp *ClusterPool) GetAddrsBySlots(slots []int, readOnly bool) ([]string, error)

func (*ClusterPool) GetContext

func (cp *ClusterPool) GetContext(ctx context.Context) redis.Conn

GetContext gets the redis.Conn interface that handles the redirecting automatically

func (*ClusterPool) GetNoRedirConn

func (cp *ClusterPool) GetNoRedirConn() redis.Conn

GetNoRedirConn gets the redis.Conn interface without redirection handling, which allows you to handle redirection yourself

func (*ClusterPool) GetPubSubConn

func (cp *ClusterPool) GetPubSubConn() (*redis.PubSubConn, error)

GetPubSubConn gets the redis.PubSubConn

func (*ClusterPool) GetRandomRealConn

func (cp *ClusterPool) GetRandomRealConn() (redis.Conn, error)

func (*ClusterPool) GetReadonlyConn

func (cp *ClusterPool) GetReadonlyConn() redis.Conn

GetReadonlyConn gets the redis.Conn interface without redirection handling, which allows you to handle redirection yourself

func (*ClusterPool) GetShardedPubSubConn

func (cp *ClusterPool) GetShardedPubSubConn() (*ShardedPubSubConn, error)

GetShardedPubSubConn gets the ShardedPubSubConn

func (*ClusterPool) IdleCount

func (cp *ClusterPool) IdleCount() int

IdleCount returns the total idle connection count in the cluster pool

func (*ClusterPool) ReloadSlotMapping

func (cp *ClusterPool) ReloadSlotMapping() error

ReloadSlotMapping reloads the slot mapping

func (*ClusterPool) Stats

func (cp *ClusterPool) Stats() map[string]redis.PoolStats

Stats gets the redis.PoolStats of the current cluster

func (*ClusterPool) VerbosSlotMapping

func (cp *ClusterPool) VerbosSlotMapping() string

VerbosSlotMapping returns the slot mapping of the cluster as a readable string

type RedirInfo

type RedirInfo struct {
	// Kind indicates the redirection type, MOVED or ASK
	Kind string

	// Slot is the slot number of the redirecting
	Slot int

	// Addr is the node address to redirect to
	Addr string

	// Raw is the original error string
	Raw string
}

func ParseRedirInfo

func ParseRedirInfo(err error) *RedirInfo

ParseRedirInfo parses the redirection error into a RedirInfo

type ShardedPubSubConn

type ShardedPubSubConn struct {
	// contains filtered or unexported fields
}

ShardedPubSubConn wraps a Conn with convenience API for sharded PubSub.

func (*ShardedPubSubConn) Close

func (c *ShardedPubSubConn) Close() error

Close closes the connection.

func (*ShardedPubSubConn) Ping

func (c *ShardedPubSubConn) Ping(data string) error

Ping sends a PING to the server with the specified data. The connection must be subscribed to at least one channel or pattern when calling this method.

func (*ShardedPubSubConn) Receive

func (c *ShardedPubSubConn) Receive() interface{}

Receive returns a pushed message as a Subscription, Message, Pong or error.

func (*ShardedPubSubConn) ReceiveContext

func (c *ShardedPubSubConn) ReceiveContext(ctx context.Context) interface{}

ReceiveContext is like Receive, but it allows termination of the receive via a Context. If the call returns due to closure of the context's Done channel the underlying Conn will have been closed.

func (*ShardedPubSubConn) ReceiveWithTimeout

func (c *ShardedPubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{}

ReceiveWithTimeout is like Receive, but it allows the application to override the connection's default timeout.

func (*ShardedPubSubConn) SSubscribe

func (c *ShardedPubSubConn) SSubscribe(channel ...interface{}) error

SSubscribe subscribes the connection to the specified sharded channels.

func (*ShardedPubSubConn) SUnsubscribe

func (c *ShardedPubSubConn) SUnsubscribe(channel ...interface{}) error

SUnsubscribe unsubscribes the connection from the given sharded channels, or from all of them if none is given.
