Skip to content

Commit

Permalink
Merge pull request #13 from European-XFEL/fix/close_thread_local_files
Browse files Browse the repository at this point in the history
Close files in EXtra-data functor only for different processes
  • Loading branch information
philsmt authored Mar 22, 2022
2 parents 2047d10 + f4bbc16 commit b0dc047
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions pasha/functor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
# Copyright (c) 2020, European X-Ray Free-Electron Laser Facility GmbH.
# All rights reserved.

from collections.abc import Sequence
from os import getpid
import math
import sys
from collections.abc import Sequence

import numpy as np

Expand Down Expand Up @@ -255,6 +256,10 @@ def __init__(self, obj):
self.obj = obj
self.n_trains = len(self.obj.train_ids)

# Save the PID of the process in which the functor is created, so
# that files can be closed as appropriate later on (see iterate()).
self._parent_pid = getpid()

@classmethod
def wrap(cls, value):
if 'extra_data' not in sys.modules:
Expand All @@ -272,10 +277,14 @@ def split(self, num_workers):
def iterate(self, share):
subobj = self.obj.select_trains(np.s_[share])

# Close all file handles inherited from the parent collection
# to force re-opening them in each worker process.
for f in subobj.files:
f.close()
# HDF5 versions older than 1.10.5 are not robust against sharing
# a file descriptor across threads or processes. If running in a
# different process than the one the functor was created in,
# close all file handles inherited from the parent collection to
# force re-opening them in each child process.
if getpid() != self._parent_pid:
for f in subobj.files:
f.close()

it = zip(range(*share.indices(self.n_trains)), subobj.trains())

Expand Down

0 comments on commit b0dc047

Please sign in to comment.