pipe

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2019 License: MIT Imports: 14 Imported by: 0

README

go-libp2p-pipe

Pipe is an effective way to reuse libp2p streams. While streams are lightweight there are still cases of protocols which needs a lot of messaging between same peers for continuous time. Current libp2p flow suggests to create new stream for every new message or request/response, what could be inefficient in high flood of messages. Pipe suggests simple interface for two most common cases of stream messaging: simple message without any feedback and asynchronous request/response pattern.

Pipe is somewhere similar to request pipelining, but with one key difference - requested host does not have to handle requests in line and can process them for long time, so responses could be sent at any time without any ordering.

Pipe takes full control over stream and handles new stream creation on failures with graceful pipe closing on both sides of the pipe.

Documentation

Overview

Pipe is an effective way to reuse libp2p streams. While streams are lightweight, there are still cases of protocols which needs a lot of messaging between same peers for continuous time. Current libp2p flow suggests to create new stream for every new message or request/response, what could be inefficient and bandwidth wasteful in high flood of messages. Pipe suggests simple interface for two most common cases of messaging protocols over streams: simple message without any feedback and request/response pattern.

Pipe is somewhere similar to request pipelining, but with one key difference - requested host does not have to handle requests in line and can process them for some time, so responses could be sent at any time without any ordering.

Pipe takes full control over stream and handles new stream creation on failures with graceful pipe closing on both sides of the pipe.

Index

Constants

This section is empty.

Variables

View Source
var (
	// MaxWriteAttempts specifies amount of retries to write on failure
	MaxWriteAttempts = 3

	// MessageBuffer specifies the size of buffer for incoming messages
	// If buffer is full, new messages will be dropped
	MessageBuffer = 8
)
View Source
var (
	ErrClosed = errors.New("pipe: closed pipe")
	ErrReset  = errors.New("pipe: reset pipe")
)
View Source
var Protocol protocol.ID = "/pipe/1.0.0"

Functions

func MarshalMessage added in v1.1.2

func MarshalMessage(msg *Message, buf []byte) (int, error)

MarshalMessage fills given byte slice with the Message

func ReadMessage added in v1.1.2

func ReadMessage(r io.Reader, msg *Message) error

ReadMessage fills given Message from Reader

func RemovePipeHandler

func RemovePipeHandler(host core.Host, proto core.ProtocolID)

RemovePipeHandler removes pipe handler from host

func SetPipeHandler

func SetPipeHandler(host core.Host, h Handler, proto core.ProtocolID)

SetPipeHandler sets new stream handler which wraps stream into the pipe

func UnmarshalMessage added in v1.1.2

func UnmarshalMessage(msg *Message, b []byte) error

UnmarshalMessage fills given Message from byte slice

func WriteMessage added in v1.1.2

func WriteMessage(w io.Writer, msg *Message) error

WriteMessage writes given Message to the Writer

Types

type Handler

type Handler func(Pipe)

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is the central object in Pipe's communication. It is responsible for transferring Pipe's user data. Also, it has 3 forms: simple message, request, response.

func NewMessage

func NewMessage(msg []byte) *Message

NewMessage creates new simple pipe Message. Pipe just them pass through without additional handling.

func NewRequest

func NewRequest(msg []byte) *Message

NewRequest creates new Message with request form. Requests allow users to send any data through the Pipe and asynchronously receive responses at any time after other end calculates them.

func (*Message) Data

func (r *Message) Data() []byte

Data returns bytes which were transported through message

func (*Message) Reply

func (r *Message) Reply(resp ...RepOpt) error

Reply sends response, if the message is a received request Reply accepts different options as responses, which could be concatenated together

func (*Message) Response

func (r *Message) Response(ctx context.Context) ([]byte, error)

Response waits and returns response, if the message is a sent request

type Pipe

type Pipe interface {
	// Closes pipe for writing
	io.Closer

	// Send puts message in the pipe which after are transported to other pipe's end
	Send(*Message) error

	// Next iteratively reads new messages from pipe
	Next(context.Context) (*Message, error)

	// Protocol returns protocol identifier defined in pipe
	Protocol() protocol.ID

	// Conn returns underlying connection used by pipe
	Conn() network.Conn

	// Reset closes the pipe for reading and writing on both sides
	Reset() error
}

func NewPipe

func NewPipe(ctx context.Context, host core.Host, peer peer.ID, proto core.ProtocolID) (Pipe, error)

NewPipe creates new pipe over new stream

type RepOpt

type RepOpt func(*Message)

RepOpt is a type of option functions for Reply method

func Data

func Data(data []byte) RepOpt

Data is an reply option which sends a byte slice to the requester as a response

func Error

func Error(err error) RepOpt

Error is an reply option which sends an error to the requester as a response

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL