Documentation ¶
Overview ¶
Package peerstream is a peer-to-peer networking library that multiplexes connections to many hosts. It attempts to simplify the complexity of:
* accepting incoming connections over **multiple** listeners
* dialing outgoing connections over **multiple** transports
* multiplexing **multiple** connections per-peer
* multiplexing **multiple** different servers or protocols
* handling backpressure correctly
* handling stream multiplexing
* providing a **simple** interface to the user
Index ¶
- Variables
- func ConnInConns(c1 *Conn, conns []*Conn) bool
- func EchoHandler(s *Stream)
- func NoOpConnHandler(c *Conn)
- func NoOpStreamHandler(s *Stream)
- func ResetHandler(s *Stream)
- type Conn
- func (c *Conn) AddGroup(g Group)
- func (c *Conn) Close() error
- func (c *Conn) Conn() smux.Conn
- func (c *Conn) GoClose()
- func (c *Conn) Groups() []Group
- func (c *Conn) InGroup(g Group) bool
- func (c *Conn) NetConn() net.Conn
- func (c *Conn) NewStream() (*Stream, error)
- func (c *Conn) Streams() []*Stream
- func (c *Conn) String() string
- func (c *Conn) Swarm() *Swarm
- type ConnHandler
- type Group
- type Groupable
- type Listener
- type Notifiee
- type SelectConn
- type Stream
- func (s *Stream) AddGroup(g Group)
- func (s *Stream) Close() error
- func (s *Stream) Conn() *Conn
- func (s *Stream) Groups() []Group
- func (s *Stream) InGroup(g Group) bool
- func (s *Stream) Protocol() protocol.ID
- func (s *Stream) Read(p []byte) (n int, err error)
- func (s *Stream) Reset() error
- func (s *Stream) SetDeadline(t time.Time) error
- func (s *Stream) SetProtocol(p protocol.ID)
- func (s *Stream) SetReadDeadline(t time.Time) error
- func (s *Stream) SetWriteDeadline(t time.Time) error
- func (s *Stream) Stream() smux.Stream
- func (s *Stream) String() string
- func (s *Stream) Swarm() *Swarm
- func (s *Stream) Write(p []byte) (n int, err error)
- type StreamHandler
- type Swarm
- func (s *Swarm) AddConn(tptConn tpt.Conn, groups ...Group) (*Conn, error)
- func (s *Swarm) AddListener(l tpt.Listener, groups ...Group) (*Listener, error)
- func (s *Swarm) Close() error
- func (s *Swarm) ConnHandler() ConnHandler
- func (s *Swarm) Conns() []*Conn
- func (s *Swarm) ConnsWithGroup(g Group) []*Conn
- func (s *Swarm) Dump() string
- func (s *Swarm) Listeners() []*Listener
- func (s *Swarm) NewStream() (*Stream, error)
- func (s *Swarm) NewStreamSelectConn(selConn SelectConn) (*Stream, error)
- func (s *Swarm) NewStreamWithConn(conn *Conn) (*Stream, error)
- func (s *Swarm) NewStreamWithGroup(group Group) (*Stream, error)
- func (s *Swarm) NewStreamWithNetConn(netConn tpt.Conn) (*Stream, error)
- func (s *Swarm) Notify(n Notifiee)
- func (s *Swarm) SelectConn() SelectConn
- func (s *Swarm) SetConnHandler(ch ConnHandler)
- func (s *Swarm) SetSelectConn(cs SelectConn)
- func (s *Swarm) SetStreamHandler(sh StreamHandler)
- func (s *Swarm) StopNotify(n Notifiee)
- func (s *Swarm) StreamHandler() StreamHandler
- func (s *Swarm) Streams() []*Stream
- func (s *Swarm) StreamsWithGroup(g Group) []*Stream
- func (s *Swarm) String() string
Constants ¶
This section is empty.
Variables ¶
var AcceptConcurrency = 200
AcceptConcurrency is how many connections can simultaneously be in the process of being accepted. Handshakes can sometimes occur as part of this process, so it may take some time. It is important to rate limit, lest a malicious influx of connections cause our node to consume all its resources accepting new connections.
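The rate limiting described above can be sketched with a buffered channel used as a counting semaphore. This is only an illustration of the idea, not the package's own accept loop: acceptLimit and runLimited are hypothetical names, and the "handshake" is simulated.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// acceptLimit stands in for AcceptConcurrency; the value is illustrative.
const acceptLimit = 3

// runLimited runs n simulated handshakes with at most limit in flight at
// once and returns the highest concurrency that was observed.
func runLimited(n, limit int) int32 {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var inFlight, peak int32

	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while limit handshakes are in progress
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the handshake ends

			cur := atomic.AddInt32(&inFlight, 1)
			for { // record the peak without taking a lock
				p := atomic.LoadInt32(&peak)
				if cur <= p || atomic.CompareAndSwapInt32(&peak, p, cur) {
					break
				}
			}
			atomic.AddInt32(&inFlight, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runLimited(20, acceptLimit)
	fmt.Println("peak stayed within limit:", peak <= acceptLimit)
}
```

The slot is acquired before the goroutine starts, so a flood of incoming connections blocks in the accept loop instead of spawning unbounded work.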
var ErrGroupNotFound = errors.New("group not found")
ErrGroupNotFound signals no such group exists
var ErrInvalidConnSelected = errors.New("invalid selected connection")
ErrInvalidConnSelected signals that a connection selected with a SelectConn function is invalid. This may be due to the Conn not being part of the original set given to the function, or the value being nil.
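The two failure cases named above (a nil result, or a Conn outside the original set) could be checked roughly as follows. Conn, connInConns, and checkedSelect here are local stand-ins written for this sketch; the package's internal check may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// Conn is a hypothetical stand-in for peerstream.Conn.
type Conn struct{ id int }

var errInvalidConnSelected = errors.New("invalid selected connection")

// connInConns reports whether c is one of conns, compared by pointer.
func connInConns(c *Conn, conns []*Conn) bool {
	for _, other := range conns {
		if other == c {
			return true
		}
	}
	return false
}

// checkedSelect runs sel and rejects a nil result or a result that was not
// part of the slice handed to sel.
func checkedSelect(sel func([]*Conn) *Conn, conns []*Conn) (*Conn, error) {
	c := sel(conns)
	if c == nil || !connInConns(c, conns) {
		return nil, errInvalidConnSelected
	}
	return c, nil
}

func main() {
	a, b := &Conn{id: 1}, &Conn{id: 2}
	first := func(cs []*Conn) *Conn { return cs[0] }
	outsider := func([]*Conn) *Conn { return &Conn{id: 99} }

	c, err := checkedSelect(first, []*Conn{a, b})
	fmt.Println(c.id, err)
	_, err = checkedSelect(outsider, []*Conn{a, b})
	fmt.Println(err)
}
```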
var ErrNoConnections = errors.New("no connections")
ErrNoConnections signals that no connections are available
var GarbageCollectTimeout = 5 * time.Second
GarbageCollectTimeout governs the periodic connection closer.
var SelectRandomConn = func(conns []*Conn) *Conn {
	if len(conns) == 0 {
		return nil
	}
	return conns[rand.Intn(len(conns))]
}
SelectRandomConn defines a function which takes a slice of Conns and returns a randomly selected one. Note that it is the user's responsibility to call rand.Seed before using it.
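One way to avoid depending on the global generator's seed is a selector that owns its generator. The sketch below uses a local Conn stand-in and a hypothetical newRandomSelector helper; neither is part of this package. Note that a *rand.Rand is not safe for concurrent use, so a real selector shared between goroutines would need a lock.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Conn is a hypothetical stand-in for peerstream.Conn.
type Conn struct{ id int }

// newRandomSelector returns a selector backed by its own seeded generator.
// The returned function must not be called from several goroutines at once.
func newRandomSelector(seed int64) func([]*Conn) *Conn {
	rng := rand.New(rand.NewSource(seed))
	return func(conns []*Conn) *Conn {
		if len(conns) == 0 {
			return nil
		}
		return conns[rng.Intn(len(conns))]
	}
}

func main() {
	sel := newRandomSelector(time.Now().UnixNano())
	conns := []*Conn{{id: 1}, {id: 2}, {id: 3}}
	fmt.Println("picked conn", sel(conns).id)
}
```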
Functions ¶
func ConnInConns ¶
ConnInConns returns true if a connection belongs to the conns slice.
func EchoHandler ¶
func EchoHandler(s *Stream)
EchoHandler launches a StreamHandling go-routine which echoes everything read from the stream back into it. It closes the stream at the end.
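The behaviour described for EchoHandler can be approximated against any io.ReadWriteCloser. The sketch below uses net.Pipe from the standard library in place of a real Stream; echo and roundTrip are illustrative names and not this package's code.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// echo starts a goroutine that copies everything read from s back into s
// and closes s once the copy ends.
func echo(s io.ReadWriteCloser) {
	go func() {
		defer s.Close()
		io.Copy(s, s) // returns when the peer closes or an error occurs
	}()
}

// roundTrip sends msg through an in-memory pipe whose far end is echoed,
// and returns what comes back.
func roundTrip(msg string) (string, error) {
	local, remote := net.Pipe()
	defer local.Close()
	echo(remote)

	if _, err := local.Write([]byte(msg)); err != nil {
		return "", err
	}
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(local, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	got, err := roundTrip("ping")
	fmt.Println(got, err)
}
```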
func NoOpConnHandler ¶
func NoOpConnHandler(c *Conn)
NoOpConnHandler is a connection handler which does nothing.
func NoOpStreamHandler ¶
func NoOpStreamHandler(s *Stream)
NoOpStreamHandler is a StreamHandler which does nothing.
func ResetHandler ¶
func ResetHandler(s *Stream)
ResetHandler is a StreamHandler which simply resets the stream.
Types ¶
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn is a Swarm-associated connection.
func (*Conn) Conn ¶
Conn returns the underlying transport Connection we use. Warning: modifying this object is undefined.
func (*Conn) GoClose ¶
func (c *Conn) GoClose()
GoClose spawns off a goroutine to close the connection iff the connection is not already being closed, and returns immediately.
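A "close in the background, at most once" pattern like the one described can be sketched with an atomic compare-and-swap guard. The conn type and its fields below are hypothetical; how the package tracks the closing state is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// conn is a hypothetical connection with a one-shot background close.
type conn struct {
	closing    int32         // 0 = open, 1 = close already started
	closed     chan struct{} // closed when teardown has finished
	closeCalls int32         // how many times teardown actually ran
}

func newConn() *conn { return &conn{closed: make(chan struct{})} }

// goClose starts teardown in a goroutine if no close has started yet and
// returns immediately. It reports whether this call started the close.
func (c *conn) goClose() bool {
	if !atomic.CompareAndSwapInt32(&c.closing, 0, 1) {
		return false // someone else is already closing
	}
	go func() {
		atomic.AddInt32(&c.closeCalls, 1) // stands in for the real teardown
		close(c.closed)
	}()
	return true
}

func main() {
	c := newConn()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.goClose()
		}()
	}
	wg.Wait()
	<-c.closed
	fmt.Println("teardown runs:", atomic.LoadInt32(&c.closeCalls))
}
```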
type ConnHandler ¶
type ConnHandler func(s *Conn)
ConnHandler is a function which receives a Conn. It allows clients to set a function to receive newly accepted connections. It works like StreamHandler, but is usually less useful, as most services will only use Streams. It is safe to pass or store the *Conn elsewhere. Note: the ConnHandler is called sequentially, so spawn goroutines or pass the Conn elsewhere for any long-running work. See EchoHandler.
type Group ¶
type Group interface{}
Group is an object used to associate a group of Streams, Connections, and Listeners. It can be anything; it is meant to work like a KeyType in maps.
type Groupable ¶
type Groupable interface {
	// Groups returns the groups this object belongs to
	Groups() []Group
	// InGroup returns whether this object belongs to a Group
	InGroup(g Group) bool
	// AddGroup adds this object to a group
	AddGroup(g Group)
}
Groupable is an interface for a set of objects that can be assigned groups: Streams, Connections, and Listeners. Objects inherit groups (e.g. a Stream inherits the groups of its parent Connection, and in turn that of its Listener).
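Group inheritance as described above could be modelled by giving each object its own set plus a link to its parent's set. This is one possible design written for illustration; groupSet and its parent pointer are assumptions, and the package may instead copy groups at creation time. Because Group is used as a map key, values must be comparable (a slice used as a Group would panic at runtime in this sketch).

```go
package main

import "fmt"

// Group mirrors the documented type: any value usable as a map key.
type Group interface{}

// groupSet is a hypothetical group container with an optional parent.
type groupSet struct {
	own    map[Group]struct{}
	parent *groupSet
}

func newGroupSet(parent *groupSet) *groupSet {
	return &groupSet{own: make(map[Group]struct{}), parent: parent}
}

// AddGroup adds g to this object's own groups.
func (s *groupSet) AddGroup(g Group) { s.own[g] = struct{}{} }

// InGroup reports whether g was added here or on any ancestor.
func (s *groupSet) InGroup(g Group) bool {
	for cur := s; cur != nil; cur = cur.parent {
		if _, ok := cur.own[g]; ok {
			return true
		}
	}
	return false
}

func main() {
	listener := newGroupSet(nil)
	listener.AddGroup("tcp")

	conn := newGroupSet(listener)
	conn.AddGroup("trusted")

	stream := newGroupSet(conn)
	fmt.Println(stream.InGroup("tcp"), stream.InGroup("trusted"), listener.InGroup("trusted"))
}
```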
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener wraps a libp2p-transport Listener, a Swarm, and a group set. Listener is returned by Swarm.AddListener(); its purpose is to associate Swarm/Listener pairs with groups.
func ListenersWithGroup ¶
ListenersWithGroup narrows down a set of listeners to those in given group.
func (*Listener) AcceptErrors ¶
AcceptErrors returns a channel for the errors that we **might** get on listener close.
func (*Listener) NetListener ¶
NetListener returns the libp2p-transport Listener wrapped in Listener.
type Notifiee ¶
type Notifiee interface {
	Connected(*Conn)      // called when a connection opened
	Disconnected(*Conn)   // called when a connection closed
	OpenedStream(*Stream) // called when a stream opened
	ClosedStream(*Stream) // called when a stream closed
}
Notifiee is an interface for an object wishing to receive notifications from a Swarm. Notifiees should take care not to register other notifiees inside of a notification. They should also take care to do as little work as possible within their notification, putting any blocking work out into a goroutine.
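The advice to keep notification callbacks short can be followed by queueing events for a worker goroutine. The sketch below covers only the two connection callbacks and uses a local Conn stand-in; asyncNotifiee and its stop method are hypothetical. A send on a full queue still blocks, so the buffer size is a trade-off rather than a guarantee.

```go
package main

import "fmt"

// Conn is a hypothetical stand-in for peerstream.Conn.
type Conn struct{ id int }

// asyncNotifiee queues events so that callbacks return quickly; a single
// worker goroutine does the slow part.
type asyncNotifiee struct {
	events chan string
	done   chan struct{}
	log    []string // written only by the worker, read after done is closed
}

func newAsyncNotifiee(buf int) *asyncNotifiee {
	n := &asyncNotifiee{events: make(chan string, buf), done: make(chan struct{})}
	go func() {
		defer close(n.done)
		for ev := range n.events {
			n.log = append(n.log, ev) // blocking work would go here
		}
	}()
	return n
}

func (n *asyncNotifiee) Connected(c *Conn)    { n.events <- fmt.Sprintf("connected %d", c.id) }
func (n *asyncNotifiee) Disconnected(c *Conn) { n.events <- fmt.Sprintf("disconnected %d", c.id) }

// stop closes the queue, waits for the worker, and returns what it handled.
func (n *asyncNotifiee) stop() []string {
	close(n.events)
	<-n.done
	return n.log
}

func main() {
	n := newAsyncNotifiee(8)
	n.Connected(&Conn{id: 1})
	n.Disconnected(&Conn{id: 1})
	fmt.Println(n.stop())
}
```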
type SelectConn ¶
SelectConn selects a connection out of a list. It allows delegation of decision making to clients. Clients can write SelectConn functions that check connection qualities, such as latency and bandwidth, or pick from a logical set of connections.
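As an illustration of a quality-based selector, the sketch below picks the connection with the lowest latency. The latency field is an assumption made for the example: the real Conn exposes no such field, so a client would have to track measurements itself, for instance in a map keyed by *Conn.

```go
package main

import (
	"fmt"
	"time"
)

// Conn is a hypothetical stand-in carrying a measured latency.
type Conn struct {
	id      int
	latency time.Duration
}

// selectLowestLatency returns the connection with the smallest latency,
// or nil when conns is empty. Ties go to the earliest entry.
func selectLowestLatency(conns []*Conn) *Conn {
	var best *Conn
	for _, c := range conns {
		if best == nil || c.latency < best.latency {
			best = c
		}
	}
	return best
}

func main() {
	conns := []*Conn{
		{id: 1, latency: 80 * time.Millisecond},
		{id: 2, latency: 15 * time.Millisecond},
		{id: 3, latency: 40 * time.Millisecond},
	}
	fmt.Println("picked conn", selectLowestLatency(conns).id)
}
```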
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is an io.{Read,Write,Close}r to a remote counterpart. It wraps a spdystream.Stream, and links it to a Conn and groups
func StreamsWithGroup ¶
StreamsWithGroup narrows down a set of streams to those in given group.
func (*Stream) Close ¶
Close closes the write end of the stream. NOTE: This currently removes the stream from the swarm as well. We shouldn't do this but not doing this will result in bad memory leaks so we're punting for now until we have some way to know when a stream has been closed by the remote side.
func (*Stream) Read ¶
Read reads from the stream and returns the number of bytes read. It implements the io.Reader interface.
func (*Stream) SetDeadline ¶
SetDeadline sets the read and write deadlines associated with the Stream. It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
A deadline is an absolute time after which I/O operations fail with a timeout (see type Error) instead of blocking.
func (*Stream) SetProtocol ¶
SetProtocol sets the protocol identifier for this Stream.
func (*Stream) SetReadDeadline ¶
SetReadDeadline sets the deadline for future Read calls and any currently-blocked Read call. A zero value for t means Read will not time out.
func (*Stream) SetWriteDeadline ¶
SetWriteDeadline sets the deadline for future Write calls and any currently-blocked Write call. Even if write times out, it may return n > 0, indicating that some of the data was successfully written. A zero value for t means Write will not time out.
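The deadline semantics above match those of net.Conn in the standard library, so they can be demonstrated with net.Pipe instead of a real Stream. The helper names below are illustrative, and whether a given stream multiplexer honours deadlines in exactly this way is not confirmed by this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// isTimeout reports whether err is a network timeout error.
func isTimeout(err error) bool {
	var ne net.Error
	return errors.As(err, &ne) && ne.Timeout()
}

// readTimesOut sets a short read deadline on one end of an in-memory pipe
// and reads while the peer stays silent. It reports whether the read
// failed with a timeout instead of blocking forever.
func readTimesOut() bool {
	local, remote := net.Pipe()
	defer local.Close()
	defer remote.Close()

	// A deadline is an absolute time, so add the timeout to time.Now().
	if err := local.SetReadDeadline(time.Now().Add(20 * time.Millisecond)); err != nil {
		return false
	}
	_, err := local.Read(make([]byte, 1))
	return isTimeout(err)
}

func main() {
	fmt.Println("read timed out:", readTimesOut())
}
```

Passing the zero time.Time to SetReadDeadline clears the deadline again, which matches the "zero value for t" wording above.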
type StreamHandler ¶
type StreamHandler func(s *Stream)
StreamHandler is a function which receives a Stream. It allows clients to set a function to receive newly created streams, and decide whether to continue adding them. It works sort of like an http.HandleFunc. Note: the StreamHandler is called sequentially, so spawn goroutines or pass the Stream elsewhere for any long-running work. See EchoHandler.
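Because the handler is called sequentially, a handler that blocks holds up every stream behind it. A handler that only spawns a goroutine returns at once. The sketch below simulates this with local Stream and StreamHandler types and a hypothetical dispatch loop; it does not use the Swarm itself.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Stream and StreamHandler are local stand-ins for the package types.
type Stream struct{ id int }
type StreamHandler func(*Stream)

// dispatch calls h for each stream one after another, the way a sequential
// accept loop would.
func dispatch(streams []*Stream, h StreamHandler) {
	for _, s := range streams {
		h(s)
	}
}

// handleAll serves n streams with a handler that hands each stream to its
// own goroutine, then returns the sorted ids that were served.
func handleAll(n int) []int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen []int
	)
	handler := func(s *Stream) {
		wg.Add(1)
		go func() { // the handler itself returns immediately
			defer wg.Done()
			mu.Lock()
			seen = append(seen, s.id)
			mu.Unlock()
		}()
	}

	streams := make([]*Stream, n)
	for i := range streams {
		streams[i] = &Stream{id: i}
	}
	dispatch(streams, handler)
	wg.Wait()
	sort.Ints(seen)
	return seen
}

func main() {
	fmt.Println(handleAll(5))
}
```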
type Swarm ¶
type Swarm struct {
// contains filtered or unexported fields
}
Swarm represents a group of streams, connections and listeners which are interconnected using a multiplexed transport. Swarms keep track of user-added handlers which define the actions taken upon the arrival of new streams and connections.
func (*Swarm) AddConn ¶
AddConn gives the Swarm ownership of tpt.Conn. The Swarm will negotiate an appropriate multiplexer for the connection and begin listening for Streams. Returns the resulting Swarm-associated peerstream.Conn.
Do not use the tpt.Conn once you've passed it to this method.
func (*Swarm) AddListener ¶
AddListener adds a libp2p-transport Listener to the Swarm, and immediately begins accepting incoming connections.
func (*Swarm) ConnHandler ¶
func (s *Swarm) ConnHandler() ConnHandler
ConnHandler returns the Swarm's current ConnHandler. This is a threadsafe (atomic) operation.
func (*Swarm) ConnsWithGroup ¶
ConnsWithGroup returns all the connections with a given Group
func (*Swarm) NewStream ¶
NewStream opens a new Stream on the best available connection, as selected by current swarm.SelectConn.
func (*Swarm) NewStreamSelectConn ¶
func (s *Swarm) NewStreamSelectConn(selConn SelectConn) (*Stream, error)
NewStreamSelectConn opens a new Stream on a connection selected by selConn.
func (*Swarm) NewStreamWithConn ¶
NewStreamWithConn opens a new Stream on given Conn.
func (*Swarm) NewStreamWithGroup ¶
NewStreamWithGroup opens a new Stream on an available connection in the given group. Uses the current swarm.SelectConn to pick between multiple connections.
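The two steps described (narrow the connections down to a group, then let the selector choose) can be sketched as follows. All types are local stand-ins, and returning errGroupNotFound when no connection is in the group is an assumption made for this sketch; the documentation does not say which error the package returns in that case.

```go
package main

import (
	"errors"
	"fmt"
)

// Group and Conn are local stand-ins for the package types.
type Group interface{}

type Conn struct {
	id     int
	groups map[Group]bool
}

// InGroup reports whether c was tagged with g. A nil map reads as empty.
func (c *Conn) InGroup(g Group) bool { return c.groups[g] }

var errGroupNotFound = errors.New("group not found")

// connsWithGroup keeps only the connections that belong to g.
func connsWithGroup(conns []*Conn, g Group) []*Conn {
	var out []*Conn
	for _, c := range conns {
		if c.InGroup(g) {
			out = append(out, c)
		}
	}
	return out
}

// pickInGroup filters by group and then delegates the choice to sel.
func pickInGroup(conns []*Conn, g Group, sel func([]*Conn) *Conn) (*Conn, error) {
	candidates := connsWithGroup(conns, g)
	if len(candidates) == 0 {
		return nil, errGroupNotFound // assumed error for this sketch
	}
	return sel(candidates), nil
}

func main() {
	relay := &Conn{id: 1, groups: map[Group]bool{"relay": true}}
	plain := &Conn{id: 2}
	first := func(cs []*Conn) *Conn { return cs[0] }

	c, err := pickInGroup([]*Conn{plain, relay}, "relay", first)
	fmt.Println(c.id, err)
}
```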
func (*Swarm) NewStreamWithNetConn ¶
NewStreamWithNetConn opens a new Stream on a given libp2p-transport Conn. Calls s.AddConn(Conn).
func (*Swarm) SelectConn ¶
func (s *Swarm) SelectConn() SelectConn
SelectConn returns the Swarm's current connection selector. SelectConn is used in order to select the best of a set of possible connections. The default chooses one at random. This is a threadsafe (atomic) operation.
func (*Swarm) SetConnHandler ¶
func (s *Swarm) SetConnHandler(ch ConnHandler)
SetConnHandler assigns the conn handler in the swarm. Unlike the StreamHandler, the ConnHandler has less responsibility for the Connection. The Swarm is still its client. This handler is only a notification. This is a threadsafe (atomic) operation.
func (*Swarm) SetSelectConn ¶
func (s *Swarm) SetSelectConn(cs SelectConn)
SetSelectConn assigns the connection selector in the swarm. If cs is nil, the swarm will use SelectRandomConn. This is a threadsafe (atomic) operation.
func (*Swarm) SetStreamHandler ¶
func (s *Swarm) SetStreamHandler(sh StreamHandler)
SetStreamHandler assigns the stream handler in the swarm. The handler assumes responsibility for closing the stream. This need not happen at the end of the handler; leaving the stream open (to be used and closed later) is fine. It is also fine to keep a pointer to the Stream. This is a threadsafe (atomic) operation.
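A getter/setter pair that is safe to call from several goroutines can be built on sync/atomic's Value type. This is only one way to obtain the "threadsafe (atomic)" property; the documentation does not say whether the Swarm uses atomic.Value, a mutex, or something else, and handlerBox is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Stream and StreamHandler are local stand-ins for the package types.
type Stream struct{}
type StreamHandler func(*Stream)

// handlerBox holds a StreamHandler that can be swapped while in use.
type handlerBox struct{ v atomic.Value }

// Set replaces the current handler. Every stored value has the concrete
// type StreamHandler, which atomic.Value requires to stay consistent.
func (b *handlerBox) Set(h StreamHandler) { b.v.Store(h) }

// Get returns the current handler, or nil if none has been set.
func (b *handlerBox) Get() StreamHandler {
	h, _ := b.v.Load().(StreamHandler)
	return h
}

func main() {
	var box handlerBox
	fmt.Println("handler set:", box.Get() != nil)

	box.Set(func(*Stream) { fmt.Println("stream handled") })
	box.Get()(&Stream{})
}
```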
func (*Swarm) StopNotify ¶
StopNotify unregisters a Notifiee from receiving signals.
func (*Swarm) StreamHandler ¶
func (s *Swarm) StreamHandler() StreamHandler
StreamHandler returns the Swarm's current StreamHandler. This is a threadsafe (atomic) operation.
func (*Swarm) StreamsWithGroup ¶
StreamsWithGroup returns all the streams with a given Group