Skip to content

Commit

Permalink
Ensure that EvictingMap is threadsafe (#1564)
Browse files Browse the repository at this point in the history
Clamp down some trait bounds so that thread safety issues have a higher
chance of providing useful compiler errors.
  • Loading branch information
aaronmondal authored Jan 27, 2025
1 parent 2d2986b commit 4b5fe2e
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 22 deletions.
64 changes: 43 additions & 21 deletions nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
}

#[derive(MetricsComponent)]
struct State<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug> {
struct State<K: Ord + Hash + Eq + Clone + Debug + Send, T: LenEntry + Debug + Send> {
lru: LruCache<K, EvictionItem<T>>,
btree: Option<BTreeSet<K>>,
#[metric(help = "Total size of all items in the store")]
Expand All @@ -116,12 +116,14 @@ struct State<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug> {
lifetime_inserted_bytes: Counter,
}

impl<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug + Sync> State<K, T> {
impl<K: Ord + Hash + Eq + Clone + Debug + Send + Sync, T: LenEntry + Debug + Sync + Send>
State<K, T>
{
/// Removes an item from the cache.
async fn remove<Q>(&mut self, key: &Q, eviction_item: &EvictionItem<T>, replaced: bool)
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
if let Some(btree) = &mut self.btree {
btree.remove(key.borrow());
Expand Down Expand Up @@ -153,7 +155,11 @@ impl<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug + Sync> State<K, T>
}

#[derive(MetricsComponent)]
pub struct EvictingMap<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug, I: InstantWrapper> {
pub struct EvictingMap<
K: Ord + Hash + Eq + Clone + Debug + Send,
T: LenEntry + Debug + Send,
I: InstantWrapper,
> {
#[metric]
state: Mutex<State<K, T>>,
anchor_time: I,
Expand All @@ -169,7 +175,7 @@ pub struct EvictingMap<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug,

impl<K, T, I> EvictingMap<K, T, I>
where
K: Ord + Hash + Eq + Clone + Debug,
K: Ord + Hash + Eq + Clone + Debug + Send + Sync,
T: LenEntry + Debug + Clone + Send + Sync,
I: InstantWrapper,
{
Expand Down Expand Up @@ -210,11 +216,11 @@ where
/// and return the number of items that were processed.
/// The `handler` function should return `true` to continue processing the next item
/// or `false` to stop processing.
pub async fn range<F, Q>(&self, prefix_range: impl RangeBounds<Q>, mut handler: F) -> u64
pub async fn range<F, Q>(&self, prefix_range: impl RangeBounds<Q> + Send, mut handler: F) -> u64
where
F: FnMut(&K, &T) -> bool,
F: FnMut(&K, &T) -> bool + Send,
K: Borrow<Q> + Ord,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut state = self.state.lock().await;
let btree = if let Some(ref btree) = state.btree {
Expand Down Expand Up @@ -302,7 +308,7 @@ where
pub async fn size_for_key<Q>(&self, key: &Q) -> Option<u64>
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut results = [None];
self.sizes_for_keys([key], &mut results[..], false).await;
Expand All @@ -317,15 +323,18 @@ where
/// LRU cache. Note: peek may still evict, but won't promote.
pub async fn sizes_for_keys<It, Q, R>(&self, keys: It, results: &mut [Option<u64>], peek: bool)
where
It: IntoIterator<Item = R>,
It: IntoIterator<Item = R> + Send,
// Note: It's not enough to have the inserts themselves be Send. The
// returned iterator should be Send as well.
<It as IntoIterator>::IntoIter: Send,
// This may look strange, but what we are doing is saying:
// * `K` must be able to borrow `Q`
// * `R` (the input stream item type) must also be able to borrow `Q`
// Note: That K and R do not need to be the same type, they just both need
// to be able to borrow a `Q`.
K: Borrow<Q>,
R: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
R: Borrow<Q> + Send,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut state = self.state.lock().await;

Expand Down Expand Up @@ -369,7 +378,7 @@ where
pub async fn get<Q>(&self, key: &Q) -> Option<T>
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut state = self.state.lock().await;
self.evict_items(&mut *state).await;
Expand Down Expand Up @@ -404,7 +413,13 @@ where

/// Same as `insert()`, but optimized for multiple inserts.
/// Returns the replaced items if any.
pub async fn insert_many(&self, inserts: impl IntoIterator<Item = (K, T)>) -> Vec<T> {
pub async fn insert_many<It>(&self, inserts: It) -> Vec<T>
where
It: IntoIterator<Item = (K, T)> + Send,
// Note: It's not enough to have the inserts themselves be Send. The
// returned iterator should be Send as well.
<It as IntoIterator>::IntoIter: Send,
{
let mut inserts = inserts.into_iter().peekable();
// Shortcut for cases where there are no inserts, so we don't need to lock.
if inserts.peek().is_none() {
Expand All @@ -415,12 +430,18 @@ where
.await
}

async fn inner_insert_many(
async fn inner_insert_many<It>(
&self,
state: &mut State<K, T>,
inserts: impl IntoIterator<Item = (K, T)>,
inserts: It,
seconds_since_anchor: i32,
) -> Vec<T> {
) -> Vec<T>
where
It: IntoIterator<Item = (K, T)> + Send,
// Note: It's not enough to have the inserts themselves be Send. The
// returned iterator should be Send as well.
<It as IntoIterator>::IntoIter: Send,
{
let mut replaced_items = Vec::new();
for (key, data) in inserts {
let new_item_size = data.len();
Expand All @@ -442,7 +463,7 @@ where
pub async fn remove<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut state = self.state.lock().await;
self.inner_remove(&mut state, key).await
Expand All @@ -451,7 +472,7 @@ where
async fn inner_remove<Q>(&self, state: &mut State<K, T>, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
self.evict_items(state).await;
if let Some(entry) = state.lru.pop(key.borrow()) {
Expand All @@ -463,10 +484,11 @@ where

/// Same as `remove()`, but allows for a conditional to be applied to the
/// entry before removal in an atomic fashion.
pub async fn remove_if<Q, F: FnOnce(&T) -> bool>(&self, key: &Q, cond: F) -> bool
pub async fn remove_if<Q, F>(&self, key: &Q, cond: F) -> bool
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
F: FnOnce(&T) -> bool + Send,
{
let mut state = self.state.lock().await;
if let Some(entry) = state.lru.get(key.borrow()) {
Expand Down
2 changes: 1 addition & 1 deletion nativelink-util/tests/evicting_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ async fn remove_evicts_on_time() -> Result<(), Error> {
async fn range_multiple_items_test() -> Result<(), Error> {
async fn get_map_range(
evicting_map: &EvictingMap<String, BytesWrapper, MockInstantWrapped>,
range: impl std::ops::RangeBounds<String>,
range: impl std::ops::RangeBounds<String> + Send,
) -> Vec<(String, Bytes)> {
let mut found_values = Vec::new();
evicting_map
Expand Down

0 comments on commit 4b5fe2e

Please sign in to comment.