multiplex

v0.7.0
Warning

This package is not in the latest version of its module.

Published: Apr 18, 2022 License: MIT Imports: 15 Imported by: 16

README

go-mplex

A super simple stream muxing library implementing mplex.

Usage

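// mysocket is a net.Conn and memoryManager a MemoryManager, both set up elsewhere.
// The second argument marks this side as the initiator.
// Errors are ignored here for brevity.
mplex, _ := multiplex.NewMultiplex(mysocket, true, memoryManager)

s, _ := mplex.NewStream(context.Background())
s.Write([]byte("Hello World!"))
s.Close()

as, _ := mplex.Accept()
// echo back everything received
io.Copy(as, as)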

The last gx published version of this module was: 0.2.35: QmWGQQ6Tz8AdUpxktLf3zgnVN9Vy8fcWVezZJSU3ZmiANj

Documentation

Index

Constants

const (
	MaxMessageSize = 1 << 20
	BufferSize     = 4096
	MaxBuffers     = 4

	MinMemoryReservation = 3 * BufferSize
)

Variables

var (
	ErrStreamReset  = errors.New("stream reset")
	ErrStreamClosed = errors.New("closed stream")
)
var (
	ChunkSize = BufferSize - 20
)
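
The size values above are related by simple arithmetic. The sketch below copies them from this page and works out the derived numbers. The helper chunksFor is hypothetical, and the idea that a large write is carried in pieces of at most ChunkSize bytes is an assumption of this sketch, not something this page states.

```go
package main

import "fmt"

// Values copied from the constants and variables listed on this page.
const (
	BufferSize           = 4096
	MinMemoryReservation = 3 * BufferSize
)

var ChunkSize = BufferSize - 20

// chunksFor is a hypothetical helper: it returns how many pieces of at most
// chunkSize bytes are needed to carry n bytes (ceiling division).
func chunksFor(n, chunkSize int) int {
	if n <= 0 {
		return 0
	}
	return (n + chunkSize - 1) / chunkSize
}

func main() {
	fmt.Println(ChunkSize)                   // 4076
	fmt.Println(MinMemoryReservation)        // 12288
	fmt.Println(chunksFor(10000, ChunkSize)) // 3
}
```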
var ErrInvalidState = errors.New("received an unexpected message from the peer")

ErrInvalidState is returned when the other side does something it shouldn't. In this case, we close the connection to be safe.

var ErrShutdown = errors.New("session shut down")

ErrShutdown is returned when operating on a session that has been shut down.

var ErrTwoInitiators = errors.New("two initiators")

ErrTwoInitiators is returned when both sides think they're the initiator.

var ReceiveTimeout = 5 * time.Second

ReceiveTimeout is the maximum time to block waiting for a slow reader to read from a stream before resetting it. Preferably, there would be some form of back-pressure mechanism, but the mplex protocol does not provide one.

var ResetStreamTimeout = 2 * time.Minute

Functions

This section is empty.

Types

type MemoryManager added in v0.4.0

type MemoryManager interface {
	// ReserveMemory reserves memory / buffer.
	ReserveMemory(size int, prio uint8) error
	// ReleaseMemory explicitly releases memory previously reserved with ReserveMemory
	ReleaseMemory(size int)
}

The MemoryManager allows management of memory allocations.
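
One possible implementation of this interface is a manager with a fixed byte budget. The sketch below is an assumption for illustration only: budgetManager and errOverBudget are hypothetical names, the interface is copied locally so the example compiles on its own, and the prio argument is ignored because this page does not describe its semantics.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// MemoryManager mirrors the interface documented on this page.
type MemoryManager interface {
	ReserveMemory(size int, prio uint8) error
	ReleaseMemory(size int)
}

// errOverBudget is a hypothetical error for this sketch.
var errOverBudget = errors.New("memory budget exceeded")

// budgetManager is a hypothetical MemoryManager with a fixed byte budget.
type budgetManager struct {
	mu    sync.Mutex
	limit int
	used  int
}

// ReserveMemory grants the reservation only if it fits in the remaining budget.
// The prio argument is ignored in this sketch.
func (b *budgetManager) ReserveMemory(size int, prio uint8) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if size < 0 || b.used+size > b.limit {
		return errOverBudget
	}
	b.used += size
	return nil
}

// ReleaseMemory returns size bytes to the budget, never going below zero.
func (b *budgetManager) ReleaseMemory(size int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.used -= size
	if b.used < 0 {
		b.used = 0
	}
}

func main() {
	var mm MemoryManager = &budgetManager{limit: 3 * 4096}
	fmt.Println(mm.ReserveMemory(3*4096, 255)) // fits exactly
	fmt.Println(mm.ReserveMemory(1, 255))      // over budget
	mm.ReleaseMemory(4096)
	fmt.Println(mm.ReserveMemory(4096, 255)) // fits again after the release
}
```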

type Multiplex

type Multiplex struct {
	// contains filtered or unexported fields
}

Multiplex is an mplex session.

func NewMultiplex

func NewMultiplex(con net.Conn, initiator bool, memoryManager MemoryManager) (*Multiplex, error)

NewMultiplex creates a new multiplexer session.

func (*Multiplex) Accept

func (m *Multiplex) Accept() (*Stream, error)

Accept accepts the next stream from the connection.

func (*Multiplex) Close

func (mp *Multiplex) Close() error

Close closes the session.

func (*Multiplex) CloseChan added in v0.4.0

func (mp *Multiplex) CloseChan() <-chan struct{}

CloseChan returns a read-only channel which will be closed when the session is closed.

func (*Multiplex) IsClosed

func (mp *Multiplex) IsClosed() bool

IsClosed returns true if the session is closed.

func (*Multiplex) NewNamedStream

func (mp *Multiplex) NewNamedStream(ctx context.Context, name string) (*Stream, error)

NewNamedStream creates a new named stream.

func (*Multiplex) NewStream

func (mp *Multiplex) NewStream(ctx context.Context) (*Stream, error)

NewStream creates a new stream.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func (*Stream) Close

func (s *Stream) Close() error

func (*Stream) CloseRead added in v0.2.0

func (s *Stream) CloseRead() error

func (*Stream) CloseWrite added in v0.2.0

func (s *Stream) CloseWrite() error

func (*Stream) Name

func (s *Stream) Name() string

func (*Stream) Read

func (s *Stream) Read(b []byte) (int, error)

func (*Stream) Reset

func (s *Stream) Reset() error

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

func (*Stream) Write

func (s *Stream) Write(b []byte) (int, error)

Directories

Path Synopsis
interop
go
