Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat message management #378

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
97 changes: 97 additions & 0 deletions django_project/dcas/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dcas.rules.rule_engine import DCASRuleEngine
from dcas.rules.variables import DCASData
from dcas.service import GrowthStageService
from dcas.utils import read_grid_crop_data


def calculate_growth_stage(
Expand Down Expand Up @@ -114,3 +115,99 @@
var_name = f'message_{idx + 1}' if idx > 0 else 'message'
row[var_name] = code
return row


def get_last_message_date(
    farm_id: int,
    crop_id: int,
    message_code: str,
    historical_parquet_path: str
) -> pd.Timestamp:
    """
    Get the last date a message code was sent for a specific farm and crop.

    :param farm_id: ID of the farm
    :type farm_id: int
    :param crop_id: ID of the crop
    :type crop_id: int
    :param message_code: The message code to check
    :type message_code: str
    :param historical_parquet_path: Path to the historical message parquet file
    :type historical_parquet_path: str
    :return: Timestamp of the last message occurrence or None if not found
    :rtype: pd.Timestamp or None
    """
    # Load historical messages for this crop only.
    history = read_grid_crop_data(
        historical_parquet_path, [], [crop_id], []
    )

    # A message may sit in any of the five message slots; match them all
    # in one vectorized pass (NaN cells compare as False, as before).
    message_columns = [
        'message', 'message_2', 'message_3', 'message_4', 'message_5'
    ]
    has_code = history[message_columns].eq(message_code).any(axis=1)

    matches = history[
        (history['farm_id'] == farm_id) &
        (history['crop_id'] == crop_id) &
        has_code
    ]

    # No prior occurrence of this code for the farm/crop pair.
    if matches.empty:
        return None

    # Most recent time the code was sent.
    return matches['message_date'].max()


def filter_messages_by_weeks(
    df: pd.DataFrame,
    historical_parquet_path: str,
    weeks_constraint: int
) -> pd.DataFrame:
    """
    Remove messages that have been sent within the last X weeks.

    Each non-empty message cell is checked against the historical
    record; if the same code was sent to the same farm/crop within
    the constraint window, the cell is cleared (set to None).

    :param df: DataFrame containing new messages to be sent
    :type df: pd.DataFrame
    :param historical_parquet_path: Path to historical message parquet file
    :type historical_parquet_path: str
    :param weeks_constraint: Number of weeks to check for duplicate messages
    :type weeks_constraint: int
    :return: DataFrame with duplicate messages removed (mutated in place
        and also returned)
    :rtype: pd.DataFrame
    :raises KeyError: if the required ``farm_id`` column is missing
    """
    if 'farm_id' not in df.columns:
        raise KeyError("Column 'farm_id' is missing in the DataFrame!")

    # Messages sent on or after this date count as duplicates.
    min_allowed_date = (
        pd.Timestamp.now() - pd.Timedelta(weeks=weeks_constraint)
    )

    message_columns = [
        'message',
        'message_2',
        'message_3',
        'message_4',
        'message_5'
    ]

    for idx, row in df.iterrows():
        for message_column in message_columns:
            message_code = row[message_column]

            if pd.isna(message_code):
                continue  # Skip empty messages

            last_sent_date = get_last_message_date(
                row['farm_id'],
                row['crop_id'],
                message_code,
                historical_parquet_path)

            # Explicit None/NaT check: relying on Timestamp truthiness
            # is fragile (NaT is falsy, which silently skipped rows).
            if (
                last_sent_date is not None and
                pd.notna(last_sent_date) and
                last_sent_date >= min_allowed_date
            ):
                df.at[idx, message_column] = None  # Remove duplicate message

    return df
6 changes: 6 additions & 0 deletions django_project/dcas/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@
"""Return full path to grid with crop data."""
return self.grid_crop_data_dir_path + '/*.parquet'

@property
def farm_crop_data_path(self):
"""Return full path to the farm crop data parquet file."""
return self._get_directory_path(

Check warning on line 81 in django_project/dcas/outputs.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/outputs.py#L81

Added line #L81 was not covered by tests
self.DCAS_OUTPUT_DIR) + '/*.parquet'

def _setup_s3fs(self):
"""Initialize s3fs."""
self.s3 = self._get_s3_variables()
Expand Down
30 changes: 30 additions & 0 deletions django_project/dcas/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from dcas.queries import DataQuery
from dcas.outputs import DCASPipelineOutput, OutputType
from dcas.inputs import DCASPipelineInput
from dcas.functions import filter_messages_by_weeks
from dcas.service import GrowthStageService


Expand Down Expand Up @@ -306,6 +307,13 @@
grid_crop_df_meta = self.data_query.grid_data_with_crop_meta(
self.farm_registry_group
)
# add farm_id
if "farm_id" not in grid_crop_df_meta.columns:
grid_crop_df_meta = grid_crop_df_meta.assign(

Check warning on line 312 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L311-L312

Added lines #L311 - L312 were not covered by tests
farm_id=pd.Series(dtype='Int64')
)
# Ensure the column order in `meta` matches the expected DataFrame
grid_crop_df_meta = grid_crop_df_meta[grid_crop_df.columns]

Check warning on line 316 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L316

Added line #L316 was not covered by tests

# Process gdd cumulative
# for Total GDD, we use date from planting_date to request_date - 1
Expand Down Expand Up @@ -445,6 +453,25 @@

return file_path

def filter_message_output(self):
"""Filter messages before extracting CSV."""
print(f'Applying message: {self.data_output.farm_crop_data_path}')

Check warning on line 458 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L458

Added line #L458 was not covered by tests

# Read Parquet file (processed farm crop data)
df = dd.read_parquet(self.data_output.farm_crop_data_path)

Check warning on line 461 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L461

Added line #L461 was not covered by tests

# Apply message filtering
df = df.map_partitions(

Check warning on line 464 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L464

Added line #L464 was not covered by tests
filter_messages_by_weeks,
self.data_output.farm_crop_data_path,
2, # Weeks constraint (default: 2 weeks)
)

# Save the filtered Parquet file (overwrite previous Parquet)
df.to_parquet(self.data_output.farm_crop_data_path, write_index=False)

Check warning on line 471 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L471

Added line #L471 was not covered by tests

print('Finished filtering messages.')

Check warning on line 473 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L473

Added line #L473 was not covered by tests

def run(self):
"""Run data pipeline."""
self.setup()
Expand All @@ -454,6 +481,9 @@
self.process_grid_crop_data()

self.process_farm_registry_data()

self.filter_message_output()

Check warning on line 485 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L485

Added line #L485 was not covered by tests

self.extract_csv_output()

self.cleanup_gdd_matrix()
Expand Down
135 changes: 134 additions & 1 deletion django_project/dcas/tests/test_pipeline_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import numpy as np
import pandas as pd
from mock import patch
from datetime import datetime, timedelta

from dcas.tests.base import DCASPipelineBaseTest
from dcas.functions import calculate_growth_stage
from dcas.functions import (
calculate_growth_stage, get_last_message_date,
filter_messages_by_weeks
)


def set_cache_dummy(cache_key, growth_stage_matrix, timeout):
Expand Down Expand Up @@ -217,3 +221,132 @@ def test_calculate_growth_stage_na_value(self, mock_cache):
self.assertIn('growth_stage_start_date', row)
self.assertEqual(row['growth_stage_id'], 2)
self.assertEqual(row['growth_stage_start_date'], 124)

    @patch("dcas.functions.read_grid_crop_data")
    def test_get_last_message_date_exists(self, mock_read_grid_crop_data):
        """
        Test when a message exists in history.

        It should return the latest timestamp among all message columns.
        """
        now = datetime.now()
        # Fixture: codes are spread across all five message_* columns so
        # the lookup must scan every slot, not just 'message'.
        mock_data = pd.DataFrame({
            'farm_id': [1, 1, 1, 2, 2, 3],
            'crop_id': [100, 100, 100, 101, 101, 102],
            'message': ['MSG1', 'MSG2', 'MSG1', 'MSG3', 'MSG1', 'MSG4'],
            'message_2': [None, 'MSG1', None, None, 'MSG3', None],
            'message_3': [None, None, 'MSG1', None, None, None],
            'message_4': [None, None, None, None, None, None],
            'message_5': [None, None, None, 'MSG1', None, 'MSG4'],
            'message_date': [
                now - timedelta(days=15),  # MSG1 - Old
                now - timedelta(days=10),  # MSG2
                now - timedelta(days=5),   # MSG1 - More recent
                now - timedelta(days=12),  # MSG3
                now - timedelta(days=3),   # MSG1 - Most recent
                now - timedelta(days=20)   # MSG4 - Oldest
            ]
        })

        mock_read_grid_crop_data.return_value = mock_data

        # Latest MSG1 for farm 2 / crop 101 should be at index 4 (3 days ago)
        result = get_last_message_date(2, 101, "MSG1", "/fake/path")
        assert result == mock_data['message_date'].iloc[4]

        # MSG3 matches rows 3 ('message') and 4 ('message_2'); the most
        # recent of the two is index 4 (3 days ago)
        result = get_last_message_date(2, 101, "MSG3", "/fake/path")
        assert result == mock_data['message_date'].iloc[4]

        # Latest MSG2 should be at index 1 (10 days ago)
        result = get_last_message_date(1, 100, "MSG2", "/fake/path")
        assert result == mock_data['message_date'].iloc[1]

        # Latest MSG1 for farm 1, crop 100 should be at index 2 (5 days ago)
        result = get_last_message_date(1, 100, "MSG1", "/fake/path")
        assert result == mock_data['message_date'].iloc[2]

        # "MSG5" never appears as a code value (message_5 is a column
        # name, not a code), so no match is expected
        result = get_last_message_date(2, 101, "MSG5", "/fake/path")
        assert result is None  # No MSG5 found

@patch("dcas.functions.read_grid_crop_data")
def test_get_last_message_date_not_exists(self, mock_read_grid_crop_data):
"""Test when the message does not exist in history."""
mock_data = pd.DataFrame({
'farm_id': [1, 1, 2],
'crop_id': [100, 100, 101],
'message': ['MSG2', 'MSG3', 'MSG4'],
'message_2': [None, None, None],
'message_3': [None, None, None],
'message_4': [None, None, None],
'message_5': [None, None, None],
'message_date': [
pd.Timestamp(datetime.now() - timedelta(days=10)),
pd.Timestamp(datetime.now() - timedelta(days=5)),
pd.Timestamp(datetime.now() - timedelta(days=3))
]
})
mock_read_grid_crop_data.return_value = mock_data

result = get_last_message_date(1, 100, "MSG1", "/fake/path")
self.assertIsNone(result)

@patch("dcas.functions.read_grid_crop_data")
def test_get_last_message_date_multiple_messages(
self,
mock_read_grid_crop_data
):
"""
Test when the same message appears multiple times.

And should return the most recent timestamp.
"""
mock_data = pd.DataFrame({
'farm_id': [1, 1, 1],
'crop_id': [100, 100, 100],
'message': ['MSG1', 'MSG1', 'MSG1'],
'message_2': [None, None, None],
'message_3': [None, None, None],
'message_4': [None, None, None],
'message_5': [None, None, None],
'message_date': [
pd.Timestamp(datetime.now() - timedelta(days=15)),
pd.Timestamp(datetime.now() - timedelta(days=7)),
pd.Timestamp(datetime.now() - timedelta(days=2))
]
})
mock_read_grid_crop_data.return_value = mock_data

result = get_last_message_date(1, 100, "MSG1", "/fake/path")
self.assertEqual(result, mock_data['message_date'].iloc[2])

@patch("dcas.functions.get_last_message_date")
def test_filter_messages_by_weeks(self, mock_get_last_message_date):
"""Test filtering messages based on the time constraint (weeks)."""
test_weeks = 2 # Remove messages sent within the last 2 weeks
current_date = pd.Timestamp(datetime.now())

df = pd.DataFrame({
'farm_id': [1, 2, 3],
'crop_id': [100, 200, 300],
'message': ['MSG1', 'MSG2', 'MSG3'],
'message_2': [None, None, None],
'message_3': [None, None, None],
'message_4': [None, None, None],
'message_5': [None, None, None],
})

# Simulating last message dates for each row
mock_get_last_message_date.side_effect = [
current_date - timedelta(weeks=1), # Should be removed
current_date - timedelta(weeks=3), # Should stay
None # No history, should stay
]

filtered_df = filter_messages_by_weeks(df, "/fake/path", test_weeks)

# Assert that the correct messages were removed
self.assertIsNone(filtered_df.loc[0, 'message']) # Removed
self.assertEqual(filtered_df.loc[1, 'message'], 'MSG2') # Kept
self.assertEqual(filtered_df.loc[2, 'message'], 'MSG3') # Kept
Loading