Add support for checking missing sources with extra-data-validate #280

Open · wants to merge 4 commits into master

Changes from all commits
7 changes: 7 additions & 0 deletions extra_data/tests/conftest.py
@@ -62,6 +62,13 @@ def mock_fxe_raw_run(format_version):
yield td


@pytest.fixture(scope='session')
def mock_fxe_raw_half_missing_run(format_version):
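# As mock_fxe_raw_run, but the NODATA camera source is missing data
# for half of the trains.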
with TemporaryDirectory() as td:
make_examples.make_fxe_run(td, missing_data_ratio=0.5, format_version=format_version)
yield td


@pytest.fixture(scope='session')
def mock_lpd_parallelgain_run():
with TemporaryDirectory() as td:
10 changes: 7 additions & 3 deletions extra_data/tests/make_examples.py
@@ -209,7 +209,7 @@ def make_lpd_file(path, format_version='0.5'):
LPDModule('FXE_DET_LPD1M-1/DET/0CH0', frames_per_train=128)
], ntrains=480, chunksize=32, format_version=format_version)

def make_fxe_run(dir_path, raw=True, format_version='0.5'):
def make_fxe_run(dir_path, raw=True, missing_data_ratio=1, format_version='0.5'):
prefix = 'RAW' if raw else 'CORR'
for modno in range(16):
path = osp.join(dir_path,
@@ -220,17 +220,21 @@ def make_fxe_run(dir_path, raw=True, format_version='0.5'):
], ntrains=480, chunksize=32, format_version=format_version)
if not raw:
return

# Calculate nsamples for the given missing_data_ratio
compute_nsamples = lambda ntrains: int((1 - missing_data_ratio) * ntrains)
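# e.g. with missing_data_ratio=0.5, compute_nsamples(400) == 200, so data
# is written for only half of the trains.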

write_file(osp.join(dir_path, 'RAW-R0450-DA01-S00000.h5'), [
XGM('SA1_XTD2_XGM/DOOCS/MAIN'),
XGM('SPB_XTD9_XGM/DOOCS/MAIN'),
GECCamera('FXE_XAD_GEC/CAM/CAMERA'),
GECCamera('FXE_XAD_GEC/CAM/CAMERA_NODATA', nsamples=0),
GECCamera('FXE_XAD_GEC/CAM/CAMERA_NODATA', nsamples=compute_nsamples(400)),
], ntrains=400, chunksize=200, format_version=format_version)
write_file(osp.join(dir_path, '{}-R0450-DA01-S00001.h5'.format(prefix)), [
XGM('SA1_XTD2_XGM/DOOCS/MAIN'),
XGM('SPB_XTD9_XGM/DOOCS/MAIN'),
GECCamera('FXE_XAD_GEC/CAM/CAMERA'),
GECCamera('FXE_XAD_GEC/CAM/CAMERA_NODATA', nsamples=0),
GECCamera('FXE_XAD_GEC/CAM/CAMERA_NODATA', nsamples=compute_nsamples(80)),
], ntrains=80, firsttrain=10400, chunksize=200, format_version=format_version)

def make_lpd_parallelgain_run(dir_path, raw=True, format_version='0.5'):
1 change: 0 additions & 1 deletion extra_data/tests/mockdata/base.py
@@ -21,7 +21,6 @@ def __init__(self, device_id, nsamples=None):
"""Create a dummy device

:param str device_id: e.g. "SA1_XTD2_XGM/DOOCS/MAIN"
:param int ntrains: e.g. 256
:param int nsamples: For INSTRUMENT data only. Default is ntrains.
If more, should be a multiple of ntrains. If fewer, samples will be
spread evenly across the trains.
19 changes: 17 additions & 2 deletions extra_data/tests/test_validation.py
@@ -29,20 +29,35 @@ def data_aggregator_file():


def test_validate_run(mock_fxe_raw_run):
rv = RunValidator(mock_fxe_raw_run)
rv = RunValidator(mock_fxe_raw_run, 1)
rv.validate()


def test_file_error(mock_fxe_raw_run):
not_readable = Path(mock_fxe_raw_run) / 'notReadable.h5'
not_readable.touch(mode=0o066)

problems = RunValidator(mock_fxe_raw_run).run_checks()
problems = RunValidator(mock_fxe_raw_run, 1).run_checks()
assert len(problems) == 1
assert problems[0]['msg'] == 'Could not open file'
assert problems[0]['file'] == str(not_readable)


def test_missing_sources(mock_fxe_raw_half_missing_run):
# Validating with a missing_data_threshold of 0.6 should pass, since only
# 50% of the camera data is missing.
rv = RunValidator(mock_fxe_raw_half_missing_run, missing_data_threshold=0.6)
rv.validate()

# But with a threshold of 0.4, the camera with half its data missing should
# be reported.
rv = RunValidator(mock_fxe_raw_half_missing_run, missing_data_threshold=0.4)
problems = rv.run_checks()
assert len(problems) == 1
assert "CAMERA_NODATA" in problems[0]['msg']
assert "50.00%" in problems[0]['msg']


def test_zeros_in_train_ids(agipd_file):
with File(agipd_file, 'r+') as f:
# introduce zeros in trainId
72 changes: 57 additions & 15 deletions extra_data/validation.py
@@ -1,4 +1,4 @@
from argparse import ArgumentParser
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from multiprocessing import Pool
from functools import partial
import numpy as np
@@ -8,7 +8,7 @@
from signal import signal, SIGINT, SIG_IGN
import sys

from .reader import H5File, FileAccess
from .reader import H5File, FileAccess, RunDirectory
from .run_files_map import RunFilesMap


@@ -212,10 +212,10 @@ def check_index_contiguous(firsts, counts, record):
))


def progress_bar(done, total, suffix=' '):
line = f'Progress: {done}/{total}{suffix}[{{}}]'
def progress_bar(done):
line = f'Progress: {done * 100:.2f}% [{{}}]'
length = min(get_terminal_size().columns - len(line), 50)
filled = int(length * done // total)
filled = int(length * done)
bar = '#' * filled + ' ' * (length - filled)
return line.format(bar)
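# e.g. progress_bar(0.5) renders roughly "Progress: 50.00% [#####     ]".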

@@ -239,12 +239,14 @@ def _check_file(args):


class RunValidator:
def __init__(self, run_dir: str, term_progress=False):
def __init__(self, run_dir: str, missing_data_threshold: float, term_progress=False):
self.run_dir = run_dir
self.missing_data_threshold = missing_data_threshold
self.term_progress = term_progress
self.filenames = [f for f in os.listdir(run_dir) if f.endswith('.h5')]
self.file_accesses = []
self.problems = []
self.progress_stages = 2

def validate(self):
problems = self.run_checks()
@@ -253,17 +255,18 @@ def validate(self):

def run_checks(self):
self.problems = []
self.check_files()
self.check_files(progress_stage=1)
self.check_files_map()
self.check_missing_sources(progress_stage=2)
return self.problems

def progress(self, done, total, nproblems, badfiles):
def progress(self, stage_done, stage_total, stage, badfiles=None):
"""Show progress information"""
if not self.term_progress:
return

lines = progress_bar(done, total)
lines += f'\n{nproblems} problems'
lines = progress_bar((stage_done / stage_total + stage - 1) / self.progress_stages)
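# Map per-stage progress onto the whole run: e.g. halfway through
# stage 2 of 2 gives (0.5 + 2 - 1) / 2 == 0.75.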
lines += f'\n{len(self.problems)} problems'
if badfiles:
lines += f' in {len(badfiles)} files (last: {badfiles[-1]})'
if sys.stderr.isatty():
@@ -273,7 +276,7 @@ def progress(self, done, total, nproblems, badfiles):
else:
print(lines, file=sys.stderr)

def check_files(self):
def check_files(self, progress_stage):
self.file_accesses = []

def initializer():
@@ -283,7 +286,7 @@ def initializer():
filepaths = [(self.run_dir, fn) for fn in sorted(self.filenames)]
nfiles = len(self.filenames)
badfiles = []
self.progress(0, nfiles, 0, badfiles)
self.progress(0, nfiles, progress_stage, badfiles)

with Pool(initializer=initializer) as pool:
iterator = pool.imap_unordered(_check_file, filepaths)
@@ -293,7 +296,7 @@ def initializer():
badfiles.append(fname)
if fa is not None:
self.file_accesses.append(fa)
self.progress(done, nfiles, len(self.problems), badfiles)
self.progress(done, nfiles, progress_stage, badfiles)

if not self.file_accesses:
self.problems.append(
@@ -322,19 +325,58 @@ def check_files_map(self):

f_access.close()

def check_missing_sources(self, progress_stage):
run = RunDirectory(self.run_dir)
run_tid_count = len(run.train_ids)
sources = sorted(run.all_sources)

for done, source in enumerate(sources, start=1):
bad_keys = []

# Look through all keys for missing data
for key in sorted(run[source].keys()):
counts = run[source, key].data_counts(labelled=False)
missing_data_ratio = (run_tid_count - np.count_nonzero(counts)) / run_tid_count
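# e.g. data on 240 of 480 trains gives a missing_data_ratio of 0.5.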

# If the missing data ratio is above the threshold, record it
# for printing.
if missing_data_ratio > self.missing_data_threshold:
missing_percentage = missing_data_ratio * 100
missing_count = run_tid_count - np.count_nonzero(counts)
bad_keys.append((key, missing_percentage, missing_count))

# Often a source will be missing data for all of its keys, so to be
# less spammy we record a single problem per source instead of per
# key.
if len(bad_keys) > 0:
msg = f"{source} is missing data for the following keys:"
for key, missing_percentage, missing_count in bad_keys:
msg += f"\n - {key} is missing from {missing_percentage:.2f}% ({missing_count}/{run_tid_count}) of trains"

self.problems.append(dict(msg=msg, directory=self.run_dir))

self.progress(done, len(sources), progress_stage)
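
A minimal sketch of driving the new check programmatically, matching the constructor in this diff (run_path is a placeholder for a run directory):

from extra_data.validation import RunValidator

# Report any source missing data on more than 10% of trains.
rv = RunValidator(run_path, missing_data_threshold=0.1)
problems = rv.run_checks()
for problem in problems:
    print(problem['msg'])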


def main(argv=None):
if argv is None:
argv = sys.argv[1:]

ap = ArgumentParser(prog='extra-data-validate')
ap = ArgumentParser(prog='extra-data-validate',
formatter_class=ArgumentDefaultsHelpFormatter)
ap.add_argument('path', help="HDF5 file or run directory of HDF5 files.")
ap.add_argument('--missing-data-threshold', help="Threshold from 0-1 for the ratio of trains with missing data for a "
"source. For example, a threshold of 0.1 reports an error for every "
"source with data missing from more than 10%% of the trains in the run. "
"Only applicable to runs, not individual files.",
type=float, default=0.05)
args = ap.parse_args(argv)

path = args.path
if os.path.isdir(path):
print("Checking run directory:", path)
print()
validator = RunValidator(path, term_progress=True)
validator = RunValidator(path, args.missing_data_threshold, term_progress=True)
else:
print("Checking file:", path)
validator = FileValidator(H5File(path).files[0])
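
A hypothetical invocation of the new flag (the run path is illustrative):

extra-data-validate /path/to/raw/r0450 --missing-data-threshold 0.1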