endpoint

package
v0.20.0
Published: Apr 23, 2024 License: Apache-2.0 Imports: 11 Imported by: 1

README

Endpoint

Endpoint is a module that abstracts the routing of data from different input sources and provides a consistent user experience across protocols. It is an optional RuleGo module that lets RuleGo run independently and provide services.

It lets you create and start different receiving services, such as http, mqtt, kafka, gRPC, websocket, schedule, tcp and udp, to integrate data from heterogeneous systems. Incoming requests or messages can then be converted, processed and routed before they are handed to a rule chain or component for processing.

Endpoint architecture diagram

Usage

  1. First define the route. It provides a stream-like (chained) calling style made up of an input end, processing functions and an output end. Route processing is consistent across Endpoint types.
router := endpoint.NewRouter().From("/api/v1/msg/").Process(func(router *endpoint.Router, exchange *endpoint.Exchange) bool {
	// processing logic
	return true
}).To("chain:default").End()

The meaning of the input end From depends on the Endpoint type, but in every case the message is eventually routed to the router according to the From value:

  • http/websocket endpoint: From is a path route, and an http service is created for that path. For example, From("/api/v1/msg/") creates an http service at /api/v1/msg/.
  • mqtt/kafka endpoint: From is the topic to subscribe to. For example, From("/api/v1/msg/") subscribes to the /api/v1/msg/ topic.
  • schedule endpoint: From is a cron expression, and a scheduled task is created from it. For example, From("*/1 * * * * *") triggers the router every second.
  • tcp/udp endpoint: From is a regular expression, and messages that match it are forwarded to the router. For example, From("^{.*") matches data that begins with {.
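
To make the regular-expression case concrete, the sketch below uses only the standard regexp package to show which payloads a pattern like the one above selects. It does not use the endpoint package, and how the endpoint applies the expression internally is not shown here. The function name matchesRoute and the sample payloads are made up for illustration, and the pattern is written in its escaped form (\{) to keep the literal brace unambiguous.

```go
package main

import (
	"fmt"
	"regexp"
)

// jsonLike is the escaped form of the ^{.* pattern: data that begins with '{'.
var jsonLike = regexp.MustCompile(`^\{.*`)

// matchesRoute reports whether a payload is selected by the pattern.
// It is a hypothetical helper, not part of the endpoint package.
func matchesRoute(payload string) bool {
	return jsonLike.MatchString(payload)
}

func main() {
	for _, p := range []string{`{"temp":21}`, `hello`} {
		fmt.Printf("%q -> %v\n", p, matchesRoute(p))
	}
}
```

With these inputs the first payload matches and the second does not.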
  2. Then create the Endpoint service. The creation interface is also consistent:
// For example: create an http service
restEndpoint, err := endpoint.New(rest.Type, config, rest.Config{Server: ":9090"})
// or pass the configuration as a map instead:
// restEndpoint, err := endpoint.New(rest.Type, config, types.Configuration{"server": ":9090"})

// For example: create an mqtt subscription service
mqttEndpoint, err := endpoint.New(mqtt.Type, config, mqtt.Config{Server: "127.0.0.1:1883"})
// or pass the configuration as a map instead:
// mqttEndpoint, err := endpoint.New(mqtt.Type, config, types.Configuration{"server": "127.0.0.1:1883"})

// For example: create a websocket service
wsEndpoint, err := endpoint.New(websocket.Type, config, websocket.Config{Server: ":9090"})

// For example: create a tcp service
tcpEndpoint, err := endpoint.New(net.Type, config, net.Config{Protocol: "tcp", Server: ":8888"})

// For example: create a schedule endpoint service
scheduleEndpoint, err := endpoint.New(schedule.Type, config, nil)
  3. Register the routes with the endpoint service and start the service:
// register routes on the http endpoint
_, err = restEndpoint.AddRouter(router1, "POST")
_, err = restEndpoint.AddRouter(router2, "GET")
_ = restEndpoint.Start()

// register routes on the mqtt endpoint
_, err = mqttEndpoint.AddRouter(router1)
_, err = mqttEndpoint.AddRouter(router2)
_ = mqttEndpoint.Start()
  4. Endpoint supports responding to the caller:
router5 := endpoint.NewRouter().From("/api/v1/msgToComponent2/:msgType").Process(func(router *endpoint.Router, exchange *endpoint.Exchange) bool {
	// respond to the client
	exchange.Out.Headers().Set("Content-Type", "application/json")
	exchange.Out.SetBody([]byte("ok"))
	return true
}).End()

// To return the rule chain's execution result to the client synchronously, add Wait
router6 := endpoint.NewRouter().From("/api/v1/msg2Chain4/:chainId").
	To("chain:${chainId}").
	// Wait turns the asynchronous call into a synchronous one so that http can respond normally.
	// Leave it out when no synchronous response is needed, because it reduces throughput.
	Wait().
	Process(func(router *endpoint.Router, exchange *endpoint.Exchange) bool {
		if err := exchange.Out.GetError(); err != nil {
			// error
			exchange.Out.SetStatusCode(400)
			exchange.Out.SetBody([]byte(err.Error()))
		} else {
			// return the processing result to the client; the http endpoint needs Wait() above to respond normally
			outMsg := exchange.Out.GetMsg()
			exchange.Out.Headers().Set("Content-Type", "application/json")
			exchange.Out.SetBody([]byte(outMsg.Data))
		}
		return true
	}).End()
  5. Add global interceptors for permission checks and similar logic:
restEndpoint.AddInterceptors(func(router *endpoint.Router, exchange *endpoint.Exchange) bool {
	// permission verification logic
	return true
})

Router

Refer to the documentation

Examples

The following are examples of using endpoint:

Extend endpoint

The Endpoint module provides several built-in receiving service types, and you can also add custom ones. To do so, follow these steps:

  1. Implement the Message interface. Message abstracts data from different input sources and defines methods to get or set the message content, headers, source, parameters, status code and so on. Implement it for your receiving service type so that your message type can interact with the other types in the endpoint package.
  2. Implement the Endpoint interface. Endpoint describes a receiving service type and defines methods to start the service and to add routes and interceptors, among others. Implement it for your receiving service type so that your service type can interact with the other types in the endpoint package.

These are the basic steps for extending the endpoint package. You can refer to the existing endpoint type implementations when writing your own code.

Documentation

Constants

const (
	EventConnect    = "Connect"
	EventDisconnect = "Disconnect"
	EventInitServer = "InitServer"
)

Variables

var (
	// ChainNotFoundErr is the error returned when the rule chain does not exist
	ChainNotFoundErr = errors.New("chain not found error")
	// StopErr is the error that indicates the endpoint service has stopped
	StopErr = errors.New("endpoint stop")
)
var DefaultExecutorFactory = new(ExecutorFactory)

DefaultExecutorFactory is the default registry for To-side executors.

Registry is the default registry for endpoint components.

Functions

This section is empty.

Types

type BaseEndpoint

type BaseEndpoint struct {
	// RouterStorage stores the endpoint's routers
	RouterStorage map[string]*Router
	sync.RWMutex
	// contains filtered or unexported fields
}

BaseEndpoint is the base endpoint. It implements the basic methods for global interceptors.

func (*BaseEndpoint) AddInterceptors

func (e *BaseEndpoint) AddInterceptors(interceptors ...Process)

AddInterceptors adds global interceptors.

func (*BaseEndpoint) DoProcess

func (e *BaseEndpoint) DoProcess(router *Router, exchange *Exchange)

func (*BaseEndpoint) OnMsg added in v0.14.0

func (e *BaseEndpoint) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

type ChainExecutor

type ChainExecutor struct {
}

ChainExecutor is the rule chain executor.

func (*ChainExecutor) Execute

func (ce *ChainExecutor) Execute(ctx context.Context, router *Router, exchange *Exchange)

func (*ChainExecutor) Init

func (*ChainExecutor) IsPathSupportVar

func (ce *ChainExecutor) IsPathSupportVar() bool

IsPathSupportVar reports that the To path may contain variables.

func (*ChainExecutor) New

func (ce *ChainExecutor) New() Executor

type ComponentExecutor

type ComponentExecutor struct {
	// contains filtered or unexported fields
}

ComponentExecutor is the node component executor.

func (*ComponentExecutor) Execute

func (ce *ComponentExecutor) Execute(ctx context.Context, router *Router, exchange *Exchange)

func (*ComponentExecutor) Init

func (ce *ComponentExecutor) Init(config types.Config, configuration types.Configuration) error

func (*ComponentExecutor) IsPathSupportVar

func (ce *ComponentExecutor) IsPathSupportVar() bool

IsPathSupportVar reports that the To path may not contain variables.

func (*ComponentExecutor) New

func (ce *ComponentExecutor) New() Executor

type ComponentRegistry added in v0.17.0

type ComponentRegistry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ComponentRegistry is the component registry.

func (*ComponentRegistry) New added in v0.17.0

func (r *ComponentRegistry) New(componentType string, ruleConfig types.Config, configuration interface{}) (Endpoint, error)

New creates a new endpoint instance.

func (*ComponentRegistry) Register added in v0.17.0

func (r *ComponentRegistry) Register(endpoint Endpoint) error

Register registers a rule engine node component.

func (*ComponentRegistry) Unregister added in v0.17.0

func (r *ComponentRegistry) Unregister(componentType string) error

type Endpoint added in v0.14.0

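type Endpoint interface {
	// Node embeds the node interface
	types.Node
	// Id returns the type identifier
	Id() string
	// Start starts the service
	Start() error
	// AddInterceptors adds global interceptors
	AddInterceptors(interceptors ...Process)
	// AddRouter adds a router with the given parameters.
	// params are extra parameters for the router.
	// It returns the router ID. The router ID is usually the from value, but some
	// Endpoints allow duplicate from values, in which case the Endpoint returns a new router ID.
	// The router ID is used to remove the router.
	AddRouter(router *Router, params ...interface{}) (string, error)
	// RemoveRouter removes a router with the given parameters.
	// params are extra parameters for the router.
	// routerId is the router ID.
	RemoveRouter(routerId string, params ...interface{}) error
}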

func New added in v0.17.0

func New(componentType string, ruleConfig types.Config, configuration interface{}) (Endpoint, error)

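New creates an endpoint instance of the given type. componentType is the endpoint type, ruleConfig is the rulego configuration, and configuration holds the endpoint's configuration parameters, which may be either a types.Configuration or the Config type of the corresponding endpoint.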

type Exchange

type Exchange struct {
	// In is the incoming data
	In Message
	// Out is the outgoing data
	Out Message
}

Exchange contains the in and out messages.

type Executor

type Executor interface {
	// New creates a new instance
	New() Executor
	// IsPathSupportVar reports whether the To path supports ${} variables; not supported by default
	IsPathSupportVar() bool
	// Init initializes the executor
	Init(config types.Config, configuration types.Configuration) error
	// Execute runs the executor's logic
	Execute(ctx context.Context, router *Router, exchange *Exchange)
}

Executor is the To-side executor.

type ExecutorFactory

type ExecutorFactory struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ExecutorFactory is the factory for To-side executors.

func (*ExecutorFactory) New

func (r *ExecutorFactory) New(name string) (Executor, bool)

New creates a To-side executor instance by type.

func (*ExecutorFactory) Register

func (r *ExecutorFactory) Register(name string, executor Executor)

Register registers a To-side executor.
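
One way to read the Register and New signatures together with Executor.New is as a prototype registry: a registered executor acts as a template, and each lookup asks it for a fresh instance. The sketch below models that reading with local types so that it compiles on its own. It is an assumption about the design, not the package's actual implementation. The Executor interface is cut down to the single method needed, and echoExecutor is a made-up type.

```go
package main

import (
	"fmt"
	"sync"
)

// Executor is reduced to the one method this sketch needs.
type Executor interface {
	New() Executor
}

// ExecutorFactory is a local stand-in with the documented method set.
type ExecutorFactory struct {
	sync.RWMutex
	executors map[string]Executor
}

// Register stores executor as the prototype for name.
func (r *ExecutorFactory) Register(name string, executor Executor) {
	r.Lock()
	defer r.Unlock()
	if r.executors == nil {
		r.executors = make(map[string]Executor)
	}
	r.executors[name] = executor
}

// New returns a fresh instance created from the prototype registered under name.
func (r *ExecutorFactory) New(name string) (Executor, bool) {
	r.RLock()
	defer r.RUnlock()
	prototype, ok := r.executors[name]
	if !ok {
		return nil, false
	}
	return prototype.New(), true
}

// echoExecutor is a hypothetical executor used only for this example.
type echoExecutor struct{}

func (e *echoExecutor) New() Executor { return &echoExecutor{} }

func main() {
	factory := new(ExecutorFactory)
	factory.Register("echo", &echoExecutor{})
	_, found := factory.New("echo")
	_, missing := factory.New("nope")
	fmt.Println(found, missing) // prints "true false"
}
```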

type From

type From struct {
	// Config is the configuration
	Config types.Configuration
	// Router is a pointer to the router
	Router *Router
	// From is the source path
	From string
	// contains filtered or unexported fields
}

From is the From side (input end) of a route.

func (*From) End

func (f *From) End() *Router

End finishes the chain and returns the *Router.

func (*From) ExecuteProcess

func (f *From) ExecuteProcess(router *Router, exchange *Exchange) bool

ExecuteProcess runs the processing functions. If it returns true, the To-side logic is executed; otherwise it is not.

func (*From) GetProcessList

func (f *From) GetProcessList() []Process

GetProcessList returns the list of From-side processors.

func (*From) GetTo

func (f *From) GetTo() *To

func (*From) Process

func (f *From) Process(process Process) *From

Process processes the msg on the From side.

func (*From) To

func (f *From) To(to string, configs ...types.Configuration) *To

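To sets the To side. The argument is a component path in the format {executorType}:{path}, where executorType is the executor component type and path is the component path. For example, chain:{chainId} executes a rule chain registered in rulego, and component:{nodeType} executes a component registered in config.ComponentsRegistry. Custom executor component types can be registered in DefaultExecutorFactory. componentConfigs are the component configuration parameters.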

func (*From) ToComponent

func (f *From) ToComponent(node types.Node) *To

ToComponent sets a component as the To side. The argument is a component of type types.Node.

func (*From) ToString

func (f *From) ToString() string

func (*From) Transform

func (f *From) Transform(transform Process) *From

Transform converts the msg on the From side.

type Message

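type Message interface {
	// Body returns the message body
	Body() []byte
	Headers() textproto.MIMEHeader
	From() string
	// GetParam returns a parameter value, see http.Request#FormValue
	GetParam(key string) string
	// SetMsg sets the RuleMsg
	SetMsg(msg *types.RuleMsg)
	// GetMsg converts the received data into a RuleMsg
	GetMsg() *types.RuleMsg
	// SetStatusCode sets the response status code
	SetStatusCode(statusCode int)
	// SetBody sets the response body
	SetBody(body []byte)
	// SetError sets the error
	SetError(err error)
	// GetError returns the error
	GetError() error
}

Message is the abstract interface for data received by an endpoint. It is the unified interface for data from different input sources.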

type OnEvent added in v0.20.0

type OnEvent func(eventName string, params ...interface{})

OnEvent is the event listener function.
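
A listener with the OnEvent signature can branch on the event name constants declared in the Constants section. The sketch below redeclares the type and constants locally so that it runs on its own. Which params each event carries, and how a listener is attached to an endpoint, are not documented here, so the "client-1" argument and the newRecorder helper are purely illustrative.

```go
package main

import "fmt"

// Local copies of the package's declarations so the sketch compiles alone.
const (
	EventConnect    = "Connect"
	EventDisconnect = "Disconnect"
	EventInitServer = "InitServer"
)

type OnEvent func(eventName string, params ...interface{})

// newRecorder returns a listener that appends one line per event to events.
// Hypothetical helper, not part of the endpoint package.
func newRecorder(events *[]string) OnEvent {
	return func(eventName string, params ...interface{}) {
		switch eventName {
		case EventConnect, EventDisconnect, EventInitServer:
			*events = append(*events, fmt.Sprintf("%s (%d params)", eventName, len(params)))
		default:
			*events = append(*events, "unknown event: "+eventName)
		}
	}
}

func main() {
	var events []string
	listener := newRecorder(&events)
	listener(EventConnect, "client-1") // "client-1" is a made-up parameter
	listener(EventDisconnect)
	fmt.Println(events)
}
```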

type Process

type Process func(router *Router, exchange *Exchange) bool

Process is a processing function. If it returns true, the next processor is executed; otherwise it is not.
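
The return-value rule for Process (continue on true, stop on false) can be modeled as a short-circuiting loop. The sketch below is a simplified model under stated assumptions, not the package's code: Router and Exchange are local stand-ins, the Trace field exists only to make the order of calls visible, and runAll and step are hypothetical helpers.

```go
package main

import "fmt"

// Router and Exchange are stand-ins for the package's types.
type Router struct{}

// Trace is not a field of the real Exchange; it only records call order here.
type Exchange struct{ Trace []string }

// Process mirrors the documented signature.
type Process func(router *Router, exchange *Exchange) bool

// runAll calls each processor in order and stops at the first one that
// returns false. It returns true only if every processor returned true.
func runAll(router *Router, exchange *Exchange, list []Process) bool {
	for _, p := range list {
		if !p(router, exchange) {
			return false
		}
	}
	return true
}

// step builds a processor that records its name and returns result.
func step(name string, result bool) Process {
	return func(_ *Router, ex *Exchange) bool {
		ex.Trace = append(ex.Trace, name)
		return result
	}
}

func main() {
	ex := &Exchange{}
	ok := runAll(&Router{}, ex, []Process{
		step("auth", true),
		step("validate", false), // stops the chain here
		step("enrich", true),    // never runs
	})
	fmt.Println(ok, ex.Trace) // prints "false [auth validate]"
}
```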

type Router

type Router struct {
	// Config is the ruleEngine Config
	Config types.Config
	// contains filtered or unexported fields
}

Router is a route that abstracts the routing of data from different input sources. It takes a message from the input end (From), converts it (Transform) into a RuleMsg structure or processes it (Process), and then hands it to the rule chain for processing (To). Alternatively, it takes a message from the input end (From), converts it (Transform), and then processes the response (Process).

Usage:

http endpoint
endpoint.NewRouter().From("/api/v1/msg/").Transform().To("chain:xx")
endpoint.NewRouter().From("/api/v1/msg/").Transform().Process().To("chain:xx")
endpoint.NewRouter().From("/api/v1/msg/").Transform().Process().To("component:nodeType")
endpoint.NewRouter().From("/api/v1/msg/").Transform().Process()

mqtt endpoint
endpoint.NewRouter().From("#").Transform().Process().To("chain:xx")
endpoint.NewRouter().From("topic").Transform().Process().To("chain:xx")

func NewRouter

func NewRouter(opts ...RouterOption) *Router

NewRouter creates a new router.

func (*Router) Disable added in v0.14.0

func (r *Router) Disable(disable bool) *Router

Disable sets the router's state: true means disabled, false means enabled.

func (*Router) From

func (r *Router) From(from string, configs ...types.Configuration) *From

func (*Router) FromToString

func (r *Router) FromToString() string

func (*Router) GetFrom

func (r *Router) GetFrom() *From

func (*Router) GetRuleGo added in v0.20.0

func (r *Router) GetRuleGo(exchange *Exchange) *rulego.RuleGo

func (*Router) IsDisable added in v0.14.0

func (r *Router) IsDisable() bool

IsDisable reports whether the router is disabled: true means disabled, false means enabled.

type RouterOption

type RouterOption func(*Router) error

RouterOption is an option function.

func WithRuleConfig

func WithRuleConfig(config types.Config) RouterOption

WithRuleConfig changes the rule engine configuration.

func WithRuleGo

func WithRuleGo(ruleGo *rulego.RuleGo) RouterOption

WithRuleGo changes the rule chain pool. rulego.DefaultRuleGo is used by default.

func WithRuleGoFunc added in v0.20.0

func WithRuleGoFunc(f func(exchange *Exchange) *rulego.RuleGo) RouterOption

WithRuleGoFunc sets a function that obtains the rule chain pool dynamically.

type To

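type To struct {
	// HasVars reports whether toPath contains placeholder variables
	HasVars bool
	// Config is the To component configuration
	Config types.Configuration
	// Router is a pointer to the router
	Router *Router
	// To is the target path; for example, "chain:{chainId}" hands the data to the rule engine for processing
	To string
	// ToPath is the path with the To executor prefix removed
	ToPath string
	// contains filtered or unexported fields
}

To is the To side (output end) of a route.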

func (*To) End

func (t *To) End() *Router

End finishes the chain and returns the *Router.

func (*To) Execute

func (t *To) Execute(ctx context.Context, exchange *Exchange)

Execute runs the To-side logic.

func (*To) GetProcessList

func (t *To) GetProcessList() []Process

GetProcessList returns the list of To-side processors.

func (*To) Process

func (t *To) Process(process Process) *To

Process adds post-processing that runs after the To-side logic. If the rule chain has multiple end points, it is executed multiple times.

func (*To) RuleContextOption added in v0.20.0

func (t *To) RuleContextOption(opts ...types.RuleContextOption) *To

RuleContextOption sets the rule context options.

func (*To) ToString

func (t *To) ToString() string

func (*To) ToStringByDict

func (t *To) ToStringByDict(dict map[string]string) string

ToStringByDict replaces the variables in the path and returns the resulting string.
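
The kind of substitution described here can be sketched with the standard library alone, using the ${name} placeholder style seen in To("chain:${chainId}"). The helper below is hypothetical and is not the package's implementation. Leaving unknown placeholders untouched and restricting names to word characters are both assumptions of this example.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches ${name}, where name consists of word characters.
var placeholder = regexp.MustCompile(`\$\{(\w+)\}`)

// expand replaces every ${key} in path with dict[key] in a single pass.
// Placeholders without an entry in dict are left as they are.
// Hypothetical helper, not part of the endpoint package.
func expand(path string, dict map[string]string) string {
	return placeholder.ReplaceAllStringFunc(path, func(m string) string {
		key := m[2 : len(m)-1] // strip the leading "${" and trailing "}"
		if v, ok := dict[key]; ok {
			return v
		}
		return m
	})
}

func main() {
	dict := map[string]string{"chainId": "default"}
	fmt.Println(expand("chain:${chainId}", dict)) // prints "chain:default"
}
```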

func (*To) Transform

func (t *To) Transform(transform Process) *To

Transform adds a conversion that runs after the To-side logic. If the rule chain has multiple end points, it is executed multiple times.

func (*To) Wait added in v0.15.0

func (t *To) Wait() *To

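Wait waits for the rule chain or component to finish executing and then resumes the parent process, so that the rule chain result is obtained synchronously. Use it when the caller has to wait for the rule chain result while the parent process is kept, for example for an http response. Otherwise it does not need to be set.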
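
The asynchronous-to-synchronous idea behind Wait can be pictured with a goroutine and a channel. The sketch below is a generic illustration of that pattern and makes no claim about how the package implements Wait. runChainAsync stands in for an asynchronous rule chain execution with a single end point, and both function names are made up.

```go
package main

import (
	"fmt"
	"time"
)

// runChainAsync stands in for asynchronous rule chain execution: it returns
// immediately and calls onEnd later from another goroutine.
func runChainAsync(input string, onEnd func(result string)) {
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated work
		onEnd("processed:" + input)
	}()
}

// runChainAndWait blocks the caller until onEnd has delivered a result,
// which is what lets a request handler write that result into its response.
func runChainAndWait(input string) string {
	done := make(chan string, 1)
	runChainAsync(input, func(result string) { done <- result })
	return <-done
}

func main() {
	fmt.Println(runChainAndWait("hello")) // prints "processed:hello"
}
```

Blocking like this ties up the caller for the duration of the chain, which is consistent with the README's note that Wait affects throughput.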
