ppmq

package
v0.0.0-...-1d10c94 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2020 License: GPL-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultBatchSize = 50

DefaultBatchSize to use if not specified in config

Variables

App singleton representing the application config.

View Source
var ErrNoSuchMessage = errors.New("no such message")

ErrNoSuchMessage is returned when underlying storage engine cannot find message with given MessageKey

View Source
var ErrQueueEmpty = errors.New("queue is empty")

ErrQueueEmpty is returned when Topic has no messages / is empty

Functions

func InitSenders

func InitSenders()

InitSenders initializes channel & goroutine for each subscriber Each subscriber gets its own goroutine to prevent single subscriber blocking send for everyone else

func InitTimoutAck

func InitTimoutAck(done <-chan bool)

InitTimeoutAck initializes goroutine that checks for Ack timeouts

func Listener

func Listener() error

Listener listens on a server socket configured in App.Conf.ServerSocket for commands, messages & ack

func Send

func Send(cqName string, msgs MQWrapper)

Send messages to subscriber

Types

type AckPair

type AckPair struct {
	Key     MessageKey
	Expires time.Time
}

AckPair holds MessageKey & expiration time for Ack

type AckStorageEngine

type AckStorageEngine interface {
	// LoadAck loads slice of AckPair for given subscriber
	LoadAck(cqName string) ([]AckPair, error)
	// SaveAck saves slice of AckPair for given subscriber
	SaveAck(cqName string, waiting []AckPair) error
}

AckStorageEngine interface provides a way to load & store AckPair

type AckWrapper

type AckWrapper struct {
	Ack []AckPair
}

AckWrapper is used so we can use Gob encoder to store slice of AckPairs

type Application

type Application struct {
	Clock clock.Clock
	Conf  Config
	Test  bool
}

Application structure holds app Config, Clock (so that we can set it to test clock on tests) as well as Test boolean flag to represent test runs

type Config

type Config struct {
	ServerSocket string         `yaml:"ServerSocket"` // socket for MQ app, to send commands, messages & ack
	DataPath     string         `yaml:"DataPath"`     // parent directory to store messages/queue/ack for each message topic
	Subscribers  []Subscriber   `yaml:"Subscribers"`  // list of subscribers
	Topics       []MessageTopic `yaml:"Topics"`       // list of message topics
	// contains filtered or unexported fields
}

Config for the ppMQ

func LoadConfig

func LoadConfig(filename string) (config Config, err error)

LoadConfig is exported method that loads config from YAML file and initializes all underling structures

func (*Config) Init

func (conf *Config) Init() error

Init configuration includes initializing MessageTopic

type DQueue

type DQueue struct {
	// contains filtered or unexported fields
}

Dqueue implements persistent queue for storing message keys that needs to be served to subscribers it is based on a fork of https://github.com/joncrlsn/dque

func (*DQueue) Abort

func (D *DQueue) Abort() error

Abort does nothing in this implementation, reserved for the future

func (*DQueue) Close

func (D *DQueue) Close() error

func (*DQueue) Init

func (D *DQueue) Init() error

Init initialized DQue. This includes creating necessary directories if they don't exist It will open existing DQue if it is already there, or create new one if not.

func (*DQueue) IsEmpty

func (D *DQueue) IsEmpty() (bool, error)

func (*DQueue) Peek

func (D *DQueue) Peek() (MessageKey, error)

func (*DQueue) Pop

func (D *DQueue) Pop() (MessageKey, error)

func (*DQueue) Prepend

func (D *DQueue) Prepend(keys []MessageKey) error

func (*DQueue) Push

func (D *DQueue) Push(key MessageKey) error

func (*DQueue) Sync

func (D *DQueue) Sync() error

type EngineImpl

type EngineImpl struct {
	Topic    MessageTopic                // Topic for which we are listening
	Messages PPMessageStore              // Messages is a store implementation for messages
	SQ       map[string]*SubscriberQueue // SQ is a map of subscriber name and SubscriberQueue pairs
	// contains filtered or unexported fields
}

EngineImpl is storage agnostic, message queue implementation that lacks synchronization It uses MessageTopic, PPMessageStore & AckStorageEngine implementations to persist message queue.

func (*EngineImpl) Ack

func (engine *EngineImpl) Ack(cqName string, keys []MessageKey) error

Ack processes the fact that list of message keys for subscriber had been acknowledged

func (*EngineImpl) Add

func (engine *EngineImpl) Add(msgs []Message) error

Add slice of messages to the Topic

func (*EngineImpl) Close

func (engine *EngineImpl) Close() error

Close closes underlying persistent elements. If any errors are thrown, returns last one

func (*EngineImpl) Expire

func (engine *EngineImpl) Expire(keys []MessageKey) error

Check for old messages

func (*EngineImpl) GetNamesOfSubscribers

func (engine *EngineImpl) GetNamesOfSubscribers() []string

GetNamesOfSubscribers returns a slice of subscribers names

func (*EngineImpl) GetSubscriber

func (engine *EngineImpl) GetSubscriber(cqName string) (*SubscriberQueue, error)

GetSubscriber returns subscriber queue given subscriber name

func (*EngineImpl) Init

func (engine *EngineImpl) Init(mq MessageTopic, factory StorageFactory) error

Init initializes Topic including loading or creating any underlying persistent elements

func (*EngineImpl) IsAvailable

func (engine *EngineImpl) IsAvailable(sName string) (bool, error)

IsAvailable returns true if any messages are available to be served for sName subscriber to any of the subscribers

func (*EngineImpl) NextBatch

func (engine *EngineImpl) NextBatch(cqName string, count int) ([]Message, error)

NextBatch retrieves up to count messages from the Subscriber's queue if available

func (*EngineImpl) ResetAck

func (engine *EngineImpl) ResetAck(cqName string) error

ResetAck is used on restarts, to reset all Ack so that messages would be re-delivered

func (*EngineImpl) TestState

func (engine *EngineImpl) TestState(cqName string, queue []MessageKey, ack []AckPair) error

TestState is used during testing process to test final state of the queue for a given subscriber It should be assumed that the method is destructive, and the queue/MessageTopic will be in invalid state afterwards. Used specifically for testing purposes.

func (*EngineImpl) TimeoutAck

func (engine *EngineImpl) TimeoutAck() error

TimeoutAck deals with the case when we didn't receive Ack for message in time. In ack timed out, we put message in front of the queue for the receiver to redeliver it

type LdbMessageStore

type LdbMessageStore struct {
	// contains filtered or unexported fields
}

LdbMessageStore uses LevelDB (levigo implementation) to store messages. One instance of LevelDB per Topic

func (*LdbMessageStore) Abort

func (lms *LdbMessageStore) Abort() error

func (*LdbMessageStore) Begin

func (lms *LdbMessageStore) Begin() error

Begin starts a new batch, closes previous batch if it is still open

func (*LdbMessageStore) Close

func (lms *LdbMessageStore) Close() error

func (*LdbMessageStore) Delete

func (lms *LdbMessageStore) Delete(key MessageKey) error

func (*LdbMessageStore) Exists

func (lms *LdbMessageStore) Exists(key MessageKey) (bool, error)

func (*LdbMessageStore) Get

func (*LdbMessageStore) GetMeta

func (lms *LdbMessageStore) GetMeta(key MessageKey) (MessageMeta, error)

func (*LdbMessageStore) Init

func (lms *LdbMessageStore) Init() error

func (*LdbMessageStore) LoadAck

func (lms *LdbMessageStore) LoadAck(cqName string) ([]AckPair, error)

func (*LdbMessageStore) Put

func (lms *LdbMessageStore) Put(m Message, meta MessageMeta) error

func (*LdbMessageStore) SaveAck

func (lms *LdbMessageStore) SaveAck(cqName string, ack []AckPair) error

func (*LdbMessageStore) Sync

func (lms *LdbMessageStore) Sync() error

func (*LdbMessageStore) UpdateMeta

func (lms *LdbMessageStore) UpdateMeta(key MessageKey, meta MessageMeta) error

type MQWrapper

type MQWrapper struct {
	// Topic name of the MessageTopic for which message is sent
	Topic string `json:"topic,omitempty"`
	// Subscriber name of the subscriber sending or receiving
	Subscriber string `json:"subscriber,omitempty"`
	// Messages slice of Message (used to send to Subscriber)
	Messages []Message `json:"messages,omitempty"`
	// Acks slice of MessageKey received from Subscriber to acknowledge processing
	Acks []MessageKey `json:"acks,omitempty"`
	// Command is sent by Subscriber, right now only on reconnect/restart of the Subscriber to reset Acks
	Command string `json:"command,omitempty""`
}

MQWrapper used to communicate between subscriber & ppMQ

type MemoryFactory

type MemoryFactory struct {
	// contains filtered or unexported fields
}

MemoryFactory is a storage factory that returns non-persistent Interfaces for handing Message, Ack & Topic storage. It was used for initial development and debugging.

func (*MemoryFactory) AckStore

func (pf *MemoryFactory) AckStore(_ string) (AckStorageEngine, error)

func (*MemoryFactory) MessageQueue

func (pf *MemoryFactory) MessageQueue(_, _ string) (PPQueue, error)

func (*MemoryFactory) MessageStore

func (pf *MemoryFactory) MessageStore(_ string) (PPMessageStore, error)

type MemoryMessageStore

type MemoryMessageStore struct {
	Messages map[MessageKey]Message
	Meta     map[MessageKey]MessageMeta
}

MemoryMessageStore is non-persistent implementation of Message & Ack store used for debugging & dev purposes.

func (*MemoryMessageStore) Abort

func (mms *MemoryMessageStore) Abort() error

func (*MemoryMessageStore) Close

func (mms *MemoryMessageStore) Close() error

func (*MemoryMessageStore) Delete

func (mms *MemoryMessageStore) Delete(key MessageKey) error

func (*MemoryMessageStore) Exists

func (mms *MemoryMessageStore) Exists(key MessageKey) (bool, error)

func (*MemoryMessageStore) Get

func (*MemoryMessageStore) GetMeta

func (mms *MemoryMessageStore) GetMeta(key MessageKey) (MessageMeta, error)

func (*MemoryMessageStore) Init

func (mms *MemoryMessageStore) Init() error

func (*MemoryMessageStore) LoadAck

func (mms *MemoryMessageStore) LoadAck(cqName string) ([]AckPair, error)

func (*MemoryMessageStore) Put

func (mms *MemoryMessageStore) Put(m Message, meta MessageMeta) error

func (*MemoryMessageStore) SaveAck

func (mms *MemoryMessageStore) SaveAck(cqName string, _ []AckPair) error

func (*MemoryMessageStore) Sync

func (mms *MemoryMessageStore) Sync() error

func (*MemoryMessageStore) UpdateMeta

func (mms *MemoryMessageStore) UpdateMeta(key MessageKey, meta MessageMeta) error

type MemoryQueue

type MemoryQueue struct {
	Queue []MessageKey
}

MemoryQueue is non-persistent, memory only implementation of StorageEngine for the queue

func (*MemoryQueue) Abort

func (mq *MemoryQueue) Abort() error

func (*MemoryQueue) Close

func (mq *MemoryQueue) Close() error

func (*MemoryQueue) Init

func (mq *MemoryQueue) Init() error

func (*MemoryQueue) IsEmpty

func (mq *MemoryQueue) IsEmpty() (bool, error)

func (*MemoryQueue) Peek

func (mq *MemoryQueue) Peek() (MessageKey, error)

func (*MemoryQueue) Pop

func (mq *MemoryQueue) Pop() (MessageKey, error)

func (*MemoryQueue) Prepend

func (mq *MemoryQueue) Prepend(keys []MessageKey) error

func (*MemoryQueue) Push

func (mq *MemoryQueue) Push(key MessageKey) error

func (*MemoryQueue) Sync

func (mq *MemoryQueue) Sync() error

type Message

type Message struct {
	// MessageKey used for dedupe, ignored if dedupe in MessageTopic  is disabled
	Key MessageKey `json:",omitempty"`

	// MessagePayload the actual payload
	Payload MessagePayload

	// Expires time when message becomes expired, and should be thrown out, can be empty to represent messages
	// that don't expire
	Expires time.Time `json:",omitempty"`

	// Priority defines message priority, starts with 0 (highest priority)
	Priority int `json:",omitempty"`
}

Message struct the basic structure used to send / receive & store messages

func (Message) ToBytes

func (m Message) ToBytes() ([]byte, error)

ToBytes convert message to []byte representation

type MessageKey

type MessageKey string

MessageKey aliases key type for messages

func (MessageKey) ToBytesId

func (mk MessageKey) ToBytesId() []byte

ToBytesId returns []byte representation of MessageKey for message id

func (MessageKey) ToBytesMeta

func (mk MessageKey) ToBytesMeta() []byte

ToBytesMeta returns []byte representation of MessageKey for meta

type MessageMeta

type MessageMeta struct {
	AckCount int
	Sent     map[string]bool
}

MessageMeta is stored by StorageEngine for each message to track if message has been sent for particular subscriber, and how many acknowledgements were received. This is needed so we can safely remove message once it was delivered & acknowledged by each subscriber.

func (MessageMeta) ToBytes

func (m MessageMeta) ToBytes() ([]byte, error)

ToBytes encodes MessageMeta to []byte representation

type MessagePayload

type MessagePayload string

MessagePayload alias for payload sent through the message

type MessageTopic

type MessageTopic struct {
	Name     string        `yaml:"Name"`               // name of the topic
	AckRetry time.Duration `yaml:"AckRetry,omitempty"` //retry resending message if Ack was received in
	// AckRetry interval, 0 if retry disabled
	Dedupe bool `yaml:"Dedupe,omitempty"` // if true, messages will be dedupe based on MessageKey,
	// latest will be always used instead of old message
	Priorities int      `yaml:"Priorities,omitempty"` // 1 or more, if 1 - no priorities...
	Expire     bool     `yaml:"Expire,omitempty"`     // True if message can expire
	Subscriber []string `yaml:"Subscribers"`          // List of subscribers
	// contains filtered or unexported fields
}

A MessageTopic keeps the configuration on the message topic as well as storage engine with all the topic information We expect that it will be configured and loaded from yaml file.

func (*MessageTopic) Ack

func (mq *MessageTopic) Ack(cqName string, ids []MessageKey) error

Ack messages for subscriber. The call will trigger SendToSubscriber for the given subscriber

func (*MessageTopic) Init

func (mq *MessageTopic) Init(factory StorageFactory) error

Init MessageTopic initializing underlying storage engine based on StorageFactory passed

func (*MessageTopic) Publish

func (mq *MessageTopic) Publish(msgs []Message) error

Publish messages to the MessageTopic. The call will add messages to the queue and trigger SendToSubscribers call

func (*MessageTopic) Reconnect

func (mq *MessageTopic) Reconnect(cqName string) error

Reconnect acknowledges MessageTopic that subscriber has reconnected, and that whatever messages waiting in Ack should be resent to subscriber

func (*MessageTopic) SendToSubscriber

func (mq *MessageTopic) SendToSubscriber(cqName string)

SendToSubscriber sends messages if available to subscriber

func (*MessageTopic) SendToSubscribers

func (mq *MessageTopic) SendToSubscribers()

SendToSubscribers sends messages to all subscribers for give MessageTopic

func (*MessageTopic) TimeoutAck

func (mq *MessageTopic) TimeoutAck() error

TimeoutAck checks if any of the Ack in the MessageTopic has expired and corresponding messages should be resent to subscribers. This method will be called automatically once a minute

type PPMessageStore

type PPMessageStore interface {
	// Init message store engine
	Init() error
	// Close message store engine, safely persisting all messages if needed
	Close() error
	// Get Message and MessageMeta for given MessageKey. Returns ErrNoSuchMessage if doesn't exist
	Get(key MessageKey) (Message, MessageMeta, error)
	// GetMeta returns MessageMeta for given MessageKey, Returns ErrNoSuchMessage if doesn't exist
	GetMeta(key MessageKey) (MessageMeta, error)
	// Put persists Message & MessageMeta
	Put(m Message, meta MessageMeta) error
	// UpdateMeta updates MessageMeta
	UpdateMeta(key MessageKey, meta MessageMeta) error
	// Delete removes Message & MessageMeta for given key from the storage
	Delete(key MessageKey) error
	// Exists checks if Message exists
	Exists(key MessageKey) (bool, error)
	// Sync saves all changes to disk
	Sync() error
	// Abort tries to safely abort the operation. Not implemented at this moment.
	Abort() error
}

PPMessageStore is used to persist messages

type PPQueue

type PPQueue interface {
	// Init underlying persistence layer
	Init() error
	// Push message key into the queue
	Push(key MessageKey) error
	// Prepend slice of message keys into the queue. Used on ack reset & timeout, when we want to resend some messages
	Prepend(keys []MessageKey) error
	// Pop message from the queue
	Pop() (MessageKey, error)
	// Peek check first available message on the queue
	Peek() (MessageKey, error)
	// IsEmpty returns true if queue is empty
	IsEmpty() (bool, error)
	// Sync saves queue state
	Sync() error
	// Abort attempts to abort the changes to the queue before saving (might not be implemented)
	Abort() error
	// Closes the queue, saving all unsaved data.
	Close() error
}

PPQueue interface for persisting queue for subscriber

type PersistentFactory

type PersistentFactory struct {
	Path string // location where data should be stored for given MessageTopic
	// contains filtered or unexported fields
}

PersistentFactory is a storage factory for MessageTopic based on LevelDB & dque implementation

func (*PersistentFactory) AckStore

func (pf *PersistentFactory) AckStore(topicName string) (AckStorageEngine, error)

func (*PersistentFactory) MessageQueue

func (pf *PersistentFactory) MessageQueue(topicName, subscriberName string) (PPQueue, error)

func (*PersistentFactory) MessageStore

func (pf *PersistentFactory) MessageStore(topicName string) (PPMessageStore, error)

type StorageEngine

type StorageEngine interface {
	// Init initializes StorageEngine for given MessageTopic, using particular StorageFactory
	Init(mq MessageTopic, factory StorageFactory) error
	// Closes all underlying persistence engines, making sure that data is saved (if supported)
	Close() error
	// Checks if the are available messages for given subscriber that can be served now
	IsAvailable(cqName string) (bool, error)
	// Retrieves up to count messages for given subscriber
	NextBatch(cqName string, count int) ([]Message, error)
	// Returns slice of subscribers (names) for given StorageEngine
	GetNamesOfSubscribers() []string
	// Add slice of messages to the MessageTopic
	Add(msgs []Message) error
	// Ack slice of messages for given subscriber
	Ack(cqName string, keys []MessageKey) error
	// ResetAck resets unacknowledged messages for given subscriber
	ResetAck(cqName string) error
	// TimeoutAck resets expired ack for all subscribers for MessageTopic
	TimeoutAck() error
	// TestState is used during testing process to test final state of the queue for a given subscriber
	// It should be assumed that the method is destructive, and the queue/MessageTopic will be in invalid state
	// afterwards. Used specifically for testing purposes.
	TestState(cqName string, queue []MessageKey, ack []AckPair) error
}

StorageEngine interface is needed so that we can have multiple implementations of storage models without reimplementing MessageTopic

type StorageFactory

type StorageFactory interface {
	// MessageStore returns persistence engine for messages
	MessageStore(topicName string) (PPMessageStore, error)
	// MessageQueue returns persistence engine for queue
	MessageQueue(topicName, subscriberName string) (PPQueue, error)
	// AckStore returns persistence engine for Ack
	AckStore(topicName string) (AckStorageEngine, error)
}

StorageFactory interface is used to initialize various persistent strategies without re-implementing overall storage engine functionality

type Subscriber

type Subscriber struct {
	Id        int    `yaml:",omitempty"`          // subscriber id (generated on init)
	Name      string `yaml:"Name"`                // subscriber name
	BatchSize int    `yaml:"BatchSize,omitempty"` // max of messages to send to subscriber in one go
	Socket    string `yaml:"Socket"`              // socket to which to send the messages
}

A Subscriber keeps the configuration information about subscriber. It is expected that the info will be loaded from yaml file

type SubscriberQueue

type SubscriberQueue struct {
	Name       string           // Name of the subscriber
	Queue      []PPQueue        // Queue is a slice of queues, one per priority, highest starts with 0
	AckWaiting []AckPair        // AckWaiting list ack for message id
	AckEngine  AckStorageEngine // used to store ack pairs
}

SubscriberQueue is storage agnostic subscriber queue used keep track of messages queued and unprocessed Ack

func (*SubscriberQueue) Abort

func (cq *SubscriberQueue) Abort() error

Abort operation done on SubscriberQueue (not implemented in underlying engines)

func (*SubscriberQueue) Ack

func (cq *SubscriberQueue) Ack(key MessageKey) bool

Ack MessageKey

func (*SubscriberQueue) Add

func (cq *SubscriberQueue) Add(key MessageKey, priority int) error

Add MessageKey to SubscriberQueue

func (*SubscriberQueue) Init

func (cq *SubscriberQueue) Init(priorities int, factory StorageFactory, topicName, subscriberName string) error

Init SubscriberQueue including initializing or loading any underlying persistent storage components

func (*SubscriberQueue) IsAvailable

func (cq *SubscriberQueue) IsAvailable() bool

IsAvailable returns true if any messages in this SubscriberQueue are available to be served in the queue Message will not be available if there are no messages in the queue, or if some messages hasn't been Ack yet.

func (*SubscriberQueue) SaveAck

func (cq *SubscriberQueue) SaveAck() error

SaveAck saves ack to the persistent storage

func (*SubscriberQueue) Sync

func (cq *SubscriberQueue) Sync() error

Sync save changes to disk.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL