conductor

package module
v0.0.0-...-89f99df Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2018 License: Apache-2.0 Imports: 12 Imported by: 2

README

conductor

realtime scalable backend with websockets

Documentation

Index

Constants

View Source
const (
	BindOpcode              = iota // BindOpcode bind to a channel. This will create the channel if it does not exist.
	UnbindOpcode                   // UnbindOpcode unbind from a channel.
	WriteOpcode                    // WriteOpcode broadcasts on provided channel.
	ServerOpcode                   // ServerOpcode intend to be between a single client and the server (not broadcasted).
	CleanUpOpcode                  // a message to cleanup a disconnected client/connection.
	StreamStartOpcode              // StreamStartOpcode signifies the start of a stream of a file
	StreamEndOpcode                // StreamEndOpcode signifies the end of a stream of a file
	StreamWriteOpcode              // StreamWriteOpcode signifies the write (a chunk) of a file
	MetaQueryOpcode                // MetaQueryOpcode is for sister servers to query meta data from each other
	MetaQueryResponseOpcode        // MetaQueryResponseOpcode is to respond to a meta query
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	Read <-chan *Message
	// contains filtered or unexported fields
}

Client is basic websocket connection.

func NewClient

func NewClient(serverURL string) (*Client, error)

NewClient allocates and returns a new channel ServerUrl is the server url to connect to.

func (*Client) Bind

func (c *Client) Bind(channelName string)

Bind is used to send a bind request to a channel

func (*Client) ServerMessage

func (c *Client) ServerMessage(messageBody []byte)

ServerMessage sends a message to the server for server operations (like getting message history or something)

func (*Client) Unbind

func (c *Client) Unbind(channelName string)

Unbind is used to send an unbind request to a channel

func (*Client) Write

func (c *Client) Write(channelName string, messageBody []byte)

Write to send a message to a channel

func (*Client) WriteStream

func (c *Client) WriteStream(channelName string, reader io.Reader) error

WriteStream is to write an whole file to the stream. It chucks the data using the special stream op codes.

type Connection

type Connection interface {
	Write(message *Message) error  // Write is to send a message to the client this connection represents.
	ReadLoop(hub HubConnection)    // ReadLoop is the loop that keeps this connection alive. Don't call this.
	Disconnect()                   // Disconnect is use to disconnect the connection.
	Channels() []string            // Channels is to hold the channels this connection is bound to. Very useful for auth and cleanup.
	SetChannels(channels []string) // Update the channel list of this connection.
	Store(key, value string)       // Store is a map of local storage for the connection. This way you can identify the connection in other interfaces.
	Get(key string) string         // Get is a map of local storage for the connection.
}

Connection is the based interface for mocking a connection.

type ConnectionAuth

type ConnectionAuth interface {
	IsValid(r *http.Request) bool                    // IsValid is called on every HTTP upgrade request and should be used to validate auth tokens.
	ConnToRequest(r *http.Request, conn Connection)  // ConnToRequest is called so you can map your HTTP request (like the auth token) to a current connection.
	CanBind(conn Connection, message *Message) bool  // CanBind is called on every bind request, so optimizing it is highly recommended.
	CanWrite(conn Connection, message *Message) bool // CanWrite is called on every write request, so optimizing it is highly recommended.
	IsSister(r *http.Request) bool                   // IsSister is called on every HTTP upgrade request and should be used to check if an incoming connection is from a sister node or not.
}

ConnectionAuth is the based interface for handling authentication and authorization. This is used for new HTTP requests that upgrade a websocket and the permissions to channels. Use this for checking for auth tokens and such to ensure only real clients can connect. and that connections have the right permissions to write or bind to a channel.

type DeDuplication

type DeDuplication interface {
	Start()
	Add(message *Message)
	Remove(message *Message)
	IsDuplicate(message *Message) bool
}

DeDuplication is the based interface for handling deduplication of messages. Use this for ensuring messages aren't processed twice in the hub.

type Hub

type Hub interface {
	RunLoop()                                                // This is the master run loop that processes all the messages that come into the channel.
	Write(conn Connection, message *Message)                 // Not sure if I like the duplicate method trick yet...
	Auth() ConnectionAuth                                    // This returns the current auther (if one is used)
	SisterManager() SisterManager                            // This returns the current sister manager (if one is used)
	ReceivedSisterMessage(conn Connection, message *Message) // Handle a sister message into this hub
}

Hub is the based interface for what methods aHub should provide.

type HubConnection

type HubConnection interface {
	Write(conn Connection, message *Message)
	ReceivedSisterMessage(conn Connection, message *Message)
}

HubConnection is the an interface to hide the other methods of the Hub. This way `Connection`s can only write to the Hub and not call its other methods.

type Message

type Message struct {
	Opcode uint16 `json:"opcode"`

	Uuid string `json:"uuid"`

	ChannelName string `json:"channel_name"`

	Body []byte `json:"body"`
	// contains filtered or unexported fields
}

Message represents the framing of the messages that get sent back and forth.

func Unmarshal

func Unmarshal(b []byte) (*Message, error)

Unmarshal converts a slice of bytes into a Message struct.

func (*Message) Marshal

func (m *Message) Marshal() ([]byte, error)

Marshal converts the Message struct into bytes to transmit over a connection.

type MultiPlexHub

type MultiPlexHub struct {
	// contains filtered or unexported fields
}

MultiPlexHub is the standard hub that handles interaction between clients and other hubs.

func (*MultiPlexHub) Auth

func (h *MultiPlexHub) Auth() ConnectionAuth

Auth returns the auther object for use in the server.

func (*MultiPlexHub) ReceivedSisterMessage

func (h *MultiPlexHub) ReceivedSisterMessage(conn Connection, message *Message)

ReceivedSisterMessage is just like Write, expect it sets the isSister flag.

func (*MultiPlexHub) RunLoop

func (h *MultiPlexHub) RunLoop()

RunLoop is the loop that runs forever processing messages from connections.

func (*MultiPlexHub) SisterManager

func (h *MultiPlexHub) SisterManager() SisterManager

SisterManager returns the auther object for use in the server.

func (*MultiPlexHub) Write

func (h *MultiPlexHub) Write(conn Connection, message *Message)

Write is the implementation of HubConnection. This way clients can write messages to the hub without being able to call RunLoop.

type Server

type Server struct {
	Port     int
	CertName string
	KeyName  string
	Router   http.Handler
	// contains filtered or unexported fields
}

Server is the implementation of ServerClient.

func New

func New(port int, deduper DeDuplication, auther ConnectionAuth, storer Storage, serverHandler ServerHubHandler, sisterManager SisterManager) *Server

New takes in everything need to setup a Server and have all the interfaces implemented. port is what port to bind on. deduper is the DeDuplication interface to use message deduplication. auther is the ConnectionAuth interface to use for auth. storer is the Storage interface to use message storage. serverHandler is the ServerHubHandler interface to use for one to one operations. sisterManager is the SisterManager interface to use for handling federation.

func (*Server) AddSister

func (s *Server) AddSister(sister SisterClient) error

AddSister adds a sister server to use for federation. It sends and receives messages to the other server.

func (*Server) Start

func (s *Server) Start(useHTTPServer bool) error

Start starts the websocket server to allow connections. useHTTPServer is if conductor should start an HTTP server or not. Set this to no if you are going to install the WebsocketHandler into your own HTTP system.

func (*Server) WebsocketHandler

func (s *Server) WebsocketHandler(w http.ResponseWriter, r *http.Request)

WebsocketHandler is the handler of the HTTP HandleFunc. This way you can install conductor into your current HTTP stack.

type ServerClient

type ServerClient interface {
	Start(useHTTPServer bool) error
	AddSister(sister SisterClient) error
}

ServerClient is the based interface for mocking.

type ServerHubHandler

type ServerHubHandler interface {
	Process(conn Connection, message *Message)
}

ServerHubHandler is the based interface for handling one to one server message between the client and the server.

type SimpleAuth

type SimpleAuth struct {
}

SimpleAuth is the default implmentation of ConnectionAuth. It simply accepts every websocket request. You probably shouldn't use this in production unless you want no auth for some reason.

func NewSimpleAuth

func NewSimpleAuth() *SimpleAuth

NewSimpleAuth creates a SimpleAuth to use. Don't use this for anything but testing as you need to create proper auth

func (*SimpleAuth) CanBind

func (s *SimpleAuth) CanBind(conn Connection, message *Message) bool

CanBind should check if a connection has rights to bind to channel. It accepts any client that request to bind.

func (*SimpleAuth) CanWrite

func (s *SimpleAuth) CanWrite(conn Connection, message *Message) bool

CanWrite should check if a connection has rights to write to channel. This makes sure the connection is bound to channel before allowing a write.

func (*SimpleAuth) ConnToRequest

func (s *SimpleAuth) ConnToRequest(r *http.Request, conn Connection)

ConnToRequest does nothing because simple auth doesn't use auth tokens!

func (*SimpleAuth) IsSister

func (s *SimpleAuth) IsSister(r *http.Request) bool

IsSister should check if a connection is a sister node or not. This example just looks for the header is_sister and respects that. This should be a much more robust check in production. TODO: This needs an offer/answer setup to ensure security

func (*SimpleAuth) IsValid

func (s *SimpleAuth) IsValid(r *http.Request) bool

IsValid that accept every connection!

type SimpleMaxSisterManager

type SimpleMaxSisterManager struct {
	// contains filtered or unexported fields
}

SimpleMaxSisterManager is the implementation of SisterManager. It provides scaling of the sisters with a limit of connections you want between each sister. For example let's say there are 5 Conductor servers. We can set a max of 2 sisters per server. This means each server will connect to 2 other sister servers and balance the connections to ensure redundancy and scale. The trick is to balance how much "noise" of duplicate messages being sent versus how quickly you want a message to move through the web of connections. You also need to consider how many connections from the sisters are open per server. More sisters per server means less available sockets for the clients.

func NewSisterManager

func NewSisterManager() *SimpleMaxSisterManager

NewSisterManager is used to create a new SimpleMaxSisterManager

func (*SimpleMaxSisterManager) HandleMetaQueryResponse

func (s *SimpleMaxSisterManager) HandleMetaQueryResponse(b []byte)

HandleMetaQueryResponse handles the response of a meta query

func (*SimpleMaxSisterManager) MetaQueryResponse

func (s *SimpleMaxSisterManager) MetaQueryResponse() []byte

MetaQueryResponse returns the meta data to use in the response

func (*SimpleMaxSisterManager) SisterConnected

func (s *SimpleMaxSisterManager) SisterConnected(c Connection)

SisterConnected adds a server because we got a new sister connection (we might need to requery and balance).

func (*SimpleMaxSisterManager) SisterDisconnected

func (s *SimpleMaxSisterManager) SisterDisconnected(c Connection)

SisterDisconnected removes a server because we lost a sister (we might need to requery and balance).

func (*SimpleMaxSisterManager) Start

func (s *SimpleMaxSisterManager) Start()

Start builds a list of servers using a discovery protocol or service (like consult). You could also just use a hard set list or config file if you wanted (like this implementation does).

func (*SimpleMaxSisterManager) Write

func (s *SimpleMaxSisterManager) Write(message *Message)

Write forwards this message onto the sisters under its care.

type SimpleStorage

type SimpleStorage struct {
	// contains filtered or unexported fields
}

SimpleStorage is the default implmentation of Storage. It simply stores the last X messages for each channel. You probably shouldn't use this in production.

func NewSimpleStorage

func NewSimpleStorage(limit int) *SimpleStorage

NewSimpleStorage creates a SimpleStorage to use. Don't use this for anything but testing limit is the amount of messages store for each channel in memory. 100 is a easy default.

func (*SimpleStorage) Get

func (s *SimpleStorage) Get(channelName string) []Message

Get retrieves the messages for that channel

func (*SimpleStorage) SentTo

func (s *SimpleStorage) SentTo(sender, conn Connection, message *Message)

SentTo does nothing in simple storage.

func (*SimpleStorage) Store

func (s *SimpleStorage) Store(conn Connection, message *Message)

Store puts the X amount of messages in the list

type SisterClient

type SisterClient interface {
	Connect(h HubConnection) error // do the network connection to the sister server
	Write(message *Message)        // write a message to the sister server
	ReadLoop(h HubConnection)      // start the read loop to process messages from the sister server

}

SisterClient is the based interface for a sister connection.

type SisterManager

type SisterManager interface {
	Start()                          // start by fetching/searching for your sister servers.
	Write(message *Message)          // write a message to all the sister servers under this management
	SisterConnected(c Connection)    // notification when a sister connects to this server
	SisterDisconnected(c Connection) // notification when a sister disconnects from this server
	MetaQueryResponse() []byte       // respond to a meta query with some content
	HandleMetaQueryResponse([]byte)  // this manager got a response to a query it sent
	// contains filtered or unexported methods
}

SisterManager is the based interface for handling scaling of sister nodes. See SimpleMaxSisterManager for more details and a default implementation.

type SisterManagerClient

type SisterManagerClient interface {
	// contains filtered or unexported methods
}

SisterManagerClient is the based interface for handling adding sisters.

type SisterServer

type SisterServer struct {
	ServerURL string
	// contains filtered or unexported fields
}

SisterServer is the standard server that handles interaction between two server and their message hubs. NOTE: I can't stress enough how important a good deduper in the hub is. This is the only way to not get stuck in an infinite messaging loop bouncing between sister to sister.

func NewSisterServer

func NewSisterServer(serverURL string, headers map[string]string) *SisterServer

NewSisterServer creates a new sister server object. serverURL is the other server url to connect with. h is the hub to write to.

func (*SisterServer) Connect

func (s *SisterServer) Connect(h HubConnection) error

Connect creates a WebSocket connection to the other server.

func (*SisterServer) ReadLoop

func (s *SisterServer) ReadLoop(h HubConnection)

ReadLoop starts the reading loop from the websocket. It then reads any incoming messages from other sister servers and processes them accordingly.

func (*SisterServer) Write

func (s *SisterServer) Write(message *Message)

Write handles taking a message from the hub and sending it back to the server this object represents.

type SisterServerClient

type SisterServerClient interface {
}

SisterServerClient is the based interface for a sister server. Server implements this

type StandardDeDuplication

type StandardDeDuplication struct {
	// contains filtered or unexported fields
}

StandardDeDuplication is the default implmentation of DeDuplication. It works by holding the message in memory for a period of time waiting to see if a duplication will arrive. If "durablity" is enabled for the message it will be removed as soon as a message is fin'ed. - Might not do this... TODO: look at possible race condition with the timestamps map...

func NewDeDuper

func NewDeDuper(tick, ttl time.Duration) *StandardDeDuplication

NewDeDuper creates a StandardDeDuplication to use. tick is how often the cleanup sweep should happen. ttl is how long a message should live in the deduper cache.

func (*StandardDeDuplication) Add

func (deduper *StandardDeDuplication) Add(message *Message)

Add puts a timestamp in the timestamps map based on the message's ID. The message will then be checked in the ticker's clean up sweep to remove the message if it is past the ttl.

func (*StandardDeDuplication) IsDuplicate

func (deduper *StandardDeDuplication) IsDuplicate(message *Message) bool

IsDuplicate checks to see if the message has a duplicate id of any of the messages stored in the timestamp map.

func (*StandardDeDuplication) Remove

func (deduper *StandardDeDuplication) Remove(message *Message)

Remove removes a message based on the ID of the message from the timestamp map.

func (*StandardDeDuplication) Start

func (deduper *StandardDeDuplication) Start()

Start kicks off the ticker so it can do a sweep based on the ttl and cleanup any stale messages that didn't get purged (this is much more likely with messages that aren't durable).

type Storage

type Storage interface {
	Store(conn Connection, message *Message)          //user on this connection wrote a message to a channel.
	SentTo(sender, conn Connection, message *Message) //a connection sent a message to the other connection.
}

Storage is the based interface for handling data storage.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL