Source code for squirrel.diagnostics

"""This module contains class and functions for diagnostics."""

import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
from numpy.polynomial import legendre
from ppxf.ppxf import ppxf
from ppxf.ppxf_util import convolve_gauss_hermite
from tqdm.notebook import tqdm

from .util import is_positive_definite
from .util import get_nearest_positive_definite_matrix


[docs] class Diagnostics(object): """This class contains functions to diagnose the performance of the pipeline."""
[docs] @staticmethod def get_specific_signal_and_noise(spectra, mask, z_factor=1.0): """Get the mean signal per (wavelength unit)^(1/2) and noise of the spectra. :param spectra: spectra object :type spectra: squirrel.data.Spectra :param mask: mask for spectra :type mask: numpy.ndarray :param z_factor: multiplicative factor for wavelength (e.g., 1 + z), if the SNR is computed at a different frame :type z_factor: float :return: mean signal per (wavelength unit)^(1/2), and noise :rtype: float, float """ # Extract the signal from the spectra using the provided mask signal_slice = spectra.flux[mask] total_signal = np.sum(signal_slice) # Calculate the total noise based on the covariance or noise array if spectra.covariance is not None: covariance_slice = spectra.covariance[mask][:, mask] total_noise = np.sqrt(np.sum(covariance_slice)) else: total_noise = np.sqrt(np.sum(spectra.noise[mask] ** 2)) # Calculate the wavelength range covered by the mask wavelength_slice = spectra.wavelengths[mask] delta_lambda = wavelength_slice[-1] - wavelength_slice[0] return total_signal / np.sqrt(delta_lambda * z_factor), total_noise
[docs] @classmethod def get_specific_snr(cls, spectra, mask, z_factor=1.0): """Get the mean SNR of the spectra. :param spectra: spectra object :type spectra: squirrel.data.Spectra :param mask: mask for spectra :type mask: numpy.ndarray :param z_factor: multiplicative factor for wavelength (e.g., 1 + z), if the SNR is computed at a different frame :type z_factor: float :return: mean SNR per (wavelength unit)^(1/2) :rtype: float """ # Get the specific signal and noise using the provided mask and z_factor signal, noise = cls.get_specific_signal_and_noise( spectra, mask, z_factor=z_factor ) return signal / noise
[docs] @classmethod def check_bias_vs_snr( cls, spectra_data, template, spectra_mask_for_snr=None, target_snrs=np.arange(10, 51, 10), input_velocity_dispersions=[250], template_weight=1.0, polynomial_degree=0, multiplicative_polynomial_degree=0, polynomial_weights=[1.0], multiplicative_component=1.0, add_component=0.0, num_sample=50, z_factor=1.0, v_systematic=0.0, plot=True, ): """Check the bias in the velocity dispersion measurement as a function of SNR. :param spectra_data: data object :type spectra_data: squirrel.data.Spectra :param template: template object :type template: squirrel.template.Template :param spectra_mask_for_snr: mask of the spectra to compute SNR from :type spectra_mask_for_snr: numpy.ndarray :param target_snrs: target SNRs :type target_snrs: numpy.ndarray :param input_velocity_dispersions: input velocity dispersions :type input_velocity_dispersions: numpy.ndarray :param template_weight: weight of the template :type template_weight: float :param polynomial_degree: degree of the polynomial :type polynomial_degree: int :param multiplicative_polynomial_degree: degree of the multiplicative polynomial for fitting :type multiplicative_polynomial_degree: int :param polynomial_weights: weights for the additive polynomial to be added to the mock spectra :type polynomial_weights: numpy.ndarray :param multiplicative_component: multiplicative component to be multiplied to the mock spectra :type multiplicative_component: np.ndarray or float :param add_component: additive component to be added to the mock spectra :type add_component: float :param num_sample: number of Monte Carlo noise realizations :type num_sample: int :param z_factor: multiplicative factor for wavelength (e.g., 1 + z), if the SNR is computed at a different frame :type z_factor: float :param v_systematic: systematic velocity, km/s, the `vsyst` parameter in pPXF :type v_systematic: float :param plot: plot one example simulation for each input velocity dispersion and SNR :type plot: bool :return: recovered values :rtype: tuple """ # If no mask is provided, create a default mask that includes all wavelengths if spectra_mask_for_snr is None: spectra_mask_for_snr = np.ones_like(spectra_data.wavelengths).astype(bool) # Ensure that either covariance or noise is provided in the spectra data if spectra_data.covariance is None and spectra_data.noise is None: raise ValueError("Either covariance or noise must be provided.") # Initialize arrays to store recovered values recovered_velocities = np.zeros( (len(input_velocity_dispersions), len(target_snrs)) ) recovered_velocity_uncertainties = np.zeros( (len(input_velocity_dispersions), len(target_snrs)) ) recovered_velocity_scatters = np.zeros( (len(input_velocity_dispersions), len(target_snrs)) ) recovered_dispersions = np.zeros( (len(input_velocity_dispersions), len(target_snrs)) ) recovered_dispersion_uncertainties = np.zeros( (len(input_velocity_dispersions), len(target_snrs)) ) recovered_dispersion_scatters = np.zeros( (len(input_velocity_dispersions), len(target_snrs)) ) recovered_snrs = np.zeros((len(input_velocity_dispersions), len(target_snrs))) # Loop over each input velocity dispersion for i in tqdm(range(len(input_velocity_dispersions)), desc="Input dispersion"): input_dispersion = input_velocity_dispersions[i] # Create mock spectra by convolving the template with the input dispersion mock_flux = cls.make_convolved_spectra( template.flux[:, 0], input_dispersion, spectra_data.velocity_scale, int(spectra_data.velocity_scale / template.velocity_scale), data_wavelength=spectra_data.wavelengths, data_weight=template_weight, polynomial_degree=polynomial_degree, polynomial_weights=polynomial_weights, multiplicative_polynomial=multiplicative_component, v_systematic=v_systematic, ) # Add the additive component to the mock flux mock_flux += add_component # Loop over each target SNR for j in tqdm(range(len(target_snrs)), desc="Target SNR"): target_snr = target_snrs[j] # Initialize arrays to store samples for each Monte Carlo realization dispersion_samples = np.zeros(num_sample) dispersion_uncertainty_samples = np.zeros(num_sample) velocity_samples = np.zeros(num_sample) velocity_uncertainty_samples = np.zeros(num_sample) snr_samples = np.zeros(num_sample) # Perform Monte Carlo realizations for k in range(num_sample): data = deepcopy(spectra_data) data.flux = deepcopy(mock_flux) # Calculate the initial specific SNR initial_specific_snr = cls.get_specific_snr( spectra_data, spectra_mask_for_snr, z_factor=z_factor ) # Calculate the noise multiplier to achieve the target SNR noise_multiplier = initial_specific_snr / target_snr # Add noise to the mock spectra if data.covariance is not None: data.covariance *= noise_multiplier**2 if not is_positive_definite(data.covariance): data.covariance = get_nearest_positive_definite_matrix( data.covariance ) noise = np.random.multivariate_normal( data.flux * 0, data.covariance ) elif data.noise is not None: data.noise *= noise_multiplier noise = np.random.normal( data.flux * 0, data.noise, size=len(data.flux) ) data.flux += noise # Fit the mock spectra using pPXF mock_ppxf_fit = ppxf( templates=template.flux, galaxy=data.flux, noise=deepcopy( data.covariance if data.covariance is not None else data.noise ), # sending deepcopy just in case, as pPXF may manipulate the noise array/matrix velscale=data.velocity_scale, start=[0, input_dispersion], plot=False, lam=data.wavelengths, degree=polynomial_degree, mdegree=multiplicative_polynomial_degree, vsyst=v_systematic, quiet=True, velscale_ratio=int( data.velocity_scale / template.velocity_scale ), ) # Plot the fit for the first realization if plotting is enabled if plot and k == 0: mock_ppxf_fit.plot() plt.title( f"Input dispersion: {input_dispersion} km/s, SNR: {target_snr}" ) plt.show() # Store the results of the fit dispersion_samples[k] = mock_ppxf_fit.sol[1] dispersion_uncertainty_samples[k] = mock_ppxf_fit.error[1] velocity_samples[k] = mock_ppxf_fit.sol[0] velocity_uncertainty_samples[k] = mock_ppxf_fit.error[0] snr_samples[k] = cls.get_specific_snr( data, spectra_mask_for_snr, z_factor=z_factor ) # Calculate the statistics for the recovered values mean, uncertainty_mean, scatter = cls.get_stats( velocity_samples, velocity_uncertainty_samples ) recovered_velocities[i, j] = mean recovered_velocity_uncertainties[i, j] = uncertainty_mean recovered_velocity_scatters[i, j] = scatter mean, uncertainty_mean, scatter = cls.get_stats( dispersion_samples, dispersion_uncertainty_samples ) recovered_dispersions[i, j] = mean recovered_dispersion_uncertainties[i, j] = uncertainty_mean recovered_dispersion_scatters[i, j] = scatter recovered_snrs[i, j] = np.mean(snr_samples) return ( recovered_snrs, recovered_dispersions, recovered_dispersion_uncertainties, recovered_dispersion_scatters, recovered_velocities, recovered_velocity_uncertainties, recovered_velocity_scatters, )
[docs] @staticmethod def get_stats(values, uncertainties, sigma=3): """Compute the mean and standard deviation of the array after sigma- clipping. :param values: values :type arr: numpy.ndarray :param uncertainties: uncertainties :type uncertainties: numpy.ndarray :param sigma: sigma for clipping :type sigma: float :return: mean, uncertainty of the mean, and scatter :rtype: tuple """ # Generate samples from a normal distribution using the values and uncertainties samples = np.random.normal(values, uncertainties, size=(1000, len(values))) # Calculate the median of the values mean = np.median(values) # Calculate the uncertainty of the mean using the standard deviation of the medians uncertainty_mean = np.std(np.median(samples, axis=1)) # Calculate the scatter as the standard deviation of the values scatter = np.std(values) return mean, uncertainty_mean, scatter
[docs] @classmethod def plot_bias_vs_snr( cls, recovered_values, input_velocity_dispersions, fig_width=10, bias_threshold=0.02, show_scatter=True, show_mean_uncertainty=False, errorbar_kwargs_scatter={}, errorbar_kwargs_mean={}, **kwargs, ): """Plot the bias in the velocity dispersion measurement as a function of SNR. This function generates plots to visualize the bias in the velocity dispersion and velocity measurements as a function of Signal-to-Noise Ratio (SNR). It creates subplots for each input velocity dispersion and plots the recovered values along with their uncertainties and scatters. :param recovered_values: recovered values from check_bias_vs_snr :type recovered_values: tuple :param input_velocity_dispersions: input velocity dispersions :type input_velocity_dispersions: numpy.ndarray :param fig_width: width of the figure :type fig_width: float :param bias_threshold: bias threshold line for plotting :type bias_threshold: float :param show_scatter: show scatter of the points :type show_scatter: bool :param show_mean_uncertainty: show mean uncertainty of the points :type show_mean_uncertainty: bool :param errorbar_kwargs_scatter: keyword arguments for errorbar for scatter :type errorbar_kwargs_scatter: dict :param errorbar_kwargs_mean: keyword arguments for errorbar for mean uncertainty :type errorbar_kwargs_mean: dict :return: figure and axes :rtype: tuple """ # Unpack the recovered values ( recovered_snrs, recovered_dispersions, recovered_dispersion_uncertainties, recovered_dispersion_scatters, recovered_velocities, recovered_velocity_uncertainties, recovered_velocity_scatters, ) = recovered_values # Create subplots for each input velocity dispersion fig, axes = plt.subplots( len(input_velocity_dispersions), 2, figsize=(fig_width, fig_width / 8.0 * len(input_velocity_dispersions)), ) # Plot the bias in velocity dispersion cls.plot_bias_vs_snr_single( axes[:, 0], input_velocity_dispersions, recovered_snrs, recovered_dispersions, recovered_dispersion_uncertainties, recovered_dispersion_scatters, show_scatter=show_scatter, show_mean_uncertainty=show_mean_uncertainty, bias_threshold=bias_threshold, x_label=r"SNR (${\AA}^{-1/2}$)", y_label=r"$\sigma$ (km s$^{-1}$)", errorbar_kwargs_mean=errorbar_kwargs_mean, errorbar_kwargs_scatter=errorbar_kwargs_scatter, **kwargs, ) # Plot the bias in velocity cls.plot_bias_vs_snr_single( axes[:, 1], np.zeros(len(recovered_velocities)), recovered_snrs, recovered_velocities, recovered_velocity_uncertainties, recovered_velocity_scatters, show_scatter=show_scatter, show_mean_uncertainty=show_mean_uncertainty, bias_threshold=0.0, x_label=r"SNR (${\AA}^{-1/2}$)", y_label=r"$\Delta v$ (km s$^{-1}$)", errorbar_kwargs_mean=errorbar_kwargs_mean, errorbar_kwargs_scatter=errorbar_kwargs_scatter, **kwargs, ) # Adjust the spacing between subplots plt.subplots_adjust(hspace=0.0) return fig, axes
[docs] @staticmethod def plot_bias_vs_snr_single( axes, input_values, recovered_snrs, recovered_values, recovered_value_uncertainties, recovered_value_scatters, show_scatter=True, show_mean_uncertainty=False, bias_threshold=0.02, x_label="", y_label="", errorbar_kwargs_mean={}, errorbar_kwargs_scatter={}, **kwargs, ): """Plot the bias in velocity dispersion measurement as a function of SNR. This function visualizes the bias in a single kinematic value as a function of Signal-to-Noise Ratio (SNR). It plots the recovered values along with their uncertainties and scatters. :param axes: array of ax objects to plot, must match the length of `input_values` :type axes: numpy.ndarray or list :param input_values: input values :type input_values: numpy.ndarray :param recovered_snrs: recovered SNRs :type recovered_snrs: numpy.ndarray :param recovered_values: recovered values :type recovered_values: numpy.ndarray :param recovered_value_uncertainties: uncertainties of recovered values :type recovered_value_uncertainties: numpy.ndarray :param recovered_value_scatters: scatters of recovered values :type recovered_value_scatters: numpy.ndarray :param show_scatter: whether to show scatter of the points :type show_scatter: bool :param show_mean_uncertainty: whether to show mean uncertainty of the points :type show_mean_uncertainty: bool :param bias_threshold: bias threshold line for plotting :type bias_threshold: float :param x_label: label for the x-axis :type x_label: str :param y_label: label for the y-axis :type y_label: str :param errorbar_kwargs_mean: keyword arguments for errorbar for mean uncertainty :type errorbar_kwargs_mean: dict :param errorbar_kwargs_scatter: keyword arguments for errorbar for scatter :type errorbar_kwargs_scatter: dict :return: None :rtype: None """ # Assert axes has the same length as input_values assert len(axes) == len(input_values) # Plot the recovered values with scatter and mean uncertainty for i, input_value in enumerate(input_values): if show_scatter: axes[i].errorbar( recovered_snrs[i], recovered_values[i], yerr=recovered_value_scatters[i], **errorbar_kwargs_mean, ) if show_mean_uncertainty: axes[i].errorbar( recovered_snrs[i], recovered_values[i], yerr=recovered_value_uncertainties[i], **errorbar_kwargs_scatter, ) # Set default marker and linestyle if not provided if "marker" not in kwargs: kwargs["marker"] = "o" if "ls" not in kwargs: kwargs["ls"] = "--" if "markersize" not in kwargs: kwargs["markersize"] = 2 # Plot the recovered values axes[i].plot( recovered_snrs[i], recovered_values[i], mec="k", zorder=20, **kwargs, ) # Plot the input value as a horizontal line axes[i].axhline(input_value, color="grey", linestyle="--", alpha=0.6) # Plot the bias threshold as a shaded region if bias_threshold > 0.0: axes[i].axhspan( input_value * (1 - bias_threshold), input_value * (1 + bias_threshold), color="grey", alpha=0.3, zorder=-10, ) axes[i].set_ylabel(y_label) axes[-1].set_xlabel(x_label)
[docs] @classmethod def make_convolved_spectra( cls, template_flux, velocity_dispersion, velocity_scale, velocity_scale_ratio, data_wavelength, velocity=0.0, data_weight=1, polynomial_degree=0, polynomial_weights=[1.0], multiplicative_polynomial=1.0, v_systematic=0.0, ): """Make a convolved spectra. This function generates a mock spectrum by convolving a template spectrum with a given velocity dispersion. It also applies a polynomial to the convolved spectrum to simulate various observational effects. :param template_flux: template flux. Wavelengths are not needed as `v_systematic` and `velocity_scale_ratio` will be used to obtain that. :type template_flux: numpy.ndarray :param velocity_dispersion: velocity dispersion, in km/s :type velocity_dispersion: float :param velocity_scale: velocity scale, in km/s :type velocity_scale: float :param velocity_scale_ratio: velocity scale ratio :type velocity_scale_ratio: int :param data_wavelength: data wavelength :type data_wavelength: numpy.ndarray :param velocity: velocity, km/s :type velocity: float :param data_weight: multiplicative factor for the template flux, effective when polynomial_degree > 0 to set the relative amplitude of data and polynomial :type data_weight: float :param polynomial_degree: degree of the polynomial :type polynomial_degree: int :param polynomial_weights: weights of the polynomial :type polynomial_weights: numpy.ndarray :param v_systematic: systematic velocity, km/s, the `vsyst` parameter in pPXF :type v_systematic: float :return: convolved spectra :rtype: numpy.ndarray """ # Number of pixels in the data wavelength array data_num_pix = len(data_wavelength) # Convolve the template flux with the given velocity dispersion galaxy_model = convolve_gauss_hermite( template_flux, velocity_scale, np.array([velocity, velocity_dispersion]), data_num_pix, velscale_ratio=velocity_scale_ratio, vsyst=v_systematic, ) # Generate a Legendre polynomial x = np.linspace(-1, 1, data_num_pix) vand = legendre.legvander(x, polynomial_degree) # Return the convolved spectrum with the polynomial applied return multiplicative_polynomial * data_weight * galaxy_model + np.dot( vand, polynomial_weights )