"""S1-ETAD data model core classes."""
import enum
import pathlib
import datetime
import warnings
import functools
import itertools
import collections
from typing import Union
import numpy as np
import pandas as pd
import shapely.ops
from lxml import etree
from scipy import constants
from netCDF4 import Dataset
from shapely.geometry import Polygon, MultiPolygon
from shapely.geometry.base import BaseGeometry
from ._s1utils import Sentinel1ProductName
__all__ = [
"Sentinel1Etad",
"Sentinel1EtadSwath",
"Sentinel1EtadBurst",
"ECorrectionType",
]
[docs]
class ECorrectionType(enum.Enum):
"""Enumeration for correction types."""
TROPOSPHERIC = "tropospheric"
IONOSPHERIC = "ionospheric"
GEODETIC = "geodetic"
BISTATIC = "bistatic"
DOPPLER = "doppler"
FMRATE = "fmrate"
SUM = "sum"
CorrectionType = Union[ECorrectionType, str]
_CORRECTION_NAMES_MAP = {
"tropospheric": {"x": "troposphericCorrectionRg"},
"ionospheric": {"x": "ionosphericCorrectionRg"},
"geodetic": {"x": "geodeticCorrectionRg", "y": "geodeticCorrectionAz"},
"bistatic": {"y": "bistaticCorrectionAz"},
"doppler": {"x": "dopplerRangeShiftRg"},
"fmrate": {"y": "fmMismatchCorrectionAz"},
"sum": {"x": "sumOfCorrectionsRg", "y": "sumOfCorrectionsAz"},
}
_STATS_TAG_MAP = {
ECorrectionType.TROPOSPHERIC: "troposphericCorrection",
ECorrectionType.IONOSPHERIC: "ionosphericCorrection",
ECorrectionType.GEODETIC: "geodeticCorrection",
ECorrectionType.BISTATIC: "bistaticCorrection",
ECorrectionType.DOPPLER: "dopplerRangeShift",
ECorrectionType.FMRATE: "fmMismatchCorrection",
ECorrectionType.SUM: "sumOfCorrections",
}
Statistics = collections.namedtuple("Statistics", ["min", "mean", "max"])
[docs]
class Sentinel1Etad:
"""Sentinel-1 ETAD product.
Class to decode and access the elements of the Sentinel ETAD product
which specification is governed by ETAD-DLR-PS-0014.
The index operator [] (implemented with the __getitem__ method) returns
a Sentinel1EtadSwath instance.
Parameters
----------
product : str or pathlib.Path
path of the S1-ETAD product (it is a directory)
Attributes
----------
product : pathlib.Path
path of the S1-ETAD product (it is a directory)
burst_catalogue : pandas.DataFrame
dataframe containing main information of all bursts present in
the product
ds : netCDF.Dataset
(provisional) the NetCDF.Dataset in which data are stored
"""
def __init__(self, product):
"""Initialize a `Sentinel1Etad` object."""
# TODO: make this read-only (property)
self.product = pathlib.Path(product)
# TODO: ds should not be exposed
self.ds = self._init_measurement_dataset()
self._annot = self._init_annotation_dataset()
self.burst_catalogue = self._init_burst_catalogue()
def _init_measurement_dataset(self):
"""Open the nc dataset."""
# @TODO: retrieve form manifest
netcdf_file = next(self.product.glob("measurement/*.nc"))
rootgrp = Dataset(netcdf_file, "r")
rootgrp.set_auto_mask(False)
return rootgrp
def _init_annotation_dataset(self):
"""Open the xml annotation dataset."""
list_ = list(self.product.glob("annotation/*.xml"))
xml_file = str(list_[0])
root = etree.parse(xml_file).getroot()
return root
@functools.lru_cache
def __getitem__(self, index):
assert index in self.swath_list, f"{index} is not in {self.swath_list}"
return Sentinel1EtadSwath(self.ds[index])
def __iter__(self):
yield from self.iter_swaths()
def __repr__(self):
return f'{self.__class__.__name__}("{self.product}") # 0x{id(self):x}'
def __str__(self):
return f'{self.__class__.__name__}("{self.product.name}")'
@property
def number_of_swath(self):
"""Return the number of swaths in the product."""
return len(self.ds.groups)
@property
def swath_list(self):
"""Return the list of swath identifiers (str) in the product."""
return list(self.ds.groups.keys())
[docs]
def s1_product_list(self):
"""Return the list of S-1 products used to compose the ETAD one."""
df = self.burst_catalogue
# this ensures that each product name is located at the correct pIndex
product_list = [
item[1] for item in sorted(set(zip(df["pIndex"], df["productID"])))
]
return product_list
@property
def grid_spacing(self):
"""Return the grid spacing in meters."""
xp_list = {
"x": ".//correctionGridRangeSampling",
"y": ".//correctionGridAzimuthSampling",
}
dd = {}
for tag, xp in xp_list.items():
dd[tag] = self._xpath_to_list(self._annot, xp, dtype=float)
dd["unit"] = "m"
return dd
@property
def grid_sampling(self):
"""Return the grid spacing in s."""
xp_list = {
"x": ".//productInformation/gridSampling/range",
"y": ".//productInformation/gridSampling/azimuth",
}
dd = {}
for tag, xp in xp_list.items():
dd[tag] = self._xpath_to_list(self._annot, xp, dtype=float)
dd["unit"] = "s"
return dd
@property
def min_azimuth_time(self):
"""Return the minimum azimuth time of all bursts in the product."""
return datetime.datetime.fromisoformat(self.ds.azimuthTimeMin)
@property
def max_azimuth_time(self):
"""Return the maximum azimuth time of all bursts in the product."""
return datetime.datetime.fromisoformat(self.ds.azimuthTimeMax)
@property
def min_range_time(self):
"""Return the minimum range time of all bursts in the product."""
return self.ds.rangeTimeMin
@property
def max_range_time(self):
"""Return the maximum range time of all bursts in the product."""
return self.ds.rangeTimeMax
@property
def vg(self):
"""Mean ground velocity [m/s]."""
try:
xp = (
"productInformation/gridGroundSampling/"
"averageZeroDopplerVelocity"
)
vg = float(self._annot.find(xp).taxt)
except (AttributeError, ValueError):
vg = self.grid_spacing["y"] / self.grid_sampling["y"]
return vg
[docs]
def processing_setting(self):
"""Return the corrections performed.
Read the xml file to identify the corrections performed.
If a correction is not performed the matrix is filled with zeros.
"""
correction_list = [
"troposphericDelayCorrection",
"ionosphericDelayCorrection",
"solidEarthTideCorrection",
"bistaticAzimuthCorrection",
"dopplerShiftRangeCorrection",
"FMMismatchAzimuthCorrection",
]
dd = {}
xp_root = (
"processingInformation/processor/setapConfigurationFile/"
"processorSettings/"
)
for correction in correction_list:
xp = xp_root + correction
ret = self._xpath_to_list(self._annot, xp)
if ret == "true":
ret = True
else:
ret = False
dd[correction] = ret
return dd
def _init_burst_catalogue(self):
"""Build the burst catalog.
Using information stored in the NetCDF file create a
pandas.DataFrame containing all the elements allowing to index
properly a burst.
"""
def _to_tdelta64(t):
return np.float64(t * 1e9).astype("timedelta64[ns]")
data = collections.defaultdict(list)
t0 = np.datetime64(self.ds.azimuthTimeMin, "ns")
for swath in self.ds.groups.values():
for burst in swath.groups.values():
ax = burst.variables["azimuth"]
tmin = t0 + _to_tdelta64(ax[0])
tmax = t0 + _to_tdelta64(ax[-1])
data["bIndex"].append(burst.bIndex)
data["pIndex"].append(burst.pIndex)
data["sIndex"].append(burst.sIndex)
data["productID"].append(burst.productID)
data["swathID"].append(burst.swathID)
data["azimuthTimeMin"].append(tmin)
data["azimuthTimeMax"].append(tmax)
df = pd.DataFrame(data=data)
return df
[docs]
def query_burst(
self,
first_time=None,
product_name=None,
last_time=None,
swath=None,
geometry=None,
):
"""Query the burst catalogue to retrieve the burst matching by time.
Parameters
----------
first_time : datetime
is set to None then set to the first time
last_time : datetime
if set to None the last_time = first_time
product_name : str
Name of a real S1 product e.g.
S1B_IW_SLC__1SDV_20190805T162509_20190805T162...SAFE
swath : str or list
list of swathID e.g. 'IW1' or ['IW1'] or ['IW1', 'IW2']
geometry : shapely.geometry.[Point, Polygon, ...]
A shapely geometry for which intersection will be searched
Returns
-------
pandas.DataFrame
Filtered panda dataframe
"""
# first sort the burst by time
df = self.burst_catalogue.sort_values(by=["azimuthTimeMin"])
if first_time is None:
first_time = df.iloc[0].azimuthTimeMin
if last_time is None:
last_time = df.iloc[-1].azimuthTimeMax
ix0 = (df.azimuthTimeMin >= first_time) & (
df.azimuthTimeMax <= last_time
)
if product_name is not None:
# build a regex based on the name to avoid issues with annotation
# products and CRC
product_name = Sentinel1ProductName(product_name)
product_name.to_annotation(value="[AS]")
product_name.crc = ""
filter_ = product_name.recompose(with_suffix=False)
ix0 = ix0 & self.burst_catalogue.productID.str.contains(
filter_, regex=True
)
if swath is not None:
if isinstance(swath, str):
swath = [swath]
ix0 = ix0 & df.swathID.isin(swath)
if geometry is not None:
bix_list = self.intersects(geometry)
ix0 = ix0 & df.bIndex.isin(bix_list)
return df.loc[ix0]
def _selection_to_swath_list(self, selection=None):
if selection is None:
selection = self.burst_catalogue
if isinstance(selection, pd.DataFrame):
burst_selection = selection
swath_list = selection.swathID.unique()
elif isinstance(selection, str):
burst_selection = None
swath_list = [selection]
else:
# assume it is a list of swaths already
import collections.abc
assert isinstance(selection, collections.abc.Iterable)
assert all(isinstance(item, str) for item in selection)
burst_selection = None
swath_list = selection
return swath_list, burst_selection
[docs]
def iter_swaths(self, selection=None):
"""Iterate over swaths according to the specified selection.
Parameters
----------
selection : list(str) or pd.Dataframe, optional
the list of selected swath IDs or the result of a
Sentinel1Etad.query_burst query.
If the selection is None (default) the iteration is performed
on all the swaths of the product.
"""
swath_list, _ = self._selection_to_swath_list(selection)
for swath_name in swath_list:
yield self[swath_name]
[docs]
def iter_bursts(self, selection=None):
"""Iterate over burst according to the specified selection.
Parameters
----------
selection : list(int) or pd.Dataframe, optional
the list of selected burst indexes or the result of a
Sentinel1Etad.query_burst query.
If the selection is None (default) the iteration is performed
on all the bursts of the product.
"""
if selection is None:
selection = self.burst_catalogue
elif not isinstance(selection, pd.DataFrame):
# assume it is a list of burst indexes
bursts = selection
if isinstance(bursts, int):
bursts = [selection]
# NOTE: preserve the order
selection = self.burst_catalogue.bIndex.isin(bursts)
assert isinstance(selection, pd.DataFrame)
for _, row in selection.iterrows():
burst = self[row.swathID][row.bIndex]
yield burst
@staticmethod
def _xpath_to_list(
root, xpath, dtype=None, namespace=None, parse_time_func=None
):
ll = [elt.text for elt in root.findall(xpath, namespace)]
if parse_time_func is not None:
ll = [datetime.datetime.fromisoformat(t) for t in ll]
ll = parse_time_func(ll) # TODO: check
ll = np.asarray(ll, dtype=dtype)
if ll.size == 1:
return ll.item(0)
else:
return ll
[docs]
def get_statistics(self, correction, meter=False):
"""Return the global statistic value of the specified correction.
The returned value is the pre-computed one that is stored in the
XML annotation file of the product.
Parameters
----------
correction : str or ECorrectionType
the corrections for which the statistic value is requested
meter : bool
if set to True then the returned value is expressed in meters,
otherwise it is expressed in seconds (default: False)
Returns
-------
dict
a dictionary containing :class:`Statistics` (min, mean and max)
for all available components of the specified correction:
:x:
a :class:`Statistics` instance relative to the range
component of the specified correction
:y:
a :class:`Statistics` instance relative to the azimuth
component of the specified correction
:unit:
the units of the returned statistics ("m" or "s")
"""
units = "m" if meter else "s"
stat_xp = "./qualityAndStatistics"
target = ECorrectionType(correction)
target_tag = _STATS_TAG_MAP[target]
statistics = {"unit": units}
# NOTE: looping on element and heuristic test on tags is necessary
# due to inconsistent naming of range and azimuth element
# TODO: report the inconsistency to DLR? (TBD)
correction_elem = self._annot.find(f"{stat_xp}/{target_tag}")
for elem in correction_elem:
if "range" in elem.tag:
direction = "x"
elif "azimuth" in elem.tag:
direction = "y"
else:
continue
statistics[direction] = Statistics(
float(elem.findtext(f'min[@unit="{units}"]')),
float(elem.findtext(f'mean[@unit="{units}"]')),
float(elem.findtext(f'max[@unit="{units}"]')),
)
return statistics
[docs]
def intersects(self, geometry: BaseGeometry):
"""Return the list of burst indexes intersecting the input geometry.
Computes the intersection of the footprint of the swath (all bursts)
with the input geometry.
Parameters
----------
geometry : shapely.geometry.[Point, Polygon, MultiPolygon, line]
Returns
-------
list
list of all the burst intersecting with the input shape geometry
"""
lists_of_burst_indexes = [
swath.intersects(geometry) for swath in self.iter_swaths()
]
# return the flattened list
return list(itertools.chain(*lists_of_burst_indexes))
def _swath_merger(
self,
burst_var,
selection=None,
set_auto_mask=False,
meter=False,
fill_value=0.0,
):
if selection is None:
df = self.burst_catalogue
elif not isinstance(selection, pd.DataFrame):
df = self.query_burst(swath=selection)
else:
assert isinstance(selection, pd.DataFrame)
df = selection
# NOTE: assume a specific order of swath IDs
first_swath = self[df.swathID.min()]
near_burst = first_swath[first_swath.burst_list[0]]
last_swath = self[df.swathID.max()]
far_burst = last_swath[last_swath.burst_list[0]]
rg_first_time = near_burst.sampling_start["x"]
rg_last_time = (
far_burst.sampling_start["x"]
+ far_burst.sampling["x"] * far_burst.samples
)
az_first_time = df.azimuthTimeMin.min()
az_last_time = df.azimuthTimeMax.max()
az_ref_time = self.min_azimuth_time
az_first_time_rel = (az_first_time - az_ref_time).total_seconds()
sampling = self.grid_sampling
dx = sampling["x"]
dy = sampling["y"]
num_samples = (
np.round((rg_last_time - rg_first_time) / dx).astype(int) + 1
)
num_lines = (
np.round(
(az_last_time - az_first_time).total_seconds() / dy
).astype(int)
+ 1
)
img = np.full((num_lines, num_samples), fill_value=fill_value)
# TODO: add some control option
img = np.ma.array(img, mask=True, fill_value=fill_value)
for swath in self.iter_swaths(df):
# NOTE: use the private "Sentinel1EtadSwath._burst_merger" method
# to be able to work only on the specified NetCDF variable
dd_ = swath._burst_merger(
burst_var,
selection=df, # noqa
set_auto_mask=set_auto_mask,
meter=meter,
)
yoffset = dd_["first_azimuth_time"] - az_first_time_rel
xoffset = dd_["first_slant_range_time"] - rg_first_time
line_ofs = np.round(yoffset / dy).astype(int)
sample_ofs = np.round(xoffset / dx).astype(int)
slice_y = slice(line_ofs, line_ofs + dd_[burst_var].shape[0])
slice_x = slice(sample_ofs, sample_ofs + dd_[burst_var].shape[1])
img[slice_y, slice_x] = dd_[burst_var]
return {
burst_var: img,
"first_azimuth_time": az_first_time,
"first_slant_range_time": rg_first_time,
"sampling": sampling,
}
def _core_merge_correction(
self, prm_list, selection=None, set_auto_mask=True, meter=False
):
dd = {}
for dim, field in prm_list.items():
dd_ = self._swath_merger(
field,
selection=selection,
set_auto_mask=set_auto_mask,
meter=meter,
)
dd[dim] = dd_[field]
dd["sampling"] = dd_["sampling"]
dd["first_azimuth_time"] = dd_["first_azimuth_time"]
dd["first_slant_range_time"] = dd_["first_slant_range_time"]
dd["unit"] = "m" if meter else "s"
# To compute lat/lon/h make a new selection with all gaps filled
swath_list, _ = self._selection_to_swath_list(selection)
near_swath = min(swath_list)
far_swath = max(swath_list)
idx = self.burst_catalogue.swathID >= near_swath
idx &= self.burst_catalogue.swathID <= far_swath
swaths = self.burst_catalogue.swathID[idx].unique()
data = dd["x" if "x" in prm_list else "y"]
lines = data.shape[0]
duration = lines * self.grid_sampling["y"]
duration = np.float64(duration * 1e9).astype("timedelta64[ns]")
first_time = dd["first_azimuth_time"]
last_time = first_time + duration
filled_selection = self.query_burst(
first_time=first_time, last_time=last_time, swath=swaths
)
dd["lats"] = self._swath_merger(
"lats",
selection=filled_selection,
set_auto_mask=set_auto_mask,
meter=False,
fill_value=np.nan,
)["lats"]
dd["lons"] = self._swath_merger(
"lons",
selection=filled_selection,
set_auto_mask=set_auto_mask,
meter=False,
fill_value=np.nan,
)["lons"]
dd["height"] = self._swath_merger(
"height",
selection=filled_selection,
set_auto_mask=set_auto_mask,
meter=False,
fill_value=np.nan,
)["height"]
return dd
[docs]
def merge_correction(
self,
name: CorrectionType = ECorrectionType.SUM,
selection=None,
set_auto_mask=True,
meter=False,
direction=None,
):
"""Merge multiple swaths of the specified correction variable.
Data of the selected swaths (typically overlapped) are merged
together to form a single data matrix with a consistent (range and
azimuth) time axis.
Note
----
The current implementation uses a very simple algorithm that
iterates over selected swaths and bursts and stitches correction
data together.
In overlapping regions, new data simply overwrite the old ones.
This is an easy algorithm and perfectly correct for atmospheric
and geodetic correction.
It is, instead, sub-optimal for system corrections (bi-static,
Doppler, FM Rate) which have different values in overlapping
regions. In this case results are *not* correct.
Parameters
----------
name : str or CorrectionType
the name of the desired correction
selection : list or pandas.DataFrame
list of selected bursts (by default all bursts are selected)
set_auto_mask : bool
requested for netCDF4 to avoid retrieving a masked array
meter : bool
transform the result in meters
direction : str or None
if set to "x" (for range) or "y" (for "azimuth") only extracts
the specified correction component.
By default (None) all available components are returned.
Returns
-------
dict
a dictionary containing merged data and sampling information:
:<burst_var_name>:
merged data for the selected burst_var
:first_azimuth_time:
the relative azimuth first time
:first_slant_range_time:
the relative (slant) range first time
:sampling:
a dictionary containing the sampling along the
'x' and 'y' directions and the 'unit'
:units:
of the correction (seconds or meters)
:lats:
the matrix of latitude values (in degrees) for each point
:lons:
the matrix of longitude values (in degrees) for each point
:height:
the matrix of height values (in meters) for each point
"""
correction_type = ECorrectionType(name) # check values
prm_list = _CORRECTION_NAMES_MAP[correction_type.value]
if direction is not None:
prm_list = {direction: prm_list[direction]}
correction = self._core_merge_correction(
prm_list,
selection=selection,
set_auto_mask=set_auto_mask,
meter=meter,
)
correction["name"] = correction_type.value
return correction
[docs]
class Sentinel1EtadSwath:
"""Object representing a swath in the S1-ETAD product.
This objects are returned by methods of the :class:`Sentine1Etad` class.
It is not expected that the user instantiates this objects directly.
"""
def __init__(self, nc_group):
"""Initialize a `Sentinel1EtadSwath` object."""
self._grp = nc_group
@functools.lru_cache
def __getitem__(self, burst_index):
burst_name = f"Burst{burst_index:04d}"
return Sentinel1EtadBurst(self._grp[burst_name])
def __iter__(self):
yield from self.iter_bursts()
def __repr__(self):
return f'{self.__class__.__name__}("{self._grp.path}") 0x{id(self):x}'
@property
def burst_list(self):
"""Return the list of burst identifiers of all bursts in the swath."""
return [burst.bIndex for burst in self._grp.groups.values()]
@property
def number_of_burst(self):
"""Return the number of bursts in the swath."""
return len(self._grp.groups)
@property
def swath_id(self):
"""Return the swath identifier (str)."""
return self._grp.swathID
@property
def swath_index(self):
"""Return the swath index (int)."""
return self._grp.sIndex
@property
def sampling_start(self):
"""Return the relative sampling start times."""
first_burst_index = self.burst_list[0]
first_burst = self[first_burst_index]
return first_burst.sampling_start
@property
def sampling(self):
"""Return the sampling in seconds used for all bursts of the swath.
A dictionary containing the following keys:
* "x": range spacing,
* "y": azimuth spacing,
* "units": the measurement units used for "x' and "y"
"""
first_burst_index = self.burst_list[0]
first_burst = self[first_burst_index]
return first_burst.sampling
def _selection_to_burst_index_list(self, selection=None):
if selection is None:
index_list = self.burst_list
elif isinstance(selection, pd.DataFrame):
idx = selection.swathID == self.swath_id
index_list = selection.bIndex[idx].values
else:
index_list = selection
return index_list
[docs]
def iter_bursts(self, selection=None):
"""Iterate over bursts according to the specified selection.
Parameters
----------
selection : list(int) or pd.Dataframe, optional
the list of selected bursts or result of a
Sentinel1Etad.query_burst query.
If the selection is None (default) the iteration is performed
on all the burst of the swath.
"""
index_list = self._selection_to_burst_index_list(selection)
for burst_index in index_list:
yield self[burst_index]
[docs]
def intersects(self, geometry: BaseGeometry):
"""Return the list of burst indexes intersecting the input geometry.
Computes the intersection of the footprint of the swath (all bursts)
with the input Geometry
Parameters
----------
geometry : shapely.geometry.[Point, Polygon, MultiPolygon, line]
Returns
-------
list
list of the indexes of all bursts intersecting with the input
geometry
"""
assert isinstance(
geometry, BaseGeometry
), "The input shape is not a shapely BaseGeometry object"
burst_index_list = []
swath_footprint = self.get_footprint(merge=True)
if swath_footprint.intersects(geometry):
burst_index_list = [
b.burst_index
for b in self.iter_bursts()
if b.intersects(geometry)
]
return burst_index_list
def _burst_merger(
self,
burst_var,
selection=None,
az_time_min=None,
az_time_max=None,
set_auto_mask=False,
meter=False,
fill_value=0.0,
):
"""Low level method to de-burst a NetCDF variable.
The de-burst strategy is simple as the latest line is on top of the
oldest.
Parameters
----------
burst_var : str
one of the burst netcdf variables
selection : list or pandas.DataFrame
list of selected bursts (by default all bursts are selected)
az_time_min : float
minimum azimuth time of the merged swath
(relative to the reference annotated in the NetCDF root)
az_time_max : float
maximum azimuth tim eof the merged swath
(relative to the reference annotated in the NetCDF root)
set_auto_mask : bool
requested for netCDF4 to avoid retrieving a masked array
meter : bool
transform the result in meters
Returns
-------
dict
a dictionary containing merged data and sampling information:
:<burst_var_name>: merged data for the selected burst_var
:first_azimuth_time: the relative azimuth first time
:first_slant_range_time: the relative (slant) range first time
:sampling: a dictionary containing the sampling along the
'x' and 'y' directions and the 'unit'
"""
burst_index_list = self._selection_to_burst_index_list(selection)
# Find what is the extent of the acquisition in azimuth
first_burst = self[burst_index_list[0]]
last_burst = self[burst_index_list[-1]]
if az_time_min is None:
t0 = first_burst.sampling_start["y"]
else:
t0 = az_time_min
last_azimuth, _ = last_burst.get_burst_grid()
if az_time_max is None:
t1 = last_azimuth[-1]
else:
t1 = az_time_max
tau0 = min(
burst.sampling_start["x"]
for burst in self.iter_bursts(burst_index_list)
)
# grid sampling
dt = first_burst.sampling["y"]
dtau = first_burst.sampling["x"]
num_lines = np.round((t1 - t0) / dt).astype(int) + 1
num_samples = max(
burst.samples for burst in self.iter_bursts(burst_index_list)
)
debursted_var = np.full(
(num_lines, num_samples), fill_value=fill_value
)
# TODO: add some control option
debursted_var = np.ma.array(
debursted_var, mask=True, fill_value=fill_value
)
for burst_ in self.iter_bursts(burst_index_list):
assert (
dt == burst_.sampling["y"]
), "The azimuth sampling is changing long azimuth"
assert (
first_burst.sampling_start["x"] == burst_.sampling_start["x"]
), "The 2-way range gridStartRangeTime is changing long azimuth"
# get the timing of the burst and convert into line index
az_time_, rg_time_ = burst_.get_burst_grid()
line_index_ = np.round((az_time_ - t0) / dt).astype(int)
p0 = np.round((rg_time_[0] - tau0) / dtau).astype(int)
# NOTE: use the private "Sentinel1EtadBurst._get_etad_param" method
# to be able to work only on the specified NetCDF variable
var_ = burst_._get_etad_param(
burst_var, set_auto_mask=set_auto_mask, meter=meter # noqa
)
_, burst_samples = var_.shape
debursted_var[line_index_, p0 : p0 + burst_samples] = var_
return {
burst_var: debursted_var,
"first_azimuth_time": t0,
"first_slant_range_time": first_burst.sampling_start["x"],
"sampling": first_burst.sampling,
}
def _core_merge_correction(
self, prm_list, selection=None, set_auto_mask=True, meter=False
):
dd = {}
for dim, field in prm_list.items():
dd_ = self._burst_merger(
field,
selection=selection,
set_auto_mask=set_auto_mask,
meter=meter,
)
dd[dim] = dd_[field]
dd["sampling"] = dd_["sampling"]
dd["first_azimuth_time"] = dd_["first_azimuth_time"]
dd["first_slant_range_time"] = dd_["first_slant_range_time"]
dd["unit"] = "m" if meter else "s"
dd["lats"] = self._burst_merger(
"lats", set_auto_mask=set_auto_mask, meter=False
)["lats"]
dd["lons"] = self._burst_merger(
"lons", set_auto_mask=set_auto_mask, meter=False
)["lons"]
dd["height"] = self._burst_merger(
"height", set_auto_mask=set_auto_mask, meter=False
)["height"]
return dd
[docs]
def merge_correction(
self,
name: CorrectionType = ECorrectionType.SUM,
selection=None,
set_auto_mask=True,
meter=False,
direction=None,
):
"""Merge multiple bursts of the specified correction variable.
Data of the selected bursts (typically overlapped) are merged
together to form a single data matrix with a consistent (azimuth)
time axis.
Note
----
The current implementation uses a very simple algorithm that
iterates over selected bursts and stitches correction data
together.
In overlapping regions, new data simply overwrite the old ones.
This is an easy algorithm and perfectly correct for atmospheric
and geodetic correction.
It is, instead, sub-optimal for system corrections (bi-static,
Doppler, FM Rate) which have different values in overlapping
regions. In this case results are *not* correct.
Parameters
----------
name : str or CorrectionType
the name of the desired correction
selection : list or pandas.DataFrame
list of selected bursts (by default all bursts are selected)
set_auto_mask : bool
requested for netCDF4 to avoid retrieving a masked array
meter : bool
transform the result in meters
direction : str or None
if set to "x" (for range) or "y" (for "azimuth") only extracts
the specified correction component.
By default (None) all available components are returned.
Returns
-------
dict
a dictionary containing merged data and sampling information:
:<burst_var_name>:
merged data for the selected burst_var
:first_azimuth_time:
the relative azimuth first time
:first_slant_range_time:
the relative (slant) range first time
:sampling:
a dictionary containing the sampling along the
'x' and 'y' directions and the 'unit'
:units:
of the correction (seconds or meters)
:lats:
the matrix of latitude values (in degrees) for each point
:lons:
the matrix of longitude values (in degrees) for each point
:height:
the matrix of height values (in meters) for each point
"""
correction_type = ECorrectionType(name) # check values
prm_list = _CORRECTION_NAMES_MAP[correction_type.value]
if direction is not None:
prm_list = {direction: prm_list[direction]}
correction = self._core_merge_correction(
prm_list,
selection=selection,
set_auto_mask=set_auto_mask,
meter=meter,
)
correction["name"] = correction_type.value
return correction
[docs]
class Sentinel1EtadBurst:
"""Object representing a burst in the S1-ETAD product.
This objects are returned by methods of the :class:`Sentinel1EtadSwath`
class.
It is not expected that the user instantiates this objects directly.
"""
def __init__(self, nc_group):
"""Initialize a `Sentinel1EtadBurst` object."""
self._grp = nc_group
self._geocoder = None
def __repr__(self):
return f'{self.__class__.__name__}("{self._grp.path}") 0x{id(self):x}'
@property
def product_id(self) -> str:
"""Return the S1 product name (str) to which the burst belongs."""
return self._grp.productID
@property
def swath_id(self) -> str:
"""Return the swath identifier (str) to which the burst belongs."""
return self._grp.swathID
@property
def burst_id(self) -> str:
"""Return the burst identifier."""
return self._grp.name
@property
def product_index(self):
"""Return the index of the S1 product to which the burst belongs."""
return self._grp.pIndex
@property
def swath_index(self):
"""Return the index (int) of the swath to which the burst belongs."""
return self._grp.sIndex
@property
def burst_index(self):
"""Return the index (int) of the burst."""
return self._grp.bIndex
[docs]
def intersects(self, geometry: BaseGeometry):
"""Intersect the footprint of the burst with the provided shape.
Parameters
----------
geometry : shapely.geometry.[Point, Polygon, MultiPolygon, line]
Returns
-------
bool
True if intersects, False otherwise
"""
assert isinstance(
geometry, BaseGeometry
), "Not a shapely BaseGeometry object"
return self.get_footprint().intersects(geometry)
[docs]
def get_burst_grid(self):
"""Return the t, tau grid of the burst."""
azimuth = self._get_etad_param("azimuth", set_auto_mask=True)
range_ = self._get_etad_param("range", set_auto_mask=True)
return azimuth, range_
@property
def sampling_start(self):
"""Relative sampling start times.
Value in seconds relative to the beginning of the product.
"""
# TODO: put a reference in the docstring to the proper
# Sentinel1Etad property.
return {
"x": self._grp.gridStartRangeTime,
"y": self._grp.gridStartAzimuthTime,
"units": "s",
}
@property
def sampling(self):
"""Return the sampling in seconds used for all bursts of the swath.
A dictionary containing the following keys:
* "x": range spacing,
* "y": azimuth spacing,
* "units": the measurement units used for "x' and "y"
"""
return {
"x": self._grp.gridSamplingRange,
"y": self._grp.gridSamplingAzimuth,
"units": "s",
}
@property
def lines(self):
"""Return the number of lines in the burst."""
return self._grp.dimensions["azimuthExtent"].size
@property
def samples(self):
"""Return the number of samples in the burst."""
return self._grp.dimensions["rangeExtent"].size
@property
def vg(self) -> float:
"""Average zero-Doppler ground velocity [m/s]."""
return self._grp.averageZeroDopplerVelocity
@property
def reference_polarization(self) -> str:
"""Reverence polarization (string)."""
return self._grp.referencePolarization
[docs]
def get_polarimetric_channel_offset(self, channel: str) -> dict:
"""Polarimetric channel delay.
Return the electronic delay of the specified polarimetric channel
w.r.t. the reference one (see
:data:`Sentinel1EtadBurst.reference_polarization`).
channel : str
the string ID of the requested polarimetric channel:
* 'VV' or 'VH' for DV products
* 'HH' or 'HV' for DH products
"""
if channel not in {"HH", "HV", "VV", "VH"}:
raise ValueError(f"invalid channel ID: {channel!r}")
if channel[0] != self._grp.referencePolarization[0]:
raise ValueError(
f"polarimetric channel not available: {channel!r}"
)
data = {"units": "s"}
if channel == "HH":
data["x"] = (self._grp.rangeOffsetHH,)
data["y"] = (self._grp.rangeOffsetHH,)
elif channel == "HV":
data["x"] = (self._grp.rangeOffsetHV,)
data["y"] = (self._grp.rangeOffsetHV,)
elif channel == "VH":
data["x"] = (self._grp.rangeOffsetVH,)
data["y"] = (self._grp.rangeOffsetVH,)
elif channel == "VV":
data["x"] = (self._grp.rangeOffsetVV,)
data["y"] = (self._grp.rangeOffsetVV,)
return data
[docs]
def get_timing_calibration_constants(self) -> dict:
"""Return the calibration constant for timing."""
try:
return {
"x": self._grp.instrumentTimingCalibrationRange,
"y": self._grp.instrumentTimingCalibrationAzimuth,
"units": "s",
}
except AttributeError:
# @COMPATIBILITY: with SETAP , v1.6
warnings.warn(
"instrument timing calibration constants are not available "
"in the NetCDF data component this product. "
"Calibration constants have been added to the NetCDF "
"component in SETAP v1.6 (ETAD-DLR-PS-0014 - "
'"ETAD Product Format Specification" Issue 1.5).',
stacklevel=2,
)
return {"x": 0, "y": 0, "units": "s"}
def _get_etad_param(
self, name, set_auto_mask=False, transpose=False, meter=False
):
assert (
name in self._grp.variables
), f"Parameter {name!r} is not allowed"
self._grp.set_auto_mask(set_auto_mask)
# TODO: avoid double copies
# TODO: decimation factor
field = np.asarray(self._grp[name])
if transpose:
field = np.transpose(field)
if meter:
if name.endswith("Az"):
k = self._grp.averageZeroDopplerVelocity
elif name.endswith("Rg"):
k = constants.c / 2
else:
# it is not a correction (azimuth, range, lats, lons, height)
k = 1
warnings.warn(
f"the {name} is not a correction: "
f"the 'meter' parameter will be ignored",
stacklevel=2,
)
field *= k
return field
[docs]
def get_lat_lon_height(self, transpose=False):
"""Return the latitude, longitude and height for each point.
Data are returned as (3) matrices (lines x samples).
Latitude and longitude are expressed in degrees, height is
expressed in meters.
"""
lats = self._get_etad_param(
"lats", transpose=transpose, meter=False, set_auto_mask=True
)
lons = self._get_etad_param(
"lons", transpose=transpose, meter=False, set_auto_mask=True
)
h = self._get_etad_param(
"height", transpose=transpose, meter=False, set_auto_mask=True
)
return lats, lons, h
def _core_get_correction(
self, prm_list, set_auto_mask=False, transpose=False, meter=False
):
correction = {}
for dim, field in prm_list.items():
correction[dim] = self._get_etad_param(
field,
set_auto_mask=set_auto_mask,
transpose=transpose,
meter=meter,
)
correction["unit"] = "m" if meter else "s"
return correction
[docs]
def get_correction(
self,
name: CorrectionType = ECorrectionType.SUM,
set_auto_mask=False,
transpose=False,
meter=False,
direction=None,
):
"""Retrieve the correction for the specified correction "name".
Puts the results in a dict.
Parameters
----------
name : ECorrectionType or str
the desired correction
set_auto_mask : bool
requested for netCDF4 to avoid retrieving a masked array
transpose : bool
requested to retrieve the correction in array following the
numpy convention for dimensions (default: False)
meter : bool
transform the result in meters
direction : str or None
if set to "x" (for range) or "y" (for "azimuth") only extracts
the specified correction component.
By default (None) all available components are returned.
Returns
-------
dict
a dictionary containing the following items for the
requested correction:
:x: correction in range (if applicable)
:y: correction in azimuth (if applicable)
:unit: 'm' or 's'
:name: name of the correction
"""
correction_type = ECorrectionType(name) # check values
name = correction_type.value
prm_list = _CORRECTION_NAMES_MAP[name]
if direction is not None:
prm_list = {direction: prm_list[direction]}
correction = self._core_get_correction(
prm_list,
set_auto_mask=set_auto_mask,
transpose=transpose,
meter=meter,
)
correction["name"] = name
return correction
def _get_geocoder(self):
if self._geocoder is None:
from .geometry import GridGeocoding
azimuth, range_ = self.get_burst_grid()
lats, lons, heights = self.get_lat_lon_height()
self._geocoder = GridGeocoding(
lats, lons, heights, xaxis=range_, yaxis=azimuth
)
return self._geocoder
[docs]
def radar_to_geodetic(self, tau, t, deg=True):
"""Convert RADAR coordinates into geodetic coordinates.
Compute the geodetic coordinates (lat, lon, h) corresponding to
RADAR coordinates (tau, t), i.e. fast time (range time) and slow
time (azimuth time expressed in seconds form the reference
:data:`Sentinel1Etad.min_azimuth_time`)::
(tau, t) -> (lat, lon, h)
If ``deg`` is True the output ``lat`` and ``lon`` are expressed
in degrees, otherwise in radians.
The implementation is approximated and exploits pre-computed grids
of latitude, longitude and height values.
The method is not as accurate as solving the range-Doppler equations.
.. seealso:: :class:`s1etad.geometry.GridGeocoding`.
"""
return self._get_geocoder().forward_geocode(tau, t, deg=deg)
[docs]
def geodetic_to_radar(self, lat, lon, h=0, deg=True):
"""Convert geodetic coordinates into RADAR coordinates.
Compute the RADAR coordinates (tau, t), i.e. fast time (range time)
and slow time (azimuth time expressed in seconds form the reference
:data:`Sentinel1Etad.min_azimuth_time`) corresponding to
geodetic coordinates (lat, lon, h)::
(lat, lon, h) -> (tau, t)
If ``deg`` is True it is assumed that input ``lat`` and ``lon``
are expressed in degrees, otherwise it is assumed that angles
are expressed in radians.
The implementation is approximated and exploits pre-computed grids
of latitude, longitude and height values.
The method is not as accurate as solving the range-Doppler equations.
.. seealso:: :class:`s1etad.geometry.GridGeocoding`.
"""
return self._get_geocoder().backward_geocode(lat, lon, h, deg=deg)
[docs]
def radar_to_image(self, t, tau):
"""Convert RADAR coordinates into image coordinates.
Compute the image coordinates (line, sample) corresponding
to RADAR coordinates (tau, t), i.e. fast time (range time) and
slow time (azimuth time expressed in seconds form the reference
:data:`Sentinel1Etad.min_azimuth_time`)::
(tau, t) -> (line, sample)
"""
line = (t - self.sampling_start["y"]) / self.sampling["y"]
sample = (tau - self.sampling_start["x"]) / self.sampling["x"]
return line, sample
[docs]
def image_to_radar(self, line, sample):
"""Convert image coordinates into RADAR coordinates.
Compute the RADAR coordinates (tau, t), i.e. fast time (range time)
and slow time (azimuth time expressed in seconds form the reference
:data:`Sentinel1Etad.min_azimuth_time`) corresponding to
image coordinates (line, sample)::
(line, sample) -> (t, tau)
"""
t = self.sampling_start["y"] + line * self.sampling["y"]
tau = self.sampling_start["x"] + sample * self.sampling["x"]
return t, tau