Skip to content

Commit

Permalink
Emit events during migration from libolm (#3982)
Browse files Browse the repository at this point in the history
* Fix `CryptoStore.countEndToEndSessions`

This was apparently never tested, and was implemented incorrectly.

* Add `CryptoStore.countEndToEndInboundGroupSessions`

* Emit events to indicate migration progress
  • Loading branch information
richvdh authored Jan 16, 2024
1 parent 815c36e commit 06e8cea
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 9 deletions.
25 changes: 23 additions & 2 deletions spec/integ/crypto/rust-crypto.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import "fake-indexeddb/auto";
import { IDBFactory } from "fake-indexeddb";
import fetchMock from "fetch-mock-jest";

import { createClient, IndexedDBCryptoStore } from "../../../src";
import { createClient, CryptoEvent, IndexedDBCryptoStore } from "../../../src";
import { populateStore } from "../../test-utils/test_indexeddb_cryptostore_dump";

jest.setTimeout(15000);
Expand Down Expand Up @@ -124,6 +124,9 @@ describe("MatrixClient.initRustCrypto", () => {
pickleKey: "+1k2Ppd7HIisUY824v7JtV3/oEE4yX0TqtmNPyhaD7o",
});

const progressListener = jest.fn();
matrixClient.addListener(CryptoEvent.LegacyCryptoStoreMigrationProgress, progressListener);

await matrixClient.initRustCrypto();

// Do some basic checks on the imported data
Expand All @@ -132,7 +135,25 @@ describe("MatrixClient.initRustCrypto", () => {
expect(deviceKeys.ed25519).toEqual("qK70DEqIXq7T+UU3v/al47Ab4JkMEBLpNrTBMbS5rrw");

expect(await matrixClient.getCrypto()!.getActiveSessionBackupVersion()).toEqual("7");
});

// check the progress callback
expect(progressListener.mock.calls.length).toBeGreaterThan(50);

// The first call should have progress == 0
const [firstProgress, totalSteps] = progressListener.mock.calls[0];
expect(totalSteps).toBeGreaterThan(3000);
expect(firstProgress).toEqual(0);

for (let i = 1; i < progressListener.mock.calls.length - 1; i++) {
const [progress, total] = progressListener.mock.calls[i];
expect(total).toEqual(totalSteps);
expect(progress).toBeGreaterThan(progressListener.mock.calls[i - 1][0]);
expect(progress).toBeLessThanOrEqual(totalSteps);
}

// The final call should have progress == total == -1
expect(progressListener).toHaveBeenLastCalledWith(-1, -1);
}, 60000);
});

describe("MatrixClient.clearStores", () => {
Expand Down
8 changes: 8 additions & 0 deletions spec/unit/crypto/store/CryptoStore.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ describe.each([
const N_SESSIONS_PER_DEVICE = 6;
await createSessions(N_DEVICES, N_SESSIONS_PER_DEVICE);

let nSessions = 0;
await store.doTxn("readonly", [IndexedDBCryptoStore.STORE_SESSIONS], (txn) =>
store.countEndToEndSessions(txn, (n) => (nSessions = n)),
);
expect(nSessions).toEqual(N_DEVICES * N_SESSIONS_PER_DEVICE);

// Then, get a batch and check it looks right.
const batch = await store.getEndToEndSessionsBatch();
expect(batch!.length).toEqual(N_DEVICES * N_SESSIONS_PER_DEVICE);
Expand Down Expand Up @@ -150,6 +156,8 @@ describe.each([
await store.markSessionsNeedingBackup([{ senderKey: pad43("device5"), sessionId: "session5" }], txn);
});

expect(await store.countEndToEndInboundGroupSessions()).toEqual(N_DEVICES * N_SESSIONS_PER_DEVICE);

const batch = await store.getEndToEndInboundGroupSessionsBatch();
expect(batch!.length).toEqual(N_DEVICES * N_SESSIONS_PER_DEVICE);
for (let i = 0; i < N_DEVICES; i++) {
Expand Down
5 changes: 5 additions & 0 deletions spec/unit/rust-crypto/rust-crypto.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ describe("initRustCrypto", () => {

fetchMock.get("path:/_matrix/client/v3/room_keys/version", { version: "45" });

function legacyMigrationProgressListener(progress: number, total: number): void {
logger.log(`migrated ${progress} of ${total}`);
}

await initRustCrypto({
logger,
http: makeMatrixHttpApi(),
Expand All @@ -204,6 +208,7 @@ describe("initRustCrypto", () => {
storePassphrase: "storePassphrase",
legacyCryptoStore: legacyStore,
legacyPickleKey: PICKLE_KEY,
legacyMigrationProgressListener,
});

// Check that the migration functions were correctly called
Expand Down
6 changes: 5 additions & 1 deletion src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,8 @@ type CryptoEvents =
| CryptoEvent.KeysChanged
| CryptoEvent.Warning
| CryptoEvent.DevicesUpdated
| CryptoEvent.WillUpdateDevices;
| CryptoEvent.WillUpdateDevices
| CryptoEvent.LegacyCryptoStoreMigrationProgress;

type MatrixEventEvents = MatrixEventEvent.Decrypted | MatrixEventEvent.Replaced | MatrixEventEvent.VisibilityChange;

Expand Down Expand Up @@ -2330,6 +2331,9 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
storePassphrase: this.pickleKey,
legacyCryptoStore: this.cryptoStore,
legacyPickleKey: this.pickleKey ?? "DEFAULT_KEY",
legacyMigrationProgressListener: (progress, total) => {
this.emit(CryptoEvent.LegacyCryptoStoreMigrationProgress, progress, total);
},
});

rustCrypto.setSupportedVerificationMethods(this.verificationMethods);
Expand Down
11 changes: 11 additions & 0 deletions src/crypto/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ export enum CryptoEvent {
WillUpdateDevices = "crypto.willUpdateDevices",
DevicesUpdated = "crypto.devicesUpdated",
KeysChanged = "crossSigning.keysChanged",

/**
* Fires when data is being migrated from legacy crypto to rust crypto.
*
* The payload is a pair `(progress, total)`, where `progress` is the number of steps completed so far, and
* `total` is the total number of steps. When migration is complete, a final instance of the event is emitted, with
* `progress === total === -1`.
*/
LegacyCryptoStoreMigrationProgress = "crypto.legacyCryptoStoreMigrationProgress",
}

export type CryptoEventHandlerMap = {
Expand Down Expand Up @@ -368,6 +377,8 @@ export type CryptoEventHandlerMap = {
*/
[CryptoEvent.DevicesUpdated]: (users: string[], initialFetch: boolean) => void;
[CryptoEvent.UserCrossSigningUpdated]: (userId: string) => void;

[CryptoEvent.LegacyCryptoStoreMigrationProgress]: (progress: number, total: number) => void;
};

export class Crypto extends TypedEventEmitter<CryptoEvent, CryptoEventHandlerMap> implements CryptoBackend {
Expand Down
7 changes: 7 additions & 0 deletions src/crypto/store/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ export interface CryptoStore {
txn: unknown,
): void;

/**
* Count the number of Megolm sessions in the database.
*
* @internal
*/
countEndToEndInboundGroupSessions(): Promise<number>;

/**
* Get a batch of Megolm sessions from the database.
*
Expand Down
19 changes: 19 additions & 0 deletions src/crypto/store/indexeddb-crypto-store-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,25 @@ export class Backend implements CryptoStore {
});
}

/**
* Count the number of Megolm sessions in the database.
*
* Implementation of {@link CryptoStore.countEndToEndInboundGroupSessions}.
*
* @internal
*/
public async countEndToEndInboundGroupSessions(): Promise<number> {
let result = 0;
await this.doTxn("readonly", [IndexedDBCryptoStore.STORE_INBOUND_GROUP_SESSIONS], (txn) => {
const sessionStore = txn.objectStore(IndexedDBCryptoStore.STORE_INBOUND_GROUP_SESSIONS);
const countReq = sessionStore.count();
countReq.onsuccess = (): void => {
result = countReq.result;
};
});
return result;
}

/**
* Fetch a batch of Megolm sessions from the database.
*
Expand Down
11 changes: 11 additions & 0 deletions src/crypto/store/indexeddb-crypto-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,17 @@ export class IndexedDBCryptoStore implements CryptoStore {
return this.backend!.filterOutNotifiedErrorDevices(devices);
}

/**
* Count the number of Megolm sessions in the database.
*
* Implementation of {@link CryptoStore.countEndToEndInboundGroupSessions}.
*
* @internal
*/
public countEndToEndInboundGroupSessions(): Promise<number> {
return this.backend!.countEndToEndInboundGroupSessions();
}

/**
* Fetch a batch of Olm sessions from the database.
*
Expand Down
24 changes: 23 additions & 1 deletion src/crypto/store/localStorage-crypto-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ export class LocalStorageCryptoStore extends MemoryCryptoStore implements Crypto
public countEndToEndSessions(txn: unknown, func: (count: number) => void): void {
let count = 0;
for (let i = 0; i < this.store.length; ++i) {
if (this.store.key(i)?.startsWith(keyEndToEndSessions(""))) ++count;
const key = this.store.key(i);
if (key?.startsWith(keyEndToEndSessions(""))) {
const sessions = getJsonItem(this.store, key);
count += Object.keys(sessions ?? {}).length;
}
}
func(count);
}
Expand Down Expand Up @@ -351,6 +355,24 @@ export class LocalStorageCryptoStore extends MemoryCryptoStore implements Crypto
setJsonItem(this.store, keyEndToEndInboundGroupSessionWithheld(senderCurve25519Key, sessionId), sessionData);
}

/**
* Count the number of Megolm sessions in the database.
*
* Implementation of {@link CryptoStore.countEndToEndInboundGroupSessions}.
*
* @internal
*/
public async countEndToEndInboundGroupSessions(): Promise<number> {
let count = 0;
for (let i = 0; i < this.store.length; ++i) {
const key = this.store.key(i);
if (key?.startsWith(KEY_INBOUND_SESSION_PREFIX)) {
count += 1;
}
}
return count;
}

/**
* Fetch a batch of Megolm sessions from the database.
*
Expand Down
17 changes: 16 additions & 1 deletion src/crypto/store/memory-crypto-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,11 @@ export class MemoryCryptoStore implements CryptoStore {
// Olm Sessions

public countEndToEndSessions(txn: unknown, func: (count: number) => void): void {
func(Object.keys(this.sessions).length);
let count = 0;
for (const deviceSessions of Object.values(this.sessions)) {
count += Object.keys(deviceSessions).length;
}
func(count);
}

public getEndToEndSession(
Expand Down Expand Up @@ -528,6 +532,17 @@ export class MemoryCryptoStore implements CryptoStore {
this.inboundGroupSessionsWithheld[k] = sessionData;
}

/**
* Count the number of Megolm sessions in the database.
*
* Implementation of {@link CryptoStore.countEndToEndInboundGroupSessions}.
*
* @internal
*/
public async countEndToEndInboundGroupSessions(): Promise<number> {
return Object.keys(this.inboundGroupSessions).length;
}

/**
* Fetch a batch of Megolm sessions from the database.
*
Expand Down
7 changes: 7 additions & 0 deletions src/rust-crypto/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ export async function initRustCrypto(args: {

/** The pickle key for `legacyCryptoStore` */
legacyPickleKey?: string;

/**
* A callback which will receive progress updates on migration from `legacyCryptoStore`.
*
* Called with (-1, -1) to mark the end of migration.
*/
legacyMigrationProgressListener?: (progress: number, total: number) => void;
}): Promise<RustCrypto> {
const { logger } = args;

Expand Down
54 changes: 50 additions & 4 deletions src/rust-crypto/libolm_migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ export async function migrateFromLegacyCrypto(args: {

/** Rust crypto store to migrate data into. */
storeHandle: RustSdkCryptoJs.StoreHandle;

/**
* A callback which will receive progress updates on migration from `legacyStore`.
*
* Called with (-1, -1) to mark the end of migration.
*/
legacyMigrationProgressListener?: (progress: number, total: number) => void;
}): Promise<void> {
const { logger, legacyStore } = args;

Expand All @@ -74,6 +81,20 @@ export async function migrateFromLegacyCrypto(args: {
return;
}

const nOlmSessions = await countOlmSessions(logger, legacyStore);
const nMegolmSessions = await countMegolmSessions(logger, legacyStore);
const totalSteps = 1 + nOlmSessions + nMegolmSessions;
logger.info(
`Migrating data from legacy crypto store. ${nOlmSessions} olm sessions and ${nMegolmSessions} megolm sessions to migrate.`,
);

let stepsDone = 0;
function onProgress(steps: number): void {
stepsDone += steps;
args.legacyMigrationProgressListener?.(stepsDone, totalSteps);
}
onProgress(0);

const pickleKey = new TextEncoder().encode(args.legacyPickleKey);

if (migrationState === MigrationState.NOT_STARTED) {
Expand All @@ -83,23 +104,30 @@ export async function migrateFromLegacyCrypto(args: {
migrationState = MigrationState.INITIAL_DATA_MIGRATED;
await legacyStore.setMigrationState(migrationState);
}
onProgress(1);

if (migrationState === MigrationState.INITIAL_DATA_MIGRATED) {
logger.info("Migrating data from legacy crypto store. Step 2: olm sessions");
await migrateOlmSessions(logger, legacyStore, pickleKey, args.storeHandle);
logger.info(
`Migrating data from legacy crypto store. Step 2: olm sessions (${nOlmSessions} sessions to migrate).`,
);
await migrateOlmSessions(logger, legacyStore, pickleKey, args.storeHandle, onProgress);

migrationState = MigrationState.OLM_SESSIONS_MIGRATED;
await legacyStore.setMigrationState(migrationState);
}

if (migrationState === MigrationState.OLM_SESSIONS_MIGRATED) {
logger.info("Migrating data from legacy crypto store. Step 3: megolm sessions");
await migrateMegolmSessions(logger, legacyStore, pickleKey, args.storeHandle);
logger.info(
`Migrating data from legacy crypto store. Step 3: megolm sessions (${nMegolmSessions} sessions to migrate).`,
);
await migrateMegolmSessions(logger, legacyStore, pickleKey, args.storeHandle, onProgress);

migrationState = MigrationState.MEGOLM_SESSIONS_MIGRATED;
await legacyStore.setMigrationState(migrationState);
}

// Migration is done.
args.legacyMigrationProgressListener?.(-1, -1);
logger.info("Migration from legacy crypto store complete");
}

Expand Down Expand Up @@ -147,11 +175,26 @@ async function migrateBaseData(
await RustSdkCryptoJs.Migration.migrateBaseData(migrationData, pickleKey, storeHandle);
}

async function countOlmSessions(logger: Logger, legacyStore: CryptoStore): Promise<number> {
logger.debug("Counting olm sessions to be migrated");
let nSessions: number;
await legacyStore.doTxn("readonly", [IndexedDBCryptoStore.STORE_SESSIONS], (txn) =>
legacyStore.countEndToEndSessions(txn, (n) => (nSessions = n)),
);
return nSessions!;
}

async function countMegolmSessions(logger: Logger, legacyStore: CryptoStore): Promise<number> {
logger.debug("Counting megolm sessions to be migrated");
return await legacyStore.countEndToEndInboundGroupSessions();
}

async function migrateOlmSessions(
logger: Logger,
legacyStore: CryptoStore,
pickleKey: Uint8Array,
storeHandle: RustSdkCryptoJs.StoreHandle,
onBatchDone: (batchSize: number) => void,
): Promise<void> {
// eslint-disable-next-line no-constant-condition
while (true) {
Expand All @@ -170,6 +213,7 @@ async function migrateOlmSessions(

await RustSdkCryptoJs.Migration.migrateOlmSessions(migrationData, pickleKey, storeHandle);
await legacyStore.deleteEndToEndSessionsBatch(batch);
onBatchDone(batch.length);
}
}

Expand All @@ -178,6 +222,7 @@ async function migrateMegolmSessions(
legacyStore: CryptoStore,
pickleKey: Uint8Array,
storeHandle: RustSdkCryptoJs.StoreHandle,
onBatchDone: (batchSize: number) => void,
): Promise<void> {
// eslint-disable-next-line no-constant-condition
while (true) {
Expand All @@ -204,6 +249,7 @@ async function migrateMegolmSessions(

await RustSdkCryptoJs.Migration.migrateMegolmSessions(migrationData, pickleKey, storeHandle);
await legacyStore.deleteEndToEndInboundGroupSessionsBatch(batch);
onBatchDone(batch.length);
}
}

Expand Down

0 comments on commit 06e8cea

Please sign in to comment.