Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

karabo-bridge-serve-run command #458

Merged
merged 13 commits into from
Nov 6, 2023
Merged
2 changes: 1 addition & 1 deletion docs/aligning_trains.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "xfel",
"display_name": "xfel (Python 3.7)",
"language": "python",
"name": "xfel"
},
Expand Down
88 changes: 88 additions & 0 deletions extra_data/cli/serve_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from argparse import ArgumentParser
import sys

Check warning on line 2 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L1-L2

Added lines #L1 - L2 were not covered by tests

from .. import open_run

Check warning on line 4 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L4

Added line #L4 was not covered by tests

IMPORT_FAILED_MSG = """\

Check warning on line 6 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L6

Added line #L6 was not covered by tests
{}

karabo-bridge-serve-run requires additional dependencies:
pip install karabo-bridge psutil
"""

def main(argv=None):
ap = ArgumentParser(prog="karabo-bridge-serve-run")
ap.add_argument("proposal", help="Proposal number")
ap.add_argument("run", help="Run number")
ap.add_argument("port", help="TCP port or ZMQ endpoint to send data on")
ap.add_argument(

Check warning on line 18 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L13-L18

Added lines #L13 - L18 were not covered by tests
"--include", help="Select matching sources (and optionally keys) to "
"include in streamed data",
action='append'
)
ap.add_argument(

Check warning on line 23 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L23

Added line #L23 was not covered by tests
"--allow-partial", help="Send trains where some sources are missing",
action='store_true'
)
ap.add_argument(

Check warning on line 27 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L27

Added line #L27 was not covered by tests
"--append-detector-modules", help="combine multiple module sources"
" into one (will only work for AGIPD data currently).",
action='store_true'
)
ap.add_argument(

Check warning on line 32 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L32

Added line #L32 was not covered by tests
"--dummy-timestamps", help="create dummy timestamps if the meta-data"
" lacks proper timestamps",
action='store_true'
)
ap.add_argument(

Check warning on line 37 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L37

Added line #L37 was not covered by tests
"--use-infiniband", help="Use infiniband interface if available "
"(if a TCP port is specified)",
action='store_true'
)
ap.add_argument(

Check warning on line 42 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L42

Added line #L42 was not covered by tests
"-z", "--socket-type", help="ZeroMQ socket type",
choices=['PUB', 'PUSH', 'REP'], default='REP'
)
args = ap.parse_args(argv)

Check warning on line 46 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L46

Added line #L46 was not covered by tests

try:
from ..export import serve_data
except ImportError as e:
sys.exit(IMPORT_FAILED_MSG.format(e))

Check warning on line 51 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L48-L51

Added lines #L48 - L51 were not covered by tests

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason to do this import here within the function and not at the top of the file? (except of course the fail message constant needs to be defined first)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd usually try to avoid side-effects (like sys.exit()) when loading a module, although it doesn't matter so much for a module defining a CLI like this. It's also handy that --help still works even without the extra dependencies.

We could still have the import at the top like this:

# Top of file
try:
    from ..export import serve_data
except ImportError:
    serve_data = None

# In the function
if serve_data is None:
    sys.exit(msg)

But that looks less neat to me.

run = open_run(args.proposal, args.run, data='all')

Check warning on line 53 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L53

Added line #L53 was not covered by tests

if not args.include:
print("Available sources:")
for s in sorted(run.all_sources):
print(f" {s}")
sys.exit("Please select at least one source with --include")

Check warning on line 59 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L55-L59

Added lines #L55 - L59 were not covered by tests

include = []
for pat in args.include:
if '[' in pat:
if not pat.endswith(']'):
sys.exit(f"Missing final ] in {pat!r}")
src_pat, key_pat = pat[:-1].split('[', 1)
include.append((src_pat, key_pat))

Check warning on line 67 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L61-L67

Added lines #L61 - L67 were not covered by tests
else:
# Source pattern only
include.append(pat)

Check warning on line 70 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L70

Added line #L70 was not covered by tests

if args.allow_partial:
sel = run.select(include, require_any=True)

Check warning on line 73 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L72-L73

Added lines #L72 - L73 were not covered by tests
else:
sel = run.select(include, require_all=True)

Check warning on line 75 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L75

Added line #L75 was not covered by tests

try:
serve_data(

Check warning on line 78 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L77-L78

Added lines #L77 - L78 were not covered by tests
sel, args.port,
append_detector_modules=args.append_detector_modules,
dummy_timestamps=args.dummy_timestamps,
use_infiniband=args.use_infiniband, sock=args.socket_type
)
except KeyboardInterrupt:
print('\nStopped.')

Check warning on line 85 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L84-L85

Added lines #L84 - L85 were not covered by tests

if __name__ == '__main__':
main()

Check warning on line 88 in extra_data/cli/serve_run.py

View check run for this annotation

Codecov / codecov/patch

extra_data/cli/serve_run.py#L87-L88

Added lines #L87 - L88 were not covered by tests
49 changes: 46 additions & 3 deletions extra_data/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"""

import os.path as osp
import time
from collections import deque
from socket import AF_INET
from warnings import warn

Expand Down Expand Up @@ -107,9 +109,7 @@
yield tid, train_data


def serve_files(path, port, source_glob='*', key_glob='*',
append_detector_modules=False, dummy_timestamps=False,
use_infiniband=False, sock='REP'):
def serve_files(path, port, source_glob='*', key_glob='*', **kwargs):
"""Stream data from files through a TCP socket.

Parameters
Expand Down Expand Up @@ -144,15 +144,58 @@
data = H5File(path)

data = data.select(source_glob, key_glob)
serve_data(data, port, **kwargs)

Check warning on line 147 in extra_data/export.py

View check run for this annotation

Codecov / codecov/patch

extra_data/export.py#L147

Added line #L147 was not covered by tests


def serve_data(data, port, append_detector_modules=False,
dummy_timestamps=False, use_infiniband=False, sock='REP'):
"""Stream data from files through a TCP socket.

Parameters
----------
data: DataCollection
The data to be streamed; should already have sources & keys selected.
port: str or int
A ZMQ endpoint (e.g. 'tcp://*:44444') or a TCP port to bind the socket
to. Integers or strings of all digits are treated as port numbers.
append_detector_modules: bool
Combine multi-module detector data in a single data source (sources for
individual modules are removed). The last section of the source name is
replaces with 'APPEND', example:
'SPB_DET_AGIPD1M-1/DET/#CH0:xtdf' -> 'SPB_DET_AGIPD1M-1/DET/APPEND'

Supported detectors: AGIPD, DSSC, LPD
dummy_timestamps: bool
Whether to add mock timestamps if the metadata lacks them.
use_infiniband: bool
Use infiniband interface if available (if port specifies a TCP port)
sock: str
socket type - supported: REP, PUB, PUSH (default REP).
"""
if isinstance(port, int) or port.isdigit():
endpt = f'tcp://{find_infiniband_ip() if use_infiniband else "*"}:{port}'
else:
endpt = port

sender = Sender(endpt, sock=sock, dummy_timestamps=dummy_timestamps)
print(f'Streamer started on: {sender.endpoint}')
ntrains = len(data.train_ids)

Check warning on line 182 in extra_data/export.py

View check run for this annotation

Codecov / codecov/patch

extra_data/export.py#L182

Added line #L182 was not covered by tests

sent_times = deque([time.monotonic()], 10)
count = 0
new_time = 0.
Fixed Show fixed Hide fixed
def print_update(end='\r'):
print(f'Sent {count}/{ntrains} trains - Train ID {tid} - {rate:.1f} Hz', end=end)

Check warning on line 188 in extra_data/export.py

View check run for this annotation

Codecov / codecov/patch

extra_data/export.py#L184-L188

Added lines #L184 - L188 were not covered by tests

for tid, data in _iter_trains(data, merge_detector=append_detector_modules):
sender.send(data)
count += 1
new_time = time.monotonic()
if count % 5 == 0:
rate = len(sent_times) / (new_time - sent_times[0])
takluyver marked this conversation as resolved.
Show resolved Hide resolved
print_update()
sent_times.append(new_time)
print_update(end='\n')

Check warning on line 198 in extra_data/export.py

View check run for this annotation

Codecov / codecov/patch

extra_data/export.py#L192-L198

Added lines #L192 - L198 were not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Factored-out function to allow different line-end behaviour, carriage return vs. line break - fair enough to avoid the longish format string expression twice.


# The karabo-bridge code sets linger to 0 so that it doesn't get stuck if
# the client goes away. But this would also mean that we close the socket
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def find_version(*parts):
"console_scripts": [
"lsxfel = extra_data.lsxfel:main",
"karabo-bridge-serve-files = extra_data.cli.serve_files:main",
"karabo-bridge-serve-run = extra_data.cli.serve_run:main",
"extra-data-validate = extra_data.validation:main",
"extra-data-make-virtual-cxi = extra_data.cli.make_virtual_cxi:main",
"extra-data-locality = extra_data.locality:main",
Expand Down