udp

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2023 License: MIT Imports: 22 Imported by: 0

README

beacon-tower

基于传输层UDP设计和封装的应用层网络传输协议,目的是为高效开发CS架构项目的数据传输功能提供支持。

基于UDP的网络传输协议

udp对要求实时性业务场景很友好,对udp在进一步设计可以有效避免其缺点,保障了完全性,可靠性,完整性; 使用本协议可以高效开发实时性场景,分布式场景,低功耗交互式场景的应用,本协议也在实际场景中得到了验证。

设计:

数据包:

Packet 包设计
______________________________________________________________________
|            |              |             |                           |
| 指令(1字节) |  name(7字节)  | 签名(7字节)  |  data(建议小于533字节)...  |
|____________|______________|_____________|___________________________|

指令: 区分是什么数据 Connect,Put,Reply,Heartbeat,Notice,Get
name: 主要场景s端指定广播,name对应多个ip(节点)
签名: 用于确保数据安全,签名会更具心跳进行动态签发
data: 传输的数据,不支持分包,建议小于533字节,可以在业务中设计分次传输

封包 : 装载数据 -> 压缩 -> 加密  
解包 : 解密 -> 解压 -> 匹配指令 -> 验证签名

是如何提升安全性?

  1. 采用连接认证机制
  2. IP黑白名单
  3. 动态签名机制
  4. 数据加解密

是如何提升可靠性?

  • 心跳与时间轮机制
  • 数据传输确认机制
  • 数据积压机制
  • 数据重传机制

其他?

  • 数据压缩

限制: 数据包应小而独立,大数据包应在业务层进行拆分

基础
S 端有 Notice(通知), Get(获取) 两种通讯方法

Notice

  1. 一对多发送通知
  2. 支持重传
  3. 指定节点发送通知

Get

  1. 获取C端数据
  2. 超时报错
  3. 存储C端的连接信息 一个name对应多个连接地址
  4. 最佳场景是设置每个C端独立名称对应一个连接地址
C 端有 Put(发送), Get(获取) 两种通讯方法

Put

  1. 发送数据包
  2. 积压模式: 每个数据包都会被积压,只有当s端确认接收后清除,当心跳包确认后触发积压数据重传
  3. 积压数据持久化: 积压数据包到达一定量被持久化到磁盘,重传时积压数据小于指定值读取持久化数据一半的数据量
  4. C端收到信号量 SIGTERM, SIGINT, SIGKILL, SIGHUP, SIGQUIT 当前积压数据包全部持久化

Get

  1. 获取C端数据
  2. 超时报错
安全
  1. 使用 DES ECB 对数据包加解密,保障数据被抓包并非明文
  2. 连接Code用于确保两端下发签名的识别
  3. 每次收到心跳包重新颁发签名
  4. 除连接包和心跳包都会确认签名
如何在弱网环境下保障数据的传输可靠性

重传: S端采用通知的方式广播数据包,在此上设计了确认机制,如果在指定时间内收不到C端的确认包就会触发重传,重传是可配置的;

积压: C端采用Put方式上传数据到S端,在此上设计了数据包积压机制,只有当收到S端对应数据包id的确认包到才会将此条数据包移除, 在确认连接成功后触发积压包重传,心跳包的时间节点维护积压数据包的持久化;

例子

servers

package main

import (
	"github.com/mangenotwork/beacon-tower/udp"
	"fmt"
	"os"
	"time"
)

// 保存c端put来的数据
var testFile = "test.txt"

func main() {
	// 初始化 s端
	servers, err := udp.NewServers("0.0.0.0", 12345)
	if err != nil {
		panic(err)
	}
	// 定义put方法
	servers.PutHandleFunc("case1", Case1)
	servers.PutHandleFunc("case2", Case2)
	// 定义get方法
	servers.GetHandleFunc("case3", Case3)
	// 每5秒发送一个通知
	go func() {
		for {
			time.Sleep(5 * time.Second)
			servers.OnLineTable()
			// 发送一个通知
			rse, rseErr := servers.Notice("", "testNotice", []byte("testNotice"),
				servers.SetNoticeRetry(2, 3000))
			if rseErr != nil {
				udp.Error(rseErr)
				continue
			}
			udp.Info(rse)
		}
	}()
	// 启动servers
	servers.Run()
}

func Case1(s *udp.Servers, body []byte) {
	udp.Info("收到的数据: ", string(body))
	// 发送get,获取客户端信息
	rse, err := s.Get("getClient", "", []byte("getClient"))
	if err != nil {
		return
	}
	udp.Info(string(rse), err)
}

func Case2(s *udp.Servers, body []byte) {
	file, err := os.OpenFile(testFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		udp.Error(err)
	}
	defer func() {
		_ = file.Close()
	}()
	content := []byte(string(body) + "\n")
	_, err = file.Write(content)
	if err != nil {
		panic(err)
	}
}

func Case3(s *udp.Servers, param []byte) (int, []byte) {
	udp.Info("获取到的请求参数  param = ", string(param))
	return 0, []byte(fmt.Sprintf("服务器名称 %s.", s.GetServersName()))
}

client端

package main

import (
	"github.com/mangenotwork/beacon-tower/udp"
	"fmt"
	"time"
)

func main() {
	// 定义客户端
	client, err := udp.NewClient("192.168.3.86:12345")
	if err != nil {
		panic(err)
	}
	// get方法
	client.GetHandleFunc("getClient", CGetTest)
	// 通知方法
	client.NoticeHandleFunc("testNotice", CNoticeTest)
	// 每两秒发送一些测试数据
	go func() {
		n := 0
		for {
			n++
			time.Sleep(2 * time.Second)
			// put上传数据到服务端的 case2 方法
			client.Put("case2", []byte(fmt.Sprintf("%d | hello : %d", time.Now().UnixNano(), n)))
			udp.Info("n = ", n)
			// get请求服务端的 case3 方法
			rse, err := client.Get("case3", []byte("test"))
			if err != nil {
				udp.Error(err)
				continue
			}
			udp.Info("get 请求返回 = ", string(rse))
		}
	}()

	// 运行客户端
	client.Run()
}

func CGetTest(c *udp.Client, param []byte) (int, []byte) {
	udp.Info("获取到的请求参数  param = ", string(param))
	return 0, []byte(fmt.Sprintf("客户端名称 %s.", c.DefaultClientName))
}

func CNoticeTest(c *udp.Client, data []byte) {
	udp.Info("收到来自服务器的通知,开始执行......")
	udp.Info("data = ", string(data))
}
更多例子
  • 基础例子: _examples/udp_base
  • 一对多: _examples/udp_onemany
  • 安全配置: _examples/udp_security

版本

v0.0.1

  • 基于udp传输协议的基础设计和实现
  • udp传输协议的实例

v0.0.2

Documentation

Index

Constants

View Source
const (
	SignLetterBytes         = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_+=~!@#$%^&*()<>{},.?~"
	DefaultConnectCode      = "c"
	DefaultServersName      = "servers"
	DefaultClientName       = "client"
	DefaultSecretKey        = "12345678"
	DefaultSGetTimeOut      = 1000 // 单位 ms
	DefaultNoticeMaxRetry   = 10   // 通知消息最大重试次数
	DefaultNoticeRetryTimer = 100  // 重试等待时间 单位ms
	HeartbeatTime           = 5    // 5s
	HeartbeatTimeLast       = 6    // 6s
	ServersTimeWheel        = 2    // 2s servers 时间轮
)

Variables

View Source
var (
	ErrNmeLengthAbove  = fmt.Errorf("名字不能超过7个长度")
	ErrDataLengthAbove = fmt.Errorf("数据大于 540个字节, 建议拆分")
	ErrNonePacket      = fmt.Errorf("空包")
	ErrSGetTimeOut     = func(label, name, ip string) error {
		return fmt.Errorf("请求客户端 FuncLabel:%s | name:%s | IP:%s 超时", label, name, ip)
	}
	ErrNotFondClient = func(name string) error {
		return fmt.Errorf("未找到客户端 name:%s ", name)
	}
	PanicGetHandleFuncExist = func(label string) {
		panic(fmt.Sprintf("get handle func label:%s is exist.", label))
	}
	PanicPutHandleFuncExist = func(label string) {
		panic(fmt.Sprintf("put handle func label:%s is exist.", label))
	}
	ErrServersSecretKey = fmt.Errorf("秘钥的长度只能为8,并且与Client端统一")
	ErrClientNameErr    = fmt.Errorf("client name 不能含特殊字符 @")
	ErrClientSecretKey  = fmt.Errorf("秘钥的长度只能为8,并且与Servers端统一")
)

err

View Source
var GetDataMap sync.Map
View Source
var LevelMap = map[Level]string{
	1: "[Info]  ",
	4: "[Error] ",
}
View Source
var LogClose bool = true

LogClose 是否关闭日志

View Source
var NoticeDataMap sync.Map

Functions

func BacklogLoad

func BacklogLoad()

BacklogLoad 加载持久化数据 并消费

func ByteToObj

func ByteToObj(data []byte, obj interface{}) error

func CloseLog

func CloseLog()

CloseLog 关闭日志

func DesECBDecrypt

func DesECBDecrypt(data, key []byte) []byte

func DesECBEncrypt

func DesECBEncrypt(data, key []byte) []byte

func Error

func Error(args ...interface{})

func ErrorF

func ErrorF(format string, args ...interface{})

func GzipCompress

func GzipCompress(src []byte) []byte

GzipCompress gzip压缩

func GzipDecompress

func GzipDecompress(src []byte) ([]byte, error)

GzipDecompress gzip解压

func ID64

func ID64() (int64, error)

func Info

func Info(args ...interface{})

func InfoF

func InfoF(format string, args ...interface{})

func ObjToByte

func ObjToByte(obj interface{}) ([]byte, error)

func PacketEncoder

func PacketEncoder(cmd CommandCode, name, sign, secret string, data []byte) ([]byte, error)

PacketEncoder 封包

func SetLogFile

func SetLogFile(name string)

func SignCheck

func SignCheck(addr, sign string) bool

func SignGet

func SignGet(addr string) string

func SignStore

func SignStore(addr, sign string)

func Version

func Version() string

func ZlibCompress

func ZlibCompress(src []byte) []byte

ZlibCompress zlib压缩

func ZlibDecompress

func ZlibDecompress(src []byte) ([]byte, error)

ZlibDecompress zlib解压

Types

type Client

type Client struct {
	ServersHost string       // serversIP:port
	Conn        *net.UDPConn // 连接对象
	SConn       *net.UDPAddr // s端连接信息

	GetHandle    ClientGetFunc    // get方法
	NoticeHandle ClientNoticeFunc // 接收通知的方法
	// contains filtered or unexported fields
}

func NewClient

func NewClient(host string, conf ...ClientConf) (*Client, error)

func (*Client) Close

func (c *Client) Close()

func (*Client) ConnectServers

func (c *Client) ConnectServers()

ConnectServers 请求连接服务器,获取签名 内容是发送 Connect code

func (*Client) DefaultClientName

func (c *Client) DefaultClientName()

func (*Client) DefaultConnectCode

func (c *Client) DefaultConnectCode()

func (*Client) DefaultSecretKey

func (c *Client) DefaultSecretKey()

func (*Client) Get

func (c *Client) Get(funcLabel string, param []byte) ([]byte, error)

func (*Client) GetHandleFunc

func (c *Client) GetHandleFunc(label string, f func(c *Client, param []byte) (int, []byte))

func (*Client) GetName

func (c *Client) GetName() string

func (*Client) GetTimeOut

func (c *Client) GetTimeOut(funcLabel string, param []byte, timeOut int) ([]byte, error)

func (*Client) NoticeHandleFunc

func (c *Client) NoticeHandleFunc(label string, f func(c *Client, data []byte))

func (*Client) Put

func (c *Client) Put(funcLabel string, data []byte)

Put client put 向服务端发送数据,如果服务端未在线数据会被积压,等服务器恢复后积压数据会一并发送

func (*Client) ReplyGet

func (c *Client) ReplyGet(id int64, state int, data []byte)

ReplyGet 返回put state:0x0 成功 state:0x1 签名失败 state:2 业务层面的失败

func (*Client) Run

func (c *Client) Run()

func (*Client) SendBacklog

func (c *Client) SendBacklog()

SendBacklog 发送积压的数据,

func (*Client) SetClientName

func (c *Client) SetClientName(name string) error

func (*Client) SetConnectCode

func (c *Client) SetConnectCode(code string)

func (*Client) SetSecretKey

func (c *Client) SetSecretKey(key string) error

func (*Client) Write

func (c *Client) Write(data []byte)

type ClientConf

type ClientConf struct {
	Name        string
	ConnectCode string
	SecretKey   string // 数据传输加密解密秘钥
}

func SetClientConf

func SetClientConf(clientName, connectCode, secretKey string) ClientConf

type ClientConnInfo

type ClientConnInfo struct {
	Name        string // 客户端名称
	Online      bool   // 是否存活
	IP          string // 连接的地址 ip
	Addr        string // 连接的地址 ip+port
	LastTime    int64  // 最后一次确认数据包加入存活的时间
	DiscardTime int64  // 记录断开的时间
}

type ClientConnectObj

type ClientConnectObj struct {
	IP   string
	Addr *net.UDPAddr
	Last int64 // 最后一次连接的时间
}

type ClientGetFunc

type ClientGetFunc map[string]func(c *Client, param []byte) (int, []byte)

type ClientInfo

type ClientInfo struct {
	Name        string
	Addr        *net.UDPAddr
	Interactive int64
	PacketSize  int
}

type ClientNoticeFunc

type ClientNoticeFunc map[string]func(c *Client, data []byte)

type CommandCode

type CommandCode uint8
const (
	CommandConnect   CommandCode = 0x0 // 首次连接确认身份信息
	CommandPut       CommandCode = 0x1 // 发送消息
	CommandReply     CommandCode = 0x2 // 收到回应, ackType: put, heartbeat, sign
	CommandHeartbeat CommandCode = 0x3 // 发送心跳
	CommandNotice    CommandCode = 0x4 // 下发签名
	CommandGet       CommandCode = 0x5 // 获取消息
)

type GetData

type GetData struct {
	Label string // 标签,用于区分当前数据处理的方法
	Id    int64  // 唯一id
	Param []byte // 传过来的数据

	Response []byte // 返回的数据
	Err      error
	// contains filtered or unexported fields
}

type IdWorker

type IdWorker struct {
	// contains filtered or unexported fields
}

func (*IdWorker) InitIdWorker

func (idw *IdWorker) InitIdWorker(workerId, datacenterId int64) error

func (*IdWorker) NextId

func (idw *IdWorker) NextId() (int64, error)

NextId 返回一个唯一的 INT64 ID

type Level

type Level int

type NoticeData

type NoticeData struct {
	Label string // 标签,用于区分当前数据处理的方法
	Id    int64  // 唯一id
	Data  []byte // 通知内容

	Response []byte // 返回的数据
	Err      error
	// contains filtered or unexported fields
}

type NoticeRetry

type NoticeRetry struct {
	TimeOutTimer time.Duration // 通知消息超时时间 10s > 重试*重试时间
	MaxRetry     int           // 通知消息最大重试次数
	RetryTimer   time.Duration // 重试等待时间
}

type Packet

type Packet struct {
	Command CommandCode
	Name    string
	Sign    string
	Data    []byte
}

func PacketDecrypt

func PacketDecrypt(secret string, data []byte, n int) (*Packet, error)

PacketDecrypt 解包

type PutData

type PutData struct {
	Label string // 标签,用于区分当前数据处理的方法
	Id    int64  // 唯一id
	Body  []byte // 传过来的数据
}

type Reply

type Reply struct {
	Type      int
	CtxId     int64 // 数据包上下文的交互id
	Data      []byte
	StateCode int // 状态码  0:成功  1:认证失败  2:自定义错误
}

type Servers

type Servers struct {
	Addr string       // 地址 默认0.0.0.0
	Port int          // 端口
	Conn *net.UDPConn // S端的UDP连接对象

	CMap map[string]map[string]*ClientConnectObj // 存放客户端连接信息  map:name -> map:ipaddr -> obj

	PutHandle ServersPutFunc // PUT类型方法
	GetHandle ServersGetFunc // GET类型方法
	// contains filtered or unexported fields
}

func NewServers

func NewServers(addr string, port int, conf ...ServersConf) (*Servers, error)

func (*Servers) ClientDiscard

func (s *Servers) ClientDiscard(name, ip string)

func (*Servers) DefaultConnectCode

func (s *Servers) DefaultConnectCode()

func (*Servers) DefaultSecretKey

func (s *Servers) DefaultSecretKey()

func (*Servers) DefaultServersName

func (s *Servers) DefaultServersName()

func (*Servers) Get

func (s *Servers) Get(funcLabel, name string, param []byte) ([]byte, error)

func (*Servers) GetAtIP

func (s *Servers) GetAtIP(funcLabel, name, ip string, param []byte) ([]byte, error)

func (*Servers) GetAtIPTimeOut

func (s *Servers) GetAtIPTimeOut(timeOut int, funcLabel, name, ip string, param []byte) ([]byte, error)

func (*Servers) GetAtNameTimeOut

func (s *Servers) GetAtNameTimeOut(timeOut int, funcLabel, name string, param []byte) ([]byte, error)

func (*Servers) GetClientAllName

func (s *Servers) GetClientAllName() []string

func (*Servers) GetClientConn

func (s *Servers) GetClientConn(name string) (map[string]*ClientConnectObj, bool)

func (*Servers) GetClientConnFromIP

func (s *Servers) GetClientConnFromIP(name, ip string) (*net.UDPAddr, bool)

func (*Servers) GetHandleFunc

func (s *Servers) GetHandleFunc(label string, f func(s *Servers, param []byte) (int, []byte))

func (*Servers) GetServersName

func (s *Servers) GetServersName() string

func (*Servers) Notice

func (s *Servers) Notice(name, label string, data []byte, retryConf *NoticeRetry) (string, error)

Notice 通知方法:针对 name,对Client发送通知 特点: 1. 重试次数 2. 指定时间内重试

func (*Servers) NoticeAll

func (s *Servers) NoticeAll(label string, data []byte, retryConf *NoticeRetry)

func (*Servers) OnLineTable

func (s *Servers) OnLineTable() map[string]*ClientConnInfo

OnLineTable 获取当前客户端连接情况

func (*Servers) PutHandleFunc

func (s *Servers) PutHandleFunc(label string, f func(s *Servers, c *ClientInfo, body []byte))

func (*Servers) ReplyGet

func (s *Servers) ReplyGet(client *net.UDPAddr, id int64, state int, data []byte)

ReplyGet 返回put state:0x0 成功 state:0x1 签名失败 state:2 业务层面的失败

func (*Servers) ReplyPut

func (s *Servers) ReplyPut(client *net.UDPAddr, id, state int64)

ReplyPut 响应put state:0x0 成功 state:0x1 签名失败

func (*Servers) Run

func (s *Servers) Run()

func (*Servers) SetConnectCode

func (s *Servers) SetConnectCode(code string)

func (*Servers) SetNoticeRetry

func (s *Servers) SetNoticeRetry(maxRetry, retryTimer int) *NoticeRetry

SetNoticeRetry retryTimer 单位ms

func (*Servers) SetSecretKey

func (s *Servers) SetSecretKey(key string) error

func (*Servers) SetServersName

func (s *Servers) SetServersName(name string) error

func (*Servers) Write

func (s *Servers) Write(client *net.UDPAddr, data []byte)

type ServersConf

type ServersConf struct {
	Name        string // servers端的名称
	ConnectCode string // 连接code 是静态的由server端配发
	SecretKey   string // 数据传输加密解密秘钥 8个字节
}

func SetServersConf

func SetServersConf(serversName, connectCode, secretKey string) ServersConf

type ServersGetFunc

type ServersGetFunc map[string]func(s *Servers, param []byte) (int, []byte)

type ServersPutFunc

type ServersPutFunc map[string]func(s *Servers, c *ClientInfo, data []byte)

Directories

Path Synopsis
_examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL