Skip to content

Commit

Permalink
load files into memory for processing first
Browse files Browse the repository at this point in the history
  • Loading branch information
rettigl committed Dec 10, 2024
1 parent f7e7a53 commit a903770
Showing 1 changed file with 56 additions and 28 deletions.
84 changes: 56 additions & 28 deletions sed/loader/mpes/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import datetime
import glob
import io
import json
import os
from typing import Dict
Expand All @@ -27,6 +28,29 @@
from sed.loader.base.loader import BaseLoader


def load_h5_in_memory(file_path: str) -> h5py.File:
    """Load an HDF5 file entirely into memory and open it with h5py.

    Reading the file in one pass avoids many small reads against the
    underlying (potentially slow or networked) storage while the data
    are processed.

    Args:
        file_path (str): Path to the .h5 file.

    Returns:
        h5py.File: An h5py File object backed by the in-memory buffer.
            Closing the returned file releases the buffer; the on-disk
            file handle is already closed when this function returns.
    """
    # Copy the raw bytes into an in-memory buffer; the context manager
    # guarantees the on-disk handle is closed even if reading fails.
    with open(file_path, "rb") as f:
        file_buffer = io.BytesIO(f.read())

    # h5py accepts file-like objects, so open directly from the buffer.
    return h5py.File(file_buffer, "r")


def hdf5_to_dataframe(
files: Sequence[str],
group_names: Sequence[str] = None,
Expand Down Expand Up @@ -67,20 +91,22 @@ def hdf5_to_dataframe(

# Read a file to parse the file structure
test_fid = kwds.pop("test_fid", 0)
test_proc = h5py.File(files[test_fid])
test_proc = load_h5_in_memory(files[test_fid])
if group_names == []:
group_names, alias_dict = get_groups_and_aliases(
h5file=test_proc,
search_pattern="Stream",
)

test_proc.close()

column_names = [alias_dict.get(group, group) for group in group_names]

if time_stamps:
column_names.append(time_stamp_alias)

test_array = hdf5_to_array(
h5file=test_proc,
h5filename=files[test_fid],
group_names=group_names,
time_stamps=time_stamps,
ms_markers_group=ms_markers_group,
Expand All @@ -94,7 +120,7 @@ def hdf5_to_dataframe(
arrays.append(
da.from_delayed(
dask.delayed(hdf5_to_array)(
h5file=h5py.File(f),
h5filename=f,
group_names=group_names,
time_stamps=time_stamps,
ms_markers_group=ms_markers_group,
Expand Down Expand Up @@ -155,20 +181,22 @@ def hdf5_to_timed_dataframe(

# Read a file to parse the file structure
test_fid = kwds.pop("test_fid", 0)
test_proc = h5py.File(files[test_fid])
test_proc = load_h5_in_memory(files[test_fid])
if group_names == []:
group_names, alias_dict = get_groups_and_aliases(
h5file=test_proc,
search_pattern="Stream",
)

test_proc.close()

column_names = [alias_dict.get(group, group) for group in group_names]

if time_stamps:
column_names.append(time_stamp_alias)

test_array = hdf5_to_timed_array(
h5file=test_proc,
h5filename=files[test_fid],
group_names=group_names,
time_stamps=time_stamps,
ms_markers_group=ms_markers_group,
Expand All @@ -182,7 +210,7 @@ def hdf5_to_timed_dataframe(
arrays.append(
da.from_delayed(
dask.delayed(hdf5_to_timed_array)(
h5file=h5py.File(f),
h5filename=f,
group_names=group_names,
time_stamps=time_stamps,
ms_markers_group=ms_markers_group,
Expand Down Expand Up @@ -237,7 +265,7 @@ def get_groups_and_aliases(


def hdf5_to_array(
h5file: h5py.File,
h5filename: str,
group_names: Sequence[str],
data_type: str = "float32",
time_stamps=False,
Expand All @@ -248,14 +276,10 @@ def hdf5_to_array(
2-dimensional array with the corresponding values.
Args:
h5file (h5py.File):
hdf5 file handle to read from
group_names (str):
group names to read
data_type (str, optional):
Data type of the output data. Defaults to "float32".
time_stamps (bool, optional):
Option to calculate time stamps. Defaults to False.
h5filename (str): hdf5 file name to read from
group_names (str): group names to read
data_type (str, optional): Data type of the output data. Defaults to "float32".
time_stamps (bool, optional): Option to calculate time stamps. Defaults to False.
ms_markers_group (str): h5 column containing timestamp information.
Defaults to "msMarkers".
first_event_time_stamp_key (str): h5 attribute containing the start
Expand All @@ -267,6 +291,8 @@ def hdf5_to_array(

# Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)

h5file = load_h5_in_memory(h5filename)

# Read out groups:
data_list = []
for group in group_names:
Expand Down Expand Up @@ -316,11 +342,13 @@ def hdf5_to_array(

data_list.append(time_stamp_data)

h5file.close()

return np.asarray(data_list)


def hdf5_to_timed_array(
h5file: h5py.File,
h5filename: str,
group_names: Sequence[str],
data_type: str = "float32",
time_stamps=False,
Expand All @@ -331,14 +359,10 @@ def hdf5_to_timed_array(
timed version of a 2-dimensional array with the corresponding values.
Args:
h5file (h5py.File):
hdf5 file handle to read from
group_names (str):
group names to read
data_type (str, optional):
Data type of the output data. Defaults to "float32".
time_stamps (bool, optional):
Option to calculate time stamps. Defaults to False.
h5filename (str): hdf5 file name to read from
group_names (str): group names to read
data_type (str, optional): Data type of the output data. Defaults to "float32".
time_stamps (bool, optional): Option to calculate time stamps. Defaults to False.
ms_markers_group (str): h5 column containing timestamp information.
Defaults to "msMarkers".
first_event_time_stamp_key (str): h5 attribute containing the start
Expand All @@ -351,6 +375,8 @@ def hdf5_to_timed_array(

# Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)

h5file = load_h5_in_memory(h5filename)

# Read out groups:
data_list = []
ms_marker = np.asarray(h5file[ms_markers_group])
Expand Down Expand Up @@ -386,6 +412,8 @@ def hdf5_to_timed_array(

data_list.append(time_stamp_data)

h5file.close()

return np.asarray(data_list)


Expand Down Expand Up @@ -692,14 +720,14 @@ def get_start_and_end_time(self) -> Tuple[float, float]:
Returns:
Tuple[float, float]: A tuple containing the start and end time stamps
"""
h5file = h5py.File(self.files[0])
h5file = load_h5_in_memory(self.files[0])
timestamps = hdf5_to_array(
h5file,
group_names=self._config["dataframe"]["hdf5_groupnames"],
time_stamps=True,
)
ts_from = timestamps[-1][1]
h5file = h5py.File(self.files[-1])
h5file = load_h5_in_memory(self.files[-1])
timestamps = hdf5_to_array(
h5file,
group_names=self._config["dataframe"]["hdf5_groupnames"],
Expand Down Expand Up @@ -929,7 +957,7 @@ def get_count_rate(
for fid in fids:
try:
count_rate_, secs_ = get_count_rate(
h5py.File(self.files[fid]),
load_h5_in_memory(self.files[fid]),
ms_markers_group=ms_markers_group,
)
secs_list.append((accumulated_time + secs_).T)
Expand Down Expand Up @@ -974,7 +1002,7 @@ def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float:
for fid in fids:
try:
secs += get_elapsed_time(
h5py.File(self.files[fid]),
load_h5_in_memory(self.files[fid]),
ms_markers_group=ms_markers_group,
)
except OSError as exc:
Expand Down

0 comments on commit a903770

Please sign in to comment.