natsrpc

package
v0.0.0-...-1a2806f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2022 License: MIT Imports: 18 Imported by: 0

README

连接nats集群

选择一台nats连接 a服务 a0 a1 a2 b服务 b0 b1 b2 同时连接到nats

stream创建6条：a0 a1 a2 b0 b1 b2。a0 sub ag b[0-2].a0；a1 sub ag b[0-2].a1；a2 sub ag b[0-2].a2

b0 sub bg a[0-2].b0；b1 sub bg a[0-2].b1；b2 sub bg a[0-2].b2

逻辑处理 a0<->b0 a0 pub a0.b0 b0 pub b0.a0

逻辑处理 a0广播->b0 b1 b2 a0 pub bg

逻辑处理 a0广播->a0 a1 a2 a0 pub ag

逻辑处理 b0广播->a0 a1 a2 b0 pub ag

逻辑处理 b0广播->b0 b1 b2 b0 pub bg

client <-10x2-> nginx <-5x2-> logic <-5x2-> nats <-5x2-> gamedb <-> mysql

Documentation

Index

Constants

View Source
// NatsServiceKey is the fixed string key "gameconfig".
// NOTE(review): presumably the key under which service configuration is
// stored in the NATS key-value bucket — confirm against its callers.
const NatsServiceKey = "gameconfig"

Variables

This section is empty.

Functions

func NewConn

func NewConn(conn *nats.Conn, conf *NatsConfig) network.IConn

NewConn creates a NATS conn from the given *nats.Conn and config.

Types

type Conn

// Conn defines the NATS connection type.
//
// It embeds sync.Mutex, so a Conn must not be copied; all of its methods
// use pointer receivers.
type Conn struct {
	sync.Mutex

	// AsyncMaxPending is declared as int here, while the config field of the
	// same name (NatsConfig.AsyncMaxPending) is uint32.
	// NOTE(review): presumably copied from the config at construction — confirm.
	AsyncMaxPending int
	// contains filtered or unexported fields
}

Conn defines the NATS conn.

func (*Conn) Close

func (c *Conn) Close()

Close closes the NATS conn.

func (*Conn) Destroy

func (c *Conn) Destroy()

Destroy destroys the NATS conn.

func (*Conn) DirectErrorMsg

func (c *Conn) DirectErrorMsg(m *ss.Message, err error) *ss.Message

func (*Conn) GetKVBucket

func (c *Conn) GetKVBucket() (nats.KeyValue, error)

func (*Conn) GetStreamName

func (c *Conn) GetStreamName(msgType, srcType, srcID uint32) string

func (*Conn) GetSubject

func (c *Conn) GetSubject(msgType, destType, destID, srcType, srcID uint32) string

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

LocalAddr get local addr

func (*Conn) ReadMessage

func (c *Conn) ReadMessage() (interface{}, error)

ReadMessage is not goroutine-safe.

func (*Conn) RegisterService

func (c *Conn) RegisterService(srcType, srcID uint32) error

func (*Conn) RegisterStream

func (c *Conn) RegisterStream(msgType, srcType, srcID uint32) error

func (*Conn) RegisterSubject

func (c *Conn) RegisterSubject(msgType, srcType, srcID uint32) error

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr get remote addr

func (*Conn) Reply

func (c *Conn) Reply(m *ss.Message) (*ss.Message, error)

func (*Conn) Request

func (c *Conn) Request(subject string, msg *ss.Message) (*ss.Message, error)

func (*Conn) ResetStream

func (c *Conn) ResetStream() error

func (*Conn) SubscribeToNats

func (c *Conn) SubscribeToNats(name, subject string) (*nats.Subscription, error)

func (*Conn) SubscribeToReply

func (c *Conn) SubscribeToReply(name, subject string)

func (*Conn) SubscribeToStream

func (c *Conn) SubscribeToStream(name, subject string) (*nats.Subscription, error)

func (*Conn) WriteMessage

func (c *Conn) WriteMessage(args ...interface{}) error

WriteMessage: args must not be modified by other goroutines.

type INatsStream

// INatsStream defines the RPC stream interface.
type INatsStream interface {
	// Send takes a message and reports failure through the returned error.
	Send(*ss.Message) error
	// Recv returns the next message, or a non-nil error.
	// NOTE(review): blocking behavior is implementation-defined — confirm.
	Recv() (*ss.Message, error)
	// Context returns the context for this stream; its type is
	// nats.JetStreamContext, not a context.Context.
	Context() nats.JetStreamContext
	// Close takes no arguments and returns nothing.
	// NOTE(review): presumably releases the stream's resources — confirm.
	Close()
}

INatsStream defines the RPC stream interface.

type NatsConfig

// NatsConfig is the configuration for the NATS client.
//
// Every field carries a mapstructure tag naming its configuration key.
// Two details worth knowing when writing a config file:
//   - MaxPingsOutstanding is keyed as "max_ping_outstanding" (singular "ping").
//   - AsyncMaxPending is uint32 here, whereas Conn.AsyncMaxPending is int.
//
// NOTE(review): Nats presumably lists NATS server URLs and Services the
// known service groups — confirm against the code that consumes them.
type NatsConfig struct {
	ServiceType         int            `mapstructure:"service_type"`
	ServiceID           int            `mapstructure:"service_id"`
	Nats                []string       `mapstructure:"nats"`
	Services            []ServiceGroup `mapstructure:"services"`
	PingInterval        time.Duration  `mapstructure:"ping_interval"`
	MaxPingsOutstanding int            `mapstructure:"max_ping_outstanding"`
	QueueSize           int            `mapstructure:"queue_size"`
	SocketQueueSize     int            `mapstructure:"socket_queue_size"`
	AsyncMaxPending     uint32         `mapstructure:"async_max_pending"`
	WorkerPoolCapacity  int            `mapstructure:"worker_pool_capacity"`
	WorkerPoolQueueSize int            `mapstructure:"worker_pool_queue_size"`
}

NatsConfig is the NATS client config.

func (*NatsConfig) GetQueueSize

func (c *NatsConfig) GetQueueSize() int

GetQueueSize get module queue size

type NatsRPC

// NatsRPC is the NATS RPC client.
//
// It embeds sync.Mutex, so a NatsRPC must not be copied; all of its methods
// use pointer receivers.
type NatsRPC struct {
	sync.Mutex
	// Conf is the client configuration.
	Conf       *NatsConfig
	// NewAgent and CloseAgent are the agent create/close callbacks; NewNatsRPC
	// accepts parameters of the same two types.
	NewAgent   network.AgentCreateFunc
	CloseAgent network.AgentCloseFunc
	// contains filtered or unexported fields
}

NatsRPC is the NATS RPC client.

func NewNatsRPC

func NewNatsRPC(conf config.IConfig, agentFunc network.AgentCreateFunc, agentCloseFunc network.AgentCloseFunc) *NatsRPC

NewNatsRPC creates a NATS client.

func (*NatsRPC) Close

func (c *NatsRPC) Close()

Close client connections

func (*NatsRPC) DisconnectError

func (c *NatsRPC) DisconnectError(nc *nats.Conn, err error)

func (*NatsRPC) ErrorHandler

func (c *NatsRPC) ErrorHandler(nc *nats.Conn, sub *nats.Subscription, err error)

func (*NatsRPC) GetAgent

func (c *NatsRPC) GetAgent() network.IAgent

func (*NatsRPC) GetHashValue

func (c *NatsRPC) GetHashValue(destType uint32, value uint64) uint32

func (*NatsRPC) LoadServiceInfo

func (c *NatsRPC) LoadServiceInfo(os nats.KeyValue, localInfo *ServiceGroup) error

func (*NatsRPC) Reconnect

func (c *NatsRPC) Reconnect(nc *nats.Conn)

func (*NatsRPC) RegisterConfig

func (c *NatsRPC) RegisterConfig() error

func (*NatsRPC) Run

func (c *NatsRPC) Run()

Run starts the client.

func (*NatsRPC) Wait

func (c *NatsRPC) Wait()

type ServiceGroup

// ServiceGroup holds service group info.
//
// Each field carries both a mapstructure tag and a json tag with the same
// key, so the same key names apply to both encodings.
// NOTE(review): the meaning of Mode and Hash is not visible here; Hash is
// presumably consulted by NatsRPC.GetHashValue — confirm.
type ServiceGroup struct {
	Key     string `mapstructure:"key" json:"key"`
	Type    int    `mapstructure:"type" json:"type"`
	Version int    `mapstructure:"version" json:"version"`
	Mode    int    `mapstructure:"mode" json:"mode"`
	Hash    []int  `mapstructure:"hash"  json:"hash"`
}

ServiceGroup service group info

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL