redisconn

package
v0.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2020 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package redisconn implements connection to single redis server.

Connection is "wrapper" around a single tcp (unix-socket) connection. All requests are fed into a single connection, and responses are asynchronously read from it. Connection is thread-safe, meaning it doesn't need external synchronization. Connect is responsible for reconnection, but it does not retry requests in the case of networking problems.

Index

Constants

View Source
const (
	// DoAsking is a flag for Connection.SendBatchFlag signalling to send ASKING request before transactions.
	DoAsking = 1
	// DoTransaction is a flag for Connection.SendBatchFlag signalling to wrap bunch of requests into MULTI/EXEC.
	DoTransaction = 2

	PingMaxLatency         = 10 * time.Second
	PingLatencyGranularity = 10 * time.Microsecond
)

Variables

View Source
var (
	// ErrConnection - connection was not established at the moment request were done,
	// request is definitely not sent anywhere.
	ErrConnection = redis.Errors.NewSubNamespace("connection", redis.ErrTraitNotSent, redis.ErrTraitConnectivity)
	// ErrNotConnected - connection were not established at the moment
	ErrNotConnected = ErrConnection.NewType("not_connected")
	// ErrDial - could not connect.
	ErrDial = ErrConnection.NewType("could_not_connect")
	// ErrAuth - password didn't match
	ErrAuth = ErrConnection.NewType("count_not_auth", ErrTraitInitPermanent)
	// ErrInit - other error during initial conversation with redis
	ErrInit = ErrConnection.NewType("initialization_error", ErrTraitInitPermanent)
	// ErrConnSetup - other connection initialization error (including io errors)
	ErrConnSetup = ErrConnection.NewType("initialization_temp_error")

	// ErrTraitInitPermanent signals about non-transient error in initial communication with redis.
	// It means that either authentication fails or selected database doesn't exists or redis
	// behaves in unexpected way.
	ErrTraitInitPermanent = errorx.RegisterTrait("init_permanent")
)
View Source
var (
	// EKConnection - key for connection that handled request.
	EKConnection = errorx.RegisterProperty("connection")
	// EKDb - db number to select.
	EKDb = errorx.RegisterPrintableProperty("db")
)

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is implementation of redis.Sender which represents single connection to single redis instance.

Underlying net.Conn is re-established as necessary. Queries are not retried in case of connection errors. Connection is safe for multi-threaded usage, ie it doesn't need in synchronisation.

func Connect

func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error)

Connect establishes new connection to redis server. Connect will be automatically closed if context will be cancelled or timeouted. But it could be closed explicitly as well.

func (*Connection) Addr

func (conn *Connection) Addr() string

Addr returns configurred address

func (*Connection) Close

func (conn *Connection) Close()

Close closes connection forever

func (*Connection) ConnectedNow

func (conn *Connection) ConnectedNow() bool

ConnectedNow answers if connection is certainly connected at the moment

func (*Connection) Ctx

func (conn *Connection) Ctx() context.Context

Ctx returns context of this connection

func (*Connection) EachShard

func (c *Connection) EachShard(cb func(redis.Sender, error) bool)

EachShard implements redis.Sender.EachShard. It just calls callback once with Connection itself.

func (*Connection) Handle

func (conn *Connection) Handle() interface{}

Handle returns user specified handle from Opts

func (*Connection) LocalAddr

func (conn *Connection) LocalAddr() string

LocalAddr is outgoing socket addr Attention: do not call this method from Logger.Report, because it could lead to deadlock!

func (*Connection) MayBeConnected

func (conn *Connection) MayBeConnected() bool

MayBeConnected answers if connection either connected or connecting at the moment. Ie it returns false if connection is disconnected at the moment, and reconnection is not started yet.

func (*Connection) Ping

func (conn *Connection) Ping() error

Ping sends ping request synchronously

func (*Connection) PingLatency added in v0.9.2

func (conn *Connection) PingLatency() time.Duration

PingLatency returns last known ping latency

func (*Connection) RemoteAddr

func (conn *Connection) RemoteAddr() string

RemoteAddr is address of Redis socket Attention: do not call this method from Logger.Report, because it could lead to deadlock!

func (*Connection) Scanner

func (c *Connection) Scanner(opts redis.ScanOpts) redis.Scanner

Scanner implements redis.Sender.Scanner

func (*Connection) Send

func (conn *Connection) Send(req Request, cb Future, n uint64)

Send implements redis.Sender.Send It sends request asynchronously. At some moment in a future it will call cb.Resolve(result, n) But if cb is cancelled, then cb.Resolve will be called immediately.

func (*Connection) SendAsk

func (conn *Connection) SendAsk(req Request, cb Future, n uint64, asking bool)

SendAsk is a helper method for redis-cluster client implementation. If asking==true, it will send request with ASKING request sent before.

func (*Connection) SendBatch

func (conn *Connection) SendBatch(requests []Request, cb Future, start uint64)

SendBatch sends several requests in preserved order. They will be serialized to network in the order passed.

func (*Connection) SendBatchFlags

func (conn *Connection) SendBatchFlags(requests []Request, cb Future, start uint64, flags int)

SendBatchFlags sends several requests in preserved order with addition ASKING, MULTI+EXEC commands. If flag&DoAsking != 0 , then "ASKING" command is prepended. If flag&DoTransaction != 0, then "MULTI" command is prepended, and "EXEC" command appended. Note: cb.Resolve will be also called with start+len(requests) index with result of EXEC command. It is mostly helper method for SendTransaction for single connect and cluster implementations.

Note: since it is used for transaction, single wrong argument in single request will result in error for all commands in a batch.

func (*Connection) SendMany

func (conn *Connection) SendMany(requests []Request, cb Future, start uint64)

SendMany implements redis.Sender.SendMany Sends several requests asynchronously. Fills with cb.Resolve(res, n), cb.Resolve(res, n+1), ... etc. Note: it could resolve requests in arbitrary order.

func (*Connection) SendTransaction

func (conn *Connection) SendTransaction(reqs []Request, cb Future, off uint64)

SendTransaction implements redis.Sender.SendTransaction

func (*Connection) String

func (conn *Connection) String() string

String implements fmt.Stringer

type DefaultLogger

type DefaultLogger struct{}

DefaultLogger is default implementation of Logger

func (DefaultLogger) Report

func (d DefaultLogger) Report(conn *Connection, event LogEvent)

Report implements Logger.Report

func (DefaultLogger) ReqStat

func (d DefaultLogger) ReqStat(conn *Connection, req Request, res interface{}, nanos int64)

ReqStat implements Logger.ReqStat

type Future

type Future = redis.Future

Future is an alias for redis.Future

type LogConnectFailed

type LogConnectFailed struct {
	Error error // - failure reason
}

LogConnectFailed is logged when connection establishing were unsuccessful.

type LogConnected

type LogConnected struct {
	LocalAddr  string // - local ip:port
	RemoteAddr string // - remote ip:port
}

LogConnected is logged when Connection established connection to redis.

type LogConnecting

type LogConnecting struct{}

LogConnecting is an event logged when Connection starts dialing to redis.

type LogContextClosed

type LogContextClosed struct {
	Error error // - ctx.Err()
}

LogContextClosed is logged when Connection's context were closed, or Connection.Close() called. Ie when connection is explicitly closed by user.

type LogDisconnected

type LogDisconnected struct {
	Error      error  // - disconnection reason
	LocalAddr  string // - local ip:port
	RemoteAddr string // - remote ip:port
}

LogDisconnected is logged when connection were broken.

type LogEvent

type LogEvent interface {
	// contains filtered or unexported methods
}

LogEvent is a sum-type for events to be logged.

type Logger

type Logger interface {
	// Report will be called when some events happens during connection's lifetime.
	// Default implementation just prints this information using standard log package.
	Report(conn *Connection, event LogEvent)
	// ReqStat is called after request receives it's answer with request/result information
	// and time spend to fulfill request.
	// Default implementation is no-op.
	ReqStat(conn *Connection, req Request, res interface{}, nanos int64)
}

Logger is a type for custom event and stat reporter.

type NoopLogger

type NoopLogger struct{}

NoopLogger is noop implementation of Logger Useful in tests

func (NoopLogger) Report

func (d NoopLogger) Report(*Connection, LogEvent)

Report implements Logger.Report

func (NoopLogger) ReqStat

func (d NoopLogger) ReqStat(conn *Connection, req Request, res interface{}, nanos int64)

ReqStat implements Logger.ReqStat

type Opts

type Opts struct {
	// DB - database number
	DB int
	// Password for AUTH
	Password string
	// IOTimeout - timeout on read/write to socket.
	// If IOTimeout == 0, then it is set to 1 second
	// If IOTimeout < 0, then timeout is disabled
	IOTimeout time.Duration
	// DialTimeout is timeout for net.Dialer
	// If it is <= 0 or >= IOTimeout, then IOTimeout
	// If IOTimeout is disabled, then 5 seconds used (but without affect on ReconnectPause)
	DialTimeout time.Duration
	// ReconnectPause is a pause after failed connection attempt before next one.
	// If ReconnectPause < 0, then no reconnection will be performed.
	// If ReconnectPause == 0, then DialTimeout * 2 is used
	ReconnectPause time.Duration
	// TCPKeepAlive - KeepAlive parameter for net.Dialer
	// default is IOTimeout / 3
	TCPKeepAlive time.Duration
	// Handle is returned with Connection.Handle()
	Handle interface{}
	// WritePause - write loop pauses for this time to collect more requests.
	// Default is 50 microseconds. Recommended value is 150 microseconds.
	// Set < 0 to disable for single threaded use case.
	WritePause time.Duration
	// Logger
	Logger Logger
	// AsyncDial - do not establish connection immediately
	AsyncDial bool
	// ScriptMode - enables blocking commands and turns default WritePause to -1.
	// It will allow to use this connector in script like (ie single threaded) environment
	// where it is ok to use blocking commands and pipelining gives no gain.
	ScriptMode bool
}

Opts - options for Connection

type Request

type Request = redis.Request

Request is an alias for redis.Request

type Scanner

type Scanner struct {
	redis.ScannerBase
	// contains filtered or unexported fields
}

Scanner is an implementation of redis.Scanner

func (*Scanner) Next

func (s *Scanner) Next(cb redis.Future)

Next is an implementation of redis.Scanner.Next

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL