Skip to content

Commit

Permalink
spanner: set google-cloud-resource-prefix metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnorberg committed Dec 21, 2024
1 parent 2f9219b commit 8a43836
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 101 deletions.
3 changes: 2 additions & 1 deletion spanner/src/apiv1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod tests {

use crate::apiv1::conn_pool::ConnectionManager;
use crate::apiv1::spanner_client::Client;
use crate::session::client_metadata;

const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";

Expand All @@ -31,7 +32,7 @@ mod tests {
)
.await
.unwrap();
cm.conn()
cm.conn().with_metadata(client_metadata(&DATABASE))
}

async fn create_session(client: &mut Client) -> Session {
Expand Down
186 changes: 90 additions & 96 deletions spanner/src/apiv1/spanner_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::time::Duration;

use google_cloud_gax::conn::Channel;
use google_cloud_gax::create_request;
use google_cloud_gax::grpc::metadata::{KeyAndValueRef, MetadataMap};
use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
use google_cloud_gax::retry::{invoke_fn, RetrySetting};
use google_cloud_gax::{create_request, grpc};
use google_cloud_googleapis::spanner::v1::spanner_client::SpannerClient;
use google_cloud_googleapis::spanner::v1::{
BatchCreateSessionsRequest, BatchCreateSessionsResponse, BeginTransactionRequest, CommitRequest, CommitResponse,
Expand Down Expand Up @@ -43,6 +44,7 @@ fn default_setting() -> RetrySetting {
#[derive(Clone)]
pub struct Client {
inner: SpannerClient<Channel>,
metadata: MetadataMap,
}

impl Client {
Expand All @@ -51,6 +53,15 @@ impl Client {
// https://github.com/googleapis/google-cloud-go/blob/65a9ba55ed3777f520bd881d891e8917323549a5/spanner/apiv1/spanner_client.go#L73
Client {
inner: inner.max_decoding_message_size(i32::MAX as usize),
metadata: Default::default(),
}
}

/// set metadata for spanner client
pub(crate) fn with_metadata(self, metadata: MetadataMap) -> Client {
Client {
inner: self.inner,
metadata,
}
}

Expand Down Expand Up @@ -83,14 +94,11 @@ impl Client {
let database = &req.database;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("database={database}"), req.clone());
spanner_client
.create_session(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("database={database}"), req.clone());
this.inner.create_session(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -109,14 +117,11 @@ impl Client {
let database = &req.database;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("database={database}"), req.clone());
spanner_client
.batch_create_sessions(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("database={database}"), req.clone());
this.inner.batch_create_sessions(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -133,14 +138,11 @@ impl Client {
let name = &req.name;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("name={name}"), req.clone());
spanner_client
.get_session(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("name={name}"), req.clone());
this.inner.get_session(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -156,14 +158,11 @@ impl Client {
let database = &req.database;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("database={database}"), req.clone());
spanner_client
.list_sessions(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("database={database}"), req.clone());
this.inner.list_sessions(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -181,14 +180,11 @@ impl Client {
let name = &req.name;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("name={name}"), req.clone());
spanner_client
.delete_session(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("name={name}"), req.clone());
this.inner.delete_session(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -214,14 +210,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.execute_sql(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.execute_sql(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -241,14 +234,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.execute_streaming_sql(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -274,25 +264,25 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
let result = spanner_client.execute_batch_dml(request).await;
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
let result = this.inner.execute_batch_dml(request).await;
match result {
Ok(response) => match response.get_ref().status.as_ref() {
Some(s) => {
let code = Code::from(s.code);
if code == Code::Ok {
Ok(response)
} else {
Err((Status::new(code, s.message.to_string()), spanner_client))
Err((Status::new(code, s.message.to_string()), this))
}
}
None => Ok(response),
},
Err(err) => Err((err, spanner_client)),
Err(err) => Err((err, this)),
}
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -316,11 +306,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client.read(request).await.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.read(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -340,14 +330,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.streaming_read(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.streaming_read(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -366,14 +353,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.begin_transaction(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.begin_transaction(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand Down Expand Up @@ -402,11 +386,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client.commit(request).await.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.commit(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -429,11 +413,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client.rollback(request).await.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.rollback(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -459,14 +443,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.partition_query(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.partition_query(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand Down Expand Up @@ -494,15 +475,28 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.partition_read(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.partition_read(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}

fn create_request<T>(&self, param_string: String, into_request: impl grpc::IntoRequest<T>) -> grpc::Request<T> {
let mut req = create_request(param_string, into_request);
let target = req.metadata_mut();
for entry in self.metadata.iter() {
match entry {
KeyAndValueRef::Ascii(k, v) => {
target.append(k, v.clone());
}
KeyAndValueRef::Binary(k, v) => {
target.append_bin(k, v.clone());
}
}
}
req
}
}
Loading

0 comments on commit 8a43836

Please sign in to comment.