h2mux

package
v0.0.0-...-455eedf

This package is not in the latest version of its module.

Published: Nov 28, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Constants

const (
	FrameSetCompressionContext http2.FrameType = 0xf0
	FrameUseDictionary         http2.FrameType = 0xf1
	FrameSetDictionary         http2.FrameType = 0xf2
)
const (
	FlagSetDictionaryAppend http2.Flags = 0x1
	FlagSetDictionaryOffset http2.Flags = 0x2
)
const (
	SettingMuxerMagic http2.SettingID = 0x42db
	MuxerMagicOrigin  uint32          = 0xa2e43c8b
	MuxerMagicEdge    uint32          = 0x1088ebf9
)
const SettingCompression http2.SettingID = 0xff20

Assign temporary values

Variables

var (
	// HTTP2 error codes: https://http2.github.io/http2-spec/#ErrorCodes
	ErrHandshakeTimeout               = MuxerHandshakeError{"1000 handshake timeout"}
	ErrBadHandshakeNotSettings        = MuxerHandshakeError{"1001 unexpected response"}
	ErrBadHandshakeUnexpectedAck      = MuxerHandshakeError{"1002 unexpected response"}
	ErrBadHandshakeNoMagic            = MuxerHandshakeError{"1003 unexpected response"}
	ErrBadHandshakeWrongMagic         = MuxerHandshakeError{"1004 connected to endpoint of wrong type"}
	ErrBadHandshakeNotSettingsAck     = MuxerHandshakeError{"1005 unexpected response"}
	ErrBadHandshakeUnexpectedSettings = MuxerHandshakeError{"1006 unexpected response"}

	ErrUnexpectedFrameType = MuxerProtocolError{"2001 unexpected frame type", http2.ErrCodeProtocol}
	ErrUnknownStream       = MuxerProtocolError{"2002 unknown stream", http2.ErrCodeProtocol}
	ErrInvalidStream       = MuxerProtocolError{"2003 invalid stream", http2.ErrCodeProtocol}
	ErrNotRPCStream        = MuxerProtocolError{"2004 not RPC stream", http2.ErrCodeProtocol}

	ErrStreamHeadersSent               = MuxerApplicationError{"3000 headers already sent"}
	ErrStreamRequestConnectionClosed   = MuxerApplicationError{"3001 connection closed while opening stream"}
	ErrConnectionDropped               = MuxerApplicationError{"3002 connection dropped"}
	ErrStreamRequestTimeout            = MuxerApplicationError{"3003 open stream timeout"}
	ErrResponseHeadersTimeout          = MuxerApplicationError{"3004 timeout waiting for initial response headers"}
	ErrResponseHeadersConnectionClosed = MuxerApplicationError{"3005 connection closed while waiting for initial response headers"}

	ErrClosedStream = MuxerStreamError{"4000 stream closed", http2.ErrCodeStreamClosed}
)
var (
	ActiveStreams = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "cloudflared",
		Subsystem: "tunnel",
		Name:      "active_streams",
		Help:      "Number of active streams created by all muxers.",
	})
)

Functions

func CompressionIsSupported

func CompressionIsSupported() bool

func IsRPCStreamResponse

func IsRPCStreamResponse(stream *MuxedStream) bool

Types

type AtomicCounter

type AtomicCounter struct {
	// contains filtered or unexported fields
}

func NewAtomicCounter

func NewAtomicCounter(initCount uint64) *AtomicCounter

func (*AtomicCounter) Count

func (c *AtomicCounter) Count() uint64

Count returns the current value of the counter and resets it to 0.

func (*AtomicCounter) IncrementBy

func (c *AtomicCounter) IncrementBy(number uint64)

func (*AtomicCounter) Value

func (c *AtomicCounter) Value() uint64

Value returns the current value of the counter.
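
The difference between Count (read and reset) and Value (read only) can be illustrated with a small stand-alone sketch. The `counter` type below is hypothetical and built on `sync/atomic`; the real AtomicCounter hides its fields, so this layout is an assumption.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counter is a hypothetical stand-in for AtomicCounter.
type counter struct {
	n uint64
}

func newCounter(initCount uint64) *counter { return &counter{n: initCount} }

// IncrementBy atomically adds number to the counter.
func (c *counter) IncrementBy(number uint64) { atomic.AddUint64(&c.n, number) }

// Count returns the current value and resets the counter to 0 in one atomic step.
func (c *counter) Count() uint64 { return atomic.SwapUint64(&c.n, 0) }

// Value returns the current value without resetting it.
func (c *counter) Value() uint64 { return atomic.LoadUint64(&c.n) }

func main() {
	c := newCounter(2)
	c.IncrementBy(3)
	fmt.Println(c.Value()) // 5
	fmt.Println(c.Count()) // 5
	fmt.Println(c.Value()) // 0
}
```

Using a swap rather than a load followed by a store means increments that race with Count are never lost; they land either in the returned total or in the next one.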

type BooleanFuse

type BooleanFuse struct {
	// contains filtered or unexported fields
}

BooleanFuse is a data structure that can be set once to a particular value using Fuse(value). Subsequent calls to Fuse() will have no effect.

func NewBooleanFuse

func NewBooleanFuse() *BooleanFuse

func (*BooleanFuse) Await

func (f *BooleanFuse) Await() bool

Await blocks until Fuse has been called at least once.

func (*BooleanFuse) Fuse

func (f *BooleanFuse) Fuse(result bool)

func (*BooleanFuse) Value

func (f *BooleanFuse) Value() bool

Value returns the fuse's current value.
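
A set-once value with a blocking Await can be sketched with a mutex and a condition variable. The `fuse` type below is hypothetical, and it assumes that Await returns the fused value; the documentation above does not state what Await's result means.

```go
package main

import (
	"fmt"
	"sync"
)

// fuse is a hypothetical set-once boolean modelled on the BooleanFuse description.
type fuse struct {
	mu    sync.Mutex
	cond  *sync.Cond
	set   bool
	value bool
}

func newFuse() *fuse {
	f := &fuse{}
	f.cond = sync.NewCond(&f.mu)
	return f
}

// Fuse stores result on the first call only; later calls are ignored.
func (f *fuse) Fuse(result bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.set {
		return
	}
	f.set, f.value = true, result
	f.cond.Broadcast() // wake every goroutine blocked in Await
}

// Await blocks until Fuse has been called, then returns the stored value
// (an assumption of this sketch).
func (f *fuse) Await() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	for !f.set {
		f.cond.Wait()
	}
	return f.value
}

func main() {
	f := newFuse()
	go f.Fuse(true)
	fmt.Println(f.Await()) // true
	f.Fuse(false)          // no effect: the fuse is already set
	fmt.Println(f.Await()) // true
}
```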

type CompressionPreset

type CompressionPreset struct {
	// contains filtered or unexported fields
}

type CompressionSetting

type CompressionSetting uint
const (
	CompressionNone CompressionSetting = iota
	CompressionLow
	CompressionMedium
	CompressionMax
)

type Header

type Header struct {
	Name, Value string
}

func RPCHeaders

func RPCHeaders() []Header

type IdleTimer

type IdleTimer struct {
	// The channel on which ticks are delivered.
	C <-chan time.Time
	// contains filtered or unexported fields
}

IdleTimer is a type of Timer designed for managing heartbeats on an idle connection. The timer ticks on an interval with added jitter to avoid accidental synchronisation between two endpoints. It tracks the number of retries/ticks since the connection was last marked active.

The methods of IdleTimer must not be called while a goroutine is reading from C.

func NewIdleTimer

func NewIdleTimer(idleDuration time.Duration, maxRetries uint64) *IdleTimer

func (*IdleTimer) MarkActive

func (t *IdleTimer) MarkActive()

MarkActive resets the idle connection timer and suppresses any outstanding idle events.

func (*IdleTimer) ResetTimer

func (t *IdleTimer) ResetTimer()

ResetTimer resets the idle timer according to the configured duration, with some added jitter.

func (*IdleTimer) Retry

func (t *IdleTimer) Retry() bool

Retry should be called when retrying the idle timeout. It returns false if the maximum number of retries has been reached. After calling this function and sending a heartbeat, call ResetTimer. Since sending the heartbeat could be a blocking operation, the timer is reset after the write completes so that it cannot expire during the write.
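
The retry, heartbeat, then reset sequence described above can be sketched without the real timer. Everything in this block is hypothetical: the `idleState` type, its method names, and in particular the 25% jitter bound, which is chosen only for illustration and is not taken from the package.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// idleState is a hypothetical model of the bookkeeping behind an idle timer.
type idleState struct {
	idle       time.Duration
	maxRetries uint64
	retries    uint64
}

// nextInterval returns the idle duration stretched by up to 25%, where frac
// is a random number in [0, 1). The 25% bound is an assumption.
func (s *idleState) nextInterval(frac float64) time.Duration {
	return s.idle + time.Duration(frac*float64(s.idle)/4)
}

// retry records one more idle tick and reports whether another heartbeat
// may be sent.
func (s *idleState) retry() bool {
	if s.retries >= s.maxRetries {
		return false
	}
	s.retries++
	return true
}

// markActive clears the retry count because traffic was seen.
func (s *idleState) markActive() { s.retries = 0 }

func main() {
	s := &idleState{idle: 100 * time.Millisecond, maxRetries: 2}
	for s.retry() {
		// A real loop would write the heartbeat here, and only then
		// re-arm the timer with the jittered interval.
		fmt.Println("heartbeat sent, next tick in", s.nextInterval(rand.Float64()))
	}
	fmt.Println("giving up after", s.retries, "retries")
}
```

Computing the next interval only after the heartbeat write mirrors the ordering the Retry documentation asks for.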

func (*IdleTimer) RetryCount

func (t *IdleTimer) RetryCount() uint64

type MuxReader

type MuxReader struct {
	// contains filtered or unexported fields
}

func (*MuxReader) Shutdown

func (r *MuxReader) Shutdown() <-chan struct{}

Shutdown blocks new streams from being created. It returns a channel that is closed once the last stream has closed.

type MuxWriter

type MuxWriter struct {
	// contains filtered or unexported fields
}

type MuxedStream

type MuxedStream struct {

	// The headers that were most recently received.
	// Particularly:
	//     * for an eyeball-initiated stream (as passed to TunnelHandler::ServeStream),
	//       these are the request headers
	//     * for a cloudflared-initiated stream (as created by Register/UnregisterTunnel),
	//       these are the response headers.
	// They are useful in both of these contexts; hence `Headers` is public.
	Headers []Header
	// contains filtered or unexported fields
}

MuxedStream is logically an HTTP/2 stream, with an additional buffer for outgoing data.

func NewStream

func NewStream(config MuxerConfig, writeHeaders []Header, readyList MuxedStreamDataSignaller, dictionaries h2Dictionaries) *MuxedStream

func (*MuxedStream) Close

func (s *MuxedStream) Close() error

func (*MuxedStream) CloseRead

func (s *MuxedStream) CloseRead() error

func (*MuxedStream) CloseWrite

func (s *MuxedStream) CloseWrite() error

func (*MuxedStream) IsRPCStream

func (s *MuxedStream) IsRPCStream() bool

IsRPCStream reports whether the stream is used to transport RPC.

func (*MuxedStream) Read

func (s *MuxedStream) Read(p []byte) (n int, err error)

func (*MuxedStream) Write

func (s *MuxedStream) Write(p []byte) (int, error)

Write blocks until len(p) bytes have been written to the buffer.

func (*MuxedStream) WriteClosed

func (s *MuxedStream) WriteClosed() bool

func (*MuxedStream) WriteHeaders

func (s *MuxedStream) WriteHeaders(headers []Header) error

type MuxedStreamDataSignaller

type MuxedStreamDataSignaller interface {
	// Non-blocking: call this when data is ready to be sent for the given stream ID.
	Signal(ID uint32)
}

MuxedStreamDataSignaller is a write-only *ReadyList

type MuxedStreamFunc

type MuxedStreamFunc func(stream *MuxedStream) error

func (MuxedStreamFunc) ServeStream

func (f MuxedStreamFunc) ServeStream(stream *MuxedStream) error

type MuxedStreamHandler

type MuxedStreamHandler interface {
	ServeStream(*MuxedStream) error
}
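
MuxedStreamFunc and MuxedStreamHandler have the same shape as `http.HandlerFunc` and `http.Handler`: a function type gains a method so that a plain function satisfies the interface. The sketch below reproduces that pattern with hypothetical local types (`stream`, `handler`, `handlerFunc`, `serve`); it assumes, but the documentation does not state, that ServeStream simply calls the function.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for *MuxedStream.
type stream struct{ id uint32 }

// handler mirrors the shape of MuxedStreamHandler.
type handler interface {
	ServeStream(*stream) error
}

// handlerFunc mirrors MuxedStreamFunc: a function type with a method that
// calls the function itself.
type handlerFunc func(*stream) error

func (f handlerFunc) ServeStream(s *stream) error { return f(s) }

// serve accepts any handler, including a converted function.
func serve(h handler, s *stream) error { return h.ServeStream(s) }

func main() {
	h := handlerFunc(func(s *stream) error {
		fmt.Println("serving stream", s.id)
		return nil
	})
	if err := serve(h, &stream{id: 1}); err != nil {
		fmt.Println("error:", err)
	}
}
```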

type MuxedStreamReader

type MuxedStreamReader struct {
	*MuxedStream
}

MuxedStreamReader implements io.ReadCloser for the read end of the stream. This is useful when passing the stream to functions that close the object once they are done reading (e.g. http.Client), while you still want to be able to write data afterwards.

func (MuxedStreamReader) Close

func (s MuxedStreamReader) Close() error

func (MuxedStreamReader) Read

func (s MuxedStreamReader) Read(p []byte) (n int, err error)
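
The wrapper-by-embedding idea behind MuxedStreamReader can be sketched as follows. The `duplex` and `readHalf` types are hypothetical, and mapping the wrapper's Close to CloseRead is an assumption based on the description above rather than on the package source.

```go
package main

import "fmt"

// duplex is a hypothetical two-way stream that records which halves are closed.
type duplex struct {
	readClosed, writeClosed bool
}

func (d *duplex) CloseRead() error  { d.readClosed = true; return nil }
func (d *duplex) CloseWrite() error { d.writeClosed = true; return nil }

// Close shuts both halves of the stream.
func (d *duplex) Close() error {
	d.readClosed, d.writeClosed = true, true
	return nil
}

// readHalf embeds the stream and overrides Close so that only the read side
// is shut. All other methods are promoted from the embedded stream.
type readHalf struct{ *duplex }

func (r readHalf) Close() error { return r.CloseRead() }

func main() {
	d := &duplex{}
	// A consumer such as an HTTP client would call Close when done reading.
	readHalf{d}.Close()
	fmt.Println(d.readClosed, d.writeClosed) // true false
}
```

The write side stays open after the consumer's Close, so the owner of the underlying stream can keep sending data.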

type MuxedStreamRequest

type MuxedStreamRequest struct {
	// contains filtered or unexported fields
}

func NewMuxedStreamRequest

func NewMuxedStreamRequest(stream *MuxedStream, body io.Reader) MuxedStreamRequest

type Muxer

type Muxer struct {
	// contains filtered or unexported fields
}

func Handshake

func Handshake(
	w io.WriteCloser,
	r io.ReadCloser,
	config MuxerConfig,
	activeStreamsMetrics prometheus.Gauge,
) (*Muxer, error)

Handshake establishes a muxed connection with the peer. After the handshake completes, it is possible to open and accept streams.

func (*Muxer) AwaitResponseHeaders

func (m *Muxer) AwaitResponseHeaders(ctx context.Context, stream *MuxedStream) error

func (*Muxer) CloseStreamRead

func (m *Muxer) CloseStreamRead(stream *MuxedStream)

func (*Muxer) MakeMuxedStreamRequest

func (m *Muxer) MakeMuxedStreamRequest(ctx context.Context, request MuxedStreamRequest) error

func (*Muxer) Metrics

func (m *Muxer) Metrics() *MuxerMetrics

func (*Muxer) NewStream

func (m *Muxer) NewStream(headers []Header) *MuxedStream

func (*Muxer) OpenRPCStream

func (m *Muxer) OpenRPCStream(ctx context.Context) (*MuxedStream, error)

func (*Muxer) OpenStream

func (m *Muxer) OpenStream(ctx context.Context, headers []Header, body io.Reader) (*MuxedStream, error)

OpenStream opens a new data stream with the given headers. It is called by the proxy server and the tunnel.

func (*Muxer) Serve

func (m *Muxer) Serve(ctx context.Context) error

Serve runs the event loops that comprise h2mux:

  - MuxReader.run()
  - MuxWriter.run()
  - muxMetricsUpdater.run()

In the normal case, Shutdown() is called concurrently with Serve() to stop these loops.

func (*Muxer) Shutdown

func (m *Muxer) Shutdown() <-chan struct{}

Shutdown is called to initiate the "happy path" of muxer termination. It blocks new streams from being created. It returns a channel that is closed when the last stream has been closed.
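
The shutdown contract described above (refuse new streams, then close a channel once the last stream is gone) can be modelled in isolation. The `registry` type and its methods are hypothetical and do not reflect how Muxer tracks streams internally.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical tracker of open streams.
type registry struct {
	mu           sync.Mutex
	active       int
	shuttingDown bool
	done         chan struct{}
}

func newRegistry() *registry { return &registry{done: make(chan struct{})} }

// open registers a new stream, or returns false once shutdown has begun.
func (r *registry) open() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.shuttingDown {
		return false
	}
	r.active++
	return true
}

// closeStream unregisters a stream and, during shutdown, closes done when
// the last stream goes away.
func (r *registry) closeStream() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.active--
	if r.shuttingDown && r.active == 0 {
		close(r.done)
	}
}

// Shutdown blocks new streams and returns a channel that is closed once no
// streams remain. Calling it more than once is safe.
func (r *registry) Shutdown() <-chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.shuttingDown {
		r.shuttingDown = true
		if r.active == 0 {
			close(r.done)
		}
	}
	return r.done
}

func main() {
	r := newRegistry()
	r.open()
	done := r.Shutdown()
	fmt.Println(r.open()) // false
	r.closeStream()
	<-done
	fmt.Println("all streams closed")
}
```

Returning a channel instead of blocking lets the caller combine the wait with a timeout or a context in a `select`.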

func (*Muxer) TimerRetries

func (m *Muxer) TimerRetries() uint64

TimerRetries returns the number of retries/ticks since the connection was last marked active.

type MuxerApplicationError

type MuxerApplicationError struct {
	// contains filtered or unexported fields
}

func (MuxerApplicationError) Error

func (e MuxerApplicationError) Error() string

type MuxerConfig

type MuxerConfig struct {
	Timeout  time.Duration
	Handler  MuxedStreamHandler
	IsClient bool
	// Name is used to identify this muxer instance when logging.
	Name string
	// The minimum time this connection can be idle before sending a heartbeat.
	HeartbeatInterval time.Duration
	// The minimum number of heartbeats to send before terminating the connection.
	MaxHeartbeats uint64
	// Logger to use
	Log                *zerolog.Logger
	CompressionQuality CompressionSetting
	// Initial size for HTTP2 flow control windows
	DefaultWindowSize uint32
	// Largest allowable size for HTTP2 flow control windows
	MaxWindowSize uint32
	// Largest allowable capacity for the buffer of data to be sent
	StreamWriteBufferMaxLen int
}

type MuxerHandshakeError

type MuxerHandshakeError struct {
	// contains filtered or unexported fields
}

func (MuxerHandshakeError) Error

func (e MuxerHandshakeError) Error() string

type MuxerMetrics

type MuxerMetrics struct {
	RTT, RTTMin, RTTMax                                              time.Duration
	ReceiveWindowAve, SendWindowAve                                  float64
	ReceiveWindowMin, ReceiveWindowMax, SendWindowMin, SendWindowMax uint32
	InBoundRateCurr, InBoundRateMin, InBoundRateMax                  uint64
	OutBoundRateCurr, OutBoundRateMin, OutBoundRateMax               uint64
	CompBytesBefore, CompBytesAfter                                  *AtomicCounter
}

func (*MuxerMetrics) CompRateAve

func (m *MuxerMetrics) CompRateAve() float64

type MuxerProtocolError

type MuxerProtocolError struct {
	// contains filtered or unexported fields
}

func (MuxerProtocolError) Error

func (e MuxerProtocolError) Error() string

type MuxerStreamError

type MuxerStreamError struct {
	// contains filtered or unexported fields
}

func (MuxerStreamError) Error

func (e MuxerStreamError) Error() string

type PingTimestamp

type PingTimestamp struct {
	// contains filtered or unexported fields
}

PingTimestamp is an atomic interface around ping timestamping and signalling.

func NewPingTimestamp

func NewPingTimestamp() *PingTimestamp

func (*PingTimestamp) Get

func (pt *PingTimestamp) Get() int64

func (*PingTimestamp) GetUpdateChan

func (pt *PingTimestamp) GetUpdateChan() <-chan struct{}

func (*PingTimestamp) Set

func (pt *PingTimestamp) Set(v int64)

type ReadWriteClosedCloser

type ReadWriteClosedCloser interface {
	io.ReadWriteCloser
	Closed() bool
}

type ReadWriteLengther

type ReadWriteLengther interface {
	io.ReadWriter
	Reset()
	Len() int
}

type ReadyList

type ReadyList struct {
	// contains filtered or unexported fields
}

ReadyList multiplexes several event signals onto a single channel.

func NewReadyList

func NewReadyList() *ReadyList

func (*ReadyList) Close

func (r *ReadyList) Close()

func (*ReadyList) ReadyChannel

func (r *ReadyList) ReadyChannel() <-chan uint32

func (*ReadyList) Signal

func (r *ReadyList) Signal(ID uint32)

ID is the stream ID

type SharedBuffer

type SharedBuffer struct {
	// contains filtered or unexported fields
}

func NewSharedBuffer

func NewSharedBuffer() *SharedBuffer

func (*SharedBuffer) Close

func (s *SharedBuffer) Close() error

func (*SharedBuffer) Closed

func (s *SharedBuffer) Closed() bool

func (*SharedBuffer) Read

func (s *SharedBuffer) Read(p []byte) (n int, err error)

func (*SharedBuffer) Write

func (s *SharedBuffer) Write(p []byte) (n int, err error)

type Signal

type Signal struct {
	// contains filtered or unexported fields
}

Signal describes an event that can be waited on for at least one signal. Signalling the event while it is in the signalled state is a noop. When the waiter wakes up, the signal is set to unsignalled. It is a way for any number of writers to inform a reader (without blocking) that an event has happened.

func NewSignal

func NewSignal() Signal

NewSignal creates a new Signal.

func (Signal) Signal

func (s Signal) Signal()

Signal signals the event.

func (Signal) Wait

func (s Signal) Wait()

Wait blocks until the event is signalled.

func (Signal) WaitChannel

func (s Signal) WaitChannel() <-chan struct{}

WaitChannel returns a channel that is readable after Signal is called.
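
The behaviour described for Signal (non-blocking signalling, repeated signals coalescing, the waiter clearing the state) matches what a channel with a buffer of one provides. The sketch below is an assumption about one possible implementation, with a hypothetical lowercase `signal` type; it is not taken from the package source.

```go
package main

import "fmt"

// signal is a hypothetical stand-in for Signal, backed by a one-slot channel.
type signal struct{ c chan struct{} }

func newSignal() signal { return signal{c: make(chan struct{}, 1)} }

// Signal marks the event as signalled. If it is already signalled, the
// default branch makes this a no-op, so the caller never blocks.
func (s signal) Signal() {
	select {
	case s.c <- struct{}{}:
	default:
	}
}

// Wait blocks until the event is signalled, then clears it.
func (s signal) Wait() { <-s.c }

// WaitChannel exposes the channel for use in a select statement.
func (s signal) WaitChannel() <-chan struct{} { return s.c }

func main() {
	s := newSignal()
	s.Signal()
	s.Signal() // coalesced with the first signal
	s.Wait()
	select {
	case <-s.WaitChannel():
		fmt.Println("still signalled")
	default:
		fmt.Println("unsignalled after one wait") // this branch runs
	}
}
```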

type StreamErrorMap

type StreamErrorMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

StreamErrorMap is used to track stream errors. This is a separate structure to ActiveStreamMap because errors can be raised against non-existent or closed streams.

func NewStreamErrorMap

func NewStreamErrorMap() *StreamErrorMap

NewStreamErrorMap creates a new StreamErrorMap.

func (*StreamErrorMap) GetErrors

func (s *StreamErrorMap) GetErrors() map[uint32]http2.ErrCode

GetErrors retrieves all errors currently raised. This resets the currently-tracked errors.

func (*StreamErrorMap) GetSignalChan

func (s *StreamErrorMap) GetSignalChan() <-chan struct{}

GetSignalChan returns a channel that is signalled when an error is raised.

func (*StreamErrorMap) RaiseError

func (s *StreamErrorMap) RaiseError(streamID uint32, err http2.ErrCode)

RaiseError raises a stream error.
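
The raise, signal, then drain cycle of StreamErrorMap can be sketched with a mutex-protected map and a one-slot notification channel. The `errorMap` type, its field names, and the `errCode` stand-in for `http2.ErrCode` are hypothetical, chosen so the example needs only the standard library.

```go
package main

import (
	"fmt"
	"sync"
)

// errCode is a stand-in for http2.ErrCode.
type errCode uint32

// errorMap is a hypothetical model of StreamErrorMap.
type errorMap struct {
	sync.RWMutex
	errors   map[uint32]errCode
	hasError chan struct{}
}

func newErrorMap() *errorMap {
	return &errorMap{
		errors:   make(map[uint32]errCode),
		hasError: make(chan struct{}, 1),
	}
}

// RaiseError records err for streamID and notifies the reader without blocking.
func (m *errorMap) RaiseError(streamID uint32, err errCode) {
	m.Lock()
	m.errors[streamID] = err
	m.Unlock()
	select {
	case m.hasError <- struct{}{}:
	default: // a notification is already pending
	}
}

// GetErrors returns everything raised so far and starts a fresh map.
func (m *errorMap) GetErrors() map[uint32]errCode {
	m.Lock()
	defer m.Unlock()
	errs := m.errors
	m.errors = make(map[uint32]errCode)
	return errs
}

func main() {
	m := newErrorMap()
	m.RaiseError(3, 0x1) // PROTOCOL_ERROR
	m.RaiseError(5, 0x5) // STREAM_CLOSED
	<-m.hasError
	fmt.Println(len(m.GetErrors())) // 2
	fmt.Println(len(m.GetErrors())) // 0
}
```

Keying by stream ID rather than by stream object is what allows errors to be raised against streams that no longer exist, as the type's description notes.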

type TunnelHostname

type TunnelHostname string

func (TunnelHostname) IsSet

func (th TunnelHostname) IsSet() bool

func (TunnelHostname) String

func (th TunnelHostname) String() string
