Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

karabo-bridge-serve-run command #458

Merged
merged 13 commits into from
Nov 6, 2023
45 changes: 42 additions & 3 deletions extra_data/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"""

import os.path as osp
import time
from collections import deque
from socket import AF_INET
from warnings import warn

Expand Down Expand Up @@ -107,9 +109,7 @@ def _iter_trains(data, merge_detector=False):
yield tid, train_data


def serve_files(path, port, source_glob='*', key_glob='*',
append_detector_modules=False, dummy_timestamps=False,
use_infiniband=False, sock='REP'):
def serve_files(path, port, source_glob='*', key_glob='*', **kwargs):
"""Stream data from files through a TCP socket.

Parameters
Expand Down Expand Up @@ -144,15 +144,54 @@ def serve_files(path, port, source_glob='*', key_glob='*',
data = H5File(path)

data = data.select(source_glob, key_glob)
serve_data(data, port, **kwargs)


def serve_data(data, port, append_detector_modules=False,
dummy_timestamps=False, use_infiniband=False, sock='REP'):
"""Stream data from files through a TCP socket.

Parameters
----------
data: DataCollection
The data to be streamed; should already have sources & keys selected.
port: str or int
A ZMQ endpoint (e.g. 'tcp://*:44444') or a TCP port to bind the socket
to. Integers or strings of all digits are treated as port numbers.
append_detector_modules: bool
Combine multi-module detector data in a single data source (sources for
individual modules are removed). The last section of the source name is
replaces with 'APPEND', example:
'SPB_DET_AGIPD1M-1/DET/#CH0:xtdf' -> 'SPB_DET_AGIPD1M-1/DET/APPEND'

Supported detectors: AGIPD, DSSC, LPD
dummy_timestamps: bool
Whether to add mock timestamps if the metadata lacks them.
use_infiniband: bool
Use infiniband interface if available (if port specifies a TCP port)
sock: str
socket type - supported: REP, PUB, PUSH (default REP).
"""
if isinstance(port, int) or port.isdigit():
endpt = f'tcp://{find_infiniband_ip() if use_infiniband else "*"}:{port}'
else:
endpt = port

sender = Sender(endpt, sock=sock, dummy_timestamps=dummy_timestamps)
print(f'Streamer started on: {sender.endpoint}')
ntrains = len(data.train_ids)
print(f'Sending {ntrains} trains')

sent_times = deque([time.monotonic()], 10)
count = 0
for tid, data in _iter_trains(data, merge_detector=append_detector_modules):
sender.send(data)
count += 1
new_time = time.monotonic()
if count % 5 == 0:
rate = len(deque) / (new_time - sent_times[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this is a rolling average, and limited to 10 elements, the length is 10 at max. (10 trains corresp. to 1 second), correct? Then: sent_times[0] is not the first time ever, but the first of the current deque (oldest elements are thrown out), so sent_times[-1] minus 10 elements, correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, why is it len(deque) and not len(sent_times) - do you actually access the overall length of a deque type (not the actual object which has the name "sent_times")?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, 10 trains correspond to one second, and with a limit on the deque, each call to .append() drops one item out of the beginning (once it has filled up).

Each timestamp is measured just after sending a train, and we calculate this before adding the new timestamp. So since sent_times[0] we have sent 9 more trains corresponding to the other 9 entries in sent_times, plus the one we've just sent but not yet added. So 10 trains in the measured time interval.

print(f'Sent {count}/{ntrains} trains - Train ID {tid} - {rate:.1f} Hz', end='\r')
sent_times.append(new_time)

# The karabo-bridge code sets linger to 0 so that it doesn't get stuck if
# the client goes away. But this would also mean that we close the socket
Expand Down