From 0617b5069059e263c26f490ad9ca0611536abecb Mon Sep 17 00:00:00 2001 From: tmichela Date: Mon, 27 May 2024 11:18:06 +0200 Subject: [PATCH 01/12] mount remote filesystem with sshfs clean debug code --- damnit/backend/extract_data.py | 18 +++- damnit/backend/listener.py | 73 +++++++++------ damnit/ctxsupport/ctxrunner.py | 162 +++++++++++++++++++++------------ 3 files changed, 166 insertions(+), 87 deletions(-) diff --git a/damnit/backend/extract_data.py b/damnit/backend/extract_data.py index d6d8a104..c13dd4b7 100644 --- a/damnit/backend/extract_data.py +++ b/damnit/backend/extract_data.py @@ -35,6 +35,7 @@ def innetgr(netgroup: bytes, host=None, user=None, domain=None): libc = CDLL("libc.so.6") return bool(libc.innetgr(netgroup, host, user, domain)) + def default_slurm_partition(): username = getpass.getuser().encode() if innetgr(b'exfel-wgs-users', user=username): @@ -43,6 +44,7 @@ def default_slurm_partition(): return 'upex' return 'all' + def run_in_subprocess(args, **kwargs): env = os.environ.copy() ctxsupport_dir = str(Path(__file__).parents[1] / 'ctxsupport') @@ -52,6 +54,7 @@ def run_in_subprocess(args, **kwargs): return subprocess.run(args, env=env, **kwargs) + def process_log_path(run, proposal, ctx_dir=Path('.'), create=True): p = ctx_dir.absolute() / 'process_logs' / f"r{run}-p{proposal}.out" if create: @@ -89,13 +92,13 @@ def loop(): def extract_in_subprocess( proposal, run, out_path, cluster=False, run_data=RunData.ALL, match=(), - python_exe=None, mock=False, tee_output=None + python_exe=None, mock=False, tee_output=None, data_location='localhost', ): if not python_exe: python_exe = sys.executable args = [python_exe, '-m', 'ctxrunner', 'exec', str(proposal), str(run), run_data.value, - '--save', out_path] + '--save', out_path, '--data-location', data_location] if cluster: args.append('--cluster-job') if mock: @@ -246,7 +249,8 @@ def slurm_options(self): return opts def extract_and_ingest(self, proposal, run, cluster=False, - run_data=RunData.ALL, match=(), mock=False, tee_output=None): + run_data=RunData.ALL, match=(), mock=False, tee_output=None, + data_location='localhost'): if proposal is None: proposal = self.proposal @@ -261,6 +265,7 @@ def extract_and_ingest(self, proposal, run, cluster=False, reduced_data = extract_in_subprocess( proposal, run, out_path, cluster=cluster, run_data=run_data, match=match, python_exe=python_exe, mock=mock, tee_output=tee_output, + data_location=data_location, ) log.info("Reduced data has %d fields", len(reduced_data)) add_to_db(reduced_data, self.db, proposal, run) @@ -274,6 +279,9 @@ def extract_and_ingest(self, proposal, run, cluster=False, log.info("Sent Kafka update to topic %r", self.db.kafka_topic) # Launch a Slurm job if there are any 'cluster' variables to evaluate + if data_location != 'localhost': + log.info('Skipping cluster variables with remote data [%s].', data_location) + return ctx = self.ctx_whole.filter(run_data=run_data, name_matches=match, cluster=cluster) ctx_slurm = self.ctx_whole.filter(run_data=run_data, name_matches=match, cluster=True) if set(ctx_slurm.vars) > set(ctx.vars): @@ -374,6 +382,7 @@ def reprocess(runs, proposal=None, match=(), mock=False): ap.add_argument('run_data', choices=('raw', 'proc', 'all')) ap.add_argument('--cluster-job', action="store_true") ap.add_argument('--match', action="append", default=[]) + ap.add_argument('--data-location', default='localhost') args = ap.parse_args() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -389,4 +398,5 
@@ def reprocess(runs, proposal=None, match=(), mock=False): Extractor().extract_and_ingest(args.proposal, args.run, cluster=args.cluster_job, run_data=RunData(args.run_data), - match=args.match) + match=args.match, + data_location=args.data_location) diff --git a/damnit/backend/listener.py b/damnit/backend/listener.py index 02827974..3ba93621 100644 --- a/damnit/backend/listener.py +++ b/damnit/backend/listener.py @@ -10,6 +10,7 @@ from socket import gethostname from threading import Thread +from extra_data.read_machinery import find_proposal from kafka import KafkaConsumer from .db import DamnitDB @@ -17,18 +18,22 @@ # For now, the migration & calibration events come via DESY's Kafka brokers, # but the AMORE updates go via XFEL's test instance. -CONSUMER_ID = 'xfel-da-amore-prototype-{}' -KAFKA_CONF = { - 'maxwell': { - 'brokers': ['exflwgs06:9091'], - 'topics': ["test.r2d2", "cal.offline-corrections"], - 'events': ["migration_complete", "run_corrections_complete"], - }, - 'onc': { - 'brokers': ['exflwgs06:9091'], - 'topics': ['test.euxfel.hed.daq', 'test.euxfel.hed.cal'], - 'events': ['daq_run_complete', 'online_correction_complete'], - } +CONSUMER_ID = "xfel-da-amore-prototype-{}" +KAFKA_BROKERS = ["exflwgs06:9091"] +KAFKA_TOPICS = ["test.r2d2", "cal.offline-corrections", "test.euxfel.hed.daq", "test.euxfel.hed.cal"] +KAFKA_EVENTS = ["migration_complete", "run_corrections_complete", "daq_run_complete", "online_correction_complete"] +BACKEND_HOSTS_TO_ONLINE = ['max-exfl-display003.desy.de', 'max-exfl-display004.desy.de'] +ONLINE_HOSTS ={ + 'FXE': 'sa1-onc-fxe.desy.de', + 'HED': 'sa2-onc-hed.desy.de', + 'MID': 'sa2-onc-mid.desy.de', + # 'SA1': '', + # 'SA2': '', + # 'SA3': '', + 'SCS': 'sa3-onc-scs.desy.de', + 'SPB': 'sa1-onc-spb.desy.de', + 'SQS': 'sa3-onc-sqs.desy.de', + 'SXP': 'sa3-onc-sxp.desy.de', } log = logging.getLogger(__name__) @@ -73,20 +78,30 @@ def __init__(self, context_dir=Path('.')): # Fail fast if read-only - https://stackoverflow.com/a/44707371/434217 self.db.conn.execute("pragma user_version=0;") self.proposal = self.db.metameta['proposal'] - log.info(f"Will watch for events from proposal {self.proposal}") - if gethostname().startswith('exflonc'): - # running on the online cluster - kafka_conf = KAFKA_CONF['onc'] - else: - kafka_conf = KAFKA_CONF['maxwell'] + log.info(f"Will watch for events from proposal {self.proposal}") consumer_id = CONSUMER_ID.format(self.db.metameta['db_id']) - self.kafka_cns = KafkaConsumer(*kafka_conf['topics'], - bootstrap_servers=kafka_conf['brokers'], + self.kafka_cns = KafkaConsumer(*KAFKA_TOPICS, + bootstrap_servers=KAFKA_BROKERS, group_id=consumer_id) - self.events = kafka_conf['events'] + # check backend host and connection to online cluster + self.online_data_host = None + self.run_online = self.db.metameta.get('run_online', False) is not False + if self.run_online and gethostname() not in BACKEND_HOSTS_TO_ONLINE: + log.warning(f"Disabled online processing, the backend must run on one of: {BACKEND_HOSTS_TO_ONLINE}") + self.run_online = False + if self.run_online: + topic = Path(find_proposal(f'p{self.proposal:06}')).parts[-3] + if (remote_host := ONLINE_HOSTS.get(topic)) is None: + log.warn(f"Can't run online processing for topic '{topic}'") + self.run_online = False + else: + self.online_data_host = remote_host + log.debug("Processing online data? 
%s", self.run_online) + + # Monitor thread for subprocesses self.extract_procs_queue = queue.Queue() self.extract_procs_watcher = Thread( target=watch_processes_finish, @@ -113,17 +128,19 @@ def run(self): def _process_kafka_event(self, record): msg = json.loads(record.value.decode()) event = msg.get('event') - if event in self.events: + if event in KAFKA_EVENTS: log.debug("Processing %s event from Kafka", event) getattr(self, f'handle_{event}')(record, msg) else: log.debug("Unexpected %s event from Kafka", event) def handle_daq_run_complete(self, record, msg: dict): - self.handle_event(record, msg, RunData.RAW) + if self.run_online: + self.handle_event(record, msg, RunData.RAW, self.online_data_host) def handle_online_correction_complete(self, record, msg: dict): - self.handle_event(record, msg, RunData.PROC) + if self.run_online: + self.handle_event(record, msg, RunData.PROC, self.online_data_host) def handle_migration_complete(self, record, msg: dict): self.handle_event(record, msg, RunData.RAW) @@ -131,7 +148,8 @@ def handle_migration_complete(self, record, msg: dict): def handle_run_corrections_complete(self, record, msg: dict): self.handle_event(record, msg, RunData.PROC) - def handle_event(self, record, msg: dict, run_data: RunData): + def handle_event(self, record, msg: dict, run_data: RunData, + data_location: str = "localhost"): proposal = int(msg['proposal']) run = int(msg['run']) @@ -149,10 +167,12 @@ def handle_event(self, record, msg: dict, run_data: RunData): # Create subprocess to process the run extract_proc = subprocess.Popen([ sys.executable, '-m', 'damnit.backend.extract_data', - str(proposal), str(run), run_data.value + str(proposal), str(run), run_data.value, + '--data-location', data_location, ], cwd=self.context_dir, stdout=logf, stderr=subprocess.STDOUT) self.extract_procs_queue.put((proposal, run, extract_proc)) + def listen(): # Set up logging to a file file_handler = logging.FileHandler("amore.log") @@ -177,5 +197,6 @@ def listen(): if os.stat("amore.log").st_uid == os.getuid(): os.chmod("amore.log", 0o666) + if __name__ == '__main__': listen() diff --git a/damnit/ctxsupport/ctxrunner.py b/damnit/ctxsupport/ctxrunner.py index dc83c4fc..361b9307 100644 --- a/damnit/ctxsupport/ctxrunner.py +++ b/damnit/ctxsupport/ctxrunner.py @@ -14,12 +14,16 @@ import pickle import sys import time +from contextlib import contextmanager from datetime import timezone import traceback from enum import Enum from pathlib import Path from unittest.mock import MagicMock from graphlib import CycleError, TopologicalSorter +from subprocess import run +from tempfile import TemporaryDirectory +from unittest.mock import patch from matplotlib.axes import Axes from matplotlib.figure import Figure @@ -594,6 +598,98 @@ def save_hdf5(self, hdf5_path, reduced_only=False): if os.stat(hdf5_path).st_uid == os.getuid(): os.chmod(hdf5_path, 0o666) + +@contextmanager +def filesystem(host='localhost'): + """Mount remote proposal data with sshfs + + mount `/gpfs/exfel/exp/` from `host` and patch + `extra_data.read_machinery.DATA_ROOT_DIR` to use it instead of the local one. + Opening a file or a run directory with extra_data will open the remote data. 
+ """ + if host == 'localhost': + yield + return + + with TemporaryDirectory() as td: + try: + mount_command = [ + "sshfs", f"{host}:{extra_data.read_machinery.DATA_ROOT_DIR}", str(td), + # deactivate password prompt to fail if we don't have a valid ssh key + "-o", "ssh_command='ssh -o PasswordAuthentication=no'" + ] + run(mount_command, check=True, shell=True) + + with patch("extra_data.read_machinery.DATA_ROOT_DIR", td): + yield + + finally: + run(["fusermount", "-u", str(td)], check=True, shell=True) + + +def execute_context(args): + # Check if we have proc data + proc_available = False + if args.mock: + # If we want to mock a run, assume it's available + proc_available = True + else: + # Otherwise check with open_run() + try: + extra_data.open_run(args.proposal, args.run, data="proc") + proc_available = True + except FileNotFoundError: + pass + except Exception as e: + log.warning(f"Error when checking if proc data available: {e}") + + run_data = RunData(args.run_data) + if run_data == RunData.ALL and not proc_available: + log.warning("Proc data is unavailable, only raw variables will be executed.") + run_data = RunData.RAW + + ctx_whole = ContextFile.from_py_file(Path('context.py')) + ctx_whole.check() + ctx = ctx_whole.filter( + run_data=run_data, cluster=args.cluster_job, name_matches=args.match + ) + log.info("Using %d variables (of %d) from context file %s", + len(ctx.vars), len(ctx_whole.vars), + "" if args.cluster_job else "(cluster variables will be processed later)") + + if args.mock: + run_dc = mock_run() + else: + # Make sure that we always select the most data possible, so proc + # variables have access to raw data too. + actual_run_data = RunData.ALL if run_data == RunData.PROC else run_data + run_dc = extra_data.open_run(args.proposal, args.run, data=actual_run_data.value) + + res = ctx.execute(run_dc, args.run, args.proposal, input_vars={}) + + for path in args.save: + res.save_hdf5(path) + for path in args.save_reduced: + res.save_hdf5(path, reduced_only=True) + + +def evaluate_context(args): + error_info = None + + try: + ctx = ContextFile.from_py_file(args.context_file) + + # Strip the functions from the Variable's, these cannot always be + # pickled. 
+ for var in ctx.vars.values(): + var.func = None + except: + ctx = None + error_info = extract_error_info(*sys.exc_info()) + + args.out_file.write_bytes(pickle.dumps((ctx, error_info))) + + def mock_run(): run = MagicMock() run.files = [MagicMock(filename="/tmp/foo/bar.h5")] @@ -612,6 +708,7 @@ def train_timestamps(): return run + def main(argv=None): ap = argparse.ArgumentParser() subparsers = ap.add_subparsers(required=True, dest="subcmd") @@ -625,6 +722,7 @@ def main(argv=None): exec_ap.add_argument('--match', action="append", default=[]) exec_ap.add_argument('--save', action='append', default=[]) exec_ap.add_argument('--save-reduced', action='append', default=[]) + exec_ap.add_argument('--data-location', default='localhost') ctx_ap = subparsers.add_parser("ctx", help="Evaluate context file and pickle it to a file") ctx_ap.add_argument("context_file", type=Path) @@ -634,64 +732,14 @@ def main(argv=None): logging.basicConfig(level=logging.INFO) if args.subcmd == "exec": - # Check if we have proc data - proc_available = False - if args.mock: - # If we want to mock a run, assume it's available - proc_available = True - else: - # Otherwise check with open_run() - try: - extra_data.open_run(args.proposal, args.run, data="proc") - proc_available = True - except FileNotFoundError: - pass - except Exception as e: - log.warning(f"Error when checking if proc data available: {e}") - - run_data = RunData(args.run_data) - if run_data == RunData.ALL and not proc_available: - log.warning("Proc data is unavailable, only raw variables will be executed.") - run_data = RunData.RAW - - ctx_whole = ContextFile.from_py_file(Path('context.py')) - ctx_whole.check() - ctx = ctx_whole.filter( - run_data=run_data, cluster=args.cluster_job, name_matches=args.match - ) - log.info("Using %d variables (of %d) from context file %s", - len(ctx.vars), len(ctx_whole.vars), - "" if args.cluster_job else "(cluster variables will be processed later)") - - if args.mock: - run_dc = mock_run() - else: - # Make sure that we always select the most data possible, so proc - # variables have access to raw data too. - actual_run_data = RunData.ALL if run_data == RunData.PROC else run_data - run_dc = extra_data.open_run(args.proposal, args.run, data=actual_run_data.value) - - res = ctx.execute(run_dc, args.run, args.proposal, input_vars={}) - - for path in args.save: - res.save_hdf5(path) - for path in args.save_reduced: - res.save_hdf5(path, reduced_only=True) + if args.cluster_job and args.data_location != 'localhost': + # Only run cluster jobs with data on Maxwell + log.info('Skipping cluster jobs for remote data [%s].', args.data_location) + return + with filesystem(args.data_location): + execute_context(args) elif args.subcmd == "ctx": - error_info = None - - try: - ctx = ContextFile.from_py_file(args.context_file) - - # Strip the functions from the Variable's, these cannot always be - # pickled. 
- for var in ctx.vars.values(): - var.func = None - except: - ctx = None - error_info = extract_error_info(*sys.exc_info()) - - args.out_file.write_bytes(pickle.dumps((ctx, error_info))) + evaluate_context(args) if __name__ == '__main__': From b97fd0a6fdc3663d54f26c3c617d5e0453031e3a Mon Sep 17 00:00:00 2001 From: Thomas Michelat Date: Wed, 5 Jun 2024 12:52:12 +0200 Subject: [PATCH 02/12] address reviewer comments, prevent users from starting the listener on the online cluster --- damnit/backend/extract_data.py | 2 +- damnit/backend/listener.py | 6 +++++- damnit/gui/main_window.py | 8 ++++++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/damnit/backend/extract_data.py b/damnit/backend/extract_data.py index c13dd4b7..2c0fd8d0 100644 --- a/damnit/backend/extract_data.py +++ b/damnit/backend/extract_data.py @@ -382,7 +382,7 @@ def reprocess(runs, proposal=None, match=(), mock=False): ap.add_argument('run_data', choices=('raw', 'proc', 'all')) ap.add_argument('--cluster-job', action="store_true") ap.add_argument('--match', action="append", default=[]) - ap.add_argument('--data-location', default='localhost') + ap.add_argument('--data-location', default='localhost', help=argparse.SUPPRESS) args = ap.parse_args() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") diff --git a/damnit/backend/listener.py b/damnit/backend/listener.py index 3ba93621..ab525fd5 100644 --- a/damnit/backend/listener.py +++ b/damnit/backend/listener.py @@ -73,6 +73,10 @@ def watch_processes_finish(q: queue.Queue): class EventProcessor: def __init__(self, context_dir=Path('.')): + if gethostname().startswith('exflonc'): + log.warning('Running the DAMNIT listener on the online cluster is not allowed') + exit(1) + self.context_dir = context_dir self.db = DamnitDB.from_dir(context_dir) # Fail fast if read-only - https://stackoverflow.com/a/44707371/434217 self.db.conn.execute("pragma user_version=0;") self.proposal = self.db.metameta['proposal'] @@ -95,7 +99,7 @@ def __init__(self, context_dir=Path('.')): if self.run_online: topic = Path(find_proposal(f'p{self.proposal:06}')).parts[-3] if (remote_host := ONLINE_HOSTS.get(topic)) is None: - log.warn(f"Can't run online processing for topic '{topic}'") + log.warning(f"Can't run online processing for topic '{topic}'") self.run_online = False else: self.online_data_host = remote_host diff --git a/damnit/gui/main_window.py b/damnit/gui/main_window.py index 91d18218..3572f397 100644 --- a/damnit/gui/main_window.py +++ b/damnit/gui/main_window.py @@ -929,6 +929,14 @@ def __init__(self, file_path: Path, parent=None): def prompt_setup_db_and_backend(context_dir: Path, prop_no=None, parent=None): if not db_path(context_dir).is_file(): + if gethostname().endswith('exflonc'): + # prevent starting the backend on the online cluster + QMessageBox.warning( + "Running the DAMNIT backend on the online cluster is not allowed. " + "Please, open the damint GUI on Maxwell instead and retry."
+ ) + return False + button = QMessageBox.question( parent, "Database not found", f"{context_dir} does not contain a DAMNIT database, " From 28ea33354de76a84376bc335a0efae07beb244b3 Mon Sep 17 00:00:00 2001 From: Thomas Michelat Date: Wed, 5 Jun 2024 13:05:51 +0200 Subject: [PATCH 03/12] fix typo --- damnit/gui/main_window.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/damnit/gui/main_window.py b/damnit/gui/main_window.py index 3572f397..d832fd59 100644 --- a/damnit/gui/main_window.py +++ b/damnit/gui/main_window.py @@ -929,7 +929,7 @@ def __init__(self, file_path: Path, parent=None): def prompt_setup_db_and_backend(context_dir: Path, prop_no=None, parent=None): if not db_path(context_dir).is_file(): - if gethostname().endswith('exflonc'): + if gethostname().startswith('exflonc'): # prevent starting the backend on the online cluster QMessageBox.warning( "Running the DAMNIT backend on the online cluster is not allowed. " From 777425245823ce7fd6452af664d32879724c750d Mon Sep 17 00:00:00 2001 From: tmichela Date: Fri, 7 Jun 2024 14:41:42 +0200 Subject: [PATCH 04/12] suppress cli argument help --- damnit/backend/listener.py | 2 +- damnit/ctxsupport/ctxrunner.py | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/damnit/backend/listener.py b/damnit/backend/listener.py index ab525fd5..6ecdd142 100644 --- a/damnit/backend/listener.py +++ b/damnit/backend/listener.py @@ -103,7 +103,7 @@ def __init__(self, context_dir=Path('.')): self.run_online = False else: self.online_data_host = remote_host - log.debug("Processing online data? %s", self.run_online) + log.info("Processing online data? %s", self.run_online) # Monitor thread for subprocesses self.extract_procs_queue = queue.Queue() diff --git a/damnit/ctxsupport/ctxrunner.py b/damnit/ctxsupport/ctxrunner.py index 361b9307..52b58eb6 100644 --- a/damnit/ctxsupport/ctxrunner.py +++ b/damnit/ctxsupport/ctxrunner.py @@ -613,18 +613,21 @@ def filesystem(host='localhost'): with TemporaryDirectory() as td: try: - mount_command = [ - "sshfs", f"{host}:{extra_data.read_machinery.DATA_ROOT_DIR}", str(td), - # deactivate password prompt to fail if we don't have a valid ssh key - "-o", "ssh_command='ssh -o PasswordAuthentication=no'" - ] + mount_command = ( + f"sshfs {host}:{extra_data.read_machinery.DATA_ROOT_DIR} {td} " + # deactivate password prompt to fail if we don't have a valid ssh key + "-o ssh_command='ssh -o PasswordAuthentication=no'" + ) + print('cmd:', mount_command) run(mount_command, check=True, shell=True) with patch("extra_data.read_machinery.DATA_ROOT_DIR", td): yield - + except Exception as ex: + import traceback + traceback.print_exc() finally: - run(["fusermount", "-u", str(td)], check=True, shell=True) + run(f"fusermount -u {td}", check=True, shell=True) def execute_context(args): @@ -722,7 +725,7 @@ def main(argv=None): exec_ap.add_argument('--match', action="append", default=[]) exec_ap.add_argument('--save', action='append', default=[]) exec_ap.add_argument('--save-reduced', action='append', default=[]) - exec_ap.add_argument('--data-location', default='localhost') + exec_ap.add_argument('--data-location', default='localhost', help=argparse.SUPPRESS) ctx_ap = subparsers.add_parser("ctx", help="Evaluate context file and pickle it to a file") ctx_ap.add_argument("context_file", type=Path) From 87df37f538cde47c8bb6ba0467da91ebdba50263 Mon Sep 17 00:00:00 2001 From: Thomas Michelat <32831491+tmichela@users.noreply.github.com> Date: Mon, 15 Jul 2024 10:27:33 +0200 Subject: [PATCH 
05/12] Process online data from solaris (#288) run as slurm job, proxy through 10G node --- damnit/backend/extract_data.py | 2 +- damnit/backend/listener.py | 5 ----- damnit/ctxsupport/ctxrunner.py | 34 +++++++++++++++++++++------------- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/damnit/backend/extract_data.py b/damnit/backend/extract_data.py index a37a94b3..45aa6605 100644 --- a/damnit/backend/extract_data.py +++ b/damnit/backend/extract_data.py @@ -195,7 +195,7 @@ def update_db_vars(self): def extract_and_ingest(self, proposal, run, cluster=False, run_data=RunData.ALL, match=(), mock=False, - data_location='localhost',): + data_location='localhost'): if proposal is None: proposal = self.db.metameta['proposal'] diff --git a/damnit/backend/listener.py b/damnit/backend/listener.py index ca46923f..37fd0e27 100644 --- a/damnit/backend/listener.py +++ b/damnit/backend/listener.py @@ -19,7 +19,6 @@ KAFKA_BROKERS = ["exflwgs06:9091"] KAFKA_TOPICS = ["test.r2d2", "cal.offline-corrections", "test.euxfel.hed.daq", "test.euxfel.hed.cal"] KAFKA_EVENTS = ["migration_complete", "run_corrections_complete", "daq_run_complete", "online_correction_complete"] -BACKEND_HOSTS_TO_ONLINE = ['max-exfl-display003.desy.de', 'max-exfl-display004.desy.de'] ONLINE_HOSTS ={ 'FXE': 'sa1-onc-fxe.desy.de', 'HED': 'sa2-onc-hed.desy.de', @@ -61,9 +60,6 @@ def __init__(self, context_dir=Path('.')): # check backend host and connection to online cluster self.online_data_host = None self.run_online = self.db.metameta.get('run_online', False) is not False - if self.run_online and gethostname() not in BACKEND_HOSTS_TO_ONLINE: - log.warning(f"Disabled online processing, the backend must run on one of: {BACKEND_HOSTS_TO_ONLINE}") - self.run_online = False if self.run_online: topic = Path(find_proposal(f'p{self.proposal:06}')).parts[-3] if (remote_host := ONLINE_HOSTS.get(topic)) is None: @@ -73,7 +69,6 @@ def __init__(self, context_dir=Path('.')): self.online_data_host = remote_host log.info("Processing online data? 
%s", self.run_online) - def __enter__(self): return self diff --git a/damnit/ctxsupport/ctxrunner.py b/damnit/ctxsupport/ctxrunner.py index bf8243e6..2c43a13b 100644 --- a/damnit/ctxsupport/ctxrunner.py +++ b/damnit/ctxsupport/ctxrunner.py @@ -14,15 +14,22 @@ import pickle import sys import time +from contextlib import contextmanager +from datetime import timezone import traceback from contextlib import contextmanager from datetime import timezone from enum import Enum from graphlib import CycleError, TopologicalSorter from pathlib import Path +from unittest.mock import MagicMock +from graphlib import CycleError, TopologicalSorter from subprocess import run from tempfile import TemporaryDirectory -from unittest.mock import MagicMock, patch +from unittest.mock import patch + +from matplotlib.axes import Axes +from matplotlib.figure import Figure import extra_data import h5py @@ -626,18 +633,23 @@ def filesystem(host='localhost'): with TemporaryDirectory() as td: try: - mount_command = ( - f"sshfs {host}:{extra_data.read_machinery.DATA_ROOT_DIR} {td} " - # deactivate password prompt to fail if we don't have a valid ssh key - "-o ssh_command='ssh -o PasswordAuthentication=no'" - ) - print('cmd:', mount_command) - run(mount_command, check=True, shell=True) + mount_command = [ + "/gpfs/exfel/sw/software/bin/sshfs", + f"{host}:{extra_data.read_machinery.DATA_ROOT_DIR}", str(td), + # deactivate password prompt and GSSAPI to fail fast if + # we don't have a valid ssh key + "-o", "PasswordAuthentication=no", "-o", "GSSAPIAuthentication=no", + # proxy through machine with 10G connection to the online cluster + "-o", "ProxyJump=10.255.34.101", + ] + res = run(mount_command, check=True) + if res.returncode != 0: + raise RuntimeError(res.stderr) with patch("extra_data.read_machinery.DATA_ROOT_DIR", td): yield finally: - run(f"fusermount -u {td}", check=True, shell=True) + run(["fusermount", "-u", str(td)], check=True) def execute_context(args): @@ -745,10 +757,6 @@ def main(argv=None): logging.basicConfig(level=logging.INFO) if args.subcmd == "exec": - if args.cluster_job and args.data_location != 'localhost': - # Only run cluster jobs with data on Maxwell - log.info('Skipping cluster jobs for remote data [%s].', args.data_location) - return with filesystem(args.data_location): execute_context(args) elif args.subcmd == "ctx": From de16f57a40a09cf5e38794a504f1d480ca919f05 Mon Sep 17 00:00:00 2001 From: Thomas Michelat <32831491+tmichela@users.noreply.github.com> Date: Mon, 15 Jul 2024 14:41:43 +0200 Subject: [PATCH 06/12] can run cluster variable on remote data --- damnit/backend/extract_data.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/damnit/backend/extract_data.py b/damnit/backend/extract_data.py index 45aa6605..1fff437f 100644 --- a/damnit/backend/extract_data.py +++ b/damnit/backend/extract_data.py @@ -229,9 +229,6 @@ def extract_and_ingest(self, proposal, run, cluster=False, log.info("Sent Kafka updates to topic %r", self.db.kafka_topic) # Launch a Slurm job if there are any 'cluster' variables to evaluate - if data_location != 'localhost': - log.info('Skipping cluster variables with remote data [%s].', data_location) - return ctx = self.ctx_whole.filter(run_data=run_data, name_matches=match, cluster=cluster) ctx_slurm = self.ctx_whole.filter(run_data=run_data, name_matches=match, cluster=True) if set(ctx_slurm.vars) > set(ctx.vars): From 91f439b5ac56761c74dbb09b87c2f64df133b1e4 Mon Sep 17 00:00:00 2001 From: Thomas Michelat <32831491+tmichela@users.noreply.github.com> Date: Mon, 15 Jul 
2024 14:44:29 +0200 Subject: [PATCH 07/12] Update main_window.py --- damnit/gui/main_window.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/damnit/gui/main_window.py b/damnit/gui/main_window.py index d6297fe8..b95ac934 100644 --- a/damnit/gui/main_window.py +++ b/damnit/gui/main_window.py @@ -951,6 +951,8 @@ def prompt_setup_db_and_backend(context_dir: Path, prop_no=None, parent=None): if gethostname().startswith('exflonc'): # prevent starting the backend on the online cluster QMessageBox.warning( + None, + "Backend setup failed", "Running the DAMNIT backend on the online cluster is not allowed. " "Please, open the damint GUI on Maxwell instead and retry." ) From ee21679b09ef96ca1bfd10633f1a330fef7b1e08 Mon Sep 17 00:00:00 2001 From: tmichela Date: Sun, 11 Aug 2024 14:32:23 +0200 Subject: [PATCH 08/12] do not read the virtual overview file online because the links are absolute and break when using the mount point --- damnit/backend/extraction_control.py | 5 ++++- damnit/backend/listener.py | 2 +- damnit/ctxsupport/ctxrunner.py | 5 ++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/damnit/backend/extraction_control.py b/damnit/backend/extraction_control.py index adfb1070..e67db63d 100644 --- a/damnit/backend/extraction_control.py +++ b/damnit/backend/extraction_control.py @@ -85,12 +85,14 @@ class ExtractionRequest: match: tuple = () mock: bool = False update_vars: bool = True + data_location: str = 'localhost' def python_cmd(self): """Creates the command for a process to do this extraction""" cmd = [ sys.executable, '-m', 'damnit.backend.extract_data', - str(self.proposal), str(self.run), self.run_data.value + str(self.proposal), str(self.run), self.run_data.value, + '--data-location', self.data_location, ] if self.cluster: cmd.append('--cluster-job') @@ -100,6 +102,7 @@ def python_cmd(self): cmd.append('--mock') if self.update_vars: cmd.append('--update-vars') + return cmd diff --git a/damnit/backend/listener.py b/damnit/backend/listener.py index 37fd0e27..e22d0678 100644 --- a/damnit/backend/listener.py +++ b/damnit/backend/listener.py @@ -124,7 +124,7 @@ def handle_event(self, record, msg: dict, run_data: RunData, self.db.ensure_run(proposal, run, record.timestamp / 1000) log.info(f"Added p%d r%d ({run_data.value} data) to database", proposal, run) - req = ExtractionRequest(run, proposal, run_data, data_location) + req = ExtractionRequest(run, proposal, run_data, data_location=data_location) self.submitter.submit(req) diff --git a/damnit/ctxsupport/ctxrunner.py b/damnit/ctxsupport/ctxrunner.py index 2c43a13b..c6b774aa 100644 --- a/damnit/ctxsupport/ctxrunner.py +++ b/damnit/ctxsupport/ctxrunner.py @@ -688,7 +688,10 @@ def execute_context(args): # Make sure that we always select the most data possible, so proc # variables have access to raw data too.
actual_run_data = RunData.ALL if run_data == RunData.PROC else run_data - run_dc = extra_data.open_run(args.proposal, args.run, data=actual_run_data.value) + run_dc = extra_data.open_run( + args.proposal, args.run, data=actual_run_data.value, + _use_voview=args.data_location=='localhost' + ) res = ctx.execute(run_dc, args.run, args.proposal, input_vars={}) From 156555489b00330634fcc365dc145de074e4c54a Mon Sep 17 00:00:00 2001 From: tmichela Date: Wed, 11 Sep 2024 14:34:04 +0200 Subject: [PATCH 09/12] fix conflicts --- damnit/backend/extract_data.py | 12 ++++++------ damnit/backend/extraction_control.py | 4 ++-- damnit/ctxsupport/ctxrunner.py | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/damnit/backend/extract_data.py b/damnit/backend/extract_data.py index e9180d2c..64db1459 100644 --- a/damnit/backend/extract_data.py +++ b/damnit/backend/extract_data.py @@ -41,13 +41,13 @@ def run_in_subprocess(args, **kwargs): def extract_in_subprocess( proposal, run, out_path, cluster=False, run_data=RunData.ALL, match=(), - variables=(), python_exe=None, mock=False, data_location='localhost', + variables=(), python_exe=None, mock=False, mount_host=None, ): if not python_exe: python_exe = sys.executable args = [python_exe, '-m', 'ctxrunner', 'exec', str(proposal), str(run), run_data.value, - '--save', out_path, '--data-location', data_location] + '--save', out_path, '--mount-host', mount_host] if cluster: args.append('--cluster-job') if mock: @@ -199,7 +199,7 @@ def update_db_vars(self): def extract_and_ingest(self, proposal, run, cluster=False, run_data=RunData.ALL, match=(), variables=(), mock=False, - data_location='localhost'): + mount_host=None): if proposal is None: proposal = self.db.metameta['proposal'] @@ -212,7 +212,7 @@ def extract_and_ingest(self, proposal, run, cluster=False, reduced_data = extract_in_subprocess( proposal, run, out_path, cluster=cluster, run_data=run_data, match=match, variables=variables, python_exe=python_exe, mock=mock, - data_location=data_location, + mount_host=mount_host, ) log.info("Reduced data has %d fields", len(reduced_data)) add_to_db(reduced_data, self.db, proposal, run) @@ -284,9 +284,9 @@ def main(argv=None): cluster=args.cluster_job, run_data=RunData(args.run_data), match=args.match, - mount_host=args.mount_host) variables=args.var, - mock=args.mock) + mock=args.mock, + mount_host=args.mount_host) if __name__ == '__main__': diff --git a/damnit/backend/extraction_control.py b/damnit/backend/extraction_control.py index e80f1992..3d947925 100644 --- a/damnit/backend/extraction_control.py +++ b/damnit/backend/extraction_control.py @@ -86,14 +86,14 @@ class ExtractionRequest: variables: tuple = () # Overrides match if present mock: bool = False update_vars: bool = True - data_location: str = 'localhost' + mount_host: str = None def python_cmd(self): """Creates the command for a process to do this extraction""" cmd = [ sys.executable, '-m', 'damnit.backend.extract_data', str(self.proposal), str(self.run), self.run_data.value, - '--data-location', self.data_location, + '--mount-host', self.mount_host, ] if self.cluster: cmd.append('--cluster-job') diff --git a/damnit/ctxsupport/ctxrunner.py b/damnit/ctxsupport/ctxrunner.py index a22953e7..3f69b634 100644 --- a/damnit/ctxsupport/ctxrunner.py +++ b/damnit/ctxsupport/ctxrunner.py @@ -626,14 +626,14 @@ def save_hdf5(self, hdf5_path, reduced_only=False): @contextmanager -def filesystem(host='localhost'): +def filesystem(host=None): """Mount remote proposal data with sshfs mount 
`/gpfs/exfel/exp/` from `host` and patch `extra_data.read_machinery.DATA_ROOT_DIR` to use it instead of the local one. Opening a file or a run directory with extra_data will open the remote data. """ - if host == 'localhost': + if host is None: yield return @@ -758,7 +758,7 @@ def main(argv=None): exec_ap.add_argument('--var', action="append", default=[]) exec_ap.add_argument('--save', action='append', default=[]) exec_ap.add_argument('--save-reduced', action='append', default=[]) - exec_ap.add_argument('--data-location', default='localhost', help=argparse.SUPPRESS) + exec_ap.add_argument('--mount-host', help=argparse.SUPPRESS) ctx_ap = subparsers.add_parser("ctx", help="Evaluate context file and pickle it to a file") ctx_ap.add_argument("context_file", type=Path) From 9c80b2261364a844b416ff6097e47567ba60576a Mon Sep 17 00:00:00 2001 From: tmichela Date: Wed, 11 Sep 2024 14:39:34 +0200 Subject: [PATCH 10/12] sort imports --- damnit/backend/extract_data.py | 5 ++--- damnit/backend/extraction_control.py | 2 +- damnit/ctxsupport/ctxrunner.py | 12 +++--------- damnit/gui/main_window.py | 9 +++++---- 4 files changed, 11 insertions(+), 17 deletions(-) diff --git a/damnit/backend/extract_data.py b/damnit/backend/extract_data.py index 64db1459..a44bcfce 100644 --- a/damnit/backend/extract_data.py +++ b/damnit/backend/extract_data.py @@ -6,8 +6,8 @@ """ import argparse import copy -import os import logging +import os import pickle import re import socket @@ -18,12 +18,11 @@ import h5py import numpy as np - from kafka import KafkaProducer from ..context import ContextFile, RunData from ..definitions import UPDATE_BROKERS -from .db import DamnitDB, ReducedData, BlobTypes, MsgKind, msg_dict +from .db import BlobTypes, DamnitDB, MsgKind, ReducedData, msg_dict from .extraction_control import ExtractionRequest, ExtractionSubmitter log = logging.getLogger(__name__) diff --git a/damnit/backend/extraction_control.py b/damnit/backend/extraction_control.py index 3d947925..61476b2c 100644 --- a/damnit/backend/extraction_control.py +++ b/damnit/backend/extraction_control.py @@ -16,8 +16,8 @@ from extra_data.read_machinery import find_proposal -from .db import DamnitDB from ..context import RunData +from .db import DamnitDB log = logging.getLogger(__name__) diff --git a/damnit/ctxsupport/ctxrunner.py b/damnit/ctxsupport/ctxrunner.py index 3f69b634..452534b4 100644 --- a/damnit/ctxsupport/ctxrunner.py +++ b/damnit/ctxsupport/ctxrunner.py @@ -14,22 +14,15 @@ import pickle import sys import time -from contextlib import contextmanager -from datetime import timezone import traceback from contextlib import contextmanager from datetime import timezone from enum import Enum from graphlib import CycleError, TopologicalSorter from pathlib import Path -from unittest.mock import MagicMock -from graphlib import CycleError, TopologicalSorter from subprocess import run from tempfile import TemporaryDirectory -from unittest.mock import patch - -from matplotlib.axes import Axes -from matplotlib.figure import Figure +from unittest.mock import MagicMock, patch import extra_data import h5py @@ -38,7 +31,8 @@ import xarray as xr import yaml from damnit_ctx import Cell, RunData, Variable, isinstance_no_import - +from matplotlib.axes import Axes +from matplotlib.figure import Figure log = logging.getLogger(__name__) diff --git a/damnit/gui/main_window.py b/damnit/gui/main_window.py index 42844a5a..0a4623dd 100644 --- a/damnit/gui/main_window.py +++ b/damnit/gui/main_window.py @@ -14,22 +14,23 @@ from PyQt5 import QtCore, QtGui, QtSvg, 
QtWidgets from PyQt5.Qsci import QsciLexerPython, QsciScintilla from PyQt5.QtCore import Qt +from PyQt5.QtQuick import QQuickWindow, QSGRendererInterface from PyQt5.QtWebEngineWidgets import QWebEngineProfile from PyQt5.QtWidgets import QFileDialog, QMessageBox, QTabWidget -from PyQt5.QtQuick import QQuickWindow, QSGRendererInterface from ..api import DataType, RunVariables from ..backend import backend_is_running, initialize_and_start_backend from ..backend.db import DamnitDB, MsgKind, ReducedData, db_path -from ..backend.extraction_control import process_log_path, ExtractionSubmitter +from ..backend.extraction_control import ExtractionSubmitter, process_log_path from ..backend.user_variables import UserEditableVariable from ..definitions import UPDATE_BROKERS from ..util import StatusbarStylesheet, fix_data_for_plotting, icon_path from .editor import ContextTestResult, Editor from .kafka import UpdateAgent -from .open_dialog import OpenDBDialog from .new_context_dialog import NewContextFileDialog -from .plot import ImagePlotWindow, ScatterPlotWindow, Xarray1DPlotWindow, PlottingControls +from .open_dialog import OpenDBDialog +from .plot import (ImagePlotWindow, PlottingControls, ScatterPlotWindow, + Xarray1DPlotWindow) from .process import ProcessingDialog from .table import DamnitTableModel, TableView, prettify_notation from .user_variables import AddUserVariableDialog From f9e4dd51c2fd0ac18a613eca6649c7f863b29a3f Mon Sep 17 00:00:00 2001 From: tmichela Date: Wed, 11 Sep 2024 14:44:41 +0200 Subject: [PATCH 11/12] missing import --- damnit/gui/main_window.py | 1 + 1 file changed, 1 insertion(+) diff --git a/damnit/gui/main_window.py b/damnit/gui/main_window.py index 0a4623dd..58ad6a94 100644 --- a/damnit/gui/main_window.py +++ b/damnit/gui/main_window.py @@ -6,6 +6,7 @@ from datetime import datetime from enum import Enum from pathlib import Path +from socket import gethostname import h5py import numpy as np From 63fe25626f6032f0e4757f9bbdc4183fcdbba095 Mon Sep 17 00:00:00 2001 From: tmichela Date: Wed, 11 Sep 2024 15:06:32 +0200 Subject: [PATCH 12/12] bugfixes, update tests --- damnit/backend/extract_data.py | 4 +++- damnit/backend/extraction_control.py | 3 ++- damnit/ctxsupport/ctxrunner.py | 2 +- tests/test_backend.py | 4 ++-- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/damnit/backend/extract_data.py b/damnit/backend/extract_data.py index a44bcfce..c7023c50 100644 --- a/damnit/backend/extract_data.py +++ b/damnit/backend/extract_data.py @@ -46,7 +46,7 @@ def extract_in_subprocess( python_exe = sys.executable args = [python_exe, '-m', 'ctxrunner', 'exec', str(proposal), str(run), run_data.value, - '--save', out_path, '--mount-host', mount_host] + '--save', out_path] if cluster: args.append('--cluster-job') if mock: @@ -57,6 +57,8 @@ def extract_in_subprocess( else: for m in match: args.extend(['--match', m]) + if mount_host: + args.extend(['--mount-host', mount_host]) with TemporaryDirectory() as td: # Save a separate copy of the reduced data, so we can send an update diff --git a/damnit/backend/extraction_control.py b/damnit/backend/extraction_control.py index 61476b2c..ae9026b4 100644 --- a/damnit/backend/extraction_control.py +++ b/damnit/backend/extraction_control.py @@ -93,7 +93,6 @@ def python_cmd(self): cmd = [ sys.executable, '-m', 'damnit.backend.extract_data', str(self.proposal), str(self.run), self.run_data.value, - '--mount-host', self.mount_host, ] if self.cluster: cmd.append('--cluster-job') @@ -107,6 +106,8 @@ def python_cmd(self): cmd.append('--mock') 
if self.update_vars: cmd.append('--update-vars') + if self.mount_host: + cmd.extend(['--mount-host', self.mount_host]) return cmd diff --git a/damnit/ctxsupport/ctxrunner.py b/damnit/ctxsupport/ctxrunner.py index 452534b4..9e0eada9 100644 --- a/damnit/ctxsupport/ctxrunner.py +++ b/damnit/ctxsupport/ctxrunner.py @@ -677,7 +677,7 @@ def execute_context(args): ctx_whole.check() ctx = ctx_whole.filter( run_data=run_data, cluster=args.cluster_job, name_matches=args.match, - variables=arg.var, + variables=args.var, ) log.info("Using %d variables (of %d) from context file %s", len(ctx.vars), len(ctx_whole.vars), diff --git a/tests/test_backend.py b/tests/test_backend.py index 58714b66..74234076 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -644,7 +644,7 @@ def slurm_scalar(run): # Helper function to raise an exception when proc data isn't available, like # open_run(data="proc") would. - def mock_open_run(*_, data=None): + def mock_open_run(*_, data=None, _use_voview=None): if data == "proc": raise FileNotFoundError("foo.h5") else: @@ -678,7 +678,7 @@ def mock_open_run(*_, data=None): patch("ctxrunner.Results"): main(["exec", "1234", "42", "proc"]) - open_run.assert_called_with(1234, 42, data="all") + open_run.assert_called_with(1234, 42, data="all", _use_voview=True) def test_custom_environment(mock_db, venv, monkeypatch, qtbot): db_dir, db = mock_db