"""This module contains the class to wrap the pPXF package for kinematic analysis."""
import numpy as np
from copy import deepcopy
import matplotlib.pyplot as plt
from scipy import ndimage
from scipy.special import ndtr
from ppxf import ppxf_util
from ppxf.ppxf import ppxf
from ppxf import sps_util
from vorbin.voronoi_2d_binning import voronoi_2d_binning
from vorbin.voronoi_2d_binning import _compute_useful_bin_quantities
from vorbin.voronoi_2d_binning import _sn_func
from powerbin import PowerBin
from tqdm import tqdm
import warnings
from .data import VoronoiBinnedSpectra
from .data import PowerBinnedSpectra
from .template import Template
from .util import is_positive_definite
from .util import get_nearest_positive_definite_matrix
from .util import powerbin_capacity_spec
[docs]
class Pipeline(object):
"""A class to wrap the pPXF package for kinematic analysis.
This class provides various static methods to perform kinematic analysis using the
pPXF package. It includes methods for log rebinning, Voronoi binning, creating
kinematic maps, and running pPXF fits.
"""
_speed_of_light = 299792.458 # speed of light in km/s
[docs]
@staticmethod
def log_rebin(
spectra,
velocity_scale=None,
num_samples_for_covariance=None,
take_covariance=True,
):
"""Rebin the data to log scale.
:param spectra: data to rebin
:type spectra: `Spectra` class
:param velocity_scale: velocity scale for the rebinning
:type velocity_scale: float
:param num_samples_for_covariance: number of samples for the covariance estimation
:type num_samples_for_covariance: int
:param take_covariance: take the covariance into account
:type take_covariance: bool
:return: rebinned spectra
:rtype: `Spectra` class
"""
if "log_rebinned" in spectra.spectra_modifications:
raise ValueError("Data has already been log rebinned.")
spectra = deepcopy(spectra)
# Define the wavelength range for rebinning
wavelength_range = spectra.wavelengths[[0, -1]]
# Perform log rebinning using ppxf_util
rebinned_spectra, log_rebinned_wavelength, velocity_scale = ppxf_util.log_rebin(
wavelength_range, spectra.flux, velscale=velocity_scale
)
# Estimate covariance matrix of rebinned spectra through sampling from noise realizations
if num_samples_for_covariance is None:
# If number of samples not provided, picking the data size following this paper: https://arxiv.org/abs/1004.3484
num_samples_for_covariance = spectra.flux.shape[0]
# Flatten the flux and noise arrays for easier manipulation
flux_shape = spectra.flux.shape
flux_flattened = np.atleast_2d(spectra.flux.reshape(flux_shape[0], -1))
noise_flattened = np.atleast_2d(spectra.noise.reshape(flux_shape[0], -1))
# Initialize covariance or noise arrays based on the take_covariance flag
if take_covariance:
covariance = np.atleast_3d(
np.zeros(
(
rebinned_spectra.shape[0],
rebinned_spectra.shape[0],
*flux_flattened.shape[1:],
)
)
)
noise = None
else:
covariance = None
noise = np.atleast_2d(
np.zeros((rebinned_spectra.shape[0], *flux_flattened.shape[1:]))
)
# Loop through each flux realization and perform log rebinning
for i in tqdm(range(flux_flattened.shape[1])):
flux_realizations = np.random.normal(
flux_flattened[:, i],
noise_flattened[:, i],
(num_samples_for_covariance, len(flux_flattened[:, i])),
).T
rebinned_realizations, _, _ = ppxf_util.log_rebin(
wavelength_range, flux_realizations, velscale=velocity_scale
)
# Calculate covariance or noise based on the take_covariance flag
if take_covariance:
covariance[:, :, i] = np.cov(rebinned_realizations)
# Set non-adjacent covariance to zero to avoid numerical issues from noise
for j in range(covariance.shape[0]):
for k in range(covariance.shape[1]):
if np.abs(j - k) > 1:
covariance[j, k, i] = 0
else:
noise[:, i] = np.std(rebinned_realizations, axis=1)
# Reshape the covariance or noise arrays back to the original shape
if take_covariance:
covariance = covariance.reshape(
rebinned_spectra.shape[0], rebinned_spectra.shape[0], *flux_shape[1:]
)
else:
noise = noise.reshape(rebinned_spectra.shape[0], *flux_shape[1:])
# Update the spectra object with the rebinned data
spectra.flux = rebinned_spectra
spectra.noise = noise
spectra.covariance = covariance
spectra.wavelengths = np.exp(log_rebinned_wavelength)
spectra.velocity_scale = velocity_scale
spectra.spectra_modifications += ["log_rebinned"]
return spectra
[docs]
@staticmethod
def get_voronoi_binning_map(
datacube,
signal_image_per_wavelength_unit,
noise_image,
target_snr,
max_radius=None,
min_snr_per_spaxel=1.0,
plot=False,
quiet=True,
**kwargs,
):
"""Get the Voronoi binning map.
:param datacube: datacube to bin
:type datacube: `DataCube` class
:param signal_image_per_wavelength_unit: signal image per wavelength unit
:type signal_image_per_wavelength_unit: np.ndarray
:param noise_image: noise image
:type noise_image: np.ndarray
:param target_snr: target S/N per wavelength unit for each bin
:type target_snr: float
:param max_radius: maximum radius for binning, in the unit of in `datacube.x_coordinates`
:type max_radius: float
:param min_snr_per_spaxel: minimum S/N per spaxel to include in the binning
:type min_snr_per_spaxel: float
:param plot: plot the results
:type plot: bool
:param quiet: suppress the output
:type quiet: bool
:param kwargs: additional arguments for `voronoi_2d_binning()`
:type kwargs: dict
:return: Voronoi binning map outputs: number of bins, x pixels, y pixels, bin center x, bin center y, S/N per bin, area of each bin
:rtype: tuple
"""
# Calculate the radius for each spaxel in the datacube
radius = np.sqrt(datacube.x_coordinates**2 + datacube.y_coordinates**2)
# Calculate the SNR image per wavelength unit and create a mask based on the minimum SNR per spaxel
snr_image_per_wavelength_unit = signal_image_per_wavelength_unit / noise_image
snr_mask = snr_image_per_wavelength_unit > min_snr_per_spaxel
# Apply the maximum radius mask if provided
if max_radius is not None:
snr_mask = snr_mask & (radius < max_radius)
# Create pixel coordinate grids
x_pixels = np.arange(datacube.flux.shape[2])
y_pixels = np.arange(datacube.flux.shape[1])
xx_pixels, yy_pixels = np.meshgrid(x_pixels, y_pixels)
# Mask the pixel coordinates based on the SNR mask
xx_pixels_masked = xx_pixels[snr_mask]
yy_pixels_masked = yy_pixels[snr_mask]
# Mask the coordinates and signal/noise images based on the SNR mask
xx_coordinates_masked = datacube.x_coordinates[snr_mask]
yy_coordinates_masked = datacube.y_coordinates[snr_mask]
signal_image_per_wavelength_unit_masked = signal_image_per_wavelength_unit[
snr_mask
]
noise_image_masked = noise_image[snr_mask]
# Perform Voronoi binning using the masked coordinates and signal/noise images
(
num_bins,
x_node,
y_node,
bin_center_x,
bin_center_y,
snr,
n_pixels,
scale,
) = voronoi_2d_binning(
xx_coordinates_masked,
yy_coordinates_masked,
signal_image_per_wavelength_unit_masked,
noise_image_masked,
target_snr,
plot=plot,
quiet=quiet,
**kwargs,
)
# Compute useful bin quantities
classes, x_bar, y_bar, snr, area = _compute_useful_bin_quantities(
xx_coordinates_masked,
yy_coordinates_masked,
signal_image_per_wavelength_unit_masked,
noise_image_masked,
x_node,
y_node,
scale,
sn_func=_sn_func,
)
if plot:
plt.tight_layout()
return (
num_bins,
xx_pixels_masked,
yy_pixels_masked,
bin_center_x,
bin_center_y,
snr,
area,
)
[docs]
@staticmethod
def get_voronoi_binned_spectra(datacube, bin_mapping_output):
"""Perform the Voronoi binning.
:param datacube: datacube to bin
:type datacube: `DataCube` class
:param bin_mapping_output: outputs from `get_voronoi_binning_map()`
:type bin_mapping_output: tuple
:return: Voronoi binned spectra
:rtype: `VoronoiBinnedSpectra` class
"""
(
num_bins,
xx_pixels_masked,
yy_pixels_masked,
bin_center_x,
bin_center_y,
snr,
area,
) = bin_mapping_output
# Initialize arrays for the Voronoi binned flux, noise, and covariance
voronoi_binned_flux = np.zeros(
(datacube.flux.shape[0], int(np.max(num_bins)) + 1)
)
if datacube.noise is not None:
voronoi_binned_noise = np.zeros_like(voronoi_binned_flux)
else:
voronoi_binned_noise = None
if datacube.covariance is not None:
voronoi_binned_covariance = np.zeros(
(
voronoi_binned_flux.shape[0],
voronoi_binned_flux.shape[0],
int(np.max(num_bins)) + 1,
)
)
else:
voronoi_binned_covariance = None
# Sum the flux, noise, and covariance for each bin
for x, y, n_bin in zip(xx_pixels_masked, yy_pixels_masked, num_bins):
voronoi_binned_flux[:, n_bin] += datacube.flux[:, y, x]
if datacube.noise is not None:
voronoi_binned_noise[:, n_bin] += datacube.noise[:, y, x] ** 2
if datacube.covariance is not None:
voronoi_binned_covariance[:, :, n_bin] += datacube.covariance[
:, :, y, x
]
# Take the square root of the noise to get the standard deviation
if datacube.noise is not None:
voronoi_binned_noise = np.sqrt(voronoi_binned_noise)
# Create the VoronoiBinnedSpectra object with the binned data
voronoi_binned_spectra = VoronoiBinnedSpectra(
wavelengths=datacube.wavelengths,
flux=voronoi_binned_flux,
wavelength_unit=datacube.wavelength_unit,
fwhm=datacube.fwhm,
z_lens=datacube.z_lens,
z_source=datacube.z_source,
x_coordinates=datacube.x_coordinates,
y_coordinates=datacube.y_coordinates,
num_bins=num_bins,
x_pixel_index_of_bins=xx_pixels_masked,
y_pixel_index_of_bins=yy_pixels_masked,
flux_unit=datacube.flux_unit,
noise=voronoi_binned_noise,
covariance=voronoi_binned_covariance,
bin_center_x=bin_center_x,
bin_center_y=bin_center_y,
area=area,
snr=snr,
)
# Copy the spectra modifications and wavelength frame from the datacube
voronoi_binned_spectra.spectra_modifications = deepcopy(
datacube.spectra_modifications
)
voronoi_binned_spectra.wavelengths_frame = deepcopy(datacube.wavelengths_frame)
voronoi_binned_spectra.velocity_scale = deepcopy(datacube.velocity_scale)
return voronoi_binned_spectra
[docs]
@staticmethod
def get_power_binning_map(
datacube,
signal_image_per_wavelength_unit,
noise_image,
target_snr,
max_radius=None,
min_snr_per_spaxel=1.0,
capacity_spec=None,
capacity_spec_args=None,
capacity_spec_snr_relation=None,
plot=False,
quiet=True,
**kwargs,
):
"""Get the PowerBin binning map.
:param datacube: datacube to bin
:type datacube: `DataCube` class
:param signal_image_per_wavelength_unit: signal image per wavelength unit
:type signal_image_per_wavelength_unit: np.ndarray
:param noise_image: noise image
:type noise_image: np.ndarray
:param target_snr: target S/N per wavelength unit for each bin
:type target_snr: float
:param max_radius: maximum radius for binning, in the unit of in `datacube.x_coordinates`
:type max_radius: float
:param min_snr_per_spaxel: minimum S/N per spaxel to include in the binning
:type min_snr_per_spaxel: float
:param capacity_spec: specification of bin capacity related to S/N, default will use `util.powerbin_capacity_spec()`
:type: callable with `capacity_spec_args`, or string 'additive'
:param capacity_spec_args: input parameters for callable `capacity_spec`
:type: tuple
:param capacity_spec_snr_relation: functional relationship between capacity definition and S/N, so that the output is given as S/N, S/N = capacity_spec_snr_relation(capacity)
:type: callable
:param plot: plot the results
:type plot: bool
:param quiet: suppress the output
:type quiet: bool
:param kwargs: additional arguments for `PowerBin()`
:type kwargs: dict
:return: PowerBin binning map outputs: bin number assigned to each pixel, x pixels, y pixels, bin center x, bin center y, capacity per bin, area of each bin
:rtype: tuple
"""
# Calculate the radius for each spaxel in the datacube
radius = np.sqrt(datacube.x_coordinates**2 + datacube.y_coordinates**2)
# Calculate the SNR image per wavelength unit and create a mask based on the minimum SNR per spaxel
snr_image_per_wavelength_unit = signal_image_per_wavelength_unit / noise_image
snr_mask = snr_image_per_wavelength_unit > min_snr_per_spaxel
# Apply the maximum radius mask if provided
if max_radius is not None:
snr_mask = snr_mask & (radius < max_radius)
# Create pixel coordinate grids
x_pixels = np.arange(datacube.flux.shape[2])
y_pixels = np.arange(datacube.flux.shape[1])
xx_pixels, yy_pixels = np.meshgrid(x_pixels, y_pixels)
# Mask the pixel coordinates based on the SNR mask
xx_pixels_masked = xx_pixels[snr_mask]
yy_pixels_masked = yy_pixels[snr_mask]
# Mask the coordinates and signal/noise images based on the SNR mask
xx_coordinates_masked = datacube.x_coordinates[snr_mask]
yy_coordinates_masked = datacube.y_coordinates[snr_mask]
xy_coords = np.column_stack((xx_coordinates_masked, yy_coordinates_masked))
signal_image_per_wavelength_unit_masked = signal_image_per_wavelength_unit[
snr_mask
]
noise_image_masked = noise_image[snr_mask]
# set capacity_spec
if capacity_spec is None:
# take the default _capacity_spec function
capacity_spec = powerbin_capacity_spec
capacity_spec_args = (
signal_image_per_wavelength_unit_masked,
noise_image_masked,
)
# make sure capacity_spec_snr_relation is None
assert (
capacity_spec_snr_relation is None
), "For default capacity_spec, capacity_spec_snr_relation must be None"
target_capacity = target_snr**2
elif capacity_spec == "additive":
capacity_spec = (
signal_image_per_wavelength_unit_masked / noise_image_masked
) ** 2
target_capacity = target_snr**2
# make sure capacity_spec_snr_relation is None
assert (
capacity_spec_snr_relation is None
), "For 'additive' capacity_spec, capacity_spec_snr_relation must be None"
capacity_spec_args = ()
else:
target_capacity = target_snr
# Check if any arguments are the same size as the raw input images
is_masked = (min_snr_per_spaxel != 0) or (max_radius is not None)
if is_masked:
for arg in (
capacity_spec_args
if isinstance(capacity_spec_args, (tuple, list))
else [capacity_spec_args]
):
if (
isinstance(arg, np.ndarray)
and arg.shape == signal_image_per_wavelength_unit.shape
):
print(
"WARNING: An argument in 'capacity_spec_args' has the same dimensions as the input images. "
"Note that 'capacity_spec' runs on MASKED data. If your function expects full-sized "
"images, you must manually mask them using the same 'min_snr_per_spaxel' and 'max_radius' "
"criteria, or set 'min_snr_per_spaxel=0' and 'max_radius=None' to pass full images."
)
# ensure args are a tuple
capacity_spec_args = (
tuple(capacity_spec_args)
if isinstance(capacity_spec_args, (tuple, list))
else (capacity_spec_args,)
)
# set verbose setting
verbose = 0 if quiet else 2
# Perform PowerBin binning using the masked coordinates and signal/noise images
powerbin = PowerBin(
xy_coords,
capacity_spec,
target_capacity,
verbose=verbose,
args=capacity_spec_args,
**kwargs,
)
# Collect and compute useful bin quantities
num_bins = powerbin.bin_num
bin_center_x, bin_center_y = powerbin.xybin.T
if capacity_spec_snr_relation is None:
snr = np.sqrt(powerbin.bin_capacity)
else:
snr = capacity_spec_snr_relation(powerbin.bin_capacity)
area = np.bincount(powerbin.bin_num)
if plot:
# set the scale for the capacity_cnr_relation, default 'sqrt'
if capacity_spec_snr_relation is None:
capacity_scale = "sqrt"
else:
capacity_scale = "raw"
powerbin.plot(capacity_scale=capacity_scale)
plt.tight_layout()
return (
num_bins,
xx_pixels_masked,
yy_pixels_masked,
bin_center_x,
bin_center_y,
snr,
area,
)
[docs]
@staticmethod
def get_power_binned_spectra(datacube, bin_mapping_output):
"""Perform the PowerBin binning.
:param datacube: datacube to bin
:type datacube: `DataCube` class
:param bin_mapping_output: outputs from `get_power_binning_map()`
:type bin_mapping_output: tuple
:return: PowerBin binned spectra
:rtype: `PowerBinBinnedSpectra` class
"""
(
num_bins,
xx_pixels_masked,
yy_pixels_masked,
bin_center_x,
bin_center_y,
snr,
area,
) = bin_mapping_output
# Initialize arrays for the PowerBin binned flux, noise, and covariance
power_binned_flux = np.zeros(
(datacube.flux.shape[0], int(np.max(num_bins)) + 1)
)
if datacube.noise is not None:
power_binned_noise = np.zeros_like(power_binned_flux)
else:
power_binned_noise = None
if datacube.covariance is not None:
power_binned_covariance = np.zeros(
(
power_binned_flux.shape[0],
power_binned_flux.shape[0],
int(np.max(num_bins)) + 1,
)
)
else:
power_binned_covariance = None
# Sum the flux, noise, and covariance for each bin
for x, y, n_bin in zip(xx_pixels_masked, yy_pixels_masked, num_bins):
power_binned_flux[:, n_bin] += datacube.flux[:, y, x]
if datacube.noise is not None:
power_binned_noise[:, n_bin] += datacube.noise[:, y, x] ** 2
if datacube.covariance is not None:
power_binned_covariance[:, :, n_bin] += datacube.covariance[:, :, y, x]
# Take the square root of the noise to get the standard deviation
if datacube.noise is not None:
power_binned_noise = np.sqrt(power_binned_noise)
# Create the PowerBinBinnedSpectra object with the binned data
power_binned_spectra = PowerBinnedSpectra(
wavelengths=datacube.wavelengths,
flux=power_binned_flux,
wavelength_unit=datacube.wavelength_unit,
fwhm=datacube.fwhm,
z_lens=datacube.z_lens,
z_source=datacube.z_source,
x_coordinates=datacube.x_coordinates,
y_coordinates=datacube.y_coordinates,
num_bins=num_bins,
x_pixel_index_of_bins=xx_pixels_masked,
y_pixel_index_of_bins=yy_pixels_masked,
flux_unit=datacube.flux_unit,
noise=power_binned_noise,
covariance=power_binned_covariance,
bin_center_x=bin_center_x,
bin_center_y=bin_center_y,
area=area,
snr=snr,
)
# Copy the spectra modifications and wavelength frame from the datacube
power_binned_spectra.spectra_modifications = deepcopy(
datacube.spectra_modifications
)
power_binned_spectra.wavelengths_frame = deepcopy(datacube.wavelengths_frame)
power_binned_spectra.velocity_scale = deepcopy(datacube.velocity_scale)
return power_binned_spectra
[docs]
@staticmethod
def create_kinematic_map_from_bins(
bin_mapping, kinematic_values, nan_outside_bins=False
):
"""Create a kinematic map from the binned spectra and the kinematic values.
This function generates a 2D kinematic map by assigning
kinematic values to each pixel based on the bin mapping.
:param bin_mapping: A 2D array showing bin numbers for each
pixel. Pixels not assigned to any bin should have a value of
-1.
:type bin_mapping: np.ndarray
:param kinematic_values: A list of kinematic values
corresponding to each bin.
:param nan_outside_bins: Whether to set pixels outside of bins (with bin number -1) to NaN.
If False (default), those pixels will be set to zero.
Having NaN values can be useful for better visualization (e.g. with plt.imshow).
:type value_outside_bins: bool, optional
:type kinematic_values: list of float
:return: A 2D kinematic map with the same shape as bin_mapping.
:rtype: np.ndarray
"""
# Initialize the kinematic map with either NaNs or zeros
if nan_outside_bins:
kinematic_map = np.nan * np.ones_like(bin_mapping)
else:
kinematic_map = np.zeros_like(bin_mapping)
# Loop through each pixel in the bin mapping
for i in range(kinematic_map.shape[0]):
for j in range(kinematic_map.shape[1]):
# Skip pixels not assigned to any bin
if bin_mapping[i, j] == -1:
continue
# Assign the kinematic value to the corresponding pixel
kinematic_map[i, j] = kinematic_values[int(bin_mapping[i, j])]
return kinematic_map
[docs]
@staticmethod
def get_template_from_library(
library_path,
spectra,
velocity_scale_ratio,
wavelength_factor=1.0,
wavelength_range_extend_factor=1.05,
**kwargs,
):
"""Get the template object created for a stellar template library.
The `library_path` should point to a `numpy.savez()` file containing the following arrays for a given SPS models library, like FSPS, Miles, GALEXEV, BPASS. This file will be sent to `ppxf.sps_util.sps_lib()`. See the documentation of that function for the format of the file.
The EMILES, FSPS, GALEXEV libraries are available at https://github.com/micappe/ppxf_data.
:param library_path: Path to the library.
:type library_path: str
:param spectra: Log rebinned spectra, which the templates will be used for.
:type spectra: `Spectra` or a child class
:param velocity_scale_ratio: Velocity scale ratio for the template.
:type velocity_scale_ratio: float
:param wavelength_factor: Factor to multiply the wavelength range to get the templates for, used for de-redshifting, if necessary.
:type wavelength_factor: float
:param wavelength_range_extend_factor: Factor to extend the wavelength range.
:type wavelength_range_extend_factor: float
:param kwargs: Additional arguments for `ppxf.sps_util.sps_lib()` function.
:type kwargs: dict
:return: Template object.
:rtype: `Template` class
"""
assert spectra.wavelength_unit == "AA", "Wavelength unit must be in Angstrom."
assert (
"log_rebinned" in spectra.spectra_modifications
), "Data must be log rebinned."
# Define the wavelength range for the templates
wavelength_range_templates = (
spectra.wavelengths[0] / wavelength_range_extend_factor * wavelength_factor,
spectra.wavelengths[-1]
* wavelength_range_extend_factor
* wavelength_factor,
)
# Sample the template library at data resolution times the velscale_ratio in the given wavelength range
sps = sps_util.sps_lib(
library_path,
spectra.velocity_scale / velocity_scale_ratio,
spectra.fwhm,
lam_range=wavelength_range_templates,
norm_range=[
spectra.wavelengths[0] * wavelength_factor,
spectra.wavelengths[-1] * wavelength_factor,
],
**kwargs,
)
# Reshape the template fluxes and get the template wavelengths
template_fluxes = sps.templates.reshape(sps.templates.shape[0], -1)
templates_wavelengths = sps.lam_temp / wavelength_factor
# Create the Template object
template = Template(
templates_wavelengths,
template_fluxes,
wavelength_unit="AA",
fwhm=spectra.fwhm,
)
template.velocity_scale = spectra.velocity_scale / velocity_scale_ratio
return template
[docs]
@staticmethod
def get_emission_line_template(
spectra,
template_wavelengths,
wavelength_factor=1.0,
wavelength_range_extend_factor=1.05,
**kwargs,
):
"""Get the emission line template.
This function generates an emission line template for the given spectra.
:param spectra: Log rebinned spectra.
:type spectra: `Spectra` or a child class
:param template_wavelengths: Wavelengths of the template in Angstrom.
:type template_wavelengths: np.ndarray
:param wavelength_factor: Factor to multiply the wavelength range to get the templates for, used for de-redshifting, if necessary.
:type wavelength_factor: float
:param wavelength_range_extend_factor: Factor to extend the wavelength range.
:type wavelength_range_extend_factor: float
:param kwargs: Additional arguments for `ppxf_util.emission_lines`.
:type kwargs: dict
:return: Emission line template, line names, line wavelengths.
:rtype: `Template` class, list of str, np.ndarray
"""
# Define the wavelength range for the templates
wavelength_range_templates = (
spectra.wavelengths[0] / wavelength_range_extend_factor * wavelength_factor,
spectra.wavelengths[-1]
* wavelength_range_extend_factor
* wavelength_factor,
)
# Generate the emission line templates using ppxf_util
gas_templates, line_names, line_wavelengths = ppxf_util.emission_lines(
np.log(template_wavelengths * wavelength_factor),
wavelength_range_templates,
spectra.fwhm,
**kwargs,
)
# Create the Template object
template = Template(
template_wavelengths,
gas_templates,
wavelength_unit="AA",
fwhm=spectra.fwhm,
)
return template, line_names, line_wavelengths
[docs]
@staticmethod
def join_templates(
kinematic_template,
kinematic_template_2=None,
emission_line_template=None,
emission_line_groups=None,
):
"""Join multiple templates into a single template object.
This function combines kinematic and emission line templates into a single template object.
It also generates component indices to identify which component each template belongs to.
:param kinematic_template: The primary kinematic template.
:type kinematic_template: `Template` class
:param kinematic_template_2: An optional secondary kinematic template.
:type kinematic_template_2: `Template` class, optional
:param emission_line_template: An optional emission line template.
:type emission_line_template: `Template` class, optional
:param emission_line_groups: Groups for the emission line templates.
:type emission_line_groups: list of int, optional
:return: Combined template, component indices, and emission line indices.
:rtype: tuple of `Template` class, np.ndarray, np.ndarray
"""
# Ensure the primary kinematic template flux is 2D
assert (
len(kinematic_template.flux.shape) == 2
), "kinematic_template.flux must be 2D."
# Initialize the combined flux and component indices
flux = kinematic_template.flux
component_indices = np.zeros(kinematic_template.flux.shape[1], dtype=int)
# If a secondary kinematic template is provided, append its flux and update component indices
if kinematic_template_2 is not None:
assert (
len(kinematic_template_2.flux.shape) == 2
), "kinematic_template_2.flux must be 2D."
assert (
kinematic_template.flux.shape[0] == kinematic_template_2.flux.shape[0]
), "Flux array shape mismatch between the templates!"
flux = np.append(flux, kinematic_template_2.flux, axis=1)
component_indices = np.append(
component_indices,
np.ones(kinematic_template_2.flux.shape[1], dtype=int),
)
# If an emission line template is provided, append its flux and update component indices
if emission_line_template is not None:
flux = np.append(flux, emission_line_template.flux, axis=1)
if kinematic_template_2 is not None:
component_indices = np.append(
component_indices, np.array(emission_line_groups) + 2
)
emission_line_indices = component_indices > 1.0
else:
component_indices = np.append(
component_indices, np.array(emission_line_groups) + 1
)
emission_line_indices = component_indices > 0.0
else:
emission_line_indices = np.zeros_like(component_indices, dtype=bool)
# Create the combined template object
template = Template(
kinematic_template.wavelengths,
flux,
wavelength_unit=kinematic_template.wavelength_unit,
fwhm=kinematic_template.fwhm,
)
template.velocity_scale = kinematic_template.velocity_scale
return template, component_indices, emission_line_indices
[docs]
@staticmethod
def make_template_from_array(
fluxes,
wavelengths,
fwhm_template,
spectra,
velocity_scale_ratio,
wavelength_factor=1.0,
wavelength_range_extend_factor=1.05,
):
"""Create a template object from given fluxes and wavelengths.
This function generates a template object from provided fluxes and wavelengths.
It performs convolution to match the spectral resolution and log rebinning.
:param fluxes: Fluxes of the templates, dimensions must be (n_wavelengths, n_templates).
:type fluxes: np.ndarray
:param wavelengths: Wavelengths of the templates in Angstrom.
:type wavelengths: np.ndarray
:param spectra: Log rebinned spectra, which the templates will be used for.
:type spectra: `Spectra` or a child class
:param velocity_scale_ratio: Velocity scale ratio for the template.
:type velocity_scale_ratio: float
:param wavelength_factor: Factor to multiply the wavelength range to get the templates for, used for de-redshifting, if necessary.
:type wavelength_factor: float
:param wavelength_range_extend_factor: Factor to extend the wavelength range.
:type wavelength_range_extend_factor: float
:return: Template object.
:rtype: `Template` class
"""
# Ensure the wavelength unit is in Angstrom and data is log rebinned
assert spectra.wavelength_unit == "AA", "Wavelength unit must be in Angstrom."
assert (
"log_rebinned" in spectra.spectra_modifications
), "Data must be log rebinned."
# Define the wavelength range for the templates
wavelength_min = (
spectra.wavelengths[0] / wavelength_range_extend_factor * wavelength_factor
)
wavelength_max = (
spectra.wavelengths[-1] * wavelength_range_extend_factor * wavelength_factor
)
# Calculate the wavelength difference
wavelength_diff = np.mean(np.diff(wavelengths))
# Filter the fluxes and wavelengths based on the defined range
fluxes = fluxes[
(wavelengths > wavelength_min - wavelength_diff)
& (wavelengths < wavelength_max + wavelength_diff),
:,
]
wavelengths = wavelengths[
(wavelengths > wavelength_min - wavelength_diff)
& (wavelengths < wavelength_max + wavelength_diff)
]
wavelengths /= wavelength_factor
fwhm_template /= wavelength_factor
# Convolve the fluxes to match the spectral resolution if necessary
convolved_fluxes = fluxes
if fwhm_template < spectra.fwhm:
sigma_diff = (
np.sqrt(spectra.fwhm**2 - fwhm_template**2) / 2.355 / wavelength_diff
)
convolved_fluxes = ndimage.gaussian_filter1d(fluxes, sigma_diff, axis=0)
else:
warnings.warn(
"""The templates' resolution is lower than the spectra's resolution,
threfore, the templates are not convolved further. It will be needed to
subtract the offset in quadrature at the end of the kinematic extraction!"""
)
# Perform log rebinning on the convolved fluxes
rebinned_fluxes, log_wavelengths, velocity_scale_template = ppxf_util.log_rebin(
wavelengths,
convolved_fluxes,
velscale=spectra.velocity_scale / velocity_scale_ratio,
)
# Normalize the rebinned fluxes
rebinned_fluxes /= np.nanmean(rebinned_fluxes, axis=0)
# Convert the log wavelengths back to linear scale
templates_wavelengths = np.exp(log_wavelengths)
# Create the template object
template = Template(
templates_wavelengths,
rebinned_fluxes,
wavelength_unit="AA",
fwhm=spectra.fwhm,
)
template.velocity_scale = velocity_scale_template
return template
[docs]
@staticmethod
def run_ppxf(
data,
template,
start,
background_template=None,
spectra_indices=None,
quiet=True,
plot=False,
**kwargs_ppxf,
):
"""Perform the kinematic analysis using pPXF.
This method runs the Penalized Pixel-Fitting (pPXF) method on the provided data using the given template.
It allows for the analysis of both single spectra and datacubes or binned spectra.
:param data: Data to analyze.
:type data: `Data` class
:param template: Template library to use for fitting.
:type template: `Template` class
:param start: Initial guess for the velocity and dispersion for each kinematic component.
:type start: list
:param background_template: Background spectra to fit, if any.
:type background_template: `Data` class, optional
:param spectra_indices: Indices of the spectra to fit, used for datacubes or binned spectra.
:type spectra_indices: list of int or int, optional
:param quiet: Suppress the output.
:type quiet: bool, optional
:param plot: Plot the fit results.
:type plot: bool, optional
:param kwargs_ppxf: Additional options for `ppxf`, check documentation of `ppxf.ppxf()`.
:type kwargs_ppxf: dict
:return: pPXF fit object.
:rtype: `ppxf` class
"""
noise = None
# Check if spectra indices are provided for datacubes or binned spectra
if data.flux.ndim > 1 and spectra_indices is None:
raise ValueError(
"Spectra indices must be provided for datacubes or binned spectra."
)
# Extract the flux and noise for the specified spectra indices
if spectra_indices is not None:
if isinstance(spectra_indices, int) and data.flux.ndim == 2:
flux = data.flux[:, spectra_indices]
if data.covariance is not None:
noise = data.covariance[:, :, spectra_indices]
elif data.noise is not None:
noise = data.noise[:, spectra_indices]
elif (
isinstance(spectra_indices, list)
and len(spectra_indices) == data.flux.ndim - 1
and data.flux.ndim == 3
):
flux = data.flux[:, spectra_indices[0], spectra_indices[1]]
if data.covariance is not None:
noise = data.covariance[
:, :, spectra_indices[0], spectra_indices[1]
]
elif data.noise is not None:
noise = data.noise[:, spectra_indices[0], spectra_indices[1]]
else:
if data.flux.ndim == 2:
raise ValueError(
f"Spectra indices must be an integer for spectra with {data.flux.ndim} dimensions."
)
else:
raise ValueError(
f"Spectra indices must be a list of {data.flux.ndim - 1} integers for spectra with {data.flux.ndim} dimensions."
)
else:
flux = data.flux
if data.covariance is not None:
noise = data.covariance
elif data.noise is not None:
noise = data.noise
# If noise is not provided, set a default noise level
if noise is None:
noise = 0.1 * np.ones_like(flux)
# Ensure the noise matrix is positive definite if it is 2D
if noise.ndim == 2 and not is_positive_definite(noise):
noise = get_nearest_positive_definite_matrix(noise)
# Deep copy the original noise for later use
original_noise = deepcopy(noise)
# Run the pPXF fitting
ppxf_fit = ppxf(
templates=template.flux,
galaxy=flux,
noise=deepcopy(
noise
), # sending deepcopy just in case, as pPXF may manipulate the noise array/matrix
velscale=data.velocity_scale,
start=start,
plot=plot,
lam=data.wavelengths,
lam_temp=template.wavelengths,
sky=background_template.flux if background_template else None,
quiet=quiet,
velscale_ratio=max(
int(round(data.velocity_scale / template.velocity_scale)), 1
),
**kwargs_ppxf,
)
# Store the original noise in the pPXF fit object
ppxf_fit.original_noise = original_noise
return ppxf_fit
[docs]
@classmethod
def run_ppxf_on_binned_spectra(
cls,
binned_spectra,
template,
start,
background_template=None,
**kwargs_ppxf,
):
"""Perform the kinematic analysis using pPXF on binned spectra.
This method runs the Penalized Pixel-Fitting (pPXF) method on the provided binned spectra using the given template.
It allows for the analysis of Voronoi binned spectra.
:param binned_spectra: Binned spectra to analyze.
:type binned_spectra: `VoronoiBinnedSpectra` class
:param template: Template library to use for fitting.
:type template: `Template` class
:param start: Initial guess for the velocity and dispersion for each kinematic component.
:type start: list
:param background_template: Background spectra to fit, if any.
:type background_template: `Template` class, optional
:param kwargs_ppxf: Additional options for `ppxf`, check documentation of `ppxf.ppxf()`.
:type kwargs_ppxf: dict
:return: Velocity dispersions, velocity dispersion uncertainties, mean velocities, mean velocity uncertainties.
:rtype: tuple of np.ndarray
"""
# Number of spectra in the binned spectra
num_spectra = binned_spectra.flux.shape[1]
# Initialize lists to store the results
velocity_dispersions = []
velocity_dispersion_uncertainties = []
mean_velocities = []
mean_velocity_uncertainties = []
# Loop through each binned spectrum and run pPXF
for i in range(num_spectra):
ppxf_fit = cls.run_ppxf(
binned_spectra,
template,
start=start,
background_template=background_template,
spectra_indices=i,
**kwargs_ppxf,
)
# Append the results to the lists
velocity_dispersions.append(ppxf_fit.sol[1])
velocity_dispersion_uncertainties.append(ppxf_fit.error[1])
mean_velocities.append(ppxf_fit.sol[0])
mean_velocity_uncertainties.append(ppxf_fit.error[0])
# Convert the lists to numpy arrays and return
return (
np.array(velocity_dispersions),
np.array(velocity_dispersion_uncertainties),
np.array(mean_velocities),
np.array(mean_velocity_uncertainties),
)
[docs]
@staticmethod
def get_terms_in_bic(ppxf_fit, num_fixed_parameters=0, weight_threshold=0.01):
"""Get the k, n, and log_L terms needed to compute the BIC.
This method extracts the number of parameters (k), the number of data points (n), and the log-likelihood (log_L)
from a pPXF fit object, which are required to compute the Bayesian Information Criterion (BIC).
:param ppxf_fit: ppxf fit object.
:type ppxf_fit: ppxf.ppxf
:param num_fixed_parameters: Number of fixed parameters in `fixed` given to ppxf.
:type num_fixed_parameters: int
:param weight_threshold: Threshold for the weights. Default is 1% (0.01).
:type weight_threshold: float
:return: Number of parameters (k), number of data points (n), and log-likelihood (log_L).
:rtype: tuple of int, int, float
"""
# Number of good pixels used in the fit
n = len(ppxf_fit.goodpixels)
# Determine the number of templates used based on the weight threshold
if weight_threshold is not None:
num_templates = np.sum(
ppxf_fit.weights > weight_threshold * ppxf_fit.weights.sum()
)
else:
num_templates = len(ppxf_fit.weights)
# Calculate the number of linear parameters
k_linear = num_templates + ppxf_fit.degree + 1
if ppxf_fit.sky is not None:
k_linear += ppxf_fit.sky.shape[1]
# Calculate the number of non-linear parameters
if isinstance(ppxf_fit.sol[0], float):
k_non_linear = len(ppxf_fit.sol) + ppxf_fit.mdegree
else:
k_non_linear = np.sum([len(a) for a in ppxf_fit.sol]) + ppxf_fit.mdegree
# Total number of parameters
k = k_linear + k_non_linear - num_fixed_parameters
# Compute the residuals between the observed and best-fit spectra
residuals = ppxf_fit.galaxy - ppxf_fit.bestfit
# Create a mask for the good pixels
mask = np.zeros_like(residuals)
mask[ppxf_fit.goodpixels] = 1
residuals = residuals[mask == 1]
# Extract the covariance matrix for the good pixels
covariance = np.copy(ppxf_fit.original_noise)
if covariance.ndim == 1:
covariance = np.diag(covariance**2)
covariance = covariance[mask == 1][:, mask == 1]
# Compute the chi-squared value
chi2 = np.dot(residuals, np.dot(np.linalg.inv(covariance), residuals))
# Compute the log-likelihood
log_likelihood = -0.5 * chi2
return k, n, log_likelihood
[docs]
@classmethod
def get_bic(cls, ppxf_fit, num_fixed_parameters=0, weight_threshold=0.01):
"""Compute the Bayesian Information Criterion (BIC) for a given pPXF fit.
This method calculates the BIC for a pPXF fit object using the number of
parameters (k), the number of data points (n),
and the log-likelihood (log_L) extracted from the fit.
:param ppxf_fit: ppxf fit object.
:type ppxf_fit: ppxf.ppxf
:param num_fixed_parameters: Number of fixed parameters in `fixed` given to ppxf.
:type num_fixed_parameters: int
:param weight_threshold: Threshold for the weights. Default is 1% (0.01).
:type weight_threshold: float
:return: BIC value.
:rtype: float
"""
# Get the terms needed to compute the BIC
k, n, log_likelihood = cls.get_terms_in_bic(
ppxf_fit,
num_fixed_parameters=num_fixed_parameters,
weight_threshold=weight_threshold,
)
# Compute the BIC
bic = k * np.log(n) - 2 * log_likelihood
return bic
[docs]
@classmethod
def get_bic_from_sample(
cls, ppxf_fits, num_fixed_parameters=0, weight_threshold=0.01
):
"""Calculate the Bayesian Information Criterion (BIC) for a sample of pPXF fits.
This function follows the methodology provided by Knabel & Mozumdar et al.
(2025), arxiv.org/abs/2502.16034. It computes the BIC for each
pPXF fit in the sample and returns the total BIC.
:param ppxf_fits: List of pPXF fit objects.
:type ppxf_fits: list of ppxf.ppxf
:param num_fixed_parameters: Number of fixed parameters in `fixed` given to ppxf.
:type num_fixed_parameters: int
:param weight_threshold: Threshold for the weights. Default is 1% (0.01).
:type weight_threshold: float
:return: Total BIC for the sample.
:rtype: float
"""
k_total = 0
n_total = 0
log_likelihood_total = 0
# Loop through each pPXF fit and accumulate the terms needed for BIC calculation
for ppxf_fit in ppxf_fits:
k, n, log_likelihood = cls.get_terms_in_bic(
ppxf_fit,
num_fixed_parameters=num_fixed_parameters,
weight_threshold=weight_threshold,
)
k_total += k
n_total += n
log_likelihood_total += log_likelihood
# Calculate the total BIC for the sample
bic = k_total * np.log(n_total) - 2 * log_likelihood_total
return bic
[docs]
@classmethod
def get_relative_bic_weights_for_sample(
cls,
ppxf_fits_list,
num_fixed_parameters=0,
num_bootstrap_samples=1000,
weight_threshold=0.01,
):
"""Calculate the relative BIC weights for a given sample of pPXF fits.
This function follows the methodology provided by Knabel & Mozumdar et al.
(2025), arxiv.org/abs/2502.16034. It computes the BIC for each pPXF fit in the
sample, performs bootstrap sampling to estimate uncertainties, and calculates
the relative BIC weights.
:param ppxf_fits_list: 2D array containing pPXF fits for the sample of galaxies
or set of Voronoi bins with the dimension [number of models (or templates),
number of systems (or bins) in sample].
:type ppxf_fits_list: np.ndarray
:param num_fixed_parameters: The number of fixed parameters in the model.
:type num_fixed_parameters: int
:param num_bootstrap_samples: The number of bootstrap samples to use.
:type num_bootstrap_samples: int
:param weight_threshold: The threshold for the relative BIC weights. Default is
1% (0.01).
:type weight_threshold: float
:return: Relative BIC weights for the sample.
:rtype: np.ndarray
"""
bics = np.zeros(len(ppxf_fits_list))
weights = np.zeros_like(bics)
# Calculate the sample-level BIC for each model.
for i, ppxf_fits in enumerate(ppxf_fits_list):
bics[i] = cls.get_bic_from_sample(
ppxf_fits,
num_fixed_parameters=num_fixed_parameters,
weight_threshold=weight_threshold,
)
# Calculate the difference in BIC values relative to the minimum BIC
delta_bics = bics - np.min(bics)
# Perform bootstrap sampling to estimate ΔBIC uncertainties
bic_samples = np.zeros((num_bootstrap_samples, len(ppxf_fits_list)))
for i in range(num_bootstrap_samples):
indices = np.random.randint(
0, len(ppxf_fits_list[0]), len(ppxf_fits_list[0])
)
ppxf_fits_list_bootstrapped = ppxf_fits_list[:, indices]
for j in range(len(ppxf_fits_list)):
bic_samples[i, j] = Pipeline.get_bic_from_sample(
ppxf_fits_list_bootstrapped[j],
num_fixed_parameters=num_fixed_parameters,
weight_threshold=weight_threshold,
)
delta_bic_samples = bic_samples - np.min(bic_samples, axis=1)[:, np.newaxis]
# Calculate the standard deviation of the BIC samples to estimate uncertainties
delta_bics_uncertainty = np.std(delta_bic_samples, axis=0)
# Replace zeros in delta_bics_uncertainty with a small constant (1e-10)
# to avoid division by zero errors in subsequent calculations.
delta_bics_uncertainty[delta_bics_uncertainty == 0] = 1e-10
# Calculate the relative BIC weights for each pPXF fit
for i in range(len(bics)):
weights[i] = cls.calculate_weights_from_bic(
delta_bics[i], delta_bics_uncertainty[i]
)
return weights
[docs]
@classmethod
def combine_measurements_from_templates(
cls,
values,
uncertainties,
ppxf_fits_list,
apply_bic_weighting=True,
num_fixed_parameters=0,
num_bootstrap_samples=1000,
weight_threshold=0.01,
do_bessel_correction=True,
verbose=False,
):
"""Combine measurements using the relative BIC weights.
This function follows the methodology provided by Knabel & Mozumdar et al.
(2025), arxiv.org/abs/2502.16034. It combines the values and uncertainties from
multiple templates using relative BIC weights.
:param values: The values to combine, with shape [number of templates, number of
bins or systems], or just [number of templates].
:type values: np.ndarray
:param uncertainties: The uncertainties in the values.
:type uncertainties: np.ndarray
:param ppxf_fits_list: The list of pPXF fits.
:type ppxf_fits_list: np.ndarray
:param apply_bic_weighting: Whether to apply BIC weighting.
:type apply_bic_weighting: bool
:param num_fixed_parameters: The number of fixed parameters in the model.
:type num_fixed_parameters: int
:param num_bootstrap_samples: The number of bootstrap samples to use.
:type num_bootstrap_samples: int
:param weight_threshold: The threshold for the relative BIC weights. Default is
1% (0.01).
:type weight_threshold: float
:param do_bessel_correction: Whether to apply Bessel correction.
:type do_bessel_correction: bool
:param verbose: Whether to print the results.
:type verbose: bool
:return: The combined values, combined systematic uncertainty, combined
statistical uncertainty, and covariance matrix.
:rtype: tuple of np.ndarray
"""
# Calculate the relative BIC weights if apply_bic_weighting is True
if apply_bic_weighting:
weights = cls.get_relative_bic_weights_for_sample(
ppxf_fits_list,
num_fixed_parameters=num_fixed_parameters,
num_bootstrap_samples=num_bootstrap_samples,
weight_threshold=weight_threshold,
)
else:
weights = np.ones(len(ppxf_fits_list))
weights /= np.sum(weights) * len(weights)
if verbose:
print(f"BIC weighting {'' if apply_bic_weighting else 'not'} applied")
print("Weights:", weights / np.sum(weights))
# Combine the values and uncertainties using the calculated weights
(
combined_values,
combined_systematic_uncertainty,
combined_statistical_uncertainty,
covariance,
) = cls.combine_weighted(
values, uncertainties, weights, do_bessel_correction=do_bessel_correction
)
return (
combined_values,
combined_systematic_uncertainty,
combined_statistical_uncertainty,
covariance,
)
[docs]
@classmethod
def combine_weighted(
cls, values, uncertainties, weights, do_bessel_correction=True
):
"""Combine the values using the weights.
The weighted combination with Bessel correction is described in Knabel &
Mozumdar et al. (2025), arxiv.org/abs/2502.16034.
:param values: The values to combine.
:type values: np.ndarray
:param uncertainties: The uncertainties in the values.
:type uncertainties: np.ndarray
:param weights: The weights to use for the combination.
:type weights: np.ndarray
:param do_bessel_correction: Whether to apply Bessel correction.
:type do_bessel_correction: bool
:return: The combined values, combined systematic uncertainty, combined
statistical uncertainty, and covariance matrix.
:rtype: tuple of np.ndarray
"""
sum_w2 = np.sum(weights**2)
sum_w = np.sum(weights)
w = weights[:, np.newaxis]
# Ensure values and uncertainties have the correct dimensions
if values.ndim == 1:
values = values[:, np.newaxis]
uncertainties = uncertainties[:, np.newaxis]
# Calculate the combined values using the weights
combined_values = np.sum(w * values, axis=0) / sum_w
# Calculate the denominator for the systematic uncertainty
if do_bessel_correction:
denominator = sum_w - sum_w2 / sum_w
else:
denominator = sum_w
# Calculate the combined systematic uncertainty
combined_systematic_uncertainty = np.sqrt(
np.sum(w * (values - combined_values[np.newaxis, :]) ** 2, axis=0)
/ denominator
)
# Calculate the combined statistical uncertainty
combined_statistical_uncertainty = np.sqrt(
np.sum(w * uncertainties**2, axis=0) / sum_w
)
# Calculate the covariance matrix if there are multiple values
if values.shape[1] > 1:
covariance = np.zeros((len(combined_values), len(combined_values)))
for i in range(covariance.shape[0]):
for j in range(covariance.shape[0]):
covariance[i, j] = (
np.sum(
weights
* (values[:, i] - combined_values[i])
* (values[:, j] - combined_values[j])
)
/ denominator
)
if i == j:
covariance[i, j] += combined_statistical_uncertainty[i] ** 2
else:
covariance = None
return (
combined_values,
combined_systematic_uncertainty,
combined_statistical_uncertainty,
covariance,
)
[docs]
@staticmethod
def calculate_weights_from_bic(delta_bic, sigma_delta_bic):
"""Calculate the relative BIC weight after accounting for the uncertainty.
This function follows the methodology provided by Knabel & Mozumdar et al.
(2025), arxiv.org/abs/2502.16034.
:param delta_bic: The difference in BIC values between the model and the best
model.
:type delta_bic: float
:param sigma_delta_bic: The uncertainty in the delta_BIC value.
:type sigma_delta_bic: float
:return: The relative weight of the model.
:rtype: float
"""
# Calculate the integrals and exponential factor for the weight calculation
integral_1 = ndtr(-delta_bic / sigma_delta_bic)
integral_2 = ndtr(delta_bic / sigma_delta_bic - sigma_delta_bic / 2)
exp_factor = (sigma_delta_bic**2 / 8) - (delta_bic / 2)
# Calculate the second integral multiplied by the exponential factor
integral2_multiplied = (
integral_2 # Default to zero if integral_2 is not positive
)
if integral_2 > 0:
integral2_multiplied = np.exp(exp_factor + np.log(integral_2))
# Calculate the relative weight of the model
weight = integral_1 + integral2_multiplied
return weight
[docs]
@staticmethod
def boost_noise(spectra, boost_factor, boosting_mask=None):
"""Boost the noise in the spectra.
This function increases the noise in the spectra by a specified boost factor. It can
optionally apply the boosting to a specific mask.
:param spectra: The spectra to boost the noise in.
:type spectra: `Spectra` or a child class
:param boost_factor: The factor to boost the noise by.
:type boost_factor: float
:param boosting_mask: The mask to apply the boosting to.
:type boosting_mask: np.ndarray
:return: The spectra with the boosted noise.
:rtype: `Spectra` or a child class
"""
# Create a deep copy of the spectra to avoid modifying the original object
noise_boosted_spectra = deepcopy(spectra)
if boosting_mask is None:
boosting_mask = np.ones_like(spectra.wavelengths, dtype=bool)
# Boost the noise in the spectra
if noise_boosted_spectra.noise is not None:
noise_boosted_spectra.noise[boosting_mask] *= boost_factor
# Boost the covariance in the spectra if it exists
if noise_boosted_spectra.covariance is not None:
noise_boosted_spectra.covariance[boosting_mask] *= boost_factor
noise_boosted_spectra.covariance[:, boosting_mask] *= boost_factor
return noise_boosted_spectra