Source code for pyrt.legendre

"""The legendre module provides functions for making Legendre decompositions."""
from warnings import warn
import numpy as np
from scipy import integrate


class _PhaseFunction(np.ndarray):
    """Designate an array as representing a phase function.

    Parameters
    ----------
    array
        Any phase function.

    Raises
    ------
    ValueError
        Raised if the input array is not 1D or if it contains negative,
        infinite, or NaN values.

    """
    def __new__(cls, array: np.ndarray):
        obj = np.asarray(array).view(cls)
        return obj

    def __array_finalize__(self, obj: np.ndarray):
        self.__raise_value_error_if_array_is_not_positive_finite(obj)
        self.__raise_value_error_if_array_is_not_1d(obj)

    @staticmethod
    def __raise_value_error_if_array_is_not_positive_finite(obj: np.ndarray):
        if ((obj < 0) | ~np.isfinite(obj)).any():
            message = 'Some values in the phase function are negative or ' \
                      'not finite.'
            raise ValueError(message)

    @staticmethod
    def __raise_value_error_if_array_is_not_1d(obj: np.ndarray):
        if obj.ndim != 1:
            message = 'The phase function should be 1D.'
            raise ValueError(message)


class _PhaseFunctionAngles(np.ndarray):
    """Designate an array as representing the angles where the phase function is
    defined.

    Parameters
    ----------
    array
        The angles where the phase function is defined.

    Raises
    ------
    ValueError
        Raised if the input array is not 1D; if contains negative, infinite,
        or NaN values; or if it is not monotonically increasing.

    """

    def __new__(cls, array: np.ndarray):
        obj = np.asarray(array).view(cls)
        return obj

    def __array_finalize__(self, obj: np.ndarray):
        self.__raise_value_error_if_array_is_not_between_0_and_pi(obj)
        self.__raise_value_error_if_array_is_not_1d(obj)
        self.__raise_value_error_if_array_is_not_monotonically_inc(obj)

    @staticmethod
    def __raise_value_error_if_array_is_not_between_0_and_pi(obj: np.ndarray):
        if ((obj < 0) | (obj > np.pi)).any():
            message = 'Some values in the phase function angles are not ' \
                      'between 0 and pi.'
            raise ValueError(message)

    @staticmethod
    def __raise_value_error_if_array_is_not_1d(obj: np.ndarray):
        if obj.ndim != 1:
            message = 'The phase function angles should be 1D.'
            raise ValueError(message)

    @staticmethod
    def __raise_value_error_if_array_is_not_monotonically_inc(obj: np.ndarray):
        if ~np.all(np.diff(obj) > 0):
            message = 'The phase function angles must be monotonically ' \
                      'increasing.'
            raise ValueError(message)


class _PhaseFunctionBundle:
    """Bundle together the phase function and its associated angles.

    Parameters
    ----------
    phase_function
        The phase function.
    angles
        The angles where the phase function is defined.

    Raises
    ------
    ValueError
        Raised if either :code:`phase_function` or :code:`angles` are unphysical
        or if they do not have the same shape.

    """
    def __init__(self, phase_function: np.ndarray, angles: np.ndarray):
        self.pf = _PhaseFunction(phase_function)
        self.angles = _PhaseFunctionAngles(angles)

        self.__raise_value_error_if_inputs_are_not_same_size()

    def __raise_value_error_if_inputs_are_not_same_size(self):
        if self.pf.shape != self.angles.shape:
            message = 'The phase function and its associated angles must ' \
                      'have the same shape.'
            raise ValueError(message)

    def resample_phase_function(self, n_samples) -> np.ndarray:
        return np.interp(self.resample_angles(n_samples), self.angles, self.pf)

    def resample_angles(self, n_samples) -> np.ndarray:
        return np.linspace(self.angles[0], self.angles[-1], num=n_samples)

    def normalize_resampled_phase_function(self, n_samples):
        resampled_pf = self.resample_phase_function(n_samples)
        resampled_angles = self.resample_angles(n_samples)
        resampled_norm = np.abs(integrate.simps(resampled_pf,
                                                np.cos(resampled_angles)))
        return 2 * resampled_pf / resampled_norm


class _NMoments(int):
    """Designate that a number represents the number of moments.

    Parameters
    ----------
    value
        The number of moments to use.

    Raises
    ------
    TypeError
        Raised if the input cannot be converted into an int.
    ValueError
        Raised if the number of samples is not positive.
    """

    def __new__(cls, value: int, *args, **kwargs):
        if value <= 0:
            raise ValueError("The number of moments must be positive.")
        return super(cls, cls).__new__(cls, value)


class _NSamples(int):
    """Designate that a number represents the number of samples.

    Parameters
    ----------
    value
        The number of samples to use.

    Raises
    ------
    TypeError
        Raised if the input cannot be converted into an int.
    ValueError
        Raised if the number of samples is not positive.
    """

    def __new__(cls, value: int, *args, **kwargs):
        if value <= 0:
            raise ValueError("The number of samples must be positive.")
        return super(cls, cls).__new__(cls, value)


class _LegendreDecomposer:
    """ A collection of methods for decomposing a phase function into
    polynomials.

    This class can decompose a phase function into Legendre polynomials. In
    principle it can decompose any function into those polynomials but it's set
    up specifically for that task.

    Parameters
    ----------
    phase_function
    angles
    n_moments
    n_samples

    Raises
    ------
    TypeError
        Raised if any of the inputs cannot be cast to the correct shape.
    ValueError
        Raised in any of the inputs are unphysical.

    """
    def __init__(self, phase_function: np.ndarray, angles: np.ndarray,
                 n_moments: int, n_samples: int):
        self.bundle = _PhaseFunctionBundle(phase_function, angles)
        self.n_moments = _NMoments(n_moments)
        self.n_samples = _NSamples(n_samples)

        # Fit P(x) = c0 + c1*L1(x) + ... where I force c0 = 1 for DISORT
        self.resamp_norm_pf = \
            self.bundle.normalize_resampled_phase_function(self.n_samples) - 1
        self.lpoly = self._make_legendre_polynomials()

    def _make_legendre_polynomials(self) -> np.ndarray:
        """Make an array of Legendre polynomials at the input angles.

        Notes
        -----
        This returns a 2D array. The 0th index is the i+1 polynomial and the
        1st index is the angle. So index [2, 6] will be the 3rd Legendre
        polynomial (L3) evaluated at the 6th angle

        """
        resampled_theta = self.bundle.resample_angles(self.n_samples)
        ones = np.ones((self.n_moments, self.n_samples))

        # This creates an MxN array with 1s on the diagonal and 0s elsewhere
        diag_mask = np.triu(ones) + np.tril(ones) - 1

        # Evaluate the polynomials at the input angles. I don't know why
        return np.polynomial.legendre.legval(
            np.cos(resampled_theta), diag_mask)[1:self.n_moments, :]

    def decompose(self) -> np.ndarray:
        """Decompose the phase function into its Legendre moments.

        """
        normal_matrix = self._make_normal_matrix()
        normal_vector = self.__make_normal_vector()
        cholesky = np.linalg.cholesky(normal_matrix)
        first_solution = np.linalg.solve(cholesky, normal_vector)
        second_solution = np.linalg.solve(cholesky.T, first_solution)
        coeff = np.concatenate((np.array([1]), second_solution))
        self._warn_if_negative_coefficients(coeff)
        return coeff

    def _make_normal_matrix(self) -> np.ndarray:
        return np.sum(self.lpoly[:, None, :] * self.lpoly[None, :, :] /
                      self.resamp_norm_pf**2, axis=-1)

    def __make_normal_vector(self) -> np.ndarray:
        return np.sum(self.lpoly / self.resamp_norm_pf, axis=-1)

    def _warn_if_negative_coefficients(self, coeff):
        first_negative_index = self._get_first_negative_coefficient_index(coeff)
        if first_negative_index:
            message = f'Coefficient {first_negative_index} is negative.'
            warn(message)

    @staticmethod
    def _get_first_negative_coefficient_index(coeff: np.ndarray) -> np.ndarray:
        return np.argmax(coeff < 0)

    def filter_negative_coefficients(self, coeff: np.ndarray) -> np.ndarray:
        """Set all coefficients at the first negative one to 0.

        Parameters
        ----------
        coeff
            The Legendre coefficients.

        """
        first_negative_index = self._get_first_negative_coefficient_index(coeff)
        if first_negative_index:
            coeff[first_negative_index:] = 0
        return coeff


[docs]def decompose_phase_function( phase_function: np.ndarray, angles: np.ndarray, n_moments: int, n_samples: int) -> np.ndarray: """Decompose a phase function into its Legendre moments. Parameters ---------- phase_function The phase function to decompose. angles The angles where the phase function is defined. n_moments The number of Legendre moments to decompose the phase function into. n_samples The number of samples to use for the resampling. Must be greater than or equal to :code:`n_moments`. Examples -------- Let's suppose we have a Henyey-Greenstein phase function with asymmetry parameter of 0.6. Let's create its first 65 moments along with the angles where it's defined. >>> from pyRT_DISORT.aerosol import HenyeyGreenstein >>> analytic_moments = HenyeyGreenstein(0.6).legendre_decomposition(65) >>> angles = np.radians(np.linspace(0, 180, num=181)) We can convert it to a phase fuction with the following: >>> import numpy as np >>> phase = np.polynomial.legendre.legval(np.cos(angles), analytic_moments) If we decompose the phase fuction into Legendre coefficients, we should get what we started with. Let's decompose the phase fuction back into 65 moments and use 360 samples to make sure the resolution is good. >>> from pyRT_DISORT.legendre import decompose_phase_function >>> decomp_moments = decompose_phase_function(phase, angles, 65, 360) >>> np.amax((analytic_moments - decomp_moments)**2) 1.6443715589020334e-07 We've recovered the original moments pretty well! """ ld = _LegendreDecomposer(phase_function, angles, n_moments, n_samples) return ld.decompose()