-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSpec.py
136 lines (91 loc) · 5.51 KB
/
Spec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import numpy as np
c_sp = 299792458 # Speed of light (m/s)
class Spec:
def __init__(self, wvs, values, units='MJy/sr'):
if wvs.shape[0] != values.shape[0]:
raise AssertionError('The number of wavelength elements must be identical to the number of spectral values.')
else:
self.wvs = wvs # Wavelength grid (µm)
self.values = values # Spectrum values
self.units = units # Spectrum values units
def convert(self, units, px_area=1):
all_units = ['MJy/sr', 'Jy', 'erg s-1 cm-2 Hz-1', 'erg s-1 cm-2 micron-1']
if not isinstance(units, str) or units not in all_units:
raise TypeError('The input units are invalid. They must be one of the following units: MJy/sr, Jy, erg s-1 cm-2 Hz-1, erg s-1 cm-2 micron-1.')
elif not isinstance(px_area, (int, float)) or px_area < 0:
raise AssertionError('The input pixel area is invalid. It must be a positive float in steradian.')
else:
if units == self.units: # Input units as same as self.units
pass
elif self.units == all_units[0]: # MJy/sr
new_values = np.copy(self.values)
if units == all_units[1]: # -> Jy
new_values *= 1e6 * px_area
elif units == all_units[2]: # -> erg s-1 cm-2 Hz-1
new_values *= 1e6 * px_area * 1e-23
elif units == all_units[3]: # -> erg s-1 cm-2 micron-1
new_values *= 1e6 * px_area
new_values *= 1e-23
new_values *= (c_sp*1e6 / (self.wvs)**2)
elif self.units == all_units[1]: # Jy
new_values = np.copy(self.values)
if units == all_units[0]: # -> MJy/sr
new_values /= (1e6 * px_area)
elif units == all_units[2]: # -> erg s-1 cm-2 Hz-1
new_values *= 1e-23
elif units == all_units[3]: # -> erg s-1 cm-2 micron-1
new_values *= 1e-23
new_values *= (c_sp * 1e6 / (self.wvs) ** 2)
elif self.units == all_units[2]: # erg s-1 cm-2 Hz-1
new_values = np.copy(self.values)
if units == all_units[0]: # -> MJy/sr
new_values /= (1e-23 * 1e6 * px_area)
elif units == all_units[1]: # -> Jy
new_values /= 1e-23
elif units == all_units[3]: # -> erg s-1 cm-2 micron-1
new_values *= (c_sp * 1e6 / (self.wvs) ** 2)
elif self.units == all_units[3]: # erg s-1 cm-2 micron-1
new_values = np.copy(self.values)
if units == all_units[0]: # -> MJy/sr
new_values /= (c_sp * 1e6 / (self.wvs) ** 2)
new_values /= (1e-23 * 1e6 * px_area)
elif units == all_units[1]: # -> Jy
new_values /= (c_sp * 1e6 / (self.wvs) ** 2)
new_values /= 1e-23
elif units == all_units[2]: # -> erg s-1 cm-2 Hz-1
new_values /= (c_sp * 1e6 / (self.wvs) ** 2)
self.values = new_values
return
def cut(self, min, max, units='wav', wv_ref=None):
all_units = ['wav', 'vel']
if not isinstance(units, str) or units not in all_units:
raise AssertionError("The units input are invalid. They must correspond to wavelengths or velocities. The following character strings must be specified: 'wav' or 'vel'.")
elif units == all_units[1]:
if wv_ref is None:
raise AssertionError("Bounds are given in velocities. No reference wavelength has been specified for the 'wv_ref' parameter.")
elif not isinstance(wv_ref, (int, float)) or wv_ref <= 0:
raise AssertionError("The reference wavelength is invalid. It must be an integer float and given in km/s.")
else:
rvs = (c_sp * (self.wvs - wv_ref) / wv_ref) / 1000 # km/s
idxs_cut = np.where(np.logical_and(rvs >= min, rvs <= max))
wvs_cut = self.wvs[idxs_cut]
values_cut = self.values[idxs_cut]
spec_cut = Spec(wvs_cut, values_cut, units=self.units)
else:
if units == all_units[0]:
idxs_cut = np.where(np.logical_and(self.wvs >= min, self.wvs <= max))
wvs_cut = self.wvs[idxs_cut]
values_cut = self.values[idxs_cut]
spec_cut = Spec(wvs_cut, values_cut, units=self.units)
return spec_cut
def sub_baseline(self, wv_line, mask_rv=250, deg=2):
rvs = (c_sp * (self.wvs - wv_line) / wv_line) / 1000 # km/s
idxs_line = np.where(np.logical_and(rvs >= -mask_rv, rvs <= mask_rv))
print()
idxs_cont = np.concatenate([np.arange(0,idxs_line[0][0]+1), np.arange(idxs_line[0][-1], np.shape(self.values)[0])])
wvs_cont = self.wvs[idxs_cont]
spec_cont = self.values[idxs_cont]
params = np.polyfit(wvs_cont, spec_cont, deg=deg)
baseline = np.poly1d(params)(self.wvs)
spec_cont_sub = self.values - baseline
return Spec(self.wvs, spec_cont_sub, units=self.units)