sync2

package
v0.7.4
Published: Dec 6, 2022 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Constants

const AccountDataGlobalRoom = ""

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	WhoAmI(accessToken string) (string, error)
	DoSyncV2(ctx context.Context, accessToken, since string, isFirst bool) (*SyncResponse, int, error)
}

type Device

type Device struct {
	UserID               string `db:"user_id"`
	DeviceID             string `db:"device_id"`
	Since                string `db:"since"`
	AccessToken          string
	AccessTokenEncrypted string `db:"v2_token_encrypted"`
}

type E2EEFetcher

type E2EEFetcher interface {
	DeviceData(userID, deviceID string, isInitial bool) *internal.DeviceData
}

E2EEFetcher is the fetcher used by the E2EE extension.

type EventsResponse

type EventsResponse struct {
	Events []json.RawMessage `json:"events"`
}

type HTTPClient

type HTTPClient struct {
	Client            *http.Client
	DestinationServer string
}

HTTPClient represents a Sync v2 Client. One client can be shared among many users.

func (*HTTPClient) DoSyncV2

func (v *HTTPClient) DoSyncV2(ctx context.Context, accessToken, since string, isFirst bool) (*SyncResponse, int, error)

DoSyncV2 performs a sync v2 request. It returns the sync response and the response status code, or an error. Set isFirst=true on the first sync to force a timeout=0 sync, which ensures snappiness.
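The effect of isFirst on the request can be sketched as follows. This is an illustration only, not the package's implementation: buildSyncURL is a hypothetical helper, and the endpoint path and the 30-second long-poll value are assumptions. The only behaviour taken from the documentation is that the first sync uses timeout=0.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildSyncURL is a hypothetical helper, not part of sync2. It shows one way
// the isFirst flag could translate into the timeout query parameter.
func buildSyncURL(server, since string, isFirst bool) string {
	q := url.Values{}
	if isFirst {
		// A zero timeout asks the homeserver to respond immediately.
		q.Set("timeout", "0")
	} else {
		// Assumed long-poll duration in milliseconds.
		q.Set("timeout", "30000")
	}
	if since != "" {
		q.Set("since", since)
	}
	// The path is an assumption; Encode sorts parameters by key.
	return server + "/_matrix/client/v3/sync?" + q.Encode()
}

func main() {
	fmt.Println(buildSyncURL("https://example.org", "", true))
	fmt.Println(buildSyncURL("https://example.org", "s72594_4483", false))
}
```

A zero timeout trades one potentially empty response for a fast first reply; later requests can long-poll.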

func (*HTTPClient) WhoAmI

func (v *HTTPClient) WhoAmI(accessToken string) (string, error)

type PollerMap

type PollerMap struct {
	Pollers map[string]*poller // device_id -> poller
	// contains filtered or unexported fields
}

PollerMap is a map of device ID to Poller

func NewPollerMap

func NewPollerMap(v2Client Client, enablePrometheus bool) *PollerMap

NewPollerMap makes a new PollerMap. It guarantees that the V2DataReceiver will be called on the same goroutine for all pollers. This is required to avoid race conditions at the Go level. Although SQL transactions ensure that the DB doesn't race, the new events from that call are subsequently fed into a global cache. That step can race, which can result in out-of-order latest NIDs; if we assert that NIDs only increment, this will result in missed events.

Consider these events in the same room, with 3 different pollers getting the data:

1 2 3 4 5 6 7  eventual DB event NID
A B C D E F G
-----          poll loop 1 = A,B,C          new events = A,B,C latest=3
---------      poll loop 2 = A,B,C,D,E      new events = D,E   latest=5
-------------  poll loop 3 = A,B,C,D,E,F,G  new events = F,G   latest=7

The DB layer will correctly assign NIDs and stop duplicates, resulting in a set of new events which do not overlap. However, there is a gap between this point and updating the cache, where variable delays can be introduced, so F,G latest=7 could be injected first. If we then never walk back to earlier NIDs, A,B,C,D,E will be dropped from the cache.
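The dropped-event scenario can be reproduced with a toy cache. This is a minimal sketch under stated assumptions: the cache type and its add method are hypothetical and only model the "NIDs only increment" assertion, not the real global cache.

```go
package main

import "fmt"

// cache is a hypothetical stand-in for the global cache. It only accepts a
// batch whose latest NID is higher than anything it has seen so far.
type cache struct {
	latest int
	events []string
}

// add applies a batch and reports whether it was accepted.
func (c *cache) add(latest int, events ...string) bool {
	if latest <= c.latest {
		return false // stale batch: rejected because NIDs must only increment
	}
	c.latest = latest
	c.events = append(c.events, events...)
	return true
}

func main() {
	c := &cache{}
	// Poll loop 3 happens to reach the cache first.
	fmt.Println(c.add(7, "F", "G"))      // true
	fmt.Println(c.add(3, "A", "B", "C")) // false
	fmt.Println(c.add(5, "D", "E"))      // false
	fmt.Println(c.events)                // [F G]
}
```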

This only affects resources which are shared across multiple DEVICES such as:

  • room resources: events, EDUs
  • user resources: notif counts, account data

It does NOT affect to-device messages or since tokens.
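One way to obtain the same-goroutine guarantee is to funnel every callback through a single executor goroutine. The sketch below is an assumption about the general technique, not the package's code: serialiser, do, stop and runPollers are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// serialiser is a hypothetical helper: it runs every submitted callback on one
// goroutine, so callbacks never execute concurrently with each other.
type serialiser struct {
	work chan func()
	done chan struct{}
}

func newSerialiser() *serialiser {
	s := &serialiser{work: make(chan func()), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for fn := range s.work {
			fn()
		}
	}()
	return s
}

// do blocks until fn has run on the executor goroutine.
func (s *serialiser) do(fn func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	s.work <- func() {
		defer wg.Done()
		fn()
	}
	wg.Wait()
}

// stop shuts the executor down once all submitted work has finished.
func (s *serialiser) stop() {
	close(s.work)
	<-s.done
}

// runPollers simulates n pollers, each delivering one update to shared state.
func runPollers(n int) int {
	s := newSerialiser()
	total := 0 // shared state, only ever touched on the executor goroutine
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.do(func() { total++ })
		}()
	}
	wg.Wait()
	s.stop()
	return total
}

func main() {
	fmt.Println(runPollers(100)) // prints 100
}
```

If the database write and the cache update both happen inside one submitted callback, no other poller's callback can interleave between them.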

func (*PollerMap) Accumulate

func (h *PollerMap) Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage)

func (*PollerMap) AddToDeviceMessages

func (h *PollerMap) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage)

Add messages for this device. If an error is returned, the poll loop is terminated as continuing would implicitly acknowledge these messages.

func (*PollerMap) EnsurePolling

func (h *PollerMap) EnsurePolling(accessToken, userID, deviceID, v2since string, logger zerolog.Logger)

EnsurePolling makes sure there is a poller for this user, creating one if need be. It blocks until at least one sync is done if and only if the poller was just created, which ensures that calls to the database will return data. It guarantees that only one poller will be running per deviceID. Note that it returns immediately if there is a poller for the same user but a different device. This keeps logins on clients snappy, even though they won't yet have the to-device messages needed to decrypt E2EE rooms.
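The blocking rules described above can be sketched with a mutex-guarded map and a channel that is closed after the first sync. This is a simplified model, not the package's implementation: pollerSet and ensure are hypothetical, and the firstSync callback stands in for a real sync request.

```go
package main

import (
	"fmt"
	"sync"
)

// pollerSet is a hypothetical, simplified stand-in for PollerMap.
type pollerSet struct {
	mu      sync.Mutex
	users   map[string]bool          // users that already have a poller
	pollers map[string]chan struct{} // device ID -> closed after first sync
}

func newPollerSet() *pollerSet {
	return &pollerSet{users: map[string]bool{}, pollers: map[string]chan struct{}{}}
}

// ensure starts at most one poller per device. It waits for the first sync
// only when the poller is new and the user has no poller on another device.
func (p *pollerSet) ensure(userID, deviceID string, firstSync func()) (created, waited bool) {
	p.mu.Lock()
	if _, ok := p.pollers[deviceID]; ok {
		p.mu.Unlock()
		return false, false // already polling for this device
	}
	userKnown := p.users[userID]
	ready := make(chan struct{})
	p.pollers[deviceID] = ready
	p.users[userID] = true
	p.mu.Unlock()

	go func() {
		firstSync()
		close(ready) // a real poller would keep looping after this point
	}()

	if userKnown {
		return true, false // another device already populated shared data
	}
	<-ready
	return true, true
}

func main() {
	p := newPollerSet()
	fmt.Println(p.ensure("@alice:example.org", "DEV1", func() {})) // true true
	fmt.Println(p.ensure("@alice:example.org", "DEV2", func() {})) // true false
	fmt.Println(p.ensure("@alice:example.org", "DEV1", func() {})) // false false
}
```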

func (*PollerMap) Initialise

func (h *PollerMap) Initialise(roomID string, state []json.RawMessage)

func (*PollerMap) NumPollers added in v0.5.1

func (h *PollerMap) NumPollers() (count int)

func (*PollerMap) OnAccountData

func (h *PollerMap) OnAccountData(userID, roomID string, events []json.RawMessage)

func (*PollerMap) OnE2EEData added in v0.5.0

func (h *PollerMap) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)

func (*PollerMap) OnInvite

func (h *PollerMap) OnInvite(userID, roomID string, inviteState []json.RawMessage)

func (*PollerMap) OnLeftRoom added in v0.4.1

func (h *PollerMap) OnLeftRoom(userID, roomID string)

func (*PollerMap) OnReceipt added in v0.7.3

func (h *PollerMap) OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage)

func (*PollerMap) OnTerminated added in v0.5.1

func (h *PollerMap) OnTerminated(userID, deviceID string)

func (*PollerMap) SetCallbacks added in v0.5.0

func (h *PollerMap) SetCallbacks(callbacks V2DataReceiver)

func (*PollerMap) SetTyping

func (h *PollerMap) SetTyping(roomID string, ephEvent json.RawMessage)

func (*PollerMap) Terminate added in v0.5.0

func (h *PollerMap) Terminate()

Terminate all pollers. Useful in tests.

func (*PollerMap) UpdateDeviceSince

func (h *PollerMap) UpdateDeviceSince(deviceID, since string)

func (*PollerMap) UpdateUnreadCounts

func (h *PollerMap) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage remembers sync v2 tokens per-device

func NewStore

func NewStore(postgresURI, secret string) *Storage

func (*Storage) AllDevices added in v0.2.0

func (s *Storage) AllDevices() (devices []Device, err error)

func (*Storage) Device

func (s *Storage) Device(deviceID string) (*Device, error)

func (*Storage) InsertDevice

func (s *Storage) InsertDevice(deviceID, accessToken string) (*Device, error)

func (*Storage) Teardown added in v0.5.0

func (s *Storage) Teardown()

func (*Storage) UpdateDeviceSince

func (s *Storage) UpdateDeviceSince(deviceID, since string) error

func (*Storage) UpdateUserIDForDevice

func (s *Storage) UpdateUserIDForDevice(deviceID, userID string) error

type SyncResponse

type SyncResponse struct {
	NextBatch   string         `json:"next_batch"`
	AccountData EventsResponse `json:"account_data"`
	Presence    struct {
		Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
	} `json:"presence"`
	Rooms       SyncRoomsResponse `json:"rooms"`
	ToDevice    EventsResponse    `json:"to_device"`
	DeviceLists struct {
		Changed []string `json:"changed,omitempty"`
		Left    []string `json:"left,omitempty"`
	} `json:"device_lists"`
	DeviceListsOTKCount          map[string]int `json:"device_one_time_keys_count,omitempty"`
	DeviceUnusedFallbackKeyTypes []string       `json:"device_unused_fallback_key_types,omitempty"`
}
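As an illustration of how the JSON tags above map onto a sync v2 response body, the sketch below decodes a small sample. The lower-case types are trimmed local mirrors defined only to keep the example self-contained, and the sample payload is invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed local mirrors of the package types, limited to a few fields.
type eventsResponse struct {
	Events []json.RawMessage `json:"events"`
}

type timelineResponse struct {
	Events    []json.RawMessage `json:"events"`
	Limited   bool              `json:"limited"`
	PrevBatch string            `json:"prev_batch,omitempty"`
}

type joinResponse struct {
	Timeline timelineResponse `json:"timeline"`
}

type syncResponse struct {
	NextBatch string `json:"next_batch"`
	Rooms     struct {
		Join map[string]joinResponse `json:"join"`
	} `json:"rooms"`
	ToDevice eventsResponse `json:"to_device"`
}

// sample is an invented payload, not captured from a real homeserver.
const sample = `{
  "next_batch": "s2",
  "rooms": {"join": {"!room:example.org": {"timeline": {
    "events": [{"type": "m.room.message"}],
    "limited": true,
    "prev_batch": "p1"
  }}}},
  "to_device": {"events": [{"type": "m.room_key"}]}
}`

// decode parses a response body into the trimmed struct.
func decode(body string) (*syncResponse, error) {
	var res syncResponse
	if err := json.Unmarshal([]byte(body), &res); err != nil {
		return nil, err
	}
	return &res, nil
}

func main() {
	res, err := decode(sample)
	if err != nil {
		panic(err)
	}
	tl := res.Rooms.Join["!room:example.org"].Timeline
	// prints: s2 1 true p1 1
	fmt.Println(res.NextBatch, len(tl.Events), tl.Limited, tl.PrevBatch, len(res.ToDevice.Events))
}
```

Keeping events as json.RawMessage defers parsing of event bodies, so the decoder does not need to know every event type.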

type SyncRoomsResponse

type SyncRoomsResponse struct {
	Join   map[string]SyncV2JoinResponse   `json:"join"`
	Invite map[string]SyncV2InviteResponse `json:"invite"`
	Leave  map[string]SyncV2LeaveResponse  `json:"leave"`
}

type SyncV2InviteResponse

type SyncV2InviteResponse struct {
	InviteState EventsResponse `json:"invite_state"`
}

SyncV2InviteResponse represents a /sync response for a room which is under the 'invite' key.

type SyncV2JoinResponse

type SyncV2JoinResponse struct {
	State               EventsResponse      `json:"state"`
	Timeline            TimelineResponse    `json:"timeline"`
	Ephemeral           EventsResponse      `json:"ephemeral"`
	AccountData         EventsResponse      `json:"account_data"`
	UnreadNotifications UnreadNotifications `json:"unread_notifications"`
}

SyncV2JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.

type SyncV2LeaveResponse

type SyncV2LeaveResponse struct {
	State struct {
		Events []json.RawMessage `json:"events"`
	} `json:"state"`
	Timeline struct {
		Events    []json.RawMessage `json:"events"`
		Limited   bool              `json:"limited"`
		PrevBatch string            `json:"prev_batch,omitempty"`
	} `json:"timeline"`
}

SyncV2LeaveResponse represents a /sync response for a room which is under the 'leave' key.

type TimelineResponse

type TimelineResponse struct {
	Events    []json.RawMessage `json:"events"`
	Limited   bool              `json:"limited"`
	PrevBatch string            `json:"prev_batch,omitempty"`
}

type TransactionIDFetcher

type TransactionIDFetcher interface {
	TransactionIDForEvents(userID string, eventIDs []string) (eventIDToTxnID map[string]string)
}

type UnreadNotifications

type UnreadNotifications struct {
	HighlightCount    *int `json:"highlight_count,omitempty"`
	NotificationCount *int `json:"notification_count,omitempty"`
}
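With pointer fields, a count that is missing from the JSON decodes to nil, while an explicit zero decodes to a pointer to 0. The sketch below demonstrates that distinction using a local copy of the struct; describe is a hypothetical helper, and whether the package relies on this distinction is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unreadNotifications is a local copy of the struct above.
type unreadNotifications struct {
	HighlightCount    *int `json:"highlight_count,omitempty"`
	NotificationCount *int `json:"notification_count,omitempty"`
}

// describe reports whether notification_count was present in the body.
func describe(body string) string {
	var u unreadNotifications
	if err := json.Unmarshal([]byte(body), &u); err != nil {
		return "invalid: " + err.Error()
	}
	if u.NotificationCount == nil {
		return "notification_count absent"
	}
	return fmt.Sprintf("notification_count=%d", *u.NotificationCount)
}

func main() {
	fmt.Println(describe(`{}`))                       // notification_count absent
	fmt.Println(describe(`{"notification_count":0}`)) // notification_count=0
}
```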

type V2DataReceiver

type V2DataReceiver interface {
	// Update the since token for this device. Called AFTER all other data in this sync response has been processed.
	UpdateDeviceSince(deviceID, since string)
	// Accumulate data for this room. This means the timeline section of the v2 response.
	Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
	// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
	Initialise(roomID string, state []json.RawMessage) // snapshot ID?
	// SetTyping indicates which users are typing.
	SetTyping(roomID string, ephEvent json.RawMessage)
	// Sent when there is a new receipt
	OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage)
	// AddToDeviceMessages adds this chunk of to_device messages. Preserve the ordering.
	AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
	// UpdateUnreadCounts sets the highlight_count and notification_count for this user in this room.
	UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)
	// Set the latest account data for this user.
	OnAccountData(userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
	// Sent when there is a room in the `invite` section of the v2 response.
	OnInvite(userID, roomID string, inviteState []json.RawMessage) // invitestate in db
	// Sent when there is a room in the `leave` section of the v2 response.
	OnLeftRoom(userID, roomID string)
	// Sent when there is a _change_ in E2EE data, not all the time
	OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
	// Sent when the upstream homeserver sends back a 401 invalidating the token
	OnTerminated(userID, deviceID string)
}

V2DataReceiver is the receiver for all the v2 sync data the poller gets. There are two concrete implementations of this interface:

  • The sync2 code which dumps this data into a database and issues a NOTIFY
  • The sync3 code which LISTENs and repopulates this data and passes it through to handlers
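The ordering rule for UpdateDeviceSince (called after all other data in the response has been processed) can be sketched with a recording receiver. The receiver interface here is a trimmed, hypothetical subset of V2DataReceiver, and dispatch and demo are illustrative helpers rather than package functions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// receiver is a trimmed, hypothetical subset of V2DataReceiver.
type receiver interface {
	UpdateDeviceSince(deviceID, since string)
	Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage)
	AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage)
}

// recorder logs the order in which callbacks are invoked.
type recorder struct{ calls []string }

func (r *recorder) UpdateDeviceSince(deviceID, since string) {
	r.calls = append(r.calls, "since:"+since)
}

func (r *recorder) Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage) {
	r.calls = append(r.calls, "accumulate:"+roomID)
}

func (r *recorder) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
	r.calls = append(r.calls, "to_device")
}

// dispatch feeds one response into the receiver. The order of the earlier
// calls is arbitrary in this sketch; only "since token last" is the point.
func dispatch(rcv receiver, userID, deviceID, nextBatch string,
	toDevice []json.RawMessage, timelines map[string][]json.RawMessage) {
	if len(toDevice) > 0 {
		rcv.AddToDeviceMessages(userID, deviceID, toDevice)
	}
	for roomID, tl := range timelines {
		rcv.Accumulate(userID, roomID, "", tl)
	}
	// Persist the token only once everything else has been handled, so that
	// a crash before this point replays the same response.
	rcv.UpdateDeviceSince(deviceID, nextBatch)
}

// demo runs dispatch on an invented single-room response.
func demo() []string {
	r := &recorder{}
	ev := json.RawMessage(`{"type":"m.room.message"}`)
	dispatch(r, "@alice:example.org", "DEV1", "s2",
		[]json.RawMessage{ev},
		map[string][]json.RawMessage{"!room:example.org": {ev}})
	return r.calls
}

func main() {
	fmt.Println(demo()) // [to_device accumulate:!room:example.org since:s2]
}
```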
