gows

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2023 License: Apache-2.0, Apache-2.0 Imports: 21 Imported by: 0

README

主要特点

gows是一款方便易用的Go语言websocket库,gows使用简单,能够支持大量客户端连接。具体特征如下:

  • 采用更加经济的内存分配机制,使得每台服务器可接入更多的客户端。
  • 内置消息接收循环,简化了消息的接收以及处理。
  • 支持permessage-deflate压缩,支持设置压缩阈值,只有大于阈值的消息才会压缩发送。
  • 支持Close handshake,可以优雅地关闭websocket连接。
  • 支持并发消息发送。

安装

go get github.com/haming123/wego/gows

快速上手

  • 注册页面路由,启动web服务
package main
import (
	"github.com/haming123/wego/gows"
	"html/template"
	"net/http"
)
func main() {
	http.HandleFunc("/ws", HandlerWebSocket)
	http.HandleFunc("/index", func(w http.ResponseWriter, r *http.Request) {
		user := r.FormValue("user")
		t, _ := template.ParseFiles("./index.html")
		t.Execute(w, user)
	})
	http.ListenAndServe(":8080", nil)
}

其中: 1)/index是一个web页面,在该页面中连接websocket,发送并接收websocket消息。 2)在/ws页面中进行websocket握手并升级为websocket协议。

  • 定义websocket连接对应的结构体 在进行websocket握手前需要首先实现一个代表websocket连接的结构体,该结构体用于websocket事件的处理。该结构体需要实现以下接口方法:
OnClose(ws *WebSocket)
OnMessage(ws *WebSocket, opcode int, buff *ByteBuffer) error

当接收的消息时会调用OnMessage方法,消息的数据可以从buff中获取。websocket连接关闭时会调用OnClose方法。

type Client struct {
    ws   *gows.WebSocket
    user string
}

func (c *Client) OnClose(ws *gows.WebSocket) {
    log.Printf("OnClose: %s\n", c.user)
    c.ws = nil
}

func (c *Client) OnMessage(ws *gows.WebSocket, opcode int, vbuff *gows.ByteBuffer) error {
    log.Println("收到消息:", vbuff.GetString())
    c.ws.WriteString(vbuff.GetString())
    return nil
}
  • 实现websocket握手处理器函数,将http连接升级为websocket连接
func HandlerWebSocket(w http.ResponseWriter, r *http.Request) {
    user := r.FormValue("user")
    ws, err := gows.Accept(w, r, nil, nil)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    client := &Client{ws: ws, user: user}
    ws.ServeRead(client)
}

其中的Client结构体代表客户端链接,必须实现OnMessage方法以及OnClose方法。创建Client对象后调用ws.Serve来启动消息接收循环。

使用wego的web框架实现webSocket握手

func HandlerWebSocket(c *wego.WebContext) {
    user := c.Param.MustString("user")
    ws, err := c.AcceptWebsocket(nil, nil)
    if err != nil {
        c.AbortWithError(500, err)
        return
    }
    client := &Client{ws: ws, user: user}
    ws.ServeRead(client)
}

发送消息

  • 发送文本消息 若要发送文本格式的消息,请调用WriteText或WriteString函数:
func (ws *WebSocket) WriteText(data []byte) error {
    ......
}
func (ws *WebSocket) WriteString(data string) error {
    ......
}
  • 若要发送二进制消息,使用WriteBinary方法
func (ws *WebSocket) WriteBinary(data []byte) error {
    ......
}
  • 发送JSON数据
type Book struct {
    Name  string
    Price float64
}

err := ws.WriteJSON(Book{"Golang", 30.2})
if err != nil {
    log.Error(err)
    return
}
  • 通过WriteCloser接口发送消息
writer := ws.NextWriter(gows.Frame_Text)
defer writer.Close()

_, err := writer.WriteString("hello")
if err != nil {
    log.Error(err)
    return
}

_, err := writer.WriteString(" world")
if err != nil {
    log.Error(err)
    return
}

启用消息压缩处理

package main
import (
	"github.com/haming123/wego/gows"
	"html/template"
	"net/http"
)
func main() {
	//开启发送压缩
	gows.UseFlate()
	//设置压缩的阈值,只有大于阈值的消息才会被压缩
	gows.SetMinCompressSize(512)
	http.HandleFunc("/ws", HandlerWebSocket)
	http.HandleFunc("/index", func(w http.ResponseWriter, r *http.Request) {
		user := r.FormValue("user")
		t, _ := template.ParseFiles("./index.html")
		t.Execute(w, user)
	})
	http.ListenAndServe(":8080", nil)
}

跨域处理函数

gows缺省不支持跨域访问,若需要开启跨域访问,则需要自定义跨域处理函数(通过:SetOriginCheckFunc来设置)。例如:

package main
import (
	"github.com/haming123/wego/gows"
	"html/template"
	"net/http"
)
func main() {
	//允许跨域访问
	gows.SetOriginCheckFunc(func(r *http.Request) bool {
		return true
	})
	http.HandleFunc("/ws", HandlerWebSocket)
	http.HandleFunc("/index", func(w http.ResponseWriter, r *http.Request) {
		user := r.FormValue("user")
		t, _ := template.ParseFiles("./index.html")
		t.Execute(w, user)
	})
	http.ListenAndServe(":8080", nil)
}

关闭客户端链接

关闭websocket时需要进行关闭协商:接任一端想关闭websocket,就发一个close frame给对端,对端收到该frame后,若之前没有发过close frame,则必须回复一个close frame。 采用协商方式关闭websocket连接,请调用CloseHandshake方法:

func (ws *WebSocket) CloseHandshake(code CloseCode, text string) error {
    ......
}

CloseHandshake方法中的code参数为webSocket关闭状态码。webSocket关闭状态码的列表如下所示:

const (
//正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.
CloseNormalClosure CloseCode = 1000
//终端离开:可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.
CloseGoingAway CloseCode = 1001
//协议错误:由于协议错误而中断连接.
CloseProtocolError CloseCode = 1002
//数据格式错误:由于接收到不允许的数据类型而断开连接
CloseUnsupportedData CloseCode = 1003
//保留
CloseReserved CloseCode = 1004
//没有收到预期的状态码.
CloseNoCloseRcvd CloseCode = 1005
//异常关闭:用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).
CloseAbnormalClosure CloseCode = 1006
//由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).
CloseInvalidPayload CloseCode = 1007
//由于收到不符合约定的数据而断开连接.
ClosePolicyViolation CloseCode = 1008
//由于收到过大的数据帧而断开连接.
CloseMessageTooBig CloseCode = 1009
//缺少扩展:客户端终止连接,因为期望一个或多个拓展, 但服务器没有.
CloseMandatoryExtension CloseCode = 1010
//内部错误:服务器终止连接,因为遇到异常
CloseInternalError CloseCode = 1011
//服务重启:服务器由于重启而断开连接.
CloseServiceRestart CloseCode = 1012
//稍后再试:服务器由于临时原因断开连接。
CloseTryAgainLater CloseCode = 1013
//错误的网关.
CloseBadGateway CloseCode = 1014
//握手错误:表示连接由于无法完成 TLS 握手而关闭 (例如无法验证服务器证书).
CloseTLSHandshake CloseCode = 1015
)

Documentation

Index

Constants

View Source
const (
	Frame_Null     = -1
	Frame_Continue = 0
	Frame_Text     = 1
	Frame_Binary   = 2
	Frame_Close    = 8
	Frame_Ping     = 9
	Frame_Pong     = 10
)

Variables

This section is empty.

Functions

func BytesToString

func BytesToString(bytes []byte) string

func CloseReader

func CloseReader(r *MessageReader) error

func CloseWriter

func CloseWriter(writer *MessageWriter) error

关闭MessageWriter,MessageWriter关闭后会解除对NextWriter调用的阻塞

func HeaderValueCheck

func HeaderValueCheck(header http.Header, key string, val string) bool

func MarshalCloseInfo

func MarshalCloseInfo(code CloseCode, text string) ([]byte, error)

func OriginHostCheck

func OriginHostCheck(r *http.Request) bool

func PutByteBuffer

func PutByteBuffer(b *ByteBuffer)

func SetDebugLogger

func SetDebugLogger(logger DebugLogger)

func SetFrameReadBuffSize

func SetFrameReadBuffSize(size int)

func SetLoggerPrefix

func SetLoggerPrefix(prefix string)

func SetMessageBufferSize

func SetMessageBufferSize(size int)

func SetMinCompressSize

func SetMinCompressSize(size int)

func SetOriginCheckFunc added in v1.1.7

func SetOriginCheckFunc(fn OriginCheckFunc)

func ShowDebugLog

func ShowDebugLog(show bool)

func StringToBytes

func StringToBytes(s string) []byte

func UseFlate

func UseFlate(val ...CompressAlloter)

func WriteAllTo

func WriteAllTo(buff []byte, dst io.Writer) error

Types

type AcceptOptions

type AcceptOptions struct {
	SubProtocols []string
	// contains filtered or unexported fields
}

func DefaultAcceptOptions

func DefaultAcceptOptions() *AcceptOptions

func NewAcceptOptions

func NewAcceptOptions() *AcceptOptions

func (*AcceptOptions) SetFrameReadBuffSize

func (this *AcceptOptions) SetFrameReadBuffSize(size int)

func (*AcceptOptions) SetMessageBufferSize

func (this *AcceptOptions) SetMessageBufferSize(size int)

func (*AcceptOptions) SetMinCompressSize

func (this *AcceptOptions) SetMinCompressSize(size int)

func (*AcceptOptions) SetOriginCheckFunc added in v1.1.7

func (this *AcceptOptions) SetOriginCheckFunc(fn OriginCheckFunc)

func (*AcceptOptions) UseFlate

func (this *AcceptOptions) UseFlate(val ...CompressAlloter)

type ByteBuffer

type ByteBuffer struct {
	// contains filtered or unexported fields
}

func GetByteBuffer

func GetByteBuffer(opts *AcceptOptions) *ByteBuffer

func (*ByteBuffer) CloneBytes

func (b *ByteBuffer) CloneBytes() []byte

func (*ByteBuffer) Close

func (b *ByteBuffer) Close()

func (*ByteBuffer) GetBytes

func (b *ByteBuffer) GetBytes() []byte

func (*ByteBuffer) GetString

func (b *ByteBuffer) GetString() string

func (*ByteBuffer) GetVolatileString

func (b *ByteBuffer) GetVolatileString() string

func (*ByteBuffer) ReadAll

func (b *ByteBuffer) ReadAll(reader io.Reader) error

func (*ByteBuffer) ReadFull

func (b *ByteBuffer) ReadFull(reader io.Reader) error

func (*ByteBuffer) Reset

func (b *ByteBuffer) Reset()

func (*ByteBuffer) Size

func (b *ByteBuffer) Size() int

type ChuckReadHandler added in v1.1.7

type ChuckReadHandler interface {
	SocketHandler
	OnRead(ws *WebSocket, opcode int, fin bool, buff *ByteBuffer) error
}

type CloseCode

type CloseCode uint16
const (
	//正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.
	CloseNormalClosure CloseCode = 1000
	//终端离开:可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.
	CloseGoingAway CloseCode = 1001
	//协议错误:由于协议错误而中断连接.
	CloseProtocolError CloseCode = 1002
	//数据格式错误:由于接收到不允许的数据类型而断开连接
	CloseUnsupportedData CloseCode = 1003
	//保留
	CloseReserved CloseCode = 1004
	//没有收到预期的状态码.
	CloseNoCloseRcvd CloseCode = 1005
	//异常关闭:用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).
	CloseAbnormalClosure CloseCode = 1006
	//由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).
	CloseInvalidPayload CloseCode = 1007
	//由于收到不符合约定的数据而断开连接. 这是一个通用状态码, 用于不适合使用 1003 和 1009 状态码的场景.
	ClosePolicyViolation CloseCode = 1008
	//由于收到过大的数据帧而断开连接.
	CloseMessageTooBig CloseCode = 1009
	//缺少扩展:客户端终止连接,因为期望一个或多个拓展, 但服务器没有.
	CloseMandatoryExtension CloseCode = 1010
	//内部错误:服务器终止连接,因为遇到异常
	CloseInternalError CloseCode = 1011
	//服务重启:服务器由于重启而断开连接.
	CloseServiceRestart CloseCode = 1012
	//稍后再试:服务器由于临时原因断开连接。
	CloseTryAgainLater CloseCode = 1013
	//错误的网关.
	CloseBadGateway CloseCode = 1014
	//握手错误:表示连接由于无法完成 TLS 握手而关闭 (例如无法验证服务器证书).
	CloseTLSHandshake CloseCode = 1015
)

type CloseInfo

type CloseInfo struct {
	Code CloseCode
	Info string
}

func (*CloseInfo) Error added in v1.1.7

func (cc *CloseInfo) Error() string

type CompressAlloter

type CompressAlloter interface {
	WebsocketExtension(args []string) string
	NewWriter(mw *FrameWriter) (io.WriteCloser, error)
	FlushWriter(fw io.WriteCloser) error
	ResetWriter(fw io.WriteCloser, mw *FrameWriter) error
	NewReader(mr *FrameReader) (io.ReadCloser, error)
	ResetReader(fr io.ReadCloser, mr *FrameReader) error
}

type CompressMode

type CompressMode int
const (
	CompressDisabled CompressMode = iota
	CompressContextTakeover
	CompressNoContextTakeover
)

type DebugLogger

type DebugLogger interface {
	Output(prefix string, msg string)
}

type ExtParam

type ExtParam struct {
	// contains filtered or unexported fields
}

type FlateAlloter

type FlateAlloter struct {
	// contains filtered or unexported fields
}

func NewFlateAlloter

func NewFlateAlloter(level int) *FlateAlloter

func (*FlateAlloter) FlushWriter added in v1.1.7

func (this *FlateAlloter) FlushWriter(fw io.WriteCloser) error

func (*FlateAlloter) NewReader

func (this *FlateAlloter) NewReader(mr *FrameReader) (io.ReadCloser, error)

func (*FlateAlloter) NewWriter

func (this *FlateAlloter) NewWriter(mw *FrameWriter) (io.WriteCloser, error)

func (*FlateAlloter) ResetReader

func (this *FlateAlloter) ResetReader(fr io.ReadCloser, mr *FrameReader) error

func (*FlateAlloter) ResetWriter

func (this *FlateAlloter) ResetWriter(fw io.WriteCloser, mw *FrameWriter) error

func (*FlateAlloter) WebsocketExtension added in v1.1.7

func (this *FlateAlloter) WebsocketExtension(params []string) string

type FrameHeader

type FrameHeader struct {
	// contains filtered or unexported fields
}

type FrameReader

type FrameReader struct {
	// contains filtered or unexported fields
}

func (*FrameReader) BufferSize

func (mr *FrameReader) BufferSize() int

func (*FrameReader) Close

func (mr *FrameReader) Close() error

func (*FrameReader) Init

func (mr *FrameReader) Init(ws *WebSocket, br *bufio.Reader)

func (*FrameReader) Read

func (mr *FrameReader) Read(p []byte) (int, error)

mr.extra是结束标志:"\x01\x00\x00\xff\xff"来防止flate.reader产生:unexpected EOF错误 首先从网络接口读取消息数据,消息数据读取完成后从mr.extra读取压缩数据结束标志

func (*FrameReader) ReadMessagePayload

func (mr *FrameReader) ReadMessagePayload(p []byte) (int, error)

type FrameWriter

type FrameWriter struct {
	// contains filtered or unexported fields
}

func NewFrameWriter

func NewFrameWriter(ws *WebSocket, opcode int) *FrameWriter

func (*FrameWriter) Close

func (w *FrameWriter) Close() error

init后,opcode != (Frame_Continue, Frame_Null) 写数据后,opcode == (Frame_Continue, Frame_Null) colse后,opcode == Frame_Null

colse前检查是否已经发送了结束帧(opcode == Frame_Null), 若已经发送了结束帧,则退出

func (*FrameWriter) GetPayload

func (w *FrameWriter) GetPayload() []byte

func (*FrameWriter) GetPayloadLength

func (w *FrameWriter) GetPayloadLength() int

func (*FrameWriter) Reset

func (w *FrameWriter) Reset(ws *WebSocket, opcode int) error

func (*FrameWriter) SetTrimlength

func (w *FrameWriter) SetTrimlength(trim int)

func (*FrameWriter) Write

func (w *FrameWriter) Write(p []byte) (int, error)

func (*FrameWriter) WriteAll

func (w *FrameWriter) WriteAll(data []byte) error

func (*FrameWriter) WriteControlFrame

func (w *FrameWriter) WriteControlFrame(data []byte) error

func (*FrameWriter) WriteString

func (w *FrameWriter) WriteString(str string) error

type MessageHandler

type MessageHandler interface {
	SocketHandler
	OnMessage(ws *WebSocket, opcode int, buff *ByteBuffer) error
}

type MessageReader

type MessageReader struct {
	// contains filtered or unexported fields
}

func (*MessageReader) Close

func (mr *MessageReader) Close() error

func (*MessageReader) Read

func (mr *MessageReader) Read(p []byte) (int, error)

func (*MessageReader) ReadAll

func (mr *MessageReader) ReadAll() (*ByteBuffer, error)

type MessageWriter

type MessageWriter struct {
	// contains filtered or unexported fields
}

func (*MessageWriter) Close

func (w *MessageWriter) Close() error

func (*MessageWriter) Write

func (w *MessageWriter) Write(p []byte) (int, error)

func (*MessageWriter) WriteAll

func (w *MessageWriter) WriteAll(data []byte) error

func (*MessageWriter) WriteControlFrame

func (w *MessageWriter) WriteControlFrame(data []byte) error

func (*MessageWriter) WriteString

func (w *MessageWriter) WriteString(str string) error

type OriginCheckFunc added in v1.1.7

type OriginCheckFunc func(r *http.Request) bool

type SimpleLogger

type SimpleLogger struct {
	// contains filtered or unexported fields
}

func NewSimpleLogger

func NewSimpleLogger() *SimpleLogger

func (*SimpleLogger) Output

func (lg *SimpleLogger) Output(prefix string, msg string)

type SocketHandler

type SocketHandler interface {
	OnClose(ws *WebSocket)
}

type TokenInfo

type TokenInfo struct {
	Value string
	Index int
	Flag  int
}

type WebSocket

type WebSocket struct {
	// contains filtered or unexported fields
}

func Accept

func Accept(w http.ResponseWriter, r *http.Request, opts *AcceptOptions, headers map[string]string) (*WebSocket, error)

func NewWebSocket

func NewWebSocket(cnn net.Conn, opts *AcceptOptions, br *bufio.Reader) *WebSocket

func (*WebSocket) Close

func (ws *WebSocket) Close() error

func (*WebSocket) CloseHandshake added in v1.1.8

func (ws *WebSocket) CloseHandshake(code CloseCode, text string) error

func (*WebSocket) LocalAddr

func (ws *WebSocket) LocalAddr() net.Addr

func (*WebSocket) NextReader

func (ws *WebSocket) NextReader() (FrameHeader, *MessageReader, error)

func (*WebSocket) NextWriter

func (ws *WebSocket) NextWriter(opcode int) *MessageWriter

获取一个MessageWriter, 若存在已经创建的MessageWriter,则该调用被阻塞

func (*WebSocket) RemoteAddr

func (ws *WebSocket) RemoteAddr() net.Addr

func (*WebSocket) ServeRead added in v1.2.0

func (ws *WebSocket) ServeRead(handler MessageHandler)

func (*WebSocket) SetReadTimeOut added in v1.1.8

func (ws *WebSocket) SetReadTimeOut(readTimeOut time.Duration)

func (*WebSocket) SetWriteTimeOut

func (ws *WebSocket) SetWriteTimeOut(writeTimeOut time.Duration)

func (*WebSocket) WriteBinary

func (ws *WebSocket) WriteBinary(data []byte) error

func (*WebSocket) WriteJSON

func (ws *WebSocket) WriteJSON(v interface{}) error

func (*WebSocket) WriteMessage

func (ws *WebSocket) WriteMessage(opcode int, data []byte) error

func (*WebSocket) WritePing

func (ws *WebSocket) WritePing(data []byte) error

Ping和Pong是websocket里的心跳,用来保证客户端是在线的, 目前浏览器中没有相关api发送ping给服务器,只能由服务器发ping给浏览器,浏览器返回pong消息。

func (*WebSocket) WritePong

func (ws *WebSocket) WritePong(data []byte) error

func (*WebSocket) WriteString added in v1.2.0

func (ws *WebSocket) WriteString(data string) error

func (*WebSocket) WriteText

func (ws *WebSocket) WriteText(data []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL