Sophie

Sophie

distrib > Mandriva > 2008.1 > i586 > by-pkgid > 2fe96174012fea2d88f752857a5bea1d > files > 94

python-mpi4py-0.6.0-4mdv2008.1.i586.rpm

import unittest
from mpi4py import MPI
from array import array

# Lookup table from an array/NumPy typecode character to the MPI datatype
# used to communicate elements of that type.
typemap = {
    # 'b': MPI.CHAR,  (entry disabled in the original table)
    'h': MPI.SHORT,
    'i': MPI.INT,
    'l': MPI.LONG,
    'f': MPI.FLOAT,
    'd': MPI.DOUBLE,
}

# Registry of array backends.  Each entry appended below is a triple:
# (tuple of buffer makers, array constructor, array equality predicate).
arrayimpl = list()

# Backend built on the standard-library ``array`` module.  It is registered
# in ``arrayimpl`` only when the import succeeds.
try:
    import array

    def mk_dtype_array(a, datatype):
        """Return `datatype` when it is truthy, else the MPI datatype that
        `typemap` associates with `a.typecode`."""
        return datatype or typemap[a.typecode]

    def mk_buf_array_1(a, dt=None, c=None):
        """Wrap the array object `a` itself in an ``MPI.Buffer``.

        `dt` is the MPI datatype (looked up from the typecode when omitted).
        `c` is the element count; it defaults to ``len(a)`` only when it is
        None, so an explicit count of 0 is passed through unchanged.
        """
        count = len(a) if c is None else c
        return MPI.Buffer(a, count, mk_dtype_array(a, dt))

    def mk_buf_array_2(a, dt=None, c=None):
        """Wrap the memory of `a` in an ``MPI.Buffer`` via ``buffer_info()``.

        The buffer is described by the pair ``(address, False)`` instead of
        the array object.  `dt` and `c` behave as in ``mk_buf_array_1``, with
        the default count taken from ``buffer_info()``.
        """
        bptr, blen = a.buffer_info()
        count = blen if c is None else c
        return MPI.Buffer((bptr, False), count, mk_dtype_array(a, dt))

    mk_buf_array = (mk_buf_array_1, mk_buf_array_2)

    def mk_arr_array(typecode, init):
        """Build an ``array.array`` of `typecode` initialised from `init`."""
        return array.array(typecode, init)

    def eq_arr_array(a, b):
        """Return whether the two arrays compare equal."""
        return a == b

    arrayimpl.append((mk_buf_array, mk_arr_array, eq_arr_array))
except ImportError:
    pass

# Backend built on NumPy.  It is registered in ``arrayimpl`` only when NumPy
# can be imported; otherwise the tests run with the remaining backends.
try:
    import numpy

    def mk_dtype_numpy(a, datatype):
        """Return `datatype` when it is truthy, else the MPI datatype that
        `typemap` associates with `a.dtype.char`."""
        return datatype or typemap[a.dtype.char]

    def mk_buf_numpy_1(a, dt=None, c=None):
        """Wrap the ndarray `a` itself in an ``MPI.Buffer``.

        `dt` is the MPI datatype (looked up from the dtype when omitted).
        `c` is the element count; it defaults to ``a.size`` only when it is
        None, so an explicit count of 0 is passed through unchanged.
        """
        count = a.size if c is None else c
        return MPI.Buffer(a, count, mk_dtype_numpy(a, dt))

    def mk_buf_numpy_2(a, dt=None, c=None):
        """Like ``mk_buf_numpy_1`` but hands ``a.data`` to ``MPI.Buffer``."""
        count = a.size if c is None else c
        return MPI.Buffer(a.data, count, mk_dtype_numpy(a, dt))

    def mk_buf_numpy_3(a, dt=None, c=None):
        """Like ``mk_buf_numpy_1`` but hands the ``'data'`` entry of
        ``a.__array_interface__`` to ``MPI.Buffer``."""
        data = a.__array_interface__['data']
        count = a.size if c is None else c
        return MPI.Buffer(data, count, mk_dtype_numpy(a, dt))

    mk_buf_numpy = (mk_buf_numpy_1, mk_buf_numpy_2, mk_buf_numpy_3)

    def mk_arr_numpy(typecode, init):
        """Build an ndarray from `init` with dtype `typecode`."""
        return numpy.array(init, dtype=typecode)

    def eq_arr_numpy(a, b):
        """Return whether the two arrays are element-wise close
        (``numpy.allclose``)."""
        return numpy.allclose(a, b)

    arrayimpl.append((mk_buf_numpy, mk_arr_numpy, eq_arr_numpy))
except ImportError:
    pass


def maxvalue(a):
    """Return a large value chosen according to the element type of `a`.

    `a` is either an object with a ``typecode`` attribute or one exposing
    ``dtype.char``.  The result is 1e30 for typecode 'f', 1e300 for 'd',
    and ``2 ** (7 * a.itemsize) - 1`` for every other typecode.
    """
    try:
        code = a.typecode
    except AttributeError:
        # No ``typecode`` attribute: read the character from the dtype.
        code = a.dtype.char
    float_limits = {'f': 1e30, 'd': 1e300}
    if code in float_limits:
        return float_limits[code]
    return 2 ** (7 * a.itemsize) - 1


class TestP2PBufBase(object):
    """Point-to-point buffer communication tests shared by all communicators.

    Concrete subclasses mix in ``unittest.TestCase`` and override ``COMM``.
    Every test iterates over each backend registered in ``arrayimpl``, each
    of that backend's buffer makers, and each (typecode, datatype) pair of
    ``typemap``, exchanging messages of ``s`` elements (all equal to ``s``)
    for ``s`` in ``range(size)``.
    """

    COMM = MPI.COMM_NULL

    def _bsend(self, buf, dest, mem):
        # Buffered-mode send of `buf` to rank `dest` (tag 0) with `mem` as the
        # attached buffer.  The buffer is detached even if Bsend raises, so a
        # failure does not leave it attached for the following iterations.
        MPI.Attach_buffer(mem)
        try:
            self.COMM.Bsend(buf, dest, 0)
        finally:
            MPI.Detach_buffer()

    def testSendrecv(self):
        # Ring exchange with a combined Sendrecv: every rank sends to its
        # right neighbour and receives from its left neighbour.
        size = self.COMM.Get_size()
        rank = self.COMM.Get_rank()
        dest = (rank + 1) % size
        source = (rank - 1) % size
        # `mkarr` (not `array`) so the module-level name is not shadowed;
        # the backend's equality predicate is not needed here.
        for mkbufs, mkarr, _ in arrayimpl:
            for mkbuf in mkbufs:
                for typecode, datatype in typemap.items():
                    for s in range(0, size):
                        sbuf = mkarr(typecode, [s] * s)
                        rbuf = mkarr(typecode, [-1] * s)
                        self.COMM.Sendrecv(mkbuf(sbuf, datatype), dest, 0,
                                           mkbuf(rbuf, datatype), source, 0)
                        for value in rbuf:
                            self.assertEqual(value, s)

    def testSendRecv(self):
        # Ranks 0 and 1 exchange three messages in each direction using the
        # buffered (Bsend), standard (Send) and synchronous (Ssend) modes.
        size = self.COMM.Get_size()
        rank = self.COMM.Get_rank()
        for mkbufs, mkarr, _ in arrayimpl:
            for mkbuf in mkbufs:
                for typecode, datatype in typemap.items():
                    for s in range(0, size):
                        sbuf = mkarr(typecode, [s] * s)
                        rbuf = mkarr(typecode, [-1] * s)
                        mem = mkarr(typecode, [0] * (s + MPI.BSEND_OVERHEAD))
                        if size == 1:
                            # No peer: only exercise attach/detach and let
                            # the final check run against the send data.
                            MPI.Attach_buffer(mem)
                            rbuf = sbuf
                            MPI.Detach_buffer()
                        elif rank == 0:
                            # Rank 0 sends first, then receives the replies.
                            self._bsend(mkbuf(sbuf, datatype), 1, mem)
                            self.COMM.Send(mkbuf(sbuf, datatype), 1, 0)
                            self.COMM.Ssend(mkbuf(sbuf, datatype), 1, 0)
                            self.COMM.Recv(mkbuf(rbuf, datatype), 1, 0)
                            self.COMM.Recv(mkbuf(rbuf, datatype), 1, 0)
                            self.COMM.Recv(mkbuf(rbuf, datatype), 1, 0)
                        elif rank == 1:
                            # Rank 1 mirrors rank 0: receive first, then send.
                            self.COMM.Recv(mkbuf(rbuf, datatype), 0, 0)
                            self.COMM.Recv(mkbuf(rbuf, datatype), 0, 0)
                            self.COMM.Recv(mkbuf(rbuf, datatype), 0, 0)
                            self._bsend(mkbuf(sbuf, datatype), 0, mem)
                            self.COMM.Send(mkbuf(sbuf, datatype), 0, 0)
                            self.COMM.Ssend(mkbuf(sbuf, datatype), 0, 0)
                        else:
                            # Remaining ranks do not communicate.
                            rbuf = sbuf

                        for value in rbuf:
                            self.assertEqual(value, s)

    def testPersistent(self):
        # Ring exchange with persistent requests, driven first through the
        # per-request Start/Wait calls and then through the Prequest
        # Startall/Waitany/Waitall calls.
        size = self.COMM.Get_size()
        rank = self.COMM.Get_rank()
        dest = (rank + 1) % size
        source = (rank - 1) % size
        for mkbufs, mkarr, _ in arrayimpl:
            for mkbuf in mkbufs:
                for typecode, datatype in typemap.items():
                    for s in range(size):
                        # Round 1: individual Start()/Wait().
                        sbuf = mkarr(typecode, [s] * s)
                        rbuf = mkarr(typecode, [-1] * s)
                        sendreq = self.COMM.Send_init(mkbuf(sbuf, datatype),
                                                      dest, 0)
                        recvreq = self.COMM.Recv_init(mkbuf(rbuf, datatype),
                                                      source, 0)
                        sendreq.Start()
                        recvreq.Start()
                        sendreq.Wait()
                        recvreq.Wait()
                        # Waiting must not turn the handles into REQUEST_NULL.
                        self.assertNotEqual(sendreq, MPI.REQUEST_NULL)
                        self.assertNotEqual(recvreq, MPI.REQUEST_NULL)
                        sendreq.Free()
                        recvreq.Free()
                        # Verify the payload of this round as well (the same
                        # check the second round performs).
                        for value in rbuf:
                            self.assertEqual(value, s)
                        # Round 2: Startall()/Waitany()/Waitall() on a list.
                        sbuf = mkarr(typecode, [s] * s)
                        rbuf = mkarr(typecode, [-1] * s)
                        sendreq = self.COMM.Send_init(mkbuf(sbuf, datatype),
                                                      dest, 0)
                        recvreq = self.COMM.Recv_init(mkbuf(rbuf, datatype),
                                                      source, 0)
                        reqlist = [sendreq, recvreq]
                        MPI.Prequest.Startall(reqlist)
                        index = MPI.Prequest.Waitany(reqlist)
                        self.assertTrue(index in (0, 1))
                        self.assertNotEqual(reqlist[index], MPI.REQUEST_NULL)
                        MPI.Prequest.Waitall(reqlist)
                        for preq in reqlist:
                            # Only Free() is expected to reset the handle.
                            self.assertNotEqual(preq, MPI.REQUEST_NULL)
                            preq.Free()
                            self.assertEqual(preq, MPI.REQUEST_NULL)
                        for value in rbuf:
                            self.assertEqual(value, s)


class TestP2PBufSelf(TestP2PBufBase, unittest.TestCase):
    """Run the TestP2PBufBase tests with COMM set to MPI.COMM_SELF."""
    COMM = MPI.COMM_SELF

class TestP2PBufWorld(TestP2PBufBase, unittest.TestCase):
    """Run the TestP2PBufBase tests with COMM set to MPI.COMM_WORLD."""
    COMM = MPI.COMM_WORLD



if __name__ == '__main__':
    # Run the tests when executed as a script.  unittest.main() finishes by
    # raising SystemExit; it is swallowed here so the script simply runs off
    # its end instead of exiting through the exception.
    # NOTE(review): presumably this lets MPI shut down through the normal
    # interpreter exit path -- confirm.  A consequence is that the process
    # exit status does not reflect test failures.
    try:
        unittest.main()
    except SystemExit:
        pass