Source code for mosfit.modules.arrays.diagonal

"""Definitions for the `Diagonal` class."""
from math import isnan

import numpy as np

from mosfit.modules.arrays.array import Array
from mosfit.utils import flux_density_unit


# Important: Only define one ``Module`` class per file.


[docs]class Diagonal(Array): """Calculate the diagonal/residuals for a model kernel.""" MIN_COV_TERM = 1.0e-30 def __init__(self, **kwargs): """Initialize module.""" super(Diagonal, self).__init__(**kwargs) self._observation_types = np.array([])
[docs] def process(self, **kwargs): """Process module.""" self.preprocess(**kwargs) self._model_observations = np.copy(kwargs['model_observations']) self._model_observations = self._model_observations[self._observed] ret = {} allowed_otypes = ['countrate', 'magnitude', 'fluxdensity', 'magcount'] if np.any([x not in allowed_otypes for x in self._o_types]): print([x for x in self._o_types if x not in allowed_otypes]) raise ValueError('Unrecognized observation type.') # Calculate (model - obs) residuals. residuals = np.array([ (abs(x - ct) if (not u and ct is not None) or ( not isnan(x) and ct is not None and x < ct) else 0.0) if (t == 'countrate' or t == 'magcount') else ((abs(x - y) if (not u and y is not None) or ( not isnan(x) and y is not None and x < y) else 0.0) if t == 'magnitude' else ((abs(x - fd) if (not u and fd is not None) or ( not isnan(x) and fd is not None and x > fd) else 0.0) if t == 'fluxdensity' else None)) for x, y, ct, fd, u, t in zip( self._model_observations, self._mags, self._cts, self._fds, self._upper_limits, self._o_types) ]) if np.any(residuals == None): # noqa: E711 raise ValueError('Null residual.') # Observational errors to be put in diagonal of error matrix. diag = [ ((ctel if (ct is not None and x > ct) else cteu)) if (t == 'countrate' or t == 'magcount') else ((el if (y is None or x > y) else eu)) if t == 'magnitude' else ((fdel if (fd is not None and x > fd) else fdeu)) if t == 'fluxdensity' else None for x, y, eu, el, fd, fdeu, fdel, ct, ctel, cteu, t in zip( self._model_observations, self._mags, self._e_u_mags, self._e_l_mags, self._fds, self._e_u_fds, self._e_l_fds, self._cts, self._e_l_cts, self._e_u_cts, self._o_types) ] diag = [0.0 if x is None else x for x in diag] diag = np.array(diag) ** 2 if np.any(diag == None): # noqa: E711 raise ValueError('Null error.') ret['kdiagonal'] = diag ret['kresiduals'] = residuals return ret
[docs] def preprocess(self, **kwargs): """Construct arrays of observations based on data keys.""" otypes = np.array(kwargs.get('observation_types', [])) if np.array_equiv( otypes, self._observation_types) and self._preprocessed: return self._observation_types = otypes self._mags = np.array(kwargs.get('magnitudes', [])) self._fds = np.array(kwargs.get('fluxdensities', [])) self._cts = np.array(kwargs.get('countrates', [])) self._e_u_mags = kwargs.get('e_upper_magnitudes', []) self._e_l_mags = kwargs.get('e_lower_magnitudes', []) self._e_mags = kwargs.get('e_magnitudes', []) self._e_u_fds = kwargs.get('e_upper_fluxdensities', []) self._e_l_fds = kwargs.get('e_lower_fluxdensities', []) self._e_fds = kwargs.get('e_fluxdensities', []) self._u_fds = kwargs.get('u_fluxdensities', []) self._e_u_cts = kwargs.get('e_upper_countrates', []) self._e_l_cts = kwargs.get('e_lower_countrates', []) self._e_cts = kwargs.get('e_countrates', []) self._u_cts = kwargs.get('u_countrates', []) self._upper_limits = np.array(kwargs.get('upperlimits', []), dtype=bool) self._observed = np.array(kwargs.get('observed', []), dtype=bool) self._o_types = self._observation_types[self._observed] # Magnitudes first # Note: Upper limits (censored data) currently treated as a # half-Gaussian, this is very approximate and can be improved upon. self._e_u_mags = [ kwargs['default_upper_limit_error'] if (e is None and eu is None and self._upper_limits[i]) else (kwargs['default_no_error_bar_error'] if (e is None and eu is None) else (e if eu is None else eu)) for i, (e, eu) in enumerate(zip(self._e_mags, self._e_u_mags)) ] self._e_l_mags = [ kwargs['default_upper_limit_error'] if (e is None and el is None and self._upper_limits[i]) else (kwargs['default_no_error_bar_error'] if (e is None and el is None) else (e if el is None else el)) for i, (e, el) in enumerate(zip(self._e_mags, self._e_l_mags)) ] # Ignore upperlimits for countrate if magnitude is present. self._upper_limits[self._observation_types[ self._observed] == 'magcount'] = False self._e_u_cts = [ c if (e is None and eu is None) else e if eu is None else eu for i, (c, e, eu) in enumerate(zip( self._cts, self._e_cts, self._e_u_cts)) ] self._e_l_cts = [ c if (e is None and el is None) else e if el is None else el for i, (c, e, el) in enumerate(zip( self._cts, self._e_cts, self._e_l_cts)) ] # Now flux densities self._e_u_fds = [ v if (e is None and eu is None and self._upper_limits[i]) else (v if (e is None and eu is None) else (e if eu is None else eu)) for i, (e, eu, v) in enumerate( zip(self._e_fds, self._e_u_fds, self._fds)) ] self._e_l_fds = [ 0.0 if self._upper_limits[i] else ( v if (e is None and el is None) else (e if el is None else el)) for i, (e, el, v) in enumerate( zip(self._e_fds, self._e_l_fds, self._fds)) ] self._fds = np.array([ x / flux_density_unit(y) if x is not None else None for x, y in zip(self._fds, self._u_fds) ]) self._e_u_fds = [ x / flux_density_unit(y) if x is not None else None for x, y in zip(self._e_u_fds, self._u_fds) ] self._e_l_fds = [ x / flux_density_unit(y) if x is not None else None for x, y in zip(self._e_l_fds, self._u_fds) ] self._preprocessed = True