Skip to content

Commit

Permalink
feat(audit): query improvements
Browse files Browse the repository at this point in the history
query version 1.1 for CSLC. use temporal time instead of native-id query.

Refs #1041
  • Loading branch information
chrisjrd committed Dec 13, 2024
1 parent d3f5ee3 commit 88ef9a9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 22 deletions.
41 changes: 28 additions & 13 deletions tools/ops/cmr_audit/cmr_audit_hls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@
import os
import re
import sys
import urllib.parse
from collections import defaultdict
from typing import Union, Iterable

import aiohttp
import more_itertools
from dotenv import dotenv_values
from more_itertools import always_iterable


from tools.ops.cmr_audit.cmr_audit_utils import async_get_cmr_granules, get_cmr_audit_granules

Expand All @@ -20,7 +24,7 @@
format="%(levelname)7s: %(relativeCreated)7d %(name)s:%(filename)s:%(funcName)s:%(lineno)s - %(message)s", # alternative format which displays time elapsed.
# format="%(asctime)s %(levelname)7s %(name)4s:%(filename)8s:%(funcName)22s:%(lineno)3s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.DEBUG)
level=logging.INFO)
logger = logging.getLogger(__name__)

config = {
Expand Down Expand Up @@ -88,35 +92,46 @@ async def async_get_cmr_granules_hls_s30(temporal_date_start: str, temporal_date
platform_short_name=["Sentinel-2A", "Sentinel-2B"])


async def async_get_cmr_dswx(rtc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str):
    """
    Query CMR for DSWx-HLS granules.

    Thin wrapper around ``async_get_cmr`` that pins the collection short name to
    ``OPERA_L3_DSWX-HLS_PROVISIONAL_V1``.

    :param rtc_native_id_patterns: native ID patterns forwarded unchanged to ``async_get_cmr``.
        NOTE(review): the ``rtc_`` prefix looks like a copy-paste leftover; the name is kept for caller compatibility.
    :param temporal_date_start: temporal start date, forwarded to ``async_get_cmr``.
    :param temporal_date_end: temporal end date, forwarded to ``async_get_cmr``.
    :return: whatever ``async_get_cmr`` returns for this collection.
    """
    dswx_granules = await async_get_cmr(
        rtc_native_id_patterns,
        collection_short_name="OPERA_L3_DSWX-HLS_PROVISIONAL_V1",
        temporal_date_start=temporal_date_start,
        temporal_date_end=temporal_date_end,
    )
    return dswx_granules


async def async_get_cmr_dswx(dswx_native_id_patterns: set):
logger.debug(f"entry({len(dswx_native_id_patterns)=:,})")
async def async_get_cmr(
native_id_patterns: set,
collection_short_name: Union[str, Iterable[str]],
temporal_date_start: str, temporal_date_end: str,
chunk_size=1000
):
logger.debug(f"entry({len(native_id_patterns)=:,})")

# batch granules-requests due to CMR limitation. 1000 native-id clauses seems to be near the limit.
dswx_native_id_patterns = more_itertools.always_iterable(dswx_native_id_patterns)
dswx_native_id_pattern_batches = list(more_itertools.chunked(dswx_native_id_patterns, 1000)) # 1000 == 55,100 length
native_id_patterns = more_itertools.always_iterable(native_id_patterns)
native_id_pattern_batches = list(more_itertools.chunked(native_id_patterns, chunk_size)) # 1000 == 55,100 length

request_url = "https://cmr.earthdata.nasa.gov/search/granules.umm_json"

sem = asyncio.Semaphore(15)
async with aiohttp.ClientSession() as session:
post_cmr_tasks = []
for i, dswx_native_id_pattern_batch in enumerate(dswx_native_id_pattern_batches, start=1):
dswx_native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(dswx_native_id_pattern_batch)
for i, native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1):
# native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(native_id_pattern_batch)

request_body = (
"provider=POCLOUD"
"&short_name[]=OPERA_L3_DSWX-HLS_PROVISIONAL_V1"
f'{"&short_name[]=" + "&short_name[]=".join(always_iterable(collection_short_name))}'
"&options[native-id][pattern]=true"
f"{dswx_native_id_patterns_query_params}"
# f"{native_id_patterns_query_params}"
f"&temporal[]={urllib.parse.quote(temporal_date_start, safe='/:')},{urllib.parse.quote(temporal_date_end, safe='/:')}"
)
logger.debug(f"Creating request task {i} of {len(dswx_native_id_pattern_batches)}")
logger.debug(f"Creating request task {i} of {len(native_id_pattern_batches)}")
post_cmr_tasks.append(get_cmr_audit_granules(request_url, request_body, session, sem))
break
logger.debug(f"Number of requests to make: {len(post_cmr_tasks)=}")

# issue requests in batches
logger.debug("Batching tasks")
dswx_granules = set()
cmr_granules = set()
task_chunks = list(more_itertools.chunked(post_cmr_tasks, len(post_cmr_tasks))) # CMR recommends 2-5 threads.
for i, task_chunk in enumerate(task_chunks, start=1):
logger.info(f"Processing batch {i} of {len(task_chunks)}")
Expand All @@ -125,8 +140,8 @@ async def async_get_cmr_dswx(dswx_native_id_patterns: set):
await asyncio.gather(*task_chunk, return_exceptions=False)
)
for post_cmr_tasks_result in post_cmr_tasks_results:
dswx_granules.update(post_cmr_tasks_result[0])
return dswx_granules
cmr_granules.update(post_cmr_tasks_result[0])
return cmr_granules


def hls_granule_ids_to_dswx_native_id_patterns(cmr_granules: set[str], input_to_outputs_map: defaultdict, output_to_inputs_map: defaultdict):
Expand Down
17 changes: 8 additions & 9 deletions tools/ops/cmr_audit/cmr_audit_slc.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,24 @@ async def async_get_cmr_granules_slc_s1b(temporal_date_start: str, temporal_date


async def async_get_cmr_cslc(cslc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str):
    """
    Query CMR for OPERA CSLC-S1 granules.

    Thin wrapper around ``async_get_cmr`` that pins the collection short name to
    ``OPERA_L2_CSLC-S1_V1`` and requests batches of 100 native ID patterns.

    :param cslc_native_id_patterns: native ID patterns forwarded unchanged to ``async_get_cmr``.
    :param temporal_date_start: temporal start date. Corresponds to query param `&temporal[]=<start>,<end>`
    :param temporal_date_end: temporal end date. Corresponds to query param `&temporal[]=<start>,<end>`
    :return: whatever ``async_get_cmr`` returns for this collection.
    """
    # The collection is selected by short name only; no collection concept ID is passed.
    return await async_get_cmr(
        cslc_native_id_patterns,
        collection_short_name="OPERA_L2_CSLC-S1_V1",
        temporal_date_start=temporal_date_start,
        temporal_date_end=temporal_date_end,
        chunk_size=100,
    )


async def async_get_cmr_rtc(rtc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str):
    """
    Query CMR for OPERA RTC-S1 granules.

    Thin wrapper around ``async_get_cmr`` that pins the collection short name to
    ``OPERA_L2_RTC-S1_V1`` and requests batches of 100 native ID patterns.

    :param rtc_native_id_patterns: native ID patterns forwarded unchanged to ``async_get_cmr``.
    :param temporal_date_start: temporal start date. Corresponds to query param `&temporal[]=<start>,<end>`
    :param temporal_date_end: temporal end date. Corresponds to query param `&temporal[]=<start>,<end>`
    :return: whatever ``async_get_cmr`` returns for this collection.
    """
    # The collection is selected by short name only; no collection concept ID is passed.
    return await async_get_cmr(
        rtc_native_id_patterns,
        collection_short_name="OPERA_L2_RTC-S1_V1",
        temporal_date_start=temporal_date_start,
        temporal_date_end=temporal_date_end,
        chunk_size=100,
    )


async def async_get_cmr(
native_id_patterns: set,
collection_short_name: Union[str, Iterable[str]],
collection_concept_id: str,
temporal_date_start: str, temporal_date_end: str,
chunk_size=1000): # 1000 ~= 55,100 length
"""
Issue CMR query requests.
:param native_id_patterns: the native ID patterns to use in the query. Corresponds to query param `&native-id[]`. Allows use of wildcards "*" and "?", but is discouraged.
:param collection_short_name: CMR collection short name. Typically found in PCM's settings.yaml
:param collection_concept_id: CMR collection concept ID for faster queries.
:param temporal_date_start: temporal start date. Corresponds to query param `&temporal[]=<start>,<end>`
:param temporal_date_end: temporal end date. Corresponds to query param `&temporal[]=<start>,<end>`
:param chunk_size: split queries across N native-id patterns per request. CMR request bodies have an implicit size limit of 55,100 length. Must be a value in the interval [1,1000].
Expand All @@ -130,21 +128,22 @@ async def async_get_cmr(
sem = asyncio.Semaphore(15)
async with aiohttp.ClientSession() as session:
post_cmr_tasks = []
for i, rtc_native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1):
native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(rtc_native_id_pattern_batch)
for i, native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1):
# native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(native_id_pattern_batch)

request_body = (
"provider=ASF"
f'{"&short_name[]=" + "&short_name[]=".join(always_iterable(collection_short_name))}'
"&platform[]=Sentinel-1A"
"&platform[]=Sentinel-1B"
"&bounding_box=-180,-60,180,90"
"&options[native-id][pattern]=true"
f"{native_id_patterns_query_params}"
# "&options[native-id][pattern]=true"
# f"{native_id_patterns_query_params}"
f"&temporal[]={urllib.parse.quote(temporal_date_start, safe='/:')},{urllib.parse.quote(temporal_date_end, safe='/:')}"
)
logger.debug(f"Creating request task {i} of {len(native_id_pattern_batches)}")
post_cmr_tasks.append(get_cmr_audit_granules(request_url, request_body, session, sem))
break
logger.debug(f"Number of requests to make: {len(post_cmr_tasks)=}")

# issue requests in batches
Expand Down Expand Up @@ -184,7 +183,7 @@ def slc_granule_ids_to_cslc_native_id_patterns(cmr_granules: set[str], input_to_
cslc_acquisition_dt_str = m.group("start_ts")

# OPERA_L2_CSLC-S1_*_20231124T124529Z_*_S1*
rtc_native_id_pattern = f'OPERA_L2_CSLC-S1_*_{cslc_acquisition_dt_str}Z_*_S1*'
rtc_native_id_pattern = f'OPERA_L2_CSLC-S1_*_{cslc_acquisition_dt_str}Z_*_S1*v1.1'
rtc_native_id_patterns.add(rtc_native_id_pattern)

# bi-directional mapping of HLS-DSWx inputs and outputs
Expand Down

0 comments on commit 88ef9a9

Please sign in to comment.