Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateSessionID(n int) ([]byte, error)
- func IsClientSentOp(op events.Opcode) bool
- type Cache
- type Connection
- type EventBuffer
- type EventChannel
- type EventMap
- func (e *EventMap) Count() int32
- func (e *EventMap) Destroy(gctx global.Context)
- func (e *EventMap) DispatchChannel() chan []byte
- func (e *EventMap) Get(t events.EventType) (*EventChannel, bool)
- func (e *EventMap) Subscribe(gctx global.Context, ctx context.Context, t events.EventType, ...) (EventChannel, uint32, error)
- func (e *EventMap) Unsubscribe(gctx global.Context, t events.EventType, cond map[string]string) (uint32, error)
- func (e *EventMap) UnsubscribeWithID(id ...uint32) error
- type EventSubscriptionProperties
- type Handler
- type StoredSubscription
- type Transport
Constants ¶
View Source
const ( EVENT_TYPE_MAX_LENGTH = 64 SUBSCRIPTION_CONDITION_MAX = 10 SUBSCRIPTION_CONDITION_KEY_MAX_LENGTH = 64 SUBSCRIPTION_CONDITION_VALUE_MAX_LENGTH = 128 )
Variables ¶
View Source
var ( ErrBufferClosed = errors.New("event buffer closed") ErrNotRecoverable = errors.New("session is not recoverable") )
View Source
var ( ErrAlreadySubscribed = fmt.Errorf("already subscribed") ErrNotSubscribed = fmt.Errorf("not subscribed") )
Functions ¶
func GenerateSessionID ¶
func IsClientSentOp ¶
Types ¶
type Cache ¶
type Connection ¶
type Connection interface { Context() context.Context // SessionID retrieves the hex-encoded ID of this session SessionID() string // Greet sends a Hello message to the client Greet(gctx global.Context) error // Read listens for incoming and outgoing events Read(gctx global.Context) // SendHeartbeat lets the client know that the connection is healthy SendHeartbeat() error // SendAck sends an Ack message to the client SendAck(cmd events.Opcode, data json.RawMessage) error // SendError publishes an error message to the client SendError(txt string, fields map[string]any) // Write sends a message to the client Write(msg events.Message[json.RawMessage]) error // Actor returns the authenticated user for this connection Actor() *structures.User // Handler returns a utility to handle commands for the connection Handler() Handler // Events returns the connection's EventMap Events() *EventMap // Cache returns the connection's cache utility Cache() Cache // Buffer returns the connection's event buffer utility for resuming the session Buffer() EventBuffer // OnReady returns a channel that is closed when the connection is ready OnReady() <-chan struct{} // OnClose returns a channel that is closed when the connection is closed OnClose() <-chan struct{} // SendClose sends a close frame with the specified code and ends the connection SendClose(code events.CloseCode, after time.Duration) // SetWriter defines the connection's writable stream (SSE only) SetWriter(w *bufio.Writer, f http.Flusher) // Transport returns the name of the transport used by this connection Transport() Transport }
type EventBuffer ¶
type EventBuffer interface { Context() context.Context // Start begins tracking events and subscriptions Start(gctx global.Context) error // Push messages to the buffer Push(gctx global.Context, msg events.Message[events.DispatchPayload]) error // Recover retrieves the buffer from the previous session Recover(gctx global.Context) (eventList []events.Message[events.DispatchPayload], subList []StoredSubscription, err error) // Cleanup clears out redis keys Cleanup(gctx global.Context) error }
EventBuffer handles the buffering of events and subscriptions in the event that a connection drops.
This allows a grace period during which a client may resume its session and recover missed events and subscriptions.
func NewEventBuffer ¶
func NewEventBuffer(conn Connection, sessionID string, ttl time.Duration) EventBuffer
type EventChannel ¶
type EventChannel struct { ID []uint32 `json:"id"` Conditions []events.EventCondition `json:"conditions"` Properties []EventSubscriptionProperties `json:"properties"` // contains filtered or unexported fields }
func (EventChannel) Match ¶
func (ec EventChannel) Match(cond []events.EventCondition) []uint32
type EventMap ¶
type EventMap struct {
// contains filtered or unexported fields
}
func NewEventMap ¶
func (*EventMap) DispatchChannel ¶
func (*EventMap) Subscribe ¶
func (e *EventMap) Subscribe( gctx global.Context, ctx context.Context, t events.EventType, cond events.EventCondition, props EventSubscriptionProperties, ) (EventChannel, uint32, error)
Subscribe sets up a subscription to dispatch events of the specified type.
func (*EventMap) Unsubscribe ¶
func (*EventMap) UnsubscribeWithID ¶
type Handler ¶
type Handler interface { Subscribe(gctx global.Context, m events.Message[json.RawMessage]) (error, bool) Unsubscribe(gctx global.Context, m events.Message[json.RawMessage]) error OnDispatch(gctx global.Context, msg events.Message[events.DispatchPayload]) OnResume(gctx global.Context, msg events.Message[json.RawMessage]) error OnBridge(gctx global.Context, msg events.Message[json.RawMessage]) error }
func NewHandler ¶
func NewHandler(conn Connection) Handler
type StoredSubscription ¶
type StoredSubscription struct { Type events.EventType `json:"type"` Channel EventChannel `json:"channel"` }
Click to show internal directories.
Click to hide internal directories.