Sophie

Sophie

distrib > Mandriva > 2008.1 > i586 > by-pkgid > 2fe96174012fea2d88f752857a5bea1d > files > 96

python-mpi4py-0.6.0-4mdv2008.1.i586.rpm

import sys, unittest
from mpi4py import MPI

# Registry of buffer back-ends used by the tests.  Every entry is a
# (make, equal) pair: make(typecode, init) builds a buffer and
# equal(a, b) tells whether two such buffers hold the same items.
arrayimpl = []

try:
    import array
except ImportError:
    pass
else:
    def mk_array(typecode, init):
        """Build an ``array.array`` of *typecode* from *init*."""
        return array.array(typecode, init)

    def eq_array(a, b):
        """Compare two ``array.array`` objects with ``==``."""
        return a == b

    arrayimpl.append((mk_array, eq_array))

try:
    import numpy
except ImportError:
    pass
else:
    def mk_numpy(typecode, init):
        """Build a numpy array from *init* with dtype *typecode*."""
        return numpy.array(init, dtype=typecode)

    def eq_numpy(a, b):
        """Return whether all element-wise comparisons of a and b hold."""
        return (a == b).all()

    arrayimpl.append((mk_numpy, eq_numpy))


# Typecode -> MPI datatype table.  Entries for 'b' (MPI.CHAR),
# 'B' (MPI.UNSIGNED_CHAR) and 'd' (MPI.DOUBLE) are left disabled.
typemap = {
    'h': MPI.SHORT, 'H': MPI.UNSIGNED_SHORT,
    'i': MPI.INT,   'I': MPI.UNSIGNED,
    'l': MPI.LONG,  'L': MPI.UNSIGNED_LONG,
    'f': MPI.FLOAT,
}

# The table above is replaced right away, so the tests that iterate over
# `typemap` only exercise MPI.SHORT.  Alternative narrow map: {'i': MPI.INT}.
typemap = {'h': MPI.SHORT}

class TestPackBase(object):
    """Pack_size / Pack / Unpack checks run against the communicator COMM.

    Subclasses are expected to override ``COMM``.
    """

    COMM = MPI.COMM_NULL

    # For each mapped type, the packed size of `n` items minus the packed
    # size of zero items must equal n times the array item size.
    def testPackSize(self):
        comm = self.COMM
        for code, mpitype in typemap.items():
            width = array.array(code).itemsize
            base = mpitype.Pack_size(0, comm)
            for n in range(10):
                packed = mpitype.Pack_size(n, comm)
                self.assertEqual(packed - base, n*width)

    # Pack two arrays back to back into one scratch buffer, unpack them
    # into pre-filled output arrays and expect the inputs to come back.
    def testPackUnpack(self):
        comm = self.COMM
        # every ordered (first type, second type) combination
        combos = [(first, second)
                  for first in typemap.items()
                  for second in typemap.items()]
        for make, same in arrayimpl:
            for n in range(10):
                for (code_a, type_a), (code_b, type_b) in combos:
                    # sources hold 0..n-1, destinations start filled with n
                    src_a = make(code_a, range(n))
                    src_b = make(code_b, range(n))
                    dst_a = make(code_a, [n] * n)
                    dst_b = make(code_b, [n] * n)
                    # scratch buffer: room for both packs plus one byte
                    need_a = type_a.Pack_size(len(src_a), comm)
                    need_b = type_b.Pack_size(len(src_b), comm)
                    scratch = make('b', [0] * (need_a + need_b + 1))
                    # pack both sources, the second right after the first
                    offset = type_a.Pack(src_a, scratch, 0, comm)
                    type_b.Pack(src_b, scratch, offset, comm)
                    # unpack from the start of the buffer, in the same order
                    offset = type_a.Unpack(scratch, 0, dst_a, comm)
                    type_b.Unpack(scratch, offset, dst_b, comm)
                    # the round trip must reproduce both inputs
                    self.assertTrue(same(src_a, dst_a))
                    self.assertTrue(same(src_b, dst_b))

# Data representation name passed as the first argument to the
# Pack_external_size / Pack_external / Unpack_external calls in this file.
EXT32 = 'external32'

class TestPackExternalBase(object):
    """Checks for the ``external32`` pack/unpack methods of MPI datatypes.

    Meant to be mixed into a ``unittest.TestCase``.  Both tests return early
    (and therefore pass) when ``Pack_external_size`` raises
    ``NotImplementedError``.
    """

    # When true, unpacked arrays are byte-swapped before being compared with
    # the inputs.  Module-level code in this file may switch it on.
    byteswap = False

    def _external_supported(self):
        """Return False if ``Pack_external_size`` is not implemented."""
        try:
            MPI.BYTE.Pack_external_size(EXT32, 0)
        except NotImplementedError:
            return False
        return True

    def _swap_bytes(self, buf):
        """Byte-swap *buf* in place.

        ``array.array.byteswap()`` takes no argument and swaps in place;
        any other buffer type (numpy arrays here) is asked for an in-place
        swap by passing ``True``.  The type is recognised by name so this
        works even if only one of the two back-ends could be imported.
        """
        if type(buf).__name__ == 'array':
            buf.byteswap()
        else:
            buf.byteswap(True)

    # The external packed size must grow linearly with the item count.
    # The per-item size is taken from the datatype itself (size of one item
    # minus size of zero items) rather than from the native array item size,
    # because external32 uses fixed sizes that need not match the native ones.
    def testPackSize(self):
        if not self._external_supported():
            return
        for datatype in typemap.values():
            overhead = datatype.Pack_external_size(EXT32, 0)
            unit_size = datatype.Pack_external_size(EXT32, 1) - overhead
            self.assertTrue(unit_size > 0)
            for count in range(10):
                pack_size = datatype.Pack_external_size(EXT32, count)
                real_size = pack_size - overhead
                self.assertEqual(real_size, count * unit_size)

    # Pack two arrays back to back in external32 form, unpack them into
    # pre-filled output arrays and expect the inputs to come back
    # (after an optional byte swap, see `byteswap`).
    def testPackUnpackExternal(self):
        if not self._external_supported():
            return
        for make, equal in arrayimpl:
            for items in range(1, 10):
                for typecode1, datatype1 in typemap.items():
                    for typecode2, datatype2 in typemap.items():
                        # input and output arrays
                        # NOTE(review): the first input is filled with 256
                        # instead of range(items); presumably chosen while
                        # debugging byte order -- confirm before changing.
                        iarray1 = make(typecode1, [256] * items)
                        iarray2 = make(typecode2, range(items))
                        oarray1 = make(typecode1, [items] * items)
                        oarray2 = make(typecode2, [items] * items)
                        # temp array for packing (both packs plus one byte)
                        size1 = datatype1.Pack_external_size(EXT32, len(iarray1))
                        size2 = datatype2.Pack_external_size(EXT32, len(iarray2))
                        tmpbuf = make('b', [0] * (size1 + size2 + 1))
                        # pack input arrays, the second right after the first
                        position = 0
                        position = datatype1.Pack_external(EXT32, iarray1, tmpbuf, position)
                        position = datatype2.Pack_external(EXT32, iarray2, tmpbuf, position)
                        # unpack output arrays from the start of the buffer
                        position = 0
                        position = datatype1.Unpack_external(EXT32, tmpbuf, position, oarray1)
                        position = datatype2.Unpack_external(EXT32, tmpbuf, position, oarray2)
                        # test result
                        if self.byteswap:
                            self._swap_bytes(oarray1)
                            self._swap_bytes(oarray2)
                        self.assertTrue(equal(iarray1, oarray1))
                        self.assertTrue(equal(iarray2, oarray2))


class TestPackSelf(TestPackBase, unittest.TestCase):
    """Run the TestPackBase checks with COMM set to MPI.COMM_SELF."""
    COMM = MPI.COMM_SELF

class TestPackWorld(TestPackBase, unittest.TestCase):
    """Run the TestPackBase checks with COMM set to MPI.COMM_WORLD."""
    # Previously this was MPI.COMM_SELF, which made the class an exact
    # duplicate of TestPackSelf and left COMM_WORLD untested.
    COMM = MPI.COMM_WORLD

class TestPackExternal(TestPackExternalBase, unittest.TestCase):
    """Concrete unittest case that runs the TestPackExternalBase checks."""
    pass


# MPI._mpi_info() is unpacked as a (name, version) pair.
_name, _version = MPI._mpi_info()
# With OpenMPI on a little-endian host, make the external32 tests
# byte-swap the unpacked arrays before comparing them with the inputs.
if _name == 'OpenMPI' and sys.byteorder == 'little':
    TestPackExternalBase.byteswap = True

# Run the test cases when this file is executed as a script.
if __name__ == '__main__':
    try:
        unittest.main()
    except SystemExit:
        # unittest.main() finishes by raising SystemExit; it is swallowed
        # here, so the script ends normally whatever the test outcome.
        pass