redishelper

package
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2020 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Done = errors.New("done")

Done 锁的结束信号

View Source
var ErrCountMustBePositive = errors.New("Count Must Be Positive")

ErrCountMustBePositive 个数参数必须为正数

View Source
var ErrElementNotExist = errors.New("Element Not Exist")

ErrElementNotExist 个数参数必须为正数

View Source
var ErrGroupNotInTopic = errors.New("Group Not In Topic")

ErrGroupNotInTopic 消费组不在topic上

View Source
var ErrHelperAlreadyInited = errors.New("proxy already inited yet")

ErrHelperAlreadyInited 代理已经初始化错误

View Source
var ErrHelperNotInited = errors.New("proxy not inited yet")

ErrHelperNotInited 代理未初始化错误

View Source
var ErrIsAlreadySubscribed = errors.New("Is Already Subscribed")

ErrIsAlreadySubscribed 消费者已经订阅了数据

View Source
var ErrIsAlreadyUnSubscribed = errors.New("Is Already UnSubscribed")

ErrIsAlreadyUnSubscribed 消费者已经取消订阅了数据

View Source
var ErrKeyNotExist = errors.New("key not exist")

ErrKeyNotExist 键不存在

View Source
var ErrLockAlreadySet = errors.New("Lock is already setted")

ErrLockAlreadySet 分布式锁已经设置

View Source
var ErrLockWaitTimeout = errors.New("wait lock timeout")

ErrLockWaitTimeout 等待解锁超时

View Source
var ErrPubSubNotSubscribe = errors.New("PubSub Not Subscribe")

ErrPubSubNotSubscribe Pubsub模式订阅者未订阅

View Source
var ErrQueueGetNotMatch = errors.New("Queue Get NotMatch")

ErrQueueGetNotMatch Pubsub模式订阅者未订阅

View Source
var ErrRankError = errors.New("Rank Error")

ErrRankError 排名信息有问题

View Source
var ErrStreamConsumerNotBlocked = errors.New("Stream Consumer Must Be Blocked")

ErrStreamConsumerNotBlocked 消费者不是用阻塞模式监听

View Source
var Helper = New()

Helper 默认的pg代理对象

Functions

func New

func New() *redisHelper

New 创建一个新的数据库客户端代理

Types

type NameSpcaeKey

type NameSpcaeKey struct {
	// contains filtered or unexported fields
}

NameSpcaeKey 带命名空间的键

func NewNameSpaceKey

func NewNameSpaceKey(namespace string) *NameSpcaeKey

NewNameSpaceKey 创建一个带命名空间的key

func (*NameSpcaeKey) AddSubNamespace

func (nsk *NameSpcaeKey) AddSubNamespace(namespace string) *NameSpcaeKey

AddSubNamespace 在原来的命名空间基础上加一级子命名空间创建一个新的命名空间

func (*NameSpcaeKey) Key

func (nsk *NameSpcaeKey) Key(key string) string

Key 在命名空间基础上创建一个key

type PubSubTopic

type PubSubTopic struct {
	Name string
	// contains filtered or unexported fields
}

PubSubTopic 流主题

func NewPubSubTopic

func NewPubSubTopic(proxy *redisHelper, name string) *PubSubTopic

NewPubSubTopic 新建一个PubSub主题

func (*PubSubTopic) Close

func (topic *PubSubTopic) Close() error

Close 关闭监听

func (*PubSubTopic) Publish

func (topic *PubSubTopic) Publish(value interface{}) (int64, error)

Publish 向发布订阅主题发送消息

func (*PubSubTopic) Subscribe

func (topic *PubSubTopic) Subscribe(ctx context.Context) (<-chan *redis.Message, error)

func (*PubSubTopic) UnSubscribe

func (topic *PubSubTopic) UnSubscribe() error

UnSubscribe 取消订阅

type Queue

type Queue struct {
	Name string
	// contains filtered or unexported fields
}

Queue 消息队列

func NewQueue

func NewQueue(proxy *redisHelper, name string) *Queue

NewQueue 新建一个PubSub主题

func (*Queue) Get

func (q *Queue) Get(timeout time.Duration) (*QueueMessage, error)

Get 从队列中取出数据,timeout为0则表示一直阻塞直到有数据

func (*Queue) GetNoWait

func (q *Queue) GetNoWait() (*QueueMessage, error)

GetNoWait 从队列中取出数据,timeout为0则表示一直阻塞直到有数据

func (*Queue) Len

func (q *Queue) Len() (int64, error)

Len 查看当前队列长度

func (*Queue) Put

func (q *Queue) Put(values ...interface{}) (int64, error)

Put 向队列中放入数据

type QueueMessage

type QueueMessage struct {
	Topic   string
	Payload string
}

QueueMessage 从队列中获取的消息

type StreamTopic

type StreamTopic struct {
	Name   string
	MaxLen int64
	Strict bool //maxlen with ~
	// contains filtered or unexported fields
}

StreamTopic 流主题

func NewStreamTopic

func NewStreamTopic(proxy *redisHelper, name string, maxlen int64, strict bool) *StreamTopic

NewStreamTopic 新建一个流主题

func (*StreamTopic) CreateGroup

func (topic *StreamTopic) CreateGroup(groupname, start string) (string, error)

CreateGroup 为指定消费者在指定的topic上创建消费者组

func (*StreamTopic) Delete

func (topic *StreamTopic) Delete(ids ...string) error

Delete 设置标志位标识删除主题流中指定id的数据

func (*StreamTopic) DeleteConsumer

func (topic *StreamTopic) DeleteConsumer(groupname string, consumername string) (int64, error)

DeleteConsumer 在指定的topic上删除指定消费者组中的指定消费者

func (*StreamTopic) DeleteGroup

func (topic *StreamTopic) DeleteGroup(groupname string) (int64, error)

DeleteGroup 为指定消费者在指定的topic上删除消费者组

func (*StreamTopic) GroupInfos

func (topic *StreamTopic) GroupInfos() ([]redis.XInfoGroups, error)

GroupInfos 获取主题流中注册的消费者组信息

func (*StreamTopic) HasGroup

func (topic *StreamTopic) HasGroup(groupname string) (bool, error)

HasGroup 判断消费者组是否在topic上

func (*StreamTopic) HasGroups

func (topic *StreamTopic) HasGroups(groupnames []string) (bool, error)

HasGroups 判断消费者组是否都在在topic上

func (*StreamTopic) Len

func (topic *StreamTopic) Len() (int64, error)

Len 查看主题流的长度

func (*StreamTopic) Move

func (topic *StreamTopic) Move(groupname string, toconsumer string, minIdle time.Duration, ids ...string) ([]redis.XMessage, error)

Move 查看消费组中等待确认的消息列表

func (*StreamTopic) Pending

func (topic *StreamTopic) Pending(groupname string) (*redis.XPending, error)

Pending 查看消费组中等待确认的消息列表

func (*StreamTopic) Publish

func (topic *StreamTopic) Publish(value map[string]interface{}) (string, error)

Publish 向主题流发送消息

func (*StreamTopic) Range

func (topic *StreamTopic) Range(start, stop string) ([]redis.XMessage, error)

Range 获取消息列表,会自动过滤已经删除的消息,注意-表示最小值, +表示最大值

func (*StreamTopic) SetGroupID

func (topic *StreamTopic) SetGroupID(groupname string, start string) (string, error)

SetGroupID 设置指定消费者组在主题流中的读取起始位置

func (*StreamTopic) Trim

func (topic *StreamTopic) Trim(count int64) (int64, error)

Trim 为主题扩容

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL