From 8a4383608f7c3d7d1dc7a92c7aeace4e99d43973 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Sun, 22 Dec 2024 01:21:54 +0900 Subject: [PATCH] spanner: set google-cloud-resource-prefix metadata --- spanner/src/apiv1/mod.rs | 3 +- spanner/src/apiv1/spanner_client.rs | 186 ++++++++++++++-------------- spanner/src/session.rs | 18 ++- 3 files changed, 106 insertions(+), 101 deletions(-) diff --git a/spanner/src/apiv1/mod.rs b/spanner/src/apiv1/mod.rs index c1d81958..66c666d8 100644 --- a/spanner/src/apiv1/mod.rs +++ b/spanner/src/apiv1/mod.rs @@ -19,6 +19,7 @@ mod tests { use crate::apiv1::conn_pool::ConnectionManager; use crate::apiv1::spanner_client::Client; + use crate::session::client_metadata; const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database"; @@ -31,7 +32,7 @@ mod tests { ) .await .unwrap(); - cm.conn() + cm.conn().with_metadata(client_metadata(&DATABASE)) } async fn create_session(client: &mut Client) -> Session { diff --git a/spanner/src/apiv1/spanner_client.rs b/spanner/src/apiv1/spanner_client.rs index 9d4952f9..face9a36 100644 --- a/spanner/src/apiv1/spanner_client.rs +++ b/spanner/src/apiv1/spanner_client.rs @@ -1,9 +1,10 @@ use std::time::Duration; use google_cloud_gax::conn::Channel; -use google_cloud_gax::create_request; +use google_cloud_gax::grpc::metadata::{KeyAndValueRef, MetadataMap}; use google_cloud_gax::grpc::{Code, Response, Status, Streaming}; use google_cloud_gax::retry::{invoke_fn, RetrySetting}; +use google_cloud_gax::{create_request, grpc}; use google_cloud_googleapis::spanner::v1::spanner_client::SpannerClient; use google_cloud_googleapis::spanner::v1::{ BatchCreateSessionsRequest, BatchCreateSessionsResponse, BeginTransactionRequest, CommitRequest, CommitResponse, @@ -43,6 +44,7 @@ fn default_setting() -> RetrySetting { #[derive(Clone)] pub struct Client { inner: SpannerClient, + metadata: MetadataMap, } impl Client { @@ -51,6 +53,15 @@ impl Client { // https://github.com/googleapis/google-cloud-go/blob/65a9ba55ed3777f520bd881d891e8917323549a5/spanner/apiv1/spanner_client.go#L73 Client { inner: inner.max_decoding_message_size(i32::MAX as usize), + metadata: Default::default(), + } + } + + /// set metadata for spanner client + pub(crate) fn with_metadata(self, metadata: MetadataMap) -> Client { + Client { + inner: self.inner, + metadata, } } @@ -83,14 +94,11 @@ impl Client { let database = &req.database; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("database={database}"), req.clone()); - spanner_client - .create_session(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("database={database}"), req.clone()); + this.inner.create_session(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -109,14 +117,11 @@ impl Client { let database = &req.database; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("database={database}"), req.clone()); - spanner_client - .batch_create_sessions(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("database={database}"), req.clone()); + this.inner.batch_create_sessions(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -133,14 +138,11 @@ impl Client { let name = &req.name; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("name={name}"), req.clone()); - spanner_client - .get_session(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("name={name}"), req.clone()); + this.inner.get_session(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -156,14 +158,11 @@ impl Client { let database = &req.database; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("database={database}"), req.clone()); - spanner_client - .list_sessions(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("database={database}"), req.clone()); + this.inner.list_sessions(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -181,14 +180,11 @@ impl Client { let name = &req.name; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("name={name}"), req.clone()); - spanner_client - .delete_session(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("name={name}"), req.clone()); + this.inner.delete_session(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -214,14 +210,11 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - spanner_client - .execute_sql(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + this.inner.execute_sql(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -241,14 +234,11 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - spanner_client - .execute_streaming_sql(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -274,9 +264,9 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - let result = spanner_client.execute_batch_dml(request).await; + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + let result = this.inner.execute_batch_dml(request).await; match result { Ok(response) => match response.get_ref().status.as_ref() { Some(s) => { @@ -284,15 +274,15 @@ impl Client { if code == Code::Ok { Ok(response) } else { - Err((Status::new(code, s.message.to_string()), spanner_client)) + Err((Status::new(code, s.message.to_string()), this)) } } None => Ok(response), }, - Err(err) => Err((err, spanner_client)), + Err(err) => Err((err, this)), } }, - &mut self.inner, + self, ) .await } @@ -316,11 +306,11 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - spanner_client.read(request).await.map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + this.inner.read(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -340,14 +330,11 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - spanner_client - .streaming_read(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + this.inner.streaming_read(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -366,14 +353,11 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - spanner_client - .begin_transaction(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + this.inner.begin_transaction(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -402,11 +386,11 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - spanner_client.commit(request).await.map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + this.inner.commit(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -429,11 +413,11 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - spanner_client.rollback(request).await.map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + this.inner.rollback(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -459,14 +443,11 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - spanner_client - .partition_query(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + this.inner.partition_query(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } @@ -494,15 +475,28 @@ impl Client { let session = &req.session; invoke_fn( Some(setting), - |spanner_client| async { - let request = create_request(format!("session={session}"), req.clone()); - spanner_client - .partition_read(request) - .await - .map_err(|e| (e, spanner_client)) + |this| async { + let request = this.create_request(format!("session={session}"), req.clone()); + this.inner.partition_read(request).await.map_err(|e| (e, this)) }, - &mut self.inner, + self, ) .await } + + fn create_request(&self, param_string: String, into_request: impl grpc::IntoRequest) -> grpc::Request { + let mut req = create_request(param_string, into_request); + let target = req.metadata_mut(); + for entry in self.metadata.iter() { + match entry { + KeyAndValueRef::Ascii(k, v) => { + target.append(k, v.clone()); + } + KeyAndValueRef::Binary(k, v) => { + target.append_bin(k, v.clone()); + } + } + } + req + } } diff --git a/spanner/src/session.rs b/spanner/src/session.rs index 9f3a970e..79e674af 100644 --- a/spanner/src/session.rs +++ b/spanner/src/session.rs @@ -13,6 +13,7 @@ use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; use tokio_util::sync::CancellationToken; +use google_cloud_gax::grpc::metadata::MetadataMap; use google_cloud_gax::grpc::{Code, Status}; use google_cloud_gax::retry::TryAs; use google_cloud_googleapis::spanner::v1::{BatchCreateSessionsRequest, DeleteSessionRequest, Session}; @@ -236,7 +237,7 @@ impl SessionPool { let mut sessions = Vec::::new(); for _ in 0..channel_num { - let next_client = conn_pool.conn(); + let next_client = conn_pool.conn().with_metadata(client_metadata(&database)); let new_sessions = batch_create_sessions(next_client, database.as_str(), creation_count_per_channel).await?; sessions.extend(new_sessions); @@ -500,7 +501,8 @@ impl SessionManager { }, _ = cancel.cancelled() => break }; - let result = batch_create_sessions(conn_pool.conn(), database.as_str(), session_count).await; + let client = conn_pool.conn().with_metadata(client_metadata(&database)); + let result = batch_create_sessions(client, database.as_str(), session_count).await; session_pool.inner.write().replenish(session_count, result); } tracing::trace!("shutdown session creation task."); @@ -630,6 +632,12 @@ async fn batch_create_session( .collect::>()) } +pub(crate) fn client_metadata(database: &str) -> MetadataMap { + let mut metadata = MetadataMap::new(); + metadata.insert("google-cloud-resource-prefix", database.parse().unwrap()); + metadata +} + #[cfg(test)] mod tests { use std::sync::atomic::{AtomicI64, Ordering}; @@ -645,7 +653,9 @@ mod tests { use google_cloud_googleapis::spanner::v1::ExecuteSqlRequest; use crate::apiv1::conn_pool::ConnectionManager; - use crate::session::{batch_create_sessions, health_check, SessionConfig, SessionError, SessionManager}; + use crate::session::{ + batch_create_sessions, client_metadata, health_check, SessionConfig, SessionError, SessionManager, + }; pub const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database"; @@ -1077,7 +1087,7 @@ mod tests { ) .await .unwrap(); - let client = cm.conn(); + let client = cm.conn().with_metadata(client_metadata(DATABASE)); let session_count = 125; let result = batch_create_sessions(client.clone(), DATABASE, session_count).await; match result {