From c4e6d5370ed13f61eb13eb5a5e0bb91bf27de324 Mon Sep 17 00:00:00 2001 From: Christoph Toennis Date: Wed, 24 Jul 2024 13:00:11 +0200 Subject: [PATCH] I changed the text to remove duplicate code --- src/ctapipe/io/interpolating.py | 11 +++- src/ctapipe/io/tests/test_interpolator.py | 63 +++++++---------------- 2 files changed, 28 insertions(+), 46 deletions(-) diff --git a/src/ctapipe/io/interpolating.py b/src/ctapipe/io/interpolating.py index 2608d5d5756..afabc513e54 100644 --- a/src/ctapipe/io/interpolating.py +++ b/src/ctapipe/io/interpolating.py @@ -107,14 +107,14 @@ def add_table(self, tel_id, input_table): def __call__(self, tel_id, time): if tel_id not in self._interpolators: if self.h5file is not None: - self._read_parameter_table(tel_id) + self._read_parameter_table(tel_id) # might need to be removed else: raise KeyError(f"No table available for tel_id {tel_id}") + @abstractmethod def _read_parameter_table(self, tel_id): pass # this method is called when you try to interpolate, but no table has been added. # It will try adding a table from the hdf5 file specified in __init__ - # Each type of interpolator will need to get the data from a different point in the file class PointingInterpolator(Interpolator): @@ -244,3 +244,10 @@ def add_table(self, tel_id, input_table, par_name="pedestal"): time = input_table["time"] cal = input_table[par_name] self._interpolators[tel_id] = StepFunction(time, cal, **self.interp_options) + + def _read_parameter_table(self, tel_id): + input_table = read_table( + self.h5file, + f"/dl0/calibration/tel_{tel_id:03d}", # needs to be updated once we determine where calibration data is put, might remove method from base class + ) + self.add_table(tel_id, input_table) diff --git a/src/ctapipe/io/tests/test_interpolator.py b/src/ctapipe/io/tests/test_interpolator.py index 2d0777ef44b..1fa9ca52972 100644 --- a/src/ctapipe/io/tests/test_interpolator.py +++ b/src/ctapipe/io/tests/test_interpolator.py @@ -5,48 +5,13 @@ from astropy.table import Table from astropy.time import Time -t0 = Time("2022-01-01T00:00:00") - - -def test_simple(): - """Test interpolator""" - from ctapipe.io.interpolating import CalibrationInterpolator, PointingInterpolator - - table = Table( - { - "time": t0 + np.arange(0.0, 10.1, 2.0) * u.s, - "azimuth": np.linspace(0.0, 10.0, 6) * u.deg, - "altitude": np.linspace(70.0, 60.0, 6) * u.deg, - }, - ) - - table_cal = Table( - { - "time": np.arange(0.0, 10.1, 2.0), - "pedestal": np.reshape(np.random.normal(4.0, 1.0, 1850 * 6), (6, 1850)), - }, - ) - - interpolator = PointingInterpolator() - interpolator_cal = CalibrationInterpolator() - interpolator.add_table(1, table) - interpolator_cal.add_table(1, table_cal, "pedestal") +from ctapipe.io.interpolating import CalibrationInterpolator, PointingInterpolator - alt, az = interpolator(tel_id=1, time=t0 + 1 * u.s) - assert u.isclose(alt, 69 * u.deg) - assert u.isclose(az, 1 * u.deg) - - pedestal = interpolator_cal(tel_id=1, time=1.0) - assert all(pedestal == table_cal["pedestal"][0]) - with pytest.raises(KeyError): - interpolator(tel_id=2, time=t0 + 1 * u.s) - with pytest.raises(KeyError): - interpolator_cal(tel_id=2, time=1.0) +t0 = Time("2022-01-01T00:00:00") def test_azimuth_switchover(): """Test pointing interpolation""" - from ctapipe.io.interpolating import PointingInterpolator table = Table( { @@ -66,7 +31,6 @@ def test_azimuth_switchover(): def test_invalid_input(): """Test invalid pointing tables raise nice errors""" - from ctapipe.io.interpolating import PointingInterpolator wrong_time = Table( { @@ -102,8 +66,8 @@ def test_invalid_input(): def test_hdf5(tmp_path): + """Test writing interpolated data to file""" from ctapipe.io import write_table - from ctapipe.io.interpolating import PointingInterpolator table = Table( { @@ -124,7 +88,7 @@ def test_hdf5(tmp_path): def test_bounds(): """Test invalid pointing tables raise nice errors""" - from ctapipe.io.interpolating import CalibrationInterpolator, PointingInterpolator + from ctapipe.io.interpolating import PointingInterpolator table_pointing = Table( { @@ -157,6 +121,17 @@ def test_bounds(): with pytest.raises(ValueError, match="above the interpolation range"): interpolator_pointing(tel_id=1, time=t0 + 10.2 * u.s) + alt, az = interpolator_pointing(tel_id=1, time=t0 + 1 * u.s) + assert u.isclose(alt, 69 * u.deg) + assert u.isclose(az, 1 * u.deg) + + pedestal = interpolator_cal(tel_id=1, time=1.0) + assert all(pedestal == table_cal["pedestal"][0]) + with pytest.raises(KeyError): + interpolator_pointing(tel_id=2, time=t0 + 1 * u.s) + with pytest.raises(KeyError): + interpolator_cal(tel_id=2, time=1.0) + interpolator_pointing = PointingInterpolator(bounds_error=False) interpolator_cal = CalibrationInterpolator(bounds_error=False) interpolator_pointing.add_table(1, table_pointing) @@ -169,12 +144,12 @@ def test_bounds(): assert all(np.isnan(interpolator_cal(tel_id=1, time=-0.1))) - interpolator = PointingInterpolator(bounds_error=False, extrapolate=True) - interpolator.add_table(1, table_pointing) - alt, az = interpolator(tel_id=1, time=t0 - 1 * u.s) + interpolator_pointing = PointingInterpolator(bounds_error=False, extrapolate=True) + interpolator_pointing.add_table(1, table_pointing) + alt, az = interpolator_pointing(tel_id=1, time=t0 - 1 * u.s) assert u.isclose(alt, 71 * u.deg) assert u.isclose(az, -1 * u.deg) - alt, az = interpolator(tel_id=1, time=t0 + 11 * u.s) + alt, az = interpolator_pointing(tel_id=1, time=t0 + 11 * u.s) assert u.isclose(alt, 59 * u.deg) assert u.isclose(az, 11 * u.deg)