Source code for mosfit.modules.arrays.kernel

"""Definitions for the `Kernel` class."""
from collections import OrderedDict

import numpy as np
from six import string_types

from mosfit.constants import ANG_CGS, C_CGS
from mosfit.modules.arrays.array import Array

# Important: Only define one ``Module`` class per file.


[docs]class Kernel(Array): """Calculate the kernel for use in computing the likelihood score.""" MIN_COV_TERM = 1.0e-30 def __init__(self, **kwargs): """Initialize module.""" super(Kernel, self).__init__(**kwargs) self._times = np.array([]) self._codeltatime = -1 self._codeltalambda = -1 self._type = kwargs.get('type', False)
[docs] def process(self, **kwargs): """Process module.""" self.preprocess(**kwargs) ret = OrderedDict() # If we are trying to krig between observations, we need an array with # dimensions equal to the number of intermediate observations. if self._type == 'full': kskey = 'kfmat' elif self._type == 'oa': kskey = 'koamat' elif self._type == 'ao': kskey = 'kaomat' else: kskey = 'kmat' # Get band variances self._variance = kwargs.get(self.key('variance'), 0.0) # Get array of real observations. self._observations = np.array([ ct if (t == 'countrate' or t == 'magcount') else y if (t == 'magnitude') else fd if t == 'fluxdensity' else None for y, ct, fd, t in zip(self._mags, self._cts, self._fds, self._o_otypes) ]) # Get array of model observations. self._model_observations = kwargs.get('model_observations', []) # Handle band-specific variances if that option is enabled. self._band_v_vars = OrderedDict() for key in kwargs: if key.startswith('variance-band-'): self._band_v_vars[key.split('-')[-1]] = kwargs[key] if self._variance_bands: self._o_variance_bands = [ self._variance_bands[i] for i in self._all_band_indices ] self._band_vs = np.array([ self._band_v_vars.get(i, self._variance) if isinstance( i, string_types) else (i[0] * self._band_v_vars.get(i[1][0], self._variance) + (1.0 - i[0]) * self._band_v_vars.get(i[1][0], self._variance)) for i in self._o_variance_bands ]) else: self._band_vs = np.full( len(self._all_band_indices), self._variance) # Compute relative errors for count-based observations. self._band_vs[self._count_inds] = ( 10.0**(self._band_vs[self._count_inds] / 2.5) - 1.0) * self._model_observations[self._count_inds] self._o_band_vs = self._band_vs[self._observed] if self._type == 'full': self._band_vs_1 = self._band_vs self._band_vs_2 = self._band_vs elif self._type == 'oa': self._band_vs_1 = self._o_band_vs self._band_vs_2 = self._band_vs elif self._type == 'ao': self._band_vs_1 = self._band_vs self._band_vs_2 = self._o_band_vs else: self._band_vs_1 = self._o_band_vs self._band_vs_2 = self._o_band_vs if self._codeltatime >= 0 or self._codeltalambda >= 0: kmat = np.outer(self._band_vs_1, self._band_vs_2) if self._codeltatime >= 0: kmat *= np.exp(self._dt2mat / self._codeltatime**2) if self._codeltalambda >= 0: kmat *= np.exp(self._dl2mat / self._codeltalambda**2) ret[kskey] = kmat else: ret['abandvs'] = self._band_vs ret['obandvs'] = self._o_band_vs return ret
[docs] def receive_requests(self, **requests): """Receive requests from other ``Module`` objects.""" self._average_wavelengths = requests.get('average_wavelengths', []) self._variance_bands = requests.get('variance_bands', [])
[docs] def preprocess(self, **kwargs): """Construct kernel distance arrays.""" new_times = np.array(kwargs.get('all_times', []), dtype=float) self._codeltatime = kwargs.get(self.key('codeltatime'), -1) self._codeltalambda = kwargs.get(self.key('codeltalambda'), -1) if np.array_equiv(new_times, self._times) and self._preprocessed: return self._times = new_times self._all_band_indices = kwargs.get('all_band_indices', []) self._are_bands = np.array(self._all_band_indices) >= 0 self._freqs = kwargs.get('all_frequencies', []) self._mags = np.array(kwargs.get('magnitudes', [])) self._fds = np.array(kwargs.get('fluxdensities', [])) self._cts = np.array(kwargs.get('countrates', [])) self._u_freqs = kwargs.get('all_u_frequencies', []) self._waves = np.array([ self._average_wavelengths[bi] if bi >= 0 else C_CGS / self._freqs[i] / ANG_CGS for i, bi in enumerate(self._all_band_indices) ]) self._observed = np.array(kwargs.get('observed', []), dtype=bool) self._observation_types = kwargs.get('observation_types') self._n_obs = len(self._observed) self._count_inds = self._observation_types != 'magnitude' self._o_times = self._times[self._observed] self._o_waves = self._waves[self._observed] self._o_otypes = self._observation_types[self._observed] if self._type == 'full': self._times_1 = self._times self._times_2 = self._times self._waves_1 = self._waves self._waves_2 = self._waves elif self._type == 'oa': self._times_1 = self._o_times self._times_2 = self._times self._waves_1 = self._o_waves self._waves_2 = self._waves elif self._type == 'ao': self._times_1 = self._times self._times_2 = self._o_times self._waves_1 = self._waves self._waves_2 = self._o_waves else: self._times_1 = self._o_times self._times_2 = self._o_times self._waves_1 = self._o_waves self._waves_2 = self._o_waves # Time deltas (radial distance) for covariance matrix. if self._codeltatime >= 0: self._dt2mat = self._times_1[:, None] - self._times_2[None, :] self._dt2mat **= 2 self._dt2mat *= -0.5 # Wavelength deltas (radial distance) for covariance matrix. if self._codeltalambda >= 0: self._dl2mat = self._waves_1[:, None] - self._waves_2[None, :] self._dl2mat **= 2 self._dl2mat *= -0.5 self._preprocessed = True