Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch from async-lock to tokio primitives #3021

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ include = [
bench = false

[dependencies]
async-lock = "3.3.0"
async-trait = "0.1.81"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
bitvec = "1.0.1"
Expand Down
5 changes: 2 additions & 3 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::sector::{
};
use crate::segment_reconstruction::recover_missing_piece;
use crate::{FarmerProtocolInfo, PieceGetter};
use async_lock::Mutex as AsyncMutex;
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
Expand All @@ -34,7 +33,7 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_proof_of_space::{Table, TableGenerator};
use thiserror::Error;
use tokio::sync::{AcquireError, Semaphore};
use tokio::sync::{AcquireError, Mutex as AsyncMutex, Semaphore};
use tracing::{debug, trace, warn};

const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
Expand Down Expand Up @@ -397,7 +396,7 @@ where

loop {
// Take mutex briefly to make sure encoding is allowed right now
global_mutex.lock_blocking();
let _ = global_mutex.blocking_lock();

// This instead of `while` above because otherwise mutex will be held
// for the duration of the loop and will limit concurrency to 1 record
Expand Down
3 changes: 1 addition & 2 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ required-features = ["binary"]

[dependencies]
anyhow = "1.0.86"
async-lock = "3.3.0"
async-nats = { version = "0.35.1", optional = true }
async-trait = "0.1.81"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
Expand Down Expand Up @@ -65,7 +64,7 @@ supports-color = { version = "3.0.0", optional = true }
tempfile = "3.12.0"
thiserror = "1.0.63"
thread-priority = "1.1.0"
tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex};
use crate::commands::shared::derive_libp2p_keypair;
use crate::commands::shared::network::{configure_network, NetworkArgs};
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use backoff::ExponentialBackoff;
use clap::{Parser, ValueHint};
use futures::stream::FuturesUnordered;
Expand All @@ -31,6 +30,7 @@ use subspace_farmer::node_client::NodeClient;
use subspace_farmer::single_disk_farm::identity::Identity;
use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
use subspace_networking::utils::piece_provider::PieceProvider;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::info;

/// Get piece retry attempts number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL;
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use futures::channel::oneshot;
use futures::future::FusedFuture;
use futures::stream::FuturesUnordered;
Expand All @@ -25,6 +24,7 @@ use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBro
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::plotted_pieces::PlottedPieces;
use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::task;
use tokio::time::MissedTickBehavior;
use tracing::{error, info, trace, warn};
Expand Down Expand Up @@ -178,7 +178,7 @@ pub(super) async fn maintain_farms(
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
plotted_pieces.blocking_write().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
Expand Down Expand Up @@ -235,7 +235,7 @@ pub(super) async fn maintain_farms(
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
plotted_pieces.blocking_write().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
Expand Down Expand Up @@ -322,7 +322,7 @@ fn process_farm_identify_message<'a>(
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
plotted_pieces.blocking_write().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
Expand Down Expand Up @@ -434,7 +434,7 @@ async fn initialize_farm(
drop(sector_update_handler);
let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
let add_buffered_sectors_fut = task::spawn_blocking(move || {
let mut plotted_pieces = plotted_pieces.write_blocking();
let mut plotted_pieces = plotted_pieces.blocking_write();

for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
if let Some(old_plotted_sector) = old_plotted_sector {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use crate::commands::shared::DiskFarm;
use anyhow::anyhow;
use async_lock::Mutex as AsyncMutex;
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::Parser;
Expand Down Expand Up @@ -35,7 +34,7 @@ use subspace_farmer::utils::{
};
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_proof_of_space::Table;
use tokio::sync::{Barrier, Semaphore};
use tokio::sync::{Barrier, Mutex as AsyncMutex, Semaphore};
use tracing::{error, info, info_span, warn, Instrument};

const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::commands::shared::PlottingThreadPriority;
use anyhow::anyhow;
use async_lock::Mutex as AsyncMutex;
use clap::Parser;
use futures::{select, FutureExt};
use prometheus_client::registry::Registry;
Expand All @@ -27,7 +26,7 @@ use subspace_farmer::utils::{
};
use subspace_farmer_components::PieceGetter;
use subspace_proof_of_space::Table;
use tokio::sync::Semaphore;
use tokio::sync::{Mutex as AsyncMutex, Semaphore};
use tracing::info;

const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::commands::shared::network::{configure_network, NetworkArgs};
use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
Expand Down Expand Up @@ -51,7 +50,8 @@ use subspace_farmer_components::PieceGetter;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use tokio::sync::{Barrier, Semaphore};
use tokio::sync::{Barrier, Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore};
use tokio::task;
use tracing::{error, info, info_span, warn, Instrument};

/// Get piece retry attempts number.
Expand Down Expand Up @@ -713,7 +713,8 @@ where
{
let _span_guard = span.enter();

let mut plotted_pieces = plotted_pieces.write_blocking();
let mut plotted_pieces =
task::block_in_place(|| plotted_pieces.blocking_write());

if let Some(old_plotted_sector) = &old_plotted_sector {
plotted_pieces.delete_sector(farm_index, old_plotted_sector);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_lock::RwLock as AsyncRwLock;
use clap::Parser;
use prometheus_client::registry::Registry;
use std::collections::HashSet;
Expand All @@ -22,6 +21,7 @@ use subspace_networking::{
SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse,
};
use subspace_rpc_primitives::MAX_SEGMENT_HEADERS_PER_REQUEST;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, info, Instrument};

/// How many segment headers can be requested at a time.
Expand Down Expand Up @@ -139,7 +139,8 @@ where

let read_piece_fut = match weak_plotted_pieces.upgrade() {
Some(plotted_pieces) => plotted_pieces
.try_read()?
.try_read()
.ok()?
.read_piece(piece_index)?
.in_current_span(),
None => {
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/cluster/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::farm::{PieceCacheId, PieceCacheOffset};
use crate::farmer_cache::FarmerCache;
use crate::node_client::{Error as NodeClientError, NodeClient};
use anyhow::anyhow;
use async_lock::Semaphore;
use async_nats::HeaderValue;
use async_trait::async_trait;
use futures::{select, FutureExt, Stream, StreamExt};
Expand All @@ -29,6 +28,7 @@ use subspace_farmer_components::PieceGetter;
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
use tokio::sync::Semaphore;
use tracing::{debug, trace, warn};

/// Broadcast sent by controllers requesting farmers to identify themselves
Expand Down
9 changes: 5 additions & 4 deletions crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::farmer_cache::metrics::FarmerCacheMetrics;
use crate::farmer_cache::piece_cache_state::PieceCachesState;
use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
Expand All @@ -32,7 +31,7 @@ use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{KeyWithDistance, LocalRecordProvider};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, RwLock as AsyncRwLock};
use tokio::task::{block_in_place, yield_now};
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -1208,7 +1207,8 @@ where
let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
if self
.piece_caches
.try_read()?
.try_read()
.ok()?
.contains_stored_piece(&distance_key)
{
// Note: We store our own provider records locally without local addresses
Expand All @@ -1225,7 +1225,8 @@ where
let found_fut = self
.plot_caches
.caches
.try_read()?
.try_read()
.ok()?
.iter()
.map(|plot_cache| {
let plot_cache = Arc::clone(plot_cache);
Expand Down
13 changes: 7 additions & 6 deletions crates/subspace-farmer/src/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
use crate::farm::plotted_pieces::PlottedPieces;
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
use async_lock::{
Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock, Semaphore,
};
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
Expand All @@ -22,6 +19,9 @@ use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::PieceGetter;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use tokio::sync::{
Mutex as AsyncMutex, OwnedMutexGuard as AsyncOwnedMutexGuard, RwLock as AsyncRwLock, Semaphore,
};
use tracing::{debug, error, trace};

pub mod piece_validator;
Expand All @@ -30,7 +30,7 @@ const MAX_RANDOM_WALK_ROUNDS: usize = 15;

struct InProgressPieceGetting<'a> {
piece_index: PieceIndex,
in_progress_piece: AsyncMutexGuardArc<Option<Piece>>,
in_progress_piece: AsyncOwnedMutexGuard<Option<Piece>>,
in_progress_pieces: &'a Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
}

Expand Down Expand Up @@ -63,8 +63,8 @@ impl<'a> InProgressPiece<'a> {
// Take lock before anything else, set to `None` when another piece getting is already in
// progress
let mut local_in_progress_piece_guard = Some(
in_progress_piece_mutex
.try_lock_arc()
Arc::clone(&in_progress_piece_mutex)
.try_lock_owned()
.expect("Just created; qed"),
);
let in_progress_piece_mutex = in_progress_pieces
Expand Down Expand Up @@ -282,6 +282,7 @@ where
let maybe_read_piece_fut = inner
.plotted_pieces
.try_read()
.ok()
.and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));

if let Some(read_piece_fut) = maybe_read_piece_fut {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt};
use crate::utils::AsyncJoinOnDrop;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_trait::async_trait;
use futures::{select, FutureExt, Stream, StreamExt};
use std::pin::Pin;
Expand All @@ -14,7 +13,7 @@ use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
MAX_SEGMENT_HEADERS_PER_REQUEST,
};
use tokio::sync::watch;
use tokio::sync::{watch, Mutex as AsyncMutex, RwLock as AsyncRwLock};
use tokio_stream::wrappers::WatchStream;
use tracing::{info, trace, warn};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Node client implementation that connects to node via RPC (WebSockets)

use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt};
use async_lock::Semaphore;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
Expand All @@ -13,6 +12,7 @@ use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
use tokio::sync::Semaphore;

/// TODO: Node is having a hard time responding for many piece requests, specifically this results
/// in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409
Expand Down
5 changes: 2 additions & 3 deletions crates/subspace-farmer/src/plotter/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::plotter::cpu::metrics::CpuPlotterMetrics;
use crate::plotter::{Plotter, SectorPlottingProgress};
use crate::thread_pool_manager::PlottingThreadPoolManager;
use crate::utils::AsyncJoinOnDrop;
use async_lock::Mutex as AsyncMutex;
use async_trait::async_trait;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
Expand All @@ -32,7 +31,7 @@ use subspace_farmer_components::plotting::{
};
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter};
use subspace_proof_of_space::Table;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::sync::{Mutex as AsyncMutex, OwnedSemaphorePermit, Semaphore};
use tokio::task::yield_now;
use tracing::{warn, Instrument};

Expand Down Expand Up @@ -302,7 +301,7 @@ where
}

// Take mutex briefly to make sure plotting is allowed right now
global_mutex.lock().await;
let _ = global_mutex.lock().await;

let downloading_start = Instant::now();

Expand Down
Loading
Loading