From 8439f81b91bc2495d9736bc51bebe141a1807cbd Mon Sep 17 00:00:00 2001 From: khuzema786 Date: Thu, 17 Oct 2024 01:50:05 +0530 Subject: [PATCH] backend/feat: Rolling Window Stop Detection --- .../src/common/stop_detection.rs | 237 +++++++++--------- .../src/common/types.rs | 10 +- .../src/common/utils.rs | 7 + .../src/domain/action/internal/location.rs | 14 +- .../src/domain/action/ui/location.rs | 45 ++-- .../src/domain/types/internal/location.rs | 6 +- .../src/kafka/producers.rs | 19 +- .../src/kafka/types.rs | 4 +- .../src/outbound/external.rs | 2 - .../src/outbound/types.rs | 1 - 10 files changed, 175 insertions(+), 170 deletions(-) diff --git a/crates/location_tracking_service/src/common/stop_detection.rs b/crates/location_tracking_service/src/common/stop_detection.rs index 50c19a6..c5865b9 100644 --- a/crates/location_tracking_service/src/common/stop_detection.rs +++ b/crates/location_tracking_service/src/common/stop_detection.rs @@ -7,196 +7,201 @@ */ use crate::common::types::*; -use crate::common::utils::{distance_between_in_meters, get_bucket_from_timestamp}; -use crate::domain::types::ui::location::UpdateDriverLocationRequest; +use crate::common::utils::{ + distance_between_in_meters, get_bucket_from_timestamp, get_bucket_weightage_from_timestamp, +}; use crate::environment::StopDetectionConfig; -/// Sums the location points within the same timestamp bucket. +/// Filters driver locations based on bucket weightage, including both current and previous bucket. /// /// # Arguments /// -/// * `locations` - A slice of `Location` objects to be processed. -/// * `bucket_size` - The target bucket size. -/// * `initial` - The initial sum and count of the geographical coordinates (lat/lon). +/// * `locations` - A slice of `DriverLocation` representing the driver's location history. +/// * `bucket_size` - The size of the bucket in terms of time (e.g., seconds). +/// * `curr_bucket` - The current bucket based on the latest driver's location timestamp. +/// * `curr_bucket_capacity_based_on_weightage` - The weightage used to limit the number of points in the current bucket. /// /// # Returns /// -/// A tuple containing the updated sum of location points and the total number of points in the bucket. -fn sum_locations_in_bucket( - locations: &[UpdateDriverLocationRequest], +/// A vector containing filtered driver locations based on the bucket weightage (current and previous buckets). +fn filter_locations_based_on_bucket_weightage( + locations: &[DriverLocation], bucket_size: u64, curr_bucket: u64, - initial: (Point, usize), -) -> (Point, usize) { - locations - .iter() - .fold(initial, |(loc_sum, count), location| { - let bucket = get_bucket_from_timestamp(&bucket_size, location.ts); + curr_bucket_capacity_based_on_weightage: u64, +) -> Vec { + let prev_bucket_capacity_based_on_weightage = + std::cmp::max(0, bucket_size - curr_bucket_capacity_based_on_weightage); + + // Using fold to filter locations + let (mut filtered_locations, _, _) = locations.iter().rev().fold( + ( + vec![], + curr_bucket_capacity_based_on_weightage, + prev_bucket_capacity_based_on_weightage, + ), + |( + mut locations, + curr_bucket_capacity_based_on_weightage, + prev_bucket_capacity_based_on_weightage, + ), + location| { + let bucket = get_bucket_from_timestamp(&bucket_size, location.timestamp); if bucket == curr_bucket { + if curr_bucket_capacity_based_on_weightage > 0 { + locations.push(location.to_owned()); + ( + locations, + curr_bucket_capacity_based_on_weightage - 1, + prev_bucket_capacity_based_on_weightage, + ) + } else { + locations.push(location.to_owned()); + ( + locations, + curr_bucket_capacity_based_on_weightage, + prev_bucket_capacity_based_on_weightage - 1, + ) + } + } else if bucket == curr_bucket - 1 && prev_bucket_capacity_based_on_weightage > 0 { + locations.push(location.to_owned()); ( - Point { - lat: Latitude(loc_sum.lat.inner() + location.pt.lat.inner()), - lon: Longitude(loc_sum.lon.inner() + location.pt.lon.inner()), - }, - count + 1, + locations, + curr_bucket_capacity_based_on_weightage, + prev_bucket_capacity_based_on_weightage - 1, ) } else { - (loc_sum, count) + ( + locations, + curr_bucket_capacity_based_on_weightage, + prev_bucket_capacity_based_on_weightage, + ) } - }) + }, + ); + + filtered_locations.reverse(); // Ensure locations are returned in correct order + filtered_locations } -/// Calculates the mean location (latitude and longitude) based on the summed locations and total points. +/// Calculates the mean location (latitude and longitude) based on a set of driver locations. /// /// # Arguments /// -/// * `location_sum` - The sum of latitude and longitude values. +/// * `locations` - A slice of `DriverLocation` to compute the mean from. /// * `total_points` - The total number of points used to calculate the mean. /// /// # Returns /// /// A `Point` representing the mean location. -fn calculate_mean_location(location_sum: Point, total_points: usize) -> Point { +fn calculate_mean_location(locations: &[DriverLocation], total_points: usize) -> Point { + let location_sum = locations.iter().fold( + Point { + lat: Latitude(0.0), + lon: Longitude(0.0), + }, + |location_sum, location| Point { + lat: Latitude(location_sum.lat.inner() + location.location.lat.inner()), + lon: Longitude(location_sum.lon.inner() + location.location.lon.inner()), + }, + ); Point { lat: Latitude(location_sum.lat.inner() / total_points as f64), lon: Longitude(location_sum.lon.inner() / total_points as f64), } } -/// Determines whether a stop is detected based on distance and the number of points within the radius. +/// Determines whether a stop has been detected by comparing the mean location to the latest location. /// /// # Arguments /// -/// * `mean_location` - The mean location of the points. -/// * `latest_location` - The latest location of the driver. -/// * `total_points_in_bucket` - The total number of points in the current bucket. -/// * `config` - Configuration parameters for stop detection. +/// * `mean_location` - The calculated mean location of driver pings. +/// * `total_points` - The total number of points used to calculate the mean. +/// * `latest_location` - The most recent driver location. +/// * `config` - Configuration for stop detection. /// /// # Returns /// -/// A boolean indicating whether a stop has been detected. +/// A boolean indicating whether the stop has been detected based on the radius threshold. fn is_stop_detected( mean_location: &Point, + total_points: usize, latest_location: &Point, - total_points_in_bucket: usize, config: &StopDetectionConfig, ) -> bool { let distance = distance_between_in_meters(mean_location, latest_location); distance < config.radius_threshold_meters as f64 - && total_points_in_bucket >= config.min_points_within_radius_threshold + && total_points >= config.min_points_within_radius_threshold } -/// Detects whether a stop has occurred based on driver locations and stop detection configuration. +/// Detects whether the driver has stopped based on a sliding window of driver locations and the stop detection configuration. /// /// # Arguments /// -/// * `driver_ride_status` - The current ride status of the driver. -/// * `stop_detection` - Optional stop detection data, if already available. -/// * `locations` - A slice of `Location` objects representing driver locations. -/// * `latest_driver_location` - The latest driver location. +/// * `driver_ride_status` - The current ride status of the driver (must be `NEW` to process). +/// * `stop_detection` - Optional stop detection data from previous checks, if any. +/// * `latest_driver_location` - The latest known driver location. /// * `config` - Configuration parameters for stop detection. /// /// # Returns /// -/// A tuple containing an `Option<(Point, usize)>` with the detected stop location and number of points, and an `Option` with the updated stop detection data. +/// A tuple: +/// * `Option` - The detected stop location, if a stop has been identified. +/// * `Option` - Updated stop detection data to be used for further checks. pub fn detect_stop( driver_ride_status: Option<&RideStatus>, stop_detection: Option, - locations: &[UpdateDriverLocationRequest], - latest_driver_location: &UpdateDriverLocationRequest, + latest_driver_location: DriverLocation, config: &StopDetectionConfig, -) -> (Option<(Point, usize)>, Option) { +) -> (Option, Option) { if driver_ride_status != Some(&RideStatus::NEW) { return (None, None); } - let duration_bucket = get_bucket_from_timestamp( + // Determine the current time bucket and the associated weightage + let curr_bucket = get_bucket_from_timestamp( &config.duration_threshold_seconds, - latest_driver_location.ts, + latest_driver_location.timestamp, ); + let curr_bucket_weightage = get_bucket_weightage_from_timestamp( + &config.duration_threshold_seconds, + latest_driver_location.timestamp, + ); + + let curr_bucket_max_points_to_consider = + (curr_bucket_weightage * config.min_points_within_radius_threshold as f64).round() as u64; + if let Some(stop_detection) = stop_detection { - // Sum the locations for the current bucket - let (location_sum, total_points_in_bucket) = sum_locations_in_bucket( - locations, + let mut locations = filter_locations_based_on_bucket_weightage( + &stop_detection.locations, config.duration_threshold_seconds, - stop_detection.duration_bucket, - ( - stop_detection.location_sum, - stop_detection.total_points_in_bucket, - ), + curr_bucket, + curr_bucket_max_points_to_consider, ); - // Check if the bucket has changed - if stop_detection.duration_bucket != duration_bucket { - let mean_location = calculate_mean_location(location_sum, total_points_in_bucket); - - // Determine if a stop is detected - let stop_detected = if is_stop_detected( - &mean_location, - &latest_driver_location.pt, - total_points_in_bucket, - config, - ) { - Some((mean_location, total_points_in_bucket)) - } else { - None - }; - - // Sum locations for the new bucket after detecting the stop - let (new_location_sum, new_total_points) = sum_locations_in_bucket( - locations, - config.duration_threshold_seconds, - duration_bucket, - ( - Point { - lat: Latitude(0.0), - lon: Longitude(0.0), - }, - 0, - ), - ); + let mean_location = calculate_mean_location(&locations, locations.len()); - ( - stop_detected, - Some(StopDetection { - location_sum: new_location_sum, - duration_bucket, - total_points_in_bucket: new_total_points, - }), - ) + let stop_detected = if is_stop_detected( + &mean_location, + locations.len(), + &latest_driver_location.location, + config, + ) { + locations.clear(); // Clear the history if a stop is detected + Some(mean_location) } else { - // Update the stop detection without changing the bucket - ( - None, - Some(StopDetection { - location_sum, - duration_bucket, - total_points_in_bucket, - }), - ) - } - } else { - // Case where stop detection has just started - let (location_sum, total_points_in_bucket) = sum_locations_in_bucket( - locations, - config.duration_threshold_seconds, - duration_bucket, - ( - Point { - lat: Latitude(0.0), - lon: Longitude(0.0), - }, - 0, - ), - ); + locations.push(latest_driver_location); // Add the latest location + None + }; + (stop_detected, Some(StopDetection { locations })) + } else { + // First time: start with the latest location ( None, Some(StopDetection { - location_sum, - duration_bucket, - total_points_in_bucket, + locations: vec![latest_driver_location], }), ) } diff --git a/crates/location_tracking_service/src/common/types.rs b/crates/location_tracking_service/src/common/types.rs index 6a19156..fe61739 100644 --- a/crates/location_tracking_service/src/common/types.rs +++ b/crates/location_tracking_service/src/common/types.rs @@ -185,11 +185,15 @@ pub struct DriverLastKnownLocation { pub merchant_id: MerchantId, } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct DriverLocation { + pub location: Point, + pub timestamp: TimeStamp, +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct StopDetection { - pub location_sum: Point, - pub duration_bucket: u64, - pub total_points_in_bucket: usize, + pub locations: Vec, } #[derive(Serialize, Deserialize, Clone, Debug)] diff --git a/crates/location_tracking_service/src/common/utils.rs b/crates/location_tracking_service/src/common/utils.rs index af8bd77..9d47fa5 100644 --- a/crates/location_tracking_service/src/common/utils.rs +++ b/crates/location_tracking_service/src/common/utils.rs @@ -136,6 +136,13 @@ pub fn get_bucket_from_timestamp(bucket_expiry_in_seconds: &u64, TimeStamp(ts): ts.timestamp() as u64 / bucket_expiry_in_seconds } +pub fn get_bucket_weightage_from_timestamp( + bucket_expiry_in_seconds: &u64, + TimeStamp(ts): TimeStamp, +) -> f64 { + ts.timestamp() as f64 % *bucket_expiry_in_seconds as f64 / *bucket_expiry_in_seconds as f64 +} + /// Calculates the distance between two geographical points in meters. /// /// The function utilizes the haversine formula to compute the great-circle diff --git a/crates/location_tracking_service/src/domain/action/internal/location.rs b/crates/location_tracking_service/src/domain/action/internal/location.rs index e8f2e14..f3c491c 100644 --- a/crates/location_tracking_service/src/domain/action/internal/location.rs +++ b/crates/location_tracking_service/src/domain/action/internal/location.rs @@ -35,7 +35,7 @@ async fn search_nearby_drivers_with_vehicle( bucket: &u64, location: Point, radius: &Radius, -) -> Result, AppError> { +) -> Result, AppError> { let nearby_drivers = get_drivers_within_radius( redis, nearby_bucket_threshold, @@ -63,7 +63,7 @@ async fn search_nearby_drivers_with_vehicle( .as_ref() .map(|driver_last_known_location| driver_last_known_location.timestamp) .unwrap_or(TimeStamp(Utc::now())); - DriverLocation { + DriverLocationDetail { driver_id: driver.driver_id.to_owned(), lat: driver.location.lat, lon: driver.location.lon, @@ -73,7 +73,7 @@ async fn search_nearby_drivers_with_vehicle( merchant_id: merchant_id.to_owned(), } }) - .collect::>(); + .collect::>(); Ok(resp) } @@ -95,7 +95,7 @@ pub async fn get_nearby_drivers( match vehicle_type { None => { - let mut resp: Vec = Vec::new(); + let mut resp: Vec = Vec::new(); for vehicle in VehicleType::iter() { let nearby_drivers = search_nearby_drivers_with_vehicle( @@ -122,7 +122,7 @@ pub async fn get_nearby_drivers( Ok(resp) } Some(vehicles) => { - let mut resp: Vec = Vec::new(); + let mut resp: Vec = Vec::new(); for vehicle in vehicles { let nearby_drivers = search_nearby_drivers_with_vehicle( &data.redis, @@ -153,7 +153,7 @@ pub async fn get_nearby_drivers( pub async fn get_drivers_location( data: Data, driver_ids: Vec, -) -> Result, AppError> { +) -> Result, AppError> { let mut driver_locations = Vec::with_capacity(driver_ids.len()); let driver_last_known_location = @@ -163,7 +163,7 @@ pub async fn get_drivers_location( driver_ids.iter().zip(driver_last_known_location.iter()) { if let Some(driver_last_known_location) = driver_last_known_location { - let driver_location = DriverLocation { + let driver_location = DriverLocationDetail { driver_id: driver_id.to_owned(), lat: driver_last_known_location.location.lat, lon: driver_last_known_location.location.lon, diff --git a/crates/location_tracking_service/src/domain/action/ui/location.rs b/crates/location_tracking_service/src/domain/action/ui/location.rs index f1a6b34..c33b15f 100644 --- a/crates/location_tracking_service/src/domain/action/ui/location.rs +++ b/crates/location_tracking_service/src/domain/action/ui/location.rs @@ -10,7 +10,7 @@ use crate::common::utils::{distance_between_in_meters, get_city, is_blacklist_fo use crate::common::{ sliding_window_rate_limiter::sliding_window_limiter, stop_detection::detect_stop, types::*, }; -use crate::domain::types::ui::location::*; +use crate::domain::types::ui::location::{DriverLocationResponse, UpdateDriverLocationRequest}; use crate::environment::AppState; use crate::kafka::producers::kafka_stream_updates; use crate::outbound::external::{ @@ -279,28 +279,13 @@ async fn process_driver_locations( .as_ref() .map(|driver_location_details| driver_location_details.stop_detection.to_owned()) .flatten(), - &locations, - &latest_driver_location, + DriverLocation { + location: latest_driver_location.pt.to_owned(), + timestamp: latest_driver_location.ts, + }, &data.stop_detection, ); - let set_driver_last_location_update = async { - set_driver_last_location_update( - &data.redis, - &data.last_location_timstamp_expiry, - &driver_id, - &merchant_id, - &latest_driver_location.pt, - &latest_driver_location_ts, - &None::, - stop_detection, - // travelled_distance.to_owned(), - ) - .await?; - Ok(()) - }; - all_tasks.push(Box::pin(set_driver_last_location_update)); - let is_blacklist_for_special_zone = is_blacklist_for_special_zone( &merchant_id, &data.blacklist_merchants, @@ -366,6 +351,23 @@ async fn process_driver_locations( locations }; + let set_driver_last_location_update = async { + set_driver_last_location_update( + &data.redis, + &data.last_location_timstamp_expiry, + &driver_id, + &merchant_id, + &latest_driver_location.pt, + &latest_driver_location_ts, + &None::, + stop_detection, + // travelled_distance.to_owned(), + ) + .await?; + Ok(()) + }; + all_tasks.push(Box::pin(set_driver_last_location_update)); + if let (Some(RideStatus::INPROGRESS), Some(ride_id)) = (driver_ride_status.as_ref(), driver_ride_id.as_ref()) { @@ -448,11 +450,10 @@ async fn process_driver_locations( // } // } // } - if let Some((location, total_points)) = stop_detected.as_ref() { + if let Some(location) = stop_detected.as_ref() { let _ = trigger_stop_detection_event( &data.stop_detection.stop_detection_update_callback_url, location, - total_points, ) .await; } diff --git a/crates/location_tracking_service/src/domain/types/internal/location.rs b/crates/location_tracking_service/src/domain/types/internal/location.rs index 1e01cb1..39371ea 100644 --- a/crates/location_tracking_service/src/domain/types/internal/location.rs +++ b/crates/location_tracking_service/src/domain/types/internal/location.rs @@ -19,7 +19,7 @@ pub struct NearbyDriversRequest { pub merchant_id: MerchantId, } -pub type NearbyDriverResponse = Vec; +pub type NearbyDriverResponse = Vec; #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(rename_all = "camelCase")] @@ -27,11 +27,11 @@ pub struct GetDriversLocationRequest { pub driver_ids: Vec, } -pub type GetDriversLocationResponse = Vec; +pub type GetDriversLocationResponse = Vec; #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct DriverLocation { +pub struct DriverLocationDetail { pub driver_id: DriverId, pub lat: Latitude, pub lon: Longitude, diff --git a/crates/location_tracking_service/src/kafka/producers.rs b/crates/location_tracking_service/src/kafka/producers.rs index 28d1b53..b070d29 100644 --- a/crates/location_tracking_service/src/kafka/producers.rs +++ b/crates/location_tracking_service/src/kafka/producers.rs @@ -45,7 +45,7 @@ pub async fn kafka_stream_updates( driver_mode: DriverMode, DriverId(key): &DriverId, vehicle_type: VehicleType, - stop_detected: Option<(Point, usize)>, + stop_location: Option, // travelled_distance: Meters, ) { let ride_status = match ride_status { @@ -54,17 +54,11 @@ pub async fn kafka_stream_updates( _ => DriverRideStatus::IDLE, }; - let (is_stop_detected, stop_lat, stop_lon, stop_points) = - if let Some((stop_mean_location, stop_total_points)) = stop_detected { - ( - Some(true), - Some(stop_mean_location.lat), - Some(stop_mean_location.lon), - Some(stop_total_points), - ) - } else { - (None, None, None, None) - }; + let (is_stop_detected, stop_lat, stop_lon) = if let Some(stop_location) = stop_location { + (Some(true), Some(stop_location.lat), Some(stop_location.lon)) + } else { + (None, None, None) + }; for loc in locations { let message = LocationUpdate { @@ -91,7 +85,6 @@ pub async fn kafka_stream_updates( is_stop_detected, stop_lat, stop_lon, - stop_points, // travelled_distance: travelled_distance.to_owned(), }; if let Err(err) = push_to_kafka(producer, topic, key.as_str(), message).await { diff --git a/crates/location_tracking_service/src/kafka/types.rs b/crates/location_tracking_service/src/kafka/types.rs index fd1b8bd..219ec5c 100644 --- a/crates/location_tracking_service/src/kafka/types.rs +++ b/crates/location_tracking_service/src/kafka/types.rs @@ -38,7 +38,5 @@ pub struct LocationUpdate { pub vehicle_variant: VehicleType, pub is_stop_detected: Option, pub stop_lat: Option, - pub stop_lon: Option, - pub stop_points: Option, - // pub travelled_distance: Meters, + pub stop_lon: Option, // pub travelled_distance: Meters, } diff --git a/crates/location_tracking_service/src/outbound/external.rs b/crates/location_tracking_service/src/outbound/external.rs index 6956d18..df05a91 100644 --- a/crates/location_tracking_service/src/outbound/external.rs +++ b/crates/location_tracking_service/src/outbound/external.rs @@ -109,7 +109,6 @@ pub async fn trigger_fcm_dobpp( pub async fn trigger_stop_detection_event( stop_detection_callback_url: &Url, location: &Point, - total_points: &usize, ) -> Result { call_api::( Protocol::Http1, @@ -118,7 +117,6 @@ pub async fn trigger_stop_detection_event( vec![("content-type", "application/json")], Some(StopDetectionReq { location: location.to_owned(), - total_locations: *total_points, }), ) .await diff --git a/crates/location_tracking_service/src/outbound/types.rs b/crates/location_tracking_service/src/outbound/types.rs index 1ba1583..00073f6 100644 --- a/crates/location_tracking_service/src/outbound/types.rs +++ b/crates/location_tracking_service/src/outbound/types.rs @@ -42,7 +42,6 @@ pub struct TriggerFcmReq { #[serde(rename_all = "camelCase")] pub struct StopDetectionReq { pub location: Point, - pub total_locations: usize, } // Live activity notification trigger for IOS