Skip to content

Commit

Permalink
Tweaks to logs integration test (open-telemetry#2453)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Dec 19, 2024
1 parent 0a7ad69 commit acf16ed
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 108 deletions.
5 changes: 2 additions & 3 deletions opentelemetry-otlp/tests/integration_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ publish = false
opentelemetry = { path = "../../../opentelemetry", features = [] }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "testing"] }
opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["gen-tonic-messages", "trace", "logs", "metrics", "with-serde"] }
log = { workspace = true }
tokio = { version = "1.0", features = ["full"] }
serde_json = "1"
testcontainers = { version = "0.23.1", features = ["http_wait"]}
once_cell.workspace = true
anyhow = "1.0.94"
ctor = "0.2.9"
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] }
tracing = "0.1.41"
tracing = {workspace = true}

[target.'cfg(unix)'.dependencies]
opentelemetry-appender-log = { path = "../../../opentelemetry-appender-log", default-features = false}
opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
opentelemetry-otlp = { path = "../../../opentelemetry-otlp", default-features = false }
opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"scopeLogs": [
{
"scope": {
"name": "opentelemetry-log-appender",
"version": "0.3.0"
"name": "my-target",
"version": ""
},
"logRecords": [
{
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/tests/integration_test/expected/logs.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"scopeLogs": [
{
"scope": {
"name": "opentelemetry-log-appender",
"version": "0.3.0"
"name": "my-target",
"version": ""
},
"logRecords": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ impl LogsAsserter {
let result_scope_logs = &result_resource_logs.scope_logs[i];
let expected_scope_logs = &expected_resource_logs.scope_logs[i];

assert_eq!(result_scope_logs.scope, expected_scope_logs.scope);

results_logs.extend(result_scope_logs.log_records.clone());
expected_logs.extend(expected_scope_logs.log_records.clone());
}
Expand Down
88 changes: 50 additions & 38 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ use anyhow::Result;
use ctor::dtor;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use log::{info, Level};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::{logs as sdklogs, Resource};
use std::fs::File;
use std::os::unix::fs::MetadataExt;
use std::time::Duration;

fn init_logs() -> Result<sdklogs::LoggerProvider> {
let exporter_builder = LogExporter::builder();
Expand All @@ -37,27 +34,56 @@ fn init_logs() -> Result<sdklogs::LoggerProvider> {
.build())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
pub async fn test_logs() -> Result<()> {
// Make sure the container is running
test_utils::start_collector_container().await?;

let logger_provider = init_logs().unwrap();
let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
log::set_boxed_logger(Box::new(otel_log_appender))?;
log::set_max_level(Level::Info.to_level_filter());

info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);

// TODO: remove below wait before calling logger_provider.shutdown()
tokio::time::sleep(Duration::from_secs(10)).await;
let _ = logger_provider.shutdown();

tokio::time::sleep(Duration::from_secs(10)).await;

assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");

Ok(())
#[cfg(test)]
mod logtests {
use super::*;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use std::{fs::File, time::Duration};
#[test]
#[should_panic(expected = "assertion `left == right` failed: body does not match")]
pub fn test_assert_logs_eq_failure() {
let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
LogsAsserter::new(right, left).assert();
}

#[test]
pub fn test_assert_logs_eq() {
let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
LogsAsserter::new(logs.clone(), logs).assert();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(not(feature = "hyper-client"))]
#[cfg(not(feature = "reqwest-client"))]
pub async fn test_logs() -> Result<()> {
// Make sure the container is running

use integration_test_runner::test_utils;
use opentelemetry_appender_tracing::layer;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;

use crate::{assert_logs_results, init_logs};
test_utils::start_collector_container().await?;

let logger_provider = init_logs().unwrap();
let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
}
// TODO: remove below wait before calling logger_provider.shutdown()
// tokio::time::sleep(Duration::from_secs(10)).await;
let _ = logger_provider.shutdown();

tokio::time::sleep(Duration::from_secs(10)).await;

assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");

Ok(())
}
}

pub fn assert_logs_results(result: &str, expected: &str) {
Expand All @@ -69,20 +95,6 @@ pub fn assert_logs_results(result: &str, expected: &str) {
assert!(File::open(result).unwrap().metadata().unwrap().size() > 0)
}

#[test]
#[should_panic(expected = "assertion `left == right` failed: body does not match")]
pub fn test_assert_logs_eq_failure() {
let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
LogsAsserter::new(right, left).assert();
}

#[test]
pub fn test_assert_logs_eq() {
let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
LogsAsserter::new(logs.clone(), logs).assert();
}

///
/// Make sure we stop the collector container, otherwise it will sit around hogging our
/// ports and subsequent test runs will fail.
Expand Down
129 changes: 66 additions & 63 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,78 +321,81 @@ impl BatchLogProcessor {
let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
let max_queue_size = config.max_queue_size;

let handle = thread::spawn(move || {
let mut last_export_time = Instant::now();
let mut logs = Vec::new();
logs.reserve(config.max_export_batch_size);

loop {
let remaining_time_option = config
.scheduled_delay
.checked_sub(last_export_time.elapsed());
let remaining_time = match remaining_time_option {
Some(remaining_time) => remaining_time,
None => config.scheduled_delay,
};

match message_receiver.recv_timeout(remaining_time) {
Ok(BatchMessage::ExportLog(log)) => {
logs.push(log);
if logs.len() == config.max_export_batch_size
|| last_export_time.elapsed() >= config.scheduled_delay
{
let _ = export_with_timeout_sync(
let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
.spawn(move || {
let mut last_export_time = Instant::now();
let mut logs = Vec::new();
logs.reserve(config.max_export_batch_size);

loop {
let remaining_time_option = config
.scheduled_delay
.checked_sub(last_export_time.elapsed());
let remaining_time = match remaining_time_option {
Some(remaining_time) => remaining_time,
None => config.scheduled_delay,
};

match message_receiver.recv_timeout(remaining_time) {
Ok(BatchMessage::ExportLog(log)) => {
logs.push(log);
if logs.len() == config.max_export_batch_size
|| last_export_time.elapsed() >= config.scheduled_delay
{
let _ = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
}
}
Ok(BatchMessage::ForceFlush(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);
}
}
Ok(BatchMessage::ForceFlush(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);
}
Ok(BatchMessage::Shutdown(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);
Ok(BatchMessage::Shutdown(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);

//
// break out the loop and return from the current background thread.
//
break;
}
Ok(BatchMessage::SetResource(resource)) => {
exporter.set_resource(&resource);
}
Err(RecvTimeoutError::Timeout) => {
let _ = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
}
Err(err) => {
// TODO: this should not happen! Log the error and continue for now.
otel_error!(
name: "BatchLogProcessor.InternalError",
error = format!("{}", err)
);
//
// break out the loop and return from the current background thread.
//
break;
}
Ok(BatchMessage::SetResource(resource)) => {
exporter.set_resource(&resource);
}
Err(RecvTimeoutError::Timeout) => {
let _ = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
}
Err(err) => {
// TODO: this should not happen! Log the error and continue for now.
otel_error!(
name: "BatchLogProcessor.InternalError",
error = format!("{}", err)
);
}
}
}
}
});
})
.expect("Thread spawn failed."); //TODO: Handle thread spawn failure

// Return batch processor with link to worker
BatchLogProcessor {
Expand Down

0 comments on commit acf16ed

Please sign in to comment.