Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a PPU component #71

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/components/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
This module contains classes that abstract various Karabo devices to make access
easier.

- [Scans](scans.md)
- [Scantool][extra.components.Scantool]
- [Scan][extra.components.Scan]

- [Pulse patterns](pulse-patterns.md)
- [XrayPulses][extra.components.XrayPulses]
- [OpticalLaserPulses][extra.components.OpticalLaserPulses]
- [DldPulses][extra.components.DldPulses]
- [Pulse Picker Unit](pulse-picker-unit.md)
- [Scans](scans.md)
- [Scantool][extra.components.Scantool]
- [Scan][extra.components.Scan]
1 change: 1 addition & 0 deletions docs/components/pulse-picker-unit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: extra.components.PPU
2 changes: 0 additions & 2 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ nav:
- detector-geometry.md
- Components:
- components/index.md
- components/scans.md
- components/pulse-patterns.md
Comment on lines -25 to -26
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer if all pages were listed in the index. In general I dislike having disembodied pages because otherwise navigating to them requires that someone knows the right link to click on the right page (e.g. I always get confused when trying to find the xwiz docs).

- karabo-bridge.md
- utilities.md
- changelog.md
Expand Down
1 change: 1 addition & 0 deletions src/extra/components/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

from .scantool import Scantool # noqa
from .ppu import PPU
from .pulses import XrayPulses, OpticalLaserPulses, DldPulses # noqa
from .scan import Scan
183 changes: 183 additions & 0 deletions src/extra/components/ppu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import logging
from functools import lru_cache
from typing import List, Union

import numpy as np
import pandas as pd
from extra_data import by_id
from extra_data.keydata import KeyData
from extra_data.reader import DataCollection
from extra_data.sourcedata import SourceData

log = logging.getLogger(__name__)


def _find_ppu(run: DataCollection, device: str = None):
"""Helper function to find a PPU device."""

# fast path, we don't validate if the type or name match
if isinstance(device, SourceData):
return device
elif isinstance(device, KeyData):
return run[device.source]
elif isinstance(device, str):
if device in run.control_sources:
return run[device]
elif device in run.alias:
return _find_ppu(run, run.alias[device])
# else search substring for match
elif device is not None:
raise KeyError(f"ppu must be a SourceData or str, not {type(device).__name__}")

# Then we list all PPU device in the run
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the 'then' is a bit misleading, it sounds like this is logically done after the code above, but this is the default entry point if no device is given by argument (but if it is given, the function will have returned at this point).
It is veeeery minor, but maybe "# Default: we get all PPU devices in the run as checked against hardcoded device classes, respectively substrings"

available_ppus = [
source
for source in run.control_sources
if run[source].device_class in PPU._DEVICE_CLASSES
]
if len(available_ppus) == 0:
available_ppus = [s for s in run.control_sources if "MDL/PPU" in s]
available_ppus += [s for s in run.control_sources if "MDL/DIPOLE_PPU" in s]

if len(available_ppus) == 0:
raise KeyError("Could not find a PPU device in this data")
elif len(available_ppus) == 1:
return run[available_ppus[0]]
else: # len(available_ppus) > 1
if device:
# And unique substrings of available PPU
matches = [name for name in available_ppus if device.upper() in name]
if len(matches) == 1:
return run[matches[0]]
elif len(matches) == 0:
raise KeyError(
f"Couldn't identify a PPU from '{device}'; please pass a valid device name, alias, or unique substring"
)
else:
raise KeyError(
f"Multiple PPUs found matching '{device}', please be more specific: {matches}"
)
raise KeyError(f"Multiple PPU devices found in that data: {available_ppus}")


class PPU:
"""Interface to a Pulse Picker Unit (PPU).

Despite its name, the PPU selects a bunch train from within the 10Hz
structure and block the remainder of the beam.

Technical description:
A motor-driven absorber rotor is rotated into the beam axis in order to
block the XFEL beam when triggered. The rotor is contained within a UHV
chamber. In terms of temporal structure, the beam pipe is blocked by an
absorbing rotor for up to 9/10ths of a second or vice versa,
synchronized to the facility clock/trigger.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very useful information for the docs, I only wonder (naive question) how the 9/10 blocking scheme, i.e. one train per second picked, fits to an overall run where 1 / 309 trains are selected - like run 3379, 50 for instance, shouln't it then be 30 trains with non-consecutive IDs?

"""

_DEVICE_CLASSES = [
"PulsePickerTrainTrigger", # PPU
"PulsePickerTrainTriggerCopy",
"StandardTrigger", # DIPOLE PPU
]

def __init__(
self, data: DataCollection, ppu: Union[KeyData, SourceData, str] = None
):
"""

Args:
data (DataCollection):
ppu (Union[KeyData, SourceData, str], optional):
Specify a Pulse Picker Unit device to use, necessary if a run
contains more than one PPU. This can be any of:
- The device name of the source.
- A `SourceData` or [KeyData][extra_data.KeyData] of the
control source (e.g. `HED_XTD6_PPU/MDL/PPU_TRIGGER`) of a
PPU.
- The alias name of either a `SourceData` or
[KeyData][extra_data.KeyData] belonging to a PPU.
- A unique (case-insensitive) substring of a PPU source name.

Raises:
KeyError: If we can't identify a unique PPU device from the
arguments.
"""
self.data = data
self.device = _find_ppu(data, ppu)

@lru_cache()
def number_of_trains(self, train_id: int) -> int:
"""Number of trains picked for the sequence starting at train_id.

Args:
train_id (int): train ID of the sequence start.
"""

# The Dipole PPU-like device does not allow to pick multiple trains
if "trainTrigger.numberOfTrains" not in self.device.keys():
return 1
n_trains = self.device["trainTrigger.numberOfTrains"]
return int(n_trains.select_trains(by_id[[train_id]]).ndarray()[0])

def train_ids(
self, offset: int = 0, labelled: bool = False
) -> Union[List[int], pd.Series]:
"""All train IDs picked by the PPU.

Args:
offset (int, optional):
offset to add to the selected trains. Defaults to 0.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For interest: why would the user want to add an offset? I'm sure there are use cases, just asking

labelled (bool, optional):
Returns a Pandas Series if set to True, where this index represents the
trigger sequence a train ID is part of. Defaults to False.

Returns:
Union[List[int], pd.Series]: Train IDs picked by the PPU.
"""
seq_start = self.device["trainTrigger.sequenceStart"].ndarray()
# The trains picked are the unique values of trainTrigger.sequenceStart
# minus the first (previous trigger before this run).
start_train_ids = np.unique(seq_start)[1:] + offset

train_ids = []
sequences = []
for seq, train_id in enumerate(start_train_ids):
span = self.number_of_trains(train_id)
train_ids.extend(np.arange(train_id, train_id + span).tolist())
sequences.extend([seq] * span)

log.info(
f"PPU device {self.device.source} triggered for {len(train_ids)} train(s) across {len(sequences)} sequence(s)."
)

if labelled:
train_ids = pd.Series(train_ids, index=sequences)
return train_ids

def trains(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you consider a different name for this method? Both in EXtra-data as well as across existing components, we use .trains() to return an iterator over trains. This method however takes something that has train IDs and changes it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Do you have something in mind?

  • filter? filter_trains?
  • train_selection? select? select_trains?
  • pick? pick_trains?
  • take? take_trains?

I'd probably go for pick_trains.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

select and select_trains also have different meanings already (though in the long run, I would love to pass this component to DataCollection.select_trains, i.e. have some kind of TrainSelector protocol)

From your list, I would probably also go for pick_trains, but it's unfortunate one way or the other that it's the "picking-thing" calling on the "thing-to-be-picked" 🤔
Maybe @takluyver can comment on what sounds most natural.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to pass this component to DataCollection.select_trains, i.e. have some kind of TrainSelector protocol

yes, me too!
But do we have an idea of what/how we want to select trains or how this protocol can look like? I can easily think on how to do that specifically for the PPU device, but I can't imagine other cases for now (maybe you have more use cases as part of the data reduction).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we invert the problem: Instead of having a method that selects trains on some object, have a property or method that returns the correspondig extra_data.by_id slice? Then selection becomes: run.select_trains(ppu.train_sel) (name obviously preliminary)

Whenever we decide on a universal TrainSelector protocol, we can simply add it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to something like ppu.train_sel. For the split-sequence option, we could make PPU iterable, giving some sort of TrainSequence or TrainGroup object so you could do [run.select_trains(seq.train_sel) for seq in ppu].

self,
data: Union[DataCollection, SourceData, KeyData] = None,
*,
split_sequence: bool = False,
offset: int = 0,
) -> Union[DataCollection, List[DataCollection]]:
"""Returns a subset of the data only with Trains selected by the PPU.

Args:
data: Data to filter. If set to None (defaut) use the data used at initialization.
split_sequence (bool, optional): Split data per PPU trigger sequence. Defaults to False.
offset (int, optional): offset to apply to train IDs to be selected. Defaults to 0.

Returns:
Union[DataCollection, List[DataCollection]]:
DataCollection(s) containing only trains triggered by the PPU
"""
data = data or self.data
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to make this generic. Strictly speaking, the EXtra component is thus not a PPU component any more, but a "trainSelector" component. I wonder if there should be such class per se. I see that this addition to the trains() method makes the PPU more flexible, but it also seems a bit awkward to initialize by PPU device information, in order to then feed it with something else, except for comparison. In other words, I am not sure if a generic class with specific methods would be better than a specific class with a generic method by argument 🙃

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean here. The line you highlighted here is only to make it convenient to use with KeyData or SourceData. Otherwise one would need to first filter the DataCollection and then make source/keydata object. Here you can instantiate all your object at the start of your script and then filter trains for any object as required.

Copy link
Member Author

@tmichela tmichela Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ppu = PPU(run)
filtered_run = ppu.trains()
source = filtered_run['awesome_source']

vs

ppu = PPU(run)
source = run['awesome_source']
filtered_source = ppu.trains(source)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. My comment was not about that particular line in the first place, more about the way this method accepts other source data i.e. allowing to do train picking based on a pre-selected source. This may be more convenient indeed. The default filtering i.e. trains() method without argument is using the PPU device source, if one is actually interested in another source as in your first code block, it is more "natural" to do it the 2nd way.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still, on a much higher level, I thought whether it would be good have a very generic "trainPicker" or "trainSelector" component in EXtra, that is, a class that could take PPU or any other source for filtering, already at instantiation level.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this comment is not affecting the PR, it is more an outlook to the future. (Maybe it does not make sense, because it lacks a real use case?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not, but do we have other filtering sources/methods at this moment?


train_ids = self.train_ids(labelled=True, offset=offset)
if split_sequence:
return [
data.select_trains(by_id[seq.values])
for _, seq in train_ids.groupby(train_ids.index)
]
return data.select_trains(by_id[train_ids.values])
42 changes: 39 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,26 @@
from pathlib import Path
from tempfile import TemporaryDirectory

import h5py
import numpy as np
import pytest

from extra_data import RunDirectory
from extra_data.tests.mockdata import write_file
from extra_data.tests.mockdata.xgm import XGM
from extra_data.tests.mockdata.base import DeviceBase
from extra_data.tests.mockdata.motor import Motor
from extra_data.tests.mockdata.xgm import XGM

from .mockdata.timeserver import PulsePatternDecoder, Timeserver

from .mockdata.timeserver import Timeserver, PulsePatternDecoder

class PPU(DeviceBase):
control_keys = [
('trainTrigger.numberOfTrains', 'i4', ()),
('trainTrigger.sequenceStart', 'i4', ()),
]
extra_run_values = [
('classId', None, 'PulsePickerTrainTrigger'),
]


@pytest.fixture(scope='session')
Expand All @@ -24,3 +36,27 @@ def mock_spb_aux_run():
with TemporaryDirectory() as td:
write_file(Path(td) / 'RAW-R0001-DA01-S00000.h5', sources, 100)
yield RunDirectory(td)


@pytest.fixture(scope='session')
def ppu_run():
sources = [
PPU('HED_XTD6_PPU/MDL/PPU_TRIGGER'),
PPU('HED_DIPOLE_PPU/MDL/PPU_TRIGGER'),
Timeserver('HED_RR_SYS/TSYS/TIMESERVER'),
]

with TemporaryDirectory() as td:
fpath = Path(td) / 'RAW-R0001-DA01-S00000.h5'
write_file(fpath, sources, 100, firsttrain=10000, format_version='1.3')

with h5py.File(fpath, 'r+') as f:
f['/CONTROL/HED_XTD6_PPU/MDL/PPU_TRIGGER/trainTrigger/numberOfTrains'] = np.array([10] * 100, dtype=np.int64)
f['/CONTROL/HED_XTD6_PPU/MDL/PPU_TRIGGER/trainTrigger/sequenceStart'] = np.repeat([9000, 10080], 50)
f['/CONTROL/HED_DIPOLE_PPU/MDL/PPU_TRIGGER/trainTrigger/numberOfTrains'] = np.array([1] * 100, dtype=np.int64)
f['/CONTROL/HED_DIPOLE_PPU/MDL/PPU_TRIGGER/trainTrigger/sequenceStart'] = np.repeat([9985, 10015, 10045, 10075], 25)

aliases = {'ppu-hed': 'HED_XTD6_PPU/MDL/PPU_TRIGGER',
'ppu-dipole': 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER'}
run = RunDirectory(td)
yield run.with_aliases(aliases)
86 changes: 86 additions & 0 deletions tests/test_components_ppu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import pandas as pd
import pytest

from extra_data.reader import DataCollection
from extra.components import PPU
from extra.components.ppu import _find_ppu


def test_find_ppu(ppu_run):
source = _find_ppu(ppu_run, ppu_run['HED_DIPOLE_PPU/MDL/PPU_TRIGGER'])
assert source.source == 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER'

source = _find_ppu(ppu_run, ppu_run['HED_DIPOLE_PPU/MDL/PPU_TRIGGER', 'trainTrigger.sequenceStart'])
assert source.source == 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER'

source = _find_ppu(ppu_run, 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER')
assert source.source == 'HED_DIPOLE_PPU/MDL/PPU_TRIGGER'

source = _find_ppu(ppu_run, 'ppu-hed')
assert source.source == 'HED_XTD6_PPU/MDL/PPU_TRIGGER'

source = _find_ppu(ppu_run, 'XTD6')
assert source.source == 'HED_XTD6_PPU/MDL/PPU_TRIGGER'

source = _find_ppu(ppu_run.select('HED_XTD6_PPU*'))
assert source.source == 'HED_XTD6_PPU/MDL/PPU_TRIGGER'

# fails with multiple PPUs
with pytest.raises(KeyError) as excinfo:
_find_ppu(ppu_run)
assert 'Multiple PPU' in str(excinfo.value)

# fails with invalid device type
with pytest.raises(KeyError) as excinfo:
_find_ppu(ppu_run, 1)
assert 'not int' in str(excinfo.value)

# fails with 0 PPUs
with pytest.raises(KeyError) as excinfo:
_find_ppu(ppu_run.select('*TIMESERVER'))
assert 'Could not find a PPU' in str(excinfo.value)

# too many match
with pytest.raises(KeyError) as excinfo:
_find_ppu(ppu_run, 'PPU')
assert 'Multiple PPUs found matching' in str(excinfo.value)

# no match
with pytest.raises(KeyError) as excinfo:
_find_ppu(ppu_run, 'PPU2')
assert 'Couldn\'t identify a PPU' in str(excinfo.value)


def test_train_ids(ppu_run):
# single trigger sequence
ppu = PPU(ppu_run, 'ppu-hed')
train_ids = ppu.train_ids()
assert isinstance(train_ids, list)
assert len(train_ids) == 10
train_ids = ppu.train_ids(labelled=True)
assert isinstance(train_ids, pd.Series)
assert train_ids.size == 10 # 10 trains in total
assert train_ids.index.unique().size == 1 # single trigger sequence

# multiple trigger sequences
ppu = PPU(ppu_run, 'ppu-dipole')
train_ids = ppu.train_ids()
assert isinstance(train_ids, list)
assert len(train_ids) == 3
train_ids = ppu.train_ids(labelled=True)
assert isinstance(train_ids, pd.Series)
assert train_ids.index.unique().size == 3 # 3 trigger sequence
assert train_ids.size == 3 # 1 train per sequence


def test_trains(ppu_run):
ppu = PPU(ppu_run, 'ppu-dipole')
reduced_run = ppu.trains()
assert isinstance(reduced_run, DataCollection)
assert reduced_run.train_ids == [10015, 10045, 10075]

# split per sequence
reduced_run = ppu.trains(split_sequence=True)
assert isinstance(reduced_run, list)
assert len(reduced_run) == 3
assert reduced_run[0].train_ids == [10015]