Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP ability to load orbit from annotation XML #41

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/s1reader/s1_orbit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import datetime
import os
import xml.etree.ElementTree as ET

import isce3

# date format used in file names
FMT = "%Y%m%dT%H%M%S"
Expand Down Expand Up @@ -121,3 +124,42 @@ def get_orbit_file_from_dir(path: str, orbit_dir: str) -> str:
path, [f'{orbit_dir}/{item}' for item in os.listdir(orbit_dir)])

return orbit_path

def burst_orbit_from_file(sensing_start, sensing_stop, osv_list: ET.Element):
'''Init and return ISCE3 orbit from element in orbit XML file.

Parameters:
-----------
sensing_start : datetime.datetime
Sensing start of burst; taken from azimuth time
sensing_stop : datetime.datetime
Sensing stop of burst
osv_list : xml.etree.ElementTree.Element
ElementTree containing orbit state vectors

Returns:
--------
_ : datetime
Sensing mid as datetime object.
'''
fmt = "UTC=%Y-%m-%dT%H:%M:%S.%f"
orbit_sv = []
# add start & end padding to ensure sufficient number of orbit points
pad = datetime.timedelta(seconds=60)
for osv in osv_list:
t_orbit = datetime.datetime.strptime(osv[1].text, fmt)

if t_orbit > sensing_stop + pad:
break

if t_orbit > sensing_start - pad:
pos = [float(osv[i].text) for i in range(4,7)]
vel = [float(osv[i].text) for i in range(7,10)]
orbit_sv.append(isce3.core.StateVector(isce3.core.DateTime(t_orbit),
pos, vel))

# use list of stateVectors to init and return isce3.core.Orbit
time_delta = datetime.timedelta(days=2)
ref_epoch = isce3.core.DateTime(sensing_start - time_delta)

return isce3.core.Orbit(orbit_sv, ref_epoch)
54 changes: 31 additions & 23 deletions src/s1reader/s1_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import isce3
from nisar.workflows.stage_dem import check_dateline
from s1reader.s1_burst_slc import Doppler, Sentinel1BurstSlc
from s1reader.s1_orbit import burst_orbit_from_file

# TODO evaluate if it make sense to combine below into a class
def as_datetime(t_str, fmt = "%Y-%m-%dT%H:%M:%S.%f"):
def as_datetime(t_str, fmt="%Y-%m-%dT%H:%M:%S.%f"):
'''Parse given time string to datetime.datetime object.

Parameters:
Expand Down Expand Up @@ -126,7 +127,7 @@ def doppler_poly1d_to_lut2d(doppler_poly1d, starting_slant_range,
return isce3.core.LUT2d(slant_ranges, az_times,
np.vstack((freq_1d, freq_1d)))

def get_burst_orbit(sensing_start, sensing_stop, osv_list: ET.Element):
def parse_annotation_orbit(sensing_start, sensing_stop, osv_list: ET.Element):
'''Init and return ISCE3 orbit.

Parameters:
Expand All @@ -143,21 +144,18 @@ def get_burst_orbit(sensing_start, sensing_stop, osv_list: ET.Element):
_ : datetime
Sensing mid as datetime object.
'''
fmt = "UTC=%Y-%m-%dT%H:%M:%S.%f"
orbit_sv = []
fmt = "%Y-%m-%dT%H:%M:%S.%f"
n_svs = int(osv_list.attrib['count'])
orbit_sv = [[]] * n_svs
# add start & end padding to ensure sufficient number of orbit points
pad = datetime.timedelta(seconds=60)
for osv in osv_list:
t_orbit = datetime.datetime.strptime(osv[1].text, fmt)
for i, osv in enumerate(osv_list):
t_orbit = datetime.datetime.strptime(osv[0].text, fmt)

if t_orbit > sensing_stop + pad:
break

if t_orbit > sensing_start - pad:
pos = [float(osv[i].text) for i in range(4,7)]
vel = [float(osv[i].text) for i in range(7,10)]
orbit_sv.append(isce3.core.StateVector(isce3.core.DateTime(t_orbit),
pos, vel))
pos = [float(osv[2][i].text) for i in range(3)]
vel = [float(osv[3][i].text) for i in range(3)]
orbit_sv[i] = isce3.core.StateVector(isce3.core.DateTime(t_orbit),
pos, vel)

# use list of stateVectors to init and return isce3.core.Orbit
time_delta = datetime.timedelta(days=2)
Expand Down Expand Up @@ -336,8 +334,13 @@ def burst_from_xml(annotation_path: str, orbit_path: str, tiff_path: str,
iw2_mid_range = iw2_starting_range + 0.5 * iw2_n_samples * range_pxl_spacing

# find orbit state vectors in 'Data_Block/List_of_OSVs'
orbit_tree = ET.parse(orbit_path)
osv_list = orbit_tree.find('Data_Block/List_of_OSVs')
if orbit_path is None:
osv_list = tree.find('generalAnnotation/orbitList')
get_orbit = parse_annotation_orbit
else:
orbit_tree = ET.parse(orbit_path)
osv_list = orbit_tree.find('Data_Block/List_of_OSVs')
get_orbit = burst_orbit_from_file

# load individual burst
burst_list_elements = tree.find('swathTiming/burstList')
Expand Down Expand Up @@ -369,8 +372,8 @@ def burst_from_xml(annotation_path: str, orbit_path: str, tiff_path: str,
# get orbit from state vector list/element tree
sensing_duration = datetime.timedelta(
seconds=n_samples * azimuth_time_interval)
orbit = get_burst_orbit(sensing_start, sensing_start + sensing_duration,
osv_list)
orbit = get_orbit(sensing_start, sensing_start + sensing_duration,
osv_list)

# determine burst offset and dimensions
# TODO move to own method
Expand Down Expand Up @@ -425,20 +428,20 @@ def _is_zip_annotation_xml(path: str, id_str: str) -> bool:
return True
return False

def load_bursts(path: str, orbit_path: str, swath_num: int, pol: str='vv',
def load_bursts(path: str, swath_num: int, pol: str='vv', orbit_path: str=None,
burst_ids: list[str]=None):
'''Find bursts in a Sentinel-1 zip file or a SAFE structured directory.

Parameters:
-----------
path : str
Path to Sentinel-1 zip file or SAFE directory
orbit_path : str
Path the orbit file.
swath_num : int
Integer of subswath of desired burst. {1, 2, 3}
pol : str
Polarization of desired burst. {hh, vv, hv, vh}
orbit_path : str
Path the orbit file. If None, annotation XML orbit is parsed.
burst_ids : list[str]
List of burst IDs for which their Sentinel1BurstSlc objects will be
returned. Default of None returns all bursts. Empty list returned if
Expand Down Expand Up @@ -466,6 +469,11 @@ def load_bursts(path: str, orbit_path: str, swath_num: int, pol: str='vv',
if pol not in pols:
raise ValueError(f"polarization not in {pols}")

if orbit_path is None:
warn_str = 'No external orbit given. '
warn_str += 'Orbit will be constructed from annotation XML.'
warnings.warn(warn_str)

id_str = f'iw{swath_num}-slc-{pol}'

if not os.path.exists(path):
Expand Down Expand Up @@ -506,7 +514,7 @@ def _burst_from_zip(zip_path: str, id_str: str, orbit_path: str):
id_str: str
Identifcation of desired burst. Format: iw[swath_num]-slc-[pol]
orbit_path : str
Path the orbit file.
Path the orbit file. If None, annotation XML orbit is parsed.

Returns:
--------
Expand Down Expand Up @@ -547,7 +555,7 @@ def _burst_from_safe_dir(safe_dir_path: str, id_str: str, orbit_path: str):
id_str: str
Identifcation of desired burst. Format: iw[swath_num]-slc-[pol]
orbit_path : str
Path the orbit file.
Path the orbit file. If None, annotation XML orbit is parsed.

Returns:
--------
Expand Down