diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 8b5376767e..a5ebb83249 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::LogResult; use opentelemetry::{InstrumentationLibrary, KeyValue}; use opentelemetry_appender_tracing::layer as tracing_layer; -use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LoggerProvider}; use opentelemetry_sdk::Resource; use pprof::criterion::{Output, PProfProfiler}; @@ -34,7 +34,7 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export(&mut self, _: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, _: LogBatch<'_>) -> LogResult<()> { LogResult::Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 83f25c1f9f..db1932868b 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -3,15 +3,13 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry::InstrumentationLibrary; -use opentelemetry_sdk::export::logs::LogExporter; -use opentelemetry_sdk::logs::LogRecord; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { let client = self .client .lock() diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 1b60971d76..3ccefa4caa 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -7,18 +7,16 @@ use crate::{ OTEL_EXPORTER_OTLP_TIMEOUT, }; use http::{HeaderName, HeaderValue, Uri}; -#[cfg(feature = "logs")] -use opentelemetry::InstrumentationLibrary; use opentelemetry_http::HttpClient; use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; #[cfg(feature = "logs")] use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; #[cfg(feature = "trace")] use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; +#[cfg(feature = "logs")] +use opentelemetry_sdk::export::logs::LogBatch; #[cfg(feature = "trace")] use opentelemetry_sdk::export::trace::SpanData; -#[cfg(feature = "logs")] -use opentelemetry_sdk::logs::LogRecord; #[cfg(feature = "metrics")] use opentelemetry_sdk::metrics::data::ResourceMetrics; use prost::Message; @@ -330,7 +328,7 @@ impl OtlpHttpClient { #[cfg(feature = "logs")] fn build_logs_export_body( &self, - logs: Vec<(&LogRecord, &InstrumentationLibrary)>, + logs: LogBatch<'_>, ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index e6d9661b8d..bf9b6c9ed3 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -4,14 +4,12 @@ use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; use super::BoxInterceptor; -use opentelemetry::InstrumentationLibrary; -use opentelemetry_sdk::logs::LogRecord; pub(crate) struct TonicLogsClient { inner: Option, @@ -56,7 +54,7 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 65b913d11b..71f5a34b3d 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -13,9 +13,9 @@ use async_trait::async_trait; use std::fmt::Debug; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry::InstrumentationLibrary; -use opentelemetry_sdk::{logs::LogRecord, runtime::RuntimeChannel, Resource}; +use opentelemetry_sdk::export::logs::LogBatch; +use opentelemetry_sdk::{runtime::RuntimeChannel, Resource}; /// Compression algorithm to use, defaults to none. pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; @@ -99,7 +99,7 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index b07d9a9be2..4cd2c1617b 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -12,6 +12,7 @@ pub mod tonic { transform::common::{to_nanos, tonic::ResourceAttributesWithSchema}, }; use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; + use opentelemetry_sdk::export::logs::LogBatch; use std::borrow::Cow; use std::collections::HashMap; @@ -177,10 +178,7 @@ pub mod tonic { } pub fn group_logs_by_resource_and_scope( - logs: Vec<( - &opentelemetry_sdk::logs::LogRecord, - &opentelemetry::InstrumentationLibrary, - )>, + logs: LogBatch<'_>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -237,7 +235,7 @@ mod tests { use crate::transform::common::tonic::ResourceAttributesWithSchema; use opentelemetry::logs::LogRecord as _; use opentelemetry::InstrumentationLibrary; - use opentelemetry_sdk::{logs::LogRecord, Resource}; + use opentelemetry_sdk::{export::logs::LogBatch, logs::LogRecord, Resource}; use std::time::SystemTime; fn create_test_log_data( @@ -258,11 +256,12 @@ mod tests { let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1"); let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2"); - let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; @@ -278,10 +277,11 @@ mod tests { let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1"); let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2"); - let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 283db572c3..2f44b29e7f 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -26,6 +26,7 @@ - Provide default implementation for `event_enabled` method in `LogProcessor` trait that returns `true` always. - **Breaking** [#2041](https://github.com/open-telemetry/opentelemetry-rust/pull/2041) + and [#2057](https://github.com/open-telemetry/opentelemetry-rust/pull/2057) - The Exporter::export() interface is modified as below: Previous Signature: ```rust @@ -34,9 +35,17 @@ Updated Signature: ```rust - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>; + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>; ``` - This change simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures. + + where + ```rust + pub struct LogBatch<'a> { + + data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)], + } + ``` + This change enhances performance by reducing unnecessary heap allocations and maintains object safety, allowing for more efficient handling of log records. It also simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures. ## v0.24.1 diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index cf241a26d5..3549c08af5 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -6,8 +6,8 @@ RAM: 64.0 GB | Test | Average time| |--------------------------------|-------------| - | LogExporterWithFuture | 122 ns | - | LogExporterWithoutFuture | 89 ns | + | LogExporterWithFuture | 111 ns | + | LogExporterWithoutFuture | 92 ns | */ use std::sync::Mutex; @@ -19,6 +19,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::export::logs::LogBatch; use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::logs::LoggerProvider; @@ -29,11 +30,11 @@ use std::fmt::Debug; // cargo bench --bench log_exporter #[async_trait] pub trait LogExporterWithFuture: Send + Sync + Debug { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>); + async fn export(&mut self, batch: LogBatch<'_>); } pub trait LogExporterWithoutFuture: Send + Sync + Debug { - fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>); + fn export(&mut self, batch: LogBatch<'_>); } #[derive(Debug)] @@ -41,13 +42,13 @@ struct NoOpExporterWithFuture {} #[async_trait] impl LogExporterWithFuture for NoOpExporterWithFuture { - async fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {} + async fn export(&mut self, _batch: LogBatch<'_>) {} } #[derive(Debug)] struct NoOpExporterWithoutFuture {} impl LogExporterWithoutFuture for NoOpExporterWithoutFuture { - fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {} + fn export(&mut self, _batch: LogBatch<'_>) {} } #[derive(Debug)] @@ -66,7 +67,8 @@ impl ExportingProcessorWithFuture { impl LogProcessor for ExportingProcessorWithFuture { fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) { let mut exporter = self.exporter.lock().expect("lock error"); - futures_executor::block_on(exporter.export(vec![(record, library)])); + let logs = [(record as &LogRecord, library)]; + futures_executor::block_on(exporter.export(LogBatch::new(&logs))); } fn force_flush(&self) -> LogResult<()> { @@ -93,10 +95,11 @@ impl ExportingProcessorWithoutFuture { impl LogProcessor for ExportingProcessorWithoutFuture { fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) { + let logs = [(record as &LogRecord, library)]; self.exporter .lock() .expect("lock error") - .export(vec![(record, library)]); + .export(LogBatch::new(&logs)); } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index afa6df0ee2..8056f28222 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -10,11 +10,81 @@ use opentelemetry::{ }; use std::fmt::Debug; +/// A batch of log records to be exported by a `LogExporter`. +/// +/// The `LogBatch` struct holds a collection of log records along with their associated +/// instrumentation libraries. This structure is used to group log records together for efficient +/// export operations. +/// +/// # Type Parameters +/// - `'a`: The lifetime of the references to the log records and instrumentation libraries. +/// +#[derive(Debug)] +pub struct LogBatch<'a> { + /// The data field contains a slice of tuples, where each tuple consists of a reference to + /// a `LogRecord` and a reference to an `InstrumentationLibrary`. + data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)], +} + +impl<'a> LogBatch<'a> { + /// Creates a new instance of `LogBatch`. + /// + /// # Arguments + /// + /// * `data` - A slice of tuples, where each tuple consists of a reference to a `LogRecord` + /// and a reference to an `InstrumentationLibrary`. These tuples represent the log records + /// and their associated instrumentation libraries to be exported. + /// + /// # Returns + /// + /// A `LogBatch` instance containing the provided log records and instrumentation libraries. + /// + /// Note - this is not a public function, and should not be used directly. This would be + /// made private in the future. + + pub fn new(data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)]) -> LogBatch<'a> { + LogBatch { data } + } +} + +impl LogBatch<'_> { + /// Returns an iterator over the log records and instrumentation libraries in the batch. + /// + /// Each item yielded by the iterator is a tuple containing references to a `LogRecord` + /// and an `InstrumentationLibrary`. + /// + /// # Returns + /// + /// An iterator that yields references to the `LogRecord` and `InstrumentationLibrary` in the batch. + /// + pub fn iter(&self) -> impl Iterator { + self.data + .iter() + .map(|(record, library)| (*record, *library)) + } +} + /// `LogExporter` defines the interface that log exporters should implement. #[async_trait] pub trait LogExporter: Send + Sync + Debug { - /// Exports a batch of [`LogRecord`, `InstrumentationLibrary`]. - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>; + /// Exports a batch of log records and their associated instrumentation libraries. + /// + /// The `export` method is responsible for sending a batch of log records to an external + /// destination. It takes a `LogBatch` as an argument, which contains references to the + /// log records and their corresponding instrumentation libraries. The method returns + /// a `LogResult` indicating the success or failure of the export operation. + /// + /// # Arguments + /// + /// * `batch` - A `LogBatch` containing the log records and instrumentation libraries + /// to be exported. + /// + /// # Returns + /// + /// A `LogResult<()>`, which is a result type indicating either a successful export (with + /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. + /// + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index ce1032ec62..b615acb9b8 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,5 +1,5 @@ use crate::{ - export::logs::{ExportResult, LogExporter}, + export::logs::{ExportResult, LogBatch, LogExporter}, logs::LogRecord, runtime::{RuntimeChannel, TrySend}, Resource, @@ -106,7 +106,8 @@ impl LogProcessor for SimpleLogProcessor { .lock() .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) .and_then(|mut exporter| { - futures_executor::block_on(exporter.export(vec![(record, instrumentation)])) + let log_tuple = &[(record as &LogRecord, instrumentation)]; + futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) }); if let Err(err) = result { global::handle_error(err); @@ -312,14 +313,13 @@ where if batch.is_empty() { return Ok(()); } - // Convert the Vec<&LogData> to Vec<(&LogRecord, &InstrumentationLibrary)> + // TBD - Can we avoid this conversion as it involves heap allocation with new vector? - let export_batch = batch + let log_vec: Vec<(&LogRecord, &InstrumentationLibrary)> = batch .iter() .map(|log_data| (&log_data.0, &log_data.1)) .collect(); - - let export = exporter.export(export_batch); + let export = exporter.export(LogBatch::new(log_vec.as_slice())); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); @@ -515,7 +515,7 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; - use crate::export::logs::LogExporter; + use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::LogRecord; use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ @@ -547,10 +547,7 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export( - &mut self, - _batch: Vec<(&LogRecord, &InstrumentationLibrary)>, - ) -> LogResult<()> { + async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> { Ok(()) } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 170139bf9d..958ab11fe1 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::export::logs::LogExporter; +use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::LogRecord; use crate::Resource; use async_trait::async_trait; @@ -184,12 +184,12 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; - for (log_record, instrumentation) in batch.into_iter() { + for (log_record, instrumentation) in batch.iter() { let owned_log = OwnedLogData { - record: log_record.clone(), - instrumentation: instrumentation.clone(), + record: (*log_record).clone(), + instrumentation: (*instrumentation).clone(), }; logs_guard.push(owned_log); } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 959d66d049..48a8b1a120 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -2,8 +2,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::fmt; use opentelemetry::logs::LogResult; -use opentelemetry::InstrumentationLibrary; -use opentelemetry_sdk::logs::LogRecord; +use opentelemetry_sdk::export::logs::LogBatch; use opentelemetry_sdk::Resource; use std::sync::atomic; @@ -33,7 +32,7 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { if self.is_shutdown.load(atomic::Ordering::SeqCst) { return Err("exporter is shut down".into()); } else { @@ -66,8 +65,8 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } } -fn print_logs(batch: Vec<(&LogRecord, &InstrumentationLibrary)>) { - for (i, log) in batch.into_iter().enumerate() { +fn print_logs(batch: LogBatch<'_>) { + for (i, log) in batch.iter().enumerate() { println!("Log #{}", i); let (record, _library) = log; if let Some(event_name) = record.event_name {