__all__ = ['EISCube']
import sys
import copy
import numpy as np
import astropy.units as u
from astropy.convolution import convolve, CustomKernel
from astropy.coordinates import SkyCoord
from ndcube import __version__ as ndcube_ver
from ndcube import NDCube
[docs]
class EISCube(NDCube):
"""EIS Level-1 Data Cube
Subclass of NDCube. Accepts all of the standard arguments and keywords
of `ndcube.NDCube`, as well as a few EIS specific parameters.
Parameters
----------
data : `numpy.ndarray`
The array holding the actual data in this object.
wcs : `astropy.wcs.WCS`, optional
The WCS object containing the axes' information, optional only if
``data`` is an `astropy.nddata.NDData` object.
uncertainty : array_like, optional
Uncertainty in the dataset. Ideally, it should have an attribute
"uncertainty_type" that defines what kind of uncertainty is stored, for
example "std" for standard deviation or "var" for variance. A metaclass
defining such an interface is `~astropy.nddata.NDUncertainty`,
however it's use in not mandatory. If the uncertainty has no type
attribute, the uncertainty is stored as UnknownUncertainty.
Defaults to None.
mask : array_like, optional
Mask for the dataset. Masks should follow the numpy convention that
valid data points are marked by False and invalid ones with True.
Defaults to None.
meta : dict-like object, optional
Additional meta information about the dataset. If no meta is provided
an empty collections.OrderedDict is created. Default is None.
unit : Unit-like or str, optional
Unit for the dataset. Strings that can be converted to a Unit are allowed.
Default is None.
copy : bool, optional
Indicates whether to save the arguments as copy. True copies every attribute
before saving it while False tries to save every parameter as reference.
Note however that it is not always possible to save the input as reference.
Default is False.
wavelength : `numpy.ndarray`, optional
Numpy array with the corrected wavelength values for each location
within the EIS raster. Must have the same dimensionality as the input
data. If not given, will initialize the .wavelength property with
an array of zeros.
radcal : `numpy.ndarray`, optional
Array of the radiometric calibration curve currently applied to the
input data cube. Required if you wish to use the .apply_radcal() and
.remove_radcal() methods
"""
# NOTE: this is based on the example given at
# https://docs.astropy.org/en/stable/nddata/subclassing.html#slicing-an-additional-property
def __init__(self, *args, **kwargs):
# Extract extra attributes, if given, and initialize them correctly.
input_wave = kwargs.pop('wavelength') if 'wavelength' in kwargs else None
if input_wave is None:
input_wave = np.zeros_like(args[0], dtype=float)
self.wavelength = input_wave
input_radcal = kwargs.pop('radcal') if 'radcal' in kwargs else 'unknown'
self._current_radcal = input_radcal
kwargs['copy'] = True # Try to ensure meta is not leaked between cutouts
super().__init__(*args, **kwargs)
@property
def wavelength(self):
"""Corrected wavelength values observed by EIS"""
return self._wavelength
@wavelength.setter
def wavelength(self, input_array):
self._wavelength = input_array
@property
def radcal(self):
"""Current radiometric calibration curve"""
return self._current_radcal
@radcal.setter
def radcal(self, input_array):
print('Error: Please use the .apply_radcal() and .remove_radcal()'
+' methods to modify or change the radiometric calibration.')
def _slice(self, item):
kwargs = super()._slice(item) # slice all normal attributes
old_meta = kwargs.pop('meta')
kwargs['meta'] = copy.deepcopy(old_meta) # no refs, please!
# The arguments for creating a new instance are saved in kwargs. So we
# need to add additional keywords with our sliced, extra properties
kwargs['wavelength'] = self.wavelength[item]
kwargs['radcal'] = self.radcal
# Update the 'mod_index' (used for exporting to .fits after fitting)
# Reminder: 'mod_index' uses a fits image axes order of [X, Y, Wave]
m_idx = copy.deepcopy(kwargs['meta']['mod_index'])
ll_wcs = kwargs['wcs'].low_level_wcs
wcs_arr_shape = ll_wcs.array_shape # axis order of [Y, X, wave]
new_ori = kwargs['wcs'].array_index_to_world(0,0,0)
ax_shape = [1, 1, 1] # true length of [Y, X, Wave] axes
if isinstance(new_ori, list):
# 3D subcube or 2D [spatial, wave] slice (one axis removed)
x1 = new_ori[-1].Tx.to('arcsec').value
y1 = new_ori[-1].Ty.to('arcsec').value
w1 = new_ori[0].to('Angstrom').value
elif isinstance(new_ori, SkyCoord):
# 2D slice at a given wavelength value or 1D spatial profile
x1 = new_ori.Tx.to('arcsec').value
y1 = new_ori.Ty.to('arcsec').value
lost_unit = ll_wcs.dropped_world_dimensions['world_axis_units']
lost_value = ll_wcs.dropped_world_dimensions['value']
w1 = u.Quantity(lost_value[0], lost_unit[0]).to('Angstrom').value
elif isinstance(new_ori, u.Quantity):
# Single spectrum at a selected location
w1 = new_ori.to('Angstrom').value
lost_units = ll_wcs.dropped_world_dimensions['world_axis_units']
lost_values = ll_wcs.dropped_world_dimensions['value']
x1 = u.Quantity(lost_values[0], lost_units[0]).to('arcsec').value
y1 = u.Quantity(lost_values[1], lost_units[1]).to('arcsec').value
# ndcube >= 2.0 drops all shallow (length 1) axes from .array_shape
# UNLESS an axis was sliced with explicit start:stop values of i:i+1
# Unfortunatly, this means there is no reliable method to identify the
# true length of each axis and which axis was dropped (if any).
# Therefore, we must resort to checking the type of each input
# slice parameter in "item" and try to manaully match axes with lengths
if ndcube_ver >= '2.0.0':
# First, extract and rearrange old shape in order of [Y, X, Wave]
old_shape = (m_idx['naxis2'], m_idx['naxis1'], m_idx['naxis3'])
ax_i = 0 # true axis index
wcs_i = 0 # sliced wcs axis index
# Note: we must take care when slicing a 2D slice of a cube
for s_i in range(len(item)):
while ax_i <= 2:
if old_shape[ax_i] == 1:
# skip previously dropped axes
ax_i += 1
else:
if isinstance(item[s_i], slice):
ax_shape[ax_i] = wcs_arr_shape[wcs_i]
wcs_i += 1
else:
ax_shape[ax_i] = 1
ax_i += 1
break
else:
# Works just fine in ndcube 1.4.2 (for all kinds of slices)
ax_shape = wcs_arr_shape
x2 = x1 + ax_shape[1]*m_idx['cdelt1']
y2 = y1 + ax_shape[0]*m_idx['cdelt2']
x_shift = round(abs(x1 - m_idx['crval1'])/m_idx['cdelt1'])
y_shift = round(abs(y1 - m_idx['crval2'])/m_idx['cdelt2'])
w_shift = round(abs(w1 - m_idx['crval3'])/m_idx['cdelt3'])
m_idx['naxis1'] = ax_shape[1] # X-axis
m_idx['naxis2'] = ax_shape[0] # Y-axis
m_idx['naxis3'] = ax_shape[2] # Wavelength axis
m_idx['crpix1'] = 1.0 - x_shift
m_idx['crpix2'] = 1.0 - y_shift
m_idx['crpix3'] = 1.0 - w_shift
m_idx['fovx'] = x2 - x1
m_idx['fovy'] = y2 - y1
m_idx['xcen'] = x1 + 0.5*(x2-x1)
m_idx['ycen'] = y1 + 0.5*(y2-y1)
kwargs['meta']['mod_index'] = m_idx
kwargs['meta']['extent_arcsec'] = [x1, x2, y1, y2] # [L, R, Top, Bot]
return kwargs # these must be returned
[docs]
def crop_by_coords(self, *args, **kwargs):
"""REMOVED in NDCube 2.0"""
print('Error: crop_by_coords() was removed in NDCube 2.0. Please use'
+' the .crop() or .crop_by_values() methods instead. See the'
+' NDCube documentation for more information.', file=sys.stderr)
return None
[docs]
def apply_radcal(self, input_radcal=None):
"""Apply a radiometric calibration curve (user-inputted or preflight)
Parameters
----------
input_radcal : array_like, optional
User-inputted radiometric calibration curve. If set to None, will
use the preflight radcal curve from the .meta dict. Default is None
Returns
-------
output_cube : `EISCube` class instance
A new EISCube class instance containing the calibrated data
"""
if input_radcal is None:
# Preflight radcal from HDF5 header file
new_radcal = self.meta['radcal']
else:
# User-inputted radcal curve
new_radcal = np.array(input_radcal)
if len(new_radcal) != self.data.shape[-1]:
print('Error: input_radcal must have the same number of elements'
+' as the last dimension in the data array.')
return self
output_radcal = new_radcal
if self.unit != u.photon:
if str(self.radcal) == 'unknown':
print('Error: Data currently has an unknown radcal applied.'
+' Unable to apply new calibration.')
return self
elif np.all(self.radcal == new_radcal):
print('Error: input_radcal is identical to current radcal.'
+' No calculation is required.')
return self
else:
print('Warning: Data currently has a different radcal applied.'
+' Old calibration curve will be removed.')
new_radcal = new_radcal/self.radcal
new_data = self.data.copy()*new_radcal
new_errs = self.uncertainty.array.copy()*new_radcal
new_meta = copy.deepcopy(self.meta)
new_meta['mod_index']['bunit'] = 'erg / (cm2 s sr)'
new_meta['notes'].append('Applied radcal to convert photon counts to intensity')
# wcs_mask = (np.array(tuple(reversed(self.wcs.array_shape))) <= 1).tolist()
output_cube = EISCube(new_data, wcs=self.wcs, uncertainty=new_errs,
wavelength=self.wavelength, radcal=output_radcal,
meta=new_meta, unit='erg / (cm2 s sr)',
# mask=self.mask, missing_axes=wcs_mask)
mask=self.mask)
return output_cube
[docs]
def remove_radcal(self):
"""Remove the applied radiometric calibration and convert data to counts
Returns
-------
output_cube : `EISCube` class instance
A new EISCube class instance containing the photon count data
"""
if self.unit == u.photon:
print('Error: Data is already in units of photon counts.'
+' No calculation required.')
return self
elif str(self.radcal) == 'unknown':
print('Error: Data currently has an unknown radcal applied.'
+' Unable to remove calibration.')
return self
new_data = self.data.copy()/self.radcal
new_errs = self.uncertainty.array.copy()/self.radcal
new_meta = copy.deepcopy(self.meta)
new_meta['mod_index']['bunit'] = 'photon'
new_meta['notes'].append('Removed radcal to convert intensity to photon counts')
# wcs_mask = (np.array(tuple(reversed(self.wcs.array_shape))) <= 1).tolist()
output_cube = EISCube(new_data, wcs=self.wcs, uncertainty=new_errs,
wavelength=self.wavelength, radcal=None,
meta=new_meta, unit='photon',
# mask=self.mask, missing_axes=wcs_mask)
mask=self.mask)
return output_cube
[docs]
def sum_spectra(self, wave_range=None, units=u.Angstrom):
"""Sum the data along the spectral axis.
Parameters
----------
wave_range : list of ints, floats, or `~astropy.units.Quantity`
Wavelength range to sum over. Values can be input as either
[min, max] or [center, half width]. Units can be specified using
either Astropy units instances or by inputting a pair of ints or
floats and then also using the "units" keyword. If wave_range is set
to None, then entire spectra will be summed over. Default is None.
units : str or `~astropy.units.Quantity`
Units to be used for the wavelength range if wave_range is given a
list of ints or floats. Will be ignored if either wave_range is None
or is given a list with Astropy units. Default is 'Angstrom'.
Returns
-------
output_cube : `NDCube` class instance
A new 2D `NDCube` class instance containing the summed data (NB: not
a full EISCube!)
"""
if wave_range is None:
# Sum over entire wavelength axis and return an NDCube
try:
new_wcs = self.wcs.dropaxis(0)
except:
new_wcs = copy.deepcopy(self[:,:,0].wcs)
sum_data = np.sum(self.data, axis=2)
new_meta = copy.deepcopy(self.meta)
new_meta['notes'].append('Summed over entire wavelength axis.')
return NDCube(sum_data, new_wcs, meta=new_meta)
# Validate input wavelength range
if isinstance(wave_range, (list, tuple)):
use_range = [0, 0]
range_units = ['unknown', 'unknown']
print('Summing EISCube spectra over a select wavelength range.')
if len(wave_range) != 2:
print('Error: invalid number of wave_range values. Please input'
+' a list or tuple with exactly two elements.',
file=sys.stderr)
return None
else:
print('Error: invalid wave_range type. Please input either None or'
+' a list (or tuple) with two elements.', file=sys.stderr)
return None
for w in range(2):
if isinstance(wave_range[w], u.Quantity):
# Parse an astropy.units.Quantity and convert as needed
# Note: this will overwrite any inputs to the "units" kwarg
if wave_range[w].unit == u.pix:
use_range[w] = wave_range[w].value
range_units[w] = u.pix
elif wave_range[w].unit.physical_type == 'length':
use_range[w] = wave_range[w].to('Angstrom').value
range_units[w] = u.Angstrom
else:
print('Error: invalid wavelength unit. Please input a pixel'
+' or length unit.', file=sys.stderr)
return None
else:
# Assume default or user inputted units (still convert if needed)
input_units = u.Unit(units)
if input_units == u.pix:
use_range[w] = float(wave_range[w])
range_units[w] = u.pix
elif input_units.physical_type == 'length':
u_scale = input_units.to('Angstrom')
use_range[w] = float(wave_range[w])*u_scale
range_units[w] = u.Angstrom
else:
print('Error: invalid wavelength unit. Please input a pixel'
+' or length unit.', file=sys.stderr)
return None
# Check for consistent units
if range_units[0] != range_units[1]:
print('Error: mismatched units. Please input the same units for'
+' both wave_range elements or use the "units" keyword',
file=sys.stderr)
return None
# If given values of [center, half width], compute the actual range
if use_range[1] < use_range[0]:
temp_center = use_range[0]
temp_half_wid = use_range[1]
use_range[0] = temp_center - temp_half_wid
use_range[1] = temp_center + temp_half_wid
# Get indices to be summed over
w_indices = [0, -1]
if range_units[0] == u.pix:
# Round pixels values to nearest whole indice
w_indices[w] = int(round(use_range[w]))
elif range_units[0] == u.Angstrom:
# Find the closest pixel location on the average wavelength axis
try:
# Note: the corrected wavelength has units of [Angstrom]
w_coords = np.mean(self.wavelength, axis=(0,1))
except KeyError:
print('Error: missing or invalid corrected wavelength array.')
return None
for w in range(2):
abs_w_diff = np.abs(w_coords - use_range[w])
w_indices[w] = np.argmin(abs_w_diff)
try:
new_wcs = self.wcs.dropaxis(0)
except:
new_wcs = copy.deepcopy(self[:,:,0].wcs)
sum_data = np.sum(self.data[:,:,w_indices[0]:w_indices[1]+1], axis=2)
new_meta = copy.deepcopy(self.meta)
new_meta['notes'].append('Summed wavelength axis over the range of '
+str(use_range)+' '+str(range_units[0]))
return NDCube(sum_data, new_wcs, meta=new_meta)
[docs]
def smooth_cube(self, width=3, **kwargs):
"""Smooth the data along one or more spatial axes.
Parameters
----------
width : list or single value of ints, floats, or `~astropy.units.Quantity`
Number of pixels or angular distance to smooth over. If given a
single value, only the y-axis will be smoothed. Floats and angular
distances will be converted to the nearest whole pixel value.
If a width value is even, width + 1 will be used instead.
Default is width = 3
**kwargs : keywords or dict
Keyword arguments to be passed to the astropy.convolution.convolve()
function.
Returns
-------
output_cube : `EISCube` class instance
A new EISCube class instance containing the smoothed data
"""
# Validate input width
num_dims = len(self.dimensions)
wid_list = [1]*num_dims # NB: a width of 1 results in no smoothing
if isinstance(width, (list, tuple)):
# Note: we assume the last dim is always wavelength
wid_list[0] = width[0]
if num_dims > 2:
wid_list[1] = width[1]
print('Warning: smoothing over the x-axis can yield unexpected'
+' results due to the time interval between observations.'
+' Use with care.')
if len(width) >= num_dims:
print('Warning: smoothing over the wavelength axis is not'
+' supported. Only widths for the Y & X axes will be used')
elif isinstance(width, (int, float, u.Quantity)):
wid_list[0] = width # Only smooth along y-axis
else:
print('Error: invalid width data type. Please input an int, float,'
+' or astropy.units.Quantity instance', file=sys.stderr)
return None
coord_ax = ['y', 'x', 'w']
for w in range(len(wid_list)-1):
# Parse a astropy.units.Quantity and convert to units of pixels
if isinstance(wid_list[w], u.Quantity):
if wid_list[w].unit == u.pix:
wid_list[w] = wid_list[w].value
elif not wid_list[w].unit.physical_type == 'angle':
print('Error: invalid width unit. Please input a pixel or'
+' angular unit.', file=sys.stderr)
return None
else:
try:
# Note: y & x scales are in units of [arcsec]/[pixel]
ax_scale = self.meta['pointing'][coord_ax[w]+'_scale']
except KeyError:
print('Error: missing '+coord_ax[w]+'-axis scale.')
return None
angular_wid_str = str(wid_list[w])
wid_list[w] = wid_list[w].to('arcsec').value / ax_scale
print('Note: on the '+coord_ax[w]+'-axis, '+angular_wid_str
+' is equivalent to '+str(wid_list[w])+' pixels.')
# Round to nearest pixel and add 1 to even values
wid_list[w] = int(round(wid_list[w]))
if wid_list[w] % 2 == 0:
wid_list[w] = wid_list[w] + 1
# Create smoothing kernel with normalized weights (i.e. sum to 1)
# Note: Using a 2D or 3D kernel allows us to smooth everything at once
sm_weights = np.ones(wid_list) / (wid_list[0]*wid_list[1])
sm_kernel = CustomKernel(sm_weights)
# Calculate smoothed data and uncertainty values
sm_data = convolve(self.data, sm_kernel, **kwargs)
if self.uncertainty is not None:
sm_errs = np.sqrt(convolve(self.uncertainty.array**2,
sm_kernel, **kwargs))
else:
sm_errs = None
sm_data_mask = np.logical_or(np.isnan(sm_data), sm_data < 0)
# Pack everything up in a new EISCube
old_radcal = self.radcal
new_meta = copy.deepcopy(self.meta)
new_meta['notes'].append('Smoothed using pixel widths of '+str(wid_list))
# wcs_mask = (np.array(tuple(reversed(self.wcs.array_shape))) <= 1).tolist()
output_cube = EISCube(sm_data, wcs=self.wcs, uncertainty=sm_errs,
wavelength=self.wavelength, radcal=old_radcal,
meta=new_meta, unit=self.unit,
# mask=sm_data_mask, missing_axes=wcs_mask)
mask=sm_data_mask)
return output_cube