"""The :code:`aerosol` module contains structures to make the aerosol properties
required by DISORT.
"""
# TODO: add parameter validators if these classes are ok
import numpy as np
from pyrt.eos import _Altitude
from scipy.interpolate import interp2d
[docs]class Conrath:
r"""A structure to compute a Conrath profile(s).
Conrath creates Conrath profile(s) on an input grid of altitudes given a
volumetric mixing ratio and Conrath parameters.
"""
def __init__(self, altitude: np.ndarray, q0: np.ndarray,
scale_height: np.ndarray, nu: np.ndarray) -> None:
r"""
Parameters
----------
altitude
The altitude [km] at which to construct a Conrath profile.
q0
The surface mixing ratio for each of the Conrath profiles.
scale_height
The scale height [km] of each of the Conrath profiles.
nu
The nu parameter of each of the Conrath profiles.
Raises
------
TypeError
Raised if any of the inputs are not a numpy.ndarray.
ValueError
Raised if many things...
Notes
-----
The Conrath profile is defined as
.. math::
q(z) = q_0 * e^{\nu(1 - e^{z/H})}
where :math:`q` is a volumetric mixing ratio, :math:`z` is the altitude,
:math:`\nu` is the Conrath nu parameter, and :math:`H` is the scale
height.
For an MxN array of pixels, :code:`q0`, :code:`scale_height`, and
:code:`nu` should be of shape MxN. :code:`altitude` should have shape
ZxMxN, where Z is the number of altitudes. Additionally, the units of
:code:`altitude` and :code:`scale_height` should be the same.
"""
self.__altitude = _Altitude(altitude, 'altitude')
self.__q0 = _ConrathParameter(q0, 'q0', 0, np.inf)
self.__H = _ConrathParameter(scale_height, 'scale_height', 0, np.inf)
self.__nu = _ConrathParameter(nu, 'nu', 0, np.inf)
self.__raise_value_error_if_inputs_have_differing_shapes()
self.__profile = self.__make_profile()
def __raise_value_error_if_inputs_have_differing_shapes(self) -> None:
if not (self.__altitude.shape[1:] == self.__q0.shape ==
self.__H.shape == self.__nu.shape):
message = 'All inputs must have the same shape along the pixel ' \
'dimension.'
raise ValueError(message)
def __make_profile(self) -> np.ndarray:
altitude_scaling = self.__altitude.val / self.__H.val
return self.__q0.val * np.exp(self.__nu.val *
(1 - np.exp(altitude_scaling)))
@property
def profile(self) -> np.ndarray:
"""Get the Conrath profile(s). This will have the same shape as
``altitude``.
"""
return self.__profile
class _ConrathParameter:
def __init__(self, parameter: np.ndarray, name: str,
low: float, high: float) -> None:
self.__param = parameter
self.__name = name
self.__low = low
self.__high = high
self.__raise_error_if_param_is_bad()
def __raise_error_if_param_is_bad(self) -> None:
self.__raise_type_error_if_not_ndarray_or_float()
self.__raise_value_error_if_not_in_range()
def __raise_type_error_if_not_ndarray_or_float(self) -> None:
if not isinstance(self.__param, (int, float, np.ndarray)):
message = f'{self.__name} must be a float or a numpy.ndarray.'
raise TypeError(message)
def __raise_value_error_if_not_in_range(self) -> None:
if not (np.all(self.__low <= self.__param) and
(np.all(self.__param <= self.__high))):
message = f'{self.__name} must be between {self.__low} and ' \
f'{self.__high}.'
raise ValueError(message)
def __getattr__(self, method):
return getattr(self.val, method)
@property
def val(self) -> np.ndarray:
"""Get the value of the input wavelength.
"""
return np.squeeze(np.array([self.__param]))
# TODO: right now this just copies ConrathParameter. Think of a more general
# name or make the classes different.
class _UniformParameter:
def __init__(self, parameter: np.ndarray, name: str) -> None:
self.__param = parameter
self.__name = name
self.__raise_error_if_param_is_bad()
def __raise_error_if_param_is_bad(self) -> None:
self.__raise_type_error_if_not_ndarray()
def __raise_type_error_if_not_ndarray_or_float(self) -> None:
if not isinstance(self.__param, (float, np.ndarray)):
message = f'{self.__name} must be a float or numpy.ndarray.'
raise TypeError(message)
def __getattr__(self, method):
return getattr(self.val, method)
@property
def val(self) -> np.ndarray:
"""Get the value of the input wavelength.
"""
return np.squeeze(np.array([self.__param]))
class _ForwardScatteringProperty:
"""An abstract class to check something is a forward scattering property.
"""
def __init__(self, parameter: np.ndarray, particle_size: np.ndarray,
wavelength: np.ndarray, name: str) -> None:
self.__parameter = parameter
self.__particle_size = particle_size
self.__wavelength = wavelength
self.__name = name
self.__raise_error_if_inputs_are_bad()
def __raise_error_if_inputs_are_bad(self) -> None:
self.__raise_type_error_if_parameter_is_not_ndarray()
self.__raise_value_error_if_shapes_do_not_match()
def __raise_type_error_if_parameter_is_not_ndarray(self) -> None:
if not isinstance(self.__parameter, np.ndarray):
message = f'{self.__name} must be a numpy.ndarray.'
raise TypeError(message)
def __raise_value_error_if_shapes_do_not_match(self) -> None:
if np.ndim(self.__parameter) != 2:
message = f'{self.__name} must be a 2D array.'
raise ValueError(message)
if self.__parameter.shape[0] != self.__particle_size.shape[0]:
raise ValueError('parameter does not match particle size shape')
if self.__parameter.shape[1] != self.__wavelength.shape[0]:
raise ValueError('parameter does not match wavelength shape.')
@property
def parameter(self) -> np.ndarray:
return self.__parameter
@property
def particle_size(self) -> np.ndarray:
return self.__particle_size
@property
def wavelength(self) -> np.ndarray:
return self.__wavelength
class _Interpolator:
"""A structure to get values over a grid.
Interpolator accepts an array of coefficients, along with the 1D particle
size and wavelength grid over which they're defined. It contains methods
to interpolate onto a new grid.
"""
def __init__(self, coefficient: np.ndarray,
particle_size_grid: np.ndarray, wavelength_grid: np.ndarray,
particle_size: np.ndarray, wavelength: np.ndarray) -> None:
"""
Parameters
----------
coefficient
ND array of coefficients to interpolate over. axis=-1 is assumed
to be the wavelength dimension and axis=-2 is assumed to be the
particle size dimension.
particle_size_grid
1D array of particle sizes over which the above properties are
defined.
wavelength_grid
1D array of wavelengths over which the above properties are defined.
particle_size
1D array of particle sizes to regrid the properties onto.
wavelength
1D array of wavelengths to regrid the properties onto.
"""
self.__coeff = coefficient
self.__pgrid = particle_size_grid
self.__wgrid = wavelength_grid
self.__particle_size = particle_size
self.__wavelength = wavelength
def nearest_neighbor(self) -> np.ndarray:
"""Get the input coefficients at the input particle sizes and
wavelengths using nearest neighbor interpolation to the input particle
size and wavelength grid.
"""
size_indices = self.__get_nearest_indices(self.__pgrid,
self.__particle_size)
wavelength_indices = self.__get_nearest_indices(self.__wgrid,
self.__wavelength)
return np.take(np.take(self.__coeff, size_indices, axis=-2),
wavelength_indices, axis=-1)
def linear(self) -> np.ndarray:
intpf = np.zeros((self.__coeff.shape[0], self.__particle_size.shape[0], self.__wavelength.shape[0]))
for i in range(len(intpf)):
pf = interp2d(self.__pgrid, self.__wgrid, self.__coeff[i, :, :].T)
intpf[i, :, :] = pf(self.__particle_size, self.__wavelength).T
return intpf
@staticmethod
def __get_nearest_indices(grid: np.ndarray, values: np.ndarray) \
-> np.ndarray:
# grid should be 1D; values can be ND
return np.abs(np.subtract.outer(grid, values)).argmin(0)
[docs]class ForwardScattering:
"""A structure to store forward scattering properties.
ForwardScattering will simply hang on to forward scattering properties and
provides methods to regrid them into the input particle size profile and
wavelengths.
"""
def __init__(self, scattering_cross_section: np.ndarray,
extinction_cross_section: np.ndarray,
asymmetry_parameter: np.ndarray,
particle_size_grid: np.ndarray, wavelength_grid: np.ndarray,
particle_size: np.ndarray,
wavelength: np.ndarray,
reference_wavelength: float) -> None:
"""
Parameters
----------
scattering_cross_section
2D array of the scattering cross section (the 0th axis is assumed to
be the particle size axis, while the 1st axis is assumed to be the
wavelength axis).
extinction_cross_section
2D array of the extinction cross section with same dims as above.
asymmetry_parameter
2D array of the Henyey-Greenstein asymmetry parameter with the same
dims as above.
particle_size_grid
1D array of particle sizes over which the above properties are
defined.
wavelength_grid
1D array of wavelengths over which the above properties are defined.
particle_size
1D array of particle sizes to regrid the properties onto.
wavelength
1D array of wavelengths to regrid the properties onto.
"""
self.__c_sca = _ForwardScatteringProperty(
scattering_cross_section, particle_size_grid, wavelength_grid,
'scattering')
self.__c_ext = _ForwardScatteringProperty(
extinction_cross_section, particle_size_grid, wavelength_grid,
'extinction')
self.__asymmetry_parameter = asymmetry_parameter
self.__particle_size = particle_size
self.__wavelength = wavelength
self.__wave_ref = reference_wavelength
self.__gridded_c_scattering = np.array([])
self.__gridded_c_extinction = np.array([])
self.__gridded_ssa = np.array([])
self.__extinction = np.array([])
[docs] def make_nn_properties(self) -> None:
"""Make the forward scattering properties at the nearest neighbor
particle sizes and wavelengths.
"""
self.__gridded_c_scattering = self.__make_gridded_c_scattering('nn')
self.__gridded_c_extinction = self.__make_gridded_c_extinction('nn')
self.__gridded_ssa = self.__make_gridded_ssa()
self.__extinction = self.__make_extinction_grid('nn')
[docs] def make_linear_properties(self) -> None:
"""Make the forward scattering properties at the input particle sizes
and wavelengths by linearly interpolating them onto the input grid.
.. warning::
Due to a known bug, this does not work!
"""
self.__gridded_c_scattering = self.__make_gridded_c_scattering('linear')
self.__gridded_c_extinction = self.__make_gridded_c_extinction('linear')
self.__gridded_ssa = self.__make_gridded_ssa()
self.__extinction = self.__make_extinction_grid('linear')
def __make_gridded_c_scattering(self, kind: str) -> np.ndarray:
interp = _Interpolator(self.__c_sca.parameter,
self.__c_sca.particle_size,
self.__c_sca.wavelength, self.__particle_size,
self.__wavelength)
return self.__interpolate_by_kind(interp, kind)
def __make_gridded_c_extinction(self, kind: str) -> np.ndarray:
interp = _Interpolator(self.__c_ext.parameter,
self.__c_ext.particle_size,
self.__c_ext.wavelength, self.__particle_size,
self.__wavelength)
return self.__interpolate_by_kind(interp, kind)
def __make_gridded_ssa(self):
return self.__gridded_c_scattering / self.__gridded_c_extinction
def __make_extinction_grid(self, kind: str) -> np.ndarray:
"""Make a grid of the extinction cross section at a reference wavelength.
This is the extinction cross section at the input wavelengths divided by
the extinction cross section at the reference wavelength.
"""
interp = _Interpolator(self.__c_ext.parameter,
self.__c_ext.particle_size,
self.__c_ext.wavelength, self.__particle_size,
np.array([self.__wave_ref]))
extinction_profile = np.squeeze(self.__interpolate_by_kind(interp, kind))
return (self.__gridded_c_extinction.T / extinction_profile).T
@staticmethod
def __interpolate_by_kind(interp: _Interpolator, kind: str) -> np.ndarray:
# TODO: in python3.10 do a switch case
if kind == 'nn':
return interp.nearest_neighbor()
elif kind == 'linear':
return interp.linear()
@property
def scattering_cross_section(self) -> np.ndarray:
"""Get the scattering cross section on the new grid.
"""
return self.__gridded_c_scattering
@property
def extinction_cross_section(self) -> np.ndarray:
"""Get the extinction cross section on the new grid.
"""
return self.__gridded_c_extinction
@property
def single_scattering_albedo(self) -> np.ndarray:
"""Get the single scattering albedo on the new grid.
"""
return self.__gridded_ssa
@property
def extinction(self) -> np.ndarray:
"""Get the extinction coefficient at the reference wavelength on the new
grid.
"""
return self.__extinction
@property
def asymmetry_parameter(self) -> np.ndarray:
"""Get the asymmetry parameter on the new grid.
"""
return self.__asymmetry_parameter
[docs]class OpticalDepth:
"""A structure to compute the optical depth given profiles.
OpticalDepth accepts a mixing ratio profile and atmospheric column density
profile to compute the optical depth at each wavelength. It also accepts
an extinction profile to scale the optical depth to a reference wavelength,
and ensures that the sum over the layers is equal to the total column
integrated optical depth.
"""
def __init__(self, mixing_ratio_profile: np.ndarray,
column_density_layers: np.ndarray,
extinction: np.ndarray,
column_integrated_optical_depth: float) -> None:
"""
Parameters
----------
mixing_ratio_profile
1D array of mixing ratio.
column_density_layers
1D array of the column density in the layers.
extinction
2D array of extinction coefficients.
column_integrated_optical_depth
The total column integrated optical depth.
"""
self.__q_prof = mixing_ratio_profile
self.__colden = column_density_layers
self.__extinction = extinction
self.__OD = column_integrated_optical_depth
self.__optical_depth = self.__calculate_total_optical_depth()
def __calculate_total_optical_depth(self) -> np.ndarray:
normalization = np.sum(self.__q_prof * self.__colden)
profile = self.__q_prof * self.__colden * self.__OD / normalization
return (profile * self.__extinction.T).T
@property
def total(self) -> np.ndarray:
"""Get the total optical depth in each of the layers.
"""
return self.__optical_depth
[docs]class TabularLegendreCoefficients:
"""Create a grid of Legendre coefficients (particle size, wavelength).
TabularLegendreCoefficients accepts a 3D array of Legendre polynomial
coefficients of the phase function that depend on particle size and
wavelength and casts them to an array of the proper shape for use in DISORT
given a vertical particle size gradient.
"""
def __init__(self, coefficients: np.ndarray, particle_size_grid: np.ndarray,
wavelength_grid: np.ndarray, particle_size_profile: np.ndarray,
wavelengths: np.ndarray, max_moments: int = None) -> None:
"""
Parameters
----------
coefficients
3D array of Legendre coefficients that depend on particle
size and wavelength. It is assumed to have shape [n_moments,
particle_size_grid, wavelength_grid]
particle_size_grid
1D array of particle sizes associated with the coefficients matrix.
wavelength_grid
1D array of wavelengths associated with the coefficients matrix.
wavelengths
ND array of wavelengths where to cast coefficients to.
particle_size_profile
1D array of particle sizes.
max_moments
The maximum number of coefficients to use in the array. If None,
all available coefficients are used.
"""
self.__coefficients = coefficients
self.__particle_grid = particle_size_grid
self.__wavelength_grid = wavelength_grid
self.__particle_profile = particle_size_profile
self.__wavelengths = wavelengths
self.__n_layers = self.__make_n_layers()
self.__n_moments = self.__make_n_moments(max_moments)
self.__phase_function = np.array([])
def __make_n_layers(self) -> int:
return len(self.__particle_grid)
def __make_n_moments(self, max_moments) -> int:
return self.__coefficients.shape[0] if max_moments is None else max_moments
def __normalize_coefficients(self, unnormalized_coefficients):
coeff = unnormalized_coefficients[:self.__n_moments, :]
moment_indices = np.linspace(0, self.__n_moments-1, num=self.__n_moments)
normalization = moment_indices * 2 + 1
return (coeff.T / normalization).T
[docs] def make_nn_phase_function(self) -> None:
"""Make the phase function at the nearest neighbor particle sizes and
wavelengths.
"""
self.__make_phase_function('nn')
[docs] def make_linear_phase_function(self) -> None:
"""Make the phase function by linearly interpolating between the input
grid of particle sizes and wavelengths.
"""
self.__make_phase_function('linear')
def __make_phase_function(self, kind: str) -> None:
interp = _Interpolator(self.__coefficients, self.__particle_grid,
self.__wavelength_grid, self.__particle_profile,
self.__wavelengths)
# TODO: in python3.10 do a switch case
if kind == 'nn':
pf = interp.nearest_neighbor()
elif kind == 'linear':
pf = interp.linear()
self.__phase_function = self.__normalize_coefficients(pf)
@property
def phase_function(self) -> np.ndarray:
"""Get the Legendre coefficients cast to the proper grid.
"""
return self.__phase_function
[docs]class HenyeyGreenstein:
"""Hold the HenyeyGreenstein asymmetry parameters.
HenyeyGreenstein holds the asymmetry parameters and ensures they're valid.
It provides a method to convert these to Legendre polynomials.
"""
def __init__(self, asymmetry: np.ndarray) -> None:
r"""
Parameters
----------
asymmetry
The asymmetry parameter.
Raises
------
ValueError
Raised if the asymmetry parameter is not between -1 and 1.
Notes
-----
The Henyey-Greenstein phase function is defined as follows
.. math::
p(\theta) = \frac{1}{4\pi} \frac{1 - g^2}{[1 + g^2 -2g \cos(\theta)]^{3/2}}
"""
self.__g = asymmetry
_HGAsymmetryParameter(asymmetry)
[docs] def legendre_decomposition(self, n_moments: int) -> np.ndarray:
r"""Get the Legendre decomposition of the asymmetry parameters up to
a given number of moments.
Parameters
----------
n_moments
The maximum number of moments to get the coefficients for (not
including the 0th moment).
Notes
-----
The Legendre decomposition is actually quite simple. The phase function
can be decomposed as follows
.. math::
p(\mu) = \sum_{n=0}^{\infty} (2n + 1)g^n P_n(\mu)
where :math:`n` is the moment number, :math:`g` is the asymmetry
parameter, and :math:`P_n(\mu)` is the :math:`n`:sup:`th` Legendre
polynomial. I'm not a mathematician so if :math:`g=0` I still assume
the 0 :sup:`th` coefficient is 1.
"""
moments = np.linspace(0, n_moments-1, num=n_moments)
coeff = (2 * moments + 1) * np.power.outer(self.__g, moments)
return np.moveaxis(coeff, -1, 0)
class _HGAsymmetryParameter:
def __init__(self, asymmetry: np.ndarray) -> None:
self.__g = asymmetry
self.__raise_value_error_if_parameter_not_in_valid_range()
def __raise_value_error_if_parameter_not_in_valid_range(self) -> None:
if np.any(self.__g > 1) or np.any(self.__g < -1):
raise ValueError('asymmetry must be in range [-1, 1].')