diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 939b4848f3..17420847ca 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -47,7 +47,7 @@ impl> ValueMap { .take(BUCKET_COUNT) .collect::>() .try_into() - .unwrap(); + .unwrap(); // this will never fail as Vec length is fixed ValueMap { buckets: Arc::new(buckets), @@ -71,12 +71,16 @@ impl> ValueMap { // Calculate the total length of data points across all buckets. fn total_data_points_count(&self) -> usize { self.buckets - .iter() - .map(|bucket_mutex| { - let locked_bucket = bucket_mutex.lock().unwrap(); - locked_bucket.as_ref().map_or(0, |bucket| bucket.len()) - }) - .sum::() + .iter() + .map(|bucket_mutex| { + bucket_mutex.lock() + .map(|locked_bucket| locked_bucket.as_ref().map_or(0, |bucket| bucket.len())) + .unwrap_or_else(|_| { + global::handle_error(MetricsError::Other("Failed to acquire lock on a bucket. Using default `0` as total data points.".into())); + 0 + }) + }) + .sum::() } } @@ -87,12 +91,19 @@ impl> ValueMap { self.has_no_value_attribute_value .store(true, Ordering::Release); } else { - let bucket_index = Self::hash_to_bucket(&attrs) as usize; // Ensure index is usize for array indexing + let bucket_index = Self::hash_to_bucket(&attrs) as usize; let bucket_mutex = &self.buckets[bucket_index]; - let mut bucket_guard = bucket_mutex.lock().unwrap(); + + let mut bucket_guard = match bucket_mutex.lock() { + Ok(guard) => guard, + Err(e) => { + eprintln!("Failed to acquire lock due to: {:?}", e); + return; // TBD - retry ? + } + }; if bucket_guard.is_none() { - *bucket_guard = Some(HashMap::new()); // Initialize the bucket if it's None + *bucket_guard = Some(HashMap::new()); } if let Some(ref mut values) = *bucket_guard { @@ -106,7 +117,6 @@ impl> ValueMap { if is_under_cardinality_limit(size) { vacant_entry.insert(measurement); } else { - // TBD - Update total_count ?? values .entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone()) .and_modify(|val| *val += measurement) @@ -188,17 +198,31 @@ impl> Sum { } for bucket_mutex in self.value_map.buckets.iter() { - if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() { - for (attrs, value) in locked_bucket.drain() { - s_data.data_points.push(DataPoint { - attributes: attrs, - start_time: Some(*self.start.lock().unwrap()), - time: Some(t), - value, - exemplars: vec![], - }); + match bucket_mutex.lock() { + Ok(mut locked_bucket) => { + if let Some(ref mut bucket) = *locked_bucket { + for (attrs, value) in bucket.drain() { + // Correctly handle lock acquisition on self.start + let start_time = self.start.lock().map_or_else( + |_| SystemTime::now(), // In case of an error, use SystemTime::now() + |guard| *guard, // In case of success, dereference the guard to get the SystemTime + ); + + s_data.data_points.push(DataPoint { + attributes: attrs, + start_time: Some(start_time), + time: Some(t), + value, + exemplars: vec![], + }); + } + } + } + Err(e) => { + global::handle_error(MetricsError::Other( + format!("Failed to acquire lock on bucket due to: {:?}", e).into(), + )); } - // The bucket is automatically cleared by the .drain() method } } @@ -261,16 +285,29 @@ impl> Sum { // sets that become "stale" need to be forgotten so this will not // overload the system. for bucket_mutex in self.value_map.buckets.iter() { - if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() { - for (attrs, value) in locked_bucket.iter() { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(*self.start.lock().unwrap()), // Consider last reset time - time: Some(t), - value: *value, - exemplars: vec![], - }); + // Handle potential lock failure gracefully + if let Ok(locked_bucket) = bucket_mutex.lock() { + if let Some(locked_bucket) = &*locked_bucket { + for (attrs, value) in locked_bucket.iter() { + // Handle potential lock failure on self.start and use current time as fallback + let start_time = self.start.lock().map_or_else( + |_| SystemTime::now(), // Use SystemTime::now() as fallback on error + |guard| *guard, // Dereference the guard to get the SystemTime on success + ); + + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(start_time), + time: Some(t), + value: *value, + exemplars: vec![], + }); + } } + } else { + global::handle_error(MetricsError::Other( + "Failed to acquire lock on a bucket".into(), + )); } } @@ -352,20 +389,32 @@ impl> PrecomputedSum { } for bucket_mutex in self.value_map.buckets.iter() { - if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() { - let default = T::default(); - for (attrs, value) in locked_bucket.drain() { - let delta = value - *reported.get(&attrs).unwrap_or(&default); - if delta != default { - new_reported.insert(attrs.clone(), value); + match bucket_mutex.lock() { + Ok(mut locked_bucket) => { + if let Some(locked_bucket) = &mut *locked_bucket { + let default = T::default(); + for (attrs, value) in locked_bucket.drain() { + let delta = value - *reported.get(&attrs).unwrap_or(&default); + if delta != default { + new_reported.insert(attrs.clone(), value); + } + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(prev_start), + time: Some(t), + value: delta, + exemplars: vec![], + }); + } } - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: delta, - exemplars: vec![], - }); + } + Err(e) => { + // Log or handle the lock acquisition error if necessary + global::handle_error(MetricsError::Other( + format!("Failed to acquire lock on bucket due to: {:?}", e).into(), + )); + // Continue to the next bucket if the lock cannot be acquired + continue; } } } @@ -434,17 +483,31 @@ impl> PrecomputedSum { let default = T::default(); for bucket_mutex in self.value_map.buckets.iter() { - if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() { + // Safely attempt to acquire the lock, handling any potential error. + let locked_bucket = match bucket_mutex.lock() { + Ok(bucket) => bucket, + Err(e) => { + // Log the error or handle it as needed. + global::handle_error(MetricsError::Other( + format!("Failed to acquire lock on bucket due to: {:?}", e).into(), + )); + continue; // Skip to the next bucket if the lock cannot be acquired. + } + }; + + // Proceed only if the bucket is not empty. + if let Some(locked_bucket) = &*locked_bucket { for (attrs, value) in locked_bucket.iter() { let delta = *value - *reported.get(attrs).unwrap_or(&default); if delta != default { new_reported.insert(attrs.clone(), *value); } + s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: *value, // For cumulative, we use the value directly without calculating delta + value: *value, // For cumulative, directly use the value without calculating the delta. exemplars: vec![], }); }