client

package
v0.0.0-...-f2b9f5f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// Size limits for event types and subscription conditions.
//
// NOTE(review): the meanings below are inferred from the constant names only;
// the code that enforces them is not visible here — confirm the units
// (bytes vs. runes) and whether the bounds are inclusive.
//   - EVENT_TYPE_MAX_LENGTH: presumably the longest accepted event type string
//   - SUBSCRIPTION_CONDITION_MAX: presumably the most conditions per subscription
//   - SUBSCRIPTION_CONDITION_KEY_MAX_LENGTH: presumably the longest condition key
//   - SUBSCRIPTION_CONDITION_VALUE_MAX_LENGTH: presumably the longest condition value
const (
	EVENT_TYPE_MAX_LENGTH                   = 64
	SUBSCRIPTION_CONDITION_MAX              = 10
	SUBSCRIPTION_CONDITION_KEY_MAX_LENGTH   = 64
	SUBSCRIPTION_CONDITION_VALUE_MAX_LENGTH = 128
)

Variables

View Source
// Sentinel errors created with errors.New; callers should match them with
// errors.Is rather than comparing message text.
//
// NOTE(review): judging by the names and messages these presumably originate
// from the event buffer / session resume path — confirm against the callers.
var (
	ErrBufferClosed   = errors.New("event buffer closed")
	ErrNotRecoverable = errors.New("session is not recoverable")
)
View Source
// Sentinel errors for subscription state; callers should match them with
// errors.Is rather than comparing message text.
//
// NOTE(review): fmt.Errorf is used without any formatting verbs here, where
// errors.New would express the same thing (and match the other sentinel
// errors in this package) — consider switching if "errors" is imported.
var (
	ErrAlreadySubscribed = fmt.Errorf("already subscribed")
	ErrNotSubscribed     = fmt.Errorf("not subscribed")
)

Functions

func GenerateSessionID

func GenerateSessionID(n int) ([]byte, error)

func IsClientSentOp

func IsClientSentOp(op events.Opcode) bool

Types

type Cache

// Cache tracks dispatches by a uint32 key
//
// NOTE(review): h is presumably a hash identifying a dispatched event, used
// for de-duplication — confirm against the implementation behind NewCache
type Cache interface {
	// AddDispatch records h; the meaning of the returned bool is not visible
	// here (presumably whether h was newly added) — TODO confirm
	AddDispatch(h uint32) bool
	// HasDispatch reports whether h is currently recorded
	HasDispatch(h uint32) bool
	// ExpireDispatch removes h, or schedules its removal — TODO confirm which
	ExpireDispatch(h uint32)
}

func NewCache

func NewCache() Cache

type Connection

// Connection describes a single client session: its identity, lifecycle
// signals, outbound messaging and the per-session utilities attached to it
type Connection interface {
	// Context returns the connection's context.Context
	Context() context.Context
	// SessionID returns the hex-encoded ID of this session
	SessionID() string
	// Greet sends a Hello message to the client
	Greet(gctx global.Context) error
	// Read listens for incoming and outgoing events
	Read(gctx global.Context)
	// SendHeartbeat lets the client know that the connection is healthy
	SendHeartbeat() error
	// SendAck sends an Ack message to the client
	SendAck(cmd events.Opcode, data json.RawMessage) error
	// SendError publishes an error message to the client
	SendError(txt string, fields map[string]any)
	// Write sends a message to the client
	Write(msg events.Message[json.RawMessage]) error
	// Actor returns the authenticated user for this connection
	Actor() *structures.User
	// Handler returns a utility to handle commands for the connection
	Handler() Handler
	// Events returns the connection's EventMap
	Events() *EventMap
	// Cache returns the connection's cache utility
	Cache() Cache
	// Buffer returns the connection's event buffer utility for resuming the session
	Buffer() EventBuffer
	// OnReady returns a channel that is closed when the connection is ready
	OnReady() <-chan struct{}
	// OnClose returns a channel that is closed when the connection is closed
	OnClose() <-chan struct{}
	// SendClose sends a close frame with the specified code and ends the connection
	// NOTE(review): after is presumably a delay applied before closing — confirm
	SendClose(code events.CloseCode, after time.Duration)
	// SetWriter defines the connection's writable stream (SSE only)
	SetWriter(w *bufio.Writer, f http.Flusher)
	// Transport returns the name of the transport used by this connection
	Transport() Transport
}

type EventBuffer

// EventBuffer stores dispatched events and subscriptions for a session so
// they can be recovered later
type EventBuffer interface {
	// Context returns the buffer's context.Context
	Context() context.Context
	// Start begins tracking events and subscriptions
	Start(gctx global.Context) error
	// Push adds a dispatch message to the buffer
	Push(gctx global.Context, msg events.Message[events.DispatchPayload]) error
	// Recover retrieves the buffer from the previous session, returning the
	// buffered dispatch messages and the stored subscriptions
	Recover(gctx global.Context) (eventList []events.Message[events.DispatchPayload], subList []StoredSubscription, err error)
	// Cleanup clears out Redis keys
	Cleanup(gctx global.Context) error
}

EventBuffer handles the buffering of events and subscriptions in case a connection drops.

This allows a grace period during which a client may resume its session and recover missed events and subscriptions.

func NewEventBuffer

func NewEventBuffer(conn Connection, sessionID string, ttl time.Duration) EventBuffer

type EventChannel

// EventChannel holds subscription data and serializes to JSON under the keys
// "id", "conditions" and "properties".
//
// NOTE(review): ID, Conditions and Properties look like parallel slices (one
// entry per subscription at the same index) — confirm before relying on it.
type EventChannel struct {
	ID         []uint32                      `json:"id"`
	Conditions []events.EventCondition       `json:"conditions"`
	Properties []EventSubscriptionProperties `json:"properties"`
	// contains filtered or unexported fields
}

func (EventChannel) Match

func (ec EventChannel) Match(cond []events.EventCondition) []uint32

type EventMap

// EventMap is an opaque container of event channels, looked up by
// events.EventType through Get. It has no exported fields; create one with
// NewEventMap and use its methods.
type EventMap struct {
	// contains filtered or unexported fields
}

func NewEventMap

func NewEventMap(sessionID string) *EventMap

func (*EventMap) Count

func (e *EventMap) Count() int32

func (*EventMap) Destroy

func (e *EventMap) Destroy(gctx global.Context)

func (*EventMap) DispatchChannel

func (e *EventMap) DispatchChannel() chan []byte

func (*EventMap) Get

func (e *EventMap) Get(t events.EventType) (*EventChannel, bool)

func (*EventMap) Subscribe

Subscribe sets up a subscription to dispatch events with the specified type

func (*EventMap) Unsubscribe

func (e *EventMap) Unsubscribe(gctx global.Context, t events.EventType, cond map[string]string) (uint32, error)

func (*EventMap) UnsubscribeWithID

func (e *EventMap) UnsubscribeWithID(id ...uint32) error

type EventSubscriptionProperties

// EventSubscriptionProperties carries per-subscription metadata.
//
// NOTE(review): TTL is a time.Time, so it is presumably an absolute expiry
// instant rather than a duration — confirm. The meaning of Auto is not
// visible here (presumably marks subscriptions created automatically rather
// than by an explicit client request) — confirm. The fields carry no JSON
// tags, so they marshal under their Go names "TTL" and "Auto".
type EventSubscriptionProperties struct {
	TTL  time.Time
	Auto bool
}

type Handler

// Handler processes commands and events for a connection; obtain one with
// NewHandler or Connection.Handler
type Handler interface {
	// Subscribe handles a subscribe message
	// NOTE(review): the error is the first return value, contrary to the Go
	// convention of returning it last; the meaning of the bool is not visible
	// here — confirm against the implementation before use
	Subscribe(gctx global.Context, m events.Message[json.RawMessage]) (error, bool)
	// Unsubscribe handles an unsubscribe message
	Unsubscribe(gctx global.Context, m events.Message[json.RawMessage]) error
	// OnDispatch handles a dispatch message
	OnDispatch(gctx global.Context, msg events.Message[events.DispatchPayload])
	// OnResume handles a resume message
	OnResume(gctx global.Context, msg events.Message[json.RawMessage]) error
	// OnBridge handles a bridge message
	OnBridge(gctx global.Context, msg events.Message[json.RawMessage]) error
}

func NewHandler

func NewHandler(conn Connection) Handler

type StoredSubscription

// StoredSubscription pairs an event type with its EventChannel and serializes
// to JSON under the keys "type" and "channel". EventBuffer.Recover returns a
// slice of these.
type StoredSubscription struct {
	Type    events.EventType `json:"type"`
	Channel EventChannel     `json:"channel"`
}

type Transport

// Transport is the name of the transport used by a connection, as returned by
// Connection.Transport.
type Transport string

// Known transport names.
const (
	TransportWebSocket   Transport = "WebSocket"
	TransportEventStream Transport = "EventStream"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL