natsrpc

package module
v0.0.0-...-d3db74c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2023 License: MIT Imports: 18 Imported by: 0

README

natsrpc

a rpc framework base on nats

usage

gate

mgr := &clientMgr{
    clients: make(map[natsrpc.Session]struct{}),
}

g, err := stub.NewGate(100, *addr, natsrpc.Config{Nats: "127.0.0.1:4222"})
if err != nil {
    fmt.Println(err)
    return
}

g.RegisterNetWorkEvent(func(conn natsrpc.Session) {
    mgr.clients[conn] = struct{}{}
}, func(conn natsrpc.Session) {
    delete(mgr.clients, conn)
})
g.RouteSessionMsg((*pb.ReqHello)(nil), BServerID)

g.Run()

server

s, err := stub.NewServer(101, natsrpc.Config{Nats: "127.0.0.1:4222"})
if err != nil {
    fmt.Println(err)
    return
}

//注册客户端消息事件handler
s.RegisterSessionMsgHandler(func(client engine.Session, req *pb.ReqHello) {
    resp := &pb.RespHello{Name: "回复您的请求"}
    fmt.Println("收到客户端来的消息:", req.Name)
    client.SendMsg(resp)
})

//注册send事件handler
s.RegisterServerHandler(func(server engine.Server, req *pb.ReqSend) {
    fmt.Println("收到gate来的消息:", req.Name)
})

//注册request事件handler
s.RegisterRequestMsgHandler(func(server engine.RequestServer, req *pb.ReqRequest) {
    resp := &pb.RespRequest{Name: "我是request返回消息"}
    fmt.Println("收到gate来的request消息:", req.Name)
    server.Answer(resp)
})

//注册call事件handler
s.RegisterRequestMsgHandler(func(server engine.RequestServer, req *pb.ReqCall) {
    resp := &pb.RespCall{Name: "我是call返回消息"}
    fmt.Println("收到gate来的call消息:", req.Name)
    server.Answer(resp)
})

s.Run()

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CRC32Hash

func CRC32Hash(s string) uint32

func CheckArgs1MsgFun

func CheckArgs1MsgFun(cb interface{}) (err error, funValue reflect.Value, msgType reflect.Type)

func IsExists

func IsExists(path string) bool

func ProtoHash

func ProtoHash(msg proto.Message) (uint32, reflect.Type)

func StringHash

func StringHash(s string) (hash uint16)

字符串转为16位整形哈希

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(conn Conn, mgr *Mgr) *Client

func (*Client) Close

func (p *Client) Close()

func (*Client) ID

func (p *Client) ID() int32

func (*Client) OnClose

func (p *Client) OnClose()

func (*Client) OnNew

func (p *Client) OnNew()

func (*Client) ReadLoop

func (p *Client) ReadLoop()

func (*Client) SendMsg

func (p *Client) SendMsg(msg proto.Message)

func (*Client) SendRawMsg

func (p *Client) SendRawMsg(msgID uint32, data []byte)

type Config

type Config struct {
	Nats string `json:"nats"`
}

func ReadConfig

func ReadConfig(filename string) (*Config, error)

type Conn

type Conn interface {
	ReadMsg() ([]byte, error)
	WriteMsg(args []byte) error
	LocalAddr() net.Addr
	ID() int32
	Close()
}

type Mgr

type Mgr struct {
	// contains filtered or unexported fields
}

func NewMgr

func NewMgr(wsAddr string, worker Worker) *Mgr

func (*Mgr) Close

func (p *Mgr) Close()

func (*Mgr) GetSession

func (p *Mgr) GetSession(sesID int32) (Session, bool)

func (*Mgr) ListenAddr

func (p *Mgr) ListenAddr() *net.TCPAddr

func (*Mgr) Post

func (p *Mgr) Post(f func())

func (*Mgr) RegisterEvent

func (p *Mgr) RegisterEvent(onNew, onClose func(conn Session))

func (*Mgr) RegisterRawSessionMsgHandler

func (p *Mgr) RegisterRawSessionMsgHandler(msg proto.Message, handler MsgHandler)

func (*Mgr) RegisterSessionMsgHandler

func (p *Mgr) RegisterSessionMsgHandler(cb interface{})

func (*Mgr) Run

func (p *Mgr) Run()

type MsgHandler

type MsgHandler func(client Session, msg proto.Message)

type MsgInfo

type MsgInfo struct {
	// contains filtered or unexported fields
}

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor() *Processor

func (*Processor) Encode

func (p *Processor) Encode(msgID uint32, data []byte) []byte

func (*Processor) Handle

func (p *Processor) Handle(msg proto.Message, client Session) error

func (*Processor) Marshal

func (p *Processor) Marshal(msg proto.Message) ([]byte, error)

func (*Processor) RegisterSessionMsgHandler

func (p *Processor) RegisterSessionMsgHandler(msg proto.Message, handler MsgHandler)

func (*Processor) SetByteOrder

func (p *Processor) SetByteOrder(littleEndian bool)

func (*Processor) Unmarshal

func (p *Processor) Unmarshal(data []byte) (proto.Message, error)

type Session

type Session interface {
	SendMsg(msg proto.Message)
	SendRawMsg(msgID uint32, data []byte)
	ID() int32
	Close()
}

type Work

type Work struct {
	// contains filtered or unexported fields
}

func (*Work) AfterPost

func (p *Work) AfterPost(d time.Duration, f func()) *time.Timer

func (*Work) Close

func (p *Work) Close()

func (*Work) Len

func (p *Work) Len() int

func (*Work) NewTicker

func (p *Work) NewTicker(d time.Duration, f func()) io.Closer

func (*Work) NewTryTicker

func (p *Work) NewTryTicker(d time.Duration, maxLen int, f func()) *time.Ticker

worker长度超过maxLen就丢弃f

func (*Work) Post

func (p *Work) Post(f func())

func (*Work) Run

func (p *Work) Run()

func (*Work) TryPost

func (p *Work) TryPost(f func(), maxLen int)

type Worker

type Worker interface {
	Post(f func())
	Run()
	Close()
	AfterPost(duration time.Duration, f func()) *time.Timer
	NewTicker(d time.Duration, f func()) io.Closer
	Len() int
}

func NewWorker

func NewWorker() Worker

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL