Skip to content

Commit

Permalink
graceful unwrap
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Mar 1, 2024
1 parent 2b8549f commit 44efee7
Showing 1 changed file with 108 additions and 45 deletions.
153 changes: 108 additions & 45 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<T: Number<T>> ValueMap<T> {
.take(BUCKET_COUNT)
.collect::<Vec<_>>()
.try_into()
.unwrap();
.unwrap(); // this will never fail as Vec length is fixed

ValueMap {
buckets: Arc::new(buckets),
Expand All @@ -71,12 +71,16 @@ impl<T: Number<T>> ValueMap<T> {
// Calculate the total length of data points across all buckets.
fn total_data_points_count(&self) -> usize {
self.buckets
.iter()
.map(|bucket_mutex| {
let locked_bucket = bucket_mutex.lock().unwrap();
locked_bucket.as_ref().map_or(0, |bucket| bucket.len())
})
.sum::<usize>()
.iter()
.map(|bucket_mutex| {
bucket_mutex.lock()
.map(|locked_bucket| locked_bucket.as_ref().map_or(0, |bucket| bucket.len()))
.unwrap_or_else(|_| {
global::handle_error(MetricsError::Other("Failed to acquire lock on a bucket. Using default `0` as total data points.".into()));
0
})
})
.sum::<usize>()
}
}

Expand All @@ -87,12 +91,19 @@ impl<T: Number<T>> ValueMap<T> {
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else {
let bucket_index = Self::hash_to_bucket(&attrs) as usize; // Ensure index is usize for array indexing
let bucket_index = Self::hash_to_bucket(&attrs) as usize;
let bucket_mutex = &self.buckets[bucket_index];
let mut bucket_guard = bucket_mutex.lock().unwrap();

let mut bucket_guard = match bucket_mutex.lock() {
Ok(guard) => guard,
Err(e) => {
eprintln!("Failed to acquire lock due to: {:?}", e);
return; // TBD - retry ?
}
};

if bucket_guard.is_none() {
*bucket_guard = Some(HashMap::new()); // Initialize the bucket if it's None
*bucket_guard = Some(HashMap::new());
}

if let Some(ref mut values) = *bucket_guard {
Expand All @@ -106,7 +117,6 @@ impl<T: Number<T>> ValueMap<T> {
if is_under_cardinality_limit(size) {
vacant_entry.insert(measurement);
} else {
// TBD - Update total_count ??
values
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
.and_modify(|val| *val += measurement)
Expand Down Expand Up @@ -188,17 +198,31 @@ impl<T: Number<T>> Sum<T> {
}

for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() {
for (attrs, value) in locked_bucket.drain() {
s_data.data_points.push(DataPoint {
attributes: attrs,
start_time: Some(*self.start.lock().unwrap()),
time: Some(t),
value,
exemplars: vec![],
});
match bucket_mutex.lock() {
Ok(mut locked_bucket) => {
if let Some(ref mut bucket) = *locked_bucket {
for (attrs, value) in bucket.drain() {
// Correctly handle lock acquisition on self.start
let start_time = self.start.lock().map_or_else(
|_| SystemTime::now(), // In case of an error, use SystemTime::now()
|guard| *guard, // In case of success, dereference the guard to get the SystemTime
);

s_data.data_points.push(DataPoint {
attributes: attrs,
start_time: Some(start_time),
time: Some(t),
value,
exemplars: vec![],
});
}
}
}
Err(e) => {
global::handle_error(MetricsError::Other(
format!("Failed to acquire lock on bucket due to: {:?}", e).into(),

Check failure on line 223 in opentelemetry-sdk/src/metrics/internal/sum.rs

View workflow job for this annotation

GitHub Actions / lint

useless conversion to the same type: `std::string::String`
));
}
// The bucket is automatically cleared by the .drain() method
}
}

Expand Down Expand Up @@ -261,16 +285,29 @@ impl<T: Number<T>> Sum<T> {
// sets that become "stale" need to be forgotten so this will not
// overload the system.
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() {
for (attrs, value) in locked_bucket.iter() {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(*self.start.lock().unwrap()), // Consider last reset time
time: Some(t),
value: *value,
exemplars: vec![],
});
// Handle potential lock failure gracefully
if let Ok(locked_bucket) = bucket_mutex.lock() {
if let Some(locked_bucket) = &*locked_bucket {
for (attrs, value) in locked_bucket.iter() {
// Handle potential lock failure on self.start and use current time as fallback
let start_time = self.start.lock().map_or_else(
|_| SystemTime::now(), // Use SystemTime::now() as fallback on error
|guard| *guard, // Dereference the guard to get the SystemTime on success
);

s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(start_time),
time: Some(t),
value: *value,
exemplars: vec![],
});
}
}
} else {
global::handle_error(MetricsError::Other(
"Failed to acquire lock on a bucket".into(),
));
}
}

Expand Down Expand Up @@ -352,20 +389,32 @@ impl<T: Number<T>> PrecomputedSum<T> {
}

for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() {
let default = T::default();
for (attrs, value) in locked_bucket.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value);
match bucket_mutex.lock() {
Ok(mut locked_bucket) => {
if let Some(locked_bucket) = &mut *locked_bucket {
let default = T::default();
for (attrs, value) in locked_bucket.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value);
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}
Err(e) => {
// Log or handle the lock acquisition error if necessary
global::handle_error(MetricsError::Other(
format!("Failed to acquire lock on bucket due to: {:?}", e).into(),

Check failure on line 414 in opentelemetry-sdk/src/metrics/internal/sum.rs

View workflow job for this annotation

GitHub Actions / lint

useless conversion to the same type: `std::string::String`
));
// Continue to the next bucket if the lock cannot be acquired
continue;
}
}
}
Expand Down Expand Up @@ -434,17 +483,31 @@ impl<T: Number<T>> PrecomputedSum<T> {

let default = T::default();
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() {
// Safely attempt to acquire the lock, handling any potential error.
let locked_bucket = match bucket_mutex.lock() {
Ok(bucket) => bucket,
Err(e) => {
// Log the error or handle it as needed.
global::handle_error(MetricsError::Other(
format!("Failed to acquire lock on bucket due to: {:?}", e).into(),

Check failure on line 492 in opentelemetry-sdk/src/metrics/internal/sum.rs

View workflow job for this annotation

GitHub Actions / lint

useless conversion to the same type: `std::string::String`
));
continue; // Skip to the next bucket if the lock cannot be acquired.
}
};

// Proceed only if the bucket is not empty.
if let Some(locked_bucket) = &*locked_bucket {
for (attrs, value) in locked_bucket.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), *value);
}

s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: *value, // For cumulative, we use the value directly without calculating delta
value: *value, // For cumulative, directly use the value without calculating the delta.
exemplars: vec![],
});
}
Expand Down

0 comments on commit 44efee7

Please sign in to comment.