-
Notifications
You must be signed in to change notification settings - Fork 259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Azure Cosmos multiple stores per container #2953
Open
rylev
wants to merge
2
commits into
main
Choose a base branch
from
cosmos-multistore-container
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ use std::sync::{Arc, Mutex}; | |
|
||
pub struct KeyValueAzureCosmos { | ||
client: CollectionClient, | ||
app_id: Option<String>, | ||
} | ||
|
||
/// Azure Cosmos Key / Value runtime config literal options for authentication | ||
|
@@ -71,6 +72,7 @@ impl KeyValueAzureCosmos { | |
database: String, | ||
container: String, | ||
auth_options: KeyValueAzureCosmosAuthOptions, | ||
app_id: Option<String>, | ||
) -> Result<Self> { | ||
let token = match auth_options { | ||
KeyValueAzureCosmosAuthOptions::RuntimeConfigValues(config) => { | ||
|
@@ -86,15 +88,16 @@ impl KeyValueAzureCosmos { | |
let database_client = cosmos_client.database_client(database); | ||
let client = database_client.collection_client(container); | ||
|
||
Ok(Self { client }) | ||
Ok(Self { client, app_id }) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl StoreManager for KeyValueAzureCosmos { | ||
async fn get(&self, _name: &str) -> Result<Arc<dyn Store>, Error> { | ||
async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> { | ||
Ok(Arc::new(AzureCosmosStore { | ||
client: self.client.clone(), | ||
partition_key: self.app_id.as_ref().map(|i| format!("{i}/{name}")), | ||
})) | ||
} | ||
|
||
|
@@ -114,13 +117,10 @@ impl StoreManager for KeyValueAzureCosmos { | |
#[derive(Clone)] | ||
struct AzureCosmosStore { | ||
client: CollectionClient, | ||
} | ||
|
||
struct CompareAndSwap { | ||
key: String, | ||
client: CollectionClient, | ||
bucket_rep: u32, | ||
etag: Mutex<Option<String>>, | ||
/// An optional partition key to use for all operations. | ||
/// | ||
/// If the partition key is not set, the store will use `/id` as the partition key. | ||
partition_key: Option<String>, | ||
} | ||
|
||
#[async_trait] | ||
|
@@ -134,6 +134,7 @@ impl Store for AzureCosmosStore { | |
let pair = Pair { | ||
id: key.to_string(), | ||
value: value.to_vec(), | ||
partition_key: self.partition_key.clone(), | ||
}; | ||
self.client | ||
.create_document(pair) | ||
|
@@ -145,7 +146,10 @@ impl Store for AzureCosmosStore { | |
|
||
async fn delete(&self, key: &str) -> Result<(), Error> { | ||
if self.exists(key).await? { | ||
let document_client = self.client.document_client(key, &key).map_err(log_error)?; | ||
let document_client = self | ||
.client | ||
.document_client(key, &self.partition_key) | ||
.map_err(log_error)?; | ||
document_client.delete_document().await.map_err(log_error)?; | ||
} | ||
Ok(()) | ||
|
@@ -160,12 +164,7 @@ impl Store for AzureCosmosStore { | |
} | ||
|
||
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error> { | ||
let in_clause: String = keys | ||
.into_iter() | ||
.map(|k| format!("'{}'", k)) | ||
.collect::<Vec<String>>() | ||
.join(", "); | ||
let stmt = Query::new(format!("SELECT * FROM c WHERE c.id IN ({})", in_clause)); | ||
let stmt = Query::new(self.get_in_query(keys)); | ||
let query = self | ||
.client | ||
.query_documents(stmt) | ||
|
@@ -175,9 +174,11 @@ impl Store for AzureCosmosStore { | |
let mut stream = query.into_stream::<Pair>(); | ||
while let Some(resp) = stream.next().await { | ||
let resp = resp.map_err(log_error)?; | ||
for (pair, _) in resp.results { | ||
res.push((pair.id, Some(pair.value))); | ||
} | ||
res.extend( | ||
resp.results | ||
.into_iter() | ||
.map(|(pair, _)| (pair.id, Some(pair.value))), | ||
); | ||
} | ||
Ok(res) | ||
} | ||
|
@@ -200,7 +201,7 @@ impl Store for AzureCosmosStore { | |
let operations = vec![Operation::incr("/value", delta).map_err(log_error)?]; | ||
let _ = self | ||
.client | ||
.document_client(key.clone(), &key.as_str()) | ||
.document_client(key.clone(), &self.partition_key) | ||
.map_err(log_error)? | ||
.patch_document(operations) | ||
.await | ||
|
@@ -227,21 +228,43 @@ impl Store for AzureCosmosStore { | |
client: self.client.clone(), | ||
etag: Mutex::new(None), | ||
bucket_rep, | ||
partition_key: self.partition_key.clone(), | ||
})) | ||
} | ||
} | ||
|
||
struct CompareAndSwap { | ||
key: String, | ||
client: CollectionClient, | ||
bucket_rep: u32, | ||
etag: Mutex<Option<String>>, | ||
partition_key: Option<String>, | ||
} | ||
|
||
impl CompareAndSwap { | ||
fn get_query(&self) -> String { | ||
let mut query = format!("SELECT * FROM c WHERE c.id='{}'", self.key); | ||
self.append_partition_key(&mut query); | ||
query | ||
} | ||
|
||
fn append_partition_key(&self, query: &mut String) { | ||
if let Some(pk) = &self.partition_key { | ||
query.push_str(" AND c.partition_key='"); | ||
query.push_str(pk); | ||
query.push('\'') | ||
} | ||
} | ||
} | ||
Comment on lines
+245
to
+258
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are these redefined here? |
||
|
||
#[async_trait] | ||
impl Cas for CompareAndSwap { | ||
/// `current` will fetch the current value for the key and store the etag for the record. The | ||
/// etag will be used to perform and optimistic concurrency update using the `if-match` header. | ||
async fn current(&self) -> Result<Option<Vec<u8>>, Error> { | ||
let mut stream = self | ||
.client | ||
.query_documents(Query::new(format!( | ||
"SELECT * FROM c WHERE c.id='{}'", | ||
self.key | ||
))) | ||
.query_documents(Query::new(self.get_query())) | ||
.query_cross_partition(true) | ||
.max_item_count(1) | ||
.into_stream::<Pair>(); | ||
|
@@ -272,10 +295,15 @@ impl Cas for CompareAndSwap { | |
/// `swap` updates the value for the key using the etag saved in the `current` function for | ||
/// optimistic concurrency. | ||
async fn swap(&self, value: Vec<u8>) -> Result<(), SwapError> { | ||
let pk = PartitionKey::from(&self.key); | ||
let pk = PartitionKey::from( | ||
self.partition_key | ||
.as_deref() | ||
.unwrap_or_else(|| self.key.as_str()), | ||
); | ||
let pair = Pair { | ||
id: self.key.clone(), | ||
value, | ||
partition_key: self.partition_key.clone(), | ||
}; | ||
|
||
let doc_client = self | ||
|
@@ -318,55 +346,85 @@ impl AzureCosmosStore { | |
async fn get_pair(&self, key: &str) -> Result<Option<Pair>, Error> { | ||
let query = self | ||
.client | ||
.query_documents(Query::new(format!("SELECT * FROM c WHERE c.id='{}'", key))) | ||
.query_documents(Query::new(self.get_query(key))) | ||
.query_cross_partition(true) | ||
.max_item_count(1); | ||
|
||
// There can be no duplicated keys, so we create the stream and only take the first result. | ||
let mut stream = query.into_stream::<Pair>(); | ||
let res = stream.next().await; | ||
match res { | ||
Some(r) => { | ||
let r = r.map_err(log_error)?; | ||
match r.results.first().cloned() { | ||
Some((p, _)) => Ok(Some(p)), | ||
None => Ok(None), | ||
} | ||
} | ||
None => Ok(None), | ||
} | ||
let Some(res) = stream.next().await else { | ||
return Ok(None); | ||
}; | ||
Ok(res | ||
.map_err(log_error)? | ||
.results | ||
.first() | ||
.map(|(p, _)| p.clone())) | ||
} | ||
|
||
async fn get_keys(&self) -> Result<Vec<String>, Error> { | ||
let query = self | ||
.client | ||
.query_documents(Query::new("SELECT * FROM c".to_string())) | ||
.query_documents(Query::new(self.get_keys_query())) | ||
.query_cross_partition(true); | ||
let mut res = Vec::new(); | ||
|
||
let mut stream = query.into_stream::<Pair>(); | ||
while let Some(resp) = stream.next().await { | ||
let resp = resp.map_err(log_error)?; | ||
for (pair, _) in resp.results { | ||
res.push(pair.id); | ||
} | ||
res.extend(resp.results.into_iter().map(|(pair, _)| pair.id)); | ||
} | ||
|
||
Ok(res) | ||
} | ||
|
||
fn get_query(&self, key: &str) -> String { | ||
let mut query = format!("SELECT * FROM c WHERE c.id='{}'", key); | ||
self.append_partition_key(&mut query); | ||
query | ||
} | ||
|
||
fn get_keys_query(&self) -> String { | ||
let mut query = "SELECT * FROM c".to_owned(); | ||
self.append_partition_key(&mut query); | ||
query | ||
} | ||
|
||
fn get_in_query(&self, keys: Vec<String>) -> String { | ||
let in_clause: String = keys | ||
.into_iter() | ||
.map(|k| format!("'{}'", k)) | ||
.collect::<Vec<String>>() | ||
.join(", "); | ||
|
||
let mut query = format!("SELECT * FROM c WHERE c.id IN ({})", in_clause); | ||
self.append_partition_key(&mut query); | ||
query | ||
} | ||
|
||
fn append_partition_key(&self, query: &mut String) { | ||
if let Some(pk) = &self.partition_key { | ||
query.push_str(" AND c.partition_key='"); | ||
query.push_str(pk); | ||
query.push('\'') | ||
} | ||
} | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Clone, Debug)] | ||
pub struct Pair { | ||
// In Azure CosmosDB, the default partition key is "/id", and this implementation assumes that partition ID is not changed. | ||
pub id: String, | ||
pub value: Vec<u8>, | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub partition_key: Option<String>, | ||
} | ||
|
||
impl CosmosEntity for Pair { | ||
type Entity = String; | ||
|
||
fn partition_key(&self) -> Self::Entity { | ||
self.id.clone() | ||
self.partition_key | ||
.clone() | ||
.unwrap_or_else(|| self.id.clone()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If
app_id
is None will this incorrectly leave a leading forward slash in the partition key?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see below that it seems like the leading slash is okay, but I'll leave the question here to double check.