diff --git a/Cargo.lock b/Cargo.lock index f34e336832..ee55b796ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2331,7 +2331,7 @@ dependencies = [ [[package]] name = "oasis-runtime-sdk" -version = "0.9.1" +version = "0.9.5" dependencies = [ "anyhow", "async-trait", diff --git a/contract-sdk/specs/access/oas173/Cargo.lock b/contract-sdk/specs/access/oas173/Cargo.lock index b1a4a5cabc..5e32e4d146 100644 --- a/contract-sdk/specs/access/oas173/Cargo.lock +++ b/contract-sdk/specs/access/oas173/Cargo.lock @@ -1794,7 +1794,7 @@ dependencies = [ [[package]] name = "oasis-runtime-sdk" -version = "0.9.1" +version = "0.9.5" dependencies = [ "anyhow", "async-trait", diff --git a/contract-sdk/specs/token/oas20/Cargo.lock b/contract-sdk/specs/token/oas20/Cargo.lock index f138d40fdb..0887fed94f 100644 --- a/contract-sdk/specs/token/oas20/Cargo.lock +++ b/contract-sdk/specs/token/oas20/Cargo.lock @@ -1795,7 +1795,7 @@ dependencies = [ [[package]] name = "oasis-runtime-sdk" -version = "0.9.1" +version = "0.9.5" dependencies = [ "anyhow", "async-trait", diff --git a/runtime-sdk/Cargo.toml b/runtime-sdk/Cargo.toml index b2ab422a37..705410faaf 100644 --- a/runtime-sdk/Cargo.toml +++ b/runtime-sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "oasis-runtime-sdk" -version = "0.9.1" +version = "0.9.5" authors = ["Oasis Protocol Foundation "] edition = "2021" license = "Apache-2.0" @@ -38,7 +38,7 @@ rand = "0.8.5" rand_core = { version = "0.6.4", default-features = false } slog = "2.7.0" tiny-keccak = { version = "2.0", features = ["tuple_hash"] } -tokio = { version = "1.38", features = ["rt"] } +tokio = { version = "1.38", features = ["rt", "rt-multi-thread", "sync", "time", "macros"] } tokio-retry = "0.3.0" zeroize = "1.8" lru = "0.12.3" diff --git a/runtime-sdk/src/crypto/signature/mod.rs b/runtime-sdk/src/crypto/signature/mod.rs index cec4a19057..9bbfdc6a0f 100644 --- a/runtime-sdk/src/crypto/signature/mod.rs +++ b/runtime-sdk/src/crypto/signature/mod.rs @@ -406,6 +406,45 @@ pub trait Signer: Send + Sync { fn sign_raw(&self, message: &[u8]) -> Result; } +impl Signer for std::sync::Arc { + fn random(_rng: &mut impl RngCore) -> Result + where + Self: Sized, + { + Err(Error::InvalidArgument) + } + + fn new_from_seed(_seed: &[u8]) -> Result + where + Self: Sized, + { + Err(Error::InvalidArgument) + } + + fn from_bytes(_bytes: &[u8]) -> Result + where + Self: Sized, + { + Err(Error::InvalidArgument) + } + + fn to_bytes(&self) -> Vec { + T::to_bytes(self) + } + + fn public_key(&self) -> PublicKey { + T::public_key(self) + } + + fn sign(&self, context: &[u8], message: &[u8]) -> Result { + T::sign(self, context, message) + } + + fn sign_raw(&self, message: &[u8]) -> Result { + T::sign_raw(self, message) + } +} + impl Signer for &T { fn random(_rng: &mut impl RngCore) -> Result where @@ -446,6 +485,46 @@ impl Signer for &T { } } +impl Signer for crate::core::identity::Identity { + fn random(_rng: &mut impl RngCore) -> Result + where + Self: Sized, + { + Err(Error::InvalidArgument) + } + + fn new_from_seed(_seed: &[u8]) -> Result + where + Self: Sized, + { + Err(Error::InvalidArgument) + } + + fn from_bytes(_bytes: &[u8]) -> Result + where + Self: Sized, + { + Err(Error::InvalidArgument) + } + + fn to_bytes(&self) -> Vec { + vec![] + } + + fn public_key(&self) -> PublicKey { + PublicKey::Ed25519(self.public().into()) + } + + fn sign(&self, context: &[u8], message: &[u8]) -> Result { + let raw_sig = CoreSigner::sign(self, context, message).map_err(|_| Error::SigningError)?; + Ok(Signature(raw_sig.as_ref().into())) + } + + fn sign_raw(&self, _message: &[u8]) -> Result { + Err(Error::InvalidArgument) + } +} + /// A memory-backed signer. pub enum MemorySigner { Ed25519(ed25519::MemorySigner), diff --git a/runtime-sdk/src/modules/rofl/app/client.rs b/runtime-sdk/src/modules/rofl/app/client.rs index 199842f440..d4111e4ac5 100644 --- a/runtime-sdk/src/modules/rofl/app/client.rs +++ b/runtime-sdk/src/modules/rofl/app/client.rs @@ -1,6 +1,10 @@ use std::{ collections::{BTreeMap, HashSet}, - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, }; use anyhow::{anyhow, Result}; @@ -31,13 +35,32 @@ use crate::{ use super::{processor, App}; +/// Size of various command queues. +const CMDQ_BACKLOG: usize = 16; + /// EnclaveRPC endpoint for communicating with the RONL component. const ENCLAVE_RPC_ENDPOINT_RONL: &str = "ronl"; +/// Transaction submission options. +#[derive(Clone, Debug)] +pub struct SubmitTxOpts { + /// Optional timeout when submitting a transaction. Setting this to `None` means that the host + /// node timeout will be used. + pub timeout: Option, +} + +impl Default for SubmitTxOpts { + fn default() -> Self { + Self { + timeout: Some(Duration::from_millis(15_000)), // 15 seconds. + } + } +} + /// A runtime client meant for use within runtimes. pub struct Client { - state: Arc>, - cmdq: mpsc::WeakSender, + imp: ClientImpl, + submission_mgr: Arc>, } impl Client @@ -49,28 +72,148 @@ where state: Arc>, cmdq: mpsc::WeakSender, ) -> Self { - Self { state, cmdq } + let imp = ClientImpl::new(state, cmdq); + let mut submission_mgr = SubmissionManager::new(imp.clone()); + submission_mgr.start(); + + Self { + imp, + submission_mgr: Arc::new(submission_mgr), + } } /// Retrieve the latest known runtime round. pub async fn latest_round(&self) -> Result { + self.imp.latest_round().await + } + + /// Retrieve the nonce for the given account. + pub async fn account_nonce(&self, round: u64, address: Address) -> Result { + self.imp.account_nonce(round, address).await + } + + /// Retrieve the gas price in the given denomination. + pub async fn gas_price(&self, round: u64, denom: &token::Denomination) -> Result { + self.imp.gas_price(round, denom).await + } + + /// Securely query the on-chain runtime component. + pub async fn query(&self, round: u64, method: &str, args: Rq) -> Result + where + Rq: cbor::Encode, + Rs: cbor::Decode + Send + 'static, + { + self.imp.query(round, method, args).await + } + + /// Securely perform gas estimation. + pub async fn estimate_gas(&self, req: EstimateGasQuery) -> Result { + self.imp.estimate_gas(req).await + } + + /// Sign a given transaction, submit it and wait for block inclusion. + /// + /// This method supports multiple transaction signers. + pub async fn multi_sign_and_submit_tx( + &self, + signers: &[Arc], + tx: transaction::Transaction, + ) -> Result { + self.multi_sign_and_submit_tx_opts(signers, tx, SubmitTxOpts::default()) + .await + } + + /// Sign a given transaction, submit it and wait for block inclusion. + /// + /// This method supports multiple transaction signers. + pub async fn multi_sign_and_submit_tx_opts( + &self, + signers: &[Arc], + tx: transaction::Transaction, + opts: SubmitTxOpts, + ) -> Result { + self.submission_mgr + .multi_sign_and_submit_tx(signers, tx, opts) + .await + } + + /// Sign a given transaction, submit it and wait for block inclusion. + pub async fn sign_and_submit_tx( + &self, + signer: Arc, + tx: transaction::Transaction, + ) -> Result { + self.multi_sign_and_submit_tx(&[signer], tx).await + } + + /// Run a closure inside a `CurrentState` context with store for the given round. + pub async fn with_store_for_round(&self, round: u64, f: F) -> Result + where + F: FnOnce() -> Result + Send + 'static, + R: Send + 'static, + { + self.imp.with_store_for_round(round, f).await + } + + /// Return a store corresponding to the given round. + pub async fn store_for_round(&self, round: u64) -> Result { + self.imp.store_for_round(round).await + } +} + +impl Clone for Client +where + A: App, +{ + fn clone(&self) -> Self { + Self { + imp: self.imp.clone(), + submission_mgr: self.submission_mgr.clone(), + } + } +} + +struct ClientImpl { + state: Arc>, + cmdq: mpsc::WeakSender, + latest_round: Arc, +} + +impl ClientImpl +where + A: App, +{ + fn new(state: Arc>, cmdq: mpsc::WeakSender) -> Self { + Self { + state, + cmdq, + latest_round: Arc::new(AtomicU64::new(0)), + } + } + + /// Retrieve the latest known runtime round. + async fn latest_round(&self) -> Result { let cmdq = self .cmdq .upgrade() .ok_or(anyhow!("processor has shut down"))?; let (tx, rx) = oneshot::channel(); cmdq.send(processor::Command::GetLatestRound(tx)).await?; - Ok(rx.await?) + let round = rx.await?; + Ok(self + .latest_round + .fetch_max(round, Ordering::SeqCst) + .max(round)) } /// Retrieve the nonce for the given account. - pub async fn account_nonce(&self, round: u64, address: Address) -> Result { + async fn account_nonce(&self, round: u64, address: Address) -> Result { self.query(round, "accounts.Nonce", NonceQuery { address }) .await } /// Retrieve the gas price in the given denomination. - pub async fn gas_price(&self, round: u64, denom: &token::Denomination) -> Result { + async fn gas_price(&self, round: u64, denom: &token::Denomination) -> Result { let mgp: BTreeMap = self.query(round, "core.MinGasPrice", ()).await?; mgp.get(denom) @@ -79,7 +222,7 @@ where } /// Securely query the on-chain runtime component. - pub async fn query(&self, round: u64, method: &str, args: Rq) -> Result + async fn query(&self, round: u64, method: &str, args: Rq) -> Result where Rq: cbor::Encode, Rs: cbor::Decode + Send + 'static, @@ -143,53 +286,213 @@ where } /// Securely perform gas estimation. - pub async fn estimate_gas(&self, req: EstimateGasQuery) -> Result { + async fn estimate_gas(&self, req: EstimateGasQuery) -> Result { let round = self.latest_round().await?; self.query(round, "core.EstimateGas", req).await } - /// Sign a given transaction and submit it. - /// - /// This method supports multiple transaction signers. - pub async fn multi_sign_and_submit_tx( + /// Run a closure inside a `CurrentState` context with store for the given round. + async fn with_store_for_round(&self, round: u64, f: F) -> Result + where + F: FnOnce() -> Result + Send + 'static, + R: Send + 'static, + { + let store = self.store_for_round(round).await?; + + tokio::task::spawn_blocking(move || CurrentState::enter(store, f)).await? + } + + /// Return a store corresponding to the given round. + async fn store_for_round(&self, round: u64) -> Result { + HostStore::new_for_round( + self.state.host.clone(), + &self.state.consensus_verifier, + self.state.host.get_runtime_id(), + round, + ) + .await + } +} + +impl Clone for ClientImpl +where + A: App, +{ + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + cmdq: self.cmdq.clone(), + latest_round: self.latest_round.clone(), + } + } +} + +enum Cmd { + SubmitTx( + Vec>, + transaction::Transaction, + SubmitTxOpts, + oneshot::Sender>, + ), +} + +/// Transaction submission manager for avoiding nonce conflicts. +struct SubmissionManager { + imp: Option>, + cmdq_tx: mpsc::Sender, +} + +impl SubmissionManager +where + A: App, +{ + /// Create a new submission manager. + fn new(client: ClientImpl) -> Self { + let (tx, rx) = mpsc::channel(CMDQ_BACKLOG); + + Self { + imp: Some(SubmissionManagerImpl { + client, + cmdq_rx: rx, + }), + cmdq_tx: tx, + } + } + + /// Start the submission manager task. + fn start(&mut self) { + if let Some(imp) = self.imp.take() { + imp.start(); + } + } + + /// Sign a given transaction, submit it and wait for block inclusion. + async fn multi_sign_and_submit_tx( &self, - signers: &[&dyn Signer], + signers: &[Arc], + tx: transaction::Transaction, + opts: SubmitTxOpts, + ) -> Result { + let (ch, rx) = oneshot::channel(); + self.cmdq_tx + .send(Cmd::SubmitTx(signers.to_vec(), tx, opts, ch)) + .await?; + rx.await? + } +} + +struct SubmissionManagerImpl { + client: ClientImpl, + cmdq_rx: mpsc::Receiver, +} + +impl SubmissionManagerImpl +where + A: App, +{ + /// Start the submission manager task. + fn start(self) { + tokio::task::spawn(self.run()); + } + + /// Run the submission manager task. + async fn run(mut self) { + let (notify_tx, mut notify_rx) = mpsc::channel::>(CMDQ_BACKLOG); + let mut queue: Vec = Vec::new(); + let mut pending: HashSet = HashSet::new(); + + loop { + tokio::select! { + // Process incoming commands. + Some(cmd) = self.cmdq_rx.recv() => queue.push(cmd), + + // Process incoming completion notifications. + Some(signers) = notify_rx.recv() => { + for pk in signers { + pending.remove(&pk); + } + }, + + else => break, + } + + // Check if there is anything in the queue that can be executed without conflicts. + let mut new_queue = Vec::with_capacity(queue.len()); + for cmd in queue { + match cmd { + Cmd::SubmitTx(signers, tx, opts, ch) => { + // Check if transaction can be executed (no conflicts with in-flight txs). + let signer_set = + HashSet::from_iter(signers.iter().map(|signer| signer.public_key())); + if !signer_set.is_disjoint(&pending) { + // Defer any non-executable commands. + new_queue.push(Cmd::SubmitTx(signers, tx, opts, ch)); + continue; + } + // Include all signers in the pending set. + pending.extend(signer_set.iter().cloned()); + + // Execute in a separate task. + let client = self.client.clone(); + let notify_tx = notify_tx.clone(); + + tokio::spawn(async move { + let result = + Self::multi_sign_and_submit_tx(client, &signers, tx, opts).await; + let _ = ch.send(result); + + // Notify the submission manager task that submission is done. + let _ = notify_tx.send(signer_set).await; + }); + } + } + } + queue = new_queue; + } + } + + /// Sign a given transaction, submit it and wait for block inclusion. + async fn multi_sign_and_submit_tx( + client: ClientImpl, + signers: &[Arc], mut tx: transaction::Transaction, + opts: SubmitTxOpts, ) -> Result { if signers.is_empty() { return Err(anyhow!("no signers specified")); } - let round = self.latest_round().await?; + // Resolve signer addresses. + let addresses = signers + .iter() + .map(|signer| -> Result<_> { + let sigspec = SignatureAddressSpec::try_from_pk(&signer.public_key()) + .ok_or(anyhow!("signature scheme not supported"))?; + Ok((Address::from_sigspec(&sigspec), sigspec)) + }) + .collect::>>()?; + + let round = client.latest_round().await?; // Resolve account nonces. - let mut first_signer_address = Default::default(); - for (idx, signer) in signers.iter().enumerate() { - let sigspec = SignatureAddressSpec::try_from_pk(&signer.public_key()) - .ok_or(anyhow!("signature scheme not supported"))?; - let address = Address::from_sigspec(&sigspec); - let nonce = self.account_nonce(round, address).await?; - - tx.append_auth_signature(sigspec, nonce); - - // Store first signer address for gas estimation to avoid rederivation. - if idx == 0 { - first_signer_address = address; - } + for (address, sigspec) in &addresses { + let nonce = client.account_nonce(round, *address).await?; + + tx.append_auth_signature(sigspec.clone(), nonce); } // Perform gas estimation after all signer infos have been added as otherwise we may // underestimate the amount of gas needed. if tx.fee_gas() == 0 { let signer = &signers[0]; // Checked to have at least one signer above. - let gas = self + let gas = client .estimate_gas(EstimateGasQuery { caller: if let PublicKey::Secp256k1(pk) = signer.public_key() { Some(CallerAddress::EthAddress( pk.to_eth_address().try_into().unwrap(), )) } else { - Some(CallerAddress::Address(first_signer_address)) + Some(CallerAddress::Address(addresses[0].0)) // Checked above. }, tx: tx.clone(), propagate_failures: false, @@ -204,73 +507,39 @@ where } // Determine gas price. Currently we always use the native denomination. - let mgp = self.gas_price(round, &token::Denomination::NATIVE).await?; + let mgp = client + .gas_price(round, &token::Denomination::NATIVE) + .await?; let fee = mgp.saturating_mul(tx.fee_gas().into()); tx.set_fee_amount(token::BaseUnits::new(fee, token::Denomination::NATIVE)); // Sign the transaction. let mut tx = tx.prepare_for_signing(); for signer in signers { - tx.append_sign(*signer)?; + tx.append_sign(signer)?; } let tx = tx.finalize(); // Submit the transaction. - let result = self - .state - .host - .submit_tx( - cbor::to_vec(tx), - host::SubmitTxOpts { - wait: true, - ..Default::default() - }, - ) - .await? - .ok_or(anyhow!("missing result"))?; - cbor::from_slice(&result.output).map_err(|_| anyhow!("malformed result")) - } - - /// Sign a given transaction and submit it. - pub async fn sign_and_submit_tx( - &self, - signer: &dyn Signer, - tx: transaction::Transaction, - ) -> Result { - self.multi_sign_and_submit_tx(&[signer], tx).await - } - - /// Run a closure inside a `CurrentState` context with store for the given round. - pub async fn with_store_for_round(&self, round: u64, f: F) -> Result - where - F: FnOnce() -> Result + Send + 'static, - R: Send + 'static, - { - let store = self.store_for_round(round).await?; - - tokio::task::spawn_blocking(move || CurrentState::enter(store, f)).await? - } + let submit_tx_task = client.state.host.submit_tx( + cbor::to_vec(tx), + host::SubmitTxOpts { + wait: true, + ..Default::default() + }, + ); + let result = if let Some(timeout) = opts.timeout { + tokio::time::timeout(timeout, submit_tx_task).await? + } else { + submit_tx_task.await + }; + let result = result?.ok_or(anyhow!("missing result"))?; - /// Return a store corresponding to the given round. - pub async fn store_for_round(&self, round: u64) -> Result { - HostStore::new_for_round( - self.state.host.clone(), - &self.state.consensus_verifier, - self.state.host.get_runtime_id(), - round, - ) - .await - } -} + // Update latest known round. + client + .latest_round + .fetch_max(result.round, Ordering::SeqCst); -impl Clone for Client -where - A: App, -{ - fn clone(&self) -> Self { - Self { - state: self.state.clone(), - cmdq: self.cmdq.clone(), - } + cbor::from_slice(&result.output).map_err(|_| anyhow!("malformed result")) } } diff --git a/runtime-sdk/src/modules/rofl/app/env.rs b/runtime-sdk/src/modules/rofl/app/env.rs index 48554e818e..7ab6e5267c 100644 --- a/runtime-sdk/src/modules/rofl/app/env.rs +++ b/runtime-sdk/src/modules/rofl/app/env.rs @@ -36,8 +36,8 @@ where } /// Transaction signer. - pub fn signer(&self) -> &dyn Signer { - self.signer.as_ref() + pub fn signer(&self) -> Arc { + self.signer.clone() } /// Send a command to the processor. diff --git a/runtime-sdk/src/modules/rofl/app/registration.rs b/runtime-sdk/src/modules/rofl/app/registration.rs index ba07c05bd1..64e78ae444 100644 --- a/runtime-sdk/src/modules/rofl/app/registration.rs +++ b/runtime-sdk/src/modules/rofl/app/registration.rs @@ -124,7 +124,7 @@ where let result = self .env .client() - .multi_sign_and_submit_tx(&[&self.state.identity.as_ref(), self.env.signer()], tx) + .multi_sign_and_submit_tx(&[self.state.identity.clone(), self.env.signer()], tx) .await? .ok()?; diff --git a/tests/contracts/hello/Cargo.lock b/tests/contracts/hello/Cargo.lock index 6b81c09044..07ef598fbc 100644 --- a/tests/contracts/hello/Cargo.lock +++ b/tests/contracts/hello/Cargo.lock @@ -1805,7 +1805,7 @@ dependencies = [ [[package]] name = "oasis-runtime-sdk" -version = "0.9.1" +version = "0.9.5" dependencies = [ "anyhow", "async-trait",