session

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: Apr 10, 2024 License: BSD-3-Clause Imports: 21 Imported by: 1

Documentation

Overview

Package session defines an abstraction of a session during a distributed RPC.

During a stream-based distributed RPC in minogrpc, the stream is kept alive for the whole protocol and acts as a health check, so that resources can be cleaned up when the protocol ends or when something goes wrong. The session manages this state and also manages the relays to the other participants to which the node must forward messages. A session has one or several relays open to the parent nodes and zero, one, or multiple relays to other participants, depending on the routing of the messages.

The package implements a unicast relay and a stream relay. The stream relay is only used when the orchestrator of a protocol connects to the first participant. Unicast is then used so that the sender of a message can receive feedback on the status of the message.

Documentation Last Review: 07.10.2020

Constants

const HandshakeKey = "handshake"

HandshakeKey is the key to the handshake store in the headers.

Variables

This section is empty.

Functions

This section is empty.

Types

type Address

type Address struct {
	// contains filtered or unexported fields
}

Address is a representation of the network address of a participant. The overlay implementation needs to distinguish an orchestrator address from its source address: the former initiates a protocol and the latter participates in it.

See session.wrapAddress for the abstraction provided to a caller external to the overlay module.

- implements mino.Address

func NewAddress

func NewAddress(host string) Address

NewAddress creates a new address.

func NewAddressFromURL

func NewAddressFromURL(addr url.URL) (a Address, err error)

NewAddressFromURL creates a new address given a URL.

func NewOrchestratorAddress

func NewOrchestratorAddress(addr mino.Address) Address

NewOrchestratorAddress creates a new address which will be considered as the initiator of a protocol.

func (Address) ConnectionType

func (a Address) ConnectionType() mino.AddressConnectionType

ConnectionType returns how to connect to the other host.

func (Address) Equal

func (a Address) Equal(other mino.Address) bool

Equal implements mino.Address. It returns true if both addresses are identical, in the sense that an orchestrator address won't match a follower address with the same host.
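The equality rule can be illustrated with a minimal sketch. The `addr` type below is a hypothetical stand-in for Address (its fields are unexported in the real type, so the layout here is an assumption), and `equal` takes the same concrete type rather than a mino.Address.

```go
package main

import "fmt"

// addr is a hypothetical stand-in for session.Address: a host plus a flag
// marking the address as the orchestrator (initiator) of a protocol.
type addr struct {
	host         string
	orchestrator bool
}

// equal follows the documented rule: two addresses match only when both the
// host and the role are the same.
func (a addr) equal(other addr) bool {
	return a.host == other.host && a.orchestrator == other.orchestrator
}

func main() {
	follower := addr{host: "127.0.0.1:2000"}
	orch := addr{host: "127.0.0.1:2000", orchestrator: true}

	fmt.Println(follower.equal(follower)) // true
	fmt.Println(follower.equal(orch))     // false: same host, different role
}
```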

func (Address) GetDialAddress

func (a Address) GetDialAddress() string

GetDialAddress returns a string formatted to be understood by the grpc.Dial() function.

func (Address) GetHostname

func (a Address) GetHostname() (string, error)

GetHostname parses the address to extract the hostname.

func (Address) MarshalText

func (a Address) MarshalText() ([]byte, error)

MarshalText implements mino.Address. It returns the text format of the address that can later be deserialized.

func (Address) String

func (a Address) String() string

String implements fmt.Stringer. It returns a string representation of the address.

type AddressFactory

type AddressFactory struct {
	serde.Factory
}

AddressFactory is a factory for addresses.

- implements mino.AddressFactory

func (AddressFactory) FromText

func (f AddressFactory) FromText(buf []byte) mino.Address

FromText implements mino.AddressFactory. It returns an instance of an address from a byte slice.

type ConnectionManager

type ConnectionManager interface {
	Len() int
	Acquire(mino.Address) (grpc.ClientConnInterface, error)
	Release(mino.Address)
}

ConnectionManager is an interface required by the session to open and release connections to the relays.
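As an illustration of the contract, the sketch below shows one way a manager with this shape could behave. It is not the minogrpc implementation: `countingManager` and `conn` are hypothetical, addresses are plain strings instead of mino.Address, and the reference counting is an assumption about how Acquire and Release might pair up.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a placeholder for grpc.ClientConnInterface.
type conn struct{ target string }

// countingManager is a hypothetical manager keyed by string addresses. It
// keeps one connection per address and counts how many users hold it.
type countingManager struct {
	mu    sync.Mutex
	conns map[string]*conn
	refs  map[string]int
}

func newCountingManager() *countingManager {
	return &countingManager{conns: map[string]*conn{}, refs: map[string]int{}}
}

// Len returns the number of open connections.
func (m *countingManager) Len() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.conns)
}

// Acquire returns the connection to addr, opening it on first use.
func (m *countingManager) Acquire(addr string) (*conn, error) {
	if addr == "" {
		return nil, errors.New("empty address")
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	c, ok := m.conns[addr]
	if !ok {
		c = &conn{target: addr}
		m.conns[addr] = c
	}
	m.refs[addr]++
	return c, nil
}

// Release drops one reference and forgets the connection once it is unused.
func (m *countingManager) Release(addr string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.refs[addr] == 0 {
		return
	}
	m.refs[addr]--
	if m.refs[addr] == 0 {
		delete(m.refs, addr)
		delete(m.conns, addr)
	}
}

func main() {
	mgr := newCountingManager()
	_, _ = mgr.Acquire("127.0.0.1:2000")
	_, _ = mgr.Acquire("127.0.0.1:2000")
	fmt.Println(mgr.Len()) // 1
	mgr.Release("127.0.0.1:2000")
	mgr.Release("127.0.0.1:2000")
	fmt.Println(mgr.Len()) // 0
}
```

Under this assumption, every successful Acquire is paired with exactly one Release, and Len reports the connections that are still held.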

type NonBlockingQueue

type NonBlockingQueue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

NonBlockingQueue is an implementation of a queue that makes sure pushing a message will never hang. The queue will fill a buffer if the channel is not drained and will drop messages when the limit is reached.

- implements session.Queue

func (*NonBlockingQueue) Channel

func (q *NonBlockingQueue) Channel() <-chan router.Packet

Channel implements session.Queue. It returns a channel that will be populated with incoming messages. The queue uses a buffer when the channel is busy, therefore this channel should be listened to as much as possible to drain the messages. When the size of the buffer reaches a limit, messages will be dropped.

func (*NonBlockingQueue) Push

func (q *NonBlockingQueue) Push(msg router.Packet) error

Push implements session.Queue. It appends the message to the queue without blocking. If the queue is at maximum capacity, the message is dropped and an error is returned.
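The Push and Channel contract can be sketched with the standard library alone. The example below is a simplification, not the package's implementation: it swaps the internal buffer for a fixed-capacity buffered channel, and `boundedQueue`, `packet`, and the error message are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// packet is a placeholder for router.Packet.
type packet string

// boundedQueue is a hypothetical, simplified queue: a buffered channel acts
// as the buffer and its capacity as the limit.
type boundedQueue struct {
	ch chan packet
}

func newBoundedQueue(limit int) *boundedQueue {
	return &boundedQueue{ch: make(chan packet, limit)}
}

// Channel returns the channel that consumers drain.
func (q *boundedQueue) Channel() <-chan packet {
	return q.ch
}

// Push never blocks: it either enqueues the packet or reports that the
// packet was dropped because the queue is full.
func (q *boundedQueue) Push(p packet) error {
	select {
	case q.ch <- p:
		return nil
	default:
		return errors.New("queue is at maximum capacity")
	}
}

func main() {
	q := newBoundedQueue(2)
	fmt.Println(q.Push("a"))   // <nil>
	fmt.Println(q.Push("b"))   // <nil>
	fmt.Println(q.Push("c"))   // queue is at maximum capacity
	fmt.Println(<-q.Channel()) // a
	fmt.Println(q.Push("c"))   // <nil>
}
```

Draining the channel frees capacity, which is why the last Push succeeds after one packet has been read.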

type PacketStream

type PacketStream interface {
	Context() context.Context
	Send(*ptypes.Packet) error
	Recv() (*ptypes.Packet, error)
}

PacketStream is a gRPC stream to send and receive protobuf packets.

type Queue

type Queue interface {
	Channel() <-chan router.Packet
	Push(router.Packet) error
}

Queue is an interface to queue messages.

type Relay

type Relay interface {
	// GetDistantAddress returns the address of the peer at the other end of the
	// relay.
	GetDistantAddress() mino.Address

	// Stream returns the stream that is holding the relay.
	Stream() PacketStream

	// Send sends a packet through the relay.
	Send(ctx context.Context, p router.Packet) (*ptypes.Ack, error)

	// Close closes the relay and cleans up the resources.
	Close() error
}

Relay is the interface of the relays spawned by the session when trying to contact a child node.

func NewRelay

func NewRelay(
	stream PacketStream, gw mino.Address,
	ctx serde.Context, conn grpc.ClientConnInterface, md metadata.MD,
) Relay

NewRelay returns a new relay that will send messages to the gateway through unicast requests.

func NewStreamRelay

func NewStreamRelay(gw mino.Address, stream PacketStream, ctx serde.Context) Relay

NewStreamRelay creates a new relay that will send the packets through the stream.

type Session

type Session interface {
	mino.Sender
	mino.Receiver

	// GetNumParents returns the number of active parents for the session.
	GetNumParents() int

	// Listen takes a stream that will determine when to close the session.
	Listen(parent Relay, table router.RoutingTable, ready chan struct{})

	// SetPassive sets a new passive parent. A passive parent is part of the
	// parent relays, but its stream is not listened to, and thus it is not
	// removed from the map if it closes.
	SetPassive(parent Relay, table router.RoutingTable)

	// Close shuts down the session so that future calls to receive will
	// return an error.
	Close()

	// RecvPacket takes a packet and the address of the distant peer that has
	// sent it, then passes it to the correct relay according to the routing
	// table.
	RecvPacket(from mino.Address, p *ptypes.Packet) (*ptypes.Ack, error)
}

Session is an interface for a stream session that allows sending messages to the parent and relays, while receiving the ones addressed to the local address.
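The dispatch behaviour described for RecvPacket can be pictured with a small model. Everything in the sketch is hypothetical: `miniSession`, `packet`, and the `routes` map (destination to next hop) stand in for Session, router.Packet, and router.RoutingTable, whose real behaviour may differ.

```go
package main

import "fmt"

// packet is a placeholder for router.Packet: a payload and its destinations.
type packet struct {
	dests   []string
	payload string
}

// miniSession is a hypothetical session. routes maps a destination to the
// next hop, standing in for router.RoutingTable.
type miniSession struct {
	me     string
	routes map[string]string
	local  []string            // payloads delivered to the local address
	relays map[string][]string // payloads forwarded, keyed by next hop
}

// recvPacket delivers the payload locally when this node is a destination
// and forwards it once per next hop for the remaining destinations.
func (s *miniSession) recvPacket(p packet) error {
	hops := map[string]bool{}
	for _, d := range p.dests {
		if d == s.me {
			s.local = append(s.local, p.payload)
			continue
		}
		hop, ok := s.routes[d]
		if !ok {
			return fmt.Errorf("no route to %s", d)
		}
		if !hops[hop] {
			hops[hop] = true
			s.relays[hop] = append(s.relays[hop], p.payload)
		}
	}
	return nil
}

func main() {
	s := &miniSession{
		me:     "B",
		routes: map[string]string{"C": "C", "D": "C"},
		relays: map[string][]string{},
	}
	err := s.recvPacket(packet{dests: []string{"B", "C", "D"}, payload: "hello"})
	fmt.Println(err, s.local, s.relays) // <nil> [hello] map[C:[hello]]
}
```

In this model, node B keeps one copy for itself and sends a single copy to C, which is the next hop for both C and D.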

func NewSession

func NewSession(
	md metadata.MD,
	me mino.Address,
	msgFac serde.Factory,
	pktFac router.PacketFactory,
	ctx serde.Context,
	connMgr ConnectionManager,
) Session

NewSession creates a new session for the provided parent relay.
