sync2

package
v0.99.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2024 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const AccountDataGlobalRoom = ""

Variables

View Source
var HTTP401 error = fmt.Errorf("HTTP 401")
View Source
var ProxyVersion = ""

Functions

This section is empty.

Types

type Client

type Client interface {
	// Versions fetches and parses the list of Matrix versions that the homeserver
	// advertises itself as supporting.
	Versions(ctx context.Context) (version []string, err error)
	// WhoAmI asks the homeserver to lookup the access token using the CSAPI /whoami
	// endpoint. The response must contain a device ID (meaning that we assume the
	// homeserver supports Matrix >= 1.1.)
	WhoAmI(ctx context.Context, accessToken string) (userID, deviceID string, err error)
	DoSyncV2(ctx context.Context, accessToken, since string, isFirst bool, toDeviceOnly bool) (*SyncResponse, int, error)
}

type Device

type Device struct {
	UserID   string `db:"user_id"`
	DeviceID string `db:"device_id"`
	Since    string `db:"since"`
}

type DeviceDataTicker added in v0.99.4

type DeviceDataTicker struct {
	// contains filtered or unexported fields
}

This struct remembers user+device IDs to notify for then periodically emits them all to the caller. Use to rate limit the frequency of device list updates.

func NewDeviceDataTicker added in v0.99.4

func NewDeviceDataTicker(d time.Duration) *DeviceDataTicker

Create a new device data ticker, which batches calls to Remember and invokes a callback every d duration. If d is 0, no batching is performed and the callback is invoked synchronously, which is useful for testing.

func (*DeviceDataTicker) Remember added in v0.99.4

func (t *DeviceDataTicker) Remember(pid PollerID)

Remember this user/device ID, and emit it later on.

func (*DeviceDataTicker) Run added in v0.99.4

func (t *DeviceDataTicker) Run()

Blocks forever, ticking until Stop() is called.

func (*DeviceDataTicker) SetCallback added in v0.99.4

func (t *DeviceDataTicker) SetCallback(fn func(payload *pubsub.V2DeviceData))

Set the function which should be called when the tick happens.

func (*DeviceDataTicker) Stop added in v0.99.4

func (t *DeviceDataTicker) Stop()

Stop ticking.

type DevicesTable added in v0.99.3

type DevicesTable struct {
	// contains filtered or unexported fields
}

DevicesTable remembers syncv2 since positions per-device

func NewDevicesTable added in v0.99.3

func NewDevicesTable(db *sqlx.DB) *DevicesTable

func (*DevicesTable) FindOldDevices added in v0.99.6

func (t *DevicesTable) FindOldDevices(inactivityPeriod time.Duration) (devices []Device, err error)

FindOldDevices fetches the user_id and device_id of all devices which haven't /synced for at least as long as the given inactivityPeriod. Such devices are returned in no particular order.

This is determined using the syncv3_sync2_tokens.last_seen column, which is updated at most once per day to save DB throughtput (see TokensTable.MaybeUpdateLastSeen). The caller should therefore use an inactivityPeriod of at least two days to avoid considering a recently-used device as old.

func (*DevicesTable) InsertDevice added in v0.99.3

func (t *DevicesTable) InsertDevice(txn *sqlx.Tx, userID, deviceID string) error

InsertDevice creates a new devices row with a blank since token if no such row exists. Otherwise, it does nothing.

func (*DevicesTable) UpdateDeviceSince added in v0.99.3

func (t *DevicesTable) UpdateDeviceSince(userID, deviceID, since string) error

type EventsResponse

type EventsResponse struct {
	Events []json.RawMessage `json:"events"`
}

type HTTPClient

type HTTPClient struct {
	Client            *http.Client
	LongTimeoutClient *http.Client
	DestinationServer string
}

HTTPClient represents a Sync v2 Client. One client can be shared among many users.

func NewHTTPClient added in v0.99.11

func NewHTTPClient(shortTimeout, longTimeout time.Duration, destHomeServer string) *HTTPClient

func (*HTTPClient) DoSyncV2

func (v *HTTPClient) DoSyncV2(ctx context.Context, accessToken, since string, isFirst, toDeviceOnly bool) (*SyncResponse, int, error)

DoSyncV2 performs a sync v2 request. Returns the sync response and the response status code or an error. Set isFirst=true on the first sync to force a timeout=0 sync to ensure snapiness.

func (*HTTPClient) Versions added in v0.99.11

func (v *HTTPClient) Versions(ctx context.Context) ([]string, error)

func (*HTTPClient) WhoAmI

func (v *HTTPClient) WhoAmI(ctx context.Context, accessToken string) (string, string, error)

Return sync2.HTTP401 if this request returns 401

type IPollerMap added in v0.99.3

type IPollerMap interface {
	EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (created bool, err error)
	NumPollers() int
	Terminate()
	DeviceIDs(userID string) []string
	// ExpirePollers requests that the given pollers are terminated as if their access
	// tokens had expired. Returns the number of pollers successfully terminated.
	ExpirePollers(ids []PollerID) int
}

type PendingTransactionIDs added in v0.99.5

type PendingTransactionIDs struct {
	// contains filtered or unexported fields
}

PendingTransactionIDs is (conceptually) a map from event IDs to a list of device IDs. Its keys are the IDs of event we've seen which a) lack a transaction ID, and b) were sent by one of the users we are polling for. The values are the list of the sender's devices whose pollers are yet to see a transaction ID.

If another poller sees the same event

  • with a transaction ID, it emits a V2TransactionID payload with that ID and removes the event ID from this map.

  • without a transaction ID, it removes the polling device ID from the values list. If the device ID list is now empty, the poller emits an "all clear" V2TransactionID payload.

This is a best-effort affair to ensure that the rest of the proxy can wait for transaction IDs to appear before transmitting an event down /sync to its sender.

It's possible that we add an entry to this map and then the list of remaining device IDs becomes out of date, either due to a new device creation or an existing device expiring. We choose not to handle this case, because it is relatively rare.

To avoid the map growing without bound, we use a ttlcache and drop entries after a short period of time.

func NewPendingTransactionIDs added in v0.99.5

func NewPendingTransactionIDs(loader loaderFunc) *PendingTransactionIDs

func (*PendingTransactionIDs) MissingTxnID added in v0.99.5

func (c *PendingTransactionIDs) MissingTxnID(eventID, userID, myDeviceID string) (bool, error)

MissingTxnID should be called to report that this device ID did not see a transaction ID for this event ID. Returns true if this is the first time we know for sure that we'll never see a txn ID for this event.

func (*PendingTransactionIDs) SeenTxnID added in v0.99.5

func (c *PendingTransactionIDs) SeenTxnID(eventID string) error

SeenTxnID should be called to report that this device saw a transaction ID for this event.

type PollerID added in v0.99.3

type PollerID struct {
	UserID   string
	DeviceID string
}

type PollerMap

type PollerMap struct {
	Pollers map[PollerID]*poller
	// contains filtered or unexported fields
}

PollerMap is a map of device ID to Poller

func NewPollerMap

func NewPollerMap(v2Client Client, enablePrometheus bool) *PollerMap

NewPollerMap makes a new PollerMap. Guarantees that the V2DataReceiver will be called on the same goroutine for all pollers. This is required to avoid race conditions at the Go level. Whilst we use SQL transactions to ensure that the DB doesn't race, we then subsequently feed new events from that call into a global cache. This can race which can result in out of order latest NIDs which, if we assert NIDs only increment, will result in missed events.

Consider these events in the same room, with 3 different pollers getting the data:

1 2 3 4 5 6 7 eventual DB event NID
A B C D E F G
-----          poll loop 1 = A,B,C          new events = A,B,C latest=3
---------      poll loop 2 = A,B,C,D,E      new events = D,E   latest=5
-------------  poll loop 3 = A,B,C,D,E,F,G  new events = F,G   latest=7

The DB layer will correctly assign NIDs and stop duplicates, resulting in a set of new events which do not overlap. However, there is a gap between this point and updating the cache, where variable delays can be introduced, so F,G latest=7 could be injected first. If we then never walk back to earlier NIDs, A,B,C,D,E will be dropped from the cache.

This only affects resources which are shared across multiple DEVICES such as:

  • room resources: events, EDUs
  • user resources: notif counts, account data

NOT to-device messages,or since tokens.

func (*PollerMap) Accumulate

func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) (err error)

func (*PollerMap) AddToDeviceMessages

func (h *PollerMap) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) error

Add messages for this device. If an error is returned, the poll loop is terminated as continuing would implicitly acknowledge these messages.

func (*PollerMap) DeviceIDs added in v0.99.5

func (h *PollerMap) DeviceIDs(userID string) []string

DeviceIDs returns the slice of all devices currently being polled for by this user. The return value is brand-new and is fully owned by the caller.

func (*PollerMap) EnsurePolling

func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (bool, error)

EnsurePolling makes sure there is a poller for this device, making one if need be. Blocks until at least 1 sync is done if and only if the poller was just created. This ensures that calls to the database will return data. Guarantees only 1 poller will be running per deviceID. Note that we will immediately return if there is a poller for the same user but a different device. We do this to allow for logins on clients to be snappy fast, even though they won't yet have the to-device msgs to decrypt E2EE rooms.

func (*PollerMap) ExpirePollers added in v0.99.6

func (h *PollerMap) ExpirePollers(pids []PollerID) int

func (*PollerMap) Initialise

func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (err error)

func (*PollerMap) NumPollers

func (h *PollerMap) NumPollers() (count int)

func (*PollerMap) OnAccountData

func (h *PollerMap) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) (err error)

func (*PollerMap) OnE2EEData

func (h *PollerMap) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) error

func (*PollerMap) OnExpiredToken added in v0.99.2

func (h *PollerMap) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)

func (*PollerMap) OnInvite

func (h *PollerMap) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) (err error)

func (*PollerMap) OnLeftRoom

func (h *PollerMap) OnLeftRoom(ctx context.Context, userID, roomID string, leaveEvent json.RawMessage) (err error)

func (*PollerMap) OnReceipt

func (h *PollerMap) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage)

func (*PollerMap) OnTerminated

func (h *PollerMap) OnTerminated(ctx context.Context, pollerID PollerID)

func (*PollerMap) SetCallbacks

func (h *PollerMap) SetCallbacks(callbacks V2DataReceiver)

func (*PollerMap) SetTyping

func (h *PollerMap) SetTyping(ctx context.Context, pollerID PollerID, roomID string, ephEvent json.RawMessage)

func (*PollerMap) Terminate

func (h *PollerMap) Terminate()

Terminate all pollers. Useful in tests.

func (*PollerMap) UpdateDeviceSince

func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)

func (*PollerMap) UpdateUnreadCounts

func (h *PollerMap) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)

type Storage

type Storage struct {
	DevicesTable *DevicesTable
	TokensTable  *TokensTable
	DB           *sqlx.DB
}

func NewStore

func NewStore(postgresURI, secret string) *Storage

func NewStoreWithDB added in v0.99.4

func NewStoreWithDB(db *sqlx.DB, secret string) *Storage

func (*Storage) Teardown

func (s *Storage) Teardown()

type SyncResponse

type SyncResponse struct {
	NextBatch   string         `json:"next_batch"`
	AccountData EventsResponse `json:"account_data"`
	Presence    struct {
		Events []json.RawMessage `json:"events,omitempty"`
	} `json:"presence"`
	Rooms       SyncRoomsResponse `json:"rooms"`
	ToDevice    EventsResponse    `json:"to_device"`
	DeviceLists struct {
		Changed []string `json:"changed,omitempty"`
		Left    []string `json:"left,omitempty"`
	} `json:"device_lists"`
	DeviceListsOTKCount          map[string]int `json:"device_one_time_keys_count,omitempty"`
	DeviceUnusedFallbackKeyTypes []string       `json:"device_unused_fallback_key_types,omitempty"`
}

type SyncRoomsResponse

type SyncRoomsResponse struct {
	Join   map[string]SyncV2JoinResponse   `json:"join"`
	Invite map[string]SyncV2InviteResponse `json:"invite"`
	Leave  map[string]SyncV2LeaveResponse  `json:"leave"`
}

type SyncV2InviteResponse

type SyncV2InviteResponse struct {
	InviteState EventsResponse `json:"invite_state"`
}

InviteResponse represents a /sync response for a room which is under the 'invite' key.

type SyncV2JoinResponse

type SyncV2JoinResponse struct {
	State               EventsResponse      `json:"state"`
	Timeline            TimelineResponse    `json:"timeline"`
	Ephemeral           EventsResponse      `json:"ephemeral"`
	AccountData         EventsResponse      `json:"account_data"`
	UnreadNotifications UnreadNotifications `json:"unread_notifications"`
}

JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.

type SyncV2LeaveResponse

type SyncV2LeaveResponse struct {
	State struct {
		Events []json.RawMessage `json:"events"`
	} `json:"state"`
	Timeline struct {
		Events    []json.RawMessage `json:"events"`
		Limited   bool              `json:"limited"`
		PrevBatch string            `json:"prev_batch,omitempty"`
	} `json:"timeline"`
}

LeaveResponse represents a /sync response for a room which is under the 'leave' key.

type TimelineResponse

type TimelineResponse struct {
	Events    []json.RawMessage `json:"events"`
	Limited   bool              `json:"limited"`
	PrevBatch string            `json:"prev_batch,omitempty"`
}

type Token added in v0.99.3

type Token struct {
	AccessToken          string
	AccessTokenHash      string
	AccessTokenEncrypted string    `db:"token_encrypted"`
	UserID               string    `db:"user_id"`
	DeviceID             string    `db:"device_id"`
	LastSeen             time.Time `db:"last_seen"`
}

type TokenForPoller added in v0.99.3

type TokenForPoller struct {
	*Token
	Since string `db:"since"`
}

TokenForPoller represents a row of the tokens table, together with any data maintained by pollers for that token's device.

type TokensTable added in v0.99.3

type TokensTable struct {
	// contains filtered or unexported fields
}

TokensTable remembers sync v2 tokens

func NewTokensTable added in v0.99.3

func NewTokensTable(db *sqlx.DB, secret string) *TokensTable

NewTokensTable creates the syncv3_sync2_tokens table if it does not already exist.

func (*TokensTable) Delete added in v0.99.3

func (t *TokensTable) Delete(accessTokenHash string) error

Delete looks up a token by its hash and deletes the row. If no token exists with the given hash, a warning is logged but no error is returned.

func (*TokensTable) GetTokenAndSince added in v0.99.3

func (t *TokensTable) GetTokenAndSince(userID, deviceID, tokenHash string) (accessToken, since string, err error)

func (*TokensTable) Insert added in v0.99.3

func (t *TokensTable) Insert(txn *sqlx.Tx, plaintextToken, userID, deviceID string, lastSeen time.Time) (*Token, error)

Insert a new token into the table.

func (*TokensTable) MaybeUpdateLastSeen added in v0.99.3

func (t *TokensTable) MaybeUpdateLastSeen(token *Token, newLastSeen time.Time) error

MaybeUpdateLastSeen actions a request to update a Token struct with its last_seen value in the DB. To avoid spamming the DB with a write every time a sync3 request arrives, we only update the last seen timestamp or the if it is at least 24 hours old. The timestamp is updated on the Token struct if and only if it is updated in the DB.

func (*TokensTable) Token added in v0.99.3

func (t *TokensTable) Token(plaintextToken string) (*Token, error)

Token retrieves a tokens row from the database if it exists. Errors with sql.NoRowsError if the token does not exist. Errors with an unspecified error otherwise.

func (*TokensTable) TokenForEachDevice added in v0.99.3

func (t *TokensTable) TokenForEachDevice(txn *sqlx.Tx) (tokens []TokenForPoller, err error)

TokenForEachDevice loads the most recently used token for each device. If given a transaction, it will SELECT inside that transaction.

type UnreadNotifications

type UnreadNotifications struct {
	HighlightCount    *int `json:"highlight_count,omitempty"`
	NotificationCount *int `json:"notification_count,omitempty"`
}

type V2DataReceiver

type V2DataReceiver interface {
	// Update the since token for this device. Called AFTER all other data in this sync response has been processed.
	UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
	// Accumulate data for this room. This means the timeline section of the v2 response.
	// Return an error to stop the since token advancing.
	Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) error // latest pos with event nids of timeline entries
	// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
	// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
	// Return an error to stop the since token advancing.
	Initialise(ctx context.Context, roomID string, state []json.RawMessage) error // snapshot ID?
	// SetTyping indicates which users are typing.
	SetTyping(ctx context.Context, pollerID PollerID, roomID string, ephEvent json.RawMessage)
	// Sent when there is a new receipt
	OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage)
	// AddToDeviceMessages adds this chunk of to_device messages. Preserve the ordering.
	// Return an error to stop the since token advancing.
	AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) error
	// UpdateUnreadCounts sets the highlight_count and notification_count for this user in this room.
	UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)
	// Set the latest account data for this user.
	// Return an error to stop the since token advancing.
	OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) error // ping update with types? Can you race when re-querying?
	// Sent when there is a room in the `invite` section of the v2 response.
	// Return an error to stop the since token advancing.
	OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) error // invitestate in db
	// Sent when there is a room in the `leave` section of the v2 response.
	// Return an error to stop the since token advancing.
	OnLeftRoom(ctx context.Context, userID, roomID string, leaveEvent json.RawMessage) error
	// Sent when there is a _change_ in E2EE data, not all the time
	OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) error
	// Sent when the poll loop terminates
	OnTerminated(ctx context.Context, pollerID PollerID)
	// Sent when the token gets a 401 response
	OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)
}

V2DataReceiver is the receiver for all the v2 sync data the poller gets

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL