Documentation ¶
Index ¶
- Variables
- func RunClearExpireBucket()
- func RunGrpcServer(Ip string, Port int)
- type BNHelper
- func (bn *BNHelper) CheckBucketName(bucketName string) bool
- func (bn *BNHelper) GetBucketName(t ...time.Time) string
- func (bn *BNHelper) GetBucketNameFromTime(tim string) string
- func (bn *BNHelper) GetBucketStartKey() int64
- func (bn *BNHelper) MinBucketName() string
- func (bn *BNHelper) NextBucketName(bucketName string) string
- type ChannelDb
- type Controller
- func (c *Controller) ClearExpireBucket()
- func (c *Controller) PullGetChannelPositionAll(channelName string) (retBucketName, retBucketKey string, retErr error)
- func (c *Controller) PullGetChannelPositionNow(channelName string) (retBucketName, retBucketKey string, retErr error)
- func (c *Controller) PullGetData(userName, channelName string) (retData []string, retBucketName, retBucketKey string, retErr error)
- func (c *Controller) PullGetUserChannelPosition(userName, channelName string) (retBucketName, retBucketKey string, retErr error)
- func (c *Controller) PullSetChannelPosition(userName, channelName, bucketName, bucketKey string) error
- func (c *Controller) PushWriteData(channelName string, Data [][]byte) error
- type SysDb
- func (s *SysDb) Init()
- func (s *SysDb) PullGetChannelPositionAll(dbHandle *bolt.DB, channelName string) (retBucketName, retBucketKey string, retErr error)
- func (s *SysDb) PullGetChannelPositionNow(dbHandle *bolt.DB, channelName string) (retBucketName, retBucketKey string, retErr error)
- func (s *SysDb) PullGetUserPosition(dbHandle *bolt.DB, userName, channelName string) (retBucketName, retBucketKey string, retErr error)
- func (s *SysDb) PullWriteUserPosition(dbHandle *bolt.DB, userName, channelName, bucketName, bucketKey string) (retErr error)
- func (s *SysDb) PushRefreshChannelPosition(channelName, bucketName, bucketKey string) (retErr error)
Constants ¶
This section is empty.
Variables ¶
var CfgBoltDb = struct { Host string // 监听ip, Port int // 监听端口 DbFolder string // 数据库存放文件夹 SysDbName string // 系统数据库名称 PreChannelDb string // 频道数据前缀名 PushPerNumLimit int // 单次推送条数限制,这个条数内需要有一条WaitRes的数据触发写入 BucketNameUserChannelPosition string // 用户拉取数据当前频道拉取位置的bucket名(系统数据库里) BucketNameChannelPosition string // 频道当前激活的bucket(系统数据库里) HourDataRetain int // 数据保留小时数 }{}
Functions ¶
Types ¶
type BNHelper ¶
type BNHelper struct{}
BNHelper bucket名称管理工具
func (*BNHelper) CheckBucketName ¶
CheckBucketName 检查bucket名称是否有效
func (*BNHelper) GetBucketName ¶
GetBucketName 获取bucket名称, 参数:有时间参数则通过时间参数转换,否则取当前时间转换的 返回格式:D加11位时间,及精确到10分钟 (D年月日时分<十位部分>)
func (*BNHelper) GetBucketNameFromTime ¶
GetBucketNameFromTime 时间格式 转换成 bucket名称 参数:时间格式为 20231212235959 (年月日时分秒) 返回格式:D加11位时间,及精确到10分钟 (D年月日时分<十位部分>)
func (*BNHelper) GetBucketStartKey ¶
GetBucketStartKey 获取bucket开始key值 说明: 修改该值,只会影响还未创建的数据bucket,该值需要1打头,该值不可太小,否则会影响表数据顺序
func (*BNHelper) MinBucketName ¶
MinBucketName 当前最小有效bucket名
func (*BNHelper) NextBucketName ¶
NextBucketName 下一个bucket名称
type ChannelDb ¶
type ChannelDb struct { }
ChannelDb 库${channel}.db 操作封装
type Controller ¶
type Controller struct { }
Controller 数据控制入口
func (*Controller) ClearExpireBucket ¶
func (c *Controller) ClearExpireBucket()
ClearExpireBucket 清理过期的bucket数据
func (*Controller) PullGetChannelPositionAll ¶ added in v1.0.2
func (c *Controller) PullGetChannelPositionAll(channelName string) (retBucketName, retBucketKey string, retErr error)
PullGetChannelPositionAll 获取频道存量最老位置 场景: 1、用户拉取数据时,位置确认用
func (*Controller) PullGetChannelPositionNow ¶ added in v1.0.2
func (c *Controller) PullGetChannelPositionNow(channelName string) (retBucketName, retBucketKey string, retErr error)
PullGetChannelPositionNow 获取频道最新位置 场景: 1、用户拉取数据时,位置确认用
func (*Controller) PullGetData ¶
func (c *Controller) PullGetData(userName, channelName string) (retData []string, retBucketName, retBucketKey string, retErr error)
PullGetData 读取一批数据 场景:消费用户连上服务器后,服务器循环调用次函数获取将要发送的数据 注: retBucketKey为"0"时表示切换到新的bucket,但bucket里还没有数据
func (*Controller) PullGetUserChannelPosition ¶ added in v1.0.2
func (c *Controller) PullGetUserChannelPosition(userName, channelName string) (retBucketName, retBucketKey string, retErr error)
PullGetUserChannelPosition 获取用户频道当前位置 场景: 1、用户拉取数据时,位置确认用
func (*Controller) PullSetChannelPosition ¶
func (c *Controller) PullSetChannelPosition(userName, channelName, bucketName, bucketKey string) error
PullSetChannelPosition 设置用户拉取频道数据位置 场景: 1、用户拉取数据时,指定要求刷新拉取位置时,将调用此函数刷新位置 2、另外系统发送完一批数据给用户后,将调用此函数刷新位置
func (*Controller) PushWriteData ¶
func (c *Controller) PushWriteData(channelName string, Data [][]byte) error
PushWriteData 写入一批数据 场景:服务收到用户写入数据时,直接调用这个方法将数据写入boltDb
type SysDb ¶
type SysDb struct { }
SysDb 库sys.db 操作封装
func (*SysDb) Init ¶
func (s *SysDb) Init()
Init 初始化 初始化创建sys.db库,里面创建userChannelPosition和channelPosition两个bucket 场景: 启动时就调用这个函数
func (*SysDb) PullGetChannelPositionAll ¶ added in v1.0.2
func (s *SysDb) PullGetChannelPositionAll(dbHandle *bolt.DB, channelName string) (retBucketName, retBucketKey string, retErr error)
PullGetChannelPositionAll 获取频道最老位置 直接从boltdb里读取频道最老位置 场景: 用户消费数据时,指定了从最老位置读取时
func (*SysDb) PullGetChannelPositionNow ¶ added in v1.0.2
func (s *SysDb) PullGetChannelPositionNow(dbHandle *bolt.DB, channelName string) (retBucketName, retBucketKey string, retErr error)
PullGetChannelPositionNow 获取频道当前位置 直接从boltdb里读取频道最新数据位置 场景: 用户消费数据时,指定了从最新位置读取时,将调用次函数
func (*SysDb) PullGetUserPosition ¶
func (s *SysDb) PullGetUserPosition(dbHandle *bolt.DB, userName, channelName string) (retBucketName, retBucketKey string, retErr error)
PullGetUserPosition 从boltdb里读取记录的用户的channel已读取位置(bucketKey为0表示新bucket里还没有读取任何数据) 场景: 拉取用户数据,每次取数据时需要先读取已读数据位置
func (*SysDb) PullWriteUserPosition ¶
func (s *SysDb) PullWriteUserPosition(dbHandle *bolt.DB, userName, channelName, bucketName, bucketKey string) (retErr error)
PullWriteUserPosition 写入用户当前用户的bucketName和bucketKey的值 直接修改boltdb数据表数据 场景: 数据成功发送给用户后,调用这个接口刷新数据发送位置
func (*SysDb) PushRefreshChannelPosition ¶
func (s *SysDb) PushRefreshChannelPosition(channelName, bucketName, bucketKey string) (retErr error)
PushRefreshChannelPosition 写入数据后刷新频道的bucket集合 和 频道当前激活的bucket位置 直接写入boltdb 场景: 频道数据写入完成后,将调用这个函数刷新频道信息