Documentation ¶
Index ¶
- Variables
- type CGManager
- type CGQueue
- type ConsumerGroup
- func (cg *ConsumerGroup) Delete() error
- func (cg *ConsumerGroup) Flush() error
- func (cg *ConsumerGroup) GetNext() ([]byte, error)
- func (cg *ConsumerGroup) IsEmpty() bool
- func (cg *ConsumerGroup) Length() uint64
- func (cg *ConsumerGroup) Peek() ([]byte, error)
- func (cg *ConsumerGroup) PutBack(value []byte) error
- func (cg *ConsumerGroup) Source() queue.Consumer
- func (cg *ConsumerGroup) Stats() *queue.Stats
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidName = errors.New("cgroup: name is not alphanumeric")
ErrInvalidName is returned when a consumer group name is not alphanumeric
Functions ¶
This section is empty.
Types ¶
type CGManager ¶
CGManager manages multiple consumer groups
func NewCGManager ¶
NewCGManager initializes a new consumer group manager
func (*CGManager) ConsumerGroup ¶
func (m *CGManager) ConsumerGroup(name string) (*ConsumerGroup, error)
ConsumerGroup returns the queue interface for the provided consumer group name
func (*CGManager) ConsumerGroupIterator ¶
func (m *CGManager) ConsumerGroupIterator() <-chan cmap.Tuple
ConsumerGroupIterator iterates through existing consumer groups
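Because the iterator returns a receive-only channel, a range loop is the natural way to consume it. The sketch below uses a local tuple type as a stand-in for cmap.Tuple. The Key and Val field names, and the assumption that the channel is closed once every group has been sent, do not come from this page.

```go
package main

import "fmt"

// tuple is a local stand-in for cmap.Tuple; the Key and Val field names
// are an assumption, not taken from this page.
type tuple struct {
	Key string
	Val interface{}
}

// iterate mimics an iterator that sends every entry on a channel and
// closes the channel when done, so that a range loop terminates.
func iterate(groups map[string]interface{}) <-chan tuple {
	ch := make(chan tuple)
	go func() {
		defer close(ch)
		for k, v := range groups {
			ch <- tuple{Key: k, Val: v}
		}
	}()
	return ch
}

// collectNames drains the channel and returns the keys it received.
func collectNames(ch <-chan tuple) []string {
	var names []string
	for t := range ch {
		names = append(names, t.Key)
	}
	return names
}

func main() {
	groups := map[string]interface{}{"workers": 1, "audit": 2}
	names := collectNames(iterate(groups))
	fmt.Println(len(names), "consumer groups")
}
```

In this sketch the channel is unbuffered, so the producer goroutine exits only after the consumer has drained it. Breaking out of the loop early would leave that goroutine blocked.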
func (*CGManager) DeleteConsumerGroup ¶
DeleteConsumerGroup deletes the specified consumer group
type CGQueue ¶
type CGQueue struct {
	sync.Mutex
	Name string
	*queue.Queue
	*CGManager
	// contains filtered or unexported fields
}
CGQueue represents a queue with multiple consumer groups
func CGQueueOpen ¶
CGQueueOpen opens a queue with multiple consumer groups
type ConsumerGroup ¶
ConsumerGroup represents a consumer group that reads from a source queue, stores its own cursor position, and saves failed reliable reads so that they can be served to other consumers later
func NewConsumerGroup ¶
func NewConsumerGroup(name string, source *queue.Queue, storage *leveldb.DB) (*ConsumerGroup, error)
NewConsumerGroup initializes a consumer group
func (*ConsumerGroup) Delete ¶
func (cg *ConsumerGroup) Delete() error
Delete deletes all data associated with the consumer group
func (*ConsumerGroup) GetNext ¶
func (cg *ConsumerGroup) GetNext() ([]byte, error)
GetNext returns the next value for this consumer group
func (*ConsumerGroup) IsEmpty ¶
func (cg *ConsumerGroup) IsEmpty() bool
IsEmpty returns true if there are no more items for this consumer group
func (*ConsumerGroup) Length ¶
func (cg *ConsumerGroup) Length() uint64
Length returns the number of items remaining for the consumer group
func (*ConsumerGroup) Peek ¶
func (cg *ConsumerGroup) Peek() ([]byte, error)
Peek returns the next value without updating the cursor
func (*ConsumerGroup) PutBack ¶
func (cg *ConsumerGroup) PutBack(value []byte) error
PutBack returns a failed item to the consumer group so it can be served to the next consumer
func (*ConsumerGroup) Source ¶
func (cg *ConsumerGroup) Source() queue.Consumer
Source returns the source queue's Consumer interface
func (*ConsumerGroup) Stats ¶
func (cg *ConsumerGroup) Stats() *queue.Stats
Stats returns stats struct