Skip to content

Commit

Permalink
implement wasi kv 0.2.0-draft2 for redis and cosmosdb
Browse files Browse the repository at this point in the history
Signed-off-by: David Justice <[email protected]>
  • Loading branch information
devigned committed Oct 31, 2024
1 parent a0a132f commit d627003
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 45 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;

match cas.swap(value).await {
Ok(cas) => Ok(Ok(())),
Ok(_) => Ok(Ok(())),
Err(err) => {
if err.to_string().contains("CAS_ERROR") {
let bucket = Resource::new_own(cas.bucket_rep().await);
Expand Down
68 changes: 41 additions & 27 deletions crates/factor-key-value/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,49 +242,63 @@ impl Store for CachingStore {
&self,
keys: Vec<String>,
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
// // Retrieve the specified value from the cache, lazily populating the cache as necessary.
// let mut state = self.state.lock().await;
//
// let mut keys_and_values: Vec<Option<(String, Vec<u8>)>> = Vec::new();
// let mut keys_not_found: Vec<String> = Vec::new();
// for key in keys {
// match state.cache.get(key.as_str()).cloned() {
// Some(value) => keys_and_values.push(Some((key, value))),
// None => keys_not_found.push(key),
// }
// }
//
// // guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
// // cache prior to their corresponding writes reaching the backing store.
// state.flush().await?;
//
// let value = self.inner.get(key).await?;
//
// state.cache.put(key.to_owned(), value.clone());
//
// Ok(value)
//
let mut state = self.state.lock().await;
let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new();
let mut not_found: Vec<String> = Vec::new();
for key in keys {
match state.cache.get(key.as_str()) {
Some(res) => match res {
Some(value) => found.push((key, Some(value.clone()))),
None => not_found.push(key),
},
None => not_found.push(key),
}
}

todo!()
let keys_and_values = self.inner.get_many(not_found).await?;
for (key, value) in keys_and_values {
found.push((key.clone(), value.clone()));
state.cache.put(key, value);
}

Ok(found)
}

async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> {
todo!()
let mut state = self.state.lock().await;

for (key, value) in key_values.clone() {
state.cache.put(key.to_owned(), Some(value));
}

self.inner.set_many(key_values).await
}

async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> {
todo!()
let mut state = self.state.lock().await;

for key in keys.clone() {
state.cache.put(key.to_owned(), None);
}

self.inner.delete_many(keys).await
}

async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> {
todo!()
let counter = self.inner.increment(key.clone(), delta).await?;
self.state
.lock()
.await
.cache
.put(key, Some(i64::to_le_bytes(counter).to_vec()));
Ok(counter)
}

async fn new_compare_and_swap(
&self,
bucket_rep: u32,
key: &str,
) -> anyhow::Result<Arc<dyn Cas>, Error> {
todo!()
self.inner.new_compare_and_swap(bucket_rep, key).await
}
}
1 change: 1 addition & 0 deletions crates/key-value-azure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rust-version.workspace = true
anyhow = { workspace = true }
azure_data_cosmos = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
azure_identity = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
azure_core = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
futures = { workspace = true }
serde = { workspace = true }
spin-core = { path = "../core" }
Expand Down
145 changes: 137 additions & 8 deletions crates/key-value-azure/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::sync::Arc;

use anyhow::Result;
use azure_data_cosmos::prelude::Operation;
use azure_data_cosmos::resources::collection::PartitionKey;
use azure_data_cosmos::{
prelude::{AuthorizationToken, CollectionClient, CosmosClient, Query},
CosmosEntity,
};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use spin_core::async_trait;
use spin_factor_key_value::{log_error, Error, Store, StoreManager};
use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager};
use std::sync::{Arc, Mutex};

pub struct KeyValueAzureCosmos {
client: CollectionClient,
Expand Down Expand Up @@ -111,11 +112,19 @@ impl StoreManager for KeyValueAzureCosmos {
}
}

#[derive(Clone)]
struct AzureCosmosStore {
_name: String,
client: CollectionClient,
}

struct CompareAndSwap {
key: String,
client: CollectionClient,
bucket_rep: u32,
etag: Mutex<Option<String>>,
}

#[async_trait]
impl Store for AzureCosmosStore {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
Expand Down Expand Up @@ -153,27 +162,147 @@ impl Store for AzureCosmosStore {
}

async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error> {
todo!()
let in_clause: String = keys
.into_iter()
.map(|k| format!("'{}'", k))
.collect::<Vec<String>>()
.join(", ");
let stmt = Query::new(format!("SELECT * FROM c WHERE c.id IN ({})", in_clause));
let query = self
.client
.query_documents(stmt)
.query_cross_partition(true);

let mut res = Vec::new();
let mut stream = query.into_stream::<Pair>();
while let Some(resp) = stream.next().await {
let resp = resp.map_err(log_error)?;
for (pair, _) in resp.results {
res.push((pair.id, Some(pair.value)));
}
}
Ok(res)
}

async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> {
todo!()
for (key, value) in key_values {
self.set(key.as_ref(), &value).await?
}
Ok(())
}

async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error> {
todo!()
for key in keys {
self.delete(key.as_ref()).await?
}
Ok(())
}

async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> {
todo!()
let operations = vec![Operation::incr("/value", delta).map_err(log_error)?];
let _ = self
.client
.document_client(key.clone(), &key.as_str())
.map_err(log_error)?
.patch_document(operations)
.await
.map_err(log_error)?;
let pair = self.get_pair(key.as_ref()).await?;
match pair {
Some(p) => Ok(i64::from_le_bytes(
p.value.try_into().expect("incorrect length"),
)),
None => Err(Error::Other(
"increment returned an empty value after patching, which indicates a bug"
.to_string(),
)),
}
}

async fn new_compare_and_swap(
&self,
bucket_rep: u32,
key: &str,
) -> Result<Arc<dyn spin_factor_key_value::Cas>, Error> {
todo!()
Ok(Arc::new(CompareAndSwap {
key: key.to_string(),
client: self.client.clone(),
etag: Mutex::new(None),
bucket_rep,
}))
}
}

#[async_trait]
impl Cas for CompareAndSwap {
async fn current(&self) -> Result<Option<Vec<u8>>, Error> {
let mut stream = self
.client
.query_documents(Query::new(format!(
"SELECT * FROM c WHERE c.id='{}'",
self.key
)))
.query_cross_partition(true)
.max_item_count(1)
.into_stream::<Pair>();

let current_value: Option<(Vec<u8>, String)> = match stream.next().await {
Some(r) => {
let r = r.map_err(log_error)?;
match r.results.first() {
Some((item, attr)) => {
Some((item.clone().value, attr.clone().unwrap().etag().to_string()))
}
None => None,
}
}
None => None,
};

match current_value {
Some((value, etag)) => {
*self.etag.lock().unwrap() = Some(etag);
Ok(Some(value))
}
None => Ok(None),
}
}

async fn swap(&self, value: Vec<u8>) -> Result<(), Error> {
let pk = PartitionKey::from(&self.key);
let pair = Pair {
id: self.key.clone(),
value,
};

let replace_builder = self
.client
.document_client(&self.key, &pk)
.map_err(log_error)?
.replace_document(pair);

let etag_value = self.etag.lock().unwrap().clone();
let res = match etag_value {
Some(etag) => {
replace_builder
.if_match_condition(azure_core::request_options::IfMatchCondition::Match(etag))
.await
}
None => replace_builder.await,
};

match res {
Ok(_) => Ok(()),
Err(_) => Err(Error::Other("CAS_ERROR".to_owned())),
}
}

async fn bucket_rep(&self) -> u32 {
self.bucket_rep
}

async fn key(&self) -> String {
self.key.clone()
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/key-value-redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ spin-core = { path = "../core" }
spin-factor-key-value = { path = "../factor-key-value" }
tokio = { workspace = true }
url = { workspace = true }
log = "0.4.22"

[lints]
workspace = true
Loading

0 comments on commit d627003

Please sign in to comment.