Documentation ¶
Index ¶
- Constants
- Variables
- func CfgFromTns(name string)
- func CheckEnableProfiling()
- func CheckErrAndShutdown(err error, msg string) bool
- func CheckOpsConfigChange()
- func DebugString(data []byte) string
- func ExtractSQLHash(request *netstring.Netstring) (uint32, bool)
- func FindTns() (map[string]string, error)
- func FullShutdown()
- func GetIdleTimeoutMs() int
- func GetMaxLifespanPerChild() uint32
- func GetMaxRequestsPerChild() uint32
- func GetNumRWorkers(shard int) int
- func GetNumWWorkers(shard int) int
- func GetNumWorkers(shard int) int
- func GetSatRecoverFreqMs(shard int) int
- func GetSatRecoverThresholdMs() uint32
- func GetSatRecoverThrottleCnt(shard int) int
- func GetSatRecoverThrottleRate() uint32
- func GetTrIdleTimeoutMs() int
- func GetWhiteListChildCount(shard int) int
- func GoStats()
- func HandleConnection(conn net.Conn)
- func IPAddrStr(address net.Addr) string
- func InitConfig() error
- func InitQueryBindBlocker(modName string)
- func InitRacMaint(cmdLineModuleName string)
- func InitShardingCfg() error
- func InitTAF(shards int)
- func IsPidRunning(pid int) (isRunning bool)
- func MkErr(prefix string)
- func Murmur3(key []byte) (hash uint32)
- func NetstringFromBytes(data []byte) (*netstring.Netstring, error)
- func NormalizeBindName(bindName0 string) string
- func ParseBool(str string) (value bool, err error)
- func RegisterLoopDriver(f ConnHandlerFunc)
- func Run()
- func WriteAll(w io.Writer, data []byte) error
- type BindCount
- type BindEvict
- type BindPair
- type BindThrottle
- type BouncerReasonCode
- type Config
- type ConnHandlerFunc
- type ConnState
- type ConnStateInfo
- type Coordinator
- func (crd *Coordinator) DispatchTAFSession(request *netstring.Netstring) error
- func (crd *Coordinator) Done() <-chan int
- func (crd *Coordinator) PreprocessQueryBindBlocker(requests []*netstring.Netstring) (bool, string)
- func (crd *Coordinator) PreprocessSharding(requests []*netstring.Netstring) (bool, error)
- func (crd *Coordinator) Run()
- type HandlerFunc
- type HeraWorkerStatus
- type HeraWorkerType
- type Listener
- type LockTimeout
- type OpsConfig
- type QueryBindBlockerCfg
- type QueryBindBlockerEntry
- type Queue
- type Server
- type ShardMapRecord
- type ShardingCfg
- type StateEvent
- type StateEventType
- type StateLog
- func (sl *StateLog) GetStartTime() int64
- func (sl *StateLog) GetStrandedWorkerCountForPool(shardID int, wType HeraWorkerType, instID int) int
- func (sl *StateLog) GetTotalConnections() int
- func (sl *StateLog) GetWorkerCountForPool(workerState HeraWorkerStatus, shardID int, wType HeraWorkerType, instID int) int
- func (sl *StateLog) HasActiveWorker() bool
- func (sl *StateLog) ProxyHasCapacity(_wlimit int, _rlimit int) (bool, int)
- func (sl *StateLog) PublishStateEvent(_evt StateEvent) error
- func (sl *StateLog) SetStartTime(t time.Time)
- type TAF
- type TafQueries
- type TafQueryRuns
- type Throttler
- type WLCfg
- type WorkerBroker
- func (broker *WorkerBroker) AddPidToWorkermap(worker *WorkerClient, pid int)
- func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error)
- func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*WorkerPoolCfg)
- func (broker *WorkerBroker) RestartWorkerPool(_moduleName string) error
- func (broker *WorkerBroker) Stopped() <-chan struct{}
- type WorkerClient
- func (worker *WorkerClient) AttachToWorker() (err error)
- func (worker *WorkerClient) Close()
- func (worker *WorkerClient) DrainResponseChannel(sleep time.Duration)
- func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam WorkerClientRecoverParam, ...)
- func (worker *WorkerClient) StartWorker() (err error)
- func (worker *WorkerClient) Terminate() error
- func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error
- type WorkerClientRecoverParam
- type WorkerPool
- func (pool *WorkerPool) DecHealthyWorkers()
- func (pool *WorkerPool) GetHealthyWorkersCount() int32
- func (pool *WorkerPool) GetWorker(sqlhash int32, timeoutMs ...int) (worker *WorkerClient, t string, err error)
- func (pool *WorkerPool) Healthy() bool
- func (pool *WorkerPool) IncHealthyWorkers()
- func (pool *WorkerPool) Init(wType HeraWorkerType, size int, instID int, shardID int, moduleName string) error
- func (pool *WorkerPool) RacMaint(racReq racAct)
- func (pool *WorkerPool) Resize(newSize int)
- func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error)
- func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err error)
- func (pool *WorkerPool) WorkerReady(worker *WorkerClient) (err error)
- type WorkerPoolCfg
- type WorkerStateInfo
Constants ¶
const ( EvtTypeTAF = "TAF" EvtNameTAFTmo = "TMO" EvtNameTAFOra = "ORA_" EvtNAmeTafBklg = "BKLG" EvtTypeSharding = "SHARDING" EvtTypeMux = "HERAMUX" EvtNameBadShardID = "bad_shard_id" EvtNameUnkKey = "unknown_key_name" EvtNameShardIDAndKey = "shard_id_shard_key_coexist" EvtNameMultiShard = "multi_shard_key_values" EvtNameSetShardIDInTxn = "set_shard_id_in_txn" EvtNameAutodiscSetShardID = "autodiscover_while_set_shard_id" EvtNameScuttleMkdR = "scuttle_mark_down_r" EvtNameScuttleMkdW = "scuttle_mark_down_w" EvtNameXKeysTxn = "cross_keys_txn" EvtNameXShardsTxn = "cross_shards_txn" EvtNameNoShardKey = "shard_key_not_found" EvtNameBadShardKey = "shard_key_bad_value" EvtNameWhitelist = "db_whitelist" EvtNameShardKeyAutodisc = "shard_key_auto_discovery" EvtNameBadMapping = "bad_mapping" )
CAL constants
const ( ShardMapRecordFlagsNotFound = 0x0020 ShardMapRecordFlagsBadLogical = 0x0010 ShardMapRecordFlagsReadStatusN = 0x0008 ShardMapRecordFlagsWriteStatusN = 0x0002 ShatdMapRecordFlagsWhitelist = 0x0001 )
Shard map configuration
const ( ConfigMaxWorkers = "max_connections" ConfigDatabaseType = "database_type" )
Configuration entry names
const ( Oracle dbtype = iota MySQL POSTGRES )
Database type constants
const ( // // following are connection states reported by proxies // Assign ConnState = 0 // count by proxy Idle = 1 // count by proxy Backlog = 2 // count by shard and proxy Stranded = 3 // count by shard and proxy Close = 4 // this one is not reported, but used to track connection. MaxConnState = 5 // count of connection states )
ConnState constants
const ( WorkerStateEvt = iota ConnStateEvt WorkerResizeEvt StateEventTypeSize )
StateEventType constants
const MaxRacID = 16
MaxRacID is the maximum number of racs supported
const (
MaxWorkerState = 7
)
constants for HeraWorkerStatus
const (
SrcPrefixAppKey string = "srcPrefixApp"
)
Variables ¶
var ( ErrClientFail = errors.New("Client error") ErrWorkerFail = errors.New("Worker error") ErrTimeout = errors.New("Timeout") ErrCanceled = errors.New("Canceled") )
Errors returned to the main loop for the connection
var ( ErrBklgTimeout, ErrSaturationKill, ErrCrossShardDML, ErrBadShardID, ErrChangeShardIDInTxn, ErrScuttleMarkdownR, ErrScuttleMarkdownW, ErrBklgEviction, ErrRejectDbDown, ErrSaturationSoftSQLEviction, ErrBindThrottle, ErrBindEviction, ErrNoShardKey, ErrNoShardValue, ErrAutodiscoverWhileSetShardID, ErrNoScuttleIdPredicate, ErrCrossKeysDML, ErrQueryBindBlocker, ErrOther, ErrReqParseFail error )
Errors returned to the client
var (
ErrDML = errors.New("DML not allowed")
)
Errors
var FindTnsCacheData map[string]string
var FindTnsCacheTime *time.Time
var (
Seed = uint32(0x183d1db4)
)
The seed
var StateNames = [MaxWorkerState + MaxConnState]string{
"init", "acpt", "wait", "busy", "schd", "fnsh", "quce", "asgn", "idle", "bklg", "strd", "cls"}
StateNames contains names used to print header line. first 7 worker states, rest proxy connection states
var TryLockContentionError error
TryLockContentionError the error for lock contention
Functions ¶
func CheckEnableProfiling ¶
func CheckEnableProfiling()
CheckEnableProfiling checks if "enable_profile" is true in config and, if so, enables the profiling:
- 6060 port is open to stats via http: <hostname>:6060/debug/pprof/
- 3030 port is open via telnet to manually start and stop CPU profile. For example, before starting some test, write "s cpu.prof" and after finishing the tests you do "e", which will create the profile dump "cpu.prof" (or whatever name was given via the "s" command). cpu.prof can then be read via the pprof tool
func CheckErrAndShutdown ¶
CheckErrAndShutdown if error then it logs it and starts the shutdown
func CheckOpsConfigChange ¶
func CheckOpsConfigChange()
CheckOpsConfigChange checks if the ops config file needs to be reloaded and reloads it if necessary. it is called every several seconds from a dedicated go-routine.
func DebugString ¶
DebugString truncates the string if it is larger than 200 bytes
func ExtractSQLHash ¶
ExtractSQLHash parse request to see if it has an embedded PREPARE statement. if there is one, compute and return the sqlhash and true. otherwise, return 0 and false
func FullShutdown ¶
func FullShutdown()
FullShutdown kills the parent process if it is named watchdog and exits the process
func GetIdleTimeoutMs ¶
func GetIdleTimeoutMs() int
GetIdleTimeoutMs gets the idle timeout for the client connections. A client connection is terminated if it is idle for more than this
func GetMaxLifespanPerChild ¶
func GetMaxLifespanPerChild() uint32
GetMaxLifespanPerChild returns how much time a worker process is allowed to run. After this time, a worker is killed and a new one is restarted
func GetMaxRequestsPerChild ¶
func GetMaxRequestsPerChild() uint32
GetMaxRequestsPerChild is similar to GetMaxLifespanPerChild, it returns how many requests a worker will serve before it is restarted.
func GetNumRWorkers ¶
GetNumRWorkers gets the number of workers for the "Read" pool
func GetNumWWorkers ¶
GetNumWWorkers gets the number of workers for the "Write" pool
func GetNumWorkers ¶
GetNumWorkers gets the number of children for a shard.
func GetSatRecoverFreqMs ¶
GetSatRecoverFreqMs gets the saturation recover frequency in milliseconds from ops config
func GetSatRecoverThresholdMs ¶
func GetSatRecoverThresholdMs() uint32
GetSatRecoverThresholdMs gets the saturation recover threshold in milliseconds from ops config
func GetSatRecoverThrottleCnt ¶
GetSatRecoverThrottleCnt gets the saturation recover throttle count
func GetSatRecoverThrottleRate ¶
func GetSatRecoverThrottleRate() uint32
GetSatRecoverThrottleRate gets the saturation recover throttle rate from ops config
func GetTrIdleTimeoutMs ¶
func GetTrIdleTimeoutMs() int
GetTrIdleTimeoutMs gets the idle timeout for the client connections when they are in a transaction. A client connection is terminated if it is idle for more than this
func GetWhiteListChildCount ¶
GetWhiteListChildCount gets the number of whitelist children for a shard
func GoStats ¶
func GoStats()
GoStats runs in a goroutine and dumps every second stats from /proc/<pid>/stat
func HandleConnection ¶
HandleConnection runs as a go routine handling a client connection. It creates the coordinator go-routine and the one way channel to communicate with the coordinator. Then it sits in a loop for the life of the connection reading data from the connection. Once a complete netstring is read, the netstring object (which can contain nested sub-netstrings) is passed on to the coordinator for processing
func InitConfig ¶
func InitConfig() error
InitConfig initializes the configuration, both the static configuration (from hera.txt) and the dynamic configuration
func InitQueryBindBlocker ¶
func InitQueryBindBlocker(modName string)
func InitRacMaint ¶
func InitRacMaint(cmdLineModuleName string)
InitRacMaint initializes RAC maintenance, if enabled, by starting one goroutine racMaintMain per shard
func InitShardingCfg ¶
func InitShardingCfg() error
InitShardingCfg initializes the sharding config. If sharding and using shard map is enabled InitShardingCfg runs a go-routine which loads shard map configuration periodically
func IsPidRunning ¶
IsPidRunning checks to see if pid still associates to a running process
func MkErr ¶
func MkErr(prefix string)
Initializes error strings with a prefix like "HERA" HERA-100, HERA-101..
func NetstringFromBytes ¶
NetstringFromBytes creates a netstring containing data as payload.
func NormalizeBindName ¶
func ParseBool ¶
ParseBool returns true if the string is one of "1", "t", "T", "true", "TRUE", "True", "y", "Y" and false if it is one of "0", "f", "F", "false", "FALSE", "False", "n", "N"
func RegisterLoopDriver ¶
func RegisterLoopDriver(f ConnHandlerFunc)
RegisterLoopDriver installs the callback for the loop driver
Types ¶
type BindCount ¶
type BindCount struct { Sqlhash uint32 Name string Value string // should be long enough >=6-9 to avoid status fields Workers map[string]*WorkerClient // lookup by ticket }
used in doBindEviction() to find hot bind values
type BindEvict ¶
type BindEvict struct { // evicted binds get throttled to have overall steady state during bad bind queries // nested map uses sqlhash "bindName|bindValue" BindThrottle map[uint32]map[string]*BindThrottle // contains filtered or unexported fields }
func GetBindEvict ¶
func GetBindEvict() *BindEvict
func (*BindEvict) ShouldBlock ¶
type BindThrottle ¶
type BouncerReasonCode ¶
type BouncerReasonCode int
BouncerReasonCode defines the possible reason for bouncing
const ( BRCUnknown BouncerReasonCode = iota BRCNoActiveWorker BRCNoWorkerCapacity BRCConnectionLimit )
BouncerReasonCode constants
type Config ¶
type Config struct { CertChainFile string KeyFile string // leave blank for no SSL Port int ChildExecutable string // // worker sizing // NumStdbyDbs int InitialMaxChildren int ReadonlyPct int // // backlog // BacklogPct int BacklogTimeoutMsec int BacklogTimeoutUnit int64 ShortBacklogTimeoutMsec int SoftEvictionEffectiveTimeMs int SoftEvictionProbability int BindEvictionTargetConnPct int BindEvictionThresholdPct int BindEvictionDecrPerSec float64 BindEvictionNames string BindEvictionMaxThrottle int SkipEvictRegex string EvictRegex string // // // BouncerEnabled bool // // second // BouncerStartupDelay int // // millisecond // BouncerPollInterval int // // config_reload_time_ms(30 * 1000) // ConfigReloadTimeMs int // custom_auth_timeout(1000) CustomAuthTimeoutMs int // time_skew_threshold_warn(2) TimeSkewThresholdWarnSec int // time_skew_threshold_error(15) TimeSkewThresholdErrorSec int // max_stranded_time_interval(2000) StrandedWorkerTimeoutMs int HighLoadStrandedWorkerTimeoutMs int HighLoadSkipInitiateRecoverPct int HighLoadPct int InitLimitPct int // the worker scheduler policy LifoScheduler bool // // @TODO need a function for cdb boolean // DatabaseType dbtype EnableSharding bool UseShardMap bool NumOfShards int ShardKeyName string MaxScuttleBuckets int ScuttleColName string ShardingAlgoHash bool ShardKeyValueTypeIsString bool EnableWhitelistTest bool NumWhitelistChildren int ShardingPostfix string ShardingCfgReloadInterval int HostnamePrefix map[string]string ShardingCrossKeysErr bool CfgFromTns bool CfgFromTnsOverrideNumShards int // -1 no-override CfgFromTnsOverrideTaf int // -1 no-override, 0 override-false, 1 override-true CfgFromTnsOverrideRWSplit int // -1 no-override, readChildPct // // statelog printing interval (in sec) // StateLogInterval int // flag to enable CLIENT_INFO to worker EnableCmdClientInfoToWorker bool // if TAF is enabled EnableTAF bool // Timeout for a query to run on the primary, before fallback to secondary TAFTimeoutMs uint32 // for 
adaptive timeouts, how long a window to try to keep TAFBinDuration int TAFAllowSlowEveryX int TAFNormallySlowCount int // for testing, enabling profile EnableProfile bool ProfileHTTPPort string ProfileTelnetPort string // to use OpenSSL (for testing) or crypto/tls UseOpenSSL bool ErrorCodePrefix string StateLogPrefix string ManagementTablePrefix string // RAC maint reload config interval RacMaintReloadInterval int // worker restarts are spread over this window RacRestartWindow int MuxPidFile string EnableConnLimitCheck bool EnableQueryBindBlocker bool QueryBindBlockerMinSqlPrefix int // taf testing TestingEnableDMLTaf bool // // enable background goroutine to recover worker not returned by coordinator // EnableDanglingWorkerRecovery bool GoStatsInterval int RandomStartMs int // The max number of database connections to be established per second MaxDbConnectsPerSec int // Max desired percentage of healthy workers for the worker pool MaxDesiredHealthyWorkerPct int // contains filtered or unexported fields }
The Config contains all the static configuration
func (*Config) GetBacklogLimit ¶
func (cfg *Config) GetBacklogLimit(wtype HeraWorkerType, shard int) int
GetBacklogLimit returns the limit for the number of backlogged workers for a certain pool and shard.
func (*Config) NumWorkersCh ¶
NumWorkersCh returns the channel where number of workers change is sent
type ConnHandlerFunc ¶
ConnHandlerFunc defines the signature of a function that can be used as a callback by the loop driver
type ConnStateInfo ¶
type ConnStateInfo struct {
// contains filtered or unexported fields
}
ConnStateInfo is a container holding connection state information
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator is the entity managing a client session. It receives commands from the client connection and allocates one or more workers to execute the request. After the request is completed it will free the worker(s) if they are not needed. It is possible that a request will start a transaction, in which case the worker will stay allocated until the transaction is completed (with COMMIT) or canceled (with ROLLBACK)
func NewCoordinator ¶
func NewCoordinator(ctx context.Context, clientchannel <-chan *netstring.Netstring, conn net.Conn) *Coordinator
NewCoordinator creates a coordinator, clientchannel is used to read the requests, conn is used to write responses
func (*Coordinator) DispatchTAFSession ¶
func (crd *Coordinator) DispatchTAFSession(request *netstring.Netstring) error
DispatchTAFSession starts running a session, which is a series of netstring.Netstrings executed by the same resource. A session is completed when the worker sends EOR free, for example after a commit, a rollback or as part of end-of-data if the request was a select+fetch. When the primary database is OK all the requests go to it. When a request to the primary fails (which can be because of a configured timeout or because of some ORA error), we record it in a TAF structure. A measure of the "health" of the primary database is how many queries are failing versus the total - see taf.go for details. When the primary database health decreases, we start sending some requests directly to the fallback database. If the primary is completely down, we still send 1% of requests to the primary, as a "health check", so that we can completely switch to the primary when the primary eventually comes back up
func (*Coordinator) Done ¶
func (crd *Coordinator) Done() <-chan int
Done returns the channel used when the coordinator is done
func (*Coordinator) PreprocessQueryBindBlocker ¶
func (crd *Coordinator) PreprocessQueryBindBlocker(requests []*netstring.Netstring) (bool, string)
func (*Coordinator) PreprocessSharding ¶
func (crd *Coordinator) PreprocessSharding(requests []*netstring.Netstring) (bool, error)
PreprocessSharding does the shard info calculation and validation checks (by calling verifyValidShard) before determining if the current request should continue, returning a nil error if the request should be allowed. If the error is not nil, the returned bool says whether the coordinator should hang up the client connection. The decision to hang up or not in case of error is based on backward compatibility
func (*Coordinator) Run ¶
func (crd *Coordinator) Run()
Run is designed to be hosted by a constantly running goroutine for the duration of a client connection. Client requests are picked off through clientchannel one at a time and handed over to DispatchSession. A session contains a collection of client netstrings within the same transactional context. It starts from a "fresh" client request and ends at a client request receiving a worker EOR with intrans=false, at which point the next client request is the start of another "fresh" client request. A simple session may consist of a single client netstring command (e.g. a read query). A more complicated session may span over several client netstring commands (e.g. an update netstring followed by a commit/rollback netstring). When processing a session with multiple client netstring commands, Run() reads off the first netstring command to start a session, while the subsequent netstring commands within the same session are read off by the corresponding session dispatching functions. As of now, dispatchDefaultSession is the only such dispatching function allowed to handle a session with multiple netstring commands. Caching/Routing sessions contain read queries with no open cursor, where multiple client netstring commands in those sessions are treated as client protocol errors. During the lifecycle of a coordinator, multiple sessions could be processed, each ending on an EOR free from the worker. At the end of each session, flow control is returned back to Run(), and the next client request is parsed again before dispatching
type HandlerFunc ¶
HandlerFunc defines the signature of the callback to handle the connection
type HeraWorkerStatus ¶
type HeraWorkerStatus int
HeraWorkerStatus defines the possible states the worker can be in
type Listener ¶
type Listener interface { // Accept waits for and returns the next connection to the listener. Accept() (net.Conn, error) // Initialize the connection Init(net.Conn) (net.Conn, error) // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. Close() error }
Listener interface is used by the server to accept connections
func NewTCPListener ¶
NewTCPListener creates a Listener attached to the address "service". It is a wrapper over net.Listener
func NewTLSListener ¶
NewTLSListener creates the TLS listener
type LockTimeout ¶
type LockTimeout struct {
// contains filtered or unexported fields
}
LockTimeout is a non-blocking best-effort mutex
func (*LockTimeout) TryLock ¶
func (lt *LockTimeout) TryLock() int
TryLock attempts to acquire the lock. Returns 1 if the lock was acquired, 0 if the lock is not available
type OpsConfig ¶
type OpsConfig struct {
// contains filtered or unexported fields
}
The OpsConfig contains the configuration that can be modified during run time
type QueryBindBlockerCfg ¶
type QueryBindBlockerCfg struct { // lookup by sqlhash // then by bind name, then by bind value BySqlHash map[uint32]map[string]map[string][]QueryBindBlockerEntry }
func GetQueryBindBlockerCfg ¶
func GetQueryBindBlockerCfg() *QueryBindBlockerCfg
type QueryBindBlockerEntry ¶
type Queue ¶
type Queue interface { // Len function tells how many elements are Len() int // Push adds an element to the queue, at the end Push(el interface{}) bool // PushFront adds an element to the queue, at the front - basically making this a stack PushFront(el interface{}) bool // Poll poss an element from the queue Poll() interface{} // Remove removes the element having the given value Remove(el interface{}) bool // ForEachRemove walks the entire list removeing elements satisfying the condition ForEachRemove(f func(interface{}) bool) int }
Queue is interface for a queue implementation
type Server ¶
type Server interface {
Run()
}
Server contains the Run method which is the infinite loop
func NewServer ¶
func NewServer(lsn Listener, f HandlerFunc) Server
NewServer creates a server from the Listener and the function handling the connections accepted
type ShardMapRecord ¶
type ShardMapRecord struct {
// contains filtered or unexported fields
}
ShardMapRecord is the mapping between a physical bin to a logical shard
type ShardingCfg ¶
type ShardingCfg struct {
// contains filtered or unexported fields
}
ShardingCfg is an array of 1024 ShardMapRecord
func GetShardingCfg ¶
func GetShardingCfg() *ShardingCfg
GetShardingCfg atomically gets the sharding config
type StateEvent ¶
type StateEvent struct {
// contains filtered or unexported fields
}
StateEvent keeps the state information
type StateEventType ¶
type StateEventType int
StateEventType is an event published by proxy when state changes.
type StateLog ¶
type StateLog struct {
// contains filtered or unexported fields
}
StateLog is exposed as a singleton. All stateful resources are protected behind a message channel that synchronizes incoming messages. Users should not call any of the internal functions that are not threadsafe.
func (*StateLog) GetStartTime ¶
GetStartTime gets the app start time
func (*StateLog) GetStrandedWorkerCountForPool ¶
func (sl *StateLog) GetStrandedWorkerCountForPool(shardID int, wType HeraWorkerType, instID int) int
GetStrandedWorkerCountForPool is a best effort function to get the count of backlog in a worker pool without thread locking.
func (*StateLog) GetTotalConnections ¶
GetTotalConnections is a best effort, without thread locking, to give the total number of connections
func (*StateLog) GetWorkerCountForPool ¶
func (sl *StateLog) GetWorkerCountForPool(workerState HeraWorkerStatus, shardID int, wType HeraWorkerType, instID int) int
func (*StateLog) HasActiveWorker ¶
HasActiveWorker is a best effort, without thread locking, telling if at least a worker is active
func (*StateLog) ProxyHasCapacity ¶
ProxyHasCapacity checks if there is enough capacity
func (*StateLog) PublishStateEvent ¶
func (sl *StateLog) PublishStateEvent(_evt StateEvent) error
PublishStateEvent sends the event to the channel, so it will be processed by the state log routine
func (*StateLog) SetStartTime ¶
SetStartTime emits the CAL event
type TAF ¶
type TAF interface { // return true if the request should use the primary db, false to use the fallback UsePrimary() bool // For logging, gets the internal PCT field, which is a measure of the health of the primary database GetPct() int // To be called by coordinator if the request was ok NotifyOK() // To be called by coordinator if the request timed out failed with an ORA error NotifyError() }
TAF keeps a statistic of errors and successes and tells which database should be used for the next request
type TafQueries ¶
type TafQueries struct { CountNormallyFast int64 // contains filtered or unexported fields }
TafQueries keeps, for each SQL, statistics on whether the query timed out. This is in order to decide if a query is "naturally slow" so it can be ignored by taf
func GetTafQueries ¶
func GetTafQueries(shardID int) *TafQueries
GetTafQueries returns the taf queries statistics
func (*TafQueries) IsNormallySlow ¶
func (tq *TafQueries) IsNormallySlow(sqlhash int32) (bool, error)
IsNormallySlow looks into the stats for the query whose hash is sqlhash and returns whether the query is normally slow
func (*TafQueries) RecordTimeout ¶
func (tq *TafQueries) RecordTimeout(sqlhash int32) (bool, error)
RecordTimeout updates the statistic for the query whose hash is sqlhash, when the query timed out
type TafQueryRuns ¶
type TafQueryRuns struct {
// contains filtered or unexported fields
}
TafQueryRuns store one run
type Throttler ¶
type Throttler interface {
CanRun()
}
Throttler is used to throttle some activity based on a rate
func NewThrottler ¶
NewThrottler creates a throttler
type WLCfg ¶
type WLCfg struct {
// contains filtered or unexported fields
}
WLCfg keeps the whitelist configuration. Keys should be int64 or string
type WorkerBroker ¶
type WorkerBroker struct {
// contains filtered or unexported fields
}
WorkerBroker is managing the workers, starting the worker pools, and restarting workers when needed
func GetWorkerBrokerInstance ¶
func GetWorkerBrokerInstance() *WorkerBroker
GetWorkerBrokerInstance returns the singleton broker instance where different request handler goroutines can use to get a free worker
func (*WorkerBroker) AddPidToWorkermap ¶
func (broker *WorkerBroker) AddPidToWorkermap(worker *WorkerClient, pid int)
AddPidToWorkermap adds the worker to the map pid -> worker
func (*WorkerBroker) GetWorkerPool ¶
func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error)
GetWorkerPool gets the worker pool object for the type and id. ids holds optional parameters.
ids[0] == instance id; ids[1] == shard id.
if a particular id is not set, it defaults to 0. TODO: interchange sid <--> instId since instId is not yet used
func (*WorkerBroker) GetWorkerPoolCfgs ¶
func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*WorkerPoolCfg)
GetWorkerPoolCfgs returns the worker pool configuration
func (*WorkerBroker) RestartWorkerPool ¶
func (broker *WorkerBroker) RestartWorkerPool(_moduleName string) error
RestartWorkerPool (re)starts all the worker pools. workerpool.init calls statelog.init, which in turn calls back GetWorkerBrokerInstance; this causes a deadlock during workerbroker initialization since the golang lock is not reentrant. Hence workerpool.init is taken out of broker.init and called separately.
func (*WorkerBroker) Stopped ¶
func (broker *WorkerBroker) Stopped() <-chan struct{}
Stopped is called when we are done, it sends a message to the "stopped" channel, which is read by the main mux routine
type WorkerClient ¶
type WorkerClient struct { ID int // the worker identifier, from 0 to max worker count Type HeraWorkerType // the type of worker (ex write, read); all workers from the same type are grouped in a pool Status HeraWorkerStatus // the worker state, like init, accept, etc // contains filtered or unexported fields }
WorkerClient represents a worker process
func NewWorker ¶
func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleName string, thr Throttler) *WorkerClient
NewWorker creates a new workerclient instance (pointer)
func (*WorkerClient) AttachToWorker ¶
func (worker *WorkerClient) AttachToWorker() (err error)
AttachToWorker is called immediately after a worker process was created, it is a wrapper over the function doing the initialization work - attachToWorker. In case attachToWorker fails it does the cleanup.
func (*WorkerClient) Close ¶
func (worker *WorkerClient) Close()
Close closes the connections to the worker
func (*WorkerClient) DrainResponseChannel ¶
func (worker *WorkerClient) DrainResponseChannel(sleep time.Duration)
DrainResponseChannel removes any messages that might be in the channel. This is used when the worker is recovered.
func (*WorkerClient) Recover ¶
func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam WorkerClientRecoverParam, info *strandedCalInfo, param ...int)
Recover interrupts a worker busy executing a request, usually because a client went away. It sends a break to the worker and expects the worker to respond with EOR free. If the worker does not free within two seconds, the worker is stopped with SIGKILL
func (*WorkerClient) StartWorker ¶
func (worker *WorkerClient) StartWorker() (err error)
StartWorker fork-execs a new worker. Note: the routine is not "reentrant", it changes the global environment. For now this is fine.
func (*WorkerClient) Terminate ¶
func (worker *WorkerClient) Terminate() error
Terminate sends SIGTERM to the worker first (to allow the worker to gracefully shut down), then waits for 2000 ms before sending SIGKILL if necessary. Note: this function will block ~ 100 - 2000 ms
type WorkerClientRecoverParam ¶
type WorkerClientRecoverParam struct {
// contains filtered or unexported fields
}
type WorkerPool ¶
type WorkerPool struct { ShardID int // the shard the workers are connected to Type HeraWorkerType // the worker type like write, read InstID int // contains filtered or unexported fields }
WorkerPool represents a pool of workers of the same kind. The implementation uses a C++-ish mutex/condition variable/queue rather than a Golang-ish channel + timer because the policy for using the worker is LIFO (for better usage of cache) while the channels are FIFO
func (*WorkerPool) DecHealthyWorkers ¶
func (pool *WorkerPool) DecHealthyWorkers()
DecHealthyWorkers is called to decrement the number of workers connected to the database
func (*WorkerPool) GetHealthyWorkersCount ¶
func (pool *WorkerPool) GetHealthyWorkersCount() int32
GetHealthyWorkersCount returns the number of workers connected to the database
func (*WorkerPool) GetWorker ¶
func (pool *WorkerPool) GetWorker(sqlhash int32, timeoutMs ...int) (worker *WorkerClient, t string, err error)
GetWorker gets the active worker if available. backlog with timeout if not.
@param sqlhash to check for soft eviction against a blacklist of slow queries.
if getworker needs to examine the incoming sql, there does not seem to be another elegant way to do this except to pass in the sqlhash as a parameter.
@param timeoutMs[0] timeout in milliseconds. default to adaptive queue timeout.
func (*WorkerPool) Healthy ¶
func (pool *WorkerPool) Healthy() bool
Healthy checks if the number of workers connected to the database is greater than 20%
func (*WorkerPool) IncHealthyWorkers ¶
func (pool *WorkerPool) IncHealthyWorkers()
IncHealthyWorkers is called to increment the number of workers connected to the database
func (*WorkerPool) Init ¶
func (pool *WorkerPool) Init(wType HeraWorkerType, size int, instID int, shardID int, moduleName string) error
Init creates the pool by creating the workers and making all the initializations
func (*WorkerPool) RacMaint ¶
func (pool *WorkerPool) RacMaint(racReq racAct)
RacMaint is called when rac maintenance is needed. It marks the workers for restart, spreading to an interval in order to avoid connection storm to the database
func (*WorkerPool) Resize ¶
func (pool *WorkerPool) Resize(newSize int)
Resize resizes the worker pool when the corresponding dynamic configuration changed. When the size is increased, the increase is immediate by spawning the necessary number of new workers. When the size is decreased, it removes the workers whose id is bigger than the number of workers. If the workers to be removed are free, they are terminated immediately, otherwise the termination is delayed until the worker eventually calls ReturnWorker to make itself available
func (*WorkerPool) RestartWorker ¶
func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error)
RestartWorker is called after a worker exited to perform the necessary cleanup and re-start a new worker. In the rare situation where the pool need to be down-sized a new worker is not restarted.
func (*WorkerPool) ReturnWorker ¶
func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err error)
ReturnWorker puts the worker into the list of available workers. It is usually called after a coordinator used it for requests and no longer needs it. If the pool is about to be downsized, the worker is terminated instead of being put in the available list. If the worker lifetime expired, the worker is terminated instead of being put in the available list.
func (*WorkerPool) WorkerReady ¶
func (pool *WorkerPool) WorkerReady(worker *WorkerClient) (err error)
WorkerReady is called after the worker started and become available. It puts the worker into the internal list of workers as well as in the list of available workers
type WorkerPoolCfg ¶
type WorkerPoolCfg struct {
// contains filtered or unexported fields
}
WorkerPoolCfg is a configuration structure to keep setups for each type of worker pool
type WorkerStateInfo ¶
type WorkerStateInfo struct {
// contains filtered or unexported fields
}
WorkerStateInfo is a container holding worker state information
Source Files ¶
- adaptivequemgr.go
- bindevict.go
- cfgFromTns.go
- config.go
- connectionhandler.go
- constants.go
- coordinator.go
- coordinatorblocker.go
- coordinatorsharding.go
- coordinatortaf.go
- lock_timeout.go
- loopdriver.go
- main.go
- murmur3.go
- profiler.go
- querybindblocker.go
- queue.go
- racmaint.go
- server.go
- shardingcfg.go
- statelog.go
- stats.go
- taf.go
- tafqueries.go
- tcp_listener.go
- throttler.go
- tls_listener.go
- util.go
- workerbroker.go
- workerclient.go
- workerpool.go