mount remote filesystem with sshfs #255

Open · wants to merge 14 commits into master
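Note on scope: the hunks below only thread a `data_location` value from the Kafka listener through to the `ctxrunner` command line; the sshfs mount named in the title presumably happens on the ctxrunner side, which is not part of this section. A rough, entirely hypothetical sketch of that idea:

```python
import subprocess
import tempfile

def mount_remote(host: str, remote_path: str) -> str:
    """Mount remote_path from host via sshfs and return the local mountpoint.

    Hypothetical helper, not taken from this PR; assumes passwordless SSH
    access to the online cluster node.
    """
    mountpoint = tempfile.mkdtemp(prefix='damnit-sshfs-')
    # Read-only is enough for extracting data from run files.
    subprocess.run(
        ['sshfs', '-o', 'ro', f'{host}:{remote_path}', mountpoint],
        check=True,
    )
    return mountpoint
```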
13 changes: 8 additions & 5 deletions damnit/backend/extract_data.py
@@ -41,13 +41,13 @@ def run_in_subprocess(args, **kwargs):
 
 def extract_in_subprocess(
         proposal, run, out_path, cluster=False, run_data=RunData.ALL, match=(),
-        python_exe=None, mock=False
+        python_exe=None, mock=False, data_location='localhost',
 ):
     if not python_exe:
         python_exe = sys.executable
 
     args = [python_exe, '-m', 'ctxrunner', 'exec', str(proposal), str(run), run_data.value,
-            '--save', out_path]
+            '--save', out_path, '--data-location', data_location]
     if cluster:
         args.append('--cluster-job')
     if mock:
@@ -194,7 +194,8 @@ def update_db_vars(self):
         self.kafka_prd.flush()
 
     def extract_and_ingest(self, proposal, run, cluster=False,
-                           run_data=RunData.ALL, match=(), mock=False):
+                           run_data=RunData.ALL, match=(), mock=False,
+                           data_location='localhost'):
         if proposal is None:
             proposal = self.db.metameta['proposal']
 
@@ -206,7 +207,7 @@ def extract_and_ingest(self, proposal, run, cluster=False,
         python_exe = self.db.metameta.get('context_python', '')
         reduced_data = extract_in_subprocess(
             proposal, run, out_path, cluster=cluster, run_data=run_data,
-            match=match, python_exe=python_exe, mock=mock,
+            match=match, python_exe=python_exe, mock=mock, data_location=data_location,
         )
         log.info("Reduced data has %d fields", len(reduced_data))
         add_to_db(reduced_data, self.db, proposal, run)
@@ -248,6 +249,7 @@ def main(argv=None):
     # variables (confusing because all extraction now runs in cluster jobs)
     ap.add_argument('--cluster-job', action="store_true")
     ap.add_argument('--match', action="append", default=[])
+    ap.add_argument('--data-location', default='localhost', help=argparse.SUPPRESS)
     ap.add_argument('--mock', action='store_true')
     ap.add_argument('--update-vars', action='store_true')
     args = ap.parse_args(argv)
@@ -272,7 +274,8 @@ def main(argv=None):
         cluster=args.cluster_job,
         run_data=RunData(args.run_data),
         match=args.match,
-        mock=args.mock)
+        mock=args.mock,
+        data_location=args.data_location)
 
 
 if __name__ == '__main__':
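For context, the net effect in this file is that the extraction subprocess is launched with one extra pair of arguments, and since the argument is registered with `help=argparse.SUPPRESS` it stays hidden from `--help` output. A sketch of the resulting `ctxrunner` command; the proposal, run, and host values here are made up:

```python
# What extract_in_subprocess() now builds (hypothetical values, and
# assuming RunData.ALL.value == 'all'):
args = [
    '/usr/bin/python', '-m', 'ctxrunner', 'exec', '1234', '100', 'all',
    '--save', 'p1234_r100.h5',
    '--data-location', 'sa2-onc-hed.desy.de',
]
```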
5 changes: 4 additions & 1 deletion damnit/backend/extraction_control.py
@@ -85,12 +85,14 @@ class ExtractionRequest:
     match: tuple = ()
     mock: bool = False
     update_vars: bool = True
+    data_location: str = 'localhost'
 
    def python_cmd(self):
        """Creates the command for a process to do this extraction"""
        cmd = [
            sys.executable, '-m', 'damnit.backend.extract_data',
-            str(self.proposal), str(self.run), self.run_data.value
+            str(self.proposal), str(self.run), self.run_data.value,
+            '--data-location', self.data_location,
        ]
        if self.cluster:
            cmd.append('--cluster-job')
@@ -100,6 +102,7 @@ def python_cmd(self):
             cmd.append('--mock')
         if self.update_vars:
             cmd.append('--update-vars')
+
         return cmd
 
 
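A minimal usage sketch of the extended dataclass. The field order follows the call in `listener.py` below (`ExtractionRequest(run, proposal, run_data, ...)`); the run/proposal numbers and host are made up, and `RunData.RAW.value == 'raw'` is assumed:

```python
from damnit.backend.extraction_control import ExtractionRequest  # import path assumed
from damnit.context import RunData

# Hypothetical example: a request for data still sitting on the online cluster.
req = ExtractionRequest(100, 1234, RunData.RAW,
                        data_location='sa2-onc-hed.desy.de')
print(req.python_cmd())
# → [sys.executable, '-m', 'damnit.backend.extract_data', '1234', '100',
#    'raw', '--data-location', 'sa2-onc-hed.desy.de', '--update-vars']
```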
74 changes: 45 additions & 29 deletions damnit/backend/listener.py
@@ -6,6 +6,7 @@
 from pathlib import Path
 from socket import gethostname
 
+from extra_data.read_machinery import find_proposal
 from kafka import KafkaConsumer
 
 from ..context import RunData
@@ -14,18 +15,21 @@
 
 # For now, the migration & calibration events come via DESY's Kafka brokers,
 # but the AMORE updates go via XFEL's test instance.
-CONSUMER_ID = 'xfel-da-amore-prototype-{}'
-KAFKA_CONF = {
-    'maxwell': {
-        'brokers': ['exflwgs06:9091'],
-        'topics': ["test.r2d2", "cal.offline-corrections"],
-        'events': ["migration_complete", "run_corrections_complete"],
-    },
-    'onc': {
-        'brokers': ['exflwgs06:9091'],
-        'topics': ['test.euxfel.hed.daq', 'test.euxfel.hed.cal'],
-        'events': ['daq_run_complete', 'online_correction_complete'],
-    }
+CONSUMER_ID = "xfel-da-amore-prototype-{}"
+KAFKA_BROKERS = ["exflwgs06:9091"]
+KAFKA_TOPICS = ["test.r2d2", "cal.offline-corrections", "test.euxfel.hed.daq", "test.euxfel.hed.cal"]
+KAFKA_EVENTS = ["migration_complete", "run_corrections_complete", "daq_run_complete", "online_correction_complete"]
+ONLINE_HOSTS = {
+    'FXE': 'sa1-onc-fxe.desy.de',
+    'HED': 'sa2-onc-hed.desy.de',
+    'MID': 'sa2-onc-mid.desy.de',
+    # 'SA1': '',
+    # 'SA2': '',
+    # 'SA3': '',
+    'SCS': 'sa3-onc-scs.desy.de',
+    'SPB': 'sa1-onc-spb.desy.de',
+    'SQS': 'sa3-onc-sqs.desy.de',
+    'SXP': 'sa3-onc-sxp.desy.de',
 }
 
 log = logging.getLogger(__name__)
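How the listener later picks a host out of `ONLINE_HOSTS`: the instrument name ("topic") is the third-from-last component of the proposal path that `extra_data`'s `find_proposal` resolves on Maxwell. A sketch with a made-up proposal number:

```python
from pathlib import Path
from extra_data.read_machinery import find_proposal

# find_proposal('p001234') returns something like
# '/gpfs/exfel/exp/HED/202401/p001234', so parts[-3] is the topic, 'HED'.
topic = Path(find_proposal('p001234')).parts[-3]
host = ONLINE_HOSTS.get(topic)  # None if the topic has no online host entry
```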
@@ -34,28 +38,36 @@
 class EventProcessor:
 
     def __init__(self, context_dir=Path('.')):
+        if gethostname().startswith('exflonc'):
+            log.warning('Running the DAMNIT listener on the online cluster is not allowed')
+            exit(1)
+
         self.context_dir = context_dir
         self.db = DamnitDB.from_dir(context_dir)
         self.submitter = ExtractionSubmitter(context_dir, self.db)
         # Fail fast if read-only - https://stackoverflow.com/a/44707371/434217
         self.db.conn.execute("pragma user_version=0;")
         self.proposal = self.db.metameta['proposal']
-        log.info(f"Will watch for events from proposal {self.proposal}")
-
-        if gethostname().startswith('exflonc'):
-            # running on the online cluster
-            kafka_conf = KAFKA_CONF['onc']
-        else:
-            kafka_conf = KAFKA_CONF['maxwell']
+        log.info(f"Will watch for events from proposal {self.proposal}")
 
         consumer_id = CONSUMER_ID.format(self.db.metameta['db_id'])
-        self.kafka_cns = KafkaConsumer(*kafka_conf['topics'],
-                                       bootstrap_servers=kafka_conf['brokers'],
+        self.kafka_cns = KafkaConsumer(*KAFKA_TOPICS,
+                                       bootstrap_servers=KAFKA_BROKERS,
                                        group_id=consumer_id,
-                                       consumer_timeout_ms=600_000,
-                                       )
-        self.events = kafka_conf['events']
+                                       consumer_timeout_ms=600_000,)
+
+        # check backend host and connection to online cluster
+        self.online_data_host = None
+        self.run_online = self.db.metameta.get('run_online', False) is not False
+        if self.run_online:
+            topic = Path(find_proposal(f'p{self.proposal:06}')).parts[-3]
+            if (remote_host := ONLINE_HOSTS.get(topic)) is None:
+                log.warning(f"Can't run online processing for topic '{topic}'")
+                self.run_online = False
+            else:
+                self.online_data_host = remote_host
+        log.info("Processing online data? %s", self.run_online)
 
     def __enter__(self):
         return self
@@ -81,25 +93,28 @@ def run(self):
     def _process_kafka_event(self, record):
         msg = json.loads(record.value.decode())
         event = msg.get('event')
-        if event in self.events:
+        if event in KAFKA_EVENTS:
             log.debug("Processing %s event from Kafka", event)
             getattr(self, f'handle_{event}')(record, msg)
         else:
             log.debug("Unexpected %s event from Kafka", event)
 
     def handle_daq_run_complete(self, record, msg: dict):
-        self.handle_event(record, msg, RunData.RAW)
+        if self.run_online:
+            self.handle_event(record, msg, RunData.RAW, self.online_data_host)
 
     def handle_online_correction_complete(self, record, msg: dict):
-        self.handle_event(record, msg, RunData.PROC)
+        if self.run_online:
+            self.handle_event(record, msg, RunData.PROC, self.online_data_host)
 
     def handle_migration_complete(self, record, msg: dict):
         self.handle_event(record, msg, RunData.RAW)
 
     def handle_run_corrections_complete(self, record, msg: dict):
         self.handle_event(record, msg, RunData.PROC)
 
-    def handle_event(self, record, msg: dict, run_data: RunData):
+    def handle_event(self, record, msg: dict, run_data: RunData,
+                     data_location: str = "localhost"):
         proposal = int(msg['proposal'])
         run = int(msg['run'])
 
@@ -109,7 +124,7 @@ def handle_event(self, record, msg: dict, run_data: RunData):
         self.db.ensure_run(proposal, run, record.timestamp / 1000)
         log.info(f"Added p%d r%d ({run_data.value} data) to database", proposal, run)
 
-        req = ExtractionRequest(run, proposal, run_data)
+        req = ExtractionRequest(run, proposal, run_data, data_location=data_location)
         self.submitter.submit(req)


@@ -137,5 +152,6 @@ def listen():
     if os.stat("amore.log").st_uid == os.getuid():
         os.chmod("amore.log", 0o666)
 
+
 if __name__ == '__main__':
     listen()
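Judging by `self.db.metameta.get('run_online', False) is not False`, online processing is opt-in per database, toggled through the `metameta` key-value store; presumably something along these lines (import path and `metameta` assignment syntax assumed):

```python
from damnit.backend.db import DamnitDB  # import path assumed

db = DamnitDB.from_dir('/path/to/context/dir')
db.metameta['run_online'] = True  # any value other than False enables it
```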