rediscluster

package
v0.9.4
Published: Apr 3, 2020 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package rediscluster implements a connector for redis cluster.

Cluster automatically learns and periodically refreshes the cluster configuration. It can send requests to slaves (if a corresponding policy is used). It can also retry read requests within a replica set, and retry write requests over other connections to the same master host (only if it is known that the requests were not sent).

It reacts to the set CLUSTER_SELF:MASTER_ONLY stored in the cluster itself, which forces the master-only policy on some slots. This set is used by a proprietary tool for correct and fast cluster rebalancing.

Variables

var (
	// ErrCluster - some cluster related errors.
	ErrCluster = redis.Errors.NewSubNamespace("cluster")
	// ErrClusterSlots - fetching slots configuration failed
	ErrClusterSlots = ErrCluster.NewType("retrieve_slots")
	// ErrAddressNotResolved - address could not be resolved
	// Cluster resolves named hosts specified as start points. If this resolution fails, this error is returned.
	ErrAddressNotResolved = ErrCluster.NewType("resolve_address")
	// ErrClusterConfigEmpty - no addresses found in config.
	ErrClusterConfigEmpty = ErrCluster.NewType("config_empty")
	// ErrNoAliveConnection - no alive connection to shard
	ErrNoAliveConnection = ErrCluster.NewType("no_alive_connection", redis.ErrTraitConnectivity)
)

var (
	// EKCluster - cluster for error
	EKCluster = errorx.RegisterProperty("cluster")
	// EKClusterName - cluster name
	EKClusterName = errorx.RegisterPrintableProperty("clusterName")
	// EKPolicy - policy used to choose between master and replicas.
	EKPolicy = errorx.RegisterPrintableProperty("policy")
)

Functions

func DebugEvent

func DebugEvent(ev string)

DebugEvent is a stub implementation of a test-related method.

Types

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster is an implementation of redis.Sender that represents a connection to a redis cluster.

Under the hood, it uses a set of redisconn.Connection to individual redis servers. There can be several connections to a single redis server: their number is controlled by Opts.ConnsPerHost, and Opts.ConnHostPolicy specifies how they are used.

By default, requests are always sent to the known master of a replica set, but you can override this with Cluster.WithPolicy. Write commands are still sent to the master unless you specify the ForceMasterAndSlaves or ForcePreferSlaves policy. Note: read-only commands are hard-coded in upper case, so a command is not recognized as read-only if it is written in mixed or lower case.
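
The case-sensitivity note can be illustrated with a minimal sketch. The `readOnly` table, `isReadOnly` and `normalize` below are hypothetical names invented for this example; they are not part of the package, and the real command list is larger.

```go
package main

import (
	"fmt"
	"strings"
)

// readOnly is a hypothetical, deliberately tiny table of read-only commands.
// The real list inside the package is larger and is not reproduced here.
var readOnly = map[string]bool{"GET": true, "MGET": true, "EXISTS": true}

// isReadOnly does an exact, case-sensitive lookup, mirroring the note above.
func isReadOnly(cmd string) bool { return readOnly[cmd] }

// normalize upper-cases a command name before it is put into a request.
func normalize(cmd string) string { return strings.ToUpper(cmd) }

func main() {
	for _, cmd := range []string{"GET", "get", "Get"} {
		fmt.Printf("%-3s raw=%v normalized=%v\n",
			cmd, isReadOnly(cmd), isReadOnly(normalize(cmd)))
	}
}
```

In practice this means writing command names in upper case whenever a replica policy should apply to them.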

func NewCluster

func NewCluster(ctx context.Context, initAddrs []string, opts Opts) (*Cluster, error)

NewCluster creates Cluster.

It connects to the specified hosts, learns the cluster configuration, and asynchronously starts establishing connections to all of the cluster's hosts.

func (*Cluster) Close

func (c *Cluster) Close()

Close closes this cluster handler (by cancelling its context).

func (*Cluster) Ctx

func (c *Cluster) Ctx() context.Context

Ctx returns the context associated with this cluster.

func (*Cluster) EachShard

func (c *Cluster) EachShard(cb func(redis.Sender, error) bool)

EachShard implements redis.Sender.EachShard

func (*Cluster) ForceReloading

func (c *Cluster) ForceReloading()

ForceReloading forces reloading of the cluster slot mapping. It is a non-blocking call, and its effect is throttled: reloading happens at most 10 times a second.
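
A non-blocking throttle of this kind could look roughly like the sketch below. This is not the package's implementation: the `throttle` type and its methods are hypothetical, and the only fact taken from the description is the limit of 10 reloads per second (one per 100ms).

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// minGap is the smallest allowed distance between two accepted reloads:
// 100ms corresponds to "at most 10 times a second".
const minGap = int64(100 * time.Millisecond)

// throttle lets a caller through only if minGap has passed since the last
// accepted call. It never blocks; rejected callers simply get false.
type throttle struct {
	last int64 // unix nanoseconds of the last accepted call, 0 if none yet
}

// allowAt reports whether a reload may start at the given moment.
func (t *throttle) allowAt(nowNanos int64) bool {
	prev := atomic.LoadInt64(&t.last)
	if prev != 0 && nowNanos-prev < minGap {
		return false
	}
	// CompareAndSwap ensures only one of several concurrent callers wins.
	return atomic.CompareAndSwapInt64(&t.last, prev, nowNanos)
}

// allow is the wall-clock variant of allowAt.
func (t *throttle) allow() bool { return t.allowAt(time.Now().UnixNano()) }

func main() {
	var t throttle
	accepted := 0
	for i := 0; i < 1000; i++ {
		if t.allow() {
			accepted++
		}
	}
	fmt.Println("accepted reload requests:", accepted)
}
```

Callers can therefore invoke such a method freely from hot paths; extra calls inside the window are dropped rather than queued.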

func (*Cluster) Handle

func (c *Cluster) Handle() interface{}

Handle returns configured handle.

func (*Cluster) Name

func (c *Cluster) Name() string

Name returns configured name.

func (*Cluster) Scanner

func (c *Cluster) Scanner(opts redis.ScanOpts) redis.Scanner

Scanner implements redis.Sender.Scanner.

func (*Cluster) Send

func (c *Cluster) Send(req Request, cb Future, off uint64)

Send implements redis.Sender.Send. It sends the request to the correct shard (according to the known cluster configuration), handles MOVED and ASK redirections, and performs suitable retries.
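
For illustration, redis cluster redirections arrive as error messages of the form `MOVED <slot> <host:port>` or `ASK <slot> <host:port>`. The sketch below parses such a message; `redirect` and `parseRedirect` are hypothetical names for this example, and the package's own parsing code may differ.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// redirect is the information carried by a MOVED or ASK error.
type redirect struct {
	Kind string // "MOVED" or "ASK"
	Slot int    // hash slot, 0..16383
	Addr string // host:port of the node to contact
}

// parseRedirect extracts redirection data from a redis error message.
// It returns false for any message that is not a well-formed redirection.
func parseRedirect(msg string) (redirect, bool) {
	parts := strings.Fields(msg)
	if len(parts) != 3 || (parts[0] != "MOVED" && parts[0] != "ASK") {
		return redirect{}, false
	}
	slot, err := strconv.Atoi(parts[1])
	if err != nil || slot < 0 || slot >= 16384 {
		return redirect{}, false
	}
	return redirect{Kind: parts[0], Slot: slot, Addr: parts[2]}, true
}

func main() {
	for _, msg := range []string{
		"MOVED 3999 127.0.0.1:6381",
		"ASK 3999 127.0.0.1:6381",
		"ERR unknown command",
	} {
		r, ok := parseRedirect(msg)
		fmt.Printf("%q -> %+v ok=%v\n", msg, r, ok)
	}
}
```

A MOVED reply means the slot now belongs to another node permanently, while ASK applies only to the next request and requires an ASKING command to be sent first.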

func (*Cluster) SendMany

func (c *Cluster) SendMany(reqs []Request, cb Future, off uint64)

SendMany implements redis.Sender.SendMany. Each request is handled as if it were sent with the Send method.

func (*Cluster) SendTransaction

func (c *Cluster) SendTransaction(reqs []Request, cb Future, off uint64)

SendTransaction implements redis.Sender.SendTransaction. It analyses the keys of the commands and sends the whole transaction to the suitable shard. It redirects the whole transaction on MOVED/ASK redirections, and waits a bit if not all keys of the transaction have been moved yet.
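
Choosing a shard by key relies on the hash-slot rule of the redis cluster specification: slot = CRC16(key) mod 16384, where only the content of a non-empty `{...}` hash tag is hashed if one is present. The sketch below shows that rule with hypothetical helper names (`crc16`, `hashSlot`, `sameSlot`); it is not the package's code.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 is the CRC16 variant named by the redis cluster specification
// (XMODEM): polynomial 0x1021, initial value 0, no reflection.
func crc16(data string) uint16 {
	var crc uint16
	for i := 0; i < len(data); i++ {
		crc ^= uint16(data[i]) << 8
		for bit := 0; bit < 8; bit++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot applies the hash-tag rule: if the key contains "{", followed
// later by "}" with at least one character in between, only that part is hashed.
func hashSlot(key string) int {
	if open := strings.IndexByte(key, '{'); open >= 0 {
		if end := strings.IndexByte(key[open+1:], '}'); end > 0 {
			key = key[open+1 : open+1+end]
		}
	}
	return int(crc16(key) % 16384)
}

// sameSlot reports whether all keys map to one slot, which is the
// precondition for sending them to a single shard in one transaction.
func sameSlot(keys ...string) bool {
	for i := 1; i < len(keys); i++ {
		if hashSlot(keys[i]) != hashSlot(keys[0]) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(sameSlot("{user1000}.following", "{user1000}.followers"))
	fmt.Println(hashSlot("{user1000}.following") == hashSlot("user1000"))
}
```

Keys used together in one transaction should share a hash tag so that they are guaranteed to land in the same slot.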

func (*Cluster) SendWithPolicy

func (c *Cluster) SendWithPolicy(policy ReplicaPolicyEnum, req Request, cb Future, off uint64)

SendWithPolicy lets you choose the master/replica policy for individual requests. You can also call cluster.WithPolicy() to obtain a redis.Sender with a predefined policy.

func (*Cluster) SetLatencyOrientedRR added in v0.9.2

func (c *Cluster) SetLatencyOrientedRR(v bool)

SetLatencyOrientedRR changes "latency awareness" on the fly.

func (*Cluster) String

func (c *Cluster) String() string

String implements fmt.Stringer

func (*Cluster) WithPolicy

func (c *Cluster) WithPolicy(policy ReplicaPolicyEnum) PolicyMan

WithPolicy returns a PolicyMan with the specified policy.

type ClusterHandle

type ClusterHandle struct {
	Handle  interface{}
	Address string
	N       int
}

ClusterHandle wraps the cluster's handle and is set as the handle of each connection. You can use it in connection logging.

type ConnHostPolicyEnum

type ConnHostPolicyEnum int8

ConnHostPolicyEnum is the configuration enumeration of policies for using multiple connections per host.

const (
	// ConnHostPreferFirst means "always prefer first connection among established to redis instance"
	ConnHostPreferFirst ConnHostPolicyEnum = iota
	// ConnHostRoundRobin means "spread requests among connections established to redis instance"
	ConnHostRoundRobin
)
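
The difference between the two policies can be sketched as follows. The `pick` function, the `alive` slice and the `seq` counter are assumptions made for this example only; the package's real selection code is not shown in this documentation.

```go
package main

import "fmt"

type connPolicy int8

const (
	preferFirst connPolicy = iota // analogue of ConnHostPreferFirst
	roundRobin                    // analogue of ConnHostRoundRobin
)

// pick returns the index of the connection to use, or -1 if none is alive.
// alive[i] tells whether the i-th connection to the host is established;
// seq is a per-host request counter and is used only by roundRobin.
func pick(policy connPolicy, alive []bool, seq uint32) int {
	n := len(alive)
	if n == 0 {
		return -1
	}
	start := 0
	if policy == roundRobin {
		start = int(seq % uint32(n))
	}
	// Walk from the starting position and take the first live connection.
	for i := 0; i < n; i++ {
		idx := (start + i) % n
		if alive[idx] {
			return idx
		}
	}
	return -1
}

func main() {
	alive := []bool{true, true}
	for seq := uint32(0); seq < 4; seq++ {
		fmt.Println(seq, pick(preferFirst, alive, seq), pick(roundRobin, alive, seq))
	}
}
```

With prefer-first, the second connection acts as a hot spare; with round-robin, load is spread across all established connections.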

type DefaultLogger

type DefaultLogger struct{}

DefaultLogger is the default Logger implementation.

func (DefaultLogger) Report

func (d DefaultLogger) Report(cluster *Cluster, event LogEvent)

Report implements Logger.Report.

func (DefaultLogger) ReqStat

func (d DefaultLogger) ReqStat(c *Cluster, conn *redisconn.Connection, req Request, res interface{}, nanos int64)

ReqStat implements Logger.ReqStat as no-op.

type FairRoundRobinSeed

type FairRoundRobinSeed struct {
	// contains filtered or unexported fields
}

FairRoundRobinSeed implements RoundRobinSeed by returning a new value every time, using an atomic increment. It doesn't work well in practice because it reduces pipeline efficiency. It is provided only as an example.

func (*FairRoundRobinSeed) Current

func (d *FairRoundRobinSeed) Current() uint32

Current implements RoundRobinSeed.Current method.

type Future

type Future = redis.Future

Future is an alias for redis.Future

type LogClusterSlotsError

type LogClusterSlotsError struct {
	Conn  *redisconn.Connection // Connection which was used for CLUSTER SLOTS
	Error error                 // observed error
}

LogClusterSlotsError is logged when CLUSTER SLOTS failed.

type LogContextClosed

type LogContextClosed struct{ Error error }

LogContextClosed is logged when cluster's context is closed.

type LogEvent

type LogEvent interface {
	// contains filtered or unexported methods
}

LogEvent is a sum type for events to be logged.

type LogHostEvent

type LogHostEvent struct {
	Conn  *redisconn.Connection // Connection which triggers event.
	Event redisconn.LogEvent
}

LogHostEvent is a wrapper for per-connection event

type LogSlotRangeError

type LogSlotRangeError struct{}

LogSlotRangeError is logged when no host was able to respond to CLUSTER SLOTS.

type Logger

type Logger interface {
	// Report is called when an event happens during the cluster's lifetime.
	// The default implementation just prints this information using the standard log package.
	Report(c *Cluster, event LogEvent)
	// ReqStat is called after a request receives its answer, with request/result information
	// and the time spent fulfilling the request.
	// The default implementation is a no-op.
	ReqStat(c *Cluster, conn *redisconn.Connection, req Request, res interface{}, nanos int64)
}

Logger is used for logging cluster-related events and request statistics.

type NoopLogger

type NoopLogger struct{}

NoopLogger implements Logger with no logging at all.

func (NoopLogger) Report

func (d NoopLogger) Report(conn *Cluster, event LogEvent)

Report implements Logger.Report

func (NoopLogger) ReqStat

func (d NoopLogger) ReqStat(c *Cluster, conn *redisconn.Connection, req Request, res interface{}, nanos int64)

ReqStat implements Logger.ReqStat

type Opts

type Opts struct {
	// HostOpts - per host options
	// Note that HostOpts.Handle will be overwritten with ClusterHandle{ cluster.opts.Handle, conn.address}
	HostOpts redisconn.Opts
	// ConnsPerHost - how many connections are established to each host
	// if ConnsPerHost < 1 then ConnsPerHost = 2
	ConnsPerHost int
	// ConnHostPolicy - either prefer to send to the first connection until it is disconnected,
	// or send to all connections in a round-robin manner.
	// default: ConnHostPreferFirst
	ConnHostPolicy ConnHostPolicyEnum
	// Handle is returned with Cluster.Handle()
	// Also it is part of per-connection handle
	Handle interface{}
	// Name of a cluster.
	Name string
	// CheckInterval - default interval for reloading the cluster configuration
	// default: 5 seconds, min: 100 milliseconds, max: 10 minutes
	// Note that MOVED and ASK redis errors force configuration reloading,
	// therefore there is no need to make it very frequent.
	CheckInterval time.Duration
	// MovedRetries - follow MOVED|ASK redirections this number of times
	// default: 3, min: 1, max: 10
	MovedRetries int
	// WaitToMigrate - wait this long if not all transaction keys were migrated
	// from one shard to another, and then repeat the transaction.
	// default: 20 milliseconds, min: 100 microseconds, max: 100 milliseconds
	WaitToMigrate time.Duration
	// Logger used for logging cluster events and account request stats
	Logger Logger

	// RoundRobinSeed - used to choose between master and replica.
	RoundRobinSeed RoundRobinSeed
	// LatencyOrientedRR - when MasterAndSlaves is used, prefer hosts with lower latency
	LatencyOrientedRR bool
}

Opts holds the options for Cluster.
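
The defaults and limits listed in the field comments can be summarized in a small sketch. The `opts` type and `normalize` function below are hypothetical stand-ins; in particular, treating the zero value as "use the default" and clamping out-of-range values to the nearest limit are assumptions, since the comments state only the default, minimum and maximum.

```go
package main

import (
	"fmt"
	"time"
)

// opts is a hypothetical stand-in that copies only the numeric fields of Opts.
type opts struct {
	ConnsPerHost  int
	CheckInterval time.Duration
	MovedRetries  int
	WaitToMigrate time.Duration
}

// clamp returns def for the zero value, otherwise v limited to [lo, hi].
// Clamping (rather than rejecting) out-of-range values is an assumption.
func clamp(v, def, lo, hi int64) int64 {
	switch {
	case v == 0:
		return def
	case v < lo:
		return lo
	case v > hi:
		return hi
	}
	return v
}

// normalize applies the documented defaults and limits to a copy of o.
func normalize(o opts) opts {
	if o.ConnsPerHost < 1 {
		o.ConnsPerHost = 2 // stated in the ConnsPerHost comment
	}
	o.CheckInterval = time.Duration(clamp(int64(o.CheckInterval),
		int64(5*time.Second), int64(100*time.Millisecond), int64(10*time.Minute)))
	o.MovedRetries = int(clamp(int64(o.MovedRetries), 3, 1, 10))
	o.WaitToMigrate = time.Duration(clamp(int64(o.WaitToMigrate),
		int64(20*time.Millisecond), int64(100*time.Microsecond), int64(100*time.Millisecond)))
	return o
}

func main() {
	fmt.Printf("%+v\n", normalize(opts{}))
	fmt.Printf("%+v\n", normalize(opts{CheckInterval: time.Hour, MovedRetries: 50}))
}
```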

type PolicyMan

type PolicyMan struct {
	*Cluster
	// Policy is default policy for Send and SendMany
	Policy ReplicaPolicyEnum
}

PolicyMan wraps Cluster and changes the default policy for the Send and SendMany methods. PolicyMan implements redis.Sender.

func (PolicyMan) Send

func (p PolicyMan) Send(req Request, cb Future, off uint64)

Send implements redis.Sender.Send. It calls Cluster.SendWithPolicy with the specified default policy.

func (PolicyMan) SendMany

func (p PolicyMan) SendMany(reqs []Request, cb Future, off uint64)

SendMany implements redis.Sender.SendMany. It sends requests with the specified default policy.

type ReplicaPolicyEnum

type ReplicaPolicyEnum int8

ReplicaPolicyEnum is the configuration enumeration of policies for using the hosts of a replica set.

const (
	// MasterOnly means request should be executed on master
	MasterOnly ReplicaPolicyEnum = iota
	// MasterAndSlaves means the request could be executed on a slave,
	// and every host in the replica set has the same probability of executing the query.
	// Write requests still go to the master.
	MasterAndSlaves
	// PreferSlaves means the request could be executed on a slave,
	// and a replica is 3 times more likely than the master to handle the request.
	// Write requests still go to the master.
	PreferSlaves
	// ForceMasterAndSlaves - overrides the "writeness" of a command and allows sending it to a replica.
	// Since Lua code cannot be analyzed, all "EVAL/EVALSHA" commands are considered "writing".
	// Also, the list of "readonly" commands is hardcoded and could miss one you need.
	// In this case you may use one of ForceMasterAndSlaves, ForcePreferSlaves or ForceMasterWithFallback.
	ForceMasterAndSlaves
	// ForcePreferSlaves - overrides "writeness" of command. See ForceMasterAndSlaves for more description.
	ForcePreferSlaves
)
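
The probabilities described in the comments can be illustrated by a weighted choice driven by a seed value. This is only a sketch under assumptions: `chooseHost` is a hypothetical function, host 0 is taken to be the master, and liveness or latency of hosts is ignored. The package's actual selection logic is not shown in this documentation.

```go
package main

import "fmt"

type replicaPolicy int8

const (
	masterOnly      replicaPolicy = iota // analogue of MasterOnly
	masterAndSlaves                      // analogue of MasterAndSlaves
	preferSlaves                         // analogue of PreferSlaves
)

// chooseHost maps a seed to a host index in a replica set of `hosts` nodes,
// where index 0 is the master and the rest are replicas.
// Under masterAndSlaves every host has weight 1; under preferSlaves each
// replica has weight 3 and the master has weight 1.
func chooseHost(policy replicaPolicy, hosts int, seed uint32) int {
	if hosts <= 1 || policy == masterOnly {
		return 0
	}
	replicaWeight := uint32(1)
	if policy == preferSlaves {
		replicaWeight = 3
	}
	total := 1 + replicaWeight*uint32(hosts-1)
	ticket := seed % total
	if ticket == 0 {
		return 0 // the single ticket owned by the master
	}
	return 1 + int((ticket-1)/replicaWeight)
}

func main() {
	for _, p := range []replicaPolicy{masterOnly, masterAndSlaves, preferSlaves} {
		counts := make([]int, 3)
		for seed := uint32(0); seed < 21; seed++ {
			counts[chooseHost(p, 3, seed)]++
		}
		fmt.Println(p, counts)
	}
}
```

Because the result depends only on the seed, all requests issued while the seed is unchanged go to the same host of a replica set.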

type Request

type Request = redis.Request

Request is an alias for redis.Request

type RoundRobinSeed

type RoundRobinSeed interface {
	// Current returns "deterministic random" value used for choosing replica.
	Current() uint32
}

RoundRobinSeed decides which replica to use for each particular request when the replica policy is MasterAndSlaves or PreferSlaves.

type Scanner

type Scanner struct {
	redis.ScannerBase
	// contains filtered or unexported fields
}

Scanner is an implementation of redis.Scanner.

If it is used for the SCAN command, it iterates through all shards.

func (*Scanner) Next

func (s *Scanner) Next(cb redis.Future)

Next implements redis.Scanner.Next. Under the hood, it scans each shard one after another.
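
The idea of scanning shards one after another can be sketched with in-memory stand-ins. Everything here is hypothetical: `shard`, `scanner` and the synchronous `next` method exist only in this example, whereas the real Scanner delivers results through a redis.Future callback.

```go
package main

import "fmt"

// shard is a stand-in for one master node; it serves its keys page by page.
type shard struct{ keys []string }

// scan mimics SCAN: it returns up to count keys starting at cursor, plus
// the next cursor, which is 0 once the shard is exhausted. count must be > 0.
func (s shard) scan(cursor, count int) (next int, page []string) {
	end := cursor + count
	if end >= len(s.keys) {
		return 0, s.keys[cursor:]
	}
	return end, s.keys[cursor:end]
}

// scanner walks all shards sequentially, keeping one cursor at a time.
type scanner struct {
	shards []shard
	idx    int // shard currently being scanned
	cursor int // cursor inside that shard
	count  int // page size hint
}

// next returns the next page, or false once every shard has been scanned.
func (sc *scanner) next() ([]string, bool) {
	if sc.idx >= len(sc.shards) {
		return nil, false
	}
	next, page := sc.shards[sc.idx].scan(sc.cursor, sc.count)
	sc.cursor = next
	if next == 0 {
		sc.idx++ // this shard is done, continue with the following one
	}
	return page, true
}

func main() {
	sc := &scanner{
		shards: []shard{{keys: []string{"a", "b", "c"}}, {keys: []string{"d"}}},
		count:  2,
	}
	for page, ok := sc.next(); ok; page, ok = sc.next() {
		fmt.Println(page)
	}
}
```

As with SCAN on a single server, a page may be empty even though more keys follow, so iteration should stop only when the scanner itself reports the end.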

type TimedRoundRobinSeed

type TimedRoundRobinSeed struct {
	// contains filtered or unexported fields
}

TimedRoundRobinSeed is an implementation of RoundRobinSeed. It runs a goroutine which periodically stores a new random value, and returns this value between updates. It improves pipeline efficiency, and it is used as the default implementation.
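
The contrast between a per-call seed and a timed seed can be sketched as follows. The types `fairSeed` and `timedSeed` are hypothetical re-creations written for this example from the descriptions alone; they are not the package's source.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// seed mirrors the single-method RoundRobinSeed interface.
type seed interface{ Current() uint32 }

// fairSeed returns a new value on every call (atomic increment).
type fairSeed struct{ n uint32 }

func (f *fairSeed) Current() uint32 { return atomic.AddUint32(&f.n, 1) }

// timedSeed keeps one random value and replaces it once per interval.
type timedSeed struct {
	value uint32
	stop  chan struct{}
	once  sync.Once
}

// newTimedSeed starts the updating goroutine; interval must be positive.
func newTimedSeed(interval time.Duration) *timedSeed {
	s := &timedSeed{value: rand.Uint32(), stop: make(chan struct{})}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				atomic.StoreUint32(&s.value, rand.Uint32())
			case <-s.stop:
				return
			}
		}
	}()
	return s
}

// Current returns the value stored by the most recent update.
func (s *timedSeed) Current() uint32 { return atomic.LoadUint32(&s.value) }

// Stop tells the updating goroutine to quit; it is safe to call repeatedly.
func (s *timedSeed) Stop() { s.once.Do(func() { close(s.stop) }) }

func main() {
	fair := &fairSeed{}
	timed := newTimedSeed(50 * time.Millisecond)
	defer timed.Stop()
	for _, s := range []seed{fair, timed} {
		a, b := s.Current(), s.Current()
		fmt.Printf("%T: consecutive values equal: %v\n", s, a == b)
	}
}
```

With a timed seed, requests issued within one interval see the same value and so tend to go to the same host, which is presumably why it pipelines better than a seed that changes on every call.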

func DefaultRoundRobinSeed

func DefaultRoundRobinSeed() *TimedRoundRobinSeed

DefaultRoundRobinSeed returns the singleton TimedRoundRobinSeed, which uses a random interval between 45ms and 100ms.

func NewTimedRoundRobinSeed

func NewTimedRoundRobinSeed(interval time.Duration) *TimedRoundRobinSeed

NewTimedRoundRobinSeed returns TimedRoundRobinSeed which updates its value every `interval`.

func (*TimedRoundRobinSeed) Current

func (rr *TimedRoundRobinSeed) Current() uint32

Current is an implementation of RoundRobinSeed.Current. It returns the same value during each `interval` period.

func (*TimedRoundRobinSeed) Stop

func (rr *TimedRoundRobinSeed) Stop()

Stop signals the value-changing goroutine to quit.

Directories

redisclusterutil: Package redisclusterutil implements some protocol level details of cluster specification.
