server

package
v0.0.0-...-110c355 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
// Transaction state values. TxStateNone is zero; each of the other states
// occupies its own bit (0x1, 0x2, 0x4), written here as shifts so the bit
// position is explicit.
const (
	TxStateNone    int = 0
	TxStateWatch   int = 1 << 0
	TxStateMulti   int = 1 << 1
	TxStatePrepare int = 1 << 2
)
View Source
// Prepare states, numbered consecutively by iota from 0 (PrepareStateNone)
// through 4 (PrepareStateUnlock). All five are untyped integer constants.
// NOTE(review): presumably these describe the outcome of a transaction's
// prepare step (key modified / lock failed / locked / unlocked) — confirm
// against the code that assigns them.
const (
	PrepareStateNone = iota
	PrepareStateKeyModified
	PrepareStateLockFail
	PrepareStateLocked
	PrepareStateUnlock
)
View Source
// CONFIGGET and CONFIGSET are the literal strings "GET" and "SET".
// NOTE(review): presumably the subcommand names accepted by a CONFIG
// command — confirm against the handler that compares against them.
const (
	CONFIGGET = "GET"
	CONFIGSET = "SET"
)
View Source
// Reply message texts.
//
// Most messages begin with an error-kind prefix ("ERR", "WRONGTYPE" or
// "NOSCRIPT"); MsgNotFromScripts is the one entry without such a prefix.
// MsgFScriptUsage and MsgFPubsubUsage are format strings containing a single
// %s verb; every other entry is a plain string.
const (
	MsgWrongType          = "WRONGTYPE Operation against a key holding the wrong kind of value"
	MsgInvalidInt         = "ERR value is not an integer or out of range"
	MsgInvalidFloat       = "ERR value is not a valid float"
	MsgInvalidMinMax      = "ERR min or max is not a float"
	MsgInvalidRangeItem   = "ERR min or max not valid string range item"
	MsgInvalidTimeout     = "ERR timeout is not a float or out of range"
	MsgErrSyntaxor        = "ERR syntax error"
	MsgKeyNotFound        = "ERR no such key"
	MsgOutOfRange         = "ERR index out of range"
	MsgInvalidCursor      = "ERR invalid cursor"
	MsgXXandNX            = "ERR XX and NX options at the same time are not compatible"
	MsgNegTimeout         = "ERR timeout is negative"
	MsgInvalidSETime      = "ERR invalid expire time in set"
	MsgInvalidSETEXTime   = "ERR invalid expire time in setex"
	MsgInvalidPSETEXTime  = "ERR invalid expire time in psetex"
	MsgInvalidKeysNumber  = "ERR Number of keys can't be greater than number of args"
	MsgNegativeKeysNumber = "ERR Number of keys can't be negative"
	MsgFScriptUsage       = "ERR Unknown subcommand or wrong number of arguments for '%s'. Try SCRIPT HELP."
	MsgFPubsubUsage       = "ERR Unknown subcommand or wrong number of arguments for '%s'. Try PUBSUB HELP."
	MsgSingleElementPair  = "ERR INCR option supports a single increment-element pair"
	MsgInvalidStreamID    = "ERR Invalid stream ID specified as stream command argument"
	MsgStreamIDTooSmall   = "ERR The ID specified in XADD is equal or smaller than the target stream top item"
	MsgStreamIDZero       = "ERR The ID specified in XADD must be greater than 0-0"
	MsgNoScriptFound      = "NOSCRIPT No matching script. Please use EVAL."
	MsgUnsupportedUnit    = "ERR unsupported unit provided. please use m, km, ft, mi"
	MsgNotFromScripts     = "This Redis command is not allowed from scripts"
	MsgXreadUnbalanced    = "ERR Unbalanced XREAD list of streams: for each stream key an ID or '$' must be specified."
)
View Source
// Node model and database-sync constants, declared as three groups.
const (
	// Node models, typed as ModelType.
	M_NORMAL   ModelType = 0
	M_OBSERVER ModelType = 1
	M_WITNESS  ModelType = 2

	// Sync run types (untyped integer constants).
	// NOTE(review): presumably the direction of a running DB sync
	// (none / sending / receiving) — confirm against the users.
	DB_SYNC_RUN_TYPE_END  = 0
	DB_SYNC_RUN_TYPE_SEND = 1
	DB_SYNC_RUN_TYPE_RECV = 2

	// Sync status codes, numbered 0 through 10. They are declared untyped
	// rather than as DbSyncStatusType.
	// NOTE(review): these look like the values DbSyncStatusType.String is
	// meant to render — confirm.
	DB_SYNC_NOTHING      = 0
	DB_SYNC_PREPARE_FAIL = 1
	DB_SYNC_PREPARE_SUCC = 2
	DB_SYNC_SEND_FAIL    = 3
	DB_SYNC_SENDING      = 4
	DB_SYNC_SEND_SUCC    = 5
	DB_SYNC_RECVING_FAIL = 6
	DB_SYNC_RECVING      = 7
	DB_SYNC_RECVING_SUCC = 8
	DB_SYNC_CONN_FAIL    = 9
	DB_SYNC_CONN_SUCC    = 10
)
View Source
// Status values, numbered consecutively by iota from 0 (StatusPrepare)
// through 4 (StatusExited). All are untyped integer constants.
// NOTE(review): the page does not show which component's lifecycle these
// describe — confirm against the code that reads them.
const (
	StatusPrepare = iota
	StatusStart
	StatusRunning
	StatusClose
	StatusExited
)

Variables

View Source
// LuaShardCount is a package-level uint32 initialised to 64. It is a
// variable, not a constant, so it can be reassigned at runtime.
// NOTE(review): presumably the number of shards used for Lua state
// (see InitLuaPool) — confirm.
var LuaShardCount uint32 = 64

Functions

func AddCommand

func AddCommand(list map[string]*Cmd)

func AddPlugin

func AddPlugin(p *Proc)

func AddRaftPlugin

func AddRaftPlugin(p *Proc)

func ConvertLuaTable

func ConvertLuaTable(l *lua.LState, value lua.LValue) []string

func ErrLuaParseError

func ErrLuaParseError(err error) string

func ErrWrongNumber

func ErrWrongNumber(cmd string) string

func InitLuaPool

func InitLuaPool(s *Server)

func LuaToRedis

func LuaToRedis(l *lua.LState, c *Client, value lua.LValue)

func MkLuaFuncs

func MkLuaFuncs(srv *Server) map[string]lua.LGFunction

func NewCpuAdjust

func NewCpuAdjust(path string, lastCpuNum int) *cpuAdjust

func PutLuaClientToPool

func PutLuaClientToPool(l *LuaClient)

func PutRaftClientToPool

func PutRaftClientToPool(c *Client)

Types

type Client

// Client embeds a *resp.Session, so the session's exported methods are
// promoted onto Client. Instances are produced by NewClientRESP,
// GetRaftClientFromPool and GetVmFromPool.
type Client struct {
	*resp.Session

	// DB is the *engine.Bitalos handle held by this client.
	DB             *engine.Bitalos
	// QueryStartTime is a time.Time; ResetQueryStartTime is the method
	// named after it.
	QueryStartTime time.Time
	// KeyHash is a uint32.
	// NOTE(review): presumably the hash of the current request's key
	// (GetRaftClientFromPool takes a keyHash argument) — confirm.
	KeyHash        uint32
	// IsMaster is a callback reporting a bool; Server has a field of the
	// same name and type.
	IsMaster       func() bool
	// contains filtered or unexported fields
}

func GetRaftClientFromPool

func GetRaftClientFromPool(s *Server, data [][]byte, keyHash uint32) *Client

func GetVmFromPool

func GetVmFromPool(s *Server) *Client

func NewClientRESP

func NewClientRESP(conn net.Conn, s *Server) *Client

func (*Client) ApplyDB

func (c *Client) ApplyDB(raftSyncCostNs int64) error

func (*Client) Close

func (c *Client) Close()

func (*Client) FormatData

func (c *Client) FormatData(reqData [][]byte)

func (*Client) GetInfo

func (c *Client) GetInfo() *SInfo

func (*Client) HandleRequest

func (c *Client) HandleRequest(plugin bool, reqData [][]byte, isHashTag bool) (err error)

func (*Client) ResetQueryStartTime

func (c *Client) ResetQueryStartTime()

type Cmd

// Cmd describes one command: a name, a handler taking a *Client, and a set
// of flags. Commands are registered through AddCommand (a map keyed by
// string) and looked up with Server.GetCommand.
//
// NOTE(review): the flag meanings below are inferred from the field names
// only — confirm against the dispatcher:
//   - NArg: presumably the expected argument count.
//   - Sync: presumably whether the command goes through raft sync.
//   - NotAllowedInTx: presumably rejects the command inside a transaction.
//   - NoKey / KeySkip: presumably describe where (or whether) the key
//     appears in the argument list.
type Cmd struct {
	NArg           int
	Sync           bool
	Name           string
	Handler        func(*Client) error
	NotAllowedInTx bool
	NoKey          bool
	KeySkip        uint8
}

type DB

// DB embeds a *engine.Bitalos (promoting its exported methods) and pairs it
// with a pointer to an SInfo.
type DB struct {
	*engine.Bitalos

	Info *SInfo
}

type DbSyncStatusType

// DbSyncStatusType is an int-based type with a String method; it is the
// type of SinfoStats.DbSyncStatus.
// NOTE(review): the DB_SYNC_* status constants look like its intended
// values but are declared untyped — confirm.
type DbSyncStatusType int

func (DbSyncStatusType) String

func (dst DbSyncStatusType) String() string

type LuaClient

// LuaClient wraps a *lua.LState together with an int16 counter. Instances
// are obtained with GetLuaClientFromPool and handed back with
// PutLuaClientToPool.
// NOTE(review): what Count counts (presumably uses of the state) is not
// visible here — confirm.
type LuaClient struct {
	LState *lua.LState
	Count  int16
}

func GetLuaClientFromPool

func GetLuaClientFromPool() *LuaClient

type ModelType

// ModelType is an int-based type with a String method. M_NORMAL (0),
// M_OBSERVER (1) and M_WITNESS (2) are the constants declared with it.
type ModelType int

func (ModelType) String

func (mt ModelType) String() string

type Proc

// Proc is a named bundle of callback hooks; it is the type accepted by
// AddPlugin and AddRaftPlugin. Every hook is a plain func field, so an
// unset hook is nil.
// NOTE(review): when each hook fires is inferred from its name only
// (server start/stop, client connect/disconnect, before/after a command,
// raft sync) — confirm against the code that invokes them.
type Proc struct {
	Name string

	// Server-level hooks.
	Start func(*Server)
	Stop  func(*Server, interface{})

	// Client- and command-level hooks. Prepare is the only one that
	// returns a value (a bool).
	Connect func(*Server, *Client)
	Disconn func(*Server, *Client, interface{})
	Prepare func(*Client, *Cmd, string) bool
	Handled func(*Client, *Cmd, string)

	// DoRaftSync takes the same arguments as Prepare/Handled and returns
	// an error.
	DoRaftSync func(*Client, *Cmd, string) error
}

type SInfo

// SInfo aggregates the individual info sections (server, client, cluster,
// stats, data, runtime stats) by value, plus a pointer to a
// bitsdb.BitsUsage. It is created with NewSinfo and serialised with
// Marshal.
type SInfo struct {
	Server         SinfoServer
	Client         SinfoClient
	Cluster        SinfoCluster
	Stats          SinfoStats
	Data           SinfoData
	RuntimeStats   SRuntimeStats
	// BitalosdbUsage is the only pointer-typed section.
	BitalosdbUsage *bitsdb.BitsUsage
}

func NewSinfo

func NewSinfo() *SInfo

func (*SInfo) Marshal

func (sinfo *SInfo) Marshal() ([]byte, func())

type SRuntimeStats

// SRuntimeStats holds runtime counters in three anonymous sub-structs
// (General, Heap, GC) plus process-level figures.
//
// The outer JSON tags are not uniform: General is tagged "runtime_general",
// while Heap and GC are tagged "heap" and "gc".
// NOTE(review): the General/Heap/GC field names mirror runtime.MemStats
// fields; presumably Samples fills them from there — confirm.
type SRuntimeStats struct {
	General struct {
		Alloc   uint64 `json:"runtime_general_alloc"`
		Sys     uint64 `json:"runtime_general_sys"`
		Lookups uint64 `json:"runtime_general_lookups"`
		Mallocs uint64 `json:"runtime_general_mallocs"`
		Frees   uint64 `json:"runtime_general_frees"`
	} `json:"runtime_general"`

	Heap struct {
		Alloc   uint64 `json:"runtime_heap_alloc"`
		Sys     uint64 `json:"runtime_heap_sys"`
		Idle    uint64 `json:"runtime_heap_idle"`
		Inuse   uint64 `json:"runtime_heap_inuse"`
		Objects uint64 `json:"runtime_heap_objects"`
	} `json:"heap"`

	GC struct {
		Num          uint32  `json:"runtime_gc_num"`
		CPUFraction  float64 `json:"runtime_gc_cpu_fraction"`
		TotalPauseMs uint64  `json:"runtime_gc_total_pausems"`
	} `json:"gc"`

	NumProcs      int `json:"runtime_num_procs"`
	NumGoroutines int `json:"runtime_num_goroutines"`

	// NOTE(review): units of the memory figures and the scale of CPU
	// (fraction vs. percent) are not visible here — confirm.
	MemoryTotal int64   `json:"memory_total"`
	MemoryShr   int64   `json:"memory_shr"`
	CPU         float64 `json:"cpu"`
	// contains filtered or unexported fields
}

func (*SRuntimeStats) AppendTo

func (srs *SRuntimeStats) AppendTo(target []byte, pos int) int

func (*SRuntimeStats) Marshal

func (srs *SRuntimeStats) Marshal() ([]byte, func())

func (*SRuntimeStats) Samples

func (srs *SRuntimeStats) Samples()

func (*SRuntimeStats) UpdateCache

func (srs *SRuntimeStats) UpdateCache()

type Server

// Server is created with NewServer. Its exported surface is an *SInfo, two
// callbacks and a witness flag; the rest of its state is unexported.
type Server struct {
	Info              *SInfo
	// IsMaster is a callback reporting a bool.
	IsMaster          func() bool
	// MigrateDelToSlave is a callback taking a key hash and a [][]byte
	// payload and returning an error.
	// NOTE(review): presumably forwards a delete to the slave during
	// migration — confirm.
	MigrateDelToSlave func(keyHash uint32, data [][]byte) error
	IsWitness         bool
	// contains filtered or unexported fields
}

func NewServer

func NewServer() (*Server, error)

func (*Server) Close

func (s *Server) Close()

func (*Server) FlushCallback

func (s *Server) FlushCallback(compactIndex uint64)

func (*Server) GetCommand

func (s *Server) GetCommand(c string) *Cmd

func (*Server) GetDB

func (s *Server) GetDB() *engine.Bitalos

func (*Server) GetIsClosed

func (s *Server) GetIsClosed() bool

func (*Server) PrepareSnapshot

func (s *Server) PrepareSnapshot() (ls interface{}, err error)

func (*Server) RecoverFromSnapshot

func (s *Server) RecoverFromSnapshot(r io.Reader, done <-chan struct{}) error

func (*Server) Run

func (s *Server) Run()

func (*Server) RunDeleteExpireDataTask

func (s *Server) RunDeleteExpireDataTask()

func (*Server) SaveSnapshot

func (s *Server) SaveSnapshot(ctx interface{}, w io.Writer, done <-chan struct{}) error

type SinfoClient

// SinfoClient holds two atomic 64-bit counters, tagged "total_clients" and
// "connected_clients".
// NOTE(review): whether these json tags take effect depends on how the
// atomic type marshals (sync/atomic's Int64 exposes no exported fields) —
// confirm which atomic package is imported; the type also has its own
// Marshal/AppendTo methods.
type SinfoClient struct {
	ClientTotal atomic.Int64 `json:"total_clients"`
	ClientAlive atomic.Int64 `json:"connected_clients"`
	// contains filtered or unexported fields
}

func (*SinfoClient) AppendTo

func (sc *SinfoClient) AppendTo(target []byte, pos int) int

func (*SinfoClient) Marshal

func (sc *SinfoClient) Marshal() ([]byte, func())

func (*SinfoClient) UpdateCache

func (sc *SinfoClient) UpdateCache()

type SinfoCluster

// SinfoCluster is the cluster section of SInfo: the node's start model,
// status and role, the cluster/node/leader identifiers and addresses, and
// two string renderings of the node set.
// NOTE(review): the format of ClusterNodes versus ClusterNodesList is not
// visible here — confirm against UpdateCache/AppendTo.
type SinfoCluster struct {
	StartModel       ModelType `json:"start_model"`
	Status           bool      `json:"status"`
	Role             string    `json:"role"`
	ClusterId        uint64    `json:"cluster_id"`
	CurrentNodeId    uint64    `json:"current_node_id"`
	RaftAddress      string    `json:"raft_address"`
	LeaderNodeId     uint64    `json:"leader_node_id"`
	LeaderAddress    string    `json:"leader_address"`
	ClusterNodes     string    `json:"cluster_nodes"`
	ClusterNodesList string    `json:"cluster_nodes_list"`
	// contains filtered or unexported fields
}

func (*SinfoCluster) AppendTo

func (sc *SinfoCluster) AppendTo(target []byte, pos int) int

func (*SinfoCluster) Marshal

func (sc *SinfoCluster) Marshal() ([]byte, func())

func (*SinfoCluster) UpdateCache

func (sc *SinfoCluster) UpdateCache()

type SinfoData

// SinfoData is the data section of SInfo: five int64 size figures.
// NOTE(review): presumably on-disk sizes in bytes gathered by Samples —
// units and source are not visible here; confirm.
type SinfoData struct {
	UsedSize         int64 `json:"used_size"`
	DataSize         int64 `json:"data_size"`
	RaftNodeHostSize int64 `json:"raft_nodehost_size"`
	RaftWalSize      int64 `json:"raft_wal_size"`
	SnapshotSize     int64 `json:"snapshot_size"`
	// contains filtered or unexported fields
}

func (*SinfoData) AppendTo

func (sd *SinfoData) AppendTo(target []byte, pos int) int

func (*SinfoData) Marshal

func (sd *SinfoData) Marshal() ([]byte, func())

func (*SinfoData) Samples

func (sd *SinfoData) Samples()

func (*SinfoData) UpdateCache

func (sd *SinfoData) UpdateCache()

type SinfoServer

// SinfoServer is the server section of SInfo: process-level settings and
// build/config identification, each with a JSON tag.
// NOTE(review): StartTime is a string; its layout is not visible here —
// confirm before parsing it.
type SinfoServer struct {
	MaxProcs      int    `json:"maxprocs"`
	ProcessId     int    `json:"process_id"`
	StartTime     string `json:"start_time"`
	ServerAddress string `json:"server_address"`
	MaxClient     int64  `json:"max_client"`
	SingleDegrade bool   `json:"single_degrade"`
	GitVersion    string `json:"git_version"`
	Compile       string `json:"compile"`
	ConfigFile    string `json:"config_file"`
	// contains filtered or unexported fields
}

func (*SinfoServer) AppendTo

func (ss *SinfoServer) AppendTo(target []byte, pos int) int

func (*SinfoServer) Marshal

func (ss *SinfoServer) Marshal() ([]byte, func())

func (*SinfoServer) UpdateCache

func (ss *SinfoServer) UpdateCache()

type SinfoStats

// SinfoStats is the stats section of SInfo. It mixes atomic counters
// (TotolCmd, QPS, DbSyncRunning, IsMigrate) with plain fields. Only
// IsMigrate carries a JSON tag; the other fields have none.
// NOTE(review): "TotolCmd" looks like a misspelling of "TotalCmd"; it is
// exported, so renaming it would break callers.
// NOTE(review): the plain (non-atomic) fields have no visible
// synchronisation here — confirm how concurrent access is handled.
type SinfoStats struct {
	TotolCmd      atomic.Uint64
	QPS           atomic.Uint64
	QueueLen      int
	RaftLogIndex  uint64
	IsDelExpire   int
	StartModel    ModelType
	DbSyncRunning atomic.Int32
	DbSyncStatus  DbSyncStatusType
	DbSyncErr     string
	IsMigrate     atomic.Int32 `json:"is_migrate"`
	// contains filtered or unexported fields
}

func (*SinfoStats) AppendTo

func (ss *SinfoStats) AppendTo(target []byte, pos int) int

func (*SinfoStats) Marshal

func (ss *SinfoStats) Marshal() ([]byte, func())

func (*SinfoStats) UpdateCache

func (ss *SinfoStats) UpdateCache()

type TxLocker

// TxLocker embeds sync.RWMutex, so Lock/Unlock/RLock/RUnlock are promoted
// onto it. Because it contains a mutex it must not be copied after first
// use; TxShardLocker hands it out by pointer (GetTxLock, GetTxLockByKey).
type TxLocker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

type TxShardLocker

// TxShardLocker is created by NewTxLockers with a shard count and hands out
// *TxLocker values (by key hash or by key) and *TxWatchKey values (by key
// string). All of its fields are unexported.
type TxShardLocker struct {
	// contains filtered or unexported fields
}

func NewTxLockers

func NewTxLockers(shards uint32) *TxShardLocker

func (*TxShardLocker) GetTxLock

func (sl *TxShardLocker) GetTxLock(khash uint32) *TxLocker

func (*TxShardLocker) GetTxLockByKey

func (sl *TxShardLocker) GetTxLockByKey(key []byte) *TxLocker

func (*TxShardLocker) GetWatchKey

func (sl *TxShardLocker) GetWatchKey(keyStr string) *TxWatchKey

func (*TxShardLocker) GetWatchKeyWithKhash

func (sl *TxShardLocker) GetWatchKeyWithKhash(khash uint32, keyStr string) *TxWatchKey

type TxWatchKey

// TxWatchKey is the value returned by TxShardLocker.GetWatchKey and
// GetWatchKeyWithKhash. All of its fields are unexported.
// NOTE(review): presumably the per-key record behind WATCH tracking —
// confirm.
type TxWatchKey struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL