# -*- coding: utf-8 -*-
# ###########################################################################
# Enlightening example:
#
# for one pixel:
# R_hi = 2000 @5000A FWHM_hi = 2.5A
# R_lo = 500 @5000A FWHM_lo = 10.0A
# FWHM_GK = sqrt(10.0**2 - 2.5**2) = 9.68A
#
# for next pixel:
# R_hi = 1000 @5000A FWHM_hi = 5.0A
# R_lo = 500 @5000A FWHM_lo = 10.0A
# FWHM_GK = sqrt(10.0**2 - 5.0**2) = 8.66A
#
# Therefore, to keep the number of pixels of Gaussian Kernels the same,
# we have to adjust the FWHM_interp (and thus the R_interp).
#
# For pixels 2000->500, the FWHM_GK = 9.68 (wider GK),
# while for pixels 1000-> 500, FWHM_GK = 8.66 (narrower GK),
# thus the R_interp should be 9.68/8.66=1.12 times higher,
# i.e., the delta_lambda should be 1./1.12 times smaller.
# ###########################################################################
import datetime
import numpy as np
from astropy.table import Table
from scipy import signal
from scipy.interpolate import interp1d
# ########################################################################### #
# ################ transformation between R and FWHM ###################### #
# ########################################################################### #
[docs]
def resolution2fwhm(R, wave=5000.0):
# assert R is positive
if np.isscalar(R):
assert R > 0.0
else:
assert np.all(R > 0.0)
# assert wave is correct in shape
assert np.isscalar(wave) or len(wave) == len(R)
return wave / R
[docs]
def fwhm2resolution(fwhm, wave=5000.0):
# assert fwhm is positive
if np.isscalar(fwhm):
assert fwhm > 0.0
else:
assert np.all(fwhm > 0.0)
# assert wave is correct in shape
assert np.isscalar(wave) or len(wave) == len(fwhm)
return wave / fwhm
# ########################################################################### #
# ###################### generate wave array given R ###################### #
# ########################################################################### #
def _generate_wave_array_R_fixed(wave_start, wave_stop, R=2000.0, over_sample=1.0):
"""generate wave array given a fixed R"""
R_ = over_sample * R - 0.5
# determine wave_step_min
wave_step_min = wave_start / R_
# wave guess
wave_step_guess = np.zeros(int((wave_stop - wave_start) / wave_step_min))
wave_guess = np.zeros_like(wave_step_guess)
wave_step_guess[0] = wave_step_min
wave_guess[0] = wave_start
# iterate for real
for i in np.arange(1, len(wave_guess)):
wave_guess[i] = wave_guess[i - 1] + wave_step_guess[i - 1]
wave_step_guess[i] = wave_guess[i] / R_
return wave_guess[np.logical_and(wave_guess >= wave_start, wave_guess <= wave_stop)]
def _generate_wave_array_R_func(
wave_start, wave_stop, R=(lambda x: x), over_sample=1.0, wave_test_step=1.0
):
"""generate wave array given R as a function"""
# determine wave_step_min
wave_test = np.arange(wave_start, wave_stop, wave_test_step)
R_test = over_sample * R(wave_test)
wave_step_min = np.min(wave_test / R_test)
# wave guess
wave_guess = np.zeros(int(np.ceil((wave_stop - wave_start) / wave_step_min)))
wave_guess[0] = wave_start
# iterate for real # single side R !!!
for i in np.arange(1, len(wave_guess)):
wave_guess[i] = wave_guess[i - 1] + wave_guess[i - 1] / (
over_sample * R(wave_guess[i - 1])
)
return wave_guess[np.logical_and(wave_guess >= wave_start, wave_guess <= wave_stop)]
[docs]
def generate_wave_array_R(
wave_start, wave_stop, R=2000.0, over_sample=1.0, wave_test_step=1.0
):
"""generate a wavelength array matching the given R
Parameters
----------
wave_start: float
start from this wavelength
wave_stop: float
stop at this wavelength
R: float or function
specify a fixed R or specify R as a function of wavelength
over_sample: float
over-sampling rate, default is 1.
wave_test_step:
used to determing the wave_step_min
Returns
-------
wave: array
an array matching the given R
Example
-------
>>> def R(x): return 0.2*x
>>> wave_array_R = generate_wave_array_R(4000., 5000., R)
"""
if np.isscalar(R):
# if R is scalar
return _generate_wave_array_R_fixed(
wave_start, wave_stop, R=R, over_sample=over_sample
)
else:
# if R is a function / Interpolator
return _generate_wave_array_R_func(
wave_start,
wave_stop,
R=R,
over_sample=over_sample,
wave_test_step=wave_test_step,
)
# ########################################################################### #
# ################ generate wave array given delta_lambda ################# #
# ########################################################################### #
def _generate_wave_array_delta_lambda_fixed(
wave_start, wave_stop, delta_lambda, over_sample=1.0
):
"""generate a wavelength array matching the given delta_lambda (fixed)"""
return np.arange(wave_start, wave_stop, delta_lambda / over_sample)
def _generate_wave_array_delta_lambda_func(
wave_start,
wave_stop,
delta_lambda=(lambda x: 1.0),
over_sample=1.0,
wave_test_step=1.0,
):
"""generate a wavelength array matching the given delta_lambda
(specified as a function of wavelength)"""
wave_test = np.arange(wave_start, wave_stop, wave_test_step)
delta_lambda_min = np.min(delta_lambda(wave_test))
wave_guess = np.arange(wave_start, wave_stop, delta_lambda_min)
for i in range(1, len(wave_guess)):
wave_guess[i] = (
wave_guess[i - 1] + delta_lambda(wave_guess[i - 1]) / over_sample
)
return wave_guess[np.logical_and(wave_guess >= wave_start, wave_guess <= wave_stop)]
[docs]
def generate_wave_array_delta_lambda(
wave_start,
wave_stop,
delta_lambda=(lambda x: 1.0),
over_sample=1.0,
wave_test_step=1.0,
):
"""generate a wavelength array matching the given delta_lambda a function of wavelength
Parameters
----------
wave_start: float
where the wavelength starts
wave_stop: float
where the wavelength stops
delta_lambda: float or function
specifies the delta_lambda as a fixed number or a function of wavelength
over_sample: float
over-sampling
wave_test_step: float
tests for the smallest wave_guess step
Returns
-------
wave_guess: array
the guess of wavelength array
Example
-------
>>> def dl(x): return 0.002*x
>>> wave_array_dl = generate_wave_array_delta_lambda(4000., 5000., dl)
"""
if np.isscalar(delta_lambda):
# if delta_lambda is scalar
return _generate_wave_array_delta_lambda_fixed(
wave_start, wave_stop, delta_lambda=delta_lambda, over_sample=over_sample
)
else:
# if delta_lambda is a function / Interpolator
return _generate_wave_array_delta_lambda_func(
wave_start,
wave_stop,
delta_lambda=delta_lambda,
over_sample=over_sample,
wave_test_step=wave_test_step,
)
# ########################################################################### #
# ############## find spectral R (FWHM) and R_max (FWHM_min) ################ #
# ########################################################################### #
[docs]
def find_R_for_wave_array(wave):
"""find the R of wavelength array (sampling resolution array)"""
wave = wave.flatten()
wave_diff = np.diff(wave)
wave_diff_ = (wave_diff[1:] + wave_diff[:-1]) / 2.0
return np.hstack(
(wave[0] / wave_diff[0], wave[1:-1] / wave_diff_, wave[-1] / wave_diff[-1])
)
[docs]
def find_R_max_for_wave_array(wave):
"""find the maximum sampling resolution of a given wavelength array"""
return np.max(find_R_for_wave_array(wave))
[docs]
def find_delta_lambda_for_wave_array(wave):
"""find the delta_lambda of wavelength array (delta_lambda array)"""
wave = wave.flatten()
wave_diff = np.diff(wave)
wave_diff_ = (wave_diff[1:] + wave_diff[:-1]) / 2.0
return np.hstack((wave_diff[0], wave_diff_, wave[-1]))
[docs]
def find_delta_lambda_min_for_wave_array(wave):
"""find the minimum delta_lambda of a given wavelength array"""
return np.min(find_delta_lambda_for_wave_array(wave))
# ########################################################################### #
# ############################### find Rgk ################################ #
# ########################################################################### #
[docs]
def find_Rgk(R_hi=2000.0, R_lo=500.0, over_sample=1.0):
"""find Rgk as a function of wavelength
Parameters
----------
R_hi: float or funtion
higher resolution (as a function of wavelength)
R_lo: float or funtion
lower resolution (as a function of wavelength)
over_sample: float
over-sampled resolution, default is 1.
Returns
-------
Rgk: function
Gaussian Kernel resolution as a function of wavelength
"""
if np.isscalar(R_hi):
# if R_hi is a fixed number
R_hi_ = lambda x: R_hi
else:
R_hi_ = R_hi
if np.isscalar(R_lo):
# if R_lo is a fixed number
R_lo_ = lambda x: R_lo
else:
R_lo_ = R_lo
Rgk = lambda x: (
over_sample * x / np.sqrt((x / R_lo_(x)) ** 2.0 - (x / R_hi_(x)) ** 2.0)
)
return Rgk
# ########################################################################### #
# ######################### Gaussian Kernel ################################ #
# ########################################################################### #
[docs]
def fwhm2sigma(fwhm):
return fwhm / (2.0 * np.sqrt(2.0 * np.log(2.0)))
[docs]
def sigma2fwhm(sigma):
return 2.0 * np.sqrt(2.0 * np.log(2.0)) * sigma
[docs]
def normalized_gaussian_array(x, b=0.0, c=1.0):
# a = 1. / (np.sqrt(2*np.pi) * c)
ngs_arr = np.exp(-((x - b) ** 2.0) / (2.0 * c**2.0))
return ngs_arr / np.sum(ngs_arr)
[docs]
def generate_gaussian_kernel_array(over_sample_Rgk, sigma_num):
"""generate gaussian kernel array according to over_sample_Rgk
Parameters
----------
over_sample_Rgk: float
over_sample rate
sigma_num: float
1 sigma of the Gaussian = sigma_num pixels
Returns
-------
normalized gaussian array
"""
sigma_pixel_num = fwhm2sigma(over_sample_Rgk)
array_length = np.fix(sigma_num * sigma_pixel_num)
if array_length % 2 == 0:
array_length += 1
array_length_half = (array_length - 1) / 2.0
xgs = np.arange(-array_length_half, array_length_half + 1)
return normalized_gaussian_array(xgs, b=0.0, c=sigma_pixel_num)
# most general case
[docs]
def conv_spec(
wave,
flux,
R_hi=2000.0,
R_lo=500.0,
over_sample_additional=3.0,
gaussian_kernel_sigma_num=5.0,
wave_new=None,
wave_new_oversample=3.0,
verbose=True,
return_type="array",
):
"""to convolve high-R spectrum to low-R spectrum
Parameters
----------
wave: array
wavelength
flux: array
flux array
R_hi: float or function
higher resolution
R_lo: float or function
lower resolution
over_sample_additional: float
additional over-sample rate
gaussian_kernel_sigma_num: float
the gaussian kernel width in terms of sigma
wave_new: None or float or array
if None: wave_new auto-generated using wave_new_oversample
if float: this specifies the over-sample rate
if voctor: this specifies the new wave_new array
wave_new_oversample:
if wave_new is None, use auto-generated wave_new_oversample
verbose: bool
if True, print the details on the screen
return_type: string
if 'array': return wave and flux as array
if 'table': retrun spec object
Returns
-------
wave_new, flux_new OR Table([wave, flux])
"""
if verbose:
start = datetime.datetime.now()
print("===============================================================")
print("@laspec.convolution.conv_spec: starting at {}".format(start))
# 1. re-format R_hi & R_lo
assert R_hi is not None and R_lo is not None
if np.isscalar(R_hi):
R_hi_ = lambda x: R_hi
else:
R_hi_ = R_hi
if np.isscalar(R_lo):
R_lo_ = lambda x: R_lo
else:
R_lo_ = R_lo
# 2. find Rgk
Rgk = find_Rgk(R_hi_, R_lo_, over_sample=1.0)
# 3. find appropriate over_sample
R_hi_specmax = find_R_max_for_wave_array(wave)
R_hi_max = np.max(R_hi_(wave))
over_sample = over_sample_additional * np.fix(
np.max([R_hi_specmax / Rgk(wave), R_hi_max / Rgk(wave)])
)
# 4. find wave_interp & flux_interp
if verbose:
print(
"@laspec.convolution.conv_spec: interpolating orignal spectrum to wave_interp ..."
)
wave_max = np.max(wave)
wave_min = np.min(wave)
wave_interp = generate_wave_array_R(
wave_min, wave_max, Rgk, over_sample=over_sample
)
# P = PchipInterpolator(wave, flux, extrapolate=None)
P = interp1d(wave, flux, kind="linear", bounds_error=False, fill_value=np.nan)
flux_interp = P(wave_interp)
assert not np.any(np.isnan(flux_interp))
# 5. generate Gaussian Kernel array
if verbose:
print("@laspec.convolution.conv_spec: generating gaussian kernel array ...")
gk_array = generate_gaussian_kernel_array(over_sample, gaussian_kernel_sigma_num)
# gk_len = len(gk_array)
# gk_len_half = int((gk_len - 1) / 2.)
# 6. convolution
if verbose:
print("@laspec.convolution.conv_spec: convolving ...")
# convolved_flux = np.convolve(flux_interp, gk_array)[gk_len_half:-gk_len_half]
# convolved_flux = np.convolve(flux_interp, gk_array, mode="same")
convolved_flux = signal.fftconvolve(flux_interp, gk_array, mode="same")
# 7. find new wave array
if wave_new is None:
# wave_new is None
# default: 5 times over-sample
if verbose:
print(
"@laspec.convolution.conv_spec: using default 5 times over-sample wave array ..."
)
wave_new = generate_wave_array_R(
wave_interp[0], wave_interp[-1], R_lo, wave_new_oversample
)
elif np.isscalar(wave_new):
# wave_new specifies the new wave array over_sampling_lo rate
# default is 5. times over-sample
if verbose:
print(
"@laspec.convolution.conv_spec: using user-specified {:.2f} times over-sample wave array ...".format(
wave_new
)
)
wave_new = generate_wave_array_R(
wave_interp[0], wave_interp[-1], R_lo, wave_new
)
else:
# wave_new specified
if verbose:
print("@laspec.convolution.conv_spec: using user-specified wave array ...")
# 8. interpolate convolved flux to new wave array
if verbose:
print(
"@laspec.convolution.conv_spec: interpolating convolved spectrum to new wave array ..."
)
# P = PchipInterpolator(wave_interp, convolved_flux, extrapolate=False)
P = interp1d(
wave_interp,
convolved_flux,
kind="linear",
bounds_error=False,
fill_value=np.nan,
)
flux_new = P(wave_new)
if verbose:
stop = datetime.datetime.now()
print(
"@laspec.convolution.conv_spec: total time spent: {:.2f} seconds".format(
(stop - start).total_seconds()
)
)
print("===============================================================")
if return_type == "array":
return wave_new, flux_new
elif return_type == "table":
return Table([wave_new, flux_new], names=["wave", "flux"])
[docs]
def read_phoenix_sun():
"""read PHOENIX synthetic spectrum for the Sun"""
import laspec
from astropy.io import fits
flux_sun = fits.open(
laspec.__path__[0]
+ "/data/phoenix/lte05800-4.50-0.0.PHOENIX-ACES-AGSS-COND-2011-HiRes.fits"
)[0].data
wave_sun = fits.open(
laspec.__path__[0] + "/data/phoenix/WAVE_PHOENIX-ACES-AGSS-COND-2011.fits"
)[0].data
return wave_sun, flux_sun
[docs]
def test_conv_phoenix_sun():
"""testing convolving PHOENIX synthetic spectrum for the Sun"""
import matplotlib.pyplot as plt
wave_sun, flux_sun = read_phoenix_sun()
ind_optical = (wave_sun > 4000) & (wave_sun < 7000)
wave = wave_sun[ind_optical]
flux = flux_sun[ind_optical]
wave_conv, flux_conv = conv_spec(wave, flux, R_hi=3e5, R_lo=2000)
plt.figure()
plt.plot(wave, flux)
plt.plot(wave_conv, flux_conv)
return
if __name__ == "__main__":
test_conv_phoenix_sun()