gstatem

package module
v0.0.0-...-b095219 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2023 License: Apache-2.0 Imports: 7 Imported by: 0

README

gstatem


一、前言

Golang 语言初学者, 习惯了写 Erlang 程序, 想试试使用 Golang 能否实现相似的流程

为什么?

通常 Go Routine 用于单个功能实现, 一个任务会根据功能类别拆分成多个 Go Routine 同步进行逻辑处理
特别是 http 服务是无状态的, 一次 http 请求可能会拆成多个 Go Routine, 完成任务后通过 context 维护这些协程的生命周期
而下次相同或类似的请求, 会把之前的步骤再来一遍, 但如果是做某些实时共享状态和数据的服务可能就不太适合?
比如游戏服务很多时候有玩家或房间等数据状态需要持续读写和同步, 并用状态来控制游戏的流程和限制
至此参照 Erlang OTP gen_statem 状态机进程服务思路, 编写了 Golang gstatem

区别?

状态机服务协程可以为某个上下文缓存数据和状态, 根据收到的 消息类型消息内容 和 协程当前的状态 更新缓存的数据
无需频繁的读写数据库, 比如某个游戏的用户在很多情况下数据是临时的, 而到了某个状态用户的部分数据才需要做入库持久化
又比如用户可以有自己的协程和n个业务逻辑协程, 用户协程用于维护用户数据, 业务协程用于维护该用户的业务逻辑

特色?

无论是 Erlang OTP gen_statem 还是 Golang gstatem, 相较于 Golang 的多个 Go Routine 同步进行, 前者是线性的
Erlang OTP gen_statemGolang gstatemactor model, 可以理解为一个邮箱, 每次都是根据先来后到的顺序规则一个个读取和处理邮件
数据的处理只需要读取已缓存的上下文的部分数据即可, 而无需频繁读取数据库和做重复的前置流程
Erlang OTP gen_statem 一样, 为协程提供同步异步等通信方案, 同时可以控制协程携带的数据和状态
并且每次处理完数据后, 可以根据业务需求决定上下文接下来处于什么状态, 多久后进入什么状态, 从而控制业务流程


二、例子

参考用于调试的 gstatem_example 仓库


三、使用

go get gitee.com/gelomen/gstatem

四、父级上下文

先创建属于整个程序的顶级上下文: topCtx, 并创建属于某类服务的父级上下文: fooParentCtx

例:

package main

func main() {
    // 启动顶级上下文
    topCtx, topCancel := context.WithCancel(context.Background())
    // 启动 foo server 父级上下文
    fooParentCtx, fooParentCancel := foo_server.StartServerParent(topCtx)
    ...
}

创建某类型服务的父级 context 时, 定义用于处理服务的函数:

  • Name, 该类服务父级名字
  • InitFunc, 初始化函数, 用于自定义该类服务的初始化数据
  • CastFunc, 异步消息处理函数, 用于自定义该类服务处理 gstatem.Cast 消息
  • CallFunc, 同步消息处理函数, 用于自定义该类服务处理 gstatem.Call 消息
  • InfoFunc, 同步消息处理函数, 用于自定义该类服务处理 gstatem.Info 消息
  • StateTimeoutFunc, 状态超时消息处理函数, 用于自定义该类服务 Action 状态超时消息
  • TerminateFunc, 终结函数, 用于自定义处理协程关闭是的清理工作

例:

package foo_server

// StartServerParent 启动 foo server 父级上下文
func StartServerParent(parentCtx context.Context) (context.Context, context.CancelFunc) {
    ParentData := gstatem.ParentData{
        Name:             "FooServer",
        InitFunc:         HandleInit,
        CastFunc:         HandleCast,
        CallFunc:         HandleCall,
        InfoFunc:         HandleInfo,
        StateTimeoutFunc: HandleStateTimeout,
        TerminateFunc:    HandleTerminate,
    }
    return gstatem.ParentStart(parentCtx, ParentData)
}

五、子级上下文

创建某类型服务子级上下文和协程

  • 根据实际需求, 自定义传入该子级上下文的初始化数据
  • 初始化 gstatem.StateData, 传入初始化数据和协程通道大小
  • 自定义协程通道大小必须大于 0, 否则使用默认值 1000
  • 调用该函数后, 若第三个返回值返回 true, 则代表协程创建成功, 并返回该子级上下文和子级取消函数

例:

package foo_server

// StartServer 启动服务
func StartServer(parentCtx context.Context, name string) (context.Context, context.CancelFunc, bool) {
    initData := initDataS{fooName: name}
    statemData := &gstatem.Data{
        InitData:    initData,
        MsgChanSize: 2000,
    }
    return gstatem.ChildStart(parentCtx, statemData)
}

六、Context / Cancel 存储

每个上下文有属于自己的 n 个上下文, 这就需要一个全局变量来做上下文存储, 以便在每次处理请求时能复用已经创建过的上下文和协程 key 为属于该服务器类型和唯一标识组成的任意类型

1. 保存

传入 key, 以及已经创建的子级上下文和子级取消函数

package gstatem_context

// Set 保存某个唯一标识的上下文存储信息
func Set(key any, context context.Context, cancel context.CancelFunc) {
    ctxData := contextData{
        context: context,
        cancel:  cancel,
    }
    contextStore.Store(key, ctxData)
}
2. 删除

传入 key

package gstatem_context

// Del 删除某个唯一标识的上下文存储信息
func Del(key any) {
    contextStore.Delete(key)
}
3. 同时获取

传入 key, 返回上下文、取消函数 和 是否存在字段

// Get 同时获取某个唯一标识的 context.Context 和 context.CancelFunc
func Get(key any) (context.Context, context.CancelFunc, bool) {
    if ctxDataAny, ok := contextStore.Load(key); ok {
        ctxData := toContextData(ctxDataAny)
        ctx := ctxData.context
        cancel := ctxData.cancel
        return ctx, cancel, true
    }
    return nil, nil, false
}
4. 获取 Context

传入 key, 返回上下文和是否存在的字段

// Context 获取某个唯一标识的 context.Context
func Context(key any) (context.Context, bool) {
    if ctxDataAny, ok := contextStore.Load(key); ok {
        ctxData := toContextData(ctxDataAny)
        ctx := ctxData.context
        return ctx, true
    }
    return nil, false
}
5. 获取 CancelFunc

传入 key, 返回取消函数和是否存在的字段

// Cancel 获取某个唯一标识的 context.CancelFunc
func Cancel(key any) (context.CancelFunc, bool) {
    if ctxDataAny, ok := contextStore.Load(key); ok {
        ctxData := toContextData(ctxDataAny)
        cancel := ctxData.cancel
        return cancel, true
    }
    return nil, false
}

七、创建协程

  • 根据当前服务类型和请求发起者唯一标识, 组合成任意类型的唯一 key
  • 在请求发起者请求数据时, 根据 key 查找上下文, 从而复用或创建新的协程

例:

func foo(parentCtx context.Context, key any) (context.Context, context.cancelFunc, bool) {
    // 根据服务类型和请求发起者标识组成的key获取上下文和取消函数
    if ctx, cancel, ok := gstatem_context.Get(key); ok {
        // 找得到上下文, 复用上下文和协程
        return ctx, cancel, true
    } else {
        // 找不到上下文, 创建新的协程和上下文
        ctx, cancel, ok := foo_server:StartServer(parentCtx, key)
        if !ok {
            // 创建协程失败
            return nil, nil, false
        }
        // 创建协程成功, 保存上下文和取消函数
        gstatem_context.Set(key, ctx, cancel)
        return ctx, cancle, true
    }
}

八、协程消息

有异步消息、同步消息、普通消息等多种请求方式

1. 异步消息

发送消息到指定上下文的协程, 无需等待上下文协程返回值, 协程的父级上下文定义的 gstatem.ParentData.CastFunc 回调函数会收到消息

package gstatem

// Cast 异步消息
func Cast(ctx context.Context, castData any) {
    statemData := ctx.Value(DataKey).(*Data)
    statemMsg := msg{Type: msgTypeCast, Msg: castData}
    go func() { _ = try(func() { statemData.channel <- statemMsg }) }()
}

将上下文和请求数据传入即可, 请求数据的结构由具体项目指定

2. 同步消息

发送消息到指定上下文的协程, 并等待上下文协程返回值, 协程的父级上下文定义的 gstatem.ParentData.CallFunc 回调函数会收到消息

package gstatem

// Call 同步消息
func Call(ctx context.Context, callData any, timeouts ...duration) (any, error) {
    statemData := ctx.Value(DataKey).(*Data)

    // 创建响应通道
    replyChan := make(chan any)
    defer close(replyChan)

    // 发送消息
    statemMsg := msg{Type: msgTypeCall, Msg: callData, replyChan: replyChan}
    err := try(func() { go func() { statemData.channel <- statemMsg }() })
    if err != nil {
        return nil, err
    }

    // 获取超时时间间隔
    var timeout duration
    if len(timeouts) > 0 {
        timeout = timeouts[0]
    } else {
        timeout = defaultCallTimeout
    }
    // 等待响应
    timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    select {
    case <-timeoutCtx.Done():
        err := fmt.Errorf("Call timeout, callData: %v\n", callData)
        return nil, err
    case reply := <-replyChan:
        return reply, nil
    }
}
3. 普通消息

通常用于协程发送给自己的定时消息, 协程的父级上下文定义的 gstatem.ParentData.InfoFunc 回调函数会收到消息

package gstatem

// Info 内部消息
func Info(ctx context.Context, infoData any) {
    statemData := ctx.Value(DataKey).(*Data)
    statemMsg := msg{Type: msgTypeInfo, Msg: infoData}
    go func() { _ = try(func() { statemData.channel <- statemMsg }) }()
}
4. 定时消息

基于普通消息, 启动定时器调用普通消息函数发送消息, 并返回 *time.Timer, 可调用 *time.Stop() 取消定时
协程的父级上下文定义的 gstatem.ParentData.InfoFunc 回调函数会收到消息

package gstatem

// SendAfter 启动定时器 n 秒后发送消息
func SendAfter(ctx context.Context, n duration, infoData any) *time.Timer {
    timer := time.AfterFunc(n, func() {
        Info(ctx, infoData)
    })
    return timer
}

九、回调函数

实现以下六个回调函数, 并赋值到创建父级上下文的 gstatem.ParentData (这里 父级上下文)

// 初始化回调函数
type initHandlerFunc func(key any, initData any) Return

// 异步消息回调函数
type castHandlerFunc func(ctx context.Context, msg any, state any, serverData any) Return

// 同步消息回调函数
type callHandlerFunc func(ctx context.Context, msg any, state any, serverData any) Return

// 普通消息回调函数
type infoHandlerFunc func(ctx context.Context, msg any, state any, serverData any) Return

// 状态超时回调函数
type stateTimeoutHandlerFunc func(ctx context.Context, msg any, state any, serverData any) Return

// 终结回调函数
type terminateHandlerFunc func(closeMsg CloseMsg, state any, serverData any)
1. 初始化回调函数

实现协程初始化回调函数逻辑

  • 先查询子级上下文(当前协程)唯一标识是否已存在, 若存在则结束协程, 该判断为可选
  • 获取创建协程时的初始化数据, 组合成当前项目协程的缓存数据
  • 返还初始化成功或失败, 若成功要返回新状态和服务数据

例:

// HandleInit 协程初始化
func HandleInit(key any, sourceInitData any) gstatem.Return {
    // 查询是否已经存在相同的协程(非必要)
    if _, ok := gstatem_context.Context(key); ok {
        // 已经存在, 关闭协程
        return gstatem.Return{
            Type: gstatem.ReturnTypeStop,
        }
    } else {
        // 不存在, 初始化服务数据
        initData := sourceInitData.(initDataS)
        ...
        fooServerData := &fooServerData{
            ...
        }
        ...
        return gstatem.Return{
            Type:       gstatem.ReturnTypeInitOk,
            NewState:   FooSrvStateXXX,
            ServerData: fooServerData,
        }
    }
}

如上所示, 初始化可以返回两种类型

gstatem.ReturnTypeInitOk
// 或
gstatem.ReturnTypeStop
2. 异步消息回调函数
  • 自定义异步消息请求数据的结构 fooReqMsgType
  • 将收到的消息转为该自定义结构
  • 根据自定义结构取值和处理逻辑

例:

// FooCast 异步消息
func FooCast(ctx context.Context) {
    msg := make(fooReqMsgType)
    // 组合 msg 数据, 有当前项目和 `fooReqMsgType` 结构决定
    ...
    gstatem.Cast(ctx, msg)
}

该上下文协程的 HandleCast 会收到消息

例:

// HandleCast 异步消息回调函数
func HandleCast(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
    fooServerData := serverData.(*fooServerData)
    fooMsg := msg.(fooReqMsgType)
    statemReturn := gstatem.Return{}
    ...
    return statemReturn
}
3. 同步消息回调函数
  • 自定义同步消息请求数据的结构 fooReqMsgType
  • 将收到的消息转为该自定义结构
  • 根据自定义结构取值和处理逻辑

例:

// FooCall 同步消息
func FooCall(ctx context.Context, context *gin.Context) (any, error) {
    msg := make(fooReqMsgType)
    // 组合 msg 数据, 由当前项目和 `fooReqMsgType` 结构决定
    ...
    return gstatem.Call(ctx, msg)
}

该上下文协程的 HandleCall 会收到消息

例:

// HandleCall 同步消息回调函数
func HandleCall(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
    fooServerData := serverData.(fooServerData)
    fooMsg := msg.(fooReqMsgType)
    statemReturn := gstatem.Return{}
    ...
    return statemReturn
}
4. 普通消息回调函数
  • 自定义同步消息请求数据的结构 fooReqMsgType
  • 将收到的消息转为该自定义结构
  • 根据自定义结构取值和处理逻辑

例:

// FooInfo 普通消息
func FooInfo(ctx context.Context, context *gin.Context) {
    msg := make(fooReqMsgType)
    // 组合 msg 数据, 有当前项目和 `fooReqMsgType` 结构决定
    ...
    gstatem.Info(ctx, msg)
}

该上下文协程的 HandleInfo 会收到消息

例:

// HandleInfo 普通消息回调函数
func HandleInfo(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
    fooServerData := serverData.(fooServerData)
    fooMsg := msg.(fooReqMsgType)
    statemReturn := gstatem.Return{}
    ...
    return statemReturn
}
5. 状态超时回调函数
  • 由回调函数的返回值 gstatem.Return.Action 触发
  • Action 说明 回调返回动作

例:

// HandleStateTimeout 状态超时回调函数
func HandleStateTimeout(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
    fooServerData := serverData.(fooServerData)
    fooTimeoutMsg := msg.(fooStateTimeoutMsg)
    statemReturn := gstatem.Return{}
    ...
    return statemReturn
}
6. 终结回调函数

协程关闭前会调用该回调函数, 以便协程做清理工作

例:

// HandleTerminate 协程终结
func HandleTerminate(closeMsg gstatem.CloseMsg, state any, serverData any) {
    fooServerData := serverData.(fooServerData)
    ...
    log.Infof("Book server userName: %s terminal, state: %s, closeMsg: %s", serverData.userName, state, closeMsg)
}

十、回调返回

回调函数的返回统一结果:

// Return statem server 回调返回结构
type Return struct {
    Type       returnType // 回调返回类型
    NewState   any        // 新状态
    ServerData any        // 服务数据
    Action     Action     // 回调返回动作
    ReplyMsg   any        // 响应消息

    msgType   msgType  // 消息类型
    replyChan chan any // 响应通道
}

// Action statem server 回调返回动作
type Action struct {
    ActionType actionType // 动作类型
    StateTime  duration   // 状态时间
    TimeoutMsg any        // 超时消息
}
1. 类型
// statem server 回调返回类型
type returnType = int8
const (
    ReturnTypeInitOk           = iota + 1 // 初始化完成
    ReturnTypeKeepState                   // 保持状态
    ReturnTypeNextState                   // 下个状态
    ReturnTypeKeepStateAndData            // 保持状态和数据
    ReturnTypeStop                        // 停止
)
1) 初始化完成

例:

gstatem.Return{
    Type: gstatem.ReturnTypeInitOk,
}

只用于初始化回调函数返回, 当返回该类型时, 协程就正常创建成功, 同时 gstatem 会读取 Return.NewStateReturn.ServerData 并更新

2) 保持状态

例:

gstatem.Return{
    Type: gstatem.ReturnTypeKeepState,
}

当返回该类型时, 表示协程状态不变, gstatem 不会读取 Return.NewState 字段, 但依然会读取 Return.ServerData 更新服务数据

3) 下个状态

例:

gstatem.Return{
    Type: gstatem.ReturnTypeNextState,
}

当返回该类型时, 表示协程进入新的状态, 同时 gstatem 会读取 Return.NewStateReturn.ServerData 并更新

4) 保持状态和数据

例:

gstatem.Return{
    Type: gstatem.ReturnTypeKeepStateAndData,
}

当返回该类型时, 表示协程什么都不改变, gstatem 什么都不读取和更新

例:

5) 停止
gstatem.Return{
    Type: gstatem.ReturnTypeStop,
}

当返回该类型时, 表现关闭协程, 同时 gstatem 会读取 Return.NewStateReturn.ServerData 并更新

2. 状态

状态的类型是 any, 由具体项目决定状态的类型

例:

gstatem.Return{
    NewState: fooServerStateXXX,
}
3. 服务数据

服务数据的类型是 any, 由具体项目决定状态的类型, 是保存协程缓存数据的结构

例:

gstatem.Return{
    ServerData: fooServerData
}
4. 动作

动作用于控制协程服务的状态, 比如多久后状态超时并进入某个状态, 或者取消当前的状态定时

gstatem.Return{
    Action: gstatem.Action{
        ActionType actionType // 动作类型
        StateTime  duration   // 状态时间
        TimeoutMsg any        // 超时消息
    }
}

// ActionType statem server 回调返回动作类型
type ActionType = int8
const (
    ActionTypeStateTimeout       = iota + 1 // 当前状态定时
    ActionTypeCancelStateTimeout            // 取消状态定时
)
1) 状态定时

设置动作类型为 ActionTypeStateTimeout, 指定状态持续时间 StateTime 和 超时后收到消息 TimeoutMsg

例:

gstatem.Action{
    ActionType: ActionTypeStateTimeout,
    StateTime: 3 * time.Second,
    TimeoutMsg: fooStateTimeoutMsg,
}

当时间到时, 状态超时回调函数将会收到消息, 此时可以进行状态的变化或其他逻辑处理 状态超时回调函数

2) 取消定时

设置动作类型为 ActionTypeCancelStateTimeout, 其他无需设置

例:

gstatem.Action{
    ActionType: ActionTypeCancelStateTimeout,
}

此时会把协程的状态定时取消, 状态超时回调函数也不会收到消息

5. 消息响应
  • 当消息类型为 Call 同步类型时, 会获取 gstatem.Return 里的 ReplyMsgreplyChan 进行响应通道消息发送
  • Call 请求逻辑处理, 在最后 gstatem.ReturnReplyMsg 响应消息知道赋值即可

例:

package foo

// HandleCall 同步消息回调函数
func HandleCall(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
    serverData := serverData.(fooServerData)
    fooMsg := msg.(ReqMsgType)
    statemReturn := gstatem.Return{}
    ...
    replyMsg := fooRespMsgType
    statemReturn.ReplyMsg = replyMsg
    ...
    return statemReturn
}

十一、上下文和通道清理

协程在关闭时会自动调用上下文的 context.CancelFunc 和 关闭该协程的通道, 无需具体服务在终结回调函数处理

package gstatem

// 服务 Routine
func serverRoutine(ctx context.Context, cancel context.CancelFunc) {
    statemData := ctx.Value(DataKey).(*Data)

    defer handleTerminate(ctx)
    defer func(sd *Data) {
        if r := recover(); r != nil {
            ...
        }
        sd.cancelMsg = ctx.Err().Error()
    }(statemData)
    defer func(sd *Data) {
        close(sd.channel)
        if ctx.Err() == nil {
            cancel()
        }
    }(statemData)

    // 初始化
    handleInit(ctx, cancel)

    // 服务循环
    serverLoop(ctx, cancel)
}

EOF

Documentation

Index

Constants

View Source
const (
	ReturnTypeInitOk           = iota + 1 // 初始化完成
	ReturnTypeKeepState                   // 保持状态
	ReturnTypeNextState                   // 下个状态
	ReturnTypeKeepStateAndData            // 保持状态和数据
	ReturnTypeStop                        // 停止
)
View Source
const (
	ActionTypeStateTimeout       = iota + 1 // 当前状态定时
	ActionTypeCancelStateTimeout            // 取消状态定时
)

Variables

View Source
var DataKey = "StatemDataKey" // statem 数据key
View Source
var ParentDataKey = "StatemParentDataKey" // 父级上下文数据key

Functions

func Call

func Call(ctx context.Context, callData any, timeouts ...duration) (any, error)

Call 同步消息

func Cast

func Cast(ctx context.Context, castData any)

Cast 异步消息

func ChildStart

func ChildStart(parentCtx context.Context, statemData *Data) (context.Context, context.CancelFunc, bool)

ChildStart 创建子级上下文, 并启动协程服务

func Info

func Info(ctx context.Context, infoData any)

Info 内部消息

func ParentStart

func ParentStart(prevParentCtx context.Context, data *ParentData) (context.Context, context.CancelFunc)

ParentStart 创建某类服务的父级上下文

func SendAfter

func SendAfter(ctx context.Context, n duration, infoData any) *time.Timer

SendAfter 启动定时器 n 秒后发送消息

func Start

func Start(ctx context.Context, cancel context.CancelFunc) bool

Start 启动服务

Types

type Action

type Action struct {
	ActionType actionType // 动作类型
	StateTime  duration   // 状态时间
	TimeoutMsg any        // 超时消息
}

Action statem server 回调返回动作

type CloseMsg

type CloseMsg = string

CloseMsg statem server 协程关闭信息

type Data

type Data struct {
	Key         any    // 上下文唯一标识
	InitData    any    // 初始化数据
	MsgChanSize uint32 // 消息通道大小
	// contains filtered or unexported fields
}

Data statem 数据结构

type ParentData

type ParentData struct {
	Name             string                  // 父级上下文名字
	InitFunc         initHandlerFunc         // 初始化回调函数
	CastFunc         castHandlerFunc         // 异步消息回调函数
	CallFunc         callHandlerFunc         // 同步消息回调函数
	InfoFunc         infoHandlerFunc         // 普通消息回调函数
	StateTimeoutFunc stateTimeoutHandlerFunc // 状态超时回调函数
	TerminateFunc    terminateHandlerFunc    // 终结回调函数
}

ParentData 父级初始化数据结构

type Return

type Return struct {
	Type       returnType // 回调返回类型
	NewState   any        // 新状态
	ServerData any        // 服务数据
	Action     Action     // 回调返回动作
	ReplyMsg   any        // 响应消息
	// contains filtered or unexported fields
}

Return statem server 回调返回结构

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL