rhub

package module
v0.0.0-...-90e7987 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2019 License: MIT Imports: 10 Imported by: 0

README

rhub

A distributed websocket hub: websocket + Redis.

Example:
// startServer builds an example hub, registers its event handlers, and serves
// it over HTTP on :8080. It blocks in http.ListenAndServe and terminates the
// process through log.Fatal if the listener fails.
//
// The handler and Send* signatures used here follow the documented API:
// Handler is func(*MessageIn), HandlerWs is func(*ClientMessage),
// SendRedis takes (subject, data) and SendRedisRaw takes a *MessageIn.
func startServer() {
	hub := NewHub(1, conf.Redis, "test-room-1")

	// Publish membership changes. SendRedis has a single data argument, so
	// the client's props are sent as the message payload.
	hub.AfterJoin(func(c IClient) {
		hub.SendRedis("join", c.GetProps())
	})
	hub.AfterLeave(func(c IClient) {
		hub.SendRedis("leave", c.GetProps())
	})

	hub.On("join", func(m *MessageIn) {
		fmt.Println("join", *m)
	})
	hub.On("leave", func(m *MessageIn) {
		fmt.Println("leave", *m)
	})

	// Forward "im" messages received on a websocket, unchanged, via
	// SendRedisRaw.
	hub.OnWs("im", func(m *ClientMessage) {
		fmt.Println("ws receive im:", string(*m.Data))
		hub.SendRedisRaw(m.MessageIn)
	})

	// Handle "im" messages registered through On: decode the payload as a
	// string and broadcast it to all websocket clients of this hub.
	hub.On("im", func(m *MessageIn) {
		var str string
		fmt.Println("im ", string(*m.Data))
		err := m.Decode(&str)
		fmt.Println(str, err)
		// hub.SendRedis("im", "your say:"+str)
		hub.SendWsAll("im", "your say:"+str)
		if str == "close" {
			fmt.Println("close client")
			hub.Close()
			// go func() { hub.UnregisterChan() <- client }()
		}
	})

	go hub.Run()
	http.HandleFunc("/", serveHome)
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		// Every connection gets the same demo user attached as a client prop.
		clientProp := map[string]interface{}{"user": &User{Name: "jim"}}
		ServeWs(hub, w, r, clientProp, DefaultUpgrader(), DefaultWsConfig())
	})
	fmt.Println("start server al 8080")
	err := http.ListenAndServe(":8080", nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultUpgrader

func DefaultUpgrader() websocket.Upgrader

func Encode

func Encode(subject string, msg interface{}) ([]byte, error)

func ServeWs

func ServeWs(hub IHub, w http.ResponseWriter, r *http.Request, clientKeyValues map[string]interface{}, upgrader websocket.Upgrader, config WsConfig)

ServeWs handles websocket requests from the peer.

Types

type Client

type Client struct {

	// props sync.Map
	Props map[string]interface{}
	// contains filtered or unexported fields
}

Client is a middleman between the websocket connection and the hub.

func (*Client) Close

func (c *Client) Close()

Close closes the client. When called from inside the message loop, run it in its own goroutine: go client.Close().

func (*Client) DecodeMessage

func (c *Client) DecodeMessage(data []byte) (*MessageIn, error)

func (*Client) Encode

func (c *Client) Encode(subject string, msg interface{}) []byte

func (*Client) EncodeMessage

func (c *Client) EncodeMessage(msg interface{}) []byte

func (*Client) GetClient

func (c *Client) GetClient() *Client

func (*Client) GetProps

func (c *Client) GetProps() map[string]interface{}

func (*Client) Hub

func (c *Client) Hub() IHub

func (*Client) NewClientMessage

func (c *Client) NewClientMessage(data []byte) (*ClientMessage, error)

func (*Client) ReadPump

func (c *Client) ReadPump()

ReadPump pumps messages from the websocket connection to the hub.

The application runs ReadPump in a per-connection goroutine. The application ensures that there is at most one reader on a connection by executing all reads from this goroutine.

func (*Client) SendChan

func (c *Client) SendChan() chan []byte
// Send encodes msg under the given subject with c.Encode and places the
// resulting bytes on the client's send channel. The channel send blocks
// whenever the channel cannot accept the value immediately.
func (c *Client) Send(subject string, msg interface{}) {
	encoded := c.Encode(subject, msg)
	c.send <- encoded
}

func (*Client) WritePump

func (c *Client) WritePump()

WritePump pumps messages from the hub to the websocket connection.

A goroutine running WritePump is started for each connection. The application ensures that there is at most one writer to a connection by executing all writes from this goroutine.

type ClientMessage

// ClientMessage pairs an incoming message with a client. It is the argument
// type of HandlerWs callbacks.
type ClientMessage struct {
	// Embedded incoming message; promotes Subject, Data and the MessageIn methods.
	*MessageIn
	// NOTE(review): presumably the websocket client the message came from — confirm.
	Client IClient
}
// RedisHubMessage wraps a MessageIn by embedding it.
// NOTE(review): the README example uses it as the argument of hub.On handlers,
// while Handler is declared as func(msg *MessageIn) — confirm which is current.
type RedisHubMessage struct {
	*MessageIn
}

func NewMessage

func NewMessage(subject string, data interface{}, client IClient) *ClientMessage

type Conf

// Conf holds the package configuration.
type Conf struct {
	// Redis is passed by the README example as NewHub's redisConStr argument
	// (presumably a Redis connection string — confirm the expected format).
	Redis string
}

type Filter

type Filter func(msg *MessageIn, next func())

type Handler

type Handler func(msg *MessageIn)

type HandlerWs

type HandlerWs func(message *ClientMessage)

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub maintains the set of active clients and broadcasts messages to the clients.

func (*Hub) AfterJoin

func (h *Hub) AfterJoin(callback func(client IClient))

func (*Hub) AfterLeave

func (h *Hub) AfterLeave(callback func(client IClient))

func (*Hub) BeforeJoin

func (h *Hub) BeforeJoin(callback func(client IClient) error)

func (*Hub) BeforeLeave

func (h *Hub) BeforeLeave(callback func(client IClient))

func (*Hub) BeforeWsMsg

func (h *Hub) BeforeWsMsg(callback func(msg *ClientMessage) bool)

func (*Hub) ClientList

func (h *Hub) ClientList() []IClient

func (*Hub) Clients

func (h *Hub) Clients() map[IClient]bool

func (*Hub) Close

func (h *Hub) Close()

func (*Hub) CloseChan

func (h *Hub) CloseChan() chan struct{}
// SetSelf stores the given IHub in the hub's self field.
// NOTE(review): presumably lets a type that embeds Hub register its outer
// IHub value with the hub — confirm against the callers.
func (h *Hub) SetSelf(self IHub) {
	h.self = self
}

func (*Hub) EchoWs

func (h *Hub) EchoWs(msg *ClientMessage)

func (*Hub) GetSeconds

func (h *Hub) GetSeconds() int

func (*Hub) Id

func (h *Hub) Id() interface{}

func (*Hub) MessageChan

func (h *Hub) MessageChan() chan *ClientMessage

RegisterChan() chan *Client

func (*Hub) Off

func (h *Hub) Off(subject string, handler Handler)

func (*Hub) OffWs

func (h *Hub) OffWs(subject string, handler HandlerWs)

func (*Hub) On

func (h *Hub) On(subject string, handler Handler)

func (*Hub) OnTick

func (h *Hub) OnTick(cb func(int))

func (*Hub) OnWs

func (h *Hub) OnWs(subject string, handler HandlerWs)

func (*Hub) RegisterChan

func (h *Hub) RegisterChan() chan IClient

func (*Hub) ResetRedis

func (h *Hub) ResetRedis() error

func (*Hub) Run

func (h *Hub) Run()

func (*Hub) SendRedis

func (h *Hub) SendRedis(subject string, data interface{})

func (*Hub) SendRedisRaw

func (h *Hub) SendRedisRaw(msg *MessageIn)

func (*Hub) SendWs

func (h *Hub) SendWs(subject string, message interface{}, receivers ...IClient)

func (*Hub) SendWsAll

func (h *Hub) SendWsAll(subject string, message interface{})

func (*Hub) SendWsBytes

func (h *Hub) SendWsBytes(client IClient, bs []byte)

func (*Hub) SendWsClient

func (h *Hub) SendWsClient(client IClient, subject string, message interface{})

func (*Hub) UnregisterChan

func (h *Hub) UnregisterChan() chan IClient

type IClient

// IClient is the client abstraction passed to hub callbacks and carried by
// the hub's register/unregister channels.
type IClient interface {
	// Close closes the client.
	Close()
	// Send() chan []byte
	// SendChan returns the client's channel of outgoing byte messages.
	SendChan() chan []byte
	// Send(subject string, msg interface{})
	// Hub returns the hub associated with this client.
	Hub() IHub
	// WritePump pumps messages from the hub to the websocket connection.
	WritePump()
	// ReadPump pumps messages from the websocket connection to the hub.
	ReadPump()
	// GetProps returns the client's key/value properties.
	GetProps() map[string]interface{}
	// Get(key interface{}) interface{}
	// Set(key, value interface{})
	// NewClientMessage builds a ClientMessage from raw bytes, or returns an error.
	NewClientMessage(data []byte) (*ClientMessage, error)
	// GetClient returns the underlying concrete *Client.
	GetClient() *Client
}

func NewClient

func NewClient(conn *websocket.Conn, hub IHub, config WsConfig) IClient

type IFilters

// IFilters is implemented by types that can run a Handler.
// NOTE(review): presumably Do passes fn through a chain of Filter functions
// before invoking it — confirm against the implementation.
type IFilters interface {
	Do(fn Handler)
}

type IHub

// IHub is the hub abstraction: lifecycle callbacks, per-subject handlers,
// sending helpers and the channels used to drive the hub.
type IHub interface {
	// Id returns the hub's id.
	Id() interface{}
	// BeforeJoin registers a callback invoked before a new client joins.
	// NOTE(review): presumably a non-nil error rejects the join — confirm.
	BeforeJoin(callback func(client IClient) error)
	// AfterJoin registers a callback invoked after a client has joined.
	AfterJoin(callback func(client IClient))
	// BeforeLeave registers a callback invoked before a client leaves.
	BeforeLeave(callback func(client IClient))
	// AfterLeave registers a callback invoked after a client has left.
	AfterLeave(callback func(client IClient))
	// BeforeWsMsg registers a callback invoked before a websocket message is handled.
	// NOTE(review): presumably returning false drops the message — confirm.
	BeforeWsMsg(callback func(msg *ClientMessage) bool)
	// Add filter
	// Use(filter Filter)
	// On attaches an event handler function for the given subject.
	On(subject string, handler Handler)
	// OnWs attaches a websocket-message handler for the given subject.
	OnWs(subject string, handler HandlerWs)
	// OnLocal(subject string, handler HandlerLocal)
	// Simulate a client sending a message
	// Emit(msg *ClientMessage)
	// Off detaches an event handler function for the given subject.
	Off(subject string, handler Handler)
	// OffWs detaches a websocket-message handler for the given subject.
	OffWs(subject string, handler HandlerWs)
	// SendRedisRaw sends an existing MessageIn (presumably published to Redis — confirm).
	SendRedisRaw(msg *MessageIn)
	// SendRedis sends data under subject (presumably published to Redis — confirm).
	SendRedis(subject string, data interface{})
	// SendWsAll sends a message to all clients.
	SendWsAll(subject string, message interface{})
	// SendWs sends a message to the listed receivers.
	SendWs(subject string, message interface{}, receivers ...IClient)
	// EchoWs takes a client message (presumably sends it back to its sender — confirm).
	EchoWs(msg *ClientMessage)
	// Close closes the hub.
	Close()
	// CloseMessageLoop()
	// SetSelf(self IHub)
	// Run runs the hub; the README example starts it with `go hub.Run()`.
	Run()
	// UnregisterChan returns the channel carrying clients to unregister.
	UnregisterChan() chan IClient
	// RegisterChan returns the channel carrying clients to register.
	RegisterChan() chan IClient
	// MessageChan returns the channel carrying client messages.
	MessageChan() chan *ClientMessage
	// MessageLocalChan() chan<- MessageLocal
	// CloseChan returns the hub's close-signal channel.
	CloseChan() chan struct{}
	// Clients returns the hub's client set as a map keyed by client.
	Clients() map[IClient]bool

	// OnTick registers a tick callback receiving an int
	// (presumably the elapsed seconds reported by GetSeconds — confirm).
	OnTick(func(int))
	// GetSeconds returns the hub's seconds counter.
	GetSeconds() int
	// SendWsClient sends a message with the given subject to a single client.
	SendWsClient(client IClient, subject string, message interface{})
	// SendWsBytes sends already-encoded bytes to a single client.
	SendWsBytes(client IClient, bs []byte)
}

IHub is like a chat room.

func NewHub

func NewHub(id interface{}, redisConStr, redisChannel string) IHub

type IRoute

// IRoute maps message subjects to their registered handlers.
type IRoute interface {
	// Route returns the handlers for the given subject.
	Route(subject string) []Handler
	// On attaches an event handler function for the given subject.
	On(subject string, handler Handler)
	// Off detaches an event handler function for the given subject.
	Off(subject string, handler Handler)
}

IRoute finds the callbacks registered in a hub.

type MessageIn

// MessageIn is an incoming message: a subject plus a raw JSON payload.
type MessageIn struct {
	// Subject names the message; handlers are registered per subject
	// (the README example uses "join", "leave" and "im").
	Subject string
	// Data is the raw JSON payload. It is a pointer and may therefore be nil;
	// the Decode method takes the destination object to fill.
	Data    *json.RawMessage
}

func NewMessageIn

func NewMessageIn(subject string, data interface{}) *MessageIn

func (*MessageIn) Decode

func (m *MessageIn) Decode(obj interface{}) error

func (*MessageIn) SetData

func (m *MessageIn) SetData(data interface{}) error

type MessageLocal

// MessageLocal is a message whose payload is kept as a Go value and which can
// carry an error.
// NOTE(review): presumably for in-process delivery; the only references on
// this page are commented out (OnLocal, MessageLocalChan) — confirm usage.
type MessageLocal struct {
	// Subject names the message.
	Subject string
	// Data is the message payload.
	Data    interface{}
	// Error is an error carried with the message.
	Error   error
}

type MessageOut

// MessageOut is the format of an outgoing message; its Encode method
// serializes it to bytes.
type MessageOut struct {
	// Subject names the message.
	Subject string
	// Data is the message payload.
	Data    interface{}
}

MessageOut is the format every outgoing message should have.

func (MessageOut) Encode

func (m MessageOut) Encode() ([]byte, error)

type User

// User is a sample user type; the README example attaches a *User to each
// websocket client under the "user" prop key.
type User struct {
	// Name is the user's name.
	Name string
}

type WsConfig

type WsConfig struct {
	WriteWait, PongWait time.Duration
	MaxMessageSize      int64
	// contains filtered or unexported fields
}

func DefaultWsConfig

func DefaultWsConfig() WsConfig

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL