grid

package
v0.0.0-...-04f92f1
Published: May 2, 2024 License: AGPL-3.0 Imports: 34 Imported by: 0

README

MinIO Grid

The MinIO Grid is a package that provides two-way communication between servers. It uses a single two-way connection to send and receive messages between servers.

It includes built-in muxing of concurrent requests as well as congestion handling for streams.

Requests can be "Single Payload" or "Streamed".

Use the MinIO Grid for:

  • Small, frequent requests with low latency requirements.
  • Long-running requests with small/medium payloads.

Do not use the MinIO Grid for:

  • Large payloads.

Only a single connection is ever made between two servers. This connection will likely not be able to saturate the network bandwidth. Therefore, sending large payloads over it will likely be slower than using a separate connection, and other requests on the connection will be blocked while the large payload is being sent.

Handlers & Routes

Handlers have a predefined Handler ID. In addition, there can be several static subroutes used to differentiate between different handlers of the same ID. A subroute on a client must match a subroute on the server. So routes cannot be used for dynamic routing, unlike HTTP.
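
The matching rule above can be sketched with a plain map keyed by handler ID and subroute. This is only an illustration: the names handlerID, routeKey, registry, register and lookup are hypothetical and not part of the grid package. The only parts taken from the documentation are that a lookup needs an exact match and that subroute parts are joined with "/".

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// handlerID is a stand-in for a predefined handler identifier.
type handlerID uint8

// routeKey identifies a handler by ID and static subroute.
type routeKey struct {
	id       handlerID
	subroute string
}

// registry maps exact (ID, subroute) pairs to handlers.
type registry map[routeKey]func([]byte) []byte

var errExists = errors.New("handler already exists")

// register stores h under id and the subroute parts joined with "/".
func (r registry) register(id handlerID, h func([]byte) []byte, subroute ...string) error {
	k := routeKey{id: id, subroute: strings.Join(subroute, "/")}
	if _, ok := r[k]; ok {
		return errExists
	}
	r[k] = h
	return nil
}

// lookup returns the handler only on an exact match.
// There are no wildcards or path parameters.
func (r registry) lookup(id handlerID, subroute string) (func([]byte) []byte, bool) {
	h, ok := r[routeKey{id: id, subroute: subroute}]
	return h, ok
}

func main() {
	r := registry{}
	echo := func(p []byte) []byte { return p }
	fmt.Println(r.register(1, echo, "disk", "0")) // stored as "disk/0"

	_, ok := r.lookup(1, "disk/0")
	fmt.Println("disk/0 found:", ok)
	_, ok = r.lookup(1, "disk/1")
	fmt.Println("disk/1 found:", ok)
}
```

Because the key is compared as a whole, a client asking for a subroute that was never registered gets no handler at all, which is why subroutes cannot carry dynamic values.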

Handlers should remain backwards compatible. If a breaking API change is required, a new handler ID should be created.

Setup & Configuration

A Manager is used to manage all incoming and outgoing connections to a server.

On startup, all remote servers must be specified. From that list, individual connections will be spawned to each remote server, or incoming requests will be hooked up to the appropriate connection.

Use Manager.Connection(host) to get a connection to a specific host. From this connection, individual requests can be made.

Each handler, with optional subroutes, can be registered with the manager using Manager.RegisterXHandler(handlerID, handler, subroutes...).

A Handler() function provides an HTTP handler, which should be hooked up to the appropriate route on the server.

On startup, the manager will start connecting to remotes and will also start listening for incoming connections. Until a connection is established, all outgoing requests will return ErrDisconnected.

Usage

Single Payload Requests

Single payload requests are requests and responses that are sent in a single message. In essence, they are []byte -> []byte, error functions.

It is not possible to return both an error and a response.

Handlers are registered on the manager using (*Manager).RegisterSingleHandler(id HandlerID, h SingleHandlerFn, subroute ...string).

The server handler function has this signature: type SingleHandlerFn func(payload []byte) ([]byte, *RemoteErr).

Sample handler:

    handler := func(payload []byte) ([]byte, *grid.RemoteErr) {
        // Do something with payload
        return []byte("response"), nil
    }

    err := manager.RegisterSingleHandler(grid.HandlerDiskInfo, handler)

Sample call:

    // Get a connection to the remote host
    conn := manager.Connection(host)

    payload := []byte("request")
    // Request is the single-request method documented on *Connection.
    response, err := conn.Request(ctx, grid.HandlerDiskInfo, payload)

If the error type is *RemoteErr, then the error was returned by the remote server. Otherwise it is a local error.

Context timeouts are propagated, and a default timeout of 1 minute is added if none is specified.

There is no cancellation propagation for single payload requests. When the context is canceled, the request will return at once with an appropriate error. However, the remote call will not see the cancellation - as can be seen from the 'missing' context on the handler. The result will be discarded.

Typed handlers

Typed handlers are handlers that have a specific type for the request and response payloads. These must provide msgp serialization and deserialization.

In the examples we use a MSS type, which is a map[string]string that is msgp serializable.

    handler := func(request *grid.MSS) (*grid.MSS, *grid.RemoteErr) {
        // MSS is passed as a pointer, so use Get instead of indexing.
        fmt.Println("Got request with field", request.Get("myfield"))
        // Do something with payload
        return grid.NewMSSWith(map[string]string{"result": "ok"}), nil
    }

    // Create a typed handler.
    // Due to current generics limitations, a constructor of the empty type must be provided.
    // The first argument is the handler ID; HandlerDiskInfo is used as in the earlier samples.
    instance := grid.NewSingleHandler[*grid.MSS, *grid.MSS](grid.HandlerDiskInfo, grid.NewMSS, grid.NewMSS)

    // Register the handler on the manager
    instance.Register(manager, handler)

    // The typed instance is also used for calls
    conn := manager.Connection("host")
    resp, err := instance.Call(ctx, conn, grid.NewMSSWith(map[string]string{"myfield": "myvalue"}))
    if err == nil {
        fmt.Println("Got response with field", resp.Get("result"))
    }

The wrapper will handle all serialization and deserialization of the request and response, and furthermore provides reuse of the structs used for the request and response.

Note that responses sent for serialization are automatically reused for similar requests. If the response contains shared data, this will cause issues, since each unique response is reused. To disable this behavior, use (SingleHandler).WithSharedResponse().

Streaming Requests

A stream consists of an initial request with a payload, followed by full two-way communication between the client and the server.

The handler function has the signature shown in the sample handler below.

Sample handler:

    handler := func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *grid.RemoteErr {
        fmt.Println("Got request with initial payload", payload, "from", grid.GetCaller(ctx))
        fmt.Println("Subroute:", grid.GetSubroute(ctx))
        for {
            select {
            case <-ctx.Done():
                return nil
            case req, ok := <-in:
                if !ok {
                    // The input was closed. A plain break would only leave the select.
                    // out is closed by the caller and should never be closed by the handler.
                    return nil
                }
                // Do something with payload
                out <- []byte("response")

                // Return the request for reuse
                grid.PutByteBuffer(req)
            }
        }
    }

    err := manager.RegisterStreamingHandler(grid.HandlerDiskInfo, grid.StreamHandler{
        Handle: handler,
        Subroute: "asubroute",
        OutCapacity: 1,
        InCapacity: 1,
    })

Sample call:

    // Get a connection to the remote host
    conn := manager.Connection(host).Subroute("asubroute")
	
    payload := []byte("request")
    stream, err := conn.NewStream(ctx, grid.HandlerDiskInfo, payload)
    if err != nil {
        return err
    }
    // Read results from the stream
    err = stream.Results(func(result []byte) error {
        fmt.Println("Got result", string(result))

        // Return the response for reuse
        grid.PutByteBuffer(result)
        return nil
    })

Context cancellation and timeouts are propagated to the handler. The client does not wait for the remote handler to finish before returning. Returning any error will also cancel the stream remotely.

CAREFUL: When utilizing two-way communication, it is important to ensure that the remote handler is not blocked on a send. If the remote handler is blocked on a send, and the client is trying to send without the remote receiving, the operation would become deadlocked if the channels are full.

Typed handlers

Typed handlers are handlers that have a specific type for the request and response payloads.

    // Define the handler function.
    handler := func(ctx context.Context, p *Payload, in <-chan *Req, out chan<- *Resp) *grid.RemoteErr {
        fmt.Println("Got request with initial payload", p, "from", grid.GetCaller(ctx))
        fmt.Println("Subroute:", grid.GetSubroute(ctx))
        for {
            select {
            case <-ctx.Done():
                return nil
            case req, ok := <-in:
                if !ok {
                    // The input was closed. A plain break would only leave the select.
                    // out is closed by the caller and should never be closed by the handler.
                    return nil
                }
                fmt.Println("Got request", req)
                // Do something with payload
                out <- &Resp{"response"}
            }
        }
    }

    // Create a typed handler.
    // Due to current generics limitations, a constructor of the empty type must be provided.
    // The first argument is the handler ID; HandlerDiskInfo is used as in the earlier samples.
    instance := grid.NewStream[*Payload, *Req, *Resp](grid.HandlerDiskInfo, newPayload, newReq, newResp)

    // Tweakable options
    instance.WithPayload = true // default true when newPayload != nil
    instance.OutCapacity = 1    // default
    instance.InCapacity = 1     // default when newReq != nil

    // Register the handler on the manager
    instance.Register(manager, handler, "asubroute")

    // The typed instance is also used for calls
    conn := manager.Connection("host").Subroute("asubroute")
    stream, err := instance.Call(ctx, conn, &Payload{"request payload"})
    if err != nil { ... }

    // Read results from the stream
    err = stream.Results(func(resp *Resp) error {
        fmt.Println("Got result", resp)
        // Return the response for reuse
        instance.PutResponse(resp)
        return nil
    })

There are handlers for requests with:

  • No input stream: RegisterNoInput.
  • No initial payload: RegisterNoPayload.

Note that responses sent for serialization are automatically reused for similar requests. If the response contains shared data, this will cause issues, since each unique response is reused. To disable this behavior, use (StreamTypeHandler).WithSharedResponse().

Documentation

Overview

Package grid provides single-connection two-way grid communication.

Index

Constants

const (
	// StateUnconnected is the initial state of a connection.
	// When the first message is sent it will attempt to connect.
	StateUnconnected = iota

	// StateConnecting is the state entered from StateUnconnected while a connection attempt is in progress.
	// After this, the connection will be StateConnected or StateConnectionError.
	StateConnecting

	// StateConnected is the state when the connection has been established and is considered stable.
	// If the connection is lost, state will switch to StateConnecting.
	StateConnected

	// StateConnectionError is the state once a connection attempt has been made, and it failed.
	// The connection will remain in this state until the connection has been successfully re-established.
	StateConnectionError

	// StateShutdown is the state when the server has been shut down.
	// This will not be used under normal operation.
	StateShutdown

	// MaxDeadline is the maximum deadline allowed,
	// Approx 49 days.
	MaxDeadline = time.Duration(math.MaxUint32) * time.Millisecond
)
const (

	// RoutePath is the remote path to connect to.
	RoutePath = "/minio/grid/" + apiVersion
)

Variables

var (
	// ErrUnknownHandler is returned when an unknown handler is requested.
	ErrUnknownHandler = errors.New("unknown mux handler")

	// ErrHandlerAlreadyExists is returned when a handler is already registered.
	ErrHandlerAlreadyExists = errors.New("mux handler already exists")

	// ErrIncorrectSequence is returned when an out-of-sequence item is received.
	ErrIncorrectSequence = errors.New("out-of-sequence item received")
)
var ErrDisconnected = RemoteErr("remote disconnected")

ErrDisconnected is returned when the connection to the remote has been lost during the call.

var GetByteBuffer = func() []byte {
	b := *internalByteBuffer.Get().(*[]byte)
	return b[:0]
}

GetByteBuffer can be replaced with a function that returns a small byte buffer. When replacing it, PutByteBuffer should also be replaced. There is no minimum size.

var PutByteBuffer = func(b []byte) {
	if cap(b) >= biggerBufMin && cap(b) < biggerBufMax {
		internal32KByteBuffer.Put(&b)
		return
	}
	if cap(b) >= minBufferSize && cap(b) < biggerBufMin {
		internalByteBuffer.Put(&b)
		return
	}
}

PutByteBuffer is for returning byte buffers.
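
Since both variables are replaceable, a matching pair could look like the sketch below. It uses a single sync.Pool with assumed size limits; the names getBuf, putBuf, minBuf and maxBuf are hypothetical, and the real package uses its own pools and thresholds.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	minBuf = 4 << 10  // assumed smallest capacity worth pooling
	maxBuf = 64 << 10 // assumed upper bound; larger buffers are dropped
)

// bufPool stores pointers to slices to avoid an allocation on Put.
var bufPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, minBuf)
		return &b
	},
}

// getBuf returns a zero-length buffer from the pool.
func getBuf() []byte {
	b := *bufPool.Get().(*[]byte)
	return b[:0]
}

// putBuf returns b to the pool if its capacity is in the pooled range.
// Buffers outside the range are left for the garbage collector.
func putBuf(b []byte) {
	if cap(b) < minBuf || cap(b) >= maxBuf {
		return
	}
	bufPool.Put(&b)
}

func main() {
	b := getBuf()
	b = append(b, "hello"...)
	fmt.Println(len(b), cap(b) >= minBuf)
	putBuf(b)
}
```

The capacity check on the way back in mirrors the documented PutByteBuffer, which silently ignores buffers that do not fit one of its size classes.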

Functions

func GetByteBufferCap

func GetByteBufferCap(wantSz int) []byte

GetByteBufferCap returns a length 0 byte buffer with at least the given capacity.

func GetSubroute

func GetSubroute(ctx context.Context) string

GetSubroute returns the subroute from contexts provided to handlers.

func NewNPErr

func NewNPErr(err error) (NoPayload, *RemoteErr)

NewNPErr is a helper that returns a NoPayload and an optional remote error. The error type is not preserved.

func WriterToChannel

func WriterToChannel(ctx context.Context, ch chan<- []byte) io.Writer

WriterToChannel will return an io.Writer that writes to the given channel. The context allows writes to return an error and ensures that the writer is not left blocked if the channel is no longer being read from.

Types

type Array

type Array[T RoundTripper] struct {
	// contains filtered or unexported fields
}

Array provides a wrapper for an underlying array of serializable objects.

func (*Array[T]) Append

func (j *Array[T]) Append(v ...T) *Array[T]

Append one or more values to the underlying array. The returned Array is always the one the method was called on.

func (*Array[T]) MarshalMsg

func (j *Array[T]) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Array[T]) Msgsize

func (j *Array[T]) Msgsize() int

Msgsize returns the size of the array in bytes.

func (*Array[T]) Recycle

func (j *Array[T]) Recycle()

Recycle the underlying value.

func (*Array[T]) Set

func (j *Array[T]) Set(val []T)

Set the underlying value.

func (*Array[T]) UnmarshalMsg

func (j *Array[T]) UnmarshalMsg(bytes []byte) ([]byte, error)

UnmarshalMsg implements msgp.Unmarshaler. Nil values are supported.

func (*Array[T]) Value

func (j *Array[T]) Value() []T

Value returns the underlying value. Regular append mechanics should be observed. If no value has been set yet, a new array is created.

type ArrayOf

type ArrayOf[T RoundTripper] struct {
	// contains filtered or unexported fields
}

ArrayOf wraps an array of Messagepack compatible objects.

func NewArrayOf

func NewArrayOf[T RoundTripper](newFn func() T) *ArrayOf[T]

NewArrayOf returns a new ArrayOf. You must provide a function that returns a new instance of T.

func (*ArrayOf[T]) New

func (p *ArrayOf[T]) New() *Array[T]

New returns a new empty Array.

func (*ArrayOf[T]) NewWith

func (p *ArrayOf[T]) NewWith(val []T) *Array[T]

NewWith returns a new Array with the provided value (not copied).

type AuthFn

type AuthFn func(aud string) string

AuthFn should provide an authentication string for the given aud.

type Bytes

type Bytes []byte

Bytes provides a byte slice that can be serialized.

func NewBytes

func NewBytes() *Bytes

NewBytes returns a new Bytes. A slice is preallocated.

func NewBytesCap

func NewBytesCap(size int) *Bytes

NewBytesCap returns an empty Bytes with the given capacity.

func NewBytesWith

func NewBytesWith(b []byte) *Bytes

NewBytesWith returns a new Bytes with the provided content. When sent as a parameter, the caller gives up ownership of the byte slice. When returned as response, the handler also gives up ownership of the byte slice.

func NewBytesWithCopyOf

func NewBytesWithCopyOf(b []byte) *Bytes

NewBytesWithCopyOf returns a new byte slice with a copy of the provided content.

func (*Bytes) MarshalMsg

func (b *Bytes) MarshalMsg(bytes []byte) ([]byte, error)

MarshalMsg appends the bytes representation of b to the provided byte slice.

func (*Bytes) Msgsize

func (b *Bytes) Msgsize() int

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message.

func (*Bytes) Recycle

func (b *Bytes) Recycle()

Recycle puts the Bytes back into the pool.

func (*Bytes) UnmarshalMsg

func (b *Bytes) UnmarshalMsg(bytes []byte) ([]byte, error)

UnmarshalMsg deserializes b from the provided byte slice and returns the remainder of bytes.

type Connection

type Connection struct {
	// NextID is the next ID that can be used (atomic).
	NextID uint64

	// LastPong is last pong time (atomic)
	// Only valid when StateConnected.
	LastPong int64

	// Non-atomic
	Remote string
	Local  string
	// contains filtered or unexported fields
}

A Connection is a remote connection. There is no distinction externally whether the connection was initiated from this server or from the remote.

func (*Connection) NewStream

func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)

NewStream creates a new stream. Initial payload can be reused by the caller.

func (*Connection) Request

func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)

Request performs a single remote request. 'req' will not be used after the call, so the caller can reuse it. If no deadline is set on ctx, a 1-minute deadline will be added.

func (*Connection) State

func (c *Connection) State() State

State returns the current connection status.

func (*Connection) Stats

func (c *Connection) Stats() ConnectionStats

Stats returns the current connection stats.

func (*Connection) String

func (c *Connection) String() string

String returns a string representation of the connection.

func (*Connection) StringReverse

func (c *Connection) StringReverse() string

StringReverse returns a string representation of the reverse connection.

func (*Connection) Subroute

func (c *Connection) Subroute(s string) *Subroute

Subroute returns a static subroute for the connection.

func (*Connection) WaitForConnect

func (c *Connection) WaitForConnect(ctx context.Context) error

WaitForConnect will block until a connection has been established or the context is canceled, in which case the context error is returned.

type ConnectionStats

type ConnectionStats struct {
	OutgoingStreams int
	IncomingStreams int
}

ConnectionStats contains connection statistics.

type ContextDialer

type ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)

ContextDialer is a dialer that can be used to dial a remote.

func (ContextDialer) DialContext

func (c ContextDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error)

DialContext implements the Dialer interface.

type ErrResponse

type ErrResponse struct {
	// contains filtered or unexported fields
}

ErrResponse is a remote error response.

func (ErrResponse) Error

func (e ErrResponse) Error() string

type Flags

type Flags uint8

Flags is a set of flags set on a message.

const (
	// FlagCRCxxh3 indicates that the lower 32 bits of xxhash3 of the serialized
	// message will be sent after the serialized message as little endian.
	FlagCRCxxh3 Flags = 1 << iota

	// FlagEOF indicates that the stream (either direction) is at EOF.
	FlagEOF

	// FlagStateless indicates the message is stateless.
	// This will retain clients across reconnections or
	// if sequence numbers are unexpected.
	FlagStateless

	// FlagPayloadIsErr can be used by individual ops to signify that
	// the payload is a string error converted to byte slice.
	FlagPayloadIsErr

	// FlagPayloadIsZero means that payload is 0-length slice and not nil.
	FlagPayloadIsZero

	// FlagSubroute indicates that the message has subroute.
	// Subroute will be 32 bytes long and added before any CRC.
	FlagSubroute
)
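
The constants above are single bits, so they can be combined in one uint8. The sketch below shows the usual bit operations behind Set and Clear on a local stand-in type named flags; the Has helper is hypothetical and added only for the demonstration.

```go
package main

import "fmt"

// flags is a stand-in bit set; each constant occupies one bit.
type flags uint8

const (
	flagCRC flags = 1 << iota
	flagEOF
	flagStateless
)

// Set turns on every bit present in v.
func (f *flags) Set(v flags) { *f |= v }

// Clear turns off every bit present in v.
func (f *flags) Clear(v flags) { *f &^= v }

// Has reports whether all bits in v are set (hypothetical helper).
func (f flags) Has(v flags) bool { return f&v == v }

func main() {
	var f flags
	f.Set(flagCRC | flagStateless)
	f.Clear(flagCRC)
	fmt.Println(f.Has(flagStateless), f.Has(flagCRC)) // prints "true false"
}
```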

func (*Flags) Clear

func (f *Flags) Clear(flags Flags)

Clear one or more flags on f.

func (*Flags) DecodeMsg

func (z *Flags) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (Flags) EncodeMsg

func (z Flags) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Flags) MarshalMsg

func (z Flags) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Flags) Msgsize

func (z Flags) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Flags) Set

func (f *Flags) Set(flags Flags)

Set one or more flags on f.

func (Flags) String

func (f Flags) String() string

func (*Flags) UnmarshalMsg

func (z *Flags) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type HandlerID

type HandlerID uint8

HandlerID is the ID for the handler of a specific type.

const (
	HandlerLockLock HandlerID
	HandlerLockRLock
	HandlerLockUnlock
	HandlerLockRUnlock
	HandlerLockRefresh
	HandlerLockForceUnlock
	HandlerWalkDir
	HandlerStatVol
	HandlerDiskInfo
	HandlerNSScanner
	HandlerReadXL
	HandlerReadVersion
	HandlerDeleteFile
	HandlerDeleteVersion
	HandlerUpdateMetadata
	HandlerWriteMetadata
	HandlerCheckParts
	HandlerRenameData
	HandlerRenameFile
	HandlerReadAll
	HandlerServerVerify
	HandlerTrace
	HandlerListen
	HandlerDeleteBucketMetadata
	HandlerLoadBucketMetadata
	HandlerReloadSiteReplicationConfig
	HandlerReloadPoolMeta
	HandlerStopRebalance
	HandlerLoadRebalanceMeta
	HandlerLoadTransitionTierConfig
	HandlerDeletePolicy
	HandlerLoadPolicy
	HandlerLoadPolicyMapping
	HandlerDeleteServiceAccount
	HandlerLoadServiceAccount
	HandlerDeleteUser
	HandlerLoadUser
	HandlerLoadGroup
	HandlerHealBucket
	HandlerMakeBucket
	HandlerHeadBucket
	HandlerDeleteBucket
	HandlerGetMetrics
	HandlerGetResourceMetrics
	HandlerGetMemInfo
	HandlerGetProcInfo
	HandlerGetOSInfo
	HandlerGetPartitions
	HandlerGetNetInfo
	HandlerGetCPUs
	HandlerServerInfo
	HandlerGetSysConfig
	HandlerGetSysServices
	HandlerGetSysErrors
	HandlerGetAllBucketStats
	HandlerGetBucketStats
	HandlerGetSRMetrics
	HandlerGetPeerMetrics
	HandlerGetMetacacheListing
	HandlerUpdateMetacacheListing
	HandlerGetPeerBucketMetrics
	HandlerStorageInfo
	HandlerConsoleLog
	HandlerListDir
	HandlerGetLocks
	HandlerBackgroundHealStatus
	HandlerGetLastDayTierStats
	HandlerSignalService
	HandlerGetBandwidth
	HandlerWriteAll
	HandlerListBuckets
	HandlerRenameDataInline
	HandlerRenameData2
)

HandlerID is a handler identifier. It is used to determine request routing on the server. Handlers can be registered with a static subroute. Do NOT remove or change the order of existing handlers.

func (*HandlerID) DecodeMsg

func (z *HandlerID) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (HandlerID) EncodeMsg

func (z HandlerID) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (HandlerID) MarshalMsg

func (z HandlerID) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (HandlerID) Msgsize

func (z HandlerID) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (HandlerID) String

func (i HandlerID) String() string

func (*HandlerID) UnmarshalMsg

func (z *HandlerID) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type JSON

type JSON[T any] struct {
	// contains filtered or unexported fields
}

JSON is a wrapper around a T object that can be serialized. There is an internal value

func (*JSON[T]) MarshalMsg

func (j *JSON[T]) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*JSON[T]) Msgsize

func (j *JSON[T]) Msgsize() int

Msgsize returns the size of an empty JSON object.

func (*JSON[T]) Recycle

func (j *JSON[T]) Recycle()

Recycle the underlying value.

func (*JSON[T]) Set

func (j *JSON[T]) Set(v *T)

Set the underlying value.

func (*JSON[T]) UnmarshalMsg

func (j *JSON[T]) UnmarshalMsg(bytes []byte) ([]byte, error)

UnmarshalMsg will unwrap the msgp byte array and JSON unmarshal the value. Nil values are supported.

func (*JSON[T]) Value

func (j *JSON[T]) Value() *T

Value returns the underlying value. If not set yet, a new value is created.

func (*JSON[T]) ValueOrZero

func (j *JSON[T]) ValueOrZero() T

ValueOrZero returns the underlying value. If the underlying value is nil, a zero value is returned.

type JSONPool

type JSONPool[T any] struct {
	// contains filtered or unexported fields
}

JSONPool is a pool for JSON objects that unmarshal into T.

func NewJSONPool

func NewJSONPool[T any]() *JSONPool[T]

NewJSONPool returns a new JSONPool.

func (*JSONPool[T]) NewJSON

func (p *JSONPool[T]) NewJSON() *JSON[T]

NewJSON returns a new JSON. No initial value is set.

func (*JSONPool[T]) NewJSONWith

func (p *JSONPool[T]) NewJSONWith(val *T) *JSON[T]

NewJSONWith returns a new JSON with the provided value.

type MSS

type MSS map[string]string

MSS is a map[string]string that can be serialized. It is not very efficient, but it is only used for easy parameter passing.

func NewMSS

func NewMSS() *MSS

NewMSS returns a new MSS.

func NewMSSWith

func NewMSSWith(m map[string]string) *MSS

NewMSSWith returns a new MSS with the given map.

func (*MSS) Get

func (m *MSS) Get(key string) string

Get returns the value for the given key.

func (*MSS) MarshalMsg

func (m *MSS) MarshalMsg(bytes []byte) (o []byte, err error)

MarshalMsg appends the bytes representation of m to the provided byte slice.

func (*MSS) Msgsize

func (m *MSS) Msgsize() int

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message.

func (*MSS) Recycle

func (m *MSS) Recycle()

Recycle the underlying map.

func (*MSS) Set

func (m *MSS) Set(key, value string)

Set a key, value pair.

func (MSS) ToQuery

func (m MSS) ToQuery() string

ToQuery constructs a URL query string from the MSS, including "?" if there are any keys.
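
A sketch of the described behavior using net/url. The function name toQuery is hypothetical, and the exact escaping and key order of the real method are assumptions here; only the leading "?" for non-empty maps is taken from the documentation.

```go
package main

import (
	"fmt"
	"net/url"
)

// toQuery builds "?k=v&..." from m, or "" when m is empty.
// Keys come out sorted because url.Values.Encode sorts them.
func toQuery(m map[string]string) string {
	if len(m) == 0 {
		return ""
	}
	v := make(url.Values, len(m))
	for k, val := range m {
		v.Set(k, val)
	}
	return "?" + v.Encode()
}

func main() {
	q := toQuery(map[string]string{"volume": "data", "prefix": "a b"})
	fmt.Println(q) // prints "?prefix=a+b&volume=data"
}
```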

func (*MSS) UnmarshalMsg

func (m *MSS) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg deserializes m from the provided byte slice and returns the remainder of bytes.

type Manager

type Manager struct {
	// ID is an instance ID, that will change whenever the server restarts.
	// This allows remotes to keep track of whether state is preserved.
	ID uuid.UUID
	// contains filtered or unexported fields
}

Manager will contain all the connections to the grid. It also handles incoming requests and routes them to the appropriate connection.

func NewManager

func NewManager(ctx context.Context, o ManagerOptions) (*Manager, error)

NewManager creates a new grid manager.

func (*Manager) AddToMux

func (m *Manager) AddToMux(router *mux.Router)

AddToMux will add the grid manager to the given mux.

func (*Manager) Connection

func (m *Manager) Connection(host string) *Connection

Connection will return the connection for the specified host. If the host does not exist, nil will be returned.

func (*Manager) Handler

func (m *Manager) Handler() http.HandlerFunc

Handler returns a handler that can be used to serve grid requests. This should be connected on RoutePath to the main server.

func (*Manager) HostName

func (m *Manager) HostName() string

HostName returns the name of the local host.

func (*Manager) RegisterSingleHandler

func (m *Manager) RegisterSingleHandler(id HandlerID, h SingleHandlerFn, subroute ...string) error

RegisterSingleHandler will register a stateless handler that serves []byte -> ([]byte, error) requests. subroutes are joined with "/" to a single subroute.

func (*Manager) RegisterStreamingHandler

func (m *Manager) RegisterStreamingHandler(id HandlerID, h StreamHandler) error

RegisterStreamingHandler will register a stateless handler that serves two-way streaming requests.

func (*Manager) Targets

func (m *Manager) Targets() []string

Targets returns the names of all remote targets.

type ManagerOptions

type ManagerOptions struct {
	Dialer       ContextDialer               // Outgoing dialer.
	Local        string                      // Local host name.
	Hosts        []string                    // All hosts, including local in the grid.
	AddAuth      AuthFn                      // Add authentication to the given audience.
	AuthRequest  func(r *http.Request) error // Validate incoming requests.
	TLSConfig    *tls.Config                 // TLS to apply to the connections.
	Incoming     func(n int64)               // Record incoming bytes.
	Outgoing     func(n int64)               // Record outgoing bytes.
	BlockConnect chan struct{}               // If set, incoming and outgoing connections will be blocked until closed.
	TraceTo      *pubsub.PubSub[madmin.TraceInfo, madmin.TraceType]
}

ManagerOptions are options for creating a new grid manager.

type NoPayload

type NoPayload struct{}

NoPayload is a type that can be used for handlers that do not use a payload.

func NewNoPayload

func NewNoPayload() NoPayload

NewNoPayload returns an empty NoPayload struct.

func (NoPayload) MarshalMsg

func (NoPayload) MarshalMsg(bytes []byte) ([]byte, error)

MarshalMsg satisfies the interface, but is a no-op.

func (NoPayload) Msgsize

func (p NoPayload) Msgsize() int

Msgsize returns 0.

func (NoPayload) Recycle

func (NoPayload) Recycle()

Recycle is a no-op.

func (NoPayload) UnmarshalMsg

func (NoPayload) UnmarshalMsg(bytes []byte) ([]byte, error)

UnmarshalMsg satisfies the interface, but is a no-op.

type Op

type Op uint8

Op is the operation type of a message.

const (
	// OpConnect is a connect request.
	OpConnect Op = iota + 1

	// OpConnectResponse is a response to a connect request.
	OpConnectResponse

	// OpPing is a ping request.
	// If a mux id is specified that mux is pinged.
	// Clients send ping requests.
	OpPing

	// OpPong is an OpPing response returned by the server.
	OpPong

	// OpConnectMux will connect a new mux with optional payload.
	OpConnectMux

	// OpMuxConnectError is an error while connecting a mux.
	OpMuxConnectError

	// OpDisconnectClientMux instructs a client to disconnect a mux
	OpDisconnectClientMux

	// OpDisconnectServerMux instructs a server to disconnect (cancel) a server mux
	OpDisconnectServerMux

	// OpMuxClientMsg contains a message to a client Mux
	OpMuxClientMsg

	// OpMuxServerMsg contains a message to a server Mux
	OpMuxServerMsg

	// OpUnblockSrvMux contains a message that a server mux is unblocked with one.
	// Only stateful streams have flow control.
	OpUnblockSrvMux

	// OpUnblockClMux contains a message that a client mux is unblocked with one.
	// Only stateful streams have flow control.
	OpUnblockClMux

	// OpAckMux acknowledges a mux was created.
	OpAckMux

	// OpRequest is a single request + response.
	// MuxID is returned in response.
	OpRequest

	// OpResponse is a response to a single request.
	// FlagPayloadIsErr is used to signify that the payload is a string error converted to byte slice.
	// When a response is received, the mux is already removed from the remote.
	OpResponse

	// OpDisconnect instructs that remote wants to disconnect
	OpDisconnect

	// OpMerged is several operations merged into one.
	OpMerged
)

func (*Op) DecodeMsg

func (z *Op) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (Op) EncodeMsg

func (z Op) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Op) MarshalMsg

func (z Op) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Op) Msgsize

func (z Op) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (Op) String

func (i Op) String() string

func (*Op) UnmarshalMsg

func (z *Op) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Recycler

type Recycler interface {
	Recycle()
}

Recycler will override the internal reuse in typed handlers. When a type implements it, the handler will not do internal pooling of objects and will instead call Recycle() when the object is no longer needed. The recycler should handle nil pointers.

type RemoteClient

type RemoteClient struct {
	Name string
}

RemoteClient contains information about the caller.

func GetCaller

func GetCaller(ctx context.Context) *RemoteClient

GetCaller returns caller information from contexts provided to handlers.

type RemoteErr

type RemoteErr string

RemoteErr is a remote error type. Any error seen on a remote will be returned like this.

func IsRemoteErr

func IsRemoteErr(err error) *RemoteErr

IsRemoteErr returns the value if the error is a RemoteErr.

func NewRemoteErr

func NewRemoteErr(err error) *RemoteErr

NewRemoteErr creates a new remote error. The error type is not preserved.

func NewRemoteErrString

func NewRemoteErrString(msg string) *RemoteErr

NewRemoteErrString creates a new remote error from a string.

func NewRemoteErrf

func NewRemoteErrf(format string, a ...any) *RemoteErr

NewRemoteErrf creates a new remote error from a format string.

func (RemoteErr) Error

func (r RemoteErr) Error() string

func (*RemoteErr) Is

func (r *RemoteErr) Is(other error) bool

Is reports whether the string representation of other matches.

type Requester

type Requester interface {
	Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)
}

Requester is able to send requests to a remote.

type Response

type Response struct {
	Msg []byte
	Err error
}

Response is a response from the server.

type RoundTripper

type RoundTripper interface {
	msgp.Unmarshaler
	msgp.Marshaler
	msgp.Sizer

	comparable
}

RoundTripper provides an interface for type roundtrip serialization.

type SingleHandler

type SingleHandler[Req, Resp RoundTripper] struct {
	// contains filtered or unexported fields
}

SingleHandler is a type safe handler for single roundtrip requests.

func NewSingleHandler

func NewSingleHandler[Req, Resp RoundTripper](h HandlerID, newReq func() Req, newResp func() Resp) *SingleHandler[Req, Resp]

NewSingleHandler creates a typed handler that can provide Marshal/Unmarshal. Use Register to register a server handler. Use Call to initiate a clientside call.

func (*SingleHandler[Req, Resp]) AllowCallRequestPool

func (h *SingleHandler[Req, Resp]) AllowCallRequestPool(b bool) *SingleHandler[Req, Resp]

AllowCallRequestPool indicates it is safe to reuse the request on the client side, meaning the request is recycled/pooled when a request is sent. CAREFUL: This should only be used when the request contains no pointers or slices that aren't freshly constructed.

func (*SingleHandler[Req, Resp]) Call

func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Req) (resp Resp, err error)

Call the remote with the request and return the response. When no error is returned, the response should be recycled with PutResponse. If no deadline is set, a 1-minute deadline is added.

func (*SingleHandler[Req, Resp]) IgnoreNilConn

func (h *SingleHandler[Req, Resp]) IgnoreNilConn() *SingleHandler[Req, Resp]

IgnoreNilConn will ignore nil connections when calling. This will make Call return nil instead of ErrDisconnected when the connection is nil. This may only be set ONCE before use.

func (*SingleHandler[Req, Resp]) NewRequest

func (h *SingleHandler[Req, Resp]) NewRequest() Req

NewRequest creates a new request. Handlers can use this to create a reusable request. The request may be reused, so caller should clear any fields.

func (*SingleHandler[Req, Resp]) NewResponse

func (h *SingleHandler[Req, Resp]) NewResponse() Resp

NewResponse creates a new response. Handlers can use this to create a reusable response. The response may be reused, so caller should clear any fields.

func (*SingleHandler[Req, Resp]) PutResponse

func (h *SingleHandler[Req, Resp]) PutResponse(r Resp)

PutResponse will accept a response for reuse. This can be used by a caller to recycle a response after receiving it from a Call.

func (*SingleHandler[Req, Resp]) Register

func (h *SingleHandler[Req, Resp]) Register(m *Manager, handle func(req Req) (resp Resp, err *RemoteErr), subroute ...string) error

Register a handler for a Req -> Resp roundtrip. Requests are automatically recycled.

func (*SingleHandler[Req, Resp]) WithSharedResponse

func (h *SingleHandler[Req, Resp]) WithSharedResponse() *SingleHandler[Req, Resp]

WithSharedResponse indicates it is unsafe to reuse the response when it has been returned on a handler. This will disable automatic response recycling/pooling. Typically this is used when the response shares part of its data structure.

type SingleHandlerFn

type SingleHandlerFn func(payload []byte) ([]byte, *RemoteErr)

SingleHandlerFn is the handler type for one-to-one requests. A non-nil error value will be returned as RemoteErr(msg) to the client. No client information or cancellation (deadline) is available. Include this in the payload if needed. The payload should be recycled with PutByteBuffer if not needed after the call.

type State

type State uint32

State is a connection state.

type StatelessHandler

type StatelessHandler struct {
	Handle StatelessHandlerFn
	// OutCapacity is the output capacity on the caller.
	// If <= 0 capacity will be 1.
	OutCapacity int
}

StatelessHandler is the handler type for one-to-many requests, where responses may be dropped. Stateless requests provide no incoming stream and there is no flow control on outgoing messages.

type StatelessHandlerFn

type StatelessHandlerFn func(ctx context.Context, payload []byte, resp chan<- []byte) *RemoteErr

StatelessHandlerFn must handle an incoming stateless request. A non-nil error value will be returned as RemoteErr(msg) to the client.

type Stream

type Stream struct {

	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	// Requests sent cannot be used any further by the caller.
	Requests chan<- []byte
	// contains filtered or unexported fields
}

A Stream is a two-way stream. All responses *must* be read by the caller. If the call is canceled through the context, the appropriate error will be returned.

func (*Stream) Results

func (s *Stream) Results(next func(b []byte) error) (err error)

Results returns the results from the remote server one by one. If any error is returned by the callback, the stream will be canceled. If the context is canceled, the stream will be canceled.

func (*Stream) Send

func (s *Stream) Send(b []byte) error

Send a payload to the remote server.

type StreamHandler

type StreamHandler struct {
	// Handle an incoming request. Initial payload is sent.
	// Additional input packets (if any) are streamed to request.
	// Upstream will block when request channel is full.
	// Response packets can be sent at any time.
	// Any non-nil error sent as response means no more responses are sent.
	Handle StreamHandlerFn

	// Subroute for handler.
	// Subroute must be static and clients should specify a matching subroute.
	// Should not be set unless there are different handlers for the same HandlerID.
	Subroute string

	// OutCapacity is the output capacity. If <= 0 capacity will be 1.
	OutCapacity int

	// InCapacity is the input capacity.
	// If == 0 no input is expected.
	InCapacity int
}

StreamHandler handles fully bidirectional streams. There is flow control in both directions.

type StreamHandlerFn

type StreamHandlerFn func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *RemoteErr

StreamHandlerFn must process a request with an optional initial payload. It must keep consuming from 'in' until it returns. 'in' and 'out' are independent. The handler should never close 'out'. Buffers received from 'in' can be recycled with PutByteBuffer. Buffers sent on 'out' cannot be referenced once sent.

type StreamTypeHandler

type StreamTypeHandler[Payload, Req, Resp RoundTripper] struct {
	WithPayload bool

	// Override the default capacities (1)
	OutCapacity int

	// Set to 0 if no input is expected.
	// Will be 0 if newReq is nil.
	InCapacity int
	// contains filtered or unexported fields
}

StreamTypeHandler is a type safe handler for streaming requests.

func NewStream

func NewStream[Payload, Req, Resp RoundTripper](h HandlerID, newPayload func() Payload, newReq func() Req, newResp func() Resp) *StreamTypeHandler[Payload, Req, Resp]

NewStream creates a typed handler that can provide Marshal/Unmarshal. Use Register to register a server handler. Use Call to initiate a clientside call. newPayload can be nil. In that case payloads will always be nil. newReq can be nil. In that case no input stream is expected and the handler will be called with nil 'in' channel.

func (*StreamTypeHandler[Payload, Req, Resp]) Call

func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Streamer, payload Payload) (st *TypedStream[Req, Resp], err error)

Call the remote with the payload and return a typed stream.

func (*StreamTypeHandler[Payload, Req, Resp]) NewPayload

func (h *StreamTypeHandler[Payload, Req, Resp]) NewPayload() Payload

NewPayload creates a new payload.

func (*StreamTypeHandler[Payload, Req, Resp]) NewRequest

func (h *StreamTypeHandler[Payload, Req, Resp]) NewRequest() Req

NewRequest creates a new request. The struct may be reused, so caller should clear any fields.

func (*StreamTypeHandler[Payload, Req, Resp]) NewResponse

func (h *StreamTypeHandler[Payload, Req, Resp]) NewResponse() Resp

NewResponse creates a new response. Handlers can use this to create a reusable response.

func (*StreamTypeHandler[Payload, Req, Resp]) PutRequest

func (h *StreamTypeHandler[Payload, Req, Resp]) PutRequest(r Req)

PutRequest will accept a request for reuse. These should be returned by the handler.

func (*StreamTypeHandler[Payload, Req, Resp]) PutResponse

func (h *StreamTypeHandler[Payload, Req, Resp]) PutResponse(r Resp)

PutResponse will accept a response for reuse. These should be returned by the caller.

func (*StreamTypeHandler[Payload, Req, Resp]) Register

func (h *StreamTypeHandler[Payload, Req, Resp]) Register(m *Manager, handle func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error

Register a handler for two-way streaming with payload, input stream and output stream. An optional subroute can be given. Multiple entries are joined with '/'.

func (*StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput

func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput(m *Manager, handle func(ctx context.Context, p Payload, out chan<- Resp) *RemoteErr, subroute ...string) error

RegisterNoInput registers a handler for one-way streaming with payload and output stream. An optional subroute can be given. Multiple entries are joined with '/'.

func (*StreamTypeHandler[Payload, Req, Resp]) RegisterNoPayload

func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoPayload(m *Manager, handle func(ctx context.Context, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error

RegisterNoPayload registers a handler for two-way streaming with input stream and output stream, but no initial payload. An optional subroute can be given. Multiple entries are joined with '/'.

func (*StreamTypeHandler[Payload, Req, Resp]) WithInCapacity

func (h *StreamTypeHandler[Payload, Req, Resp]) WithInCapacity(in int) *StreamTypeHandler[Payload, Req, Resp]

WithInCapacity adjusts the input capacity from the handler perspective. This must be done prior to registering the handler.

func (*StreamTypeHandler[Payload, Req, Resp]) WithOutCapacity

func (h *StreamTypeHandler[Payload, Req, Resp]) WithOutCapacity(out int) *StreamTypeHandler[Payload, Req, Resp]

WithOutCapacity adjusts the output capacity from the handler perspective. This must be done prior to registering the handler.

func (*StreamTypeHandler[Payload, Req, Resp]) WithSharedResponse

func (h *StreamTypeHandler[Payload, Req, Resp]) WithSharedResponse() *StreamTypeHandler[Payload, Req, Resp]

WithSharedResponse indicates it is unsafe to reuse the response. Typically this is used when the response shares part of its data structure.

type Streamer

type Streamer interface {
	NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)
}

Streamer creates a stream.

type Subroute

type Subroute struct {
	*Connection
	// contains filtered or unexported fields
}

Subroute is a connection subroute that can be used to route to a specific handler with the same handler ID.

func (*Subroute) NewStream

func (c *Subroute) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)

NewStream creates a new stream. Initial payload can be reused by the caller.

func (*Subroute) Request

func (c *Subroute) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)

Request performs a single remote request. 'req' will not be used after the call, so the caller can reuse it. If no deadline is set on ctx, a 1-minute deadline will be added.

func (*Subroute) Subroute

func (c *Subroute) Subroute(s string) *Subroute

Subroute adds a subroute to the subroute. The subroutes are combined with '/'.

type TestGrid

type TestGrid struct {
	Servers   []*httptest.Server
	Listeners []net.Listener
	Managers  []*Manager
	Mux       []*mux.Router
	Hosts     []string
	// contains filtered or unexported fields
}

TestGrid contains a grid of servers for testing purposes.

func SetupTestGrid

func SetupTestGrid(n int) (*TestGrid, error)

SetupTestGrid creates a new grid for testing purposes. Select the number of hosts to create. Call (TestGrid).Cleanup() when done.

func (*TestGrid) Cleanup

func (t *TestGrid) Cleanup()

Cleanup will clean up the test grid.

func (*TestGrid) WaitAllConnect

func (t *TestGrid) WaitAllConnect(ctx context.Context)

WaitAllConnect will wait for all connections to be established.

type TraceParamsKey

type TraceParamsKey struct{}

TraceParamsKey allows passing trace parameters to the request via context. This is only needed when un-typed requests are used. MSS, map[string]string types are preferred, but any struct with exported fields will work.

type TypedStream

type TypedStream[Req, Resp RoundTripper] struct {

	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- Req
	// contains filtered or unexported fields
}

TypedStream is a stream with specific types.

func (*TypedStream[Req, Resp]) Results

func (s *TypedStream[Req, Resp]) Results(next func(resp Resp) error) (err error)

Results returns the results from the remote server one by one. If any error is returned by the callback, the stream will be canceled. If the context is canceled, the stream will be canceled.

type URLValues

type URLValues map[string][]string

URLValues can be used for url.Values.

func NewURLValues

func NewURLValues() *URLValues

NewURLValues returns a new URLValues.

func NewURLValuesWith

func NewURLValuesWith(values map[string][]string) *URLValues

NewURLValuesWith returns a new URLValues with the provided content.

func (URLValues) MarshalMsg

func (u URLValues) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (URLValues) Msgsize

func (u URLValues) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*URLValues) Recycle

func (u *URLValues) Recycle()

Recycle the underlying map.

func (*URLValues) UnmarshalMsg

func (u *URLValues) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*URLValues) Values

func (u *URLValues) Values() url.Values

Values returns the url.Values. If u is nil, an empty url.Values is returned. The values are a shallow copy of the underlying map.
