Documentation ¶
Index ¶
- Constants
- type Client
- type Connection
- type ConnectionAuth
- type DeDuplication
- type Hub
- type HubConnection
- type Message
- type MultiPlexHub
- type Server
- type ServerClient
- type ServerHubHandler
- type SimpleAuth
- func (s *SimpleAuth) CanBind(conn Connection, message *Message) bool
- func (s *SimpleAuth) CanWrite(conn Connection, message *Message) bool
- func (s *SimpleAuth) ConnToRequest(r *http.Request, conn Connection)
- func (s *SimpleAuth) IsSister(r *http.Request) bool
- func (s *SimpleAuth) IsValid(r *http.Request) bool
- type SimpleMaxSisterManager
- func (s *SimpleMaxSisterManager) HandleMetaQueryResponse(b []byte)
- func (s *SimpleMaxSisterManager) MetaQueryResponse() []byte
- func (s *SimpleMaxSisterManager) SisterConnected(c Connection)
- func (s *SimpleMaxSisterManager) SisterDisconnected(c Connection)
- func (s *SimpleMaxSisterManager) Start()
- func (s *SimpleMaxSisterManager) Write(message *Message)
- type SimpleStorage
- type SisterClient
- type SisterManager
- type SisterManagerClient
- type SisterServer
- type SisterServerClient
- type StandardDeDuplication
- type Storage
Constants ¶
const (
	BindOpcode              = iota // BindOpcode binds to a channel. This will create the channel if it does not exist.
	UnbindOpcode                   // UnbindOpcode unbinds from a channel.
	WriteOpcode                    // WriteOpcode broadcasts on the provided channel.
	ServerOpcode                   // ServerOpcode is intended for messages between a single client and the server (not broadcast).
	CleanUpOpcode                  // CleanUpOpcode is a message to clean up a disconnected client/connection.
	StreamStartOpcode              // StreamStartOpcode signifies the start of a stream of a file.
	StreamEndOpcode                // StreamEndOpcode signifies the end of a stream of a file.
	StreamWriteOpcode              // StreamWriteOpcode signifies the write (a chunk) of a file.
	MetaQueryOpcode                // MetaQueryOpcode is for sister servers to query meta data from each other.
	MetaQueryResponseOpcode        // MetaQueryResponseOpcode is the response to a meta query.
)
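Because the opcodes are declared with iota, their numeric values follow declaration order. The sketch below mirrors the constants locally so it compiles on its own; `opcodeName` and its labels are hypothetical helpers for logging, not part of the package.

```go
package main

import "fmt"

// Local mirror of the documented constants, in the same order so the
// iota values line up. This is an illustration, not the package source.
const (
	BindOpcode = iota
	UnbindOpcode
	WriteOpcode
	ServerOpcode
	CleanUpOpcode
	StreamStartOpcode
	StreamEndOpcode
	StreamWriteOpcode
	MetaQueryOpcode
	MetaQueryResponseOpcode
)

// opcodeName is a hypothetical helper that maps an opcode to a log label.
func opcodeName(op uint16) string {
	names := []string{
		"bind", "unbind", "write", "server", "cleanup",
		"stream-start", "stream-end", "stream-write",
		"meta-query", "meta-query-response",
	}
	if int(op) < len(names) {
		return names[op]
	}
	return "unknown"
}

func main() {
	fmt.Println(WriteOpcode, opcodeName(WriteOpcode))
}
```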
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
	Read <-chan *Message
	// contains filtered or unexported fields
}
Client is a basic websocket connection.
func NewClient ¶
NewClient allocates and returns a new Client. ServerUrl is the URL of the server to connect to.
func (*Client) ServerMessage ¶
ServerMessage sends a message to the server for server operations (such as fetching message history).
type Connection ¶
type Connection interface {
	Write(message *Message) error  // Write sends a message to the client this connection represents.
	ReadLoop(hub HubConnection)    // ReadLoop is the loop that keeps this connection alive. Don't call this.
	Disconnect()                   // Disconnect is used to disconnect the connection.
	Channels() []string            // Channels holds the channels this connection is bound to. Very useful for auth and cleanup.
	SetChannels(channels []string) // SetChannels updates the channel list of this connection.
	Store(key, value string)       // Store writes to a map of local storage for the connection. This way you can identify the connection in other interfaces.
	Get(key string) string         // Get reads from the map of local storage for the connection.
}
Connection is the base interface for mocking a connection.
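As a rough illustration of mocking, the sketch below implements the interface with an in-memory type. `Message`, `HubConnection`, and `Connection` are local mirrors of the documented declarations so the example compiles on its own; `mockConn` and `newMockConn` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirrors of the documented types (exported fields only).
type Message struct {
	Opcode      uint16
	Uuid        string
	ChannelName string
	Body        []byte
}

type HubConnection interface {
	Write(conn Connection, message *Message)
	ReceivedSisterMessage(conn Connection, message *Message)
}

type Connection interface {
	Write(message *Message) error
	ReadLoop(hub HubConnection)
	Disconnect()
	Channels() []string
	SetChannels(channels []string)
	Store(key, value string)
	Get(key string) string
}

// mockConn is a hypothetical in-memory Connection that records writes.
type mockConn struct {
	written  []*Message
	channels []string
	local    map[string]string
	closed   bool
}

func newMockConn() *mockConn { return &mockConn{local: map[string]string{}} }

func (c *mockConn) Write(m *Message) error {
	if c.closed {
		return errors.New("mockConn: write after disconnect")
	}
	c.written = append(c.written, m)
	return nil
}
func (c *mockConn) ReadLoop(hub HubConnection)    {} // no socket to read from
func (c *mockConn) Disconnect()                   { c.closed = true }
func (c *mockConn) Channels() []string            { return c.channels }
func (c *mockConn) SetChannels(channels []string) { c.channels = channels }
func (c *mockConn) Store(key, value string)       { c.local[key] = value }
func (c *mockConn) Get(key string) string         { return c.local[key] }

// Compile-time check that mockConn satisfies the mirrored interface.
var _ Connection = (*mockConn)(nil)

func main() {
	conn := newMockConn()
	conn.Store("user", "alice")
	_ = conn.Write(&Message{ChannelName: "lobby", Body: []byte("hi")})
	fmt.Println(conn.Get("user"), len(conn.written))
}
```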
type ConnectionAuth ¶
type ConnectionAuth interface {
	IsValid(r *http.Request) bool                     // IsValid is called on every HTTP upgrade request and should be used to validate auth tokens.
	ConnToRequest(r *http.Request, conn Connection)   // ConnToRequest is called so you can map your HTTP request (like the auth token) to a current connection.
	CanBind(conn Connection, message *Message) bool   // CanBind is called on every bind request, so optimizing it is highly recommended.
	CanWrite(conn Connection, message *Message) bool  // CanWrite is called on every write request, so optimizing it is highly recommended.
	IsSister(r *http.Request) bool                    // IsSister is called on every HTTP upgrade request and should be used to check if an incoming connection is from a sister node or not.
}
ConnectionAuth is the base interface for handling authentication and authorization. It is used for new HTTP requests that upgrade to a websocket and for permissions on channels. Use it to check auth tokens and the like, so that only real clients can connect and connections have the right permissions to write or bind to a channel.
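One way to picture the IsValid and ConnToRequest pair is a token lookup on the upgrade request. This is a minimal sketch, not the package's own auth: the `X-Auth-Token` header name, the token table, and the `tokenAuth`, `connStore`, `mapConn`, and `upgradeRequest` names are all assumptions, and `connStore` is a trimmed stand-in for the Store/Get half of Connection.

```go
package main

import (
	"fmt"
	"net/http"
)

// connStore is a trimmed stand-in for the Store/Get half of Connection.
type connStore interface {
	Store(key, value string)
	Get(key string) string
}

// mapConn is a toy connStore backed by a map.
type mapConn map[string]string

func (m mapConn) Store(key, value string) { m[key] = value }
func (m mapConn) Get(key string) string   { return m[key] }

// tokenAuth is hypothetical. The X-Auth-Token header name and the
// token-to-user table are assumptions made for this sketch.
type tokenAuth struct {
	users map[string]string // token -> user name
}

// IsValid accepts the upgrade request only if its token is known.
func (a *tokenAuth) IsValid(r *http.Request) bool {
	_, ok := a.users[r.Header.Get("X-Auth-Token")]
	return ok
}

// ConnToRequest copies the identity from the request onto the connection,
// so later CanBind/CanWrite checks can read it back with Get.
func (a *tokenAuth) ConnToRequest(r *http.Request, conn connStore) {
	conn.Store("user", a.users[r.Header.Get("X-Auth-Token")])
}

// upgradeRequest builds a fake upgrade request carrying the given token.
func upgradeRequest(token string) *http.Request {
	r, _ := http.NewRequest(http.MethodGet, "http://localhost/ws", nil)
	r.Header.Set("X-Auth-Token", token)
	return r
}

func main() {
	auth := &tokenAuth{users: map[string]string{"secret": "alice"}}
	conn := mapConn{}
	if r := upgradeRequest("secret"); auth.IsValid(r) {
		auth.ConnToRequest(r, conn)
	}
	fmt.Println(conn.Get("user"))
}
```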
type DeDuplication ¶
type DeDuplication interface {
	Start()
	Add(message *Message)
	Remove(message *Message)
	IsDuplicate(message *Message) bool
}
DeDuplication is the base interface for handling deduplication of messages. Use this to ensure messages aren't processed twice in the hub.
type Hub ¶
type Hub interface {
	RunLoop()                                                // This is the master run loop that processes all the messages that come into the channel.
	Write(conn Connection, message *Message)                 // Not sure if I like the duplicate method trick yet...
	Auth() ConnectionAuth                                    // This returns the current auther (if one is used)
	SisterManager() SisterManager                            // This returns the current sister manager (if one is used)
	ReceivedSisterMessage(conn Connection, message *Message) // Handle a sister message into this hub
}
Hub is the base interface for the methods a Hub should provide.
type HubConnection ¶
type HubConnection interface {
	Write(conn Connection, message *Message)
	ReceivedSisterMessage(conn Connection, message *Message)
}
HubConnection is an interface that hides the other methods of the Hub. This way `Connection`s can only write to the Hub and not call its other methods.
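The narrowing described here is ordinary Go interface satisfaction: a hub value passed as a HubConnection exposes only the two declared methods. The sketch below uses simplified local stand-ins (`Connection` is an empty interface here, and `toyHub` and `deliver` are hypothetical) purely to show the idea.

```go
package main

import "fmt"

// Simplified local stand-ins for the documented types.
type Message struct {
	ChannelName string
	Body        []byte
}

type Connection interface{}

type HubConnection interface {
	Write(conn Connection, message *Message)
	ReceivedSisterMessage(conn Connection, message *Message)
}

// toyHub is a hypothetical hub that has more methods than HubConnection.
type toyHub struct {
	inbox []*Message
}

func (h *toyHub) Write(conn Connection, m *Message) { h.inbox = append(h.inbox, m) }
func (h *toyHub) ReceivedSisterMessage(conn Connection, m *Message) {
	h.inbox = append(h.inbox, m)
}
func (h *toyHub) RunLoop() {} // not reachable through HubConnection

// deliver plays the role of a connection's read loop. It only sees the
// narrow interface, so calling hub.RunLoop() here would not compile.
func deliver(hub HubConnection, m *Message) {
	hub.Write(nil, m)
}

func main() {
	hub := &toyHub{}
	deliver(hub, &Message{ChannelName: "lobby", Body: []byte("hi")})
	fmt.Println(len(hub.inbox))
}
```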
type Message ¶
type Message struct {
	Opcode      uint16 `json:"opcode"`
	Uuid        string `json:"uuid"`
	ChannelName string `json:"channel_name"`
	Body        []byte `json:"body"`
	// contains filtered or unexported fields
}
Message represents the framing of the messages that get sent back and forth.
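The struct tags imply a JSON form for the exported fields. Whether JSON is what actually travels over the websocket is not stated here, so treat the following as a sketch of what `encoding/json` produces for a local mirror of the struct (unexported fields omitted). Note that a `[]byte` body is base64-encoded by `encoding/json`; `encode` and `decode` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirror of the exported fields of Message.
type Message struct {
	Opcode      uint16 `json:"opcode"`
	Uuid        string `json:"uuid"`
	ChannelName string `json:"channel_name"`
	Body        []byte `json:"body"`
}

// encode returns the JSON form of m, or "" if marshalling fails.
func encode(m Message) string {
	b, err := json.Marshal(m)
	if err != nil {
		return ""
	}
	return string(b)
}

// decode parses the JSON form back into a Message.
func decode(s string) (Message, error) {
	var m Message
	err := json.Unmarshal([]byte(s), &m)
	return m, err
}

func main() {
	m := Message{Opcode: 2, Uuid: "abc", ChannelName: "lobby", Body: []byte("hi")}
	fmt.Println(encode(m))
	// prints {"opcode":2,"uuid":"abc","channel_name":"lobby","body":"aGk="}
}
```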
type MultiPlexHub ¶
type MultiPlexHub struct {
// contains filtered or unexported fields
}
MultiPlexHub is the standard hub that handles interaction between clients and other hubs.
func (*MultiPlexHub) Auth ¶
func (h *MultiPlexHub) Auth() ConnectionAuth
Auth returns the auther object for use in the server.
func (*MultiPlexHub) ReceivedSisterMessage ¶
func (h *MultiPlexHub) ReceivedSisterMessage(conn Connection, message *Message)
ReceivedSisterMessage is just like Write, except it sets the isSister flag.
func (*MultiPlexHub) RunLoop ¶
func (h *MultiPlexHub) RunLoop()
RunLoop is the loop that runs forever processing messages from connections.
func (*MultiPlexHub) SisterManager ¶
func (h *MultiPlexHub) SisterManager() SisterManager
SisterManager returns the sister manager object for use in the server.
func (*MultiPlexHub) Write ¶
func (h *MultiPlexHub) Write(conn Connection, message *Message)
Write is the implementation of HubConnection. This way clients can write messages to the hub without being able to call RunLoop.
type Server ¶
type Server struct {
	Port     int
	CertName string
	KeyName  string
	Router   http.Handler
	// contains filtered or unexported fields
}
Server is the implementation of ServerClient.
func New ¶
func New(port int, deduper DeDuplication, auther ConnectionAuth, storer Storage, serverHandler ServerHubHandler, sisterManager SisterManager) *Server
New takes everything needed to set up a Server with all of its interfaces implemented. port is the port to bind on. deduper is the DeDuplication implementation to use for message deduplication. auther is the ConnectionAuth implementation to use for auth. storer is the Storage implementation to use for message storage. serverHandler is the ServerHubHandler implementation to use for one-to-one operations. sisterManager is the SisterManager implementation to use for handling federation.
func (*Server) AddSister ¶
func (s *Server) AddSister(sister SisterClient) error
AddSister adds a sister server to use for federation. It sends and receives messages to the other server.
func (*Server) Start ¶
func (s *Server) Start(useHTTPServer bool) error
Start starts the websocket server to allow connections. useHTTPServer controls whether conductor starts its own HTTP server. Set it to false if you are going to install the WebsocketHandler into your own HTTP system.
func (*Server) WebsocketHandler ¶
func (s *Server) WebsocketHandler(w http.ResponseWriter, r *http.Request)
WebsocketHandler is the handler of the HTTP HandleFunc. This way you can install conductor into your current HTTP stack.
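Since WebsocketHandler has the standard `func(http.ResponseWriter, *http.Request)` shape, it can be registered on any router that accepts handler functions. The sketch below uses `http.ServeMux` from the standard library with `standInHandler` taking the place of the real method so that it runs on its own; the `/ws` and `/healthz` routes and the helper names are assumptions.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// standInHandler takes the place of (*Server).WebsocketHandler here.
// It shares the signature but does not perform a websocket upgrade.
func standInHandler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "websocket endpoint")
}

// newMux builds an application router and mounts the websocket handler
// next to an existing route. Both paths are assumptions for this sketch.
func newMux(ws http.HandlerFunc) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	mux.HandleFunc("/ws", ws)
	return mux
}

// statusFor sends a GET for path through the mux and returns the status.
func statusFor(mux *http.ServeMux, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	mux := newMux(standInHandler)
	fmt.Println(statusFor(mux, "/ws"), statusFor(mux, "/missing"))
}
```

With the real package, the value passed to `newMux` would presumably be the `WebsocketHandler` method of a `*Server` that was started with `useHTTPServer` set to false.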
type ServerClient ¶
type ServerClient interface {
	Start(useHTTPServer bool) error
	AddSister(sister SisterClient) error
}
ServerClient is the base interface for mocking.
type ServerHubHandler ¶
type ServerHubHandler interface {
Process(conn Connection, message *Message)
}
ServerHubHandler is the base interface for handling one-to-one server messages between the client and the server.
type SimpleAuth ¶
type SimpleAuth struct { }
SimpleAuth is the default implementation of ConnectionAuth. It simply accepts every websocket request. You probably shouldn't use this in production unless you want no auth for some reason.
func NewSimpleAuth ¶
func NewSimpleAuth() *SimpleAuth
NewSimpleAuth creates a SimpleAuth to use. Don't use this for anything but testing, as you need to create proper auth.
func (*SimpleAuth) CanBind ¶
func (s *SimpleAuth) CanBind(conn Connection, message *Message) bool
CanBind should check if a connection has rights to bind to a channel. It accepts any client that requests to bind.
func (*SimpleAuth) CanWrite ¶
func (s *SimpleAuth) CanWrite(conn Connection, message *Message) bool
CanWrite should check if a connection has rights to write to a channel. This makes sure the connection is bound to the channel before allowing a write.
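The bound-before-write rule amounts to a membership test on the connection's channel list. A minimal sketch, assuming the list is what `Channels()` returns and the target is the message's `ChannelName`; `isBound` is a hypothetical helper, not the package's code.

```go
package main

import "fmt"

// isBound reports whether channel appears in the list of channels a
// connection is bound to. A linear scan is fine for short lists; since
// CanWrite runs on every write, a set may be preferable for long ones.
func isBound(bound []string, channel string) bool {
	for _, name := range bound {
		if name == channel {
			return true
		}
	}
	return false
}

func main() {
	bound := []string{"lobby", "news"}
	fmt.Println(isBound(bound, "news"), isBound(bound, "admin"))
}
```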
func (*SimpleAuth) ConnToRequest ¶
func (s *SimpleAuth) ConnToRequest(r *http.Request, conn Connection)
ConnToRequest does nothing because simple auth doesn't use auth tokens!
func (*SimpleAuth) IsSister ¶
func (s *SimpleAuth) IsSister(r *http.Request) bool
IsSister should check if a connection is a sister node or not. This example just looks for the header is_sister and respects that. This should be a much more robust check in production. TODO: This needs an offer/answer setup to ensure security
type SimpleMaxSisterManager ¶
type SimpleMaxSisterManager struct {
// contains filtered or unexported fields
}
SimpleMaxSisterManager is the implementation of SisterManager. It scales the sisters with a limit on the number of connections you want between each sister. For example, let's say there are 5 Conductor servers. We can set a max of 2 sisters per server. This means each server will connect to 2 other sister servers and balance the connections to ensure redundancy and scale. The trick is to balance the "noise" of duplicate messages being sent against how quickly you want a message to move through the web of connections. You also need to consider how many sister connections are open per server. More sisters per server means fewer available sockets for the clients.
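The balancing algorithm itself is not described here. As one possible reading of the 5-server, max-2 example, the sketch below has each server dial the next servers in a fixed ring order, which keeps every server reachable. `pickSisters` and the server names are hypothetical and are not taken from the package.

```go
package main

import "fmt"

// pickSisters returns up to limit servers that the server at index self
// should connect to, walking the list as a ring and skipping itself.
// This ring layout is an assumption made for illustration.
func pickSisters(servers []string, self, limit int) []string {
	n := len(servers)
	if n < 2 || self < 0 || self >= n || limit < 1 {
		return nil
	}
	if limit > n-1 {
		limit = n - 1 // cannot have more sisters than other servers
	}
	picked := make([]string, 0, limit)
	for i := 1; i <= limit; i++ {
		picked = append(picked, servers[(self+i)%n])
	}
	return picked
}

func main() {
	servers := []string{"a", "b", "c", "d", "e"}
	for i, name := range servers {
		fmt.Println(name, "->", pickSisters(servers, i, 2))
	}
}
```

In this layout every server also receives 2 incoming sister connections, so each holds 4 sister sockets in total, which is the socket cost the paragraph above warns about.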
func NewSisterManager ¶
func NewSisterManager() *SimpleMaxSisterManager
NewSisterManager is used to create a new SimpleMaxSisterManager
func (*SimpleMaxSisterManager) HandleMetaQueryResponse ¶
func (s *SimpleMaxSisterManager) HandleMetaQueryResponse(b []byte)
HandleMetaQueryResponse handles the response of a meta query
func (*SimpleMaxSisterManager) MetaQueryResponse ¶
func (s *SimpleMaxSisterManager) MetaQueryResponse() []byte
MetaQueryResponse returns the meta data to use in the response
func (*SimpleMaxSisterManager) SisterConnected ¶
func (s *SimpleMaxSisterManager) SisterConnected(c Connection)
SisterConnected adds a server because we got a new sister connection (we might need to requery and balance).
func (*SimpleMaxSisterManager) SisterDisconnected ¶
func (s *SimpleMaxSisterManager) SisterDisconnected(c Connection)
SisterDisconnected removes a server because we lost a sister (we might need to requery and balance).
func (*SimpleMaxSisterManager) Start ¶
func (s *SimpleMaxSisterManager) Start()
Start builds a list of servers using a discovery protocol or service (like Consul). You could also just use a hard-coded list or config file if you wanted (like this implementation does).
func (*SimpleMaxSisterManager) Write ¶
func (s *SimpleMaxSisterManager) Write(message *Message)
Write forwards this message onto the sisters under its care.
type SimpleStorage ¶
type SimpleStorage struct {
// contains filtered or unexported fields
}
SimpleStorage is the default implementation of Storage. It simply stores the last X messages for each channel. You probably shouldn't use this in production.
func NewSimpleStorage ¶
func NewSimpleStorage(limit int) *SimpleStorage
NewSimpleStorage creates a SimpleStorage to use. Don't use this for anything but testing. limit is the number of messages stored in memory for each channel. 100 is an easy default.
func (*SimpleStorage) Get ¶
func (s *SimpleStorage) Get(channelName string) []Message
Get retrieves the stored messages for that channel.
func (*SimpleStorage) SentTo ¶
func (s *SimpleStorage) SentTo(sender, conn Connection, message *Message)
SentTo does nothing in simple storage.
func (*SimpleStorage) Store ¶
func (s *SimpleStorage) Store(conn Connection, message *Message)
Store adds the message to the channel's list, which holds at most the last X messages.
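Keeping only the last X messages per channel can be done by trimming a slice after each append. The following is a sketch under that assumption, not the package's implementation: `boundedStore` and `newBoundedStore` are hypothetical, `Message` is a trimmed local mirror, and the `conn` argument of the real `Store` is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// Trimmed local mirror of Message.
type Message struct {
	Uuid        string
	ChannelName string
	Body        []byte
}

// boundedStore keeps at most limit messages per channel in memory.
type boundedStore struct {
	mu    sync.Mutex
	limit int
	msgs  map[string][]Message
}

func newBoundedStore(limit int) *boundedStore {
	if limit < 0 {
		limit = 0
	}
	return &boundedStore{limit: limit, msgs: map[string][]Message{}}
}

// Store appends the message and drops the oldest entries past the limit.
func (s *boundedStore) Store(m *Message) {
	s.mu.Lock()
	defer s.mu.Unlock()
	list := append(s.msgs[m.ChannelName], *m)
	if len(list) > s.limit {
		list = list[len(list)-s.limit:]
	}
	s.msgs[m.ChannelName] = list
}

// Get returns a copy of the stored messages, oldest first.
func (s *boundedStore) Get(channelName string) []Message {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]Message(nil), s.msgs[channelName]...)
}

func main() {
	store := newBoundedStore(3)
	for i := 0; i < 5; i++ {
		store.Store(&Message{ChannelName: "lobby", Uuid: fmt.Sprint(i)})
	}
	for _, m := range store.Get("lobby") {
		fmt.Println(m.Uuid)
	}
}
```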
type SisterClient ¶
type SisterClient interface {
	Connect(h HubConnection) error // do the network connection to the sister server
	Write(message *Message)        // write a message to the sister server
	ReadLoop(h HubConnection)      // start the read loop to process messages from the sister server
}
SisterClient is the base interface for a sister connection.
type SisterManager ¶
type SisterManager interface {
	Start()                         // start by fetching/searching for your sister servers.
	Write(message *Message)         // write a message to all the sister servers under this management
	SisterConnected(c Connection)   // notification when a sister connects to this server
	SisterDisconnected(c Connection) // notification when a sister disconnects from this server
	MetaQueryResponse() []byte      // respond to a meta query with some content
	HandleMetaQueryResponse([]byte) // this manager got a response to a query it sent
	// contains filtered or unexported methods
}
SisterManager is the base interface for handling scaling of sister nodes. See SimpleMaxSisterManager for more details and a default implementation.
type SisterManagerClient ¶
type SisterManagerClient interface {
// contains filtered or unexported methods
}
SisterManagerClient is the base interface for handling adding sisters.
type SisterServer ¶
type SisterServer struct {
	ServerURL string
	// contains filtered or unexported fields
}
SisterServer is the standard server that handles interaction between two servers and their message hubs. NOTE: I can't stress enough how important a good deduper in the hub is. This is the only way to not get stuck in an infinite messaging loop bouncing from sister to sister.
func NewSisterServer ¶
func NewSisterServer(serverURL string, headers map[string]string) *SisterServer
NewSisterServer creates a new sister server object. serverURL is the URL of the other server to connect with. The hub to write to is passed later, to Connect and ReadLoop.
func (*SisterServer) Connect ¶
func (s *SisterServer) Connect(h HubConnection) error
Connect creates a WebSocket connection to the other server.
func (*SisterServer) ReadLoop ¶
func (s *SisterServer) ReadLoop(h HubConnection)
ReadLoop starts the reading loop from the websocket. It then reads any incoming messages from other sister servers and processes them accordingly.
func (*SisterServer) Write ¶
func (s *SisterServer) Write(message *Message)
Write handles taking a message from the hub and sending it back to the server this object represents.
type SisterServerClient ¶
type SisterServerClient interface { }
SisterServerClient is the base interface for a sister server. Server implements this.
type StandardDeDuplication ¶
type StandardDeDuplication struct {
// contains filtered or unexported fields
}
StandardDeDuplication is the default implementation of DeDuplication. It works by holding the message in memory for a period of time, waiting to see if a duplicate will arrive. If "durability" is enabled for the message, it will be removed as soon as the message is fin'ed (might not do this...). TODO: look at possible race condition with the timestamps map...
func NewDeDuper ¶
func NewDeDuper(tick, ttl time.Duration) *StandardDeDuplication
NewDeDuper creates a StandardDeDuplication to use. tick is how often the cleanup sweep should happen. ttl is how long a message should live in the deduper cache.
func (*StandardDeDuplication) Add ¶
func (deduper *StandardDeDuplication) Add(message *Message)
Add puts a timestamp in the timestamps map based on the message's ID. The message will then be checked in the ticker's clean up sweep to remove the message if it is past the ttl.
func (*StandardDeDuplication) IsDuplicate ¶
func (deduper *StandardDeDuplication) IsDuplicate(message *Message) bool
IsDuplicate checks whether the message has the same ID as any of the messages stored in the timestamps map.
func (*StandardDeDuplication) Remove ¶
func (deduper *StandardDeDuplication) Remove(message *Message)
Remove removes a message based on the ID of the message from the timestamp map.
func (*StandardDeDuplication) Start ¶
func (deduper *StandardDeDuplication) Start()
Start kicks off the ticker so it can do a sweep based on the ttl and cleanup any stale messages that didn't get purged (this is much more likely with messages that aren't durable).
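The timestamp map plus ticker sweep described above can be sketched as follows. This is an illustration under stated assumptions, not the package source: `ttlDeduper`, `newTTLDeduper`, and `sweep` are hypothetical names, `Message` is trimmed to its `Uuid`, `Start` is assumed to be non-blocking, and a mutex guards the map (the race mentioned in the TODO is avoided here that way).

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Trimmed local mirror of Message; only the ID matters for deduplication.
type Message struct {
	Uuid string
}

// ttlDeduper remembers when each message ID was first added.
type ttlDeduper struct {
	mu         sync.Mutex
	timestamps map[string]time.Time
	tick, ttl  time.Duration
}

func newTTLDeduper(tick, ttl time.Duration) *ttlDeduper {
	return &ttlDeduper{timestamps: map[string]time.Time{}, tick: tick, ttl: ttl}
}

func (d *ttlDeduper) Add(m *Message) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.timestamps[m.Uuid] = time.Now()
}

func (d *ttlDeduper) Remove(m *Message) {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.timestamps, m.Uuid)
}

func (d *ttlDeduper) IsDuplicate(m *Message) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	_, seen := d.timestamps[m.Uuid]
	return seen
}

// sweep drops every entry that is older than ttl as of now.
func (d *ttlDeduper) sweep(now time.Time) {
	d.mu.Lock()
	defer d.mu.Unlock()
	for id, added := range d.timestamps {
		if now.Sub(added) > d.ttl {
			delete(d.timestamps, id)
		}
	}
}

// Start runs sweep on every tick in a background goroutine. The goroutine
// is never stopped in this sketch; tick must be greater than zero.
func (d *ttlDeduper) Start() {
	go func() {
		ticker := time.NewTicker(d.tick)
		defer ticker.Stop()
		for now := range ticker.C {
			d.sweep(now)
		}
	}()
}

func main() {
	d := newTTLDeduper(10*time.Millisecond, 50*time.Millisecond)
	d.Start()
	m := &Message{Uuid: "abc"}
	fmt.Println(d.IsDuplicate(m)) // first sighting, not yet recorded
	d.Add(m)
	fmt.Println(d.IsDuplicate(m)) // recorded and still within the ttl
	time.Sleep(100 * time.Millisecond)
	fmt.Println(d.IsDuplicate(m)) // checked again after the ttl has passed
}
```

A hub would typically call `IsDuplicate` before processing a message and `Add` right after accepting it, which is what stops a message from looping between sisters.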
type Storage ¶
type Storage interface {
	Store(conn Connection, message *Message)          // the user on this connection wrote a message to a channel.
	SentTo(sender, conn Connection, message *Message) // a connection sent a message to the other connection.
}
Storage is the base interface for handling data storage.