Skip to content

Commit

Permalink
changes..
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Sep 3, 2024
1 parent 0de6e9a commit 39520db
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 41 deletions.
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::fmt::Debug;
pub struct LogBatch<'a> {
/// The data field contains a slice of tuples, where each tuple consists of a reference to
/// a `LogRecord` and a reference to an `InstrumentationLibrary`.
data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)],
data: &'a [(&'a LogRecord<'a>, &'a InstrumentationLibrary)],
}

impl<'a> LogBatch<'a> {
Expand All @@ -42,12 +42,12 @@ impl<'a> LogBatch<'a> {
/// Note - this is not a public function, and should not be used directly. This would be
/// made private in the future.
pub fn new(data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)]) -> LogBatch<'a> {
pub fn new(data: &'a [(&'a LogRecord<'a>, &'a InstrumentationLibrary)]) -> LogBatch<'a> {
LogBatch { data }
}
}

impl LogBatch<'_> {
impl<'a> LogBatch<'a> {
/// Returns an iterator over the log records and instrumentation libraries in the batch.
///
/// Each item yielded by the iterator is a tuple containing references to a `LogRecord`
Expand All @@ -57,7 +57,7 @@ impl LogBatch<'_> {
///
/// An iterator that yields references to the `LogRecord` and `InstrumentationLibrary` in the batch.
///
pub fn iter(&self) -> impl Iterator<Item = (&LogRecord, &InstrumentationLibrary)> {
pub fn iter(&self) -> impl Iterator<Item = (&LogRecord<'a>, &InstrumentationLibrary)> {
self.data
.iter()
.map(|(record, library)| (*record, *library))
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/growable_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ mod tests {
use opentelemetry::logs::AnyValue;
use opentelemetry::Key;

type KeyValuePair = Option<(Key, AnyValue)>;
type KeyValuePair<'a> = Option<(Key, AnyValue<'a>)>;

#[test]
fn test_push_and_get() {
Expand Down Expand Up @@ -234,7 +234,7 @@ mod tests {

#[test]
fn test_key_value_pair_storage_growable_array() {
let mut collection = GrowableArray::<KeyValuePair>::new();
let mut collection = GrowableArray::<KeyValuePair<'_>>::new();

let key1 = Key::from("key1");
let value1 = AnyValue::String("value1".into());
Expand Down Expand Up @@ -267,7 +267,7 @@ mod tests {

#[test]
fn test_empty_attributes() {
let collection = GrowableArray::<KeyValuePair>::new();
let collection = GrowableArray::<KeyValuePair<'_>>::new();
assert_eq!(collection.len(), 0);
assert_eq!(collection.get(0), None);

Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,14 @@ impl Logger {
}

impl opentelemetry::logs::Logger for Logger {
type LogRecord = LogRecord;
type LogRecord<'a> = LogRecord<'a>;

fn create_log_record(&self) -> Self::LogRecord {
fn create_log_record<'a>(&self) -> Self::LogRecord<'a> {
LogRecord::default()
}

/// Emit a `LogRecord`.
fn emit(&self, mut record: Self::LogRecord) {
fn emit<'a>(&self, mut record: Self::LogRecord<'a>) {
let provider = self.provider();
let processors = provider.log_processors();
let trace_context = Context::map_current(|cx| {
Expand Down Expand Up @@ -320,7 +320,7 @@ mod tests {
}

impl LogProcessor for ShutdownTestLogProcessor {
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
fn emit<'a>(&self, _data: &mut LogRecord<'a>, _library: &InstrumentationLibrary) {
self.is_shutdown
.lock()
.map(|is_shutdown| {
Expand Down Expand Up @@ -550,7 +550,7 @@ mod tests {
}

impl LogProcessor for LazyLogProcessor {
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
fn emit<'a>(&self, _data: &mut LogRecord<'a>, _library: &InstrumentationLibrary) {
// nothing to do.
}

Expand Down
51 changes: 32 additions & 19 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub trait LogProcessor: Send + Sync + Debug {
/// # Parameters
/// - `record`: A mutable reference to `LogData` representing the log record.
/// - `instrumentation`: The instrumentation library associated with the log record.
fn emit(&self, data: &mut LogRecord, instrumentation: &InstrumentationLibrary);
fn emit<'a>(&self, data: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary);
/// Force the logs lying in the cache to be exported.
fn force_flush(&self) -> LogResult<()>;
/// Shuts down the processor.
Expand Down Expand Up @@ -95,7 +95,7 @@ impl SimpleLogProcessor {
}

impl LogProcessor for SimpleLogProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return;
Expand All @@ -106,7 +106,7 @@ impl LogProcessor for SimpleLogProcessor {
.lock()
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
.and_then(|mut exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
let log_tuple = &[(record as &LogRecord<'a>, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
if let Err(err) = result {
Expand Down Expand Up @@ -153,7 +153,7 @@ impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
}

impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) {
let result = self.message_sender.try_send(BatchMessage::ExportLog((
record.clone(),
instrumentation.clone(),
Expand Down Expand Up @@ -300,11 +300,11 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
}

async fn export_with_timeout<R, E>(
async fn export_with_timeout<'a, R, E>(
time_out: Duration,
exporter: &mut E,
runtime: &R,
batch: Vec<(LogRecord, InstrumentationLibrary)>,
batch: Vec<(LogRecord<'a>, InstrumentationLibrary)>,
) -> ExportResult
where
R: RuntimeChannel,
Expand All @@ -315,7 +315,7 @@ where
}

// TBD - Can we avoid this conversion as it involves heap allocation with new vector?
let log_vec: Vec<(&LogRecord, &InstrumentationLibrary)> = batch
let log_vec: Vec<(&LogRecord<'a>, &InstrumentationLibrary)> = batch
.iter()
.map(|log_data| (&log_data.0, &log_data.1))
.collect();
Expand Down Expand Up @@ -497,9 +497,9 @@ where
/// Messages sent between application thread and batch log processor's work thread.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
enum BatchMessage<'a> {
/// Export logs, usually called when the log is emitted.
ExportLog((LogRecord, InstrumentationLibrary)),
ExportLog((LogRecord<'a>, InstrumentationLibrary)),
/// Flush the current buffer to the backend, it can be triggered by
/// pre configured interval or a call to `force_push` function.
Flush(Option<oneshot::Sender<ExportResult>>),
Expand Down Expand Up @@ -777,7 +777,7 @@ mod tests {
runtime::Tokio,
);

let mut record: LogRecord = Default::default();
let mut record: LogRecord<'_> = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
Expand All @@ -795,7 +795,7 @@ mod tests {
.build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

let mut record: LogRecord = Default::default();
let mut record: LogRecord<'_> = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
Expand All @@ -813,12 +813,12 @@ mod tests {
}

#[derive(Debug)]
struct FirstProcessor {
pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
struct FirstProcessor<'b> {
pub(crate) logs: Arc<Mutex<Vec<(LogRecord<'b>, InstrumentationLibrary)>>>,
}

impl LogProcessor for FirstProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
impl<'b> LogProcessor for FirstProcessor<'b> {
fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) {
// add attribute
record.add_attribute(
Key::from_static_str("processed_by"),
Expand All @@ -844,11 +844,12 @@ mod tests {

#[derive(Debug)]
struct SecondProcessor {
pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
pub(crate) logs: Arc<Mutex<Vec<(LogRecord<'static>, InstrumentationLibrary)>>>,
}

impl LogProcessor for SecondProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
impl<'b> LogProcessor for SecondProcessor {
fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary)
{
assert!(record.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
Expand All @@ -857,10 +858,22 @@ mod tests {
record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
// Clone the `LogRecord` to have a `'static` version to store in the log.
let static_record: LogRecord<'static> = LogRecord {
event_name: record.event_name,
target: record.target.clone().map(|t| t.into_owned().into()),
timestamp: record.timestamp,
observed_timestamp: record.observed_timestamp,
trace_context: record.trace_context.clone(),
severity_text: record.severity_text,
severity_number: record.severity_number,
body: record.body.clone().map(AnyValue::to_owned_value),
attributes: record.attributes.clone(), // Assuming `attributes` can be cloned
};
self.logs
.lock()
.unwrap()
.push((record.clone(), instrumentation.clone()));
.push((static_record, instrumentation.clone()));
}

fn force_flush(&self) -> LogResult<()> {
Expand Down
10 changes: 0 additions & 10 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@ pub use log_processor::{
};
pub use record::{LogRecord, TraceContext};

use opentelemetry::InstrumentationLibrary;
/// `LogData` represents a single log event without resource context.
#[derive(Clone, Debug)]
pub struct LogData {
/// Log record
pub record: LogRecord,
/// Instrumentation details for the emitter who produced this `LogEvent`.
pub instrumentation: InstrumentationLibrary,
}

#[cfg(all(test, feature = "testing"))]
mod tests {
use super::*;
Expand Down

0 comments on commit 39520db

Please sign in to comment.