Skip to content

Commit

Permalink
Formatted with ruff
Browse files Browse the repository at this point in the history
  • Loading branch information
shinzlet committed Oct 7, 2024
1 parent cb9876f commit 782d9c0
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 140 deletions.
1 change: 1 addition & 0 deletions softcopy/copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

LOG = logging.getLogger(__name__)


class AbstractCopier(ABC):
def __init__(self, source: Path, destination: Path, n_copy_procs: int, log: logging.Logger = LOG):
self._source = source
Expand Down
1 change: 1 addition & 0 deletions softcopy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def main(targets_file, verbose, nprocs):
for copier in copiers:
copier.stop()


def set_low_io_priority():
try:
if sys.platform == "linux":
Expand Down
1 change: 1 addition & 0 deletions softcopy/ome_zarr_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

LOG = logging.getLogger(__name__)


class OMEZarrCopier(AbstractCopier):
"""
Wrapper around a ZarrCopier that also copies the metadata files for an OME-Zarr archive.
Expand Down
18 changes: 13 additions & 5 deletions softcopy/packed_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ class PackedName:

# __slots__ is a special class attribute that tells Python not to create a per-instance __dict__ and to allocate
# space only for a fixed set of attributes. This is a performance optimization which saves memory.
__slots__ = ('_path', '_index')
__slots__ = ("_path", "_index")

def __init__(self, name: str, files_nd: np.ndarray, dim_separator: Literal["/", "."], zarr_format: Literal[2, 3]):
if dim_separator == "/":
parts = name.split("/")
else:
last_slash = name.rfind("/")
parts = name[last_slash + 1:].split(dim_separator)
parts = name[last_slash + 1 :].split(dim_separator)

require_c_prefix = zarr_format == 3
needed_parts = files_nd.size + (1 if require_c_prefix else 0)
Expand All @@ -49,7 +49,7 @@ def __init__(self, name: str, files_nd: np.ndarray, dim_separator: Literal["/",
# parts = parts[-needed_parts:]

try:
chunk_index_nd = tuple(int(p) for p in parts[-files_nd.size:])
chunk_index_nd = tuple(int(p) for p in parts[-files_nd.size :])
except ValueError:
self._path = name
self._index = None
Expand All @@ -73,15 +73,23 @@ def from_index(index: int):
ret._path = None
return ret

@staticmethod
def path_from_index(
    index: int,
    files_nd: np.ndarray,
    zarr_location: Path,
    dim_separator: Literal["/", "."],
    zarr_format: Literal[2, 3],
) -> Path:
    """
    Build the path of the chunk file identified by a flat chunk index.

    The flat `index` is unravelled against `files_nd` (used as the shape of the chunk-file grid) to get one
    coordinate per dimension. The coordinates are then joined with `dim_separator` to form the chunk key.

    This function takes no `self`, so it is declared as a staticmethod: it can be called either on the class or
    on an instance without the instance being bound to `index`.

    Args:
        index: flat index of the chunk within the chunk-file grid.
        files_nd: number of chunk files along each dimension.
        zarr_location: root directory of the zarr array.
        dim_separator: separator placed between the per-dimension chunk coordinates.
        zarr_format: zarr format version of the archive.

    Returns:
        `zarr_location / "c" / <chunk key>` for zarr format 3, otherwise `zarr_location / <chunk key>`.
    """
    chunk_index_nd = np.unravel_index(index, files_nd)
    prefixless_chunk_key = dim_separator.join(map(str, chunk_index_nd))

    if zarr_format == 3:
        # Zarr v3 chunk keys are stored under a leading "c" component.
        return zarr_location / "c" / prefixless_chunk_key
    return zarr_location / prefixless_chunk_key

def get_path(self, files_nd: np.ndarray, zarr_location: Path, dim_separator: Literal["/", "."], zarr_format: Literal[2, 3]) -> Path:
def get_path(
self, files_nd: np.ndarray, zarr_location: Path, dim_separator: Literal["/", "."], zarr_format: Literal[2, 3]
) -> Path:
if self._path is not None:
return zarr_location / self._path
elif self._index is not None:
Expand Down
17 changes: 10 additions & 7 deletions softcopy/slow_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
@click.option("--method", type=click.Choice(["v2", "v3", "v3_shard"]), default="v3")
@click.option("--sleep", type=float, default=1.0)
@click.option("--timepoints", type=int, default=3)
@click.option("--no-complete-file", is_flag=True, help="Disable using a file called 'complete' to signal that the write is done", default=False)
@click.option(
"--no-complete-file",
is_flag=True,
help="Disable using a file called 'complete' to signal that the write is done",
default=False,
)
def main(source, destination, method, sleep, timepoints, no_complete_file):
ensure_high_io_priority()

Expand Down Expand Up @@ -66,13 +71,11 @@ def main(source, destination, method, sleep, timepoints, no_complete_file):
# target of 1000 * timepoints, we will warn the user:
num_files = np.prod(data.shape[1:] // chunks[1:])
if num_files / np.prod(target_files_nd) < 0.1:
print(f"Warning: the number of files being written is very low ({num_files} / timepoint). In real acquisitions, softcopy moves much more data than this, and so this may not be a good test of its performance.")
print(
f"Warning: the number of files being written is very low ({num_files} / timepoint). In real acquisitions, softcopy moves much more data than this, and so this may not be a good test of its performance."
)

preparation_methods = {
"v2": prepare_zarr_v2,
"v3": prepare_zarr_v3,
"v3_shard": prepare_zarr_v3_shard
}
preparation_methods = {"v2": prepare_zarr_v2, "v3": prepare_zarr_v3, "v3_shard": prepare_zarr_v3_shard}

prepare_zarr = preparation_methods[method]
dataset = prepare_zarr(destination, data, timepoints, chunks)
Expand Down
36 changes: 28 additions & 8 deletions softcopy/zarr_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ def __init__(self, source: Path, destination: Path, n_copy_procs: int = 1, log:
self._dimension_separator = zarr_json["dimension_separator"]
self._log.debug(f"Dimension separator: {self._dimension_separator}")
if self._dimension_separator in (None, ""):
log.critical(f"Could not determine dimension separator from zarr.json file {zarr_json_path}: {self._dimension_separator!r}")
log.critical(
f"Could not determine dimension separator from zarr.json file {zarr_json_path}: {self._dimension_separator!r}"
)
exit(1)
elif self._zarr_format == 3:
chunks = np.array(zarr_json["chunk_grid"]["configuration"]["chunk_shape"])
Expand All @@ -93,7 +95,12 @@ def __init__(self, source: Path, destination: Path, n_copy_procs: int = 1, log:
self._queue = Queue()
self._observer = Observer()
event_handler = ZarrFileEventHandler(
self._zarr_format, self._dimension_separator, self._files_nd, self._observation_finished, self._queue, self._log
self._zarr_format,
self._dimension_separator,
self._files_nd,
self._observation_finished,
self._queue,
self._log,
)
self._observer.schedule(event_handler, source, recursive=True)
self._copy_procs = []
Expand Down Expand Up @@ -190,7 +197,9 @@ def print_copy_status():
missed_count = 0
for chunk_index in range(np.prod(self._files_nd)):
chunk_packed_name: PackedName = PackedName.from_index(chunk_index)
chunk_path = chunk_packed_name.get_path(self._files_nd, self._destination, self._dimension_separator, self._zarr_format)
chunk_path = chunk_packed_name.get_path(
self._files_nd, self._destination, self._dimension_separator, self._zarr_format
)

if not chunk_path.exists():
self._log.debug(f"File {chunk_path} was missed by the observer! Adding to queue for retry.")
Expand Down Expand Up @@ -232,7 +241,9 @@ def _queue_existing_files(self):
# and then pack the name
relative_dir_path = os.path.relpath(dir_path, self._source)
relative_filepath = os.path.join(relative_dir_path, file)
packed_name = PackedName(relative_filepath, self._files_nd, self._dimension_separator, self._zarr_format)
packed_name = PackedName(
relative_filepath, self._files_nd, self._dimension_separator, self._zarr_format
)
if packed_name.is_zarr_chunk():
chunk_count += 1
if Path(file).stem == "complete":
Expand Down Expand Up @@ -313,9 +324,16 @@ def _copy_worker(
if queue_draining.value == 1 and queue.empty():
break


class ZarrFileEventHandler(FileSystemEventHandler):
def __init__(
self, zarr_format: Literal[2, 3], dimension_separator: Literal[".", "/"], files_nd: np.ndarray, observation_finished: Synchronized, queue: Queue, log: Logger = LOG
self,
zarr_format: Literal[2, 3],
dimension_separator: Literal[".", "/"],
files_nd: np.ndarray,
observation_finished: Synchronized,
queue: Queue,
log: Logger = LOG,
):
super().__init__()
self.zarr_format = zarr_format
Expand All @@ -326,10 +344,10 @@ def __init__(
self.queue = queue

def on_created(self, event: FileCreatedEvent):
if isinstance(event, FileCreatedEvent): # noqa: SIM102
if isinstance(event, FileCreatedEvent): # noqa: SIM102
# This is probably pointless, but I am worried about path parsing overhead given how many file transactions
# can occur - so only parse the path if we know the filepath ends with "complete"
if event.src_path.endswith("complete"): # noqa: SIM102
if event.src_path.endswith("complete"): # noqa: SIM102
if Path(event.src_path).stem == "complete":
self._log.info("Detected 'complete' file. Stopping observer.")
with self.observation_finished.get_lock():
Expand All @@ -340,7 +358,9 @@ def on_deleted(self, event):
# remove .__lock suffix from right side of path
lock_index = event.src_path.rfind(".__lock")
if lock_index != -1:
packed_name = PackedName(event.src_path[:lock_index], self.files_nd, self._dimension_separator, self.zarr_format)
packed_name = PackedName(
event.src_path[:lock_index], self.files_nd, self._dimension_separator, self.zarr_format
)
if packed_name._index is None:
print(f"screwed up: {event.src_path}")
self.queue.put(packed_name)
Expand Down
2 changes: 2 additions & 0 deletions softcopy/zarr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ALL_METADATA_FILES = set(functools.reduce(operator.iadd, METADATA_FILES_BY_VERSION.values(), []))
KNOWN_VERSIONS = set(METADATA_FILES_BY_VERSION.keys())


def identify_zarr_format(archive_path: Path, log: Logger = LOG) -> Optional[Literal[2, 3]]:
"""
Identify the zarr version of the archive by identifying a metadata file and reading its zarr_format key.
Expand All @@ -38,6 +39,7 @@ def identify_zarr_format(archive_path: Path, log: Logger = LOG) -> Optional[Lite
log.debug(f"Could not identify zarr version from metadata files in archive folder {archive_path}")
return None


def dtype_string_zarr2(dtype):
endianness = dtype.byteorder
if endianness == "=":
Expand Down
118 changes: 0 additions & 118 deletions test.py

This file was deleted.

13 changes: 11 additions & 2 deletions tests/test_softcopy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,19 @@


def run_slow_write(dummy_path: Path, input_path: Path, no_complete_file: bool = False):
    """
    Run the slow_write command in-process with the "v2" method.

    Args:
        dummy_path: passed as the first positional argument of the command.
        input_path: passed as the second positional argument of the command.
        no_complete_file: when True, the `--no-complete-file` flag is added.
    """
    cli_args = [str(dummy_path), str(input_path), "--method", "v2"]
    # Only add the flag when requested, instead of passing a bare "--" placeholder when it is off.
    if no_complete_file:
        cli_args.append("--no-complete-file")

    # standalone_mode=False makes click raise/return instead of calling sys.exit(), so failures surface in tests.
    softcopy.slow_write.main(cli_args, standalone_mode=False)


def run_softcopy(targets_file_path: Path):
    """
    Run the softcopy command in-process, passing the targets file path as its only argument.
    """
    cli_args = [str(targets_file_path)]
    softcopy.main.main(cli_args, standalone_mode=False)


def create_targets_yaml(targets_file_path: Path, input_path: Path, output_path: Path):
Expand Down Expand Up @@ -85,6 +93,7 @@ def test_slow_write_and_softcopy(workspace, dummy_zarr_path, create_zarr2_archiv

assert np.all(np.equal(slow_write_output[:].read().result(), copied_dataset[:].read().result()))


# In the tests above, we just test copying a regular zarr archive. However, we also need limited support for ome-zarr FOV data.
# Specifically, the ome-zarr subset that is written by DaXi. This looks like the following:
# aquisition.ome.zarr/
Expand Down
1 change: 1 addition & 0 deletions tests/test_to_ome.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ def test_full_run(dummy_zarr_path, create_zarr2_archive):
expected_contents = {".zattrs", ".zgroup", "0"}
assert top_level_contents == expected_contents


# TODO: test for / dimension separator input, remove dim
1 change: 1 addition & 0 deletions tests/test_zarr_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

TEST_DATA = "test data"


def test_zarr_copier(workspace, dummy_zarr_path):
destination = workspace / "destination"
destination.mkdir()
Expand Down

0 comments on commit 782d9c0

Please sign in to comment.