diff --git a/fastpay_core/Cargo.toml b/fastpay_core/Cargo.toml index 13d6d09f85a6f..7cd8d377390e0 100644 --- a/fastpay_core/Cargo.toml +++ b/fastpay_core/Cargo.toml @@ -27,7 +27,8 @@ move-core-types = { git = "https://github.com/diem/move", rev="d253bf1c21314679d move-package = { git = "https://github.com/diem/move", rev="d253bf1c21314679d17e6935116f30baf409f264" } move-vm-runtime = { git = "https://github.com/diem/move", rev="d253bf1c21314679d17e6935116f30baf409f264" } -typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "0ef3f1fedcfbf3dfe0eeea65e05de073b7c25733" } + +typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "2e829074f40c9ef852ecd6808b80d083174c778b" } [dev-dependencies] fdlimit = "0.2.1" diff --git a/fastpay_core/src/authority/authority_store.rs b/fastpay_core/src/authority/authority_store.rs index e32008a7a6acc..f5cd96f12d9fd 100644 --- a/fastpay_core/src/authority/authority_store.rs +++ b/fastpay_core/src/authority/authority_store.rs @@ -79,8 +79,7 @@ impl AuthorityStore { .owner_index .iter() // The object id [0; 16] is the smallest possible - .skip_to(&(account, AccountAddress::from([0; 16]))) - .map_err(|_| FastPayError::StorageError)? + .skip_to(&(account, AccountAddress::from([0; 16])))? .take_while(|((owner, _id), _object_ref)| (owner == &account)) .map(|((_owner, _id), object_ref)| object_ref) .collect()) @@ -91,34 +90,22 @@ impl AuthorityStore { transaction_digest: &TransactionDigest, ) -> Result { Ok(OrderInfoResponse { - signed_order: self - .signed_orders - .get(transaction_digest) - .map_err(|_| FastPayError::StorageError)?, - certified_order: self - .certificates - .get(transaction_digest) - .map_err(|_| FastPayError::StorageError)?, - signed_effects: self - .signed_effects - .get(transaction_digest) - .map_err(|_| FastPayError::StorageError)?, + signed_order: self.signed_orders.get(transaction_digest)?, + certified_order: self.certificates.get(transaction_digest)?, + signed_effects: self.signed_effects.get(transaction_digest)?, }) } /// Read an object and return it, or Err(ObjectNotFound) if the object was not found. pub fn object_state(&self, object_id: &ObjectID) -> Result { self.objects - .get(object_id) - .map_err(|_| FastPayError::StorageError)? + .get(object_id)? .ok_or(FastPayError::ObjectNotFound) } /// Get many objects pub fn get_objects(&self, _objects: &[ObjectID]) -> Result>, FastPayError> { - self.objects - .multi_get(_objects) - .map_err(|_| FastPayError::StorageError) + self.objects.multi_get(_objects).map_err(|e| e.into()) } /// Read a lock or returns Err(OrderLockDoesNotExist) if the lock does not exist. @@ -128,15 +115,13 @@ impl AuthorityStore { ) -> Result, FastPayError> { let order_option = self .order_lock - .get(object_ref) - .map_err(|_| FastPayError::StorageError)? + .get(object_ref)? .ok_or(FastPayError::OrderLockDoesNotExist)?; match order_option { Some(tx_digest) => Ok(Some( self.signed_orders - .get(&tx_digest) - .map_err(|_| FastPayError::StorageError)? + .get(&tx_digest)? .expect("Stored a lock without storing order?"), )), None => Ok(None), @@ -148,9 +133,7 @@ impl AuthorityStore { &self, digest: &TransactionDigest, ) -> Result, FastPayError> { - self.certificates - .get(digest) - .map_err(|_| FastPayError::StorageError) + self.certificates.get(digest).map_err(|e| e.into()) } /// Read the transactionDigest that is the parent of an object reference @@ -159,9 +142,7 @@ impl AuthorityStore { &self, object_ref: &ObjectRef, ) -> Result, FastPayError> { - self.parent_sync - .get(object_ref) - .map_err(|_| FastPayError::StorageError) + self.parent_sync.get(object_ref).map_err(|e| e.into()) } /// Returns all parents (object_ref and transaction digests) that match an object_id, at @@ -178,8 +159,7 @@ impl AuthorityStore { .parent_sync .iter() // The object id [0; 16] is the smallest possible - .skip_to(&(object_id, seq_inner, obj_dig_inner)) - .map_err(|_| FastPayError::StorageError)? + .skip_to(&(object_id, seq_inner, obj_dig_inner))? .take_while(|((id, iseq, _digest), _txd)| { let mut flag = id == &object_id; if seq.is_some() { @@ -194,14 +174,11 @@ impl AuthorityStore { /// Insert an object pub fn insert_object(&self, object: Object) -> Result<(), FastPayError> { - self.objects - .insert(&object.id(), &object) - .map_err(|_| FastPayError::StorageError)?; + self.objects.insert(&object.id(), &object)?; // Update the index self.owner_index - .insert(&(object.owner, object.id()), &object.to_object_reference()) - .map_err(|_| FastPayError::StorageError)?; + .insert(&(object.owner, object.id()), &object.to_object_reference())?; Ok(()) } @@ -209,9 +186,7 @@ impl AuthorityStore { /// Initialize a lock to an object reference to None, but keep it /// as it is if a value is already present. pub fn init_order_lock(&self, object_ref: ObjectRef) -> Result<(), FastPayError> { - self.order_lock - .get_or_insert(&object_ref, || None) - .map_err(|_| FastPayError::StorageError)?; + self.order_lock.get_or_insert(&object_ref, || None)?; Ok(()) } @@ -235,13 +210,11 @@ impl AuthorityStore { mutable_input_objects .iter() .map(|obj_ref| (*obj_ref, Some(tx_digest))), - ) - .map_err(|_| FastPayError::StorageError)? + )? .insert_batch( &self.signed_orders, std::iter::once((tx_digest, signed_order)), - ) - .map_err(|_| FastPayError::StorageError)?; + )?; // This is the critical region: testing the locks and writing the // new locks must be atomic, and not writes should happen in between. @@ -249,10 +222,7 @@ impl AuthorityStore { // Aquire the lock to ensure no one else writes when we are in here. let _mutexes = self.aqcuire_locks(mutable_input_objects); - let locks = self - .order_lock - .multi_get(mutable_input_objects) - .map_err(|_| FastPayError::StorageError)?; + let locks = self.order_lock.multi_get(mutable_input_objects)?; for (obj_ref, lock) in mutable_input_objects.iter().zip(locks) { // The object / version must exist, and therefore lock initialized. @@ -261,8 +231,7 @@ impl AuthorityStore { if let Some(previous_tx_digest) = lock { if previous_tx_digest != tx_digest { let prev_order = self - .get_order_lock(obj_ref) - .map_err(|_| FastPayError::StorageError)? + .get_order_lock(obj_ref)? .expect("If we have a lock we should have an order."); // TODO: modify ConflictingOrder to only return the order digest here. @@ -274,7 +243,7 @@ impl AuthorityStore { } // Atomic write of all locks - lock_batch.write().map_err(|_| FastPayError::StorageError) + lock_batch.write().map_err(|e| e.into()) // Implicit: drop(_mutexes); } // End of critical region @@ -297,31 +266,23 @@ impl AuthorityStore { let mut write_batch = self.order_lock.batch(); // Archive the old lock. - write_batch = write_batch - .delete_batch(&self.order_lock, active_inputs.iter().cloned()) - .map_err(|_| FastPayError::StorageError)?; + write_batch = write_batch.delete_batch(&self.order_lock, active_inputs.iter().cloned())?; // Store the certificate indexed by transaction digest let transaction_digest: TransactionDigest = certificate.order.digest(); - write_batch = write_batch - .insert_batch( - &self.certificates, - std::iter::once((transaction_digest, certificate.clone())), - ) - .map_err(|_| FastPayError::StorageError)?; + write_batch = write_batch.insert_batch( + &self.certificates, + std::iter::once((transaction_digest, certificate.clone())), + )?; // Store the signed effects of the order - write_batch = write_batch - .insert_batch( - &self.signed_effects, - std::iter::once((transaction_digest, signed_effects.clone())), - ) - .map_err(|_| FastPayError::StorageError)?; + write_batch = write_batch.insert_batch( + &self.signed_effects, + std::iter::once((transaction_digest, signed_effects.clone())), + )?; // Delete objects - write_batch = write_batch - .delete_batch(&self.objects, deleted.clone().into_iter()) - .map_err(|_| FastPayError::StorageError)?; + write_batch = write_batch.delete_batch(&self.objects, deleted.clone().into_iter())?; // Make an iterator over all objects that are either deleted or have changed owner, // along with their old owner. This is used to update the owner index. @@ -341,51 +302,41 @@ impl AuthorityStore { ); // Delete the old owner index entries - write_batch = write_batch - .delete_batch(&self.owner_index, old_object_owners) - .map_err(|_| FastPayError::StorageError)?; + write_batch = write_batch.delete_batch(&self.owner_index, old_object_owners)?; // Index the certificate by the objects created - write_batch = write_batch - .insert_batch( - &self.parent_sync, - written - .iter() - .map(|(_, object)| (object.to_object_reference(), transaction_digest)), - ) - .map_err(|_| FastPayError::StorageError)?; + write_batch = write_batch.insert_batch( + &self.parent_sync, + written + .iter() + .map(|(_, object)| (object.to_object_reference(), transaction_digest)), + )?; // Create locks for new objects, if they are not immutable - write_batch = write_batch - .insert_batch( - &self.order_lock, - written.iter().filter_map(|(_, new_object)| { - if !new_object.is_read_only() { - Some((new_object.to_object_reference(), None)) - } else { - None - } - }), - ) - .map_err(|_| FastPayError::StorageError)?; + write_batch = write_batch.insert_batch( + &self.order_lock, + written.iter().filter_map(|(_, new_object)| { + if !new_object.is_read_only() { + Some((new_object.to_object_reference(), None)) + } else { + None + } + }), + )?; // Update the indexes of the objects written - write_batch = write_batch - .insert_batch( - &self.owner_index, - written.iter().map(|(id, new_object)| { - ((new_object.owner, *id), new_object.to_object_reference()) - }), - ) - .map_err(|_| FastPayError::StorageError)?; + write_batch = write_batch.insert_batch( + &self.owner_index, + written.iter().map(|(id, new_object)| { + ((new_object.owner, *id), new_object.to_object_reference()) + }), + )?; // Insert each output object into the stores - write_batch = write_batch - .insert_batch( - &self.objects, - written.into_iter().map(|(id, new_object)| (id, new_object)), - ) - .map_err(|_| FastPayError::StorageError)?; + write_batch = write_batch.insert_batch( + &self.objects, + written.into_iter().map(|(id, new_object)| (id, new_object)), + )?; // Update the indexes of the objects written @@ -397,27 +348,19 @@ impl AuthorityStore { // Check the locks are still active // TODO: maybe we could just check if the certificate is there instead? - let locks = self - .order_lock - .multi_get(&active_inputs[..]) - .map_err(|_| FastPayError::StorageError)?; + let locks = self.order_lock.multi_get(&active_inputs[..])?; for object_lock in locks { object_lock.ok_or(FastPayError::OrderLockDoesNotExist)?; } // Atomic write of all locks & other data - write_batch - .write() - .map_err(|_| FastPayError::StorageError)?; + write_batch.write()?; // implict: drop(_mutexes); } // End of critical region Ok(OrderInfoResponse { - signed_order: self - .signed_orders - .get(&transaction_digest) - .map_err(|_| FastPayError::StorageError)?, + signed_order: self.signed_orders.get(&transaction_digest)?, certified_order: Some(certificate), signed_effects: Some(signed_effects), }) diff --git a/fastx_types/Cargo.toml b/fastx_types/Cargo.toml index d724f5c3f18ec..257033fe17f0b 100644 --- a/fastx_types/Cargo.toml +++ b/fastx_types/Cargo.toml @@ -22,6 +22,7 @@ hex = "0.4.3" serde_bytes = "0.11.5" serde_with = "1.11.0" +typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "2e829074f40c9ef852ecd6808b80d083174c778b" } move-binary-format = { git = "https://github.com/diem/move", rev="d253bf1c21314679d17e6935116f30baf409f264" } move-core-types = { git = "https://github.com/diem/move", rev="d253bf1c21314679d17e6935116f30baf409f264" } diff --git a/fastx_types/src/error.rs b/fastx_types/src/error.rs index 706f9fe6f66e1..31924edf032f3 100644 --- a/fastx_types/src/error.rs +++ b/fastx_types/src/error.rs @@ -153,7 +153,7 @@ pub enum FastPayError { #[error("Execution invariant violated")] ExecutionInvariantViolation, #[error("Storage error")] - StorageError, + StorageError(#[from] typed_store::rocks::TypedStoreError), } pub type FastPayResult = Result;