From a903770841aeae8d760953f7d0659b51c3cc211f Mon Sep 17 00:00:00 2001
From: rettigl
Date: Tue, 10 Dec 2024 16:04:43 +0100
Subject: [PATCH] load files into memory for processing first

---
 sed/loader/mpes/loader.py | 84 ++++++++++++++++++++++++++-------------
 1 file changed, 56 insertions(+), 28 deletions(-)

diff --git a/sed/loader/mpes/loader.py b/sed/loader/mpes/loader.py
index c57a2c5c..f2333ba8 100644
--- a/sed/loader/mpes/loader.py
+++ b/sed/loader/mpes/loader.py
@@ -5,6 +5,7 @@
 """
 import datetime
 import glob
+import io
 import json
 import os
 from typing import Dict
@@ -27,6 +28,29 @@
 from sed.loader.base.loader import BaseLoader
 
 
+def load_h5_in_memory(file_path):
+    """
+    Load an HDF5 file entirely into memory and open it with h5py.
+
+    Args:
+        file_path (str): Path to the .h5 file.
+
+    Returns:
+        h5py.File: An h5py File object representing the in-memory HDF5 file.
+    """
+    # Read the entire file into memory
+    with open(file_path, "rb") as f:
+        file_content = f.read()
+
+    # Load the content into a BytesIO object
+    file_buffer = io.BytesIO(file_content)
+
+    # Open the HDF5 file using h5py from the in-memory buffer
+    h5_file = h5py.File(file_buffer, "r")
+
+    return h5_file
+
+
 def hdf5_to_dataframe(
     files: Sequence[str],
     group_names: Sequence[str] = None,
@@ -67,20 +91,22 @@ def hdf5_to_dataframe(
 
     # Read a file to parse the file structure
     test_fid = kwds.pop("test_fid", 0)
-    test_proc = h5py.File(files[test_fid])
+    test_proc = load_h5_in_memory(files[test_fid])
     if group_names == []:
         group_names, alias_dict = get_groups_and_aliases(
             h5file=test_proc,
             search_pattern="Stream",
         )
 
+    test_proc.close()
+
     column_names = [alias_dict.get(group, group) for group in group_names]
 
     if time_stamps:
         column_names.append(time_stamp_alias)
 
     test_array = hdf5_to_array(
-        h5file=test_proc,
+        h5filename=files[test_fid],
         group_names=group_names,
         time_stamps=time_stamps,
         ms_markers_group=ms_markers_group,
@@ -94,7 +120,7 @@ def hdf5_to_dataframe(
         arrays.append(
             da.from_delayed(
                 dask.delayed(hdf5_to_array)(
-                    h5file=h5py.File(f),
+                    h5filename=f,
                     group_names=group_names,
                     time_stamps=time_stamps,
                     ms_markers_group=ms_markers_group,
@@ -155,20 +181,22 @@ def hdf5_to_timed_dataframe(
 
     # Read a file to parse the file structure
     test_fid = kwds.pop("test_fid", 0)
-    test_proc = h5py.File(files[test_fid])
+    test_proc = load_h5_in_memory(files[test_fid])
     if group_names == []:
         group_names, alias_dict = get_groups_and_aliases(
             h5file=test_proc,
             search_pattern="Stream",
         )
 
+    test_proc.close()
+
     column_names = [alias_dict.get(group, group) for group in group_names]
 
     if time_stamps:
         column_names.append(time_stamp_alias)
 
     test_array = hdf5_to_timed_array(
-        h5file=test_proc,
+        h5filename=files[test_fid],
         group_names=group_names,
         time_stamps=time_stamps,
         ms_markers_group=ms_markers_group,
@@ -182,7 +210,7 @@ def hdf5_to_timed_dataframe(
         arrays.append(
             da.from_delayed(
                 dask.delayed(hdf5_to_timed_array)(
-                    h5file=h5py.File(f),
+                    h5filename=f,
                     group_names=group_names,
                     time_stamps=time_stamps,
                     ms_markers_group=ms_markers_group,
@@ -237,7 +265,7 @@ def get_groups_and_aliases(
 
 
 def hdf5_to_array(
-    h5file: h5py.File,
+    h5filename: str,
     group_names: Sequence[str],
     data_type: str = "float32",
     time_stamps=False,
@@ -248,14 +276,10 @@ def hdf5_to_array(
     2-dimensional array with the corresponding values.
 
     Args:
-        h5file (h5py.File):
-            hdf5 file handle to read from
-        group_names (str):
-            group names to read
-        data_type (str, optional):
-            Data type of the output data. Defaults to "float32".
-        time_stamps (bool, optional):
-            Option to calculate time stamps. Defaults to False.
+        h5filename (str): hdf5 file name to read from
+        group_names (str): group names to read
+        data_type (str, optional): Data type of the output data. Defaults to "float32".
+        time_stamps (bool, optional): Option to calculate time stamps. Defaults to False.
         ms_markers_group (str): h5 column containing timestamp information.
             Defaults to "msMarkers".
         first_event_time_stamp_key (str): h5 attribute containing the start
@@ -267,6 +291,8 @@
 
     # Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)
 
+    h5file = load_h5_in_memory(h5filename)
+
     # Read out groups:
     data_list = []
     for group in group_names:
@@ -316,11 +342,13 @@
 
         data_list.append(time_stamp_data)
 
+    h5file.close()
+
     return np.asarray(data_list)
 
 
 def hdf5_to_timed_array(
-    h5file: h5py.File,
+    h5filename: str,
     group_names: Sequence[str],
     data_type: str = "float32",
     time_stamps=False,
@@ -331,14 +359,10 @@ def hdf5_to_timed_array(
     timed version of a 2-dimensional array with the corresponding values.
 
     Args:
-        h5file (h5py.File):
-            hdf5 file handle to read from
-        group_names (str):
-            group names to read
-        data_type (str, optional):
-            Data type of the output data. Defaults to "float32".
-        time_stamps (bool, optional):
-            Option to calculate time stamps. Defaults to False.
+        h5filename (str): hdf5 file name to read from
+        group_names (str): group names to read
+        data_type (str, optional): Data type of the output data. Defaults to "float32".
+        time_stamps (bool, optional): Option to calculate time stamps. Defaults to False.
         ms_markers_group (str): h5 column containing timestamp information.
             Defaults to "msMarkers".
         first_event_time_stamp_key (str): h5 attribute containing the start
@@ -351,6 +375,8 @@
 
     # Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)
 
+    h5file = load_h5_in_memory(h5filename)
+
     # Read out groups:
     data_list = []
     ms_marker = np.asarray(h5file[ms_markers_group])
@@ -386,6 +412,8 @@
 
         data_list.append(time_stamp_data)
 
+    h5file.close()
+
     return np.asarray(data_list)
 
 
@@ -692,14 +720,14 @@ def get_start_and_end_time(self) -> Tuple[float, float]:
         Returns:
             Tuple[float, float]: A tuple containing the start and end time stamps
         """
-        h5file = h5py.File(self.files[0])
+        h5file = self.files[0]
         timestamps = hdf5_to_array(
             h5file,
             group_names=self._config["dataframe"]["hdf5_groupnames"],
             time_stamps=True,
         )
         ts_from = timestamps[-1][1]
-        h5file = h5py.File(self.files[-1])
+        h5file = self.files[-1]
         timestamps = hdf5_to_array(
             h5file,
             group_names=self._config["dataframe"]["hdf5_groupnames"],
@@ -929,7 +957,7 @@ def get_count_rate(
         for fid in fids:
            try:
                 count_rate_, secs_ = get_count_rate(
-                    h5py.File(self.files[fid]),
+                    load_h5_in_memory(self.files[fid]),
                     ms_markers_group=ms_markers_group,
                 )
                 secs_list.append((accumulated_time + secs_).T)
@@ -974,7 +1002,7 @@ def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float:
         for fid in fids:
             try:
                 secs += get_elapsed_time(
-                    h5py.File(self.files[fid]),
+                    load_h5_in_memory(self.files[fid]),
                     ms_markers_group=ms_markers_group,
                 )
             except OSError as exc:
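
Usage sketch (illustrative, not part of the diff above): after this change, hdf5_to_array and hdf5_to_timed_array receive a file name and open/close the file themselves, while load_h5_in_memory returns an h5py handle backed by an in-memory copy that can be used for structure parsing. The file name "Scan0123_1.h5" below is a hypothetical placeholder; the functions and keyword arguments are the ones defined in sed/loader/mpes/loader.py.

    from sed.loader.mpes.loader import (
        get_groups_and_aliases,
        hdf5_to_array,
        load_h5_in_memory,
    )

    # Parse the file structure from an in-memory copy (hypothetical file name).
    h5file = load_h5_in_memory("Scan0123_1.h5")
    groups, aliases = get_groups_and_aliases(h5file=h5file, search_pattern="Stream")
    h5file.close()  # release the in-memory handle once the structure is known

    # Map group names to column aliases, mirroring what hdf5_to_dataframe does.
    column_names = [aliases.get(group, group) for group in groups]

    # The array reader now takes the file name instead of an open h5py.File handle.
    data = hdf5_to_array(
        h5filename="Scan0123_1.h5",
        group_names=groups,
    )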