-
Notifications
You must be signed in to change notification settings - Fork 271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Chunk interpolation to select calibration data #2634
base: main
Are you sure you want to change the base?
Changes from 12 commits
3d879b9
c2ce42c
c8f7ef0
bef9677
cd4864f
fb3a9a4
0f0b948
50f2791
90115d6
f772b19
e201356
907b6cd
dc3b24c
3c7e6ab
cad523e
4a032ed
7ae62e2
2997b6c
a6be0ae
8498274
a915d8d
3423bd3
496af8f
3b93770
da86bf6
50b9e84
c11b6f2
b02b526
16aa176
e0c0004
1644388
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add ChunkInterpolator to ctapipe.monitoring.interpolation as a tool to select data from chunks. The planned use for this is to select calibration data. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,15 +4,13 @@ | |
import astropy.units as u | ||
import numpy as np | ||
import tables | ||
from astropy.table import Table | ||
from astropy.time import Time | ||
from scipy.interpolate import interp1d | ||
|
||
from ctapipe.core import Component, traits | ||
|
||
__all__ = [ | ||
"Interpolator", | ||
"PointingInterpolator", | ||
] | ||
__all__ = ["Interpolator", "PointingInterpolator", "ChunkInterpolator"] | ||
|
||
|
||
class Interpolator(Component, metaclass=ABCMeta): | ||
|
@@ -22,7 +20,7 @@ | |
Parameters | ||
---------- | ||
h5file : None | tables.File | ||
A open hdf5 file with read access. | ||
An open hdf5 file with read access. | ||
""" | ||
|
||
bounds_error = traits.Bool( | ||
|
@@ -40,7 +38,7 @@ | |
required_columns = set() | ||
expected_units = {} | ||
|
||
def __init__(self, h5file=None, **kwargs): | ||
def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: | ||
super().__init__(**kwargs) | ||
|
||
if h5file is not None and not isinstance(h5file, tables.File): | ||
|
@@ -60,26 +58,25 @@ | |
self._interpolators = {} | ||
|
||
@abstractmethod | ||
def add_table(self, tel_id, input_table): | ||
def add_table(self, tel_id: int, input_table: Table) -> None: | ||
""" | ||
Add a table to this interpolator | ||
Add a table to this interpolator. | ||
This method reads input tables and creates instances of the needed interpolators | ||
to be added to _interpolators. The first index of _interpolators needs to be | ||
tel_id, the second needs to be the name of the parameter that is to be interpolated | ||
tel_id, the second needs to be the name of the parameter that is to be interpolated. | ||
|
||
Parameters | ||
---------- | ||
tel_id : int | ||
Telescope id | ||
Telescope id. | ||
input_table : astropy.table.Table | ||
Table of pointing values, expected columns | ||
are always ``time`` as ``Time`` column and | ||
other columns for the data that is to be interpolated | ||
other columns for the data that is to be interpolated. | ||
""" | ||
|
||
pass | ||
|
||
def _check_tables(self, input_table): | ||
def _check_tables(self, input_table: Table) -> None: | ||
missing = self.required_columns - set(input_table.colnames) | ||
if len(missing) > 0: | ||
raise ValueError(f"Table is missing required column(s): {missing}") | ||
|
@@ -98,14 +95,14 @@ | |
f"{col} must have units compatible with '{self.expected_units[col].name}'" | ||
) | ||
|
||
def _check_interpolators(self, tel_id): | ||
def _check_interpolators(self, tel_id: int) -> None: | ||
if tel_id not in self._interpolators: | ||
if self.h5file is not None: | ||
self._read_parameter_table(tel_id) # might need to be removed | ||
else: | ||
raise KeyError(f"No table available for tel_id {tel_id}") | ||
|
||
def _read_parameter_table(self, tel_id): | ||
def _read_parameter_table(self, tel_id: int) -> None: | ||
# prevent circular import between io and monitoring | ||
from ..io import read_table | ||
|
||
|
@@ -118,30 +115,30 @@ | |
|
||
class PointingInterpolator(Interpolator): | ||
""" | ||
Interpolator for pointing and pointing correction data | ||
Interpolator for pointing and pointing correction data. | ||
""" | ||
|
||
telescope_data_group = "/dl0/monitoring/telescope/pointing" | ||
required_columns = frozenset(["time", "azimuth", "altitude"]) | ||
expected_units = {"azimuth": u.rad, "altitude": u.rad} | ||
|
||
def __call__(self, tel_id, time): | ||
def __call__(self, tel_id: int, time: Time) -> tuple[u.Quantity, u.Quantity]: | ||
""" | ||
Interpolate alt/az for given time and tel_id. | ||
|
||
Parameters | ||
---------- | ||
tel_id : int | ||
telescope id | ||
Telescope id. | ||
time : astropy.time.Time | ||
time for which to interpolate the pointing | ||
Time for which to interpolate the pointing. | ||
|
||
Returns | ||
------- | ||
altitude : astropy.units.Quantity[deg] | ||
interpolated altitude angle | ||
Interpolated altitude angle. | ||
azimuth : astropy.units.Quantity[deg] | ||
interpolated azimuth angle | ||
Interpolated azimuth angle. | ||
""" | ||
|
||
self._check_interpolators(tel_id) | ||
|
@@ -151,14 +148,14 @@ | |
alt = u.Quantity(self._interpolators[tel_id]["alt"](mjd), u.rad, copy=False) | ||
return alt, az | ||
|
||
def add_table(self, tel_id, input_table): | ||
def add_table(self, tel_id: int, input_table: Table) -> None: | ||
""" | ||
Add a table to this interpolator | ||
Add a table to this interpolator. | ||
|
||
Parameters | ||
---------- | ||
tel_id : int | ||
Telescope id | ||
Telescope id. | ||
input_table : astropy.table.Table | ||
Table of pointing values, expected columns | ||
are ``time`` as ``Time`` column, ``azimuth`` and ``altitude`` | ||
|
@@ -186,3 +183,108 @@ | |
self._interpolators[tel_id] = {} | ||
self._interpolators[tel_id]["az"] = interp1d(mjd, az, **self.interp_options) | ||
self._interpolators[tel_id]["alt"] = interp1d(mjd, alt, **self.interp_options) | ||
|
||
|
||
class ChunkInterpolator(Interpolator): | ||
""" | ||
Simple interpolator for overlapping chunks of data. | ||
""" | ||
|
||
required_columns = frozenset(["start_time", "end_time"]) | ||
|
||
def __call__( | ||
self, tel_id: int, time: Time, columns: str | list[str] | ||
) -> float | dict[str, float]: | ||
""" | ||
Interpolate overlapping chunks of data for a given time, tel_id, and column(s). | ||
|
||
Parameters | ||
---------- | ||
tel_id : int | ||
Telescope id. | ||
time : astropy.time.Time | ||
Time for which to interpolate the data. | ||
columns : str or list of str | ||
Name(s) of the column(s) to interpolate. | ||
|
||
Returns | ||
------- | ||
interpolated : float or dict | ||
Interpolated data for the specified column(s). | ||
""" | ||
|
||
self._check_interpolators(tel_id) | ||
|
||
if isinstance(columns, str): | ||
columns = [columns] | ||
|
||
result = {} | ||
mjd = time.to_value("mjd") | ||
for column in columns: | ||
if column not in self._interpolators[tel_id]: | ||
raise ValueError( | ||
f"Column '{column}' not found in interpolators for tel_id {tel_id}" | ||
) | ||
result[column] = self._interpolators[tel_id][column](mjd) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why use Why not just call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
if len(result) == 1: | ||
return result[columns[0]] | ||
return result | ||
|
||
def add_table(self, tel_id: int, input_table: Table, columns: list[str]) -> None: | ||
""" | ||
Add a table to this interpolator for specific columns. | ||
|
||
Parameters | ||
---------- | ||
tel_id : int | ||
Telescope id. | ||
input_table : astropy.table.Table | ||
Table of values to be interpolated, expected columns | ||
are ``start_time`` as ``validity start Time`` column, | ||
``end_time`` as ``validity end Time`` and the specified columns | ||
for the data of the chunks. | ||
columns : list of str | ||
Names of the columns to interpolate. | ||
""" | ||
|
||
required_columns = set(self.required_columns) | ||
required_columns.update(columns) | ||
self.required_columns = frozenset(required_columns) | ||
self._check_tables(input_table) | ||
|
||
input_table = input_table.copy() | ||
input_table.sort("start_time") | ||
start_time = input_table["start_time"].to_value("mjd") | ||
end_time = input_table["end_time"].to_value("mjd") | ||
|
||
if tel_id not in self._interpolators: | ||
self._interpolators[tel_id] = {} | ||
|
||
for column in columns: | ||
values = input_table[column] | ||
|
||
def interpolate_chunk( | ||
mjd: float, start_time=start_time, end_time=end_time, values=values | ||
) -> float: | ||
# Find the index of the closest preceding start time | ||
preceding_index = np.searchsorted(start_time, mjd, side="right") - 1 | ||
if preceding_index < 0: | ||
return np.nan | ||
|
||
# Check if the time is within the valid range of the chunk | ||
if start_time[preceding_index] <= mjd <= end_time[preceding_index]: | ||
value = values[preceding_index] | ||
if not np.isnan(value): | ||
return value | ||
|
||
# If the closest preceding chunk has nan, check the next closest chunk | ||
for i in range(preceding_index - 1, -1, -1): | ||
if start_time[i] <= mjd <= end_time[i]: | ||
value = values[i] | ||
if not np.isnan(value): | ||
return value | ||
|
||
return np.nan | ||
|
||
self._interpolators[tel_id][column] = interpolate_chunk | ||
ctoennis marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an immutable type that you "mutate" further down in the code. Please don't do this, if you want to extend it with the value column, particular for an object, just make a copy and to the object attribute and modify that one.