Skip to content

Commit

Permalink
Add Comm.irecv() and Request.{wait|test}[any|all]() (with Aron Ahmadia)
Browse files Browse the repository at this point in the history
  • Loading branch information
dalcinl committed Dec 27, 2011
1 parent 1a405f0 commit 9736998
Show file tree
Hide file tree
Showing 4 changed files with 377 additions and 13 deletions.
6 changes: 6 additions & 0 deletions src/MPI/Comm.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,12 @@ cdef class Comm:
request.ob_buf = PyMPI_issend(obj, dest, tag, comm, &request.ob_mpi)
return request
#
def irecv(self, obj=None, int dest=0, int tag=0):
cdef MPI_Comm comm = self.ob_mpi
cdef Request request = <Request>Request.__new__(Request)
request.ob_buf = PyMPI_irecv(obj, dest, tag, comm, &request.ob_mpi)
return request
#
def barrier(self):
"Barrier"
cdef MPI_Comm comm = self.ob_mpi
Expand Down
53 changes: 53 additions & 0 deletions src/MPI/Request.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,59 @@ cdef class Request:
request.ob_mpi = MPI_Request_f2c(arg)
return request

# Python Communication
# --------------------
#
def wait(self, Status status=None):
"""
Wait for a send or receive to complete
"""
cdef msg = PyMPI_wait(self, status)
return msg
#
def test(self, Status status=None):
"""
Test for the completion of a send or receive
"""
cdef int flag = 0
cdef msg = PyMPI_test(self, &flag, status)
return (<bint>flag, msg)
#
@classmethod
def waitany(cls, requests, Status status=None):
"""
Wait for any previously initiated request to complete
"""
cdef int index = MPI_UNDEFINED
cdef msg = PyMPI_waitany(requests, &index, status)
return (index, msg)
#
@classmethod
def testany(cls, requests, Status status=None):
"""
Test for completion of any previously initiated request
"""
cdef int index = MPI_UNDEFINED
cdef int flag = 0
cdef msg = PyMPI_testany(requests, &index, &flag, status)
return (index, <bint>flag, msg)
#
@classmethod
def waitall(cls, requests, statuses=None):
"""
Wait for all previously initiated requests to complete
"""
cdef msg = PyMPI_waitall(requests, statuses)
return msg
#
@classmethod
def testall(cls, requests, statuses=None):
"""
Test for completion of all previously initiated requests
"""
cdef int flag = 0
cdef msg = PyMPI_testall(requests, &flag, statuses)
return (<bint>flag, msg)


cdef class Prequest(Request):
Expand Down
245 changes: 233 additions & 12 deletions src/MPI/pickled.pxi
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -----------------------------------------------------------------------------

cdef extern from "Python.h":
enum: PY_MAJOR_VERSION
bint PyBytes_CheckExact(object)
char* PyBytes_AsString(object) except NULL
Py_ssize_t PyBytes_Size(object) except -1
object PyBytes_FromStringAndSize(char*,Py_ssize_t)
Expand All @@ -20,28 +22,48 @@ except ImportError:
from pickle import loads as PyPickle_loads
from pickle import HIGHEST_PROTOCOL as PyPickle_PROTOCOL

cdef object PyStringIO_New = None
cdef object PyPickle_loadf = None
try:
from cStringIO import StringIO as PyStringIO_New
from cPickle import load as PyPickle_loadf
except ImportError:
pass

cdef class _p_Pickle:

cdef object ob_dumps
cdef object ob_loads
cdef object ob_PROTOCOL

def __cinit__(self):
self.ob_dumps = PyPickle_dumps
self.ob_loads = PyPickle_loads
self.ob_dumps = None
self.ob_loads = None
self.ob_PROTOCOL = PyPickle_PROTOCOL

property dumps:
def __get__(self):
return self.ob_dumps
if self.ob_dumps is None:
return PyPickle_dumps
else:
return self.ob_dumps
def __set__(self, dumps):
self.ob_dumps = dumps
if dumps is PyPickle_dumps:
self.ob_dumps = None
else:
self.ob_dumps = dumps

property loads:
def __get__(self):
return self.ob_loads
if self.ob_loads is None:
return PyPickle_loads
else:
return self.ob_loads
def __set__(self, loads):
self.ob_loads = loads
if loads is PyPickle_loads:
self.ob_loads = None
else:
self.ob_loads = loads

property PROTOCOL:
def __get__(self):
Expand All @@ -54,7 +76,11 @@ cdef class _p_Pickle:
p[0] = NULL
n[0] = 0
return None
cdef object buf = self.ob_dumps(obj, self.ob_PROTOCOL)
cdef object buf
if self.ob_dumps is None:
buf = PyPickle_dumps(obj, self.ob_PROTOCOL)
else:
buf = self.ob_dumps(obj, self.ob_PROTOCOL)
p[0] = <void*> PyBytes_AsString(buf)
n[0] = <int> PyBytes_Size(buf) # XXX overflow?
return buf
Expand All @@ -63,15 +89,29 @@ cdef class _p_Pickle:
if n == 0:
p[0] = NULL
return None
cdef object buf = PyBytes_FromStringAndSize(NULL, n)
cdef object buf
buf = PyBytes_FromStringAndSize(NULL, n)
p[0] = PyBytes_AsString(buf)
return buf

cdef object load(self, object buf):
if buf is None:
return None
cdef object obj = self.ob_loads(buf)
return obj
if buf is None: return None
cdef bint use_StringIO = \
(PY_MAJOR_VERSION == 2 and
not PyBytes_CheckExact(buf) and
PyStringIO_New is not None)
if self.ob_loads is None:
if use_StringIO:
buf = PyStringIO_New(buf)
if PyPickle_loadf is not None:
return PyPickle_loadf(buf)
buf = buf.read()
return PyPickle_loads(buf)
else:
if use_StringIO:
buf = PyStringIO_New(buf)
buf = buf.read()
return self.ob_loads(buf)

cdef object dumpv(self, object obj, void **p,
int n, int cnt[], int dsp[]):
Expand Down Expand Up @@ -286,6 +326,187 @@ cdef object PyMPI_issend(object obj, int dest, int tag,
dest, tag, comm, request) )
return smsg


cdef object PyMPI_irecv(object obj, int dest, int tag,
MPI_Comm comm, MPI_Request *request):
cdef _p_Pickle pickle = PyMPI_pickle()
#
cdef void *rbuf = NULL
cdef int rcount = 0
cdef MPI_Datatype rtype = MPI_BYTE
#
cdef _p_buffer rmsg = None
cdef int dorecv = (dest != MPI_PROC_NULL)
if dorecv:
if obj is None:
rcount = <int>(1<<15)
obj = pickle.alloc(&rbuf, rcount)
rmsg = getbuffer(obj, 1, 0)
#elif is_int(obj):
# rcount = <int> obj
# obj = pickle.alloc(&rbuf, rcount)
# rmsg = getbuffer(obj, 1, 0)
else:
rmsg = getbuffer(obj, 0, 0)
rbuf = rmsg.view.buf
rcount = <int> rmsg.view.len # XXX overflow?
with nogil: CHKERR( MPI_Irecv(rbuf, rcount, rtype,
dest, tag, comm, request) )
return rmsg


cdef object PyMPI_wait(Request request, Status status):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef object buf
#
cdef MPI_Status rsts
with nogil: CHKERR( MPI_Wait(&request.ob_mpi, &rsts) )
buf = request.ob_buf
if status is not None:
status.ob_mpi = rsts
if request.ob_mpi == MPI_REQUEST_NULL:
request.ob_buf = None
#
cdef int rcount = 0
cdef MPI_Datatype rtype = MPI_BYTE
if type(buf) is not _p_buffer: return None
CHKERR( MPI_Get_count(&rsts, rtype, &rcount) )
if rcount <= 0: return None
return pickle.load(buf)


cdef object PyMPI_test(Request request, int *flag, Status status):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef object buf
#
cdef MPI_Status rsts
with nogil: CHKERR( MPI_Test(&request.ob_mpi, flag, &rsts) )
if flag[0]:
buf = request.ob_buf
if status is not None:
status.ob_mpi = rsts
if request.ob_mpi == MPI_REQUEST_NULL:
request.ob_buf = None
#
if not flag[0]: return None
cdef int rcount = 0
cdef MPI_Datatype rtype = MPI_BYTE
if type(buf) is not _p_buffer: return None
CHKERR( MPI_Get_count(&rsts, rtype, &rcount) )
if rcount <= 0: return None
return pickle.load(buf)


cdef object PyMPI_waitany(requests, int *index, Status status):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef object buf
#
cdef int count = 0
cdef MPI_Request *irequests = NULL
cdef MPI_Status rsts
#
cdef tmp = acquire_rs(requests, None, &count, &irequests, NULL)
try:
with nogil: CHKERR( MPI_Waitany(count, irequests, index, &rsts) )
if index[0] != MPI_UNDEFINED:
buf = (<Request>requests[<Py_ssize_t>index[0]]).ob_buf
if status is not None:
status.ob_mpi = rsts
finally:
release_rs(requests, None, count, irequests, NULL)
#
if index[0] == MPI_UNDEFINED: return None
cdef int rcount = 0
cdef MPI_Datatype rtype = MPI_BYTE
if type(buf) is not _p_buffer: return None
CHKERR( MPI_Get_count(&rsts, rtype, &rcount) )
if rcount <= 0: return None
return pickle.load(buf)


cdef object PyMPI_testany(requests, int *index, int *flag, Status status):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef object buf
#
cdef int count = 0
cdef MPI_Request *irequests = NULL
cdef MPI_Status rsts
#
cdef tmp = acquire_rs(requests, None, &count, &irequests, NULL)
try:
with nogil: CHKERR( MPI_Testany(count, irequests, index, flag, &rsts) )
if index[0] != MPI_UNDEFINED:
buf = (<Request>requests[<Py_ssize_t>index[0]]).ob_buf
if status is not None:
status.ob_mpi = rsts
finally:
release_rs(requests, None, count, irequests, NULL)
#
if index[0] == MPI_UNDEFINED: return None
if not flag[0]: return None
cdef int rcount = 0
cdef MPI_Datatype rtype = MPI_BYTE
if type(buf) is not _p_buffer: return None
CHKERR( MPI_Get_count(&rsts, rtype, &rcount) )
if rcount <= 0: return None
return pickle.load(buf)


cdef object PyMPI_waitall(requests, statuses):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef object buf, bufs
#
cdef Py_ssize_t i = 0
cdef int count = 0
cdef MPI_Request *irequests = NULL
cdef MPI_Status *istatuses = MPI_STATUSES_IGNORE
#
cdef tmp = acquire_rs(requests, True, &count, &irequests, &istatuses)
try:
with nogil: CHKERR( MPI_Waitall(count, irequests, istatuses) )
bufs = [(<Request>requests[i]).ob_buf for i from 0 <= i < count]
finally:
release_rs(requests, statuses, count, irequests, NULL)
#
cdef int rcount = 0
cdef MPI_Datatype rtype = MPI_BYTE
for i from 0 <= i < count:
if type(bufs[i]) is not _p_buffer:
bufs[i] = None; continue
rcount = 0
CHKERR( MPI_Get_count(&istatuses[i], rtype, &rcount) )
if rcount <= 0: bufs[i] = None
return [pickle.load(buf) for buf in bufs]


cdef object PyMPI_testall(requests, int *flag, statuses):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef object buf, bufs
#
cdef Py_ssize_t i = 0
cdef int count = 0
cdef MPI_Request *irequests = NULL
cdef MPI_Status *istatuses = MPI_STATUSES_IGNORE
#
cdef tmp = acquire_rs(requests, True, &count, &irequests, &istatuses)
try:
with nogil: CHKERR( MPI_Testall(count, irequests, flag, istatuses) )
if flag[0]:
bufs = [(<Request>requests[i]).ob_buf for i from 0 <= i < count]
finally:
release_rs(requests, statuses, count, irequests, NULL)
#
if not flag[0]: return None
cdef int rcount = 0
cdef MPI_Datatype rtype = MPI_BYTE
for i from 0 <= i < count:
if type(bufs[i]) is not _p_buffer:
bufs[i] = None; continue
rcount = 0
CHKERR( MPI_Get_count(&istatuses[i], rtype, &rcount) )
if rcount <= 0: bufs[i] = None
return [pickle.load(buf) for buf in bufs]

# -----------------------------------------------------------------------------

cdef object PyMPI_barrier(MPI_Comm comm):
Expand Down
Loading

0 comments on commit 9736998

Please sign in to comment.