Documentation ¶
Overview ¶
Package grid provides single-connection two-way grid communication.
Index ¶
- Constants
- Variables
- func GetByteBufferCap(wantSz int) []byte
- func GetSubroute(ctx context.Context) string
- func NewNPErr(err error) (NoPayload, *RemoteErr)
- func WriterToChannel(ctx context.Context, ch chan<- []byte) io.Writer
- type Array
- type ArrayOf
- type AuthFn
- type Bytes
- type Connection
- func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)
- func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)
- func (c *Connection) State() State
- func (c *Connection) Stats() ConnectionStats
- func (c *Connection) String() string
- func (c *Connection) StringReverse() string
- func (c *Connection) Subroute(s string) *Subroute
- func (c *Connection) WaitForConnect(ctx context.Context) error
- type ConnectionStats
- type ContextDialer
- type ErrResponse
- type Flags
- func (f *Flags) Clear(flags Flags)
- func (z *Flags) DecodeMsg(dc *msgp.Reader) (err error)
- func (z Flags) EncodeMsg(en *msgp.Writer) (err error)
- func (z Flags) MarshalMsg(b []byte) (o []byte, err error)
- func (z Flags) Msgsize() (s int)
- func (f *Flags) Set(flags Flags)
- func (f Flags) String() string
- func (z *Flags) UnmarshalMsg(bts []byte) (o []byte, err error)
- type HandlerID
- func (z *HandlerID) DecodeMsg(dc *msgp.Reader) (err error)
- func (z HandlerID) EncodeMsg(en *msgp.Writer) (err error)
- func (z HandlerID) MarshalMsg(b []byte) (o []byte, err error)
- func (z HandlerID) Msgsize() (s int)
- func (i HandlerID) String() string
- func (z *HandlerID) UnmarshalMsg(bts []byte) (o []byte, err error)
- type JSON
- type JSONPool
- type MSS
- type Manager
- func (m *Manager) AddToMux(router *mux.Router)
- func (m *Manager) Connection(host string) *Connection
- func (m *Manager) Handler() http.HandlerFunc
- func (m *Manager) HostName() string
- func (m *Manager) RegisterSingleHandler(id HandlerID, h SingleHandlerFn, subroute ...string) error
- func (m *Manager) RegisterStreamingHandler(id HandlerID, h StreamHandler) error
- func (m *Manager) Targets() []string
- type ManagerOptions
- type NoPayload
- type Op
- type Recycler
- type RemoteClient
- type RemoteErr
- type Requester
- type Response
- type RoundTripper
- type SingleHandler
- func (h *SingleHandler[Req, Resp]) AllowCallRequestPool(b bool) *SingleHandler[Req, Resp]
- func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Req) (resp Resp, err error)
- func (h *SingleHandler[Req, Resp]) IgnoreNilConn() *SingleHandler[Req, Resp]
- func (h *SingleHandler[Req, Resp]) NewRequest() Req
- func (h *SingleHandler[Req, Resp]) NewResponse() Resp
- func (h *SingleHandler[Req, Resp]) PutResponse(r Resp)
- func (h *SingleHandler[Req, Resp]) Register(m *Manager, handle func(req Req) (resp Resp, err *RemoteErr), ...) error
- func (h *SingleHandler[Req, Resp]) WithSharedResponse() *SingleHandler[Req, Resp]
- type SingleHandlerFn
- type State
- type StatelessHandler
- type StatelessHandlerFn
- type Stream
- type StreamHandler
- type StreamHandlerFn
- type StreamTypeHandler
- func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Streamer, payload Payload) (st *TypedStream[Req, Resp], err error)
- func (h *StreamTypeHandler[Payload, Req, Resp]) NewPayload() Payload
- func (h *StreamTypeHandler[Payload, Req, Resp]) NewRequest() Req
- func (h *StreamTypeHandler[Payload, Req, Resp]) NewResponse() Resp
- func (h *StreamTypeHandler[Payload, Req, Resp]) PutRequest(r Req)
- func (h *StreamTypeHandler[Payload, Req, Resp]) PutResponse(r Resp)
- func (h *StreamTypeHandler[Payload, Req, Resp]) Register(m *Manager, ...) error
- func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput(m *Manager, ...) error
- func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoPayload(m *Manager, ...) error
- func (h *StreamTypeHandler[Payload, Req, Resp]) WithInCapacity(in int) *StreamTypeHandler[Payload, Req, Resp]
- func (h *StreamTypeHandler[Payload, Req, Resp]) WithOutCapacity(out int) *StreamTypeHandler[Payload, Req, Resp]
- func (h *StreamTypeHandler[Payload, Req, Resp]) WithSharedResponse() *StreamTypeHandler[Payload, Req, Resp]
- type Streamer
- type Subroute
- type TestGrid
- type TraceParamsKey
- type TypedStream
- type URLValues
Constants ¶
const ( // StateUnconnected is the initial state of a connection. // When the first message is sent it will attempt to connect. StateUnconnected = iota // StateConnecting is the state from StateUnconnected while the connection is attempted to be established. // After this connection will be StateConnected or StateConnectionError. StateConnecting // StateConnected is the state when the connection has been established and is considered stable. // If the connection is lost, state will switch to StateConnecting. StateConnected // StateConnectionError is the state once a connection attempt has been made, and it failed. // The connection will remain in this stat until the connection has been successfully re-established. StateConnectionError // StateShutdown is the state when the server has been shut down. // This will not be used under normal operation. StateShutdown // MaxDeadline is the maximum deadline allowed, // Approx 49 days. MaxDeadline = time.Duration(math.MaxUint32) * time.Millisecond )
const (
// RoutePath is the remote path to connect to.
RoutePath = "/minio/grid/" + apiVersion
)
Variables ¶
var ( // ErrUnknownHandler is returned when an unknown handler is requested. ErrUnknownHandler = errors.New("unknown mux handler") // ErrHandlerAlreadyExists is returned when a handler is already registered. ErrHandlerAlreadyExists = errors.New("mux handler already exists") // ErrIncorrectSequence is returned when an out-of-sequence item is received. ErrIncorrectSequence = errors.New("out-of-sequence item received") )
var ErrDisconnected = RemoteErr("remote disconnected")
ErrDisconnected is returned when the connection to the remote has been lost during the call.
var GetByteBuffer = func() []byte { b := *internalByteBuffer.Get().(*[]byte) return b[:0] }
GetByteBuffer can be replaced with a function that returns a small byte buffer. When replacing PutByteBuffer should also be replaced There is no minimum size.
var PutByteBuffer = func(b []byte) { if cap(b) >= biggerBufMin && cap(b) < biggerBufMax { internal32KByteBuffer.Put(&b) return } if cap(b) >= minBufferSize && cap(b) < biggerBufMin { internalByteBuffer.Put(&b) return } }
PutByteBuffer is for returning byte buffers.
Functions ¶
func GetByteBufferCap ¶
GetByteBufferCap returns a length 0 byte buffer with at least the given capacity.
func GetSubroute ¶
GetSubroute returns caller information from contexts provided to handlers.
func NewNPErr ¶
NewNPErr is a helper to no payload and optional remote error. The error type is not preserved.
func WriterToChannel ¶
WriterToChannel will return an io.Writer that writes to the given channel. The context both allows returning errors on writes and to ensure that this isn't abandoned if the channel is no longer being read from.
Types ¶
type Array ¶
type Array[T RoundTripper] struct { // contains filtered or unexported fields }
Array provides a wrapper for an underlying array of serializable objects.
func (*Array[T]) Append ¶
Append a value to the underlying array. The returned Array is always the same as the one called.
func (*Array[T]) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
func (*Array[T]) UnmarshalMsg ¶
UnmarshalMsg will JSON marshal the value and wrap as a msgp byte array. Nil values are supported.
type ArrayOf ¶
type ArrayOf[T RoundTripper] struct { // contains filtered or unexported fields }
ArrayOf wraps an array of Messagepack compatible objects.
func NewArrayOf ¶
func NewArrayOf[T RoundTripper](newFn func() T) *ArrayOf[T]
NewArrayOf returns a new ArrayOf. You must provide a function that returns a new instance of T.
type Bytes ¶
type Bytes []byte
Bytes provides a byte slice that can be serialized.
func NewBytesCap ¶
NewBytesCap returns an empty Bytes with the given capacity.
func NewBytesWith ¶
NewBytesWith returns a new Bytes with the provided content. When sent as a parameter, the caller gives up ownership of the byte slice. When returned as response, the handler also gives up ownership of the byte slice.
func NewBytesWithCopyOf ¶
NewBytesWithCopyOf returns a new byte slice with a copy of the provided content.
func (*Bytes) MarshalMsg ¶
MarshalMsg appends the bytes representation of b to the provided byte slice.
type Connection ¶
type Connection struct { // NextID is the next ID that can be used (atomic). NextID uint64 // LastPong is last pong time (atomic) // Only valid when StateConnected. LastPong int64 // Non-atomic Remote string Local string // contains filtered or unexported fields }
A Connection is a remote connection. There is no distinction externally whether the connection was initiated from this server or from the remote.
func (*Connection) NewStream ¶
func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)
NewStream creates a new stream. Initial payload can be reused by the caller.
func (*Connection) Request ¶
Request allows to do a single remote request. 'req' will not be used after the call and caller can reuse. If no deadline is set on ctx, a 1-minute deadline will be added.
func (*Connection) State ¶
func (c *Connection) State() State
State returns the current connection status.
func (*Connection) Stats ¶
func (c *Connection) Stats() ConnectionStats
Stats returns the current connection stats.
func (*Connection) String ¶
func (c *Connection) String() string
String returns a string representation of the connection.
func (*Connection) StringReverse ¶
func (c *Connection) StringReverse() string
StringReverse returns a string representation of the reverse connection.
func (*Connection) Subroute ¶
func (c *Connection) Subroute(s string) *Subroute
Subroute returns a static subroute for the connection.
func (*Connection) WaitForConnect ¶
func (c *Connection) WaitForConnect(ctx context.Context) error
WaitForConnect will block until a connection has been established or the context is canceled, in which case the context error is returned.
type ConnectionStats ¶
ConnectionStats contains connection statistics.
type ContextDialer ¶
ContextDialer is a dialer that can be used to dial a remote.
func (ContextDialer) DialContext ¶
DialContext implements the Dialer interface.
type ErrResponse ¶
type ErrResponse struct {
// contains filtered or unexported fields
}
ErrResponse is a remote error response.
func (ErrResponse) Error ¶
func (e ErrResponse) Error() string
type Flags ¶
type Flags uint8
Flags is a set of flags set on a message.
const ( // FlagCRCxxh3 indicates that, the lower 32 bits of xxhash3 of the serialized // message will be sent after the serialized message as little endian. FlagCRCxxh3 Flags = 1 << iota // FlagEOF the stream (either direction) is at EOF. FlagEOF // FlagStateless indicates the message is stateless. // This will retain clients across reconnections or // if sequence numbers are unexpected. FlagStateless // FlagPayloadIsErr can be used by individual ops to signify that // The payload is a string error converted to byte slice. FlagPayloadIsErr // FlagPayloadIsZero means that payload is 0-length slice and not nil. FlagPayloadIsZero // FlagSubroute indicates that the message has subroute. // Subroute will be 32 bytes long and added before any CRC. FlagSubroute )
func (Flags) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type HandlerID ¶
type HandlerID uint8
HandlerID is the ID for the handler of a specific type.
const ( HandlerLockLock HandlerID HandlerLockRLock HandlerLockUnlock HandlerLockRUnlock HandlerLockRefresh HandlerLockForceUnlock HandlerWalkDir HandlerStatVol HandlerDiskInfo HandlerNSScanner HandlerReadXL HandlerReadVersion HandlerDeleteFile HandlerDeleteVersion HandlerUpdateMetadata HandlerWriteMetadata HandlerCheckParts HandlerRenameData HandlerRenameFile HandlerReadAll HandlerServerVerify HandlerTrace HandlerListen HandlerDeleteBucketMetadata HandlerLoadBucketMetadata HandlerReloadSiteReplicationConfig HandlerReloadPoolMeta HandlerStopRebalance HandlerLoadRebalanceMeta HandlerLoadTransitionTierConfig HandlerDeletePolicy HandlerLoadPolicy HandlerLoadPolicyMapping HandlerDeleteServiceAccount HandlerLoadServiceAccount HandlerDeleteUser HandlerLoadUser HandlerLoadGroup HandlerHealBucket HandlerMakeBucket HandlerHeadBucket HandlerDeleteBucket HandlerGetMetrics HandlerGetResourceMetrics HandlerGetMemInfo HandlerGetProcInfo HandlerGetOSInfo HandlerGetPartitions HandlerGetNetInfo HandlerGetCPUs HandlerServerInfo HandlerGetSysConfig HandlerGetSysServices HandlerGetSysErrors HandlerGetAllBucketStats HandlerGetBucketStats HandlerGetSRMetrics HandlerGetPeerMetrics HandlerGetMetacacheListing HandlerUpdateMetacacheListing HandlerGetPeerBucketMetrics HandlerStorageInfo HandlerConsoleLog HandlerListDir HandlerGetLocks HandlerBackgroundHealStatus HandlerGetLastDayTierStats HandlerSignalService HandlerGetBandwidth HandlerWriteAll HandlerListBuckets HandlerRenameDataInline HandlerRenameData2 )
HandlerID is a handler identifier. It is used to determine request routing on the server. Handlers can be registered with a static subroute. Do NOT remove or change the order of existing handlers.
func (HandlerID) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type JSON ¶
type JSON[T any] struct { // contains filtered or unexported fields }
JSON is a wrapper around a T object that can be serialized. There is an internal value
func (*JSON[T]) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
func (*JSON[T]) UnmarshalMsg ¶
UnmarshalMsg will JSON marshal the value and wrap as a msgp byte array. Nil values are supported.
func (*JSON[T]) Value ¶
func (j *JSON[T]) Value() *T
Value returns the underlying value. If not set yet, a new value is created.
func (*JSON[T]) ValueOrZero ¶
func (j *JSON[T]) ValueOrZero() T
ValueOrZero returns the underlying value. If the underlying value is nil, a zero value is returned.
type JSONPool ¶
type JSONPool[T any] struct { // contains filtered or unexported fields }
JSONPool is a pool for JSON objects that unmarshal into T.
func (*JSONPool[T]) NewJSONWith ¶
NewJSONWith returns a new JSON with the provided value.
type MSS ¶
MSS is a map[string]string that can be serialized. It is not very efficient, but it is only used for easy parameter passing.
func NewMSSWith ¶
NewMSSWith returns a new MSS with the given map.
func (*MSS) MarshalMsg ¶
MarshalMsg appends the bytes representation of b to the provided byte slice.
func (*MSS) Msgsize ¶
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message.
type Manager ¶
type Manager struct { // ID is an instance ID, that will change whenever the server restarts. // This allows remotes to keep track of whether state is preserved. ID uuid.UUID // contains filtered or unexported fields }
Manager will contain all the connections to the grid. It also handles incoming requests and routes them to the appropriate connection.
func NewManager ¶
func NewManager(ctx context.Context, o ManagerOptions) (*Manager, error)
NewManager creates a new grid manager
func (*Manager) Connection ¶
func (m *Manager) Connection(host string) *Connection
Connection will return the connection for the specified host. If the host does not exist nil will be returned.
func (*Manager) Handler ¶
func (m *Manager) Handler() http.HandlerFunc
Handler returns a handler that can be used to serve grid requests. This should be connected on RoutePath to the main server.
func (*Manager) RegisterSingleHandler ¶
func (m *Manager) RegisterSingleHandler(id HandlerID, h SingleHandlerFn, subroute ...string) error
RegisterSingleHandler will register a stateless handler that serves []byte -> ([]byte, error) requests. subroutes are joined with "/" to a single subroute.
func (*Manager) RegisterStreamingHandler ¶
func (m *Manager) RegisterStreamingHandler(id HandlerID, h StreamHandler) error
RegisterStreamingHandler will register a stateless handler that serves two-way streaming requests.
type ManagerOptions ¶
type ManagerOptions struct { Dialer ContextDialer // Outgoing dialer. Local string // Local host name. Hosts []string // All hosts, including local in the grid. AddAuth AuthFn // Add authentication to the given audience. AuthRequest func(r *http.Request) error // Validate incoming requests. TLSConfig *tls.Config // TLS to apply to the connections. Incoming func(n int64) // Record incoming bytes. Outgoing func(n int64) // Record outgoing bytes. BlockConnect chan struct{} // If set, incoming and outgoing connections will be blocked until closed. TraceTo *pubsub.PubSub[madmin.TraceInfo, madmin.TraceType] }
ManagerOptions are options for creating a new grid manager.
type NoPayload ¶
type NoPayload struct{}
NoPayload is a type that can be used for handlers that do not use a payload.
func (NoPayload) MarshalMsg ¶
MarshalMsg satisfies the interface, but is a no-op.
type Op ¶
type Op uint8
Op is operation type messages.
const ( // OpConnect is a connect request. OpConnect Op = iota + 1 // OpConnectResponse is a response to a connect request. OpConnectResponse // OpPing is a ping request. // If a mux id is specified that mux is pinged. // Clients send ping requests. OpPing // OpPong is a OpPing response returned by the server. OpPong // OpConnectMux will connect a new mux with optional payload. OpConnectMux // OpMuxConnectError is an error while connecting a mux. OpMuxConnectError // OpDisconnectClientMux instructs a client to disconnect a mux OpDisconnectClientMux // OpDisconnectServerMux instructs a server to disconnect (cancel) a server mux OpDisconnectServerMux // OpMuxClientMsg contains a message to a client Mux OpMuxClientMsg // OpMuxServerMsg contains a message to a server Mux OpMuxServerMsg // OpUnblockSrvMux contains a message that a server mux is unblocked with one. // Only Stateful streams has flow control. OpUnblockSrvMux // OpUnblockClMux contains a message that a client mux is unblocked with one. // Only Stateful streams has flow control. OpUnblockClMux // OpAckMux acknowledges a mux was created. OpAckMux // OpRequest is a single request + response. // MuxID is returned in response. OpRequest // OpResponse is a response to a single request. // FlagPayloadIsErr is used to signify that the payload is a string error converted to byte slice. // When a response is received, the mux is already removed from the remote. OpResponse // OpDisconnect instructs that remote wants to disconnect OpDisconnect // OpMerged is several operations merged into one. OpMerged )
func (Op) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type Recycler ¶
type Recycler interface {
Recycle()
}
Recycler will override the internal reuse in typed handlers. When this is supported, the handler will not do internal pooling of objects, call Recycle() when the object is no longer needed. The recycler should handle nil pointers.
type RemoteClient ¶
type RemoteClient struct {
Name string
}
RemoteClient contains information about the caller.
func GetCaller ¶
func GetCaller(ctx context.Context) *RemoteClient
GetCaller returns caller information from contexts provided to handlers.
type RemoteErr ¶
type RemoteErr string
RemoteErr is a remote error type. Any error seen on a remote will be returned like this.
func IsRemoteErr ¶
IsRemoteErr returns the value if the error is a RemoteErr.
func NewRemoteErr ¶
NewRemoteErr creates a new remote error. The error type is not preserved.
func NewRemoteErrString ¶
NewRemoteErrString creates a new remote error from a string.
func NewRemoteErrf ¶
NewRemoteErrf creates a new remote error from a format string.
type RoundTripper ¶
type RoundTripper interface { msgp.Unmarshaler msgp.Marshaler msgp.Sizer comparable }
RoundTripper provides an interface for type roundtrip serialization.
type SingleHandler ¶
type SingleHandler[Req, Resp RoundTripper] struct { // contains filtered or unexported fields }
SingleHandler is a type safe handler for single roundtrip requests.
func NewSingleHandler ¶
func NewSingleHandler[Req, Resp RoundTripper](h HandlerID, newReq func() Req, newResp func() Resp) *SingleHandler[Req, Resp]
NewSingleHandler creates a typed handler that can provide Marshal/Unmarshal. Use Register to register a server handler. Use Call to initiate a clientside call.
func (*SingleHandler[Req, Resp]) AllowCallRequestPool ¶
func (h *SingleHandler[Req, Resp]) AllowCallRequestPool(b bool) *SingleHandler[Req, Resp]
AllowCallRequestPool indicates it is safe to reuse the request on the client side, meaning the request is recycled/pooled when a request is sent. CAREFUL: This should only be used when there are no pointers, slices that aren't freshly constructed.
func (*SingleHandler[Req, Resp]) Call ¶
func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Req) (resp Resp, err error)
Call the remote with the request and return the response. The response should be returned with PutResponse when no error. If no deadline is set, a 1-minute deadline is added.
func (*SingleHandler[Req, Resp]) IgnoreNilConn ¶
func (h *SingleHandler[Req, Resp]) IgnoreNilConn() *SingleHandler[Req, Resp]
IgnoreNilConn will ignore nil connections when calling. This will make Call return nil instead of ErrDisconnected when the connection is nil. This may only be set ONCE before use.
func (*SingleHandler[Req, Resp]) NewRequest ¶
func (h *SingleHandler[Req, Resp]) NewRequest() Req
NewRequest creates a new request. Handlers can use this to create a reusable request. The request may be reused, so caller should clear any fields.
func (*SingleHandler[Req, Resp]) NewResponse ¶
func (h *SingleHandler[Req, Resp]) NewResponse() Resp
NewResponse creates a new response. Handlers can use this to create a reusable response. The response may be reused, so caller should clear any fields.
func (*SingleHandler[Req, Resp]) PutResponse ¶
func (h *SingleHandler[Req, Resp]) PutResponse(r Resp)
PutResponse will accept a response for reuse. This can be used by a caller to recycle a response after receiving it from a Call.
func (*SingleHandler[Req, Resp]) Register ¶
func (h *SingleHandler[Req, Resp]) Register(m *Manager, handle func(req Req) (resp Resp, err *RemoteErr), subroute ...string) error
Register a handler for a Req -> Resp roundtrip. Requests are automatically recycled.
func (*SingleHandler[Req, Resp]) WithSharedResponse ¶
func (h *SingleHandler[Req, Resp]) WithSharedResponse() *SingleHandler[Req, Resp]
WithSharedResponse indicates it is unsafe to reuse the response when it has been returned on a handler. This will disable automatic response recycling/pooling. Typically this is used when the response sharing part of its data structure.
type SingleHandlerFn ¶
SingleHandlerFn is handlers for one to one requests. A non-nil error value will be returned as RemoteErr(msg) to client. No client information or cancellation (deadline) is available. Include this in payload if needed. Payload should be recycled with PutByteBuffer if not needed after the call.
type StatelessHandler ¶
type StatelessHandler struct { Handle StatelessHandlerFn // OutCapacity is the output capacity on the caller. // If <= 0 capacity will be 1. OutCapacity int }
StatelessHandler is handlers for one to many requests, where responses may be dropped. Stateless requests provide no incoming stream and there is no flow control on outgoing messages.
type StatelessHandlerFn ¶
StatelessHandlerFn must handle incoming stateless request. A non-nil error value will be returned as RemoteErr(msg) to client.
type Stream ¶
type Stream struct { // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. // Requests sent cannot be used any further by the called. Requests chan<- []byte // contains filtered or unexported fields }
A Stream is a two-way stream. All responses *must* be read by the caller. If the call is canceled through the context, the appropriate error will be returned.
type StreamHandler ¶
type StreamHandler struct { // Handle an incoming request. Initial payload is sent. // Additional input packets (if any) are streamed to request. // Upstream will block when request channel is full. // Response packets can be sent at any time. // Any non-nil error sent as response means no more responses are sent. Handle StreamHandlerFn // Subroute for handler. // Subroute must be static and clients should specify a matching subroute. // Should not be set unless there are different handlers for the same HandlerID. Subroute string // OutCapacity is the output capacity. If <= 0 capacity will be 1. OutCapacity int // InCapacity is the output capacity. // If == 0 no input is expected InCapacity int }
StreamHandler handles fully bidirectional streams, There is flow control in both directions.
type StreamHandlerFn ¶
type StreamHandlerFn func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *RemoteErr
StreamHandlerFn must process a request with an optional initial payload. It must keep consuming from 'in' until it returns. 'in' and 'out' are independent. The handler should never close out. Buffers received from 'in' can be recycled with PutByteBuffer. Buffers sent on out can not be referenced once sent.
type StreamTypeHandler ¶
type StreamTypeHandler[Payload, Req, Resp RoundTripper] struct { WithPayload bool // Override the default capacities (1) OutCapacity int // Set to 0 if no input is expected. // Will be 0 if newReq is nil. InCapacity int // contains filtered or unexported fields }
StreamTypeHandler is a type safe handler for streaming requests.
func NewStream ¶
func NewStream[Payload, Req, Resp RoundTripper](h HandlerID, newPayload func() Payload, newReq func() Req, newResp func() Resp) *StreamTypeHandler[Payload, Req, Resp]
NewStream creates a typed handler that can provide Marshal/Unmarshal. Use Register to register a server handler. Use Call to initiate a clientside call. newPayload can be nil. In that case payloads will always be nil. newReq can be nil. In that case no input stream is expected and the handler will be called with nil 'in' channel.
func (*StreamTypeHandler[Payload, Req, Resp]) Call ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Streamer, payload Payload) (st *TypedStream[Req, Resp], err error)
Call the remove with the request and
func (*StreamTypeHandler[Payload, Req, Resp]) NewPayload ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) NewPayload() Payload
NewPayload creates a new payload.
func (*StreamTypeHandler[Payload, Req, Resp]) NewRequest ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) NewRequest() Req
NewRequest creates a new request. The struct may be reused, so caller should clear any fields.
func (*StreamTypeHandler[Payload, Req, Resp]) NewResponse ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) NewResponse() Resp
NewResponse creates a new response. Handlers can use this to create a reusable response.
func (*StreamTypeHandler[Payload, Req, Resp]) PutRequest ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) PutRequest(r Req)
PutRequest will accept a request for reuse. These should be returned by the handler.
func (*StreamTypeHandler[Payload, Req, Resp]) PutResponse ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) PutResponse(r Resp)
PutResponse will accept a response for reuse. These should be returned by the caller.
func (*StreamTypeHandler[Payload, Req, Resp]) Register ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) Register(m *Manager, handle func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error
Register a handler for two-way streaming with payload, input stream and output stream. An optional subroute can be given. Multiple entries are joined with '/'.
func (*StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput(m *Manager, handle func(ctx context.Context, p Payload, out chan<- Resp) *RemoteErr, subroute ...string) error
RegisterNoInput a handler for one-way streaming with payload and output stream. An optional subroute can be given. Multiple entries are joined with '/'.
func (*StreamTypeHandler[Payload, Req, Resp]) RegisterNoPayload ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoPayload(m *Manager, handle func(ctx context.Context, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error
RegisterNoPayload a handler for one-way streaming with payload and output stream. An optional subroute can be given. Multiple entries are joined with '/'.
func (*StreamTypeHandler[Payload, Req, Resp]) WithInCapacity ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) WithInCapacity(in int) *StreamTypeHandler[Payload, Req, Resp]
WithInCapacity adjusts the input capacity from the handler perspective. This must be done prior to registering the handler.
func (*StreamTypeHandler[Payload, Req, Resp]) WithOutCapacity ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) WithOutCapacity(out int) *StreamTypeHandler[Payload, Req, Resp]
WithOutCapacity adjusts the output capacity from the handler perspective. This must be done prior to registering the handler.
func (*StreamTypeHandler[Payload, Req, Resp]) WithSharedResponse ¶
func (h *StreamTypeHandler[Payload, Req, Resp]) WithSharedResponse() *StreamTypeHandler[Payload, Req, Resp]
WithSharedResponse indicates it is unsafe to reuse the response. Typically this is used when the response sharing part of its data structure.
type Streamer ¶
type Streamer interface {
NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)
}
Streamer creates a stream.
type Subroute ¶
type Subroute struct { *Connection // contains filtered or unexported fields }
Subroute is a connection subroute that can be used to route to a specific handler with the same handler ID.
func (*Subroute) NewStream ¶
func (c *Subroute) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)
NewStream creates a new stream. Initial payload can be reused by the caller.
type TestGrid ¶
type TestGrid struct { Servers []*httptest.Server Listeners []net.Listener Managers []*Manager Mux []*mux.Router Hosts []string // contains filtered or unexported fields }
TestGrid contains a grid of servers for testing purposes.
func SetupTestGrid ¶
SetupTestGrid creates a new grid for testing purposes. Select the number of hosts to create. Call (TestGrid).Cleanup() when done.
func (*TestGrid) WaitAllConnect ¶
WaitAllConnect will wait for all connections to be established.
type TraceParamsKey ¶
type TraceParamsKey struct{}
TraceParamsKey allows to pass trace parameters to the request via context. This is only needed when un-typed requests are used. MSS, map[string]string types are preferred, but any struct with exported fields will work.
type TypedStream ¶
type TypedStream[Req, Resp RoundTripper] struct { // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- Req // contains filtered or unexported fields }
TypedStream is a stream with specific types.
func (*TypedStream[Req, Resp]) Results ¶
func (s *TypedStream[Req, Resp]) Results(next func(resp Resp) error) (err error)
Results returns the results from the remote server one by one. If any error is returned by the callback, the stream will be canceled. If the context is canceled, the stream will be canceled.
type URLValues ¶
URLValues can be used for url.Values.
func NewURLValuesWith ¶
NewURLValuesWith returns a new URLValues with the provided content.
func (URLValues) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
func (URLValues) Msgsize ¶
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*URLValues) UnmarshalMsg ¶
UnmarshalMsg implements msgp.Unmarshaler