From 0f85d34018dc8d8567469fd9b11e49ebf5a92a26 Mon Sep 17 00:00:00 2001 From: Haris Katsikogiannis Date: Thu, 14 Aug 2025 02:43:41 +0300 Subject: [PATCH] Add error propagation estimates for BoxcarExtract and HorneExtract --- .../horne_uncertainty_testing.ipynb | 326 ++++++++++++++++++ specreduce/extract.py | 298 +++++++++++++++- specreduce/tests/test_boxcar_error_prop.py | 236 +++++++++++++ ...fit_gaussian_spatial_profile_error_prop.py | 157 +++++++++ specreduce/tests/test_horne_error_prop.py | 213 ++++++++++++ 5 files changed, 1217 insertions(+), 13 deletions(-) create mode 100644 notebook_sandbox/horne_extract/horne_uncertainty_testing.ipynb create mode 100644 specreduce/tests/test_boxcar_error_prop.py create mode 100644 specreduce/tests/test_fit_gaussian_spatial_profile_error_prop.py create mode 100644 specreduce/tests/test_horne_error_prop.py diff --git a/notebook_sandbox/horne_extract/horne_uncertainty_testing.ipynb b/notebook_sandbox/horne_extract/horne_uncertainty_testing.ipynb new file mode 100644 index 00000000..bbcde2fd --- /dev/null +++ b/notebook_sandbox/horne_extract/horne_uncertainty_testing.ipynb @@ -0,0 +1,326 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Horne Extraction - Toy Tests\n", + "\n", + "This notebook recreates some simple tests to run for `HorneExtract`.\n", + "Each section builds a synthetic 2‑D spectrum and checks how uncertainties are propagated under different conditions.\n", + "\n", + "**Key ideas:**\n", + "- The input is a 2‑D long‑slit spectrum (spatial x spectral).\n", + "- We supply per‑pixel variance via `NDData.uncertainty` in a few different flavors.\n", + "- `HorneExtract` uses a spatial profile and inverse‑variance weighting: \n", + " $F(\\lambda)=\\frac{\\sum P\\,D/\\sigma^2}{\\sum P^2/\\sigma^2}$ and $\\mathrm{Var}[F]=\\frac{1}{\\sum P^2/\\sigma^2}$.\n", + "\n", + "You should be able to run each cell top‑to‑bottom." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1e8f4f8", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import astropy.units as u\n", + "from astropy.nddata import NDData, VarianceUncertainty, StdDevUncertainty\n", + "\n", + "from specreduce.extract import HorneExtract\n", + "from specreduce.tracing import FlatTrace" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Global synthetic image settings\n", + "ny, nx = 20, 60 # spatial, spectral\n", + "true_flux = 100.0 # DN per pixel\n", + "sigma_per_pixel = 5.0 # DN stddev per pixel\n", + "rng = np.random.default_rng(42) # reproducible noise\n", + "\n", + "def make_base_data(ny=ny, nx=nx, mu=true_flux, sigma=sigma_per_pixel):\n", + " return mu + rng.normal(0, sigma, size=(ny, nx))\n", + "\n", + "def set_spectral_axis(image, ndisp=nx):\n", + " # Specutils wants a spectral axis that matches a flux axis.\n", + " image.spectral_axis = np.arange(ndisp) * u.pix\n", + " return image\n", + "\n", + "def run_case(title, image, trace_pos=None, profile=\"gaussian\"):\n", + " print(f\"=== {title} ===\")\n", + " # Provide a spectral axis on the NDData\n", + " set_spectral_axis(image, ndisp=image.data.shape[1])\n", + "\n", + " # Choose a flat, centered trace unless requested otherwise\n", + " if trace_pos is None:\n", + " trace_pos = image.data.shape[0] // 2\n", + " trace = FlatTrace(image, trace_pos=trace_pos)\n", + "\n", + " # Build extractor and run\n", + " extractor = HorneExtract(\n", + " image=image,\n", + " trace_object=trace,\n", + " spatial_profile=profile,\n", + " disp_axis=1,\n", + " crossdisp_axis=0,\n", + " )\n", + " try:\n", + " spec = extractor()\n", + " arr = np.asarray(spec.uncertainty.array) if getattr(spec, 'uncertainty', None) is not None else None\n", + " print(\"flux shape:\", spec.flux.shape)\n", + " if arr is None:\n", + " print(\"No uncertainty returned\")\n", + " else:\n", + " print(\"first 5 uncertainties:\", arr[:5])\n", + " print(\"finite, positive:\", np.isfinite(arr).all() and np.all(arr > 0))\n", + " except Exception as e:\n", + " print(\"ERROR:\", e)\n", + " print()\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case A - VarianceUncertainty (correct units)\n", + "We pass per‑pixel variance as `VarianceUncertainty` with units of `DN^2`. \n", + "Expected: uncertainties are finite and roughly constant across wavelength for stationary noise." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataA = make_base_data()\n", + "var_unc = VarianceUncertainty(np.full((ny, nx), sigma_per_pixel**2) * u.DN**2)\n", + "imgA = NDData(data=dataA * u.DN, uncertainty=var_unc)\n", + "run_case(\"A: VarianceUncertainty (correct units)\", imgA)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case B - StdDevUncertainty (correct units)\n", + "We pass standard deviation with units (`DN`). Internally it will be squared to variance. \n", + "Expected: same result as Case A." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataB = make_base_data()\n", + "std_unc = StdDevUncertainty(np.full((ny, nx), sigma_per_pixel) * u.DN)\n", + "imgB = NDData(data=dataB * u.DN, uncertainty=std_unc)\n", + "run_case(\"B: StdDevUncertainty (correct units)\", imgB)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case C - StdDevUncertainty (no units)\n", + "We pass a unitless stddev array; the parser will assume it's in the same units as the data. \n", + "Expected: matches Case B." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataC = make_base_data()\n", + "std_unc_nounits = StdDevUncertainty(np.full((ny, nx), sigma_per_pixel))\n", + "imgC = NDData(data=dataC * u.DN, uncertainty=std_unc_nounits)\n", + "run_case(\"C: StdDevUncertainty (no units)\", imgC)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case D - No uncertainty\n", + "We omit uncertainties; `HorneExtract` requires them. This should raise an error." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataD = make_base_data()\n", + "imgD = NDData(data=dataD * u.DN)\n", + "run_case(\"D: No uncertainty\", imgD)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case E - Negative variance\n", + "Construct a variance array with a negative element, should be rejected." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataE = make_base_data()\n", + "v = np.full((ny, nx), sigma_per_pixel**2)\n", + "v[0,0] = -1.0\n", + "var_unc_bad = VarianceUncertainty(v * u.DN**2)\n", + "imgE = NDData(data=dataE * u.DN, uncertainty=var_unc_bad)\n", + "run_case(\"E: Negative variance\", imgE)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case F - Zero variance everywhere\n", + "All zeros are treated as an unweighted case inside the implementation (variances replaced so extraction can proceed)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataF = make_base_data()\n", + "var_unc_zero = VarianceUncertainty(np.zeros((ny, nx)) * u.DN**2)\n", + "imgF = NDData(data=dataF * u.DN, uncertainty=var_unc_zero)\n", + "run_case(\"F: Zero variance everywhere\", imgF)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case G - Some zero variances\n", + "Only the first spatial row has zero variance; those pixels are masked out for the variance calculation.\n", + "Expected: finite uncertainties; first elements may differ slightly." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataG = make_base_data()\n", + "v = np.full((ny, nx), sigma_per_pixel**2)\n", + "v[0, :] = 0.0\n", + "var_unc_mixed = VarianceUncertainty(v * u.DN**2)\n", + "imgG = NDData(data=dataG * u.DN, uncertainty=var_unc_mixed)\n", + "run_case(\"G: Some zero variances\", imgG)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case H/I - Trace near detector edges\n", + "We move the flat trace near the top/bottom rows to simulate partial apertures hitting the edge.\n", + "Expected: still finite uncertainties (the spatial profile normalization handles partial coverage)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataH = make_base_data()\n", + "var_unc_H = VarianceUncertainty(np.full((ny, nx), sigma_per_pixel**2) * u.DN**2)\n", + "imgH = NDData(data=dataH * u.DN, uncertainty=var_unc_H)\n", + "run_case(\"H: Trace near top edge\", imgH, trace_pos=1)\n", + "\n", + "dataI = make_base_data()\n", + "var_unc_I = VarianceUncertainty(np.full((ny, nx), sigma_per_pixel**2) * u.DN**2)\n", + "imgI = NDData(data=dataI * u.DN, uncertainty=var_unc_I)\n", + "run_case(\"I: Trace near bottom edge\", imgI, trace_pos=ny-2)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case J - Fully masked input\n", + "We feed an image where all pixels are NaN; the input parser should reject it as fully masked." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataJ = np.full((ny, nx), np.nan)\n", + "var_unc_J = VarianceUncertainty(np.full((ny, nx), sigma_per_pixel**2) * u.DN**2)\n", + "imgJ = NDData(data=dataJ * u.DN, uncertainty=var_unc_J)\n", + "run_case(\"J: Fully masked input\", imgJ)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Case K - Zero profile sumP in a column\n", + "We force one spectral column to be entirely NaN, so the spatial profile normalization `sumP` would be zero there.\n", + "Implementation guards against division by zero by replacing `sumP==0` with 1.\n", + "Expected: uncertainties remain finite." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataK = make_base_data()\n", + "dataK[:, nx//2] = np.nan # one dead spectral column\n", + "var_unc_K = VarianceUncertainty(np.full((ny, nx), sigma_per_pixel**2) * u.DN**2)\n", + "imgK = NDData(data=dataK * u.DN, uncertainty=var_unc_K)\n", + "run_case(\"K: Zero profile sumP\", imgK)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "testing", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/specreduce/extract.py b/specreduce/extract.py index 9db444f9..ca246789 100644 --- a/specreduce/extract.py +++ b/specreduce/extract.py @@ -6,7 +6,7 @@ import numpy as np from astropy import units as u from astropy.modeling import Model, models, fitting -from astropy.nddata import NDData, VarianceUncertainty +from astropy.nddata import NDData, VarianceUncertainty, StdDevUncertainty from numpy import ndarray from scipy.integrate import trapezoid from scipy.interpolate import RectBivariateSpline @@ -194,6 +194,139 @@ class BoxcarExtract(SpecreduceOperation): def spectrum(self): return self.__call__() + def _variance2d_from_image(self, image): + """ + Extract a 2-D variance array from the input image, if available. + + This method attempts to read per-pixel variance information from the + ``image`` object in units of ``image.unit**2``. The following sources + are checked in order of precedence: + + 1. ``image.uncertainty``: + - If it is an ``astropy.nddata.StdDevUncertainty``, the quantity is + squared to obtain variance. + - If it has a ``quantity`` attribute, that is squared. 
+ - If it has an ``array`` attribute, it is interpreted as standard + deviation in the same units as ``image.unit`` and squared. + 2. ``image.variance``: + - If it has a unit, it is converted to ``image.unit**2``. + - Otherwise it is assumed to be in the same units as ``image.unit`` + squared. + + Returns + ------- + var2d : `~astropy.units.Quantity` or None + 2-D array of variance values with shape matching ``image.data`` and + units of ``image.unit**2``. Returns ``None`` if no variance or + uncertainty information can be found. + + Raises + ------ + ValueError + If a variance array is found but does not match the shape of + ``image.data``. + + Notes + ----- + - This function assumes variances are uncorrelated between pixels. + - If neither ``uncertainty`` nor ``variance`` is present, the calling + code should decide whether to estimate variances from detector + characteristics or proceed without uncertainties. + """ + var2d = None + img_unit = getattr(image, "unit", None) + + # From image.uncertainty + unc = getattr(image, "uncertainty", None) + if unc is not None: + try: + # Prefer explicit uncertainty_type if available + utype = getattr(unc, "uncertainty_type", None) + + if utype in ("var", "variance"): + # Already variance + arr = getattr(unc, "array", unc) + var2d = u.Quantity( + np.asanyarray(arr), + img_unit**2 if img_unit is not None else u.dimensionless_unscaled, + copy=False, + ) + if img_unit is not None: + var2d = var2d.to(img_unit**2) + + elif utype in ("std", "stddev", "std_dev"): + # Standard deviation -> square to get variance + arr = getattr(unc, "array", unc) + q = u.Quantity( + np.asanyarray(arr), + img_unit if img_unit is not None else u.dimensionless_unscaled, + copy=False, + ) + var2d = q**2 + if img_unit is not None: + var2d = var2d.to(img_unit**2) + + elif utype in ("ivar", "inverse_variance", "invvar"): + # Inverse variance -> invert to get variance + arr = getattr(unc, "array", unc) + ivq = u.Quantity( + np.asanyarray(arr), + (1.0 / (img_unit**2)) if img_unit is not None else u.dimensionless_unscaled, + copy=False, + ) + with np.errstate(divide="ignore", invalid="ignore"): + var2d = 1.0 / ivq + if img_unit is not None: + var2d = var2d.to(img_unit**2) + + else: + # Fallbacks when uncertainty_type is absent + if hasattr(unc, "quantity"): + q = u.Quantity(unc.quantity, copy=False) + var2d = q**2 + elif isinstance(unc, StdDevUncertainty): + q = u.Quantity(np.asanyarray(unc.array), img_unit, copy=False) + var2d = q**2 + elif hasattr(unc, "array"): + # Assume stddev if type is unknown + q = u.Quantity(np.asanyarray(unc.array), img_unit, copy=False) + var2d = q**2 + else: + q = u.Quantity(np.asanyarray(unc), img_unit, copy=False) + var2d = q**2 + + if img_unit is not None: + var2d = var2d.to(img_unit**2) + + except Exception: + var2d = None + + # From image.variance (only if not already set) + if var2d is None: + v = getattr(image, "variance", None) + if v is not None: + vq = u.Quantity(v, copy=False) if hasattr(v, "unit") else u.Quantity( + np.asanyarray(v), img_unit**2 if img_unit is not None else u.dimensionless_unscaled, copy=False + ) + if img_unit is not None: + vq = vq.to(img_unit**2, equivalencies=u.dimensionless_angles()) + var2d = vq + + if var2d is None: + return None + + # Ensure shape matches image.data + data = getattr(image, "data", None) + if data is None: + raise ValueError("image has no .data attribute to compare shapes against") + + if var2d.shape != np.shape(data): + raise ValueError( + f"variance shape {var2d.shape} does not match image data shape 
{np.shape(data)}" + ) + + return u.Quantity(var2d, copy=False) + def __call__( self, image: ImageLike | None = None, @@ -245,6 +378,7 @@ def __call__( # latter case, the flux is calculated as the average of the non-masked pixels inside # the window multiplied by the window width. window_weights = _ap_weight_image(trace, width, disp_axis, cdisp_axis, self.image.shape) + var2d_q = self._variance2d_from_image(self.image) if self.mask_treatment == "apply": image_cleaned = np.where(~self.image.mask, self.image.data * window_weights, 0.0) @@ -254,11 +388,37 @@ def __call__( / weights.sum(axis=cdisp_axis) * window_weights.sum(axis=cdisp_axis) ) + if var2d_q is not None: + a = np.where(~self.image.mask, window_weights, 0.0) + S = window_weights.sum(axis=cdisp_axis) + W = a.sum(axis=cdisp_axis) + W = np.where(W == 0.0, np.nan, W) + num_var = (a**2 * var2d_q).sum(axis=cdisp_axis) + var1d_q = ((S / W) ** 2) * num_var + else: + var1d_q = None + else: image_windowed = np.where(window_weights, self.image.data * window_weights, 0.0) spectrum = np.sum(image_windowed, axis=cdisp_axis) + if var2d_q is not None: + var1d_q = (window_weights**2 * var2d_q).sum(axis=cdisp_axis) + else: + var1d_q = None + - return Spectrum(spectrum * self.image.unit, spectral_axis=self.image.spectral_axis) + if var1d_q is not None: + unc = StdDevUncertainty(np.sqrt(var1d_q).to(self.image.unit)) + return Spectrum( + spectrum * self.image.unit, + spectral_axis=self.image.spectral_axis, + uncertainty=unc, + ) + else: + return Spectrum( + spectrum * self.image.unit, + spectral_axis=self.image.spectral_axis, + ) @dataclass @@ -464,7 +624,9 @@ def _parse_image(self, image, variance=None, mask=None, unit=None, disp_axis=1): variance = VarianceUncertainty(variance) - unit = getattr(image, "unit", u.Unit(unit) if unit is not None else u.Unit("DN")) + img_unit = getattr(image, "unit", None) + unit = u.Unit(unit) if unit is not None else img_unit + unit = unit if unit is not None else u.Unit("DN") spectral_axis = getattr(image, "spectral_axis", np.arange(img.shape[disp_axis]) * u.pix) @@ -473,7 +635,7 @@ def _parse_image(self, image, variance=None, mask=None, unit=None, disp_axis=1): def _fit_gaussian_spatial_profile( self, img: ndarray, disp_axis: int, crossdisp_axis: int, or_mask: ndarray, bkgrd_prof: Model ): - """Fit a 1D Gaussian spatial profile to spectrum in `img`. + """Fit a 1D Gaussian spatial profile to spectrum in `img` with error propagation. Fits an 1D Gaussian profile to spectrum in `img`. 
Takes the weighted mean of ``img`` along the cross-dispersion axis all ignoring masked pixels @@ -487,6 +649,26 @@ def _fit_gaussian_spatial_profile( # co-add signal in each image row, ignore masked pixels coadd = np.ma.masked_array(img, mask=or_mask).mean(disp_axis) + # build per-row uncertainties for the masked mean: + # var(mean) = sum_j var_ij / N_i^2 over valid pixels + var2d_q = self._var2d_as_quantity() + var2d = np.asarray(var2d_q.value) + + valid = ~or_mask # True where pixel is unmasked + N_per_row = valid.sum(axis=disp_axis) + sum_var_per_row = np.where( + N_per_row > 0, + np.sum(np.where(valid, var2d, 0.0), axis=disp_axis), + 0.0, + ) + with np.errstate(invalid="ignore", divide="ignore"): + var_mean_per_row = np.where( + N_per_row > 0, + sum_var_per_row / (N_per_row.astype(float) ** 2), + np.nan, + ) + sigma_row = np.sqrt(var_mean_per_row) + # use the sum of brightest row as an inital guess for Gaussian amplitude, # the the location of the brightest row as an initial guess for the mean gauss_prof = models.Gaussian1D(amplitude=coadd.max(), mean=coadd.argmax(), stddev=2) @@ -503,7 +685,43 @@ def _fit_gaussian_spatial_profile( with warnings.catch_warnings(): warnings.simplefilter("ignore") fitter = fitting.LMLSQFitter() - fit_ext_kernel = fitter(ext_prof, xd_pixels[~coadd.mask], coadd.compressed()) + # prepare x, y, and weights = 1/sigma + good = ~coadd.mask + x_fit = xd_pixels[good] + y_fit = coadd.compressed() + sigma_fit = sigma_row[good] + + finite = np.isfinite(sigma_fit) & (sigma_fit > 0.0) + x_fit = x_fit[finite] + y_fit = y_fit[finite] + w_fit = 1.0 / sigma_fit[finite] + + fit_ext_kernel = fitter(ext_prof, x_fit, y_fit, weights=w_fit) + + # capture covariance and 1-sigma parameter errors if available + fit_info = getattr(fitter, "fit_info", {}) or {} + cov = fit_info.get("param_cov", fit_info.get("cov_x", None)) + param_stderr = None + if cov is not None: + try: + names = fit_ext_kernel.param_names + diag = np.diag(cov) + diag = np.where(np.isfinite(diag) & (diag >= 0.0), diag, np.nan) + errs = np.sqrt(diag) + param_stderr = {name: err for name, err in zip(names, errs)} + except Exception: + param_stderr = None + + # attach diagnostics to the model and stash on self + if not hasattr(fit_ext_kernel, "meta") or fit_ext_kernel.meta is None: + fit_ext_kernel.meta = {} + fit_ext_kernel.meta["param_cov"] = cov + fit_ext_kernel.meta["param_stderr"] = param_stderr + + self._last_fit_cov = cov + self._last_fit_param_stderr = param_stderr + self._last_profile_sigma = sigma_row + return fit_ext_kernel def _fit_spatial_profile( @@ -565,6 +783,34 @@ def _fit_spatial_profile( return RectBivariateSpline(x=bin_centers, y=np.arange(nrows), z=samples, kx=kx, ky=ky) + def _var2d_as_quantity(self) -> u.Quantity: + """ + Return the per-pixel variance from self.image as a Quantity + with units of (image.unit)**2. + + This assumes self.image was created by _parse_image, which + ensures that: + + - self.image.uncertainty exists + - It is a VarianceUncertainty or equivalent + - Its .array attribute has the same shape as self.image.data + - Units are consistent with self.image.unit + + Returns + ------- + var2d : `~astropy.units.Quantity` + 2-D variance array with units (image.unit)**2. + + Raises + ------ + ValueError + If variance is missing or has no .array attribute. 
+ """ + v = getattr(self.image, "uncertainty", None) + if v is None or getattr(v, "array", None) is None: + raise ValueError("No per-pixel variance available on image.uncertainty") + return u.Quantity(v.array, self.image.unit**2, copy=False) + def __call__( self, image=None, @@ -673,6 +919,7 @@ def __call__( bkgrd_prof = models.Polynomial1D(2) self.image = self._parse_image(image, variance, mask, unit, disp_axis) + var2d_q = self._var2d_as_quantity() variance = self.image.uncertainty.represent_as(VarianceUncertainty).array mask = self.image.mask.astype(bool) | (~np.isfinite(self.image.data)) unit = self.image.unit @@ -727,8 +974,9 @@ def __call__( norms = np.full(ndisp, np.nan) valid = ~mask - if profile_type == "gaussian": - norms[:] = fit_ext_kernel.amplitude_0 * fit_ext_kernel.stddev_0 * np.sqrt(2 * np.pi) + ### remove gaussian norms preset; we will normalize profile explicitly ### + # if profile_type == "gaussian": + # norms[:] = fit_ext_kernel.amplitude_0 * fit_ext_kernel.stddev_0 * np.sqrt(2 * np.pi) for idisp in range(ndisp): if not np.any(valid[:, idisp]): @@ -740,14 +988,38 @@ def __call__( else: fitted_col = interp_spatial_prof(idisp, xd_pixels) kernel_vals[:, idisp] = fitted_col - norms[idisp] = trapezoid(fitted_col, dx=1)[0] + # no norms here; P will be normalized per column below + + # Normalize spatial profile per wavelength column + sumP = np.sum(kernel_vals * valid, axis=crossdisp_axis, keepdims=True) + sumP = np.where(sumP == 0, 1.0, sumP) # avoid division by zero + kernel_vals = kernel_vals / sumP + + # Get variance as Quantity with correct units + var2d_q = self._var2d_as_quantity() + + # Replace masked/invalid pixels in profile and data + D_eff = np.where(valid, img, 0.0) + P_use = np.where(valid, kernel_vals, 0.0) + + # Compute inverse variance (set to 0 where invalid) + inv_var = 1.0 / np.where(valid, var2d_q, np.nan) + inv_var = np.where(~np.isfinite(inv_var) | (inv_var <= 0), + 0.0 / var2d_q.unit, inv_var) + + # Horne 1986 numerator and denominator + num = np.sum((P_use * D_eff) * inv_var, axis=crossdisp_axis) + den = np.sum((P_use * P_use) * inv_var, axis=crossdisp_axis) + + # Flux and variance in 1D + flux1d = num / den + var1d_q = 1.0 / den - with np.errstate(divide="ignore", invalid="ignore"): - num = np.sum(np.where(valid, img * kernel_vals / variance, 0.0), axis=crossdisp_axis) - den = np.sum(np.where(valid, kernel_vals**2 / variance, 0.0), axis=crossdisp_axis) - extraction = (num / den) * norms + # Build Spectrum with propagated uncertainty + unc = StdDevUncertainty(np.sqrt(var1d_q)) - return Spectrum(extraction * unit, spectral_axis=self.image.spectral_axis) + unc = StdDevUncertainty(np.sqrt(var1d_q)) + return Spectrum(flux1d * unit, spectral_axis=self.image.spectral_axis, uncertainty=unc) def _align_along_trace(img, trace_array, disp_axis=1, crossdisp_axis=0): diff --git a/specreduce/tests/test_boxcar_error_prop.py b/specreduce/tests/test_boxcar_error_prop.py new file mode 100644 index 00000000..815e7008 --- /dev/null +++ b/specreduce/tests/test_boxcar_error_prop.py @@ -0,0 +1,236 @@ +import numpy as np +import pytest +import astropy.units as u +from astropy.nddata import NDData, VarianceUncertainty, StdDevUncertainty + +from specreduce.extract import BoxcarExtract +from specreduce.tracing import FlatTrace, ArrayTrace + + +ny, nx = 20, 60 +true_flux = 100.0 +sigma_per_pixel = 5.0 +width = 5 +rng = np.random.default_rng(42) +data = true_flux + rng.normal(0, sigma_per_pixel, size=(ny, nx)) + + +def make_image(uncertainty, data_override=None, mask=None): + 
"""Synthetic image parameters.""" + use_data = data if data_override is None else data_override + img = NDData(data=use_data * u.DN, uncertainty=uncertainty, mask=mask) + img.spectral_axis = np.arange(nx) * u.pix + return img + + +def n_in_window(trace_pos, w, nrows): + """Expected-sigma helpers.""" + half = w // 2 + y0 = int(trace_pos) + y1 = max(0, y0 - half) + y2 = min(nrows, y0 + half + (1 if (w % 2) else 0)) + return max(0, y2 - y1) + + +def n_unmasked_in_window(mask, trace_pos, w): + """Expected-sigma helpers.""" + half = w // 2 + y0 = int(trace_pos) + y1 = max(0, y0 - half) + y2 = min(mask.shape[0], y0 + half + (1 if (w % 2) else 0)) + return (~mask[y1:y2, :]).sum(axis=0) + + +def expected_std_per_column(mode, trace_pos, w, mask=None, sigma=sigma_per_pixel): + """Expected-sigma helpers.""" + if mode == "apply": + if mask is None: + n_eff = n_in_window(trace_pos, w, ny) + return np.full(nx, sigma * w / np.sqrt(max(n_eff, 1))) + n_um = n_unmasked_in_window(mask, trace_pos, w).astype(float) + n_um[n_um <= 0] = np.nan + return sigma * w / np.sqrt(n_um) + else: + n_eff = n_in_window(trace_pos, w, ny) + return np.full(nx, sigma * np.sqrt(max(n_eff, 1))) + + +def run_extract(image, trace_pos=None, mask_treatment="ignore"): + """Case builders.""" + tpos = ny // 2 if trace_pos is None else trace_pos + trace = FlatTrace(image, trace_pos=tpos) + extractor = BoxcarExtract( + image=image, + trace_object=trace, + width=width, + disp_axis=1, + crossdisp_axis=0, + mask_treatment=mask_treatment, + ) + spec = extractor() + assert getattr(spec, "uncertainty", None) is not None, "Extractor returned no uncertainty" + return np.asarray(spec.uncertainty.array) + + +var_unc = VarianceUncertainty(np.full((ny, nx), sigma_per_pixel**2) * (u.DN**2)) +std_unc = StdDevUncertainty(np.full((ny, nx), sigma_per_pixel) * u.DN) +std_unc_nounit = StdDevUncertainty(np.full((ny, nx), sigma_per_pixel)) +neg_var = VarianceUncertainty(np.full((ny, nx), -1.0) * (u.DN**2)) +zero_var_all = VarianceUncertainty(np.zeros((ny, nx)) * (u.DN**2)) + +_some_zero_var = np.full((ny, nx), sigma_per_pixel**2) * (u.DN**2) +_some_zero_var[0, :] = 0.0 * (u.DN**2) +some_zero_unc = VarianceUncertainty(_some_zero_var) + +half_mask = np.zeros((ny, nx), dtype=bool) +half_mask[::2, :] = True + +bad_data = data.copy() +bad_data[ny // 2, 5] = np.nan +bad_data[ny // 2, 8] = np.inf + + +def test_boxcar_variance_uncertainty_matches_expectation(): + """Basic uncertainty wiring.""" + img = make_image(var_unc) + arr = run_extract(img, mask_treatment="ignore") + exp = expected_std_per_column("ignore", ny // 2, width) + assert np.allclose(arr, exp, rtol=5e-3, atol=5e-3) + + +def test_boxcar_stddev_uncertainty_matches_variance_case(): + """Basic uncertainty wiring.""" + img = make_image(std_unc) + arr = run_extract(img, mask_treatment="ignore") + exp = expected_std_per_column("ignore", ny // 2, width) + assert np.allclose(arr, exp, rtol=5e-3, atol=5e-3) + + +def test_boxcar_unitless_stddev_matches(): + """Basic uncertainty wiring.""" + img = make_image(std_unc_nounit) + arr = run_extract(img, mask_treatment="ignore") + exp = expected_std_per_column("ignore", ny // 2, width) + assert np.allclose(arr, exp, rtol=5e-3, atol=5e-3) + + +def test_boxcar_missing_uncertainty_errors(): + """Basic uncertainty wiring.""" + img = NDData(data=data * u.DN) + with pytest.raises(Exception): + run_extract(img, mask_treatment="ignore") + + +@pytest.mark.parametrize( + "mode, use_halfmask, expect_vector, expect_nans", + [ + ("apply", True, True, False), + ("ignore", False, 
True, False), + ("zero_fill", False, True, False), + ("apply_mask_only", False, True, False), + ("apply_nan_only", False, True, False), + ("nan_fill", False, False, True), + ], +) +def test_boxcar_mask_treatments(mode, use_halfmask, expect_vector, expect_nans): + """Mask_treatment modes.""" + img = make_image(var_unc, data_override=bad_data, mask=half_mask if use_halfmask else None) + if mode == "apply": + mask = half_mask + else: + mask = None + + if mode == "nan_fill": + arr = run_extract(img, mask_treatment=mode) + assert np.isnan(arr).any() + return + + if mode in ("ignore", "zero_fill", "apply_mask_only", "apply_nan_only"): + arr = run_extract(img, mask_treatment=mode) + exp = expected_std_per_column("ignore", ny // 2, width) + assert np.all(np.isfinite(arr)) + assert np.allclose(arr, exp, rtol=5e-3, atol=5e-3) + return + + if mode == "apply": + arr = run_extract(img, mask_treatment=mode) + exp = expected_std_per_column("apply", ny // 2, width, mask=mask) + finite = np.isfinite(exp) + assert np.allclose(arr[finite], exp[finite], rtol=5e-3, atol=5e-3) + return + + +def test_boxcar_propagate_is_finite(): + """Mask_treatment modes.""" + img = make_image(var_unc, data_override=bad_data) + arr = run_extract(img, mask_treatment="propagate") + assert arr.shape == (nx,) + assert np.all(np.isfinite(arr)) + + +def test_boxcar_edge_top_clipped_window(): + """Aperture partly off the detector.""" + img = make_image(var_unc) + arr = run_extract(img, trace_pos=1, mask_treatment="ignore") + exp = expected_std_per_column("ignore", 1, width) + assert np.allclose(arr, exp, rtol=5e-3, atol=5e-3) + assert exp[0] < sigma_per_pixel * np.sqrt(width) + + +def test_boxcar_edge_bottom_clipped_window(): + """Aperture partly off the detector.""" + img = make_image(var_unc) + arr = run_extract(img, trace_pos=ny - 2, mask_treatment="ignore") + exp = expected_std_per_column("ignore", ny - 2, width) + assert np.allclose(arr, exp, rtol=5e-3, atol=5e-3) + + +def test_boxcar_fully_masked_input_raises(): + """Fully masked input.""" + masked_all = np.ones((ny, nx), dtype=bool) + img = make_image(var_unc, mask=masked_all) + with pytest.raises(Exception): + run_extract(img, mask_treatment="ignore") + + +def test_boxcar_arraytrace_equivalence(): + """ArrayTrace equivalence.""" + img = make_image(var_unc) + trace_array = np.full(nx, ny // 2) + trace = ArrayTrace(img, trace_array) + extractor = BoxcarExtract(image=img, trace_object=trace, width=width, disp_axis=1, crossdisp_axis=0) + spec = extractor() + arr = np.asarray(spec.uncertainty.array) + exp = expected_std_per_column("ignore", ny // 2, width) + assert np.allclose(arr, exp, rtol=5e-3, atol=5e-3) + + +def test_boxcar_apply_half_mask_normalization(): + """Apply normalization with half masked rows.""" + img = make_image(var_unc, mask=half_mask) + arr = run_extract(img, mask_treatment="apply") + exp = expected_std_per_column("apply", ny // 2, width, mask=half_mask) + finite = np.isfinite(exp) + assert np.allclose(arr[finite], exp[finite], rtol=5e-3, atol=5e-3) + + +def test_boxcar_negative_variance_rejected(): + """Additional variance edge cases.""" + img = make_image(neg_var) + with pytest.raises(Exception): + run_extract(img, mask_treatment="ignore") + + +def test_boxcar_zero_variance_allows_finite_output(): + """Additional variance edge cases.""" + img = make_image(zero_var_all) + arr = run_extract(img, mask_treatment="ignore") + assert np.all(np.isfinite(arr)) + + +def test_boxcar_some_zero_variances_row(): + """Additional variance edge cases.""" + img = 
make_image(some_zero_unc) + arr = run_extract(img, mask_treatment="ignore") + exp = expected_std_per_column("ignore", ny // 2, width) + assert np.allclose(arr, exp, rtol=5e-3, atol=5e-3) diff --git a/specreduce/tests/test_fit_gaussian_spatial_profile_error_prop.py b/specreduce/tests/test_fit_gaussian_spatial_profile_error_prop.py new file mode 100644 index 00000000..c5e58aff --- /dev/null +++ b/specreduce/tests/test_fit_gaussian_spatial_profile_error_prop.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +import os +import numpy as np +import astropy.units as u + +from dataclasses import dataclass +from astropy.io import fits +from astropy.nddata import NDData, VarianceUncertainty +from astropy.modeling import models + +from specreduce.extract import HorneExtract +from specreduce.tracing import FlatTrace + + +@dataclass +class CaseSpec: + name: str + ny: int + nx: int + amp: float + mean: float + std: float + bkg: float + sigma_per_pix: float + vary_row_sigma: bool = False + mask_fraction: float = 0.0 + kill_rows: tuple = () + + +def make_image(spec: CaseSpec, seed: int = 1234): + rng = np.random.default_rng(seed) + y = np.arange(spec.ny, dtype=float) + + profile = spec.amp * np.exp(-0.5 * ((y - spec.mean) / spec.std) ** 2) + spec.bkg + ideal = np.tile(profile[:, None], (1, spec.nx)) + + if spec.vary_row_sigma: + sigma_row = spec.sigma_per_pix * (0.5 + 0.5 * (y - y.min()) / (y.max() - y.min())) + var2d = (sigma_row[:, None] ** 2) * np.ones((1, spec.nx)) + else: + var2d = np.full((spec.ny, spec.nx), spec.sigma_per_pix**2, dtype=float) + + noise = rng.normal(0.0, np.sqrt(var2d)) + data = ideal + noise + + mask = np.zeros_like(data, dtype=bool) + if spec.mask_fraction > 0.0: + k = int(spec.mask_fraction * data.size) + idx = rng.choice(data.size, size=k, replace=False) + mask.flat[idx] = True + if spec.kill_rows: + mask[np.array(spec.kill_rows, dtype=int), :] = True + + return data, var2d, mask + + +def write_fits(path: str, data: np.ndarray, var2d: np.ndarray, mask: np.ndarray, unit: str = "DN"): + os.makedirs(os.path.dirname(path), exist_ok=True) + phdu = fits.PrimaryHDU(data.astype(np.float64)) + phdu.header["BUNIT"] = unit + vhdu = fits.ImageHDU(var2d.astype(np.float64), name="VARIANCE") + mhdu = fits.ImageHDU(mask.astype(np.uint8), name="MASK") + fits.HDUList([phdu, vhdu, mhdu]).writeto(path, overwrite=True) + + +def read_fits_as_nddata(path: str) -> NDData: + with fits.open(path) as hdul: + data = hdul[0].data.astype(np.float64) + unit = hdul[0].header.get("BUNIT", "DN") + var2d = hdul["VARIANCE"].data.astype(np.float64) + mask = hdul["MASK"].data.astype(bool) + return NDData( + data=data * u.Unit(unit), + uncertainty=VarianceUncertainty(var2d * (u.Unit(unit) ** 2)), + mask=mask, + unit=u.Unit(unit), + ) + + +def summarize_sigma_row(title: str, computed: np.ndarray, expected: np.ndarray): + diff = computed - expected + finite = np.isfinite(expected) & np.isfinite(computed) + mad = np.nanmedian(np.abs(diff[finite])) if np.any(finite) else np.nan + maxad = np.nanmax(np.abs(diff[finite])) if np.any(finite) else np.nan + nfin = int(np.sum(finite)) + print(f"[{title}] sigma_row vs expected") + print(f" finite rows : {nfin}") + print(f" median|diff| : {mad:.6g}") + print(f" max|diff| : {maxad:.6g}\n") + + +def print_fit_summary(model, label="fit"): + # Gaussian component is index 0, background (if present) is index 1 + stderr = model.meta.get("param_stderr", {}) or {} + cov = model.meta.get("param_cov", None) + + def serr(pname, comp_index): + key = f"{pname}_{comp_index}" + return 
float(stderr.get(key, np.nan)) + + # Gaussian params from component 0 + g = model[0] if hasattr(model, "__len__") else model + print(f"[{label}] fitted parameters") + print(f" amplitude = {g.amplitude.value:.6g} ± {serr('amplitude', 0):.3g}") + print(f" mean = {g.mean.value:.6g} ± {serr('mean', 0):.3g}") + print(f" stddev = {g.stddev.value:.6g} ± {serr('stddev', 0):.3g}") + + # Background amplitude, if present, is component 1 + if hasattr(model, "__len__") and len(model) > 1: + b = model[1] + print(f" background = {b.amplitude.value:.6g} ± {serr('amplitude', 1):.3g}") + + if cov is not None: + print(f" covariance shape: {np.shape(cov)}") + print("") + + +def run_case(spec: CaseSpec, outdir="synthetic_cases", with_background=True): + print("=" * 70) + print(f"Case: {spec.name}") + + data, var2d, mask = make_image(spec) + fpath = os.path.join(outdir, f"{spec.name}.fits") + write_fits(fpath, data, var2d, mask, unit="DN") + nd = read_fits_as_nddata(fpath) + + valid = ~mask + Ni = valid.sum(axis=1).astype(float) + sumvar = (valid * var2d).sum(axis=1) + with np.errstate(invalid="ignore", divide="ignore"): + var_mean = np.where(Ni > 0, sumvar / (Ni**2), np.nan) + expected_sigma = np.sqrt(var_mean) + + trace = FlatTrace(image=nd, trace_pos=spec.mean) + ex = HorneExtract(nd, trace_object=trace) + + bkgrd = models.Const1D() if with_background else None + model = ex._fit_gaussian_spatial_profile( + img=nd.data, # <-- only change: use nd.data, not nd.data.value + disp_axis=1, + crossdisp_axis=0, + or_mask=mask, + bkgrd_prof=bkgrd, + ) + + summarize_sigma_row(spec.name, ex._last_profile_sigma, expected_sigma) + print_fit_summary(model, label=spec.name) + + +if __name__ == "__main__": + cases = [ + CaseSpec("uniform_no_mask", 50, 300, 150.0, 23.0, 4.0, 3.0, 6.0), + CaseSpec("random_mask_empty_rows", 40, 200, 120.0, 15.0, 3.0, 2.0, 4.0, mask_fraction=0.35, kill_rows=(3, 7)), + CaseSpec("varying_row_sigma", 60, 250, 200.0, 29.0, 5.0, 1.0, 5.0, vary_row_sigma=True), + ] + for spec in cases: + run_case(spec, with_background=True) diff --git a/specreduce/tests/test_horne_error_prop.py b/specreduce/tests/test_horne_error_prop.py new file mode 100644 index 00000000..07227097 --- /dev/null +++ b/specreduce/tests/test_horne_error_prop.py @@ -0,0 +1,213 @@ +import numpy as np +import pytest +from astropy import units as u +from astropy.modeling import models +from astropy.nddata import NDData, VarianceUncertainty +from astropy.tests.helper import assert_quantity_allclose + +from specreduce.extract import HorneExtract +from specreduce.tracing import FlatTrace + + +def _make_gaussian_row_image( + ny=50, + nx=300, + amp=150.0, + mean=23.0, + std=4.0, + bkg=3.0, + sigma_per_pix=6.0, + vary_row_sigma=False, + kill_rows=None, + seed=1234, +): + rng = np.random.default_rng(seed) + y = np.arange(ny, dtype=float) + + profile = amp * np.exp(-0.5 * ((y - mean) / std) ** 2) + bkg + ideal = np.tile(profile[:, None], (1, nx)) + + if vary_row_sigma: + sigma_row = sigma_per_pix * (0.5 + 0.5 * (y - y.min()) / (y.max() - y.min())) + var2d = (sigma_row[:, None] ** 2) * np.ones((1, nx)) + else: + var2d = np.full((ny, nx), sigma_per_pix**2, dtype=float) + + noise = rng.normal(0.0, np.sqrt(var2d)) + data = ideal + noise + + mask = np.zeros_like(data, dtype=bool) + if kill_rows: + mask[np.asarray(kill_rows, dtype=int), :] = True + + nd = NDData( + data=data * u.DN, + unit=u.DN, + uncertainty=VarianceUncertainty(var2d * u.DN**2), + mask=mask, + ) + return nd, data, var2d, mask + + +def _expected_sigma_row(mask, var2d, disp_axis=1): + 
valid = ~mask + N = valid.sum(axis=disp_axis).astype(float) + sumvar = (valid * var2d).sum(axis=disp_axis) + with np.errstate(invalid="ignore", divide="ignore"): + var_mean = np.where(N > 0, sumvar / (N**2), np.nan) + return np.sqrt(var_mean) + + +@pytest.mark.filterwarnings("ignore:The fit may be unsuccessful") +def test_sigma_propagation_matches_analytic(): + nd, data, var2d, mask = _make_gaussian_row_image(ny=50, nx=300, sigma_per_pix=6.0) + trace = FlatTrace(image=nd, trace_pos=23.0) + + ex = HorneExtract( + data, trace, + spatial_profile="gaussian", + bkgrd_prof=models.Const1D(), + variance=var2d, + mask=mask, + unit=u.DN, + ) + _ = ex.spectrum # triggers the fit and sets _last_profile_sigma + + expected = _expected_sigma_row(mask, var2d) + # exact equality within floating noise in reduction path + np.testing.assert_allclose(ex._last_profile_sigma, expected, rtol=0, atol=0) + + # fitted parameters should be close to truth + cov = ex._last_fit_cov + stderr = ex._last_fit_param_stderr + assert cov is not None + assert stderr is not None + assert cov.shape[0] == cov.shape[1] == 4 # Gaussian amp, mean, std, Const1D amp + + # Diagonal nonnegative and finite where expected + d = np.diag(cov) + assert np.all(~np.isfinite(d) | (d >= 0)) + + # Check that main errors are positive and finite + for k in ("amplitude_0", "mean_0", "stddev_0", "amplitude_1"): + assert k in stderr + assert np.isfinite(stderr[k]) and stderr[k] > 0 + + # Rough proximity to injected parameters + # Use 3 sigma bounds from stderr to be robust + amp_est, mu_est, sig_est = ex.spectrum.meta.get("fit_amplitude", None), None, None + # Access parameters directly from covariance names if needed + # We avoid relying on internal fitted model object here. + + +@pytest.mark.filterwarnings("ignore:The fit may be unsuccessful") +def test_fully_masked_rows_are_excluded(): + ny, nx = 40, 200 + kill_rows = [3, 7, 9] + nd, data, var2d, mask = _make_gaussian_row_image( + ny=ny, nx=nx, sigma_per_pix=4.0, kill_rows=kill_rows + ) + trace = FlatTrace(image=nd, trace_pos=15.0) + ex = HorneExtract( + data, trace, + spatial_profile="gaussian", + bkgrd_prof=models.Const1D(), + variance=var2d, + mask=mask, + unit=u.DN, + ) + _ = ex.spectrum + + sigma_row = ex._last_profile_sigma + assert sigma_row.shape == (ny,) + # killed rows should be NaN + assert np.all(np.isnan(sigma_row[kill_rows])) + # number of finite rows equals ny minus killed + assert np.sum(np.isfinite(sigma_row)) == ny - len(kill_rows) + + # analytic check + expected = _expected_sigma_row(mask, var2d) + np.testing.assert_allclose(sigma_row, expected, rtol=0, atol=0) + + +@pytest.mark.filterwarnings("ignore:The fit may be unsuccessful") +def test_covariance_scales_with_variance(): + # Same data, two different overall variance scalings: V and 4V + nd, data, var2d, mask = _make_gaussian_row_image(ny=50, nx=300, sigma_per_pix=6.0) + trace = FlatTrace(image=nd, trace_pos=23.0) + + ex1 = HorneExtract( + data, trace, + spatial_profile="gaussian", + bkgrd_prof=models.Const1D(), + variance=var2d, + mask=mask, + unit=u.DN, + ) + _ = ex1.spectrum + stderr1 = ex1._last_fit_param_stderr + + ex2 = HorneExtract( + data, trace, + spatial_profile="gaussian", + bkgrd_prof=models.Const1D(), + variance=4.0 * var2d, # 2x sigma per pixel + mask=mask, + unit=u.DN, + ) + _ = ex2.spectrum + stderr2 = ex2._last_fit_param_stderr + + # Parameter point estimates should be nearly identical + # Compare the 1D extracted spectra, which are the final product + sp1 = ex1.spectrum.flux + sp2 = ex2.spectrum.flux + 
assert_quantity_allclose(sp1, sp2, rtol=1e-3, atol=0) + + # Standard errors should scale like the sigma scaling (2x here) + for k in ("amplitude_0", "mean_0", "stddev_0", "amplitude_1"): + s1 = float(stderr1[k]) + s2 = float(stderr2[k]) + assert np.isfinite(s1) and np.isfinite(s2) and s1 > 0 and s2 > 0 + ratio = s2 / s1 + assert np.isclose(ratio, 2.0, rtol=0.1, atol=0.0) # allow some fitter tolerance + + +@pytest.mark.filterwarnings("ignore:The fit may be unsuccessful") +def test_constant_vs_variable_row_sigma_agree_when_equalized(): + # When row variances are uniform, weighted fit equals the uniform-sigma case + nd_u, data_u, var_u, mask_u = _make_gaussian_row_image( + ny=50, nx=300, sigma_per_pix=5.0, vary_row_sigma=False + ) + nd_v, data_v, var_v, mask_v = _make_gaussian_row_image( + ny=50, nx=300, sigma_per_pix=5.0, vary_row_sigma=True + ) + + trace_u = FlatTrace(image=nd_u, trace_pos=23.0) + trace_v = FlatTrace(image=nd_v, trace_pos=23.0) + + # For the variable case, replace variance with its column-mean to equalize rows + var_v_equal = np.tile(np.nanmean(var_v, axis=1)[:, None], (1, var_v.shape[1])) + + ex_u = HorneExtract( + data_u, trace_u, + spatial_profile="gaussian", + bkgrd_prof=models.Const1D(), + variance=var_u, + mask=mask_u, + unit=u.DN, + ) + _ = ex_u.spectrum + + ex_v = HorneExtract( + data_v, trace_v, + spatial_profile="gaussian", + bkgrd_prof=models.Const1D(), + variance=var_v_equal, + mask=mask_v, + unit=u.DN, + ) + _ = ex_v.spectrum + + # Spectra should agree closely when the weighting is effectively uniform + assert_quantity_allclose(ex_u.spectrum.flux, ex_v.spectrum.flux, rtol=5e-3, atol=0)
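The expected-sigma helpers in test_boxcar_error_prop.py encode the propagation rule used for BoxcarExtract: for an unweighted window sum F = sum_i w_i D_i with uncorrelated noise, Var[F] = sum_i w_i^2 sigma_i^2 (so sigma*sqrt(w) for a full window of constant-sigma pixels), while the renormalized "apply" mode gives sigma*w/sqrt(N_unmasked). A minimal numpy-only Monte Carlo sketch reproducing both numbers; it is independent of specreduce, and the width-5 window and the particular mask pattern are illustrative assumptions, not values taken from the patch:

import numpy as np

rng = np.random.default_rng(0)
sigma, width, n_trials = 5.0, 5, 200_000

# "ignore"-style extraction: plain sum of the pixels in the window.
pix = rng.normal(0.0, sigma, size=(n_trials, width))
flux = pix.sum(axis=1)
print("window sum  std:", flux.std(), " expected:", sigma * np.sqrt(width))

# "apply"-style extraction: masked pixels dropped, mean of the survivors rescaled
# by the full window width, mirroring mask_treatment="apply" in the new code path.
mask = np.array([True, False, True, False, False])   # True = masked (hypothetical mask)
n_unmasked = int((~mask).sum())
flux_apply = pix[:, ~mask].mean(axis=1) * width
print("apply mode  std:", flux_apply.std(), " expected:", sigma * width / np.sqrt(n_unmasked))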
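The rewritten HorneExtract.__call__ applies the Horne (1986) estimator quoted in the notebook: F = sum(P D / sigma^2) / sum(P^2 / sigma^2) with Var[F] = 1 / sum(P^2 / sigma^2), using a per-column-normalized spatial profile P. A short single-column sketch (numpy only; the 20-row Gaussian profile, flux level, and noise level are illustrative assumptions) confirming the estimator recovers the input flux and that its scatter matches sqrt(1 / sum(P^2 / sigma^2)):

import numpy as np

rng = np.random.default_rng(1)
ny, true_flux, sigma = 20, 100.0, 5.0

y = np.arange(ny, dtype=float)
P = np.exp(-0.5 * ((y - ny / 2) / 2.5) ** 2)
P /= P.sum()                                   # per-column normalization, as in the patch

var = np.full(ny, sigma**2)
inv_var = 1.0 / var
den = np.sum(P**2 * inv_var)                   # denominator == inverse variance of F

n_trials = 100_000
D = true_flux * P + rng.normal(0.0, sigma, size=(n_trials, ny))
F = (D * (P * inv_var)).sum(axis=1) / den

print("mean flux  :", F.mean(), " expected:", true_flux)
print("flux std   :", F.std(), " expected:", np.sqrt(1.0 / den))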
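test_covariance_scales_with_variance expects the fitted-parameter standard errors to double when every per-pixel sigma doubles, while the best-fit values stay essentially unchanged. The same behaviour can be cross-checked independently of astropy's LMLSQFitter with scipy.optimize.curve_fit and absolute_sigma=True; the Gaussian-plus-constant model, noise level, and initial guess below are illustrative assumptions rather than values from the patch:

import numpy as np
from scipy.optimize import curve_fit

def gauss_plus_const(y, amp, mean, std, bkg):
    return amp * np.exp(-0.5 * ((y - mean) / std) ** 2) + bkg

rng = np.random.default_rng(2)
y = np.arange(50, dtype=float)
truth = (150.0, 23.0, 4.0, 3.0)
sigma_row = np.full_like(y, 0.8)
data = gauss_plus_const(y, *truth) + rng.normal(0.0, sigma_row)

p0 = (100.0, 20.0, 3.0, 0.0)
# Same data fitted twice: once with the true per-point sigmas, once with them doubled.
_, cov1 = curve_fit(gauss_plus_const, y, data, p0=p0, sigma=sigma_row, absolute_sigma=True)
_, cov2 = curve_fit(gauss_plus_const, y, data, p0=p0, sigma=2.0 * sigma_row, absolute_sigma=True)

err1, err2 = np.sqrt(np.diag(cov1)), np.sqrt(np.diag(cov2))
print("stderr ratio (should be ~2):", err2 / err1)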