cs

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2022 License: MIT Imports: 13 Imported by: 0

README

Command Service

Build Status Go Doc Code Coverage License

开箱即用的基于命令的消息处理框架,让 websocket 和 tcp 开发就像 http 那样简单

使用示例

package main
import (
  "net/http"
  "github.com/eyasliu/cs"
  "github.com/eyasliu/cs/xwebsocket"
)

func main() {
  // 初始化 websocket
  ws := xwebsocket.New()
  http.Handle("/ws", ws)

  srv := ws.Srv()
  srv.Use(cs.AccessLogger("MYSRV")). // 打印请求响应日志
            Use(cs.Recover()) // 统一错误处理,消化 panic 错误

  srv.Handle("register", func(c *cs.Context) {
    // 定义请求数据
    var body struct {
      UID  int    `p:"uid" v:"required"`
      Name string `p:"name" v:"required|min:4#必需指定名称|名称长度必需大于4位"`
    }
    // 解析请求数据
    if err := c.Parse(&body); err != nil {
      c.Err(err, 401)
      return
    }
    // 设置会话状态数据
    c.Set("uid", body.UID)
    c.Set("name", body.Name)

    // 响应消息
    c.OK(map[string]interface{}{
      "timestamp": time.Now().Unix(),
    })

    // 给所有连接广播消息
    c.Broadcast(&cs.Response{
      Cmd:  "someone_online",
      Data: body,
    })

    // 往当前连接主动推送消息
    c.Push(&cs.Response{
      Cmd:  "welcome",
      Data: "welcome to register my server",
    })

    // 遍历所有在线会话,获取其他会话的状态,并往指定会话推送消息
    for _, sid := range c.GetAllSID() {
      if c.Srv.GetState(sid, "uid") != nil {
        c.Srv.Push(sid, &cs.Response{
          Cmd:  "firend_online",
          Data: "your firend is online",
        })
      }
    }
  })

  // 分组
  group := srv.Group(func(c *cs.Context) {
    // 过滤指定请求
    if _, ok := c.Get("uid").(int); !ok {
      c.Err(errors.New("unregister session"), 101)
      return
    }
    c.Next()
  })

  group.Handle("userinfo", func(c *cs.Context) {
    uid := c.Get("uid").(int) // 中间件已处理过,可大胆断言
    c.OK(map[string]interface{}{
      "uid": uid,
    })
  })
  go srv.Run()

  http.ListenAndServe(":8080", nil)
}
适配器

用在 websocket

import (
  "net/http"
  "github.com/eyasliu/cs/xwebsocket"
)

func main() {
  ws := xwebsocket.New()
  http.Handler("/ws", ws.Handler)
  srv := ws.Srv(ws)
  go srv.Run()

  http.ListenAndServe(":8080", nil)
}

用在 TCP,使用内置默认协议

import (
  "github.com/eyasliu/cs/xtcp"
)

func main() {
  server := xtcp.New("127.0.0.1:8520")
  srv, err := server.Srv()
  if err != nil {
    panic(err)
  }

  srv.Run() // 阻塞运行
}

用在 HTTP,支持请求响应,支持服务器主动推送

import (
  "net/http"
  "github.com/eyasliu/cs"
  "github.com/eyasliu/cs/xhttp"
)

func main() {
  server := xhttp.New()
  http.Handle("/cmd", server)
  http.HandleFunc("/cmd2", server.Handler)
  srv := server.Srv()
  go http.ListenAndServe(":8080", nil)
	
  srv.Run()
}

多个适配器混用,让 websocket, tcp, http 共用同一套逻辑

import (
  "net/http"
  "github.com/eyasliu/cs"
  "github.com/eyasliu/cs/xhttp"
  "github.com/eyasliu/cs/xwebsocket"
  "github.com/eyasliu/cs/xtcp"
)

func main() {
  // http adapter
  server := xhttp.New()
  http.Handle("/cmd", server)
  http.HandleFunc("/cmd2", server.Handler)
  
  // websocket adapter
  ws := xwebsocket.New()
  http.Handle("/ws", server)

  // tcp adapter
  tcp := xtcp.New()

  // boot srv
  go tcp.Run()
  go http.ListenAndServe(":8080", nil)

  srv := cs.New(server, ws, tcp)
  srv.Run() // 阻塞运行
}
$ curl -XPOST -H"Content-Type:application/json" --data '{"cmd":"register", "data":{"uid": 101, "name": "eyasliu"}}' http://localhost:8080/cmd
{"cmd":"register","data":{"timestamp": 1610960488}}

实现过程

在开发 websocket 和 tcp 的时候,对于长连接的消息处理都需要手动处理,并没有类似于 http 的路由那么方便,于是就想要实现一个可以处理该类消息的工具。

在长连接的开发中,经常遇到的一些问题:

  1. 每个连接会话在连接后都需要注册,以标识该连接的用途
  2. 每个请求都需要处理,并且保证一定有响应
  3. 往连接主动推送消息
  4. 给所有连接广播消息
  5. 消息处理的代码不够优雅,太多 switch case 等样板代码
  6. 请求的数据解析不好写

实现方案:

在 websocket 和 tcp 中,每个连接都抽象成一个字符串 SID, 即 Session ID, cs 只负责处理消息,不处理连接的任何状态,与连接和状态相关的操作全都以 interface 定义好,给各种工具去实现

API

GoDoc

License

cs is released under the MIT License.

(c) 2020-2021 Eyas Liu liuyuesongde@163.com

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Documentation

Index

Constants

View Source
const (
	// CmdConnected on connection connected
	CmdConnected = "__cs_connected__"
	// CmdClosed on connection closed
	CmdClosed = "__cs_closed__"
	// CmdHeartbeat heartbeat message
	CmdHeartbeat = "__cs_heartbeat__"
)

内置命令

Variables

This section is empty.

Functions

func RouteNotFound

func RouteNotFound(c *Context)

RouteNotFound 当路由没匹配到时的默认处理函数

Types

type Context

type Context struct {
	*Response
	SID    string
	Srv    *Srv
	Server ServerAdapter
	// contains filtered or unexported fields
}

Context 处理函数的的上下文对象,中间件和路由的函数参数

func (*Context) Abort

func (c *Context) Abort()

Abort 中断更里层的中间件调用 该方法应该只用在中间件函数使用

func (*Context) Broadcast

func (c *Context) Broadcast(data *Response)

Broadcast 广播消息,即给所有有效的会话推送消息

func (*Context) Close

func (c *Context) Close() error

Close 关闭当前会话连接

func (*Context) Err

func (c *Context) Err(err error, code int)

Err 响应错误,如果错误对象为空则忽略不处理

func (*Context) Exit added in v1.1.0

func (c *Context) Exit(code int)

Exit 终止后续逻辑执行, code 错误码

func (*Context) Get

func (c *Context) Get(key string) interface{}

Get 获取当前会话的状态, 注意:这是会话的状态,而不是当前请求函数的状态(和HTTP那边不一样)

func (*Context) GetAllSID

func (c *Context) GetAllSID() []string

GetAllSID 获取目前生效的所有会话ID

func (*Context) GetServerAllSID

func (c *Context) GetServerAllSID() []string

GetServerAllSID 获取当前适配器中生效的所有会话ID

func (*Context) GetState

func (c *Context) GetState(sid, key string) interface{}

GetState 获取指定 sid 和 key 的状态值

func (*Context) IfErrExit added in v1.1.0

func (c *Context) IfErrExit(err error, code int)

IfErrExit 如果 err 不为空,则中断执行并直接返回

func (*Context) Next

func (c *Context) Next()

Next 调用下一层中间件。 中间件的调用是按照洋葱模型调用,该方法应该只用在中间件函数使用

func (*Context) OK

func (c *Context) OK(data ...interface{})

OK 响应成功,参数是指定响应的 data 数据,如果不设置则默认为空对象 c.OK() 响应 {} c.OK(nill) 响应 null c.OK(map[string]interface{}{"x": 1}) 响应 {"x": 1}

func (*Context) Parse

func (c *Context) Parse(pointer interface{}, mapping ...map[string]string) error

Parse 解析并验证消息携带的参数,参考 Goframe 的 请求输入-对象处理 https://itician.org/pages/viewpage.action?pageId=1114185 支持 json 和 xml 数据流 支持将数据解析为 *struct/**struct/*[]struct/*[]*struct/*map/*[]map 如果目标值是 *struct/**struct/*[]struct/*[]*struct ,则会自动调用请求验证,参考 GoFrame 的 请求输入-请求校验 https://itician.org/pages/viewpage.action?pageId=1114244

func (*Context) Push

func (c *Context) Push(data *Response) error

Push 往当前会话推送消息

func (*Context) PushSID

func (c *Context) PushSID(sid string, data *Response) error

PushSID 往指定SID会话推送消息

func (*Context) Resp

func (c *Context) Resp(code int, msg string, data ...interface{})

Resp 设置响应

func (*Context) Set

func (c *Context) Set(key string, v interface{})

Set 设置当前会话的状态

func (*Context) SetState

func (c *Context) SetState(sid, key string, v interface{})

SetState 设置当前会话的状态

type HandlerFunc

type HandlerFunc = func(*Context)

HandlerFunc 消息处理函数,中间件和路由的函数签名

func Heartbeat

func Heartbeat(timeout time.Duration, srv *Srv) HandlerFunc

Heartbeat 会话心跳维护,如果会话在指定周期内没有发送任何数据,则关闭该连接会话 timeout 心跳过期时长,指定当会话间隔 重置心跳过期时间,当接收到了会话的任意命令,都会重置

func Recover

func Recover() HandlerFunc

Recover 错误处理中间件 当处理函数发生 panic 时在该中间件恢复,并根据panic 的内容默认处理响应数据

type PushHandlerFunc

type PushHandlerFunc = func(*Context) error

type Request

type Request struct {
	Cmd     string          // message command, use for route
	Seqno   string          // seq number,the request id
	RawData json.RawMessage // request raw []byte data
}

Request request message

type Response

type Response struct {
	*Request             // reply the Request
	Cmd      string      // message command, use for route
	Seqno    string      // seq number,the request id
	Code     int         // response status code
	Msg      string      // response status message text
	Data     interface{} // response data
}

Response reply Request message

type ServerAdapter

type ServerAdapter interface {
	// Write send response message to connect
	Write(sid string, resp *Response) error
	// Read read message form connect
	Read(*Srv) (sid string, req *Request, err error)
	// Close close specify connect
	Close(sid string) error

	// GetAllSID get server all sid
	GetAllSID() []string
}

ServerAdapter defined integer to srv server

type Srv

type Srv struct {
	Server []ServerAdapter // 服务器适配器
	// contains filtered or unexported fields
}

Srv 基于命令的消息处理框架

func New

func New(server ...ServerAdapter) *Srv

New 指定服务器实例化一个消息服务

func (*Srv) AccessLogger

func (s *Srv) AccessLogger(args ...interface{}) HandlerFunc

AccessLogger 打印请求响应中间件 2 个可选参数,如果参数是 printLogger 接口类型则用于设置打印日志的 Logger 实例,如果是 string 类型则用于设置日志前缀 AccessLogger("MySRV") 设置名称 AccessLogger("MySRV", logger) 设置名称和打日志的实例 AccessLogger(logger, "MySRV") 设置名称和打日志的实例 AccessLogger(logger) 设置打日志的实例 AccessLogger(123) 无效参数,不会产生异常,等价于没有参数

func (*Srv) AddServer

func (s *Srv) AddServer(server ...ServerAdapter) *Srv

AddServer 增加服务适配器

func (*Srv) Broadcast

func (s *Srv) Broadcast(resp *Response)

Broadcast 往所有可用的会话推送消息

func (*Srv) CallContext

func (s *Srv) CallContext(ctx *Context)

CallContext 调用上下文,触发上下文中间件 应该在实现 adapter 时才有用

func (*Srv) Close

func (s *Srv) Close(sid string) error

Close 关闭指定会话 SID 的连接

func (*Srv) CloseWithServer

func (s *Srv) CloseWithServer(server ServerAdapter, sid string) error

CloseWithServer 关闭指定适配器的指定sid,该方法效率比 Close 高

func (*Srv) GetAllSID

func (s *Srv) GetAllSID() []string

GetAllSID 获取所有适配器的 SID

func (*Srv) GetState

func (s *Srv) GetState(sid, key string) interface{}

GetState 获取指定会话的指定状态值

func (*Srv) Group

func (s *Srv) Group(handlers ...HandlerFunc) *SrvGroup

Group 路由分组,指定该分组下的中间件

func (*Srv) Handle

func (s *Srv) Handle(cmd string, handlers ...HandlerFunc) *Srv

Handle 注册路由,cmd 是命令, handlers 是该路由的处理函数

func (*Srv) Heartbeat

func (srv *Srv) Heartbeat(timeout time.Duration) HandlerFunc

Heartbeat 会话心跳维护,如果会话在指定周期内没有发送任何数据,则关闭该连接会话 timeout 心跳过期时长,指定当会话间隔 重置心跳过期时间,当接收到了会话的任意命令,都会重置

func (*Srv) NewContext

func (s *Srv) NewContext(server ServerAdapter, sid string, req *Request) *Context

NewContext 根据请求消息实例化上下文 应该在实现 adapter 时才有用

func (*Srv) Push

func (s *Srv) Push(sid string, resp *Response) error

Push 往指定的会话 SID 连接推送消息

func (*Srv) PushServer

func (s *Srv) PushServer(server ServerAdapter, sid string, resp *Response) error

PushServer 往指定适配器的 sid 推送消息

func (*Srv) Run

func (s *Srv) Run() error

Run 开始接收命令消息,运行框架,会阻塞当前 goroutine

func (*Srv) SetState

func (s *Srv) SetState(sid, key string, val interface{})

SetState 设置指定连接的状态

func (*Srv) SetStateAdapter

func (s *Srv) SetStateAdapter(adapter gcache.Adapter) *Srv

SetStateAdapter 设置状态管理的存储适配器,默认是存储在内存中,可设置为其他

func (*Srv) SetStateExpire

func (s *Srv) SetStateExpire(t time.Duration) *Srv

SetStateExpire 设置会话的状态有效时长

func (*Srv) Use

func (s *Srv) Use(handlers ...HandlerFunc) *Srv

Use 增加全局中间件

func (*Srv) UsePush

func (s *Srv) UsePush(handlers ...PushHandlerFunc) *Srv

UsePush 增加推送中间件,该类中间件只会在使用 *Context 服务器主动推送的场景下才会被调用,如 Push, Broadcast, PushSID,在请求-响应模式时不会被调用,使用 ctx.Srv 调用也不会被触发

type SrvGroup

type SrvGroup struct {
	// contains filtered or unexported fields
}

SrvGroup 路由组,用于实现分组路由

func (*SrvGroup) Group

func (s *SrvGroup) Group(handlers ...HandlerFunc) *SrvGroup

Group 基于当前分组继续创建分组路由

func (*SrvGroup) Handle

func (s *SrvGroup) Handle(cmd string, handlers ...HandlerFunc) *SrvGroup

Handle 注册路由

func (*SrvGroup) Use

func (s *SrvGroup) Use(handlers ...HandlerFunc) *SrvGroup

Use 在当前分组添加中间件

type State

type State struct {
	// contains filtered or unexported fields
}

State 会话的状态数据管理

func (*State) Get

func (s *State) Get(sid, key string) interface{}

Get 获取指定会话的指定 key 的状态值

func (*State) Set

func (s *State) Set(sid, key string, val interface{})

Set 设置指定会话的状态键值对

func (*State) SetAdapter

func (s *State) SetAdapter(a gcache.Adapter)

SetAdapter 设置会话状态的存储适配器,参考 goframe 的缓存管理适配器 See: https://itician.org/pages/viewpage.action?pageId=1114265

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL