Use dask scheduler for distributed computing
yuriyzubov committed Jan 14, 2025
1 parent 0a8ec82 commit 994022d
Showing 2 changed files with 38 additions and 31 deletions.
54 changes: 27 additions & 27 deletions src/tiff_stack_to_zarr.py
@@ -17,35 +17,35 @@
@click.command()
@click.option('--src','-s',type=click.Path(exists = True),help='Input tiff stack directory path.')
@click.option('--dest','-s',type=click.STRING,help='Output .zarr file path.')
@click.option('--workers','-w',default=100,type=click.INT,help = "Number of dask workers")
@click.option('--num_workers','-w',default=100,type=click.INT,help = "Number of dask workers")
@click.option('--cluster', '-c', default='' ,type=click.STRING, help="Which instance of dask client to use. Local client - 'local', cluster 'lsf'")
def cli(src, dest, workers, cluster):
def cli(src, dest, num_workers, cluster):

# if cluster == '':
# print('Did not specify which instance of the dask client to use!')
# sys.exit(0)
# elif cluster == 'lsf':
# num_cores = 1
# cluster = LSFCluster(
# cores=num_cores,
# processes=num_cores,
# memory=f"{15 * num_cores}GB",
# ncpus=num_cores,
# mem=15 * num_cores,
# walltime="48:00",
# local_directory = "/scratch/$USER/"
# )
if cluster == '':
print('Did not specify which instance of the dask client to use!')
sys.exit(0)
elif cluster == 'lsf':
num_cores = 1
cluster = LSFCluster(
cores=num_cores,
processes=num_cores,
memory=f"{15 * num_cores}GB",
ncpus=num_cores,
mem=15 * num_cores,
walltime="48:00",
local_directory = "/scratch/$USER/"
)

# elif cluster == 'local':
# cluster = LocalCluster()
elif cluster == 'local':
cluster = LocalCluster()

# client = Client(cluster)
# with open(os.path.join(os.getcwd(), "dask_dashboard_link" + ".txt"), "w") as text_file:
# text_file.write(str(client.dashboard_link))
# print(client.dashboard_link)
client = Client(cluster)
with open(os.path.join(os.getcwd(), "dask_dashboard_link" + ".txt"), "w") as text_file:
text_file.write(str(client.dashboard_link))
print(client.dashboard_link)

path_to_stack = src
z_store = zarr.DirectoryStore(dest)
z_store = zarr.NestedDirectoryStore(dest)
tiff_stack = natsorted(os.listdir(path_to_stack))

probe_image = imread(os.path.join(path_to_stack, tiff_stack[0]))
@@ -62,11 +62,11 @@ def cli(src, dest, workers, cluster):
start_time = time.time()
print(z_arr.chunks[0])
print(len(tiff_stack))
# if tiff stack fits into one chunk, choose max value of zarr stack as the length of the tiff stack
zarr_stack = min(z_arr.chunks[0], len(tiff_stack))
print(zarr_stack)

write_tiles_strobbing(path_to_stack, z_arr, tiff_stack)

client.cluster.scale(num_workers)
write_tiles_strobbing(path_to_stack, z_arr, tiff_stack, client)
client.cluster.scale(0)

print(time.time() - start_time)
if __name__ == '__main__':
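
For context, a self-contained sketch of the cluster-selection pattern this file now relies on. The names and values here are illustrative, and the LSFCluster import is assumed to come from dask-jobqueue, since the import lines sit outside this hunk:

from dask.distributed import Client, LocalCluster
from dask_jobqueue import LSFCluster  # assumed source of LSFCluster; import not shown in the hunk

def make_client(cluster_kind: str, num_workers: int) -> Client:
    # Pick a scheduler backend, then attach a distributed client to it.
    if cluster_kind == 'lsf':
        cluster = LSFCluster(cores=1, processes=1, memory="15GB",
                             walltime="48:00", local_directory="/scratch/$USER/")
    elif cluster_kind == 'local':
        cluster = LocalCluster()
    else:
        raise ValueError("cluster must be 'lsf' or 'local'")
    client = Client(cluster)
    client.cluster.scale(num_workers)  # request workers from the scheduler up front
    return client
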
15 changes: 11 additions & 4 deletions src/writes.py
@@ -2,7 +2,8 @@
import numpy as np
import zarr
import os
import multiprocessing as mp
from dask.distributed import Client, wait
import time


def write_tile_to_zarr(
@@ -32,11 +33,17 @@ def write_tile_to_zarr(
def write_tiles_strobbing(path_to_stack : str,
zarray : zarr.Group,
tiles_list : list,
client : Client
):
chunks_list = np.arange(0, zarray.shape[0], zarray.chunks[0])
print(chunks_list)
cpu_num=mp.cpu_count()
with mp.Pool(processes=int(cpu_num * 0.6)) as pool:
results = pool.starmap(write_tile_to_zarr, ((chunk_num, path_to_stack, zarray, tiles_list) for chunk_num in chunks_list))

start = time.time()
fut = client.map(lambda v: write_tile_to_zarr(v, path_to_stack, zarray, tiles_list), chunks_list)
print(f'Submitted {len(chunks_list)} tasks to the scheduler in {time.time()- start}s')

# wait for all the futures to complete
result = wait(fut)
print(f'Completed {len(chunks_list)} tasks in {time.time() - start}s')

return 0

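A self-contained sketch of the Pool.starmap-to-client.map migration applied above; the function and values below are placeholders, not part of this commit:

from dask.distributed import Client, LocalCluster, wait

def process_chunk(chunk_start: int) -> int:
    # stand-in for per-chunk work such as write_tile_to_zarr
    return chunk_start

if __name__ == "__main__":
    client = Client(LocalCluster())     # any dask scheduler works; LocalCluster for illustration
    client.cluster.scale(4)             # ask the scheduler for four workers

    chunk_starts = range(0, 1000, 100)  # analogous to chunks_list above
    futures = client.map(process_chunk, chunk_starts)  # one task per chunk, sent to the scheduler
    wait(futures)                       # block until every future completes
    print(client.gather(futures))

    client.cluster.scale(0)             # release the workers, mirroring the script
    client.close()
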