karabo-bridge-serve-run command #458

Merged
merged 13 commits into from
Nov 6, 2023
2 changes: 1 addition & 1 deletion docs/aligning_trains.ipynb
@@ -849,7 +849,7 @@
],
"metadata": {
"kernelspec": {
-"display_name": "xfel",
+"display_name": "xfel (Python 3.7)",
"language": "python",
"name": "xfel"
},
72 changes: 72 additions & 0 deletions docs/cli.rst
@@ -44,6 +44,75 @@ Check the structure of an EuXFEL run or HDF5 file:
If it finds problems with the data, the program will produce a list of them and
exit with status 1. See :doc:`validation` for details of what it checks.

.. _cmd-serve-run:

``karabo-bridge-serve-run``
---------------------------

Stream data from a run, by proposal & run number, in the `Karabo bridge
<https://rtd.xfel.eu/docs/data-analysis-user-documentation/en/latest/online.html#streaming-from-karabo-bridge>`_
format. See :doc:`streaming` for more information.

.. code-block:: shell

   # Stream proposal 700000, run 40 on TCP port 4321
   karabo-bridge-serve-run 700000 40 --port 4321 \
       --include 'SPB_IRDA_JF4M/DET/JNGFR*:daqOutput' \
       --include '*/MOTOR/*[*Position]'

.. program:: karabo-bridge-serve-run

.. option:: --port <port>

   Either a numeric TCP port, e.g. ``4321``, or a ZMQ endpoint address, e.g.
   ``tcp://0.0.0.0:4321``. You will need to give the receiving code this port
   number or address as well.

   If no port is specified, the program will pick an unused port and display
   the endpoint address as it starts.

.. option:: --include <pattern>

   Sources matching the pattern will be included in the streamed data.
   You can also match keys by putting a key pattern in ``[]`` square brackets
   at the end of the source pattern.

   You must specify at least one ``--include`` pattern, and you can use the
   option several times to expand the selection. If none is given, the
   command prints the available sources and exits with an error.

   If data is flowing slower than you expect, see if you can use more specific
   patterns to avoid sending unnecessary sources.
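The pattern syntax above can be illustrated with a small stand-alone Python
sketch. It is a simplification, not the command's own implementation, which
passes the parsed patterns to ``run.select()``. It makes three assumptions:

* wildcards behave as in the standard :mod:`fnmatch` module;
* a pattern without brackets selects every key of the matching sources;
* the helper names (``split_include``, ``select_matching``) and the source
  and key names are hypothetical.

.. code-block:: python

   from fnmatch import fnmatchcase


   def split_include(pattern):
       """Split 'SOURCE_PATTERN[KEY_PATTERN]' into (source, key) patterns.

       Follows the parsing in karabo-bridge-serve-run, except that a pattern
       without brackets is returned with '*' (all keys) as its key pattern.
       """
       if '[' not in pattern:
           return pattern, '*'
       if not pattern.endswith(']'):
           raise ValueError(f"Missing final ] in {pattern!r}")
       # Drop the trailing ']' and split at the first '[' only
       source_pat, key_pat = pattern[:-1].split('[', 1)
       return source_pat, key_pat


   def select_matching(available, patterns):
       """Return {source: [keys]} for every source/key matched by a pattern.

       `available` maps source names to lists of key names.
       """
       selected = {}
       for pattern in patterns:
           source_pat, key_pat = split_include(pattern)
           for source, keys in available.items():
               if not fnmatchcase(source, source_pat):
                   continue
               for key in keys:
                   if fnmatchcase(key, key_pat):
                       chosen = selected.setdefault(source, [])
                       if key not in chosen:
                           chosen.append(key)
       return selected


   # Made-up source and key names, for illustration only
   available = {
       'SPB_IRDA_JF4M/DET/JNGFR01:daqOutput': ['data.adc', 'data.gain'],
       'SPB_IRDA_JF4M/DET/JNGFR02:daqOutput': ['data.adc', 'data.gain'],
       'SPB_IRU_SAMPLE/MOTOR/X': ['actualPosition', 'velocity'],
       'SPB_XTD9_XGM/XGM/DOOCS': ['pulseEnergy.photonFlux'],
   }
   patterns = ['SPB_IRDA_JF4M/DET/JNGFR*:daqOutput', '*/MOTOR/*[*Position]']

   for source, keys in select_matching(available, patterns).items():
       print(source, keys)

With these inputs:

* both ``JNGFR`` sources are selected with all their keys;
* only ``actualPosition`` is selected from the motor source;
* the XGM source is left out, because neither pattern matches it.

Splitting off the bracketed part before matching matters: :mod:`fnmatch`
would otherwise read ``[*Position]`` as a character class.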

.. option:: --allow-partial

   By default, trains are only sent if they contain all the data selected by
   ``--include``. This option also sends trains where some of that data is
   missing.

.. option:: --append-detector-modules

   If the data contains multiple detector modules as separate sources, as is
   the case for the large area detectors (AGIPD, LPD and DSSC), combine them
   into a single source.

.. option:: --dummy-timestamps

   Add mock timestamps if they are missing from the original metadata.

The two options above (appended module sources and dummy timestamps) are
required if the streamed data is to be consumed by OnDA.

.. option:: -z <type>, --socket-type <type>

   The ZMQ socket type to use, one of ``PUB``, ``PUSH`` or ``REP``.
   Default: ``REP``.

.. option:: --use-infiniband

   Use the infiniband network interface (``ib0``) if it's present.
   This is ignored if ``--port`` is used with a full ZMQ endpoint address.
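How ``--port`` and ``--use-infiniband`` combine can be sketched as below. The
logic mirrors the endpoint selection in ``serve_data()`` in this pull request,
with some assumptions:

* ``resolve_endpoint`` is a hypothetical stand-alone helper, not part of the
  package.
* The address of the ``ib0`` interface is passed in as ``infiniband_ip``
  rather than looked up.
* Falling back to all interfaces (``*``) when no such address is given is an
  assumption of the sketch.
* ``10.0.0.5`` is a made-up address.

.. code-block:: python

   def resolve_endpoint(port, use_infiniband=False, infiniband_ip=None):
       """Turn a --port value into a ZMQ endpoint address (sketch).

       `infiniband_ip` is a hypothetical stand-in for the address of the
       ib0 interface, if one was found.
       """
       if isinstance(port, int) or port.isdigit():
           # A bare port number: bind on all interfaces, or on ib0 if requested
           host = infiniband_ip if (use_infiniband and infiniband_ip) else '*'
           return f'tcp://{host}:{port}'
       # Anything else is used as a full ZMQ endpoint; --use-infiniband is ignored
       return port


   print(resolve_endpoint('4321'))  # tcp://*:4321
   print(resolve_endpoint(4321, use_infiniband=True,
                          infiniband_ip='10.0.0.5'))  # tcp://10.0.0.5:4321
   print(resolve_endpoint('tcp://0.0.0.0:4321', use_infiniband=True,
                          infiniband_ip='10.0.0.5'))  # tcp://0.0.0.0:4321

The last call shows why ``--use-infiniband`` has no effect when ``--port`` is
a full endpoint: the address is passed through unchanged.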


.. _cmd-serve-files:

``karabo-bridge-serve-files``
@@ -53,6 +122,9 @@ Stream data from files in the `Karabo bridge
<https://rtd.xfel.eu/docs/data-analysis-user-documentation/en/latest/online.html#streaming-from-karabo-bridge>`_
format. See :doc:`streaming` for more information.

For streaming data from a run directory, we recommend the newer
:ref:`cmd-serve-run` command in place of this.

Contributor

In fact, I have used the old command (only) for streaming from a run directory, using the full path as argument. Apart from the fact that ...-serve-run is indeed more convenient to achieve this, what would be the main use case for using the old command now?

Member Author

The -serve-files command makes it easy to stream from a non-standard run directory location, e.g. if we do an experimental correction of a run, we might put it in proposal scratch. Or it gives you a way to stream from red before we've integrated support for that. Or if users transfer run data back to their home institution and want to use EXtra-data there. I think the new -serve-run command will be better for ~95% of use cases.

The biggest reason to retain the old command is compatibility & familiarity, though - don't break what's working for people. 🙂

.. code-block:: shell

   karabo-bridge-serve-files /gpfs/exfel/exp/XMPL/201750/p700000/proc/r0005 4321
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -76,7 +76,7 @@
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
-language = None
+language = 'en'

# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
16 changes: 11 additions & 5 deletions docs/streaming.rst
@@ -6,14 +6,20 @@ socket. The ``extra_data`` Python package can stream data from files using the s
protocol. You can use this to test code which expects to receive data from
Karabo Bridge, or use the same code for analysing live data and stored data.

-To stream the data from a file or run unmodified, use the command::
+To stream data from a saved run, use the ``karabo-bridge-serve-run`` command:

-   karabo-bridge-serve-files /gpfs/exfel/exp/SPB/201830/p900022/raw/r0034 4545
+.. code-block:: shell

-The number (4545) must be an unused TCP port above 1024. It will bind to
-this and stream the data to any connected clients.
+   # Stream proposal 700000, run 40 on TCP port 4545
+   karabo-bridge-serve-run 700000 40 --port 4545 \
+       --include 'SPB_IRDA_JF4M/DET/JNGFR*:daqOutput' \
+       --include '*/MOTOR/*[*Position]'
+
+The port number (4545 above) must be an unused TCP port above 1024.
+Clients will need this port and the IP address of the sender to connect.
+For clients running on the same node, use the IP address ``127.0.0.1``.
Command-line options are explained on the
-:ref:`command reference <cmd-serve-files>` page.
+:ref:`command reference <cmd-serve-run>` page.

.. note::

90 changes: 90 additions & 0 deletions extra_data/cli/serve_run.py
@@ -0,0 +1,90 @@
from argparse import ArgumentParser
import sys

from .. import open_run

IMPORT_FAILED_MSG = """\
{}

karabo-bridge-serve-run requires additional dependencies:
pip install karabo-bridge psutil
"""

def main(argv=None):
    ap = ArgumentParser(prog="karabo-bridge-serve-run")
    ap.add_argument("proposal", help="Proposal number")
    ap.add_argument("run", help="Run number")
    ap.add_argument(
        "--port", default="0", help="TCP port or ZMQ endpoint to send data on. "
                                    "Selects a random TCP port by default.")
    ap.add_argument(
        "--include", help="Select matching sources (and optionally keys) to "
                          "include in streamed data",
        action='append'
    )
    ap.add_argument(
        "--allow-partial", help="Send trains where some sources are missing",
        action='store_true'
    )
    ap.add_argument(
        "--append-detector-modules", help="combine multiple module sources"
                                          " into one (will only work for AGIPD data currently).",
        action='store_true'
    )
    ap.add_argument(
        "--dummy-timestamps", help="create dummy timestamps if the meta-data"
                                   " lacks proper timestamps",
        action='store_true'
    )
    ap.add_argument(
        "--use-infiniband", help="Use infiniband interface if available "
                                 "(if a TCP port is specified)",
        action='store_true'
    )
    ap.add_argument(
        "-z", "--socket-type", help="ZeroMQ socket type",
        choices=['PUB', 'PUSH', 'REP'], default='REP'
    )
    args = ap.parse_args(argv)

    try:
        from ..export import serve_data
    except ImportError as e:
        sys.exit(IMPORT_FAILED_MSG.format(e))

Contributor

What is the reason to do this import here within the function and not at the top of the file? (except of course the fail message constant needs to be defined first)

Member Author

I'd usually try to avoid side-effects (like sys.exit()) when loading a module, although it doesn't matter so much for a module defining a CLI like this. It's also handy that --help still works even without the extra dependencies.

We could still have the import at the top like this:

# Top of file
try:
    from ..export import serve_data
except ImportError:
    serve_data = None

# In the function
if serve_data is None:
    sys.exit(msg)

But that looks less neat to me.
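Below is a minimal sketch of the `--help` point above that runs without EXtra-data. It makes two assumptions:

- `demo-serve` is a made-up program name.
- `_hypothetical_optional_dep` is a placeholder standing in for karabo-bridge/psutil, and is assumed not to be installed.

Because the import only happens after `parse_args()`, `--help` still works. Only a real invocation stops with the install hint.

```python
import sys
from argparse import ArgumentParser

IMPORT_FAILED_MSG = """\
{}

demo-serve requires additional dependencies:
    pip install some-optional-package
"""


def main(argv=None):
    ap = ArgumentParser(prog="demo-serve")
    ap.add_argument("proposal", help="Proposal number")
    # argparse handles --help here and exits before the optional import runs
    args = ap.parse_args(argv)

    try:
        # Placeholder for an optional dependency that is not installed
        import _hypothetical_optional_dep  # noqa: F401
    except ImportError as e:
        # If uncaught, the string is printed to stderr and the exit status is 1
        sys.exit(IMPORT_FAILED_MSG.format(e))

    return args
```

Calling `main(["--help"])` prints the usage text and raises `SystemExit(0)`. Calling `main(["700000"])` raises `SystemExit` carrying the message, including the `pip install` line.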

    run = open_run(args.proposal, args.run, data='all')

    if not args.include:
        print("Available sources:")
        for s in sorted(run.all_sources):
            print(f" {s}")
        sys.exit("Please select at least one source with --include")

    include = []
    for pat in args.include:
        if '[' in pat:
            if not pat.endswith(']'):
                sys.exit(f"Missing final ] in {pat!r}")
            src_pat, key_pat = pat[:-1].split('[', 1)
            include.append((src_pat, key_pat))
        else:
            # Source pattern only
            include.append(pat)

    if args.allow_partial:
        sel = run.select(include, require_any=True)
    else:
        sel = run.select(include, require_all=True)

    try:
        serve_data(
            sel, args.port,
            append_detector_modules=args.append_detector_modules,
            dummy_timestamps=args.dummy_timestamps,
            use_infiniband=args.use_infiniband, sock=args.socket_type
        )
    except KeyboardInterrupt:
        print('\nStopped.')

if __name__ == '__main__':
    main()

49 changes: 46 additions & 3 deletions extra_data/export.py
@@ -11,6 +11,8 @@
"""

import os.path as osp
import time
from collections import deque
from socket import AF_INET
from warnings import warn

@@ -107,9 +109,7 @@
yield tid, train_data


-def serve_files(path, port, source_glob='*', key_glob='*',
-                append_detector_modules=False, dummy_timestamps=False,
-                use_infiniband=False, sock='REP'):
+def serve_files(path, port, source_glob='*', key_glob='*', **kwargs):
    """Stream data from files through a TCP socket.

    Parameters
@@ -144,15 +144,58 @@
        data = H5File(path)

    data = data.select(source_glob, key_glob)
    serve_data(data, port, **kwargs)


def serve_data(data, port, append_detector_modules=False,
               dummy_timestamps=False, use_infiniband=False, sock='REP'):
    """Stream data from files through a TCP socket.

    Parameters
    ----------
    data: DataCollection
        The data to be streamed; should already have sources & keys selected.
    port: str or int
        A ZMQ endpoint (e.g. 'tcp://*:44444') or a TCP port to bind the socket
        to. Integers or strings of all digits are treated as port numbers.
    append_detector_modules: bool
        Combine multi-module detector data in a single data source (sources for
        individual modules are removed). The last section of the source name is
        replaced with 'APPEND', for example:
            'SPB_DET_AGIPD1M-1/DET/#CH0:xtdf' -> 'SPB_DET_AGIPD1M-1/DET/APPEND'

        Supported detectors: AGIPD, DSSC, LPD
    dummy_timestamps: bool
        Whether to add mock timestamps if the metadata lacks them.
    use_infiniband: bool
        Use infiniband interface if available (if port specifies a TCP port)
    sock: str
        socket type - supported: REP, PUB, PUSH (default REP).
    """
    if isinstance(port, int) or port.isdigit():
        endpt = f'tcp://{find_infiniband_ip() if use_infiniband else "*"}:{port}'
    else:
        endpt = port

    sender = Sender(endpt, sock=sock, dummy_timestamps=dummy_timestamps)
    print(f'Streamer started on: {sender.endpoint}')
    ntrains = len(data.train_ids)

    sent_times = deque([time.monotonic()], 10)
    count = 0
    tid, rate = 0, 0.
    def print_update(end='\r'):
        print(f'Sent {count}/{ntrains} trains - Train ID {tid} - {rate:.1f} Hz', end=end)

    for tid, data in _iter_trains(data, merge_detector=append_detector_modules):
        sender.send(data)
        count += 1
        new_time = time.monotonic()
        if count % 5 == 0:
            rate = len(sent_times) / (new_time - sent_times[0])
takluyver marked this conversation as resolved.
            print_update()
        sent_times.append(new_time)
    print_update(end='\n')

Contributor

Factored-out function to allow different line-end behaviour, carriage return vs. line break - fair enough to avoid the longish format string expression twice.
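The rolling-rate logic around `print_update()` can be sketched as a small class, so it can be exercised without a socket. This is an illustrative restructuring, not the PR's code.

- **Assumptions added for testing:** the `RateMeter` name and the injectable `now` timestamps.
- **Taken from `serve_data()`:** the window of 10 timestamps, the rate update on every 5th train, and the message format.
- **Main difference:** the PR's closure reads `count`, `tid` and `rate` from the enclosing function's scope, whereas this sketch stores them as attributes.

```python
import time
from collections import deque


class RateMeter:
    """Rolling send rate, following the deque logic in serve_data().

    The deque holds the start time plus the most recent send times, at most
    `window` of them. Every 5th train the rate is recomputed as the number of
    trains sent since the oldest stored timestamp divided by the elapsed time.
    """

    def __init__(self, ntrains, window=10, now=None):
        # `now` lets callers inject timestamps (an addition for testing)
        start = time.monotonic() if now is None else now
        self.sent_times = deque([start], window)
        self.ntrains = ntrains
        self.count = 0
        self.tid = 0
        self.rate = 0.0

    def record(self, tid, now=None):
        """Register one sent train; update the rate every 5th train."""
        new_time = time.monotonic() if now is None else now
        self.count += 1
        self.tid = tid
        if self.count % 5 == 0:
            self.rate = len(self.sent_times) / (new_time - self.sent_times[0])
        self.sent_times.append(new_time)

    def status(self):
        """The same text that print_update() prints."""
        return (f'Sent {self.count}/{self.ntrains} trains - '
                f'Train ID {self.tid} - {self.rate:.1f} Hz')


meter = RateMeter(ntrains=20, now=0.0)
for i in range(1, 11):
    meter.record(tid=1000 + i, now=i / 10)  # one train every 0.1 s
# end='\r' would overwrite the line in place; end='\n' finishes it
print(meter.status(), end='\n')
# Sent 10/20 trains - Train ID 1010 - 10.0 Hz
```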


    # The karabo-bridge code sets linger to 0 so that it doesn't get stuck if
    # the client goes away. But this would also mean that we close the socket
3 changes: 2 additions & 1 deletion extra_data/read_machinery.py
@@ -4,6 +4,7 @@
"""
import logging
import math
import os
import os.path as osp
import re
import time
@@ -17,7 +18,7 @@
DETECTOR_NAMES = {'AGIPD', 'DSSC', 'LPD'}
DETECTOR_SOURCE_RE = re.compile(r'(.+)/DET/(\d+)CH')

-DATA_ROOT_DIR = '/gpfs/exfel/exp'
+DATA_ROOT_DIR = os.environ.get('EXTRA_DATA_DATA_ROOT', '/gpfs/exfel/exp')


class _SliceConstructor(type):
2 changes: 1 addition & 1 deletion extra_data/tests/conftest.py
@@ -140,7 +140,7 @@ def mock_spb_raw_run(format_version):
        yield td


-@pytest.fixture(scope='session')
+@pytest.fixture()
def mock_spb_raw_and_proc_run():
    with TemporaryDirectory() as td:
        prop_dir = osp.join(str(td), 'SPB', '201830', 'p002012')