redis

package
v0.0.0-...-c46648f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2024 License: MIT Imports: 15 Imported by: 0

README

redis adapter

see from socket.io-redis-adapter

1.communication method

1.1 Communication between nodes use the redis subscription model
1.2 And Multi-node layout uses nginx.see https://socket.io/zh-CN/docs/v4/reverse-proxy/#nginx or see nginx_config_test file;
This is just a simple example. For specific situations, you need to add logic to your project yourself.

error

_onpacket function no ack function reback in cluder node in the cluster

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	HandMessagePool sync.Pool
	RequestIdPool   sync.Pool
)

Functions

This section is empty.

Types

type AckRequest

type AckRequest interface {
	ClientCountCallback(clientCount uint64)
	Ack([]any, error)
}

type HandMessage

type HandMessage struct {
	LocalHandMessage
	Channal   chan any     `json:"channal"` // 接受其他节点反馈的内容通道 socket 和 data
	MsgCount  atomic.Int32 `json:"msg_count"`
	CloseFlag atomic.Int32 `json:"close_flag"` // 关闭 HandMessage channel 的标志
}

HandMessage message processing unit use sync.pool Recycle

func (*HandMessage) Recycle

func (h *HandMessage) Recycle()

type LocalHandMessage

type LocalHandMessage struct {
	// Uid is each service unique id
	Uid string `json:"uid"`
	// Sid is socket id
	Sid         socket.SocketId             `json:"sid"`
	Type        SocketDataType              `json:"type"`
	RequestId   string                      `json:"request_id"` // every request id
	Rooms       []socket.Room               `json:"rooms"`
	Opts        *socket.BroadcastOptions    `json:"opts"`
	Close       bool                        `json:"close"`
	Sockets     []RemoteSocket              `json:"sockets"` // bool or []socket.Socket
	SocketIds   *types.Set[socket.SocketId] `json:"socket_ids"`
	Packet      *parser.Packet              `json:"packet"`
	ClientCount uint64                      `json:"client_count"`
	Responses   []any                       `json:"responses"`
	Data        any                         `json:"data"`
}

this is the redis‘s information passed between channels

func (LocalHandMessage) MarshalJSON

func (l LocalHandMessage) MarshalJSON() ([]byte, error)

func (*LocalHandMessage) UnmarshalJSON

func (h *LocalHandMessage) UnmarshalJSON(data []byte) error

type LocalHandMessageJson

type LocalHandMessageJson struct {
	Uid       string          `json:"uid"`
	Sid       socket.SocketId `json:"sid"`
	Type      SocketDataType  `json:"type"`
	RequestId string          `json:"request_id"`
	Rooms     []socket.Room   `json:"rooms"`
	Opts      struct {
		Rooms  map[socket.Room]types.Void `json:"rooms,omitempty"`
		Except map[socket.Room]types.Void `json:"except,omitempty"`
		Flags  *socket.BroadcastFlags     `json:"flags,omitempty"`
	} `json:"opts"`
	Close       bool                           `json:"close"`
	Sockets     []RemoteSocket                 `json:"sockets"` // bool or []socket.Socket
	SocketIds   map[socket.SocketId]types.Void `json:"socket_ids"`
	Packet      *parser.Packet                 `json:"packet"`
	ClientCount uint64                         `json:"client_count"`
	Responses   []any                          `json:"responses"`
	Data        any                            `json:"data"`
}

type set can not json ,temporary processing @review

type Option

type Option func(*option)

func WithRedisAddress

func WithRedisAddress(ads string) Option

WithRedisAddress eg : 127.0.0.1:6379

func WithRedisDb

func WithRedisDb(db int) Option

func WithRedisHeartbeatInterval

func WithRedisHeartbeatInterval(tm int) Option

func WithRedisHeartbeatTimeout

func WithRedisHeartbeatTimeout(tm int) Option

type RedisAdapter

type RedisAdapter struct {
	events.EventEmitter

	// The number of ms between two heartbeats.
	// 5000
	HeartbeatInterval int

	// The number of ms without heartbeat before we consider a node down.
	// 10000
	HeartbeatTimeout int

	Subs  []*redis.PubSub
	PSubs []*redis.PubSub
	// contains filtered or unexported fields
}

func NewRedisAdapter

func NewRedisAdapter(opts ...Option) (*RedisAdapter, error)

func (*RedisAdapter) AddAll

func (r *RedisAdapter) AddAll(id socket.SocketId, rooms *types.Set[socket.Room])

Adds a socket to a list of room.

func (*RedisAdapter) AddSockets

func (r *RedisAdapter) AddSockets(opts *socket.BroadcastOptions, rooms []socket.Room)

Makes the matching socket instances join the specified rooms

func (*RedisAdapter) Broadcast

func (r *RedisAdapter) Broadcast(packet *parser.Packet, opts *socket.BroadcastOptions)

Broadcasts a packet.

Options:

  • `Flags` {*BroadcastFlags} flags for this packet
  • `Except` {*types.Set[Room]} sids that should be excluded
  • `Rooms` {*types.Set[Room]} list of rooms to broadcast to

func (*RedisAdapter) BroadcastWithAck

func (r *RedisAdapter) BroadcastWithAck(packet *parser.Packet, opts *socket.BroadcastOptions, clientCountCallback func(uint64), ack func([]any, error))

Broadcasts a packet and expects multiple acknowledgements.

Options:

  • `Flags` {*BroadcastFlags} flags for this packet
  • `Except` {*types.Set[Room]} sids that should be excluded
  • `Rooms` {*types.Set[Room]} list of rooms to broadcast to

func (*RedisAdapter) Close

func (r *RedisAdapter) Close()

To be overridden

func (*RedisAdapter) Del

func (r *RedisAdapter) Del(id socket.SocketId, room socket.Room)

Removes a socket from a room.

func (*RedisAdapter) DelAll

func (r *RedisAdapter) DelAll(id socket.SocketId)

Removes a socket from all rooms it's joined.

func (*RedisAdapter) DelSockets

func (r *RedisAdapter) DelSockets(opts *socket.BroadcastOptions, rooms []socket.Room)

Makes the matching socket instances leave the specified rooms

func (*RedisAdapter) DisconnectSockets

func (r *RedisAdapter) DisconnectSockets(opts *socket.BroadcastOptions, close bool)

Makes the matching socket instances disconnect

func (*RedisAdapter) FetchSockets

func (r *RedisAdapter) FetchSockets(opts *socket.BroadcastOptions) func(func([]socket.SocketDetails, error))

Returns the matching socket instances

func (*RedisAdapter) GetBroadcast

func (r *RedisAdapter) GetBroadcast() func(*parser.Packet, *socket.BroadcastOptions)

func (*RedisAdapter) Init

func (r *RedisAdapter) Init()

func (*RedisAdapter) New

func (*RedisAdapter) Nsp

func (*RedisAdapter) PersistSession

func (r *RedisAdapter) PersistSession(s *socket.SessionToPersist)

Save the client session in order to restore it upon reconnection.

func (*RedisAdapter) RestoreSession

func (r *RedisAdapter) RestoreSession(id socket.PrivateSessionId, pack string) (*socket.Session, error)

Restore the session and find the packets that were missed by the client.

func (*RedisAdapter) Rooms

func (*RedisAdapter) ServerCount

func (r *RedisAdapter) ServerCount() int64

Returns the number of Socket.IO servers in the cluster Number of subscriptions to requestChannel

func (*RedisAdapter) ServerSideEmit

func (r *RedisAdapter) ServerSideEmit(packet []any) error

Send a packet to the other Socket.IO servers in the cluster this is globe packet packet is append([]any{ev}, args...) this adapter does not support the ServerSideEmit() functionality

func (*RedisAdapter) SetBroadcast

func (r *RedisAdapter) SetBroadcast(broadcast func(*parser.Packet, *socket.BroadcastOptions))

func (*RedisAdapter) Sids

func (*RedisAdapter) SocketRooms

func (r *RedisAdapter) SocketRooms(id socket.SocketId) *types.Set[socket.Room]

Gets the list of rooms a given socket has joined.

func (*RedisAdapter) Sockets

func (r *RedisAdapter) Sockets(room *types.Set[socket.Room]) *types.Set[socket.SocketId]

Gets a list of sockets by sid.

type RemoteSocket

type RemoteSocket struct {
	Id        socket.SocketId         `json:"id"`
	Handshake *socket.Handshake       `json:"handshake"`
	Rooms     *types.Set[socket.Room] `json:"rooms"`
	Data      any                     `json:"data"`
}

type SocketDataType

type SocketDataType int
const (
	// MessageType
	INITIAL_HEARTBEAT SocketDataType = iota + 1
	HEARTBEAT
	BROADCAST
	SOCKETS_JOIN
	SOCKETS_LEAVE
	DISCONNECT_SOCKETS
	FETCH_SOCKETS
	FETCH_SOCKETS_RESPONSE
	SERVER_SIDE_EMIT
	SERVER_SIDE_EMIT_RESPONSE
	BROADCAST_CLIENT_COUNT
	BROADCAST_ACK

	// RequestType
	SOCKETS SocketDataType = iota + 1
	ALL_ROOMS
	REMOTE_JOIN
	REMOTE_LEAVE
	REMOTE_DISCONNECT
	REMOTE_FETCH
	Request_SERVER_SIDE_EMIT
	Request_BROADCAST
	Request_BROADCAST_CLIENT_COUNT
	Request_BROADCAST_ACK
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL