Skip to content

Commit

Permalink
backend/feat: Rolling Window Stop Detection
Browse files Browse the repository at this point in the history
  • Loading branch information
khuzema786 committed Oct 16, 2024
1 parent 8bcac68 commit 8439f81
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 170 deletions.
237 changes: 121 additions & 116 deletions crates/location_tracking_service/src/common/stop_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,196 +7,201 @@
*/

use crate::common::types::*;
use crate::common::utils::{distance_between_in_meters, get_bucket_from_timestamp};
use crate::domain::types::ui::location::UpdateDriverLocationRequest;
use crate::common::utils::{
distance_between_in_meters, get_bucket_from_timestamp, get_bucket_weightage_from_timestamp,
};
use crate::environment::StopDetectionConfig;

/// Sums the location points within the same timestamp bucket.
/// Filters driver locations based on bucket weightage, including both current and previous bucket.
///
/// # Arguments
///
/// * `locations` - A slice of `Location` objects to be processed.
/// * `bucket_size` - The target bucket size.
/// * `initial` - The initial sum and count of the geographical coordinates (lat/lon).
/// * `locations` - A slice of `DriverLocation` representing the driver's location history.
/// * `bucket_size` - The size of the bucket in terms of time (e.g., seconds).
/// * `curr_bucket` - The current bucket based on the latest driver's location timestamp.
/// * `curr_bucket_capacity_based_on_weightage` - The weightage used to limit the number of points in the current bucket.
///
/// # Returns
///
/// A tuple containing the updated sum of location points and the total number of points in the bucket.
fn sum_locations_in_bucket(
locations: &[UpdateDriverLocationRequest],
/// A vector containing filtered driver locations based on the bucket weightage (current and previous buckets).
fn filter_locations_based_on_bucket_weightage(
locations: &[DriverLocation],
bucket_size: u64,
curr_bucket: u64,
initial: (Point, usize),
) -> (Point, usize) {
locations
.iter()
.fold(initial, |(loc_sum, count), location| {
let bucket = get_bucket_from_timestamp(&bucket_size, location.ts);
curr_bucket_capacity_based_on_weightage: u64,
) -> Vec<DriverLocation> {
let prev_bucket_capacity_based_on_weightage =
std::cmp::max(0, bucket_size - curr_bucket_capacity_based_on_weightage);

// Using fold to filter locations
let (mut filtered_locations, _, _) = locations.iter().rev().fold(
(
vec![],
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage,
),
|(
mut locations,
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage,
),
location| {
let bucket = get_bucket_from_timestamp(&bucket_size, location.timestamp);
if bucket == curr_bucket {
if curr_bucket_capacity_based_on_weightage > 0 {
locations.push(location.to_owned());
(
locations,
curr_bucket_capacity_based_on_weightage - 1,
prev_bucket_capacity_based_on_weightage,
)
} else {
locations.push(location.to_owned());
(
locations,
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage - 1,
)
}
} else if bucket == curr_bucket - 1 && prev_bucket_capacity_based_on_weightage > 0 {
locations.push(location.to_owned());
(
Point {
lat: Latitude(loc_sum.lat.inner() + location.pt.lat.inner()),
lon: Longitude(loc_sum.lon.inner() + location.pt.lon.inner()),
},
count + 1,
locations,
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage - 1,
)
} else {
(loc_sum, count)
(
locations,
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage,
)
}
})
},
);

filtered_locations.reverse(); // Ensure locations are returned in correct order
filtered_locations
}

/// Calculates the mean location (latitude and longitude) based on the summed locations and total points.
/// Calculates the mean location (latitude and longitude) based on a set of driver locations.
///
/// # Arguments
///
/// * `location_sum` - The sum of latitude and longitude values.
/// * `locations` - A slice of `DriverLocation` to compute the mean from.
/// * `total_points` - The total number of points used to calculate the mean.
///
/// # Returns
///
/// A `Point` representing the mean location.
fn calculate_mean_location(location_sum: Point, total_points: usize) -> Point {
fn calculate_mean_location(locations: &[DriverLocation], total_points: usize) -> Point {
let location_sum = locations.iter().fold(
Point {
lat: Latitude(0.0),
lon: Longitude(0.0),
},
|location_sum, location| Point {
lat: Latitude(location_sum.lat.inner() + location.location.lat.inner()),
lon: Longitude(location_sum.lon.inner() + location.location.lon.inner()),
},
);
Point {
lat: Latitude(location_sum.lat.inner() / total_points as f64),
lon: Longitude(location_sum.lon.inner() / total_points as f64),
}
}

/// Determines whether a stop is detected based on distance and the number of points within the radius.
/// Determines whether a stop has been detected by comparing the mean location to the latest location.
///
/// # Arguments
///
/// * `mean_location` - The mean location of the points.
/// * `latest_location` - The latest location of the driver.
/// * `total_points_in_bucket` - The total number of points in the current bucket.
/// * `config` - Configuration parameters for stop detection.
/// * `mean_location` - The calculated mean location of driver pings.
/// * `total_points` - The total number of points used to calculate the mean.
/// * `latest_location` - The most recent driver location.
/// * `config` - Configuration for stop detection.
///
/// # Returns
///
/// A boolean indicating whether a stop has been detected.
/// A boolean indicating whether the stop has been detected based on the radius threshold.
fn is_stop_detected(
mean_location: &Point,
total_points: usize,
latest_location: &Point,
total_points_in_bucket: usize,
config: &StopDetectionConfig,
) -> bool {
let distance = distance_between_in_meters(mean_location, latest_location);
distance < config.radius_threshold_meters as f64
&& total_points_in_bucket >= config.min_points_within_radius_threshold
&& total_points >= config.min_points_within_radius_threshold
}

/// Detects whether a stop has occurred based on driver locations and stop detection configuration.
/// Detects whether the driver has stopped based on a sliding window of driver locations and the stop detection configuration.
///
/// # Arguments
///
/// * `driver_ride_status` - The current ride status of the driver.
/// * `stop_detection` - Optional stop detection data, if already available.
/// * `locations` - A slice of `Location` objects representing driver locations.
/// * `latest_driver_location` - The latest driver location.
/// * `driver_ride_status` - The current ride status of the driver (must be `NEW` to process).
/// * `stop_detection` - Optional stop detection data from previous checks, if any.
/// * `latest_driver_location` - The latest known driver location.
/// * `config` - Configuration parameters for stop detection.
///
/// # Returns
///
/// A tuple containing an `Option<(Point, usize)>` with the detected stop location and number of points, and an `Option<StopDetection>` with the updated stop detection data.
/// A tuple:
/// * `Option<Point>` - The detected stop location, if a stop has been identified.
/// * `Option<StopDetection>` - Updated stop detection data to be used for further checks.
pub fn detect_stop(
driver_ride_status: Option<&RideStatus>,
stop_detection: Option<StopDetection>,
locations: &[UpdateDriverLocationRequest],
latest_driver_location: &UpdateDriverLocationRequest,
latest_driver_location: DriverLocation,
config: &StopDetectionConfig,
) -> (Option<(Point, usize)>, Option<StopDetection>) {
) -> (Option<Point>, Option<StopDetection>) {
if driver_ride_status != Some(&RideStatus::NEW) {
return (None, None);
}

let duration_bucket = get_bucket_from_timestamp(
// Determine the current time bucket and the associated weightage
let curr_bucket = get_bucket_from_timestamp(
&config.duration_threshold_seconds,
latest_driver_location.ts,
latest_driver_location.timestamp,
);

let curr_bucket_weightage = get_bucket_weightage_from_timestamp(
&config.duration_threshold_seconds,
latest_driver_location.timestamp,
);

let curr_bucket_max_points_to_consider =
(curr_bucket_weightage * config.min_points_within_radius_threshold as f64).round() as u64;

if let Some(stop_detection) = stop_detection {
// Sum the locations for the current bucket
let (location_sum, total_points_in_bucket) = sum_locations_in_bucket(
locations,
let mut locations = filter_locations_based_on_bucket_weightage(
&stop_detection.locations,
config.duration_threshold_seconds,
stop_detection.duration_bucket,
(
stop_detection.location_sum,
stop_detection.total_points_in_bucket,
),
curr_bucket,
curr_bucket_max_points_to_consider,
);

// Check if the bucket has changed
if stop_detection.duration_bucket != duration_bucket {
let mean_location = calculate_mean_location(location_sum, total_points_in_bucket);

// Determine if a stop is detected
let stop_detected = if is_stop_detected(
&mean_location,
&latest_driver_location.pt,
total_points_in_bucket,
config,
) {
Some((mean_location, total_points_in_bucket))
} else {
None
};

// Sum locations for the new bucket after detecting the stop
let (new_location_sum, new_total_points) = sum_locations_in_bucket(
locations,
config.duration_threshold_seconds,
duration_bucket,
(
Point {
lat: Latitude(0.0),
lon: Longitude(0.0),
},
0,
),
);
let mean_location = calculate_mean_location(&locations, locations.len());

(
stop_detected,
Some(StopDetection {
location_sum: new_location_sum,
duration_bucket,
total_points_in_bucket: new_total_points,
}),
)
let stop_detected = if is_stop_detected(
&mean_location,
locations.len(),
&latest_driver_location.location,
config,
) {
locations.clear(); // Clear the history if a stop is detected
Some(mean_location)
} else {
// Update the stop detection without changing the bucket
(
None,
Some(StopDetection {
location_sum,
duration_bucket,
total_points_in_bucket,
}),
)
}
} else {
// Case where stop detection has just started
let (location_sum, total_points_in_bucket) = sum_locations_in_bucket(
locations,
config.duration_threshold_seconds,
duration_bucket,
(
Point {
lat: Latitude(0.0),
lon: Longitude(0.0),
},
0,
),
);
locations.push(latest_driver_location); // Add the latest location
None
};

(stop_detected, Some(StopDetection { locations }))
} else {
// First time: start with the latest location
(
None,
Some(StopDetection {
location_sum,
duration_bucket,
total_points_in_bucket,
locations: vec![latest_driver_location],
}),
)
}
Expand Down
10 changes: 7 additions & 3 deletions crates/location_tracking_service/src/common/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,15 @@ pub struct DriverLastKnownLocation {
pub merchant_id: MerchantId,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct DriverLocation {
pub location: Point,
pub timestamp: TimeStamp,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct StopDetection {
pub location_sum: Point,
pub duration_bucket: u64,
pub total_points_in_bucket: usize,
pub locations: Vec<DriverLocation>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand Down
7 changes: 7 additions & 0 deletions crates/location_tracking_service/src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ pub fn get_bucket_from_timestamp(bucket_expiry_in_seconds: &u64, TimeStamp(ts):
ts.timestamp() as u64 / bucket_expiry_in_seconds
}

pub fn get_bucket_weightage_from_timestamp(
bucket_expiry_in_seconds: &u64,
TimeStamp(ts): TimeStamp,
) -> f64 {
ts.timestamp() as f64 % *bucket_expiry_in_seconds as f64 / *bucket_expiry_in_seconds as f64
}

/// Calculates the distance between two geographical points in meters.
///
/// The function utilizes the haversine formula to compute the great-circle
Expand Down
Loading

0 comments on commit 8439f81

Please sign in to comment.