mux

v0.2.0
Published: Mar 9, 2024 License: MIT Imports: 11 Imported by: 0

README

simple-mux

A simple connection multiplexing library for Go.

  • Focus on performance.
  • Bidirectional streams. The server side can also open streams to the client.
  • Stream level flow control. Built-in rx buffer.
  • Session level health check and keepalive.

Example

package main

import (
	"fmt"
	"io"
	"net"

	smux "github.com/IrineSistiana/simple-mux"
)

func main() {
	panicIfErr := func(err error) {
		if err != nil {
			panic(err.Error())
		}
	}

	clientConn, serverConn := net.Pipe()
	clientSession := smux.NewSession(clientConn, smux.Opts{})
	serverSession := smux.NewSession(serverConn, smux.Opts{AllowAccept: true})

	go func() {
		stream, err := clientSession.OpenStream()
		panicIfErr(err)
		defer stream.Close()

		_, err = stream.Write([]byte("hello world"))
		panicIfErr(err)
	}()

	clientStream, err := serverSession.Accept()
	panicIfErr(err)

	b, err := io.ReadAll(clientStream)
	panicIfErr(err)
	
	fmt.Printf("received msg from client: %s\n", b)
}

Benchmark

Transfer data concurrently through multiple streams over one TCP loopback connection.

Benchmark_Mux/1_streams-8                 335876             16582 ns/op              1885 Mb/s        0 B/op          0 allocs/op
Benchmark_Mux/8_streams-8                 250275             22085 ns/op              1415 Mb/s        7 B/op          0 allocs/op
Benchmark_Mux/64_streams-8                276235             21512 ns/op              1453 Mb/s        2 B/op          0 allocs/op
Benchmark_Mux/512_streams-8               237309             21215 ns/op              1473 Mb/s        2 B/op          0 allocs/op
Benchmark_Mux/2048_streams-8              266313             21881 ns/op              1428 Mb/s       14 B/op          0 allocs/op
Benchmark_Mux/8196_streams-8              252049             22783 ns/op              1372 Mb/s       39 B/op          0 allocs/op

Baseline: transfer data directly through one TCP loopback connection.

Benchmark_TCP-8                           395679             14765 ns/op              2116 Mb/s        0 B/op          0 allocs/op
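
To put the figures above in proportion, the short sketch below divides a mux throughput by the TCP baseline. It uses only numbers copied from the tables; the function name relativeThroughput is made up for this illustration. For the rows shown it works out to roughly 89% of the baseline with 1 stream and roughly 65% with 8196 streams.

```go
package main

import "fmt"

// relativeThroughput returns the mux throughput as a fraction of the
// baseline throughput. Both values must use the same unit.
func relativeThroughput(mux, baseline float64) float64 {
	return mux / baseline
}

func main() {
	// Throughput of Benchmark_TCP, copied from the baseline table.
	const baseline = 2116.0

	// Two rows copied from the Benchmark_Mux table.
	results := []struct {
		name string
		mbps float64
	}{
		{"1 stream", 1885},
		{"8196 streams", 1372},
	}

	for _, r := range results {
		pct := 100 * relativeThroughput(r.mbps, baseline)
		fmt.Printf("%-13s %.1f%% of baseline\n", r.name, pct)
	}
}
```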

Documentation

Constants

const (
	MaxStreamNum       = 1<<31 - 1
	InitialStreamQuota = 100
)

Variables

var (
	ErrSessionClosed     = errors.New("session closed")
	ErrSessionEoL        = errors.New("session end of life")
	ErrStreamQuotaLimit  = errors.New("stream quota limit")
	ErrTooManyStreams    = errors.New("too many streams")
	ErrPayloadOverflowed = errors.New("payload size is to large")
	ErrKeepaliveTimedOut = errors.New("keepalive ping timed out")
	ErrIdleTimedOut      = errors.New("idle timed out")
	ErrAcceptNotAllowed  = errors.New("accept is not allowed")
)
var (
	ErrClosedStream = errors.New("closed stream")
)

Functions

This section is empty.

Types

type Opts

type Opts struct {
	// AllowAccept indicates this Session can accept streams
	// from peer. If AllowAccept is false and peer sends a SYN
	// frame, local will send a FIN frame.
	// This should typically be true on the server and false on the client.
	AllowAccept bool

	// StreamReceiveWindow sets the default size of the receive window when
	// a stream is opened or accepted.
	// Minimum rx window size is 64k and maximum is (1<<32 - 1).
	// If StreamReceiveWindow is out of range, the closest limit is
	// used, which means a zero value is treated as 64k.
	StreamReceiveWindow int32

	// Write buffer size. Default is about 64k.
	WriteBufferSize int

	// Read buffer size. Default is 64k.
	ReadBufferSize int

	// KeepaliveInterval is how long this Session waits without receiving
	// data before it sends a ping request to the peer. A zero value means no ping is sent.
	KeepaliveInterval time.Duration

	// KeepaliveTimeout is how long this Session waits after sending a
	// keepalive ping before it is closed with ErrKeepaliveTimedOut. Receiving
	// any data (not just a pong) within that time keeps the Session open.
	// Default is 10s.
	KeepaliveTimeout time.Duration

	// IdleTimeout is how long this Session may have no live streams
	// before it is closed with ErrIdleTimedOut.
	// A zero value means no idle timeout.
	IdleTimeout time.Duration

	// WriteTimeout is the timeout for a write op. If a write op has started and
	// no data has been written after WriteTimeout, the connection is closed.
	// This requires the connection to implement the [SetWriteDeadline(t time.Time) error] method.
	// See [net.Conn] for more details.
	WriteTimeout time.Duration

	// The number of concurrent streams that peer can open at a time.
	// Minimum is InitialStreamQuota.
	MaxConcurrentStreams int32

	// OnClose will be called once when the session is closed.
	OnClose func(session *Session, err error)
	// contains filtered or unexported fields
}
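
The rule in the StreamReceiveWindow comment ("the closest limit will be used") can be pictured with a small stand-alone sketch. The names minRxWindow and clampRxWindow are hypothetical and are not part of this package. The sketch only enforces the documented 64k minimum, on the assumption that an int32 field cannot exceed its own maximum anyway.

```go
package main

import "fmt"

// minRxWindow mirrors the documented 64k minimum receive window.
// Hypothetical constant, not exported by the package.
const minRxWindow int32 = 64 * 1024

// clampRxWindow models the documented behavior: a value below the
// minimum (including zero and negative values) is raised to the
// minimum; any other value is kept as requested.
func clampRxWindow(n int32) int32 {
	if n < minRxWindow {
		return minRxWindow
	}
	return n
}

func main() {
	for _, n := range []int32{0, 1024, 1 << 20} {
		fmt.Printf("requested %d -> effective %d\n", n, clampRxWindow(n))
	}
}
```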

type Session

type Session struct {
	// contains filtered or unexported fields
}

func NewSession

func NewSession(c io.ReadWriteCloser, opts Opts) *Session

func (*Session) Accept

func (s *Session) Accept() (*Stream, error)

Accept accepts a Stream from the peer. The Session must be created with Opts.AllowAccept; otherwise Accept returns ErrAcceptNotAllowed. Incoming streams must be accepted promptly. Otherwise, all read operations of this Session (across all its streams) will block.
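
Because an unaccepted stream stalls reads for the whole Session, a common pattern is to keep the accept loop tight and hand each stream to its own goroutine. The sketch below shows that pattern using only the standard library. The names acceptLoop, fakeStream, errNoMoreStreams, and runDemo are hypothetical, and fakeStream merely stands in for a real stream so the example runs without a network connection.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
)

// errNoMoreStreams is a stand-in for whatever error ends a real accept loop.
var errNoMoreStreams = errors.New("no more streams")

// fakeStream is a stand-in for a stream type with a Close method.
type fakeStream struct{ id int }

func (fakeStream) Close() error { return nil }

// acceptLoop calls accept repeatedly and runs handle for each stream in
// its own goroutine, so slow handlers never delay the next accept call.
// It returns the first error from accept after all handlers finish.
func acceptLoop[S io.Closer](accept func() (S, error), handle func(S)) error {
	var wg sync.WaitGroup
	defer wg.Wait()
	for {
		s, err := accept()
		if err != nil {
			return err
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer s.Close()
			handle(s)
		}()
	}
}

// runDemo feeds three fake streams through acceptLoop and reports how
// many were handled and which error ended the loop.
func runDemo() (int64, error) {
	next := 0
	accept := func() (fakeStream, error) {
		if next == 3 {
			return fakeStream{}, errNoMoreStreams
		}
		next++
		return fakeStream{id: next}, nil
	}
	var handled atomic.Int64
	err := acceptLoop(accept, func(fakeStream) { handled.Add(1) })
	return handled.Load(), err
}

func main() {
	n, err := runDemo()
	fmt.Println("handled:", n, "loop ended with:", err)
}
```

Given the signature documented above, the method value of a Session's Accept should be usable as the accept argument, with a handler that takes a *Stream; that pairing is untested here.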

func (*Session) Close

func (s *Session) Close() error

Close closes Session and all its Streams. It always returns nil.

func (*Session) CloseWithErr added in v0.2.0

func (s *Session) CloseWithErr(err error)

func (*Session) Context added in v0.2.0

func (s *Session) Context() context.Context

Context returns a context.Context that is canceled, with the closing error as its cause, when the Session is dead.
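
A minimal sketch of reading such a cause, assuming the Session uses the standard library's cancel-cause mechanism (context.WithCancelCause), which the wording suggests but this page does not confirm. The context here is built locally as a stand-in for the one a Session would return, and errKeepaliveStandIn, waitForCause, and demoCause are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errKeepaliveStandIn stands in for a session-level error.
var errKeepaliveStandIn = errors.New("keepalive ping timed out")

// waitForCause blocks until ctx is canceled and returns the cause.
// ctx.Err() would only report context.Canceled; context.Cause
// returns the error that was passed to the cancel function.
func waitForCause(ctx context.Context) error {
	<-ctx.Done()
	return context.Cause(ctx)
}

// demoCause cancels a local context with a cause and reads it back.
func demoCause() error {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errKeepaliveStandIn)
	return waitForCause(ctx)
}

func main() {
	err := demoCause()
	fmt.Println("cause matches:", errors.Is(err, errKeepaliveStandIn))
}
```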

func (*Session) OpenStream

func (s *Session) OpenStream() (*Stream, error)

OpenStream opens a stream. It returns ErrSessionClosed if the Session was closed, an error if the Session has opened too many streams (see MaxStreamNum), or any error that the underlying connection returns while sending the SYN frame.

func (*Session) Ping added in v0.2.0

func (s *Session) Ping(ctx context.Context) error

func (*Session) ReserveStreamN added in v0.2.0

func (s *Session) ReserveStreamN(n int32) (reserved int32)

ReserveStreamN tries to reserve n streams for future use. If the session reaches the stream id limit, the returned reserved count will be smaller than n.

func (*Session) Status added in v0.2.0

func (s *Session) Status() SessionStatus

func (*Session) SubConn

func (s *Session) SubConn() io.ReadWriteCloser

SubConn returns the io.ReadWriteCloser that this Session was created with. It is for accessing info only. Do not read from, write to, or close this connection directly.

type SessionStatus added in v0.2.0

type SessionStatus struct {
	Closed           bool
	OpenedSid        int32
	Reserved         int32
	LocalStreamQuota int32
	PeerStreamQuota  int32
	ActiveStreams    int
}

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func (*Stream) Close

func (s *Stream) Close() error

Close implements io.Closer. Close interrupts Read and Write.

func (*Stream) CloseWithErr added in v0.2.0

func (s *Stream) CloseWithErr(err error)

func (*Stream) Context added in v0.2.0

func (s *Stream) Context() context.Context

Context returns a context.Context that is canceled, with the closing error as its cause, once the stream is closed.

func (*Stream) ID

func (s *Stream) ID() int32

ID returns the stream's id. Negative id means the stream is opened by peer.
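
The sign convention can be checked with a one-line helper. This is a sketch only: openedByPeer is a hypothetical name, and treating every non-negative id as locally opened is an assumption, since only the negative case is documented.

```go
package main

import "fmt"

// openedByPeer reports whether a stream id follows the documented
// convention for peer-opened streams (a negative id).
func openedByPeer(id int32) bool {
	return id < 0
}

func main() {
	for _, id := range []int32{1, -1, 42, -42} {
		origin := "local"
		if openedByPeer(id) {
			origin = "peer"
		}
		fmt.Printf("stream %d opened by %s\n", id, origin)
	}
}
```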

func (*Stream) Read

func (s *Stream) Read(p []byte) (n int, err error)

Read implements io.Reader.

func (*Stream) Session

func (s *Stream) Session() *Session

Session returns the Session that this Stream belongs to.

func (*Stream) SetRxWindowSize

func (s *Stream) SetRxWindowSize(n int32) error

SetRxWindowSize sets the stream's rx window size. If n is invalid, the default/minimum limit will be used. It returns an error if it cannot send a window update frame.

func (*Stream) Write

func (s *Stream) Write(p []byte) (n int, err error)

Write implements io.Writer.

func (*Stream) WriteTo

func (s *Stream) WriteTo(w io.Writer) (int64, error)

Directories

Path Synopsis
internal
