mount remote filesystem with sshfs
clean debug code
tmichela committed May 31, 2024
1 parent bb7ad08 commit 0617b50
Showing 3 changed files with 166 additions and 87 deletions.
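Editor's note: the changes below thread a new data_location option from the Kafka listener down through extract_and_ingest to the ctxrunner subprocess; per the commit title, the run data itself lives on a remote host and is mounted with sshfs (that mount logic presumably sits in the third changed file, which is not shown on this page). A minimal sketch of the idea follows; the helper, paths and mount options are illustrative assumptions, not code from this commit.

import subprocess
import tempfile
from contextlib import contextmanager

@contextmanager
def sshfs_mount(data_location: str, remote_path: str):
    """Mount remote_path from data_location read-only via sshfs, unmount on exit.

    Hypothetical helper; the actual mount handling in this commit is not shown here.
    """
    mountpoint = tempfile.mkdtemp(prefix='damnit-sshfs-')
    # sshfs <host>:<dir> <mountpoint> -o ro  (read-only FUSE mount of the remote data)
    subprocess.run(['sshfs', f'{data_location}:{remote_path}', mountpoint, '-o', 'ro'],
                   check=True)
    try:
        yield mountpoint
    finally:
        # Release the FUSE mount once processing is done
        subprocess.run(['fusermount', '-u', mountpoint], check=False)

Anything that cannot usefully run against such a mount, in particular the Slurm 'cluster' variables, is skipped when data_location is not 'localhost', as the extract_data.py hunks below show.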
18 changes: 14 additions & 4 deletions damnit/backend/extract_data.py
@@ -35,6 +35,7 @@ def innetgr(netgroup: bytes, host=None, user=None, domain=None):
libc = CDLL("libc.so.6")
return bool(libc.innetgr(netgroup, host, user, domain))


def default_slurm_partition():
username = getpass.getuser().encode()
if innetgr(b'exfel-wgs-users', user=username):
@@ -43,6 +44,7 @@ def default_slurm_partition():
return 'upex'
return 'all'


def run_in_subprocess(args, **kwargs):
env = os.environ.copy()
ctxsupport_dir = str(Path(__file__).parents[1] / 'ctxsupport')
@@ -52,6 +54,7 @@ def run_in_subprocess(args, **kwargs):

return subprocess.run(args, env=env, **kwargs)


def process_log_path(run, proposal, ctx_dir=Path('.'), create=True):
p = ctx_dir.absolute() / 'process_logs' / f"r{run}-p{proposal}.out"
if create:
@@ -89,13 +92,13 @@ def loop():

def extract_in_subprocess(
proposal, run, out_path, cluster=False, run_data=RunData.ALL, match=(),
python_exe=None, mock=False, tee_output=None
python_exe=None, mock=False, tee_output=None, data_location='localhost',
):
if not python_exe:
python_exe = sys.executable

args = [python_exe, '-m', 'ctxrunner', 'exec', str(proposal), str(run), run_data.value,
'--save', out_path]
'--save', out_path, '--data-location', data_location]
if cluster:
args.append('--cluster-job')
if mock:
@@ -246,7 +249,8 @@ def slurm_options(self):
return opts

def extract_and_ingest(self, proposal, run, cluster=False,
run_data=RunData.ALL, match=(), mock=False, tee_output=None):
run_data=RunData.ALL, match=(), mock=False, tee_output=None,
data_location='localhost'):
if proposal is None:
proposal = self.proposal

@@ -261,6 +265,7 @@ def extract_and_ingest(self, proposal, run, cluster=False,
reduced_data = extract_in_subprocess(
proposal, run, out_path, cluster=cluster, run_data=run_data,
match=match, python_exe=python_exe, mock=mock, tee_output=tee_output,
data_location=data_location,
)
log.info("Reduced data has %d fields", len(reduced_data))
add_to_db(reduced_data, self.db, proposal, run)
@@ -274,6 +279,9 @@ def extract_and_ingest(self, proposal, run, cluster=False,
log.info("Sent Kafka update to topic %r", self.db.kafka_topic)

# Launch a Slurm job if there are any 'cluster' variables to evaluate
if data_location != 'localhost':
log.info('Skipping cluster variables with remote data [%s].', data_location)
return
ctx = self.ctx_whole.filter(run_data=run_data, name_matches=match, cluster=cluster)
ctx_slurm = self.ctx_whole.filter(run_data=run_data, name_matches=match, cluster=True)
if set(ctx_slurm.vars) > set(ctx.vars):
@@ -374,6 +382,7 @@ def reprocess(runs, proposal=None, match=(), mock=False):
ap.add_argument('run_data', choices=('raw', 'proc', 'all'))
ap.add_argument('--cluster-job', action="store_true")
ap.add_argument('--match', action="append", default=[])
ap.add_argument('--data-location', default='localhost')
args = ap.parse_args()
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s")
@@ -389,4 +398,5 @@ def reprocess(runs, proposal=None, match=(), mock=False):
Extractor().extract_and_ingest(args.proposal, args.run,
cluster=args.cluster_job,
run_data=RunData(args.run_data),
match=args.match)
match=args.match,
data_location=args.data_location)
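Editor's note: for reference, a hypothetical call into the updated entry point (the proposal, run and host values below are made up). Any data_location other than 'localhost' makes extract_and_ingest return before launching the Slurm job for cluster variables, as the early return above shows.

from damnit.backend.extract_data import Extractor, RunData

# Hypothetical values: proposal 1234, run 56, data read from an online-cluster host
Extractor().extract_and_ingest(
    1234, 56,
    run_data=RunData.RAW,
    data_location='sa2-onc-hed.desy.de',  # anything but 'localhost' skips cluster variables
)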
73 changes: 47 additions & 26 deletions damnit/backend/listener.py
@@ -10,25 +10,30 @@
from socket import gethostname
from threading import Thread

from extra_data.read_machinery import find_proposal
from kafka import KafkaConsumer

from .db import DamnitDB
from .extract_data import RunData, process_log_path

# For now, the migration & calibration events come via DESY's Kafka brokers,
# but the AMORE updates go via XFEL's test instance.
CONSUMER_ID = 'xfel-da-amore-prototype-{}'
KAFKA_CONF = {
'maxwell': {
'brokers': ['exflwgs06:9091'],
'topics': ["test.r2d2", "cal.offline-corrections"],
'events': ["migration_complete", "run_corrections_complete"],
},
'onc': {
'brokers': ['exflwgs06:9091'],
'topics': ['test.euxfel.hed.daq', 'test.euxfel.hed.cal'],
'events': ['daq_run_complete', 'online_correction_complete'],
}
CONSUMER_ID = "xfel-da-amore-prototype-{}"
KAFKA_BROKERS = ["exflwgs06:9091"]
KAFKA_TOPICS = ["test.r2d2", "cal.offline-corrections", "test.euxfel.hed.daq", "test.euxfel.hed.cal"]
KAFKA_EVENTS = ["migration_complete", "run_corrections_complete", "daq_run_complete", "online_correction_complete"]
BACKEND_HOSTS_TO_ONLINE = ['max-exfl-display003.desy.de', 'max-exfl-display004.desy.de']
ONLINE_HOSTS = {
'FXE': 'sa1-onc-fxe.desy.de',
'HED': 'sa2-onc-hed.desy.de',
'MID': 'sa2-onc-mid.desy.de',
# 'SA1': '',
# 'SA2': '',
# 'SA3': '',
'SCS': 'sa3-onc-scs.desy.de',
'SPB': 'sa1-onc-spb.desy.de',
'SQS': 'sa3-onc-sqs.desy.de',
'SXP': 'sa3-onc-sxp.desy.de',
}

log = logging.getLogger(__name__)
@@ -73,20 +78,30 @@ def __init__(self, context_dir=Path('.')):
# Fail fast if read-only - https://stackoverflow.com/a/44707371/434217
self.db.conn.execute("pragma user_version=0;")
self.proposal = self.db.metameta['proposal']
log.info(f"Will watch for events from proposal {self.proposal}")

if gethostname().startswith('exflonc'):
# running on the online cluster
kafka_conf = KAFKA_CONF['onc']
else:
kafka_conf = KAFKA_CONF['maxwell']
log.info(f"Will watch for events from proposal {self.proposal}")

consumer_id = CONSUMER_ID.format(self.db.metameta['db_id'])
self.kafka_cns = KafkaConsumer(*kafka_conf['topics'],
bootstrap_servers=kafka_conf['brokers'],
self.kafka_cns = KafkaConsumer(*KAFKA_TOPICS,
bootstrap_servers=KAFKA_BROKERS,
group_id=consumer_id)
self.events = kafka_conf['events']

# check backend host and connection to online cluster
self.online_data_host = None
self.run_online = self.db.metameta.get('run_online', False) is not False
if self.run_online and gethostname() not in BACKEND_HOSTS_TO_ONLINE:
log.warning(f"Disabled online processing, the backend must run on one of: {BACKEND_HOSTS_TO_ONLINE}")
self.run_online = False
if self.run_online:
topic = Path(find_proposal(f'p{self.proposal:06}')).parts[-3]
if (remote_host := ONLINE_HOSTS.get(topic)) is None:
log.warning(f"Can't run online processing for topic '{topic}'")
self.run_online = False
else:
self.online_data_host = remote_host
log.debug("Processing online data? %s", self.run_online)

# Monitor thread for subprocesses
self.extract_procs_queue = queue.Queue()
self.extract_procs_watcher = Thread(
target=watch_processes_finish,
@@ -113,25 +128,28 @@ def run(self):
def _process_kafka_event(self, record):
msg = json.loads(record.value.decode())
event = msg.get('event')
if event in self.events:
if event in KAFKA_EVENTS:
log.debug("Processing %s event from Kafka", event)
getattr(self, f'handle_{event}')(record, msg)
else:
log.debug("Unexpected %s event from Kafka", event)

def handle_daq_run_complete(self, record, msg: dict):
self.handle_event(record, msg, RunData.RAW)
if self.run_online:
self.handle_event(record, msg, RunData.RAW, self.online_data_host)

def handle_online_correction_complete(self, record, msg: dict):
self.handle_event(record, msg, RunData.PROC)
if self.run_online:
self.handle_event(record, msg, RunData.PROC, self.online_data_host)

def handle_migration_complete(self, record, msg: dict):
self.handle_event(record, msg, RunData.RAW)

def handle_run_corrections_complete(self, record, msg: dict):
self.handle_event(record, msg, RunData.PROC)

def handle_event(self, record, msg: dict, run_data: RunData):
def handle_event(self, record, msg: dict, run_data: RunData,
data_location: str = "localhost"):
proposal = int(msg['proposal'])
run = int(msg['run'])

@@ -149,10 +167,12 @@ def handle_event(self, record, msg: dict, run_data: RunData):
# Create subprocess to process the run
extract_proc = subprocess.Popen([
sys.executable, '-m', 'damnit.backend.extract_data',
str(proposal), str(run), run_data.value
str(proposal), str(run), run_data.value,
'--data-location', data_location,
], cwd=self.context_dir, stdout=logf, stderr=subprocess.STDOUT)
self.extract_procs_queue.put((proposal, run, extract_proc))


def listen():
# Set up logging to a file
file_handler = logging.FileHandler("amore.log")
@@ -177,5 +197,6 @@ def listen():
if os.stat("amore.log").st_uid == os.getuid():
os.chmod("amore.log", 0o666)


if __name__ == '__main__':
listen()
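Editor's note: to make the online-processing check above concrete, the instrument topic is taken from the proposal directory returned by find_proposal and mapped to an online-cluster host through ONLINE_HOSTS. A worked example of that lookup, assuming the usual /gpfs/exfel/exp/<TOPIC>/<cycle>/p<proposal> layout (the concrete path and proposal number are illustrative):

from pathlib import Path
from damnit.backend.listener import ONLINE_HOSTS

# What find_proposal('p001234') might return for a HED proposal
proposal_dir = Path('/gpfs/exfel/exp/HED/202401/p001234')
topic = proposal_dir.parts[-3]    # 'HED'
host = ONLINE_HOSTS.get(topic)    # 'sa2-onc-hed.desy.de'; None would disable online processing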