Source code for laspec.mrs
__all__ = ["MrsSpec", "MrsEpoch", "MrsFits", "MrsSource", "debad", "SOL_kms"]
import glob
import os
import warnings
import matplotlib.pyplot as plt
import numpy as np
from astropy import constants
from astropy.io import fits
from astropy.table import Table
from scipy.signal import medfilt
from scipy.signal.windows import gaussian
from .normalization import normalize_spectrum_general
from .time import datetime2jd, jd2bjd
SOL_kms = constants.c.value / 1000.0
warnings.filterwarnings("ignore")
[docs]
def debad(
wave, fluxnorm, nsigma=(4, 8), mfarg=21, gfarg=(51, 9), maskconv=7, maxiter=3
):
"""
Parameters
----------
wave:
wavelength
fluxnorm:
normalized flux
nsigma:
lower & upper sigma levels
mfarg:
median filter width / pixel
gfarg:
Gaussian filter length & width / pixel
maskconv:
mask convolution --> cushion
maxiter:
max iteration
Return
------
fluxnorm
"""
npix = len(fluxnorm)
indclip = np.zeros_like(fluxnorm, dtype=bool)
countiter = 0
while True:
# median filter
fluxmf = medfilt(fluxnorm, mfarg)
# gaussian filter
gk = gaussian(5, 0.5)
gk /= np.sum(gk)
fluxmf = np.convolve(fluxmf, gk, "same")
# residuals
fluxres = fluxnorm - fluxmf
# gaussian filter --> sigma
gk = gaussian(*gfarg)
gk /= np.sum(gk)
fluxsigma = np.convolve(np.abs(fluxres), gk, "same")
# clip
indout = np.logical_or(
fluxres > fluxsigma * nsigma[0], fluxres < -fluxsigma * nsigma[1]
)
indclip |= indout
if np.sum(indclip) > 0.5 * npix:
raise RuntimeError("Too many bad pixels!")
countiter += 1
if np.sum(indout) == 0 or countiter >= maxiter:
return fluxnorm
else:
indout = np.convolve(indout * 1.0, np.ones((maskconv,)), "same") > 0
fluxnorm = np.interp(wave, wave[~indout], fluxnorm[~indout])
[docs]
class MrsSpec:
"""MRS spectrum"""
name = ""
# original quantities
wave = np.array([], dtype=float)
flux = np.array([], dtype=float)
ivar = np.array([], dtype=float)
mask = np.array([], dtype=bool) # True for problematic
flux_err = np.array([], dtype=float)
indcr = np.array([], dtype=float) # cosmic ray index
# normalized quantities
flux_norm = np.array([], dtype=float)
flux_cont = np.array([], dtype=float)
ivar_norm = np.array([], dtype=float)
flux_norm_err = np.array([], dtype=float)
# other information (optional)
info = {}
rv = 0.0
# time and position info
filename = ""
snr = 0
exptime = 0
lmjm = 0
lmjmlist = []
obsid = 0
seeing = 0.0
lamplist = ""
ra = 0.0
dec = 0.0
fibertype = ""
fibermask = 0.0
jdbeg = 0
jdend = 0
jdmid = 0
jdltt = 0.0
bjdmid = 0.0
# status
isempty = True
isnormalized = False
# default settings for normalize_spectrum_iter / normlize_spectrum_poly
norm_type = None
norm_kwargs = {}
[docs]
def meta(self):
return dict(
extname=self.extname,
snr=self.snr,
exptime=self.exptime,
lmjm=self.lmjm,
lamplist=None,
# time and position info
filename="",
obsid=0,
seeing=0,
ra=0.0,
dec=0.0,
fibertype=self.fibertype,
fibermask=self.fibermask,
jdbeg=self.jdbeg,
jdend=self.jdend,
jdmid=self.jdmid,
jdltt=self.jdltt,
bjdmid=self.bjdmid,
)
def __init__(
self,
wave=None,
flux=None,
ivar=None,
mask=None,
info={},
norm_type="spline",
**norm_kwargs,
):
"""a general form of spectrum
Parameters
----------
wave:
array, wavelength
flux:
array, flux
ivar:
array, ivar
mask:
array(int), 1 for problematic
info:
dict, information of this spectrum
norm_type:
poly/spline/None, if set, normalize spectrum after initialization
norm_kwargs:
normalization settings passed to normalize_spectrum_general()
"""
# set data
if wave is not None and flux is not None:
self.wave, self.flux = wave, flux
self.isempty = False
else:
# a null spec
self.wave = np.array([], dtype=float)
self.flux = np.array([], dtype=float)
self.isempty = True
# ivar and mask is optional for spec
if ivar is None:
self.ivar = np.ones_like(self.flux, dtype=float)
else:
self.ivar = ivar
if mask is None:
self.mask = np.zeros_like(self.flux, dtype=bool)
self.npix_bad = 0
else:
self.mask = mask
self.npix_bad = np.sum(self.mask > 0)
# flux_err
flux_err = self.ivar**-0.5
self.flux_err = np.where(np.isfinite(flux_err), flux_err, np.nan)
# set info
for k, v in info.items():
self.__setattr__(k, v)
self.info = info
# normalize spectrum
self.norm_type = norm_type
self.norm_kwargs = norm_kwargs
if norm_type in ["poly", "spline"]:
# normalize
self.normalize(norm_type=norm_type, **norm_kwargs)
self.isnormalized = True
return
def __repr__(self):
return "<MrsSpec name={} snr={:.1f}>".format(self.name, self.snr)
[docs]
@staticmethod
def from_hdu(hdu=None, norm_type=None, **norm_kwargs):
"""convert MRS HDU to spec"""
if hdu is None or hdu.header["EXTNAME"] == "Information":
return MrsSpec()
else:
# convert to Table
spec = Table(hdu.data)
if "LOGLAM" in spec.colnames:
# this is old format, until DR9 v0
spec.sort("LOGLAM")
if "COADD" in hdu.name:
# it's coadded spec
wave = 10 ** spec["LOGLAM"].data
flux = spec["FLUX"].data
ivar = spec["IVAR"].data
mask = spec["ORMASK"].data # use ormask for coadded spec
elif hdu.name.startswith("B-") or hdu.name.startswith("R-"):
# it's epoch spec
wave = 10 ** spec["LOGLAM"].data
flux = spec["FLUX"].data
ivar = spec["IVAR"].data
mask = spec["PIXMASK"].data # use pixmask for epoch spec
elif "WAVELENGTH" in spec.colnames:
# new format, since DR9 v1
if "COADD" in hdu.name:
# it's coadded spec
wave = spec["WAVELENGTH"][0]
flux = spec["FLUX"][0]
ivar = spec["IVAR"][0]
mask = spec["ORMASK"][0] # use ormask for coadded spec
elif hdu.name.startswith("B-") or hdu.name.startswith("R-"):
# it's epoch spec
wave = spec["WAVELENGTH"][0]
flux = spec["FLUX"][0]
ivar = spec["IVAR"][0]
mask = spec["PIXMASK"][0] # use pixmask for epoch spec
else:
raise ValueError("@MrsFits: error in reading epoch spec!")
# get meta info
info = dict(
name=get_kwd_safe(hdu.header, "EXTNAME", ""),
lmjm=int(get_kwd_safe(hdu.header, "LMJM", 0)),
exptime=float(get_kwd_safe(hdu.header, "EXPTIME", 0.0)),
snr=float(get_kwd_safe(hdu.header, "SNR", 0.0)),
lamplist=get_kwd_safe(hdu.header, "LAMPLIST", ""),
)
# initiate MrsSpec
ms = MrsSpec(
wave, flux, ivar, mask, info=info, norm_type=norm_type, **norm_kwargs
)
# calculate bjdmid
ms.jdbeg = datetime2jd(
hdu.header["DATE-BEG"], format="isot", tz_correction=8
)
ms.jdend = datetime2jd(
hdu.header["DATE-END"], format="isot", tz_correction=8
)
ms.jdmid = (ms.jdbeg + ms.jdend) / 2.0
ms.bjdmid = jd2bjd(ms.ra, ms.dec, ms.jdmid)
return ms
[docs]
@staticmethod
def from_mrs(fp_mrs, hduname="COADD_B", norm_type=None, **norm_kwargs):
"""read from MRS fits file"""
hl = fits.open(fp_mrs)
ms = MrsSpec.from_hdu(hl[hduname], norm_type=norm_type, **norm_kwargs)
return ms
[docs]
@staticmethod
def from_lrs(fp_lrs, norm_type="spline", **norm_kwargs):
"""read from LRS fits file"""
hl = fits.open(fp_lrs)
hdr = hl[0].header
try:
flux, ivar, wave, andmask, ormask = hl[0].data
except:
# dr9
flux = hl[1].data["FLUX"][0]
ivar = hl[1].data["IVAR"][0]
wave = hl[1].data["WAVELENGTH"][0]
# andmask = hl[1].data["ANDMASK"][0]
ormask = hl[1].data["ORMASK"][0]
# info
info = dict(
name=hdr["OBSID"],
obsid=hdr["OBSID"],
ra=hdr["RA"],
dec=hdr["DEC"],
rv=hdr["Z"] * SOL_kms,
rv_err=hdr["Z_ERR"] * SOL_kms,
subclass=hdr["SUBCLASS"],
tsource=hdr["TSOURCE"],
snr=hdr["SNRG"],
snru=hdr["SNRU"],
snrg=hdr["SNRG"],
snrr=hdr["SNRR"],
snri=hdr["SNRI"],
snrz=hdr["SNRZ"],
)
ms = MrsSpec(
wave, flux, ivar, ormask, info=info, norm_type=norm_type, **norm_kwargs
)
# calculate bjd
if hdr["DATE-BEG"] != "" and hdr["DATE-END"] != "":
ms.jdbeg = datetime2jd(hdr["DATE-BEG"], format="isot", tz_correction=8)
ms.jdend = datetime2jd(hdr["DATE-END"], format="isot", tz_correction=8)
ms.jdmid = (ms.jdbeg + ms.jdend) / 2.0
elif hdr["DATE-OBS"] != "":
ms.jdmid = datetime2jd(hdr["DATE-OBS"], format="isot", tz_correction=8)
ms.bjdmid = jd2bjd(ms.ra, ms.dec, ms.jdmid)
return ms
[docs]
def normalize(self, llim=0.0, norm_type=None, **norm_kwargs):
"""normalize spectrum with (optional) new settings"""
if not self.isempty:
# for normal spec
if norm_type is not None:
self.norm_type = norm_type
self.norm_kwargs.update(norm_kwargs)
# normalize spectrum
self.flux_norm, self.flux_cont = normalize_spectrum_general(
self.wave,
np.where(self.flux < llim, llim, self.flux),
self.norm_type,
**self.norm_kwargs,
)
self.ivar_norm = self.ivar * self.flux_cont**2
self.flux_norm_err = self.flux_err / self.flux_cont
else:
self.norm_type = norm_type
self.norm_kwargs.update(norm_kwargs)
# normalize spectrum
self.flux_norm = np.array([], dtype=float)
self.flux_cont = np.array([], dtype=float)
self.ivar_norm = np.array([], dtype=float)
self.flux_norm_err = np.array([], dtype=float)
else:
# for empty spec
# update norm kwargs
self.norm_kwargs.update(norm_kwargs)
# normalize spectrum
self.flux_norm = np.array([], dtype=float)
self.flux_cont = np.array([], dtype=float)
self.ivar_norm = np.array([], dtype=float)
self.flux_norm_err = np.array([], dtype=float)
return
[docs]
def wave_rv(self, rv=None):
"""calculate RV-corrected wavelength array
Parameters
----------
rv: float
radial velocity in km/s
"""
if rv is None:
rv = self.rv
return self.wave / (1 + rv / SOL_kms)
[docs]
def interp(self, new_wave, rv=None):
"""interpolate to a new wavelength grid"""
return np.interp(new_wave, self.wave_rv(rv), self.flux)
[docs]
def interp_then_norm(self, new_wave, rv=None):
"""interpolate to a new wavelength grid"""
flux_interp = np.interp(new_wave, self.wave_rv(rv), self.flux)
flux_norm, flux_cont = normalize_spectrum_general(
new_wave, flux_interp, norm_type=self.norm_type, **self.norm_kwargs
)
flux_norm_err = np.interp(new_wave, self.wave_rv(rv), self.flux_err) / flux_cont
return flux_norm, flux_norm_err
[docs]
def interp_norm(self, new_wave, rv=None):
"""interpolate to a new wavelength grid"""
return np.interp(new_wave, self.wave_rv(rv), self.flux_norm)
[docs]
def reduce(
self,
wave_new=None,
rv=0,
npix_cushion=50,
cr=True,
nsigma=(4, 8),
maxiter=5,
norm_type="spline",
niter=2,
flux_bounds=(0, 3),
):
"""
Parameters
----------
wave_new:
if specified, spectrum is interpolated to wave_new
rv:
if specified, radial velocity is corrected
npix_cushion: int
if speficied, cut the two ends
cr:
if True, remove cosmic rays using the *debad* function
nsigma:
sigma levels used in removing cosmic rays
maxiter:
max number of iterations used in removing cosmic rays
norm_type:
"spline" | None
niter:
number iterations in normalization
Returns
-------
wave_new, flux_norm, flux_norm_err
"""
# determine the chunk range
if npix_cushion > 0:
npix0 = npix_cushion
npix1 = -npix_cushion
else:
npix0 = 0
npix1 = len(self.wave)
# cut spectrum
wave_obs = self.wave[npix0:npix1]
flux_err = self.flux_err[npix0:npix1]
mask = self.mask[npix0:npix1] > 0 # positive for bad pixels
# remove cosmic rays if cr is True, also fill the negative pixels with 0.
if cr:
flux_obs = np.where(self.flux > 0, self.flux, 0.0)
flux_obs = debad(self.wave, flux_obs, nsigma=nsigma, maxiter=maxiter)[
npix0:npix1
]
indcr = (np.abs(flux_obs - self.flux[npix0:npix1]) > 1e-5) * 1
else:
flux_obs = self.flux[npix0:npix1]
flux_obs = np.where(flux_obs > 0, flux_obs, 0.0)
indcr = np.zeros(flux_obs.shape, dtype=int)
# use new wavelength grid if wave_new is specified
wave_obsz0 = wave_obs / (1 + rv / SOL_kms)
if wave_new is None:
wave_new = wave_obsz0
elif len(wave_new) == 2:
# this indicates the new wavelength limits
wave_new = wave_obsz0[
(wave_obsz0 > wave_new[0]) & (wave_obsz0 < wave_new[1])
]
# else: wave_new is specified
flux_obs = np.interp(wave_new, wave_obsz0, flux_obs)
flux_err = np.interp(wave_new, wave_obsz0, flux_err)
mask = 1 * (np.interp(wave_new, wave_obsz0, mask | indcr) > 0)
msr = MrsSpec()
msr.wave = wave_new
msr.flux = flux_obs
msr.ivar = flux_err**-2
msr.flux_err = flux_err
msr.mask = mask
msr.npix_bad = self.npix_bad
msr.indcr = indcr
msr.name = self.name
msr.isempty = self.isempty
# other information (optional)
msr.info = self.info
msr.rv = self.rv
# time and position info
msr.filename = self.filename
msr.snr = self.snr
msr.exptime = self.exptime
msr.lmjm = self.lmjm
msr.lmjmlist = self.lmjmlist
msr.obsid = self.obsid
msr.seeing = self.seeing
msr.lamplist = self.lamplist
msr.ra = self.ra
msr.dec = self.dec
msr.fibertype = self.fibertype
msr.fibermask = self.fibermask
msr.jdbeg = self.jdbeg
msr.jdend = self.jdend
msr.jdmid = self.jdmid
msr.jdltt = self.jdltt
msr.bjdmid = self.bjdmid
# normalize spectrum if norm_type is specified
if norm_type is not None:
msr.normalize(norm_type=norm_type, niter=niter)
return msr
[docs]
class MrsEpoch:
"""MRS epoch spcetrum"""
nspec = 0
speclist = []
specnames = []
# the most important attributes
epoch = -1
lmjm = 0
snr = []
rv = 0.0
# time and position info
filename = ""
obsid = 0
seeing = 0.0
ra = 0.0
dec = 0.0
fibertype = ""
fibermask = 0.0
jdbeg = 0.0
jdend = 0.0
jdmid = 0.0
jdltt = 0.0
jdmid_delta = 0.0
bjdmid = 0.0
wave = np.array([], dtype=float)
flux = np.array([], dtype=float)
ivar = np.array([], dtype=float)
mask = np.array([], dtype=int)
flux_err = np.array([], dtype=float)
flux_norm = np.array([], dtype=float)
ivar_norm = np.array([], dtype=float)
flux_cont = np.array([], dtype=float)
flux_norm_err = np.array([], dtype=float)
# # default settings for normalize_spectrum_iter/poly
norm_kwargs = {}
def __init__(
self, speclist, specnames=("B", "R"), epoch=-1, norm_type=None, **norm_kwargs
):
"""combine B & R to an epoch spectrum
In this list form, it is compatible with even echelle spectra
speclist:
spectrum list
specnames:
the names of spectra, will be used as suffix
epoch:
the epoch of this epoch spectrum
norm_type:
if True, normalize spectra in initialization
norm_kwargs:
the normalization settings
"""
# set epoch
self.epoch = epoch
# update norm kwargs
self.norm_kwargs.update(norm_kwargs)
self.nspec = len(speclist)
# default name is spec order
if specnames is None or len(specnames) == 0:
specnames = [i for i in range(self.nspec)]
self.speclist = speclist
self.specnames = specnames
# store spectrum data
self.snr = [spec.snr for spec in self.speclist]
# normalize spectra
self.normalize(llim=0.0, norm_type=norm_type, **norm_kwargs)
return
def __repr__(self):
s = "[MrsEpoch epoch={} nspec={}]".format(self.epoch, self.nspec)
for i in range(self.nspec):
s += "\n{}".format(self.speclist[i])
return s
[docs]
def normalize(self, llim=0.0, norm_type=None, **norm_kwargs):
"""normalize each spectrum with (optional) new settings"""
# update norm kwargs
self.norm_kwargs.update(norm_kwargs)
# normalize each spectrum
for i_spec in range(self.nspec):
self.speclist[i_spec].normalize(
llim=llim, norm_type=norm_type, **self.norm_kwargs
)
# store each spec
self.__setattr__(
"wave_{}".format(self.specnames[i_spec]), self.speclist[i_spec].wave
)
self.__setattr__(
"flux_{}".format(self.specnames[i_spec]), self.speclist[i_spec].flux
)
self.__setattr__(
"ivar_{}".format(self.specnames[i_spec]), self.speclist[i_spec].ivar
)
self.__setattr__(
"mask_{}".format(self.specnames[i_spec]), self.speclist[i_spec].mask
)
self.__setattr__(
"flux_err_{}".format(self.specnames[i_spec]),
self.speclist[i_spec].flux_err,
)
self.__setattr__(
"flux_norm_{}".format(self.specnames[i_spec]),
self.speclist[i_spec].flux_norm,
)
self.__setattr__(
"ivar_norm_{}".format(self.specnames[i_spec]),
self.speclist[i_spec].ivar_norm,
)
self.__setattr__(
"flux_cont_{}".format(self.specnames[i_spec]),
self.speclist[i_spec].flux_cont,
)
self.__setattr__(
"flux_norm_err_{}".format(self.specnames[i_spec]),
self.speclist[i_spec].flux_norm_err,
)
# combined attributes
self.wave = np.array([], dtype=float)
self.flux = np.array([], dtype=float)
self.ivar = np.array([], dtype=float)
self.mask = np.array([], dtype=int)
self.flux_err = np.array([], dtype=float)
self.flux_norm = np.array([], dtype=float)
self.ivar_norm = np.array([], dtype=float)
self.flux_cont = np.array([], dtype=float)
self.flux_norm_err = np.array([], dtype=float)
# concatenate into one epoch spec
for i_spec in range(self.nspec):
if not self.speclist[i_spec].isempty:
self.wave = np.append(self.wave, self.speclist[i_spec].wave)
self.flux = np.append(self.flux, self.speclist[i_spec].flux)
self.ivar = np.append(self.ivar, self.speclist[i_spec].ivar)
self.mask = np.append(self.mask, self.speclist[i_spec].mask)
self.flux_err = np.append(self.flux_err, self.speclist[i_spec].flux_err)
self.flux_norm = np.append(
self.flux_norm, self.speclist[i_spec].flux_norm
)
self.ivar_norm = np.append(
self.ivar_norm, self.speclist[i_spec].ivar_norm
)
self.flux_cont = np.append(
self.flux_cont, self.speclist[i_spec].flux_cont
)
self.flux_norm_err = np.append(
self.flux_norm_err, self.speclist[i_spec].flux_norm_err
)
return
[docs]
def wave_rv(self, rv=None):
"""
calculate RV-corrected wavelength array
Parameters
----------
rv: float
radial velocity in km/s
"""
if rv is None:
rv = self.rv
return self.wave / (1 + rv / SOL_kms)
[docs]
def flux_norm_dbd(self, **kwargs):
"""return fixed flux_norm"""
return debad(self.wave, self.flux_norm, *kwargs)
[docs]
def plot_reduce(self):
for i in range(self.nspec):
msr = self.speclist[i].reduce(norm_type=None)
msr.plot()
[docs]
def plot_norm_reduce(self, shift=0):
for i in range(self.nspec):
msr = self.speclist[i].reduce(norm_type="spline")
msr.plot_norm(shift=shift)
[docs]
def reduce(self, wave_new_list=None, norm_type="spline", niter=3, **rdc_kwargs):
"""
Parameters
----------
wave_new_list:
new wavelength grid list that will be interpolated to
norm_type:
type of normalization
niter:
number of iteration in normalization
Returns
-------
mer: MrsEpoch
reduced epoch spectrum
"""
if wave_new_list is None:
mer = MrsEpoch(
[self.speclist[i].reduce(**rdc_kwargs) for i in range(self.nspec)],
specnames=self.specnames,
norm_type=norm_type,
niter=niter,
)
else:
assert len(wave_new_list) == self.nspec
mer = MrsEpoch(
[
self.speclist[i].reduce(wave_new=wave_new_list[i], **rdc_kwargs)
for i in range(self.nspec)
],
specnames=self.specnames,
norm_type=norm_type,
niter=niter,
)
# header info
mer.epoch = self.epoch
mer.lmjm = self.lmjm
mer.snr = self.snr
mer.rv = self.rv
# time and position info
mer.filename = self.filename
mer.obsid = self.obsid
mer.seeing = self.seeing
mer.ra = self.ra
mer.dec = self.dec
mer.fibertype = self.fibertype
mer.fibermask = self.fibermask
mer.jdbeg = self.jdbeg
mer.jdend = self.jdend
mer.jdmid = self.jdmid
mer.jdltt = self.jdltt
mer.jdmid_delta = self.jdmid_delta
mer.bjdmid = self.bjdmid
return mer
@property
def exptime(self):
return np.array([_.exptime for _ in self.speclist])
[docs]
class MrsFits(fits.HDUList):
nhdu = 0
hdunames = []
isB = []
isR = []
isCoadd = []
isEpoch = []
ulmjm = []
def __init__(self, fp=None):
"""set file path and read data"""
# check fits existence
if fp is None:
print("@MrsSpec: file path is not set!")
elif not os.path.exists(fp):
raise RuntimeError("@MrsSpec: file not found! ", fp)
else:
self.filepath = fp
# read HDU list
super().__init__(fits.open(fp))
# get HDU names
self.nhdu = len(self)
self.hdunames = [hdu.name for hdu in self]
self.ulmjm = []
self.isB = np.zeros(self.nhdu, dtype=bool)
self.isR = np.zeros(self.nhdu, dtype=bool)
self.isEpoch = np.zeros(self.nhdu, dtype=bool)
self.isCoadd = np.zeros(self.nhdu, dtype=bool)
self.lmjm = np.zeros(self.nhdu, dtype=int)
for i in range(self.nhdu):
if self.hdunames[i].startswith("B-"):
self.isB[i] = True
self.isEpoch[i] = True
self.lmjm[i] = int(self.hdunames[i][2:])
elif self.hdunames[i].startswith("R-"):
self.isR[i] = True
self.isEpoch[i] = True
self.lmjm[i] = int(self.hdunames[i][2:])
elif self.hdunames[i] == "COADD_B":
self.isB[i] = True
self.isCoadd[i] = True
elif self.hdunames[i] == "COADD_R":
self.isR[i] = True
self.isCoadd[i] = True
elif not self.hdunames[i] == "Information":
raise RuntimeError("@MrsSpec: error during processing HDU name")
def __repr__(self):
"""as self.info()
Summarize the info of the HDUs in this `HDUList`.
Note that this function prints its results to the console---it
does not return a value.
"""
if self._file is None:
name = "(No file associated with this HDUList)"
else:
name = self._file.name
results = [
f"Filename: {name}",
"No. Name Ver Type Cards Dimensions Format",
]
format = "{:3d} {:10} {:3} {:11} {:5d} {} {} {}"
default = ("", "", "", 0, (), "", "")
for idx, hdu in enumerate(self):
summary = hdu._summary()
if len(summary) < len(default):
summary += default[len(summary) :]
summary = (idx,) + summary
results.append(format.format(*summary))
return "\n".join(results[1:])
# or get one spec (specify a key)?
[docs]
def get_one_spec(self, lmjm="COADD", band="B"):
if lmjm == "COADD":
k = "COADD_{}".format(band.strip())
else:
k = "{}-{}".format(band.strip(), lmjm)
return MrsSpec.from_hdu(self[k])
[docs]
def get_one_epoch(self, lmjm=84420148, norm_type="spline", **norm_kwargs):
"""get one epoch spec from fits"""
try:
if isinstance(lmjm, str):
assert lmjm == "COADD"
if isinstance(lmjm, int):
assert lmjm in self.lmjm
except AssertionError:
raise AssertionError(
"@MrsFits: lmjm={} is not found in this file!".format(lmjm)
)
if lmjm == "COADD":
kB = "COADD_B"
kR = "COADD_R"
else:
kB = "B-{}".format(lmjm)
kR = "R-{}".format(lmjm)
# read B & R band spec
if kB in self.hdunames:
msB = MrsSpec.from_hdu(self[kB], norm_type=norm_type, **norm_kwargs)
else:
msB = MrsSpec(norm_type=norm_type, **norm_kwargs)
if kR in self.hdunames:
msR = MrsSpec.from_hdu(self[kR], norm_type=norm_type, **norm_kwargs)
else:
msR = MrsSpec(norm_type=norm_type, **norm_kwargs)
# set epoch info
me = MrsEpoch(
(msB, msR),
specnames=("B", "R"),
epoch=lmjm,
norm_type=norm_type,
**norm_kwargs,
)
# set additional infomation
me.filename = self[0].header["FILENAME"]
me.obsid = self[0].header["OBSID"]
me.seeing = self[0].header["SEEING"]
me.ra = self[0].header["RA"]
me.dec = self[0].header["DEC"]
me.fibertype = (
self[0].header["FIBERTYP"] if "FIBERTYP" in self[0].header.keys() else None
)
me.fibermask = (
self[0].header["FIBERMAS"] if "FIBERMAS" in self[0].header.keys() else None
)
try:
if kB in self.hdunames and kR not in self.hdunames:
me.jdbeg = datetime2jd(
self[kB].header["DATE-BEG"], format="isot", tz_correction=8
)
me.jdend = datetime2jd(
self[kB].header["DATE-END"], format="isot", tz_correction=8
)
me.jdmid = (me.jdbeg + me.jdend) / 2.0
me.bjdmid = jd2bjd(me.ra, me.dec, me.jdmid)
elif kB not in self.hdunames and kR in self.hdunames:
me.jdbeg = datetime2jd(
self[kR].header["DATE-BEG"], format="isot", tz_correction=8
)
me.jdend = datetime2jd(
self[kR].header["DATE-END"], format="isot", tz_correction=8
)
me.jdmid = (me.jdbeg + me.jdend) / 2.0
me.bjdmid = jd2bjd(me.ra, me.dec, me.jdmid)
elif kB in self.hdunames and kR in self.hdunames:
# both records
jdbeg_B = datetime2jd(
self[kB].header["DATE-BEG"], format="isot", tz_correction=8
)
jdend_B = datetime2jd(
self[kB].header["DATE-END"], format="isot", tz_correction=8
)
jdbeg_R = datetime2jd(
self[kR].header["DATE-BEG"], format="isot", tz_correction=8
)
jdend_R = datetime2jd(
self[kR].header["DATE-END"], format="isot", tz_correction=8
)
jdmid_B = (jdbeg_B + jdend_B) / 2.0
jdmid_R = (jdbeg_R + jdend_R) / 2.0
jdmid_delta = jdmid_B - jdmid_R
me.jdbeg = jdbeg_B
me.jdend = jdend_B
me.jdmid = jdmid_B
me.bjdmid = jd2bjd(me.ra, me.dec, me.jdmid)
except Exception as ex:
print("Keywords DATE-* not found from file {}!".format(me.filename))
return me
[docs]
def get_all_epochs(self, including_coadd=False, norm_type=None, **norm_kwargs):
# make keys
if not including_coadd:
all_keys = []
else:
all_keys = [
"COADD",
]
all_keys.extend(np.unique(self.lmjm[self.lmjm > 0]))
# return epochs
return [
self.get_one_epoch(k, norm_type=norm_type, **norm_kwargs) for k in all_keys
]
@property
def ls_epoch(self):
return np.unique(self.lmjm[self.lmjm > 0])
@property
def ls_snr(self):
return np.unique(self.snr[self.lmjm > 0])
@property
def epoch(self):
return self.lmjm
@property
def snr(self):
_snr = np.zeros((self.nhdu,), dtype=float)
for i in range(self.nhdu):
if "SNR" in self[i].header.keys():
_snr[i] = self[i].header["SNR"]
return _snr
[docs]
class MrsSource(np.ndarray):
"""array of MrsEpoch instances,"""
mes = [] # MrsEpoch list #
name = "" # source name
@property
def snr(self):
return np.array([_.snr for _ in self], dtype=float)
@property
def epoch(self):
return np.array([_.epoch for _ in self], dtype=float)
@property
def nepoch(self):
return len(self)
@property
def rv(self):
return np.array([_.rv for _ in self], dtype=float)
def __new__(cls, data, name="", norm_type=None, **norm_kwargs):
# prepare
# print(data)
data = np.array(data, dtype=MrsEpoch)
# sort
indsort = np.argsort([_.epoch for _ in data])
data = data[indsort]
# substantiate
msrc = super(MrsSource, cls).__new__(
cls, buffer=data, dtype=data.dtype, shape=data.shape
)
# normalize if necessary
msrc.normalize(norm_type=norm_type, **norm_kwargs)
return msrc
# def __repr__(self):
# s = "{{MrsSource nepoch={} name={}}}\n".format(self.nepoch, self.name)
# for i in range(self.nepoch):
# s += self.mes[i].__repr__().split("\n")[1]
# s += "\n"
# return s
[docs]
@staticmethod
def glob(fmt, norm_type=None, **norm_kwargs):
fps = glob.glob(fmt)
fps.sort()
return MrsSource.read(fps, norm_type=norm_type, **norm_kwargs)
[docs]
@staticmethod
def read(fps, norm_type=None, **norm_kwargs):
mes = []
for fp in fps:
mf = MrsFits(fp)
mes.extend(mf.get_all_epochs(norm_type=norm_type, **norm_kwargs))
return MrsSource(mes, norm_type=norm_type, **norm_kwargs)
[docs]
def normalize(self, norm_type=None, **norm_kwargs):
# normalization
for i in range(self.nepoch):
self[i].normalize(norm_type=norm_type, **norm_kwargs)
return
@property
def jdmid(self):
return np.array([_.__getattribute__("jdmid") for _ in self])
@property
def bjdmid(self):
return np.array([_.__getattribute__("bjdmid") for _ in self])
@property
def jdltt(self):
return np.array([_.__getattribute__("jdltt") for _ in self])
[docs]
def shiftplot(self, shift=1.0):
fig = plt.figure()
for i, me in enumerate(self):
plt.plot(me.wave, me.flux_norm + i * shift)
return fig
def get_kwd_safe(hdr, key, fallback=0.0):
try:
return hdr[key]
except Exception:
return fallback