Documentation ¶
Index ¶
- Constants
- Variables
- func InitSenders()
- func InitTimoutAck(done <-chan bool)
- func Listener() error
- func Send(cqName string, msgs MQWrapper)
- type AckPair
- type AckStorageEngine
- type AckWrapper
- type Application
- type Config
- type DQueue
- func (D *DQueue) Abort() error
- func (D *DQueue) Close() error
- func (D *DQueue) Init() error
- func (D *DQueue) IsEmpty() (bool, error)
- func (D *DQueue) Peek() (MessageKey, error)
- func (D *DQueue) Pop() (MessageKey, error)
- func (D *DQueue) Prepend(keys []MessageKey) error
- func (D *DQueue) Push(key MessageKey) error
- func (D *DQueue) Sync() error
- type EngineImpl
- func (engine *EngineImpl) Ack(cqName string, keys []MessageKey) error
- func (engine *EngineImpl) Add(msgs []Message) error
- func (engine *EngineImpl) Close() error
- func (engine *EngineImpl) Expire(keys []MessageKey) error
- func (engine *EngineImpl) GetNamesOfSubscribers() []string
- func (engine *EngineImpl) GetSubscriber(cqName string) (*SubscriberQueue, error)
- func (engine *EngineImpl) Init(mq MessageTopic, factory StorageFactory) error
- func (engine *EngineImpl) IsAvailable(sName string) (bool, error)
- func (engine *EngineImpl) NextBatch(cqName string, count int) ([]Message, error)
- func (engine *EngineImpl) ResetAck(cqName string) error
- func (engine *EngineImpl) TestState(cqName string, queue []MessageKey, ack []AckPair) error
- func (engine *EngineImpl) TimeoutAck() error
- type LdbMessageStore
- func (lms *LdbMessageStore) Abort() error
- func (lms *LdbMessageStore) Begin() error
- func (lms *LdbMessageStore) Close() error
- func (lms *LdbMessageStore) Delete(key MessageKey) error
- func (lms *LdbMessageStore) Exists(key MessageKey) (bool, error)
- func (lms *LdbMessageStore) Get(key MessageKey) (Message, MessageMeta, error)
- func (lms *LdbMessageStore) GetMeta(key MessageKey) (MessageMeta, error)
- func (lms *LdbMessageStore) Init() error
- func (lms *LdbMessageStore) LoadAck(cqName string) ([]AckPair, error)
- func (lms *LdbMessageStore) Put(m Message, meta MessageMeta) error
- func (lms *LdbMessageStore) SaveAck(cqName string, ack []AckPair) error
- func (lms *LdbMessageStore) Sync() error
- func (lms *LdbMessageStore) UpdateMeta(key MessageKey, meta MessageMeta) error
- type MQWrapper
- type MemoryFactory
- type MemoryMessageStore
- func (mms *MemoryMessageStore) Abort() error
- func (mms *MemoryMessageStore) Close() error
- func (mms *MemoryMessageStore) Delete(key MessageKey) error
- func (mms *MemoryMessageStore) Exists(key MessageKey) (bool, error)
- func (mms *MemoryMessageStore) Get(key MessageKey) (Message, MessageMeta, error)
- func (mms *MemoryMessageStore) GetMeta(key MessageKey) (MessageMeta, error)
- func (mms *MemoryMessageStore) Init() error
- func (mms *MemoryMessageStore) LoadAck(cqName string) ([]AckPair, error)
- func (mms *MemoryMessageStore) Put(m Message, meta MessageMeta) error
- func (mms *MemoryMessageStore) SaveAck(cqName string, _ []AckPair) error
- func (mms *MemoryMessageStore) Sync() error
- func (mms *MemoryMessageStore) UpdateMeta(key MessageKey, meta MessageMeta) error
- type MemoryQueue
- func (mq *MemoryQueue) Abort() error
- func (mq *MemoryQueue) Close() error
- func (mq *MemoryQueue) Init() error
- func (mq *MemoryQueue) IsEmpty() (bool, error)
- func (mq *MemoryQueue) Peek() (MessageKey, error)
- func (mq *MemoryQueue) Pop() (MessageKey, error)
- func (mq *MemoryQueue) Prepend(keys []MessageKey) error
- func (mq *MemoryQueue) Push(key MessageKey) error
- func (mq *MemoryQueue) Sync() error
- type Message
- type MessageKey
- type MessageMeta
- type MessagePayload
- type MessageTopic
- func (mq *MessageTopic) Ack(cqName string, ids []MessageKey) error
- func (mq *MessageTopic) Init(factory StorageFactory) error
- func (mq *MessageTopic) Publish(msgs []Message) error
- func (mq *MessageTopic) Reconnect(cqName string) error
- func (mq *MessageTopic) SendToSubscriber(cqName string)
- func (mq *MessageTopic) SendToSubscribers()
- func (mq *MessageTopic) TimeoutAck() error
- type PPMessageStore
- type PPQueue
- type PersistentFactory
- type StorageEngine
- type StorageFactory
- type Subscriber
- type SubscriberQueue
- func (cq *SubscriberQueue) Abort() error
- func (cq *SubscriberQueue) Ack(key MessageKey) bool
- func (cq *SubscriberQueue) Add(key MessageKey, priority int) error
- func (cq *SubscriberQueue) Init(priorities int, factory StorageFactory, topicName, subscriberName string) error
- func (cq *SubscriberQueue) IsAvailable() bool
- func (cq *SubscriberQueue) SaveAck() error
- func (cq *SubscriberQueue) Sync() error
Constants ¶
const DefaultBatchSize = 50
DefaultBatchSize to use if not specified in config
Variables ¶
var App = Application{clock.NewMock(), Config{}, false}
App singleton representing the application config.
var ErrNoSuchMessage = errors.New("no such message")
ErrNoSuchMessage is returned when underlying storage engine cannot find message with given MessageKey
var ErrQueueEmpty = errors.New("queue is empty")
ErrQueueEmpty is returned when Topic has no messages / is empty
Functions ¶
func InitSenders ¶
func InitSenders()
InitSenders initializes channel & goroutine for each subscriber Each subscriber gets its own goroutine to prevent single subscriber blocking send for everyone else
func InitTimoutAck ¶
func InitTimoutAck(done <-chan bool)
InitTimeoutAck initializes goroutine that checks for Ack timeouts
Types ¶
type AckPair ¶
type AckPair struct { Key MessageKey Expires time.Time }
AckPair holds MessageKey & expiration time for Ack
type AckStorageEngine ¶
type AckStorageEngine interface { // LoadAck loads slice of AckPair for given subscriber LoadAck(cqName string) ([]AckPair, error) // SaveAck saves slice of AckPair for given subscriber SaveAck(cqName string, waiting []AckPair) error }
AckStorageEngine interface provides a way to load & store AckPair
type AckWrapper ¶
type AckWrapper struct {
Ack []AckPair
}
AckWrapper is used so we can use Gob encoder to store slice of AckPairs
type Application ¶
Application structure holds app Config, Clock (so that we can set it to test clock on tests) as well as Test boolean flag to represent test runs
type Config ¶
type Config struct { ServerSocket string `yaml:"ServerSocket"` // socket for MQ app, to send commands, messages & ack DataPath string `yaml:"DataPath"` // parent directory to store messages/queue/ack for each message topic Subscribers []Subscriber `yaml:"Subscribers"` // list of subscribers Topics []MessageTopic `yaml:"Topics"` // list of message topics // contains filtered or unexported fields }
Config for the ppMQ
func LoadConfig ¶
LoadConfig is exported method that loads config from YAML file and initializes all underling structures
type DQueue ¶
type DQueue struct {
// contains filtered or unexported fields
}
Dqueue implements persistent queue for storing message keys that needs to be served to subscribers it is based on a fork of https://github.com/joncrlsn/dque
func (*DQueue) Init ¶
Init initialized DQue. This includes creating necessary directories if they don't exist It will open existing DQue if it is already there, or create new one if not.
func (*DQueue) Peek ¶
func (D *DQueue) Peek() (MessageKey, error)
func (*DQueue) Pop ¶
func (D *DQueue) Pop() (MessageKey, error)
func (*DQueue) Prepend ¶
func (D *DQueue) Prepend(keys []MessageKey) error
func (*DQueue) Push ¶
func (D *DQueue) Push(key MessageKey) error
type EngineImpl ¶
type EngineImpl struct { Topic MessageTopic // Topic for which we are listening Messages PPMessageStore // Messages is a store implementation for messages SQ map[string]*SubscriberQueue // SQ is a map of subscriber name and SubscriberQueue pairs // contains filtered or unexported fields }
EngineImpl is storage agnostic, message queue implementation that lacks synchronization It uses MessageTopic, PPMessageStore & AckStorageEngine implementations to persist message queue.
func (*EngineImpl) Ack ¶
func (engine *EngineImpl) Ack(cqName string, keys []MessageKey) error
Ack processes the fact that list of message keys for subscriber had been acknowledged
func (*EngineImpl) Add ¶
func (engine *EngineImpl) Add(msgs []Message) error
Add slice of messages to the Topic
func (*EngineImpl) Close ¶
func (engine *EngineImpl) Close() error
Close closes underlying persistent elements. If any errors are thrown, returns last one
func (*EngineImpl) Expire ¶
func (engine *EngineImpl) Expire(keys []MessageKey) error
Check for old messages
func (*EngineImpl) GetNamesOfSubscribers ¶
func (engine *EngineImpl) GetNamesOfSubscribers() []string
GetNamesOfSubscribers returns a slice of subscribers names
func (*EngineImpl) GetSubscriber ¶
func (engine *EngineImpl) GetSubscriber(cqName string) (*SubscriberQueue, error)
GetSubscriber returns subscriber queue given subscriber name
func (*EngineImpl) Init ¶
func (engine *EngineImpl) Init(mq MessageTopic, factory StorageFactory) error
Init initializes Topic including loading or creating any underlying persistent elements
func (*EngineImpl) IsAvailable ¶
func (engine *EngineImpl) IsAvailable(sName string) (bool, error)
IsAvailable returns true if any messages are available to be served for sName subscriber to any of the subscribers
func (*EngineImpl) NextBatch ¶
func (engine *EngineImpl) NextBatch(cqName string, count int) ([]Message, error)
NextBatch retrieves up to count messages from the Subscriber's queue if available
func (*EngineImpl) ResetAck ¶
func (engine *EngineImpl) ResetAck(cqName string) error
ResetAck is used on restarts, to reset all Ack so that messages would be re-delivered
func (*EngineImpl) TestState ¶
func (engine *EngineImpl) TestState(cqName string, queue []MessageKey, ack []AckPair) error
TestState is used during testing process to test final state of the queue for a given subscriber It should be assumed that the method is destructive, and the queue/MessageTopic will be in invalid state afterwards. Used specifically for testing purposes.
func (*EngineImpl) TimeoutAck ¶
func (engine *EngineImpl) TimeoutAck() error
TimeoutAck deals with the case when we didn't receive Ack for message in time. In ack timed out, we put message in front of the queue for the receiver to redeliver it
type LdbMessageStore ¶
type LdbMessageStore struct {
// contains filtered or unexported fields
}
LdbMessageStore uses LevelDB (levigo implementation) to store messages. One instance of LevelDB per Topic
func (*LdbMessageStore) Abort ¶
func (lms *LdbMessageStore) Abort() error
func (*LdbMessageStore) Begin ¶
func (lms *LdbMessageStore) Begin() error
Begin starts a new batch, closes previous batch if it is still open
func (*LdbMessageStore) Close ¶
func (lms *LdbMessageStore) Close() error
func (*LdbMessageStore) Delete ¶
func (lms *LdbMessageStore) Delete(key MessageKey) error
func (*LdbMessageStore) Exists ¶
func (lms *LdbMessageStore) Exists(key MessageKey) (bool, error)
func (*LdbMessageStore) Get ¶
func (lms *LdbMessageStore) Get(key MessageKey) (Message, MessageMeta, error)
func (*LdbMessageStore) GetMeta ¶
func (lms *LdbMessageStore) GetMeta(key MessageKey) (MessageMeta, error)
func (*LdbMessageStore) Init ¶
func (lms *LdbMessageStore) Init() error
func (*LdbMessageStore) LoadAck ¶
func (lms *LdbMessageStore) LoadAck(cqName string) ([]AckPair, error)
func (*LdbMessageStore) Put ¶
func (lms *LdbMessageStore) Put(m Message, meta MessageMeta) error
func (*LdbMessageStore) SaveAck ¶
func (lms *LdbMessageStore) SaveAck(cqName string, ack []AckPair) error
func (*LdbMessageStore) Sync ¶
func (lms *LdbMessageStore) Sync() error
func (*LdbMessageStore) UpdateMeta ¶
func (lms *LdbMessageStore) UpdateMeta(key MessageKey, meta MessageMeta) error
type MQWrapper ¶
type MQWrapper struct { // Topic name of the MessageTopic for which message is sent Topic string `json:"topic,omitempty"` // Subscriber name of the subscriber sending or receiving Subscriber string `json:"subscriber,omitempty"` // Messages slice of Message (used to send to Subscriber) Messages []Message `json:"messages,omitempty"` // Acks slice of MessageKey received from Subscriber to acknowledge processing Acks []MessageKey `json:"acks,omitempty"` // Command is sent by Subscriber, right now only on reconnect/restart of the Subscriber to reset Acks Command string `json:"command,omitempty""` }
MQWrapper used to communicate between subscriber & ppMQ
type MemoryFactory ¶
type MemoryFactory struct {
// contains filtered or unexported fields
}
MemoryFactory is a storage factory that returns non-persistent Interfaces for handing Message, Ack & Topic storage. It was used for initial development and debugging.
func (*MemoryFactory) AckStore ¶
func (pf *MemoryFactory) AckStore(_ string) (AckStorageEngine, error)
func (*MemoryFactory) MessageQueue ¶
func (pf *MemoryFactory) MessageQueue(_, _ string) (PPQueue, error)
func (*MemoryFactory) MessageStore ¶
func (pf *MemoryFactory) MessageStore(_ string) (PPMessageStore, error)
type MemoryMessageStore ¶
type MemoryMessageStore struct { Messages map[MessageKey]Message Meta map[MessageKey]MessageMeta }
MemoryMessageStore is non-persistent implementation of Message & Ack store used for debugging & dev purposes.
func (*MemoryMessageStore) Abort ¶
func (mms *MemoryMessageStore) Abort() error
func (*MemoryMessageStore) Close ¶
func (mms *MemoryMessageStore) Close() error
func (*MemoryMessageStore) Delete ¶
func (mms *MemoryMessageStore) Delete(key MessageKey) error
func (*MemoryMessageStore) Exists ¶
func (mms *MemoryMessageStore) Exists(key MessageKey) (bool, error)
func (*MemoryMessageStore) Get ¶
func (mms *MemoryMessageStore) Get(key MessageKey) (Message, MessageMeta, error)
func (*MemoryMessageStore) GetMeta ¶
func (mms *MemoryMessageStore) GetMeta(key MessageKey) (MessageMeta, error)
func (*MemoryMessageStore) Init ¶
func (mms *MemoryMessageStore) Init() error
func (*MemoryMessageStore) LoadAck ¶
func (mms *MemoryMessageStore) LoadAck(cqName string) ([]AckPair, error)
func (*MemoryMessageStore) Put ¶
func (mms *MemoryMessageStore) Put(m Message, meta MessageMeta) error
func (*MemoryMessageStore) SaveAck ¶
func (mms *MemoryMessageStore) SaveAck(cqName string, _ []AckPair) error
func (*MemoryMessageStore) Sync ¶
func (mms *MemoryMessageStore) Sync() error
func (*MemoryMessageStore) UpdateMeta ¶
func (mms *MemoryMessageStore) UpdateMeta(key MessageKey, meta MessageMeta) error
type MemoryQueue ¶
type MemoryQueue struct {
Queue []MessageKey
}
MemoryQueue is non-persistent, memory only implementation of StorageEngine for the queue
func (*MemoryQueue) Abort ¶
func (mq *MemoryQueue) Abort() error
func (*MemoryQueue) Close ¶
func (mq *MemoryQueue) Close() error
func (*MemoryQueue) Init ¶
func (mq *MemoryQueue) Init() error
func (*MemoryQueue) IsEmpty ¶
func (mq *MemoryQueue) IsEmpty() (bool, error)
func (*MemoryQueue) Peek ¶
func (mq *MemoryQueue) Peek() (MessageKey, error)
func (*MemoryQueue) Pop ¶
func (mq *MemoryQueue) Pop() (MessageKey, error)
func (*MemoryQueue) Prepend ¶
func (mq *MemoryQueue) Prepend(keys []MessageKey) error
func (*MemoryQueue) Push ¶
func (mq *MemoryQueue) Push(key MessageKey) error
func (*MemoryQueue) Sync ¶
func (mq *MemoryQueue) Sync() error
type Message ¶
type Message struct { // MessageKey used for dedupe, ignored if dedupe in MessageTopic is disabled Key MessageKey `json:",omitempty"` // MessagePayload the actual payload Payload MessagePayload // Expires time when message becomes expired, and should be thrown out, can be empty to represent messages // that don't expire Expires time.Time `json:",omitempty"` // Priority defines message priority, starts with 0 (highest priority) Priority int `json:",omitempty"` }
Message struct the basic structure used to send / receive & store messages
type MessageKey ¶
type MessageKey string
MessageKey aliases key type for messages
func (MessageKey) ToBytesId ¶
func (mk MessageKey) ToBytesId() []byte
ToBytesId returns []byte representation of MessageKey for message id
func (MessageKey) ToBytesMeta ¶
func (mk MessageKey) ToBytesMeta() []byte
ToBytesMeta returns []byte representation of MessageKey for meta
type MessageMeta ¶
MessageMeta is stored by StorageEngine for each message to track if message has been sent for particular subscriber, and how many acknowledgements were received. This is needed so we can safely remove message once it was delivered & acknowledged by each subscriber.
func (MessageMeta) ToBytes ¶
func (m MessageMeta) ToBytes() ([]byte, error)
ToBytes encodes MessageMeta to []byte representation
type MessagePayload ¶
type MessagePayload string
MessagePayload alias for payload sent through the message
type MessageTopic ¶
type MessageTopic struct { Name string `yaml:"Name"` // name of the topic AckRetry time.Duration `yaml:"AckRetry,omitempty"` //retry resending message if Ack was received in // AckRetry interval, 0 if retry disabled Dedupe bool `yaml:"Dedupe,omitempty"` // if true, messages will be dedupe based on MessageKey, // latest will be always used instead of old message Priorities int `yaml:"Priorities,omitempty"` // 1 or more, if 1 - no priorities... Expire bool `yaml:"Expire,omitempty"` // True if message can expire Subscriber []string `yaml:"Subscribers"` // List of subscribers // contains filtered or unexported fields }
A MessageTopic keeps the configuration on the message topic as well as storage engine with all the topic information We expect that it will be configured and loaded from yaml file.
func (*MessageTopic) Ack ¶
func (mq *MessageTopic) Ack(cqName string, ids []MessageKey) error
Ack messages for subscriber. The call will trigger SendToSubscriber for the given subscriber
func (*MessageTopic) Init ¶
func (mq *MessageTopic) Init(factory StorageFactory) error
Init MessageTopic initializing underlying storage engine based on StorageFactory passed
func (*MessageTopic) Publish ¶
func (mq *MessageTopic) Publish(msgs []Message) error
Publish messages to the MessageTopic. The call will add messages to the queue and trigger SendToSubscribers call
func (*MessageTopic) Reconnect ¶
func (mq *MessageTopic) Reconnect(cqName string) error
Reconnect acknowledges MessageTopic that subscriber has reconnected, and that whatever messages waiting in Ack should be resent to subscriber
func (*MessageTopic) SendToSubscriber ¶
func (mq *MessageTopic) SendToSubscriber(cqName string)
SendToSubscriber sends messages if available to subscriber
func (*MessageTopic) SendToSubscribers ¶
func (mq *MessageTopic) SendToSubscribers()
SendToSubscribers sends messages to all subscribers for give MessageTopic
func (*MessageTopic) TimeoutAck ¶
func (mq *MessageTopic) TimeoutAck() error
TimeoutAck checks if any of the Ack in the MessageTopic has expired and corresponding messages should be resent to subscribers. This method will be called automatically once a minute
type PPMessageStore ¶
type PPMessageStore interface { // Init message store engine Init() error // Close message store engine, safely persisting all messages if needed Close() error // Get Message and MessageMeta for given MessageKey. Returns ErrNoSuchMessage if doesn't exist Get(key MessageKey) (Message, MessageMeta, error) // GetMeta returns MessageMeta for given MessageKey, Returns ErrNoSuchMessage if doesn't exist GetMeta(key MessageKey) (MessageMeta, error) // Put persists Message & MessageMeta Put(m Message, meta MessageMeta) error // UpdateMeta updates MessageMeta UpdateMeta(key MessageKey, meta MessageMeta) error // Delete removes Message & MessageMeta for given key from the storage Delete(key MessageKey) error // Exists checks if Message exists Exists(key MessageKey) (bool, error) // Sync saves all changes to disk Sync() error // Abort tries to safely abort the operation. Not implemented at this moment. Abort() error }
PPMessageStore is used to persist messages
type PPQueue ¶
type PPQueue interface { // Init underlying persistence layer Init() error // Push message key into the queue Push(key MessageKey) error // Prepend slice of message keys into the queue. Used on ack reset & timeout, when we want to resend some messages Prepend(keys []MessageKey) error // Pop message from the queue Pop() (MessageKey, error) // Peek check first available message on the queue Peek() (MessageKey, error) // IsEmpty returns true if queue is empty IsEmpty() (bool, error) // Sync saves queue state Sync() error // Abort attempts to abort the changes to the queue before saving (might not be implemented) Abort() error // Closes the queue, saving all unsaved data. Close() error }
PPQueue interface for persisting queue for subscriber
type PersistentFactory ¶
type PersistentFactory struct { Path string // location where data should be stored for given MessageTopic // contains filtered or unexported fields }
PersistentFactory is a storage factory for MessageTopic based on LevelDB & dque implementation
func (*PersistentFactory) AckStore ¶
func (pf *PersistentFactory) AckStore(topicName string) (AckStorageEngine, error)
func (*PersistentFactory) MessageQueue ¶
func (pf *PersistentFactory) MessageQueue(topicName, subscriberName string) (PPQueue, error)
func (*PersistentFactory) MessageStore ¶
func (pf *PersistentFactory) MessageStore(topicName string) (PPMessageStore, error)
type StorageEngine ¶
type StorageEngine interface { // Init initializes StorageEngine for given MessageTopic, using particular StorageFactory Init(mq MessageTopic, factory StorageFactory) error // Closes all underlying persistence engines, making sure that data is saved (if supported) Close() error // Checks if the are available messages for given subscriber that can be served now IsAvailable(cqName string) (bool, error) // Retrieves up to count messages for given subscriber NextBatch(cqName string, count int) ([]Message, error) // Returns slice of subscribers (names) for given StorageEngine GetNamesOfSubscribers() []string // Add slice of messages to the MessageTopic Add(msgs []Message) error // Ack slice of messages for given subscriber Ack(cqName string, keys []MessageKey) error // ResetAck resets unacknowledged messages for given subscriber ResetAck(cqName string) error // TimeoutAck resets expired ack for all subscribers for MessageTopic TimeoutAck() error // TestState is used during testing process to test final state of the queue for a given subscriber // It should be assumed that the method is destructive, and the queue/MessageTopic will be in invalid state // afterwards. Used specifically for testing purposes. TestState(cqName string, queue []MessageKey, ack []AckPair) error }
StorageEngine interface is needed so that we can have multiple implementations of storage models without reimplementing MessageTopic
type StorageFactory ¶
type StorageFactory interface { // MessageStore returns persistence engine for messages MessageStore(topicName string) (PPMessageStore, error) // MessageQueue returns persistence engine for queue MessageQueue(topicName, subscriberName string) (PPQueue, error) // AckStore returns persistence engine for Ack AckStore(topicName string) (AckStorageEngine, error) }
StorageFactory interface is used to initialize various persistent strategies without re-implementing overall storage engine functionality
type Subscriber ¶
type Subscriber struct { Id int `yaml:",omitempty"` // subscriber id (generated on init) Name string `yaml:"Name"` // subscriber name BatchSize int `yaml:"BatchSize,omitempty"` // max of messages to send to subscriber in one go Socket string `yaml:"Socket"` // socket to which to send the messages }
A Subscriber keeps the configuration information about subscriber. It is expected that the info will be loaded from yaml file
type SubscriberQueue ¶
type SubscriberQueue struct { Name string // Name of the subscriber Queue []PPQueue // Queue is a slice of queues, one per priority, highest starts with 0 AckWaiting []AckPair // AckWaiting list ack for message id AckEngine AckStorageEngine // used to store ack pairs }
SubscriberQueue is storage agnostic subscriber queue used keep track of messages queued and unprocessed Ack
func (*SubscriberQueue) Abort ¶
func (cq *SubscriberQueue) Abort() error
Abort operation done on SubscriberQueue (not implemented in underlying engines)
func (*SubscriberQueue) Add ¶
func (cq *SubscriberQueue) Add(key MessageKey, priority int) error
Add MessageKey to SubscriberQueue
func (*SubscriberQueue) Init ¶
func (cq *SubscriberQueue) Init(priorities int, factory StorageFactory, topicName, subscriberName string) error
Init SubscriberQueue including initializing or loading any underlying persistent storage components
func (*SubscriberQueue) IsAvailable ¶
func (cq *SubscriberQueue) IsAvailable() bool
IsAvailable returns true if any messages in this SubscriberQueue are available to be served in the queue Message will not be available if there are no messages in the queue, or if some messages hasn't been Ack yet.
func (*SubscriberQueue) SaveAck ¶
func (cq *SubscriberQueue) SaveAck() error
SaveAck saves ack to the persistent storage