Skip to content

Commit

Permalink
fix: used the typed_store error type
Browse files Browse the repository at this point in the history
This updates the typed_store version to one with a TypedStore error type, rather than using `eyre::Error`.
As a consequence, it allows a more fine-grained conversion of store errors to `FastPay::StorageError`.
  • Loading branch information
huitseeker committed Jan 24, 2022
1 parent 80f1bbf commit 8f9491f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 118 deletions.
3 changes: 2 additions & 1 deletion fastpay_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ move-core-types = { git = "https://github.com/diem/move", rev="d253bf1c21314679d
move-package = { git = "https://github.com/diem/move", rev="d253bf1c21314679d17e6935116f30baf409f264" }
move-vm-runtime = { git = "https://github.com/diem/move", rev="d253bf1c21314679d17e6935116f30baf409f264" }

typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "0ef3f1fedcfbf3dfe0eeea65e05de073b7c25733" }

typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "2e829074f40c9ef852ecd6808b80d083174c778b" }

[dev-dependencies]
fdlimit = "0.2.1"
175 changes: 59 additions & 116 deletions fastpay_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ impl AuthorityStore {
.owner_index
.iter()
// The object id [0; 16] is the smallest possible
.skip_to(&(account, AccountAddress::from([0; 16])))
.map_err(|_| FastPayError::StorageError)?
.skip_to(&(account, AccountAddress::from([0; 16])))?
.take_while(|((owner, _id), _object_ref)| (owner == &account))
.map(|((_owner, _id), object_ref)| object_ref)
.collect())
Expand All @@ -91,34 +90,22 @@ impl AuthorityStore {
transaction_digest: &TransactionDigest,
) -> Result<OrderInfoResponse, FastPayError> {
Ok(OrderInfoResponse {
signed_order: self
.signed_orders
.get(transaction_digest)
.map_err(|_| FastPayError::StorageError)?,
certified_order: self
.certificates
.get(transaction_digest)
.map_err(|_| FastPayError::StorageError)?,
signed_effects: self
.signed_effects
.get(transaction_digest)
.map_err(|_| FastPayError::StorageError)?,
signed_order: self.signed_orders.get(transaction_digest)?,
certified_order: self.certificates.get(transaction_digest)?,
signed_effects: self.signed_effects.get(transaction_digest)?,
})
}

/// Read an object and return it, or Err(ObjectNotFound) if the object was not found.
pub fn object_state(&self, object_id: &ObjectID) -> Result<Object, FastPayError> {
self.objects
.get(object_id)
.map_err(|_| FastPayError::StorageError)?
.get(object_id)?
.ok_or(FastPayError::ObjectNotFound)
}

/// Get many objects
pub fn get_objects(&self, _objects: &[ObjectID]) -> Result<Vec<Option<Object>>, FastPayError> {
self.objects
.multi_get(_objects)
.map_err(|_| FastPayError::StorageError)
self.objects.multi_get(_objects).map_err(|e| e.into())
}

/// Read a lock or returns Err(OrderLockDoesNotExist) if the lock does not exist.
Expand All @@ -128,15 +115,13 @@ impl AuthorityStore {
) -> Result<Option<SignedOrder>, FastPayError> {
let order_option = self
.order_lock
.get(object_ref)
.map_err(|_| FastPayError::StorageError)?
.get(object_ref)?
.ok_or(FastPayError::OrderLockDoesNotExist)?;

match order_option {
Some(tx_digest) => Ok(Some(
self.signed_orders
.get(&tx_digest)
.map_err(|_| FastPayError::StorageError)?
.get(&tx_digest)?
.expect("Stored a lock without storing order?"),
)),
None => Ok(None),
Expand All @@ -148,9 +133,7 @@ impl AuthorityStore {
&self,
digest: &TransactionDigest,
) -> Result<Option<CertifiedOrder>, FastPayError> {
self.certificates
.get(digest)
.map_err(|_| FastPayError::StorageError)
self.certificates.get(digest).map_err(|e| e.into())
}

/// Read the transactionDigest that is the parent of an object reference
Expand All @@ -159,9 +142,7 @@ impl AuthorityStore {
&self,
object_ref: &ObjectRef,
) -> Result<Option<TransactionDigest>, FastPayError> {
self.parent_sync
.get(object_ref)
.map_err(|_| FastPayError::StorageError)
self.parent_sync.get(object_ref).map_err(|e| e.into())
}

/// Returns all parents (object_ref and transaction digests) that match an object_id, at
Expand All @@ -178,8 +159,7 @@ impl AuthorityStore {
.parent_sync
.iter()
// The object id [0; 16] is the smallest possible
.skip_to(&(object_id, seq_inner, obj_dig_inner))
.map_err(|_| FastPayError::StorageError)?
.skip_to(&(object_id, seq_inner, obj_dig_inner))?
.take_while(|((id, iseq, _digest), _txd)| {
let mut flag = id == &object_id;
if seq.is_some() {
Expand All @@ -194,24 +174,19 @@ impl AuthorityStore {

/// Insert an object
pub fn insert_object(&self, object: Object) -> Result<(), FastPayError> {
self.objects
.insert(&object.id(), &object)
.map_err(|_| FastPayError::StorageError)?;
self.objects.insert(&object.id(), &object)?;

// Update the index
self.owner_index
.insert(&(object.owner, object.id()), &object.to_object_reference())
.map_err(|_| FastPayError::StorageError)?;
.insert(&(object.owner, object.id()), &object.to_object_reference())?;

Ok(())
}

/// Initialize a lock to an object reference to None, but keep it
/// as it is if a value is already present.
pub fn init_order_lock(&self, object_ref: ObjectRef) -> Result<(), FastPayError> {
self.order_lock
.get_or_insert(&object_ref, || None)
.map_err(|_| FastPayError::StorageError)?;
self.order_lock.get_or_insert(&object_ref, || None)?;
Ok(())
}

Expand All @@ -235,24 +210,19 @@ impl AuthorityStore {
mutable_input_objects
.iter()
.map(|obj_ref| (*obj_ref, Some(tx_digest))),
)
.map_err(|_| FastPayError::StorageError)?
)?
.insert_batch(
&self.signed_orders,
std::iter::once((tx_digest, signed_order)),
)
.map_err(|_| FastPayError::StorageError)?;
)?;

// This is the critical region: testing the locks and writing the
// new locks must be atomic, and not writes should happen in between.
{
// Aquire the lock to ensure no one else writes when we are in here.
let _mutexes = self.aqcuire_locks(mutable_input_objects);

let locks = self
.order_lock
.multi_get(mutable_input_objects)
.map_err(|_| FastPayError::StorageError)?;
let locks = self.order_lock.multi_get(mutable_input_objects)?;

for (obj_ref, lock) in mutable_input_objects.iter().zip(locks) {
// The object / version must exist, and therefore lock initialized.
Expand All @@ -261,8 +231,7 @@ impl AuthorityStore {
if let Some(previous_tx_digest) = lock {
if previous_tx_digest != tx_digest {
let prev_order = self
.get_order_lock(obj_ref)
.map_err(|_| FastPayError::StorageError)?
.get_order_lock(obj_ref)?
.expect("If we have a lock we should have an order.");

// TODO: modify ConflictingOrder to only return the order digest here.
Expand All @@ -274,7 +243,7 @@ impl AuthorityStore {
}

// Atomic write of all locks
lock_batch.write().map_err(|_| FastPayError::StorageError)
lock_batch.write().map_err(|e| e.into())

// Implicit: drop(_mutexes);
} // End of critical region
Expand All @@ -297,31 +266,23 @@ impl AuthorityStore {
let mut write_batch = self.order_lock.batch();

// Archive the old lock.
write_batch = write_batch
.delete_batch(&self.order_lock, active_inputs.iter().cloned())
.map_err(|_| FastPayError::StorageError)?;
write_batch = write_batch.delete_batch(&self.order_lock, active_inputs.iter().cloned())?;

// Store the certificate indexed by transaction digest
let transaction_digest: TransactionDigest = certificate.order.digest();
write_batch = write_batch
.insert_batch(
&self.certificates,
std::iter::once((transaction_digest, certificate.clone())),
)
.map_err(|_| FastPayError::StorageError)?;
write_batch = write_batch.insert_batch(
&self.certificates,
std::iter::once((transaction_digest, certificate.clone())),
)?;

// Store the signed effects of the order
write_batch = write_batch
.insert_batch(
&self.signed_effects,
std::iter::once((transaction_digest, signed_effects.clone())),
)
.map_err(|_| FastPayError::StorageError)?;
write_batch = write_batch.insert_batch(
&self.signed_effects,
std::iter::once((transaction_digest, signed_effects.clone())),
)?;

// Delete objects
write_batch = write_batch
.delete_batch(&self.objects, deleted.clone().into_iter())
.map_err(|_| FastPayError::StorageError)?;
write_batch = write_batch.delete_batch(&self.objects, deleted.clone().into_iter())?;

// Make an iterator over all objects that are either deleted or have changed owner,
// along with their old owner. This is used to update the owner index.
Expand All @@ -341,51 +302,41 @@ impl AuthorityStore {
);

// Delete the old owner index entries
write_batch = write_batch
.delete_batch(&self.owner_index, old_object_owners)
.map_err(|_| FastPayError::StorageError)?;
write_batch = write_batch.delete_batch(&self.owner_index, old_object_owners)?;

// Index the certificate by the objects created
write_batch = write_batch
.insert_batch(
&self.parent_sync,
written
.iter()
.map(|(_, object)| (object.to_object_reference(), transaction_digest)),
)
.map_err(|_| FastPayError::StorageError)?;
write_batch = write_batch.insert_batch(
&self.parent_sync,
written
.iter()
.map(|(_, object)| (object.to_object_reference(), transaction_digest)),
)?;

// Create locks for new objects, if they are not immutable
write_batch = write_batch
.insert_batch(
&self.order_lock,
written.iter().filter_map(|(_, new_object)| {
if !new_object.is_read_only() {
Some((new_object.to_object_reference(), None))
} else {
None
}
}),
)
.map_err(|_| FastPayError::StorageError)?;
write_batch = write_batch.insert_batch(
&self.order_lock,
written.iter().filter_map(|(_, new_object)| {
if !new_object.is_read_only() {
Some((new_object.to_object_reference(), None))
} else {
None
}
}),
)?;

// Update the indexes of the objects written
write_batch = write_batch
.insert_batch(
&self.owner_index,
written.iter().map(|(id, new_object)| {
((new_object.owner, *id), new_object.to_object_reference())
}),
)
.map_err(|_| FastPayError::StorageError)?;
write_batch = write_batch.insert_batch(
&self.owner_index,
written.iter().map(|(id, new_object)| {
((new_object.owner, *id), new_object.to_object_reference())
}),
)?;

// Insert each output object into the stores
write_batch = write_batch
.insert_batch(
&self.objects,
written.into_iter().map(|(id, new_object)| (id, new_object)),
)
.map_err(|_| FastPayError::StorageError)?;
write_batch = write_batch.insert_batch(
&self.objects,
written.into_iter().map(|(id, new_object)| (id, new_object)),
)?;

// Update the indexes of the objects written

Expand All @@ -397,27 +348,19 @@ impl AuthorityStore {

// Check the locks are still active
// TODO: maybe we could just check if the certificate is there instead?
let locks = self
.order_lock
.multi_get(&active_inputs[..])
.map_err(|_| FastPayError::StorageError)?;
let locks = self.order_lock.multi_get(&active_inputs[..])?;
for object_lock in locks {
object_lock.ok_or(FastPayError::OrderLockDoesNotExist)?;
}

// Atomic write of all locks & other data
write_batch
.write()
.map_err(|_| FastPayError::StorageError)?;
write_batch.write()?;

// implict: drop(_mutexes);
} // End of critical region

Ok(OrderInfoResponse {
signed_order: self
.signed_orders
.get(&transaction_digest)
.map_err(|_| FastPayError::StorageError)?,
signed_order: self.signed_orders.get(&transaction_digest)?,
certified_order: Some(certificate),
signed_effects: Some(signed_effects),
})
Expand Down
1 change: 1 addition & 0 deletions fastx_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ hex = "0.4.3"
serde_bytes = "0.11.5"
serde_with = "1.11.0"

typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "2e829074f40c9ef852ecd6808b80d083174c778b" }

move-binary-format = { git = "https://github.com/diem/move", rev="d253bf1c21314679d17e6935116f30baf409f264" }
move-core-types = { git = "https://github.com/diem/move", rev="d253bf1c21314679d17e6935116f30baf409f264" }
2 changes: 1 addition & 1 deletion fastx_types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub enum FastPayError {
#[error("Execution invariant violated")]
ExecutionInvariantViolation,
#[error("Storage error")]
StorageError,
StorageError(#[from] typed_store::rocks::TypedStoreError),
}

pub type FastPayResult<T = ()> = Result<T, FastPayError>;
Expand Down

0 comments on commit 8f9491f

Please sign in to comment.