From cc6e79bb933561cb6082300b465bb5c7e431bab1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 30 Jan 2025 01:49:59 +0530 Subject: [PATCH] refactor: reuse `TimeRange` --- src/query/listing_table_builder.rs | 10 +- src/utils/mod.rs | 232 +------------------------ src/utils/time.rs | 269 ++++++++++++++++++++++++++++- 3 files changed, 272 insertions(+), 239 deletions(-) diff --git a/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs index 1ddb9c3bf..f8ea4f75c 100644 --- a/src/query/listing_table_builder.rs +++ b/src/query/listing_table_builder.rs @@ -34,7 +34,7 @@ use object_store::{path::Path, ObjectMeta, ObjectStore}; use crate::{ event::DEFAULT_TIMESTAMP_KEY, storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY}, - utils::TimePeriod, + utils::time::TimeRange, }; use super::PartialTimeFilter; @@ -88,12 +88,8 @@ impl ListingTableBuilder { }; // Generate prefixes for the given time range - let prefixes = TimePeriod::new( - start_time.and_utc(), - end_time.and_utc(), - OBJECT_STORE_DATA_GRANULARITY, - ) - .generate_prefixes(); + let prefixes = TimeRange::new(start_time.and_utc(), end_time.and_utc()) + .generate_prefixes(OBJECT_STORE_DATA_GRANULARITY); // Categorizes prefixes into "minute" and general resolve lists. let mut minute_resolve = HashMap::>::new(); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 87b528b6d..55ec1d1dd 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -31,7 +31,7 @@ use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use actix::extract_session_key_from_req; use actix_web::HttpRequest; -use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc}; +use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc}; use itertools::Itertools; use regex::Regex; use sha2::{Digest, Sha256}; @@ -85,153 +85,6 @@ pub fn minute_to_prefix(minute: u32, data_granularity: u32) -> Option { )) } -pub struct TimePeriod { - start: DateTime, - end: DateTime, - data_granularity: u32, -} - -impl TimePeriod { - pub fn new(start: DateTime, end: DateTime, data_granularity: u32) -> Self { - Self { - data_granularity, - start, - end, - } - } - - pub fn generate_prefixes(&self) -> Vec { - let end_minute = self.end.minute() + u32::from(self.end.second() > 0); - self.generate_date_prefixes( - self.start.date_naive(), - self.end.date_naive(), - (self.start.hour(), self.start.minute()), - (self.end.hour(), end_minute), - ) - } - - pub fn generate_minute_prefixes( - &self, - prefix: &str, - start_minute: u32, - end_minute: u32, - ) -> Vec { - if start_minute == end_minute { - return vec![]; - } - - let (start_block, end_block) = ( - start_minute / self.data_granularity, - end_minute / self.data_granularity, - ); - - let forbidden_block = 60 / self.data_granularity; - - // ensure both start and end are within the same hour, else return prefix as is - if end_block - start_block >= forbidden_block { - return vec![prefix.to_owned()]; - } - - let mut prefixes = vec![]; - - let push_prefix = |block: u32, prefixes: &mut Vec<_>| { - if let Some(minute_prefix) = - minute_to_prefix(block * self.data_granularity, self.data_granularity) - { - let prefix = prefix.to_owned() + &minute_prefix; - prefixes.push(prefix); - } - }; - - for block in start_block..end_block { - push_prefix(block, &mut prefixes); - } - - // NOTE: for block sizes larger than a minute ensure - // ensure last block is considered - if self.data_granularity > 1 { - push_prefix(end_block, &mut prefixes); - } - - prefixes - } - - pub fn generate_hour_prefixes( - &self, - prefix: &str, - start_hour: u32, - start_minute: u32, - end_hour: u32, - end_minute: u32, - ) -> Vec { - // ensure both start and end are within the same day - if end_hour - start_hour >= 24 { - return vec![prefix.to_owned()]; - } - - let mut prefixes = vec![]; - - for hour in start_hour..=end_hour { - if hour == 24 { - break; - } - let prefix = prefix.to_owned() + &hour_to_prefix(hour); - let is_start = hour == start_hour; - let is_end = hour == end_hour; - - if is_start || is_end { - let minute_prefixes = self.generate_minute_prefixes( - &prefix, - if is_start { start_minute } else { 0 }, - if is_end { end_minute } else { 60 }, - ); - prefixes.extend(minute_prefixes); - } else { - prefixes.push(prefix); - } - } - - prefixes - } - - pub fn generate_date_prefixes( - &self, - start_date: NaiveDate, - end_date: NaiveDate, - start_time: (u32, u32), - end_time: (u32, u32), - ) -> Vec { - let mut prefixes = vec![]; - let mut date = start_date; - - while date <= end_date { - let prefix = date_to_prefix(date); - let is_start = date == start_date; - let is_end = date == end_date; - - if is_start || is_end { - let ((start_hour, start_minute), (end_hour, end_minute)) = ( - if is_start { start_time } else { (0, 0) }, - if is_end { end_time } else { (24, 60) }, - ); - let hour_prefixes = self.generate_hour_prefixes( - &prefix, - start_hour, - start_minute, - end_hour, - end_minute, - ); - prefixes.extend(hour_prefixes); - } else { - prefixes.push(prefix); - } - date = date.succ_opt().unwrap(); - } - - prefixes - } -} - pub fn get_url() -> Url { if CONFIG.options.ingestor_endpoint.is_empty() { return format!( @@ -373,86 +226,3 @@ pub fn user_auth_for_query( Ok(()) } - -#[cfg(test)] -mod tests { - use chrono::DateTime; - use rstest::*; - - use super::TimePeriod; - - fn time_period_from_str(start: &str, end: &str) -> TimePeriod { - TimePeriod::new( - DateTime::parse_from_rfc3339(start).unwrap().into(), - DateTime::parse_from_rfc3339(end).unwrap().into(), - 1, - ) - } - - #[rstest] - #[case::same_minute( - "2022-06-11T16:30:00+00:00", "2022-06-11T16:30:59+00:00", - &["date=2022-06-11/hour=16/minute=30/"] - )] - #[case::same_hour_different_minute( - "2022-06-11T16:57:00+00:00", "2022-06-11T16:59:00+00:00", - &[ - "date=2022-06-11/hour=16/minute=57/", - "date=2022-06-11/hour=16/minute=58/" - ] - )] - #[case::same_hour_with_00_to_59_minute_block( - "2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00", - &["date=2022-06-11/hour=16/"] - )] - #[case::same_date_different_hours_coherent_minute( - "2022-06-11T15:00:00+00:00", "2022-06-11T17:00:00+00:00", - &[ - "date=2022-06-11/hour=15/", - "date=2022-06-11/hour=16/" - ] - )] - #[case::same_date_different_hours_incoherent_minutes( - "2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00", - &[ - "date=2022-06-11/hour=15/minute=59/", - "date=2022-06-11/hour=16/minute=00/" - ] - )] - #[case::same_date_different_hours_whole_hours_between_incoherent_minutes( - "2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00", - &[ - "date=2022-06-11/hour=15/minute=59/", - "date=2022-06-11/hour=16/", - "date=2022-06-11/hour=17/minute=00/" - ] - )] - #[case::different_date_coherent_hours_and_minutes( - "2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00", - &[ - "date=2022-06-11/", - "date=2022-06-12/" - ] - )] - #[case::different_date_incoherent_hours_coherent_minutes( - "2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00", - &[ - "date=2022-06-11/hour=23/", - "date=2022-06-12/hour=00/", - "date=2022-06-12/hour=01/" - ] - )] - #[case::different_date_incoherent_hours_incoherent_minutes( - "2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00", - &[ - "date=2022-06-11/hour=23/minute=59/", - "date=2022-06-12/hour=00/minute=00/" - ] - )] - fn prefix_generation(#[case] start: &str, #[case] end: &str, #[case] right: &[&str]) { - let time_period = time_period_from_str(start, end); - let prefixes = time_period.generate_prefixes(); - let left = prefixes.iter().map(String::as_str).collect::>(); - assert_eq!(left.as_slice(), right); - } -} diff --git a/src/utils/time.rs b/src/utils/time.rs index f728b0cca..8dc808bd4 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -16,7 +16,9 @@ * */ -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDate, TimeDelta, Timelike, Utc}; + +use super::minute_to_slot; #[derive(Debug, thiserror::Error)] pub enum TimeParseError { @@ -30,6 +32,24 @@ pub enum TimeParseError { StartTimeAfterEndTime, } +type Prefix = String; + +#[derive(Clone, Copy)] +struct TimeBounds { + start_date: NaiveDate, + start_hour: u32, + start_minute: u32, + end_date: NaiveDate, + end_hour: u32, + end_minute: u32, +} + +impl TimeBounds { + fn spans_full_day(&self) -> bool { + self.end_hour - self.start_hour >= 24 + } +} + /// Represents a range of time with a start and end point. #[derive(Debug)] pub struct TimeRange { @@ -38,6 +58,10 @@ pub struct TimeRange { } impl TimeRange { + pub fn new(start: DateTime, end: DateTime) -> Self { + TimeRange { start, end } + } + /// Parses human-readable time strings into a `TimeRange` object. /// /// # Arguments @@ -73,12 +97,181 @@ impl TimeRange { Ok(Self { start, end }) } + + /// Generates prefixes for the time period, e.g: + /// 1. ("2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00") => ["date=2022-06-11/hour=23/", "date=2022-06-12/hour=00/", "date=2022-06-12/hour=01/""] + /// 2. ("2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00") => ["date=2022-06-11/hour=15/minute=59/", "date=2022-06-11/hour=16/", "date=2022-06-11/hour=17/minute=00/"] + pub fn generate_prefixes(self, data_granularity: u32) -> Vec { + let mut prefixes = vec![]; + let time_bounds = self.calculate_time_bounds(); + let mut current_date = time_bounds.start_date; + + while current_date <= time_bounds.end_date { + self.process_date(data_granularity, current_date, time_bounds, &mut prefixes); + current_date += TimeDelta::days(1); + } + + prefixes + } + + fn calculate_time_bounds(&self) -> TimeBounds { + TimeBounds { + start_date: self.start.date_naive(), + start_hour: self.start.hour(), + start_minute: self.start.minute(), + end_date: self.end.date_naive(), + end_hour: self.end.hour(), + end_minute: self.end.minute() + u32::from(self.end.second() > 0), + } + } + + fn process_date( + &self, + data_granularity: u32, + date: NaiveDate, + bounds: TimeBounds, + prefixes: &mut Vec, + ) { + let prefix = format!("date={date}/"); + let is_start = date == bounds.start_date; + let is_end = date == bounds.end_date; + + if !is_start && !is_end { + prefixes.push(prefix); + return; + } + + let time_bounds = self.get_time_bounds(is_start, is_end, bounds); + if time_bounds.spans_full_day() { + prefixes.push(prefix); + return; + } + + self.process_hours(data_granularity, prefix, time_bounds, prefixes); + } + + fn process_hours( + &self, + data_granularity: u32, + date_prefix: String, + time_bounds: TimeBounds, + prefixes: &mut Vec, + ) { + for hour in time_bounds.start_hour..=time_bounds.end_hour { + if hour == 24 { + break; + } + + let hour_prefix = format!("{date_prefix}hour={hour:02}/"); + let is_start_hour = hour == time_bounds.start_hour; + let is_end_hour = hour == time_bounds.end_hour; + + if !is_start_hour && !is_end_hour { + prefixes.push(hour_prefix); + continue; + } + + self.process_minutes( + data_granularity, + hour_prefix, + is_start_hour, + is_end_hour, + time_bounds, + prefixes, + ); + } + } + + fn process_minutes( + &self, + data_granularity: u32, + hour_prefix: String, + is_start_hour: bool, + is_end_hour: bool, + mut time_bounds: TimeBounds, + prefixes: &mut Vec, + ) { + if !is_start_hour { + time_bounds.start_minute = 0; + } + if !is_end_hour { + time_bounds.end_minute = 60; + }; + + if time_bounds.start_minute == time_bounds.end_minute { + return; + } + + let (start_block, end_block) = ( + time_bounds.start_minute / data_granularity, + time_bounds.end_minute / data_granularity, + ); + + let forbidden_block = 60 / data_granularity; + if end_block - start_block >= forbidden_block { + prefixes.push(hour_prefix); + return; + } + + self.generate_minute_prefixes( + data_granularity, + hour_prefix, + start_block, + end_block, + prefixes, + ); + } + + fn generate_minute_prefixes( + &self, + data_granularity: u32, + hour_prefix: String, + start_block: u32, + end_block: u32, + prefixes: &mut Vec, + ) { + let mut push_prefix = |block: u32| { + if let Some(minute_slot) = minute_to_slot(block * data_granularity, data_granularity) { + let prefix = format!("{hour_prefix}minute={minute_slot}/"); + prefixes.push(prefix); + } + }; + + for block in start_block..end_block { + push_prefix(block); + } + + // Handle last block for granularity > 1 + if data_granularity > 1 { + push_prefix(end_block); + } + } + + fn get_time_bounds( + &self, + is_start: bool, + is_end: bool, + mut time_bounds: TimeBounds, + ) -> TimeBounds { + if !is_start { + time_bounds.start_hour = 0; + time_bounds.start_minute = 0; + } + + if !is_end { + time_bounds.end_hour = 24; + time_bounds.end_minute = 60; + } + time_bounds + } } #[cfg(test)] mod tests { use super::*; + use chrono::{Duration, SecondsFormat, Utc}; + use rstest::*; #[test] fn valid_rfc3339_timestamps() { @@ -154,4 +347,78 @@ mod tests { let result = TimeRange::parse_human_time(start_time, end_time); assert!(matches!(result, Err(TimeParseError::HumanTime(_)))); } + + fn time_period_from_str(start: &str, end: &str) -> TimeRange { + TimeRange { + start: DateTime::parse_from_rfc3339(start).unwrap().into(), + end: DateTime::parse_from_rfc3339(end).unwrap().into(), + } + } + + #[rstest] + #[case::same_minute( + "2022-06-11T16:30:00+00:00", "2022-06-11T16:30:59+00:00", + &["date=2022-06-11/hour=16/minute=30/"] + )] + #[case::same_hour_different_minute( + "2022-06-11T16:57:00+00:00", "2022-06-11T16:59:00+00:00", + &[ + "date=2022-06-11/hour=16/minute=57/", + "date=2022-06-11/hour=16/minute=58/" + ] + )] + #[case::same_hour_with_00_to_59_minute_block( + "2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00", + &["date=2022-06-11/hour=16/"] + )] + #[case::same_date_different_hours_coherent_minute( + "2022-06-11T15:00:00+00:00", "2022-06-11T17:00:00+00:00", + &[ + "date=2022-06-11/hour=15/", + "date=2022-06-11/hour=16/" + ] + )] + #[case::same_date_different_hours_incoherent_minutes( + "2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00", + &[ + "date=2022-06-11/hour=15/minute=59/", + "date=2022-06-11/hour=16/minute=00/" + ] + )] + #[case::same_date_different_hours_whole_hours_between_incoherent_minutes( + "2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00", + &[ + "date=2022-06-11/hour=15/minute=59/", + "date=2022-06-11/hour=16/", + "date=2022-06-11/hour=17/minute=00/" + ] + )] + #[case::different_date_coherent_hours_and_minutes( + "2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00", + &[ + "date=2022-06-11/", + "date=2022-06-12/" + ] + )] + #[case::different_date_incoherent_hours_coherent_minutes( + "2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00", + &[ + "date=2022-06-11/hour=23/", + "date=2022-06-12/hour=00/", + "date=2022-06-12/hour=01/" + ] + )] + #[case::different_date_incoherent_hours_incoherent_minutes( + "2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00", + &[ + "date=2022-06-11/hour=23/minute=59/", + "date=2022-06-12/hour=00/minute=00/" + ] + )] + fn prefix_generation(#[case] start: &str, #[case] end: &str, #[case] right: &[&str]) { + let time_period = time_period_from_str(start, end); + let prefixes = time_period.generate_prefixes(1); + let left = prefixes.iter().map(String::as_str).collect::>(); + assert_eq!(left.as_slice(), right); + } }