Source code for laspec.line_index

# -*- coding: utf-8 -*-
import collections
import os

import matplotlib.pyplot as plt
import numpy as np
from lmfit.models import LinearModel, GaussianModel

from .read_spectrum import read_spectrum


[docs] def integrate_spectrum(wave, flux_norm, flux_norm_err=None, mask=None, nmc=50, wave_range=(6554, 6574), suffix="Ha"): if not suffix == "": suffix = "_{}".format(suffix) rew = dict() ind_range = np.where(np.logical_and(wave > wave_range[0], wave < wave_range[1]))[0] npix_range = len(ind_range) if wave[0] < wave_range[0] < wave_range[1] < wave[-1] and npix_range >= 3: # good data wave_diff = np.diff(wave[ind_range]) wave_diff = np.hstack((wave_diff[0], (wave_diff[1:] + wave_diff[:-1]) / 2, wave_diff[-1])) rew["EW{}".format(suffix)] = np.sum((1 - flux_norm[ind_range]) * wave_diff) if flux_norm_err is not None: # evaluate pct noise = np.random.normal(flux_norm[ind_range], flux_norm_err[ind_range], (nmc, npix_range)) rew["EW16{}".format(suffix)], rew["EW50{}".format(suffix)], rew["EW84{}".format(suffix)] = \ np.percentile(np.sum((1 - noise) * wave_diff, axis=1), [16, 50, 84]) else: rew["EW16{}".format(suffix)] = np.nan rew["EW50{}".format(suffix)] = np.nan rew["EW84{}".format(suffix)] = np.nan if mask is not None: rew["EWnbad{}".format(suffix)] = np.sum(mask[ind_range] > 0) else: rew["EWnbad{}".format(suffix)] = 0 return rew else: # bad data rew["EW{}".format(suffix)] = np.nan rew["EW16{}".format(suffix)] = np.nan rew["EW50{}".format(suffix)] = np.nan rew["EW84{}".format(suffix)] = np.nan rew["EWnbad{}".format(suffix)] = 0 return rew
# old code below ---- # should consider whether to maintain filepath arg # since the plot function could be replaced using recover
[docs] def measure_line_index(wave, flux, flux_err=None, mask=None, z=None, line_info=None, num_refit=(100, None), filepath=None, return_type='dict', verbose=False): """ Measure line index / EW and have it plotted Parameters ---------- wave: array-like wavelength vector flux: array-like flux vector flux_err: array-like flux error vector (optional) If un-specified, auto-generate an np.ones array mask: array-like andmask or ormask (optional) If un-specified, auto-generate an np.ones array (evenly weighted) line_info: dict information about spectral line, eg: >>> line_info_dib5780 = {'line_center': 5780, >>> 'line_range': (5775, 5785), >>> 'line_shoulder_left': (5755, 5775), >>> 'line_shoulder_right': (5805, 5825)} num_refit: non-negative integer number of refitting. If 0, no refit will be performed If positive, refits will be performed after adding normal random noise z: float redshift (only specify when z is large) filepath: string path of the diagnostic figure if None, do nothing, else print diagnostic figure return_type: string 'dict' or 'array' if 'array', np.array(return dict.values()) verbose: bool if True, print details Returns ------- line_indx: dict A dictionary type result of line index. If any problem encountered, return the default result (filled with nan). """ try: # 0. do some input check # 0.1> check line_info line_info_keys = line_info.keys() assert 'line_range' in line_info_keys assert 'line_shoulder_left' in line_info_keys assert 'line_shoulder_right' in line_info_keys # 0.2> check line range/shoulder in spectral range assert np.min(wave) <= line_info['line_shoulder_left'][0] assert np.max(wave) >= line_info['line_shoulder_right'][0] # 1. get line information # line_center = line_info['line_center'] # not used line_range = line_info['line_range'] line_shoulder_left = line_info['line_shoulder_left'] line_shoulder_right = line_info['line_shoulder_right'] # 2. data preparation # 2.1> shift spectra to rest-frame wave = np.array(wave) flux = np.array(flux) if z is not None: wave /= 1. + z # 2.2> generate flux_err and mask if un-specified if flux_err == None: flux_err = np.ones(wave.shape) if mask == None: mask = np.zeros(wave.shape) mask_ = np.zeros(wave.shape) ind_mask = np.all([mask!=0],axis=0) mask_[ind_mask] = 1 mask = mask_ # 3. estimate the local continuum # 3.1> shoulder wavelength range ind_shoulder = np.any([ np.all([wave > line_shoulder_left[0], wave < line_shoulder_left[1]], axis=0), np.all([wave > line_shoulder_right[0], wave < line_shoulder_right[1]], axis=0)], axis=0) wave_shoulder = wave[ind_shoulder] flux_shoulder = flux[ind_shoulder] # 3.2> integrated/fitted wavelength range ind_range = np.logical_and(wave > line_range[0], wave < line_range[1]) wave_range = wave[ind_range] flux_range = flux[ind_range] # flux_err_range = flux_err[ind_range] # not used mask_range = mask[ind_range] flux_err_shoulder = flux_err[ind_shoulder] # mask_shoulder = mask[ind_shoulder] # not used # 4. linear model mod_linear = LinearModel(prefix='mod_linear_') par_linear = mod_linear.guess(flux_shoulder, x=wave_shoulder) # ############################################# # # to see the parameter names: # # model_linear.param_names # # {'linear_fun_intercept', 'linear_fun_slope'} # # ############################################# # out_linear = mod_linear.fit(flux_shoulder, par_linear, x=wave_shoulder, method='leastsq') # 5. estimate continuum cont_shoulder = out_linear.best_fit noise_std = np.std(flux_shoulder / cont_shoulder) cont_range = mod_linear.eval(out_linear.params, x=wave_range) resi_range = 1 - flux_range / cont_range # 6.1 Integrated EW ( # estimate EW_int wave_diff = np.diff(wave_range) wave_step = np.mean(np.vstack([np.hstack([wave_diff[0], wave_diff]), np.hstack([wave_diff, wave_diff[-1]])]), axis=0) EW_int = np.dot(resi_range, wave_step) # estimate EW_int_err num_refit_ = num_refit[0] if num_refit_ is not None and num_refit_>0: EW_int_err = np.std(np.dot( (resi_range.reshape(1, -1).repeat(num_refit_, axis=0) + np.random.randn(num_refit_, resi_range.size) * noise_std), wave_step)) # 6.2 Gaussian model # estimate EW_fit mod_gauss = GaussianModel(prefix='mod_gauss_') par_gauss = mod_gauss.guess(resi_range, x=wave_range) out_gauss = mod_gauss.fit(resi_range, par_gauss, x=wave_range) line_indx = collections.OrderedDict([ ('SN_local_flux_err', np.median(flux_shoulder / flux_err_shoulder)), ('SN_local_flux_std', 1. / noise_std), ('num_bad_pixel', np.sum(mask_range != 0)), ('EW_int', EW_int), ('EW_int_err', EW_int_err), ('mod_linear_slope', out_linear.params[mod_linear.prefix + 'slope'].value), ('mod_linear_slope_err', out_linear.params[mod_linear.prefix + 'slope'].stderr), ('mod_linear_intercept', out_linear.params[mod_linear.prefix + 'intercept'].value), ('mod_linear_intercept_err', out_linear.params[mod_linear.prefix + 'intercept'].stderr), ('mod_gauss_amplitude', out_gauss.params[mod_gauss.prefix + 'amplitude'].value), ('mod_gauss_amplitude_err', out_gauss.params[mod_gauss.prefix + 'amplitude'].stderr), ('mod_gauss_center', out_gauss.params[mod_gauss.prefix + 'center'].value), ('mod_gauss_center_err', out_gauss.params[mod_gauss.prefix + 'center'].stderr), ('mod_gauss_sigma', out_gauss.params[mod_gauss.prefix + 'sigma'].value), ('mod_gauss_sigma_err', out_gauss.params[mod_gauss.prefix + 'sigma'].stderr), ('mod_gauss_amplitude_std', np.nan), ('mod_gauss_center_std', np.nan), ('mod_gauss_sigma_std', np.nan)]) # estimate EW_fit_err num_refit_ = num_refit[1] if num_refit_ is not None and num_refit_ > 2: # {'mod_gauss_amplitude', # 'mod_gauss_center', # 'mod_gauss_fwhm', # 'mod_gauss_sigma'} out_gauss_refit_amplitude = np.zeros(num_refit_) out_gauss_refit_center = np.zeros(num_refit_) out_gauss_refit_sigma = np.zeros(num_refit_) # noise_fit = np.random.randn(num_refit,resi_range.size)*noise_std for i in range(int(num_refit_)): # resi_range_with_noise = resi_range + noise_fit[i,:] resi_range_with_noise = resi_range + \ np.random.randn(resi_range.size) * noise_std out_gauss_refit = mod_gauss.fit(resi_range_with_noise, par_gauss, x=wave_range) out_gauss_refit_amplitude[i],\ out_gauss_refit_center[i],\ out_gauss_refit_sigma[i] =\ out_gauss_refit.params[mod_gauss.prefix + 'amplitude'].value,\ out_gauss_refit.params[mod_gauss.prefix + 'center'].value,\ out_gauss_refit.params[mod_gauss.prefix + 'sigma'].value print(out_gauss_refit_amplitude[i], out_gauss_refit_center[i], out_gauss_refit_sigma[i]) line_indx.update([ ('mod_gauss_amplitude_std', np.nanstd(out_gauss_refit_amplitude)), ('mod_gauss_center_std', np.nanstd(out_gauss_refit_center)), ('mod_gauss_sigma_std', np.nanstd(out_gauss_refit_sigma)) ]) # 7. plot and save image if filepath is not None and os.path.exists(os.path.dirname(filepath)): save_image_line_indice(filepath, wave, flux, ind_range, cont_range, ind_shoulder, line_info) # if necessary, convert to array # NOTE: for a non-ordered dict the order of keys and values may change! if return_type == 'array': return np.array(line_indx.values()) return line_indx except Exception: return measure_line_index_null_result(return_type)
[docs] def measure_line_index_null_result(return_type): """generate default value (nan/False) when measurement fails Returns ------- default value (nan/False) """ line_indx = collections.OrderedDict([ ('SN_local_flux_err', np.nan), ('SN_local_flux_std', np.nan), ('num_bad_pixel', np.nan), ('EW_int', np.nan), ('EW_int_err', np.nan), ('mod_linear_slope', np.nan), ('mod_linear_slope_err', np.nan), ('mod_linear_intercept', np.nan), ('mod_linear_intercept_err', np.nan), ('mod_gauss_amplitude', np.nan), ('mod_gauss_amplitude_err', np.nan), ('mod_gauss_center', np.nan), ('mod_gauss_center_err', np.nan), ('mod_gauss_sigma', np.nan), ('mod_gauss_sigma_err', np.nan), ('mod_gauss_amplitude_std', np.nan), ('mod_gauss_center_std', np.nan), ('mod_gauss_sigma_std', np.nan)]) if return_type == 'array': return np.array(line_indx.values()) return line_indx
# pure fit: 100 loops, best of 3: 8.06 ms per loop (1 int + 1 fit) # 1000 re-fit: 1 loops, best of 3: 378 ms per loop (1 int + 1 fit + 100 re-fit)
[docs] def measure_line_index_loopfun(filepath): """loopfun for measuring line index Parameters ---------- filepath: string path of the spec document Returns ------- several line_indx: tuple every line_indx is a dictionary type result of line index. """ num_refit = 100, None return_type = 'array' line_info_dib5780 = {'line_center': 5780, 'line_range': (5775, 5785), 'line_shoulder_left': (5755, 5775), 'line_shoulder_right': (5805, 5825)} line_info_dib5797 = {'line_center': 5797, 'line_range': (5792, 5802), 'line_shoulder_left': (5755, 5775), 'line_shoulder_right': (5805, 5825)} line_info_dib6284 = {'line_center': 6285, 'line_range': (6280, 6290), 'line_shoulder_left': (6260, 6280), 'line_shoulder_right': (6310, 6330)} try: # read spectrum # ------------- spec = read_spectrum(filepath, 'auto') # measure DIBs # ------------ # DIB5780 line_indx_dib5780 = measure_line_index(wave=spec['wave'], flux=spec['flux'], flux_err=spec['flux_err'], mask=spec['and_mask'], line_info=line_info_dib5780, num_refit=num_refit, return_type=return_type, z=0) # DIB5797 line_indx_dib5797 = measure_line_index(wave=spec['wave'], flux=spec['flux'], flux_err=spec['flux_err'], mask=spec['and_mask'], line_info=line_info_dib5797, num_refit=num_refit, return_type=return_type, z=0) # DIB6284 line_indx_dib6284 = measure_line_index(wave=spec['wave'], flux=spec['flux'], flux_err=spec['flux_err'], mask=spec['and_mask'], line_info=line_info_dib6284, num_refit=num_refit, return_type=return_type, z=0) return line_indx_dib5780, line_indx_dib5797, line_indx_dib6284 except Exception: return (measure_line_index_null_result(return_type), measure_line_index_null_result(return_type), measure_line_index_null_result(return_type))
[docs] def measure_line_index_recover_spectrum(wave, params, norm=False): """ recover the fitted line profile from params Parameters ---------- wave: array-like the wavelength to which the recovered flux correspond params: 5-element tuple the 1 to 5 elements are: mod_linear_slope mod_linear_intercept mod_gauss_amplitude mod_gauss_center mod_gauss_sigma norm: bool if True, linear model (continuum) is deprecated else linear + Gaussian model is used """ from lmfit.models import LinearModel, GaussianModel mod_linear = LinearModel(prefix='mod_linear_') mod_gauss = GaussianModel(prefix='mod_gauss_') par_linear = mod_linear.make_params() par_gauss = mod_gauss.make_params() par_linear['mod_linear_slope'].value = params[0] par_linear['mod_linear_intercept'].value = params[1] par_gauss['mod_gauss_amplitude'].value = params[2] par_gauss['mod_gauss_center'].value = params[3] par_gauss['mod_gauss_sigma'].value = params[4] if not norm: flux = 1 - mod_gauss.eval(params=par_gauss, x=wave) else: flux = \ (1 - mod_gauss.eval(params=par_gauss, x=wave)) * \ mod_linear.eval(params=par_linear, x=wave) return flux
[docs] def save_image_line_indice(filepath, wave, flux, ind_range, cont_range, ind_shoulder, line_info): """Plot a line indice and save it as a .png document. Parameters ---------- filepath: string path of the spec document wave: array wavelength vector flux: array flux vector ind_range: array bool indicating the middle range of a particular line cont_range: array continuum flux of the middle range derived from linear model ind_shoulder: array bool indicating the shoulder range of a particular line line_info: dict information about spectral line, eg: >>> line_info_dib5780 = {'line_center': 5780, >>> 'line_range': (5775, 5785), >>> 'line_shoulder_left': (5755, 5775), >>> 'line_shoulder_right': (5805, 5825)} """ filename = os.path.basename(filepath) fig = plt.figure() plt.plot(wave[ind_range], flux[ind_range], 'r-') plt.plot(wave[ind_range], cont_range, 'b-') plt.plot(wave[ind_shoulder], flux[ind_shoulder], 'm-') plt.title(r'line' + str(line_info['line_center']) + r'of ' + filename) fig.savefig(filepath)
[docs] def test_(): # filepath = walk_dir() # filesource = 'auto' filepath = r'/pool/lamost/dr2/spectra/fits/F5902/spec-55859-F5902_sp01-001.fits' filesource = 'lamost_dr2' spec = read_spectrum(filepath, filesource) # 10 loops, best of 3: 35.7 ms per loop # line_indx_pack = measure_line_index_loopfun(filepath) z = 0.00205785 line_info_dib6284 = {'line_center': 6285, 'line_range': (6280, 6290), 'line_shoulder_left': (6260, 6280), 'line_shoulder_right': (6310, 6330)} line_indx = measure_line_index(wave=spec['wave'], flux=spec['flux'], flux_err=spec['flux_err'], mask=spec['and_mask'], line_info=line_info_dib6284, num_refit=(100, 0), return_type='dict', z=z) for key in line_indx.keys(): print (key, line_indx[key]) print(np.sum(np.isnan(line_indx.values()))) ''' 45 ms for integration and other procedures 380 ms for 100 refits In the fastest way (45ms), run 40 line indices on 4 million spectra: 0.045*40*4E6/24/86400 ~ 3.5 days In the slowest way (380ms) 0.420*40*4E6/24/86400 ~ 32.5 days '''
# I don't think this function should be implemented here, # it could be useful if under the bopy.core package
[docs] def walk_dir(dirpath): """ enumerate all files under dirpath Parameters ---------- dirpath: string the directory to be walked in Returns ------- filename: list filepaths of all the spectra in finder dirpath """ filename_list = [] for parent, dirnames, filenames in os.walk(dirpath): filename_list.extend([os.path.join(parent, filename) for filename in filenames]) n = len(filename_list) filename_list = filename_list[1:n+1] return filename_list
# for fucntions below, consider whether it is need to be here # #############################################################################
[docs] def test_measure_line_index(): filepath = walk_dir('') n = len(filepath) line_indx_star = [[]for i in range(3)] for i in range(n): line_indx = measure_line_index_loopfun(filepath[i]) line_indx_star[0].append(line_indx[0]) line_indx_star[1].append(line_indx[1]) line_indx_star[2].append(line_indx[2]) return line_indx_star
[docs] def get_equivalent_width(line_indx_star): EW = [[] for i in range(3)] n = len(line_indx_star[0]) for i in range(3): for j in range(n): EW[i].append(line_indx_star[i][j]['EW_int']) return EW
[docs] def plot_equivalent_width_hist(EW_star): titles = ["5780", "5797", "6285"] fig, axes = plt.subplots(1, 3, figsize=(8, 8)) for i in range(3): ax = axes[0, i] ax.hist(EW_star[i], facecolor='red', alpha=0.5) ax.set_xlabel('equivalent width') ax.set_ylabel('number') ax.set_title('Histogram of equivalent width_line'+titles[i]) plt.tight_layout() plt.show()
[docs] def plot_line_indices(EW_star): titles = ["5780", "5797", "6285"] fig, axes = plt.subplots(3, 3, figsize=(64, 64)) for i in range(3): for j in range(i+1): ax = axes[i, j] ax.set_title(titles[i]+" - "+titles[j], fontsize = 8) ax.set_ylabel(titles[i], fontsize=8) ax.set_xlabel(titles[j], fontsize=8) ax.plot(EW_star[j], EW_star[i], 'ob',markersize=3, alpha=0.5) plt.tight_layout()
# ############################################################################# # %% test if __name__ == '__main__': # line_indx_star = test_measure_line_index() # EW_star = get_equivalent_width(line_indx_star) # plot_equivalent_width_hist(EW_star) # plot_line_indices(EW_star) test_()