Source code for speclite.filters

# Licensed under a 3-clause BSD style license - see LICENSE.rst
r"""Support for calculations involving filter response curves.

Overview
--------

See :doc:`/filters` for information about the standard filters included with
this code distribution and instructions for adding your own filters. Filter
names have two components, a group name and a band name, which are combined
with a hyphen, e.g. "sdss2010-r".  The group names included with this package
are:

    >>> filter_group_names
    ['sdss2010', 'sdss2010noatm', 'decam2014', 'wise2010', 'hsc2017', 'lsst2016', 'lsst2023', 'bessell', 'BASS', 'MzLS', 'Euclid', 'decamDR1', 'decamDR1noatm', 'gaiadr2', 'gaiadr3', 'twomass', 'galex', 'odin', 'suprime', 'cfht_megacam']

List the band names associated with any group using, for example:

    >>> load_filters('sdss2010-*').names
    ['sdss2010-u', 'sdss2010-g', 'sdss2010-r', 'sdss2010-i', 'sdss2010-z']

Here is a brief example of calculating SDSS r,i and Bessell V magnitudes for a
numpy array of fluxes for 100 spectra covering 4000-10,000 A with 1A pixels:

    >>> import astropy.units as u
    >>> wlen = np.arange(4000, 10000) * u.Angstrom
    >>> flux = np.ones((100, len(wlen))) * u.erg / (u.cm**2 * u.s * u.Angstrom)

Units are recommended but not required (otherwise, the units shown here are
assumed as defaults).  Next, load the filter responses:

    >>> import speclite.filters
    >>> filters = speclite.filters.load_filters(
    ...     'sdss2010-r', 'sdss2010-i', 'bessell-V')

Finally, calculate the magnitudes to obtain an :class:`astropy Table
<astropy.table.Table>` of results with one row per input spectrum and one
column per filter:

    >>> mags = filters.get_ab_magnitudes(flux, wlen)

If you prefer to work with a numpy structured array, you can convert the
returned table:

    >>> mags = filters.get_ab_magnitudes(flux, wlen).as_array()

.. _wavelength-padding:

Padding
-------

Under certain circumstances, you may need to pad an input spectrum to cover
the full wavelength range of the filters you are using.  See
:meth:`FilterResponse.pad_spectrum` and :meth:`FilterSequence.pad_spectrum` for
details on how this is implemented.

.. _shifted-filters:

Shifted Filters
---------------

It can be useful to work with a filter whose wavelengths have been shifted,
e.g. for the applications
in http://arxiv.org/abs/astro-ph/0205243.  This is supported with the
:meth:`FilterResponse.create_shifted` method, for example:

    >>> wlen = np.arange(4000, 10000) * u.Angstrom
    >>> flux = np.ones(len(wlen)) * u.erg / (u.cm**2 * u.s * u.Angstrom)
    >>> r0 = speclite.filters.load_filter('sdss2010-r')
    >>> rz = r0.create_shifted(band_shift=0.2)
    >>> print(np.round(r0.get_ab_magnitude(flux, wlen), 3))
    -21.362
    >>> print(np.round(rz.get_ab_magnitude(flux, wlen), 3))
    -20.966

Note that a shifted filter has a different wavelength coverage, so
may require :ref:`padding of your input spectra <wavelength-padding>`.

.. _convolution-operator:

Convolutions
------------

The filter response convolution operator implemented here is defined as:

.. math::

    F[R,f_\lambda] \equiv \int_0^\infty \frac{dg}{d\lambda}(\lambda)
    R(\lambda) \omega(\lambda) d\lambda

where :math:`R(\lambda)` is a filter response function, represented by a
:class:`FilterResponse` object, and :math:`f_\lambda \equiv dg/d\lambda` is an
arbitrary differential function of wavelength, which can either be represented
as a callable python object or else with arrays of wavelengths and function
values.

.. _weights:

The default weights:

.. math::

    \omega(\lambda) = \frac{\lambda}{h c}

are appropriate for photon-counting detectors such as CCDs, and enabled by
the default setting ``photon_weighted = True`` in the methods below.  Otherwise,
the convolution is unweighted, :math:`\omega(\lambda) = 1`, but arbitrary
alternative weights can always be included in the definition of
:math:`f_\lambda`. For example, a differential function of frequency
:math:`dg/d\nu` can be reweighted using:

.. math::

    f_\lambda(\lambda) = \frac{dg}{d\lambda}(\lambda) =
    \frac{c}{\lambda^2} \frac{dg}{d\nu}(\nu = c/\lambda)

These definitions make no assumptions about the units of
:math:`f_\lambda`, but magnitude calculations are an important special case
where the units of :math:`f_\lambda` must have the dimensions
:math:`M / (L T^3)`, for example,
:math:`\text{erg}/(\text{cm}^2\,\text{s}\,\AA)`.

.. _magnitude:

The magnitude of :math:`f_\lambda` in the filter band with response
:math:`R` is then:

.. math::

    m[R,f_\lambda] \equiv -2.5 \log_{10}(F[R,f_\lambda] / F[R,f_{\lambda,0}])

where :math:`f_{\lambda,0}(\lambda)` is the reference flux density that defines
the photometric system's zeropoint :math:`F[R,f_{\lambda,0}]`.  The zeropoint
has dimensions :math:`1 / (L^2 T)` and gives the rate of incident photons
per unit telescope area from a zero magnitude source.

A spectral flux density per unit frequency, :math:`f_\nu = dg/d\nu`, should be
converted using:

.. math::

    f_\lambda(\lambda) = \frac{c}{\lambda^2} f_\nu(\nu = c/\lambda)

for use with the methods implemented here.

For the AB system,

.. math::

    f_{\lambda,0}^{AB}(\lambda) = \frac{c}{\lambda^2} (3631 \text{Jy}) \; ,

and the convolutions use photon-counting weights.

.. _sampling:

Sampling
--------

Filter responses are tabulated on non-uniform grids with sufficient sampling
that linear interpolation is sufficient for most applications. When a filter
response is convolved with tabulated data, we must also consider the sampling
of the tabulated function. In this implementation, we assume that the tabulated
function is also sufficiently sampled to allow linear interpolation.

The next issue is how to sample the product of a filter response and tabulated
function when performing a numerical convolution integral.  With the assumptions
above, a safe strategy would be to sample the integrand at the union of the
two wavelength grids.  However, this is relatively expensive since it requires
interpolating both the response and the input function. Interpolation is
sometimes unavoidable: for example, when the function is linear and represented
by its values at only two wavelengths.

The approach adopted here is to use the sampling grid of the input function
to sample the convolution integrand whenever it samples the filter response
sufficiently.  This requires that the filter response be interpolated, but this
operation only needs to be performed once when many convolutions are performed
on the same input wavelength grid.  Our criteria for sufficient filter sampling
is that at most one filter wavelength point lies between any consecutive pair
of input wavelength points.  When this condition is not met, the input
function will be interpolated at the minimum number of response wavelengths
necessary to satisfy the condition.

The figure below (generated by :func:`this function
<filter_sampling_explanatory_plot>`) illustrates three different sampling
regimes that can occur
in a convolution with the Bessell V filter (red curve). Filled blue squares
show where the input function is sampled and open blue circles show where
interpolation of the input function is performed.  The left-hand plot shows
an extremely undersampled input that is interpolated at every filter point.
The middle plot shows a well sampled input that requires no interpolation.
The right-hand plot shows an input that is slightly undersampled and requires
interpolation at some of the filter points. All three methods give consistent
results, with discrepancies of < 0.05%.

.. image:: _static/sampling.png
    :alt: sampling explanatory plot

The logic described here is encapsulated in the :class:`FilterConvolution`
class.  Interpolation is performed automatically, as necessary, by the
high-level magnitude calculating methods, but :class:`FilterConvolution` is
available when more control of this process is needed to improve performance.

.. _performance:

Performance
-----------

If the performance of magnitude calculations is a bottleneck, you can
speed up the code significantly by taking advantage of the fact that all
of the convolution functions can operate on multidimensional arrays.  For
example, calling :meth:`FilterResponse.get_ab_magnitudes` once with an
array of 5000 spectra is about 10x faster than calling it 5000 times with the
individual spectra.  However, in order to take advantage of this speedup,
your spectra need to all use the same wavelength grid.

Note that the eliminating flux units (which are always optional) from your
input spectra will only result in about a 10% speedup, so units are generally
recommended.

Attributes
----------
filter_group_names : list
    List of filter group names included with this package.
default_wavelength_unit : :class:`astropy.units.Unit`
    The default wavelength units assumed when units are not specified.
    The same units are used to store wavelength values in internal arrays.
default_flux_unit : :class:`astropy.units.Unit`
    The default units for spectral flux density per unit wavelength.
"""
from __future__ import print_function, division # pragma: no cover

import os
import os.path
import glob
import re
import collections.abc

import numpy as np

import scipy.interpolate
import scipy.integrate

import astropy.table
import astropy.units
from .utils import get_path_of_data_file


filter_group_names = [
    'sdss2010', 'sdss2010noatm', 'decam2014', 'wise2010', 'hsc2017', 'lsst2016', 'lsst2023', 'bessell',
    'BASS', 'MzLS', 'Euclid', 'decamDR1', 'decamDR1noatm', 'gaiadr2', 'gaiadr3', 'twomass',
    'galex', 'odin', 'suprime', 'cfht_megacam']

default_wavelength_unit = astropy.units.Angstrom

default_flux_unit = (astropy.units.erg / astropy.units.cm**2 /
                     astropy.units.s / default_wavelength_unit)

# Constant spectral flux density per unit frequency for a zero magnitude
# AB source.
_ab_constant = (
    3631. * astropy.units.Jansky * astropy.constants.c).to(
        default_flux_unit * default_wavelength_unit**2)

# The units specified below give AB zeropoints in 1 / (cm**2 s).
_hc_constant = (astropy.constants.h * astropy.constants.c).to(
    astropy.units.erg * default_wavelength_unit)

_photon_weighted_unit = default_wavelength_unit**2 / _hc_constant.unit

# Map names to integration methods allowed by the convolution methods below.
_filter_integration_methods = dict(
    trapz= scipy.integrate.trapz,
    simps= scipy.integrate.simps)

# Group and band names must be valid python identifiers. Although a leading
# underscore is probably not a good idea, it is simpler to stick with a
# well-established lexical class.
# https://docs.python.org/2/reference/lexical_analysis.html#identifiers
_name_pattern = re.compile('^[a-zA-Z_][a-zA-Z0-9_]*\\Z')

# The wildcard pattern is "<group_name>-*" and captures <group_name>.
_group_wildcard = re.compile('^([a-zA-Z_][a-zA-Z0-9_]*)-\\*\\Z')

# Split a valid canonical name "<group_name>-<band_name>" into its components.
_full_name_pattern = re.compile(
    '^([a-zA-Z_][a-zA-Z0-9_]*)-([a-zA-Z_][a-zA-Z0-9_]*)\\Z')

# Dictionary of cached FilterResponse objects.
_filter_cache = {}


[docs] def ab_reference_flux(wavelength, magnitude=0.): """Calculate an AB reference spectrum with the specified magnitude. For example, to calculate the flux of a 20th magnitude AB reference at 600 nm: >>> flux = ab_reference_flux(600 * astropy.units.nanometer, magnitude=20) >>> print('{0:.3g}'.format(flux)) 3.02e-17 erg / (Angstrom cm2 s) This function is used to calculate :attr:`filter response zeropoints <FilterResponse.ab_zeropoint>` in the AB system. If either of the parameters is an array, the result will be broadcast over the parameters using the usual numpy rules. Parameters ---------- wavelength : astropy.units.Quantity Wavelength or array of wavelengths where the flux should be evaluated. Wavelengths must have valid units. magnitude : float or array Dimensionless magnitude value(s) used to normalize the spectrum. Returns ------- astropy.units.Quantity Spectral flux densities per unit wavelength tabulated at each input wavelength, in units of :attr:`default_flux_unit`. Raises ------ ValueError Wavelength parameter does not have valid units. """ magnitude = np.asarray(magnitude) try: wavelength = wavelength.to(default_wavelength_unit) except (AttributeError, astropy.units.UnitConversionError): raise ValueError('Cannot evaluate flux for invalid wavelength.') flux = 10 ** (-0.4 * magnitude) * _ab_constant / wavelength ** 2 return flux.to(default_flux_unit)
[docs] def validate_wavelength_array(wavelength, min_length=0): """Validate a wavelength array for filter operations. This function will not perform any copying or allocation if the input is already a numpy array or astropy Quantity. Parameters ---------- wavelength : array A 1D array of strictly increasing wavelength values with optional units. If units are included, they must be convertible to :attr:`default_wavelength_unit`. Otherwise, the :attr:`default_wavelength_unit` is assumed. min_length : int The minimum required length of the wavelength array. Returns ------- numpy.ndarray Array of validated wavelengths without any units, but with values given in :attr:`default_wavelength_unit`. Raises ------ ValueError Wavelength array is not 1D, or not strictly increasing, or below the minimum length. astropy.units.UnitConversionError The wavelength array has units that are not convertible to :attr:`default_wavelength_unit` """ try: # This multiplies by a scalar conversion factor, not a unit. wavelength_no_units = wavelength.to(default_wavelength_unit).value except AttributeError: # No units present, so assume default units. wavelength_no_units = np.asarray(wavelength) if np.isscalar(wavelength_no_units) or len(wavelength_no_units.shape) != 1: raise ValueError('Wavelength array must be 1D.') if len(wavelength_no_units) < min_length: raise ValueError('Minimum length is {0}.'.format(min_length)) if not np.all(np.diff(wavelength_no_units) > 0): raise ValueError('Wavelength values must be strictly increasing.') return wavelength_no_units
[docs] def tabulate_function_of_wavelength(function, wavelength, verbose=False): """Evaluate a function of wavelength. Parameters ---------- function : callable Any function that expects a wavelength or array of wavelengths and returns its value. Functions will be called first with wavelength units included and then without units included, in which case they should treat all wavelengths as having :attr:`default_wavelength_unit`. If a function returns a value with units, this will be correctly propagated to the output. wavelength : astropy.units.Quantity Wavelength or array of wavelengths where the function should be evaluated. Wavelengths must have valid units. verbose : bool Print details of the sequence of attempts used to call the function. Returns ------- tuple Tuple (values, units) of function values at each input wavelength. """ try: wavelength = wavelength.to(default_wavelength_unit) except (AttributeError, astropy.units.UnitConversionError): raise ValueError('Cannot evaluate function for invalid wavelength.') function_units = None # Try broadcasting our wavelength array with its units. if verbose: print('Trying to broadcast with units.') try: function_values = function(wavelength) try: function_units = function_values.unit function_values = function_values.value except AttributeError: # Ok if the function does not return any units. pass return function_values, function_units except Exception as e: # Keep trying. if verbose: print('Failed: {0}'.format(e)) # Try broadcasting our wavelength array without its units. if verbose: print('Trying to broadcast without units.') try: function_values = function(wavelength.value) try: function_units = function_values.unit function_values = function_values.value except AttributeError: # Ok if the function does not return any units. pass return function_values, function_units except Exception as e: # Keep trying. if verbose: print('Failed: {0}'.format(e)) # Try looping over wavelengths and including units. if verbose: print('Trying to iterate with units.') try: function_values = [] for i, w in enumerate(wavelength.value): value = function(w * default_wavelength_unit) # Check that the function is consistent in the units it returns. if i == 0: # Remember the units of the first function value. try: function_units = value.unit value = value.value except AttributeError: # Ok if the function does not return any units. pass elif function_units == None: try: new_units = value.unit # Function has units now but did not earlier. raise RuntimeError( 'Inconsistent function units: none, {0}.' .format(new_units)) except AttributeError: # Still no units, as expected. pass else: try: if function_units != value.unit: # Function units have changed. raise RuntimeError( 'Inconsistent function units: {0}, {1}.' .format(function_units, value.unit)) except AttributeError: # Function had units before but does not now. raise RuntimeError( 'Inconsistent function units: {0}, none.' .format(function_units)) value = value.value function_values.append(value) function_values = np.asarray(function_values) return function_values, function_units except RuntimeError as e: raise e except Exception as e: # Keep trying. if verbose: print('Failed: {0}'.format(e)) # Try looping over wavelengths and not including units. if verbose: print('Trying to iterate without units.') try: function_values = [] for i, w in enumerate(wavelength.value): value = function(w) # Check that the function is consistent in the units it returns. if i == 0: # Remember the units of the first function value. try: function_units = value.unit value = value.value except AttributeError: # Ok if the function does not return any units. pass elif function_units == None: try: new_units = value.unit # Function has units now but did not earlier. raise RuntimeError( 'Inconsistent function units: none, {0}.' .format(new_units)) except AttributeError: # Still no units, as expected. pass else: try: if function_units != value.unit: # Function units have changed. raise RuntimeError( 'Inconsistent function units: {0}, {1}.' .format(function_units, value.unit)) except AttributeError: # Function had units before but does not now. raise RuntimeError( 'Inconsistent function units: {0}, none.' .format(function_units)) value = value.value function_values.append(value) function_values = np.asarray(function_values) return function_values, function_units except RuntimeError as e: raise e except Exception as e: if verbose: print('Failed: {0}'.format(e)) # If we get here, none of the above strategies worked. raise ValueError('Invalid function.')
[docs] class FilterResponse(object): """A filter response curve tabulated in wavelength. Some standard filters are included in this package and can be initialized using :func:`load_filter`. For example: >>> rband = load_filter('sdss2010-r') Objects behave like functions that evaluate their response at aribtrary wavelengths. Wavelength units can be specified, or else default to :attr:`default_wavelength_unit`: >>> round(rband(6000 * astropy.units.Angstrom), 4) 0.4692 >>> round(rband(6000), 4) 0.4692 >>> round(rband(0.6 * astropy.units.micron), 4) 0.4692 Filters can be also evaluated for an arbitrary array of wavelengths, returning a numpy array of response values: >>> resp = rband([5980, 6000, 6020]) >>> np.round(resp[1], 4) 0.4692 The effective wavelength of a filter is defined as the :ref:`photon-weighted <weights>` mean wavelength: .. math:: \lambda_{eff} \equiv F[R, \lambda] / F[R, 1] where :math:`F` is our convolution operator defined :ref:`above <convolution-operator>`. Use the :attr:`effective_wavelength` attribute to access this value: >>> print(np.round(rband.effective_wavelength, 1)) 6205.8 Angstrom The examples below show three different ways to calculate the AB magnitude in the ``sdss2010-r`` filter for a source with a constant spectral flux density per unit wavelength of :math:`10^{-17} \\text{erg}/(\\text{cm}^2\, \\text{s} \,\AA)`. First, we specify the spectrum with a function object: >>> flux = lambda wlen: 1e-17 * default_flux_unit >>> print(rband.get_ab_magnitude(flux).round(3)) 21.138 Next, we tabulate a constant flux using only two wavelength points that span the filter response: >>> wlen = [5300, 7200] * default_wavelength_unit >>> flux = [1e-17, 1e-17] * default_flux_unit >>> print(rband.get_ab_magnitude(flux, wlen).round(3)) 21.138 Since this spectrum undersamples the filter response, it is automatically interpolated. Finally, we tabulate a constant flux using a dense wavelength grid that oversamples the filter response and does not require any interpolation: >>> wlen = np.linspace(5300, 7200, 200) * default_wavelength_unit >>> flux = np.ones_like(wlen.value) * 1e-17 * default_flux_unit >>> print(rband.get_ab_magnitude(flux, wlen).round(3)) 21.138 Filters can have an optional wavelength shift applied. See http://arxiv.org/abs/astro-ph/0205243 for applications. Shifted filters can be created from non-shifted filters using :meth:`create_shifted`. Parameters ---------- wavelength : array A :func:`valid array <validate_wavelength_array>` of wavelengths. response : array A dimensionless 1D array of filter response values corresponding to each wavelength. Response values must be non-negative and cannot all be zero. The bounding response values must be zero, and the response is assumed to be zero outside of the specified wavelength range. If this parameter has units, they will be silently ignored. meta : dict A dictionary of metadata which must include values for the keys ``group_name`` and ``band_name``, which must be `valid python identifiers <https://docs.python.org/2/reference/lexical_analysis.html#identifiers>`__. However, you are encouraged to provide the full set of keys listed :doc:`here </filters>`, and additional keys are also permitted. band_shift : float or None A shift to apply to filter wavelengths (but not response values). A shifted filter cannot be saved or further shifted, but can otherwise be used like any other filter. Attributes ---------- name : str Canonical name of this filter response in the format "<group_name>-<band_name>". effective_wavelength : :class:`astropy.units.Quantity` Mean :ref:`photon-weighted <weights>` wavelength of this response function, as defined above. ab_zeropoint : :class:`astropy.units.Quantity` Zeropoint for this filter response in the AB system, as defined :ref:`above <magnitude>`, and including units. meta : dict Dictionary of metadata associated with this filter. interpolator : :class:`scipy.interpolate.interp1d` Linear interpolator of our response function that returns zero for all values outside our wavelength range. Should normally be evaluated through our :meth:`__call__` convenience method. Raises ------ ValueError Invalid wavelength or response input arrays, or missing required keys in the input metadata. """ def __init__(self, wavelength, response, meta, band_shift=None): self._wavelength = validate_wavelength_array(wavelength, min_length=3) if band_shift is not None: if band_shift <= -1: raise ValueError( 'Invalid filter band_shift <= -1: {0}.'.format(band_shift)) self._wavelength = self._wavelength / (1 + band_shift) self.band_shift = band_shift # If response has units, np.asarray() makes a copy and drops the units. self._response = np.asarray(response) if len(self._wavelength) != len(self._response): raise ValueError('Arrays must have same length.') # Check for a valid response curve. if np.any(self._response < 0): raise ValueError('Response values must be non-negative.') if np.all(self._response == 0): raise ValueError('Response values cannot all be zero.') if not (self._response[0] == 0 and self._response[-1] == 0): raise ValueError('Response must go to zero on both sides.') # Trim any extra leading and trailing zeros. non_zero = np.where(self._response > 0)[0] start, stop = non_zero[0] - 1, non_zero[-1] + 2 if stop - start < len(self._wavelength): self._wavelength = self._wavelength[start: stop] self._response = self._response[start: stop] # Check for the required metadata fields. try: self.meta = dict(meta) except TypeError: raise ValueError('Invalid metadata dictionary.') for required in ('group_name', 'band_name'): if required not in self.meta: raise ValueError( 'Metadata missing required key "{0}".'.format(required)) try: if not _name_pattern.match(meta[required]): raise ValueError( 'Value of {0} is not a valid identifier: "{1}"' .format(required, meta[required])) except TypeError: raise ValueError('Invalid value type for {0}.'.format(required)) # Create a linear interpolator of our response function that returns # zero outside of our wavelength range. self.interpolator = scipy.interpolate.interp1d( self._wavelength, self._response, kind='linear', copy=False, bounds_error=False, fill_value=0.) # Calculate this filter's effective wavelength. one = astropy.units.Quantity(1.) numer = self.convolve_with_function(lambda wlen: wlen) denom = self.convolve_with_function(lambda wlen: one) self.effective_wavelength = numer / denom # Calculate this filter's zeropoint in the AB system. self.ab_zeropoint = self.convolve_with_function( ab_reference_flux, units=default_flux_unit) self.name = '{0}-{1}'.format(meta['group_name'], meta['band_name']) if self.band_shift is None: # Remember this object in our cache so that load_filter can find it. # In case this object is already in our cache, overwrite it now. _filter_cache[self.name] = self else: # Add the band_shift to the name. self.name += '-shift({0})'.format(self.band_shift) @property def wavelength(self): """Return the wavelength array of the filter Returns ------- np.ndarray An array of wavelengths with their associated units after trimming any extra leading or trailing zero response values. """ return self._wavelength @property def response(self): """Return the response curve array of the filter Returns ------- np.ndarray An array of response values after trimming any extra leading or trailing zero response values. """ return self._response
[docs] def create_shifted(self, band_shift): """Create a copy of this filter response with shifted wavelengths. A filter response :math:`R(\lambda)` is transformed to blue-shifted response :math:`R((1+z)\lambda)` by shifting the wavelengths where its response values are tabulated from :math:`\lambda_i` to :math:`\lambda_i/(1+z)`. Note that only the tabulated wavelengths are transformed and not the response values. See http://arxiv.org/abs/astro-ph/0205243 for applications. A shifted filter cannot be saved or further shifted, but can otherwise be used like any other filter. Parameters ---------- band_shift : float or None Shift to apply to filter wavelengths (but not response values). Returns ------- FilterResponse A new filter object that is a copy of this filter but with the specified wavelength shift applied. Raises ------ RuntimeError Cannot apply a second wavelength shift to a filter response. """ if self.band_shift is not None: raise RuntimeError( 'Cannot apply a second wavelength shift to a filter response.') return FilterResponse( self._wavelength, self._response, self.meta, band_shift=band_shift)
[docs] def __call__(self, wavelength): """Evaluate the filter response at arbitrary wavelengths. Parameters ---------- wavelength : array or float A single wavelength value or an array of wavelengths. If units are included, they will be correctly interpreted. Otherwise :attr:`default_wavelength_unit` is assumed. Returns ------- numpy.ndarray Numpy array of response values corresponding to each input wavelength. Raises ------ astropy.units.UnitConversionError Input wavelength(s) have unit that is not convertible to :attr:`default_wavelength_unit`. """ # Use asanyarray() so that a Quantity with units is not copied here. wavelength = np.asanyarray(wavelength) try: wavelength = wavelength.to(default_wavelength_unit).value except AttributeError: # No units present, so assume the default units. pass response = self.interpolator(wavelength) # If the input was scalar, return a scalar. if response.shape == (): response = response.item() return response
[docs] def save(self, directory_name='.', overwrite=True): """Save this filter response to file. The response is saved in the `ECSV format <https://github.com/astropy/astropy-APEs/blob/master/APE6.rst>`__ and can be read back by passing the returned path to :func:`load_filter`:: file_name = response.save() response2 = load_filter(file_name) The file name in the specified directory will be "<group_name>-<band_name>.ecsv". Any existing file with the same name will be silently overwritten. Saved filter responses cannot have any wavelength shift applied since their file naming convention does not distinguish between different shifts. Therefore, attempting to save a filter with a wavelength shift applied will raise a RuntimeError. Parameters ---------- directory_name : str An existing directory where the response file should be written. overwrite : bool Silently overwrite this file when True. Returns ------- str Absolute path of the created file, including the .ecsv extension. Raises ------ ValueError Directory name does not exist or refers to a file. RuntimeError Filter response has a wavelength shift applied. """ if self.band_shift is not None: raise RuntimeError('Cannot save a shifted filter response.') if not os.path.isdir(directory_name): raise ValueError('Invalid directory name.') table = astropy.table.QTable(meta=self.meta) table['wavelength'] = self._wavelength * default_wavelength_unit table['response'] = self._response name = os.path.join( directory_name, '{0}-{1}.ecsv'.format( self.meta['group_name'], self.meta['band_name'])) table.write(name, format='ascii.ecsv', formats={'wavelength': repr, 'response': repr}, overwrite=overwrite) return os.path.abspath(name)
[docs] def convolve_with_function(self, function, photon_weighted=True, units=None, method='trapz'): """Convolve this response with a function of wavelength. Returns a numerical estimate of the convolution integral :math:`F[R,f]` defined above for an arbitrary function of wavelength :math:`f(\lambda)`. For example, to calculate a filter's effective wavelength: >>> rband = load_filter('sdss2010-r') >>> one = astropy.units.Quantity(1.) >>> numer = rband.convolve_with_function(lambda wlen: wlen) >>> denom = rband.convolve_with_function(lambda wlen: one) >>> print(np.round(numer / denom, 1)) 6205.8 Angstrom Similarly, a filter's zeropoint can be calculated using: >>> zpt = rband.convolve_with_function(ab_reference_flux) >>> print(zpt.round(1)) 493486.7 1 / (cm2 s) Note that both of these values are pre-calculated in the constructor and are available from the :attr:`effective_wavelength` and :attr:`ab_zeropoint` attributes. Parameters ---------- function : callable Any function that expects a wavelength or array of wavelengths and returns its value. Functions will be called first with wavelength units included and then without units included, in which case they should treat all wavelengths as having :attr:`default_wavelength_unit`. If a function returns a value with units, this will be correctly propagated to the convolution result. photon_weighted : bool Use :ref:`weights <weights>` appropriate for a photon-counting detector such as a CCD when this parameter is True. Otherwise, use unit weights. units : astropy.units.Quantity or None When this parameter is not None, then any explicit units returned by the function must be convertible to these units, and these units will be applied if the function values do not already have units. method : str Specifies the numerical integration scheme to use and must be either 'trapz' or 'simps', to select the corresponding ``scipy.integration`` function. The 'simps' method may be more accurate than the default 'trapz' method, but should be used with care since it is also less robust and more sensitive to the wavelength grid. Returns ------- float or astropy.units.Quantity Result of the convolution integral. Units will be included if the function returns a value with units. Otherwise, the implicit units of the result are equal to the implicit function value units multiplied by :attr:`default_wavelength_unit`. Raises ------ ValueError Function does not behave as expected or invalid method. RuntimeError Function returns inconsistent units. """ if method not in _filter_integration_methods.keys(): raise ValueError( 'Invalid integration method {0}. Pick one of {1}.' .format(method, _filter_integration_methods.keys())) # Try to tabulate the function to integrate on our wavelength grid. integrand, func_units = \ tabulate_function_of_wavelength( function, self._wavelength * default_wavelength_unit) if units is not None: if func_units is not None: try: converted = func_units.to(units) except astropy.units.UnitConversionError: raise ValueError( 'Function units {0} not convertible to {1}.' .format(func_units, units)) else: func_units = units # Build the integrand by including appropriate weights. integrand *= self._response if photon_weighted: integrand *= self._wavelength / _hc_constant.value if func_units is not None: func_units *= default_wavelength_unit / _hc_constant.unit integrator = _filter_integration_methods[method] result = integrator(y = integrand, x=self._wavelength) # Apply units to the result if the fuction has units. if func_units is not None: result = result * func_units * default_wavelength_unit return result
[docs] def convolve_with_array(self, wavelength, values, photon_weighted=True, interpolate=False, axis=-1, units=None, method='trapz'): """Convolve this response with a tabulated function of wavelength. This is a convenience method that creates a temporary :class:`FilterConvolution` object to perform the convolution. See that class' documentation for details on this method's parameters and usage. See also the notes :ref:`above <sampling>` about how the convolution integrand is sampled. Parameters ---------- wavelength : array A :func:`valid array <validate_wavelength_array>` of wavelengths that must cover the full range of this filter's response. values : array or :class:`astropy.units.Quantity` Array of function values to use. Values are assumed to be tabulated on our wavelength grid. Values can be multidimensional, in which case an array of convolution results is returned. If values have units, then these are propagated to the result. photon_weighted : bool Use :ref:`weights <weights>` appropriate for a photon-counting detector such as a CCD when this parameter is True. Otherwise, use unit weights. interpolate : bool Allow interpolation of the tabulated function if necessary. See :class:`FilterConvolution` for details. axis : int In case of multidimensional function values, this specifies the index of the axis corresponding to the wavelength dimension. units : astropy.units.Quantity or None When this parameter is not None, then any explicit values units must be convertible to these units, and these units will be applied to the values if they do not already have units. method : str Specifies the numerical integration scheme to use. See :class:`FilterConvolution` for details. Returns ------- float or array or :class:`astropy.units.Quantity` Convolution integral result(s). If the input values have units then these are propagated to the result(s). If the input is multidimensional, then so is the result but with the specified axis integrated out. """ convolution = FilterConvolution( self, wavelength, photon_weighted, interpolate, units) return convolution(values, axis, method)
[docs] def get_ab_maggies(self, spectrum, wavelength=None, axis=-1): """Calculate a spectrum's relative AB flux convolution. The result is the dimensionless ratio :math:`F[R,f_\lambda] / F[R,f_{\lambda,0}]` defined :ref:`above <convolution-operator>`, and provides a linear measure of a source's flux through this filter relative to an AB standard flux. Use :meth:`get_ab_magnitude` for the corresponding AB magnitude. Parameters ---------- spectrum : callable or array or :class:`astropy.units.Quantity` The spectrum whose flux should be compared with the AB standard flux in this filter band. Can either be a callable object (see :meth:`convolve_with_function` for details) or else an array (See :meth:`convolve_with_array` for details). A multidimensional array can be used to calculate maggies for many spectra at once. The spectrum fluxes must either have explicit units that are convertible to :attr:`default_flux_unit`, or else they will be implicitly interpreted as having these default units. wavelength : array or :class:`astropy.units.Quantity` or None When this parameter is None, the spectrum must be a callable object. Otherwise, the spectrum must be a :func:`valid array <validate_wavelength_array>` of wavelengths that must cover the full range of this filter's response. A spectrum that does not cover this filter can be padded first using :meth:`pad_spectrum`. axis : int The axis along which wavelength increases in a spectrum array. Ignored when the wavelength parameter is None. Returns ------- float or array """ if wavelength is None: convolution = self.convolve_with_function( spectrum, photon_weighted=True, units=default_flux_unit) else: # Allow interpolation since this is a convenience method. convolution = self.convolve_with_array( wavelength, spectrum, photon_weighted=True, interpolate=True, axis=axis, units=default_flux_unit) try: return convolution.value / self.ab_zeropoint.value except AttributeError: return convolution / self.ab_zeropoint.value
[docs] def get_ab_magnitude(self, spectrum, wavelength=None, axis=-1): """Calculate a spectrum's AB magnitude. Use :meth:`get_ab_maggies` for the corresponding dimensionless ratio :math:`F[R,f_\lambda] / F[R,f_{\lambda,0}]`. The magnitude is calculated as: .. math: -2.5 \log_{10}(F[R,f_\lambda] / F[R,f_{\lambda,0}]) Parameters ---------- spectrum : callable or array or :class:`astropy.units.Quantity` See :meth:`get_ab_maggies` for details. wavelength : array or :class:`astropy.units.Quantity` or None See :meth:`get_ab_maggies` for details. axis : int See :meth:`get_ab_maggies` for details. Returns ------- float or array """ maggies = self.get_ab_maggies(spectrum, wavelength, axis) return -2.5 * np.log10(maggies)
_pad_methods = ('median', 'zero', 'edge')
[docs] def pad_spectrum(self, spectrum, wavelength, axis=-1, method='median'): """Pad a tabulated spectrum to cover this filter's response. This is a convenience method that pads an input spectrum (or spectra) so that it covers this filter's response and can then be used for convolutions and magnitude calculations: >>> rband = load_filter('decam2014-r') >>> wlen = np.arange(4000, 10000) >>> flux = np.ones((100, len(wlen))) >>> mag = rband.get_ab_magnitude(flux, wlen) Traceback (most recent call last): ... ValueError: Wavelengths do not cover decam2014-r response 3330.0-10990.0 Angstrom. >>> flux, wlen = rband.pad_spectrum(flux, wlen) >>> mag = rband.get_ab_magnitude(flux, wlen) This method does not attempt to perform a physically motivated extrapolation, so should only be used under special circumstances. An appropriate use would be to extend a spectrum to cover wavelengths where the filter response is almost zero or the spectrum is known to be essentially flat. The three padding methods implemented here are borrowed from :func:`numpy.pad`: - median: Pad with the median flux value. - zero: Pad with a zero flux value. - edge: Pad with the edge flux values. If the results from these methods are not in good agreement, you should probably not be padding your spectrum. Parameters ---------- spectrum : array or :class:`astropy.units.Quantity` See :meth:`get_ab_maggies` for details. This method does not check for valid flux density units, if any are specified. wavelength : array or :class:`astropy.units.Quantity` or None See :meth:`get_ab_maggies` for details. axis : int See :meth:`get_ab_maggies` for details. method : str Must be one of 'median', 'zero', or 'edge'. Returns ------- tuple A tuple (padded_spectrum, padded_wavelength) that replaces the inputs with padded equivalents. """ if method not in self._pad_methods: raise ValueError( "Invalid method '{0}'. Pick one of {1}." .format(method, self._pad_methods)) wavelength_value = validate_wavelength_array(wavelength) num_pad_before, num_pad_after = 0, 0 if self._wavelength[0] < wavelength_value[0]: num_pad_before = len( np.where(self._wavelength < wavelength_value[0])[0]) if self._wavelength[-1] > wavelength_value[-1]: num_pad_after = len( np.where(self._wavelength > wavelength_value[-1])[0]) if num_pad_before == 0 and num_pad_after == 0: # Return the inputs. return spectrum, wavelength padded_wavelength = np.hstack(( self._wavelength[:num_pad_before], wavelength_value, self._wavelength[len(self._wavelength) - num_pad_after:])) try: # Restore the input wavelength units, if any. wavelength_unit = wavelength.unit padded_wavelength = ( padded_wavelength * default_wavelength_unit).to(wavelength_unit) except AttributeError: pass try: # Remove and remember the input flux units, if any. spectrum_unit = spectrum.unit spectrum_value = spectrum.value except AttributeError: spectrum_unit = None spectrum_value = spectrum padding = [(0, 0)] * len(spectrum_value.shape) padding[axis] = (num_pad_before, num_pad_after) if method == 'median': pad_mode='median' elif method == 'zero': pad_mode='constant' # use default constant_values = 0. elif method == 'edge': pad_mode='edge' padded_spectrum = np.pad(spectrum_value, padding, mode=pad_mode) if spectrum_unit is not None: # Restore the input flux units, if any. padded_spectrum = padded_spectrum * spectrum_unit return padded_spectrum, padded_wavelength
[docs] class FilterConvolution(object): """Convolve a filter response with a tabulated function. See :ref:`above <convolution-operator>` for details on how the convolution operator implemented by this class is defined, and :ref:`here <sampling>` for details on how the convolution integrand is sampled. Most of the computation involved depends only on the tabulated function's wavelength grid, and not on the function values, so this class does the necessary initialization in its constructor, resulting in a function object that can be efficiently re-used with different function values. Use this class to efficiently perform repeated convolutions of different tabulated functions for the same filter on the same wavelength grid. When efficiency is not important or the wavelength grid changes for each convolution, the convencience method :meth:`FilterResponse.convolve_with_array` can be used instead. Parameters ---------- response : :class:`FilterResponse` or str A FilterResponse object or else a fully qualified name of the form "<group_name>-<band_name>", which will be loaded using :func:`load_filter`. wavelength : array A :func:`valid array <validate_wavelength_array>` of wavelengths that must cover the full range of the filter response. photon_weighted : bool Use :ref:`weights <weights>` appropriate for a photon-counting detector such as a CCD when this parameter is True. Otherwise, use unit weights. interpolate : bool Allow interpolation of the tabulated function if necessary. Interpolation is required if two or more of the wavelengths where the filter response is tabulated fall between a consecutive pair of input wavelengths. Linear interpolation is then performed to estimate the input function at the undersampled filter response wavelengths. Interpolation has a performance impact when :meth:`evaluating <__call__>` a convolution, so is not enabled by default and can usually be avoided with finer sampling of the input function. units : astropy.units.Quantity or None When this parameter is not None, then any explicit units attached to the values passed to a :meth:`convolution <__call__>` must be convertible to these units. When values are passed without units they are assumed to be in these units. Attributes ---------- response : :class:`FilterResponse` The filter response used for this convolution. input_units : :class:`astropy.units.Unit` Units expected for values passed to :meth:`__call__`. output_units : :class:`astropy.units.Unit` Units of :meth:`__call__` result. num_wavelength : int The number of wavelengths used to tabulate input functions. response_grid : numpy.ndarray Array of filter response values evaluated at each ``wavelength``. response_slice : slice Slice of the input wavelength grid used for convolution. interpolate_wavelength : numpy.ndarray or None Array of wavelengths where the input function will be interpolated. interpolate_response : numpy.ndarray or None Array of filter response values at each ``interpolate_wavelength``. interpolate_sort_order : numpy.ndarray or None Integer array specifying the sort order required to combine ``wavelength`` and ``interpolate_wavelength``. quad_wavelength : numpy.ndarray Array of wavelengths used for numerical quadrature, combining both ``wavelength`` and ``interpolate_wavelength``. quad_weight : :class:`astropy.units.Quantity` or None Array of weights corresponding to each ``quad_wavelength``. Will be None if the parameter ``photon_weighted = False``. """ def __init__(self, response, wavelength, photon_weighted=True, interpolate=False, units=None): if isinstance(response, str): self._response = load_filter(response) else: self._response = response self._wavelength = validate_wavelength_array(wavelength, min_length=2) self.num_wavelength = len(self._wavelength) # Check if extrapolation would be required. under = (self._wavelength[0] > self._response._wavelength[0]) over = (self._wavelength[-1] < self._response._wavelength[-1]) if under or over: raise ValueError( 'Wavelengths do not cover {0} response {1:.1f}-{2:.1f} {3}.' .format(self._response.name, self._response._wavelength[0], self._response._wavelength[-1], default_wavelength_unit)) # Find the smallest slice that covers the non-zero range of the # integrand. start, stop = 0, len(self._wavelength) if self._wavelength[0] < self._response._wavelength[0]: start = np.where( self._wavelength <= self._response._wavelength[0])[0][-1] if self._wavelength[-1] > self._response._wavelength[-1]: stop = 1 + np.where( self._wavelength >= self._response._wavelength[-1])[0][0] # Trim the wavelength grid if possible. self._response_slice = slice(start, stop) if start > 0 or stop < len(self._wavelength): self._wavelength = self._wavelength[self._response_slice] # Linearly interpolate the filter response to our wavelength grid. self._response_grid = self._response(self._wavelength) # Test if our grid is samples the response with sufficient density. Our # criterion is that at most one internal response wavelength (i.e., # excluding the endpoints which we treat separately) falls between each # consecutive pair of our wavelength grid points. insert_index = np.searchsorted( self._wavelength, self._response._wavelength[1:]) undersampled = np.diff(insert_index) == 0 if np.any(undersampled): undersampled = 1 + np.where(undersampled)[0] if interpolate: # Interpolate at each undersampled wavelength. self.interpolate_wavelength = ( self._response._wavelength[undersampled]) self.interpolate_response = self._response.response[undersampled] self.quad_wavelength = np.hstack( [self._wavelength, self.interpolate_wavelength]) self.interpolate_sort_order = np.argsort(self.quad_wavelength) self.quad_wavelength = self.quad_wavelength[ self.interpolate_sort_order] else: raise ValueError( 'Wavelengths undersample the response ' + 'and interpolate is False.') else: self.interpolate_wavelength = None self.interpolate_response = None self.interpolate_sort_order = None self.quad_wavelength = self._wavelength # Replace the quadrature endpoints with the actual filter endpoints # to eliminate any overrun. if self.quad_wavelength[0] < self._response._wavelength[0]: self.quad_wavelength[0] = self._response._wavelength[0] if self.quad_wavelength[-1] > self._response._wavelength[-1]: self.quad_wavelength[-1] = self._response._wavelength[-1] if photon_weighted: # Precompute the weights to use. self.quad_weight = self.quad_wavelength / _hc_constant.value else: self.quad_weight = None # Save the expected input value units. self.input_units = units if self.input_units is not None: # Calculate the output value units. if photon_weighted: self.output_units = self.input_units * _photon_weighted_unit else: self.output_units = self.input_units * default_wavelength_unit else: self.output_units = None
[docs] def __call__(self, values, axis=-1, method='trapz', plot=False): """Evaluate the convolution for arbitrary tabulated function values. Parameters ---------- values : array or :class:`astropy.units.Quantity` Array of function values to use. Values are assumed to be tabulated on our wavelength grid. Values can be multidimensional, in which case an array of convolution results is returned. If values have units, then these are propagated to the result. axis : int In case of multidimensional function values, this specifies the index of the axis corresponding to the wavelength dimension. method : str Specifies the numerical integration scheme to use and must be either 'trapz' or 'simps', to select the corresponding ``scipy.integration`` function. The 'simps' method may be more accurate than the default 'trapz' method, but should be used with care since it is also less robust and more sensitive to the wavelength grids. plot : bool Displays a plot illustrating how the convolution integrand is constructed. Requires that the matplotlib package is installed and does not support multidimensional input values. This option is primarily intended for debugging and to generate figures for the documentation. Returns ------- float or numpy.ndarray or :class:`astropy.units.Quantity` Convolution integral result. If the input values have units then these are propagated to the result, including the units associated with the photon weights (if these are selected). Otherwise, the result will be a plain float or numpy array. If the input is multidimensional, then so is the result but with the specified axis integrated out. """ if method not in _filter_integration_methods.keys(): raise ValueError( 'Invalid method "{0}", pick one of {1}.' .format(method, _filter_integration_methods.keys())) # Check that input units are compatible with expected units # and initialize array of values without units. try: values_no_units = values.to(self.input_units).value input_has_units = True except AttributeError: # Input values have no units, so we assume that they are in # self.input_units if these are defined, or else that the caller # knows what they are doing. values_no_units = np.asarray(values) input_has_units = False except TypeError: # The input values have units but self.input_units is None. raise ValueError( 'Must specify expected units for values with units.') except astropy.units.UnitConversionError: raise ValueError( 'Values units {0} not convertible to {1}.' .format(values.unit, self.input_units)) # Check values shape and slice out the subarray that overlaps # the filter we are convolving with. if values_no_units.shape[axis] != self.num_wavelength: raise ValueError( 'Expected {0} values along axis {1}.' .format(len(self._wavelength), axis)) values_slice = [slice(None)] * len(values_no_units.shape) values_slice[axis] = self._response_slice values_no_units = values_no_units[tuple(values_slice)] if plot: if len(values_no_units.shape) != 1: raise ValueError( 'Cannot plot convolution of multidimensional values.') import matplotlib.pyplot as plt ##fig, left_axis = plt.subplots() # Plot the filter response using the left-hand axis. plt.plot(self._response._wavelength, self._response.response, 'rx-') plt.ylim(0., 1.1 * np.max(self._response.response)) plt.xlabel('Wavelength (A)') plt.ylabel( '{0}-{1} Filter Response'.format( self._response.meta['group_name'], self._response.meta['band_name'])) # Use the right-hand axis for the data being filtered. right_axis = plt.twinx() # A kludge to include the left-hand axis label in our legend. right_axis.plot([], [], 'r.-', label='filter') # Plot the input values using the right-hand axis. right_axis.set_ylabel(r'Integrand $dg/d\lambda \cdot R$') right_axis.plot( self._wavelength, values_no_units, 'bs-', label='input') right_axis.set_ylim(0., 1.1 * np.max(values_no_units)) # Multiply values by the response. response_shape = np.ones_like(values_no_units.shape, dtype=int) response_shape[axis] = len(self._response_grid) integrand = values_no_units * self._response_grid.reshape(response_shape) if self.interpolate_wavelength is not None: # Interpolate the input values. interpolator = scipy.interpolate.interp1d( self._wavelength, values_no_units, axis=axis, kind='linear', copy=False, bounds_error=True) interpolated_values = interpolator(self.interpolate_wavelength) if plot: # Show the interpolation locations. plt.scatter( self.interpolate_wavelength, interpolated_values, s=30, marker='o', edgecolor='b', facecolor='none', label='interpolated') # Multiply interpolated values by the response. response_shape[axis] = len(self.interpolate_wavelength) interpolated_integrand = ( interpolated_values * self.interpolate_response.reshape(response_shape)) # Update the integrand with the interpolated values. integrand = np.concatenate( (integrand, interpolated_integrand), axis=axis) # Resort by wavelength. values_slice[axis] = self.interpolate_sort_order integrand = integrand[tuple(values_slice)] if plot: # Plot integrand before applying weights, so we can re-use # the right-hand axis scale. plt.fill_between( self.quad_wavelength, integrand, color='g', lw=0, alpha=0.25) plt.plot( self.quad_wavelength, integrand, 'g-', alpha=0.5, label='filtered') right_axis.legend(loc='center right') xpad = 0.05 * ( self.quad_wavelength[-1] - self.quad_wavelength[0]) plt.xlim(self._wavelength[0] - xpad, self._wavelength[-1] + xpad) if self.quad_weight is not None: # Apply weights. response_shape[axis] = len(self.quad_weight) integrand *= self.quad_weight.reshape(response_shape) integrator = _filter_integration_methods[method] integral = integrator( y=integrand, x=self.quad_wavelength, axis=axis) if input_has_units: # Apply the output units. integral = integral * self.output_units return integral
[docs] class FilterSequence(collections.abc.Sequence): """Immutable sequence of filter responses. Sequences should normally be created by calling :func:`load_filters`. Objects implement the `immutable sequence <https://docs.python.org/2/library/collections.html #collections-abstract-base-classes>`__ API, in addition to the methods listed here. A filter sequence's :meth:`get_ab_maggies` and :meth:`get_ab_magnitudes` methods return their results in a :class:`Table <astropy.table.Table>` and are convenient for calculating convolutions in several bands for multiple spectra. For example, given the following 4 (identical) spectra covering the SDSS filters: >>> num_spectra, num_pixels = 4, 500 >>> wlen = np.linspace(2000, 12000, num_pixels) * default_wavelength_unit >>> flux = np.ones((num_spectra, num_pixels)) * 1e-17 * default_flux_unit We can now calculate their magnitudes in all bands with one function: >>> sdss = load_filters('sdss2010-*') >>> mags = sdss.get_ab_magnitudes(flux, wlen) Refer to the :mod:`astropy docs <astropy.table>` for details on working with Tables. For example, to pretty-print the magnitudes with a precision of 0.001: >>> formats = dict((n, '%.3f') for n in sdss.names) >>> mags.write(None, format='ascii.fixed_width', formats=formats) | sdss2010-u | sdss2010-g | sdss2010-r | sdss2010-i | sdss2010-z | | 22.316 | 21.731 | 21.138 | 20.718 | 20.344 | | 22.316 | 21.731 | 21.138 | 20.718 | 20.344 | | 22.316 | 21.731 | 21.138 | 20.718 | 20.344 | | 22.316 | 21.731 | 21.138 | 20.718 | 20.344 | Parameters ---------- responses : iterable Response objects to include in this sequence. Objects are copied to an internal list. Attributes ---------- names : list of str List of the filter response names in this sequence, with the format "<group_name>-<band_name>". effective_wavelengths : astropy.units.Quantity List of the effective wavelengths for the filter responses in this sequence, with the default wavelength units. """ def __init__(self, responses): self._responses = list(responses) def __contains__(self, item): return item in self._responses def __len__(self): return len(self._responses) def __iter__(self): return iter(self._responses) def __getitem__(self, key): return self._responses[key] @property def names(self): return [r.name for r in self] @property def effective_wavelengths(self): return [ r.effective_wavelength.value for r in self ] * default_wavelength_unit def _get_table(self, spectrum, wavelength=None, axis=-1, mask_invalid=False, method=None): """Helper method to avoid duplicating code. Used by :meth:`get_ab_magnitudes` and :meth:`get_ab_maggies`. Parameters are identical to those methods except for an additional ``method`` parameter that specifies what method of :class:`FilterReponse` to call to calculate table entries. """ missing_column = None t = astropy.table.Table(meta=dict( description='Created by speclite <speclite.readthedocs.io>'), masked=mask_invalid) for r in self: try: data = method(r, spectrum, wavelength, axis) if data.shape == (): data = [data] column = astropy.table.Column(name=r.name, data=data) except ValueError as e: if not mask_invalid: raise e # Create a missing column the first time we need it. if missing_column is None: column_shape = np.sum(spectrum, axis=axis).shape missing_data = np.zeros(column_shape, dtype=float) if missing_data.shape == (): missing_data = [missing_data] missing_column = astropy.table.MaskedColumn( data=missing_data, mask=True, fill_value=np.nan) column = missing_column column.name = r.name t.add_column(column) return t
[docs] def get_ab_maggies(self, spectrum, wavelength=None, axis=-1, mask_invalid=False): """Calculate a spectrum's relative AB flux convolutions. Calls :meth:`FilterResponse.get_ab_maggies` for each filter in this sequence and returns the results in a structured array. Use :meth:`get_ab_magnitudes` for the corresponding AB magnitudes. Parameters ---------- spectrum : callable or array or :class:`astropy.units.Quantity` See :meth:`get_ab_maggies` for details. wavelength : array or :class:`astropy.units.Quantity` or None See :meth:`get_ab_maggies` for details. axis : int See :meth:`get_ab_maggies` for details. mask_invalid : bool When True, if an error occurs while calculating results for one filter (usually because of insufficient wavelength coverage), the corresponding output column will be filled with missing values. Otherwise, any error raises an exception. Returns ------- astropy.table.Table A table of results with column names corresponding to canonical filter names of the form "<group_name>-<band_name>". If the input spectrum data is multidimensional, its first index is mapped to rows of the returned table. """ return self._get_table(spectrum, wavelength, axis, mask_invalid, method=FilterResponse.get_ab_maggies)
[docs] def get_ab_magnitudes(self, spectrum, wavelength=None, axis=-1, mask_invalid=False): """Calculate a spectrum's AB magnitude. Calls :meth:`FilterResponse.get_ab_magnitude` for each filter in this sequence and returns the results in a structured array. Parameters ---------- spectrum : callable or array or :class:`astropy.units.Quantity` See :meth:`get_ab_magnitude` for details. wavelength : array or :class:`astropy.units.Quantity` or None See :meth:`get_ab_magnitude` for details. axis : int See :meth:`get_ab_magnitude` for details. mask_invalid : bool When True, if an error occurs while calculating results for one filter (usually because of insufficient wavelength coverage), the corresponding output column will be filled with missing values. Otherwise, any error raises an exception. Returns ------- astropy.table.Table A table of results with column names corresponding to canonical filter names of the form "<group_name>-<band_name>". If the input spectrum data is multidimensional, its first index is mapped to rows of the returned table. """ return self._get_table(spectrum, wavelength, axis, mask_invalid, method=FilterResponse.get_ab_magnitude)
[docs] def pad_spectrum(self, spectrum, wavelength, axis=-1, method='median'): """Pad a spectrum to cover all filter responses. Calls :meth:`FilterResponse.pad_spectrum` for each filter in this sequence, in order of increasing effective wavelength. Parameters and caveats for this method are the same. Parameters ---------- spectrum : array or :class:`astropy.units.Quantity` See :meth:`FilterResponse.pad_spectrum` for details. wavelength : array or :class:`astropy.units.Quantity` or None See :meth:`FilterResponse.pad_spectrum` for details. axis : int See :meth:`FilterResponse.pad_spectrum` for details. method : str See :meth:`FilterResponse.pad_spectrum` for details. Returns ------- tuple A tuple (padded_spectrum, padded_wavelength) that replaces the inputs with padded equivalents. """ sorted_responses = [resp for (wlen, resp) in sorted(zip(self.effective_wavelengths, self))] padded_spectrum = np.asanyarray(spectrum) padded_wavelength = np.asanyarray(wavelength) for response in sorted_responses: padded_spectrum, padded_wavelength = response.pad_spectrum( padded_spectrum, padded_wavelength) return padded_spectrum, padded_wavelength
[docs] def load_filters(*names): """Load a sequence of filters by name. For example, to load all the ``sdss2010`` filters: >>> sdss = load_filters('sdss2010-*') >>> sdss.names ['sdss2010-u', 'sdss2010-g', 'sdss2010-r', 'sdss2010-i', 'sdss2010-z'] Names are intepreted according to the following rules: - A canonical name of the form "<group_name>-<band_name>" loads the corresponding single standard filter response. - A group name with a wildcard, "<group_name>-\*", loads all the standard filters in a group, ordered by increasing effective wavelength. - A filename ending with the ".ecsv" extension loads a custom filter response from the specified file. Note that custom filters must be specified individually, even if they all belong to the same group. Parameters ---------- \\*names Variable length list of names to include. Each name must be in one of the formats described above. Returns ------- FilterSequence An immutable :class:`FilterSequence` object containing the requested filters in the order they were specified. """ # Replace any group wildcards with the corresponding canonical names. filters_path = get_path_of_data_file('filters/') names_to_load = [] for name in names: group_match = _group_wildcard.match(name) if group_match: # Check that the group name is recognized. group_name = group_match.group(1) if group_name not in filter_group_names: raise ValueError( "No such group '{0}'. Choose one of {1}." .format(group_name, filter_group_names)) # Scan data/filters/ for bands in this group. band_names = [] band_weff = [] file_names = glob.glob( os.path.join(filters_path, '{0}.ecsv'.format(name))) for file_name in file_names: full_name, _ = os.path.splitext(os.path.basename(file_name)) band_names.append(full_name) response = load_filter(full_name) band_weff.append(response.effective_wavelength) # Add bands in order of increasing effective wavelength. names_to_load.extend( [name for (weff, name) in sorted(zip(band_weff, band_names))]) else: if '*' in name: raise ValueError( "Invalid wildcard pattern '{0}'. Use '<group>-*'." .format(name)) names_to_load.append(name) # Load filters and return them wrapped in a FilterSequence. responses = [] for name in names_to_load: responses.append(load_filter(name)) return FilterSequence(responses)
[docs] def load_filter(name, load_from_cache=True, verbose=False): """Load a single filter response by name. See :doc:`/filters` for details on the filter response file format and the available standard filters. A filter response is normally only loaded from disk the first time this function is called, and subsequent calls immediately return the same cached object. Use the ``verbose`` option for details on how a filter is loaded: >>> rband = load_filter('sdss2010-r') >>> rband = load_filter('sdss2010-r', verbose=True) Returning cached filter response "sdss2010-r" Use :func:`load_filters` to load a sequence of filters from one or more filter groups. Parameters ---------- name : str Name of the filter response to load, which should normally have the format "<group_name>-<band_name>", and refer to one of the reference filters described :doc:`here </filters>`. Otherwise, the name of any file in the `ECSV format <https://github.com/astropy/astropy-APEs/blob/master/APE6.rst>`__ and containing the required fields can be provided. The existence of the ".ecsv" extension is used to distinguish between these cases and any other extension is considered an error. load_from_cache : bool Return a previously cached response object if available. Otherwise, always load the file from disk. verbose : bool Print verbose information about how this filter is loaded. Returns ------- FilterResponse A :class:`FilterResponse` object for the requested filter. Raises ------ ValueError File does not exist or custom file has wrong extension. RuntimeError File is incorrectly formatted. This should never happen for the files included in the source code distribution. """ if load_from_cache and name in _filter_cache: if verbose: print('Returning cached filter response "{0}"'.format(name)) return _filter_cache[name] # Is this a non-standard filter file? base_name, extension = os.path.splitext(name) if extension not in ('', '.ecsv'): raise ValueError( 'Invalid extension for filter file name: "{0}".'.format(extension)) if extension: file_name = name if not os.path.isfile(file_name): raise ValueError('No such filter file "{0}".'.format(file_name)) else: # Validate the name. valid = _full_name_pattern.match(name) if not valid: raise ValueError( "Invalid filter name '{0}'. Use '<group>-<band>'.".format(name)) if valid.group(1) not in filter_group_names: raise ValueError( "No such group '{0}'. Choose one of {1}." .format(valid.group(1), filter_group_names)) file_name = get_path_of_data_file( 'filters/{0}.ecsv'.format(name)) if not os.path.isfile(file_name): raise ValueError("No such filter '{0}' in this group.".format(name)) if verbose: print('Loading filter response from "{0}".'.format(file_name)) table = astropy.table.Table.read( file_name, format='ascii.ecsv', guess=False) if 'wavelength' not in table.colnames: raise RuntimeError('Table is missing required wavelength column.') wavelength_column = table['wavelength'] if wavelength_column.unit is None: raise RuntimeError('No wavelength column unit specified.') wavelength = wavelength_column.data * wavelength_column.unit if 'response' not in table.colnames: raise RuntimeError('Table is missing required response column.') response_column = table['response'] if response_column.unit is not None: raise RuntimeError('Response column has unexpected units.') response = response_column.data return FilterResponse(wavelength, response, table.meta)
[docs] def plot_filters(responses, wavelength_unit=None, wavelength_limits=None, wavelength_scale='linear', legend_loc='upper right', legend_ncols=1, response_limits=None, cmap='nipy_spectral'): """Plot one or more filter response curves. The matplotlib package must be installed to use this function. The :meth:`show <matplotlib.pylot.show` method is not called after creating the plot to allow convenient customization and saving. As a result, you will normally need to call this method yourself. Parameters ---------- responses : :class:`FilterSequence` The sequence of filters to plot, normally obtained by calling :func:`load_filters`. wavelength_unit : :class:`astropy.units.Unit` Convert values along the wavelength axis to the specified unit, or leave them as :attr:`default_wavelength_unit` if this parameter is None. wavelength_limits : tuple or None Plot limits to use on the wavelength axis, or select limits automatically if this parameter is None. Units are optional. wavelength_scale : str Scaling to use for the wavelength axis. See :func:`matplotlib.pyplot.yscale` for details. legend_loc : str Location of the legend to plot, or do not display any legend if this value is None. See :func:`matplotlib.pyplot.legend` for details. legend_ncols : int Number of legend columns. Default is 1. response_limits : tuple or None Plot limits to use on the response axis, or select limits automatically if this parameter is None. cmap : str or :class:`matplotlib.colors.Colormap` Color map to use for plotting each filter band. Colors are assigned based on each band's effective wavelength, so a spectral color map (from blue to red) will give nice results. """ if wavelength_unit is None: wavelength_unit = default_wavelength_unit # Look up the range of effective wavelengths for this set of filters. effective_wavelengths = responses.effective_wavelengths min_wlen, max_wlen = min(effective_wavelengths), max(effective_wavelengths) import matplotlib.pyplot as plt import matplotlib.cm as cm cmap = cm.get_cmap(cmap) fig, ax = plt.subplots() plt.xscale(wavelength_scale) if wavelength_limits is not None: try: wlen_min, wlen_max = wavelength_limits except ValueError: raise ValueError('Invalid wavelength limits.') try: wlen_min = wlen_min.to(wavelength_unit).value except astropy.units.UnitConversionError: raise ValueError('Invalid wavelength_unit.') except AttributeError: pass try: wlen_max = wlen_max.to(wavelength_unit).value except astropy.units.UnitConversionError: raise ValueError('Invalid wavelength_unit.') except AttributeError: pass plt.xlim(wlen_min, wlen_max) for response, wlen in zip(responses, effective_wavelengths): if max_wlen > min_wlen: # Use an approximate spectral color for each band. c = cmap(0.1 + 0.8 * (wlen - min_wlen) / (max_wlen - min_wlen)) else: c = 'green' wlen = response._wavelength * default_wavelength_unit try: wlen = wlen.to(wavelength_unit) except astropy.units.UnitConversionError: raise ValueError('Invalid wavelength_unit.') plt.fill_between(wlen.value, response.response, color=c, alpha=0.25) plt.plot(wlen.value, response.response, color=c, alpha=0.5, label=response.name) if response_limits is None: plt.ylim(0, None) else: plt.ylim(response_limits) plt.xlabel('Wavelength [{0}]'.format(wavelength_unit)) plt.ylabel('Filter Response') if legend_loc is not None: if legend_ncols is None: legend_ncols = 1 plt.legend(loc=legend_loc, ncol=legend_ncols) plt.grid()
[docs] def filter_sampling_explanatory_plot(save=None): """Generate an explanatory plot for the documentation. Requires that the matplotlib package is installed. The generated figure appears in the :ref:`sampling` section. Parameters ---------- save : str or None Name of the file where this plot should be saved. """ import matplotlib.pyplot as plt fig = plt.figure(figsize=(15, 4)) # wlen = [4500, 7400] * default_wavelength_unit rconv = FilterConvolution( 'bessell-V', wlen, interpolate=True, units=default_flux_unit) flux = [1., 1.] * default_flux_unit plt.subplot(1, 3, 1) c1 = rconv(flux, plot=True).cgs # wlen = np.linspace(4500, 7400, 30) * default_wavelength_unit rconv = FilterConvolution('bessell-V', wlen, units=default_flux_unit) flux = np.ones_like(wlen.value) * default_flux_unit plt.subplot(1, 3, 2) c2 = rconv(flux, plot=True).cgs # wlen = np.linspace(4500, 7400, 9) * default_wavelength_unit rconv = FilterConvolution( 'bessell-V', wlen, interpolate=True, units=default_flux_unit) flux = np.ones_like(wlen.value) * default_flux_unit plt.subplot(1, 3, 3) c3 = rconv(flux, plot=True).cgs # print('c2-c1 error = {0:.3f}%, c3-c1 error = {1:.3f}%' .format(1e2 * abs(c2 - c1) / c1, 1e2 * abs(c3 - c1) / c1)) # plt.tight_layout() if save: plt.savefig(save)