
Commit

#944: More refactor and core code for grouping CSLC granules and determining triggering logic for DISP-S1 processing for validation purposes
philipjyoon committed Dec 16, 2024
1 parent 7af5280 commit 8932db9
Showing 4 changed files with 213 additions and 102 deletions.
98 changes: 7 additions & 91 deletions report/opera-validator/opera_validator.py
@@ -1,6 +1,4 @@
import argparse
import concurrent.futures
import multiprocessing
import random
import re
import sqlite3
@@ -13,81 +11,13 @@
import tqdm
import logging

from opv_util import (generate_url_params, parallel_fetch, retrieve_r3_products, get_total_granules, get_burst_id,
get_burst_sensing_datetime, get_burst_ids_from_file)
from opv_util import (get_granules_from_query, retrieve_r3_products, get_burst_id, get_burst_sensing_datetime, get_burst_ids_from_file)
from opv_disp_s1 import validate_disp_s1, map_cslc_bursts_to_frames

from data_subscriber.cslc_utils import parse_cslc_file_name, localize_disp_frame_burst_hist

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def get_granules_from_query(start, end, timestamp, endpoint, provider = 'ASF', shortname = 'OPERA_L2_RTC-S1_V1'):
"""
Fetches granule metadata from the CMR API within a specified temporal range using parallel requests.
:start: Start time in ISO 8601 format.
:end: End time in ISO 8601 format.
:timestamp: Type of timestamp to filter granules (e.g., 'TEMPORAL', 'PRODUCTION').
:endpoint: CMR API endpoint ('OPS' or 'UAT').
:provider: Data provider ID (default 'ASF').
:shortname: Short name of the product (default 'OPERA_L2_RTC-S1_V1').
:return: List of granule metadata.
"""

granules = []

base_url, params = generate_url_params(start=start, end=end, timestamp_type=timestamp, endpoint=endpoint, provider=provider, short_name=shortname)

# Construct the URL for the total granules query
total_granules = get_total_granules(base_url, params)
print(f"Total granules: {total_granules}")
print(f"Querying CMR for time range {start} to {end}.")

# Exit with error code if no granules to process
if (total_granules == 0):
print(f"Error: no granules to process.")
sys.exit(1)

# Optimize page_size and number of workers based on total_granules
page_size = min(1000, total_granules)

# Initialize progress bar
tqdm.tqdm._instances.clear() # Clear any existing tqdm instances
print()

# Main loop to fetch granules, update progress bar, and extract burst_ids
with tqdm.tqdm(total=total_granules, desc="Fetching granules", position=0) as pbar_global:
downloaded_batches = multiprocessing.Value('i', 0) # For counting downloaded batches
total_batches = (total_granules + page_size - 1) // page_size

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# NOTE: parallelized workers beyond 5 not working well, but here's some code to make it work in the future
# max_workers = min(5, (total_granules + page_size - 1) // page_size)
# futures = [executor.submit(parallel_fetch, base_url, params, page_num, page_size, downloaded_batches, total_batches) for page_num in range(1, total_batches + 1)]
futures = []
for page_num in range(1, total_batches + 1):
future = executor.submit(parallel_fetch, base_url, params, page_num, page_size, downloaded_batches)
futures.append(future)
random_delay = random.uniform(0, 0.1)
time.sleep(random_delay) # Stagger the submission of function calls for CMR optimization
logging.debug(f"Scheduled granule fetch for batch {page_num}")

for future in concurrent.futures.as_completed(futures):
granules_result = future.result()
pbar_global.update(len(granules_result))

granules.extend(granules_result)

print("\nGranule fetching complete.")

# Integrity check for total granules
total_downloaded = sum(len(future.result()) for future in futures)
if total_downloaded != total_granules:
print(f"\nError: Expected {total_granules} granules, but downloaded {total_downloaded}. Try running again after some delay.")
sys.exit(1)

return granules

def get_granule_ids_from_granules(granules):
"""
Extracts granule IDs from a list of granule metadata.
@@ -257,6 +187,7 @@ def count_missing(row):
parser.add_argument("--endpoint_daac_output", required=False, choices=['UAT', 'OPS'], default='OPS', help='CMR endpoint venue for DSWx-S1 granules')
parser.add_argument("--validate", action='store_true', help="Validate if DSWx-S1 products have been delivered for given time range (use --timestamp TEMPORAL mode only)")
parser.add_argument("--product", required=True, choices=['DSWx-S1', 'DISP-S1'], default='DSWx-S1', help="The product to validate")
parser.add_argument("--disp_s1_frames_only", required=False, help="Restrict validation to these frame numbers only. Comma-separated list of frames")
# Parse the command-line arguments
args = parser.parse_args()
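As a small illustration of how the new --disp_s1_frames_only value (passed on the command line as, say, --disp_s1_frames_only 8889,8890) is consumed downstream, here is a minimal sketch mirroring the parsing done in opv_disp_s1.validate_disp_s1; the frame numbers are made up:

    disp_s1_frames_only = "8889,8890"                                    # e.g. args.disp_s1_frames_only
    frames_to_validate = set(int(f) for f in disp_s1_frames_only.split(','))
    print(frames_to_validate)                                            # {8889, 8890}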

@@ -375,30 +306,15 @@ def count_missing(row):
print(tabulate(df[['MGRS Set ID', 'Coverage Percentage', 'Total RTC Burst IDs Count', 'Covered RTC Burst ID Count']], headers='keys', tablefmt='plain', showindex=False))
print(f"Expected DSWx-S1 products: {df['MGRS Tiles Count'].sum()}, MGRS Set IDs covered: {len(df)}")
elif (args.product == 'DISP-S1'):
# Gather list of bursts and dates for CSLC sensing time range
burst_ids, burst_dates = get_burst_ids_and_sensing_times_from_query(start=args.start, end=args.end, endpoint='OPS', timestamp=args.timestamp, shortname='OPERA_L2_CSLC-S1_V1')

# Process the disp s1 consistent database file
frames_to_bursts, burst_to_frames, _ = localize_disp_frame_burst_hist()

# Generate a table that has frames, all bursts, and matching bursts listed
df = map_cslc_bursts_to_frames(burst_ids=burst_ids.keys(), bursts_to_frames = burst_to_frames, frames_to_bursts=frames_to_bursts)

print(df)

print(burst_dates)
smallest_date = None
greatest_date = None

validate_disp_s1(smallest_date, greatest_date, args.endpoint_daac_output, df)

# Filter to only those frames that have full coverage (i.e. all bursts == matching)
df = df[df['All Possible Bursts Count'] == df['Matching Bursts Count']]
# Perform all validation work in this function
result_df = validate_disp_s1(args.start, args.end, args.timestamp, args.endpoint_daac_input, args.endpoint_daac_output, args.disp_s1_frames_only)
print(result_df)

if (args.verbose):
print(tabulate(df[['Frame ID','All Possible Bursts', 'Matching Bursts']], headers='keys', tablefmt='plain', showindex=False))
print(tabulate(result_df[['Frame ID','All Possible Bursts', 'Matching Bursts']], headers='keys', tablefmt='plain', showindex=False))
else:
print(tabulate(df[['Frame ID','All Possible Bursts Count', 'Matching Bursts Count']], headers='keys', tablefmt='plain', showindex=False))
print(tabulate(result_df[['Frame ID','All Possible Bursts Count', 'Matching Bursts Count']], headers='keys', tablefmt='plain', showindex=False))

else:
logging.error(f"Arguments for for --product '{args.product}' missing or not invalid.")
106 changes: 100 additions & 6 deletions report/opera-validator/opv_disp_s1.py
@@ -1,14 +1,108 @@
from collections import defaultdict
import logging
import requests
import re
import sys
import pandas as pd

from opv_util import retrieve_r3_products, BURST_AND_DATE_GRANULE_PATTERN
from opv_util import retrieve_r3_products, BURST_AND_DATE_GRANULE_PATTERN, get_granules_from_query
from data_subscriber import es_conn_util
from data_subscriber.cslc_utils import parse_cslc_native_id, localize_disp_frame_burst_hist

_DISP_S1_INDEX_PATTERNS = "grq_v*_l3_disp_s1*"

def validate_disp_s1(smallest_date, greatest_date, endpoint, df, logger):
def get_frame_to_dayindex_to_granule(granule_ids, frames_to_validate, burst_to_frames, frame_to_bursts):
"""
Builds a mapping of frame ID -> acquisition day index -> granule IDs. Looks something like:
{8889:
    {0:  ['S1B_IW_SLC__1SDV_20210701T235959_20210702T000026_027000_033D7D_1', 'S1B_IW_SLC__1SDV_20210701T235959_20210702T000026_027000_033D7D_1'],
     12: ['S1B_IW_SLC__1SDV_20210701T235959_20210702T000026_027000_033D7D_1', 'S1B_IW_SLC__1SDV_20210701T235959_20210702T000026_027000_033D7D_1']},
 8890:
    {0:  ['S1B_IW_SLC__1SDV_20210701T235959_20210702T000026_027000_033D7D_1', 'S1B_IW_SLC__1SDV_20210701T235959_20210702T000026_027000_033D7D_1'],
     24: ['S1B_IW_SLC__1SDV_20210701T235959_20210702T000026_027000_033D7D_1', 'S1B_IW_SLC__1SDV_20210701T235959_20210702T000026_027000_033D7D_1']}}
"""

# Create a map of frame IDs to acquisition day index to the granule ID
frame_to_dayindex_to_granule = defaultdict(lambda: defaultdict(set))
for granule_id in granule_ids:
burst_id, acquisition_dts, acquisition_cycles, frame_ids = parse_cslc_native_id(granule_id, burst_to_frames,
frame_to_bursts)
for frame_id in frame_ids:

# 1. If the frame does not show up in the database file, skip it
if frame_id not in frames_to_validate:
logging.debug(f"Frame ID {frame_id} is not in the list of frames to validate. Skipping.")
continue

# 2. If the acquisition cycle is not in the database file, skip it
acq_cycle = acquisition_cycles[frame_id]
if acq_cycle < 0 or \
acq_cycle < frame_to_bursts[frame_id].sensing_datetime_days_index[-1] and acq_cycle not in frame_to_bursts[frame_id].sensing_datetime_days_index:
logging.debug(f"Frame ID {frame_id} has no acquisition cycle {acq_cycle} in the database file. Skipping.")
continue

frame_to_dayindex_to_granule[frame_id][acq_cycle].add(granule_id)

return frame_to_dayindex_to_granule
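To make the acquisition-cycle filter above concrete, here is a minimal, runnable sketch with toy values (the real sensing_datetime_days_index values come from the DISP-S1 burst-history database; these numbers are invented for illustration):

    class _ToyFrame:
        # Invented historical day indices; real values come from localize_disp_frame_burst_hist()
        sensing_datetime_days_index = [0, 12, 24, 36]

    def _acq_cycle_ok(acq_cycle, frame):
        # Mirrors the skip logic above: reject negative cycles, and cycles that fall
        # before the last recorded day index without being one of the recorded indices.
        if acq_cycle < 0:
            return False
        if acq_cycle < frame.sensing_datetime_days_index[-1] and \
                acq_cycle not in frame.sensing_datetime_days_index:
            return False
        return True

    print(_acq_cycle_ok(12, _ToyFrame))   # True  -> recorded historical cycle, keep
    print(_acq_cycle_ok(18, _ToyFrame))   # False -> inside history but unrecorded, skip
    print(_acq_cycle_ok(48, _ToyFrame))   # True  -> beyond the last recorded index, keep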

def filter_for_trigger_frame(frame_to_dayindex_to_granule, frame_to_bursts, burst_to_frames):
'''
Given a dictionary of frame IDs to day indices to granule IDs, keep only the frame/day-index pairs that should trigger a DISP-S1 job.
A frame at a given day index is triggered if the burst IDs of its granules cover (form a superset of) the burst IDs listed for that frame in the database.
This function only removes entries: any day index that does not meet the criterion is dropped.
WARNING! The input dictionary is modified in place; it is also returned for convenience.
'''

for frame_id in frame_to_dayindex_to_granule:
for day_index in list(frame_to_dayindex_to_granule[frame_id].keys()):
granule_ids = frame_to_dayindex_to_granule[frame_id][day_index]
burst_set = set()
for granule_id in granule_ids:
burst_id, _, _, _ = parse_cslc_native_id(granule_id, burst_to_frames, frame_to_bursts)
burst_set.add(burst_id)
if burst_set.issuperset(frame_to_bursts[frame_id].burst_ids):
continue
else:
frame_to_dayindex_to_granule[frame_id].pop(day_index)

# If the frame has no day indices left, remove it completely
for frame_id in list(frame_to_dayindex_to_granule.keys()):
if len(frame_to_dayindex_to_granule[frame_id].keys()) == 0:
frame_to_dayindex_to_granule.pop(frame_id)

return frame_to_dayindex_to_granule
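A self-contained illustration of the triggering rule above (the burst IDs are made up; in the validator they come from parse_cslc_native_id and the frame database):

    # A frame/day-index pair survives only when the bursts observed for it cover
    # every burst the database lists for that frame.
    expected_bursts = {"T042-088913-IW1", "T042-088913-IW2", "T042-088913-IW3"}

    observed_complete = {"T042-088913-IW1", "T042-088913-IW2", "T042-088913-IW3"}
    observed_partial = {"T042-088913-IW1", "T042-088913-IW2"}

    print(observed_complete.issuperset(expected_bursts))   # True  -> day index kept, would trigger
    print(observed_partial.issuperset(expected_bursts))    # False -> day index removed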
def validate_disp_s1(start_date, end_date, timestamp, input_endpoint, output_endpoint, disp_s1_frames_only, shortname='OPERA_L2_CSLC-S1_V1'):

# Process the disp s1 consistent database file
frame_to_bursts, burst_to_frames, _ = localize_disp_frame_burst_hist()

# If a frame list is provided, restrict validation to those frames; otherwise validate all frames DISP-S1 is supposed to process.
if disp_s1_frames_only is not None:
frames_to_validate = set([int(f) for f in disp_s1_frames_only.split(',')])
else:
frames_to_validate = set(frame_to_bursts.keys())

granules = get_granules_from_query(start=start_date, end=end_date, timestamp=timestamp, endpoint=input_endpoint, provider="ASF",
shortname=shortname)
if (granules):
granule_ids = [granule.get("umm").get("GranuleUR") for granule in granules]
else:
logging.error("Problem querying for granules. Unable to proceed.")
sys.exit(1)

print(granule_ids)
frame_to_dayindex_to_granule = get_frame_to_dayindex_to_granule(granule_ids, frames_to_validate, burst_to_frames, frame_to_bursts)


for frame_id in frame_to_dayindex_to_granule:
print(frame_id)
print(frame_to_dayindex_to_granule[frame_id])

# Determine which frame/day-index pairs were supposed to have been processed, and remove any that were not.
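The follow-on comparison against actually produced DISP-S1 products is not implemented in this commit; purely as a sketch of one possible shape (the produced_pairs input, and how it would be derived from an output-side query, are assumptions rather than part of this change):

    # Hypothetical follow-on step: given the expected mapping built above
    # ({frame_id: {day_index: {granule_ids}}}) and a set of (frame_id, day_index)
    # pairs actually observed at the output endpoint, report the gaps.
    def summarize_missing(expected, produced_pairs):
        missing = []
        for frame_id, by_day in expected.items():
            for day_index in by_day:
                if (frame_id, day_index) not in produced_pairs:
                    missing.append((frame_id, day_index))
        return sorted(missing)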


def validate_disp_s1_with_products(smallest_date, greatest_date, endpoint, df, logger):
"""
Validates that the granules from the CMR query are accurately reflected in the DataFrame provided.
It extracts granule information based on the input dates and checks which granules are missing from the DataFrame.
@@ -38,22 +132,22 @@ def validate_disp_s1(smallest_date, greatest_date, endpoint, df, logger):

all_granules = retrieve_r3_products(smallest_date, greatest_date, endpoint, 'OPERA_L2_CSLC-S1_V1')

es_util = es_conn_util.get_es_connection(logger)
#es_util = es_conn_util.get_es_connection(logger)

try:
# Extract MGRS tiles and create the mapping to InputGranules
available_cslc_bursts = []
for item in all_granules:
print(item)

ccslc = es_util.query(
'''ccslc = es_util.query(
index=_DISP_S1_INDEX_PATTERNS,
body={"query": {"bool": {"must": [
{"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}},
{"term": {"metadata.frame_id": frame_id}}
]}}})
]}}})'''

#input_granules = item['umm']['InputGranules']
input_granules = item['umm']['InputGranules']

# Extract the granule burst ID from the full path
for path in input_granules:
77 changes: 76 additions & 1 deletion report/opera-validator/opv_util.py
@@ -1,5 +1,10 @@
import time
import random
import sys
import re
import concurrent.futures
import multiprocessing
import tqdm
import requests
import logging
from datetime import datetime, timedelta
@@ -269,4 +274,74 @@ def get_burst_ids_from_file(filename):
else:
print(f"\nWarning: Could not extract burst information from malformed granule ID {granule_id}.")

return burst_ids, burst_dates
return burst_ids, burst_dates


def get_granules_from_query(start, end, timestamp, endpoint, provider='ASF', shortname='OPERA_L2_RTC-S1_V1'):
"""
Fetches granule metadata from the CMR API within a specified temporal range using parallel requests.
:start: Start time in ISO 8601 format.
:end: End time in ISO 8601 format.
:timestamp: Type of timestamp to filter granules (e.g., 'TEMPORAL', 'PRODUCTION').
:endpoint: CMR API endpoint ('OPS' or 'UAT').
:provider: Data provider ID (default 'ASF').
:shortname: Short name of the product (default 'OPERA_L2_RTC-S1_V1').
:return: List of granule metadata.
"""

granules = []

base_url, params = generate_url_params(start=start, end=end, timestamp_type=timestamp, endpoint=endpoint,
provider=provider, short_name=shortname)

# Construct the URL for the total granules query
total_granules = get_total_granules(base_url, params)
print(f"Total granules: {total_granules}")
print(f"Querying CMR for time range {start} to {end}.")

# Exit with error code if no granules to process
if (total_granules == 0):
print(f"Error: no granules to process.")
sys.exit(1)

# Optimize page_size and number of workers based on total_granules
page_size = min(1000, total_granules)

# Initialize progress bar
tqdm.tqdm._instances.clear() # Clear any existing tqdm instances
print()

# Main loop to fetch granules, update progress bar, and extract burst_ids
with tqdm.tqdm(total=total_granules, desc="Fetching granules", position=0) as pbar_global:
downloaded_batches = multiprocessing.Value('i', 0) # For counting downloaded batches
total_batches = (total_granules + page_size - 1) // page_size

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# NOTE: parallelized workers beyond 5 not working well, but here's some code to make it work in the future
# max_workers = min(5, (total_granules + page_size - 1) // page_size)
# futures = [executor.submit(parallel_fetch, base_url, params, page_num, page_size, downloaded_batches, total_batches) for page_num in range(1, total_batches + 1)]
futures = []
for page_num in range(1, total_batches + 1):
future = executor.submit(parallel_fetch, base_url, params, page_num, page_size, downloaded_batches)
futures.append(future)
random_delay = random.uniform(0, 0.1)
time.sleep(random_delay) # Stagger the submission of function calls for CMR optimization
logging.debug(f"Scheduled granule fetch for batch {page_num}")

for future in concurrent.futures.as_completed(futures):
granules_result = future.result()
pbar_global.update(len(granules_result))

granules.extend(granules_result)

print("\nGranule fetching complete.")

# Integrity check for total granules
total_downloaded = sum(len(future.result()) for future in futures)
if total_downloaded != total_granules:
print(
f"\nError: Expected {total_granules} granules, but downloaded {total_downloaded}. Try running again after some delay.")
sys.exit(1)

return granules
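For orientation, a hedged usage sketch of the relocated helper (the time range and endpoint below are placeholders; the GranuleUR extraction mirrors how validate_disp_s1 consumes the result):

    # Illustrative only -- the dates and endpoint below are placeholders.
    granules = get_granules_from_query(start="2024-07-01T00:00:00Z", end="2024-07-08T00:00:00Z",
                                       timestamp="TEMPORAL", endpoint="OPS",
                                       provider="ASF", shortname="OPERA_L2_CSLC-S1_V1")
    granule_ids = [g.get("umm").get("GranuleUR") for g in granules]

    # The page batching above is a ceiling division, e.g. 2,350 granules with a
    # page size of 1,000 -> (2350 + 999) // 1000 == 3 batches.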
34 changes: 30 additions & 4 deletions report/opera-validator/test_opera_validator.py

Large diffs are not rendered by default.
