Merge pull request #1022 from nasa/nisar_gcov_query_support
L2_NISAR_GCOV Query Support and Logging Improvements
hhlee445 authored Dec 11, 2024
2 parents 7b6e27e + 555b277 commit c98db6a
Showing 42 changed files with 710 additions and 713 deletions.
8 changes: 8 additions & 0 deletions .gitignore
@@ -1,3 +1,11 @@
######################################################################
# HySDS
######################################################################

celeryconfig.py
_alt_error.txt
_alt_traceback.txt

######################################################################
# Terraform
######################################################################
43 changes: 41 additions & 2 deletions commons/logger.py
@@ -2,6 +2,8 @@

from enum import Enum

import boto3

# set logger and custom filter to handle being run from sciflo
log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO)
@@ -43,12 +45,39 @@ def filter(self, record):
record.id = "--"
return True


logger = logging.getLogger("opera_pcm")
logger.setLevel(logging.INFO)
logger.addFilter(LogFilter())


logger_initialized = False
def get_logger(verbose=False, quiet=False):
global logger_initialized

if not logger_initialized:

if verbose:
log_level = LogLevels.DEBUG.value
elif quiet:
log_level = LogLevels.WARNING.value
else:
log_level = LogLevels.INFO.value

if verbose:
log_format = '[%(asctime)s: %(levelname)s/%(module)s:%(funcName)s:%(lineno)d] %(message)s'
else:
log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s"

logging.basicConfig(level=log_level, format=log_format, force=True)

logger.addFilter(NoLogUtilsFilter())

logger_initialized = True
logger.info("Initial logging configuration complete")
logger.info("Log level set to %s", log_level)

return logger


class NoLogUtilsFilter(logging.Filter):

"""Filters out large JSON output of HySDS internals. Apply to any logger (typically __main__) or its
@@ -91,3 +120,13 @@ def filter(self, record):
and "/containers/_doc/" not in record.getMessage() \
and "/_search?" not in record.getMessage() \
and "/_update" not in record.getMessage()


def configure_library_loggers():
logger_hysds_commons = logging.getLogger("hysds_commons")
logger_hysds_commons.addFilter(NoJobUtilsFilter())

logger_elasticsearch = logging.getLogger("elasticsearch")
logger_elasticsearch.addFilter(NoBaseFilter())

boto3.set_stream_logger(name='botocore.credentials', level=logging.ERROR)
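
Not part of this commit: a minimal usage sketch of the new logging helpers above, assuming commons.logger is importable and that the verbose/quiet flags come from the caller (e.g. argparse options):

from commons.logger import get_logger, configure_library_loggers

# The first call configures handlers via logging.basicConfig(force=True);
# later calls return the same "opera_pcm" logger without reconfiguring.
logger = get_logger(verbose=True)

# Attach filters that quiet hysds_commons, elasticsearch, and botocore credential chatter.
configure_library_loggers()

logger.debug("Emitted because verbose=True selects the DEBUG level")
logger.info("Subsequent get_logger() calls reuse the existing configuration")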
64 changes: 31 additions & 33 deletions data_subscriber/asf_cslc_download.py
@@ -1,5 +1,4 @@
import copy
import logging
import os
import urllib.parse
from datetime import datetime, timezone
@@ -24,7 +23,6 @@
from data_subscriber.cslc.cslc_dependency import CSLCDependency
from data_subscriber.cslc.cslc_blackout import DispS1BlackoutDates, localize_disp_blackout_dates

logger = logging.getLogger(__name__)

_C_CSLC_ES_INDEX_PATTERNS = "grq_1_l2_cslc_s1_compressed*"

@@ -41,7 +39,7 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo

settings = SettingsConf().cfg
product_id = "_".join([batch_id for batch_id in args.batch_ids])
logger.info(f"{product_id=}")
self.logger.info(f"{product_id=}")
cslc_s3paths = []
c_cslc_s3paths = []
cslc_static_s3paths = []
@@ -61,7 +59,7 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo
new_args = copy.deepcopy(args)

for batch_id in args.batch_ids:
logger.info(f"Downloading CSLC files for batch {batch_id}")
self.logger.info(f"Downloading CSLC files for batch {batch_id}")

# Determine the highest acquisition cycle index here for later use in retrieving m compressed CSLCs
_, acq_cycle_index = split_download_batch_id(batch_id)
@@ -75,7 +73,7 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo
cslc_products_to_filepaths: dict[str, set[Path]] = super().run_download(
new_args, token, es_conn, netloc, username, password, cmr, job_id, rm_downloads_dir=False
)
logger.info(f"Uploading CSLC input files to S3")
self.logger.info(f"Uploading CSLC input files to S3")
cslc_files_to_upload = [fp for fp_set in cslc_products_to_filepaths.values() for fp in fp_set]
cslc_s3paths.extend(concurrent_s3_client_try_upload_file(bucket=settings["DATASET_BUCKET"],
key_prefix=f"tmp/disp_s1/{batch_id}",
@@ -88,7 +86,7 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo

# For s3 we can use the files directly so simply copy over the paths
else: # s3 or auto
logger.info("Skipping download CSLC bursts and instead using ASF S3 paths for direct SCIFLO PGE ingestion")
self.logger.info("Skipping download CSLC bursts and instead using ASF S3 paths for direct SCIFLO PGE ingestion")
downloads = self.get_downloads(args, es_conn)
cslc_s3paths = [download["s3_url"] for download in downloads]
if len(cslc_s3paths) == 0:
@@ -104,9 +102,9 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo

try:
head_object = boto3.client("s3").head_object(Bucket=bucket, Key=key)
logger.info(f"Adding CSLC file: {p}")
self.logger.info(f"Adding CSLC file: {p}")
except Exception as e:
logger.error("Failed when accessing the S3 object:" + p)
self.logger.error("Failed when accessing the S3 object:" + p)
raise e
file_size = int(head_object["ContentLength"])

@@ -125,27 +123,27 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo
to_mark_downloaded.append((unique_id, file_size))
burst_id_set.add(burst_id)

logger.info(f"Querying CSLC-S1 Static Layer products for {batch_id}")
self.logger.info(f"Querying CSLC-S1 Static Layer products for {batch_id}")
cslc_static_granules = self.query_cslc_static_files_for_cslc_batch(
cslc_files_to_upload, args, token, job_id, settings
)

# Download the files from ASF only if the transfer protocol is HTTPS
if args.transfer_protocol == "https":
logger.info(f"Downloading CSLC Static Layer products for {batch_id}")
self.logger.info(f"Downloading CSLC Static Layer products for {batch_id}")
cslc_static_products_to_filepaths: dict[str, set[Path]] = self.download_cslc_static_files_for_cslc_batch(
cslc_static_granules, args, token, netloc,
username, password, job_id
)

logger.info("Uploading CSLC Static input files to S3")
self.logger.info("Uploading CSLC Static input files to S3")
cslc_static_files_to_upload = [fp for fp_set in cslc_static_products_to_filepaths.values() for fp in fp_set]
cslc_static_s3paths.extend(concurrent_s3_client_try_upload_file(bucket=settings["DATASET_BUCKET"],
key_prefix=f"tmp/disp_s1/{batch_id}",
files=cslc_static_files_to_upload))
# For s3 we can use the files directly so simply copy over the paths
else: # s3 or auto
logger.info("Skipping download CSLC static files and instead using ASF S3 paths for direct SCIFLO PGE ingestion")
self.logger.info("Skipping download CSLC static files and instead using ASF S3 paths for direct SCIFLO PGE ingestion")

cslc_static_products_to_filepaths = {} # Dummy when trying to delete files later in this function

@@ -159,11 +157,11 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo

# Download all Ionosphere files corresponding to the dates covered by the input CSLC set
# We always download ionosphere files, there is no direct S3 ingestion option
logger.info(f"Downloading Ionosphere files for {batch_id}")
self.logger.info(f"Downloading Ionosphere files for {batch_id}")
ionosphere_paths = self.download_ionosphere_files_for_cslc_batch(cslc_files_to_upload,
self.downloads_dir)

logger.info(f"Uploading Ionosphere files to S3")
self.logger.info(f"Uploading Ionosphere files to S3")
# TODO: since all ionosphere files now go to the same S3 location,
# it should be possible to do a lookup before redownloading a file
ionosphere_s3paths.extend(concurrent_s3_client_try_upload_file(bucket=settings["DATASET_BUCKET"],
@@ -172,7 +170,7 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo

# Delete the files from the file system after uploading to S3
if rm_downloads_dir:
logger.info("Removing downloaded files from local filesystem")
self.logger.info("Removing downloaded files from local filesystem")
for fp_set in cslc_products_to_filepaths.values():
for fp in fp_set:
os.remove(fp)
@@ -186,7 +184,7 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo

# Determine M Compressed CSLCs by querying compressed cslc GRQ ES -------------->
k, m = es_conn.get_k_and_m(args.batch_ids[0])
logger.info(f"{k=}, {m=}")
self.logger.info(f"{k=}, {m=}")

cslc_dependency = CSLCDependency(k, m, self.disp_burst_map, args, token, cmr, settings, self.blackout_dates_obj)

@@ -197,14 +195,14 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo
for ccslc in ccslcs:
cslc_path = ccslc["_source"]["metadata"]["product_s3_paths"]
c_cslc_s3paths.extend(cslc_path)
logger.info(f"Adding {cslc_path} to c_cslc_s3paths")
self.logger.info(f"Adding {cslc_path} to c_cslc_s3paths")

# Now acquire the Ionosphere files for the reference dates of the Compressed CSLC products
logger.info(f"Downloading Ionosphere files for Compressed CSLCs")
self.logger.info(f"Downloading Ionosphere files for Compressed CSLCs")
ionosphere_paths = self.download_ionosphere_files_for_cslc_batch(c_cslc_s3paths,
self.downloads_dir)

logger.info(f"Uploading Ionosphere files for Compressed CSLCs to S3")
self.logger.info(f"Uploading Ionosphere files for Compressed CSLCs to S3")
ionosphere_s3paths.extend(concurrent_s3_client_try_upload_file(bucket=settings["DATASET_BUCKET"],
key_prefix=f"tmp/disp_s1/ionosphere",
files=ionosphere_paths))
@@ -218,12 +216,12 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo
print(f'{bounding_box=}')

# Now submit DISP-S1 SCIFLO job
logger.info(f"Submitting DISP-S1 SCIFLO job")
self.logger.info(f"Submitting DISP-S1 SCIFLO job")

save_compressed_cslc = False
if cslc_dependency.determine_k_cycle(None, latest_acq_cycle_index, frame_id) == 0:
save_compressed_cslc = True
logger.info(f"{save_compressed_cslc=}")
self.logger.info(f"{save_compressed_cslc=}")

product = {
"_id": product_id,
@@ -284,7 +282,7 @@ def get_downloads(self, args, es_conn):
Granules are stored in either cslc_catalog or k_cslc_catalog index. We assume that the latest batch_id (defined
as the one with the greatest acq_cycle_index) is stored in cslc_catalog. The rest are stored in k_cslc_catalog.'''

k_es_conn = KCSLCProductCatalog(logging.getLogger(__name__))
k_es_conn = KCSLCProductCatalog(self.logger)

# Sort the batch_ids by acq_cycle_index
batch_ids = sorted(args.batch_ids, key = lambda batch_id: split_download_batch_id(batch_id)[1])
@@ -293,25 +291,25 @@ def get_downloads(self, args, es_conn):

# Historical mode stores all granules in normal cslc_catalog
if "proc_mode" in args and args.proc_mode == "historical":
logger.info("Downloading cslc files for historical mode")
self.logger.info("Downloading cslc files for historical mode")
for batch_id in batch_ids:
downloads = es_conn.get_download_granule_revision(batch_id)
logger.info(f"Got {len(downloads)=} cslc downloads for {batch_id=}")
self.logger.info(f"Got {len(downloads)=} cslc downloads for {batch_id=}")
assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!"
all_downloads.extend(downloads)

# Forward and reprocessing modes store all granules in k_cslc_catalog
else:
logger.info("Downloading cslc files for forward/reprocessing mode")
self.logger.info("Downloading cslc files for forward/reprocessing mode")
downloads = es_conn.get_download_granule_revision(batch_ids[-1])
logger.info(f"Got {len(downloads)=} cslc granules downloads for batch_id={batch_ids[-1]}")
self.logger.info(f"Got {len(downloads)=} cslc granules downloads for batch_id={batch_ids[-1]}")
assert len(downloads) > 0, f"No downloads found for batch_id={batch_ids[-1]}!"
all_downloads.extend(downloads)

# Download K-CSLC granules
for batch_id in batch_ids[:-1]:
downloads = k_es_conn.get_download_granule_revision(batch_id)
logger.info(f"Got {len(downloads)=} k cslc downloads for {batch_id=}")
self.logger.info(f"Got {len(downloads)=} k cslc downloads for {batch_id=}")
assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!"
all_downloads.extend(downloads)

@@ -337,11 +335,11 @@ def query_cslc_static_files_for_cslc_batch(self, cslc_files, args, token, job_id
cslc_query_args.max_revision = 1000
cslc_query_args.proc_mode = "forward" # CSLC static query does not care about
# this, but a value must be provided
es_conn = CSLCStaticProductCatalog(logging.getLogger(__name__))
es_conn = CSLCStaticProductCatalog(self.logger)

cmr_query = CslcStaticCmrQuery(cslc_query_args, token, es_conn, cmr, job_id, settings)

result = cmr_query.run_query(cslc_query_args, token, es_conn, cmr, job_id, settings)
result = cmr_query.run_query()

return result["download_granules"]

@@ -351,7 +349,7 @@ def download_cslc_static_files_for_cslc_batch(self, cslc_static_granules, args,
session = SessionWithHeaderRedirection(username, password, netloc)

downloads = []
es_conn = CSLCStaticProductCatalog(logging.getLogger(__name__))
es_conn = CSLCStaticProductCatalog(self.logger)

for cslc_static_granule in cslc_static_granules:
download_dict = {
@@ -384,13 +382,13 @@ def download_ionosphere_files_for_cslc_batch(self, cslc_files, download_dir):
downloaded_ionosphere_files = set()

for cslc_file in cslc_files:
logger.info(f'Downloading Ionosphere file for CSLC granule {cslc_file}')
self.logger.info(f'Downloading Ionosphere file for CSLC granule {cslc_file}')

cslc_file_tokens = cslc_file.split('_')
acq_datetime = cslc_file_tokens[4]
acq_date = acq_datetime.split('T')[0]

logger.debug(f'{acq_date=}')
self.logger.debug(f'{acq_date=}')

if acq_date not in downloaded_ionosphere_dates:
ionosphere_filepath = ionosphere_download.download_ionosphere_correction_file(
@@ -400,7 +398,7 @@ def download_ionosphere_files_for_cslc_batch(self, cslc_files, download_dir):
downloaded_ionosphere_dates.add(acq_date)
downloaded_ionosphere_files.add(ionosphere_filepath)
else:
logger.info(f'Already downloaded Ionosphere file for date {acq_date}, skipping...')
self.logger.info(f'Already downloaded Ionosphere file for date {acq_date}, skipping...')

return downloaded_ionosphere_files

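This file's changes replace the module-level logger with self.logger throughout the downloader. A sketch of how such an instance logger could be wired in (an assumption for illustration only; the actual base class, DaacDownload, is not shown in this diff):

from commons.logger import get_logger

class ExampleDownload:  # hypothetical stand-in for the real downloader class
    def __init__(self, logger=None):
        # Fall back to the shared "opera_pcm" logger when none is injected,
        # so every method can call self.logger.info(...) as in the diff above.
        self.logger = logger if logger is not None else get_logger()

    def run_download(self, batch_id):
        self.logger.info("Downloading CSLC files for batch %s", batch_id)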
2 changes: 0 additions & 2 deletions data_subscriber/asf_download.py
@@ -4,8 +4,6 @@
from datetime import datetime
from pathlib import PurePath, Path

import backoff
import requests
import requests.utils

from data_subscriber.download import DaacDownload