websockets

package module
Version: v1.2.84
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 License: Apache-2.0 Imports: 25 Imported by: 2

README

WebSocket handler for FNS

Install

go get github.com/aacfactory/fns-contrib/transports/handlers/websockets

Note

Only JSON-encoded messages are supported.

Usage

Make sure TLS is used.

app := fns.New(
    fns.Handler(websockets.New()),
)

Set up the config

transport:
  handlers:
    websockets:
      maxConnections: 1024
      handshakeTimeout: "1s"
      readTimeout: "10s"
      readBufferSize: "4MB"
      writeTimeout: "60s"
      writeBufferSize: "4MB"
      enableCompression: false
      maxRequestMessageSize: "4KB"
      connectionTTL: "10m0s"
      enableEcho: false
      originCheckPolicy:
        mode: "pass"

Get the connection ID in a function

connId := websockets.ConnectionId(ctx)

Send a message to a client

err := websockets.Send(ctx, connId, payload)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrRequestMessageIsTooLarge = fmt.Errorf("message is too large")
)
View Source
var (
	ErrTooMayConnections = errors.New(http.StatusTooManyRequests, "***TOO MANY CONNECTIONS***", "fns: too may connections, try again later.")
)

Functions

func ConnectionId

func ConnectionId(ctx context.Context) (id []byte)

func LoadConnection added in v1.2.1

func LoadConnection(ctx context.Context) (conn *websocket.Conn, has bool)

func New added in v1.2.1

func New(options ...Option) (handler transports.MuxHandler)

func ReleaseRequest added in v1.2.1

func ReleaseRequest(r *Request)

func Send

func Send(ctx context.Context, id []byte, data any) (err error)

func WithConnection added in v1.2.1

func WithConnection(ctx context.Context, conn *websocket.Conn)

Types

type Config

type Config struct {
	MaxConnections        int                     `json:"maxConnections"`
	HandshakeTimeout      string                  `json:"handshakeTimeout"`
	ReadTimeout           string                  `json:"readTimeout"`
	ReadBufferSize        string                  `json:"readBufferSize"`
	WriteTimeout          string                  `json:"writeTimeout"`
	WriteBufferSize       string                  `json:"writeBufferSize"`
	EnableCompression     bool                    `json:"enableCompression"`
	MaxRequestMessageSize string                  `json:"maxRequestMessageSize"`
	OriginCheckPolicy     OriginCheckPolicyConfig `json:"originCheckPolicy"`
	ConnectionTTL         string                  `json:"connectionTTL"`
	EnableEcho            bool                    `json:"enableEcho"`
}

type Connection

type Connection interface {
	ReadMessage() (messageType MessageType, p []byte, err error)
	WriteMessage(messageType MessageType, data []byte) (err error)
	WriteControl(messageType MessageType, data []byte, deadline time.Time) error
	NextWriter(messageType MessageType) (io.WriteCloser, error)
	SetWriteDeadline(t time.Time) error
	NextReader() (messageType MessageType, r io.Reader, err error)
	SetReadDeadline(t time.Time) error
	SetReadLimit(limit int64)
	CloseHandler() func(code int, text string) error
	SetCloseHandler(h func(code int, text string) error)
	PingHandler() func(appData []byte) error
	SetPingHandler(h func(appData []byte) error)
	PongHandler() func(appData []byte) error
	SetPongHandler(h func(appData []byte) error)
	EnableWriteCompression(enable bool)
	SetCompressionLevel(level int) error
	Subprotocol() (protocol string)
	Close() (err error)
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
}

type Connections added in v1.2.1

type Connections struct {
	// contains filtered or unexported fields
}

func LoadConnections added in v1.2.1

func LoadConnections(ctx context.Context) (conns *Connections, has bool)

func (*Connections) Construct added in v1.2.1

func (conns *Connections) Construct(_ services.Options) (err error)

func (*Connections) Get added in v1.2.1

func (conns *Connections) Get(id []byte) (conn *websocket.Conn, has bool)

func (*Connections) Name added in v1.2.1

func (conns *Connections) Name() (name string)

func (*Connections) Remove added in v1.2.1

func (conns *Connections) Remove(id []byte)

func (*Connections) Set added in v1.2.1

func (conns *Connections) Set(conn *websocket.Conn)

func (*Connections) Shutdown added in v1.2.1

func (conns *Connections) Shutdown(_ context.Context)

type MessageType

type MessageType int

type Option added in v1.2.1

type Option func(options *Options)

func WithRegistration added in v1.2.1

func WithRegistration(registration Registration) Option

func WithSubProtocolHandler added in v1.2.1

func WithSubProtocolHandler(handler ...SubProtocolHandler) Option

type Options added in v1.2.1

type Options struct {
	// contains filtered or unexported fields
}

type OriginCheckPolicyConfig added in v1.2.1

type OriginCheckPolicyConfig struct {
	Mode    string          `json:"mode"`
	Options json.RawMessage `json:"options"`
}

func (*OriginCheckPolicyConfig) Build added in v1.2.1

func (config *OriginCheckPolicyConfig) Build() (fn func(r transports.Request) bool, err error)

type Registration added in v1.2.1

type Registration interface {
	services.Component
	Get(ctx context.Context, id []byte) (endpointId []byte, has bool, err error)
	Set(ctx context.Context, id []byte, endpointId []byte, ttl time.Duration) (err error)
	Remove(ctx context.Context, id []byte) (err error)
}

type Request

type Request struct {
	Endpoint string            `json:"endpoint"`
	Fn       string            `json:"fn"`
	Header   transports.Header `json:"header"`
	Payload  json.RawMessage   `json:"payload"`
}

func AcquireRequest added in v1.2.1

func AcquireRequest(endpoint []byte, fn []byte, payload any) (r *Request, err error)

func (*Request) Authorization added in v1.2.1

func (r *Request) Authorization() (v []byte)

func (*Request) Validate

func (r *Request) Validate() (err error)

func (*Request) Versions added in v1.2.1

func (r *Request) Versions() (v versions.Intervals, has bool, err error)

type Response

type Response struct {
	Succeed bool `json:"succeed"`
	Payload any  `json:"result"`
}

func Failed

func Failed(err error) (resp *Response)

func Succeed

func Succeed(payload any) (resp *Response)

func (*Response) Encode

func (resp *Response) Encode() (p []byte)

type SendOption added in v1.2.1

type SendOption func(options *SendOptions)

func WithEndpointId added in v1.2.1

func WithEndpointId(endpointId []byte) SendOption

type SendOptions added in v1.2.1

type SendOptions struct {
	EndpointId []byte
}

type SendParam added in v1.2.1

type SendParam struct {
	ConnectionId string `json:"connectionId" avro:"connectionId"`
	Message      []byte `json:"message" avro:"message"`
}

type SubProtocolHandler

type SubProtocolHandler interface {
	Name() (name string)
	Construct(options SubProtocolHandlerOptions) (err error)
	Handle(ctx context.Context, conn Connection)
	Close() (err error)
}

type SubProtocolHandlerOptions

type SubProtocolHandlerOptions struct {
	Log                   logs.Logger
	Config                configures.Config
	ReadTimeout           time.Duration
	WriteTimeout          time.Duration
	MaxRequestMessageSize int64
}

type SubProtocolHandlerTask

type SubProtocolHandlerTask struct {
	// contains filtered or unexported fields
}

func (*SubProtocolHandlerTask) Execute

func (t *SubProtocolHandlerTask) Execute(ctx context.Context)

type WebsocketConnection

type WebsocketConnection struct {
	*websocket.Conn
	// contains filtered or unexported fields
}

func (*WebsocketConnection) DeviceId added in v1.2.1

func (conn *WebsocketConnection) DeviceId() string

func (*WebsocketConnection) DeviceIp added in v1.2.1

func (conn *WebsocketConnection) DeviceIp() string

func (*WebsocketConnection) Header added in v1.2.1

func (conn *WebsocketConnection) Header() transports.Header

func (*WebsocketConnection) NextReader

func (conn *WebsocketConnection) NextReader() (messageType MessageType, r io.Reader, err error)

func (*WebsocketConnection) NextWriter

func (conn *WebsocketConnection) NextWriter(messageType MessageType) (w io.WriteCloser, err error)

func (*WebsocketConnection) ReadMessage

func (conn *WebsocketConnection) ReadMessage() (messageType MessageType, p []byte, err error)

func (*WebsocketConnection) WriteControl

func (conn *WebsocketConnection) WriteControl(messageType MessageType, data []byte, deadline time.Time) error

func (*WebsocketConnection) WriteMessage

func (conn *WebsocketConnection) WriteMessage(messageType MessageType, data []byte) (err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL