package lib

v0.0.0-...-4a2d37b
Published: Apr 20, 2024 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

const (
	EvtTypeTAF     = "TAF"
	EvtNameTAFTmo  = "TMO"
	EvtNameTAFOra  = "ORA_"
	EvtNAmeTafBklg = "BKLG"

	EvtTypeSharding           = "SHARDING"
	EvtTypeMux                = "HERAMUX"
	EvtNameBadShardID         = "bad_shard_id"
	EvtNameUnkKey             = "unknown_key_name"
	EvtNameShardIDAndKey      = "shard_id_shard_key_coexist"
	EvtNameMultiShard         = "multi_shard_key_values"
	EvtNameSetShardIDInTxn    = "set_shard_id_in_txn"
	EvtNameAutodiscSetShardID = "autodiscover_while_set_shard_id"
	EvtNameScuttleMkdR        = "scuttle_mark_down_r"
	EvtNameScuttleMkdW        = "scuttle_mark_down_w"
	EvtNameXKeysTxn           = "cross_keys_txn"
	EvtNameXShardsTxn         = "cross_shards_txn"
	EvtNameNoShardKey         = "shard_key_not_found"
	EvtNameBadShardKey        = "shard_key_bad_value"
	EvtNameWhitelist          = "db_whitelist"
	EvtNameShardKeyAutodisc   = "shard_key_auto_discovery"
	EvtNameBadMapping         = "bad_mapping"
)

CAL constants

const (
	ShardMapRecordFlagsNotFound     = 0x0020
	ShardMapRecordFlagsBadLogical   = 0x0010
	ShardMapRecordFlagsReadStatusN  = 0x0008
	ShardMapRecordFlagsWriteStatusN = 0x0002
	ShatdMapRecordFlagsWhitelist    = 0x0001
)

Shard map configuration

const (
	ConfigMaxWorkers   = "max_connections"
	ConfigDatabaseType = "database_type"
)

Configuration entry names

const (
	Oracle dbtype = iota
	MySQL
	POSTGRES
)

Database type constants

const (
	//
	// following are connection states reported by proxies
	//
	Assign       ConnState = 0 // count by proxy
	Idle                   = 1 // count by proxy
	Backlog                = 2 // count by shard and proxy
	Stranded               = 3 // count by shard and proxy
	Close                  = 4 // this one is not reported, but used to track connection.
	MaxConnState           = 5 // count of connection states
)

ConnState constants

const (
	WorkerStateEvt = iota
	ConnStateEvt
	WorkerResizeEvt
	StateEventTypeSize
)

StateEventType constants

const MaxRacID = 16

MaxRacID is the maximum number of RACs supported

const (
	MaxWorkerState = 7
)

constants for HeraWorkerStatus

const (
	SrcPrefixAppKey string = "srcPrefixApp"
)

Variables

var (
	ErrClientFail = errors.New("Client error")
	ErrWorkerFail = errors.New("Worker error")
	ErrTimeout    = errors.New("Timeout")
	ErrCanceled   = errors.New("Canceled")
)

Errors returned to the main loop for the connection

var (
	ErrBklgTimeout,
	ErrSaturationKill,
	ErrCrossShardDML,
	ErrBadShardID,

	ErrChangeShardIDInTxn,
	ErrScuttleMarkdownR,
	ErrScuttleMarkdownW,
	ErrBklgEviction,
	ErrRejectDbDown,
	ErrSaturationSoftSQLEviction,
	ErrBindThrottle,
	ErrBindEviction,
	ErrNoShardKey,
	ErrNoShardValue,
	ErrAutodiscoverWhileSetShardID,
	ErrNoScuttleIdPredicate,
	ErrCrossKeysDML,
	ErrQueryBindBlocker,
	ErrOther,
	ErrReqParseFail error
)

Errors returned to the client

var (
	ErrDML = errors.New("DML not allowed")
)

Errors

var FindTnsCacheData map[string]string

var FindTnsCacheTime *time.Time

var (
	Seed = uint32(0x183d1db4)
)

The seed

var StateNames = [MaxWorkerState + MaxConnState]string{
	"init", "acpt", "wait", "busy", "schd", "fnsh", "quce", "asgn", "idle", "bklg", "strd", "cls"}

StateNames contains the names used to print the header line: the first 7 are worker states, the rest are proxy connection states

var TryLockContentionError error

TryLockContentionError is the error returned on lock contention

Functions

func CfgFromTns

func CfgFromTns(name string)

name hera-winky-batch state log prefix hera

func CheckEnableProfiling

func CheckEnableProfiling()

CheckEnableProfiling checks if "enable_profile" is true in the config and, if so, enables profiling:

  • port 6060 is open for stats via HTTP: <hostname>:6060/debug/pprof/
  • port 3030 is open via telnet to manually start and stop a CPU profile. For example, before starting some test write "s cpu.prof", and after finishing the tests write "e", which creates the profile dump "cpu.prof" (or whatever name was given via the "s" command). cpu.prof can then be read with the pprof tool

func CheckErrAndShutdown

func CheckErrAndShutdown(err error, msg string) bool

CheckErrAndShutdown logs the error and starts the shutdown if err is not nil

func CheckOpsConfigChange

func CheckOpsConfigChange()

CheckOpsConfigChange checks if the ops config file needs to be reloaded and reloads it if necessary. It is called every few seconds from a dedicated goroutine.

func DebugString

func DebugString(data []byte) string

DebugString truncates the string if it is larger than 200 bytes

func ExtractSQLHash

func ExtractSQLHash(request *netstring.Netstring) (uint32, bool)

ExtractSQLHash parses the request to see if it has an embedded PREPARE statement. If there is one, it computes and returns the sqlhash and true; otherwise it returns 0 and false
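
A minimal sketch of checking a request for an embedded PREPARE, based only on the signatures above (the payload, helper name and the assumption that this package is imported as lib are illustrative):

func hashOfPrepare(reqBytes []byte) (uint32, bool) {
	// reqBytes would carry a request in Hera's wire format (illustrative)
	ns, err := lib.NetstringFromBytes(reqBytes)
	if err != nil {
		return 0, false
	}
	// returns 0, false if the request has no embedded PREPARE
	return lib.ExtractSQLHash(ns)
}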

func FindTns

func FindTns() (map[string]string, error)

func FullShutdown

func FullShutdown()

FullShutdown kills the parent process if it is named watchdog and exits the process

func GetIdleTimeoutMs

func GetIdleTimeoutMs() int

GetIdleTimeoutMs gets the idle timeout for the client connections. A client connection is terminated if it is idle for more than this

func GetMaxLifespanPerChild

func GetMaxLifespanPerChild() uint32

GetMaxLifespanPerChild returns how much time a worker process is allowed to run. After this time, a worker is killed and a new one is restarted

func GetMaxRequestsPerChild

func GetMaxRequestsPerChild() uint32

GetMaxRequestsPerChild is similar to GetMaxLifespanPerChild; it returns how many requests a worker will serve before it is restarted.

func GetNumRWorkers

func GetNumRWorkers(shard int) int

GetNumRWorkers gets the number of workers for the "Read" pool

func GetNumWWorkers

func GetNumWWorkers(shard int) int

GetNumWWorkers gets the number of workers for the "Write" pool

func GetNumWorkers

func GetNumWorkers(shard int) int

GetNumWorkers gets the number of children for a shard.

func GetSatRecoverFreqMs

func GetSatRecoverFreqMs(shard int) int

GetSatRecoverFreqMs gets the saturation recover frequency in milliseconds from ops config

func GetSatRecoverThresholdMs

func GetSatRecoverThresholdMs() uint32

GetSatRecoverThresholdMs gets the saturation recover threshold in milliseconds from ops config

func GetSatRecoverThrottleCnt

func GetSatRecoverThrottleCnt(shard int) int

GetSatRecoverThrottleCnt gets the saturation recover throttle count

func GetSatRecoverThrottleRate

func GetSatRecoverThrottleRate() uint32

GetSatRecoverThrottleRate gets the saturation recover throttle rate from ops config

func GetTrIdleTimeoutMs

func GetTrIdleTimeoutMs() int

GetTrIdleTimeoutMs gets the idle timeout for the client connections when they are in a transaction. A client connection is terminated if it is idle for more than this

func GetWhiteListChildCount

func GetWhiteListChildCount(shard int) int

GetWhiteListChildCount gets the number of whitelist children for a shard

func GoStats

func GoStats()

GoStats runs in a goroutine and every second dumps stats from /proc/<pid>/stat

func HandleConnection

func HandleConnection(conn net.Conn)

HandleConnection runs as a goroutine handling a client connection. It creates the coordinator goroutine and the one-way channel used to communicate with the coordinator. Then it sits in a loop for the life of the connection, reading data from the connection. Once a complete netstring is read, the netstring object (which can contain nested sub-netstrings) is passed on to the coordinator for processing

func IPAddrStr

func IPAddrStr(address net.Addr) string

IPAddrStr stringifies the IP address

func InitConfig

func InitConfig() error

InitConfig initializes the configuration, both the static configuration (from hera.txt) and the dynamic configuration

func InitQueryBindBlocker

func InitQueryBindBlocker(modName string)

func InitRacMaint

func InitRacMaint(cmdLineModuleName string)

InitRacMaint initializes RAC maintenance, if enabled, by starting one goroutine racMaintMain per shard

func InitShardingCfg

func InitShardingCfg() error

InitShardingCfg initializes the sharding config. If sharding is enabled and the shard map is used, InitShardingCfg runs a goroutine which periodically loads the shard map configuration

func InitTAF

func InitTAF(shards int)

InitTAF initializes the TAF structure

func IsPidRunning

func IsPidRunning(pid int) (isRunning bool)

IsPidRunning checks to see if pid still associates to a running process

func MkErr

func MkErr(prefix string)

MkErr initializes the error strings with a prefix like "HERA": HERA-100, HERA-101, ...

func Murmur3

func Murmur3(key []byte) (hash uint32)

Murmur3 calculates the hash
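
For example (the mapping onto scuttle bins is illustrative; the actual mapping is done by the sharding code, and the lib import alias is an assumption):

hash := lib.Murmur3([]byte("some_shard_key_value"))
// e.g. fold the hash onto the 1024 scuttle bins (illustrative only)
bin := hash % 1024
_ = bin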

func NetstringFromBytes

func NetstringFromBytes(data []byte) (*netstring.Netstring, error)

NetstringFromBytes creates a netstring containing data as payload.

func NormalizeBindName

func NormalizeBindName(bindName0 string) string

func ParseBool

func ParseBool(str string) (value bool, err error)

ParseBool returns true if the string is one of "1", "t", "T", "true", "TRUE", "True", "y", "Y" and false if it is one of "0", "f", "F", "false", "FALSE", "False", "n", "N"
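
For example (assuming this package is imported as lib):

enabled, err := lib.ParseBool("TRUE") // enabled == true, err == nil
if err != nil {
	// the string was not one of the accepted spellings
}
_ = enabled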

func RegisterLoopDriver

func RegisterLoopDriver(f ConnHandlerFunc)

RegisterLoopDriver installs the callback for the loop driver

func Run

func Run()

Run is practically the main function of the mux. It performs the various initializations, spawns server.Run (the "infinite loop") as a goroutine and waits on the worker broker channel for the signal to exit
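
A minimal mux entry point, sketched only from the functions documented here; the import path is an assumption, adjust it to wherever this package lives:

package main

import (
	"os"

	"github.com/paypal/hera/lib" // assumed import path for this package
)

func main() {
	// load hera.txt and the dynamic configuration first
	if err := lib.InitConfig(); err != nil {
		os.Exit(1)
	}
	// install the connection callback used by the accept loop, then block in Run
	lib.RegisterLoopDriver(lib.HandleConnection)
	lib.Run()
}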

func WriteAll

func WriteAll(w io.Writer, data []byte) error

WriteAll writes data in a loop until all is sent. TODO: is this needed? isn't net.Conn.Write() all or nothing?

Types

type BindCount

type BindCount struct {
	Sqlhash uint32
	Name    string
	Value   string                   // should be long enough >=6-9 to avoid status fields
	Workers map[string]*WorkerClient // lookup by ticket
}

used in doBindEviction() to find hot bind values

type BindEvict

type BindEvict struct {
	// evicted binds get throttled to have overall steady state during bad bind queries
	// nested map uses sqlhash "bindName|bindValue"
	BindThrottle map[uint32]map[string]*BindThrottle
	// contains filtered or unexported fields
}

func GetBindEvict

func GetBindEvict() *BindEvict

func (*BindEvict) Copy

func (this *BindEvict) Copy() *BindEvict

func (*BindEvict) ShouldBlock

func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle)

type BindPair

type BindPair struct {
	// contains filtered or unexported fields
}

type BindThrottle

type BindThrottle struct {
	Name             string
	Value            string
	Sqlhash          uint32
	RecentAttempt    atomic.Value // time.Time
	AllowEveryX      int
	AllowEveryXCount int
}

type BouncerReasonCode

type BouncerReasonCode int

BouncerReasonCode defines the possible reasons for bouncing

const (
	BRCUnknown BouncerReasonCode = iota
	BRCNoActiveWorker
	BRCNoWorkerCapacity
	BRCConnectionLimit
)

BouncerReasonCode constants

type Config

type Config struct {
	CertChainFile   string
	KeyFile         string // leave blank for no SSL
	Port            int
	ChildExecutable string
	//
	// worker sizing
	//
	NumStdbyDbs        int
	InitialMaxChildren int
	ReadonlyPct        int
	//
	// backlog
	//
	BacklogPct                  int
	BacklogTimeoutMsec          int
	BacklogTimeoutUnit          int64
	ShortBacklogTimeoutMsec     int
	SoftEvictionEffectiveTimeMs int
	SoftEvictionProbability     int
	BindEvictionTargetConnPct   int
	BindEvictionThresholdPct    int
	BindEvictionDecrPerSec      float64
	BindEvictionNames           string
	BindEvictionMaxThrottle     int
	SkipEvictRegex              string
	EvictRegex                  string
	//
	//
	//
	BouncerEnabled bool
	//
	// second
	//
	BouncerStartupDelay int
	//
	// millisecond
	//
	BouncerPollInterval int
	//
	// config_reload_time_ms(30 * 1000)
	//
	ConfigReloadTimeMs int
	// custom_auth_timeout(1000)
	CustomAuthTimeoutMs int
	// time_skew_threshold_warn(2)
	TimeSkewThresholdWarnSec int
	// time_skew_threshold_error(15)
	TimeSkewThresholdErrorSec int
	// max_stranded_time_interval(2000)
	StrandedWorkerTimeoutMs         int
	HighLoadStrandedWorkerTimeoutMs int
	HighLoadSkipInitiateRecoverPct  int
	HighLoadPct                     int
	InitLimitPct                    int

	// the worker scheduler policy
	LifoScheduler bool

	//
	// @TODO need a function for cdb boolean
	//
	DatabaseType              dbtype
	EnableSharding            bool
	UseShardMap               bool
	NumOfShards               int
	ShardKeyName              string
	MaxScuttleBuckets         int
	ScuttleColName            string
	ShardingAlgoHash          bool
	ShardKeyValueTypeIsString bool

	EnableWhitelistTest       bool
	NumWhitelistChildren      int
	ShardingPostfix           string
	ShardingCfgReloadInterval int

	HostnamePrefix       map[string]string
	ShardingCrossKeysErr bool

	CfgFromTns                  bool
	CfgFromTnsOverrideNumShards int // -1 no-override
	CfgFromTnsOverrideTaf       int // -1 no-override, 0 override-false, 1 override-true
	CfgFromTnsOverrideRWSplit   int // -1 no-override, readChildPct

	//
	// statelog printing interval (in sec)
	//
	StateLogInterval int

	// flag to enable CLIENT_INFO to worker
	EnableCmdClientInfoToWorker bool

	// if TAF is enabled
	EnableTAF bool
	// Timeout for a query to run on the primary, before fallback to secondary
	TAFTimeoutMs uint32

	// for adaptive timeouts, how long a window to try to keep
	TAFBinDuration       int
	TAFAllowSlowEveryX   int
	TAFNormallySlowCount int

	// for testing, enabling profile
	EnableProfile     bool
	ProfileHTTPPort   string
	ProfileTelnetPort string
	// to use OpenSSL (for testing) or crypto/tls
	UseOpenSSL bool

	ErrorCodePrefix       string
	StateLogPrefix        string
	ManagementTablePrefix string
	// RAC maint reload config interval
	RacMaintReloadInterval int
	// worker restarts are spread over this window
	RacRestartWindow int

	MuxPidFile string

	EnableConnLimitCheck         bool
	EnableQueryBindBlocker       bool
	QueryBindBlockerMinSqlPrefix int

	// taf testing
	TestingEnableDMLTaf bool

	//
	// enable background goroutine to recover worker not returned by coordinator
	//
	EnableDanglingWorkerRecovery bool

	GoStatsInterval int
	RandomStartMs   int

	// The max number of database connections to be established per second
	MaxDbConnectsPerSec int

	// Max desired percentage of healthy workers for the worker pool
	MaxDesiredHealthyWorkerPct int
	// contains filtered or unexported fields
}

The Config contains all the static configuration

func GetConfig

func GetConfig() *Config

GetConfig returns the application config

func (*Config) GetBacklogLimit

func (cfg *Config) GetBacklogLimit(wtype HeraWorkerType, shard int) int

GetBacklogLimit returns the limit for the number of backlogged workers for a certain pool and shard.

func (*Config) NumWorkersCh

func (cfg *Config) NumWorkersCh() <-chan int

NumWorkersCh returns the channel on which changes to the number of workers are sent

type ConnHandlerFunc

type ConnHandlerFunc func(net.Conn)

ConnHandlerFunc defines the signature of a function that can be used as a callback by the loop driver

type ConnState

type ConnState int

ConnState is a possible state

type ConnStateInfo

type ConnStateInfo struct {
	// contains filtered or unexported fields
}

ConnStateInfo is a container holding connection state information

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator is the entity managing a client session. It receives commands from the client connection and allocates one or more workers to execute the request. After the request is completed it will free the worker(s) if they are not needed. It is possible that a request will start a transaction, in which case the worker will stay allocated until the transaction is completed (with COMMIT) or canceled (with ROLLBACK)

func NewCoordinator

func NewCoordinator(ctx context.Context, clientchannel <-chan *netstring.Netstring, conn net.Conn) *Coordinator

NewCoordinator creates a coordinator; clientchannel is used to read the requests, conn is used to write responses

func (*Coordinator) DispatchTAFSession

func (crd *Coordinator) DispatchTAFSession(request *netstring.Netstring) error

DispatchTAFSession starts running a session, which is a series of netstring.Netstrings executed by the same resource. A session is completed when the worker sends EOR free, for example after a commit, a rollback, or as part of end-of-data if the request was a select+fetch. When the primary database is OK all the requests go to it. When a request to the primary fails (which can be because of a configured timeout or because of some ORA error), we remember it in the TAF structure. A measure of the "health" of the primary database is how many queries are failing versus the total - see taf.go for details. When the primary database health decreases, we start sending some requests directly to the fallback database. If the primary is completely down, we still send 1% of requests to the primary as a "health check", so that we can completely switch back to the primary when it eventually comes back up

func (*Coordinator) Done

func (crd *Coordinator) Done() <-chan int

Done returns the channel used when the coordinator is done

func (*Coordinator) PreprocessQueryBindBlocker

func (crd *Coordinator) PreprocessQueryBindBlocker(requests []*netstring.Netstring) (bool, string)

func (*Coordinator) PreprocessSharding

func (crd *Coordinator) PreprocessSharding(requests []*netstring.Netstring) (bool, error)

PreprocessSharding performs the shard info calculation and validation checks (by calling verifyValidShard) before determining if the current request should continue, returning a nil error if the request should be allowed. If the error is not nil, the other return value says whether the coordinator should hang up the client connection. The decision to hang up or not in case of error is based on backward compatibility

func (*Coordinator) Run

func (crd *Coordinator) Run()

Run is designed to be hosted by a constantly running goroutine for the duration of a client connection. Client requests are picked off through clientchannel one at a time and handed over to DispatchSession. A session contains a collection of client netstrings within the same transactional context. It starts from a "fresh" client request and ends at a client request receiving a worker EOR with intrans=false, at which point the next client request is the start of another "fresh" client request. A simple session may consist of a single client netstring command (e.g. a read query). A more complicated session may span several client netstring commands (e.g. an update netstring followed by a commit/rollback netstring). When processing a session with multiple client netstring commands, Run() reads off the first netstring command to start a session, while the subsequent netstring commands within the same session are read off by the corresponding session dispatching functions. As of now, dispatchDefaultSession is the only such dispatching function allowed to handle sessions with multiple netstring commands. Caching/routing sessions contain read queries with no open cursor, where multiple client netstring commands in those sessions are treated as client protocol errors. During the lifecycle of a coordinator, multiple sessions can be processed, each ending on an EOR free from the worker. At the end of each session, flow control is returned back to Run(), and the next client request is parsed again before dispatching

type HandlerFunc

type HandlerFunc func(net.Conn)

HandlerFunc defines the signature of the callback to handle the connection

type HeraWorkerStatus

type HeraWorkerStatus int

HeraWorkerStatus defines the possible states the worker can be in

type HeraWorkerType

type HeraWorkerType int

HeraWorkerType defines the possible worker types

type Listener

type Listener interface {
	// Accept waits for and returns the next connection to the listener.
	Accept() (net.Conn, error)

	// Initialize the connection
	Init(net.Conn) (net.Conn, error)

	// Close closes the listener.
	// Any blocked Accept operations will be unblocked and return errors.
	Close() error
}

Listener interface is used by the server to accept connections

func NewTCPListener

func NewTCPListener(service string) Listener

NewTCPListener creates a Listener attached to the address "service". It is a wrapper over net.Listener

func NewTLSListener

func NewTLSListener(service string) Listener

NewTLSListener creates the TLS listener

type LockTimeout

type LockTimeout struct {
	// contains filtered or unexported fields
}

LockTimeout is a non-blocking best-effort mutex

func (*LockTimeout) TryLock

func (lt *LockTimeout) TryLock() int

TryLock attempts to acquire the lock. Returns 1 if the lock was acquired, 0 if the lock was not available

func (*LockTimeout) Unlock

func (lt *LockTimeout) Unlock()

Unlock releases the lock
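
A small sketch of the intended usage, based only on the two methods above (whether the zero value is ready to use, and the lib import alias, are assumptions):

func withLock(lt *lib.LockTimeout, work func()) {
	if lt.TryLock() == 1 {
		// lock acquired; do the protected work and release
		defer lt.Unlock()
		work()
		return
	}
	// lock not available; give up or retry (see TryLockContentionError)
}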

type OpsConfig

type OpsConfig struct {
	// contains filtered or unexported fields
}

The OpsConfig contains the configuration that can be modified during run time

type QueryBindBlockerCfg

type QueryBindBlockerCfg struct {
	// lookup by sqlhash
	// then by bind name, then by bind value
	BySqlHash map[uint32]map[string]map[string][]QueryBindBlockerEntry
}

func GetQueryBindBlockerCfg

func GetQueryBindBlockerCfg() *QueryBindBlockerCfg

func (*QueryBindBlockerCfg) IsBlocked

func (cfg *QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (bool, string)

type QueryBindBlockerEntry

type QueryBindBlockerEntry struct {
	Herasqlhash  uint32
	Herasqltext  string // prefix since some sql is too long
	Bindvarname  string // prefix for in clause
	Bindvarvalue string // when set to "BLOCKALLVALUES" should block all sqltext queries
	Blockperc    int
	Heramodule   string
}

type Queue

type Queue interface {
	// Len tells how many elements are in the queue
	Len() int
	// Push adds an element to the queue, at the end
	Push(el interface{}) bool
	// PushFront adds an element to the queue, at the front - basically making this a stack
	PushFront(el interface{}) bool
	// Poll pops an element from the queue
	Poll() interface{}
	// Remove removes the element having the given value
	Remove(el interface{}) bool
	// ForEachRemove walks the entire list removing elements satisfying the condition
	ForEachRemove(f func(interface{}) bool) int
}

Queue is interface for a queue implementation

func NewQueue

func NewQueue() Queue

NewQueue creates a queue
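
For example, using the Queue interface above (assuming this package is imported as lib):

q := lib.NewQueue()
q.Push("a")      // appended at the end
q.PushFront("b") // or push at the front, using the queue as a stack
for q.Len() > 0 {
	el := q.Poll() // pops the next element
	_ = el
}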

type Server

type Server interface {
	Run()
}

Server contains the Run method which is the infinite loop

func NewServer

func NewServer(lsn Listener, f HandlerFunc) Server

NewServer creates a server from the Listener and the function handling the accepted connections
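
A sketch of wiring a listener to a server; the address string and the lib import alias are illustrative, and HandleConnection matches the HandlerFunc signature:

lsn := lib.NewTCPListener("0.0.0.0:10101")      // address is illustrative
srv := lib.NewServer(lsn, lib.HandleConnection) // any func(net.Conn) works as the handler
srv.Run()                                       // the accept loop; blocks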

type ShardMapRecord

type ShardMapRecord struct {
	// contains filtered or unexported fields
}

ShardMapRecord is the mapping between a physical bin and a logical shard

type ShardingCfg

type ShardingCfg struct {
	// contains filtered or unexported fields
}

ShardingCfg is an array of 1024 ShardMapRecord

func GetShardingCfg

func GetShardingCfg() *ShardingCfg

GetShardingCfg atomically gets the sharding config

type StateEvent

type StateEvent struct {
	// contains filtered or unexported fields
}

StateEvent keeps the state information

type StateEventType

type StateEventType int

StateEventType is an event published by proxy when state changes.

type StateLog

type StateLog struct {
	// contains filtered or unexported fields
}

StateLog is exposed as a singleton. All stateful resources are protected behind a message channel that synchronizes incoming messages. Users should not call any of the internal functions that are not threadsafe.

func GetStateLog

func GetStateLog() *StateLog

GetStateLog gets the state log object

func (*StateLog) GetStartTime

func (sl *StateLog) GetStartTime() int64

GetStartTime gets the app start time

func (*StateLog) GetStrandedWorkerCountForPool

func (sl *StateLog) GetStrandedWorkerCountForPool(shardID int, wType HeraWorkerType, instID int) int

GetStrandedWorkerCountForPool is a best effort function to get the count of backlog in a worker pool without thread locking.

func (*StateLog) GetTotalConnections

func (sl *StateLog) GetTotalConnections() int

GetTotalConnections is a best effort, without thread locking, to give the total number of connections

func (*StateLog) GetWorkerCountForPool

func (sl *StateLog) GetWorkerCountForPool(workerState HeraWorkerStatus, shardID int, wType HeraWorkerType, instID int) int

func (*StateLog) HasActiveWorker

func (sl *StateLog) HasActiveWorker() bool

HasActiveWorker is a best effort, without thread locking, telling whether at least one worker is active

func (*StateLog) ProxyHasCapacity

func (sl *StateLog) ProxyHasCapacity(_wlimit int, _rlimit int) (bool, int)

ProxyHasCapacity checks if there is enough capacity

func (*StateLog) PublishStateEvent

func (sl *StateLog) PublishStateEvent(_evt StateEvent) error

PublishStateEvent sends the event to the channel, so it will be processed by the state log routine

func (*StateLog) SetStartTime

func (sl *StateLog) SetStartTime(t time.Time)

SetStartTime emits the CAL event

type TAF

type TAF interface {
	// return true if the request should use the primary db, false to use the fallback
	UsePrimary() bool
	// For logging, gets the internal PCT field, which is a measure of the health of the primary database
	GetPct() int
	// To be called by coordinator if the request was ok
	NotifyOK()
	// To be called by coordinator if the request timed out or failed with an ORA error
	NotifyError()
}

TAF keeps statistics of errors and successes and tells which database should be used for the next request

func GetTAF

func GetTAF(shard int) TAF

GetTAF returns the TAF entry for the shard
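
A sketch of how a caller might drive the TAF interface, based only on the method comments above (the run callbacks, the helper name and the lib import alias are placeholders):

func runWithTAF(shard int, runOnPrimary, runOnFallback func() error) error {
	taf := lib.GetTAF(shard)
	if !taf.UsePrimary() {
		// primary health is low enough to go straight to the fallback
		return runOnFallback()
	}
	if err := runOnPrimary(); err != nil {
		taf.NotifyError() // the request timed out or failed with an ORA error
		return err
	}
	taf.NotifyOK()
	return nil
}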

type TafQueries

type TafQueries struct {
	CountNormallyFast int64
	// contains filtered or unexported fields
}

TafQueries keeps, for each SQL, statistics on whether the query timed out. This is used to decide if a query is "naturally slow" so it can be ignored by TAF

func GetTafQueries

func GetTafQueries(shardID int) *TafQueries

GetTafQueries returns the taf queries statistics

func (*TafQueries) IsNormallySlow

func (tq *TafQueries) IsNormallySlow(sqlhash int32) (bool, error)

IsNormallySlow looks into the stats for the query whose hash is sqlhash and returns whether the query is normally slow

func (*TafQueries) RecordTimeout

func (tq *TafQueries) RecordTimeout(sqlhash int32) (bool, error)

RecordTimeout updates the statistics for the query whose hash is sqlhash when the query times out

type TafQueryRuns

type TafQueryRuns struct {
	// contains filtered or unexported fields
}

TafQueryRuns stores one run

type Throttler

type Throttler interface {
	CanRun()
}

Throttler is used to throttle some activity based on a rate

func NewThrottler

func NewThrottler(max uint32, name string) Throttler

NewThrottler creates a throttler
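
For example (whether the rate is per second and whether CanRun blocks the caller are assumptions based on the interface comment; the lib import alias is illustrative):

thr := lib.NewThrottler(10, "db_connects") // max rate (assumed per second) and a name used for reporting
for i := 0; i < 100; i++ {
	thr.CanRun() // assumed to gate the caller according to the configured rate
	// ... perform one rate-limited action, e.g. open a database connection ...
}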

type WLCfg

type WLCfg struct {
	// contains filtered or unexported fields
}

WLCfg keeps the whitelist configuration; keys should be int64 or string

func GetWLCfg

func GetWLCfg() *WLCfg

GetWLCfg atomically gets the whitelist config

type WorkerBroker

type WorkerBroker struct {
	// contains filtered or unexported fields
}

WorkerBroker manages the workers, starting the worker pools and restarting workers when needed

func GetWorkerBrokerInstance

func GetWorkerBrokerInstance() *WorkerBroker

GetWorkerBrokerInstance returns the singleton broker instance, which different request handler goroutines can use to get a free worker

func (*WorkerBroker) AddPidToWorkermap

func (broker *WorkerBroker) AddPidToWorkermap(worker *WorkerClient, pid int)

AddPidToWorkermap adds the worker to the map pid -> worker

func (*WorkerBroker) GetWorkerPool

func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error)

GetWorkerPool gets the worker pool object for the type and id. ids holds optional parameters.

ids[0] == instance id; ids[1] == shard id.

if a particular id is not set, it defaults to 0. TODO: interchange sid <--> instId since instId is not yet used

func (*WorkerBroker) GetWorkerPoolCfgs

func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*WorkerPoolCfg)

GetWorkerPoolCfgs returns the worker pool configuration

func (*WorkerBroker) RestartWorkerPool

func (broker *WorkerBroker) RestartWorkerPool(_moduleName string) error

RestartWorkerPool (re)starts all the worker pools. workerpool.init calls statelog.init, which in turn calls back GetWorkerBrokerInstance; this would cause a deadlock during workerbroker initialization since the golang lock is not reentrant, so workerpool.init was taken out of broker.init and is called separately.

func (*WorkerBroker) Stopped

func (broker *WorkerBroker) Stopped() <-chan struct{}

Stopped is called when we are done; it sends a message to the "stopped" channel, which is read by the main mux routine

type WorkerClient

type WorkerClient struct {
	ID     int              // the worker identifier, from 0 to max worker count
	Type   HeraWorkerType   // the type of worker (ex write, read); all workers from the same type are grouped in a pool
	Status HeraWorkerStatus // the worker state, like init, accept, etc
	// contains filtered or unexported fields
}

WorkerClient represents a worker process

func NewWorker

func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleName string, thr Throttler) *WorkerClient

NewWorker creates a new workerclient instance (pointer)

func (*WorkerClient) AttachToWorker

func (worker *WorkerClient) AttachToWorker() (err error)

AttachToWorker is called immediately after a worker process was created, it is a wrapper over the function doing the initialization work - attachToWorker. In case attachToWorker fails it does the cleanup.

func (*WorkerClient) Close

func (worker *WorkerClient) Close()

Close closes the connections to the worker

func (*WorkerClient) DrainResponseChannel

func (worker *WorkerClient) DrainResponseChannel(sleep time.Duration)

DrainResponseChannel removes any messages that might be in the channel. This is used when the worker is recovered.

func (*WorkerClient) Recover

func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam WorkerClientRecoverParam, info *strandedCalInfo, param ...int)

Recover interrupts a worker busy executing a request, usually because a client went away. It sends a break to the worker and expects the worker to respond with EOR free. If the worker does not free itself within two seconds, it is stopped with SIGKILL

func (*WorkerClient) StartWorker

func (worker *WorkerClient) StartWorker() (err error)

StartWorker fork-execs a new worker. Note: the routine is not "reentrant", it changes the global environment. For now this is fine.

func (*WorkerClient) Terminate

func (worker *WorkerClient) Terminate() error

Terminate sends SIGTERM to the worker first (allowing the worker to shut down gracefully) and waits for 2000 ms before sending SIGKILL if necessary. Note: this function will block for ~100 - 2000 ms

func (*WorkerClient) Write

func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error

Write sends a message to the worker

type WorkerClientRecoverParam

type WorkerClientRecoverParam struct {
	// contains filtered or unexported fields
}

type WorkerPool

type WorkerPool struct {
	ShardID int            // the shard the workers are connected to
	Type    HeraWorkerType // the worker type like write, read
	InstID  int
	// contains filtered or unexported fields
}

WorkerPool represents a pool of workers of the same kind. The implementation uses a C++-ish mutex/condition variable/queue rather than a Golang-ish channel + timer because the policy for using the workers is LIFO (for better usage of cache), while channels are FIFO

func (*WorkerPool) DecHealthyWorkers

func (pool *WorkerPool) DecHealthyWorkers()

DecHealthyWorkers is called to decrement the number of workers connected to the database

func (*WorkerPool) GetHealthyWorkersCount

func (pool *WorkerPool) GetHealthyWorkersCount() int32

GetHealthyWorkersCount returns the number of workers connected to the database

func (*WorkerPool) GetWorker

func (pool *WorkerPool) GetWorker(sqlhash int32, timeoutMs ...int) (worker *WorkerClient, t string, err error)

GetWorker gets the active worker if available, backlogging with a timeout if not.

@param sqlhash is used to check for soft eviction against a blacklist of slow queries.
If GetWorker needs to examine the incoming SQL, there does not seem to be a more elegant
way to do this than to pass in the sqlhash as a parameter.

@param timeoutMs[0] timeout in milliseconds. Defaults to the adaptive queue timeout.
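
A sketch of borrowing and returning a worker through the broker and pool, using only the signatures documented here (wType, shard, sqlhash, the helper name and the lib import alias are placeholders):

func withWorker(wType lib.HeraWorkerType, shard int, sqlhash int32) error {
	// ids[0] == instance id, ids[1] == shard id
	pool, err := lib.GetWorkerBrokerInstance().GetWorkerPool(wType, 0, shard)
	if err != nil {
		return err
	}
	// backlogs with the adaptive timeout if no worker is free
	worker, ticket, err := pool.GetWorker(sqlhash)
	if err != nil {
		return err
	}
	defer pool.ReturnWorker(worker, ticket)
	// ... worker.Write(...) the request and read the responses ...
	return nil
}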

func (*WorkerPool) Healthy

func (pool *WorkerPool) Healthy() bool

Healthy checks if the number of workers connected to the database is greater than 20%

func (*WorkerPool) IncHealthyWorkers

func (pool *WorkerPool) IncHealthyWorkers()

IncHealthyWorkers is called to increment the number of workers connected to the database

func (*WorkerPool) Init

func (pool *WorkerPool) Init(wType HeraWorkerType, size int, instID int, shardID int, moduleName string) error

Init creates the pool by creating the workers and making all the initializations

func (*WorkerPool) RacMaint

func (pool *WorkerPool) RacMaint(racReq racAct)

RacMaint is called when RAC maintenance is needed. It marks the workers for restart, spreading the restarts over an interval in order to avoid a connection storm to the database

func (*WorkerPool) Resize

func (pool *WorkerPool) Resize(newSize int)

Resize resizes the worker pool when the corresponding dynamic configuration changes. When the size is increased, the increase is immediate: the necessary number of new workers is spawned. When the size is decreased, it removes the workers whose id is bigger than the new number of workers. If the workers to be removed are free, they are terminated immediately; otherwise the termination is delayed until the worker eventually calls ReturnWorker to make itself available

func (*WorkerPool) RestartWorker

func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error)

RestartWorker is called after a worker exited to perform the necessary cleanup and start a new worker. In the rare situation where the pool needs to be downsized, a new worker is not restarted.

func (*WorkerPool) ReturnWorker

func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err error)

ReturnWorker puts the worker into the list of available workers. It is usually called after a coordinator used the worker for requests and no longer needs it. If the pool is about to be downsized, or if the worker's lifetime has expired, the worker is terminated instead of being put in the available list.

func (*WorkerPool) WorkerReady

func (pool *WorkerPool) WorkerReady(worker *WorkerClient) (err error)

WorkerReady is called after the worker has started and become available. It puts the worker into the internal list of workers as well as into the list of available workers

type WorkerPoolCfg

type WorkerPoolCfg struct {
	// contains filtered or unexported fields
}

WorkerPoolCfg is a configuration structure to keep setups for each type of worker pool

type WorkerStateInfo

type WorkerStateInfo struct {
	// contains filtered or unexported fields
}

WorkerStateInfo is a container holding worker state information
