Source code for squirrel.data

"""This module contains the class to store data products in 3D datacube or 2D detector
image."""

from copy import deepcopy
import numpy as np


[docs] class Spectra(object): """A class to store spectroscopic data.""" def __init__( self, wavelengths, flux, wavelength_unit, fwhm, z_lens=0.0, z_source=0.0, flux_unit="arbitrary", noise=None, covariance=None, ): """Initialize the Spectra object with the given parameters. :param wavelengths: wavelengths of the spectra in observer frame :type wavelengths: numpy.ndarray :param flux: flux of the data :type flux: numpy.ndarray :param wavelength_unit: unit of the wavelengths :type wavelength_unit: str :param fwhm: full width at half maximum of the data. Needs to be in the same unit as the wavelengths :type fwhm: float :param z_lens: lens redshift :type z_lens: float :param z_source: source redshift :type z_source: float :param flux_unit: unit of the flux :type flux_unit: str :param noise: noise of the data :type noise: numpy.ndarray :param covariance: covariance of the data :type covariance: numpy.ndarray """ self._wavelengths = deepcopy(wavelengths) self._original_wavelengths = wavelengths self._flux = deepcopy(flux) self._original_flux = flux self._flux_unit = flux_unit self._wavelength_unit = wavelength_unit self._spectra_modifications = [] self._wavelengths_frame = "observed" self._z_lens = z_lens self._z_source = z_source self._fwhm = deepcopy(fwhm) self._original_fwhm = fwhm self._noise = deepcopy(noise) self._original_noise = noise self._covariance = deepcopy(covariance) self._original_covariance = covariance self._velocity_scale = None @property def flux(self): """Return the flux of the data.""" if hasattr(self, "_flux"): return self._flux @flux.setter def flux(self, flux): """Set the flux of the data.""" self._flux = flux @property def wavelengths(self): """Return the wavelengths of the data.""" if hasattr(self, "_wavelengths"): return self._wavelengths @wavelengths.setter def wavelengths(self, wavelengths): """Set the wavelengths of the data.""" self._wavelengths = wavelengths @property def wavelengths_frame(self): """Return the frame of the wavelengths. Possible frames are 'observed', 'lens', 'source', or 'z={`redshift`}'. """ if hasattr(self, "_wavelengths_frame"): return self._wavelengths_frame @wavelengths_frame.setter def wavelengths_frame(self, frame): """Set the frame of the wavelengths.""" self._wavelengths_frame = frame @property def fwhm(self): """Return the full width at half maximum of the data.""" if hasattr(self, "_fwhm"): return self._fwhm @property def flux_unit(self): """Return the unit of the flux.""" if hasattr(self, "_flux_unit"): return self._flux_unit @property def spectra_modifications(self): """Return the state of the spectra. Possible states are 'original', 'rebinned', or 'log_rebinned'. """ if hasattr(self, "_spectra_modifications"): return self._spectra_modifications @spectra_modifications.setter def spectra_modifications(self, state): """Set the state of the spectra.""" self._spectra_modifications = state @property def wavelength_unit(self): """Return the unit of the wavelengths.""" if hasattr(self, "_wavelength_unit"): return self._wavelength_unit @property def noise(self): """Return the noise of the data.""" if hasattr(self, "_noise"): return self._noise @noise.setter def noise(self, noise): """Set the noise of the data.""" self._noise = noise @property def covariance(self): """Return the covariance of the data.""" if hasattr(self, "_covariance"): return self._covariance @covariance.setter def covariance(self, covariance): """Set the covariance of the data.""" self._covariance = covariance @property def z_lens(self): """Return the lens redshift.""" if hasattr(self, "_z_lens"): return self._z_lens @property def z_source(self): """Return the source redshift.""" if hasattr(self, "_z_source"): return self._z_source @property def velocity_scale(self): """Return the velocity scale of the data.""" if hasattr(self, "_velocity_scale"): return self._velocity_scale @velocity_scale.setter def velocity_scale(self, velocity_scale): """Set the velocity scale of the data.""" self._velocity_scale = velocity_scale
[docs] def deredshift(self, redshift=None, target_frame=None): """Deredshift the spectra. :param redshift: redshift to deredshift the data to :type redshift: float :param target_frame: frame to deredshift the data to, "lens" or "source" :type target_frame: str """ if redshift is None: if target_frame == "lens": redshift = self.z_lens self._wavelengths_frame = "lens frame" elif target_frame == "source": redshift = self.z_source self._wavelengths_frame = "source frame" else: raise ValueError( "If redshift is not provided, frame must be either 'lens' or 'source'" ) else: self._wavelengths_frame = f"z={redshift:.3f}" # Adjust wavelengths and FWHM by the redshift factor self._wavelengths = self._wavelengths / (1.0 + redshift) self._fwhm = self._fwhm / (1.0 + redshift)
[docs] def reset(self): """Reset the data to the original state.""" self._wavelengths = deepcopy(self._original_wavelengths) self._flux = deepcopy(self._original_flux) self._noise = deepcopy(self._original_noise) self._covariance = deepcopy(self._original_covariance) self._fwhm = deepcopy(self._original_fwhm) self._spectra_modifications = [] self._wavelengths_frame = "observed" self._velocity_scale = None
[docs] def clip(self, wavelength_min, wavelength_max): """Clip the data to the given wavelength range. :param wavelength_min: minimum wavelength to clip :type wavelength_min: float :param wavelength_max: maximum wavelength to clip :type wavelength_max: float """ mask = (self._wavelengths >= wavelength_min) & ( self._wavelengths <= wavelength_max ) # Apply mask to wavelengths, flux, noise, and covariance self._wavelengths = self._wavelengths[mask] self._flux = self._flux[mask, ...] if self._noise is not None: self._noise = self._noise[mask, ...] if self._covariance is not None: # The covariance slicing is split into two steps to ensure proper masking # of both the first and second dimensions independently. This approach # is necessary because the covariance matrix is 3D, and applying the mask # to both dimensions simultaneously is not directly supported. self._covariance = self._covariance[mask, :, ...] self._covariance = self._covariance[:, mask, ...] self._spectra_modifications += ["clipped"]
def _add(self, other, destination): """Add two spectra together. :param destination: destination spectra :type destination: Spectra :param other: spectra to add :type other: Spectra """ assert np.all(self.wavelengths == other.wavelengths), "Wavelengths must match." assert self.flux.shape == other.flux.shape, "Spectra must have the same shape." # Add flux and noise, and handle covariance if present destination.flux = self.flux + other.flux destination.noise = None destination.covariance = None if self.noise is not None and other.noise is not None: destination.noise = np.sqrt(self.noise**2 + other.noise**2) if self.covariance is not None and other.covariance is not None: destination.covariance = self.covariance + other.covariance def __add__(self, other): """Add two spectra together. :param other: spectra to add :type other: Spectra :return: sum of the two spectra :rtype: Spectra """ spectra = deepcopy(self) self._add(other, spectra) return spectra def __iadd__(self, other): """Add two spectra together in place. :param other: spectra to add :type other: Spectra :return: sum of the two spectra :rtype: Spectra """ self._add(other, self) return self def _concat(self, other, destination): """Concatenate two spectra together. :param other: spectra to concatenate :type other: Spectra :param destination: destination spectra :type destination: Spectra """ assert ( self.wavelength_unit == other.wavelength_unit ), "Wavelength units must match." assert self.fwhm == other.fwhm, "FWHM must match." assert ( self.wavelengths[-1] < other.wavelengths[0] ), "Wavelengths must be increasing without overlap." assert ( self.spectra_modifications == other.spectra_modifications ), "Spectra modifications must match." # Concatenate wavelengths, flux, noise, and covariance destination.wavelengths = np.concatenate((self.wavelengths, other.wavelengths)) destination.flux = np.concatenate((self.flux, other.flux), axis=0) destination.noise = None destination.covariance = None if self.noise is not None and other.noise is not None: destination.noise = np.concatenate((self.noise, other.noise), axis=0) if self.covariance is not None and other.covariance is not None: destination.covariance = np.zeros( ( self.covariance.shape[0] + other.covariance.shape[0], self.covariance.shape[1] + other.covariance.shape[1], ) ) destination.covariance[ : self.covariance.shape[0], : self.covariance.shape[1] ] = self.covariance destination.covariance[ self.covariance.shape[0] :, self.covariance.shape[1] : ] = other.covariance
[docs] def concat(self, other): """Concatenate two spectra together. :param other: spectra to concatenate :type other: Spectra :return: concatenated spectra :rtype: Spectra """ spectra = deepcopy(self) self._concat(other, spectra) return spectra
def __and__(self, other): """Concatenate two spectra together. :param other: spectra to concatenate :type other: Spectra :return: concatenated spectra :rtype: Spectra """ return self.concat(other) def __iand__(self, other): """Concatenate two spectra together in place. :param other: spectra to concatenate :type other: Spectra :return: concatenated spectra :rtype: Spectra """ self._concat(other, self) return self
[docs] def copy(self): """Return a deep copy of the spectra object.""" return deepcopy(self)
[docs] class Datacube(Spectra): """A class to store in 3D IFU datacubes.""" def __init__( self, wavelengths, flux, wavelength_unit, fwhm, z_lens, z_source, center_pixel_x, center_pixel_y, coordinate_transform_matrix, flux_unit="arbitrary", noise=None, covariance=None, ): """Initialize the Datacube object with the given parameters. :param wavelengths: wavelengths of the data, must match the wavelength dimension of the provided datacube flux :type wavelengths: numpy.ndarray :param flux: flux of the data :type flux: numpy.ndarray :param wavelength_unit: unit of the wavelengths :type wavelength_unit: str :param fwhm: full width at half maximum of the data. Needs to be in the same unit as the wavelengths. :type fwhm: float :param z_lens: lens redshift :type z_lens: float :param z_source: source redshift :type z_source: float :param center_pixel_x: x coordinate of the center pixel :type center_pixel_x: int :param center_pixel_y: y coordinate of the center pixel :type center_pixel_y: int :param coordinate_transform_matrix: matrix to transform coordinates :type coordinate_transform_matrix: numpy.ndarray :param flux_unit: unit of the flux :type flux_unit: str :param noise: noise of the data :type noise: numpy.ndarray :param covariance: covariance of the data :type covariance: numpy.ndarray """ super(Datacube, self).__init__( wavelengths=wavelengths, flux=flux, wavelength_unit=wavelength_unit, fwhm=fwhm, z_lens=z_lens, z_source=z_source, flux_unit=flux_unit, noise=noise, covariance=covariance, ) self._center_pixel_x = center_pixel_x self._center_pixel_y = center_pixel_y # Calculate pixel coordinates relative to the center pixel x_pixels = np.arange(self._flux.shape[2]) - self._center_pixel_x y_pixels = np.arange(self._flux.shape[1]) - self._center_pixel_y # Create a meshgrid of pixel coordinates xx_pixels, yy_pixels = np.meshgrid(x_pixels, y_pixels) # Transform coordinates using the provided matrix transformed_coordinates = np.dot( coordinate_transform_matrix, np.array([xx_pixels.flatten(), yy_pixels.flatten()]), ) self._x_coordinates = transformed_coordinates[0].reshape(self._flux.shape[1:]) self._y_coordinates = transformed_coordinates[1].reshape(self._flux.shape[1:]) @property def center_pixel_x(self): """Return the x coordinate of the center pixel.""" if hasattr(self, "_center_pixel_x"): return self._center_pixel_x @property def center_pixel_y(self): """Return the y coordinate of the center pixel.""" if hasattr(self, "_center_pixel_y"): return self._center_pixel_y @property def x_coordinates(self): """Return the x coordinates of the data.""" if hasattr(self, "_x_coordinates"): return self._x_coordinates @property def y_coordinates(self): """Return the y coordinates of the data.""" if hasattr(self, "_y_coordinates"): return self._y_coordinates
[docs] def get_1d_spectra(self, x=None, y=None, mask=None): """Return the spectra at a given pixel, or summed within a given spatial mask (over spaxels). If nothing is provided, all the spaxels in the datacube will be summed together. :param x: x coordinate of the pixel :type x: int :param y: y coordinate of the pixel :type y: int :param mask: mask to sum the spectra :type mask: numpy.ndarray :return: spectrum at the given pixel :rtype: Spectra """ noise = None covariance = None if mask is not None: # Sum flux and noise within the mask flux = np.nansum(self.flux * mask[None, :, :], axis=(1, 2)) if self.noise is not None: noise = np.sqrt( np.nansum(self.noise**2 * mask[None, :, :], axis=(1, 2)) ) if self.covariance is not None: covariance = np.nansum( self.covariance * mask[None, None, :, :], axis=(2, 3) ) elif x is not None and y is not None: # Get flux and noise at the specified pixel flux = self.flux[:, y, x] if self.noise is not None: noise = self.noise[:, y, x] if self.covariance is not None: covariance = self.covariance[:, :, y, x] elif (x is None and y is not None) or (x is not None and y is None): raise ValueError("Both x and y must be provided if one of them is.") else: # Sum flux and noise over the entire datacube flux = np.nansum(self.flux, axis=(1, 2)) if self.noise is not None: noise = np.sqrt(np.nansum(self.noise**2, axis=(1, 2))) if self.covariance is not None: covariance = np.nansum(self.covariance, axis=(2, 3)) # Create a new Spectra object with the summed or selected data spectra = Spectra( wavelengths=self.wavelengths, flux=flux, wavelength_unit=self.wavelength_unit, fwhm=self.fwhm, z_lens=self.z_lens, z_source=self.z_source, flux_unit=self.flux_unit, noise=noise, covariance=covariance, ) # Copy the state of the spectra spectra.wavelengths_frame = deepcopy(self.wavelengths_frame) spectra.spectra_modifications = deepcopy(self.spectra_modifications) spectra.velocity_scale = deepcopy(self.velocity_scale) return spectra
[docs] class VoronoiBinnedSpectra(Spectra): """A class to store binned spectra using Voronoi binning. This class extends the Spectra class to handle data that has been binned using Voronoi binning. It includes additional attributes to store the coordinates of the original datacube's spatial pixels, the bin numbers, and the coordinates of the bin centers, among other properties. """ def __init__( self, wavelengths, flux, wavelength_unit, fwhm, z_lens, z_source, x_coordinates, y_coordinates, num_bins, x_pixel_index_of_bins, y_pixel_index_of_bins, flux_unit="arbitrary", noise=None, covariance=None, bin_center_x=None, bin_center_y=None, area=None, snr=None, ): """Initialize the VoronoiBinnedSpectra object with the given parameters. :param wavelengths: wavelengths of the data :type wavelengths: numpy.ndarray :param flux: flux of the data :type flux: numpy.ndarray :param wavelength_unit: unit of the wavelengths :type wavelength_unit: str :param fwhm: full width at half maximum of the data. Needs to be in the same unit as the wavelengths. :type fwhm: float :param z_lens: lens redshift :type z_lens: float :param z_source: source redshift :type z_source: float :param x_coordinates: x coordinates of the original datacube's spatial pixels (2D) :type x_coordinates: numpy.ndarray :param y_coordinates: y coordinates of the original datacube's spatial pixels (2D) :type y_coordinates: numpy.ndarray :param num_bins: number of bins :type num_bins: int :param x_pixel_index_of_bins: pixel_x values corresponding to `bin_numbers` :type x_pixel_index_of_bins: numpy.ndarray :param y_pixel_index_of_bins: pixel_y values corresponding to `bin_numbers` :type y_pixel_index_of_bins: numpy.ndarray :param flux_unit: unit of the flux :type flux_unit: str :param noise: noise of the data :type noise: numpy.ndarray :param covariance: covariance of the data :type covariance: numpy.ndarray :param bin_center_x: x coordinates of the bin centers :type bin_center_x: numpy.ndarray :param bin_center_y: y coordinates of the bin centers :type bin_center_y: numpy.ndarray :param area: area of the bins :type area: numpy.ndarray :param snr: signal-to-noise ratio of the bins :type snr: numpy.ndarray """ # Initialize the parent Spectra class super(VoronoiBinnedSpectra, self).__init__( wavelengths=wavelengths, flux=flux, wavelength_unit=wavelength_unit, fwhm=fwhm, z_lens=z_lens, z_source=z_source, flux_unit=flux_unit, noise=noise, covariance=covariance, ) # Store additional attributes specific to Voronoi binned spectra self._x_coordinates = x_coordinates self._y_coordinates = y_coordinates self._bin_numbers = num_bins self._x_pixels_of_bins = x_pixel_index_of_bins self._y_pixels_of_bins = y_pixel_index_of_bins self._bin_center_x = bin_center_x self._bin_center_y = bin_center_y self._area = area self._snr = snr @property def x_coordinates(self): """Return the x coordinates of the data.""" if hasattr(self, "_x_coordinates"): return self._x_coordinates @property def y_coordinates(self): """Return the y coordinates of the data.""" if hasattr(self, "_y_coordinates"): return self._y_coordinates @property def bin_numbers(self): """Return the bin number of the data.""" if hasattr(self, "_bin_numbers"): return self._bin_numbers @property def x_pixels_of_bins(self): """Return the x pixel values corresponding to the bin numbers.""" if hasattr(self, "_x_pixels_of_bins"): return self._x_pixels_of_bins @property def y_pixels_of_bins(self): """Return the y pixel values corresponding to the bin numbers.""" if hasattr(self, "_y_pixels_of_bins"): return self._y_pixels_of_bins @property def bin_center_x(self): """Return the x coordinates of the bin centers.""" if hasattr(self, "_bin_center_x"): return self._bin_center_x @property def bin_center_y(self): """Return the y coordinates of the bin centers.""" if hasattr(self, "_bin_center_y"): return self._bin_center_y @property def area(self): """Return the area of the bins.""" if hasattr(self, "_area"): return self._area @property def snr(self): """Return the signal-to-noise ratio of the bins.""" if hasattr(self, "_snr"): return self._snr
[docs] def get_spaxel_map_with_bin_number(self): """Return Voronoi bin mapping. -1 is masked pixel. Unmasked pixel start counting from 0. :return: 2D array with bin mapping :rtype: numpy.ndarray """ bin_numbers = self.bin_numbers # Initialize the mapping array with -1 (masked pixels) mapping = np.zeros(self.x_coordinates.shape, dtype=int) - 1 x_pixels = self.x_pixels_of_bins y_pixels = self.y_pixels_of_bins # Assign bin numbers to the corresponding pixels for v, x, y in zip(bin_numbers, x_pixels, y_pixels): mapping[int(y)][int(x)] = v return mapping
[docs] def get_single_spectra(self, bin_index): """Return the spectra of a single bin. :param bin_index: bin number :type bin_index: int :return: spectra of the bin :rtype: numpy.ndarray """ # Retrieve noise and covariance for the specified bin, if available if self.noise is not None: noise = self.noise[:, bin_index] else: noise = None if self.covariance is not None: covariance = self.covariance[:, :, bin_index] else: covariance = None # Create a new Spectra object for the specified bin spectra = Spectra( wavelengths=self.wavelengths, flux=self.flux[:, bin_index], wavelength_unit=self.wavelength_unit, fwhm=self.fwhm, z_lens=self.z_lens, z_source=self.z_source, flux_unit=self.flux_unit, noise=noise, covariance=covariance, ) # Copy the state of the spectra spectra.spectra_modifications = deepcopy(self.spectra_modifications) spectra.velocity_scale = deepcopy(self.velocity_scale) return spectra
[docs] class PowerBinnedSpectra(VoronoiBinnedSpectra): """A class to store binned spectra using power binning. This class is functionally identical to `VoronoiBinnedSpectra`. This class extends the Spectra class to handle data that has been binned using power-law binning. It includes additional attributes to store the coordinates of the original datacube's spatial pixels, the bin numbers, and the coordinates of the bin centers, among other properties. """ pass
[docs] class RadiallyBinnedSpectra(Spectra): """A class to store radially binned spectra. This class extends the Spectra class to handle data that has been binned radially. It includes an additional attribute to store the radial edges of the bins. """ def __init__( self, wavelengths, flux, wavelength_unit, fwhm, z_lens, z_source, bin_radii, flux_unit="arbitrary", noise=None, covariance=None, ): """Initialize the RadiallyBinnedSpectra object with the given parameters. :param wavelengths: wavelengths of the data :type wavelengths: numpy.ndarray :param flux: flux of the data :type flux: numpy.ndarray :param wavelength_unit: unit of the wavelengths :type wavelength_unit: str :param fwhm: full width at half maximum of the data. Needs to be in the same unit as the wavelengths. :type fwhm: float :param z_lens: lens redshift :type z_lens: float :param z_source: source redshift :type z_source: float :param bin_radii: radial edges of the bins, starting with the inner edge of the first bin. The first value should be zero if the first bin is a circle. :type bin_radii: numpy.ndarray :param flux_unit: unit of the flux :type flux_unit: str :param noise: noise of the data :type noise: numpy.ndarray :param covariance: covariance of the data :type covariance: numpy.ndarray """ # Ensure the number of bins matches the number of spectra assert ( len(bin_radii) - 1 == flux.shape[1] ), "Number of bins must match the number of spectra." # Initialize the parent Spectra class super(RadiallyBinnedSpectra, self).__init__( wavelengths=wavelengths, flux=flux, wavelength_unit=wavelength_unit, fwhm=fwhm, z_lens=z_lens, z_source=z_source, flux_unit=flux_unit, noise=noise, covariance=covariance, ) # Store the radial edges of the bins self._bin_radii = bin_radii @property def bin_radii(self): """Return the radii of the bins.""" if hasattr(self, "_bin_radii"): return self._bin_radii