Source code for ali.retrieval.statevector

from skretrieval.retrieval.statevector.profile import StateVectorProfileLogND
from skretrieval.retrieval.tikhonov import two_dim_vertical_second_deriv
import sasktran as sk
import numpy as np
import xarray as xr
from copy import copy


[docs] class StateVectorAerosolProfile(StateVectorProfileLogND): def __init__(self, altitudes_m: np.array, values: np.array, species_name: str, optical_property: sk.OpticalProperty, lowerbound: float, upperbound: float, second_order_tikhonov_factor: np.ndarray = np.zeros(2), second_order_tikhonov_altitude: np.ndarray = np.array((0.0, 100_000.0)), bounding_factor=1e4, zero_factor: float = 1e-20, max_update_factor: float = 5, scale_range: float = 200): """ A state vector element which defines a vertical profile as the logarithm of number density. The profile is bounded above and below by the upperbound and lowerbound through the use of the apriori. A second order tikhonov smoothing is included. Number dennsities less than a certain factor (zero_factor) are treated as identically equal to 0 within the RTM to prevent numerical precision problems Parameters ---------- altitudes_m: np.array Altitudes of the profile, should match weighting function altitudes values: np.array log(number density) for the profile species_name: str Name of the species in the atmosphere optical_property: sk.OpticalProperty Optical property of the species lowerbound: float Lowerbound of the retrieval in m upperbound: float Upperbound of the retrieval in m second_order_tikhonov_factor: np.ndarray Altitude dependent second order tikhonov factor, should be the same size as second_order_tikhonov_altitude second_order_tikhonov_altitude: np.ndarray Altitudes of Tikhonov factors, should be the same size as second_order_tikhonov_factor bounding_factor: float, optional Bounding factor used for the apriori upper/lower bounding. Default 1e4 zero_factor: float, optional Number densities less than this number are treated as identically 0. Default 1e-20 max_update_factor: float, optional Maximum factor to update any element of the state vector every iteration. Default 5 scale_range: float The range in km that is used for scaling of the profile below the upper bound and above the lower bound. For example, if the `upper_bound` is 40km and `scale_range` is 5km, then the range between 35 and 40km will be used as the region to determine the scale factor above 40km. """ self._second_order_tikhonov_factor = second_order_tikhonov_factor self._second_order_tikhonov_altitude = second_order_tikhonov_altitude super().__init__(altitudes_m=altitudes_m, values=values, species_name=species_name, optical_property=optical_property, lowerbound=lowerbound, upperbound=upperbound, second_order_tikhonov_factor=1.0, bounding_factor=bounding_factor, zero_factor=zero_factor, max_update_factor=max_update_factor) self._second_order_tikhonov_factor = second_order_tikhonov_factor self._second_order_tikhonov_altitude = second_order_tikhonov_altitude self._upper_scale_range = scale_range self._lower_scale_range = scale_range self._upper_scale_alts = None self._lower_scale_alts = None self._compute_apriori_covariance() @property def initial_state(self): return self._initial_state def max_value(self): return 1000.0 def min_value(self): return 0.0 def _compute_apriori_covariance(self): if type(self._second_order_tikhonov_factor * 1.0) == float: tikh_factor = self._second_order_tikhonov_factor else: tikh_factor = np.interp(self._altitudes_m[self.retrieval_alts()], self._second_order_tikhonov_altitude, self._second_order_tikhonov_factor) # n = len(self._values[~self._zero_mask]) n = len(self.state()) # Link above # bound_alts = np.array([0, self._upperbound - 2000, self._upperbound + 2000, 100000.0]) # bounds = np.array([0, 0, self._bounding_factor, self._bounding_factor]) # bounding_factor = np.interp(self._altitudes_m, bound_alts, bounds) # gamma = two_dim_vertical_first_deriv(1, n, factor=bounding_factor) # gamma = two_dim_vertical_first_deriv(1, n, factor=self._bounding_factor) # bounded_altitudes = self._altitudes_m[~self._zero_mask] > self._upperbound # first_idx = np.nonzero(bounded_altitudes)[0][0] # bounded_altitudes[first_idx - 1] = True # gamma[~bounded_altitudes, :] = 0 # self._inverse_Sa_bounding = gamma.T @ gamma # # # Link below # gamma = two_dim_vertical_first_deriv(1, n, factor=self._bounding_factor) # bounded_altitudes = self._altitudes_m[~self._zero_mask] < self._lowerbound # gamma[~bounded_altitudes, :] = 0 # # self._inverse_Sa_bounding += gamma.T @ gamma self._inverse_Sa_bounding = np.zeros((n, n)) gamma = two_dim_vertical_second_deriv(1, n, factor=tikh_factor) self._inverse_Sa = gamma.T @ gamma def retrieval_alts(self): return (self._altitudes_m < self._upperbound) & (self._altitudes_m > self._lowerbound) def state(self): return super().state()[self.retrieval_alts()] def update_state(self, x: np.array): m_factors = np.exp(x - (self._values[~self._zero_mask & self.retrieval_alts()])) m_factors = np.interp(self._altitudes_m, self._altitudes_m[self.retrieval_alts()], m_factors, left=m_factors[0], right=m_factors[-1]) # clip update factors to avoid too large of swings m_factors[m_factors > self._max_update_factor] = self._max_update_factor m_factors[m_factors < 1 / self._max_update_factor] = 1 / self._max_update_factor # compute the scale factor above the retrieval bounds m_factors[self._altitudes_m > self._upperbound] = np.mean(m_factors[self.upper_scale_alts]) # compute the scale factor below the retrieval bounds m_factors[self._altitudes_m <= self._lowerbound] = np.mean(m_factors[self.lower_scale_alts]) # apply scale factor in number density space and clip extreme values new_masked_values = m_factors * np.exp(self._values[~self._zero_mask]) new_masked_values[new_masked_values > self.max_value()] = self.max_value() new_masked_values[new_masked_values < self.min_value()] = self.min_value() self._values[~self._zero_mask] = np.log(copy(new_masked_values)) self._internal_update() @property def upper_scale_alts(self): """ Altitudes in the upper altitude range used for profile scaling """ good = (self._altitudes_m > self._upperbound - self._upper_scale_range) & \ (self._altitudes_m < self._upperbound) if sum(good) == 0: # if no values in range, choose highest possible altitude good[np.where(self._altitudes_m < self._upperbound)[0][-1]] = True self._upper_scale_alts = good return self._upper_scale_alts @property def lower_scale_alts(self): """ Altitudes in the lower altitude range used for profile scaling """ good = (self._altitudes_m > self._lowerbound) & (self._altitudes_m < self._lowerbound + self._lower_scale_range) if sum(good) == 0: # if no values in range, choose lowest possible altitude good[np.where(self._altitudes_m > self._lowerbound)[0][0]] = True self._lower_scale_alts = good return self._lower_scale_alts def propagate_wf(self, radiance) -> xr.Dataset: wf = radiance['wf_' + self.name()] # upper_scale_alts = (self._altitudes_m > (self._upperbound - self._upper_scale_range)) & \ # (self._altitudes_m < self._upperbound) above_scale_alts = self._altitudes_m >= self._upperbound Ns = np.sum(self.upper_scale_alts) if Ns is None: Ns = 1 if len(wf.shape) == 3: new_wf = wf * np.exp(self._values)[np.newaxis, np.newaxis, :] # wf_above = new_wf[:, self.upper_scale_alts].sum(axis=1) # new_wf[:, :, self.upper_scale_alts] = new_wf[:, :, self.upper_scale_alts] + wf_above[:, np.newaxis] / Ns good = ~self._zero_mask & self.retrieval_alts() return new_wf[:, :, good] else: new_wf = wf * np.exp(self._values)[np.newaxis, :] new_wf[:, above_scale_alts] *= new_wf[:, above_scale_alts] * np.exp(self._values)[np.newaxis, above_scale_alts] wf_above = new_wf[:, above_scale_alts].sum(axis=1) new_wf[:, self.upper_scale_alts] = new_wf[:, self.upper_scale_alts] + wf_above[:, np.newaxis] / Ns good = ~self._zero_mask & self.retrieval_alts() return new_wf[:, good]
[docs] class StateVectorProfileParticleSize(StateVectorAerosolProfile): def __init__(self, altitudes_m: np.array, values: np.array, species_name: str, optical_property: sk.OpticalProperty, size_type: str, lowerbound: float, upperbound: float, second_order_tikhonov_factor: np.ndarray = np.zeros(2), second_order_tikhonov_altitude: np.ndarray = np.array((0.0, 100_000.0)), bounding_factor=1e4, zero_factor: float = 1e-20, max_update_factor: float = 5): self._aerosol_species_name = species_name self._type = size_type self.min_radius = 0.04 self.max_radius = 0.5 self.min_width = 1.05 self.max_width = 2.3 if self._type == 'lognormal_medianradius': self._speciesid = 'SKCLIMATOLOGY_LOGNORMAL_MODERADIUS_MICRONS' elif self._type == 'lognormal_modewidth': self._speciesid = 'SKCLIMATOLOGY_LOGNORMAL_MODEWIDTH' super().__init__(altitudes_m=altitudes_m, values=values, species_name=species_name, optical_property=optical_property, lowerbound=lowerbound, upperbound=upperbound, second_order_tikhonov_factor=second_order_tikhonov_factor, second_order_tikhonov_altitude=second_order_tikhonov_altitude, bounding_factor=bounding_factor, zero_factor=zero_factor, max_update_factor=max_update_factor) def _internal_update(self): """ Updates the internal climatology """ altitudes, values = self._climatology_altitudes_and_density() self._optical_property.particlesize_climatology[self._speciesid] = values def name(self) -> str: return self._aerosol_species_name + '_' + self._type def max_value(self): if self._type == 'lognormal_modewidth': return self.max_width if self._type == 'lognormal_medianradius': return self.max_radius def min_value(self): if self._type == 'lognormal_modewidth': return self.min_width if self._type == 'lognormal_medianradius': return self.min_radius # def update_state(self, x: np.array): # # m_factors = np.exp(x - (self._values[~self._zero_mask])) # m_factors[m_factors > self._max_update_factor] = self._max_update_factor # m_factors[m_factors < 1 / self._max_update_factor] = 1 / self._max_update_factor # self._values[~self._zero_mask] = np.log(copy(m_factors * np.exp(self._values[~self._zero_mask]))) # # if self._type == 'lognormal_modewidth': # self._values[self._values > np.log(self.max_width)] = np.log(self.max_width) # self._values[self._values < np.log(self.min_width)] = np.log(self.min_width) # if self._type == 'lognormal_medianradius': # self._values[self._values > np.log(self.max_radius)] = np.log(self.max_radius) # self._values[self._values < np.log(self.min_radius)] = np.log(self.min_radius) # self._internal_update()
[docs] def add_to_atmosphere(self, atmosphere: sk.Atmosphere): """ Adds the species to the atmosphere Parameters ---------- atmosphere: sk.Atmosphere """ aer_wf = self._aerosol_species_name + '_' + self._type if atmosphere.wf_species is None: atmosphere.wf_species = [aer_wf] if aer_wf not in atmosphere.wf_species: old_wf_species = copy(atmosphere.wf_species) old_wf_species.append(aer_wf) atmosphere.wf_species = old_wf_species
class StateVectorCloudProfile(StateVectorAerosolProfile): def max_value(self): return 0.01 class StateVectorProfileEffectiveRadius(StateVectorProfileParticleSize): def __init__(self, altitudes_m: np.array, values: np.array, species_name: str, optical_property: sk.OpticalProperty, lowerbound: float, upperbound: float, second_order_tikhonov_factor: np.ndarray = np.zeros(2), second_order_tikhonov_altitude: np.ndarray = np.array((0.0, 100_000.0)), bounding_factor=1e4, zero_factor: float = 1e-20, max_update_factor: float = 5): self._aerosol_species_name = species_name self.min_radius = 0.04 self.max_radius = 0.5 self.min_width = 1.05 self.max_width = 2.3 super().__init__(altitudes_m=altitudes_m, values=values, species_name='lognormal_effective_radius', optical_property=optical_property, lowerbound=lowerbound, upperbound=upperbound, second_order_tikhonov_factor=second_order_tikhonov_factor, second_order_tikhonov_altitude=second_order_tikhonov_altitude, bounding_factor=bounding_factor, zero_factor=zero_factor, size_type='lognormal_medianradius', max_update_factor=max_update_factor) def name(self): return self._aerosol_species_name + '_' + self._species_name def max_value(self): return 1000.0 def min_value(self): return 0.0 def lognormal_params_from_effective_radius(self, reff): # log(sg) = a * log(rm) + b from 2009-2019 LPC fine mode data p = np.array([-0.30773813, -0.40106275]) rg_line = np.arange(0.04, 0.25, 0.001) sg_line = np.exp(p[0] * np.log(rg_line) + p[1]) reff_line = rg_line * np.exp(5 / 2 * (np.log(sg_line) ** 2)) rg = np.interp(reff, reff_line, rg_line, left=rg_line[0], right=rg_line[-1]) sg = np.interp(reff, reff_line, sg_line, left=sg_line[0], right=sg_line[-1]) return sg, rg def update_state(self, x: np.array): m_factors = np.exp(x - (self._values[~self._zero_mask])) m_factors[m_factors > self._max_update_factor] = self._max_update_factor m_factors[m_factors < 1 / self._max_update_factor] = 1 / self._max_update_factor new_values = np.log(copy(m_factors * np.exp(self._values[~self._zero_mask]))) self._values[~self._zero_mask] = np.log(copy(m_factors * np.exp(self._values[~self._zero_mask]))) self._internal_update() def _internal_update(self): """ Updates the internal climatology """ altitudes, values = self._climatology_altitudes_and_density() sg, rg = self.lognormal_params_from_effective_radius(values) self._optical_property.particlesize_climatology['SKCLIMATOLOGY_LOGNORMAL_MODERADIUS_MICRONS'] = sg self._optical_property.particlesize_climatology['SKCLIMATOLOGY_LOGNORMAL_MODEWIDTH'] = rg def add_to_atmosphere(self, atmosphere: sk.Atmosphere): """ Adds the species to the atmosphere Parameters ---------- atmosphere: sk.Atmosphere """ rm_wf = self._aerosol_species_name + '_' + 'lognormal_medianradius' sg_wf = self._aerosol_species_name + '_' + 'lognormal_modewidth' if atmosphere.wf_species is None: atmosphere.wf_species = [rm_wf, sg_wf] if rm_wf not in atmosphere.wf_species: old_wf_species = copy(atmosphere.wf_species) old_wf_species.append(rm_wf) atmosphere.wf_species = old_wf_species if sg_wf not in atmosphere.wf_species: old_wf_species = copy(atmosphere.wf_species) old_wf_species.append(sg_wf) atmosphere.wf_species = old_wf_species def propagate_wf(self, radiance) -> xr.Dataset: # wf = radiance['wf_' + self.name()] altitudes, values = self._climatology_altitudes_and_density() sg, rm = self.lognormal_params_from_effective_radius(values) wf_rg = f'wf_{self._aerosol_species_name}_lognormal_medianradius' wf_sg = f'wf_{self._aerosol_species_name}_lognormal_modewidth' x = np.sqrt(np.log(values / rm)) wf = (radiance[wf_rg] * np.exp(-5 / 2 * (np.log(sg) ** 2)) + radiance[wf_sg] * np.exp(np.sqrt(2 / 5) * x) / (np.sqrt(10) * values * x)) if len(wf.shape) == 3: new_wf = wf * np.exp(self._values)[np.newaxis, np.newaxis, :] return new_wf[:, :, ~self._zero_mask] else: new_wf = wf * np.exp(self._values)[np.newaxis, :] return new_wf[:, ~self._zero_mask]