Source code for laspec.ccf

"""
RVM without weights.


Vectorized functions
--------------------
xcorr_spec_vectorized(rv_grid, wave_obs, flux_obs, wave_mod, flux_mod)


Un-vectorized functions
-----------------------
cov(x1, x2)
├── xcorr(x1, x2)
|    ├── xcorr_spec(rv, wave_obs, flux_obs, wave_mod, flux_mod) -> float
|    |   ├── xcorr_spec_cost(rv, wave_obs, flux_obs, wave_mod, flux_mod)-> float
|    |   ├── xcorr_spec_rvgrid
|    |       ├── RVM.ccf_1mod
|    |       ├── RVM.measure
|    |       ├── RVM.measure_pw
|    ├── xcorr_spec_twin(rv1, drv, eta, wave_obs, flux_obs, wave_mod, flux_mod) -> float
|    ├── xcorr_spec_twin_gamma
|    ├── nxcorr_spec_twin_gamma
|    ├── nxcorr_spec_twin_gamma_BR
|    ├── xcorr_spec_twin_gamma_rv1grid
|        ├── No usage found
|    ├── xcorr_spec_binary(rv1, rv2, eta, wave_obs, flux_obs, wave_mod1, flux_mod1, wave_mod2, flux_mod2) -> float
|        ├── xcorr_spec_binary_cost(rv1_rv2_eta, wave_obs, flux_obs, wave_mod1, flux_mod1, wave_mod2, flux_mod2, eta_lim, drvmax) -> float
|            ├── xcorr_spec_binary_rv2grid

"""

__all__ = ["RVM", "SOL_kms"]

import datetime
import os
import warnings
from collections import OrderedDict
from typing import Optional, Union, Iterable

import joblib
import numpy as np
import numpy.typing as npt
from astropy import constants
from matplotlib import pyplot as plt
from scipy.optimize import minimize, OptimizeResult

from .mrs import MrsFits

SOL_kms = constants.c.value / 1000.0


# ================================================
# sine bell curves, from Tonry & Davis (1979)
# ================================================
def sinebell(n: int = 1000, index: float = 0.5) -> npt.NDArray:
    """sine bell to left & right end of spectra"""
    return np.sin(np.linspace(0, np.pi, n)) ** index


def sinebell_like(x, index: float = 0.5) -> npt.NDArray:
    return sinebell(len(x), index=index)


def test_sinebell() -> None:
    plt.figure()
    plt.plot(sinebell(4000, 1))
    plt.plot(sinebell(4000, 1 / 2))
    plt.plot(sinebell(4000, 1 / 3))
    return


def test_sinebell2() -> tuple[npt.NDArray, npt.NDArray]:
    """load data"""
    wave, flux, flux_err = np.loadtxt("/hydrogen/projects/song/delCep_order20.dat").T
    # flux_sine = flux - flux.mean()
    flux_sine = 1 - flux
    flux_sine = flux_sine * sinebell_like(flux, 1.0)

    plt.figure()
    plt.plot(wave, (flux - 1))
    plt.plot(wave, flux_sine)
    # plot(wave, flux_err)
    return wave, flux_sine


# ================================================
# CCF related functions
# ================================================
def cov(x1: npt.NDArray, x2: npt.NDArray) -> npt.NDArray:
    """Evaluate covariance."""
    return np.mean((x1 - np.mean(x1)) * (x2 - np.mean(x2)))


def xcorr(x1: npt.NDArray, x2: npt.NDArray) -> float:
    """Evaluate cross-correlation."""
    return cov(x1, x2) / np.sqrt(cov(x1, x1) * cov(x2, x2))


def xcorr_spec(
    rv: float,
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
) -> float:
    """Evaluate cross-correlation of two spectra."""
    flux_mod_interp = np.interp(wave_obs, wave_mod * (1 + rv / SOL_kms), flux_mod)
    return xcorr(flux_obs, flux_mod_interp)


def xcorr_spec_cost(
    rv: float,
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
) -> float:
    """Negative `xcorr_spec`, used as cost function for minimization"""
    return -xcorr_spec(rv, wave_obs, flux_obs, wave_mod, flux_mod)


def xcorr_spec_vectorized(
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
    rv_grid: npt.NDArray,
) -> npt.NDArray:
    """Vectorized cross-correlation at an `rv_grid`."""
    # determine shape
    npix = len(wave_obs)
    nmod = flux_mod.shape[0]
    nrv = len(rv_grid)
    # make model cube
    flux_mod_interp = np.zeros((nmod, nrv, npix), dtype=float)
    for imod in range(nmod):
        for irv in range(nrv):
            flux_mod_interp[imod, irv, :] = np.interp(
                wave_obs, wave_mod * (1 + rv_grid[irv] / SOL_kms), flux_mod[imod]
            )
    mean_obs = np.mean(flux_obs)
    mean_mod = np.mean(flux_mod_interp, axis=2)
    res0 = flux_obs - mean_obs
    res1 = flux_mod_interp - mean_mod[:, :, None]
    # underscore means it is not normalized
    _cov00 = np.sum(res0**2.0)  # var(F, F) float
    _cov11 = np.sum(res1**2.0, axis=2)  # var(G, G)  (nmod, nrv)
    _cov01 = np.sum(res0.reshape(1, 1, -1) * res1, axis=2)  # cov(F, G) (nmod, nrv)
    ccf_grid = _cov01 / np.sqrt(_cov00) / np.sqrt(_cov11)
    return ccf_grid


def xcorr_spec_twin(
    rv1: float,
    drv: float,
    eta: float,
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
) -> float:
    """Evaluate cross-correlation of two spectra (twin case) with luminosity ratio `eta`."""
    flux_mod_interp = np.interp(
        wave_obs, wave_mod * (1 + rv1 / SOL_kms), flux_mod
    ) + eta * np.interp(wave_obs, wave_mod * (1 + (rv1 + drv) / SOL_kms), flux_mod)
    return xcorr(flux_obs, flux_mod_interp)


def xcorr_spec_binary(
    rv1: float,
    rv2: float,
    eta: float,
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod1: npt.NDArray,
    flux_mod1: npt.NDArray,
    wave_mod2: npt.NDArray,
    flux_mod2: npt.NDArray,
) -> float:
    """Evaluate cross-correlation of two spectra (binary case)."""
    # combine two templates
    flux_mod_interp = np.interp(
        wave_obs, wave_mod1 * (1 + rv1 / SOL_kms), flux_mod1
    ) + eta * np.interp(wave_obs, wave_mod2 * (1 + rv2 / SOL_kms), flux_mod2)
    return xcorr(flux_obs, flux_mod_interp)


def derive_rv2(
    gamma: Union[float, npt.NDArray],
    q: Union[float, npt.NDArray],
    rv1: Union[float, npt.NDArray],
) -> Union[float, npt.NDArray]:
    """
    Derive `rv2` from `gamma`, `q`, and `rv1` for a binary system.

    Parameters
    ----------
    gamma : Union[float, npt.NDArray]
        RVs of center of mass
    q : Union[float, npt.NDArray]
        mass ratio = m2/m1
    rv1 : Union[float, npt.NDArray]
        RVs of primary star

    Returns
    -------
    rv2
        RVs of secondary star
    """
    return gamma * (1 + 1 / q) - rv1 / q


def xcorr_spec_twin_gamma(
    rv1: float,
    gamma: float,
    eta: float,
    q: float,
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
) -> float:
    """Evaluate cross-correlation of two spectra (twin case)"""
    rv2 = derive_rv2(gamma, q, rv1)
    # combine two templates
    flux_mod_interp = np.interp(
        wave_obs, wave_mod * (1 + rv1 / SOL_kms), flux_mod
    ) + eta * np.interp(wave_obs, wave_mod * (1 + rv2 / SOL_kms), flux_mod)
    return xcorr(flux_obs, flux_mod_interp)


def nxcorr_spec_twin_gamma(
    rv1: float,
    gamma: float,
    eta: float,
    q: float,
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
) -> float:
    """Negative cross-correlation of two spectra (twin case)."""
    rv2 = derive_rv2(gamma, q, rv1)
    # combine two templates
    flux_mod_interp = np.interp(
        wave_obs, wave_mod * (1 + rv1 / SOL_kms), flux_mod
    ) + eta * np.interp(wave_obs, wave_mod * (1 + rv2 / SOL_kms), flux_mod)
    return -xcorr(flux_obs, flux_mod_interp)


def nxcorr_spec_twin_gamma_BR(
    rv1: float,
    gamma: float,
    eta: float,
    q: float,
    wave_B: npt.NDArray,
    flux_B: npt.NDArray,
    wave_R: npt.NDArray,
    flux_R: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
) -> float:
    """Sum of negative cross-correlation of two spectra (twin case) for B & R."""
    rv2 = derive_rv2(gamma, q, rv1)
    # combine two templates
    flux_mod_B = np.interp(
        wave_B, wave_mod * (1 + rv1 / SOL_kms), flux_mod
    ) + eta * np.interp(wave_B, wave_mod * (1 + rv2 / SOL_kms), flux_mod)
    flux_mod_R = np.interp(
        wave_R, wave_mod * (1 + rv1 / SOL_kms), flux_mod
    ) + eta * np.interp(wave_R, wave_mod * (1 + rv2 / SOL_kms), flux_mod)
    return -np.nansum((xcorr(flux_B, flux_mod_B), xcorr(flux_R, flux_mod_R)))


def xcorr_spec_twin_gamma_rv1grid(
    rv1_grid: npt.NDArray,
    gamma: float,
    eta: float,
    q: float,
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
) -> float:
    """cross-correlation of two spectra (twin case)"""
    rv2 = derive_rv2(gamma, q, rv1_grid)
    # combine two templates
    xcorr_results = np.zeros_like(rv1_grid, dtype=float)
    for irv in range(len(rv1_grid)):
        flux_mod_interp = np.interp(
            wave_obs, wave_mod * (1 + rv1_grid[irv] / SOL_kms), flux_mod
        ) + eta * np.interp(wave_obs, wave_mod * (1 + rv2[irv] / SOL_kms), flux_mod)
        xcorr_results[irv] = xcorr(flux_obs, flux_mod_interp)
    return -np.max(xcorr_results)


def xcorr_spec_binary_cost(
    rv1_rv2_eta: tuple[float, float, float],
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod1: npt.NDArray,
    flux_mod1: npt.NDArray,
    wave_mod2: npt.NDArray,
    flux_mod2: npt.NDArray,
    eta_lim: tuple[float, float] = (0.1, 1.2),
    drvmax: float = 500.0,
) -> float:
    """Negative of xcorr_spec_binary, used as cost function for minimization."""
    rv1, rv2, eta = rv1_rv2_eta
    if not eta_lim[0] < eta <= eta_lim[1] or np.abs(rv2 - rv1) > drvmax:
        return np.inf
    return -xcorr_spec_binary(
        rv1, rv2, eta, wave_obs, flux_obs, wave_mod1, flux_mod1, wave_mod2, flux_mod2
    )


def xcorr_spec_binary_rv2grid(
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod1: npt.NDArray,
    flux_mod1: npt.NDArray,
    wave_mod2: npt.NDArray,
    flux_mod2: npt.NDArray,
    flux_err: Optional[npt.NDArray] = None,
    rv1_init: float = 0.0,
    eta_init: float = 0.3,
    eta_lim: tuple[float, float] = (0.1, 1.2),
    drvmax: float = 500.0,
    drvstep: float = 5.0,
    method: str = "Powell",
    nmc: int = 100,
) -> Union[OptimizeResult | tuple]:
    """Evaluate cross-correlation of two spectra (binary case) based on RV2 grid."""
    # make grid for star 2
    rv2_grid = construct_rv_grid((-drvmax, drvmax, drvstep))
    # ccf2_grid
    ccf2_grid = np.zeros_like(rv2_grid, float)
    for irv2, rv2 in enumerate(rv2_grid):
        ccf2_grid[irv2] = xcorr_spec_binary(
            rv1_init,
            rv2,
            eta_init,
            wave_obs,
            flux_obs,
            wave_mod1,
            flux_mod1,
            wave_mod2,
            flux_mod2,
        )

    # find grid best
    ind_ccfmax = np.argmax(ccf2_grid)
    rv2_best = rv2_grid[ind_ccfmax]
    ccfmax = ccf2_grid[ind_ccfmax]

    if method is None:
        return rv2_best, ccfmax
    else:
        # optimization
        x0 = np.array([rv1_init, rv2_best, eta_init])
        opt = minimize(
            xcorr_spec_binary_cost,
            x0,
            method=method,
            args=(
                wave_obs,
                flux_obs,
                wave_mod1,
                flux_mod1,
                wave_mod2,
                flux_mod2,
                eta_lim,
            ),
        )
        opt["x0"] = x0
        opt["ccfmax2"] = -opt["fun"]

    if flux_err is not None:
        # add random noise to flux_obs
        xs = []
        for i in range(nmc):
            flux_mc = flux_obs + np.random.normal(0, 1, flux_obs.shape) * flux_err
            this_opt = minimize(
                xcorr_spec_binary_cost,
                opt.x,
                method=method,
                args=(
                    wave_obs,
                    flux_mc,
                    wave_mod1,
                    flux_mod1,
                    wave_mod2,
                    flux_mod2,
                    eta_lim,
                ),
            )
            xs.append(this_opt.x)
        opt["x_pct"] = np.percentile(np.array(xs), q=[16, 50, 84], axis=0)
    return opt


def respw_cost(
    rv: float,
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
    pw: float = 1,
) -> float:
    """Cost function of residual powered."""
    flux_mod_interp = np.interp(wave_obs, wave_mod * (1 + rv / SOL_kms), flux_mod)
    cost = np.sum(np.abs(flux_obs - flux_mod_interp) ** pw)
    return cost


def respw_rvgrid(
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
    rv_grid: npt.NDArray,
    pw: float = 1,
) -> npt.NDArray:
    """Cost function of residuals based on rv grid."""
    respw_grid = np.array(
        [
            respw_cost(rv, wave_obs, flux_obs, wave_mod, flux_mod, pw=pw)
            for rv in rv_grid
        ]
    )
    return respw_grid


def xcorr_spec_rvgrid(
    wave_obs: npt.NDArray,
    flux_obs: npt.NDArray,
    wave_mod: npt.NDArray,
    flux_mod: npt.NDArray,
    rv_grid: npt.NDArray,
) -> npt.NDArray:
    """Evaluate cross-correlation via Tonry & Davis (1979).

    Interpolate a model spectrum with different RV and cross-correlate with
    the observed spectrum, return the CCF at the RV grid.

    wave_obs: array
        Wavelength of observed spectrum (normalized).
    flux_obs: array
        Flux of observed spectrum.
    wave_mod: array
        Wavelength of model spectrum (normalized).
    flux_mod:
        Flux of model spectrum.
    rv_grid:
        RV grid, a tuple of (start, stop, step).

    """
    wave_obs = np.asarray(wave_obs)
    flux_obs = np.asarray(flux_obs)
    wave_mod = np.asarray(wave_mod)
    flux_mod = np.asarray(flux_mod)

    # RV grid --> CCF grid
    rv_grid = construct_rv_grid(rv_grid)
    # nz = len(z_grid)
    ccf_grid = np.ones_like(rv_grid, float)

    # calculate CCF
    for i_rv, this_rv in enumerate(rv_grid):
        ccf_grid[i_rv] = xcorr_spec(this_rv, wave_obs, flux_obs, wave_mod, flux_mod)

    return ccf_grid


[docs] class RVM: def __init__(self, pmod: npt.NDArray, wave_mod: npt.NDArray, flux_mod: npt.NDArray): """ Parameters: ----------- pmod: (n_model, *) parameters of model spectra wave_mod: (n_pixel,) wavelength of model spectra flux_mod: (n_model, n_pixel) normalized flux of model spectra """ print("@RVM2: initializing Radial Velocity Machine (RVM)...") # wavelength self.wave_mod = wave_mod # parameters if pmod.ndim == 2: self.pmod = pmod else: self.pmod = pmod.reshape(1, -1) # flux if flux_mod.ndim == 2: self.flux_mod = flux_mod else: self.flux_mod = flux_mod.reshape(1, -1) # dimensions self.nparam = self.pmod.shape[1] self.nmod, self.npix = self.flux_mod.shape # cache names self.cache_names = [] def __repr__(self): return "<RVM [nmod={}] [{:.1f}<lambda<{:.1f}]>".format( self.nmod, self.wave_mod[0], self.wave_mod[-1] )
[docs] def make_cache( self, cache_name: str = "B", wave_range: Union[list[tuple], tuple] = (5000, 5300), rv_grid: Union[tuple, npt.NDArray] = (-1000, 1000, 10), ) -> None: """Make cache for fast RV measurements, for single exposures only. Parameters ---------- cache_name: Suffix of cached data. wave_range: Wavelength range(s). rv_grid: A tuple of (rv_start, rv_stop, rv_step) or a list of tuples. """ print("@RVM: making cache ...") if isinstance(wave_range, list): # multiple ranges ind_wave_cache = np.zeros_like(self.wave_mod, dtype=bool) for _wave_range in wave_range: this_ind = (self.wave_mod > _wave_range[0]) & ( self.wave_mod < _wave_range[1] ) print( "@RVM: adding {} pixels in range [{}, {}]".format( np.sum(this_ind), *_wave_range ) ) ind_wave_cache |= this_ind else: # single range ind_wave_cache = (self.wave_mod > wave_range[0]) & ( self.wave_mod < wave_range[1] ) print( "@RVM: adding {} pixels in range [{}, {}]".format( np.sum(ind_wave_cache), *wave_range ) ) # cache wavelength wave_cache = self.wave_mod[ind_wave_cache] # cache rv_grid rv_grid_cache = construct_rv_grid(rv_grid) # cache model flux npix = len(wave_cache) nmod = self.nmod nrv = len(rv_grid_cache) flux_mod_cache = np.zeros((nmod, nrv, npix), dtype=float) for imod in range(nmod): for irv in range(nrv): flux_mod_cache[imod, irv, :] = np.interp( wave_cache, self.wave_mod * (1 + rv_grid_cache[irv] / SOL_kms), self.flux_mod[imod], ) self.__setattr__("wave_mod_cache_{}".format(cache_name), wave_cache) self.__setattr__("rv_grid_cache_{}".format(cache_name), rv_grid_cache) self.__setattr__("flux_mod_cache_{}".format(cache_name), flux_mod_cache) # statistics _mean1 = np.mean(flux_mod_cache, axis=2) _res1 = flux_mod_cache - _mean1[:, :, None] _cov11 = np.sum(_res1**2.0, axis=2) # var(G, G) (nmod, nrv) self.__setattr__("_mean1_cache_{}".format(cache_name), _mean1) self.__setattr__("_res1_cache_{}".format(cache_name), _res1) self.__setattr__("_cov11_cache_{}".format(cache_name), _cov11) # update cache names self.cache_names.append(cache_name)
[docs] def delete_cache(self, cache_name: str) -> None: """delete cache""" assert cache_name in self.cache_names print("@RVM: deleting cache [cache_name={}]...".format(cache_name)) self.__delattr__("wave_mod_cache_{}".format(cache_name)) self.__delattr__("rv_grid_cache_{}".format(cache_name)) self.__delattr__("flux_mod_cache_{}".format(cache_name)) self.__delattr__("_mean1_cache_{}".format(cache_name)) self.__delattr__("_res1_cache_{}".format(cache_name)) self.__delattr__("_cov11_cache_{}".format(cache_name)) self.cache_names.remove(cache_name)
[docs] def shrink(self, nmod: Union[float, int] = 0.5, method: str = "top"): # determine number of models if 0 < nmod < 1: assert self.nmod * nmod >= 1 nmod = int(self.nmod * nmod) elif nmod > 1: nmod = int(nmod) else: raise ValueError("Invalid nmod value: {}".format(nmod)) # determine ind of new models assert method in ["top", "bottom", "random"] if method == "top": ind = np.arange(self.nmod)[:nmod] elif method == "bottom": ind = np.arange(self.nmod)[-nmod:] elif method == "random": ind = np.random.choice(np.arange(self.nmod), size=nmod, replace=False) else: raise ValueError(f"Invalid method {method}") # construct new RVM return RVM(self.pmod[ind, :], self.wave_mod, self.flux_mod[ind, :])
[docs] def measure( self, wave_obs: npt.NDArray, flux_obs: npt.NDArray, rv_grid: Union[tuple, list, npt.NDArray] = (-1000, 1000, 10), flux_err: Optional[npt.NDArray] = None, flux_bounds: tuple[float, float] = (0, 3.0), nmc: int = 100, method: str = "BFGS", cache_name: Optional[str] = None, return_ccfgrid: bool = False, ) -> dict: """Measure RV for a single spectrum. Parameters ---------- wave_obs: observed wavelength flux_obs: observed flux (normalized) flux_err: observed flux error rv_grid: Union[tuple, list, npt.NDArray] if cache, use the cached rv_grid else, use this rv_grid flux_bounds: flux bounds nmc: number of MC repeats method: optimization method cache_name: cache name. if None: no acceleration; if "vector": partial acceleration. return_ccfgrid: if True, return ccfgrid Returns ------- dict RV measurement. """ # clip extreme values ind3 = (flux_obs > flux_bounds[0]) & (flux_obs < flux_bounds[1]) flux_obs = np.interp(wave_obs, wave_obs[ind3], flux_obs[ind3]) # CCF grid if cache_name in self.cache_names: # if cache exists flux0 = np.interp( self.__getattribute__("wave_mod_cache_{}".format(cache_name)), wave_obs[ind3], flux_obs[ind3], left=1, right=1, ) mean0 = np.mean(flux0) res0 = flux0 - mean0 # underscore means it is not normalized _cov00 = np.sum(res0**2.0) # var(F, F) float _cov01 = np.sum( res0.reshape(1, 1, -1) * self.__getattribute__("_res1_cache_{}".format(cache_name)), axis=2, ) # cov(F, G) (nmod, nrv) ccf_grid = ( _cov01 / np.sqrt(_cov00) / np.sqrt(self.__getattribute__("_cov11_cache_{}".format(cache_name))) ) elif cache_name == "matrix": # vectorize data to accelerate # e.g., 100 templates x 200 rv values (-1000 to 1000, a step of 10) x 3347 pixels takes 450MB memory rv_grid = construct_rv_grid(rv_grid) ccf_grid = xcorr_spec_vectorized( wave_obs, flux_obs, self.wave_mod, self.flux_mod, rv_grid ) elif cache_name is None or cache_name is False: # no cache rv_grid = construct_rv_grid(rv_grid) ccf_grid = np.zeros((self.flux_mod.shape[0], rv_grid.shape[0])) for imod in range(self.nmod): ccf_grid[imod] = xcorr_spec_rvgrid( wave_obs=wave_obs, flux_obs=flux_obs, wave_mod=self.wave_mod, flux_mod=self.flux_mod[imod], rv_grid=rv_grid, ) else: raise ValueError( "@RVM: invalid cache_name: {}. valid options:{} or matrix".format( cache_name, self.cache_names ) ) # CCF max ccf_max = np.max(ccf_grid) imod, irv_best = np.unravel_index(np.argmax(ccf_grid), ccf_grid.shape) if cache_name in self.cache_names: rv_best = self.__getattribute__("rv_grid_cache_{}".format(cache_name))[ irv_best ] else: rv_best = rv_grid[irv_best] # CCF opt opt = minimize( xcorr_spec_cost, x0=rv_best, args=(wave_obs, flux_obs, self.wave_mod, self.flux_mod[imod]), method=method, ) # Powell result = dict( rv_opt=float(opt.x), rv_err=float(opt.hess_inv) if method == "BFGS" else np.nan, rv_best=rv_best, ccfmax=-opt["fun"], success=opt.success, imod=imod, pmod=self.pmod[imod], status=opt["status"], ) if flux_err is not None: x_mc = np.zeros(nmc, dtype=float) for i in range(nmc): # CCF opt flux_mc = flux_obs + np.random.normal(0, 1, flux_obs.shape) * flux_err opt = minimize( xcorr_spec_cost, x0=rv_best, args=(wave_obs, flux_mc, self.wave_mod, self.flux_mod[imod]), method=method, ) x_mc[i] = opt.x result["rv_pct"] = np.percentile(x_mc, [16, 50, 84]) if return_ccfgrid: result["ccf_grid"] = ccf_grid return result
[docs] def measure_multiepoch( self, wave_obs_list: list, flux_obs_list: list, flux_err_obs_list, cache_name: str = "BR", flux_bounds: tuple = (0.0, 3.0), method: str = "BFGS", eta_init: float = 0.4, eta_lim: tuple = (0.1, 2.0), drvmax: float = 500.0, drvstep: float = 10, nmc: int = 100, verbose: bool = False, ): """Determine the RVs of multi-epoch spectra. For best performance, all input spectra are assumed to have the same wavelength coverage. Those with only one good arm. Parameters ---------- wave_obs_list: list of wavelength flux_obs_list list of flux flux_err_obs_list: list flux_err cache_name: Cache name. flux_bounds: Flux bounds. Returns ------- list[dict] RV measurements. """ if verbose: print(datetime.datetime.now(), "starting") # cache algorithm needed assert cache_name in self.cache_names # assert cache is ready assert len(wave_obs_list) == len(flux_obs_list) n_spec = len(wave_obs_list) # initialize ccf arrays ccf_max_grid = np.zeros(n_spec, int) imod_best_grid = np.zeros(n_spec, int) irv_best_grid = np.zeros(n_spec, int) if verbose: print(datetime.datetime.now(), "cache grid for best template") # loop spectra for i_spec in range(n_spec): # get current spectrum wave_obs = wave_obs_list[i_spec] flux_obs = flux_obs_list[i_spec] # clip extreme values ind_inbounds = (flux_obs > flux_bounds[0]) & (flux_obs < flux_bounds[1]) flux_obs = np.interp( wave_obs, wave_obs[ind_inbounds], flux_obs[ind_inbounds] ) # interpolate to cache wavelength grid flux0 = np.interp( self.__getattribute__("wave_mod_cache_{}".format(cache_name)), wave_obs[ind_inbounds], flux_obs[ind_inbounds], left=1, right=1, ) # calculate ccf_grid mean0 = np.mean(flux0) res0 = flux0 - mean0 # underscore means it is not normalized _cov00 = np.sum(res0**2.0) # var(F, F) float _cov01 = np.sum( res0.reshape(1, 1, -1) * self.__getattribute__("_res1_cache_{}".format(cache_name)), axis=2, ) # cov(F, G) (nmod, nrv) ccf_grid = ( _cov01 / np.sqrt(_cov00) / np.sqrt(self.__getattribute__("_cov11_cache_{}".format(cache_name))) ) # CCF max ccf_max_grid[i_spec] = np.max(ccf_grid) imod_best_grid[i_spec], irv_best_grid[i_spec] = np.unravel_index( np.argmax(ccf_grid), ccf_grid.shape ) # select the best template imod_selected = imod_best_grid[np.argmax(ccf_max_grid)] # initialize ccf arrays ccf_max_grid = np.zeros(n_spec, float) irv_best_grid = np.zeros(n_spec, int) if verbose: print(datetime.datetime.now(), "template selected, calculating rv ...") res_list = [] # loop spectra for i_spec in range(n_spec): # get current spectrum wave_obs = wave_obs_list[i_spec] flux_obs = flux_obs_list[i_spec] # clip extreme values ind_inbounds = (flux_obs > flux_bounds[0]) & (flux_obs < flux_bounds[1]) flux_obs = np.interp( wave_obs, wave_obs[ind_inbounds], flux_obs[ind_inbounds] ) # interpolate to cache wavelength grid flux0 = np.interp( self.__getattribute__("wave_mod_cache_{}".format(cache_name)), wave_obs[ind_inbounds], flux_obs[ind_inbounds], left=1, right=1, ) # calculate ccf_grid mean0 = np.mean(flux0) res0 = flux0 - mean0 # underscore means it is not normalized _cov00 = np.sum(res0**2.0) # var(F, F) float _cov01 = np.sum( res0.reshape(1, -1) * self.__getattribute__("_res1_cache_{}".format(cache_name))[ imod_selected ], axis=1, ) # cov(F, G) (nrv,) ccf_grid = ( _cov01 / np.sqrt(_cov00) / np.sqrt( self.__getattribute__("_cov11_cache_{}".format(cache_name))[ imod_selected ] ) ) # CCF max ccf_max_grid[i_spec] = np.max(ccf_grid) irv_best_grid[i_spec] = np.argmax(ccf_grid) rv_best_grid = self.__getattribute__("rv_grid_cache_{}".format(cache_name))[ irv_best_grid[i_spec] ] # CCF opt opt = minimize( xcorr_spec_cost, x0=self.__getattribute__("rv_grid_cache_{}".format(cache_name))[ irv_best_grid[i_spec] ], args=(wave_obs, flux_obs, self.wave_mod, self.flux_mod[imod_selected]), method=method, ) # Powell # store single star result this_res = dict(n_spec=n_spec) this_res["rv1_opt_{}".format(cache_name)] = float(opt.x) this_res["rv1_best_{}".format(cache_name)] = rv_best_grid this_res["ccfmax1_{}".format(cache_name)] = -opt["fun"] this_res["imod_{}".format(cache_name)] = imod_selected this_res["pmod_{}".format(cache_name)] = self.pmod[imod_selected] this_res["status1_{}".format(cache_name)] = opt["status"] this_res["success1_{}".format(cache_name)] = opt.success # Monte Carlo for error if flux_err_obs_list is not None: flux_err_obs = flux_err_obs_list[i_spec] x_mc = np.zeros(nmc, dtype=float) for i in range(nmc): # CCF opt flux_mc = ( flux_obs + np.random.normal(0, 1, flux_obs.shape) * flux_err_obs ) opt = minimize( xcorr_spec_cost, x0=self.__getattribute__("rv_grid_cache_{}".format(cache_name))[ irv_best_grid[i_spec] ], args=( wave_obs, flux_mc, self.wave_mod, self.flux_mod[imod_selected], ), method=method, ) x_mc[i] = opt.x this_res["rv1_pct_{}".format(cache_name)] = np.percentile( x_mc, [16, 50, 84] ) this_res["rv1_err_{}".format(cache_name)] = ( float(opt.hess_inv) if method == "BFGS" else np.mean(np.diff(this_res["rv1_pct_{}".format(cache_name)])) ) else: flux_err_obs = None this_res["rv1_err_{}".format(cache_name)] = ( float(opt.hess_inv) if method == "BFGS" else np.nan ) # measure double components """ given a template, optimize (rv1, drv, eta) """ rvr2 = xcorr_spec_binary_rv2grid( wave_obs, flux_obs, self.wave_mod, self.flux_mod[imod_selected], self.wave_mod, self.flux_mod[imod_selected], flux_err=flux_err_obs, rv1_init=this_res["rv1_opt_{}".format(cache_name)], eta_init=eta_init, eta_lim=eta_lim, drvmax=drvmax, drvstep=drvstep, method=method, nmc=nmc, ) ( this_res["rv1_{}".format(cache_name)], this_res["rv2_{}".format(cache_name)], this_res["eta_{}".format(cache_name)], ) = rvr2["x"] this_res["dccfmax_{}".format(cache_name)] = ( rvr2["ccfmax2"] - this_res["ccfmax1_{}".format(cache_name)] ) this_res["ccfmax2_{}".format(cache_name)] = rvr2["ccfmax2"] this_res["rv1_rv2_eta0_{}".format(cache_name)] = rvr2["x0"] this_res["rv1_rv2_eta_{}".format(cache_name)] = rvr2["x"] this_res["rv1_rv2_eta_pct_{}".format(cache_name)] = rvr2["x_pct"] this_res["rv1_rv2_eta_err_{}".format(cache_name)] = ( rvr2["x_pct"][2] - rvr2["x_pct"][0] ) / 2 this_res["success_2_{}".format(cache_name)] = rvr2["success"] this_res["status2_{}".format(cache_name)] = rvr2["status"] if verbose: print(datetime.datetime.now(), "finished") res_list.append(this_res) return res_list
[docs] def measure2( self, wave_obs: npt.NDArray, flux_obs: npt.NDArray, flux_err: npt.NDArray, wave_mod1: npt.NDArray, flux_mod1: npt.NDArray, wave_mod2: npt.NDArray, flux_mod2: npt.NDArray, rv1_init: float = 0.0, eta_init: float = 0.3, eta_lim=(0.1, 1.0), drvmax: float = 500.0, drvstep: float = 5.0, method: str = "Powell", nmc: int = 100, ): """given a template, optimize (rv1, drv, eta)""" opt = xcorr_spec_binary_rv2grid( wave_obs, flux_obs, wave_mod1, flux_mod1, wave_mod2, flux_mod2, flux_err, rv1_init=rv1_init, eta_init=eta_init, eta_lim=eta_lim, drvmax=drvmax, drvstep=drvstep, method=method, nmc=nmc, ) return opt
[docs] def mock_binary_spectrum(self, imod1, imod2, rv1, drv, eta): """make mock binary spectrum""" flux_mod_interp = np.interp( self.wave_mod, self.wave_mod * (1 + rv1 / SOL_kms), self.flux_mod[imod1] ) + eta * np.interp( self.wave_mod, self.wave_mod * (1 + (rv1 + drv) / SOL_kms), self.flux_mod[imod2], ) return flux_mod_interp
[docs] def reproduce_spectrum_single(self, rvr: dict): """reproduce the spectrum""" imod1 = rvr["imod1"] rv1 = rvr["rv1"] flux_mod_interp = np.interp( self.wave_mod, self.wave_mod * (1 + rv1 / SOL_kms), self.flux_mod[imod1] ) return flux_mod_interp
[docs] def reproduce_spectrum_binary(self, rvr): """reproduce the binary spectrum""" imod1 = rvr["imod1"] imod2 = rvr["imod2"] rv1, drv, eta = rvr["rv1_drv_eta"] flux_mod_interp = np.interp( self.wave_mod, self.wave_mod * (1 + rv1 / SOL_kms), self.flux_mod[imod1] ) + eta * np.interp( self.wave_mod, self.wave_mod * (1 + (rv1 + drv) / SOL_kms), self.flux_mod[imod2], ) return flux_mod_interp / (1 + eta)
[docs] def measure_binary( self, wave_obs: npt.NDArray, flux_obs: npt.NDArray, rv_grid: Union[tuple, list, npt.NDArray] = (-1000, 1000, 10), flux_err: Optional[npt.NDArray] = None, cache_name: str = "B", flux_bounds: tuple = (0.0, 3.0), twin: bool = True, eta_init: float = 0.3, eta_lim: tuple = (0.01, 3.0), drvmax: float = 500.0, drvstep: float = 5.0, method: str = "Powell", nmc: int = 100, suffix: str = "", return_ccfgrid: bool = False, return_spec: bool = False, ): # clip extreme values ind3 = (flux_obs > flux_bounds[0]) & (flux_obs < flux_bounds[1]) flux_obs = np.interp(wave_obs, wave_obs[ind3], flux_obs[ind3]) # RV1 rv_grid = construct_rv_grid(rv_grid) rvr1 = self.measure( wave_obs, flux_obs, rv_grid=rv_grid, flux_err=flux_err, nmc=nmc, cache_name=cache_name, return_ccfgrid=return_ccfgrid, ) # determine the secondary template if necessary if twin: imod2 = rvr1["imod"] else: # fix one template and calculate RV2 drv_best = np.zeros((self.nmod,), float) ccfmax = np.zeros((self.nmod,), float) for i in range(self.nmod): drv_best[i], ccfmax[i] = self.measure2( wave_obs, flux_obs, flux_err=flux_err, wave_mod1=self.wave_mod, flux_mod1=self.flux_mod[rvr1["imod"]], wave_mod2=self.wave_mod, flux_mod2=self.flux_mod[i], rv1_init=rvr1["rv_opt"], eta_init=eta_init, eta_lim=eta_lim, drvmax=drvmax, drvstep=drvstep, method=None, ) # best secondary imod2 = np.argmax(ccfmax) rvr2 = self.measure2( wave_obs, flux_obs, flux_err, wave_mod1=self.wave_mod, flux_mod1=self.flux_mod[rvr1["imod"]], wave_mod2=self.wave_mod, flux_mod2=self.flux_mod[imod2], rv1_init=rvr1["rv_opt"], eta_init=eta_init, eta_lim=eta_lim, drvmax=drvmax, drvstep=drvstep, method=method, nmc=nmc, ) if suffix == "" or suffix is None: suffix = "" else: suffix = "_{}".format(suffix) if flux_err is None: rvr = OrderedDict() rvr["rv1{}".format(suffix)] = rvr1["rv_opt"] rvr["ccfmax1{}".format(suffix)] = rvr1["ccfmax"] rvr["rv1_best{}".format(suffix)] = rvr1["rv_best"] rvr["imod1{}".format(suffix)] = rvr1["imod"] rvr["pmod1{}".format(suffix)] = rvr1["pmod"] rvr["imod2{}".format(suffix)] = imod2 rvr["pmod2{}".format(suffix)] = self.pmod[imod2] rvr["success1{}".format(suffix)] = rvr1["success"] rvr["ccfmax2{}".format(suffix)] = rvr2["ccfmax2"] rvr["success2{}".format(suffix)] = rvr2["success"] rvr["rv1_rv2_eta0{}".format(suffix)] = rvr2["x0"] rvr["rv1_rv2_eta{}".format(suffix)] = rvr2["x"] rvr["status1{}".format(suffix)] = rvr1["status"] rvr["status2{}".format(suffix)] = rvr2["status"] else: rvr = OrderedDict() rvr["rv1{}".format(suffix)] = rvr1["rv_opt"] rvr["rv1_pct{}".format(suffix)] = rvr1["rv_pct"] rvr["rv1_err{}".format(suffix)] = ( rvr1["rv_pct"][2] - rvr1["rv_pct"][0] ) / 2 rvr["ccfmax1{}".format(suffix)] = rvr1["ccfmax"] rvr["rv1_best{}".format(suffix)] = rvr1["rv_best"] rvr["imod1{}".format(suffix)] = rvr1["imod"] rvr["pmod1{}".format(suffix)] = rvr1["pmod"] rvr["imod2{}".format(suffix)] = imod2 rvr["pmod2{}".format(suffix)] = self.pmod[imod2] rvr["success1{}".format(suffix)] = rvr1["success"] rvr["ccfmax2{}".format(suffix)] = rvr2["ccfmax2"] rvr["success2{}".format(suffix)] = rvr2["success"] rvr["rv1_rv2_eta0{}".format(suffix)] = rvr2["x0"] rvr["rv1_rv2_eta{}".format(suffix)] = rvr2["x"] rvr["rv1_rv2_eta_pct{}".format(suffix)] = rvr2["x_pct"] rvr["rv1_rv2_eta_err{}".format(suffix)] = ( rvr2["x_pct"][2] - rvr2["x_pct"][0] ) / 2 rvr["status1{}".format(suffix)] = rvr1["status"] rvr["status2{}".format(suffix)] = rvr2["status"] # rvr["b_rv1{}".format(suffix)] = rvr2["x_pct"][0] # rvr["b_rv2{}".format(suffix)] = rvr2["x_pct"][1] # rvr["b_eta{}".format(suffix)] = rvr2["x_pct"][2] # rvr["b_rv1_err{}".format(suffix)] = rvr2["x_pct"][0] # rvr["b_rv2_err{}".format(suffix)] = rvr2["x_pct"][1] # rvr["b_eta_err{}".format(suffix)] = rvr2["x_pct"][2] if return_ccfgrid: rvr["ccf_grid{}".format(suffix)] = rvr1["ccf_grid"] if return_spec: rvr["spec{}".format(suffix)] = wave_obs, flux_obs # if method is "BFGS": # rvr["hess_inv"] = rvr2["hess_inv"] return rvr
[docs] def ccf_1mod( self, wave_mod: npt.NDArray, flux_mod: npt.NDArray, wave_obs: npt.NDArray, flux_obs: npt.NDArray, rv_grid: Union[tuple, list, npt.NDArray] = (-1000, 1000, 10), flux_bounds: tuple = (0, 3.0), ) -> tuple[npt.NDArray, npt.NDArray]: """Calculate CCF for a given flux_mod and a flux_obs at `rv_grid`.""" # clip extreme values ind3 = (flux_obs > flux_bounds[0]) & (flux_obs < flux_bounds[1]) flux_obs = np.interp(wave_obs, wave_obs[ind3], flux_obs[ind3]) # CCF grid rv_grid = construct_rv_grid(rv_grid) ccf_grid = xcorr_spec_rvgrid(wave_obs, flux_obs, wave_mod, flux_mod[1], rv_grid) return rv_grid, ccf_grid
[docs] def chi2_1mod( self, imod: int, wave_obs: npt.NDArray, flux_obs: npt.NDArray, rv_grid: Union[tuple, list, npt.NDArray] = (-1000, 1000, 10), pw: float = 2, flux_bounds: tuple[float, float] = (0.0, 3.0), ): """measure RV""" # clip extreme values ind3 = (flux_obs > flux_bounds[0]) & (flux_obs < flux_bounds[1]) flux_obs = np.interp(wave_obs, wave_obs[ind3], flux_obs[ind3]) # respw grid rv_grid = construct_rv_grid(rv_grid) respw_grid = respw_rvgrid( wave_obs, flux_obs, self.wave_mod, self.flux_mod[imod], rv_grid, pw=pw, ) return rv_grid, respw_grid
[docs] def measure_pw( self, wave_obs: npt.NDArray, flux_obs: npt.NDArray, rv_grid: Union[tuple, list, npt.NDArray] = (-1000, 1000, 10), method: str = "BFGS", pw: float = 1.0, ): # clip extreme values ind3 = (flux_obs < 3) & (flux_obs > 0.0) flux_obs = np.interp(wave_obs, wave_obs[ind3], flux_obs[ind3]) # CCF grid rv_grid = construct_rv_grid(rv_grid) ccf = np.zeros((self.flux_mod.shape[0], rv_grid.shape[0])) for j in range(self.flux_mod.shape[0]): ccf[j] = xcorr_spec_rvgrid( wave_obs, flux_obs, self.wave_mod, self.flux_mod[j][1], rv_grid ) # CCF max ccfmax = np.max(ccf) ind_best = np.where(ccfmax == ccf) imod = ind_best[0][0] irv_best = ind_best[1][0] rv_best = rv_grid[irv_best] # CCF opt opt = minimize( respw_cost, x0=rv_best, args=(wave_obs, flux_obs, self.wave_mod, self.flux_mod[imod], pw), method=method, ) # opt = minimize(ccf_cost_interp, x0=rv_best, args=(wave_obs, flux_obs, wave_mod, flux_mod[imod_best]), # method="Powell") # x = np.interp(wave, wave_obs/(1+opt.x/SOL_kms), flux_obs).reshape(1, -1) return dict( rv_opt=float(opt.x), rv_err=float(opt.hess_inv) if method == "BFGS" else np.nan, rv_best=rv_best, ccfmax=ccfmax, success=opt.success, imod=imod, pmod=self.pmod[imod], opt=opt, )
[docs] def measure_binary_mrsbatch( self, fp: str, lmjm: int, snr_B: Optional[float] = None, snr_R: Optional[float] = None, snr_threshold: float = 5.0, raise_error: bool = False, ) -> OrderedDict: """this is the configuration used in""" # read spectrum mrv_kwargs = { "rv_grid": (-1000, 1000, 10), "eta_init": 0.5, "eta_lim": (0.01, 3.0), } mf = MrsFits(fp.strip()) try: # blue arm ms = mf.get_one_spec(lmjm=lmjm, band="B") if snr_B is not None: assert snr_B > snr_threshold else: assert ms.snr > snr_threshold # cosmic ray removal msr = ms.reduce(npix_cushion=70, norm_type="spline", niter=2) rvr_B = self.measure_binary( msr.wave, msr.flux_norm, flux_err=msr.flux_norm_err, cache_name="B", nmc=50, **mrv_kwargs, suffix="B", ) rvr_B["bjdmid"] = ms.bjdmid except Exception as e_: if raise_error: raise e_ rvr_B = {} try: # red arm ms = mf.get_one_spec(lmjm=lmjm, band="R") if snr_B is not None: assert snr_R > snr_threshold else: assert ms.snr > snr_threshold # cosmic ray removal msr = ms.reduce(npix_cushion=70, norm_type="spline", niter=2) # cut 6800+A ind_use = msr.wave < 6800 rvr_R = self.measure_binary( msr.wave[ind_use], msr.flux_norm[ind_use], flux_err=msr.flux_norm_err[ind_use], cache_name="R", nmc=50, **mrv_kwargs, suffix="R", ) rvr_R["bjdmid"] = ms.bjdmid # ind_use = (msr.wave < 6800) & ((msr.wave < 6540) | (msr.wave > 6590)) # rvr_Rm = self.measure_binary(msr.wave[ind_use], msr.flux_norm[ind_use], # flux_err=msr.flux_norm_err[ind_use], nmc=50, **mrv_kwargs, suffix="Rm") except Exception as e_: if raise_error: raise e_ rvr_R = {} # rvr_Rm = {} rvr_B.update(rvr_R) # rvr_B.update(rvr_Rm) return rvr_B
[docs] def mrsbatch( self, fp_list: list[str], lmjm_list: list[int], snr_B_list: list[float], snr_R_list: list[float], snr_threshold: float = 5.0, fpout: Optional[str] = None, ): if os.path.exists(fpout): return nspec = len(fp_list) with warnings.catch_warnings(): warnings.simplefilter("ignore") rvr = [ self.measure_binary_mrsbatch( fp_list[i], lmjm_list[i], snr_B_list[i], snr_R_list[i], snr_threshold=snr_threshold, ) for i in range(nspec) ] if fpout is None: return rvr else: joblib.dump(rvr, fpout)
def construct_rv_grid( rv_grid: Union[tuple, list, npt.NDArray] = (-1000, 1000, 10) ) -> npt.NDArray: """Construct rv_grid form input. Parameters ---------- rv_grid : Union[tuple, list, npt.NDArray] RV grid or parameters. - tuple/list: (rv_start, rv_stop, rv_step) - npt.NdArray: return grid Returns ------- npt.NDArray RV grid. """ if isinstance(rv_grid, (list, tuple)): # RV grid parameters assert ( isinstance(rv_grid, tuple) and len(rv_grid) == 3 ), "The input `rv_grid` should be a tuple of (rv_start, rv_stop, rv_step)!" return np.arange(rv_grid[0], rv_grid[1] + rv_grid[2], rv_grid[2], dtype=float) else: # RV grid already return np.asarray(rv_grid, dtype=float) def test_rvm_20240401(): import joblib import numpy as np from laspec import RVM rvmdata = joblib.load("/Users/cham/projects/lamost/rvm/RVMDATA_R7500_M8.joblib") rvm = RVM(**rvmdata) i_test = 1 print(rvm.pmod[i_test]) rvm.make_cache(cache_name="B", wave_range=(5000, 5300), rv_grid=(-1000, 1000, 10)) index_B = (rvm.wave_mod > 5000) & (rvm.wave_mod < 5300) rv = 123.45 wave_obs = rvm.wave_mod[index_B] * (1 + rv / SOL_kms) flux_obs = rvm.flux_mod[i_test][index_B] + np.random.normal( loc=0, scale=0.05, size=wave_obs.shape ) # %%time rvm.measure(wave_obs, flux_obs, flux_err=None, cache_name=None) # %%time rvm.measure(wave_obs, flux_obs, flux_err=None, cache_name="matrix") # %%time rvm.measure(wave_obs, flux_obs, flux_err=None, cache_name="B") # %%time rvm.measure_binary( wave_obs, flux_obs, rv_grid=(-1000, 1000, 10), flux_err=None, cache_name="B" ) # def test_new_rvm(): # import joblib # # rvm = RVM( # joblib.load( # "/Users/cham/PycharmProjects/laspec/laspec/data/rvm/v8_rvm_pmod.dump" # ), # joblib.load( # "/Users/cham/PycharmProjects/laspec/laspec/data/rvm/v8_rvm_wave_mod.dump" # ), # joblib.load( # "/Users/cham/PycharmProjects/laspec/laspec/data/rvm/v8_rvm_flux_mod.dump" # ), # ) # # waveBR, spec_list = joblib.load( # "/Users/cham/projects/sb2/test_ccf/wave_flux_30.dump" # ) # wave_obs = waveBR[waveBR < 5500] # npix = len(wave_obs) # # %%%% read spectra # import glob # # fps = glob.glob("./*.fits.gz") # from laspec.mrs import MrsSource, debad # # ms = MrsSource.read(fps) # # # %% # fig, axs = plt.subplots(1, 2) # for i, me in enumerate(ms[:1]): # # wave_obs = me.wave_B # # flux_obs = me.flux_norm_B # wave_obs, flux_obs = ( # me.wave_B[50:-50], # debad(me.wave_B, me.flux_norm_B, nsigma=(4, 8), maxiter=5)[50:-50], # ) # axs[0].plot(wave_obs, flux_obs + i, "k") # # # measure RV # rvr = rvm.measure(wave_obs, flux_obs) # print(rvr) # imod = rvr["imod"] # rv_grid, ccf_grid = rvm.ccf_1mod( # rvm.wave_mod, # rvm.flux_mod[imod], # wave_obs, # flux_obs, # rv_grid=(-2000, 2000, 5), # ) # axs[1].plot(rv_grid, ccf_grid + i, "b") # # # %%time # rvr = [] # # for i, me in enumerate(ms[:]): # print(i) # # # wave_obs = me.wave_B # # flux_obs = me.flux_norm_B # # # remove cosmic rays # wave_obs, flux_obs = ( # me.wave_B[50:-50], # debad(me.wave_B, me.flux_norm_B, nsigma=(4, 8), maxiter=5)[50:-50], # ) # # measure binary # this_rvr = rvm.measure_binary( # wave_obs, # flux_obs, # rv_grid=(-600, 600, 10), # flux_bounds=(0, 3.0), # eta_init=0.3, # drvmax=500, # drvstep=5, # method="Powell", # ) # this_rvr["lmjm"] = me.epoch # this_rvr["snr"] = me.snr[0] # # append results # rvr.append(this_rvr) # # from astropy.table import Table # # trvr = Table(rvr) # trvr.write("./trvr.fits", overwrite=True) # trvr.show_in_browser() # # %% # plt.figure() # plt.plot(trvr["snr"], trvr["ccfmax1"], "bo") # plt.plot(trvr["snr"], trvr["ccfmax2"], "ro") # plt.ylim(0, 1) # # %% # plt.figure() # plt.plot(trvr["lmjm"], trvr["rv1_drv_eta"][:, 0], "ro", label="star 1") # plt.plot( # trvr["lmjm"], # trvr["rv1_drv_eta"][:, 0] + trvr["rv1_drv_eta"][:, 1], # "bo", # label="star 2", # ) # plt.legend(loc="right") # plt.xlabel("lmjm") # plt.ylabel("RV[km/s]") # # # if __name__ == "__main__": # pass