# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Resample spectra using interpolation.
"""
from __future__ import print_function, division
import numpy as np
import numpy.ma as ma
import scipy.interpolate
import pkg_resources as pkgr
if pkgr.parse_version(np.__version__) >= pkgr.parse_version('1.16'):
import numpy.lib.recfunctions as rfn
def resample(data_in, x_in, x_out, y, data_out=None, kind='linear'):
    """Resample the data of one spectrum using interpolation.

    Dependent variables y1, y2, ... in the input data are resampled in the
    independent variable x using interpolation models y1(x), y2(x), ...
    evaluated on a new grid of x values. The independent variable will
    typically be a wavelength or frequency and the independent variables can
    be fluxes, inverse variances, etc.

    Interpolation is intended for cases where the input and output grids have
    comparable densities. When neighboring samples are correlated, the
    resampling process should be essentially lossless. When the output
    grid is sparser than the input grid, it may be more appropriate to
    "downsample", i.e., average dependent variables over consecutive ranges
    of input samples.

    The basic usage of this function is:

    >>> data = np.ones((5,),
    ... [('wlen', float), ('flux', float), ('ivar', float)])
    >>> data['wlen'] = np.arange(4000, 5000, 200)
    >>> wlen_out = np.arange(4100, 4700, 200)
    >>> out = resample(data, 'wlen', wlen_out, ('flux', 'ivar'))
    >>> np.all(out ==
    ... np.array([(4100, 1.0, 1.0), (4300, 1.0, 1.0), (4500, 1.0, 1.0)],
    ... dtype=[('wlen', '<i8'), ('flux', '<f8'), ('ivar', '<f8')]))
    True

    The input grid can also be external to the structured array of spectral
    data, for example:

    >>> data = np.ones((5,), [('flux', float), ('ivar', float)])
    >>> wlen_in = np.arange(4000, 5000, 200)
    >>> wlen_out = np.arange(4100, 4900, 200)
    >>> out = resample(data, wlen_in, wlen_out, ('flux', 'ivar'))
    >>> np.all(out ==
    ... np.array([(1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0)],
    ... dtype=[('flux', '<f8'), ('ivar', '<f8')]))
    True

    If the output grid extends beyond the input grid, a `masked array
    <http://docs.scipy.org/doc/numpy/reference/maskedarray.html>`__ will be
    returned with any values requiring extrapolation masked:

    >>> wlen_out = np.arange(3500, 5500, 500)
    >>> out = resample(data, wlen_in, wlen_out, 'flux')
    >>> np.all(out.mask ==
    ... np.array([(True,), (False,), (False,), (True,)],
    ... dtype=[('flux', 'bool')]))
    True

    If the input data is masked, any output interpolated values that depend on
    an input masked value will be masked in the output:

    >>> data = ma.ones((5,), [('flux', float), ('ivar', float)])
    >>> data['flux'][2] = ma.masked
    >>> wlen_out = np.arange(4100, 4900, 200)
    >>> out = resample(data, wlen_in, wlen_out, 'flux')
    >>> np.all(out.mask ==
    ... np.array([(False,), (True,), (True,), (False,)],
    ... dtype=[('flux', 'bool')]))
    True

    Interpolation is performed using :class:`scipy.interpolate.interp1d`.

    Parameters
    ----------
    data_in : numpy.ndarray or numpy.ma.MaskedArray
        Structured numpy array of input spectral data to resample. The input
        array must be one-dimensional.
    x_in : string or numpy.ndarray
        A field name in data_in containing the independent variable to use
        for interpolation, or else an array of values with the same shape
        as the input data.
    x_out : numpy.ndarray
        An array of values for the independent variable where interpolation
        models should be evaluated to calculate the output values.
    y : string or iterable of strings.
        A field name or a list of field names present in the input data that
        should be resampled by interpolation and included in the output.
    data_out : numpy.ndarray or None
        Structured numpy array where the output result should be written. If
        None is specified, then an appropriately sized array will be allocated
        and returned. Use this method to take control of the memory allocation
        and, for example, re-use the same array when resampling many spectra.
    kind : string or integer
        Specify the kind of interpolation models to build using any of the
        forms allowed by :class:`scipy.interpolate.interp1d`. If any input
        dependent values are masked, only the ``nearest`` and ``linear``
        values are allowed.

    Returns
    -------
    numpy.ndarray or numpy.ma.MaskedArray
        Structured numpy array of the resampled result containing all ``y``
        fields and (if ``x_in`` is specified as a string) the output ``x``
        field. The output will be a :class:`numpy.ma.MaskedArray` if ``x_out``
        extends beyond ``x_in`` or if ``data_in`` is masked.

    Raises
    ------
    ValueError
        If any input has an invalid type or shape, a named field is missing,
        ``x_in`` is masked, the ``y`` fields have mixed types, ``data_out``
        does not match the required shape/dtype, or ``kind`` is not supported
        (in particular for masked input data).
    """
    # Validate the input spectral data: must be a 1D structured array.
    if not isinstance(data_in, np.ndarray):
        raise ValueError('Invalid data_in type: {0}.'.format(type(data_in)))
    if data_in.dtype.fields is None:
        raise ValueError('Input data_in is not a structured array.')
    if len(data_in.shape) > 1:
        raise ValueError('Input data_in is multidimensional.')
    # Resolve the independent variable: either a field of data_in (remember
    # its name so it can be propagated to the output) or an external array.
    if isinstance(x_in, str):
        if x_in not in data_in.dtype.names:
            raise ValueError('No such x_in field: {0}.'.format(x_in))
        x_out_name = x_in
        x_in = data_in[x_in]
    else:
        if not isinstance(x_in, np.ndarray):
            raise ValueError('Invalid x_in type: {0}.'.format(type(x_in)))
        if x_in.shape != data_in.shape:
            raise ValueError('Incompatible shapes for x_in and data_in.')
        x_out_name = None
    if not isinstance(x_out, np.ndarray):
        # Fixed: previously reported type(data_out) here, which made the
        # error message misleading (it named the wrong argument).
        raise ValueError('Invalid x_out type: {0}.'.format(type(x_out)))
    if ma.isMA(x_in) and np.any(x_in.mask):
        raise ValueError('Cannot resample masked x_in.')
    # Verify that the input and output grid dtypes are compatible; this
    # raises if they cannot be promoted to a common type. (The promoted
    # type itself is not otherwise used.)
    x_type = np.promote_types(x_in.dtype, x_out.dtype)
    # Build the output dtype: the x field first (if named), then each y field.
    dtype_out = []
    if x_out_name is not None:
        dtype_out.append((x_out_name, x_out.dtype))
    if isinstance(y, str):
        # Use a list instead of a tuple here so y_names can be used
        # to index data_in below.
        y_names = [y,]
    else:
        try:
            y_names = [name for name in y]
        except TypeError:
            raise ValueError('Invalid y type: {0}.'.format(type(y)))
    # Check each y field exists and that all share a single dtype, which is
    # required to view/copy them as one homogeneous 2D array below.
    for not_first, y in enumerate(y_names):
        if y not in data_in.dtype.names:
            raise ValueError('No such y field: {0}.'.format(y))
        if not_first:
            if data_in[y].dtype != y_type:
                raise ValueError('All y fields must have the same type.')
        else:
            y_type = data_in[y].dtype
        dtype_out.append((y, y_type))
    y_shape = (len(y_names),)
    if ma.isMA(data_in):
        # Copy the structured 1D array into a 2D unstructured array
        # and set masked values to NaN so interp1d propagates them.
        # NOTE(review): for an integer y_type, filling with NaN silently
        # casts to an arbitrary integer and masking will not propagate —
        # confirm callers only pass floating-point masked fields.
        y_in = np.zeros(data_in.shape + y_shape, y_type)
        for i,y in enumerate(y_names):
            y_in[:,i] = data_in[y].filled(np.nan)
    else:
        if np.lib.NumpyVersion(np.__version__) >= '1.16.0':
            # The view-based slicing below does not work in numpy 1.16 and
            # above, so use structured_to_unstructured to get the columns
            # we care about. (NumpyVersion replaces the deprecated
            # pkg_resources.parse_version used previously.)
            y_in = rfn.structured_to_unstructured(
                data_in[y_names]).reshape(data_in.shape + y_shape)
        else:
            y_in = data_in[y_names]
            # View the structured 1D array as a 2D unstructured array (without
            # copying any memory).
            y_in = y_in.view(y_type).reshape(data_in.shape + y_shape)
    # interp1d will only propagate NaNs correctly for certain values of `kind`:
    # only 'nearest' and 'linear' are guaranteed here. (The original comment
    # attributed this to "numpy = 1.6 or 1.7" — presumably scipy versions;
    # TODO confirm.)
    if np.any(np.isnan(y_in)):
        if kind not in ('nearest', 'linear'):
            raise ValueError(
                'Interpolation kind not supported for masked data: {0}.'
                .format(kind))
    try:
        # Out-of-range x_out values are filled with NaN (bounds_error=False)
        # and converted to a mask below.
        interpolator = scipy.interpolate.interp1d(
            x_in, y_in, kind=kind, axis=0, copy=False,
            bounds_error=False, fill_value=np.nan)
    except NotImplementedError:
        raise ValueError('Interpolation kind not supported: {0}.'.format(kind))
    # Allocate the output array, or validate the caller-provided one.
    shape_out = (len(x_out),)
    if data_out is None:
        data_out = np.empty(shape_out, dtype_out)
    else:
        if data_out.shape != shape_out:
            raise ValueError(
                'data_out has wrong shape: {0}. Expected: {1}.'
                .format(data_out.shape, shape_out))
        if data_out.dtype != dtype_out:
            raise ValueError(
                'data_out has wrong dtype: {0}. Expected: {1}.'
                .format(data_out.dtype, dtype_out))
    if x_out_name is not None:
        data_out[x_out_name][:] = x_out
    # Evaluate all interpolation models on the output grid in one call.
    y_out = interpolator(x_out)
    for i,y in enumerate(y_names):
        data_out[y][:] = y_out[:,i]
    # Mask any NaN results (extrapolation or masked inputs) in the output.
    if ma.isMA(data_in) or np.any(np.isnan(y_out)):
        data_out = ma.MaskedArray(data_out)
        data_out.mask = False
        for y in y_names:
            data_out[y].mask = np.isnan(data_out[y].data)
    return data_out