diff --git a/crypto/src/error.rs b/crypto/src/error.rs index 053e5880a3..6b7b4b142c 100644 --- a/crypto/src/error.rs +++ b/crypto/src/error.rs @@ -175,6 +175,11 @@ pub enum CryptoError { /// Tried to decrypt a commit created by self which is likely to have been replayed by the DS #[error("Tried to decrypt a commit created by self which is likely to have been replayed by the DS")] SelfCommitIgnored, + /// You tried to join with an external commit but did not merge it. We will renew this message for you when you merge your external commit + #[error( + "You tried to join with an external commit but did not merge it. We will renew this message for you when you merge your external commit" + )] + PendingMessage, } /// A simpler definition for Result types that the Error is a [CryptoError] diff --git a/crypto/src/mls/buffer_external_commit.rs b/crypto/src/mls/buffer_external_commit.rs new file mode 100644 index 0000000000..c552da483a --- /dev/null +++ b/crypto/src/mls/buffer_external_commit.rs @@ -0,0 +1,197 @@ +//! This file is intended to fix some issues we have with the Delivery Service. When a client joins +//! a group via an external commit, it sometimes receives messages (most of the time renewed external +//! proposals) for the new epoch whereas it does not yet have the confirmation from the DS that his +//! external has been accepted. Hence it is not merged locally and it cannot decrypt any message. +//! +//! Feel free to delete all of this when the issue is fixed on the DS side ! + +use crate::prelude::{ + ConversationId, CryptoError, CryptoResult, MlsCentral, MlsConversation, MlsConversationDecryptMessage, +}; +use crate::MlsError; +use core_crypto_keystore::entities::{EntityFindParams, MlsPendingMessage, PersistedMlsPendingGroup}; +use openmls::prelude::{MlsMessageIn, MlsMessageInBody}; +use tls_codec::Deserialize; + +impl MlsCentral { + pub(crate) async fn handle_when_group_is_pending( + &mut self, + id: &ConversationId, + message: impl AsRef<[u8]>, + ) -> CryptoResult { + let keystore = self.mls_backend.borrow_keystore(); + let Ok(Some(pending_group)) = keystore.find::(id).await else { + return Err(CryptoError::ConversationNotFound(id.clone())); + }; + + let pending_msg = MlsPendingMessage { + id: pending_group.id.clone(), + message: message.as_ref().to_vec(), + }; + keystore.save::(pending_msg).await?; + Err(CryptoError::PendingMessage) + } + + pub(crate) async fn restore_pending_messages( + &mut self, + conversation: &mut MlsConversation, + ) -> CryptoResult>> { + let keystore = self.mls_backend.borrow_keystore(); + + let mut pending_messages = keystore + .find_all::(EntityFindParams::default()) + .await? + .into_iter() + .try_fold(vec![], |mut acc, m| { + let msg = MlsMessageIn::tls_deserialize_bytes(&m.message).map_err(MlsError::from)?; + let ct = match msg.body_as_ref() { + MlsMessageInBody::PublicMessage(m) => Ok(m.content_type()), + MlsMessageInBody::PrivateMessage(m) => Ok(m.content_type()), + _ => Err(CryptoError::ImplementationError), + }?; + acc.push((ct as u8, msg)); + CryptoResult::Ok(acc) + })?; + + pending_messages.sort_by(|(a, _), (b, _)| a.cmp(b)); + + let mut decrypted_messages = vec![]; + for (_, m) in pending_messages { + let parent_conversation = if let Some(parent_id) = &conversation.parent_id { + Some( + self.get_conversation(parent_id) + .await + .map_err(|_| CryptoError::ParentGroupNotFound)?, + ) + } else { + None + }; + // let parent_conversation = self.get_parent_conversation(&conversation).await?; + let decrypted = conversation + .decrypt_message( + m, + parent_conversation, + self.mls_client()?, + &self.mls_backend, + self.callbacks.as_ref().map(|boxed| boxed.as_ref()), + ) + .await?; + decrypted_messages.push(decrypted); + } + + let decrypted_messages = (!decrypted_messages.is_empty()).then_some(decrypted_messages); + + Ok(decrypted_messages) + } +} + +#[cfg(test)] +pub mod tests { + use crate::{prelude::MlsProposal, test_utils::*, CryptoError}; + use wasm_bindgen_test::*; + + wasm_bindgen_test_configure!(run_in_browser); + + // #[apply(all_cred_cipher)] + // #[wasm_bindgen_test] + #[async_std::test] + pub async fn should_buffer_and_reapply_messages_after_external_commit_merged(/*case: TestCase*/) { + let case = TestCase::default(); + run_test_with_client_ids( + case.clone(), + ["alice", "bob", "charlie", "debbie"], + move |[mut alice_central, mut bob_central, mut charlie_central, mut debbie_central]| { + Box::pin(async move { + let id = conversation_id(); + alice_central + .new_conversation(id.clone(), case.credential_type, case.cfg.clone()) + .await + .unwrap(); + + // Bob tries to join Alice's group with an external commit + let gi = alice_central.get_group_info(&id).await; + let external_commit = bob_central + .join_by_external_commit(gi, case.custom_cfg(), case.credential_type) + .await + .unwrap(); + + // Alice decrypts the external commit... + alice_central + .decrypt_message(&id, external_commit.commit.to_bytes().unwrap()) + .await + .unwrap(); + + // Meanwhile Debbie joins the party by creating an external proposal + let epoch = alice_central.conversation_epoch(&id).await.unwrap(); + let external_proposal = debbie_central + .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type) + .await + .unwrap(); + + // ...then Alice generates new messages for this epoch + let app_msg = alice_central.encrypt_message(&id, b"Hello Bob !").await.unwrap(); + let proposal = alice_central + .new_proposal(&id, MlsProposal::Update) + .await + .unwrap() + .proposal; + alice_central + .decrypt_message(&id, external_proposal.to_bytes().unwrap()) + .await + .unwrap(); + let charlie = charlie_central.rand_member(&case).await; + let commit = alice_central + .add_members_to_conversation(&id, &mut [charlie]) + .await + .unwrap(); + alice_central.commit_accepted(&id).await.unwrap(); + charlie_central.process_welcome_message(commit.welcome.clone().into(), case.custom_cfg()).await.unwrap(); + debbie_central.process_welcome_message(commit.welcome.clone().into(), case.custom_cfg()).await.unwrap(); + + // And now Bob will have to decrypt those messages while he hasn't yet merged its external commit + // To add more fun, he will buffer the messages in exactly the wrong order (to make + // sure he reapplies them in the right order afterwards) + let messages = [commit.commit, external_proposal, proposal].map(|m| m.to_bytes().unwrap()); + for m in messages { + let decrypt = bob_central.decrypt_message(&id, m).await; + assert!(matches!(decrypt.unwrap_err(), CryptoError::PendingMessage)); + } + let decrypt = bob_central.decrypt_message(&id, app_msg).await; + assert!(matches!(decrypt.unwrap_err(), CryptoError::PendingMessage)); + + // Finally, Bob receives the green light from the DS and he can merge the external commit + let Some(restored_messages) = bob_central.merge_pending_group_from_external_commit(&id).await.unwrap() else { + panic!("Alice's messages should have been restored at this point"); + }; + for (i, m) in restored_messages.into_iter().enumerate() { + match i { + 0 => { + // this is the application message + assert_eq!(&m.app_msg.unwrap(), b"Hello Bob !"); + assert!(!m.has_epoch_changed); + } + 1 | 2 => { + // this is either the member or the external proposal + assert!(m.app_msg.is_none()); + assert!(!m.has_epoch_changed); + } + 3 => { + // this is the commit + assert!(m.app_msg.is_none()); + assert!(m.has_epoch_changed); + } + _ => unreachable!(), + } + }; + // because external commit got merged + assert!(bob_central.try_talk_to(&id, &mut alice_central).await.is_ok()); + // because Alice's commit got merged + assert!(bob_central.try_talk_to(&id, &mut charlie_central).await.is_ok()); + // because Debbie's external proposal got merged through the commit + assert!(bob_central.try_talk_to(&id, &mut debbie_central).await.is_ok()); + }) + }, + ) + .await + } +} diff --git a/crypto/src/mls/conversation/decrypt.rs b/crypto/src/mls/conversation/decrypt.rs index 2fbf62b28f..7a647ae08d 100644 --- a/crypto/src/mls/conversation/decrypt.rs +++ b/crypto/src/mls/conversation/decrypt.rs @@ -61,21 +61,19 @@ impl MlsConversation { #[cfg_attr(test, crate::durable)] pub async fn decrypt_message( &mut self, - message: impl AsRef<[u8]>, + message: MlsMessageIn, parent_conversation: Option>, client: &Client, backend: &MlsCryptoProvider, callbacks: Option<&dyn CoreCryptoCallbacks>, ) -> CryptoResult { - let msg_in = openmls::framing::MlsMessageIn::tls_deserialize_bytes(message.as_ref()).map_err(MlsError::from)?; - // handles the crooked case where we receive our own commits. // Since this would result in an error in openmls, we handle it here - if let Some(ct) = self.maybe_self_member_commit(&msg_in)? { + if let Some(ct) = self.maybe_self_member_commit(&message)? { return self.handle_self_member_commit(backend, ct).await; } - let message = self.parse_message(backend, msg_in).await?; + let message = self.parse_message(backend, message).await?; let msg_epoch = message.epoch(); @@ -235,17 +233,20 @@ impl MlsCentral { /// from OpenMls and the KeyStore pub async fn decrypt_message( &mut self, - conversation_id: &ConversationId, + id: &ConversationId, message: impl AsRef<[u8]>, ) -> CryptoResult { - let parent_conversation = self.get_parent_conversation(conversation_id).await?; - let decrypt_message = self - .get_conversation(conversation_id) - .await? + let msg = MlsMessageIn::tls_deserialize_bytes(message.as_ref()).map_err(MlsError::from)?; + let Ok(conversation) = self.get_conversation(id).await else { + return self.handle_when_group_is_pending(id, message).await; + }; + // let conversation = self.get_conversation(id).await?; + let parent_conversation = self.get_parent_conversation(&conversation).await?; + let decrypt_message = conversation .write() .await .decrypt_message( - message.as_ref(), + msg, parent_conversation, self.mls_client()?, &self.mls_backend, @@ -254,7 +255,7 @@ impl MlsCentral { .await?; if !decrypt_message.is_active { - self.wipe_conversation(conversation_id).await?; + self.wipe_conversation(id).await?; } Ok(decrypt_message) } diff --git a/crypto/src/mls/conversation/mod.rs b/crypto/src/mls/conversation/mod.rs index ef6fca886d..34042e2939 100644 --- a/crypto/src/mls/conversation/mod.rs +++ b/crypto/src/mls/conversation/mod.rs @@ -37,6 +37,7 @@ use mls_crypto_provider::MlsCryptoProvider; use config::MlsConversationConfiguration; +use crate::group_store::GroupStoreValue; use crate::{ mls::{client::Client, member::MemberId, ClientId, MlsCentral}, prelude::{CryptoError, CryptoResult, MlsCiphersuite, MlsCredentialType, MlsError}, @@ -242,9 +243,8 @@ impl MlsCentral { pub(crate) async fn get_parent_conversation( &mut self, - id: &ConversationId, + conversation: &GroupStoreValue, ) -> CryptoResult>> { - let conversation = self.get_conversation(id).await?; let conversation_lock = conversation.read().await; if let Some(parent_id) = conversation_lock.parent_id.as_ref() { Ok(Some( diff --git a/crypto/src/mls/external_commit.rs b/crypto/src/mls/external_commit.rs index d46f86feb0..0a74208ae2 100644 --- a/crypto/src/mls/external_commit.rs +++ b/crypto/src/mls/external_commit.rs @@ -21,7 +21,7 @@ use core_crypto_keystore::entities::PersistedMlsPendingGroup; use core_crypto_keystore::CryptoKeystoreMls; use tls_codec::Serialize; -use crate::prelude::MlsCiphersuite; +use crate::prelude::{MlsCiphersuite, MlsConversationDecryptMessage}; use crate::{ group_store::GroupStoreValue, prelude::{ @@ -141,10 +141,12 @@ impl MlsCentral { /// /// # Errors /// Errors resulting from OpenMls, the KeyStore calls and deserialization - pub async fn merge_pending_group_from_external_commit(&mut self, id: &ConversationId) -> CryptoResult<()> { + pub async fn merge_pending_group_from_external_commit( + &mut self, + id: &ConversationId, + ) -> CryptoResult>> { // Retrieve the pending MLS group from the keystore - let keystore = self.mls_backend.key_store(); - let (group, cfg) = keystore.mls_pending_groups_load(id).await?; + let (group, cfg) = self.mls_backend.key_store().mls_pending_groups_load(id).await?; let mut mls_group = core_crypto_keystore::deser::(&group)?; @@ -164,12 +166,16 @@ impl MlsCentral { // Persist the now usable MLS group in the keystore // TODO: find a way to make the insertion of the MlsGroup and deletion of the pending group transactional - let conversation = MlsConversation::from_mls_group(mls_group, configuration, &self.mls_backend).await?; + let mut conversation = MlsConversation::from_mls_group(mls_group, configuration, &self.mls_backend).await?; + + let pending_messages = self.restore_pending_messages(&mut conversation).await?; + self.mls_groups.insert(id.clone(), conversation); // cleanup the pending group we no longer need - keystore.mls_pending_groups_delete(id).await?; - Ok(()) + self.mls_backend.key_store().mls_pending_groups_delete(id).await?; + + Ok(pending_messages) } /// In case the external commit generated by [join_by_external_commit] is rejected by the Delivery Service diff --git a/crypto/src/mls/mod.rs b/crypto/src/mls/mod.rs index 2ea81abc61..5e9e00264d 100644 --- a/crypto/src/mls/mod.rs +++ b/crypto/src/mls/mod.rs @@ -8,6 +8,7 @@ use crate::prelude::{ MlsError, }; +pub(crate) mod buffer_external_commit; pub(crate) mod ciphersuite; pub(crate) mod client; pub(crate) mod conversation; diff --git a/keystore/src/connection/platform/generic/migrations/V3__pending_messages.sql b/keystore/src/connection/platform/generic/migrations/V3__pending_messages.sql new file mode 100644 index 0000000000..7b6a7128a7 --- /dev/null +++ b/keystore/src/connection/platform/generic/migrations/V3__pending_messages.sql @@ -0,0 +1,5 @@ +CREATE TABLE mls_pending_messages ( + id BLOB, + message BLOB, + FOREIGN KEY(id) REFERENCES mls_pending_groups(id) +); diff --git a/keystore/src/connection/platform/wasm/mod.rs b/keystore/src/connection/platform/wasm/mod.rs index 697286daa9..26b70738f4 100644 --- a/keystore/src/connection/platform/wasm/mod.rs +++ b/keystore/src/connection/platform/wasm/mod.rs @@ -101,6 +101,11 @@ impl DatabaseConnection for WasmConnection { .auto_increment(false) .add_index(Index::new("id", "id").unique(true)), ) + .add_object_store( + ObjectStore::new("mls_pending_messages") + .auto_increment(false) + .add_index(Index::new("id", "id").unique(true)), + ) .add_object_store( ObjectStore::new("e2ei_enrollment") .auto_increment(false) diff --git a/keystore/src/entities/mls.rs b/keystore/src/entities/mls.rs index 36004a6d77..681b0fd06f 100644 --- a/keystore/src/entities/mls.rs +++ b/keystore/src/entities/mls.rs @@ -71,6 +71,15 @@ pub struct PersistedMlsPendingGroup { pub custom_configuration: Vec, } +/// Entity representing a buffered message +#[derive(Debug, Clone, PartialEq, Eq, Zeroize)] +#[zeroize(drop)] +#[cfg_attr(target_family = "wasm", derive(serde::Serialize, serde::Deserialize))] +pub struct MlsPendingMessage { + pub id: Vec, + pub message: Vec, +} + /// Entity representing a persisted `Credential` #[derive(Debug, Clone, PartialEq, Eq, Zeroize)] #[zeroize(drop)] diff --git a/keystore/src/entities/platform/generic/mls/mod.rs b/keystore/src/entities/platform/generic/mls/mod.rs index 5d79644dfd..9417adbd55 100644 --- a/keystore/src/entities/platform/generic/mls/mod.rs +++ b/keystore/src/entities/platform/generic/mls/mod.rs @@ -21,5 +21,6 @@ pub mod group; pub mod hpke_private_key; pub mod keypackage; pub mod pending_group; +pub mod pending_message; pub mod psk_bundle; pub mod signature_keypair; diff --git a/keystore/src/entities/platform/generic/mls/pending_message.rs b/keystore/src/entities/platform/generic/mls/pending_message.rs new file mode 100644 index 0000000000..5f1dcaf367 --- /dev/null +++ b/keystore/src/entities/platform/generic/mls/pending_message.rs @@ -0,0 +1,148 @@ +// Wire +// Copyright (C) 2022 Wire Swiss GmbH + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. + +use crate::{ + connection::{DatabaseConnection, KeystoreDatabaseConnection}, + entities::{Entity, EntityBase, EntityFindParams, MlsPendingMessage, StringEntityId}, + MissingKeyErrorKind, +}; + +impl Entity for MlsPendingMessage { + fn id_raw(&self) -> &[u8] { + self.id.as_slice() + } +} + +#[async_trait::async_trait(?Send)] +impl EntityBase for MlsPendingMessage { + type ConnectionType = KeystoreDatabaseConnection; + type AutoGeneratedFields = (); + + fn to_missing_key_err_kind() -> MissingKeyErrorKind { + MissingKeyErrorKind::MlsPendingMessages + } + + async fn save(&self, conn: &mut Self::ConnectionType) -> crate::CryptoKeystoreResult<()> { + let transaction = conn.transaction()?; + + Self::ConnectionType::check_buffer_size(self.id.len())?; + Self::ConnectionType::check_buffer_size(self.message.len())?; + + let zid = rusqlite::blob::ZeroBlob(self.id.len() as i32); + let zmsg = rusqlite::blob::ZeroBlob(self.message.len() as i32); + + let id_bytes = &self.id; + + use rusqlite::ToSql as _; + transaction.execute( + "INSERT INTO mls_pending_messages (id, message) VALUES(?, ?)", + [&zid.to_sql()?, &zmsg.to_sql()?], + )?; + let rowid = transaction.last_insert_rowid(); + + let mut blob = + transaction.blob_open(rusqlite::DatabaseName::Main, "mls_pending_messages", "id", rowid, false)?; + use std::io::Write as _; + blob.write_all(id_bytes)?; + blob.close()?; + + let mut blob = transaction.blob_open( + rusqlite::DatabaseName::Main, + "mls_pending_messages", + "message", + rowid, + false, + )?; + blob.write_all(&self.message)?; + blob.close()?; + + transaction.commit()?; + + Ok(()) + } + + async fn find_one( + _conn: &mut Self::ConnectionType, + _id: &StringEntityId, + ) -> crate::CryptoKeystoreResult> { + unreachable!() + } + + async fn find_all( + conn: &mut Self::ConnectionType, + params: EntityFindParams, + ) -> crate::CryptoKeystoreResult> { + let transaction = conn.transaction()?; + let query: String = format!("SELECT rowid FROM mls_pending_messages {}", params.to_sql()); + + let mut stmt = transaction.prepare_cached(&query)?; + let mut rows = stmt.query_map([], |r| r.get(0))?; + let entities = rows.try_fold(Vec::new(), |mut acc, rowid_result| { + use std::io::Read as _; + let rowid = rowid_result?; + + let mut blob = + transaction.blob_open(rusqlite::DatabaseName::Main, "mls_pending_messages", "id", rowid, true)?; + let mut id = vec![]; + blob.read_to_end(&mut id)?; + blob.close()?; + + let mut blob = transaction.blob_open( + rusqlite::DatabaseName::Main, + "mls_pending_messages", + "message", + rowid, + true, + )?; + let mut message = vec![]; + blob.read_to_end(&mut message)?; + blob.close()?; + + acc.push(Self { id, message }); + crate::CryptoKeystoreResult::Ok(acc) + })?; + + Ok(entities) + } + + async fn find_many( + _conn: &mut Self::ConnectionType, + _ids: &[StringEntityId], + ) -> crate::CryptoKeystoreResult> { + unimplemented!() + } + + async fn count(conn: &mut Self::ConnectionType) -> crate::CryptoKeystoreResult { + Ok(conn.query_row("SELECT COUNT(*) FROM mls_pending_messages", [], |r| r.get(0))?) + } + + async fn delete(conn: &mut Self::ConnectionType, ids: &[StringEntityId]) -> crate::CryptoKeystoreResult<()> { + let transaction = conn.transaction()?; + let len = ids.len(); + let mut updated = 0; + for id in ids { + updated += transaction.execute("DELETE FROM mls_pending_messages WHERE id = ?", [id.as_slice()])?; + } + + if updated == len { + transaction.commit()?; + Ok(()) + } else { + transaction.rollback()?; + Err(Self::to_missing_key_err_kind().into()) + } + } +} diff --git a/keystore/src/error.rs b/keystore/src/error.rs index 7a8471de8c..0a4afe176d 100644 --- a/keystore/src/error.rs +++ b/keystore/src/error.rs @@ -33,6 +33,8 @@ pub enum MissingKeyErrorKind { MlsGroup, #[error("MLS Persisted Pending Group")] MlsPendingGroup, + #[error("MLS Pending Messages")] + MlsPendingMessages, #[error("End-to-end identity enrollment")] E2eiEnrollment, #[cfg(feature = "proteus-keystore")]