sync3

package
v0.7.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OpSync       = "SYNC"
	OpInvalidate = "INVALIDATE"
	OpInsert     = "INSERT"
	OpDelete     = "DELETE"
)
View Source
const DispatcherAllUsers = "-"

Variables

View Source
var (
	SortByName              = "by_name"
	SortByRecency           = "by_recency"
	SortByNotificationLevel = "by_notification_level"
	SortByNotificationCount = "by_notification_count" // deprecated
	SortByHighlightCount    = "by_highlight_count"    // deprecated
	SortBy                  = []string{SortByHighlightCount, SortByName, SortByNotificationCount, SortByRecency, SortByNotificationLevel}

	Wildcard     = "*"
	StateKeyLazy = "$LAZY"
	StateKeyMe   = "$ME"

	DefaultTimelineLimit = int64(20)
	DefaultTimeoutMSecs  = 10 * 1000 // 10s
)

Functions

This section is empty.

Types

type Conn

type Conn struct {
	ConnID ConnID
	// contains filtered or unexported fields
}

Conn is an abstraction of a long-poll connection. It automatically handles the position values of the /sync request, including sending cached data in the event of retries. It does not handle the contents of the data at all.

func NewConn

func NewConn(connID ConnID, h ConnHandler) *Conn

func (*Conn) Alive

func (c *Conn) Alive() bool

func (*Conn) OnIncomingRequest

func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request) (resp *Response, herr *internal.HandlerError)

OnIncomingRequest advances the clients position in the stream, returning the response position and data.

func (*Conn) UserID added in v0.3.1

func (c *Conn) UserID() string

type ConnHandler

type ConnHandler interface {
	// Callback which is allowed to block as long as the context is active. Return the response
	// to send back or an error. Errors of type *internal.HandlerError are inspected for the correct
	// status code to send back.
	OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, isInitial bool) (*Response, error)
	UserID() string
	Destroy()
	Alive() bool
}

type ConnID

type ConnID struct {
	DeviceID string
}

func (*ConnID) String

func (c *ConnID) String() string

type ConnMap

type ConnMap struct {
	// contains filtered or unexported fields
}

ConnMap stores a collection of Conns.

func NewConnMap

func NewConnMap() *ConnMap

func (*ConnMap) CloseConn

func (m *ConnMap) CloseConn(connID ConnID)

func (*ConnMap) Conn

func (m *ConnMap) Conn(cid ConnID) *Conn

Conn returns a connection with this ConnID. Returns nil if no connection exists.

func (*ConnMap) CreateConn

func (m *ConnMap) CreateConn(cid ConnID, newConnHandler func() ConnHandler) (*Conn, bool)

Atomically gets or creates a connection with this connection ID. Calls newConn if a new connection is required.

func (*ConnMap) Len added in v0.5.1

func (m *ConnMap) Len() int

func (*ConnMap) Teardown added in v0.5.0

func (m *ConnMap) Teardown()

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatches live events to caches

func NewDispatcher

func NewDispatcher() *Dispatcher

func (*Dispatcher) IsUserJoined

func (d *Dispatcher) IsUserJoined(userID, roomID string) bool

func (*Dispatcher) OnEphemeralEvent added in v0.7.3

func (d *Dispatcher) OnEphemeralEvent(roomID string, ephEvent json.RawMessage)

func (*Dispatcher) OnNewEvents

func (d *Dispatcher) OnNewEvents(
	roomID string, events []json.RawMessage, latestPos int64,
)

Called by v2 pollers when we receive new events

func (*Dispatcher) OnNewInitialRoomState added in v0.5.2

func (d *Dispatcher) OnNewInitialRoomState(roomID string, state []json.RawMessage)

Called by v2 pollers when we receive an initial state block. Very similar to OnNewEvents but done in bulk for speed.

func (*Dispatcher) Register

func (d *Dispatcher) Register(userID string, r Receiver) error

func (*Dispatcher) Startup

func (d *Dispatcher) Startup(roomToJoinedUsers map[string][]string) error

Load joined members into the dispatcher. MUST BE CALLED BEFORE V2 POLL LOOPS START.

func (*Dispatcher) Unregister

func (d *Dispatcher) Unregister(userID string)

type FilteredSortableRooms

type FilteredSortableRooms struct {
	*SortableRooms
	// contains filtered or unexported fields
}

FilteredSortableRooms is SortableRooms but where rooms are filtered before being added to the list. Updates to room metadata may result in rooms being added/removed.

func NewFilteredSortableRooms

func NewFilteredSortableRooms(finder RoomFinder, roomIDs []string, filter *RequestFilters) *FilteredSortableRooms

func (*FilteredSortableRooms) Add

func (f *FilteredSortableRooms) Add(roomID string) bool

type InternalRequestLists added in v0.2.0

type InternalRequestLists struct {
	// contains filtered or unexported fields
}

InternalRequestLists is a list of lists which matches each index position in the request JSON 'lists'. It contains all the internal metadata for rooms and controls access and updatings of said lists.

func NewInternalRequestLists added in v0.3.1

func NewInternalRequestLists() *InternalRequestLists

func (*InternalRequestLists) AssignList added in v0.2.0

func (s *InternalRequestLists) AssignList(index int, filters *RequestFilters, sort []string, shouldOverwrite OverwriteVal) (*FilteredSortableRooms, bool)

Assign a new list at the given index. If Overwrite, any existing list is replaced. If DoNotOverwrite, the existing list is returned if one exists, else a new list is created. Returns the list and true if the list was overwritten.

func (*InternalRequestLists) Count added in v0.2.0

func (s *InternalRequestLists) Count(index int) int

Count returns the count of total rooms in this list

func (*InternalRequestLists) DeleteList added in v0.2.0

func (s *InternalRequestLists) DeleteList(index int)

func (*InternalRequestLists) Get added in v0.4.1

func (s *InternalRequestLists) Get(listIndex int) *FilteredSortableRooms

func (*InternalRequestLists) Len added in v0.2.0

func (s *InternalRequestLists) Len() int

func (*InternalRequestLists) RemoveRoom added in v0.2.0

func (s *InternalRequestLists) RemoveRoom(roomID string)

Remove a room from all lists e.g retired an invite, left a room

func (*InternalRequestLists) Room added in v0.4.1

func (s *InternalRequestLists) Room(roomID string) *RoomConnMetadata

func (*InternalRequestLists) SetRoom added in v0.4.1

func (s *InternalRequestLists) SetRoom(r RoomConnMetadata) (delta RoomDelta)

type JoinedRoomsTracker

type JoinedRoomsTracker struct {
	// contains filtered or unexported fields
}

Tracks who is joined to which rooms. This is critical from a security perspective in order to ensure that only the users joined to the room receive events in that room. Consider the situation where Alice and Bob are joined to room X. If Alice gets kicked from X, the proxy server will still receive messages for room X due to Bob being joined to the room. We therefore need to decide which active connections should be pushed events, which is what this tracker does.

func NewJoinedRoomsTracker

func NewJoinedRoomsTracker() *JoinedRoomsTracker

func (*JoinedRoomsTracker) IsUserJoined

func (t *JoinedRoomsTracker) IsUserJoined(userID, roomID string) bool

func (*JoinedRoomsTracker) JoinedRoomsForUser

func (t *JoinedRoomsTracker) JoinedRoomsForUser(userID string) []string

func (*JoinedRoomsTracker) JoinedUsersForRoom

func (t *JoinedRoomsTracker) JoinedUsersForRoom(roomID string, filter func(userID string) bool) (matchedUserIDs []string, joinCount int)

JoinedUsersForRoom returns the joined users in the given room, filtered by the filter function if provided. If one is not provided, all joined users are returned. Returns the join count at the time this function was called.

func (*JoinedRoomsTracker) NumInvitedUsersForRoom added in v0.4.1

func (t *JoinedRoomsTracker) NumInvitedUsersForRoom(roomID string) int

func (*JoinedRoomsTracker) Startup added in v0.5.2

func (t *JoinedRoomsTracker) Startup(roomToJoinedUsers map[string][]string)

Startup efficiently sets up the joined rooms tracker, but isn't safe to call with live traffic, as it replaces all known in-memory state. Panics if called on a non-empty tracker.

func (*JoinedRoomsTracker) UserJoinedRoom

func (t *JoinedRoomsTracker) UserJoinedRoom(userID, roomID string) bool

returns true if the state changed

func (*JoinedRoomsTracker) UserLeftRoom

func (t *JoinedRoomsTracker) UserLeftRoom(userID, roomID string)

func (*JoinedRoomsTracker) UsersInvitedToRoom added in v0.5.2

func (t *JoinedRoomsTracker) UsersInvitedToRoom(userIDs []string, roomID string)

func (*JoinedRoomsTracker) UsersJoinedRoom added in v0.5.2

func (t *JoinedRoomsTracker) UsersJoinedRoom(userIDs []string, roomID string) bool

returns true if the state changed for any user in userIDs

type List added in v0.4.1

type List interface {
	IndexOf(roomID string) (int, bool)
	Len() int64
	Sort(sortBy []string) error
	Add(roomID string) bool
	Remove(roomID string) int
	Get(index int) string
}

type ListOp added in v0.4.1

type ListOp uint8

ListOp represents the possible operations on a list

var (
	// The room is added to the list
	ListOpAdd ListOp = 1
	// The room is removed from the list
	ListOpDel ListOp = 2
	// The room may change position in the list
	ListOpChange ListOp = 3
)

type OverwriteVal added in v0.2.0

type OverwriteVal bool
var (
	DoNotOverwrite OverwriteVal = false
	Overwrite      OverwriteVal = true
)

type Receiver

type Receiver interface {
	OnNewEvent(event *caches.EventData)
	OnEphemeralEvent(roomID string, ephEvent json.RawMessage)
	OnRegistered(latestPos int64) error
}

type Request

type Request struct {
	TxnID             string                      `json:"txn_id"`
	Lists             []RequestList               `json:"lists"`
	RoomSubscriptions map[string]RoomSubscription `json:"room_subscriptions"`
	UnsubscribeRooms  []string                    `json:"unsubscribe_rooms"`
	Extensions        extensions.Request          `json:"extensions"`
	// contains filtered or unexported fields
}

func (*Request) ApplyDelta

func (r *Request) ApplyDelta(nextReq *Request) (result *Request, delta *RequestDelta)

Apply this delta on top of the request. Returns a new Request with the combined output, along with the delta operations `nextReq` cannot be nil, but `r` can be nil in the case of an initial request.

func (*Request) GetTimelineLimit

func (r *Request) GetTimelineLimit(listIndex int, roomID string) int64

func (*Request) Same

func (r *Request) Same(other *Request) bool

func (*Request) SetPos

func (r *Request) SetPos(pos int64)

func (*Request) SetTimeoutMSecs

func (r *Request) SetTimeoutMSecs(timeout int)

func (*Request) TimeoutMSecs

func (r *Request) TimeoutMSecs() int

type RequestDelta added in v0.2.0

type RequestDelta struct {
	// new room IDs to subscribe to
	Subs []string
	// room IDs to unsubscribe from
	Unsubs []string
	// The complete union of both lists (contains max(a,b) lists)
	Lists []RequestListDelta
}

Internal struct used to represent the diffs between 2 requests

type RequestFilters

type RequestFilters struct {
	Spaces         []string  `json:"spaces"`
	IsDM           *bool     `json:"is_dm"`
	IsEncrypted    *bool     `json:"is_encrypted"`
	IsInvite       *bool     `json:"is_invite"`
	IsTombstoned   *bool     `json:"is_tombstoned"` // deprecated
	RoomTypes      []*string `json:"room_types"`
	NotRoomTypes   []*string `json:"not_room_types"`
	RoomNameFilter string    `json:"room_name_like"`
	Tags           []string  `json:"tags"`
	NotTags        []string  `json:"not_tags"`
}

func (*RequestFilters) Include

func (rf *RequestFilters) Include(r *RoomConnMetadata, finder RoomFinder) bool

type RequestList

type RequestList struct {
	RoomSubscription
	Ranges          SliceRanges     `json:"ranges"`
	Sort            []string        `json:"sort"`
	Filters         *RequestFilters `json:"filters"`
	SlowGetAllRooms *bool           `json:"slow_get_all_rooms,omitempty"`
}

func (*RequestList) CalculateMoveIndexes added in v0.2.0

func (rl *RequestList) CalculateMoveIndexes(fromIndex, toIndex int) (fromTos [][2]int)

Calculate the real from -> to index positions for the two input index positions. This takes into account the ranges on the list.

func (*RequestList) FiltersChanged added in v0.2.0

func (rl *RequestList) FiltersChanged(next *RequestList) bool

func (*RequestList) ShouldGetAllRooms added in v0.2.0

func (rl *RequestList) ShouldGetAllRooms() bool

func (*RequestList) SortOrderChanged added in v0.2.0

func (rl *RequestList) SortOrderChanged(next *RequestList) bool

func (*RequestList) WriteDeleteOp added in v0.2.0

func (rl *RequestList) WriteDeleteOp(deletedIndex int) *ResponseOpSingle

Write a delete operation for this list. Can return nil for invalid indexes or if this index isn't being tracked. Useful when rooms are removed from the list e.g left rooms.

func (*RequestList) WriteInsertOp added in v0.2.0

func (rl *RequestList) WriteInsertOp(insertedIndex int, roomID string) *ResponseOpSingle

Write an insert operation for this list. Can return nil for indexes not being tracked. Useful when rooms are added to the list e.g newly joined rooms.

func (*RequestList) WriteSwapOp added in v0.2.0

func (rl *RequestList) WriteSwapOp(
	roomID string, fromIndex, toIndex int,
) []ResponseOp

Move a room from an absolute index position to another absolute position. These positions do not need to be inside a valid range. Returns 0-2 operations. For example:

1,2,3,4,5 tracking range [0,4]
3 bumps to top -> 3,1,2,4,5 -> DELETE index=2, INSERT val=3 index=0
7 bumps to top -> 7,1,2,3,4 -> DELETE index=4, INSERT val=7 index=0
7 bumps to op again -> 7,1,2,3,4 -> no-op as from == to index
new room 8 in i=5 -> 7,1,2,3,4,8 -> no-op as 8 is outside the range.

Returns the list of ops as well as the new toIndex if it wasn't inside a range.

type RequestListDelta added in v0.2.0

type RequestListDelta struct {
	// What was there before, nullable
	Prev *RequestList
	// What is there now, nullable. Combined result.
	Curr *RequestList
}

Internal struct used to represent a single list delta.

type Response

type Response struct {
	Lists []ResponseList `json:"lists"`

	Rooms      map[string]Room     `json:"rooms"`
	Extensions extensions.Response `json:"extensions"`

	Pos     string `json:"pos"`
	TxnID   string `json:"txn_id,omitempty"`
	Session string `json:"session_id,omitempty"`
}

func (*Response) ListOps added in v0.2.0

func (r *Response) ListOps() int

func (*Response) PosInt

func (r *Response) PosInt() int64

func (*Response) UnmarshalJSON

func (r *Response) UnmarshalJSON(b []byte) error

Custom unmarshal so we can dynamically create the right ResponseOp for Ops

type ResponseList added in v0.2.0

type ResponseList struct {
	Ops   []ResponseOp `json:"ops,omitempty"`
	Count int          `json:"count"`
}

type ResponseOp

type ResponseOp interface {
	Op() string
	// which rooms are we giving data about
	IncludedRoomIDs() []string
}

func CalculateListOps added in v0.4.1

func CalculateListOps(reqList *RequestList, list List, roomID string, listOp ListOp) (ops []ResponseOp, subs []string)

CalculateListOps contains the core list moving algorithm. It accepts the client's list of ranges, the underlying list on which to perform operations on, and the room which was modified and in what way. It returns a list of INSERT/DELETE operations, which may be zero length, as well as which rooms are newly added into the window.

A,B,C,D,E,F,G,H,I        <-- List
`----`    `----`         <-- RequestList.Ranges
DEL E | ADD J | CHANGE C <-- ListOp RoomID

returns:

[ {op:DELETE, index:2}, {op:INSERT, index:0, room_id:A} ] <--- []ResponseOp
[ "A" ] <--- []string, new room subscriptions, if it wasn't in the window before

This function will modify List to Add/Delete/Sort appropriately.

type ResponseOpRange

type ResponseOpRange struct {
	Operation string   `json:"op"`
	Range     []int64  `json:"range,omitempty"`
	RoomIDs   []string `json:"room_ids,omitempty"`
}

func (*ResponseOpRange) IncludedRoomIDs

func (r *ResponseOpRange) IncludedRoomIDs() []string

func (*ResponseOpRange) Op

func (r *ResponseOpRange) Op() string

type ResponseOpSingle

type ResponseOpSingle struct {
	Operation string `json:"op"`
	Index     *int   `json:"index,omitempty"` // 0 is a valid value, hence *int
	RoomID    string `json:"room_id,omitempty"`
}

func (*ResponseOpSingle) IncludedRoomIDs

func (r *ResponseOpSingle) IncludedRoomIDs() []string

func (*ResponseOpSingle) Op

func (r *ResponseOpSingle) Op() string

type Room

type Room struct {
	Name              string            `json:"name,omitempty"`
	RequiredState     []json.RawMessage `json:"required_state,omitempty"`
	Timeline          []json.RawMessage `json:"timeline,omitempty"`
	InviteState       []json.RawMessage `json:"invite_state,omitempty"`
	NotificationCount int64             `json:"notification_count"`
	HighlightCount    int64             `json:"highlight_count"`
	Initial           bool              `json:"initial,omitempty"`
	IsDM              bool              `json:"is_dm,omitempty"`
	JoinedCount       int               `json:"joined_count,omitempty"`
	InvitedCount      int               `json:"invited_count,omitempty"`
	PrevBatch         string            `json:"prev_batch,omitempty"`
	NumLive           int               `json:"num_live,omitempty"`
}

type RoomConnMetadata

type RoomConnMetadata struct {
	internal.RoomMetadata
	caches.UserRoomData
}

type RoomDelta added in v0.4.1

type RoomDelta struct {
	RoomNameChanged    bool
	JoinCountChanged   bool
	InviteCountChanged bool
	Lists              []RoomListDelta
}

type RoomFinder added in v0.4.1

type RoomFinder interface {
	Room(roomID string) *RoomConnMetadata
}

type RoomListDelta added in v0.4.1

type RoomListDelta struct {
	ListIndex int
	Op        ListOp
}

type RoomSubscription

type RoomSubscription struct {
	RequiredState   [][2]string       `json:"required_state"`
	TimelineLimit   int64             `json:"timeline_limit"`
	IncludeOldRooms *RoomSubscription `json:"include_old_rooms"`
}

func (RoomSubscription) Combine added in v0.2.0

Combine this subcription with another, returning a union of both as a copy.

func (RoomSubscription) LazyLoadMembers added in v0.7.0

func (rs RoomSubscription) LazyLoadMembers() bool

func (RoomSubscription) RequiredStateMap added in v0.2.0

func (rs RoomSubscription) RequiredStateMap(userID string) *internal.RequiredStateMap

Calculate the required state map for this room subscription. Given event types A,B,C and state keys 1,2,3, the following Venn diagrams are possible:

.---------[*,*]----------.
|      .---------.       |
|      |   A,2   | A,3   |
| .----+--[B,*]--+-----. |
| |    | .-----. |     | |
| |B,1 | | B,2 | | B,3 | |
| |    | `[B,2]` |     | |
| `----+---------+-----` |
|      |   C,2   | C,3   |
|      `--[*,2]--`       |
`------------------------`

The largest set will be used when returning the required state map. For example, [B,2] + [B,*] = [B,*] because [B,*] encompasses [B,2]. This means [*,*] encompasses everything. 'userID' is the ID of the user performing this request, so $ME can be replaced.

type SliceRanges

type SliceRanges [][2]int64

func (SliceRanges) ClosestInDirection added in v0.2.0

func (r SliceRanges) ClosestInDirection(i int64, towardsZero bool) (closestIndex int64)

ClosestInDirection returns the index position of a range bound that is closest to `i`, heading either towards 0 or towards infinity. If there is no range boundary in that direction, -1 is returned. For example:

[0,20] i=25,towardsZero=true => 20
[0,20] i=15,towardsZero=true => 0
[0,20] i=15,towardsZero=false => 20
[0,20] i=25,towardsZero=false => -1
[0,20],[40,60] i=25,towardsZero=true => 20
[0,20],[40,60] i=25,towardsZero=false => 40
[0,20],[40,60] i=40,towardsZero=true => 40
[20,40] i=40,towardsZero=true => 20

func (SliceRanges) Delta

func (r SliceRanges) Delta(next SliceRanges) (added SliceRanges, removed SliceRanges, same SliceRanges)

Delta returns the ranges which are unchanged, added and removed. Intelligently handles overlaps.

func (SliceRanges) Inside

func (r SliceRanges) Inside(i int64) ([2]int64, bool)

Inside returns true if i is inside the range

func (SliceRanges) SliceInto

func (r SliceRanges) SliceInto(slice Subslicer) []Subslicer

Slice into this range, returning subslices of slice

func (SliceRanges) Valid

func (r SliceRanges) Valid() bool

type SortableRooms

type SortableRooms struct {
	// contains filtered or unexported fields
}

SortableRooms represents a list of rooms which can be sorted and updated. Maintains mappings of room IDs to current index positions after sorting.

func NewSortableRooms

func NewSortableRooms(finder RoomFinder, rooms []string) *SortableRooms

func (*SortableRooms) Add

func (s *SortableRooms) Add(roomID string) bool

Add a room to the list. Returns true if the room was added.

func (*SortableRooms) Get

func (s *SortableRooms) Get(index int) string

func (*SortableRooms) IndexOf

func (s *SortableRooms) IndexOf(roomID string) (int, bool)

func (*SortableRooms) Len

func (s *SortableRooms) Len() int64

func (*SortableRooms) Remove

func (s *SortableRooms) Remove(roomID string) int

func (*SortableRooms) RoomIDs

func (s *SortableRooms) RoomIDs() []string

func (*SortableRooms) Sort

func (s *SortableRooms) Sort(sortBy []string) error

func (*SortableRooms) Subslice

func (s *SortableRooms) Subslice(i, j int64) Subslicer

type Subslicer

type Subslicer interface {
	Len() int64
	Subslice(i, j int64) Subslicer
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL