Source code for jetset.jet_spectral_components


__author__ = "Andrea Tramacere"


import numpy as np
import os
from astropy.table import Table
from numpy.core._multiarray_umath import zeros, log10
from scipy import interpolate

on_rtd = os.environ.get('READTHEDOCS', None) == 'True'

if on_rtd == True:
    try:
        from .jetkernel import jetkernel as BlazarSED
    except ImportError:
        from .mock import jetkernel as BlazarSED
else:
    from .jetkernel import jetkernel as BlazarSED

from . import spectral_shapes
from .jetkernel_models_dic import nuFnu_obs_dict, n_seed_dic
from .plot_sedfit import PlotSpecComp,PlotSeedPhotons
from .utils import check_frame, unexpected_behaviour

__all__=['JetSeedPhotons','JetSpecComponent','SpecCompList']

[docs] class JetSeedPhotons(object): """ """ def __init__(self,name,blob_object,var_name=None): self.name = name self._blob_object = blob_object self._n_name, self._nu_name = n_seed_dic[self.name] self.n_ptr = getattr(blob_object, self._n_name) self.nu_ptr = getattr(blob_object, self._nu_name) #self.SED = spectral_shapes.SED(name=self.name) if var_name is not None: self._var_name=var_name self.fill(emiss_lim=self._blob_object.emiss_lim)
[docs] def fill(self,log_log=False,emiss_lim=0): self.nu,self.n=self.get_spectral_points(log_log=log_log,emiss_lim=emiss_lim)
[docs] def get_spectral_points(self,log_log=False,emiss_lim=0): #try: size=self._blob_object.nu_grid_size x=zeros(size) y=zeros(size) for i in range(size): x[i]=BlazarSED.get_spectral_array(self.nu_ptr,self._blob_object,i) y[i]=BlazarSED.get_spectral_array(self.n_ptr,self._blob_object,i) #print("->%e %e"%(x[i],y[i])) msk_nan=np.isnan(x) msk_nan+=np.isnan(y) #print('emiss lim',self.get_emiss_lim()) x[msk_nan]=0. y[msk_nan]=emiss_lim msk=y<emiss_lim y[msk]=emiss_lim if log_log==True: msk = y <= 0. y[msk] = emiss_lim #x=x[msk] # y=y[msk] x=log10(x) y=log10(y) return x,y
#except: # raise RuntimeError ('model evaluation failed in get_spectral_points')
[docs] def plot(self, y_min=None,y_max=None): self.fill(emiss_lim=self._blob_object.emiss_lim) p=PlotSeedPhotons() p.plot(nu=self.nu,nuFnu=self.n,y_min=y_min,y_max=y_max) return p
[docs] class JetSpecComponent(object): """ """ def __repr__(self): return str(self.show()) #def __str__(self): # return str(self.show()) def __init__(self,jet_obj,name,blob_object,var_name=None,state_dict=None,state=None): self.name=name self.jet_obj=jet_obj self._blob_object=blob_object self._nuFnu_name, self._nu_name=nuFnu_obs_dict[self.name] self.nuFnu_ptr=getattr(blob_object,self._nuFnu_name) self.nu_ptr=getattr(blob_object,self._nu_name) self.SED=spectral_shapes.SED(name=self.name,beaming=jet_obj.get_beaming()) self.seed_field=None # self._nu_start_src_name, self._nu_stop_src_name = nu_src_start_stop_dict[self.name] # # self.nu_ptr_start = getattr(blob_object, self._nu_name) # self.nu_ptr_stop = getattr(blob_object, self._nu_name) # # self._nu_start_src = 'auto' # self._nu_stop_src = 'auto' # self._nu_start_obs = 'auto' # self._nu_stop_obs = 'auto' if name in n_seed_dic.keys(): self.seed_field=JetSeedPhotons(name,blob_object) if var_name is not None: self._var_name=var_name if state_dict is None: self._state_dict = dict() self._state_dict['on'] = 1 self._state_dict['off'] = 0 else: self._state_dict=state_dict self.state='on' else: self._state_dict = {} self._var_name=None self._state='on' if state is not None and self._state_dict != {}: self.state=state # @property # def nu_boundaries(self,frame='obs'): # check_frame(frame) # if frame == 'src': # return self._nu_start_src, self._nu_stop_src # else: # return self._nu_start_obs, self._nu_stop_obs # # @nu_boundaries.setter # def nu_boundaries(self,nu_start=None,nu_stop=None, frame='obs'): # check_frame(frame) # if frame == 'obs': # self._nu_start_obs = nu # self._nu_start_src = convert_nu_to_src(nu,self.jet_obj.get_par_by_type('redshift').val,'obs') # else: # self._nu_start_src = nu # self._nu_start_obs = convert_nu_to_src(nu, self.jet_obj.get_par_by_type('redshift').val, 'obs')
[docs] def get_emiss_lim(self,seed=False): return self._blob_object.emiss_lim
[docs] def fill_SED(self,log_log=False,lin_nu=None,skip_zeros=False): x,y=self.get_SED_points( log_log=log_log,lin_nu=lin_nu,skip_zeros=skip_zeros) self.SED.beaming=self.jet_obj.get_beaming() self.SED.fill(nu=x,nuFnu=y,log_log=log_log) self.SED.fill_nuLnu(z=self.jet_obj.get_par_by_type('redshift').val,dl=self.jet_obj.get_DL_cm()) if self.seed_field is not None: self.seed_field.fill(log_log=log_log)
[docs] def get_SED_points(self, log_log=False, lin_nu=None,interp='linear',skip_zeros=False): size = self._blob_object.nu_grid_size x = zeros(size) y = zeros(size) for i in range(size): x[i] = BlazarSED.get_spectral_array(self.nu_ptr, self._blob_object, i) y[i] = BlazarSED.get_spectral_array(self.nuFnu_ptr, self._blob_object, i) msk_nan = np.isnan(x) msk_nan += np.isnan(y) x[msk_nan] = 0. y[msk_nan] = self.get_emiss_lim() msk = y < self.get_emiss_lim() y[msk] = self.get_emiss_lim() msk_zeros = y > self.get_emiss_lim() if lin_nu is not None: #f_interp=interpolate.Akima1DInterpolator(log10(x), log10(y)) f_interp = interpolate.interp1d(log10(x), log10(y), bounds_error=False, kind=interp) y = np.power(10., f_interp(log10(lin_nu))) x=lin_nu msk_nan = np.isnan(y) y[msk_nan] = 0. msk_zeros = y > self.get_emiss_lim() y[~msk_zeros] = 0. if log_log == True: msk = y <= 0. y[msk] = -1.0E10 msk_zeros = y > self.get_emiss_lim() x = log10(x) y = log10(y) if skip_zeros is True: _x = x[msk_zeros] _y = y[msk_zeros] else: _x = x _y = y return _x, _y
[docs] def update(self): size = self._blob.nu_grid_size x = zeros(size) y = zeros(size) for i in range(size): x[i] = BlazarSED.get_spectral_array(self.nu_ptr, self._blob, i) y[i] = BlazarSED.get_spectral_array(self.nuFnu_ptr, self._blob, i)
[docs] def show(self): print('name :',self.name) print('var name :',self._var_name) print('state :',self._state) #print('nu_start (src frame):', self._nu_start_src) #print('nu_stop (src frame):', self._nu_stop_src) if self._state_dict is not None: print('allowed states :',[k for k in self._state_dict.keys()])
@property def state(self,): return self._state @state.setter def state(self, val): if self._state_dict!={}: if val not in self._state_dict.keys(): raise RuntimeError('val', val, 'not in allowed', self._state_dict.keys()) self._state = val if self._var_name is not None: setattr(self._blob_object, self._var_name, self._state_dict[val]) else: raise Warning('the state of the spectral component',self.name,' can not be changed')
[docs] def get_var_state(self,): if self._var_name is not None: return getattr(self._blob_object,self._var_name) else: return None
[docs] def plot(self, y_min=None,y_max=None): p=PlotSpecComp() p.plot(nu=self.SED.nu.value,nuFnu=self.SED.nuFnu.value,y_min=y_min,y_max=y_max) return p
[docs] class SpecCompList(object): def __init__(self,sc_list): self._sc_list=sc_list self._table=None
[docs] def show(self): for sc in self._sc_list: sc.show()
def __repr__(self): return str(self.show())
[docs] def build_table(self, restframe='obs'): _names = ['nu'] _cols=[] check_frame(restframe) if restframe=='obs': _cols.append(self._sc_list[0].SED.nu) elif restframe=='src': _cols.append(self._sc_list[0].SED.nu_src) else: unexpected_behaviour() for ID,sc in enumerate(self._sc_list): _names.append(sc.name) if restframe == 'obs': _cols.append(sc.SED.nuFnu) else: _cols.append(sc.SED.nuLnu_src) _meta=dict(src_name=sc.jet_obj.name) _meta['redshift']=sc.jet_obj.get_par_by_type('redshift').val _meta['restframe']= restframe self._table = Table(_cols, names=_names,meta=_meta)
[docs] def get_spectral_component_by_name(self,name,verbose=True): for i in range(len(self._sc_list)): if self._sc_list[i].name==name: return self._sc_list[i] else: if verbose==True: print ("no spectral components with name %s found"%name)
@property def table(self): return self._table