Skip to content

Commit

Permalink
feat: buffer pending messages during join by external commit process …
Browse files Browse the repository at this point in the history
…to tolerate unordered messages
  • Loading branch information
beltram committed Jul 20, 2023
1 parent b7c18cd commit 680c487
Show file tree
Hide file tree
Showing 12 changed files with 401 additions and 21 deletions.
5 changes: 5 additions & 0 deletions crypto/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ pub enum CryptoError {
/// Tried to decrypt a commit created by self which is likely to have been replayed by the DS
#[error("Tried to decrypt a commit created by self which is likely to have been replayed by the DS")]
SelfCommitIgnored,
/// You tried to join with an external commit but did not merge it. We will renew this message for you when you merge your external commit
#[error(
"You tried to join with an external commit but did not merge it. We will renew this message for you when you merge your external commit"
)]
PendingMessage,
}

/// A simpler definition for Result types that the Error is a [CryptoError]
Expand Down
197 changes: 197 additions & 0 deletions crypto/src/mls/buffer_external_commit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
//! This file is intended to fix some issues we have with the Delivery Service. When a client joins
//! a group via an external commit, it sometimes receives messages (most of the time renewed external
//! proposals) for the new epoch whereas it does not yet have the confirmation from the DS that his
//! external has been accepted. Hence it is not merged locally and it cannot decrypt any message.
//!
//! Feel free to delete all of this when the issue is fixed on the DS side !

use crate::prelude::{
ConversationId, CryptoError, CryptoResult, MlsCentral, MlsConversation, MlsConversationDecryptMessage,
};
use crate::MlsError;
use core_crypto_keystore::entities::{EntityFindParams, MlsPendingMessage, PersistedMlsPendingGroup};
use openmls::prelude::{MlsMessageIn, MlsMessageInBody};
use tls_codec::Deserialize;

impl MlsCentral {
pub(crate) async fn handle_when_group_is_pending(
&mut self,
id: &ConversationId,
message: impl AsRef<[u8]>,
) -> CryptoResult<MlsConversationDecryptMessage> {
let keystore = self.mls_backend.borrow_keystore();
let Ok(Some(pending_group)) = keystore.find::<PersistedMlsPendingGroup>(id).await else {
return Err(CryptoError::ConversationNotFound(id.clone()));
};

let pending_msg = MlsPendingMessage {
id: pending_group.id.clone(),
message: message.as_ref().to_vec(),
};
keystore.save::<MlsPendingMessage>(pending_msg).await?;
Err(CryptoError::PendingMessage)
}

pub(crate) async fn restore_pending_messages(
&mut self,
conversation: &mut MlsConversation,
) -> CryptoResult<Option<Vec<MlsConversationDecryptMessage>>> {
let keystore = self.mls_backend.borrow_keystore();

let mut pending_messages = keystore
.find_all::<MlsPendingMessage>(EntityFindParams::default())
.await?
.into_iter()
.try_fold(vec![], |mut acc, m| {
let msg = MlsMessageIn::tls_deserialize_bytes(&m.message).map_err(MlsError::from)?;
let ct = match msg.body_as_ref() {
MlsMessageInBody::PublicMessage(m) => Ok(m.content_type()),
MlsMessageInBody::PrivateMessage(m) => Ok(m.content_type()),
_ => Err(CryptoError::ImplementationError),
}?;
acc.push((ct as u8, msg));
CryptoResult::Ok(acc)
})?;

pending_messages.sort_by(|(a, _), (b, _)| a.cmp(b));

let mut decrypted_messages = vec![];
for (_, m) in pending_messages {
let parent_conversation = if let Some(parent_id) = &conversation.parent_id {
Some(
self.get_conversation(parent_id)
.await
.map_err(|_| CryptoError::ParentGroupNotFound)?,
)
} else {
None
};
// let parent_conversation = self.get_parent_conversation(&conversation).await?;
let decrypted = conversation
.decrypt_message(
m,
parent_conversation,
self.mls_client()?,
&self.mls_backend,
self.callbacks.as_ref().map(|boxed| boxed.as_ref()),
)
.await?;
decrypted_messages.push(decrypted);
}

let decrypted_messages = (!decrypted_messages.is_empty()).then_some(decrypted_messages);

Ok(decrypted_messages)
}
}

#[cfg(test)]
pub mod tests {
use crate::{prelude::MlsProposal, test_utils::*, CryptoError};
use wasm_bindgen_test::*;

wasm_bindgen_test_configure!(run_in_browser);

// #[apply(all_cred_cipher)]
// #[wasm_bindgen_test]
#[async_std::test]
pub async fn should_buffer_and_reapply_messages_after_external_commit_merged(/*case: TestCase*/) {
let case = TestCase::default();
run_test_with_client_ids(
case.clone(),
["alice", "bob", "charlie", "debbie"],
move |[mut alice_central, mut bob_central, mut charlie_central, mut debbie_central]| {
Box::pin(async move {
let id = conversation_id();
alice_central
.new_conversation(id.clone(), case.credential_type, case.cfg.clone())
.await
.unwrap();

// Bob tries to join Alice's group with an external commit
let gi = alice_central.get_group_info(&id).await;
let external_commit = bob_central
.join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
.await
.unwrap();

// Alice decrypts the external commit...
alice_central
.decrypt_message(&id, external_commit.commit.to_bytes().unwrap())
.await
.unwrap();

// Meanwhile Debbie joins the party by creating an external proposal
let epoch = alice_central.conversation_epoch(&id).await.unwrap();
let external_proposal = debbie_central
.new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
.await
.unwrap();

// ...then Alice generates new messages for this epoch
let app_msg = alice_central.encrypt_message(&id, b"Hello Bob !").await.unwrap();
let proposal = alice_central
.new_proposal(&id, MlsProposal::Update)
.await
.unwrap()
.proposal;
alice_central
.decrypt_message(&id, external_proposal.to_bytes().unwrap())
.await
.unwrap();
let charlie = charlie_central.rand_member(&case).await;
let commit = alice_central
.add_members_to_conversation(&id, &mut [charlie])
.await
.unwrap();
alice_central.commit_accepted(&id).await.unwrap();
charlie_central.process_welcome_message(commit.welcome.clone().into(), case.custom_cfg()).await.unwrap();
debbie_central.process_welcome_message(commit.welcome.clone().into(), case.custom_cfg()).await.unwrap();

// And now Bob will have to decrypt those messages while he hasn't yet merged its external commit
// To add more fun, he will buffer the messages in exactly the wrong order (to make
// sure he reapplies them in the right order afterwards)
let messages = [commit.commit, external_proposal, proposal].map(|m| m.to_bytes().unwrap());
for m in messages {
let decrypt = bob_central.decrypt_message(&id, m).await;
assert!(matches!(decrypt.unwrap_err(), CryptoError::PendingMessage));
}
let decrypt = bob_central.decrypt_message(&id, app_msg).await;
assert!(matches!(decrypt.unwrap_err(), CryptoError::PendingMessage));

// Finally, Bob receives the green light from the DS and he can merge the external commit
let Some(restored_messages) = bob_central.merge_pending_group_from_external_commit(&id).await.unwrap() else {
panic!("Alice's messages should have been restored at this point");
};
for (i, m) in restored_messages.into_iter().enumerate() {
match i {
0 => {
// this is the application message
assert_eq!(&m.app_msg.unwrap(), b"Hello Bob !");
assert!(!m.has_epoch_changed);
}
1 | 2 => {
// this is either the member or the external proposal
assert!(m.app_msg.is_none());
assert!(!m.has_epoch_changed);
}
3 => {
// this is the commit
assert!(m.app_msg.is_none());
assert!(m.has_epoch_changed);
}
_ => unreachable!(),
}
};
// because external commit got merged
assert!(bob_central.try_talk_to(&id, &mut alice_central).await.is_ok());
// because Alice's commit got merged
assert!(bob_central.try_talk_to(&id, &mut charlie_central).await.is_ok());
// because Debbie's external proposal got merged through the commit
assert!(bob_central.try_talk_to(&id, &mut debbie_central).await.is_ok());
})
},
)
.await
}
}
25 changes: 13 additions & 12 deletions crypto/src/mls/conversation/decrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,19 @@ impl MlsConversation {
#[cfg_attr(test, crate::durable)]
pub async fn decrypt_message(
&mut self,
message: impl AsRef<[u8]>,
message: MlsMessageIn,
parent_conversation: Option<GroupStoreValue<MlsConversation>>,
client: &Client,
backend: &MlsCryptoProvider,
callbacks: Option<&dyn CoreCryptoCallbacks>,
) -> CryptoResult<MlsConversationDecryptMessage> {
let msg_in = openmls::framing::MlsMessageIn::tls_deserialize_bytes(message.as_ref()).map_err(MlsError::from)?;

// handles the crooked case where we receive our own commits.
// Since this would result in an error in openmls, we handle it here
if let Some(ct) = self.maybe_self_member_commit(&msg_in)? {
if let Some(ct) = self.maybe_self_member_commit(&message)? {
return self.handle_self_member_commit(backend, ct).await;
}

let message = self.parse_message(backend, msg_in).await?;
let message = self.parse_message(backend, message).await?;

let msg_epoch = message.epoch();

Expand Down Expand Up @@ -235,17 +233,20 @@ impl MlsCentral {
/// from OpenMls and the KeyStore
pub async fn decrypt_message(
&mut self,
conversation_id: &ConversationId,
id: &ConversationId,
message: impl AsRef<[u8]>,
) -> CryptoResult<MlsConversationDecryptMessage> {
let parent_conversation = self.get_parent_conversation(conversation_id).await?;
let decrypt_message = self
.get_conversation(conversation_id)
.await?
let msg = MlsMessageIn::tls_deserialize_bytes(message.as_ref()).map_err(MlsError::from)?;
let Ok(conversation) = self.get_conversation(id).await else {
return self.handle_when_group_is_pending(id, message).await;
};
// let conversation = self.get_conversation(id).await?;
let parent_conversation = self.get_parent_conversation(&conversation).await?;
let decrypt_message = conversation
.write()
.await
.decrypt_message(
message.as_ref(),
msg,
parent_conversation,
self.mls_client()?,
&self.mls_backend,
Expand All @@ -254,7 +255,7 @@ impl MlsCentral {
.await?;

if !decrypt_message.is_active {
self.wipe_conversation(conversation_id).await?;
self.wipe_conversation(id).await?;
}
Ok(decrypt_message)
}
Expand Down
4 changes: 2 additions & 2 deletions crypto/src/mls/conversation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use mls_crypto_provider::MlsCryptoProvider;

use config::MlsConversationConfiguration;

use crate::group_store::GroupStoreValue;
use crate::{
mls::{client::Client, member::MemberId, ClientId, MlsCentral},
prelude::{CryptoError, CryptoResult, MlsCiphersuite, MlsCredentialType, MlsError},
Expand Down Expand Up @@ -242,9 +243,8 @@ impl MlsCentral {

pub(crate) async fn get_parent_conversation(
&mut self,
id: &ConversationId,
conversation: &GroupStoreValue<MlsConversation>,
) -> CryptoResult<Option<crate::group_store::GroupStoreValue<MlsConversation>>> {
let conversation = self.get_conversation(id).await?;
let conversation_lock = conversation.read().await;
if let Some(parent_id) = conversation_lock.parent_id.as_ref() {
Ok(Some(
Expand Down
20 changes: 13 additions & 7 deletions crypto/src/mls/external_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use core_crypto_keystore::entities::PersistedMlsPendingGroup;
use core_crypto_keystore::CryptoKeystoreMls;
use tls_codec::Serialize;

use crate::prelude::MlsCiphersuite;
use crate::prelude::{MlsCiphersuite, MlsConversationDecryptMessage};
use crate::{
group_store::GroupStoreValue,
prelude::{
Expand Down Expand Up @@ -141,10 +141,12 @@ impl MlsCentral {
///
/// # Errors
/// Errors resulting from OpenMls, the KeyStore calls and deserialization
pub async fn merge_pending_group_from_external_commit(&mut self, id: &ConversationId) -> CryptoResult<()> {
pub async fn merge_pending_group_from_external_commit(
&mut self,
id: &ConversationId,
) -> CryptoResult<Option<Vec<MlsConversationDecryptMessage>>> {
// Retrieve the pending MLS group from the keystore
let keystore = self.mls_backend.key_store();
let (group, cfg) = keystore.mls_pending_groups_load(id).await?;
let (group, cfg) = self.mls_backend.key_store().mls_pending_groups_load(id).await?;

let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&group)?;

Expand All @@ -164,12 +166,16 @@ impl MlsCentral {

// Persist the now usable MLS group in the keystore
// TODO: find a way to make the insertion of the MlsGroup and deletion of the pending group transactional
let conversation = MlsConversation::from_mls_group(mls_group, configuration, &self.mls_backend).await?;
let mut conversation = MlsConversation::from_mls_group(mls_group, configuration, &self.mls_backend).await?;

let pending_messages = self.restore_pending_messages(&mut conversation).await?;

self.mls_groups.insert(id.clone(), conversation);

// cleanup the pending group we no longer need
keystore.mls_pending_groups_delete(id).await?;
Ok(())
self.mls_backend.key_store().mls_pending_groups_delete(id).await?;

Ok(pending_messages)
}

/// In case the external commit generated by [join_by_external_commit] is rejected by the Delivery Service
Expand Down
1 change: 1 addition & 0 deletions crypto/src/mls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::prelude::{
MlsError,
};

pub(crate) mod buffer_external_commit;
pub(crate) mod ciphersuite;
pub(crate) mod client;
pub(crate) mod conversation;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE mls_pending_messages (
id BLOB,
message BLOB,
FOREIGN KEY(id) REFERENCES mls_pending_groups(id)
);
5 changes: 5 additions & 0 deletions keystore/src/connection/platform/wasm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ impl DatabaseConnection for WasmConnection {
.auto_increment(false)
.add_index(Index::new("id", "id").unique(true)),
)
.add_object_store(
ObjectStore::new("mls_pending_messages")
.auto_increment(false)
.add_index(Index::new("id", "id").unique(true)),
)
.add_object_store(
ObjectStore::new("e2ei_enrollment")
.auto_increment(false)
Expand Down
9 changes: 9 additions & 0 deletions keystore/src/entities/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ pub struct PersistedMlsPendingGroup {
pub custom_configuration: Vec<u8>,
}

/// Entity representing a buffered message
#[derive(Debug, Clone, PartialEq, Eq, Zeroize)]
#[zeroize(drop)]
#[cfg_attr(target_family = "wasm", derive(serde::Serialize, serde::Deserialize))]
pub struct MlsPendingMessage {
pub id: Vec<u8>,
pub message: Vec<u8>,
}

/// Entity representing a persisted `Credential`
#[derive(Debug, Clone, PartialEq, Eq, Zeroize)]
#[zeroize(drop)]
Expand Down
1 change: 1 addition & 0 deletions keystore/src/entities/platform/generic/mls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ pub mod group;
pub mod hpke_private_key;
pub mod keypackage;
pub mod pending_group;
pub mod pending_message;
pub mod psk_bundle;
pub mod signature_keypair;
Loading

0 comments on commit 680c487

Please sign in to comment.