Source code for ali.instrument.sensor

from skretrieval.core.sensor.imager import SpectralImager, SpectralImagerFast
from sasktran import Geometry
from skretrieval.core.sensor import OpticalGeometry
from skretrieval.core.lineshape import LineShape, Rectangle, DeltaFunction, Gaussian, UserLineShape
from skretrieval.core.radianceformat import RadianceSpectralImage
from skcomponents.optics import (VerticalPolarizer, HorizontalPolarizer, AOTF,
                                 CompositeComponent, IdealLCR, LinearPolarizer, LiquidCrystalRotator)
from ali.instrument.polarizer import MeadowlarkOWLPolarizer
from ali.instrument.aotf import Brimrose20mmAOTF, BrimroseAOTFSingleChannel, ER2AOTF
from ali.instrument.lcr import ArcOptixLCR
from ali.instrument.lineshape import ALILineShape, ALIER2LineShape
from ali.instrument.sensors import Sensor, Cardinal1280, Cardinal640, RaptorOWL640N, TeledyneCCD97
from ali.instrument.lenses import Mirror12AOI, Mirror45AOI, CodeVLens, GoldMirror
from skcomponents.processing import PhotonIntegration
import numpy as np
import xarray as xr
from typing import Union, List
from skretrieval.util import rotation_matrix
from sasktran import LineOfSight


[docs] class ALISensor(SpectralImagerFast): """ Parameters ---------- wavelength_nm: np.ndarray Array of wavelengths that will be measuremed. The input radiances should set to accurately samples these values. pixel_vert_fov: LineShape Vertical lineshape of the spatial point spread function. pixel_horiz_fov: LineShape Horizontal lineshape of the spatial point spread function. image_horiz_fov: float Horizontal field of view of the full image in degrees. image_vert_fov: float Vertical field of view of the full image in degrees. num_columns: int Number of columns in the sensor. num_rows: int Number of rows in the sensor ideal_optics: bool If True, optics are assumed to be lossless and LCR produces a perfect polarization response. Default False collapse_images: bool If True, measurements are concatenated along the wavelength dimension. If False (default) each wavelength is returned as an element of List[RadianceSpectralImage], as would be true in a real instrument. central_aotf_wavelength: bool Sets the wavelength shift and stretch of diffraction efficiency. aperture_effective_area_cm2: float Effective aperture area of the instrument. Along with the `image_horiz_fov` and `image_vert_fov` sets the etendue. single_channel_aotf: bool If `True` use the 10mm Brimrose AOTF. If `False` use the 20mm dual channel AOTF. """ def __init__(self, wavelength_nm: np.ndarray, pixel_vert_fov: LineShape = None, pixel_horiz_fov: LineShape = None, image_horiz_fov: float = 5.0, image_vert_fov: float = 1.5, num_columns: int = 1, num_rows: int = 100, ideal_optics: bool = False, collapse_images: bool = False, central_aotf_wavelength: float = 850.0, central_lcr_wavelength: float = 850.0, aperture_effective_area_cm2: float = 0.4626, single_channel_aotf: bool = True, straylight: float = 0.0): if num_columns == 1: horiz_mode = 'constant' vert_mode = 'linear' else: vert_mode = 'linear' horiz_mode = 'linear' if pixel_vert_fov is None: # pixel_vert_fov = Rectangle(0.0056 * np.pi / 180, mode=vert_mode) pixel_vert_fov = Gaussian(stdev=0.0056 * np.pi / 180, mode=vert_mode) if straylight > 0.0: x = np.linspace(-np.pi / 2, np.pi / 2, 1000) straylight_fov_vert = UserLineShape(x_values=x, line_values=np.cos(x), zero_centered=False, mode='integrate') straylight_fov_horiz = UserLineShape(x_values=x, line_values=np.cos(x), zero_centered=False, mode='simple') # pixel_vert_fov = pixel_vert_fov + straylight_fov_vert # pixel_horiz_fov = pixel_horiz_fov + straylight_fov_horiz else: straylight_fov_vert = None straylight_fov_horiz = None # pixel_vert_fov = pixel_vert_fov + straylight_fov # elif straylight < 0.0: # x = np.linspace(-np.pi/2, np.pi/2, 1000) # pixel_vert_fov = UserLineShape(x_values=x, line_values=np.cos(x), zero_centered=True, mode='integrate') if pixel_horiz_fov is None: pixel_horiz_fov = Rectangle(0.015 * np.pi / 180, mode=horiz_mode) # pixel_horiz_fov = Gaussian(0.025 * np.pi / 180, mode=horiz_mode) wavelength_nm = np.array(wavelength_nm) self.spectral_lineshape = ALILineShape() self.spectral_lineshape_area = np.ones(wavelength_nm.shape, dtype=float) for idx, wavel in enumerate(wavelength_nm): self.spectral_lineshape_area[idx] = self.spectral_lineshape.area(wavel) super().__init__(wavelength_nm, self.spectral_lineshape, pixel_vert_fov, pixel_horiz_fov, image_horiz_fov, image_vert_fov, num_columns, num_rows, straylight_scalar=straylight, pixel_vert_fov_straylight=straylight_fov_vert, pixel_horiz_fov_straylight=straylight_fov_horiz, ) self._apply_post_processing = True self.apply_calibration = True self._add_noise = True self._add_dark_current = True self._add_adc = True self._use_ideal_optics = ideal_optics self._aperture_area_cm2 = aperture_effective_area_cm2 # self._aperture_fov_sr = aperture_field_of_view_sr self._exposure_time = 1.0 self._ccd_temperature = 273.15 self._simulate_pixel_averaging = 0 self._gain = 0 self._collapse_images = collapse_images self._central_aotf_wavelength = central_aotf_wavelength self._central_lcr_wavelength = central_lcr_wavelength self._subsampled_detector = True self._single_channel_aotf = single_channel_aotf self.twist_angle_off = 0 self.twist_angle_on = 90 self.sensor = Sensor() self._optics = self._create_optics() self.auto_exposure = False self.max_exposure = 60 self.save_level0_signal = True self.level0 = None self.straylight = straylight self.save_diagnostics = False self.diagnostics = {'settings': {}, 'radiance': {}, 'signal': {}} @property def ccd(self): return self.sensor @ccd.setter def ccd(self, value): if value.lower().replace('_', '').replace(' ', '') == 'cardinal640': self.sensor = Cardinal640() elif value.lower().replace('_', '').replace(' ', '') == 'cardinal1280': self.sensor = Cardinal1280() elif value.lower().replace('_', '').replace(' ', '') == 'raptorowl640n': self.sensor = RaptorOWL640N() elif value.lower().replace('_', '').replace(' ', '') == 'teledyneccd97': self.sensor = TeledyneCCD97() else: raise ValueError('Sensor not recognized') self.sensor.add_noise = self._add_noise self.sensor.add_adc = self._add_adc self.sensor.add_dark_current = self._add_dark_current self.sensor.exposure_time = self._exposure_time self._optics = self._create_optics() # update quantum efficiency @property def apply_post_processing(self): return self._apply_post_processing @apply_post_processing.setter def apply_post_processing(self, value: bool): self._apply_post_processing = value if not self._apply_post_processing: self.apply_calibration = False @property def gain(self): return self.sensor.gain @gain.setter def gain(self, value): self.sensor.gain = value @property def add_noise(self) -> bool: """ If True, simulate readout and shot noise on the measurements """ return self._add_noise @add_noise.setter def add_noise(self, value: bool): self._add_noise = value self.sensor.add_noise = value @property def add_dark_current(self) -> bool: """ If True, simulate dark current on the measurements """ return self._add_dark_current @add_dark_current.setter def add_dark_current(self, value: bool): self._add_dark_current = value self.sensor.add_dark_current = value @property def add_adc(self) -> bool: """ If True, simulate the analog-to-digital conversion of the signal """ return self._add_adc @add_adc.setter def add_adc(self, value: bool): self._add_adc = value self.sensor.add_adc = value @property def exposure_time(self) -> Union[float, np.ndarray]: """ Exposure time of the measurement in seconds. May be an array if exposure has multiple wavelengths. """ return self._exposure_time @exposure_time.setter def exposure_time(self, value: Union[float, np.ndarray]): self._exposure_time = value self.sensor.exposure_time = value @property def ccd_temperature(self) -> float: """ Temperature of the sensor in kelvin. """ return self._ccd_temperature @ccd_temperature.setter def ccd_temperature(self, value: float): self._ccd_temperature = value self.sensor.ccd_temperature = value @property def use_ideal_optics(self): return self._use_ideal_optics @use_ideal_optics.setter def use_ideal_optics(self, value): self._use_ideal_optics = value self._optics = self._create_optics() @property def optics(self): return CompositeComponent([o for o in self._optics.values()]) @property def rotator_is_on(self): return self._optics['rotator'].twist_angle == self.twist_angle_on def _create_optics(self): if self.use_ideal_optics: return self._create_ideal_optics() else: return self._create_real_optics() def _create_real_optics(self): # 'mirror-45aoi': Mirror45AOI(), # 'mirror-12aoi': Mirror12AOI(), # 'lens-codev': CodeVLens(), comps = {} comps['rotator'] = ArcOptixLCR(twist_angle=0, thickness=3, reference_wavelength=self._central_lcr_wavelength) comps['frontend-polarizer'] = MeadowlarkOWLPolarizer(orientation=90) if self._single_channel_aotf: comps['aotf'] = BrimroseAOTFSingleChannel(central_wavelength=self._central_aotf_wavelength) else: comps['aotf'] = Brimrose20mmAOTF() comps['backend-polarizer'] = MeadowlarkOWLPolarizer(orientation=0) comps['quantum-efficiency'] = self.sensor.quantum_efficiency() return comps # return {'rotator': ArcOptixLCR(twist_angle=0, thickness=1), # 'frontend-polarizer': MeadowlarkOWLPolarizer(orientation=90), # 'aotf': BrimroseAOTFSingleChannel(central_wavelength=self._central_aotf_wavelength), # 'backend-polarizer': MeadowlarkOWLPolarizer(orientation=0), # 'quantum-efficiency': self.sensor.quantum_efficiency()} def _create_ideal_optics(self): return {'rotator': IdealLCR(twist_angle=0), 'frontend-polarizer': LinearPolarizer(orientation=90), 'aotf': AOTF(), 'backend-polarizer': LinearPolarizer(orientation=0)} @property def photon_integrator(self): if self._subsampled_detector: num_cols = self.sensor.num_columns num_rows = self.sensor.num_rows else: num_rows = self.num_rows num_cols = self.num_columns p = PhotonIntegration(aperture_area_cm2=self._aperture_area_cm2, num_rows=num_rows, num_cols=num_cols, horizontal_fov=self.horizontal_fov, vertical_fov=self.vertical_fov, integration_time=self._exposure_time, wavelength_area=self.spectral_lineshape_area) return p
[docs] def turn_rotator_on(self): """ Turn the liquid crystal rotator on. """ self._optics['rotator'].twist_angle = self.twist_angle_on
[docs] def turn_rotator_off(self): """ Turn the liquid crystal rotator off. """ self._optics['rotator'].twist_angle = self.twist_angle_off
def calibrate_signal(self, model_value, model_wavel_nm): # convert from ADC count back to electrons (L1) if self.add_adc: model_value.data['radiance'].values = model_value.data['radiance'].values * self.sensor.adc.adu model_value.data['error'].values = model_value.data['error'].values * self.sensor.adc.adu # remove dark current if self.add_dark_current: if type(self.exposure_time) is np.ndarray: dc = xr.DataArray(self.exposure_time * self.sensor.dark_current.electrons_per_second, dims=['wavelength'], coords=[model_value.data.wavelength.values]) model_value.data['radiance'].values = (model_value.data['radiance'] - dc).values else: dc = self.exposure_time * self.sensor.dark_current.electrons_per_second model_value.data['radiance'].values = model_value.data['radiance'].values - dc # convert from electrons to radiance model_value.data['radiance'].values = (model_value.data['radiance'].values / self.photon_integrator.scale_factor[:, np.newaxis, np.newaxis]) model_value.data['error'].values = (model_value.data['error'].values / self.photon_integrator.scale_factor[:, np.newaxis, np.newaxis]) # undo transmission effects g = self.g_parameters(model_wavel_nm) # scale_factor = np.abs(g[0]) + np.abs(g[1]) scale_factor = g[0] model_value.data['radiance'].values = model_value.data['radiance'].values / scale_factor[:, np.newaxis, np.newaxis] model_value.data['error'].values = model_value.data['error'].values / scale_factor[:, np.newaxis, np.newaxis] if 'wf' in model_value.data.keys(): model_value.data['wf'].values = model_value.data['wf'].values / scale_factor[:, np.newaxis, np.newaxis, np.newaxis] return model_value def g_parameters(self, model_wavel_nm: np.array = None): if model_wavel_nm is None: model_wavel_nm = self._wavelength_nm if type(model_wavel_nm) is float: model_wavel_nm = np.array([model_wavel_nm]) a = np.ones((4, len(model_wavel_nm)), dtype=float) for idx, wavel in enumerate(model_wavel_nm): a[:, idx] = self.optics.matrix(wavel)[0] g = np.ones((4, len(self._wavelength_nm)), dtype=float) for midx, meas_wavel in enumerate(self._wavelength_nm): w = self.spectral_lineshape.integration_weights(meas_wavel, model_wavel_nm, normalize=True) g[:, midx] = a @ w return g
[docs] def model_radiance(self, optical_geometry: OpticalGeometry, model_wavel_nm: np.array, model_geometry: Geometry, radiance: np.array, wf=None) -> Union[RadianceSpectralImage, List[RadianceSpectralImage]]: if self.save_diagnostics: self.diagnostics['settings']['model_geometry'] = model_geometry self.diagnostics['settings']['optical_geometry'] = optical_geometry self.diagnostics['settings']['model_wavel_nm'] = model_wavel_nm self.diagnostics['settings']['optical_axes'] = self.optical_geometries(optical_geometry) self.diagnostics['radiance']['frontend_radiance'] = radiance self.diagnostics['noise'] = {} # pass radiance through optical chain if any(['wf' in key for key in radiance.keys()]): wf_var = [key for key in radiance.keys() if 'wf' in key] radiance, wf = self.optics.model_radiance(optical_geometry, model_wavel_nm, model_geometry, radiance.drop_vars(wf_var), radiance[wf_var]) elif wf is not None: radiance, wf = self.optics.model_radiance(optical_geometry, model_wavel_nm, model_geometry, radiance, wf) else: radiance = self.optics.model_radiance(optical_geometry, model_wavel_nm, model_geometry, radiance) # if self.straylight > 0: # radiance['I'] += radiance.I.quantile(0.95) * self.straylight # radiance['Q'] += radiance.Q.quantile(0.95) * self.straylight if self.save_diagnostics: self.diagnostics['radiance']['post_optics_radiance'] = radiance # convert radiance to detected signal model_value = super().model_radiance(optical_geometry, model_wavel_nm, model_geometry, radiance, wf=wf) if self.save_diagnostics: self.diagnostics['signal']['post_sensor_signal'] = model_value.data['radiance'].copy() self.diagnostics['settings']['exposure_time'] = self.exposure_time # process the signal through electronic components if self.apply_post_processing: if np.any(np.isnan(model_value.data['radiance'].values)): print('Nan value encountered') model_value.data['radiance'].values = self.photon_integrator.process_signal(model_value.data['radiance'].values) if self.auto_exposure: max_val = model_value.data['radiance'].max(dim=['ny', 'nx']) scale = (self.sensor._max_well_depth / 2) / max_val if self.photon_integrator.integration_time * scale.values > self.max_exposure: # use scale/scale to maintain dimensions scale = scale * (self.max_exposure / self.photon_integrator.integration_time) / scale model_value.data['radiance'].values = (model_value.data['radiance'] * scale).values self.exposure_time = self.photon_integrator.integration_time * scale.values self.photon_integrator.integration_time = self.exposure_time if type(self.exposure_time) is np.ndarray: exp = self.exposure_time else: exp = np.ones_like(self.measurement_wavelengths()) * self.exposure_time model_value.data['exposure_time'] = xr.DataArray(exp, dims=['wavelength'], coords=[model_value.data.wavelength.values]) model_value.data['error'] = xr.ones_like(model_value.data['radiance']) model_value.data['error'].values = self.sensor.noise_estimate(model_value.data['radiance'].values) if np.any(np.isnan(model_value.data['radiance'].values)): print('Nan value encountered') if self.save_diagnostics: self.diagnostics['signal']['post_photon-integration_signal'] = model_value.data['radiance'].copy() self.diagnostics['signal']['post_photon-integration_error'] = model_value.data['error'].copy() self.diagnostics['settings']['exposure_time'] = self.exposure_time # update in case of auto exposure if self._simulate_pixel_averaging: rad = model_value.data['radiance'].values * 0 for i in range(self._simulate_pixel_averaging): rad += self.sensor.process_signal(model_value.data['radiance'].values) model_value.data['radiance'].values = rad / self._simulate_pixel_averaging model_value.data['error'].values = model_value.data['error'].values / np.sqrt(self._simulate_pixel_averaging) else: model_value.data['radiance'].values = self.sensor.process_signal(model_value.data['radiance'].values) if self.save_diagnostics: self.diagnostics['signal']['post_electronics_signal'] = model_value.data['radiance'].copy() self.diagnostics['signal']['post_electronics_error'] = model_value.data['error'].copy() if self.save_level0_signal: self.level0 = model_value.data.copy(deep=True) if self.apply_calibration: # calibrate the signal from L0 -> L1 model_value = self.calibrate_signal(model_value, model_wavel_nm) if self.save_diagnostics: self.diagnostics['signal'][f'post_calibration_signal'] = model_value.data['radiance'].copy() self.diagnostics['signal'][f'post_calibration_error'] = model_value.data['error'].copy() if not self._collapse_images and (len(model_value.data.wavelength.values) > 1): model_values = [] for wavel in model_value.data.wavelength.values: model_values.append(RadianceSpectralImage(model_value.to_gridded().data.sel(wavelength=float(wavel)), num_columns=self.num_columns)) return model_values if len(model_value.data.wavelength.values) == 1: model_value = RadianceSpectralImage(model_value.to_gridded().data.sel(wavelength=float(model_value.data.wavelength.values)), num_columns=self.num_columns) return model_value
[docs] def measurement_geometry(self, optical_geometry, num_columns: int = None, num_rows: int = None): if self.straylight: los = super().optical_geometries(optical_geometry, num_columns, num_rows) left = np.cross(optical_geometry.look_vector, optical_geometry.local_up) left /= np.linalg.norm(left) for angle in np.arange(-90, 90, 0.2) * np.pi / 180: if np.abs(angle) < (self.vertical_fov / 2 * (np.pi / 180)): continue r = rotation_matrix(left, angle) l = r @ optical_geometry.look_vector u = np.cross(left, l) u /= np.linalg.norm(u) o = OpticalGeometry(look_vector=l, observer=optical_geometry.observer, local_up=u, mjd=optical_geometry.mjd) los.append(o) angles = [] for l in los: angles.append(np.sign(np.dot(l.look_vector, optical_geometry.local_up)) * np.arccos(np.dot(l.look_vector, optical_geometry.look_vector)) * 180 / np.pi) angles = np.array(angles) sidx = np.argsort(angles) return [LineOfSight(mjd=los[idx].mjd, observer=los[idx].observer, look_vector=los[idx].look_vector) for idx in sidx] else: return super().measurement_geometry(optical_geometry, num_columns, num_rows)
[docs] class ALISensorER2(ALISensor): """ ALI sensor class designed to simulate the elegant breadboard version of ALI. This includes a single channel AOTF designed to measure between 800 and 1500nm. The optics include a 3-bounce mirror to simulate the periscope needed for the ER-2 mounting. """ def __init__(self, wavelength_nm: np.ndarray, pixel_vert_fov: LineShape = None, pixel_horiz_fov: LineShape = None, image_horiz_fov: float = 5.02, image_vert_fov: float = 6.28, num_columns: int = 1, num_rows: int = 100, ideal_optics: bool = False, collapse_images: bool = False, central_aotf_wavelength: float = 850.0, central_lcr_wavelength: float = 850.0, aperture_effective_area_cm2: float = 0.4626, single_channel_aotf: bool = True, straylight: float = 0.0): super().__init__(wavelength_nm=wavelength_nm, pixel_vert_fov=pixel_vert_fov, pixel_horiz_fov=pixel_horiz_fov, image_horiz_fov=image_horiz_fov, image_vert_fov=image_vert_fov, num_columns=num_columns, num_rows=num_rows, ideal_optics=ideal_optics, collapse_images=collapse_images, central_aotf_wavelength=central_aotf_wavelength, central_lcr_wavelength=central_lcr_wavelength, aperture_effective_area_cm2=aperture_effective_area_cm2, single_channel_aotf=single_channel_aotf, straylight=straylight) wavelength_nm = np.array(wavelength_nm) self.spectral_lineshape = ALIER2LineShape() self.spectral_lineshape_area = np.ones(wavelength_nm.shape, dtype=float) for idx, wavel in enumerate(wavelength_nm): self.spectral_lineshape_area[idx] = self.spectral_lineshape.area(wavel) self.ccd = 'raptorowl640n' def _create_real_optics(self): comps = {} comps['periscope'] = GoldMirror() + GoldMirror() + GoldMirror() comps['front-optics'] = Mirror12AOI() + Mirror12AOI() + Mirror12AOI() + Mirror12AOI() comps['rotator'] = ArcOptixLCR(twist_angle=0, thickness=3, reference_wavelength=self._central_lcr_wavelength) comps['frontend-polarizer'] = MeadowlarkOWLPolarizer(orientation=90) comps['aotf'] = ER2AOTF() comps['backend-polarizer'] = MeadowlarkOWLPolarizer(orientation=0) comps['back-optics'] = Mirror12AOI() + Mirror12AOI() + Mirror12AOI() + Mirror12AOI() comps['quantum-efficiency'] = self.sensor.quantum_efficiency() return comps