diff --git a/Cargo.lock b/Cargo.lock
index 78f0dd0704..1b2d1d7a90 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12631,7 +12631,6 @@ name = "subspace-farmer"
version = "0.1.0"
dependencies = [
"anyhow",
- "async-lock 3.4.0",
"async-nats",
"async-trait",
"backoff",
@@ -12691,7 +12690,6 @@ dependencies = [
name = "subspace-farmer-components"
version = "0.1.0"
dependencies = [
- "async-lock 3.4.0",
"async-trait",
"backoff",
"bitvec",
diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml
index 88aa5e4c69..3dafc859a5 100644
--- a/crates/subspace-farmer-components/Cargo.toml
+++ b/crates/subspace-farmer-components/Cargo.toml
@@ -16,7 +16,6 @@ include = [
bench = false
[dependencies]
-async-lock = "3.3.0"
async-trait = "0.1.81"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
bitvec = "1.0.1"
diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs
index 508ba3aa27..07a4651ca1 100644
--- a/crates/subspace-farmer-components/src/plotting.rs
+++ b/crates/subspace-farmer-components/src/plotting.rs
@@ -12,7 +12,6 @@ use crate::sector::{
};
use crate::segment_reconstruction::recover_missing_piece;
use crate::{FarmerProtocolInfo, PieceGetter};
-use async_lock::Mutex as AsyncMutex;
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
@@ -34,7 +33,7 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_proof_of_space::{Table, TableGenerator};
use thiserror::Error;
-use tokio::sync::{AcquireError, Semaphore};
+use tokio::sync::{AcquireError, Mutex as AsyncMutex, Semaphore};
use tracing::{debug, trace, warn};
const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
@@ -397,7 +396,7 @@ where
loop {
// Take mutex briefly to make sure encoding is allowed right now
- global_mutex.lock_blocking();
+ let _ = global_mutex.blocking_lock();
// This instead of `while` above because otherwise mutex will be held
// for the duration of the loop and will limit concurrency to 1 record
diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml
index 98dc1198cf..0d70fda086 100644
--- a/crates/subspace-farmer/Cargo.toml
+++ b/crates/subspace-farmer/Cargo.toml
@@ -17,7 +17,6 @@ required-features = ["binary"]
[dependencies]
anyhow = "1.0.86"
-async-lock = "3.3.0"
async-nats = { version = "0.35.1", optional = true }
async-trait = "0.1.81"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
@@ -65,7 +64,7 @@ supports-color = { version = "3.0.0", optional = true }
tempfile = "3.12.0"
thiserror = "1.0.63"
thread-priority = "1.1.0"
-tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
+tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true }
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs
index 6f40a5be03..2f41466447 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs
@@ -6,7 +6,6 @@ use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex};
use crate::commands::shared::derive_libp2p_keypair;
use crate::commands::shared::network::{configure_network, NetworkArgs};
use anyhow::anyhow;
-use async_lock::RwLock as AsyncRwLock;
use backoff::ExponentialBackoff;
use clap::{Parser, ValueHint};
use futures::stream::FuturesUnordered;
@@ -31,6 +30,7 @@ use subspace_farmer::node_client::NodeClient;
use subspace_farmer::single_disk_farm::identity::Identity;
use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
use subspace_networking::utils::piece_provider::PieceProvider;
+use tokio::sync::RwLock as AsyncRwLock;
use tracing::info;
/// Get piece retry attempts number.
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs
index e8efec6895..e0b03e54a7 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs
@@ -6,7 +6,6 @@
use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL;
use anyhow::anyhow;
-use async_lock::RwLock as AsyncRwLock;
use futures::channel::oneshot;
use futures::future::FusedFuture;
use futures::stream::FuturesUnordered;
@@ -25,6 +24,7 @@ use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBro
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::plotted_pieces::PlottedPieces;
use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
+use tokio::sync::RwLock as AsyncRwLock;
use tokio::task;
use tokio::time::MissedTickBehavior;
use tracing::{error, info, trace, warn};
@@ -178,7 +178,7 @@ pub(super) async fn maintain_farms(
let plotted_pieces = Arc::clone(plotted_pieces);
let delete_farm_fut = task::spawn_blocking(move || {
- plotted_pieces.write_blocking().delete_farm(farm_index);
+ plotted_pieces.blocking_write().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
@@ -235,7 +235,7 @@ pub(super) async fn maintain_farms(
let plotted_pieces = Arc::clone(plotted_pieces);
let delete_farm_fut = task::spawn_blocking(move || {
- plotted_pieces.write_blocking().delete_farm(farm_index);
+ plotted_pieces.blocking_write().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
@@ -322,7 +322,7 @@ fn process_farm_identify_message<'a>(
let plotted_pieces = Arc::clone(plotted_pieces);
let delete_farm_fut = task::spawn_blocking(move || {
- plotted_pieces.write_blocking().delete_farm(farm_index);
+ plotted_pieces.blocking_write().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
@@ -434,7 +434,7 @@ async fn initialize_farm(
drop(sector_update_handler);
let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
let add_buffered_sectors_fut = task::spawn_blocking(move || {
- let mut plotted_pieces = plotted_pieces.write_blocking();
+ let mut plotted_pieces = plotted_pieces.blocking_write();
for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
if let Some(old_plotted_sector) = old_plotted_sector {
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs
index 16ff9de26d..ecdf1acbd4 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs
@@ -2,7 +2,6 @@
use crate::commands::shared::DiskFarm;
use anyhow::anyhow;
-use async_lock::Mutex as AsyncMutex;
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::Parser;
@@ -35,7 +34,7 @@ use subspace_farmer::utils::{
};
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_proof_of_space::Table;
-use tokio::sync::{Barrier, Semaphore};
+use tokio::sync::{Barrier, Mutex as AsyncMutex, Semaphore};
use tracing::{error, info, info_span, warn, Instrument};
const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30);
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs
index c898a5ffaa..4da8c6b8db 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs
@@ -1,6 +1,5 @@
use crate::commands::shared::PlottingThreadPriority;
use anyhow::anyhow;
-use async_lock::Mutex as AsyncMutex;
use clap::Parser;
use futures::{select, FutureExt};
use prometheus_client::registry::Registry;
@@ -27,7 +26,7 @@ use subspace_farmer::utils::{
};
use subspace_farmer_components::PieceGetter;
use subspace_proof_of_space::Table;
-use tokio::sync::Semaphore;
+use tokio::sync::{Mutex as AsyncMutex, Semaphore};
use tracing::info;
const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5);
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index 8b86bbf769..b1b51abead 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -2,7 +2,6 @@ use crate::commands::shared::network::{configure_network, NetworkArgs};
use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
-use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
@@ -51,7 +50,8 @@ use subspace_farmer_components::PieceGetter;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
-use tokio::sync::{Barrier, Semaphore};
+use tokio::sync::{Barrier, Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore};
+use tokio::task;
use tracing::{error, info, info_span, warn, Instrument};
/// Get piece retry attempts number.
@@ -713,7 +713,8 @@ where
{
let _span_guard = span.enter();
- let mut plotted_pieces = plotted_pieces.write_blocking();
+ let mut plotted_pieces =
+ task::block_in_place(|| plotted_pieces.blocking_write());
if let Some(old_plotted_sector) = &old_plotted_sector {
plotted_pieces.delete_sector(farm_index, old_plotted_sector);
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
index 8f1ed29ee0..e5ad780b60 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
@@ -1,4 +1,3 @@
-use async_lock::RwLock as AsyncRwLock;
use clap::Parser;
use prometheus_client::registry::Registry;
use std::collections::HashSet;
@@ -22,6 +21,7 @@ use subspace_networking::{
SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse,
};
use subspace_rpc_primitives::MAX_SEGMENT_HEADERS_PER_REQUEST;
+use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, info, Instrument};
/// How many segment headers can be requested at a time.
@@ -139,7 +139,8 @@ where
let read_piece_fut = match weak_plotted_pieces.upgrade() {
Some(plotted_pieces) => plotted_pieces
- .try_read()?
+ .try_read()
+ .ok()?
.read_piece(piece_index)?
.in_current_span(),
None => {
diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs
index ddc4730917..2c4856168c 100644
--- a/crates/subspace-farmer/src/cluster/controller.rs
+++ b/crates/subspace-farmer/src/cluster/controller.rs
@@ -14,7 +14,6 @@ use crate::farm::{PieceCacheId, PieceCacheOffset};
use crate::farmer_cache::FarmerCache;
use crate::node_client::{Error as NodeClientError, NodeClient};
use anyhow::anyhow;
-use async_lock::Semaphore;
use async_nats::HeaderValue;
use async_trait::async_trait;
use futures::{select, FutureExt, Stream, StreamExt};
@@ -29,6 +28,7 @@ use subspace_farmer_components::PieceGetter;
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
+use tokio::sync::Semaphore;
use tracing::{debug, trace, warn};
/// Broadcast sent by controllers requesting farmers to identify themselves
diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs
index 1d4a8e81d6..6fb19111da 100644
--- a/crates/subspace-farmer/src/farmer_cache.rs
+++ b/crates/subspace-farmer/src/farmer_cache.rs
@@ -13,7 +13,6 @@ use crate::farmer_cache::metrics::FarmerCacheMetrics;
use crate::farmer_cache::piece_cache_state::PieceCachesState;
use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
-use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
@@ -32,7 +31,7 @@ use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{KeyWithDistance, LocalRecordProvider};
use tokio::runtime::Handle;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, RwLock as AsyncRwLock};
use tokio::task::{block_in_place, yield_now};
use tracing::{debug, error, info, trace, warn};
@@ -1208,7 +1207,8 @@ where
let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
if self
.piece_caches
- .try_read()?
+ .try_read()
+ .ok()?
.contains_stored_piece(&distance_key)
{
// Note: We store our own provider records locally without local addresses
@@ -1225,7 +1225,8 @@ where
let found_fut = self
.plot_caches
.caches
- .try_read()?
+ .try_read()
+ .ok()?
.iter()
.map(|plot_cache| {
let plot_cache = Arc::clone(plot_cache);
diff --git a/crates/subspace-farmer/src/farmer_piece_getter.rs b/crates/subspace-farmer/src/farmer_piece_getter.rs
index 5ced33ac58..20bba024d2 100644
--- a/crates/subspace-farmer/src/farmer_piece_getter.rs
+++ b/crates/subspace-farmer/src/farmer_piece_getter.rs
@@ -3,9 +3,6 @@
use crate::farm::plotted_pieces::PlottedPieces;
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
-use async_lock::{
- Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock, Semaphore,
-};
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
@@ -22,6 +19,9 @@ use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::PieceGetter;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
+use tokio::sync::{
+ Mutex as AsyncMutex, OwnedMutexGuard as AsyncOwnedMutexGuard, RwLock as AsyncRwLock, Semaphore,
+};
use tracing::{debug, error, trace};
pub mod piece_validator;
@@ -30,7 +30,7 @@ const MAX_RANDOM_WALK_ROUNDS: usize = 15;
struct InProgressPieceGetting<'a> {
piece_index: PieceIndex,
- in_progress_piece: AsyncMutexGuardArc