websocket

package
v0.0.0-...-b4bb62b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2024 License: Apache-2.0 Imports: 27 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthStructmessage        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowStructmessage          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupStructmessage = fmt.Errorf("proto: unexpected end of group")
)
View Source
var DefaultRequestTimeout = 10 * time.Second

DefaultRequestTimeout default timeout used for Request/Reply JSON message.

View Source
var Format_name = map[int32]string{
	0: "Json",
	1: "Protobuf",
}
View Source
var Format_value = map[string]int32{
	"Json":     0,
	"Protobuf": 1,
}

Functions

This section is empty.

Types

type Client

type Client struct {
	*Conn
	Path      string
	AuthOpts  *shttp.AuthenticationOpts
	TLSConfig *tls.Config
	Opts      ClientOpts
}

Client is a outgoint client meaning a client connected to a remote websocket server. It embeds a Conn.

func NewClient

func NewClient(host string, clientType service.Type, url *url.URL, opts ClientOpts) *Client

NewClient returns a Client with a new connection.

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

Connect to the server

func (*Client) Start

func (c *Client) Start()

Start connects to the server - and reconnect if necessary

func (*Client) UpgradeToStructSpeaker

func (c *Client) UpgradeToStructSpeaker() *StructSpeaker

UpgradeToStructSpeaker a WebSocket client to a StructSpeaker

type ClientOpts

type ClientOpts struct {
	Protocol         Protocol
	AuthOpts         *shttp.AuthenticationOpts
	Headers          http.Header
	QueueSize        int
	WriteCompression bool
	TLSConfig        *tls.Config
	Logger           logging.Logger
}

ClientOpts defines some options that can be set when creating a new client

func NewClientOpts

func NewClientOpts() ClientOpts

NewClientOpts returns a new client option set

type ClientPool

type ClientPool struct {
	*Pool
}

ClientPool is a pool of out going Speaker meaning connection to a remote Server.

func NewClientPool

func NewClientPool(name string, opts PoolOpts) *ClientPool

NewClientPool returns a new ClientPool meaning a pool of outgoing Client.

func (*ClientPool) ConnectAll

func (s *ClientPool) ConnectAll()

ConnectAll calls connect to all the wSSpeakers of the pool.

type Conn

type Conn struct {
	insanelock.RWMutex
	ConnStatus
	// contains filtered or unexported fields
}

Conn is the connection object of a Speaker

func (*Conn) AddEventHandler

func (c *Conn) AddEventHandler(h SpeakerEventHandler)

AddEventHandler registers a new event handler

func (*Conn) Connect

func (c *Conn) Connect(context.Context) error

Connect default implementation doing nothing as for incoming connection it is not used.

func (*Conn) Flush

func (c *Conn) Flush()

Flush all the pending sent messages

func (*Conn) GetAddrPort

func (c *Conn) GetAddrPort() (string, int)

GetAddrPort returns the address and the port of the remote end.

func (*Conn) GetClientProtocol

func (c *Conn) GetClientProtocol() Protocol

GetClientProtocol returns the websocket protocol.

func (*Conn) GetHeaders

func (c *Conn) GetHeaders() http.Header

GetHeaders returns the client HTTP headers.

func (*Conn) GetHost

func (c *Conn) GetHost() string

GetHost returns the hostname/host-id of the connection.

func (*Conn) GetRemoteHost

func (c *Conn) GetRemoteHost() string

GetRemoteHost returns the hostname/host-id of the remote side of the connection.

func (*Conn) GetRemoteServiceType

func (c *Conn) GetRemoteServiceType() service.Type

GetRemoteServiceType returns the remote service type.

func (*Conn) GetServiceType

func (c *Conn) GetServiceType() service.Type

GetServiceType returns the client type.

func (*Conn) GetStatus

func (c *Conn) GetStatus() ConnStatus

GetStatus returns the status of a WebSocket connection

func (*Conn) GetURL

func (c *Conn) GetURL() *url.URL

GetURL returns the URL of the connection

func (*Conn) IsConnected

func (c *Conn) IsConnected() bool

IsConnected returns the connection status.

func (*Conn) Run

func (c *Conn) Run()

Run the main loop

func (*Conn) SendMessage

func (c *Conn) SendMessage(m Message) error

SendMessage adds a message to sending queue.

func (*Conn) SendRaw

func (c *Conn) SendRaw(b []byte) error

SendRaw adds raw bytes to sending queue.

func (*Conn) Start

func (c *Conn) Start()

Start main loop in a goroutine

func (*Conn) Stop

func (c *Conn) Stop()

Stop disconnect the speaker

func (*Conn) StopAndWait

func (c *Conn) StopAndWait()

StopAndWait disconnect the speaker and wait for the goroutine to end

type ConnState

type ConnState service.State

ConnState describes the connection state

func (*ConnState) CompareAndSwap

func (s *ConnState) CompareAndSwap(old, new service.State) bool

CompareAndSwap executes the compare-and-swap operation for a state

func (*ConnState) Load

func (s *ConnState) Load() service.State

Load atomatically loads and returns the state

func (*ConnState) MarshalJSON

func (s *ConnState) MarshalJSON() ([]byte, error)

MarshalJSON marshal the connection state to JSON

func (*ConnState) Store

func (s *ConnState) Store(state service.State)

Store atomatically stores the state

func (*ConnState) UnmarshalJSON

func (s *ConnState) UnmarshalJSON(b []byte) error

UnmarshalJSON de-serialize a connection state

type ConnStatus

type ConnStatus struct {
	ServiceType       service.Type
	ClientProtocol    Protocol
	Addr              string
	Port              int
	Host              string      `json:"-"`
	State             *ConnState  `json:"IsConnected"`
	URL               *url.URL    `json:"-"`
	Headers           http.Header `json:"-"`
	ConnectTime       time.Time
	RemoteHost        string       `json:",omitempty"`
	RemoteServiceType service.Type `json:",omitempty"`
}

ConnStatus describes the status of a WebSocket connection

type DefaultSpeakerEventHandler

type DefaultSpeakerEventHandler struct {
}

DefaultSpeakerEventHandler implements stubs for the wsIncomingClientEventHandler interface

func (*DefaultSpeakerEventHandler) OnConnected

func (d *DefaultSpeakerEventHandler) OnConnected(c Speaker) error

OnConnected is called when the connection is established.

func (*DefaultSpeakerEventHandler) OnDisconnected

func (d *DefaultSpeakerEventHandler) OnDisconnected(c Speaker)

OnDisconnected is called when the connection is closed or lost.

func (*DefaultSpeakerEventHandler) OnMessage

func (d *DefaultSpeakerEventHandler) OnMessage(c Speaker, m Message)

OnMessage is called when a message is received.

type Format

type Format int32
const (
	Format_Json     Format = 0
	Format_Protobuf Format = 1
)

func (Format) EnumDescriptor

func (Format) EnumDescriptor() ([]byte, []int)

func (Format) String

func (x Format) String() string

type IncomerHandler

type IncomerHandler func(*websocket.Conn, *auth.AuthenticatedRequest, clientPromoter) (Speaker, error)

IncomerHandler incoming client handler interface.

type MasterElection

type MasterElection struct {
	insanelock.RWMutex
	DefaultSpeakerEventHandler
	// contains filtered or unexported fields
}

MasterElection provides a mechanism based on etcd to elect a master from a SpeakerPool.

func NewMasterElection

func NewMasterElection(pool SpeakerPool) *MasterElection

NewMasterElection returns a new MasterElection.

func (*MasterElection) AddEventHandler

func (a *MasterElection) AddEventHandler(eventHandler MasterEventHandler)

AddEventHandler a new MasterEventHandler event handler.

func (*MasterElection) GetMaster

func (a *MasterElection) GetMaster() Speaker

GetMaster returns the current master.

func (*MasterElection) OnConnected

func (a *MasterElection) OnConnected(c Speaker) error

OnConnected is triggered when a new Speaker get connected. If no master was elected this Speaker will be chosen as master.

func (*MasterElection) OnDisconnected

func (a *MasterElection) OnDisconnected(c Speaker)

OnDisconnected is triggered when a new Speaker get disconnected. If it was the master a new election is triggered.

func (*MasterElection) SendMessageToMaster

func (a *MasterElection) SendMessageToMaster(m Message)

SendMessageToMaster sends a message to the master.

type MasterEventHandler

type MasterEventHandler interface {
	OnNewMaster(c Speaker)
}

MasterEventHandler is the interface to be implemented by master election listeners.

type Message

type Message interface {
	Bytes(protocol Protocol) ([]byte, error)
}

Message is the interface of a message to send over the wire

type PongListener

type PongListener interface {
	OnPong(speaker Speaker)
}

PongListener listens pong event

type Pool

type Pool struct {
	insanelock.RWMutex
	// contains filtered or unexported fields
}

Pool is a connection container. It embed a list of Speaker.

func (*Pool) AddClient

func (s *Pool) AddClient(c Speaker) error

AddClient adds the given Speaker to the pool.

func (*Pool) AddEventHandler

func (s *Pool) AddEventHandler(h SpeakerEventHandler)

AddEventHandler registers a new event handler.

func (*Pool) BroadcastMessage

func (s *Pool) BroadcastMessage(m Message)

BroadcastMessage broadcasts the given message.

func (*Pool) DisconnectAll

func (s *Pool) DisconnectAll()

DisconnectAll disconnects all the Speaker

func (*Pool) GetName

func (s *Pool) GetName() string

GetName returns the name of the pool

func (*Pool) GetSpeakerByRemoteHost

func (s *Pool) GetSpeakerByRemoteHost(host string) (Speaker, error)

GetSpeakerByRemoteHost returns the Speaker for the given remote host.

func (*Pool) GetSpeakers

func (s *Pool) GetSpeakers() (speakers []Speaker)

GetSpeakers returns the Speakers of the pool.

func (*Pool) GetSpeakersByType

func (s *Pool) GetSpeakersByType(serviceType service.Type) (speakers []Speaker)

GetSpeakersByType returns Speakers matching the given type.

func (*Pool) GetStatus

func (s *Pool) GetStatus() map[string]ConnStatus

GetStatus returns the states of the WebSocket clients

func (*Pool) OnConnected

func (s *Pool) OnConnected(c Speaker) error

OnConnected forwards the OnConnected event to event listeners of the pool.

func (*Pool) OnDisconnected

func (s *Pool) OnDisconnected(c Speaker)

OnDisconnected forwards the OnDisconnected event to event listeners of the pool.

func (*Pool) OnMessage

func (s *Pool) OnMessage(c Speaker, m Message)

OnMessage forwards the OnMessage event to event listeners of the pool.

func (*Pool) PickConnectedSpeaker

func (s *Pool) PickConnectedSpeaker() Speaker

PickConnectedSpeaker returns randomly a connected Speaker

func (*Pool) RemoveClient

func (s *Pool) RemoveClient(c Speaker) bool

RemoveClient removes client from the pool

func (*Pool) SendMessageTo

func (s *Pool) SendMessageTo(m Message, host string) error

SendMessageTo sends message to Speaker for the given remote host.

func (*Pool) Start

func (s *Pool) Start()

Start starts the pool in a goroutine.

func (*Pool) Stop

func (s *Pool) Stop()

Stop stops the pool and wait until stopped.

type PoolOpts

type PoolOpts struct {
	Logger logging.Logger
}

PoolOpts defines pool options

type ProtobufObject

type ProtobufObject interface {
	proto.Marshaler
	proto.Message
}

ProtobufObject defines an object that can be serialized in protobuf

type Protocol

type Protocol string

Protocol used to transport messages

const (
	// WildcardNamespace is the namespace used as wildcard. It is used by listeners to filter callbacks.
	WildcardNamespace = "*"

	// RawProtocol is used for raw messages
	RawProtocol Protocol = "raw"
	// ProtobufProtocol is used for protobuf encoded messages
	ProtobufProtocol Protocol = "protobuf"
	// JSONProtocol is used for JSON encoded messages
	JSONProtocol Protocol = "json"
)

func (*Protocol) String

func (p *Protocol) String() string

type RawMessage

type RawMessage []byte

RawMessage represents a raw message (array of bytes)

func (RawMessage) Bytes

func (m RawMessage) Bytes(protocol Protocol) ([]byte, error)

Bytes returns the string representation of the raw message

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server implements a websocket server. It owns a Pool of incoming Speakers.

func NewServer

func NewServer(server *shttp.Server, endpoint string, opts ServerOpts) *Server

NewServer returns a new Server. The given auth backend will validate the credentials

func (Server) AddClient

func (s Server) AddClient(c Speaker) error

AddClient adds the given Speaker to the incomerPool.

func (Server) OnDisconnected

func (s Server) OnDisconnected(c Speaker)

OnDisconnected forwards the OnDisconnected event to event listeners of the pool.

type ServerOpts

type ServerOpts struct {
	WriteCompression bool
	QueueSize        int
	PingDelay        time.Duration
	PongTimeout      time.Duration
	Logger           logging.Logger
	AuthBackend      shttp.AuthenticationBackend
	PongListeners    []PongListener
}

ServerOpts defines server options

type Speaker

type Speaker interface {
	GetStatus() ConnStatus
	GetHost() string
	GetAddrPort() (string, int)
	GetServiceType() service.Type
	GetClientProtocol() Protocol
	GetHeaders() http.Header
	GetURL() *url.URL
	IsConnected() bool
	SendMessage(Message) error
	SendRaw(r []byte) error
	Connect(context.Context) error
	Start()
	Stop()
	StopAndWait()
	Run()
	AddEventHandler(SpeakerEventHandler)
	GetRemoteHost() string
	GetRemoteServiceType() service.Type
}

Speaker is the interface for a websocket speaking client. It is used for outgoing or incoming connections.

type SpeakerEventHandler

type SpeakerEventHandler interface {
	OnMessage(c Speaker, m Message)
	OnConnected(c Speaker) error
	OnDisconnected(c Speaker)
}

SpeakerEventHandler is the interface to be implement by the client events listeners.

type SpeakerPool

type SpeakerPool interface {
	AddClient(c Speaker) error
	RemoveClient(c Speaker) bool
	AddEventHandler(h SpeakerEventHandler)
	GetSpeakers() []Speaker
	GetSpeakerByRemoteHost(host string) (Speaker, error)
	PickConnectedSpeaker() Speaker
	BroadcastMessage(m Message)
	SendMessageTo(m Message, host string) error
}

SpeakerPool is the interface that Speaker pools have to implement.

type SpeakerStructMessageDispatcher

type SpeakerStructMessageDispatcher interface {
	AddStructMessageHandler(h SpeakerStructMessageHandler, namespaces []string)
}

SpeakerStructMessageDispatcher interface is used to dispatch OnStructMessage events.

type SpeakerStructMessageHandler

type SpeakerStructMessageHandler interface {
	OnStructMessage(c Speaker, m *StructMessage)
}

SpeakerStructMessageHandler interface used to receive Struct messages.

type StructClientPool

type StructClientPool struct {
	*ClientPool
	// contains filtered or unexported fields
}

StructClientPool is a ClientPool able to send StructMessage.

func NewStructClientPool

func NewStructClientPool(name string, opts PoolOpts) *StructClientPool

NewStructClientPool returns a new StructClientPool.

func (*StructClientPool) AddClient

func (s *StructClientPool) AddClient(c Speaker) error

AddClient adds a Client to the pool.

func (StructClientPool) AddStructMessageHandler

func (d StructClientPool) AddStructMessageHandler(h SpeakerStructMessageHandler, namespaces []string)

AddStructMessageHandler adds a new listener for Struct messages.

func (StructClientPool) AddStructSpeaker

func (d StructClientPool) AddStructSpeaker(c *StructSpeaker)

func (*StructClientPool) Request

func (s *StructClientPool) Request(host string, request *StructMessage, timeout time.Duration) (*StructMessage, error)

Request sends a Request Struct message to the Speaker of the given remote host.

type StructMessage

type StructMessage struct {
	XXX_state structMessageState `json:"-"`
	Namespace string             `protobuf:"bytes,1,opt,name=Namespace,proto3" json:"Namespace,omitempty"`
	Type      string             `protobuf:"bytes,2,opt,name=Type,proto3" json:"Type,omitempty"`
	UUID      string             `protobuf:"bytes,3,opt,name=UUID,proto3" json:"UUID,omitempty"`
	Status    int64              `protobuf:"varint,4,opt,name=Status,proto3" json:"Status,omitempty"`
	Format    Format             `protobuf:"varint,5,opt,name=Format,proto3,enum=websocket.Format" json:"Format,omitempty"`
	Obj       []byte             `protobuf:"bytes,6,opt,name=Obj,proto3" json:"Obj,omitempty"`
}

StructMessage is a Protobuf based message on top of Message. It implements Message interface and can be sent with via a Speaker.

func NewStructMessage

func NewStructMessage(ns string, tp string, v interface{}, uuids ...string) *StructMessage

NewStructMessage creates a new StructMessage with the given namespace, type, value and optionally the UUID.

func (*StructMessage) Bytes

func (g *StructMessage) Bytes(protocol Protocol) ([]byte, error)

Bytes implements the message interface

func (*StructMessage) Debug

func (g *StructMessage) Debug() string

Debug representation of the struct StructMessage

func (*StructMessage) Descriptor

func (*StructMessage) Descriptor() ([]byte, []int)

func (*StructMessage) GetFormat

func (m *StructMessage) GetFormat() Format

func (*StructMessage) GetNamespace

func (m *StructMessage) GetNamespace() string

func (*StructMessage) GetObj

func (m *StructMessage) GetObj() []byte

func (*StructMessage) GetStatus

func (m *StructMessage) GetStatus() int64

func (*StructMessage) GetType

func (m *StructMessage) GetType() string

func (*StructMessage) GetUUID

func (m *StructMessage) GetUUID() string

func (*StructMessage) Marshal

func (m *StructMessage) Marshal() (dAtA []byte, err error)

func (*StructMessage) MarshalTo

func (m *StructMessage) MarshalTo(dAtA []byte) (int, error)

func (*StructMessage) MarshalToSizedBuffer

func (m *StructMessage) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*StructMessage) ProtoMessage

func (*StructMessage) ProtoMessage()

func (*StructMessage) ProtoSize

func (m *StructMessage) ProtoSize() (n int)

func (*StructMessage) Reply

func (g *StructMessage) Reply(v interface{}, kind string, status int) *StructMessage

Reply returns a reply message with the given value, type and status. Basically it return a new StructMessage with the correct Namespace and UUID.

func (*StructMessage) Reset

func (m *StructMessage) Reset()

func (*StructMessage) String

func (m *StructMessage) String() string

func (*StructMessage) Unmarshal

func (m *StructMessage) Unmarshal(dAtA []byte) error

func (*StructMessage) UnmarshalJSON

func (g *StructMessage) UnmarshalJSON(b []byte) error

UnmarshalJSON custom unmarshal

func (*StructMessage) XXX_DiscardUnknown

func (m *StructMessage) XXX_DiscardUnknown()

func (*StructMessage) XXX_Marshal

func (m *StructMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*StructMessage) XXX_Merge

func (m *StructMessage) XXX_Merge(src proto.Message)

func (*StructMessage) XXX_Size

func (m *StructMessage) XXX_Size() int

func (*StructMessage) XXX_Unmarshal

func (m *StructMessage) XXX_Unmarshal(b []byte) error

type StructServer

type StructServer struct {
	*Server
	// contains filtered or unexported fields
}

StructServer is a Server able to handle StructSpeaker.

func NewStructServer

func NewStructServer(server *Server) *StructServer

NewStructServer returns a new StructServer

func (StructServer) AddClient

func (s StructServer) AddClient(c Speaker) error

AddClient adds the given Speaker to the incomerPool.

func (StructServer) AddStructMessageHandler

func (d StructServer) AddStructMessageHandler(h SpeakerStructMessageHandler, namespaces []string)

AddStructMessageHandler adds a new listener for Struct messages.

func (StructServer) AddStructSpeaker

func (d StructServer) AddStructSpeaker(c *StructSpeaker)

func (*StructServer) OnConnected

func (s *StructServer) OnConnected(c Speaker) error

OnConnected websocket event.

func (*StructServer) OnDisconnected

func (s *StructServer) OnDisconnected(c Speaker)

OnDisconnected removes the Speaker from the incomer pool.

func (*StructServer) OnMessage

func (s *StructServer) OnMessage(c Speaker, m Message)

OnMessage websocket event.

func (*StructServer) Request

func (s *StructServer) Request(host string, request *StructMessage, timeout time.Duration) (*StructMessage, error)

Request sends a Request Struct message to the Speaker of the given remote host.

type StructSpeaker

type StructSpeaker struct {
	Speaker
	// contains filtered or unexported fields
}

StructSpeaker is a Speaker able to handle Struct Message and Request/Reply calls.

func (StructSpeaker) AddStructMessageHandler

func (a StructSpeaker) AddStructMessageHandler(h SpeakerStructMessageHandler, namespaces []string)

AddStructMessageHandler adds a new listener for Struct messages.

func (StructSpeaker) OnConnected

func (a StructSpeaker) OnConnected(c Speaker) error

OnConnected is implemented here to avoid infinite loop since the default implementation is triggering OnDisconnected too.

func (StructSpeaker) OnDisconnected

func (a StructSpeaker) OnDisconnected(c Speaker)

OnDisconnected is implemented here to avoid infinite loop since the default implementation is triggering OnDisconnected too.

func (*StructSpeaker) OnMessage

func (s *StructSpeaker) OnMessage(c Speaker, m Message)

OnMessage checks that the Message comes from a StructSpeaker. It parses the Struct message and then dispatch the message to the proper listeners according to the namespace.

func (*StructSpeaker) Request

func (s *StructSpeaker) Request(m *StructMessage, timeout time.Duration) (*StructMessage, error)

Request sends a Struct message request waiting for a reply using the given timeout.

func (*StructSpeaker) SendMessage

func (s *StructSpeaker) SendMessage(m Message) error

SendMessage sends a message according to the namespace.

type StructSpeakerPool

type StructSpeakerPool interface {
	SpeakerPool
	SpeakerStructMessageDispatcher
	Request(host string, request *StructMessage, timeout time.Duration) (*StructMessage, error)
}

StructSpeakerPool is the interface of a pool of StructSpeakers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL