comet

package module
v0.0.0-...-522d870 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2022 License: BSD-2-Clause Imports: 7 Imported by: 5

README

melody

Build Status Coverage Status GoDoc

🎶 Minimalist websocket framework for Go.

Melody is a websocket framework based on github.com/gorilla/websocket that abstracts away the tedious parts of handling websockets. It gets out of your way so you can write real-time apps. Features include:

  • Clear and easy interface similar to net/http or Gin.
  • A simple way to broadcast to all or selected connected sessions.
  • Message buffers making concurrent writing safe.
  • Automatic handling of ping/pong and session timeouts.
  • Store data on sessions.

Install

go get gopkg.in/olahol/melody.v1

Example: chat

Chat

Using Gin:

package main

import (
	"github.com/gin-gonic/gin"
	"gopkg.in/olahol/melody.v1"
	"net/http"
)

// main serves index.html on "/" and upgrades "/ws" requests to websocket
// sessions; every text message received from a session is broadcast to all
// sessions.
func main() {
	engine := gin.Default()
	hub := melody.New()

	// Relay each incoming text message to all sessions.
	hub.HandleMessage(func(_ *melody.Session, payload []byte) {
		hub.Broadcast(payload)
	})

	// servePage delivers the static chat page.
	servePage := func(ctx *gin.Context) {
		http.ServeFile(ctx.Writer, ctx.Request, "index.html")
	}

	// upgrade hands the request over to melody.
	upgrade := func(ctx *gin.Context) {
		hub.HandleRequest(ctx.Writer, ctx.Request)
	}

	engine.GET("/", servePage)
	engine.GET("/ws", upgrade)

	engine.Run(":5000")
}

Using Echo:

package main

import (
	"github.com/labstack/echo"
	"github.com/labstack/echo/engine/standard"
	"github.com/labstack/echo/middleware"
	"gopkg.in/olahol/melody.v1"
	"net/http"
)

// main serves index.html on "/" and upgrades "/ws" requests to websocket
// sessions using Echo with its standard engine; every text message received
// from a session is broadcast to all sessions.
func main() {
	server := echo.New()
	hub := melody.New()

	server.Use(middleware.Logger())
	server.Use(middleware.Recover())

	server.GET("/", func(ctx echo.Context) error {
		// Reach through Echo's standard-engine wrappers to the net/http values.
		w := ctx.Response().(*standard.Response).ResponseWriter
		r := ctx.Request().(*standard.Request).Request
		http.ServeFile(w, r, "index.html")
		return nil
	})

	server.GET("/ws", func(ctx echo.Context) error {
		w := ctx.Response().(*standard.Response).ResponseWriter
		r := ctx.Request().(*standard.Request).Request
		hub.HandleRequest(w, r)
		return nil
	})

	// Relay each incoming text message to all sessions.
	hub.HandleMessage(func(_ *melody.Session, payload []byte) {
		hub.Broadcast(payload)
	})

	server.Run(standard.New(":5000"))
}

Example: gophers

Gophers

package main

import (
	"github.com/gin-gonic/gin"
	"gopkg.in/olahol/melody.v1"
	"net/http"
	"strconv"
	"strings"
	"sync"
)

// GopherInfo holds the identifier and the X/Y values of one gopher, all kept
// as strings so they can be concatenated directly into the "set" and "iam"
// messages.
type GopherInfo struct {
	ID, X, Y string
}

// main runs the gophers demo: it serves index.html on "/", upgrades "/ws"
// requests to websocket sessions, and keeps one GopherInfo per session.
//
// Messages sent to clients:
//   - "set <id> <x> <y>" for a gopher's current values,
//   - "iam <id>" to tell a new session its own id,
//   - "dis <id>" when a session disconnects.
//
// Write and broadcast results are ignored, as in a best-effort demo.
func main() {
	router := gin.Default()
	mrouter := melody.New()
	gophers := make(map[*melody.Session]*GopherInfo)
	lock := new(sync.Mutex) // guards gophers and counter
	counter := 0

	router.GET("/", func(c *gin.Context) {
		http.ServeFile(c.Writer, c.Request, "index.html")
	})

	router.GET("/ws", func(c *gin.Context) {
		mrouter.HandleRequest(c.Writer, c.Request)
	})

	mrouter.HandleConnect(func(s *melody.Session) {
		lock.Lock()
		// Deferred so the mutex is released even if a call below panics.
		defer lock.Unlock()

		// Tell the newcomer about every gopher that is already registered.
		for _, other := range gophers {
			s.Write([]byte("set " + other.ID + " " + other.X + " " + other.Y))
		}

		self := &GopherInfo{ID: strconv.Itoa(counter), X: "0", Y: "0"}
		gophers[s] = self
		s.Write([]byte("iam " + self.ID))
		counter++
	})

	mrouter.HandleDisconnect(func(s *melody.Session) {
		lock.Lock()
		defer lock.Unlock()

		// A session that was never registered has nothing to announce;
		// indexing the map would yield nil and dereferencing it would panic.
		info, ok := gophers[s]
		if !ok {
			return
		}
		mrouter.BroadcastOthers([]byte("dis "+info.ID), s)
		delete(gophers, s)
	})

	mrouter.HandleMessage(func(s *melody.Session, msg []byte) {
		// Only messages of the form "<x> <y>" are acted upon.
		p := strings.Split(string(msg), " ")
		if len(p) != 2 {
			return
		}

		lock.Lock()
		defer lock.Unlock()

		// Ignore messages from sessions that have no registered gopher
		// instead of dereferencing a nil entry.
		info, ok := gophers[s]
		if !ok {
			return
		}
		info.X, info.Y = p[0], p[1]
		mrouter.BroadcastOthers([]byte("set "+info.ID+" "+info.X+" "+info.Y), s)
	})

	router.Run(":5000")
}
More examples

Documentation

Contributors

  • Ola Holmström (@olahol)
  • Shogo Iwano (@shiwano)
  • Matt Caldwell (@mattcaldwell)
  • Heikki Uljas (@huljas)
  • Robbie Trencheny (@robbiet480)
  • yangjinecho (@yangjinecho)

FAQ

If you are getting a 403 when trying to connect to your websocket, you can allow all origin hosts by replacing the upgrader's CheckOrigin function:

m := melody.New()
m.Upgrader.CheckOrigin = func(r *http.Request) bool { return true }

Documentation

Overview

Package comet implements a framework for dealing with WebSockets.

Example

A broadcasting echo server:

// main runs a broadcasting echo server: "/ws" requests are upgraded to
// websocket sessions and every text message received from a session is
// broadcast to all sessions.
func main() {
	r := gin.Default()
	m := comet.New()
	r.GET("/ws", func(c *gin.Context) {
		m.HandleRequest(c.Writer, c.Request)
	})
	m.HandleMessage(func(s *comet.Session, msg []byte) {
		// Broadcast is the *Comet method that sends a text message to all
		// sessions.
		m.Broadcast(msg)
	})
	r.Run(":5000")
}

Index

Constants

View Source
// WebSocket close status codes as defined in RFC 6455, section 11.7. They
// duplicate the codes from gorilla/websocket for convenience.
const (
	CloseNormalClosure           = 1000
	CloseGoingAway               = 1001
	CloseProtocolError           = 1002
	CloseUnsupportedData         = 1003
	CloseNoStatusReceived        = 1005
	CloseAbnormalClosure         = 1006
	CloseInvalidFramePayloadData = 1007
	ClosePolicyViolation         = 1008
	CloseMessageTooBig           = 1009
	CloseMandatoryExtension      = 1010
	CloseInternalServerErr       = 1011
	CloseServiceRestart          = 1012
	CloseTryAgainLater           = 1013
	CloseTLSHandshake            = 1015
)

Close codes defined in RFC 6455, section 11.7. Duplicate of codes from gorilla/websocket for convenience.

Variables

View Source
var (
	// ErrDisposed is returned when an operation is performed on a disposed
	// queue.
	ErrDisposed = errors.New(`ring: disposed`)

	// ErrTimeout is returned when an applicable queue operation times out.
	ErrTimeout = errors.New(`ring: timed out`)

	// ErrPanic is the error value carrying the message "ring: panic".
	// NOTE(review): presumably returned in place of the former
	// panic(`Ring buffer in a compromised state during a put operation.`)
	// — confirm against RingBuffer.Put.
	ErrPanic = errors.New(`ring: panic`)
)

Functions

func FormatCloseMessage

func FormatCloseMessage(closeCode int, text string) []byte

FormatCloseMessage formats closeCode and text as a WebSocket close message.

func NewRooms

func NewRooms(size uint64) *rooms

Types

type Comet

type Comet struct {
	Config   *Config
	Upgrader *websocket.Upgrader
	// contains filtered or unexported fields
}

Comet implements a websocket manager.

func New

func New(opts ...MelodyOpt) *Comet

New creates a new comet instance with default Upgrader and Config.

func (*Comet) Broadcast

func (c *Comet) Broadcast(msg []byte, rooms ...string) error

Broadcast broadcasts a text message to all sessions.

func (*Comet) BroadcastBinary

func (c *Comet) BroadcastBinary(msg []byte, rooms ...string) error

BroadcastBinary broadcasts a binary message to all sessions.

func (*Comet) BroadcastBinaryFilter

func (c *Comet) BroadcastBinaryFilter(msg []byte, fn func(*Session) bool, rooms ...string) error

BroadcastBinaryFilter broadcasts a binary message to all sessions that fn returns true for.

func (*Comet) BroadcastBinaryOthers

func (c *Comet) BroadcastBinaryOthers(msg []byte, s *Session) error

BroadcastBinaryOthers broadcasts a binary message to all sessions except session s.

func (*Comet) BroadcastFilter

func (c *Comet) BroadcastFilter(msg []byte, fn func(*Session) bool, rooms ...string) error

BroadcastFilter broadcasts a text message to all sessions that fn returns true for.

func (*Comet) BroadcastOthers

func (c *Comet) BroadcastOthers(msg []byte, s *Session, rooms ...string) error

BroadcastOthers broadcasts a text message to all sessions except session s.

func (*Comet) Close

func (c *Comet) Close() error

Close closes the comet instance and all connected sessions.

func (*Comet) CloseWithMsg

func (c *Comet) CloseWithMsg(msg []byte) error

CloseWithMsg closes the comet instance with the given close payload and all connected sessions. Use the FormatCloseMessage function to format a proper close message payload.

func (*Comet) HandleClose

func (c *Comet) HandleClose(fn func(*Session, int, string) error)

HandleClose sets the handler for close messages received from the session. The code argument to fn is the received close code or CloseNoStatusReceived if the close message is empty. The default close handler sends a close frame back to the session.

The application must read the connection to process close messages, as described in the gorilla/websocket documentation on control messages.

The connection read methods return a CloseError when a close frame is received. Most applications should handle close messages as part of their normal error handling. Applications should only set a close handler when the application must perform some action before sending a close frame back to the session.

func (*Comet) HandleConnect

func (c *Comet) HandleConnect(fn func(*Session))

HandleConnect fires fn when a session connects.

func (*Comet) HandleDisconnect

func (c *Comet) HandleDisconnect(fn func(*Session))

HandleDisconnect fires fn when a session disconnects.

func (*Comet) HandleError

func (c *Comet) HandleError(fn func(*Session, error))

HandleError fires fn when a session has an error.

func (*Comet) HandleMessage

func (c *Comet) HandleMessage(fn func(*Session, []byte))

HandleMessage fires fn when a text message comes in.

func (*Comet) HandleMessageBinary

func (c *Comet) HandleMessageBinary(fn func(*Session, []byte))

HandleMessageBinary fires fn when a binary message comes in.

func (*Comet) HandlePong

func (c *Comet) HandlePong(fn func(*Session))

HandlePong fires fn when a pong is received from a session.

func (*Comet) HandleRequest

func (c *Comet) HandleRequest(w http.ResponseWriter, r *http.Request) error

HandleRequest upgrades http requests to websocket connections and dispatches them to be handled by the comet instance.

func (*Comet) HandleRequestWithKeys

func (c *Comet) HandleRequestWithKeys(w http.ResponseWriter, r *http.Request, keys map[string]interface{}) error

HandleRequestWithKeys does the same as HandleRequest but populates session.keys with keys.

func (*Comet) HandleSentMessage

func (c *Comet) HandleSentMessage(fn func(*Session, []byte))

HandleSentMessage fires fn when a text message is successfully sent.

func (*Comet) HandleSentMessageBinary

func (c *Comet) HandleSentMessageBinary(fn func(*Session, []byte))

HandleSentMessageBinary fires fn when a binary message is successfully sent.

func (*Comet) IsClosed

func (c *Comet) IsClosed() bool

IsClosed returns the status of the comet instance.

func (*Comet) Len

func (c *Comet) Len() int

Len returns the number of connected sessions.

type Config

type Config struct {
	WriteWait           time.Duration // Duration until a write times out.
	PongWait            time.Duration // Timeout for waiting on pong.
	PingPeriod          time.Duration // Duration between pings.
	MaxMessageSize      int64         // Maximum size in bytes of a message.
	MessageBufferSize   uint64        // The max amount of messages that can be in a session's buffer before it starts dropping them.
	WriteBufferPollSize uint64        // NOTE(review): purpose is not documented here — confirm against the session write loop.
	Rooms               Rooms         // Rooms implementation used by the instance; see the Rooms interface.
}

Config comet configuration struct.

type LinkList struct {
	// contains filtered or unexported fields
}

LinkList represents a doubly linked list. The zero value for LinkList is an empty list ready to use.

func NewLinkList() *LinkList

NewLinkList returns an initialized list.

func (*LinkList) Back

func (l *LinkList) Back() *LinkNode

Back returns the last element of list l or nil if the list is empty.

func (*LinkList) Front

func (l *LinkList) Front() *LinkNode

Front returns the first element of list l or nil if the list is empty.

func (*LinkList) Init

func (l *LinkList) Init() *LinkList

Init initializes or clears list l.

func (*LinkList) InsertAfter

func (l *LinkList) InsertAfter(v *Session, mark *LinkNode) *LinkNode

InsertAfter inserts a new element e with value v immediately after mark and returns e. If mark is not an element of l, the list is not modified. The mark must not be nil.

func (*LinkList) InsertBefore

func (l *LinkList) InsertBefore(v *Session, mark *LinkNode) *LinkNode

InsertBefore inserts a new element e with value v immediately before mark and returns e. If mark is not an element of l, the list is not modified. The mark must not be nil.

func (*LinkList) Len

func (l *LinkList) Len() int

Len returns the number of elements of list l. The complexity is O(1).

func (*LinkList) MoveAfter

func (l *LinkList) MoveAfter(e, mark *LinkNode)

MoveAfter moves element e to its new position after mark. If e or mark is not an element of l, or e == mark, the list is not modified. The element and mark must not be nil.

func (*LinkList) MoveBefore

func (l *LinkList) MoveBefore(e, mark *LinkNode)

MoveBefore moves element e to its new position before mark. If e or mark is not an element of l, or e == mark, the list is not modified. The element and mark must not be nil.

func (*LinkList) MoveToBack

func (l *LinkList) MoveToBack(e *LinkNode)

MoveToBack moves element e to the back of list l. If e is not an element of l, the list is not modified. The element must not be nil.

func (*LinkList) MoveToFront

func (l *LinkList) MoveToFront(e *LinkNode)

MoveToFront moves element e to the front of list l. If e is not an element of l, the list is not modified. The element must not be nil.

func (*LinkList) PushBack

func (l *LinkList) PushBack(v *Session) *LinkNode

PushBack inserts a new element e with value v at the back of list l and returns e.

func (*LinkList) PushBackList

func (l *LinkList) PushBackList(other *LinkList)

PushBackList inserts a copy of another list at the back of list l. The lists l and other may be the same. They must not be nil.

func (*LinkList) PushFront

func (l *LinkList) PushFront(v *Session) *LinkNode

PushFront inserts a new element e with value v at the front of list l and returns e.

func (*LinkList) PushFrontList

func (l *LinkList) PushFrontList(other *LinkList)

PushFrontList inserts a copy of another list at the front of list l. The lists l and other may be the same. They must not be nil.

func (*LinkList) Remove

func (l *LinkList) Remove(e *LinkNode) interface{}

Remove removes e from l if e is an element of list l. It returns the element value e.Value. The element must not be nil.

type LinkNode

type LinkNode struct {

	// The value stored with this element.
	Value *Session
	// contains filtered or unexported fields
}

LinkNode is an element of a linked list.

func (*LinkNode) Next

func (e *LinkNode) Next() *LinkNode

Next returns the next list element or nil.

func (*LinkNode) Prev

func (e *LinkNode) Prev() *LinkNode

Prev returns the previous list element or nil.

type MelodyOpt

type MelodyOpt func(*Config)

MelodyOpt is an option function that receives the *Config; New accepts any number of them.

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer is a MPMC buffer that achieves threadsafety with CAS operations only. A put on full or get on empty call will block until an item is put or retrieved. Calling Dispose on the RingBuffer will unblock any blocked threads with an error. This buffer is similar to the buffer described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue with some minor additions.

func NewRingBuffer

func NewRingBuffer(size uint64) *RingBuffer

NewRingBuffer will allocate, initialize, and return a ring buffer with the specified size.

func (*RingBuffer) Cap

func (rb *RingBuffer) Cap() uint64

Cap returns the capacity of this ring buffer.

func (*RingBuffer) Dispose

func (rb *RingBuffer) Dispose()

Dispose will dispose of this queue and free any blocked threads in the Put and/or Get methods. Calling those methods on a disposed queue will return an error.

func (*RingBuffer) Get

func (rb *RingBuffer) Get(timeouts ...time.Duration) (*envelope, error)

Get will return the next item in the queue. This call will block if the queue is empty. This call will unblock when an item is added to the queue or Dispose is called on the queue. An error will be returned if the queue is disposed.

func (*RingBuffer) IsDisposed

func (rb *RingBuffer) IsDisposed() bool

IsDisposed will return a bool indicating if this queue has been disposed.

func (*RingBuffer) Len

func (rb *RingBuffer) Len() uint64

Len returns the number of items in the queue.

func (*RingBuffer) Poll

func (rb *RingBuffer) Poll(timeout time.Duration) (*envelope, error)

Poll will return the next item in the queue. This call will block if the queue is empty. This call will unblock when an item is added to the queue, Dispose is called on the queue, or the timeout is reached. An error will be returned if the queue is disposed or a timeout occurs. A non-positive timeout will block indefinitely.

func (*RingBuffer) Put

func (rb *RingBuffer) Put(item *envelope) error

Put adds the provided item to the queue. If the queue is full, this call will block until an item is removed from the queue or Dispose is called on the queue. An error will be returned if the queue is disposed.

type Rooms

// Rooms is the interface held in Config.Rooms.
//
// NOTE(review): the individual methods are not documented here. Their names
// and signatures suggest session registration, room membership (Join/Quit),
// online counts, pushing a message to named rooms, and iterating the
// sessions of a room — confirm against the implementation returned by
// NewRooms.
type Rooms interface {
	Register(*Session)
	Unregister(*Session)
	AllOnline() int
	Count() int
	Join(string, *Session)
	Quit(string, *Session)
	Close([]byte)
	Online(string) int
	Push(int, []byte, ...string) error
	Range(string, func(*Session))
	Run()
}

type Session

type Session struct {
	Request *http.Request
	// contains filtered or unexported fields
}

Session wrapper around websocket connections.

func (*Session) Close

func (s *Session) Close() error

Close closes session.

func (*Session) CloseWithMsg

func (s *Session) CloseWithMsg(msg []byte) error

CloseWithMsg closes the session with the provided payload. Use the FormatCloseMessage function to format a proper close message payload.

func (*Session) Get

func (s *Session) Get(key string) (value interface{}, exists bool)

Get returns the value for the given key, i.e. (value, true). If the value does not exist it returns (nil, false).

func (*Session) IsClosed

func (s *Session) IsClosed() bool

IsClosed returns the status of the connection.

func (*Session) JoinRoom

func (s *Session) JoinRoom(name string)

JoinRoom adds the session to the room with the given name.

func (*Session) MustGet

func (s *Session) MustGet(key string) interface{}

MustGet returns the value for the given key if it exists, otherwise it panics.

func (*Session) QuitRoom

func (s *Session) QuitRoom(name string)

QuitRoom removes the session from the room with the given name.

func (*Session) Set

func (s *Session) Set(key string, value interface{})

Set is used to store a new key/value pair exclusively for this session. It also lazily initializes s.keys if it was not used previously.

func (*Session) Write

func (s *Session) Write(msg []byte) error

Write writes message to session.

func (*Session) WriteBinary

func (s *Session) WriteBinary(msg []byte) error

WriteBinary writes a binary message to session.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL