Skip to content

Commit

Permalink
Merge pull request #347 from GeoscienceAustralia/mg/fix_unit_conversion
Browse files Browse the repository at this point in the history
Bugfix phase_data unit conversion in correct step
Matt Garthwaite authored Sep 15, 2021
2 parents dde59c6 + 0a5363e commit 8893509
Showing 26 changed files with 232 additions and 231 deletions.
4 changes: 2 additions & 2 deletions input_parameters.conf
Original file line number Diff line number Diff line change
@@ -116,8 +116,8 @@ nan_conversion: 1
# refnx/y: number of search grid points in x/y image dimensions
# refchipsize: size of the data window at each search grid point
# refminfrac: minimum fraction of valid (non-NaN) pixels in the data window
refx: -99.083
refy: 19.441
refx: -99.18
refy: 19.44
refnx: 5
refny: 5
refchipsize: 7
119 changes: 62 additions & 57 deletions pyrate/core/aps.py
Original file line number Diff line number Diff line change
@@ -34,16 +34,20 @@
from pyrate.core import shared, ifgconstants as ifc, mpiops
from pyrate.core.covariance import cvd_from_phase, RDist
from pyrate.core.algorithm import get_epochs
from pyrate.core.shared import Ifg, Tile, EpochList
from pyrate.core.shared import Ifg, Tile, EpochList, nan_and_mm_convert
from pyrate.core.timeseries import time_series
from pyrate.merge import assemble_tiles
from pyrate.configuration import MultiplePaths, Configuration


def wrap_spatio_temporal_filter(params: dict) -> None:
def spatio_temporal_filter(params: dict) -> None:
"""
A wrapper for the spatio-temporal filter so it can be tested.
See docstring for spatio_temporal_filter.
Applies a spatio-temporal filter to remove the atmospheric phase screen
(APS) and saves the corrected interferograms. Firstly the incremental
time series is computed using the SVD method, before a cascade of temporal
then spatial Gaussian filters is applied. The resulting APS corrections are
saved to disc before being subtracted from each interferogram.
:param params: Dictionary of PyRate configuration parameters.
"""
if params[C.APSEST]:
@@ -61,44 +65,38 @@ def wrap_spatio_temporal_filter(params: dict) -> None:
log.debug('Finished APS correction')
return # return if True condition returned

aps_error_files_on_disc = [MultiplePaths.aps_error_path(i, params) for i in ifg_paths]
if all(a.exists() for a in aps_error_files_on_disc):
log.warning("Reusing APS errors from previous run!!!")
for ifg_path, a in mpiops.array_split(list(zip(ifg_paths, aps_error_files_on_disc))):
phase = np.load(a)
_save_aps_corrected_phase(ifg_path, phase)
else:
tsincr = _calc_svd_time_series(ifg_paths, params, preread_ifgs, tiles)
mpiops.comm.barrier()
aps_paths = [MultiplePaths.aps_error_path(i, params) for i in ifg_paths]
if all(a.exists() for a in aps_paths):
log.warning('Reusing APS errors from previous run')
_apply_aps_correction(ifg_paths, aps_paths, params)
return

spatio_temporal_filter(tsincr, ifg_paths, params, preread_ifgs)
# obtain the incremental time series using SVD
tsincr = _calc_svd_time_series(ifg_paths, params, preread_ifgs, tiles)
mpiops.comm.barrier()
shared.save_numpy_phase(ifg_paths, params)

# get lists of epochs and ifgs
ifgs = list(OrderedDict(sorted(preread_ifgs.items())).values())
epochlist = mpiops.run_once(get_epochs, ifgs)[0]

# first perform temporal high pass filter
ts_hp = temporal_high_pass_filter(tsincr, epochlist, params)

def spatio_temporal_filter(tsincr: np.ndarray, ifg_paths: List[str], params: dict,
preread_ifgs: dict) -> None:
"""
Applies a spatio-temporal filter to remove the atmospheric phase screen
(APS) and saves the corrected interferograms. Before performing this step,
the time series is computed using the SVD method. This function then
performs temporal and spatial filtering.
:param tsincr: incremental time series array of size (ifg.shape, nepochs-1)
:param ifg_paths: List of interferogram file paths
:param params: Dictionary of PyRate configuration parameters
:param preread_ifgs: Dictionary of shared.PrereadIfg class instances
"""
# second perform spatial low pass filter to obtain APS correction in ts domain
ifg = Ifg(ifg_paths[0]) # just grab any for parameters in slpfilter
ifg.open()
epochlist = mpiops.run_once(get_epochs, preread_ifgs)[0]
ts_hp = temporal_high_pass_filter(tsincr, epochlist, params)
ts_aps = spatial_low_pass_filter(ts_hp, ifg, params)
tsincr -= ts_aps

_ts_to_ifgs(tsincr, preread_ifgs, params)
ifg.close()

# construct APS corrections for each ifg
_make_aps_corrections(ts_aps, ifgs, params)

# apply correction to ifgs and save ifgs to disc.
_apply_aps_correction(ifg_paths, aps_paths, params)

# update/save the phase_data in the tiled numpy files
shared.save_numpy_phase(ifg_paths, params)


def _calc_svd_time_series(ifg_paths: List[str], params: dict, preread_ifgs: dict,
tiles: List[Tile]) -> np.ndarray:
@@ -150,42 +148,49 @@ def _assemble_tsincr(ifg_paths: List[str], params: dict, preread_ifgs: dict,
return np.dstack([v[1] for v in sorted(tsincr_g.items())])


def _ts_to_ifgs(tsincr: np.ndarray, preread_ifgs: dict, params: dict) -> None:
def _make_aps_corrections(ts_aps: np.ndarray, ifgs: List[Ifg], params: dict) -> None:
"""
Function that converts an incremental displacement time series into
interferometric phase observations. Used to re-construct an interferogram
network from a time series.
:param tsincr: incremental time series array of size (ifg.shape, nepochs-1)
:param preread_ifgs: Dictionary of shared.PrereadIfg class instances
Function to convert the time series APS filter output into interferometric
phase corrections and save them to disc.
:param ts_aps: Incremental APS time series array.
:param ifgs: List of Ifg class objects.
:param params: Dictionary of PyRate configuration parameters.
"""
log.debug('Reconstructing interferometric observations from time series')
ifgs = list(OrderedDict(sorted(preread_ifgs.items())).values())
_, n = mpiops.run_once(get_epochs, ifgs)
# get first and second image indices
_ , n = mpiops.run_once(get_epochs, ifgs)
index_first, index_second = n[:len(ifgs)], n[len(ifgs):]

num_ifgs_tuples = mpiops.array_split(list(enumerate(ifgs)))
num_ifgs_tuples = [(int(num), ifg) for num, ifg in num_ifgs_tuples]

for i, ifg in num_ifgs_tuples:
for i, ifg in [(int(num), ifg) for num, ifg in num_ifgs_tuples]:
# sum time slice data from first to second epoch
ifg_aps = np.sum(ts_aps[:, :, index_first[i]: index_second[i]], axis=2)
aps_error_on_disc = MultiplePaths.aps_error_path(ifg.tmp_path, params)
phase = np.sum(tsincr[:, :, index_first[i]: index_second[i]], axis=2)
np.save(file=aps_error_on_disc, arr=phase)
_save_aps_corrected_phase(ifg.tmp_path, phase)
np.save(file=aps_error_on_disc, arr=ifg_aps) # save APS as numpy array

mpiops.comm.barrier()

def _save_aps_corrected_phase(ifg_path: str, phase: np.ndarray) -> None:

def _apply_aps_correction(ifg_paths: List[str], aps_paths: List[str], params: dict) -> None:
"""
Save (update) interferogram metadata and phase data after
spatio-temporal filter (APS) correction.
Function to read and apply (subtract) APS corrections from interferogram data.
"""
ifg = Ifg(ifg_path)
ifg.open(readonly=False)
ifg.phase_data[~np.isnan(ifg.phase_data)] = phase[~np.isnan(ifg.phase_data)]
# set aps tags after aps error correction
ifg.dataset.SetMetadataItem(ifc.PYRATE_APS_ERROR, ifc.APS_REMOVED)
ifg.write_modified_phase()
ifg.close()
for ifg_path, aps_path in mpiops.array_split(list(zip(ifg_paths, aps_paths))):
# read the APS correction from numpy array
aps_corr = np.load(aps_path)
# open the Ifg object
ifg = Ifg(ifg_path)
ifg.open(readonly=False)
# convert NaNs and convert to mm
nan_and_mm_convert(ifg, params)
# subtract the correction from the ifg phase data
ifg.phase_data[~np.isnan(ifg.phase_data)] -= aps_corr[~np.isnan(ifg.phase_data)]
# set meta-data tags after aps error correction
ifg.dataset.SetMetadataItem(ifc.PYRATE_APS_ERROR, ifc.APS_REMOVED)
# write phase data to disc and close ifg.
ifg.write_modified_phase()
ifg.close()


def spatial_low_pass_filter(ts_hp: np.ndarray, ifg: Ifg, params: dict) -> np.ndarray:
29 changes: 7 additions & 22 deletions pyrate/core/covariance.py
Original file line number Diff line number Diff line change
@@ -68,7 +68,6 @@ def cvd(ifg_path, params, r_dist, calc_alpha=False, write_vals=False, save_acg=F
radial average of its 2D autocorrelation.
:param str ifg_path: An interferogram file path. OR
:param Ifg class ifg_path: A pyrate.shared.Ifg class object
:param dict params: Dictionary of configuration parameters
:param ndarray r_dist: Array of distance values from the image centre
(See Rdist class for more details)
@@ -83,13 +82,8 @@ def cvd(ifg_path, params, r_dist, calc_alpha=False, write_vals=False, save_acg=F
:return: alpha: the exponential length-scale of decay factor
:rtype: float
"""

if isinstance(ifg_path, str): # used during MPI
ifg = shared.Ifg(ifg_path)
ifg.open()
else:
ifg = ifg_path

ifg = shared.Ifg(ifg_path)
ifg.open()
shared.nan_and_mm_convert(ifg, params)
# calculate 2D auto-correlation of image using the
# spectral method (Wiener-Khinchin theorem)
@@ -99,26 +93,17 @@ def cvd(ifg_path, params, r_dist, calc_alpha=False, write_vals=False, save_acg=F
phase = ifg.phase_data

maxvar, alpha = cvd_from_phase(phase, ifg, r_dist, calc_alpha, save_acg=save_acg, params=params)

if write_vals:
_add_metadata(ifg, maxvar, alpha)
ifg.add_metadata(**{
ifc.PYRATE_MAXVAR: str(maxvar),
ifc.PYRATE_ALPHA: str(alpha)
})

if isinstance(ifg_path, str):
ifg.close()
ifg.close()

return maxvar, alpha


def _add_metadata(ifg, maxvar, alpha):
"""
Convenience function for saving metadata to ifg
"""
md = ifg.meta_data
md[ifc.PYRATE_MAXVAR] = str(maxvar)
md[ifc.PYRATE_ALPHA] = str(alpha)
ifg.write_modified_phase()


def _save_cvd_data(acg, r_dist, ifg_path, outdir):
"""
Function to save numpy array of autocorrelation data to disk
3 changes: 2 additions & 1 deletion pyrate/core/dem_error.py
Original file line number Diff line number Diff line change
@@ -246,7 +246,7 @@ def _per_tile_setup(ifgs: list) -> Tuple[np.ndarray, np.ndarray, int, int, np.nd
nifgs = len(ifgs)
ifg_data = np.zeros((nifgs, nrows, ncols), dtype=np.float32)
for ifg_num in range(nifgs):
ifg_data[ifg_num] = ifgs[ifg_num].phase_data
ifg_data[ifg_num] = ifgs[ifg_num].phase_data # phase data is read from numpy files
mst = ~np.isnan(ifg_data)
ifg_time_span = np.zeros((nifgs))
for ifg_num in range(nifgs):
@@ -287,6 +287,7 @@ def _write_dem_errors(ifg_paths: list, params: dict, preread_ifgs: dict) -> None
for ifg_path in ifg_paths:
ifg = Ifg(ifg_path)
ifg.open()
shared.nan_and_mm_convert(ifg, params) # ensure we have phase data in mm
# read dem error correction file from tmpdir
dem_error_correction_ifg = assemble_tiles(shape, params[C.TMPDIR], tiles, out_type='dem_error_correction',
index=idx)
9 changes: 3 additions & 6 deletions pyrate/core/orbital.py
Original file line number Diff line number Diff line change
@@ -227,15 +227,13 @@ def independent_orbital_correction(ifg_path, params):
fullres_dm = get_design_matrix(ifg0, degree, intercept=intercept)

ifg = shared.dem_or_ifg(ifg_path) if isinstance(ifg_path, str) else ifg_path
ifg_path = ifg.data_path
multi_path = MultiplePaths(ifg.data_path, params)
orb_on_disc = MultiplePaths.orb_error_path(ifg.data_path, params)

multi_path = MultiplePaths(ifg_path, params)
fullres_ifg = ifg # keep a backup
orb_on_disc = MultiplePaths.orb_error_path(ifg_path, params)
if not ifg.is_open:
ifg.open()

shared.nan_and_mm_convert(ifg, params)
fullres_ifg = ifg # keep a backup
fullres_phase = fullres_ifg.phase_data

if orb_on_disc.exists():
@@ -270,7 +268,6 @@ def independent_orbital_correction(ifg_path, params):

# set orbfit meta tag and save phase to file
_save_orbital_error_corrected_phase(fullres_ifg, params)
fullres_ifg.close()


def __orb_correction(fullres_dm, mlooked_dm, fullres_phase, mlooked_phase, offset=False):
11 changes: 5 additions & 6 deletions pyrate/core/phase_closure/closure_check.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@
from pyrate.configuration import Configuration, MultiplePaths
from pyrate.core.phase_closure.sum_closure import sum_phase_closures
from pyrate.core.phase_closure.plot_closure import plot_closure
from pyrate.core.shared import Ifg
from pyrate.core.shared import Ifg, nan_and_mm_convert
from pyrate.core.logger import pyratelogger as log


@@ -34,23 +34,22 @@ def mask_pixels_with_unwrapping_errors(ifgs_breach_count: NDArray[(Any, Any, Any
params: dict) -> None:
"""
Find pixels in the phase data that breach closure_thr, and mask
(assign nans) to those pixels in those ifgs.
(assign NaNs) to those pixels in those ifgs.
:param ifgs_breach_count: unwrapping issues at pixels in all loops
:param num_occurrences_each_ifg: frequency of ifgs appearing in all loops
:param params: params dict
"""
log.debug("Updating phase data of retained ifgs")
log.debug("Masking phase data of retained ifgs")

for i, m_p in enumerate(params[C.INTERFEROGRAM_FILES]):
pix_index = ifgs_breach_count[:, :, i] == num_occurrences_each_ifg[i]
ifg = Ifg(m_p.tmp_sampled_path)
ifg.open()
ifg.nodata_value = params[C.NO_DATA_VALUE]
ifg.convert_to_nans()
nan_and_mm_convert(ifg, params)
ifg.phase_data[pix_index] = np.nan
ifg.write_modified_phase()

log.info(f"Updated phase data of {i + 1} retained ifgs after phase closure")
log.info(f"Masked phase data of {i + 1} retained ifgs after phase closure")
return None


7 changes: 3 additions & 4 deletions pyrate/core/ref_phs_est.py
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@

import pyrate.constants as C
from pyrate.core import ifgconstants as ifc, shared
from pyrate.core.shared import joblib_log_level, nanmedian, Ifg
from pyrate.core.shared import joblib_log_level, nanmedian, Ifg, nan_and_mm_convert
from pyrate.core import mpiops
from pyrate.configuration import Configuration
from pyrate.core.logger import pyratelogger as log
@@ -172,9 +172,8 @@ def _update_phase_and_metadata(ifgs, ref_phs, params):
"""
def __inner(ifg, ref_ph):
ifg.open()
# nan-convert before subtracting ref phase
ifg.nodata_value = params["noDataValue"]
ifg.convert_to_nans()
# nan-convert and mm-convert before subtracting ref phase
nan_and_mm_convert(ifg, params)
# add 1e-20 to avoid 0.0 values being converted to NaN downstream (Github issue #310)
# TODO: implement a more robust way of avoiding this issue, e.g. using numpy masked
# arrays to mark invalid pixel values rather than directly changing values to NaN
11 changes: 3 additions & 8 deletions pyrate/core/refpixel.py
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@
import pyrate.constants as C
from pyrate.core import ifgconstants as ifc
from pyrate.core import mpiops
from pyrate.core.shared import Ifg
from pyrate.core.shared import Ifg, nan_and_mm_convert
from pyrate.core.shared import joblib_log_level
from pyrate.core.logger import pyratelogger as log
from pyrate.core import prepifg_helper
@@ -53,12 +53,7 @@ def update_refpix_metadata(ifg_paths, refx, refy, transform, params):
ifg = Ifg(ifg_file)
log.debug("Open dataset")
ifg.open(readonly=True)
log.debug("Set no data value")
ifg.nodata_value = params["noDataValue"]
log.debug("Update no data values in dataset")
ifg.convert_to_nans()
log.debug("Convert mm")
ifg.convert_to_mm()
nan_and_mm_convert(ifg, params)
half_patch_size = params["refchipsize"] // 2
x, y = refx, refy
log.debug("Extract reference pixel windows")
@@ -76,7 +71,7 @@ def update_refpix_metadata(ifg_paths, refx, refy, transform, params):
ifc.PYRATE_MEAN_REF_AREA: str(mean_ref_area),
ifc.PYRATE_STDDEV_REF_AREA: str(stddev_ref_area)
})

ifg.write_modified_phase()
ifg.close()


Loading

0 comments on commit 8893509

Please sign in to comment.