Skip to content

Commit

Permalink
Clean up wallet syncing code (#2108)
Browse files Browse the repository at this point in the history
* const fn

* sync options

* move instead of clone

* combinators

* rm unnecessary cloning 1

* rm unnecessary cloning 2

* slices; impl Into<Option>

* refactor

* get rid of output_data

* nits

* clippy

* nit

* nest

* nit

* update python binding

* update nodejs binding

* update core binding

* rm output_data.py

* nit

* suggestion

Co-authored-by: Thoralf-M <[email protected]>

* rename local var

* fix tests and clippy

* ci-doc

* rm HashSet piping of foundry output ids

* rm TODO

* fix nodejs how-to

* remove todo

* undo rename

* rust: cleanup

* python: cleanup

* nodejs: cleanup

* rust: more cleanup

* ...

* ....

* .....

---------

Co-authored-by: Thoralf-M <[email protected]>
Co-authored-by: DaughterOfMars <[email protected]>
  • Loading branch information
3 people authored Apr 30, 2024
1 parent 19294a7 commit 3e44c80
Show file tree
Hide file tree
Showing 16 changed files with 259 additions and 232 deletions.
4 changes: 2 additions & 2 deletions bindings/core/src/method/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ pub enum WalletMethod {
/// Expected response: [`Transaction`](crate::Response::Transaction)
#[serde(rename_all = "camelCase")]
GetIncomingTransaction { transaction_id: TransactionId },
/// Get the [`OutputData`](iota_sdk::wallet::types::OutputData) of an output stored in the wallet.
/// Expected response: [`OutputData`](crate::Response::OutputData)
/// Get the [`OutputData`](iota_sdk::wallet::types::OutputData) of an output stored
/// in the wallet. Expected response: [`OutputData`](crate::Response::OutputData)
#[serde(rename_all = "camelCase")]
GetOutput { output_id: OutputId },
// /// Expected response: [`ParticipationEvent`](crate::Response::ParticipationEvent)
Expand Down
12 changes: 5 additions & 7 deletions bindings/nodejs/examples/how_tos/wallet/consolidate-outputs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2023 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

import { CommonOutput, Utils, Wallet, initLogger } from '@iota/sdk';
import { CommonOutput, Wallet, initLogger } from '@iota/sdk';

// This example uses secrets in environment variables for simplicity which should not be done in production.
require('dotenv').config({ path: '.env' });
Expand Down Expand Up @@ -46,11 +46,10 @@ async function run() {
const outputs = await wallet.unspentOutputs();
console.log('Outputs BEFORE consolidation:');

outputs.forEach(({ output, address }, i) => {
outputs.forEach(({ output }, i) => {
console.log(`OUTPUT #${i}`);
console.log(
'- address: %s\n- amount: %d\n- native token: %s',
Utils.addressToBech32(address, 'rms'),
'- amount: %d\n- native token: %s',
output.getAmount(),
output instanceof CommonOutput
? (output as CommonOutput).getNativeToken() ?? []
Expand Down Expand Up @@ -80,11 +79,10 @@ async function run() {

// Outputs after consolidation
console.log('Outputs AFTER consolidation:');
outputs.forEach(({ output, address }, i) => {
outputs.forEach(({ output }, i) => {
console.log(`OUTPUT #${i}`);
console.log(
'- address: %s\n- amount: %d\n- native tokens: %s',
Utils.addressToBech32(address, 'rms'),
'- amount: %d\n- native tokens: %s',
output.getAmount(),
output instanceof CommonOutput
? (output as CommonOutput).getNativeToken()
Expand Down
28 changes: 11 additions & 17 deletions bindings/nodejs/lib/types/wallet/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

import { Type } from 'class-transformer';
import { Address, AddressDiscriminator } from '../block/address';
import { Output, OutputDiscriminator, OutputId } from '../block/output';
import { OutputMetadataResponse } from '../models/api';
import { OutputIdProof, OutputMetadataResponse } from '../models/api';

/** Output to claim */
export enum OutputsToClaim {
Expand All @@ -15,28 +14,23 @@ export enum OutputsToClaim {
All = 'All',
}

/** An output with metadata */
/** An output with additional data */
export class OutputData {
/** The identifier of an Output */
outputId!: OutputId;
/** The metadata of the output */
metadata!: OutputMetadataResponse;
/** The actual Output */
/** The output itself */
@Type(() => Output, {
discriminator: OutputDiscriminator,
})
output!: Output;
/** Associated account address */
@Type(() => Address, {
discriminator: AddressDiscriminator,
})
address!: Address;
/** Network ID */
/** The metadata of the output */
metadata!: OutputMetadataResponse;
/** The output ID proof */
OutputIdProof!: OutputIdProof;
/** The corresponding output ID */
outputId!: OutputId;
/** The network ID the output belongs to */
networkId!: string;
/** Remainder */
/** Whether the output represents a remainder amount */
remainder!: boolean;
/** BIP32 path */
chain?: Segment[];
}

/** A Segment of the BIP32 path*/
Expand Down
12 changes: 6 additions & 6 deletions bindings/python/iota_sdk/types/output_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@
@json
@dataclass
class OutputData:
"""Output data.
"""An output with additional data.
Attributes:
output_id: With the output data corresponding output ID.
metadata: With the output corresponding metadata.
output: The output object itself.
output: The output itself.
metadata: The metadata of the output.
output_id_proof: The output ID proof.
output_id: The corresponding output ID.
network_id: The network ID the output belongs to.
remainder: Whether the output represents a remainder amount.
"""
output_id: OutputId
metadata: OutputMetadata
output: Output
metadata: OutputMetadata
output_id_proof: OutputIdProof
output_id: OutputId
network_id: str
remainder: bool
2 changes: 1 addition & 1 deletion bindings/python/iota_sdk/wallet/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
from iota_sdk.types.client_options import ClientOptions
from iota_sdk.types.filter_options import FilterOptions
from iota_sdk.types.native_token import NativeToken
from iota_sdk.types.output import BasicOutput, NftOutput, Output, deserialize_output
from iota_sdk.types.output_data import OutputData
from iota_sdk.types.output_id import OutputId
from iota_sdk.types.output import BasicOutput, NftOutput, Output, deserialize_output
from iota_sdk.types.output_params import OutputParams
from iota_sdk.types.transaction_data import PreparedTransactionData, SignedTransactionData
from iota_sdk.types.transaction_id import TransactionId
Expand Down
8 changes: 4 additions & 4 deletions cli/src/wallet_cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1765,14 +1765,14 @@ pub async fn prompt_internal(
Ok(PromptResponse::Reprompt)
}

fn print_outputs(mut outputs: Vec<OutputData>, title: &str) -> Result<(), Error> {
if outputs.is_empty() {
fn print_outputs(mut outputs_data: Vec<OutputData>, title: &str) -> Result<(), Error> {
if outputs_data.is_empty() {
println_log_info!("No outputs found");
} else {
println_log_info!("{title}");
outputs.sort_unstable_by_key(|o| o.output_id);
outputs_data.sort_unstable_by_key(|o| o.output_id);

for (i, output_data) in outputs.into_iter().enumerate() {
for (i, output_data) in outputs_data.into_iter().enumerate() {
let kind_str = if output_data.output.is_implicit_account() {
"ImplicitAccount"
} else {
Expand Down
3 changes: 2 additions & 1 deletion sdk/src/wallet/core/operations/background_syncing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ where
/// Start the background syncing process for the wallet, default interval is 7 seconds
pub async fn start_background_syncing(
&self,
options: Option<SyncOptions>,
options: impl Into<Option<SyncOptions>> + Send,
interval: Option<Duration>,
) -> Result<(), WalletError> {
log::debug!("[start_background_syncing]");

let options = options.into();
let (tx_background_sync, mut rx_background_sync) = self.background_syncing_status.clone();

// stop existing process if running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<S: 'static + SecretManage> Wallet<S> {
) -> Result<Vec<OutputId>, WalletError> {
log::debug!("[SYNC] get_foundry_output_ids");
// Get account outputs, so we can then get the foundry outputs with the account addresses
let account_outputs_with_meta = self.get_outputs(account_output_ids.to_vec()).await?;
let account_outputs_with_meta = self.get_outputs_request_unknown(account_output_ids).await?;

let bech32_hrp = self.client().get_bech32_hrp().await?;

Expand All @@ -77,14 +77,9 @@ impl<S: 'static + SecretManage> Wallet<S> {
}
}

let mut output_ids = HashSet::new();
let results: Vec<Result<OutputIdsResponse, WalletError>> = futures::future::try_join_all(tasks).await?;
let responses: Vec<OutputIdsResponse> = results.into_iter().collect::<Result<Vec<_>, _>>()?;

for res in results {
let foundry_output_ids = res?;
output_ids.extend(foundry_output_ids.items);
}

Ok(output_ids.into_iter().collect())
Ok(responses.into_iter().flat_map(|res| res.items).collect())
}
}
50 changes: 25 additions & 25 deletions sdk/src/wallet/operations/syncing/addresses/output_ids/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ use crate::{
},
types::block::{address::Bech32Address, output::OutputId},
wallet::{
constants::PARALLEL_REQUESTS_AMOUNT, operations::syncing::SyncOptions,
types::address::AddressWithUnspentOutputs, Wallet, WalletError,
constants::PARALLEL_REQUESTS_AMOUNT,
operations::syncing::SyncOptions,
types::address::{AddressWithUnspentOutputIds, SpentOutputId},
Wallet, WalletError,
},
};

Expand Down Expand Up @@ -217,11 +219,8 @@ impl<S: 'static + SecretManage> Wallet<S> {
let results = futures::future::try_join_all(tasks).await?;

// Get all results
let mut output_ids = HashSet::new();
for res in results {
let found_output_ids = res?;
output_ids.extend(found_output_ids);
}
let output_ids = results.into_iter().collect::<Result<Vec<_>, _>>()?;
let output_ids: HashSet<OutputId> = HashSet::from_iter(output_ids.into_iter().flat_map(|v| v.into_iter()));

Ok(output_ids.into_iter().collect())
}
Expand All @@ -230,20 +229,20 @@ impl<S: 'static + SecretManage> Wallet<S> {
/// return spent outputs separated
pub(crate) async fn get_output_ids_for_addresses(
&self,
addresses_with_unspent_outputs: Vec<AddressWithUnspentOutputs>,
addresses: &[AddressWithUnspentOutputIds],
options: &SyncOptions,
) -> Result<(Vec<AddressWithUnspentOutputs>, Vec<OutputId>), WalletError> {
) -> Result<(Vec<AddressWithUnspentOutputIds>, Vec<SpentOutputId>), WalletError> {
log::debug!("[SYNC] start get_output_ids_for_addresses");
let address_output_ids_start_time = Instant::now();

let mut addresses_with_outputs = Vec::new();
let mut addresses_with_unspent_outputs = Vec::new();
// spent outputs or account/nft/foundries that don't get synced anymore, because of other sync options
let mut spent_or_not_anymore_synced_outputs = Vec::new();
let mut spent_or_ignored_outputs = Vec::new();

// We split the addresses into chunks so we don't get timeouts if we have thousands
for addresses_chunk in &mut addresses_with_unspent_outputs
for addresses_chunk in addresses
.chunks(PARALLEL_REQUESTS_AMOUNT)
.map(|x: &[AddressWithUnspentOutputs]| x.to_vec())
.map(|x: &[AddressWithUnspentOutputIds]| x.to_vec())
{
let results: Vec<Result<_, WalletError>>;
#[cfg(target_family = "wasm")]
Expand Down Expand Up @@ -276,35 +275,36 @@ impl<S: 'static + SecretManage> Wallet<S> {
results = futures::future::try_join_all(tasks).await?;
}

for res in results {
let (mut address, output_ids): (AddressWithUnspentOutputs, Vec<OutputId>) = res?;
let addresses_with_new_unspent_output_ids = results.into_iter().collect::<Result<Vec<_>, _>>()?;

for (mut address, new_unspent_output_ids) in addresses_with_new_unspent_output_ids {
// only return addresses with outputs
if !output_ids.is_empty() {
if !new_unspent_output_ids.is_empty() {
// outputs we had before, but now not anymore, got spent or are account/nft/foundries that don't
// get synced anymore because of other sync options
for output_id in address.output_ids {
if !output_ids.contains(&output_id) {
spent_or_not_anymore_synced_outputs.push(output_id);
for output_id in address.unspent_output_ids {
if !new_unspent_output_ids.contains(&output_id) {
spent_or_ignored_outputs.push(output_id);
}
}
address.output_ids = output_ids;
addresses_with_outputs.push(address);
address.unspent_output_ids = new_unspent_output_ids;
addresses_with_unspent_outputs.push(address);
} else {
// outputs we had before, but now not anymore, got spent or are account/nft/foundries that don't
// get synced anymore because of other sync options
spent_or_not_anymore_synced_outputs.extend(address.output_ids);
spent_or_ignored_outputs.extend(address.unspent_output_ids);
}
}
}

log::debug!(
"[SYNC] spent or not anymore synced account/nft/foundries outputs: {:?}",
spent_or_not_anymore_synced_outputs
"[SYNC] spent or ignored account/nft/foundries outputs: {:?}",
spent_or_ignored_outputs
);
log::debug!(
"[SYNC] finished get_output_ids_for_addresses in {:.2?}",
address_output_ids_start_time.elapsed()
);
Ok((addresses_with_outputs, spent_or_not_anymore_synced_outputs))
Ok((addresses_with_unspent_outputs, spent_or_ignored_outputs))
}
}
33 changes: 19 additions & 14 deletions sdk/src/wallet/operations/syncing/addresses/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,52 @@ use crate::{
wallet::{
constants::PARALLEL_REQUESTS_AMOUNT,
task,
types::{address::AddressWithUnspentOutputs, OutputData},
types::address::{AddressWithUnspentOutputIds, AddressWithUnspentOutputs},
Wallet, WalletError,
},
};

impl<S: 'static + SecretManage> Wallet<S> {
/// Get outputs from addresses
/// Get unspent outputs from addresses
pub(crate) async fn get_outputs_from_address_output_ids(
&self,
addresses_with_unspent_outputs: Vec<AddressWithUnspentOutputs>,
) -> Result<Vec<(AddressWithUnspentOutputs, Vec<OutputData>)>, WalletError> {
addresses_with_unspent_output_ids: &[AddressWithUnspentOutputIds],
) -> Result<Vec<AddressWithUnspentOutputs>, WalletError> {
log::debug!("[SYNC] start get_outputs_from_address_output_ids");
let address_outputs_start_time = Instant::now();

let network_id = self.client().get_network_id().await?;

let mut addresses_with_outputs = Vec::new();

// We split the addresses into chunks so we don't get timeouts if we have thousands
for addresses_chunk in &mut addresses_with_unspent_outputs
for addresses_chunk in addresses_with_unspent_output_ids
.chunks(PARALLEL_REQUESTS_AMOUNT)
.map(|x: &[AddressWithUnspentOutputs]| x.to_vec())
.map(|x: &[AddressWithUnspentOutputIds]| x.to_vec())
{
let mut tasks = Vec::new();
for address_with_unspent_outputs in addresses_chunk {
for address_with_unspent_output_ids in addresses_chunk {
let wallet = self.clone();
tasks.push(async move {
task::spawn(async move {
let unspent_outputs_with_metadata = wallet
.get_outputs(address_with_unspent_outputs.output_ids.clone())
.get_outputs_request_unknown(address_with_unspent_output_ids.unspent_output_ids())
.await?;
let unspent_outputs_data = wallet
.output_response_to_output_data(unspent_outputs_with_metadata)
let unspent_outputs = wallet
.output_response_to_output_data(unspent_outputs_with_metadata, network_id)
.await?;
Ok((address_with_unspent_outputs, unspent_outputs_data))

Ok(AddressWithUnspentOutputs {
address_with_unspent_output_ids,
unspent_outputs,
})
})
.await
});
}
let results: Vec<Result<_, WalletError>> = futures::future::try_join_all(tasks).await?;
for res in results {
addresses_with_outputs.push(res?);
}
let result = results.into_iter().collect::<Result<Vec<_>, _>>()?;
addresses_with_outputs.extend(result.into_iter());
}
log::debug!(
"[SYNC] finished get_outputs_from_address_output_ids in {:.2?}",
Expand Down
Loading

0 comments on commit 3e44c80

Please sign in to comment.