Source code for ali.retrieval.measvec.transformer

from copy import copy
from typing import Tuple, Union, List, Dict
import numpy as np
import xarray as xr
from ali.retrieval.measvec import Transformer
from scipy.interpolate import UnivariateSpline
from skretrieval.core.radianceformat import RadianceBase, RadianceGridded


[docs] class SpectralRatio(Transformer): """ Spectral ratio at two wavelengths. Parameters ---------- wavel_1: float Wavelength of numerator. wavel_2: float Wavelength of numerator. """ def __init__(self, wavel_1: float, wavel_2: float): self._w1 = wavel_1 self._w2 = wavel_2 def transform(self, l1_data: RadianceBase, wf=None, covariance=False): data_1 = l1_data.data.sel(wavelength=self._w1, method='nearest') data_2 = l1_data.data.sel(wavelength=self._w2, method='nearest') data = l1_data.data.sel(wavelength=self._w1, method='nearest').copy(deep=True) data['radiance'] = data_1['radiance'] / data_2['radiance'] if 'error' in data.keys(): data['error'] = np.sqrt((data_1['error'] / data_2['radiance']) ** 2 + (data_2['error'] * data_1['radiance'] / data_2['radiance'] ** 2)) wf_keys = [key for key in data.keys() if 'wf' in key] if wf_keys: for key in wf_keys: data[key] = data_1[key] / data_2.radiance - data_2[key] * data_1.radiance / (data_2.radiance ** 2) l1_copy = copy(l1_data) l1_copy.data = data return l1_copy
[docs] class AltitudeNormalization(Transformer): """ Apply an altitude normalization Parameters ---------- norm_alts : Tuple[float, float] Tuple consisting of low and high altitude limits of the normalization range. nan_above: bool Make values above the altitude normalization range NaN. Default False couple_altitude: bool Account for altitude coupling in the Jacobian. Default True Examples -------- >>> from ali.retrieval.measvec.transformer import AltitudeNormalization >>> from ali.retrieval.measvec import MeasurementVectorElement >>> alt_norm = AltitudeNormalization((35000.0, 40000.0)) >>> meas_vec_el = MeasurementVectorElement() >>> meas_vec_el.add_transform(alt_norm) """ def __init__(self, norm_alts: Tuple[float, float], nan_above=False, couple_altitudes: bool = True): self._norm_alts = norm_alts self._nan_above = nan_above self._couple_altitude = couple_altitudes def transform(self, l1_data: Union[RadianceBase, List[RadianceBase]], wf=None, covariance=False): if type(l1_data) is list: return [self._transform(l1, covariance=covariance) for l1 in l1_data] return self._transform(l1_data, covariance=covariance) def _transform(self, l1_data, covariance=False): tanalts = l1_data.tangent_locations().altitude norm_data = l1_data.data.where((tanalts > self._norm_alts[0]) & (tanalts < self._norm_alts[1]), drop=True) data = l1_data.data.copy(deep=True) norm_rad = norm_data['radiance'].mean(dim='los') data['radiance'] = data['radiance'] / norm_rad if 'error' in data.keys(): norm_error = np.sqrt(1 / (len(norm_data.los) ** 2) * ((norm_data['error']) ** 2).sum()) variance = (((l1_data.data['error'] / l1_data.data['radiance']) ** 2 + (norm_error / norm_rad) ** 2)) * (data['radiance'] ** 2) data['error'] = np.sqrt(variance) if covariance: S = self.covariance(l1_data) data['error'] = xr.DataArray(np.sqrt(np.diag(S)), dims=['los'], coords=[l1_data.data.los.values]) data['covariance'] = S if self._nan_above: data['radiance'] = data['radiance'].where(tanalts < self._norm_alts[0]) wf_keys = [key for key in data.keys() if 'wf' in key] if wf_keys: for key in wf_keys: if self._couple_altitude: data[key] = (l1_data.data[key] / norm_rad + norm_data[key].mean(dim='los') * (l1_data.data['radiance'] / (norm_rad ** 2))) else: data[key] = (l1_data.data[key] / norm_rad) l1_copy = copy(l1_data) l1_copy.data = data return l1_copy def jacobian(self, l1_data): tanalts = l1_data.tangent_locations().altitude norm_data = l1_data.data.where((tanalts > self._norm_alts[0]) & (tanalts < self._norm_alts[1]), drop=True) norm_rad = norm_data['radiance'].mean(dim='los') J = np.zeros((len(l1_data.data.los), len(l1_data.data.los))) N = len(norm_data.los) norm_idx = norm_data.los.values sum_norm_rad = float(norm_data.radiance.sum().values) rad = l1_data.data.radiance.values # TODO: vectorize # if self._couple_altitude: for i in range(len(l1_data.data.los)): for j in range(len(l1_data.data.los)): if j == i: if j in norm_idx: J[i, j] = N * (sum_norm_rad - rad[i]) / (sum_norm_rad ** 2) else: J[i, j] = 1 / norm_rad.values else: if j in norm_idx: J[i, j] = rad[i] / (sum_norm_rad ** 2) else: J[i, j] = 0 return J
[docs] class AltitudeNormalizationShift(AltitudeNormalization): """ Apply an altitude normalization that shifts the measurements rather than using a division. This can be useful for cases when Parameters ---------- norm_alts : Tuple[float, float] Tuple consisting of low and high altitude limits of the normalization range. """ def transform(self, l1_data, wf=None, covariance=False): tanalts = l1_data.tangent_locations().altitude norm_data = l1_data.data.where((tanalts > self._norm_alts[0]) & (tanalts < self._norm_alts[1]), drop=True) data = l1_data.data.copy(deep=True) norm_rad = norm_data['radiance'].mean(dim='los') data['radiance'] = data['radiance'] - norm_rad if 'error' in data.keys(): norm_error = np.sqrt(((norm_data['error']) ** 2).sum()) / len(norm_data['error'].values) variance = ((l1_data.data['error'] ** 2) + (norm_error ** 2)) data['error'] = np.sqrt(variance) if covariance: S = self.covariance(l1_data) data['error'] = xr.DataArray(np.sqrt(np.diag(S)), dims=['los'], coords=[l1_data.data.los.values]) data['covariance'] = S l1_copy = copy(l1_data) l1_copy.data = data return l1_copy
[docs] class LogRadiance(Transformer): def transform(self, l1_data: RadianceBase, wf=None, covariance=False): data = l1_data.data.copy(deep=True) data['radiance'] = np.log(data['radiance']) if 'error' in data.keys(): data['error'] = l1_data.data['error'] / l1_data.data['radiance'] wf_keys = [key for key in data.keys() if 'wf' in key] if wf_keys: for key in wf_keys: data[key] = l1_data.data[key] / l1_data.data['radiance'] l1_copy = copy(l1_data) l1_copy.data = data return l1_copy
[docs] class FrameSelect(Transformer): def __init__(self, index=0): self._index = index def transform(self, l1_data: List[RadianceBase], wf=None, covariance=False): return l1_data[self._index]
[docs] class WavelengthSelect(Transformer): """ Select a single wavelength """ def __init__(self, wavelength: float, method='nearest'): self._wavelength = wavelength self._method = method def transform(self, l1_data: Union[RadianceBase, List[RadianceBase]], wf=None, covariance=False): if type(l1_data) is list: return [self._transform(l1, covariance=covariance) for l1 in l1_data] return self._transform(l1_data, covariance=covariance) def _transform(self, l1_data, wf=None, covariance=False): data = l1_data.data.copy(deep=True) l1_copy = copy(l1_data) l1_copy.data = data.sel(wavelength=self._wavelength, method=self._method) return l1_copy
[docs] class LinearCombination(Transformer): """ Compute a linear combination of radiances. `l1_data` is expected to be a List[RadianceBase]. Often useful for computing polarization states. .. math:: y = \sum_{i=0}^{N} l1[key_i] * weight_i Parameters ---------- weights: Dict[int, float] Dictionary in the form {frame_index: weight, ...} """ def __init__(self, weights: Dict[int, float]): self._weights = weights def transform(self, l1_data: List[RadianceBase], wf=None, covariance=False): idx0 = list(self._weights.keys())[0] data = l1_data[idx0].data.copy(deep=True) data['radiance'] = data['radiance'] * 0.0 for idx, w in self._weights.items(): data['radiance'] += w * l1_data[idx].data['radiance'] if 'error' in data.keys(): data['error'] = l1_data[idx0].data['error'] * 0.0 for idx, w in self._weights.items(): data['error'] += (w ** 2) * (l1_data[idx].data['error'] ** 2) data['error'] = np.sqrt(data['error']) wf_keys = [key for key in data.keys() if 'wf' in key] if wf_keys: for key in wf_keys: data[key] = l1_data[idx0].data[key] * 0.0 for idx, w in self._weights.items(): data[key] += w * l1_data[idx].data[key] l1_copy = copy(l1_data[idx0]) l1_copy.data = data return l1_copy
[docs] class FrameRatio(Transformer): def __init__(self, index_1: int = 0, index_2: int = 1): self._index_1 = index_1 self._index_2 = index_2 def transform(self, l1_data: List[RadianceBase], wf=None, covariance=False): data_1 = l1_data[self._index_1].data data_2 = l1_data[self._index_2].data data = l1_data[self._index_1].data.copy(deep=True) data['radiance'].values = data_1['radiance'].values / data_2['radiance'].values if 'error' in data.keys(): data['error'].values = np.sqrt((data_1['error'] / data_1['radiance']) ** 2 + (data_2['error'] / data_2['radiance']) ** 2) * np.abs(data['radiance']) data['error'] = data['error'].fillna(data['error'].max()) wf_keys = [key for key in data.keys() if 'wf' in key] if wf_keys: for key in wf_keys: data[key] = data_1[key] / data_2.radiance - data_2[key] * data_1.radiance / (data_2.radiance ** 2) l1_copy = copy(l1_data[self._index_1]) l1_copy.data = data return l1_copy
class FrameRatios(Transformer): def __init__(self, index: List[Tuple[int, int]]): self._index = index def transform(self, l1_data: List[RadianceBase], wf=None, covariance=False): data = [] for i1, i2 in self._index: mv = FrameRatio(index_1=i1, index_2=i2) data.append(mv.transform(l1_data, covariance=covariance)) return data class LinearCombinations(Transformer): def __init__(self, weights: List[Dict[int, float]]): self._weights = weights def transform(self, l1_data: List[RadianceBase], wf=None, covariance=False): data = [] for weights in self._weights: mv = LinearCombination(weights) data.append(mv.transform(l1_data, covariance=covariance)) return data
[docs] class SplineSmoothing(Transformer): def __init__(self, smoothing=200): self.smoothing = smoothing def transform(self, l1_data: Union[RadianceBase, List[RadianceBase]], wf=None, covariance=False): if type(l1_data) is list: return [self._transform(l1, covariance=covariance) for l1 in l1_data] return self._transform(l1_data, covariance=covariance) def _transform(self, l1_data: RadianceBase, covariance=False): data = l1_data.data.copy(deep=True) tanalts = l1_data.tangent_locations().altitude old_dims = data.radiance.dims good_data = data.radiance.transpose('los', ...).values good_data[~np.isfinite(good_data)] = 0.0 good_err = data.error.transpose('los', ...).values good_err[~np.isfinite(good_err)] = np.max(good_err[np.isfinite(good_err)]) if len(good_data.shape) > 1: new_data = [] for gd, ge in zip(good_data.T, good_err.T): spl = UnivariateSpline(x=tanalts, y=gd, w=1 / ge, s=self.smoothing) new_data.append(spl(tanalts)) new_data = np.array(new_data).T else: spl = UnivariateSpline(x=tanalts, y=good_data, w=1 / good_err, s=self.smoothing) new_data = spl(tanalts) data_rad = data.transpose('los', ...)['radiance'] data_rad.values = new_data data['radiance'].values = data_rad.transpose(*old_dims).values l1_copy = copy(l1_data) l1_copy.data = data return l1_copy
[docs] class VerticalDerivative(Transformer): def __init__(self, smoothing=200): self.smoothing = smoothing def transform(self, l1_data: Union[RadianceBase, List[RadianceBase]], wf=None, covariance=False): if type(l1_data) is list: return [self._transform(l1, covariance=covariance) for l1 in l1_data] return self._transform(l1_data, covariance=covariance) def _transform(self, l1_data: RadianceBase, covariance=False): data = l1_data.data.copy(deep=True) tanalts = l1_data.tangent_locations().altitude # Differentiation matrix n = len(data.los) h = np.concatenate([np.diff(tanalts), [tanalts[-1] - tanalts[-2]]]) D = np.diag(-np.ones(n) / h) + np.diag(np.ones(n - 1) / h[:-1], 1) # Fix the final row D[n - 1, n - 2] = -1 / h[-1] D[n - 1, n - 1] = 1 / h[-1] old_dims = data.radiance.dims good_data = data.radiance.transpose('los', ...).values good_data[~np.isfinite(good_data)] = 0.0 new_data = D @ good_data data_rad = data.transpose('los', ...)['radiance'] data_rad.values = new_data # D @ np.diag(data.radiance.transpose('los', ...).values ** 2) @ D.T data['radiance'].values = data_rad.transpose(*old_dims).values if 'error' in data.keys(): good_err = data.error.transpose('los', ...).values good_err[~np.isfinite(good_err)] = np.max(good_err[np.isfinite(good_err)]) new_error = np.sqrt((D ** 2) @ (good_err ** 2)) data_err = data.transpose('los', ...)['error'] data_err.values = new_error data['error'].values = data_err.transpose(*old_dims).values wf_keys = [key for key in data.keys() if 'wf' in key] if wf_keys: for key in wf_keys: old_dims = data[key].dims data_wf = data[key].transpose('los', ...) new_wf = D @ data_wf.values data_wf.values = new_wf data[key].values = data_wf.transpose(*old_dims).values l1_copy = copy(l1_data) l1_copy.data = data return l1_copy
class AltitudeBinning(Transformer): def __init__(self, altitudes: np.ndarray): self._altitudes = altitudes def transform(self, l1_data: Union[RadianceBase, List[RadianceBase]], wf=None, covariance=False): if type(l1_data) is list: return [self._transform(l1, covariance=covariance) for l1 in l1_data] return self._transform(l1_data, covariance=covariance) def _transform(self, l1_data: RadianceBase, wf=None, covariance=False): tanalts = l1_data.tangent_locations().altitude rad = [] los_dims = [dim for dim in l1_data.data['los_vectors'].dims if dim != 'xyz'] for idx, (min_alt, max_alt) in enumerate(zip(self._altitudes[:-1], self._altitudes[1:])): good = (tanalts > min_alt) & (tanalts < max_alt) tmp = l1_data.data.where(good).mean(dim=los_dims) tmp['los'] = idx if 'error' in tmp: tmp['error'] = tmp['error'] / np.sqrt(np.sum(good.values)) rad.append(tmp) rad = xr.concat(rad, dim='los') return RadianceGridded(rad) class RowAverage(Transformer): """ Reduce the dimension of the measurement data by averaging across a row from pixels `first_pixel` to `last_pixel`. If `first_pixel` is not given the entire row is used. """ def __init__(self, dim='nx', to_gridded: bool = True, first_pixel: int = None, last_pixel: int = None): self._dim = dim self._to_gridded = to_gridded self._first_pixel = first_pixel self._last_pixel = last_pixel def transform(self, l1_data: Union[RadianceBase, List[RadianceBase]], wf=None, covariance=False): if type(l1_data) is list: return [self._transform(l1, covariance=covariance) for l1 in l1_data] return self._transform(l1_data, covariance=covariance) def _transform(self, l1_data: RadianceBase, wf=None, covariance=False): if self._first_pixel: num_cols = self._last_pixel - self._first_pixel data = l1_data.data.isel({self._dim: slice(self._first_pixel, self._last_pixel)}).mean(dim=self._dim) else: num_cols = l1_data.data[self._dim].shape[0] data = l1_data.data.mean(dim=self._dim) if 'error' in data.keys(): data['error'].values = data['error'] / np.sqrt(num_cols) l1_copy = copy(l1_data) l1_copy.data = data if self._to_gridded: # return l1_copy.to_gridded() return RadianceGridded(data.rename({'ny': 'los'})) # TODO: implement error and weighting function transforms return l1_copy class Truncate(Transformer): def __init__(self, lower_bound: float = None, upper_bound: float = None): self._lowerbound = lower_bound self._upperbound = upper_bound def transform(self, l1_data: Union[RadianceBase, List[RadianceBase]], wf=None, covariance=False): if type(l1_data) is list: return [self._transform(l1, covariance=covariance) for l1 in l1_data] return self._transform(l1_data, covariance=covariance) def _transform(self, l1_data: RadianceBase, covariance=False): data = l1_data.data.copy(deep=True) tanalts = l1_data.tangent_locations().altitude if self._lowerbound is not None: data['radiance'] = data.radiance.where(tanalts > self._lowerbound) if self._upperbound is not None: data['radiance'] = data.radiance.where(tanalts < self._upperbound) l1_copy = copy(l1_data) l1_copy.data = data return l1_copy