Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Shared memory for NBLAST #87

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 124 additions & 15 deletions navis/nbl/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@

"""Module containing base classes for BLASTING."""

import atexit

import numpy as np
import pandas as pd

from abc import ABC, abstractmethod
from multiprocessing import shared_memory

from .. import utils, config

Expand All @@ -25,7 +28,7 @@


class Blaster(ABC):
"""Base class for blasting."""
"""Base class for Blasting."""

def __init__(self, dtype=np.float64, progress=True):
"""Initialize class."""
Expand Down Expand Up @@ -58,15 +61,7 @@ def dtype(self):

@dtype.setter
def dtype(self, dtype):
try:
self._dtype = np.dtype(dtype)
except TypeError:
try:
self._dtype = FLOAT_DTYPES[dtype]
except KeyError:
raise ValueError(
f'Unknown precision/dtype {dtype}. Expected on of the following: 16, 32 or 64 (default)'
)
self._dtype = parse_precision(dtype)

def pair_query_target(self, pairs, scores='forward'):
"""BLAST multiple pairs.
Expand Down Expand Up @@ -101,7 +96,7 @@ def pair_query_target(self, pairs, scores='forward'):

return scr

def multi_query_target(self, q_idx, t_idx, scores='forward'):
def multi_query_target(self, q_idx, t_idx, scores='forward', out=None):
"""BLAST multiple queries against multiple targets.

Parameters
Expand All @@ -110,6 +105,8 @@ def multi_query_target(self, q_idx, t_idx, scores='forward'):
Iterable of query/target neuron indices to BLAST.
scores : "forward" | "mean" | "min" | "max"
Which scores to return.
out : np.ndarray, optional
Array to write results to.

"""
if utils.is_jupyter() and config.tqdm == config.tqdm_notebook:
Expand All @@ -124,18 +121,24 @@ def multi_query_target(self, q_idx, t_idx, scores='forward'):
else:
position = getattr(self, 'pbar_position', 0)

res = np.zeros((len(q_idx), len(t_idx)),
dtype=self.dtype)
shape = (len(q_idx), len(t_idx))
if not isinstance(out, np.ndarray):
out = np.zeros(shape, dtype=self.dtype)
elif out.shape != shape:
raise TypeError(f'Expected out array of shape {shape}, got {out.shape}')
elif out.dtype != self.dtype:
raise TypeError(f'Expected out array to be {self.dtype}, got {out.dtype}')

for i, q in enumerate(config.tqdm(q_idx,
desc=self.desc,
leave=False,
position=position,
disable=not self.progress)):
for k, t in enumerate(t_idx):
res[i, k] = self.single_query_target(q, t, scores=scores)
out[i, k] = self.single_query_target(q, t, scores=scores)

# Generate results
res = pd.DataFrame(res)
res = pd.DataFrame(out)
res.columns = [self.ids[t] for t in t_idx]
res.index = [self.ids[q] for q in q_idx]

Expand All @@ -157,3 +160,109 @@ def all_by_all(self, scores='forward'):
res.loc[:, :] = np.dstack((res, res.T)).max(axis=2)

return res


class SharedBlaster(Blaster, ABC):
"""Version of the Blaster that works with shared memory buffers."""

def create_array(self, shm, shape, offset=None):
"""Create array from shared memory buffer."""
# Generate the out array from shared memory buffer
arr = np.ndarray(shape, dtype=self.dtype, buffer=shm.buf)

if not isinstance(offset, type(None)):
arr = arr[offset]

return arr

def multi_query_target(self, q_idx, t_idx, shm, shape, offset=None, scores='forward'):
"""BLAST multiple queries against multiple targets.

Parameters
----------
q_idx,t_idx : iterable
Iterable of query/target neuron indices to BLAST.
shm : multiprocessing.shared_memory.SharedMemory
shape : tuple (N, M)
Shape of the array in the memory buffer.
offset : tuple (slice, slice)
The view inside the full array to pass through as `out` to
`multi_query_target`.
scores : "forward" | "mean" | "min" | "max"
Which scores to produce.

"""
out = self.create_array(shm, shape, offset=offset)

_ = super().multi_query_target(q_idx, t_idx, scores=scores, out=out)

def all_by_all(self, shm, shape, offset=None, scores='forward'):
"""BLAST all-by-all neurons."""
out = self.create_array(shm, shape, offset=offset)

_ = super().multi_query_target(range(len(self.neurons)),
range(len(self.neurons)),
scores='forward',
out=out)

# For all-by-all BLAST we can get the mean score by
# transposing the scores
if scores == 'mean':
out[:, :] = (out + out.T) / 2
elif scores == 'min':
out[:, :] = np.dstack((out, out.T)).min(axis=2)
elif scores == 'max':
out[:, :] = np.dstack((out, out.T)).max(axis=2)


def create_shared_array(shape, dtype):
"""Create shared array.

Parameters
----------
shape : tuple
dtype : str | np.dtype

Returns
-------
multiprocessing.shared_memory.SharedMemory
The shared memory buffer for the array.
np.ndarray
A numpy array accessing the shared memory buffer.

"""
# Get the number of items in the requested array
if utils.is_iterable(shape):
items = np.prod(shape)
else:
items = shape

# Force dtype to numpy dtype
dtype = np.dtype(dtype)
# Calculate required size for memory buffer
size = dtype.itemsize * items

# Create shared memory buffer
shm = shared_memory.SharedMemory(create=True, size=size)

# We need to make sure that the memory is released on exit of this process
# Note: order is reverse -> last registered is executed first
atexit.register(shm.unlink)
atexit.register(shm.close)

# Create array based on buffer
arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)

return shm, arr


def parse_precision(dtype):
"""Parse precision into numpy dtype."""
try:
return np.dtype(dtype)
except TypeError:
try:
return FLOAT_DTYPES[dtype]
except KeyError:
raise ValueError(f'Unknown precision/dtype {dtype}. Expected one '
'of the following: 16, 32 or 64 (default)')
75 changes: 49 additions & 26 deletions navis/nbl/nblast_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from .. import utils, config
from ..core import NeuronList, Dotprops, make_dotprops
from .base import Blaster
from .base import Blaster, SharedBlaster, create_shared_array, parse_precision

__all__ = ['nblast', 'nblast_smart', 'nblast_allbyall', 'sim_to_dist']

Expand Down Expand Up @@ -234,6 +234,11 @@ def single_query_target(self, q_idx, t_idx, scores='forward'):
return scr


class SharedNBlaster(NBlaster, SharedBlaster):
"""An NBLASTER working with shared memory."""
pass


def nblast_smart(query: Union[Dotprops, NeuronList],
target: Optional[str] = None,
t: Union[int, float] = 90,
Expand Down Expand Up @@ -316,7 +321,7 @@ def nblast_smart(query: Union[Dotprops, NeuronList],
precision : int [16, 32, 64] | str [e.g. "float64"] | np.dtype
Precision for scores. Defaults to 64 bit (double) floats.
This is useful to reduce the memory footprint for very large
matrices. In real-world scenarios 32 bit (single)- and
matrices. In real-world scenarios 32 bit (single) - and
depending on the purpose even 16 bit (half) - are typically
sufficient.
n_cores : int, optional
Expand Down Expand Up @@ -685,21 +690,30 @@ def nblast(query: Union[Dotprops, NeuronList],
else:
n_rows, n_cols = find_batch_partition(batch_size, query_dps, target_dps)

# Force precision into numpy dtype
dtype = parse_precision(precision)

# Create scores array
shm, arr = create_shared_array(shape=(len(query_dps), len(target_dps)),
dtype=dtype)

nblasters = []
with config.tqdm(desc='Preparing',
total=n_rows * n_cols,
leave=False,
disable=not progress) as pbar:
offset_x = 0
for q in np.array_split(query_dps, n_rows):
offset_y = 0
for t in np.array_split(target_dps, n_cols):
# Initialize NBlaster
this = NBlaster(use_alpha=use_alpha,
normalized=normalized,
smat=smat,
limit_dist=limit_dist,
dtype=precision,
approx_nn=approx_nn,
progress=progress)
this = SharedNBlaster(use_alpha=use_alpha,
normalized=normalized,
smat=smat,
limit_dist=limit_dist,
dtype=dtype,
approx_nn=approx_nn,
progress=progress)

# Use better description if we process in batches
if batch_size:
Expand All @@ -713,32 +727,41 @@ def nblast(query: Union[Dotprops, NeuronList],
# Keep track of indices of queries and targets
this.queries = np.arange(len(q))
this.targets = np.arange(len(t)) + len(q)

# Make sure progress bars don't overlap
this.pbar_position = len(nblasters) % n_cores

# This is the offset into the shared array
this.offset_x = slice(offset_x, offset_x + len(q))
this.offset_y = slice(offset_y, offset_y + len(t))

nblasters.append(this)
pbar.update(1)
offset_y += len(t)
offset_x += len(q)

# If only one core, we don't need to break out the multiprocessing
if n_cores == 1:
return this.multi_query_target(this.queries,
this.targets,
scores=scores)

with ProcessPoolExecutor(max_workers=n_cores) as pool:
# Each nblaster is passed to its own process
futures = [pool.submit(this.multi_query_target,
q_idx=this.queries,
t_idx=this.targets,
scores=scores) for this in nblasters]

results = [f.result() for f in futures]
_ = this.multi_query_target(this.queries,
this.targets,
shm=shm,
shape=arr.shape,
offset=None,
scores=scores)
else:
with ProcessPoolExecutor(max_workers=n_cores) as pool:
# Each nblaster is passed to its own process
futures = [pool.submit(this.multi_query_target,
q_idx=this.queries,
t_idx=this.targets,
shm=shm,
shape=arr.shape,
offset=(this.offset_x, this.offset_y),
scores=scores) for this in nblasters]

scores = pd.DataFrame(np.zeros((len(query_dps), len(target_dps)),
dtype=this.dtype),
index=query_dps.id, columns=target_dps.id)
_ = [f.result() for f in futures]

for res in results:
scores.loc[res.index, res.columns] = res.values
scores = pd.DataFrame(arr, index=query_dps.id, columns=target_dps.id)

return scores

Expand Down