rulego

Version: v0.20.0
Published: Apr 23, 2024 License: Apache-2.0 Imports: 21 Imported by: 8

README

RuleGo

RuleGo is a lightweight, high-performance, embedded, orchestrable, component-based rule engine for Go. It is also a flexible and highly customizable event processing framework that supports data integration across heterogeneous systems. It can aggregate, distribute, filter, transform, and enrich input messages, and execute various actions on them.

Documentation

RuleGo documentation is hosted at rulego.cc.

Features

  • Lightweight: No external middleware dependencies. It can process and link data efficiently on low-cost devices, which makes it suitable for IoT edge computing.
  • High performance: Builds on the performance characteristics of Go and additionally uses techniques such as a coroutine pool and an object pool.
  • Embedded: RuleGo can be embedded into existing projects, so its features can be used non-intrusively.
  • Componentized: All business logic is componentized and can be flexibly configured and reused.
  • Rule chain: Components can be flexibly combined and reused to build highly customizable and scalable business processes.
  • Process orchestration: Rule chain components can be orchestrated dynamically, so business logic can be replaced or added without restarting the application.
  • Easy to extend: Rich and flexible extension interfaces make it easy to implement custom components or introduce third-party components.
  • Dynamic loading: Components and extension components can be loaded dynamically through Go plugins.
  • Rule chain nesting: Sub-rule chains can be nested to reuse processes.
  • Built-in common components: Message type switch, JavaScript switch, JavaScript filter, JavaScript converter, HTTP push, MQTT push, send email, log record, and other components. You can add further components yourself.
  • Context isolation mechanism: A reliable context isolation mechanism means there is no need to worry about data from concurrent messages getting mixed up under high concurrency.
  • AOP: Allows adding extra behavior to the execution of a rule chain, or directly replacing the original rule chain or node logic, without modifying the original logic of the rule chain or node.
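
The AOP idea in the last bullet can be illustrated with a plain Go wrapper. This is only a minimal sketch of the general technique, not RuleGo's aspect API: the names handler and around are hypothetical and exist only for this example.

```go
package main

import "fmt"

// handler is a hypothetical node function: it takes a message and returns a result.
type handler func(msg string) string

// around wraps a handler with extra behavior before and after the call,
// without touching the handler's own code.
func around(h handler, before func(msg string), after func(msg, result string)) handler {
	return func(msg string) string {
		before(msg)
		result := h(msg)
		after(msg, result)
		return result
	}
}

func main() {
	var trace []string
	bracket := func(msg string) string { return "[" + msg + "]" }
	wrapped := around(bracket,
		func(msg string) { trace = append(trace, "before:"+msg) },
		func(msg, result string) { trace = append(trace, "after:"+result) },
	)
	fmt.Println(wrapped("hello"))
	fmt.Println(trace)
}
```

The original function stays unchanged; only the wrapper knows about the added behavior, which is the property the AOP feature describes.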

Use Cases

RuleGo is an orchestration-based rule engine that excels at decoupling your system.

  • If your system is complex and bloated with code
  • If your business scenario is highly customized or frequently changed
  • If your system needs to interface with a large number of third-party systems or protocols
  • Or you need an end-to-end IoT solution
  • Or you need to process data from heterogeneous systems centrally
  • Or you want to try hot deployment in Go

Then the RuleGo framework will be a very good solution.

Typical use cases

  • Edge computing: Deploy RuleGo on an edge server to preprocess, filter, aggregate, or compute data before reporting it to the cloud. Data processing and distribution rules can be configured and modified dynamically through the rule chain without restarting the system.
  • Internet of Things: Collect data reported by devices, evaluate it in the rule chain, and trigger one or more actions, such as sending an email, raising an alarm, or linking with other devices or systems.
  • Data distribution: Distribute data to different systems according to message type, for example over HTTP, MQTT, or gRPC.
  • Application integration: Use RuleGo as glue between different systems or protocols, such as SSH, webhooks, Kafka, message queues, databases, ChatGPT, and third-party systems.
  • Data processing from heterogeneous systems: Receive data from different sources (such as MQTT, HTTP, WS, TCP/UDP), filter and convert it, and then distribute it to databases, business systems, or dashboards.
  • Highly customized business: Decouple highly customized or frequently changing business logic and hand it over to RuleGo rule chains. Business requirements can then change without restarting the main program.
  • Complex business orchestration: Encapsulate business logic in custom components, and use RuleGo to orchestrate and drive them, with support for dynamic adjustment.
  • Microservice orchestration: Use RuleGo to orchestrate and drive microservices, or dynamically call third-party services to process business logic and return results.
  • Business code and business logic decoupling: For example, a user points calculation system or a risk control system.
  • Flexible, highly customizable event processing framework: For example, process different message types asynchronously or synchronously.
  • Automation: For example, process automation systems or marketing automation systems.

Architecture Diagram

[Figure: RuleGo architecture diagram]

Installation

Use the go get command to install RuleGo:

go get github.com/rulego/rulego

Usage

First, define the rule chain in JSON format. Defining a rule chain does not require learning a specific rule syntax or DSL. You configure the components and connect them with relations to achieve your functional requirements. Rule chain definition: see the rule chain reference.

RuleGo is extremely simple and lightweight. Just follow these 2 steps:

  1. Import the RuleGo package and use the rule chain definition to create a rule engine instance:

import (
	"github.com/rulego/rulego"
	"github.com/rulego/rulego/api/types"
)

//Use the rule chain definition to create a rule engine instance
ruleEngine, err := rulego.New("rule01", []byte(ruleFile))

  2. Pass the message payload, message type, and message metadata to the rule engine instance. The rule engine processes the message according to the rule chain definition:

//Define message metadata
metaData := types.NewMetadata()
metaData.PutValue("productType", "test01")
//Define message payload and message type
msg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, "{\"temperature\":35}")

//Pass the message to the rule engine for processing
ruleEngine.OnMsg(msg)

Rule engine management API

Dynamically update the rule chain

//Update the root rule chain
err := ruleEngine.ReloadSelf([]byte(ruleFile))
//Update a node under the rule chain
ruleEngine.ReloadChild("rule_chain_test", nodeFile)
//Get the rule chain definition
ruleEngine.DSL()

Rule engine instance management:

//Load all rule chain definitions in the folder to the rule engine pool
rulego.Load("/rules", rulego.WithConfig(config))
//Get a created rule engine instance by ID
ruleEngine, ok := rulego.Get("rule01")
//Delete a created rule engine instance
rulego.Del("rule01")

Configuration:

See documentation for details

//Create a default configuration
config := rulego.NewConfig()
//Debug callback. It is triggered only for nodes configured with debugMode:true
//and is called with each node's entry and exit information
config.OnDebug = func(chainId, flowType string, nodeId string, msg types.RuleMsg, relationType string, err error) {
}
//Use the configuration
ruleEngine, err := rulego.New("rule01", []byte(ruleFile), rulego.WithConfig(config))

More examples

About rule chain

Rule node

Rule nodes are the basic components of a rule chain. Each one is a function that implements specific business logic. Rule nodes can filter, transform, or enrich incoming messages, or perform actions on them, and their behavior and output can be adjusted through configuration parameters. You can encapsulate your business logic in RuleGo node components, then configure and reuse them like building blocks to meet your business requirements.

Rule chains

Rule chains are the core concept of RuleGo. A rule chain is a directed acyclic graph composed of multiple rule nodes. Each rule node is a component that can implement different business logic, and nodes are connected by relation types. Rule chains can be configured and modified dynamically, support nesting and orchestration, and can implement complex business processes.

The following example defines three rule nodes that filter, transform, and then push data.

Rule chain definition:

{
  "ruleChain": {
    "name": "Test rule chain",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "jsFilter",
        "name": "Filter",
        "debugMode": true,
        "configuration": {
          "jsScript": "return msg!='bb';"
        }
      },
      {
        "id": "s2",
        "type": "jsTransform",
        "name": "Transform",
        "debugMode": true,
        "configuration": {
          "jsScript": "metadata['test']='test02';\n metadata['index']=50;\n msgType='TEST_MSG_TYPE2';\n var msg2=JSON.parse(msg);\n msg2['aa']=66;\n return {'msg':msg2,'metadata':metadata,'msgType':msgType};"
        }
      },
      {
        "id": "s3",
        "type": "restApiCall",
        "name": "Push data",
        "debugMode": true,
        "configuration": {
          "restEndpointUrlPattern": "http://192.168.216.21:9099/api/socket/msg",
          "requestMethod": "POST",
          "maxParallelRequestsCount": 200
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "True"
      },
      {
        "fromId": "s2",
        "toId": "s3",
        "type": "Success"
      }
    ]
  }
}
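
To make the structure of such a definition concrete, the sketch below decodes a shortened version of it with the standard encoding/json package and checks that every connection points at a declared node. The struct names (chainDef, nodeDef, connectionDef) and the parseChain helper are hypothetical and mirror only the JSON fields shown above. They are not the types RuleGo uses internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical structs that mirror only the JSON fields shown above.
// They are not the types from RuleGo's own packages.
type chainDef struct {
	RuleChain struct {
		Name string `json:"name"`
		Root bool   `json:"root"`
	} `json:"ruleChain"`
	Metadata struct {
		Nodes       []nodeDef       `json:"nodes"`
		Connections []connectionDef `json:"connections"`
	} `json:"metadata"`
}

type nodeDef struct {
	ID            string                 `json:"id"`
	Type          string                 `json:"type"`
	Name          string                 `json:"name"`
	DebugMode     bool                   `json:"debugMode"`
	Configuration map[string]interface{} `json:"configuration"`
}

type connectionDef struct {
	FromID string `json:"fromId"`
	ToID   string `json:"toId"`
	Type   string `json:"type"`
}

// parseChain decodes a rule chain definition and checks that every
// connection refers to a node that is actually declared.
func parseChain(src []byte) (*chainDef, error) {
	var def chainDef
	if err := json.Unmarshal(src, &def); err != nil {
		return nil, fmt.Errorf("decode rule chain: %w", err)
	}
	known := make(map[string]bool, len(def.Metadata.Nodes))
	for _, n := range def.Metadata.Nodes {
		known[n.ID] = true
	}
	for _, c := range def.Metadata.Connections {
		if !known[c.FromID] || !known[c.ToID] {
			return nil, fmt.Errorf("connection %s -> %s refers to an unknown node", c.FromID, c.ToID)
		}
	}
	return &def, nil
}

// sample is a shortened version of the definition above.
const sample = `{
  "ruleChain": {"name": "Test rule chain", "root": true},
  "metadata": {
    "nodes": [
      {"id": "s1", "type": "jsFilter", "name": "Filter", "configuration": {"jsScript": "return msg!='bb';"}},
      {"id": "s2", "type": "jsTransform", "name": "Transform", "configuration": {}}
    ],
    "connections": [
      {"fromId": "s1", "toId": "s2", "type": "True"}
    ]
  }
}`

func main() {
	def, err := parseChain([]byte(sample))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, c := range def.Metadata.Connections {
		fmt.Printf("%s --%s--> %s\n", c.FromID, c.Type, c.ToID)
	}
}
```

A check like this can catch a mistyped fromId or toId before a definition is handed to the engine.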

Other rule chain examples:

  • Asynchronous + sequential execution
  • Using a sub-rule chain
  • Some complex examples

Data Integration

RuleGo provides the Endpoint module for unified data integration and processing across heterogeneous systems. For more details, please refer to: Endpoint

Performance

RuleGo adds almost no system overhead and its resource consumption is extremely low, which makes it especially suitable for running on edge servers. In addition, RuleGo represents the rule chain as a directed acyclic graph, so each input message only needs to be processed along its path in the graph instead of being matched against all rules. This greatly improves the efficiency and speed of message processing and saves resources and time. With this routing algorithm, the number of nodes in a rule chain does not affect node routing performance.
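
One way to obtain routing whose cost does not grow with the number of nodes is a lookup table keyed by source node and relation type. The sketch below shows that general technique under stated assumptions. The router and routeKey types are hypothetical, and the source does not say that RuleGo is implemented this way.

```go
package main

import "fmt"

// routeKey identifies a set of outgoing edges: a source node plus a relation type.
type routeKey struct {
	from     string
	relation string
}

// router is a hypothetical routing table, not RuleGo's internal type.
type router struct {
	routes map[routeKey][]string
}

func newRouter() *router {
	return &router{routes: make(map[routeKey][]string)}
}

// connect records an edge from -> to for the given relation type.
func (r *router) connect(from, to, relation string) {
	k := routeKey{from, relation}
	r.routes[k] = append(r.routes[k], to)
}

// next returns the nodes reachable from a node through one relation type.
// The lookup is a single map access, so its cost does not depend on how
// many other nodes the chain contains.
func (r *router) next(from, relation string) []string {
	return r.routes[routeKey{from, relation}]
}

func main() {
	r := newRouter()
	r.connect("s1", "s2", "True")
	r.connect("s2", "s3", "Success")
	fmt.Println(r.next("s1", "True"))
	fmt.Println(r.next("s1", "False"))
}
```

A message leaving s1 with relation True is sent only to s2. Nodes that are not on its path are never examined.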

Performance test cases:

Machine: Raspberry Pi 2 (900MHz Cortex-A7*4,1GB LPDDR2)  
Data size: 260B   
Rule chain: JS script filtering->JS complex transformation->HTTP push   
Test results: at both 100 and 500 concurrent requests, memory consumption stays at around 19M

More performance test cases

Ecosystem

Contribution

Any form of contribution is welcome, including submitting issues, suggestions, documentation, tests or code. Please follow these steps:

  • Clone the project repository to your local machine
  • Create a new branch and make modifications
  • Submit a merge request to the main branch
  • Wait for review and feedback

License

RuleGo is licensed under Apache 2.0. Please refer to the LICENSE file for details.

Documentation

Overview

Package rulego provides a lightweight, high-performance, embedded, orchestrable component-based rule engine.

Usage

Implement your business requirements by configuring components in the rule chain, and support dynamic modification. Rule chain definition format:

{
  "ruleChain": {
    "id": "rule01"
  },
  "metadata": {
    "nodes": [
    ],
    "connections": [
    ]
  }
}

nodes: configures the components. You can use built-in components and third-party extension components without writing any code.

connections: configures the relation types between components, which determine the data flow.

Example:

var ruleFile = `
{
	"ruleChain": {
		"id": "rule02",
		"name": "test",
		"root": true
	},
	"metadata": {
		"nodes": [
			{
				"id": "s1",
				"type": "jsTransform",
				"name": "transform",
				"debugMode": true,
				"configuration": {
					"jsScript": "metadata['state']='modify by js';\n msg['addField']='addValueFromJs'; return {'msg':msg,'metadata':metadata,'msgType':msgType};"
				}
			},
			{
				"id": "s2",
				"type": "restApiCall",
				"name": "push data",
				"debugMode": true,
				"configuration": {
					"restEndpointUrlPattern": "http://127.0.0.1:9090/api/msg",
					"requestMethod": "POST"
				}
			}
		],
		"connections": [
			{
				"fromId": "s1",
				"toId": "s2",
				"type": "Success"
			}
		]
	}
}
`

Create Rule Engine Instance

ruleEngine, err := rulego.New("rule01", []byte(ruleFile))

Define Message Metadata

metaData := types.NewMetadata()
metaData.PutValue("productType", "test01")

Define Message Payload And Type

msg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, "{\"temperature\":35}")

Processing Message

ruleEngine.OnMsg(msg)

Update Rule Chain

err := ruleEngine.ReloadSelf([]byte(ruleFile))

Load All Rule Chains

err := rulego.Load("./rulechains")

Get Engine Instance

ruleEngine, ok := rulego.Get("rule01")

Constants

const PluginsSymbol = "Plugins"

PluginsSymbol is the plugin checkpoint symbol.

Variables

var DefaultRuleGo = &RuleGo{}

Registry is the default registry for rule engine components.

Functions

func Del

func Del(id string)

Del deletes the rule engine instance with the specified ID.

func Load added in v0.14.0

func Load(folderPath string, opts ...RuleEngineOption) error

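Load loads all rule chain configurations (files ending in .json) in the specified folder and its subfolders into the rule engine instance pool. The rule chain ID is taken from ruleChain.id in each file.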
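
The folder traversal described for Load can be sketched with the standard library. The helper names findRuleChainFiles and runDemo are hypothetical. The sketch only collects the .json files under a folder and its subfolders, and it does not call RuleGo or show how Load is actually implemented.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
)

// findRuleChainFiles returns every file under root, including subfolders,
// whose name ends in ".json".
func findRuleChainFiles(root string) ([]string, error) {
	var files []string
	err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if !d.IsDir() && strings.HasSuffix(d.Name(), ".json") {
			files = append(files, path)
		}
		return nil
	})
	return files, err
}

// runDemo builds a throwaway folder tree, scans it, and removes it again.
func runDemo() ([]string, error) {
	root, err := os.MkdirTemp("", "rulechains")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(root)

	sub := filepath.Join(root, "sub")
	if err := os.Mkdir(sub, 0o755); err != nil {
		return nil, err
	}
	for _, p := range []string{
		filepath.Join(root, "chain1.json"),
		filepath.Join(sub, "chain2.json"),
		filepath.Join(sub, "notes.txt"),
	} {
		if err := os.WriteFile(p, []byte("{}"), 0o644); err != nil {
			return nil, err
		}
	}
	return findRuleChainFiles(root)
}

func main() {
	files, err := runDemo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, f := range files {
		fmt.Println(filepath.Base(f))
	}
}
```

Files with other extensions, such as notes.txt in the demo tree, are skipped.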

func NewConfig

func NewConfig(opts ...types.Option) types.Config

NewConfig creates a new Config and applies the options.

func OnMsg added in v0.15.0

func OnMsg(msg types.RuleMsg)

OnMsg calls all rule engine instances to process the message. Every rule chain in the rule engine instance pool attempts to process it.

func ParserRuleChain

func ParserRuleChain(rootRuleChain []byte) (types.RuleChain, error)

ParserRuleChain parses a rule chain struct from JSON.

func ParserRuleNode

func ParserRuleNode(rootRuleChain []byte) (types.RuleNode, error)

ParserRuleNode parses a rule node struct from JSON.

func Range added in v0.20.0

func Range(f func(key, value any) bool)

Range iterates over all rule engine instances.

func Reload added in v0.20.0

func Reload(opts ...RuleEngineOption)

Reload reloads all rule engine instances.

func Stop

func Stop()

Stop releases all rule engine instances.

Types

type DefaultRuleContext

type DefaultRuleContext struct {
	// contains filtered or unexported fields
}

DefaultRuleContext is the default rule engine message processing context.

func NewRuleContext

func NewRuleContext(context context.Context, config types.Config, ruleChainCtx *RuleChainCtx, from types.NodeCtx, self types.NodeCtx, pool types.Pool, onEnd types.OnEndFunc, ruleChainPool *RuleGo) *DefaultRuleContext

NewRuleContext creates a default rule engine message processing context instance.

func (*DefaultRuleContext) Config

func (ctx *DefaultRuleContext) Config() types.Config

func (*DefaultRuleContext) DoOnEnd added in v0.18.0

func (ctx *DefaultRuleContext) DoOnEnd(msg types.RuleMsg, err error, relationType string)

DoOnEnd ends the execution of a rule chain branch and triggers the OnEnd callback.

func (*DefaultRuleContext) ExecuteNode added in v0.17.0

func (ctx *DefaultRuleContext) ExecuteNode(chanCtx context.Context, nodeId string, msg types.RuleMsg, skipTellNext bool, onEnd types.OnEndFunc)

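ExecuteNode starts execution from the specified node. If skipTellNext=true, only the current node is executed and the next node is not notified. Use onEnd to obtain the final execution result.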

func (*DefaultRuleContext) From added in v0.18.0

func (ctx *DefaultRuleContext) From() types.NodeCtx

func (*DefaultRuleContext) GetCallbackFunc added in v0.20.0

func (ctx *DefaultRuleContext) GetCallbackFunc(functionName string) interface{}

func (*DefaultRuleContext) GetContext added in v0.13.0

func (ctx *DefaultRuleContext) GetContext() context.Context

func (*DefaultRuleContext) GetEndFunc

func (ctx *DefaultRuleContext) GetEndFunc() types.OnEndFunc

func (*DefaultRuleContext) GetRuleChainPool added in v0.17.0

func (ctx *DefaultRuleContext) GetRuleChainPool() *RuleGo

GetRuleChainPool returns the sub-rule chain pool.

func (*DefaultRuleContext) GetSelfId

func (ctx *DefaultRuleContext) GetSelfId() string

func (*DefaultRuleContext) IsDebugMode added in v0.20.0

func (ctx *DefaultRuleContext) IsDebugMode() bool

IsDebugMode reports whether debug mode is enabled. The debug mode specified by the rule chain takes precedence.

func (*DefaultRuleContext) NewMsg

func (ctx *DefaultRuleContext) NewMsg(msgType string, metaData types.Metadata, data string) types.RuleMsg

func (*DefaultRuleContext) NewNextNodeRuleContext added in v0.15.0

func (ctx *DefaultRuleContext) NewNextNodeRuleContext(nextNode types.NodeCtx) *DefaultRuleContext

NewNextNodeRuleContext creates the RuleContext (rule engine message processing context) instance for the next node.

func (*DefaultRuleContext) OnDebug added in v0.20.0

func (ctx *DefaultRuleContext) OnDebug(ruleChainId string, flowType string, nodeId string, msg types.RuleMsg, relationType string, err error)

func (*DefaultRuleContext) RuleChain added in v0.18.0

func (ctx *DefaultRuleContext) RuleChain() types.NodeCtx

func (*DefaultRuleContext) Self added in v0.18.0

func (ctx *DefaultRuleContext) Self() types.NodeCtx

func (*DefaultRuleContext) SetAllCompletedFunc added in v0.15.0

func (ctx *DefaultRuleContext) SetAllCompletedFunc(f func()) types.RuleContext

func (*DefaultRuleContext) SetCallbackFunc added in v0.20.0

func (ctx *DefaultRuleContext) SetCallbackFunc(functionName string, f interface{})

func (*DefaultRuleContext) SetContext added in v0.13.0

func (ctx *DefaultRuleContext) SetContext(c context.Context) types.RuleContext

func (*DefaultRuleContext) SetEndFunc

func (ctx *DefaultRuleContext) SetEndFunc(onEndFunc types.OnEndFunc) types.RuleContext

func (*DefaultRuleContext) SetOnAllNodeCompleted added in v0.17.0

func (ctx *DefaultRuleContext) SetOnAllNodeCompleted(onAllNodeCompleted func())

SetOnAllNodeCompleted sets the callback that is invoked after all nodes have finished executing.

func (*DefaultRuleContext) SetRuleChainPool added in v0.17.0

func (ctx *DefaultRuleContext) SetRuleChainPool(ruleChainPool *RuleGo)

SetRuleChainPool sets the sub-rule chain pool.

func (*DefaultRuleContext) SubmitTack

func (ctx *DefaultRuleContext) SubmitTack(task func())

func (*DefaultRuleContext) TellFailure

func (ctx *DefaultRuleContext) TellFailure(msg types.RuleMsg, err error)

func (*DefaultRuleContext) TellFlow added in v0.17.0

func (ctx *DefaultRuleContext) TellFlow(msg types.RuleMsg, chainId string, onEndFunc types.OnEndFunc, onAllNodeCompleted func())

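TellFlow executes a sub-rule chain. chainId is the ID of the rule chain. onEndFunc is the callback invoked when a branch of the sub-rule chain finishes executing, and it receives that branch's result; if several branches are triggered at the same time, it is called multiple times. onAllNodeCompleted is the callback invoked after all nodes have finished executing; it receives no result. If the rule chain cannot be found, the message is sent to the next node through the `Failure` relation.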

func (*DefaultRuleContext) TellNext

func (ctx *DefaultRuleContext) TellNext(msg types.RuleMsg, relationTypes ...string)

func (*DefaultRuleContext) TellSelf

func (ctx *DefaultRuleContext) TellSelf(msg types.RuleMsg, delayMs int64)

func (*DefaultRuleContext) TellSuccess

func (ctx *DefaultRuleContext) TellSuccess(msg types.RuleMsg)

type JsonParser

type JsonParser struct {
}

JsonParser is the JSON parser for rule chain and rule node definitions.

func (*JsonParser) DecodeRuleChain

func (p *JsonParser) DecodeRuleChain(config types.Config, dsl []byte) (types.Node, error)

func (*JsonParser) DecodeRuleNode

func (p *JsonParser) DecodeRuleNode(config types.Config, dsl []byte, chainCtx types.Node) (types.Node, error)

func (*JsonParser) EncodeRuleChain

func (p *JsonParser) EncodeRuleChain(def interface{}) ([]byte, error)

func (*JsonParser) EncodeRuleNode

func (p *JsonParser) EncodeRuleNode(def interface{}) ([]byte, error)

type PluginComponentRegistry

type PluginComponentRegistry struct {
	// contains filtered or unexported fields
}

PluginComponentRegistry is the initializer for Go plugin components.

func (*PluginComponentRegistry) Components

func (p *PluginComponentRegistry) Components() []types.Node

func (*PluginComponentRegistry) Init

func (p *PluginComponentRegistry) Init() error

type RelationCache

type RelationCache struct {
	// contains filtered or unexported fields
}

type RuleChainCtx

type RuleChainCtx struct {
	// Node ID
	Id types.RuleNodeId
	// Rule chain definition
	SelfDefinition *types.RuleChain
	// Rule engine configuration
	Config types.Config

	sync.RWMutex
	// contains filtered or unexported fields
}

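RuleChainCtx is the rule chain instance definition. It initializes all nodes and records the routing relations between all nodes of the rule chain.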

func InitRuleChainCtx

func InitRuleChainCtx(config types.Config, ruleChainDef *types.RuleChain) (*RuleChainCtx, error)

InitRuleChainCtx initializes a RuleChainCtx.

func (*RuleChainCtx) Copy

func (rc *RuleChainCtx) Copy(newCtx *RuleChainCtx)

Copy copies the rule chain context.

func (*RuleChainCtx) DSL

func (rc *RuleChainCtx) DSL() []byte

func (*RuleChainCtx) Destroy

func (rc *RuleChainCtx) Destroy()

func (*RuleChainCtx) GetFirstNode

func (rc *RuleChainCtx) GetFirstNode() (types.NodeCtx, bool)

GetFirstNode returns the first node, from which messages start to flow. By default it is the node with index=0.

func (*RuleChainCtx) GetNextNodes

func (rc *RuleChainCtx) GetNextNodes(id types.RuleNodeId, relationType string) ([]types.NodeCtx, bool)

GetNextNodes returns the child nodes of the current node for the specified relation type.

func (*RuleChainCtx) GetNodeById

func (rc *RuleChainCtx) GetNodeById(id types.RuleNodeId) (types.NodeCtx, bool)

func (*RuleChainCtx) GetNodeByIndex

func (rc *RuleChainCtx) GetNodeByIndex(index int) (types.NodeCtx, bool)

func (*RuleChainCtx) GetNodeId

func (rc *RuleChainCtx) GetNodeId() types.RuleNodeId

func (*RuleChainCtx) GetNodeRoutes

func (rc *RuleChainCtx) GetNodeRoutes(id types.RuleNodeId) ([]types.RuleNodeRelation, bool)

func (*RuleChainCtx) GetRuleChainPool added in v0.15.0

func (rc *RuleChainCtx) GetRuleChainPool() *RuleGo

GetRuleChainPool returns the sub-rule chain pool.

func (*RuleChainCtx) Init

func (rc *RuleChainCtx) Init(_ types.Config, configuration types.Configuration) error

Init performs initialization.

func (*RuleChainCtx) IsDebugMode

func (rc *RuleChainCtx) IsDebugMode() bool

func (*RuleChainCtx) New

func (rc *RuleChainCtx) New() types.Node

func (*RuleChainCtx) OnMsg

func (rc *RuleChainCtx) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg processes the message.

func (*RuleChainCtx) ReloadChild

func (rc *RuleChainCtx) ReloadChild(ruleNodeId types.RuleNodeId, def []byte) error

func (*RuleChainCtx) ReloadSelf

func (rc *RuleChainCtx) ReloadSelf(def []byte) error

func (*RuleChainCtx) SetRuleChainPool added in v0.15.0

func (rc *RuleChainCtx) SetRuleChainPool(ruleChainPool *RuleGo)

SetRuleChainPool sets the sub-rule chain pool.

func (*RuleChainCtx) Type

func (rc *RuleChainCtx) Type() string

Type returns the component type.

type RuleComponentRegistry

type RuleComponentRegistry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

RuleComponentRegistry is the component registry.

func (*RuleComponentRegistry) GetComponentForms added in v0.15.0

func (r *RuleComponentRegistry) GetComponentForms() types.ComponentFormList

func (*RuleComponentRegistry) GetComponents

func (r *RuleComponentRegistry) GetComponents() map[string]types.Node

func (*RuleComponentRegistry) NewNode

func (r *RuleComponentRegistry) NewNode(nodeType string) (types.Node, error)

NewNode returns a rule engine node component.

func (*RuleComponentRegistry) Register

func (r *RuleComponentRegistry) Register(node types.Node) error

Register registers a rule engine node component.

func (*RuleComponentRegistry) RegisterPlugin

func (r *RuleComponentRegistry) RegisterPlugin(name string, file string) error

RegisterPlugin registers rule engine node components.

func (*RuleComponentRegistry) Unregister

func (r *RuleComponentRegistry) Unregister(componentType string) error

type RuleEngine

type RuleEngine struct {
	// Rule engine instance identifier
	Id string
	// Configuration
	Config types.Config
	// Sub-rule chain pool
	RuleChainPool *RuleGo
	// contains filtered or unexported fields
}

RuleEngine is the rule engine. Each rule engine instance has exactly one root rule chain. If no rule chain is set, it cannot process data.

func Get

func Get(id string) (*RuleEngine, bool)

Get returns the rule engine instance with the specified ID.

func New

func New(id string, rootRuleChainSrc []byte, opts ...RuleEngineOption) (*RuleEngine, error)

New creates a new RuleEngine and stores it in the RuleGo rule chain pool.

func (*RuleEngine) DSL

func (e *RuleEngine) DSL() []byte

DSL returns the root rule chain configuration.

func (*RuleEngine) Definition added in v0.20.0

func (e *RuleEngine) Definition() types.RuleChain

func (*RuleEngine) Initialized

func (e *RuleEngine) Initialized() bool

func (*RuleEngine) NodeDSL

func (e *RuleEngine) NodeDSL(chainId types.RuleNodeId, childNodeId types.RuleNodeId) []byte

NodeDSL returns the configuration of a rule chain node.

func (*RuleEngine) OnMsg

func (e *RuleEngine) OnMsg(msg types.RuleMsg, opts ...types.RuleContextOption)

OnMsg hands the message to the rule engine for asynchronous processing. It accepts optional types.RuleContextOption parameters.

func (*RuleEngine) OnMsgAndWait added in v0.15.0

func (e *RuleEngine) OnMsgAndWait(msg types.RuleMsg, opts ...types.RuleContextOption)

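OnMsgAndWait hands the message to the rule engine for synchronous processing and returns after all nodes of the rule chain have finished executing.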
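
The difference between asynchronous handling (OnMsg) and waiting for all nodes (OnMsgAndWait) can be pictured with goroutines and a sync.WaitGroup. This is a simplified sketch of the general pattern, not RuleGo's implementation. The names endFunc, process, and processAndWait are hypothetical, and endFunc does not reproduce the real types.OnEndFunc signature.

```go
package main

import (
	"fmt"
	"sync"
)

// endFunc is a hypothetical end callback, called once per finished branch.
type endFunc func(branch string, result string)

// process runs each branch in its own goroutine and calls onEnd once per
// branch. It returns immediately (asynchronous style). The returned
// WaitGroup lets a caller block until every branch has finished.
func process(msg string, branches []string, onEnd endFunc) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for _, b := range branches {
		wg.Add(1)
		go func(branch string) {
			defer wg.Done()
			onEnd(branch, msg+" handled by "+branch)
		}(b)
	}
	return wg
}

// processAndWait is the synchronous variant: it returns only after all
// branches have finished, and reports how many times onEnd was called.
func processAndWait(msg string, branches []string) int {
	var mu sync.Mutex
	calls := 0
	wg := process(msg, branches, func(branch, result string) {
		mu.Lock()
		calls++
		mu.Unlock()
	})
	wg.Wait()
	return calls
}

func main() {
	n := processAndWait("{\"temperature\":35}", []string{"s2", "s3"})
	fmt.Println("end callback calls:", n)
}
```

With two end points the callback runs twice, so a callback that collects results has to be safe to call more than once and from more than one goroutine.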

func (*RuleEngine) OnMsgWithEndFunc

func (e *RuleEngine) OnMsgWithEndFunc(msg types.RuleMsg, endFunc types.OnEndFunc)

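OnMsgWithEndFunc hands the message to the rule engine for asynchronous processing. endFunc is the callback invoked after the message has passed through the rule chain, and it is used to obtain the processing result. Note: if the rule chain has multiple end points, the callback is executed multiple times. Deprecated: use OnMsg instead.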

func (*RuleEngine) OnMsgWithOptions added in v0.13.0

func (e *RuleEngine) OnMsgWithOptions(msg types.RuleMsg, opts ...types.RuleContextOption)

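OnMsgWithOptions hands the message to the rule engine for asynchronous processing. It can carry a context option and an end callback option. context is used to share data between different component instances. endFunc is the callback invoked after the message has passed through the rule chain, and it is used to obtain the processing result. Note: if the rule chain has multiple end points, the callback is executed multiple times. Deprecated: use OnMsg instead.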

func (*RuleEngine) Reload added in v0.20.0

func (e *RuleEngine) Reload(opts ...RuleEngineOption) error

func (*RuleEngine) ReloadChild

func (e *RuleEngine) ReloadChild(ruleNodeId string, dsl []byte) error

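ReloadChild updates the root rule chain or one of its nodes. If ruleNodeId is empty, the root rule chain is updated; otherwise the specified child node is updated. dsl is the configuration of the root rule chain or the child node.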

func (*RuleEngine) ReloadSelf

func (e *RuleEngine) ReloadSelf(def []byte, opts ...RuleEngineOption) error

ReloadSelf reloads the rule chain.

func (*RuleEngine) RootRuleChainCtx

func (e *RuleEngine) RootRuleChainCtx() *RuleChainCtx

RootRuleChainCtx returns the root rule chain.

func (*RuleEngine) Stop

func (e *RuleEngine) Stop()

type RuleEngineOption

type RuleEngineOption func(*RuleEngine) error

RuleEngineOption is a function type that modifies the RuleEngine.

func WithConfig

func WithConfig(config types.Config) RuleEngineOption

WithConfig is an option that sets the Config of the RuleEngine.
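
RuleEngineOption follows Go's functional options pattern. The sketch below shows the pattern in isolation. The engine, config, option, withConfig, and newEngine names are hypothetical stand-ins chosen for this example, not the real RuleGo types.

```go
package main

import "fmt"

// config and engine are hypothetical stand-ins for types.Config and RuleEngine.
type config struct {
	debug bool
}

type engine struct {
	id  string
	cfg config
}

// option has the same shape as RuleEngineOption: a function that modifies
// the engine and may return an error.
type option func(*engine) error

// withConfig returns an option that sets the engine's configuration.
func withConfig(c config) option {
	return func(e *engine) error {
		e.cfg = c
		return nil
	}
}

// newEngine applies each option in order and stops at the first error.
func newEngine(id string, opts ...option) (*engine, error) {
	e := &engine{id: id}
	for _, opt := range opts {
		if err := opt(e); err != nil {
			return nil, err
		}
	}
	return e, nil
}

func main() {
	e, err := newEngine("rule01", withConfig(config{debug: true}))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(e.id, e.cfg.debug)
}
```

Because options are variadic, callers that need no customization can omit them, which matches how rulego.New is called with and without rulego.WithConfig in the README.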

type RuleGo

type RuleGo struct {
	// contains filtered or unexported fields
}

RuleGo is the rule engine instance pool.

func (*RuleGo) Del

func (g *RuleGo) Del(id string)

Del deletes the rule engine instance with the specified ID.

func (*RuleGo) Get

func (g *RuleGo) Get(id string) (*RuleEngine, bool)

Get returns the rule engine instance with the specified ID.

func (*RuleGo) Load added in v0.14.0

func (g *RuleGo) Load(folderPath string, opts ...RuleEngineOption) error

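Load loads all rule chain configurations (files ending in .json) in the specified folder and its subfolders into the rule engine instance pool. The rule chain ID is taken from ruleChain.id in each rule chain file.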

func (*RuleGo) New

func (g *RuleGo) New(id string, rootRuleChainSrc []byte, opts ...RuleEngineOption) (*RuleEngine, error)

New creates a new RuleEngine and stores it in the RuleGo rule chain pool. If id is "", the ruleChain.id from the rule chain file is used.

func (*RuleGo) OnMsg added in v0.15.0

func (g *RuleGo) OnMsg(msg types.RuleMsg)

OnMsg calls all rule engine instances to process the message. Every rule chain in the rule engine instance pool attempts to process it.

func (*RuleGo) Range added in v0.20.0

func (g *RuleGo) Range(f func(key, value any) bool)

Range iterates over all rule engine instances.

func (*RuleGo) Stop

func (g *RuleGo) Stop()

Stop releases all rule engine instances.

type RuleNodeCtx

type RuleNodeCtx struct {
	// Component instance
	types.Node
	// Rule chain configuration context
	ChainCtx *RuleChainCtx
	// Component configuration
	SelfDefinition *types.RuleNode
	// Rule engine configuration
	Config types.Config
}

RuleNodeCtx is the node component instance definition.

func InitRuleNodeCtx

func InitRuleNodeCtx(config types.Config, chainCtx *RuleChainCtx, selfDefinition *types.RuleNode) (*RuleNodeCtx, error)

InitRuleNodeCtx initializes a RuleNodeCtx.

func (*RuleNodeCtx) Copy

func (rn *RuleNodeCtx) Copy(newCtx *RuleNodeCtx)

Copy copies the node context.

func (*RuleNodeCtx) DSL

func (rn *RuleNodeCtx) DSL() []byte

func (*RuleNodeCtx) GetNodeById

func (rn *RuleNodeCtx) GetNodeById(_ types.RuleNodeId) (types.NodeCtx, bool)

func (*RuleNodeCtx) GetNodeId

func (rn *RuleNodeCtx) GetNodeId() types.RuleNodeId

func (*RuleNodeCtx) IsDebugMode

func (rn *RuleNodeCtx) IsDebugMode() bool

func (*RuleNodeCtx) ReloadChild

func (rn *RuleNodeCtx) ReloadChild(_ types.RuleNodeId, _ []byte) error

func (*RuleNodeCtx) ReloadSelf

func (rn *RuleNodeCtx) ReloadSelf(def []byte) error

type RunSnapshot added in v0.20.0

type RunSnapshot struct {
	// contains filtered or unexported fields
}

func NewRunSnapshot added in v0.20.0

func NewRunSnapshot(msgId string, chainCtx *RuleChainCtx, startTs int64) *RunSnapshot

Directories

Path Synopsis
api
js
net
examples
Package pool Note: This file is inspired by: Valyala, A. (2023) workerpool.go (Version 1.48.0) [Source code].
Package pool Note: This file is inspired by: Valyala, A. (2023) workerpool.go (Version 1.48.0) [Source code].
utils
aes
fs
str
