redis

package
v0.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2020 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package redis contains common parts for other packages.

- main interfaces visible to user (Sender, Scanner, ScanOpts)

- wrappers for synchronous interface over Sender (Sync, SyncCtx) and chan-based-future interface (ChanFutured)

- request writing,

- response parsing,

- root errorx namespace and common error types.

Usually you get Sender from redisconn.Connect or rediscluster.NewCluster, then wrap with Sync or SyncCtx, and use their sync methods without any locking:

sender, err := redisconn.Connect(ctx, "127.0.0.1:6379", redisconn.Opts{})
sync := redis.Sync{sender}
go func() {
	res := sync.Do("GET", "x")
	if err := redis.AsError(res); err != nil {
		log.Println("failed", err)
	}
	log.Println("found x", res)
}()
go func() {
	results := sync.SendMany([]redis.Request{
		redis.Req("GET", "k1"),
		redis.Req("Incr", "k2"),
		redis.Req("HMGET, "h1", "hk1", "hk2"),
	})
	if err := redis.AsError(results[0]); err != nil {
		log.Println("failed", err)
	}
	if results[0] == nil {
		log.Println("not found")
	} else {
		log.Println("k1: ", results[0])
	}
}()

See more documentation in root redispipe package.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// Errors is the root namespace of all redispipe errors.
	// It is created with the errorx.TypeModifierOmitStackTrace modifier applied.
	Errors = errorx.NewNamespace("redispipe").ApplyModifiers(errorx.TypeModifierOmitStackTrace)

	// ErrOpts - options are wrong.
	ErrOpts = Errors.NewSubNamespace("opts")
	// ErrContextIsNil - context was not passed to the constructor.
	ErrContextIsNil = ErrOpts.NewType("context_is_nil")
	// ErrNoAddressProvided - no address was given to the constructor.
	ErrNoAddressProvided = ErrOpts.NewType("no_address")

	// ErrTraitNotSent signals that the request was not written to the wire.
	ErrTraitNotSent = errorx.RegisterTrait("request_not_sent")

	// ErrContextClosed - context was explicitly closed (or connection / cluster was shut down).
	ErrContextClosed = Errors.NewType("connection_context_closed", ErrTraitNotSent)

	// ErrTraitConnectivity marks all networking and io errors.
	ErrTraitConnectivity = errorx.RegisterTrait("network")

	// ErrIO - io error: read/write error, or timeout, or connection closed while reading/writing.
	// It is not known whether the request was processed or not.
	ErrIO = Errors.NewType("io error", ErrTraitConnectivity)

	// ErrRequest - request is malformed. The request can not be serialized, so there is no reason to retry.
	ErrRequest = Errors.NewSubNamespace("request")
	// ErrArgumentType - argument is not serializable.
	ErrArgumentType = ErrRequest.NewType("argument_type")
	// ErrBatchFormat - some other command in the batch is malformed.
	ErrBatchFormat = ErrRequest.NewType("batch_format")
	// ErrNoSlotKey - no key to determine cluster slot.
	ErrNoSlotKey = ErrRequest.NewType("no_slot_key")
	// ErrRequestCancelled - request is already cancelled.
	ErrRequestCancelled = ErrRequest.NewType("request_cancelled")
	// ErrCommandForbidden - command is blocking or dangerous.
	ErrCommandForbidden = ErrRequest.NewType("command_forbidden")

	// ErrResponse - response is malformed. Redis returned an unexpected response.
	ErrResponse = Errors.NewSubNamespace("response")
	// ErrResponseFormat - response is not a valid Redis response.
	ErrResponseFormat = ErrResponse.NewType("format")
	// ErrResponseUnexpected - response is a valid redis response, but its structure/type is unexpected.
	ErrResponseUnexpected = ErrResponse.NewType("unexpected")
	// ErrHeaderlineTooLarge - header line is too large.
	ErrHeaderlineTooLarge = ErrResponse.NewType("headerline_too_large")
	// ErrHeaderlineEmpty - header line is empty.
	ErrHeaderlineEmpty = ErrResponse.NewType("headerline_empty")
	// ErrIntegerParsing - integer is malformed.
	// NOTE(review): the type name below is spelled "integer_parsiing"; it is a runtime
	// identifier, so it is intentionally left untouched here.
	ErrIntegerParsing = ErrResponse.NewType("integer_parsiing")
	// ErrNoFinalRN - no final "\r\n".
	ErrNoFinalRN = ErrResponse.NewType("no_final_rn")
	// ErrUnknownHeaderType - unknown header type.
	ErrUnknownHeaderType = ErrResponse.NewType("unknown_headerline_type")
	// ErrPing - ping received a wrong response.
	ErrPing = ErrResponse.NewType("ping")

	// ErrTraitClusterMove signals that the error happened due to cluster rebalancing.
	ErrTraitClusterMove = errorx.RegisterTrait("cluster_move")

	// ErrResult - just a regular redis error response.
	ErrResult = Errors.NewType("result")
	// ErrMoved - MOVED response.
	ErrMoved = ErrResult.NewSubtype("moved", ErrTraitClusterMove)
	// ErrAsk - ASK response.
	ErrAsk = ErrResult.NewSubtype("ask", ErrTraitClusterMove)
	// ErrLoading - redis has not finished starting up.
	ErrLoading = ErrResult.NewSubtype("loading", ErrTraitNotSent)
	// ErrExecEmpty - EXEC returned nil (WATCH failed); this is unexpected, because WATCH is not supported.
	ErrExecEmpty = ErrResult.NewSubtype("exec_empty")
	// ErrExecAbort - EXEC returned EXECABORT.
	ErrExecAbort = ErrResult.NewSubtype("exec_abort")
	// ErrTryAgain - EXEC returned TryAgain.
	ErrTryAgain = ErrResult.NewSubtype("exec_try_again")
)
View Source
var (
	// EKLine - set by the response parser for unrecognized header lines.
	EKLine = errorx.RegisterProperty("line")
	// EKMovedTo - set by the response parser for MOVED and ASK responses.
	EKMovedTo = errorx.RegisterProperty("movedto")
	// EKSlot - set by the response parser for MOVED and ASK responses.
	EKSlot = errorx.RegisterPrintableProperty("slot")
	// EKVal - set by the request writer and checker to the argument value which could not be serialized.
	EKVal = errorx.RegisterPrintableProperty("val")
	// EKArgPos - set by the request writer and checker to the position of the argument which could not be serialized.
	EKArgPos = errorx.RegisterPrintableProperty("argpos")
	// EKRequest - the request that triggered the error.
	EKRequest = errorx.RegisterPrintableProperty("request")
	// EKRequests - the batch of requests that triggered the error.
	EKRequests = errorx.RegisterPrintableProperty("requests")
	// EKResponse - the unexpected response.
	EKResponse = errorx.RegisterProperty("response")
	// EKAddress - address of the redis instance that has a problem.
	EKAddress = errorx.RegisterPrintableProperty("address")
)
View Source
var (
	// CollectTrace controls whether the Sync and SyncCtx wrappers collect stack traces
	// on the caller's side. It is false by default.
	CollectTrace = false
)
View Source
// ScanEOF is the error returned by Sync wrappers when iteration is exhausted.
var ScanEOF = errors.New("Iteration finished")

ScanEOF is the error returned by Sync wrappers when iteration is exhausted.

Functions

func AppendRequest

func AppendRequest(buf []byte, req Request) ([]byte, error)

AppendRequest appends request to byte slice as RESP request (ie as array of strings).

It could fail if some request value is not nil, integer, float, string or byte slice. In case of error it still returns the modified buffer, but truncated to its original size, so it can be reused to save a reallocation.

Note: command could contain single space. In that case, it will be split and last part will be prepended to arguments.

Example
package main

import (
	"fmt"
	"time"

	"github.com/joomcode/redispipe/redis"
)

// main serializes three requests into one growing buffer and prints the
// buffer and the error after every step. The last request carries a
// time.Duration argument, which AppendRequest rejects.
func main() {
	requests := []redis.Request{
		redis.Req("GET", "one"),
		redis.Req("INCRBY", "cnt", 5),
		redis.Req("SENDFOO", time.Second),
	}

	var (
		buf []byte
		err error
	)
	for _, r := range requests {
		// The buffer returned by the previous step is fed into the next one.
		buf, err = redis.AppendRequest(buf, r)
		fmt.Printf("%q\n%v\n", buf, err)
	}
}
Output:

"*2\r\n$3\r\nGET\r\n$3\r\none\r\n"
<nil>
"*2\r\n$3\r\nGET\r\n$3\r\none\r\n*3\r\n$6\r\nINCRBY\r\n$3\r\ncnt\r\n$1\r\n5\r\n"
<nil>
"*2\r\n$3\r\nGET\r\n$3\r\none\r\n*3\r\n$6\r\nINCRBY\r\n$3\r\ncnt\r\n$1\r\n5\r\n"
redispipe.request.argument_type: {request: Req("SENDFOO", ["1s"]), argpos: 0, val: 1s}

func ArgToString

func ArgToString(arg interface{}) (string, bool)

ArgToString returns the string representation of an argument. It is used in cluster to determine the cluster slot. It has to be kept in sync with AppendRequest.

func AsError

func AsError(v interface{}) error

AsError casts interface to error (if it is error)

Example
package main

import (
	"errors"
	"fmt"

	"github.com/joomcode/redispipe/redis"
)

// main feeds values of different dynamic types through redis.AsError and
// prints the type and value of both the input and the conversion result.
func main() {
	inputs := []interface{}{
		nil,
		1,
		"hello",
		errors.New("high"),
		redis.ErrResult.New("goodbye"),
	}

	for i := range inputs {
		in := inputs[i]
		out := redis.AsError(in)
		fmt.Printf("%T %v => %T %v\n", in, in, out, out)
	}
}
Output:

<nil> <nil> => <nil> <nil>
int 1 => <nil> <nil>
string hello => <nil> <nil>
*errors.errorString high => *errors.errorString high
*errorx.Error redispipe.result: goodbye => *errorx.Error redispipe.result: goodbye

func AsErrorx

func AsErrorx(v interface{}) *errorx.Error

AsErrorx casts interface to *errorx.Error. It panics if the value is an error but not an *errorx.Error.

func Blocking

func Blocking(name string) bool

Blocking returns true if command is known to be blocking. Blocking commands could stall whole pipeline and therefore affect other commands sent through this connection. It is undesirable and prevented by default.

These commands are forbidden in the default configuration, but could be enabled with the `SingleThreaded` connection option.

`WATCH` command is also included here because while it is dangerous in concurrent environment, it is safe to be used in single threaded case.

func CheckRequest

func CheckRequest(req Request, singleThreaded bool) error

CheckRequest checks that the request's command and arguments are compatible with the connector.

func Dangerous

func Dangerous(name string) bool

Dangerous returns true if the command is not safe to use with the connector. Currently it includes the `SUBSCRIBE` and `PSUBSCRIBE` commands, because they change the connection protocol mode.

func ForbiddenCommand

func ForbiddenCommand(name string, singleThreaded bool) error

ForbiddenCommand returns an error if the command is not allowed to run.

func ReadResponse

func ReadResponse(b *bufio.Reader) interface{}

ReadResponse reads single RESP answer from bufio.Reader

func ReplicaSafe

func ReplicaSafe(name string) bool

ReplicaSafe returns true if command is readonly and "safe to run on replica". Some commands like "scan" are not included, because their result could differ between master and replica.

func ScanResponse

func ScanResponse(res interface{}) ([]byte, []string, error)

ScanResponse parses response of Scan command, returns iterator and array of keys.

func TransactionResponse

func TransactionResponse(res interface{}) ([]interface{}, error)

TransactionResponse parses response of EXEC command, returns array of answers.

Types

type ChanFuture

type ChanFuture struct {
	// contains filtered or unexported fields
}

ChanFuture - future implemented with channel as signal of fulfillment.

func (*ChanFuture) Cancelled

func (f *ChanFuture) Cancelled() error

Cancelled - implementation of Future.Cancelled (always false).

func (*ChanFuture) Done

func (f *ChanFuture) Done() <-chan struct{}

Done returns channel that will be closed on fulfillment.

func (*ChanFuture) Resolve

func (f *ChanFuture) Resolve(res interface{}, _ uint64)

Resolve - implementation of Future.Resolve

func (*ChanFuture) Value

func (f *ChanFuture) Value() interface{}

Value waits for result to be fulfilled and returns result.

type ChanFutured

// ChanFutured wraps a Sender and provides an asynchronous interface
// through futures implemented with channels.
type ChanFutured struct {
	// S is the wrapped Sender.
	S Sender
}

ChanFutured wraps Sender and provides asynchronous interface through future implemented with channel.

func (ChanFutured) Send

func (s ChanFutured) Send(r Request) *ChanFuture

Send sends a request and returns a ChanFuture for the result.

func (ChanFutured) SendMany

func (s ChanFutured) SendMany(reqs []Request) ChanFutures

SendMany sends several requests and returns slice of ChanFuture for results.

func (ChanFutured) SendTransaction

func (s ChanFutured) SendTransaction(r []Request) *ChanTransaction

SendTransaction sends several requests as MULTI+EXEC transaction, returns ChanTransaction - wrapper around ChanFuture with additional method.

type ChanFutures

// ChanFutures is an implementation of Future over a slice of *ChanFuture.
type ChanFutures []*ChanFuture

ChanFutures - implementation of Future over slice of *ChanFuture

func (ChanFutures) Cancelled

func (f ChanFutures) Cancelled() error

Cancelled - implementation of Future.Cancelled (always false).

func (ChanFutures) Resolve

func (f ChanFutures) Resolve(res interface{}, i uint64)

Resolve - implementation of Future.Resolve. It resolves ChanFuture corresponding to index.

type ChanTransaction

// ChanTransaction is a wrapper over ChanFuture with an additional convenience method.
type ChanTransaction struct {
	// ChanFuture is embedded, so its methods are available on ChanTransaction.
	ChanFuture
}

ChanTransaction - wrapper over ChanFuture with additional convenient method.

func (*ChanTransaction) Results

func (f *ChanTransaction) Results() ([]interface{}, error)

Results - parses result of transaction and returns it as an array of results.

type FuncFuture

// FuncFuture is a simple wrapper that makes a Future from a function.
type FuncFuture func(res interface{}, n uint64)

FuncFuture simple wrapper that makes Future from function.

func (FuncFuture) Cancelled

func (f FuncFuture) Cancelled() error

Cancelled implements Future.Cancelled (always false)

func (FuncFuture) Resolve

func (f FuncFuture) Resolve(res interface{}, n uint64)

Resolve implements Future.Resolve (by calling wrapped function).

type Future

// Future is the interface accepted by Sender to signal request completion.
type Future interface {
	// Resolve is called by the sender to pass the result (or error) for a particular request.
	// A single future could be used for accepting multiple results;
	// the n argument is then used to distinguish which request this result is for.
	Resolve(res interface{}, n uint64)
	// Cancelled lets the future inform the sender that the request is abandoned.
	// It is usually called before sending the request, and if Cancelled returns a non-nil error,
	// then Sender calls Resolve with an ErrRequestCancelled error wrapped around the returned error.
	Cancelled() error
}

Future is interface accepted by Sender to signal request completion.

type Request

// Request represents a request to be passed to redis.
type Request struct {
	// Cmd is a redis command to be sent.
	// It could contain a single space; then it will be split, and the last part will be serialized as an argument.
	Cmd  string
	Args []interface{} // Args are the arguments sent along with Cmd.
}

Request represents request to be passed to redis.

func Req

func Req(cmd string, args ...interface{}) Request

Req - convenient wrapper to create Request.

func (Request) Key

func (r Request) Key() (string, bool)

Key returns first field of request that should be used as a key for redis cluster.

func (Request) String

func (r Request) String() string

type ScanOpts

// ScanOpts holds the options for scanning.
type ScanOpts struct {
	// Cmd - command to be sent. Could be 'SCAN', 'SSCAN', 'HSCAN' or 'ZSCAN';
	// the default is 'SCAN'.
	Cmd string
	// Key - key for the SSCAN, HSCAN and ZSCAN commands.
	Key string
	// Match - pattern for filtering keys.
	Match string
	// Count - soft limit for the size of a single *SCAN answer.
	Count int
}

ScanOpts is options for scanning

func (ScanOpts) Request

func (s ScanOpts) Request(it []byte) Request

Request returns the corresponding request to be sent. Used mostly internally.

type Scanner

// Scanner is an object used for scanning the redis key space.
type Scanner interface {
	// Next will call cb.Resolve(result, 0) where `result` is the keys part of the result of SCAN/HSCAN/SSCAN/ZSCAN
	// (ie the iterator part is handled internally).
	// When iteration completes, cb.Resolve(nil, 0) will be called.
	Next(cb Future)
}

Scanner is an object used for scanning redis key space. It is returned by Sender.Scanner().

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/joomcode/redispipe/redis"
	"github.com/joomcode/redispipe/redisconn"
	"github.com/joomcode/redispipe/testbed"
)

// main starts a throw-away redis server, stores two keys and then prints
// every key matching "key*" by iterating a synchronous scanner until it
// reports redis.ScanEOF.
func main() {
	defer runServer(46231)()
	ctx := context.Background()
	conn, err := redisconn.Connect(ctx, "127.0.0.1:46231", redisconn.Opts{
		Logger: redisconn.NoopLogger{},
	})
	if err != nil {
		// Return instead of log.Fatal so that the deferred server shutdown still runs.
		log.Println(err)
		return
	}
	sync := redis.Sync{S: conn}
	sync.Do("SET", "key1", "val1")
	sync.Do("SET", "key2", "val2")
	scan := sync.Scanner(redis.ScanOpts{Match: "key*"})
	for {
		keys, err := scan.Next()
		if err == redis.ScanEOF {
			// Regular end of iteration.
			break
		}
		if err != nil {
			// Return instead of log.Fatal so that the deferred server shutdown still runs.
			log.Println(err)
			return
		}
		for _, key := range keys {
			fmt.Println(key)
		}
	}
}

// runServer calls testbed.InitDir("."), starts a testbed.Server on the given
// port and returns a function that stops that server and then calls
// testbed.RmDir.
func runServer(port int) func() {
	testbed.InitDir(".")

	srv := testbed.Server{Port: uint16(port)}
	srv.Start()

	shutdown := func() {
		srv.Stop()
		testbed.RmDir()
	}
	return shutdown
}
Output:

key1
key2

type ScannerBase

// ScannerBase is the internal "parent" object for scanner implementations.
type ScannerBase struct {
	// ScanOpts - options for this scanning
	ScanOpts
	// Iter - current iterator state
	Iter []byte
	// Err - the error that occurred. Implementations should stop iteration once Err is non-nil.
	// NOTE(review): the original comment said "if Err is nil", which looks like a typo — confirm.
	Err error
	// contains filtered or unexported fields
}

ScannerBase is internal "parent" object for scanner implementations

func (*ScannerBase) Cancelled

func (s *ScannerBase) Cancelled() error

Cancelled - implements Future.Cancelled method

func (*ScannerBase) DoNext

func (s *ScannerBase) DoNext(cb Future, snd Sender)

DoNext - perform next step of iteration - send corresponding *SCAN command

func (*ScannerBase) IterLast

func (s *ScannerBase) IterLast() bool

IterLast - return true if iterator is at the end of this server/key keyspace.

func (*ScannerBase) Resolve

func (s *ScannerBase) Resolve(res interface{}, _ uint64)

Resolve - implements Future.Resolve. Accepts result of *SCAN command, remembers error and iterator and calls Resolve on underlying future.

type Sender

// Sender is the interface of a client implementation, expressed in terms of Future.
type Sender interface {
	// Send sends a request to redis. When the response arrives, cb.Resolve(result, n) will be called.
	// Note: cb.Resolve could be called before Send returns.
	Send(r Request, cb Future, n uint64)
	// SendMany sends many requests at once.
	// When the responses arrive, cb.Resolve will be called with distinct n values:
	// - first request's response will be passed as cb.Resolve(response, n)
	// - second request's response will be passed as cb.Resolve(response, n+1)
	// - third ... cb.Resolve(response, n+2)
	// Note: responses could arrive in arbitrary order.
	SendMany(r []Request, cb Future, n uint64)
	// SendTransaction sends several requests as a MULTI+EXEC redis transaction.
	// The response will be passed only once, as an array of responses to the commands (as EXEC does):
	// cb.Resolve([]interface{}{res1, res2, res3, ...}, n)
	SendTransaction(r []Request, cb Future, n uint64)
	// Scanner returns a scanner object that scans the keyspace sequentially.
	Scanner(opts ScanOpts) Scanner
	// EachShard synchronously calls the callback for each shard.
	// A single-connection client will call it only once, but a clustered one will call it for every master.
	// If the callback is called with an error, it will not be called again.
	// If the callback returns false, iteration stops.
	EachShard(func(Sender, error) bool)
	// Close closes the client. All following requests will be immediately resolved with an error.
	Close()
}

Sender is the interface of a client implementation. It provides an interface in terms of Future, and could be either a single connection, a connection to a cluster, or whatever.

type Sync

// Sync provides a convenient synchronous interface over an asynchronous Sender.
type Sync struct {
	// S is the wrapped Sender.
	S Sender
}

Sync provides a convenient synchronous interface over an asynchronous Sender.

Example
package main

import (
	"context"
	"fmt"

	"github.com/joomcode/redispipe/redis"
	"github.com/joomcode/redispipe/redisconn"
	"github.com/joomcode/redispipe/testbed"
)

// main starts a throw-away redis server and demonstrates the synchronous
// wrapper: single commands via Do and Send, a batch via SendMany, an error
// result via AsError, and a MULTI+EXEC transaction via SendTransaction.
func main() {
	defer runServer(46231)()
	ctx := context.Background()
	conn, err := redisconn.Connect(ctx, "127.0.0.1:46231", redisconn.Opts{
		Logger: redisconn.NoopLogger{},
	})
	if err != nil {
		// Nothing below can work without a connection; returning keeps the
		// deferred server shutdown running.
		fmt.Println(err)
		return
	}
	sync := redis.Sync{S: conn}

	res := sync.Do("SET", "key1", "1")
	fmt.Println(res)

	res = sync.Send(redis.Req("SET", "key2", "2"))
	fmt.Println(res)

	ress := sync.SendMany([]redis.Request{
		redis.Req("GET", "key1"),
		redis.Req("GET", "key2"),
	})
	fmt.Printf("%q\n", ress)

	// key1 holds a string, so HSET on it yields an error result.
	res = sync.Do("HSET", "key1", "field1", "val1")
	fmt.Println(redis.AsError(res))

	ress, err = sync.SendTransaction([]redis.Request{
		redis.Req("INCR", "key1"),
		redis.Req("INCRBY", "key2", -1),
		redis.Req("GET", "key1"),
		redis.Req("GET", "key2"),
	})
	fmt.Println(err)
	fmt.Printf("%q\n", ress)
}

// runServer calls testbed.InitDir("."), starts a testbed.Server on the given
// port and returns a function that stops that server and then calls
// testbed.RmDir.
func runServer(port int) func() {
	testbed.InitDir(".")

	srv := testbed.Server{Port: uint16(port)}
	srv.Start()

	shutdown := func() {
		srv.Stop()
		testbed.RmDir()
	}
	return shutdown
}
Output:

OK
OK
["1" "2"]
redispipe.result: WRONGTYPE Operation against a key holding the wrong kind of value {request: Req("HSET", ["key1" "field1" "val1"]), address: 127.0.0.1:46231}
<nil>
['\x02' '\x01' "2" "1"]

func (Sync) Do

func (s Sync) Do(cmd string, args ...interface{}) interface{}

Do is convenient method to construct and send request. Returns value that could be either result or error.

func (Sync) Scanner

func (s Sync) Scanner(opts ScanOpts) SyncIterator

Scanner returns synchronous iterator over redis keyspace/key.

func (Sync) Send

func (s Sync) Send(r Request) interface{}

Send sends request to redis. Returns value that could be either result or error.

func (Sync) SendMany

func (s Sync) SendMany(reqs []Request) []interface{}

SendMany sends several requests in "parallel" and returns a slice of results in the same order. Each result could be a value or an error.

func (Sync) SendTransaction

func (s Sync) SendTransaction(reqs []Request) ([]interface{}, error)

SendTransaction sends several requests as a single MULTI+EXEC transaction. It returns an array of responses and an error, if the transaction fails. Since a Redis transaction is either fully executed or fully failed, all values are valid if err == nil.

type SyncCtx

// SyncCtx (like Sync) provides a convenient synchronous interface over an asynchronous Sender.
// Its methods accept a context.Context to allow early request cancelling.
type SyncCtx struct {
	// S is the wrapped Sender.
	S Sender
}

SyncCtx (like Sync) provides a convenient synchronous interface over an asynchronous Sender. Its methods accept a context.Context to allow early request cancelling. Note that if the context was cancelled after the request was sent, redis will still execute it, but you will have no way to know about that fact.

func (SyncCtx) Do

func (s SyncCtx) Do(ctx context.Context, cmd string, args ...interface{}) interface{}

Do is convenient method to construct and send request. Returns value that could be either result or error. When context is cancelled, Do returns ErrRequestCancelled error.

func (SyncCtx) Scanner

func (s SyncCtx) Scanner(ctx context.Context, opts ScanOpts) SyncCtxIterator

Scanner returns a synchronous iterator over the redis keyspace/key. The scanner will stop iteration if the context was cancelled.

func (SyncCtx) Send

func (s SyncCtx) Send(ctx context.Context, r Request) interface{}

Send sends request to redis. Returns value that could be either result or error. When context is cancelled, Send returns ErrRequestCancelled error.

func (SyncCtx) SendMany

func (s SyncCtx) SendMany(ctx context.Context, reqs []Request) []interface{}

SendMany sends several requests in "parallel" and returns a slice of results in the same order. Each result could be a value or an error. When the context is cancelled, SendMany returns a slice of ErrRequestCancelled errors.

func (SyncCtx) SendTransaction

func (s SyncCtx) SendTransaction(ctx context.Context, reqs []Request) ([]interface{}, error)

SendTransaction sends several requests as a single MULTI+EXEC transaction. It returns an array of responses and an error, if the transaction fails. Since a Redis transaction is either fully executed or fully failed, all values are valid if err == nil. But some of them could be errors on their own. When the context is cancelled, SendTransaction returns an ErrRequestCancelled error.

type SyncCtxIterator

type SyncCtxIterator struct {
	// contains filtered or unexported fields
}

SyncCtxIterator is a synchronous iterator over a repeating *SCAN command. It will stop iteration if the context was cancelled.

func (SyncCtxIterator) Next

func (s SyncCtxIterator) Next() ([]string, error)

Next returns the next bunch of keys, or an error. The ScanEOF error signals regular iteration completion. It will return an ErrRequestCancelled error if the context was cancelled.

type SyncIterator

type SyncIterator struct {
	// contains filtered or unexported fields
}

SyncIterator is synchronous iterator over repeating *SCAN command.

func (SyncIterator) Next

func (s SyncIterator) Next() ([]string, error)

Next returns next bunch of keys, or error. ScanEOF error signals for regular iteration completion.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL