Add existence cache (TraceMachina#383)
blakehatch authored Nov 15, 2023
1 parent 8a1cb6b commit e8e6701
Showing 9 changed files with 299 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -40,6 +40,8 @@ members = [
    "gencargo/evicting_map",
    "gencargo/evicting_map_test",
    "gencargo/execution_server",
    "gencargo/existence_store",
    "gencargo/existence_store_test",
    "gencargo/fast_slow_store",
    "gencargo/fast_slow_store_test",
    "gencargo/fastcdc",
@@ -186,6 +188,8 @@ error = { path = "gencargo/error" }
evicting_map = { path = "gencargo/evicting_map" }
evicting_map_test = { path = "gencargo/evicting_map_test" }
execution_server = { path = "gencargo/execution_server" }
existence_store = { path = "gencargo/existence_store" }
existence_store_test = { path = "gencargo/existence_store_test" }
fast_slow_store = { path = "gencargo/fast_slow_store" }
fast_slow_store_test = { path = "gencargo/fast_slow_store_test" }
fastcdc = { path = "gencargo/fastcdc" }
39 changes: 39 additions & 0 deletions cas/store/BUILD
@@ -35,6 +35,7 @@ rust_library(
        ":store",
        ":traits",
        ":verify_store",
        ":existence_store",
        "//config",
        "//util:error",
        "//util:metrics_utils",
@@ -143,6 +144,27 @@ rust_library(
    ],
)

rust_library(
    name = "existence_store",
    srcs = ["existence_store.rs"],
    proc_macro_deps = ["@crate_index//:async-trait"],
    visibility = ["//cas:__pkg__"],
    deps = [
        ":traits",
        ":ac_utils",
        "//proto",
        "//config",
        "//util:buf_channel",
        "//util:common",
        "//util:error",
        "//util:metrics_utils",
        "@crate_index//:hashbrown",
        "@crate_index//:hex",
        "@crate_index//:sha2",
        "@crate_index//:tokio",
    ],
)

rust_library(
name = "fast_slow_store",
srcs = ["fast_slow_store.rs"],
@@ -405,6 +427,23 @@ rust_test(
    ],
)

rust_test(
    name = "existence_store_test",
    srcs = ["tests/existence_store_test.rs"],
    deps = [
        ":memory_store",
        ":traits",
        ":existence_store",
        "//config",
        "//util:buf_channel",
        "//util:common",
        "//util:error",
        "@crate_index//:futures",
        "@crate_index//:pretty_assertions",
        "@crate_index//:tokio",
    ],
)

rust_test(
    name = "s3_store_test",
    srcs = ["tests/s3_store_test.rs"],
4 changes: 4 additions & 0 deletions cas/store/default_store_factory.rs
@@ -22,6 +22,7 @@ use compression_store::CompressionStore;
use config::{self, stores::StoreConfig};
use dedup_store::DedupStore;
use error::Error;
use existence_store::ExistenceStore;
use fast_slow_store::FastSlowStore;
use filesystem_store::FilesystemStore;
use grpc_store::GrpcStore;
@@ -58,6 +59,9 @@ pub fn store_factory<'a>(
            store_factory(&config.index_store, store_manager, None).await?,
            store_factory(&config.content_store, store_manager, None).await?,
        )),
        StoreConfig::existence_store(config) => Arc::new(ExistenceStore::new(
            store_factory(&config.inner, store_manager, None).await?,
        )),
        StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new(
            config,
            store_factory(&config.fast, store_manager, None).await?,
105 changes: 105 additions & 0 deletions cas/store/existence_store.rs
@@ -0,0 +1,105 @@
// Copyright 2022 The Turbo Cache Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use hashbrown::HashSet;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;

use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use common::DigestInfo;
use error::Error;
use traits::{StoreTrait, UploadSizeInfo};

pub struct ExistenceStore {
    inner_store: Arc<dyn StoreTrait>,
    pub existence_cache: Mutex<HashSet<DigestInfo>>,
}

impl ExistenceStore {
    pub fn new(inner_store: Arc<dyn StoreTrait>) -> Self {
        Self {
            inner_store,
            // TODO (BlakeHatch):
            // Consider using RwLock in a future commit.
            // Since HashSet implements Send and Sync this should
            // be a drop-in replacement in theory.
            // Make sure a benchmark is done to justify the change.
            existence_cache: Mutex::new(HashSet::new()),
        }
    }

    fn pin_inner(&self) -> Pin<&dyn StoreTrait> {
        Pin::new(self.inner_store.as_ref())
    }
}

#[async_trait]
impl StoreTrait for ExistenceStore {
    async fn has_with_results(
        self: Pin<&Self>,
        digests: &[DigestInfo],
        results: &mut [Option<usize>],
    ) -> Result<(), Error> {
        let mut pruned_digests = Vec::new();
        let mut pruned_indexes = Vec::new();

        for (i, digest) in digests.iter().enumerate() {
            if self.existence_cache.lock().unwrap().contains(digest) {
                // The cache only records existence, not size, so a cached hit
                // reports a placeholder size of 1.
                results[i] = Some(1);
            } else {
                pruned_digests.push(*digest);
                pruned_indexes.push(i);
            }
        }

        if !pruned_digests.is_empty() {
            let mut inner_results = vec![None; pruned_digests.len()];
            self.pin_inner()
                .has_with_results(&pruned_digests, &mut inner_results)
                .await?;

            for (i, result) in inner_results.iter().enumerate() {
                if result.is_some() {
                    self.existence_cache.lock().unwrap().insert(pruned_digests[i]);
                    // Map the pruned index back to the caller's original index
                    // so results line up with the input digests.
                    results[pruned_indexes[i]] = *result;
                }
            }
        }

        Ok(())
    }

    async fn update(
        self: Pin<&Self>,
        digest: DigestInfo,
        reader: DropCloserReadHalf,
        size_info: UploadSizeInfo,
    ) -> Result<(), Error> {
        self.pin_inner().update(digest, reader, size_info).await
    }

    async fn get_part_ref(
        self: Pin<&Self>,
        digest: DigestInfo,
        writer: &mut DropCloserWriteHalf,
        offset: usize,
        length: Option<usize>,
    ) -> Result<(), Error> {
        self.pin_inner().get_part_ref(digest, writer, offset, length).await
    }

    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
        Box::new(self)
    }
}
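For context, here is a minimal usage sketch (not part of this commit) showing how the new store batches existence checks. It relies only on items introduced above: `ExistenceStore::new` taking any `Arc<dyn StoreTrait>`, and the `has_with_results` override that answers cached digests locally and forwards the rest to the inner store. The helper name `count_existing` is illustrative.

use std::pin::Pin;
use std::sync::Arc;

use common::DigestInfo;
use error::Error;
use existence_store::ExistenceStore;
use traits::StoreTrait;

// Checks a batch of digests through the existence cache. Digests already
// recorded in `existence_cache` are answered locally; only the remaining
// ("pruned") digests are forwarded to the inner store in one call.
async fn count_existing(
    inner: Arc<dyn StoreTrait>,
    digests: &[DigestInfo],
) -> Result<usize, Error> {
    let store_owned = ExistenceStore::new(inner);
    let store = Pin::new(&store_owned);

    let mut results = vec![None; digests.len()];
    store.has_with_results(digests, &mut results).await?;

    Ok(results.iter().filter(|r| r.is_some()).count())
}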
61 changes: 61 additions & 0 deletions cas/store/tests/existence_store_test.rs
@@ -0,0 +1,61 @@
// Copyright 2022 The Turbo Cache Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;

#[cfg(test)]
mod existence_store_tests {
    use super::*;

    use common::DigestInfo;
    use error::Error;
    use existence_store::ExistenceStore;
    use memory_store::MemoryStore;

    use traits::StoreTrait;

    const VALID_HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";

    async fn has_in_cache(store: Pin<&ExistenceStore>, digest: &DigestInfo) -> bool {
        match store.existence_cache.lock() {
            Ok(cache) => cache.contains(digest),
            Err(_) => false,
        }
    }

    #[tokio::test]
    async fn verify_existence_caching() -> Result<(), Error> {
        const VALUE: &str = "123";
        let inner_store = Arc::new(MemoryStore::new(&config::stores::MemoryStore::default()));
        let store_owned = ExistenceStore::new(inner_store.clone());
        let store = Pin::new(&store_owned);

        let digest = DigestInfo::try_new(VALID_HASH1, 3).unwrap();
        let _result = store.update_oneshot(digest, VALUE.into()).await;

        assert!(
            !has_in_cache(store, &digest).await,
            "Expected digest to not exist in cache before has call"
        );

        let _result = store.has(digest).await;

        assert!(
            has_in_cache(store, &digest).await,
            "Expected digest to exist in cache after has call"
        );
        Ok(())
    }
}
16 changes: 16 additions & 0 deletions config/stores.rs
@@ -78,6 +78,12 @@ pub enum StoreConfig {
    /// if the individual chunks exist in the `content_store`.
    dedup(Box<DedupStore>),

    /// Existence store will wrap around another store and cache calls
    /// to `has` so that subsequent `has_with_results` calls will be
    /// faster. This is useful for cases when you have a store that
    /// is slow to respond to `has` calls.
    existence_store(Box<ExistenceStore>),

    /// FastSlow store will first try to fetch the data from the `fast`
    /// store and then if it does not exist try the `slow` store.
    /// When the object does exist in the `slow` store, it will copy
@@ -279,6 +285,16 @@ pub struct DedupStore {
    pub max_concurrent_fetch_per_get: u32,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExistenceStore {
    /// The underlying store to wrap. All requests are forwarded to this
    /// store; positive existence results are cached in memory so later
    /// `has` calls for the same digest can be answered without querying
    /// the backend again.
    pub inner: StoreConfig,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VerifyStore {
    /// The underlying store wrap around. All content will first flow
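To illustrate how the new config variant composes (a sketch, not part of this commit): `existence_store` is just another `StoreConfig` variant whose `inner` field holds the wrapped store's own `StoreConfig`, which `store_factory` then builds recursively as shown in default_store_factory.rs above. The helper name and the `backend_config` placeholder below are illustrative; the backend can be any `StoreConfig` variant.

use config::stores::{ExistenceStore, StoreConfig};

// Wraps an arbitrary backend store configuration with the existence cache.
// `backend_config` stands in for whichever StoreConfig variant backs the CAS.
fn wrap_with_existence_cache(backend_config: StoreConfig) -> StoreConfig {
    StoreConfig::existence_store(Box::new(ExistenceStore {
        inner: backend_config,
    }))
}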
1 change: 1 addition & 0 deletions gencargo/default_store_factory/Cargo.toml
@@ -24,6 +24,7 @@ futures = { workspace = true }
# Local libraries.
compression_store = { workspace = true }
dedup_store = { workspace = true }
existence_store = { workspace = true }
fast_slow_store = { workspace = true }
filesystem_store = { workspace = true }
grpc_store = { workspace = true }
36 changes: 36 additions & 0 deletions gencargo/existence_store/Cargo.toml
@@ -0,0 +1,36 @@
# This file is automatically generated from `tools/build_cargo_manifest.py`.
# If you want to add a dependency add it to `tools/cargo_shared.bzl`
# then run `python tools/build_cargo_manifest.py`.
# Do not edit this file directly.

[package]
name = "existence_store"
version = "0.0.0"
edition = "2021"
autobins = false
autoexamples = false
autotests = false
autobenches = false

[lib]
name = "existence_store"
path = "../../cas/store/existence_store.rs"
# TODO(allada) We should support doctests.
doctest = false

[dependencies]
async-trait = { workspace = true }
hashbrown = { workspace = true }
hex = { workspace = true }
sha2 = { workspace = true }
tokio = { workspace = true }

# Local libraries.
ac_utils = { workspace = true }
traits = { workspace = true }
config = { workspace = true }
proto = { workspace = true }
buf_channel = { workspace = true }
common = { workspace = true }
error = { workspace = true }
metrics_utils = { workspace = true }
33 changes: 33 additions & 0 deletions gencargo/existence_store_test/Cargo.toml
@@ -0,0 +1,33 @@
# This file is automatically generated from `tools/build_cargo_manifest.py`.
# If you want to add a dependency add it to `tools/cargo_shared.bzl`
# then run `python tools/build_cargo_manifest.py`.
# Do not edit this file directly.

[package]
name = "existence_store_test"
version = "0.0.0"
edition = "2021"
autobins = false
autoexamples = false
autotests = false
autobenches = false

[[test]]
name = "existence_store_test"
path = "../../cas/store/tests/existence_store_test.rs"
# TODO(allada) We should support doctests.
doctest = false

[dependencies]
futures = { workspace = true }
pretty_assertions = { workspace = true }
tokio = { workspace = true }

# Local libraries.
existence_store = { workspace = true }
memory_store = { workspace = true }
traits = { workspace = true }
config = { workspace = true }
buf_channel = { workspace = true }
common = { workspace = true }
error = { workspace = true }
