From a22637cc924e6db04051e46e662d6f86bd78c824 Mon Sep 17 00:00:00 2001 From: Julio Gonzalez <107922352+hoolioh@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:33:59 +0100 Subject: [PATCH] Refactor proxy handling. --- data-pipeline/src/trace_exporter.rs | 2 +- sidecar/src/service/sidecar_server.rs | 2 +- trace-mini-agent/src/trace_flusher.rs | 12 ++- trace-mini-agent/src/trace_processor.rs | 8 +- trace-utils/Cargo.toml | 1 + trace-utils/src/send_data/mod.rs | 97 ++++++++----------- .../src/test_utils/datadog_test_agent.rs | 1 - trace-utils/src/test_utils/mod.rs | 1 - trace-utils/src/trace_utils.rs | 1 - trace-utils/tests/test_send_data.rs | 1 - 10 files changed, 51 insertions(+), 75 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index ff535a1b6..9f3983887 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -635,7 +635,7 @@ impl TraceExporter { url: self.output_format.add_path(&self.endpoint.url), ..self.endpoint.clone() }; - let send_data = SendData::new(size, tracer_payload, header_tags, &endpoint, None); + let send_data = SendData::new(size, tracer_payload, header_tags, &endpoint); self.runtime.block_on(async { match send_data.send().await.last_result { Ok(response) => match response.into_body().collect().await { diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index b6e67bf56..cd8c33d3c 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -280,7 +280,7 @@ impl SidecarServer { payload_params.measure_size(&mut size); match payload_params.try_into() { Ok(payload) => { - let data = SendData::new(size, payload, headers, target, None); + let data = SendData::new(size, payload, headers, target); self.trace_flusher.enqueue(data); } Err(e) => { diff --git a/trace-mini-agent/src/trace_flusher.rs b/trace-mini-agent/src/trace_flusher.rs index ba2809130..7987e7646 100644 --- a/trace-mini-agent/src/trace_flusher.rs +++ b/trace-mini-agent/src/trace_flusher.rs @@ -17,7 +17,7 @@ pub trait TraceFlusher { /// implementing flushing logic that calls flush_traces. async fn start_trace_flusher(&self, config: Arc, mut rx: Receiver); /// Flushes traces to the Datadog trace intake. - async fn flush_traces(&self, traces: Vec); + async fn flush_traces(&self, traces: Vec, config: Arc); } #[derive(Clone)] @@ -43,20 +43,24 @@ impl TraceFlusher for ServerlessTraceFlusher { let mut buffer = buffer_consumer.lock().await; if !buffer.is_empty() { - self.flush_traces(buffer.to_vec()).await; + self.flush_traces(buffer.to_vec(), config.clone()).await; buffer.clear(); } } } - async fn flush_traces(&self, traces: Vec) { + async fn flush_traces(&self, traces: Vec, config: Arc) { if traces.is_empty() { return; } info!("Flushing {} traces", traces.len()); for traces in trace_utils::coalesce_send_data(traces) { - match traces.send().await.last_result { + match traces + .send_proxy(config.proxy_url.as_deref()) + .await + .last_result + { Ok(_) => info!("Successfully flushed traces"), Err(e) => { error!("Error sending trace: {e:?}") diff --git a/trace-mini-agent/src/trace_processor.rs b/trace-mini-agent/src/trace_processor.rs index 389719a13..2ff861188 100644 --- a/trace-mini-agent/src/trace_processor.rs +++ b/trace-mini-agent/src/trace_processor.rs @@ -98,13 +98,7 @@ impl TraceProcessor for ServerlessTraceProcessor { true, // In mini agent, we always send agentless ); - let send_data = SendData::new( - body_size, - payload, - tracer_header_tags, - &config.trace_intake, - config.proxy_url.clone(), - ); + let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake); // send trace payload to our trace flusher match tx.send(send_data).await { diff --git a/trace-utils/Cargo.toml b/trace-utils/Cargo.toml index 78aa335c1..2f5415a49 100644 --- a/trace-utils/Cargo.toml +++ b/trace-utils/Cargo.toml @@ -52,5 +52,6 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] } datadog-trace-utils = { path = ".", features = ["test-utils"] } [features] +default = ["proxy"] test-utils = ["httpmock", "testcontainers", "cargo_metadata", "cargo-platform"] proxy = ["hyper-proxy"] diff --git a/trace-utils/src/send_data/mod.rs b/trace-utils/src/send_data/mod.rs index d6de84bb9..2694f7007 100644 --- a/trace-utils/src/send_data/mod.rs +++ b/trace-utils/src/send_data/mod.rs @@ -5,8 +5,6 @@ pub mod retry_strategy; pub mod send_data_result; pub use crate::send_data::retry_strategy::{RetryBackoffType, RetryStrategy}; -#[cfg(feature = "proxy")] -use ddcommon::connector::Connector; use crate::trace_utils::{SendDataResult, TracerHeaderTags}; use crate::tracer_payload::TracerPayloadCollection; @@ -18,7 +16,6 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use hyper::header::HeaderValue; use hyper::{Body, Client, HeaderMap, Method, Response}; -#[cfg(feature = "proxy")] use hyper_proxy::{Intercept, Proxy, ProxyConnector}; use std::collections::HashMap; use std::time::Duration; @@ -70,13 +67,6 @@ pub(crate) enum RequestResult { BuildError((Attempts, ChunksDropped)), } -#[derive(Debug, Clone)] -#[cfg(feature = "proxy")] -pub enum ClientWrapper { - Direct(Client), - Proxy(Client>), -} - #[derive(Debug, Clone)] /// `SendData` is a structure that holds the data to be sent to a target endpoint. /// It includes the payloads to be sent, the size of the data, the target endpoint, @@ -102,7 +92,7 @@ pub enum ClientWrapper { /// let tracer_header_tags = TracerHeaderTags::default(); // Replace with actual header tags /// let target = Endpoint::default(); // Replace with actual endpoint /// -/// let mut send_data = SendData::new(size, tracer_payload, tracer_header_tags, &target, None); +/// let mut send_data = SendData::new(size, tracer_payload, tracer_header_tags, &target); /// /// // Set a custom retry strategy /// let retry_strategy = RetryStrategy::new(3, 10, RetryBackoffType::Exponential, Some(5)); @@ -119,22 +109,6 @@ pub struct SendData { target: Endpoint, headers: HashMap<&'static str, String>, retry_strategy: RetryStrategy, - #[cfg(feature = "proxy")] - client: ClientWrapper, -} - -#[cfg(feature = "proxy")] -pub fn build_client(http_proxy: Option) -> ClientWrapper { - let builder = Client::builder(); - - if let Some(proxy) = http_proxy { - let proxy = Proxy::new(Intercept::Https, proxy.parse().unwrap()); - let proxy_connector = - ProxyConnector::from_proxy(connector::Connector::default(), proxy).unwrap(); - ClientWrapper::Proxy(builder.build(proxy_connector)) - } else { - ClientWrapper::Direct(builder.build(connector::Connector::default())) - } } impl SendData { @@ -156,7 +130,6 @@ impl SendData { tracer_payload: TracerPayloadCollection, tracer_header_tags: TracerHeaderTags, target: &Endpoint, - http_proxy: Option, ) -> SendData { let mut headers = if let Some(api_key) = &target.api_key { HashMap::from([(DD_API_KEY, api_key.as_ref().to_string())]) @@ -167,17 +140,12 @@ impl SendData { headers.insert("x-datadog-test-session-token", token.to_string()); } - #[cfg(feature = "proxy")] - let client = build_client(http_proxy); - SendData { tracer_payloads: tracer_payload, size, target: target.clone(), headers, retry_strategy: RetryStrategy::default(), - #[cfg(feature = "proxy")] - client, } } @@ -232,10 +200,23 @@ impl SendData { /// /// A `SendDataResult` instance containing the result of the operation. pub async fn send(&self) -> SendDataResult { + self.send_internal(None).await + } + + /// Sends the data to the target endpoint. + /// + /// # Returns + /// + /// A `SendDataResult` instance containing the result of the operation. + pub async fn send_proxy(&self, http_proxy: Option<&str>) -> SendDataResult { + self.send_internal(http_proxy).await + } + + async fn send_internal(&self, http_proxy: Option<&str>) -> SendDataResult { if self.use_protobuf() { - self.send_with_protobuf().await + self.send_with_protobuf(http_proxy).await } else { - self.send_with_msgpack().await + self.send_with_msgpack(http_proxy).await } } @@ -243,6 +224,7 @@ impl SendData { &self, req: HttpRequestBuilder, payload: Bytes, + http_proxy: Option<&str>, ) -> Result, RequestError> { let req = match req.body(Body::from(payload)) { Ok(req) => req, @@ -251,15 +233,16 @@ impl SendData { match tokio::time::timeout( Duration::from_millis(self.target.timeout_ms), - #[cfg(feature = "proxy")] - match &self.client { - ClientWrapper::Direct(client) => client.request(req), - ClientWrapper::Proxy(client) => client.request(req), + if let Some(proxy) = http_proxy { + let proxy = Proxy::new(Intercept::Https, proxy.parse().unwrap()); + let proxy_connector = + ProxyConnector::from_proxy(connector::Connector::default(), proxy).unwrap(); + Client::builder().build(proxy_connector).request(req) + } else { + Client::builder() + .build(connector::Connector::default()) + .request(req) }, - #[cfg(not(feature = "proxy"))] - Client::builder() - .build(connector::Connector::default()) - .request(req), ) .await { @@ -292,6 +275,7 @@ impl SendData { payload_chunks: u64, // For payload specific headers that need to be added to the request like trace count. additional_payload_headers: Option>, + http_proxy: Option<&str>, ) -> RequestResult { let mut request_attempt = 0; let payload = Bytes::from(payload); @@ -312,7 +296,7 @@ impl SendData { .expect("HttpRequestBuilder unable to get headers for request") .extend(headers.clone()); - match self.send_request(req, payload.clone()).await { + match self.send_request(req, payload.clone(), http_proxy).await { // An Ok response doesn't necessarily mean the request was successful, we need to // check the status code and if it's not a 2xx or 3xx we treat it as an error Ok(response) => { @@ -396,7 +380,7 @@ impl SendData { req } - async fn send_with_protobuf(&self) -> SendDataResult { + async fn send_with_protobuf(&self, http_proxy: Option<&str>) -> SendDataResult { let mut result = SendDataResult::default(); let chunks = u64::try_from(self.tracer_payloads.size()).unwrap(); @@ -417,6 +401,7 @@ impl SendData { serialized_trace_payload, chunks, None, + http_proxy, ) .await, ) @@ -428,7 +413,7 @@ impl SendData { } } - async fn send_with_msgpack(&self) -> SendDataResult { + async fn send_with_msgpack(&self, http_proxy: Option<&str>) -> SendDataResult { let mut result = SendDataResult::default(); let mut futures = FuturesUnordered::new(); @@ -448,6 +433,7 @@ impl SendData { payload, chunks, additional_payload_headers, + http_proxy, )); } } @@ -460,7 +446,13 @@ impl SendData { Err(e) => return result.error(anyhow!(e)), }; - futures.push(self.send_payload(HEADER_CTYPE_MSGPACK, payload, chunks, headers)); + futures.push(self.send_payload( + HEADER_CTYPE_MSGPACK, + payload, + chunks, + headers, + http_proxy, + )); } } @@ -611,7 +603,6 @@ mod tests { timeout_ms: ONE_SECOND, ..Endpoint::default() }, - None, ); assert_eq!(data.size, 100); @@ -637,7 +628,6 @@ mod tests { timeout_ms: ONE_SECOND, ..Endpoint::default() }, - None, ); assert_eq!(data.size, 100); @@ -676,7 +666,6 @@ mod tests { timeout_ms: ONE_SECOND, ..Endpoint::default() }, - None, ); let data_payload_len = compute_payload_len(&data.tracer_payloads); @@ -721,7 +710,6 @@ mod tests { timeout_ms: ONE_SECOND, ..Endpoint::default() }, - None, ); let data_payload_len = compute_payload_len(&data.tracer_payloads); @@ -780,7 +768,6 @@ mod tests { timeout_ms: ONE_SECOND, ..Endpoint::default() }, - None, ); let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads); @@ -839,7 +826,6 @@ mod tests { timeout_ms: ONE_SECOND, ..Endpoint::default() }, - None, ); let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads); @@ -884,7 +870,6 @@ mod tests { timeout_ms: ONE_SECOND, ..Endpoint::default() }, - None, ); let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads); @@ -927,7 +912,6 @@ mod tests { timeout_ms: ONE_SECOND, ..Endpoint::default() }, - None, ); let res = data.send().await; @@ -959,7 +943,6 @@ mod tests { timeout_ms: ONE_SECOND, ..Endpoint::default() }, - None, ); let res = data.send().await; @@ -1027,7 +1010,6 @@ mod tests { timeout_ms: 200, ..Endpoint::default() }, - None, ); let res = data.send().await; @@ -1070,7 +1052,6 @@ mod tests { timeout_ms: 200, ..Endpoint::default() }, - None, ); let res = data.send().await; diff --git a/trace-utils/src/test_utils/datadog_test_agent.rs b/trace-utils/src/test_utils/datadog_test_agent.rs index f0ad9d2dd..05321c419 100644 --- a/trace-utils/src/test_utils/datadog_test_agent.rs +++ b/trace-utils/src/test_utils/datadog_test_agent.rs @@ -131,7 +131,6 @@ impl DatadogTestAgentContainer { /// TracerPayloadCollection::V04(vec![trace.clone()]), /// TracerHeaderTags::default(), /// &endpoint, -/// None, /// ); /// /// let _result = data.send().await; diff --git a/trace-utils/src/test_utils/mod.rs b/trace-utils/src/test_utils/mod.rs index 1cd5871e1..357260e77 100644 --- a/trace-utils/src/test_utils/mod.rs +++ b/trace-utils/src/test_utils/mod.rs @@ -268,6 +268,5 @@ pub fn create_send_data(size: usize, target_endpoint: &Endpoint) -> SendData { TracerPayloadCollection::V07(vec![tracer_payload]), tracer_header_tags, target_endpoint, - None, ) } diff --git a/trace-utils/src/trace_utils.rs b/trace-utils/src/trace_utils.rs index 6b9801098..d14ff9d2b 100644 --- a/trace-utils/src/trace_utils.rs +++ b/trace-utils/src/trace_utils.rs @@ -708,7 +708,6 @@ mod tests { }]), TracerHeaderTags::default(), &Endpoint::default(), - None, ); let coalesced = trace_utils::coalesce_send_data(vec![ dummy.clone(), diff --git a/trace-utils/tests/test_send_data.rs b/trace-utils/tests/test_send_data.rs index 1265378f4..81bad5a51 100644 --- a/trace-utils/tests/test_send_data.rs +++ b/trace-utils/tests/test_send_data.rs @@ -55,7 +55,6 @@ mod tracing_integration_tests { TracerPayloadCollection::V04(vec![trace.clone()]), header_tags, &endpoint, - None, ); let _result = data.send().await;