Source code for desietc.calculator

"""High-level calculator for online exposure-time forecasts.
"""
from __future__ import print_function, division

import numpy as np

import scipy.interpolate

import sklearn.gaussian_process


[docs]class Calculator(object): """Create a new calculator object for a single spectrograph exposure. Model the time-evolution of independent signal and background rate estimators, and use models to forecast the currently acheived signal-to- noise ratio (SNR) and the remaining exposure time. Register updates to the estimated signal and background rates using :meth:`update_signal` and :meth:`update_background`, respectively. The model is initialized with an initial guess at these rates which is gradually replaced by actual rate updates. The signal and background rate estimates are assumed to be statistically independent. Use :meth:`will_timeout`, :meth:`get_snr_now` and :meth:`get_remaining` to query the latest models. The target SNR is calculated as :math:`S/\sqrt(S+B)` where :math:`S = \\alpha s` and :math:`B = \\beta b` are calibrated versions of the estimates :math:`s` and :math:`b` passed to :meth:`update_signal` and :meth:`update_background`. The dimensions of :math:`a`, :math:`b`, :math:`\\alpha`, and :math:`\\beta` are arbitrary, but the combinations :math:`S = \\alpha s` and :math:`B = \\beta b` must be dimensionless and scaled appropriately to have counting statistics. The calibration constants :math:`\\alpha`, and :math:`\\beta` can have associated errors, :math:`\\sigma_{\\alpha}` and :math:`\\sigma_{\\beta}` that are propagated to SNR estimates. In general, the calibration depends on the program (DARK, GRAY, BRIGHT) and can be refined by comparing this object's predictions with the pipeline outputs from previous spetrograph exposures. Parameters ---------- alpha : float Constant conversion factor from raw signal rate to fiducial target signal rate. Must be > 0. dalpha : float Error on alpha. Must be > 0. beta : float Constant conversion factor from raw signal rate to fiducial target signal rate. Must be > 0. dbeta : float Error on beta. Must be > 0. sig0 : float Prior on raw (uncalibrated) signal rate. Must be > 0. dsig0 : float One sigma error on ``sig0``. Must be > 0. tcorr_sig : float Correlation time for changes in signal rate. Acts as a prior on how rapidly the signal rate changes. Must be > 0. bg0 : float Prior on raw (uncalibrated) background rate. Must be > 0. dbg0 : float One sigma error on ``bg0``. Must be > 0. tcorr_bg : float Correlation time for changes in background rate. Acts as a prior on how rapidly the background rate changes. Must be > 0. t0 : float Timestamp for when shutter was opened for the current exposure, in units of seconds with an arbitrary origin. snr_goal : float Value of calibrated SNR that we ideally want to achieve. Must be > 0. dtmax : float Maximum allowed total exposure time in seconds. Must be > 0. npred : int Number of equally spaced times spanning [0, dtmax] where predictions are calculated internally. """ def __init__(self, alpha, dalpha, beta, dbeta, sig0, dsig0, tcorr_sig, bg0, dbg0, tcorr_bg, t0, snr_goal, dtmax=4000., npred=401): self.t0 = t0 assert snr_goal > 0 self.snr_goal = snr_goal # Remember calibration constants. assert alpha > 0 and dalpha >= 0, 'Invalid alpha, dalpha' assert beta > 0 and dbeta >= 0, 'Invalid beta, dbeta' self.alpha = alpha self.beta = beta self.dalpha = dalpha self.dbeta = dbeta # Remember priors. assert sig0 > 0 and dsig0 > 0, 'Invalid sig0, dsig0' assert bg0 > 0 and dbg0 > 0, 'Invalid bg0, dbg0' assert tcorr_sig > 0 and tcorr_bg > 0, 'Invalid tcorr_sig, tcorr_bg' self.sig0 = sig0 self.dsig0 = dsig0 self.bg0 = bg0 self.dbg0 = dbg0 self.tcorr_sig = tcorr_sig self.tcorr_bg = tcorr_bg # Initialize uncalibrated signal and background rate estimates. self.sig = [] self.dsig = [] self.dtsig = [] self.bg = [] self.dbg = [] self.dtbg = [] # Initialize times relative to t0 where model is predicted. self.dt_pred = np.linspace(0., dtmax, npred) # Initialize models and our prediction. self.sig_pred, self.dsig_pred, self.sig_model = self._update_model( [], [], [], self.sig0, self.dsig0, self.tcorr_sig) self.bg_pred, self.dbg_pred, self.bg_model = self._update_model( [], [], [], self.bg0, self.dbg0, self.tcorr_bg) self._update_snr()
[docs] def update_signal(self, tsig, sig, dsig): """Update the signal estimate. Can be called with timestamps ``tsig`` in any order. Parameters ---------- tsig : float Timestamp in seconds for this signal rate estimate, using the same (arbitrary) origin as the ``t0`` passed to the constructor. The elapsed time ``tsig - t0`` must be >= 0. sig : float Estimated raw (uncalibrated) signal rate at ``tsig``. Must be > 0. dsig : float Estimated 1-sigma error on the value of ``sig``. Must be > 0. """ assert tsig >= self.t0, 'Expected tsig >= t0' assert sig > 0 and dsig > 0, 'Invalid sig, dsig' self.sig.append(sig) self.dsig.append(dsig) self.dtsig.append(tsig - self.t0) # Update signal rate model. self.sig_pred, self.dsig_pred, self.sig_model = self._update_model( self.dtsig, self.sig, self.dsig, self.sig0, self.dsig0, self.tcorr_sig) # Update estimates of calibrated SNR. self._update_snr()
[docs] def update_background(self, tbg, bg, dbg): """Update the background estimate. Can be called with timestamps ``tbg`` in any order. Parameters ---------- tbg : float Timestamp in seconds for this background rate estimate, using the same (arbitrary) origin as the ``t0`` passed to the constructor. The elapsed time ``tbg - t0`` must be >= 0. bg : float Estimated raw (uncalibrated) background rate at ``tbg``. Must be > 0. dbg : float Estimated 1-sigma error on the value of ``bg``. Must be > 0. """ assert tbg >= self.t0, 'Expected tbg >= t0' assert bg > 0 and dbg > 0, 'Invalid bg, dbg' self.bg.append(bg) self.dbg.append(dbg) self.dtbg.append(tbg - self.t0) # Update background rate model. self.bg_pred, self.dbg_pred, self.bg_model = self._update_model( self.dtbg, self.bg, self.dbg, self.bg0, self.dbg0, self.tcorr_bg) # Update estimates of calibrated SNR. self._update_snr()
[docs] def will_timeout(self): """Is this exposure expected to time out before reaching its goal SNR? Based on all signal and background rate updates recorded so far. Returns ------- bool True if the current exposure is not expected to achieve ``snr_goal`` with an exposure duration less than ``dtmax``. """ return self.dt_goal >= self.dt_pred[-1]
[docs] def get_remaining(self, tnow): """Forecast the remaining exposure time in seconds. Based on all signal and background rate updates recorded so far. Parameters ---------- tnow : float Current time used for forecasting. Must be >= t0. Returns ------- float Remaining time in seconds, which might be < 0 if the goal SNR has already been achieved. """ assert tnow >= self.t0, 'Expected tnow >= t0' return self.dt_goal - (tnow - self.t0)
[docs] def get_snr_now(self, tnow, CL=0.6827, nsamples=1000, gen=None): """Estimate the current integrated SNR. Based on all signal and background rate updates recorded so far. This calculation is relatively expensive since it requires generating random samples from the signal and background rate models. Parameters ---------- tnow : float Current time used for forecasting. Must be between t0, t0+dtmax. CL : float Confidence level to use for the returned range. Must be 0-1. nsamples : int Number of random samples of the signal and background rate models to generate. A larger number gives more accurate ranges but takes longer to run. Must be > 0. gen : numpy.random.RandomState or None Use the specified random number generator for reproducible results. Returns ------- tuple Tuple (lo, hi) containing the true integrated SNR with posterior probability CL. """ assert tnow >= self.t0, 'Expected tnow >= t0' assert tnow <= self.t0 + self.dt_pred[-1], 'Expected tnow <= t0 + dtmax' assert 0 < CL < 1, 'Invalid CL' assert nsamples > 0, 'Expected nsamples > 0' # Generate relative timestamps covering [0, tnow - t0]. last = np.argmax(self.t0 + self.dt_pred >= tnow) assert self.t0 + self.dt_pred[last] >= tnow assert last == 0 or (self.t0 + self.dt_pred[last - 1] < tnow) dt = self.dt_pred[:last + 1].copy() dt[last] = tnow - self.t0 # Generate random realizations of SNR at tnow. _, _, snr_now = self.get_samples(dt, nsamples, gen) assert snr_now.shape == (nsamples, last + 1) # Estimate percentiles that cover a central fraction CL. lo = 50 * (1.0 - CL) cuts = (lo, 100 - lo) assert np.allclose(cuts[1] - cuts[0], 100 * CL) return np.percentile(snr_now[:, -1], cuts)
[docs] def _update_model(self, dt, rate, drate, rate0, drate0, tcorr): """Internal method to update a rate model. This method is used by both :meth:`update_signal` and :meth:`update_background` to update their respective models. Call :meth:`update_snr` after calling this method to calculate the updated SNR. Parameters ---------- dt : array 1D array of N elapsed times in seconds since ``self.t0``, not necessarily in increasing order. An empty array (N=0) is allowed. rate : array 1D array of N raw (uncalibrated) rate estimates corresponding to each ``dt`` value. Must be > 0. drate : array 1D array of N 1-sigma errors on each ``rate`` value. Must be > 0. rate0 : float Prior on the raw (uncalibrated) rate that is combined with the rate estimates to obtain a posterior estimate of the rate. Must be > 0. drate0 : float 1-sigma error on the prior value ``rate0``. Must be > 0. tcorr : float Correlation time in seconds for changes in the raw rate. Acts as a prior on how rapidly the rate changes. Must be > 0. Returns ------- tuple Tuple (pred, dpred, gp) where pred and dpred are arrays of predicted rates and their 1-sigma errors for each elapsed time in ``self.dt_pred``, and ``gp`` is the updated Gaussian process model. """ dt = np.asarray(dt) rate = np.asarray(rate) drate = np.asarray(drate) assert dt.shape == rate.shape == drate.shape # Fit a Gaussian process regression model. kernel = ( sklearn.gaussian_process.kernels.ConstantKernel(drate0 ** 2) * sklearn.gaussian_process.kernels.RBF(tcorr)) gp = sklearn.gaussian_process.GaussianProcessRegressor( # See https://github.com/scikit-learn/scikit-learn/pull/10026 kernel=kernel, optimizer=None, alpha=drate ** 2) if len(dt): gp.fit(dt.reshape(-1, 1), rate - rate0) # Evaluate the model. pred, dpred = gp.predict(self.dt_pred.reshape(-1, 1), return_std=True) pred += rate0 return pred, dpred, gp
[docs] def _eval_snr(self, t, S, B): """Evaluate the SNR for calibrated rates S and B at increasing times t. Automatically broadcasts over input arrays whose last index corresponds to time. Parameters ---------- t : array 1D array of N increasing times in seconds. S : array Array with shape (...,N) of calibrated signal rates in counts per second at each time. B : array Array with shape (...,N) of calibrated background rates in counts per second at each time. Returns ------- array Array with shape (...,N) of cummulative signal-to-noise ratios at each time calculated as :math:`S / np.sqrt(S + B)`. Any time when :math:`S+B \le 0` will have SNR = 0. """ assert len(t.shape) == 1, 't must be 1D array' # Use trapezoid rule to integrate cummulatively over each interval. tstep = np.diff(t) assert np.all(tstep > 0) assert S.shape == B.shape, 'S, B must have same shape' Scum = np.cumsum(0.5 * tstep * (S[...,:-1] + S[...,1:]), axis=-1) Bcum = np.cumsum(0.5 * tstep * (B[...,:-1] + B[...,1:]), axis=-1) snr = np.zeros_like(S) nonzero = Scum + Bcum > 0 snr[...,1:][...,nonzero] = Scum[nonzero] / np.sqrt( Scum[nonzero] + Bcum[nonzero]) return snr
[docs] def get_samples(self, dt, nsamples, gen=None): """Generate random samples of our signal and background rate models. Parameters ---------- dt : array of floats Array of times where models should be sampled. Times are in seconds relative to the exposure start. Must all be between 0 and dtmax. nsamples : int Number of independent samples to generate. Must be > 0. gen : numpy.random.RandomState or None Use the specified random number generator for reproducible results. Returns ------- tuple Tuple (S, B, snr) of calibrated signal and background rate samples, and the corresponding SNR values. Each is an array of floats with dimensions (nsamples, len(dt)). """ if gen is None: gen = np.random.RandomState() dt = np.asarray(dt) assert (np.all(dt >= 0) and np.all(dt <= self.dt_pred[-1])), 'dt not in range [0, dtmax]' assert nsamples > 0, 'Expected nsamples > 0' # Generate random realizations of uncalibrated signal, bg rates # from the latest Gaussian process rate models. X = dt.reshape(-1, 1) S_samples = ( self.sig0 + self.sig_model.sample_y(X, nsamples, gen)).T B_samples = ( self.bg0 + self.bg_model.sample_y(X, nsamples, gen)).T # Apply calibration with random errors. if self.dalpha > 0: S_samples *= gen.normal( loc=self.alpha, scale=self.dalpha, size=(nsamples, 1)) else: S_samples *= self.alpha if self.dbeta > 0: B_samples *= gen.normal( loc=self.beta, scale=self.dbeta, size=(nsamples, 1)) else: B_samples *= self.beta # Calcuate SNR for each (S,B) pair. snr_samples = self._eval_snr(dt, S_samples, B_samples) return S_samples, B_samples, snr_samples
[docs] def _update_snr(self): """Internal method to update SNR forecast. Uses the the most recent updates to the signal and background models to forecast the calibrated SNR out to ``dtmax``. This forecast is then used to estimate the current SNR and the time remaining until ``snr_goal`` is achieved. Sets the flag ``attr:timeout`` if the updated model indicates that this exposure will not complete before ``dtmax`` is reached. """ # Calculate nominal calibrated signal and background rates. S = self.alpha * np.asarray(self.sig_pred) B = self.beta * np.asarray(self.bg_pred) # Evaluate the corresponding nominal SNR model. snr = self._eval_snr(self.dt_pred, S, B) assert np.all(np.diff(snr) >= 0), 'nominal SNR is not increasing' # Estimate when the nominal SNR model hits the SNR goal using # linear interpolation. # If the result equals self.dt_pred[-1], this indicates that # the exposure is not expected to reach its SNR goal within the # maximum allowed exposure time. interpolator = scipy.interpolate.interp1d( snr, self.dt_pred, kind='cubic', assume_sorted=True, bounds_error=False, fill_value=self.dt_pred[-1]) self.dt_goal = interpolator(self.snr_goal)