From a0f059cc51cb54fa4abf079cd19e9697d1dafa68 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Thu, 19 Oct 2023 14:00:13 +0100 Subject: [PATCH 01/13] Split out serve_data function and show moree info while streaming --- extra_data/export.py | 45 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/extra_data/export.py b/extra_data/export.py index 79f6a38e..8f655bed 100644 --- a/extra_data/export.py +++ b/extra_data/export.py @@ -11,6 +11,8 @@ """ import os.path as osp +import time +from collections import deque from socket import AF_INET from warnings import warn @@ -107,9 +109,7 @@ def _iter_trains(data, merge_detector=False): yield tid, train_data -def serve_files(path, port, source_glob='*', key_glob='*', - append_detector_modules=False, dummy_timestamps=False, - use_infiniband=False, sock='REP'): +def serve_files(path, port, source_glob='*', key_glob='*', **kwargs): """Stream data from files through a TCP socket. Parameters @@ -144,15 +144,54 @@ def serve_files(path, port, source_glob='*', key_glob='*', data = H5File(path) data = data.select(source_glob, key_glob) + serve_data(data, port, **kwargs) + +def serve_data(data, port, append_detector_modules=False, + dummy_timestamps=False, use_infiniband=False, sock='REP'): + """Stream data from files through a TCP socket. + + Parameters + ---------- + data: DataCollection + The data to be streamed; should already have sources & keys selected. + port: str or int + A ZMQ endpoint (e.g. 'tcp://*:44444') or a TCP port to bind the socket + to. Integers or strings of all digits are treated as port numbers. + append_detector_modules: bool + Combine multi-module detector data in a single data source (sources for + individual modules are removed). The last section of the source name is + replaces with 'APPEND', example: + 'SPB_DET_AGIPD1M-1/DET/#CH0:xtdf' -> 'SPB_DET_AGIPD1M-1/DET/APPEND' + + Supported detectors: AGIPD, DSSC, LPD + dummy_timestamps: bool + Whether to add mock timestamps if the metadata lacks them. + use_infiniband: bool + Use infiniband interface if available (if port specifies a TCP port) + sock: str + socket type - supported: REP, PUB, PUSH (default REP). + """ if isinstance(port, int) or port.isdigit(): endpt = f'tcp://{find_infiniband_ip() if use_infiniband else "*"}:{port}' else: endpt = port + sender = Sender(endpt, sock=sock, dummy_timestamps=dummy_timestamps) print(f'Streamer started on: {sender.endpoint}') + ntrains = len(data.train_ids) + print(f'Sending {ntrains} trains') + + sent_times = deque([time.monotonic()], 10) + count = 0 for tid, data in _iter_trains(data, merge_detector=append_detector_modules): sender.send(data) + count += 1 + new_time = time.monotonic() + if count % 5 == 0: + rate = len(deque) / (new_time - sent_times[0]) + print(f'Sent {count}/{ntrains} trains - Train ID {tid} - {rate:.1f} Hz', end='\r') + sent_times.append(new_time) # The karabo-bridge code sets linger to 0 so that it doesn't get stuck if # the client goes away. But this would also mean that we close the socket From e97f0cb37e60d1cff41dfe97e9cf6d2a1b915f5a Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Thu, 19 Oct 2023 15:46:47 +0200 Subject: [PATCH 02/13] Fix getting length of deque --- docs/aligning_trains.ipynb | 2 +- extra_data/export.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/aligning_trains.ipynb b/docs/aligning_trains.ipynb index be6605de..d8689bc5 100644 --- a/docs/aligning_trains.ipynb +++ b/docs/aligning_trains.ipynb @@ -849,7 +849,7 @@ ], "metadata": { "kernelspec": { - "display_name": "xfel", + "display_name": "xfel (Python 3.7)", "language": "python", "name": "xfel" }, diff --git a/extra_data/export.py b/extra_data/export.py index 8f655bed..c4ffe688 100644 --- a/extra_data/export.py +++ b/extra_data/export.py @@ -189,7 +189,7 @@ def serve_data(data, port, append_detector_modules=False, count += 1 new_time = time.monotonic() if count % 5 == 0: - rate = len(deque) / (new_time - sent_times[0]) + rate = len(sent_times) / (new_time - sent_times[0]) print(f'Sent {count}/{ntrains} trains - Train ID {tid} - {rate:.1f} Hz', end='\r') sent_times.append(new_time) From 2d4f64d9fef2928e04e90e680a0604141f848646 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Tue, 24 Oct 2023 16:46:04 +0100 Subject: [PATCH 03/13] Print final update when streaming finishes --- extra_data/export.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/extra_data/export.py b/extra_data/export.py index c4ffe688..eb2ea7e5 100644 --- a/extra_data/export.py +++ b/extra_data/export.py @@ -180,18 +180,22 @@ def serve_data(data, port, append_detector_modules=False, sender = Sender(endpt, sock=sock, dummy_timestamps=dummy_timestamps) print(f'Streamer started on: {sender.endpoint}') ntrains = len(data.train_ids) - print(f'Sending {ntrains} trains') sent_times = deque([time.monotonic()], 10) count = 0 + new_time = 0. + def print_update(end='\r'): + print(f'Sent {count}/{ntrains} trains - Train ID {tid} - {rate:.1f} Hz', end=end) + for tid, data in _iter_trains(data, merge_detector=append_detector_modules): sender.send(data) count += 1 new_time = time.monotonic() if count % 5 == 0: rate = len(sent_times) / (new_time - sent_times[0]) - print(f'Sent {count}/{ntrains} trains - Train ID {tid} - {rate:.1f} Hz', end='\r') + print_update() sent_times.append(new_time) + print_update(end='\n') # The karabo-bridge code sets linger to 0 so that it doesn't get stuck if # the client goes away. But this would also mean that we close the socket From eae99356628577d859bce86ee9a5bd6fbf7aa67a Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Tue, 24 Oct 2023 16:47:17 +0100 Subject: [PATCH 04/13] Add karabo-bridge-serve-run command --- extra_data/cli/serve_run.py | 81 +++++++++++++++++++++++++++++++++++++ setup.py | 1 + 2 files changed, 82 insertions(+) create mode 100644 extra_data/cli/serve_run.py diff --git a/extra_data/cli/serve_run.py b/extra_data/cli/serve_run.py new file mode 100644 index 00000000..6e357b37 --- /dev/null +++ b/extra_data/cli/serve_run.py @@ -0,0 +1,81 @@ +from argparse import ArgumentParser +import sys + +from .. import open_run + +IMPORT_FAILED_MSG = """\ +{} + +karabo-bridge-serve-run requires additional dependencies: + pip install karabo-bridge psutil +""" + +def main(argv=None): + ap = ArgumentParser(prog="karabo-bridge-serve-run") + ap.add_argument("proposal", help="Proposal number") + ap.add_argument("run", help="Run number") + ap.add_argument("port", help="TCP port or ZMQ endpoint to send data on") + ap.add_argument( + "--include", help="Select matching sources (and optionally keys) to " + "include in streamed data", + action='append' + ) + ap.add_argument( + "--append-detector-modules", help="combine multiple module sources" + " into one (will only work for AGIPD data currently).", + action='store_true' + ) + ap.add_argument( + "--dummy-timestamps", help="create dummy timestamps if the meta-data" + " lacks proper timestamps", + action='store_true' + ) + ap.add_argument( + "--use-infiniband", help="Use infiniband interface if available " + "(if a TCP port is specified)", + action='store_true' + ) + ap.add_argument( + "-z", "--socket-type", help="ZeroMQ socket type", + choices=['PUB', 'PUSH', 'REP'], default='REP' + ) + args = ap.parse_args(argv) + + try: + from ..export import serve_data + except ImportError as e: + sys.exit(IMPORT_FAILED_MSG.format(e)) + + run = open_run(args.proposal, args.run, data='all') + + if not args.include: + print("Available sources:") + for s in sorted(run.all_sources): + print(f" {s}") + sys.exit("Please select at least one source with --include") + + include = [] + for pat in args.include: + if '[' in pat: + if not pat.endswith(']'): + sys.exit(f"Missing final ] in {pat!r}") + src_pat, key_pat = pat[:-1].split('[', 1) + include.append((src_pat, key_pat)) + else: + # Source pattern only + include.append(pat) + + sel = run.select(include) + + try: + serve_data( + sel, source_glob=args.source, key_glob=args.key, + append_detector_modules=args.append_detector_modules, + dummy_timestamps=args.dummy_timestamps, + use_infiniband=args.use_infiniband, sock=args.socket_type + ) + except KeyboardInterrupt: + print('\nStopped.') + +if __name__ == '__main__': + main() diff --git a/setup.py b/setup.py index 0eaafd9e..889e0085 100755 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ def find_version(*parts): "console_scripts": [ "lsxfel = extra_data.lsxfel:main", "karabo-bridge-serve-files = extra_data.cli.serve_files:main", + "karabo-bridge-serve-run = extra_data.cli.serve_run:main", "extra-data-validate = extra_data.validation:main", "extra-data-make-virtual-cxi = extra_data.cli.make_virtual_cxi:main", "extra-data-locality = extra_data.locality:main", From 0e3d0df5722c46eda82d226d2f17dc0607111abb Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Tue, 24 Oct 2023 17:03:24 +0100 Subject: [PATCH 05/13] karabo-bridge-serve-run: only send complete trains by default --- extra_data/cli/serve_run.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/extra_data/cli/serve_run.py b/extra_data/cli/serve_run.py index 6e357b37..30fc6e65 100644 --- a/extra_data/cli/serve_run.py +++ b/extra_data/cli/serve_run.py @@ -20,6 +20,10 @@ def main(argv=None): "include in streamed data", action='append' ) + ap.add_argument( + "--allow-partial", help="Send trains where some sources are missing", + action='store_true' + ) ap.add_argument( "--append-detector-modules", help="combine multiple module sources" " into one (will only work for AGIPD data currently).", @@ -65,7 +69,10 @@ def main(argv=None): # Source pattern only include.append(pat) - sel = run.select(include) + if args.allow_partial: + sel = run.select(include, require_any=True) + else: + sel = run.select(include, require_all=True) try: serve_data( From a67217f4872fea8813b0c2c1f4c228b2b2931c9d Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Tue, 24 Oct 2023 17:18:03 +0100 Subject: [PATCH 06/13] Fix parameters to serve_data() --- extra_data/cli/serve_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extra_data/cli/serve_run.py b/extra_data/cli/serve_run.py index 30fc6e65..d0c17d60 100644 --- a/extra_data/cli/serve_run.py +++ b/extra_data/cli/serve_run.py @@ -76,7 +76,7 @@ def main(argv=None): try: serve_data( - sel, source_glob=args.source, key_glob=args.key, + sel, args.port, append_detector_modules=args.append_detector_modules, dummy_timestamps=args.dummy_timestamps, use_infiniband=args.use_infiniband, sock=args.socket_type From cda0b9cf55c13fdbdd7fdc15ba1b61c8e46d5c02 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Wed, 25 Oct 2023 11:27:48 +0100 Subject: [PATCH 07/13] Ensure print_update() can't have an undefined variable --- extra_data/export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extra_data/export.py b/extra_data/export.py index eb2ea7e5..c23c3ede 100644 --- a/extra_data/export.py +++ b/extra_data/export.py @@ -183,7 +183,7 @@ def serve_data(data, port, append_detector_modules=False, sent_times = deque([time.monotonic()], 10) count = 0 - new_time = 0. + tid, rate = 0, 0. def print_update(end='\r'): print(f'Sent {count}/{ntrains} trains - Train ID {tid} - {rate:.1f} Hz', end=end) From 73208fcbc9099578755e296de8d4b5ce41f25caf Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Mon, 30 Oct 2023 15:53:24 +0000 Subject: [PATCH 08/13] Make --port an option --- extra_data/cli/serve_run.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extra_data/cli/serve_run.py b/extra_data/cli/serve_run.py index d0c17d60..3012295f 100644 --- a/extra_data/cli/serve_run.py +++ b/extra_data/cli/serve_run.py @@ -14,7 +14,9 @@ def main(argv=None): ap = ArgumentParser(prog="karabo-bridge-serve-run") ap.add_argument("proposal", help="Proposal number") ap.add_argument("run", help="Run number") - ap.add_argument("port", help="TCP port or ZMQ endpoint to send data on") + ap.add_argument( + "--port", default="0", help="TCP port or ZMQ endpoint to send data on. " + "Selects a random TCP port by default.") ap.add_argument( "--include", help="Select matching sources (and optionally keys) to " "include in streamed data", From 10f145af4eb189779fd0740db260b32af208869a Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Wed, 1 Nov 2023 11:11:16 +0000 Subject: [PATCH 09/13] Document karabo-bridge-serve-run command --- docs/cli.rst | 72 ++++++++++++++++++++++++++++++++++++++++++++++ docs/conf.py | 2 +- docs/streaming.rst | 16 +++++++---- 3 files changed, 84 insertions(+), 6 deletions(-) diff --git a/docs/cli.rst b/docs/cli.rst index a62cc479..07fb38bf 100644 --- a/docs/cli.rst +++ b/docs/cli.rst @@ -44,6 +44,75 @@ Check the structure of an EuXFEL run or HDF5 file: If it finds problems with the data, the program will produce a list of them and exit with status 1. See :doc:`validation` for details of what it checks. +.. _cmd-serve-run: + +``karabo-bridge-serve-run`` +--------------------------- + +Stream data from a run, by proposal & run number, in the `Karabo bridge +`_ +format. See :doc:`streaming` for more information. + +.. code-block:: shell + + # Proposal run + karabo-bridge-serve-run 700000 40 --port 4321 \ + --include 'SPB_IRDA_JF4M/DET/JNGFR*:daqOutput' \ + --include '*/MOTOR/*[*Position]' + +.. program:: karabo-bridge-serve-run + +.. option:: --port + + Either a numeric TCP port, e.g. ``4321``, or a ZMQ endpoint address, e.g. + ``tcp://0.0.0.0:4321``. You will need to give the receiving code this port + number or address as well. + + If no port is specified, the program will pick an unused port, and display + the endpoint address as it starts. + +.. option:: --include + + Sources matching the pattern will be included in the streamed data. + You can also match keys by putting a key pattern in ``[]`` square brackets + at the end of the source pattern. + + You must specify at least one ``--include`` pattern, and you can use the + option several times to expand the selection. + + If data is flowing slower than you expect, see if you can use more specific + patterns to avoid sending unnecessary sources. + +.. option:: --allow-partial + + By default, trains are only sent if they contain all the data selected by + ``--include``. This option also sends trains where some of that data is + missing. + +.. option:: --append-detector-modules + + If the file data contains multiple detector modules as separate sources, + i. e. for big area detectors (AGIPD, LPD and DSSC), append these into one + single source. + +.. option:: --dummy-timestamps + + Add mock timestamps if missing in the original meta-data. + +These two options above - appended module sources and dummy timestamps - are +required if streamed data shall be provided to OnDA. + +.. option:: -z , --socket-type + + The ZMQ socket type to use, one of ``PUB``, ``PUSH`` or ``REP``. + Default: ``REP``. + +.. option:: --use-infiniband + + Use the infiniband network interface (``ib0``) if it's present. + This is ignored if ``--port`` is used with a full ZMQ endpoint address. + + .. _cmd-serve-files: ``karabo-bridge-serve-files`` @@ -53,6 +122,9 @@ Stream data from files in the `Karabo bridge `_ format. See :doc:`streaming` for more information. +For streaming data from a run directory, we recommend the newer +:ref:`cmd-serve-run` command in place of this. + .. code-block:: shell karabo-bridge-serve-files /gpfs/exfel/exp/XMPL/201750/p700000/proc/r0005 4321 diff --git a/docs/conf.py b/docs/conf.py index cb8f06c0..aead9214 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -76,7 +76,7 @@ # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. -language = None +language = 'en' # There are two options for replacing |today|: either, you set today to some # non-false value, then it is used: diff --git a/docs/streaming.rst b/docs/streaming.rst index cacfa3e6..f0aaea06 100644 --- a/docs/streaming.rst +++ b/docs/streaming.rst @@ -6,14 +6,20 @@ socket. The ``extra_data`` Python package can stream data from files using the s protocol. You can use this to test code which expects to receive data from Karabo Bridge, or use the same code for analysing live data and stored data. -To stream the data from a file or run unmodified, use the command:: +To stream data from a saved run, use the ``karabo-bridge-serve-run`` command: - karabo-bridge-serve-files /gpfs/exfel/exp/SPB/201830/p900022/raw/r0034 4545 +.. code-block:: shell -The number (4545) must be an unused TCP port above 1024. It will bind to -this and stream the data to any connected clients. + # Proposal run + karabo-bridge-serve-run 700000 40 --port 4545 \ + --include 'SPB_IRDA_JF4M/DET/JNGFR*:daqOutput' \ + --include '*/MOTOR/*[*Position]' + +The port number (4545 above) must be an unused TCP port above 1024. +Clients will need this port and the IP address of the sender to connect. +For clients running on the same node, use the IP address ``127.0.0.1``. Command-line options are explained on the -:ref:`command reference ` page. +:ref:`command reference ` page. .. note:: From 4554836672d76f6ff013dfc10a2b3b4dec8f53f5 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Wed, 1 Nov 2023 11:30:29 +0000 Subject: [PATCH 10/13] Add a test for karabo-bridge-serve-run --- extra_data/read_machinery.py | 3 ++- extra_data/tests/test_streamer.py | 43 +++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/extra_data/read_machinery.py b/extra_data/read_machinery.py index 88cf56f7..13e7c6ff 100644 --- a/extra_data/read_machinery.py +++ b/extra_data/read_machinery.py @@ -4,6 +4,7 @@ """ import logging import math +import os import os.path as osp import re import time @@ -17,7 +18,7 @@ DETECTOR_NAMES = {'AGIPD', 'DSSC', 'LPD'} DETECTOR_SOURCE_RE = re.compile(r'(.+)/DET/(\d+)CH') -DATA_ROOT_DIR = '/gpfs/exfel/exp' +DATA_ROOT_DIR = os.environ.get('EXTRA_DATA_DATA_ROOT', '/gpfs/exfel/exp') class _SliceConstructor(type): diff --git a/extra_data/tests/test_streamer.py b/extra_data/tests/test_streamer.py index 3062e93c..2cb9d03c 100644 --- a/extra_data/tests/test_streamer.py +++ b/extra_data/tests/test_streamer.py @@ -2,6 +2,7 @@ import os +import numpy as np import pytest from subprocess import PIPE, Popen @@ -73,6 +74,48 @@ def test_serve_files(mock_fxe_raw_run, tmp_path): assert rc == -9 # process terminated by kill signal +@pytest.mark.skipif(os.name != 'posix', reason="Test uses Unix socket") +def test_serve_run(mock_spb_raw_and_proc_run, tmp_path): + mock_data_root, _, _ = mock_spb_raw_and_proc_run + xgm_src = 'SPB_XTD9_XGM/DOOCS/MAIN' + agipd_m0_src = 'SPB_DET_AGIPD1M-1/DET/0CH0:xtdf' + args = ['karabo-bridge-serve-run', '2012', '238', + '--port', f'ipc://{tmp_path}/socket', + '--include', f'{xgm_src}[beamPosition.i*Pos]', + '--include', '*AGIPD1M-1/DET/0CH0:xtdf' + ] + interface = None + + p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, + env=dict(os.environ, + PYTHONUNBUFFERED='1', + EXTRA_DATA_DATA_ROOT=mock_data_root)) + try: + for line in p.stdout: + line = line.decode('utf-8') + if line.startswith('Streamer started on:'): + interface = line.partition(':')[2].strip() + break + + print('interface:', interface) + assert interface is not None, p.stderr.read().decode() + + with Client(interface, timeout=30) as c: + data, meta = c.next() + + tid = next(m['timestamp.tid'] for m in meta.values()) + assert tid == 10000 + assert set(data) == {xgm_src, agipd_m0_src} + assert set(data[xgm_src]) == \ + {f'beamPosition.i{xy}Pos.value' for xy in 'xy'} | {'metadata'} + assert data[agipd_m0_src]['image.data'].dtype == np.float32 + finally: + if p.poll() is None: + p.kill() + rc = p.wait(timeout=2) + assert rc == -9 # process terminated by kill signal + + def test_deprecated_server(): with pytest.deprecated_call(): with ZMQStreamer(2222): From 505aa3ba355f9069120c1f00179316c3994b0231 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Wed, 1 Nov 2023 13:54:57 +0000 Subject: [PATCH 11/13] Simplify test --- extra_data/tests/test_streamer.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/extra_data/tests/test_streamer.py b/extra_data/tests/test_streamer.py index 2cb9d03c..7ff3ee3c 100644 --- a/extra_data/tests/test_streamer.py +++ b/extra_data/tests/test_streamer.py @@ -77,30 +77,22 @@ def test_serve_files(mock_fxe_raw_run, tmp_path): @pytest.mark.skipif(os.name != 'posix', reason="Test uses Unix socket") def test_serve_run(mock_spb_raw_and_proc_run, tmp_path): mock_data_root, _, _ = mock_spb_raw_and_proc_run + zmq_endpoint = f'ipc://{tmp_path}/socket' xgm_src = 'SPB_XTD9_XGM/DOOCS/MAIN' agipd_m0_src = 'SPB_DET_AGIPD1M-1/DET/0CH0:xtdf' args = ['karabo-bridge-serve-run', '2012', '238', - '--port', f'ipc://{tmp_path}/socket', + '--port', zmq_endpoint, '--include', f'{xgm_src}[beamPosition.i*Pos]', '--include', '*AGIPD1M-1/DET/0CH0:xtdf' ] - interface = None - p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, - env=dict(os.environ, - PYTHONUNBUFFERED='1', - EXTRA_DATA_DATA_ROOT=mock_data_root)) + p = Popen(args, env=dict( + os.environ, + PYTHONUNBUFFERED='1', + EXTRA_DATA_DATA_ROOT=mock_data_root + )) try: - for line in p.stdout: - line = line.decode('utf-8') - if line.startswith('Streamer started on:'): - interface = line.partition(':')[2].strip() - break - - print('interface:', interface) - assert interface is not None, p.stderr.read().decode() - - with Client(interface, timeout=30) as c: + with Client(zmq_endpoint, timeout=30) as c: data, meta = c.next() tid = next(m['timestamp.tid'] for m in meta.values()) From a46793d08d48ecf867e8e98cbb1f28edbaf2a78a Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Wed, 1 Nov 2023 14:21:35 +0000 Subject: [PATCH 12/13] Recreate raw+proc folder tree for each test using it --- extra_data/tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extra_data/tests/conftest.py b/extra_data/tests/conftest.py index 9801f3b6..abc95306 100644 --- a/extra_data/tests/conftest.py +++ b/extra_data/tests/conftest.py @@ -140,7 +140,7 @@ def mock_spb_raw_run(format_version): yield td -@pytest.fixture(scope='session') +@pytest.fixture() def mock_spb_raw_and_proc_run(): with TemporaryDirectory() as td: prop_dir = osp.join(str(td), 'SPB', '201830', 'p002012') From f15401c6a5ad63452b38fd0f14134911a888a6bb Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Wed, 1 Nov 2023 14:52:05 +0000 Subject: [PATCH 13/13] Give test subprocesses a chance to exit cleanly --- extra_data/tests/test_streamer.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/extra_data/tests/test_streamer.py b/extra_data/tests/test_streamer.py index 7ff3ee3c..9aa84e29 100644 --- a/extra_data/tests/test_streamer.py +++ b/extra_data/tests/test_streamer.py @@ -1,10 +1,11 @@ """Test streaming data with ZMQ interface.""" import os +import signal +from subprocess import PIPE, Popen, TimeoutExpired import numpy as np import pytest -from subprocess import PIPE, Popen from extra_data import by_id, H5File, RunDirectory from extra_data.export import _iter_trains, ZMQStreamer @@ -42,6 +43,19 @@ def test_merge_detector(mock_fxe_raw_run, mock_fxe_control_data, mock_spb_proc_r break +def cleanup_proc(p: Popen): + if p.poll() is None: + p.send_signal(signal.SIGINT) + try: + p.wait(timeout=2) + except TimeoutExpired: + pass + if p.poll() is None: + p.kill() + rc = p.wait(timeout=2) + assert rc == -9 # process terminated by kill signal + + @pytest.mark.skipif(os.name != 'posix', reason="Test uses Unix socket") def test_serve_files(mock_fxe_raw_run, tmp_path): src = 'FXE_XAD_GEC/CAM/CAMERA:daqOutput' @@ -68,10 +82,7 @@ def test_serve_files(mock_fxe_raw_run, tmp_path): assert tid == 10000 assert set(data) == {src} finally: - if p.poll() is None: - p.kill() - rc = p.wait(timeout=2) - assert rc == -9 # process terminated by kill signal + cleanup_proc(p) @pytest.mark.skipif(os.name != 'posix', reason="Test uses Unix socket") @@ -102,10 +113,7 @@ def test_serve_run(mock_spb_raw_and_proc_run, tmp_path): {f'beamPosition.i{xy}Pos.value' for xy in 'xy'} | {'metadata'} assert data[agipd_m0_src]['image.data'].dtype == np.float32 finally: - if p.poll() is None: - p.kill() - rc = p.wait(timeout=2) - assert rc == -9 # process terminated by kill signal + cleanup_proc(p) def test_deprecated_server():