fastmulticache

package module
v0.0.0-...-0eb2668 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

README

fastmulticache

介绍

多级缓存引入了一种批量化的查询和更新方法,用户可通过实现SetMem、SetKv等接口接入不同类型的缓存中间件,如memcache、redis等。

实现原理
kv批量查询

image

kv批量查询会将当前收到的所有请求传入查询工作池,查询工作池会将这些请求对应的查询key随机放入n个查询队列中,并通过对象服用池申请n个channel管道来获取查询结果的异步通知。每个查询队列对应一个工作协程,用户可自定义查询队列的属性,如队列长度N,代表队列中的个数达到多少后就放入pipeline执行批量查询;队列数量M,代表并发执行的查询队列的数量有多少,在高QPS时可以设大些以提升系统查询的性能;超时时间T, 代表QPS较低时在T时间内队列中待提交的元素个数<N时,为了维持请求QPS的要求,强制放入pipeline执行批量查询。批量查询后会解析redis批量请求结果,并拆分成n个value并通过n个channel管道来通知查询结果。通过批量查询的方式可以提升redis网络请求带宽的利用率,从而提升整体的QPS。

kv批量写入

image

kv批量写入会将当前收到的所有请求传入写入工作池,写入工作池会将这些请求对应的写入key及对应的value随机放入n个写入队列中,并通过对象服用池申请n个channel管道来获取写入结果的异步通知。每个写入队列对应一个工作协程,用户可通过环境变量自定义写入队列的属性,如队列长度N,代表队列中的个数达到多少后就放入pipeline执行批量写入;队列数量M,代表并发执行的写入队列的数量有多少,在高QPS时可以设大些以提升系统写入的性能;超时时间T, 代表QPS较低时在T时间内队列中待提交的元素个数<N时,为了维持请求QPS的要求,强制放入pipeline执行批量写入。批量写入后会解析redis批量请求结果,并拆分成n个value并通过n个channel管道来通知写入结果。通过批量写入的方式也可以提升redis网络请求带宽的利用率,从而提升整体的QPS。

db批量写入

image

db批量写入会将当前收到的所有请求传入写入工作池,写入工作池会将这些请求按table名进行分组,并将每组table对应的写入key及对应的value随机放入n个写入队列中,并通过对象服用池申请n个channel管道来获取写入结果的异步通知。每个写入队列对应一个工作协程,用户可通过环境变量自定义写入队列的属性,如队列长度N,代表队列中的个数达到多少后就放入pipeline执行批量写入;队列数量M,代表并发执行的写入队列的数量有多少,在高QPS时可以设大些以提升系统写入的性能;超时时间T, 代表QPS较低时在T时间内队列中待提交的元素个数<N时,为了维持请求QPS的要求,强制放入pipeline执行批量写入。批量写入后会解析mysql批量请求结果,并拆分成n个value并通过n个channel管道来通知写入结果。通过批量写入的方式可以提升mysql网络请求带宽的利用率,从而提升整体的QPS。

参数配置方法
方法名 含义
SetKv 设置缓存中间件的通用接口
SetMem 设置内存的通用接口
SetExpirationKvSeconds redis超时时间(秒)
SetExpirationMemSeconds 内存超时时间(秒)
SetPriorities 缓存优先级
SetBatchCommitTimeoutMsDb db批量插入的超时时间(毫秒)
SetBatchCommitMinNumDb db批量插入最小数量(毫秒)
SetBatchCommitTimeoutMsKv redis批量执行的超时时间(毫秒)
SetBatchCommitMinNumKv redis批量提交的最小数量
SetBufferNum 工作池队列的缓冲数
SetQueueNum 工作池队列数
SetDbMaxParallel 访问db的最大并行数
SetDbLockTimeoutMs 访问db的超时锁的超时时间(毫秒)
SetBreakDownTimeoutMs 预防雪崩的kv空数据过期时间(毫秒)
SetCacheSync 设置缓存跨节点同步通用方法

Documentation

Index

Constants

View Source
const (
	DefaultExpirationKvSeconds    = 3600
	DefaultExpirationMemSeconds   = 3600
	DefaultBatchCommitTimeoutMsDb = 10
	DefaultBatchCommitMinNumDb    = 10
	DefaultBatchCommitTimeoutMsKv = 10
	DefaultBatchCommitMinNumKv    = 10
	DefaultBufferNum              = 1000
	DefaultQueueNum               = 10
	DefaultDbMaxParallel          = 15
	DefaultDbLockTimeoutMs        = 50
)
View Source
const (
	CacheTypeMemory = 1
	CacheTypeKv     = 2
)
View Source
const (
	DataTypeSingle = 1
	DataTypeList   = 2
)
View Source
const (
	PublishMemChannel = "pub_mem"
)

Variables

View Source
var (
	ErrInitKvOpInvalid      = fmt.Errorf("init kv op invalid")
	ErrInitDbOpInvalid      = fmt.Errorf("init db op invalid")
	ErrInitMemOpInvalid     = fmt.Errorf("init mem op invalid")
	ErrPriorityInvalid      = fmt.Errorf("priority invalid")
	ErrNotExist             = fmt.Errorf("not exist")
	ErrInitCacheSyncInvalid = fmt.Errorf("init cache sync invalid")
)
View Source
var (
	CacheTypeNameMap = map[int]string{
		CacheTypeMemory: "memory",
		CacheTypeKv:     "kv",
	}
)
View Source
var (
	CacheTypePriMap = map[string][]int{
		"local":  {CacheTypeMemory},
		"remote": {CacheTypeKv},
		"all":    {CacheTypeMemory, CacheTypeKv},
	}
)

Functions

func BatchDbInsertExtractData

func BatchDbInsertExtractData(dataMake map[string]interface{}) (taskId int64, table string, data map[string]interface{})

func BatchDbInsertMakeData

func BatchDbInsertMakeData(table string, data map[string]interface{}) (dataMake map[string]interface{})

func BatchGetResultErr

func BatchGetResultErr(m map[string]interface{}) (err error)

func BatchGetResultVal

func BatchGetResultVal(m map[string]interface{}) (val interface{})

func BatchKvGetExtractData

func BatchKvGetExtractData(dataMake map[string]interface{}) (taskId int64, key string, dataType int)

func BatchKvGetMakeData

func BatchKvGetMakeData(key string, dataType int) (dataMake map[string]interface{})

func BatchKvSetExtractData

func BatchKvSetExtractData(dataMake map[string]interface{}) (taskId int64, key string, val string, dataType int, onlySync bool, breakDown bool)

func BatchKvSetMakeData

func BatchKvSetMakeData(key, val string, dataType int, onlySync bool, breakDown bool) (dataMake map[string]interface{})

func BatchResultErr

func BatchResultErr(m map[string]interface{}, err error)

func BatchResultVal

func BatchResultVal(m map[string]interface{}, val interface{})

func BatchTaskId

func BatchTaskId(dataMake map[string]interface{}) int64

func ByteToStr

func ByteToStr(b []byte) string

func CacheDbGet

func CacheDbGet(cacheType string, dataType int, table string, key string, order string, query interface{}, args ...interface{}) (value map[string]interface{}, err error)

func CacheDbInsert

func CacheDbInsert(cacheType string, dataType int, table string, data map[string]interface{}, key string) (err error)

func CacheDbPreload

func CacheDbPreload(tableName string, keyName string, dataType int, maxNum int)

缓存预热

func CacheDbUpdate

func CacheDbUpdate(cacheType string, table string, data map[string]interface{}, key string, query interface{}, args ...interface{}) (err error)

func CacheExtractKey

func CacheExtractKey(key string) (keyPrefix string, keySuffix string)

func CacheGet

func CacheGet(cacheType string, dataType int, table string, key string, order string, query interface{}, args ...interface{}) (value map[string]interface{}, err error)

func CacheKeyInfo

func CacheKeyInfo(key string) (src string, keyVal string, keyPrefix string)

func CacheLocalRate

func CacheLocalRate() float64

func CacheMakeKey

func CacheMakeKey(keyName string, value interface{}) string

func CacheRebuild

func CacheRebuild(maxNum int)

缓存重建

func CacheRefreshForce

func CacheRefreshForce(cacheType string, dataType int, table string, key string, query interface{}, args ...interface{})

func CacheRemoteRate

func CacheRemoteRate() float64

func CacheResetLocalRate

func CacheResetLocalRate()

func CacheResetRemoteRate

func CacheResetRemoteRate()

func CacheSyncStart

func CacheSyncStart()

func CacheTimeout

func CacheTimeout(breakDown bool, cacheType int) time.Duration

func Init

func Init(options ...CacheOptionFunc) (err error)

func MakeIdUnique

func MakeIdUnique() int64

func RecoverCommon

func RecoverCommon()

func SelectDbMu

func SelectDbMu() int

func SelectQueue

func SelectQueue() int

func Str2Byte

func Str2Byte(s string) (b []byte)

Types

type Batch

type Batch interface {
	AddTask(data map[string]interface{}) map[string]interface{}
	RunAsync()
}

type BatchCommon

type BatchCommon interface {
	Get(key string, dataType int) (value interface{}, err error)
	Set(key string, value string, dataType int, onlySync bool, breakDown bool) (err error)
}

type BatchDbInsert

type BatchDbInsert struct {
	// contains filtered or unexported fields
}

func NewBatchDbInsert

func NewBatchDbInsert() (obj *BatchDbInsert)

func (*BatchDbInsert) AddTask

func (b *BatchDbInsert) AddTask(data map[string]interface{}) (res map[string]interface{})

func (*BatchDbInsert) RunAsync

func (b *BatchDbInsert) RunAsync()

type BatchKvCommon

type BatchKvCommon struct {
}

func NewBatchKvCommon

func NewBatchKvCommon() *BatchKvCommon

func (*BatchKvCommon) Get

func (b *BatchKvCommon) Get(key string, dataType int) (value interface{}, err error)

func (*BatchKvCommon) Set

func (b *BatchKvCommon) Set(key string, value string, dataType int, onlySync bool, breakDown bool) (err error)

type BatchKvGet

type BatchKvGet struct {
	// contains filtered or unexported fields
}

func NewBatchKvGet

func NewBatchKvGet() (obj *BatchKvGet)

func (*BatchKvGet) AddTask

func (b *BatchKvGet) AddTask(data map[string]interface{}) (res map[string]interface{})

func (*BatchKvGet) RunAsync

func (b *BatchKvGet) RunAsync()

type BatchKvSet

type BatchKvSet struct {
	// contains filtered or unexported fields
}

func NewBatchKvSet

func NewBatchKvSet() (obj *BatchKvSet)

func (*BatchKvSet) AddTask

func (b *BatchKvSet) AddTask(data map[string]interface{}) (res map[string]interface{})

func (*BatchKvSet) RunAsync

func (b *BatchKvSet) RunAsync()

type BatchMemCommon

type BatchMemCommon struct {
}

func NewBatchMemCommon

func NewBatchMemCommon() *BatchMemCommon

func (*BatchMemCommon) Get

func (b *BatchMemCommon) Get(key string, dataType int) (value interface{}, err error)

func (*BatchMemCommon) Set

func (b *BatchMemCommon) Set(key string, value string, dataType int, onlySync bool, breakDown bool) (err error)

type BatchMemGet

type BatchMemGet struct {
}

func NewBatchMemGet

func NewBatchMemGet() *BatchMemGet

func (*BatchMemGet) AddTask

func (b *BatchMemGet) AddTask(data map[string]interface{}) (res map[string]interface{})

func (*BatchMemGet) RunAsync

func (b *BatchMemGet) RunAsync()

type BatchMemSet

type BatchMemSet struct {
}

func NewBatchMemSet

func NewBatchMemSet() *BatchMemSet

func (*BatchMemSet) AddTask

func (b *BatchMemSet) AddTask(data map[string]interface{}) (res map[string]interface{})

func (*BatchMemSet) RunAsync

func (b *BatchMemSet) RunAsync()

type CacheOp

type CacheOp interface {
	GetBatch(keys []KeyInfo) (values map[string]interface{}, err error)
	SetBatch(data []KeyValSetInfo) (err error)
	GetKeys(maxNum int) ([]KeyInfo, error)
	DelKey(keyInfo KeyInfo) error
}

type CacheOption

type CacheOption struct {
	// contains filtered or unexported fields
}

type CacheOptionFunc

type CacheOptionFunc func(*CacheOption) error

func SetBatchCommitMinNumDb

func SetBatchCommitMinNumDb(minNum int) CacheOptionFunc

func SetBatchCommitMinNumKv

func SetBatchCommitMinNumKv(minNum int) CacheOptionFunc

func SetBatchCommitTimeoutMsDb

func SetBatchCommitTimeoutMsDb(ms int) CacheOptionFunc

func SetBatchCommitTimeoutMsKv

func SetBatchCommitTimeoutMsKv(ms int) CacheOptionFunc

func SetBreakDownTimeoutMs

func SetBreakDownTimeoutMs(ms int) CacheOptionFunc

func SetBufferNum

func SetBufferNum(num int) CacheOptionFunc

func SetCacheSync

func SetCacheSync(sync CacheSync) CacheOptionFunc

func SetDb

func SetDb(db DbOp) CacheOptionFunc

func SetDbLockTimeoutMs

func SetDbLockTimeoutMs(ms int) CacheOptionFunc

func SetDbMaxParallel

func SetDbMaxParallel(dbMaxParallel int) CacheOptionFunc

func SetExpirationKvSeconds

func SetExpirationKvSeconds(seconds int) CacheOptionFunc

func SetExpirationMemSeconds

func SetExpirationMemSeconds(seconds int) CacheOptionFunc

func SetKeyPrefixSrc

func SetKeyPrefixSrc(m map[string]string) CacheOptionFunc

func SetKv

func SetKv(kv CacheOp) CacheOptionFunc

func SetMem

func SetMem(mem CacheOp) CacheOptionFunc

func SetPriorities

func SetPriorities(priority ...int) CacheOptionFunc

func SetQueueNum

func SetQueueNum(num int) CacheOptionFunc

type CacheStat

type CacheStat struct {
	CacheNumLocal     int64
	CacheNumRemote    int64
	LastRateLocal     float64
	LastRateRemote    float64
	RequestsNumLocal  int64
	RequestsNumRemote int64
}

func (*CacheStat) AddStat

func (s *CacheStat) AddStat(useCache bool, cacheType int)

func (*CacheStat) RateLocal

func (s *CacheStat) RateLocal() float64

func (*CacheStat) RateRemote

func (s *CacheStat) RateRemote() float64

func (*CacheStat) ResetLocal

func (s *CacheStat) ResetLocal()

func (*CacheStat) ResetRemote

func (s *CacheStat) ResetRemote()

type CacheSync

type CacheSync interface {
	Subscribe() (chan KeyValSetInfo, error)
}

type DbOp

type DbOp interface {
	InsertBatch(table string, data []map[string]interface{}) (err error)
	Get(table string, order string, query interface{}, args ...interface{}) (value []map[string]interface{}, err error)
	Update(table string, data map[string]interface{}, query interface{}, args ...interface{}) (err error)
	GetPage(table string, offset int, limit int) ([]map[string]interface{}, error)
}

type KeyInfo

type KeyInfo struct {
	Name     string
	DataType int
}

type KeyValSetInfo

type KeyValSetInfo struct {
	Key       string
	Val       string
	DataType  int
	OnlySync  bool
	BreakDown bool
}

type MutexTimeout

type MutexTimeout struct {
	// contains filtered or unexported fields
}

func NewMutexTimeout

func NewMutexTimeout() (mu *MutexTimeout)

func (*MutexTimeout) LockTimeout

func (m *MutexTimeout) LockTimeout(d time.Duration) bool

func (*MutexTimeout) Unlock

func (m *MutexTimeout) Unlock()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL