From 1f52c4b0eeb4f865616498408e6b0e6ad4c0ca66 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 22 Feb 2022 18:05:45 +0100 Subject: [PATCH 1/4] Refactor the progress bar to show a percentage instead of file count Also removed the `suffix` argument from progress_bar() and `nproblems` from RunValidator.progress(). --- extra_data/validation.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/extra_data/validation.py b/extra_data/validation.py index 3cf02c44..a9e3428f 100644 --- a/extra_data/validation.py +++ b/extra_data/validation.py @@ -212,10 +212,10 @@ def check_index_contiguous(firsts, counts, record): )) -def progress_bar(done, total, suffix=' '): - line = f'Progress: {done}/{total}{suffix}[{{}}]' +def progress_bar(done): + line = f'Progress: {done * 100:.2f}% [{{}}]' length = min(get_terminal_size().columns - len(line), 50) - filled = int(length * done // total) + filled = int(length * done) bar = '#' * filled + ' ' * (length - filled) return line.format(bar) @@ -245,6 +245,7 @@ def __init__(self, run_dir: str, term_progress=False): self.filenames = [f for f in os.listdir(run_dir) if f.endswith('.h5')] self.file_accesses = [] self.problems = [] + self.progress_stages = 1 def validate(self): problems = self.run_checks() @@ -253,17 +254,17 @@ def validate(self): def run_checks(self): self.problems = [] - self.check_files() + self.check_files(progress_stage=1) self.check_files_map() return self.problems - def progress(self, done, total, nproblems, badfiles): + def progress(self, stage_done, stage_total, stage, badfiles=None): """Show progress information""" if not self.term_progress: return - lines = progress_bar(done, total) - lines += f'\n{nproblems} problems' + lines = progress_bar((stage_done / stage_total + stage - 1) / self.progress_stages) + lines += f'\n{len(self.problems)} problems' if badfiles: lines += f' in {len(badfiles)} files (last: {badfiles[-1]})' if sys.stderr.isatty(): @@ -273,7 +274,7 @@ def progress(self, done, total, nproblems, badfiles): else: print(lines, file=sys.stderr) - def check_files(self): + def check_files(self, progress_stage): self.file_accesses = [] def initializer(): @@ -283,7 +284,7 @@ def initializer(): filepaths = [(self.run_dir, fn) for fn in sorted(self.filenames)] nfiles = len(self.filenames) badfiles = [] - self.progress(0, nfiles, 0, badfiles) + self.progress(0, nfiles, progress_stage, badfiles) with Pool(initializer=initializer) as pool: iterator = pool.imap_unordered(_check_file, filepaths) @@ -293,7 +294,7 @@ def initializer(): badfiles.append(fname) if fa is not None: self.file_accesses.append(fa) - self.progress(done, nfiles, len(self.problems), badfiles) + self.progress(done, nfiles, progress_stage, badfiles) if not self.file_accesses: self.problems.append( From ff7b8d6b99fb9ba13ea5a1e1d96dffedb7ebe27d Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 22 Feb 2022 18:08:05 +0100 Subject: [PATCH 2/4] Remove old argument from docstring --- extra_data/tests/mockdata/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/extra_data/tests/mockdata/base.py b/extra_data/tests/mockdata/base.py index 84a85e7e..38b18b05 100644 --- a/extra_data/tests/mockdata/base.py +++ b/extra_data/tests/mockdata/base.py @@ -21,7 +21,6 @@ def __init__(self, device_id, nsamples=None): """Create a dummy device :param str device_id: e.g. "SA1_XTD2_XGM/DOOCS/MAIN" - :param int ntrains: e.g. 256 :param int nsamples: For INSTRUMENT data only. Default is ntrains. If more, should be a multiple of ntrains. If fewer, samples will be spread evenly across the trains. From 45f8293b5519c6c674882345940a3ec786188d82 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 22 Feb 2022 18:08:40 +0100 Subject: [PATCH 3/4] Allow make_fxe_run to control how many trains are missing data --- extra_data/tests/make_examples.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/extra_data/tests/make_examples.py b/extra_data/tests/make_examples.py index 1d9e97bd..8996e356 100644 --- a/extra_data/tests/make_examples.py +++ b/extra_data/tests/make_examples.py @@ -209,7 +209,7 @@ def make_lpd_file(path, format_version='0.5'): LPDModule('FXE_DET_LPD1M-1/DET/0CH0', frames_per_train=128) ], ntrains=480, chunksize=32, format_version=format_version) -def make_fxe_run(dir_path, raw=True, format_version='0.5'): +def make_fxe_run(dir_path, raw=True, missing_data_ratio=1, format_version='0.5'): prefix = 'RAW' if raw else 'CORR' for modno in range(16): path = osp.join(dir_path, @@ -220,17 +220,21 @@ def make_fxe_run(dir_path, raw=True, format_version='0.5'): ], ntrains=480, chunksize=32, format_version=format_version) if not raw: return + + # Calculate nsamples for the given missing_data_ratio + compute_nsamples = lambda ntrains: int((1 - missing_data_ratio) * ntrains) + write_file(osp.join(dir_path, 'RAW-R0450-DA01-S00000.h5'), [ XGM('SA1_XTD2_XGM/DOOCS/MAIN'), XGM('SPB_XTD9_XGM/DOOCS/MAIN'), GECCamera('FXE_XAD_GEC/CAM/CAMERA'), - GECCamera('FXE_XAD_GEC/CAM/CAMERA_NODATA', nsamples=0), + GECCamera('FXE_XAD_GEC/CAM/CAMERA_NODATA', nsamples=compute_nsamples(400)), ], ntrains=400, chunksize=200, format_version=format_version) write_file(osp.join(dir_path, '{}-R0450-DA01-S00001.h5'.format(prefix)), [ XGM('SA1_XTD2_XGM/DOOCS/MAIN'), XGM('SPB_XTD9_XGM/DOOCS/MAIN'), GECCamera('FXE_XAD_GEC/CAM/CAMERA'), - GECCamera('FXE_XAD_GEC/CAM/CAMERA_NODATA', nsamples=0), + GECCamera('FXE_XAD_GEC/CAM/CAMERA_NODATA', nsamples=compute_nsamples(80)), ], ntrains=80, firsttrain=10400, chunksize=200, format_version=format_version) def make_lpd_parallelgain_run(dir_path, raw=True, format_version='0.5'): From ce4d970fe0733eeb1bfd7ea492bc2db21540caf8 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 22 Feb 2022 18:11:50 +0100 Subject: [PATCH 4/4] Add support for checking whether any sources have missing data This uses a user-defined threshold of trains with missing data to decide which sources to report. --- extra_data/tests/conftest.py | 7 ++++ extra_data/tests/test_validation.py | 19 +++++++++-- extra_data/validation.py | 53 +++++++++++++++++++++++++---- 3 files changed, 71 insertions(+), 8 deletions(-) diff --git a/extra_data/tests/conftest.py b/extra_data/tests/conftest.py index af9b768a..cfb7959d 100644 --- a/extra_data/tests/conftest.py +++ b/extra_data/tests/conftest.py @@ -62,6 +62,13 @@ def mock_fxe_raw_run(format_version): yield td +@pytest.fixture(scope='session') +def mock_fxe_raw_half_missing_run(format_version): + with TemporaryDirectory() as td: + make_examples.make_fxe_run(td, missing_data_ratio=0.5, format_version=format_version) + yield td + + @pytest.fixture(scope='session') def mock_lpd_parallelgain_run(): with TemporaryDirectory() as td: diff --git a/extra_data/tests/test_validation.py b/extra_data/tests/test_validation.py index 1b5e5e8a..a15365a8 100644 --- a/extra_data/tests/test_validation.py +++ b/extra_data/tests/test_validation.py @@ -29,7 +29,7 @@ def data_aggregator_file(): def test_validate_run(mock_fxe_raw_run): - rv = RunValidator(mock_fxe_raw_run) + rv = RunValidator(mock_fxe_raw_run, 1) rv.validate() @@ -37,12 +37,27 @@ def test_file_error(mock_fxe_raw_run): not_readable = Path(mock_fxe_raw_run) / 'notReadable.h5' not_readable.touch(mode=0o066) - problems = RunValidator(mock_fxe_raw_run).run_checks() + problems = RunValidator(mock_fxe_raw_run, 1).run_checks() assert len(problems) == 1 assert problems[0]['msg'] == 'Could not open file' assert problems[0]['file'] == str(not_readable) +def test_missing_sources(mock_fxe_raw_half_missing_run): + # Validating with a missing_data_threshold of 0.6 should pass, since only + # 50% of the camera data is missing. + rv = RunValidator(mock_fxe_raw_half_missing_run, missing_data_threshold=0.6) + rv.validate() + + # But with a threshold of 0.4, the camera with half its data missing should + # be reported. + rv = RunValidator(mock_fxe_raw_half_missing_run, missing_data_threshold=0.4) + problems = rv.run_checks() + assert len(problems) == 1 + assert "CAMERA_NODATA" in problems[0]['msg'] + assert "50.00%" in problems[0]['msg'] + + def test_zeros_in_train_ids(agipd_file): with File(agipd_file, 'r+') as f: # introduce zeros in trainId diff --git a/extra_data/validation.py b/extra_data/validation.py index a9e3428f..77fcd972 100644 --- a/extra_data/validation.py +++ b/extra_data/validation.py @@ -1,4 +1,4 @@ -from argparse import ArgumentParser +from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from multiprocessing import Pool from functools import partial import numpy as np @@ -8,7 +8,7 @@ from signal import signal, SIGINT, SIG_IGN import sys -from .reader import H5File, FileAccess +from .reader import H5File, FileAccess, RunDirectory from .run_files_map import RunFilesMap @@ -239,13 +239,14 @@ def _check_file(args): class RunValidator: - def __init__(self, run_dir: str, term_progress=False): + def __init__(self, run_dir: str, missing_data_threshold: float, term_progress=False): self.run_dir = run_dir + self.missing_data_threshold = missing_data_threshold self.term_progress = term_progress self.filenames = [f for f in os.listdir(run_dir) if f.endswith('.h5')] self.file_accesses = [] self.problems = [] - self.progress_stages = 1 + self.progress_stages = 2 def validate(self): problems = self.run_checks() @@ -256,6 +257,7 @@ def run_checks(self): self.problems = [] self.check_files(progress_stage=1) self.check_files_map() + self.check_missing_sources(progress_stage=2) return self.problems def progress(self, stage_done, stage_total, stage, badfiles=None): @@ -323,19 +325,58 @@ def check_files_map(self): f_access.close() + def check_missing_sources(self, progress_stage): + run = RunDirectory(self.run_dir) + run_tid_count = len(run.train_ids) + sources = sorted(run.all_sources) + + for done, source in enumerate(sources, start=1): + bad_keys = [] + + # Look through all keys for missing data + for key in sorted(run[source].keys()): + counts = run[source, key].data_counts(labelled=False) + missing_data_ratio = (run_tid_count - np.count_nonzero(counts)) / run_tid_count + + # If the missing data ratio is above the threshold, record it + # for printing. + if missing_data_ratio > self.missing_data_threshold: + missing_percentage = missing_data_ratio * 100 + missing_count = run_tid_count - np.count_nonzero(counts) + bad_keys.append((key, missing_percentage, missing_count)) + + # Often a source will be missing data for all of its keys, so to be + # less spammy we record a single problem per-source instead of + # per-key. + if len(bad_keys) > 0: + msg = f"{source} is missing data for the following keys:" + for key, missing_percentage, missing_count in bad_keys: + msg += f"\n - {key} is missing from {missing_percentage:.2f}% ({missing_count}/{run_tid_count}) of trains" + + self.problems.append(dict(msg=msg, directory=self.run_dir)) + + self.progress(done, len(sources), progress_stage) + + def main(argv=None): if argv is None: argv = sys.argv[1:] - ap = ArgumentParser(prog='extra-data-validate') + ap = ArgumentParser(prog='extra-data-validate', + formatter_class=ArgumentDefaultsHelpFormatter) ap.add_argument('path', help="HDF5 file or run directory of HDF5 files.") + ap.add_argument('--missing-data-threshold', help="Threshold from 0-1 for the ratio of trains with missing data for a " + "source. For example, a threshold of 0.1 means report an error for all " + "sources missing from more than 10%% of trains for the run. " + "Only applicable to runs, not individual files.", + type=float, default=0.05) args = ap.parse_args(argv) path = args.path if os.path.isdir(path): print("Checking run directory:", path) print() - validator = RunValidator(path, term_progress=True) + validator = RunValidator(path, args.missing_data_threshold, term_progress=True) else: print("Checking file:", path) validator = FileValidator(H5File(path).files[0])