request

package
v0.0.0-...-7cd50c7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2023 License: Apache-2.0 Imports: 18 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// Holds all the buckets to which client requests are added after being added to Buffers.
	Buckets []*Bucket
)

Functions

func AdvanceWatermarks

func AdvanceWatermarks(entries []interface{})

Advances the client watermarks of all Buffers. This allows new Requests to be added to the Buffers.

func BatchDigest

func BatchDigest(batch *pb.Batch) []byte

func Digest

func Digest(req *pb.ClientRequest) []byte

Return the hash of a protobuf client request message.

func GetBucketNr

func GetBucketNr(clID int32, clSN int32) int

This is the hash function that computes the bucket number of a request. This implementation assigns requests from the same client to buckets in a round-robin way.

func HandleRequest

func HandleRequest(req *pb.ClientRequest)

This function is used by the messenger as the handler function for requests (the main file performs the assignment). Simply adds the received request to the corresponding request buffer. TODO: If too many threads (64 or more in the current deployment with 32-core machines) invoke Add(),

the buffer locks get extremely contended.
Have only a fixed (configurable) number of threads invoking Add().
Spawn those worker threads in the Init() function and make HandleRequest (this function) only write
the request to a channel (do we need a big channel buffer for this?) that a worker reads.
It would make sense to send requests from the same client to the same worker,
Since the Buffer lock to be acquired by the worker is determined by the clientID.
The lock being acquired by the same thread is crucial for avoiding contention.
If this is not enough, try having the worker threads add requests to buffers in batches.
(Although this might be very tricky if we want to avoid verifying signatures while holding the buffer lock,
and at the same time avoid verifying the signature again, in case the request is already present.)

func Init

func Init()

Initialize the request package. Cannot be part of the init() function, as the configuration file is not yet loaded when init() is executed.

func RemoveBatch

func RemoveBatch(batch *Batch)

Removes all requests in batch from their respective Buckets. The buffer is not affected by this function.

func RequestIDToBytes

func RequestIDToBytes(req *pb.ClientRequest) []byte

Types

type Batch

type Batch struct {
	Requests []*Request
}

Represents a batch of requests.

func NewBatch

func NewBatch(msg *pb.Batch) *Batch

Creates a batch from a protobuf message and tries to add the requests of the message to their buffer. If the request is not added successfully (Add returns nil) this method also returns nil

func (*Batch) CheckBucket

func (b *Batch) CheckBucket(activeBuckets []int) error

Checks if the requests in the batch match a specific bucket. If there exists some request that does not match the bucket id the method returns an error.

func (*Batch) CheckInFlight

func (b *Batch) CheckInFlight() error

Checks if the batch contains "in flight" requests. If the batch has "in flight" requests the method returns an error.

func (*Batch) CheckSignatures

func (b *Batch) CheckSignatures() error

func (*Batch) MarkInFlight

func (b *Batch) MarkInFlight()

Marks all requests in the batch as "in flight"

func (*Batch) Message

func (b *Batch) Message() *pb.Batch

Returns a protobuf message containing this Batch.

func (*Batch) Resurrect

func (b *Batch) Resurrect()

Returns requests in the batch in their buckets after an unsuccessful proposal. TODO: Optimization: First group the requests by bucket and then prepend each group at once.

type Bucket

type Bucket struct {
	// Any modification (or read of potentially concurrently modified values)
	// of the request buffer requires acquiring this lock.
	// Using this syntax, one can use .Lock() and .Unlock() methods directly on the Bucket.
	sync.Mutex

	// BucketGroup waiting to cut a batch of requests (also) from this Bucket.
	// If no BucketGroup is waiting for this Bucket, group nil.
	// Used for notifying the BucketGroup waiting to cut a batch about request additions.
	// Modifications of this field also require the Bucket to be locked.
	Group *BucketGroup

	// Start of a doubly linked list of requests in the bucket.
	// Necessary for constant-time adding and removing, while still being able to traverse in consistent order.
	// If set to nil, no requests are in the bucket (and LastRequest also must be nil).
	FirstRequest *Request

	// End of the doubly linked list of request.
	// Pointer to the end necessary for using the list as a FIFO queue.
	// If set to nil, no requests are in the bucket (and FirstRequest also must be nil).
	LastRequest *Request
	// contains filtered or unexported fields
}

Represents a single bucket of client requests. The contents of the Bucket must be kept consistent with Buffers. Therefore, requests are added here only after being successfully added to Buffers. Moreover, requests removed from here must be removed from Buffers as well.

func GetBucketByHashing

func GetBucketByHashing(req *pb.ClientRequest) *Bucket

func NewBucket

func NewBucket(id int) *Bucket

func (*Bucket) AddRequest

func (b *Bucket) AddRequest(req *Request) (*Request, bool)

Wrapper for addNoLock() that acquires the bucket lock.

func (*Bucket) AddRequests

func (b *Bucket) AddRequests(reqs []*Request) ([]*Request, []*Request, []*Request)

Adds multiple requests. Batching wrapper for multiple calls to addNoLock(), only acquiring the bucket lock once.

func (*Bucket) GetId

func (b *Bucket) GetId() int

Get the ID of the bucket that corresponds to its position in the Buckets slice. Currently only used for printing debug messages.

func (*Bucket) Len

func (b *Bucket) Len() int

Counts all the requests in the bucket and returns their number.

func (*Bucket) Prepend

func (b *Bucket) Prepend(req *Request)

func (*Bucket) PrependMultiple

func (b *Bucket) PrependMultiple(reqs []*Request)

Adds request to the start of the bucket. Required for resurrecting requests. ATTENTION! The requests must not be already present in the bucket, otherwise PrependMultiple() corrupts the Bucket state. TODO: Batch the additions, passing a slice of requests instead of going one-by-one

func (*Bucket) PruneIndex

func (b *Bucket) PruneIndex(watermarks *sync.Map)

Removes the index entry for all the requests present in the given Log entries. This is only safe to do when the client watermarks for the corresponding epoch have been advanced. (See removeNoLock()) Decrements the given wait group when done.

func (*Bucket) Remove

func (b *Bucket) Remove(reqs []*Request)

Removes each Request in req from the Bucket if it is present, but NOT from the index (see removeNoLock()).

func (*Bucket) RemoveFirst

func (b *Bucket) RemoveFirst(n int, dest []*Request) []*Request

Removes the first up to n Requests from the Bucket and appends them to dest. Returns the resulting slice obtained by appending the Requests to dest. ATTENTION: Bucket must be LOCKED when calling this method.

type BucketGroup

type BucketGroup struct {
	// contains filtered or unexported fields
}

func NewBucketGroup

func NewBucketGroup(bucketIDs []int) *BucketGroup

Creates a new BucketGroup and returns a pointer to it. The buckets parameter is a list of bucket IDs. NewBucketGroup() sorts this list! Sorting by bucket ID is important to prevent deadlocks, as buckets are always locked in the order of this list. NOTE: Currently there is always only one bucket group, so these deadlocks cannot occur, but this might change.

func (*BucketGroup) CountRequests

func (bg *BucketGroup) CountRequests() int

Counts all requests in all buckets. Only makes sense if the buckets are locked.

func (*BucketGroup) CutBatch

func (bg *BucketGroup) CutBatch(size int, timeout time.Duration) *Batch

Returns a new request batch assembled from requests in the bucket group. Blocks until the Buckets contain at least size requests, but at most for the duration of timeout. On timeout, returns a batch with all requests in the Buckets, even if all the Buckets are empty.

func (*BucketGroup) GetBucketIDs

func (bg *BucketGroup) GetBucketIDs() []int

Returns a list with the bucket IDs in the Bucket Group.

func (*BucketGroup) RequestAdded

func (bg *BucketGroup) RequestAdded()

Notifies the BucketGroup that is waiting to cut a batch about a request being added in one of its buckets. Can be called concurrently from many Bucket.Add() methods (while the Bucket is locked).

func (*BucketGroup) WaitForRequests

func (bg *BucketGroup) WaitForRequests(numRequests int, timeout time.Duration)

Blocks until the buckets in the BucketGroup (cumulatively) contain numRequests requests or until timeout elapses. When WaitForRequests returns, bg.totalRequests accurately represents the total number of requests in the BucketGroup.

type Buffer

type Buffer struct {
	// Any modification (or read of potentially concurrently modified values)
	// of the request buffer requires acquiring this lock.
	// (Using this syntax, one can use .Lock() and .Unlock() methods directly on the Buffer.)
	sync.RWMutex

	// ID of the client this buffer is associated to.
	ClientID int32

	// Start of the client watermark window (as seen locally by the replica).
	// All ClientRequests with SNs lower than LowWatermark were present in the past,
	// but are now removed from the buffer.
	// Only requests with sequence numbers between LowWatermark
	// and (LowWatermark + config.Config.ClientWatermarkWindowSize) can be added to the buffer.
	// Additionaly, up to config.Config.ClientRequestBacklogSize requests will be stored in the buffer's backlog
	// and added automatically when LowWatermark increases in Buffer.AdvanceWatermarks()
	LowWatermark int32
	// contains filtered or unexported fields
}

Buffers Requests from a single client. Only requests within the watermark window can be added to the Buffer. The Buffer also maintains a backlog of requests with client sequence numbers higher than the watermark window. When the capacity of the backlog is exceeded, requests above the watermark window cannot be added. ATTENTION! While the Buffer can be locked, not all methods are thread-safe.

No two goroutines must access the buffer concurrently without proper synchronization.

func NewBuffer

func NewBuffer(clientID int32) *Buffer

Allocates and returns a new Buffer.

func (*Buffer) Add

func (b *Buffer) Add(req *Request) bool

Adds a ClientRequest req to the buffer, if req has never been added before. Does nothing if req has been added in the past, even if it has been removed in the meantime. If the request is added to the buffer, Add() adds the request to the corresponding bucket as well. If, after the call to Add(), the request is in the Buffer (either has just been added or has been in the buffer before), Add() returns a pointer to the stored Request struct of which req is a part. TODO: Update the comments when request verification is added to this method. If the request is not part of the buffer after the call to Add() (it has been ignored or backlogged), Add() returns nil. ATTENTION: The Add() method does not lock the buffer (as it is also called from another method that does).

Still, the Buffer must be locked when calling Add().

func (*Buffer) AdvanceWatermarks

func (b *Buffer) AdvanceWatermarks(entries []interface{}) watermarkRange

Processes log entries for advancing the client watermark. Tries to add requests from the backlog back to the buffer (since some of them might be now in the watermark window). Returns the old and the new watermark.

type Request

type Request struct {

	// The request message received over the network.
	Msg *pb.ClientRequest

	// Digest of the request.
	Digest []byte

	// Pointer to the buffer the request maps to (through its client ID).
	// This, except for convenience, avoids acquiring the read lock on the Buffer map when looking up this request's Buffer.
	Buffer *Buffer

	// Pointer to the bucket the request maps to.
	// Note that this does not necessarily mean that the request is currently inserted in that bucket!
	// It only means that IF the request is in a bucket, this is the bucket.
	Bucket *Bucket

	// Flag indicating whether the request signature has been verified.
	Verified bool

	// Request is "in flight", i.e., has been added to or observed in some (protocol-specific) proposal message.
	// This flag tracks duplication.
	// A request should be marked as in flight upon being either added to or upon encountered in a proposal message.
	// If a request is already proposed it should not be found in any other proposal message.
	InFlight bool

	// Requests are stored in a doubly linked list in the bucket for constant-time (random-access) adding and removing
	// while still being able to use it as a FIFO queue.
	// We do not encapsulate the Request in a container "Element" struct to avoid allocations of those elements.
	// Both Next and Prev must be set to nil when the request is not in a bucket.
	Next *Request
	Prev *Request

	// Channel to which the verifying goroutine should write the verified request.
	// During verification of a batch, a channel is created and assigned to this field for all requests in the batch.
	// Then, the requests are written to a (different) channel for the verifier goroutines to process them.
	// When a verifier has verified the request's signature, it writes it to this channel in order to notify the
	// batch verification method that the verification of this request finished.
	VerifiedChan chan *Request
}

Represents a client request and stores request-specific metadata.

func Add

func Add(req *Request) *Request

Adds a request received as a protobuf message to the appropriate buffer and bucket.

func AddReqMsg

func AddReqMsg(reqMsg *pb.ClientRequest) *Request

Allocates a new Request object from a client request message and adds it by calling Add().

func UglyUglyDummyRegisterRequest

func UglyUglyDummyRegisterRequest(reqMsg *pb.ClientRequest) *Request

This ugly ugly function is only a nasty workaround for the DummyOrderer to circumvent properly adding received requests to their buffers. Should never be used anywhere outside the DummyOrderer.

type Responder

type Responder struct {
	// contains filtered or unexported fields
}

Represents a responder to client requests

func NewResponder

func NewResponder() *Responder

Creates a new responder. A responder must be created before any protocol messages can be received from the network. Otherwise some responses to the client could be missed (in case entries are committed to the log before the responder has been created).

func (*Responder) Start

func (r *Responder) Start(wg *sync.WaitGroup)

Observes the log and responds to clients in commit order. Meant to be run as a separate goroutine. Decrements the provided wait group when done.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL