event

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// GlobalMidIncrementKey is a fixed string key.
	// NOTE(review): judging by its value it names a global auto-increment
	// counter for message IDs — confirm where the counter is stored.
	GlobalMidIncrementKey = "Auto-Increment-Message-Mid-Global"
)

Variables

This section is empty.

Functions

func InitBus

func InitBus()

func Push

func Push()

Types

type EventBus

// EventBus is the event bus type; its only exported field is a sync.Map.
type EventBus struct {
	// Bus is the concurrent map backing the event bus.
	// NOTE(review): presumably keyed by user key with *EventChannel values
	// (see Sub / GetEventChannel) — confirm against the implementation.
	Bus sync.Map
}
var (
	// Bus is the package-level *EventBus.
	// NOTE(review): presumably assigned by InitBus — confirm.
	Bus *EventBus
)

func (*EventBus) AddTopics

func (bus *EventBus) AddTopics(userKey string, topics ...string)

func (*EventBus) GetEventChannel

func (bus *EventBus) GetEventChannel(userKey string) *EventChannel

func (*EventBus) PubByTopic

func (bus *EventBus) PubByTopic(topic string, params []byte)

func (*EventBus) PubByUser

func (bus *EventBus) PubByUser(userKey string, params []byte)

func (*EventBus) RemoveTopics

func (bus *EventBus) RemoveTopics()

func (*EventBus) Sub

func (bus *EventBus) Sub(userKey string, topic *EventChannel) *EventChannel

type EventChannel

type EventChannel struct {
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic() *EventChannel

func (*EventChannel) Chan

func (c *EventChannel) Chan() chan *EventData

type EventData

type EventData struct {
	// contains filtered or unexported fields
}

func NewEventData

func NewEventData(data []byte) *EventData

func NewParams

func NewParams(params []byte) *EventData

func (*EventData) GetData

func (c *EventData) GetData() []byte

type EventMonitorItem

// EventMonitorItem is implemented by instances that can identify themselves,
// report their message type, and send themselves to a message queue.
type EventMonitorItem interface {
	ProgramID() string          // returns the instance's name
	Type() string               // returns the instance's message type
	Send(topic ...string) error // sends the message to the message queue
}

type IMessage

// IMessage is the read-only accessor interface of a message: its ID, its
// method, and its content as bytes or as a string.
type IMessage interface {
	GetId() uint64     // returns the message ID
	GetMethod() string // returns the message protocol (method)
	GetBytes() []byte  // returns the message content
	GetData() string   // returns the message content as a string
}

type Item

// Item describes a typed message item that can be published to the message
// bus and modified via Set / Updates.
type Item interface {
	MsgType() string    // first-level message type
	MsgSubType() string // second-level message type
	PubBus()            // publishes the data to the message bus

	Set(k, v string) // sets a value
	Updates() error  // applies the update
}

type JsonBase

// JsonBase holds the method name plus optional caller/callee metadata of a
// JSON message. Every field except Method is tagged omitempty and is left out
// of the encoded JSON when it holds its zero value.
type JsonBase struct {
	Method     string `json:"method"`
	From       string `json:"from,omitempty"`       // caller (the logged-in user)
	To         string `json:"to,omitempty"`         // callee
	Origin     string `json:"origin,omitempty"`     // source domain name (or IP)
	PlatFormId string `json:"platformId,omitempty"` // platform ID
	Zone       int    `json:"zone,omitempty"`       // time zone
	RemoteIP   string `json:"remoteIP,omitempty"`   // callee IP
	JWT        any    `json:"jwt,omitempty"`        // JWT verification info of the source
}

type JsonBody

// JsonBody carries a single string payload, encoded under the "data" key and
// omitted from the JSON when empty.
type JsonBody struct {
	Data string `json:"data,omitempty"`
}

type JsonError

// JsonError describes an error as a numeric code, a message and an optional
// payload. Code and Message are always encoded; Data is dropped from the JSON
// output when it is nil.
type JsonError struct {
	Code    uint64 `json:"code"`
	Message string `json:"message"`
	Data    any    `json:"data,omitempty"`
}

type JsonMsg

// JsonMsg is a JSON message composed of an embedded *JsonRequest and an
// embedded *JsonParams; the fields of both are promoted onto JsonMsg.
// Both embedded values are pointers, so they are nil in a zero JsonMsg.
type JsonMsg struct {
	*JsonRequest
	*JsonParams
}

func AbleJsonMsg

func AbleJsonMsg() *JsonMsg

func (*JsonMsg) GetId

func (m *JsonMsg) GetId() uint64

func (*JsonMsg) GetMessage

func (m *JsonMsg) GetMessage() []byte

func (*JsonMsg) GetParams

func (m *JsonMsg) GetParams() interface{}

func (*JsonMsg) GetRequest

func (m *JsonMsg) GetRequest() *JsonRequest

func (*JsonMsg) PackDecode

func (m *JsonMsg) PackDecode(buf []byte, value interface{}) error

func (*JsonMsg) PackEncode

func (m *JsonMsg) PackEncode() []byte

func (*JsonMsg) SetMessage

func (m *JsonMsg) SetMessage(option *MsgOptions)

type JsonNotify

// JsonNotify is the notification metadata of a JSON message. All fields are
// tagged omitempty and are left out of the encoded JSON when zero.
type JsonNotify struct {
	ProgramID     string          `json:"programId,omitempty"`     // system program ID
	Id            uint64          `json:"id,omitempty"`            // message ID
	Type          MessageType     `json:"type,omitempty"`          // message type
	SubType       MessageSub      `json:"subType,omitempty"`       // sub-type (second-level type)
	Operator      MessageOperator `json:"operator,omitempty"`      // operation permission; only Data-type messages have one
	Timestamp     int64           `json:"timestamp,omitempty"`     // timestamp at which the message was pushed
	KeepaliveTime time.Duration   `json:"keepaliveTime,omitempty"` // sync interval for scheduled messages
}

type JsonParams

// JsonParams wraps an arbitrary parameter value, encoded under the "params"
// key and left out of the JSON output when nil.
type JsonParams struct {
	Params any `json:"params,omitempty"`
}

type JsonRequest

// JsonRequest is a request envelope: a numeric ID (always encoded as "id")
// plus the promoted fields of the embedded *JsonBase.
type JsonRequest struct {
	ID uint64 `json:"id"`
	*JsonBase
}

type Message

// Message is a generic message wrapper parameterized by a MsgAble
// implementation (per the MsgAble constraint: *JsonMsg or *XMLMsg).
type Message[T MsgAble] struct {
	Able     T            // the underlying message value of the chosen format
	MqRabbit *mq.RabbitMQ // RabbitMQ handle — NOTE(review): presumably set by RabbitMq(conf); confirm
	MqRedis  *redis.Client // Redis client — NOTE(review): presumably set by RedisMq(conf); confirm
	Opts     *MsgOptions  // options the message was built with
}

func NewMessage

func NewMessage[T MsgAble](able T, opts ...MsgOption) *Message[T]

func (*Message[T]) GetBytes

func (m *Message[T]) GetBytes() []byte

func (*Message[T]) GetData

func (m *Message[T]) GetData() string

func (*Message[T]) GetId

func (m *Message[T]) GetId() uint64

func (*Message[T]) GetMethod

func (m *Message[T]) GetMethod() string

func (*Message[T]) Pack

func (m *Message[T]) Pack()

func (*Message[T]) ProgramID

func (m *Message[T]) ProgramID() string

func (*Message[T]) RabbitMq

func (m *Message[T]) RabbitMq(conf *gofkConf.MqConfig)

func (*Message[T]) RedisMq

func (m *Message[T]) RedisMq(conf *gofkConf.RedisConfig)

func (*Message[T]) Send

func (m *Message[T]) Send(topic ...string) error

Send sends the message to the specified message queue.

func (*Message[T]) Type

func (m *Message[T]) Type() string

func (*Message[T]) UnPackFromBytes

func (m *Message[T]) UnPackFromBytes(msg []byte) *Message[T]

type MessageOperator

// MessageOperator is a string-typed operator value; it is the type of
// JsonNotify.Operator.
type MessageOperator string
// Known MessageOperator values.
const (
	AddIncrement  MessageOperator = "addIncrement"
	EditIncrement MessageOperator = "editIncrement"
	QueryOperator MessageOperator = "queryOperator"
)

type MessageSub

// MessageSub is the string type of a message sub-type (see JsonNotify.SubType).
type MessageSub string

type MessageType

// MessageType is the string type of a message's top-level type
// (see JsonNotify.Type).
type MessageType string
// Known MessageType values.
const (
	MonitorHeartBeat   MessageType = "HeartBeat" // monitoring heartbeat
	WebSocketKeepalive MessageType = "webSocketKeepalive"
	DataSync           MessageType = "DataSync"     // data synchronization (incremental; unit: day)
	DataResponse       MessageType = "DataResponse" // data response
)

type MsgAble

// MsgAble is the type constraint for Message: the type argument must be
// *JsonMsg or *XMLMsg and must also implement PackageMsg. Because it contains
// a type union it can only be used as a constraint, not as a value type.
type MsgAble interface {
	*JsonMsg | *XMLMsg
	PackageMsg
}

type MsgOption

// MsgOption is a functional option: a function that receives a *MsgOptions.
// The With* constructors in this package return values of this type.
type MsgOption func(options *MsgOptions)

func WithId

func WithId(id uint64) MsgOption

func WithJsonNotify

func WithJsonNotify(notify *JsonNotify) MsgOption

func WithMethod

func WithMethod(method string) MsgOption

func WithParams

func WithParams(params interface{}) MsgOption

func WithRequest

func WithRequest(request *JsonRequest) MsgOption

type MsgOptions

// MsgOptions aggregates every option a message can carry: an explicit Id plus
// embedded pointers to the JSON and XML building blocks.
//
// Several embedded types declare the same field name at the same depth
// (Data in JsonBody and JsonError; Type, SubType and Operator in JsonNotify
// and XMLHeader), so those fields must be reached through the embedded type
// name (e.g. opts.JsonBody.Data). The top-level Id shadows the Id fields of
// JsonNotify and XMLHeader. All embedded values are pointers and are nil in a
// zero MsgOptions.
type MsgOptions struct {
	Id uint64
	*JsonParams
	*JsonBody
	*JsonNotify
	*JsonBase
	*JsonError
	*XMLHeader
	*XMLBody
}

type PackageMsg

// PackageMsg is the contract a message format must satisfy: it can encode
// itself to bytes, decode bytes into a caller-supplied value, and have its
// content set from, and read back as, raw message data.
type PackageMsg interface {
	// PackEncode returns the encoded form of the message.
	PackEncode() []byte
	// PackDecode decodes data into value and reports any decoding error.
	PackDecode(data []byte, value any) error

	// SetMessage populates the message from options.
	SetMessage(options *MsgOptions)
	// GetMessage returns the message as bytes.
	GetMessage() []byte
}

type Subject

// Subject exposes two operations that each report an error.
// NOTE(review): the names suggest "raise a notification" and "raise an
// event" — confirm against the implementations.
type Subject interface {
	NotifyUp() error
	EventUp() error
}

type XMLBody

// XMLBody is the body of an XML message: a list of <items> elements.
type XMLBody struct {
	// Items holds one entry per <items> element.
	Items []struct {
		// Data receives the element's raw inner XML (",innerxml" tag).
		Data string `xml:",innerxml"`
	} `xml:"items"`
}

type XMLHeader

// XMLHeader is the header of an XML message.
type XMLHeader struct {
	Id      uint64 `xml:"id"`      // ID of the current message
	Type    string `xml:"msgType"` // first-level message type
	SubType string `xml:"subType"` // message sub-type (second-level category)

	// Operation performed [ack, select, increment]:
	// ack: this message is a response to an edit
	// select: this message is a response to a query
	// increment: this message is an incremental push
	Operator string `xml:"operator"`
}

type XMLMsg

// XMLMsg is an XML message whose root element is <Msg>, containing a <Head>
// and a <Body> element. Head and Body are pointers and are nil in a zero
// XMLMsg.
type XMLMsg struct {
	XMLName xml.Name   `xml:"Msg"`
	Head    *XMLHeader `xml:"Head"`
	Body    *XMLBody   `xml:"Body"`
}

func AbleXmlMsg

func AbleXmlMsg() *XMLMsg

func (*XMLMsg) GetMessage

func (m *XMLMsg) GetMessage() []byte

func (*XMLMsg) Marshal

func (m *XMLMsg) Marshal() []byte

func (*XMLMsg) PackDecode

func (m *XMLMsg) PackDecode(data []byte, value interface{}) error

func (*XMLMsg) PackEncode

func (m *XMLMsg) PackEncode() []byte

func (*XMLMsg) SetMessage

func (m *XMLMsg) SetMessage(option *MsgOptions)

func (*XMLMsg) UnMarshal

func (m *XMLMsg) UnMarshal(buf []byte) (PackageMsg, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL