messagebuffer

package
v0.4.4
Published: Aug 2, 2023 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package messagebuffer implements a backlog for messages that have been received but cannot yet be processed. Any protobuf-serializable message can be stored, and it is the calling code's responsibility to keep track of the actual types of the messages it stores and retrieves.

On reception of a message that the node is not yet ready to process (e.g., in the ISS protocol, a message from a future epoch received from another node that already transitioned to that epoch), the message is stored in a buffer for later processing (e.g., when this node also transitions to that epoch). The buffer has a maximal capacity. Once storing a new message would exceed that capacity, the buffer evicts the oldest messages first, such that the capacity constraint is never exceeded. Effectively, the most recently received messages that together do not exceed the capacity are stored.

The buffer can be iterated over, selecting for each message whether it is applied now, kept for later, or safely discarded.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBuffers

func NewBuffers(nodeIDs []t.NodeID, totalCapacity int, logger logging.Logger) map[t.NodeID]*MessageBuffer

NewBuffers returns multiple buffers, one for each node listed in nodeIDs. The total capacity is divided equally among all buffers, i.e., each buffer's capacity is totalCapacity/len(nodeIDs) (using integer division, thus the resulting capacities might sum up to less than totalCapacity). In the current implementation, only the payload of the stored message is counted towards the capacity, disregarding the overhead of the buffer implementation itself. The returned buffers are stored in a map, indexed by node IDs.
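
The capacity split described above can be illustrated with a small sketch. The helper perBufferCapacity is hypothetical and not part of this package; it only reproduces the integer division stated in the text, and it assumes a non-empty list of nodes.

```go
package main

import "fmt"

// perBufferCapacity is a hypothetical helper (not part of messagebuffer).
// It mirrors the documented split: totalCapacity / number of nodes,
// using integer division. numNodes must be non-zero in this sketch.
func perBufferCapacity(totalCapacity, numNodes int) int {
	return totalCapacity / numNodes
}

func main() {
	total, nodes := 10, 3
	each := perBufferCapacity(total, nodes)

	// Each of the 3 buffers gets capacity 3. The capacities sum to 9,
	// so 1 unit of the total capacity remains unused.
	fmt.Println(each, each*nodes) // 3 9
}
```

If the remainder matters for a deployment, choosing a totalCapacity that is a multiple of len(nodeIDs) avoids the loss.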

Types

type Applicable

type Applicable int

Applicable is an enum-style type representing the status of a message stored in the message buffer. When iterating over the buffer, the iterator must be provided a function that returns a value of this type. The iterator then applies this function to each message and decides, based on the return value, what to do with the message.

const (

	// Past message, usually outdated, no need to process it at all.
	// Iterator action: Delete without processing.
	Past Applicable = iota

	// Current message, should be applied now.
	// Iterator action: Apply message and remove it from the buffer.
	Current

	// Future message, cannot be applied yet, but might need to be applied in the future.
	// Iterator action: Keep message in buffer, but do not process it.
	Future

	// Invalid message, cannot be processed.
	// Iterator action: Delete without processing.
	Invalid
)
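
As an illustration of how calling code might map messages to these values, the following sketch classifies messages by epoch number, in the spirit of the ISS example from the overview. The epochMsg type, its fields, and the classify function are assumptions made for this example; the Applicable type is redeclared locally only so that the sketch compiles on its own.

```go
package main

import "fmt"

// Applicable mirrors the enum documented above. It is redeclared here
// only to keep the sketch self-contained.
type Applicable int

const (
	Past Applicable = iota
	Current
	Future
	Invalid
)

// epochMsg is a hypothetical message type that carries an epoch number
// and a flag saying whether the message passed basic validation.
type epochMsg struct {
	Epoch int
	Valid bool
}

// classify is a hypothetical rule that decides the status of a message
// relative to the epoch the local node is currently in.
func classify(currentEpoch int, m epochMsg) Applicable {
	switch {
	case !m.Valid:
		return Invalid
	case m.Epoch < currentEpoch:
		return Past
	case m.Epoch == currentEpoch:
		return Current
	default:
		return Future
	}
}

func main() {
	fmt.Println(classify(5, epochMsg{Epoch: 4, Valid: true}))  // 0 (Past)
	fmt.Println(classify(5, epochMsg{Epoch: 5, Valid: true}))  // 1 (Current)
	fmt.Println(classify(5, epochMsg{Epoch: 7, Valid: true}))  // 2 (Future)
	fmt.Println(classify(5, epochMsg{Epoch: 5, Valid: false})) // 3 (Invalid)
}
```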

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer represents a message buffer, buffering messages from a single node.

func New

func New(nodeID t.NodeID, capacity int, logger logging.Logger) *MessageBuffer

New returns a newly allocated and initialized MessageBuffer for the given nodeID and with the given initial capacity.

func (*MessageBuffer) Capacity

func (mb *MessageBuffer) Capacity() int

func (*MessageBuffer) Iterate

func (mb *MessageBuffer) Iterate(
	filter func(source t.NodeID, msg proto.Message) Applicable,
	apply func(source t.NodeID, msg proto.Message),
)

Iterate iterates over all messages in the MessageBuffer, applying some and removing others according to the provided filter function. Iterate takes two function arguments: (1) filter is applied to every message in the buffer, and Iterate performs the following action based on its output (see the description of the Applicable type):

  • Past: Remove message from the buffer.
  • Current: Remove message from the buffer and call apply with the message and its sender as arguments.
  • Future: Do nothing.
  • Invalid: Remove message from the buffer.

(2) apply is called with every message for which filter returns the Current value.
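
The four cases above can be sketched with a simplified stand-in that works on a plain slice. This is not the package's implementation: messages are modeled as epoch numbers, the sender argument is omitted, and iterate and epochFilter are hypothetical names. Whether the real Iterate calls apply before or after removing a message is not specified here, so the sketch makes no claim about that order.

```go
package main

import "fmt"

// Applicable is redeclared locally to keep the sketch self-contained.
type Applicable int

const (
	Past Applicable = iota
	Current
	Future
	Invalid
)

// iterate is a hypothetical model of the documented behaviour. It returns
// the messages that remain buffered and calls apply for Current ones.
func iterate(msgs []int, filter func(int) Applicable, apply func(int)) []int {
	var kept []int
	for _, m := range msgs {
		switch filter(m) {
		case Current:
			apply(m) // Applied and not kept.
		case Future:
			kept = append(kept, m) // Stays in the buffer untouched.
		case Past, Invalid:
			// Dropped without processing.
		}
	}
	return kept
}

// epochFilter builds a hypothetical filter for the given current epoch.
// Negative epochs are treated as invalid in this example.
func epochFilter(current int) func(int) Applicable {
	return func(epoch int) Applicable {
		switch {
		case epoch < 0:
			return Invalid
		case epoch < current:
			return Past
		case epoch == current:
			return Current
		default:
			return Future
		}
	}
}

func main() {
	var applied []int
	kept := iterate([]int{1, 2, 3, 2, -1}, epochFilter(2), func(m int) {
		applied = append(applied, m)
	})
	fmt.Println(applied, kept) // [2 2] [3]
}
```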

func (*MessageBuffer) Resize

func (mb *MessageBuffer) Resize(newCapacity int)

Resize changes the capacity of the MessageBuffer to newCapacity. If newCapacity is smaller than the current capacity, Resize removes as many least recently added messages as is necessary for the buffer size not to exceed the new capacity. E.g., if the most recently added message is larger than newCapacity, the buffer will be empty after Resize returns.
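
The eviction performed when shrinking can be modeled on a list of message sizes. The shrink function below is a hypothetical sketch, not the package's code; it assumes sizes are listed oldest first and that only payload sizes count towards the capacity.

```go
package main

import "fmt"

// shrink is a hypothetical model of the eviction done by Resize.
// sizes holds the payload sizes of the buffered messages, oldest first.
// It returns the sizes of the messages that remain after shrinking.
func shrink(sizes []int, newCapacity int) []int {
	total := 0
	for _, s := range sizes {
		total += s
	}
	// Drop the least recently added messages until the rest fit.
	for len(sizes) > 0 && total > newCapacity {
		total -= sizes[0]
		sizes = sizes[1:]
	}
	return sizes
}

func main() {
	// Total size 12 exceeds 8, so the oldest message (size 4) is dropped.
	fmt.Println(shrink([]int{4, 3, 5}, 8)) // [3 5]

	// The newest message (size 5) alone exceeds 4, so nothing remains.
	fmt.Println(shrink([]int{4, 3, 5}, 4)) // []
}
```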

func (*MessageBuffer) Store

func (mb *MessageBuffer) Store(msg proto.Message) bool

Store stores a given message in the MessageBuffer, if capacity allows it. It returns true if the message has been successfully stored, false otherwise. If msg is larger than the buffer capacity, the message is not stored and the contents of the buffer are left untouched. Otherwise, as many least recently added messages are removed from the buffer as is necessary for storing msg. Note that this implies that there is no guarantee that msg will remain in the buffer until it is explicitly consumed. If Store is invoked again with other messages, msg can be pushed out of the buffer.
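
The behaviour described above can be sketched with a simplified model that tracks only message sizes. The sizeBuffer type and its store method are hypothetical and are not the package's implementation; they assume non-negative sizes and oldest-first eviction.

```go
package main

import "fmt"

// sizeBuffer is a hypothetical, simplified model of a capacity-bounded
// buffer. It records only the sizes of stored messages, oldest first.
type sizeBuffer struct {
	capacity int
	used     int
	sizes    []int
}

// store models the documented Store behaviour for a message of the
// given size and reports whether the message was stored.
func (b *sizeBuffer) store(size int) bool {
	// A message larger than the whole buffer is rejected and nothing
	// already stored is touched.
	if size > b.capacity {
		return false
	}
	// Evict the least recently added messages until the new one fits.
	for b.used+size > b.capacity {
		b.used -= b.sizes[0]
		b.sizes = b.sizes[1:]
	}
	b.sizes = append(b.sizes, size)
	b.used += size
	return true
}

func main() {
	b := &sizeBuffer{capacity: 10}
	fmt.Println(b.store(4), b.sizes)  // true [4]
	fmt.Println(b.store(5), b.sizes)  // true [4 5]
	fmt.Println(b.store(3), b.sizes)  // true [5 3]
	fmt.Println(b.store(11), b.sizes) // false [5 3]
}
```

The third call shows the caveat from the text: the message of size 4 was stored successfully at first, yet a later call pushed it out before anything consumed it.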
