Skip to content

Commit

Permalink
implement wasi kv 0.2.0-draft2 for redis and cosmosdb
Browse files Browse the repository at this point in the history
Signed-off-by: David Justice <[email protected]>
  • Loading branch information
devigned committed Nov 2, 2024
1 parent a0a132f commit 50f22a1
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 90 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/factor-key-value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ spin-world = { path = "../world" }
tokio = { workspace = true, features = ["macros", "sync", "rt"] }
toml = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
spin-factors-test = { path = "../factors-test" }
Expand Down
22 changes: 13 additions & 9 deletions crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::Cas;
use super::{Cas, SwapError};
use anyhow::{Context, Result};
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_resource_table::Table;
Expand Down Expand Up @@ -378,22 +378,21 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;

match cas.swap(value).await {
Ok(cas) => Ok(Ok(())),
Err(err) => {
if err.to_string().contains("CAS_ERROR") {
Ok(_) => Ok(Ok(())),
Err(err) => match err {
SwapError::CasFailed(_) => {
let bucket = Resource::new_own(cas.bucket_rep().await);
let new_cas = self.new(bucket, cas.key().await).await?;
let new_cas_rep = new_cas.rep();
self.current(Resource::new_own(new_cas_rep)).await?;
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
new_cas_rep,
))))
} else {
Err(anyhow::Error::new(CasError::StoreError(
atomics::Error::Other(err.to_string()),
)))
}
}
SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError(
atomics::Error::Other(msg),
))),
},
}
}
}
Expand All @@ -403,6 +402,11 @@ pub fn log_error(err: impl std::fmt::Debug) -> Error {
Error::Other(format!("{err:?}"))
}

pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
tracing::warn!("key-value error: {err:?}");
SwapError::Other(format!("{err:?}"))
}

use spin_world::v1::key_value::Error as LegacyError;
use spin_world::wasi::keyvalue::atomics;
use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};
Expand Down
29 changes: 27 additions & 2 deletions crates/factor-key-value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use spin_locked_app::MetadataKey;

/// Metadata key for key-value stores.
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
pub use host::{log_error, Error, KeyValueDispatch, Store, StoreManager};
pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
pub use runtime_config::RuntimeConfig;
use spin_core::async_trait;
pub use util::{CachingStoreManager, DelegatingStoreManager};
Expand All @@ -42,6 +42,8 @@ impl Factor for KeyValueFactor {
ctx.link_bindings(spin_world::v1::key_value::add_to_linker)?;
ctx.link_bindings(spin_world::v2::key_value::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::atomics::add_to_linker)?;
Ok(())
}

Expand Down Expand Up @@ -133,10 +135,33 @@ impl AppState {
}
}

/// `SwapError` are errors that occur during compare and swap operations
#[derive(Debug, thiserror::Error)]
pub enum SwapError {
#[error("{0}")]
CasFailed(String),

#[error("{0}")]
Other(String),
}

/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill.
///
/// `current` is expected to get the current value for the key associated with the CAS operation
/// while also starting what is needed to ensure the value to be replaced will not have mutated
/// between the time of calling `current` and `swap`. For example, a get from a backend store
/// may provide the caller with an etag (a version stamp), which can be used with an if-match
/// header to ensure the version updated is the version that was read (optimistic concurrency).
/// Rather than an etag, one could start a transaction, if supported by the backing store, which
/// would provide atomicity.
///
/// `swap` is expected to replace the old value with the new value respecting the atomicity of the
/// operation. If there was no key / value with the given key in the store, the `swap` operation
/// should **insert** the key and value, disallowing an update.
#[async_trait]
pub trait Cas: Sync + Send {
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), Error>;
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
async fn bucket_rep(&self) -> u32;
async fn key(&self) -> String;
}
Expand Down
131 changes: 100 additions & 31 deletions crates/factor-key-value/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Cas, Error, Store, StoreManager};
use crate::{Cas, Error, Store, StoreManager, SwapError};
use lru::LruCache;
use spin_core::async_trait;
use std::{
Expand Down Expand Up @@ -92,10 +92,10 @@ impl<T: StoreManager> StoreManager for CachingStoreManager<T> {
async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> {
Ok(Arc::new(CachingStore {
inner: self.inner.get(name).await?,
state: AsyncMutex::new(CachingStoreState {
state: Arc::new(AsyncMutex::new(CachingStoreState {
cache: LruCache::new(self.capacity),
previous_task: None,
}),
})),
}))
}

Expand Down Expand Up @@ -143,7 +143,7 @@ impl CachingStoreState {

struct CachingStore {
inner: Arc<dyn Store>,
state: AsyncMutex<CachingStoreState>,
state: Arc<AsyncMutex<CachingStoreState>>,
}

#[async_trait]
Expand Down Expand Up @@ -242,49 +242,118 @@ impl Store for CachingStore {
&self,
keys: Vec<String>,
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
// // Retrieve the specified value from the cache, lazily populating the cache as necessary.
// let mut state = self.state.lock().await;
//
// let mut keys_and_values: Vec<Option<(String, Vec<u8>)>> = Vec::new();
// let mut keys_not_found: Vec<String> = Vec::new();
// for key in keys {
// match state.cache.get(key.as_str()).cloned() {
// Some(value) => keys_and_values.push(Some((key, value))),
// None => keys_not_found.push(key),
// }
// }
//
// // guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
// // cache prior to their corresponding writes reaching the backing store.
// state.flush().await?;
//
// let value = self.inner.get(key).await?;
//
// state.cache.put(key.to_owned(), value.clone());
//
// Ok(value)
//
let mut state = self.state.lock().await;
let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new();
let mut not_found: Vec<String> = Vec::new();
for key in keys {
match state.cache.get(key.as_str()) {
Some(Some(value)) => found.push((key, Some(value.clone()))),
_ => not_found.push(key),
}
}

let keys_and_values = self.inner.get_many(not_found).await?;
for (key, value) in keys_and_values {
found.push((key.clone(), value.clone()));
state.cache.put(key, value);
}

todo!()
Ok(found)
}

async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> {
todo!()
let mut state = self.state.lock().await;

for (key, value) in key_values.clone() {
state.cache.put(key, Some(value));
}

self.inner.set_many(key_values).await
}

async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> {
todo!()
let mut state = self.state.lock().await;

for key in keys.clone() {
state.cache.put(key, None);
}

self.inner.delete_many(keys).await
}

async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> {
todo!()
let mut state = self.state.lock().await;
let counter = self.inner.increment(key.clone(), delta).await?;
state
.cache
.put(key, Some(i64::to_le_bytes(counter).to_vec()));
Ok(counter)
}

async fn new_compare_and_swap(
&self,
bucket_rep: u32,
key: &str,
) -> anyhow::Result<Arc<dyn Cas>, Error> {
todo!()
let inner = self.inner.new_compare_and_swap(bucket_rep, key).await?;
Ok(Arc::new(CompareAndSwap {
bucket_rep,
state: self.state.clone(),
key: key.to_string(),
inner_cas: inner,
}))
}
}

struct CompareAndSwap {
bucket_rep: u32,
key: String,
state: Arc<AsyncMutex<CachingStoreState>>,
inner_cas: Arc<dyn Cas>,
}

#[async_trait]
impl Cas for CompareAndSwap {
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error> {
let mut state = self.state.lock().await;
state.flush().await?;
let res = self.inner_cas.current().await;
match res.clone() {
Ok(value) => {
state.cache.put(self.key.clone(), value.clone());
state.flush().await?;
Ok(value)
}
Err(err) => Err(err),
}?;
res
}

async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError> {
let mut state = self.state.lock().await;
state
.flush()
.await
.map_err(|_e| SwapError::Other("failed flushing".to_string()))?;
let res = self.inner_cas.swap(value.clone()).await;
match res {
Ok(()) => {
state.cache.put(self.key.clone(), Some(value));
state
.flush()
.await
.map_err(|_e| SwapError::Other("failed flushing".to_string()))?;
Ok(())
}
Err(err) => Err(err),
}
}

async fn bucket_rep(&self) -> u32 {
self.bucket_rep
}

async fn key(&self) -> String {
self.key.clone()
}
}
34 changes: 33 additions & 1 deletion crates/factor-key-value/tests/factor_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::bail;
use spin_core::async_trait;
use spin_factor_key_value::{KeyValueFactor, RuntimeConfig, Store, StoreManager};
use spin_factor_key_value::{Cas, KeyValueFactor, RuntimeConfig, Store, StoreManager};
use spin_factors::RuntimeFactors;
use spin_factors_test::{toml, TestEnvironment};
use spin_world::v2::key_value::{Error, HostStore};
Expand Down Expand Up @@ -140,4 +140,36 @@ impl Store for MockStore {
async fn get_keys(&self) -> Result<Vec<String>, Error> {
todo!()
}

async fn get_many(
&self,
keys: Vec<String>,
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
let _ = keys;
todo!()
}

async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> {
let _ = key_values;
todo!()
}

async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> {
let _ = keys;
todo!()
}

async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> {
let (_, _) = (key, delta);
todo!()
}

async fn new_compare_and_swap(
&self,
bucket_rep: u32,
key: &str,
) -> anyhow::Result<Arc<dyn Cas>, Error> {
let (_, _) = (key, bucket_rep);
todo!()
}
}
1 change: 1 addition & 0 deletions crates/key-value-azure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rust-version.workspace = true
anyhow = { workspace = true }
azure_data_cosmos = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
azure_identity = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
azure_core = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
futures = { workspace = true }
serde = { workspace = true }
spin-core = { path = "../core" }
Expand Down
Loading

0 comments on commit 50f22a1

Please sign in to comment.