在上一篇中我們介紹了 mpi4py 中標(biāo)準(zhǔn)阻塞通信模式,下面我們將介紹緩沖阻塞通信模式紧憾。
緩沖通信模式主要用于解開阻塞通信的發(fā)送和接收之間的耦合。有了緩沖機(jī)制昆码,即使在接受端沒有啟動相應(yīng)的接收的情況下顷歌,在完成其消息數(shù)據(jù)到緩沖區(qū)的轉(zhuǎn)移后發(fā)送端的阻塞發(fā)送函數(shù)也可返回锰蓬。其實標(biāo)準(zhǔn)通信模式中也存在緩沖機(jī)制,它使用的是 MPI 環(huán)境所提供的數(shù)據(jù)緩沖區(qū)眯漩,是有一定大小的芹扭。使用緩沖通信模式,我們可以自己分配和組裝一塊內(nèi)存區(qū)域用作緩沖區(qū)坤塞,緩沖區(qū)的大小可以根據(jù)需要進(jìn)行控制冯勉。但需要注意的是,當(dāng)消息大小超過緩沖區(qū)容量時摹芙,程序會出錯灼狰。
下面是 mpi4py 中用于緩沖阻塞點到點通信的方法接口(MPI.Comm 類的方法):
bsend(self, obj, int dest, int tag=0)
recv(self, buf=None, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
Bsend(self, buf, int dest, int tag=0)
Recv(self, buf, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
這些方法調(diào)用中的參數(shù)是與標(biāo)準(zhǔn)通信模式的方法調(diào)用參數(shù)一樣的。
另外我們會用到的裝配和卸載用于通信的緩沖區(qū)的函數(shù)如下:
MPI.Attach_buffer(buf)
MPI.Detach_buffer()
下面分別給出 bsend/recv 和 Bsend/Recv 的使用例程浮禾。
# bsend_recv.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# MPI.BSEND_OVERHEAD gives the extra overhead in buffered mode
BUFSISE = 2000 + MPI.BSEND_OVERHEAD
buf = bytearray(BUFSISE)
# Attach a user-provided buffer for sending in buffered mode
MPI.Attach_buffer(buf)
send_obj = {'a': [1, 2.4, 'abc', -2.3+3.4J],
'b': {2, 3, 4}}
if rank == 0:
comm.bsend(send_obj, dest=1, tag=11)
recv_obj = comm.recv(source=1, tag=22)
elif rank == 1:
recv_obj = comm.recv(source=0, tag=11)
comm.bsend(send_obj, dest=0, tag=22)
print 'process %d receives %s' % (rank, recv_obj)
# Remove an existing attached buffer
MPI.Detach_buffer()
運行結(jié)果如下:
$ mpiexec -n 2 python bsend_recv.py
process 0 receives {'a': [1, 2.4, 'abc', (-2.3+3.4j)], 'b': set([2, 3, 4])}
process 1 receives {'a': [1, 2.4, 'abc', (-2.3+3.4j)], 'b': set([2, 3, 4])}
# Bsend_recv.py
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# MPI.BSEND_OVERHEAD gives the extra overhead in buffered mode
BUFSISE = 2000 + MPI.BSEND_OVERHEAD
buf = bytearray(BUFSISE)
# Attach a user-provided buffer for sending in buffered mode
MPI.Attach_buffer(buf)
count = 10
send_buf = np.arange(count, dtype='i')
recv_buf = np.empty(count, dtype='i')
if rank == 0:
comm.Bsend(send_buf, dest=1, tag=11)
comm.Recv(recv_buf, source=1, tag=22)
elif rank == 1:
comm.Recv(recv_buf, source=0, tag=11)
comm.Bsend(send_buf, dest=0, tag=22)
print 'process %d receives %s' % (rank, recv_buf)
# Remove an existing attached buffer
MPI.Detach_buffer()
運行結(jié)果如下:
$ mpiexec -n 2 python Bsend_recv.py
process 0 receives [0 1 2 3 4 5 6 7 8 9]
process 1 receives [0 1 2 3 4 5 6 7 8 9]
在以上兩個例程中交胚,因為發(fā)送的數(shù)據(jù)量很小,即使不裝配一個用于通信的緩沖區(qū)盈电,程序一樣可以工作(讀者可以試一試)蝴簇,這時將使用 MPI 環(huán)境提供的緩沖區(qū)。但是當(dāng)通信的數(shù)據(jù)量很大超過 MPI 環(huán)境提供的緩沖區(qū)容量時匆帚,就必須提供一個足夠大的緩沖區(qū)以使程序能夠正常工作熬词。
可以用下面這個例程測試一下 MPI 環(huán)境提供的緩沖區(qū)大小。
# attach_detach_buf.py
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
max_msg_size = 2**10
BUFSISE = 32 * max_msg_size
mpi_buf = bytearray(BUFSISE)
# Attach a big user-provided buffer for sending in buffered mode
MPI.Attach_buffer(mpi_buf)
recv_buf = np.empty((max_msg_size,), np.float64)
if rank == 0:
print '-' * 80
print 'With an attached big buffer:'
print
msg_size = 1
tag = 0
while msg_size <= max_msg_size:
msg = np.random.random((msg_size,))
if rank == 0:
print 'Trying with size: ', msg_size
comm.Bsend(msg, (rank+1)%2, tag)
comm.Recv(recv_buf, (rank+1)%2, tag)
if rank == 0:
print 'Completed with size: ', msg_size
msg_size *= 2
tag += 1
# Remove an existing attached buffer
MPI.Detach_buffer()
if rank == 0:
print
print '-' * 80
print 'Without an attached big buffer:'
print
msg_size = 1
tag = 0
while msg_size <= max_msg_size:
msg = np.random.random((msg_size,))
if rank == 0:
print 'Trying with size: ', msg_size
comm.Bsend(msg, (rank+1)%2, tag)
comm.Recv(recv_buf, (rank+1)%2, tag)
if rank == 0:
print 'Completed with size: ', msg_size
msg_size *= 2
tag += 1
運行結(jié)果如下:
$ mpiexec -n 2 python attach_detach_buf.py
--------------------------------------------------------------------------------
With an attached big buffer:
Trying with size: 1
Completed with size: 1
Trying with size: 2
Completed with size: 2
Trying with size: 4
Completed with size: 4
Trying with size: 8
Completed with size: 8
Trying with size: 16
Completed with size: 16
Trying with size: 32
Completed with size: 32
Trying with size: 64
Completed with size: 64
Trying with size: 128
Completed with size: 128
Trying with size: 256
Completed with size: 256
Trying with size: 512
Completed with size: 512
Trying with size: 1024
Completed with size: 1024
--------------------------------------------------------------------------------
Without an attached big buffer:
Trying with size: 1
Completed with size: 1
Trying with size: 2
Completed with size: 2
Trying with size: 4
Completed with size: 4
Trying with size: 8
Traceback (most recent call last):
Completed with size: 8
Trying with size: 16
Completed with size: 16
Trying with size: 32
Completed with size: 32
Trying with size: 64
Completed with size: 64
Trying with size: 128
Completed with size: 128
Trying with size: 256
Completed with size: 256
Trying with size: 512
Traceback (most recent call last):
File "attach_detach_buf.py", line 56, in <module>
File "attach_detach_buf.py", line 56, in <module>
comm.Bsend(msg, (rank+1)%2, tag)
File "Comm.pyx", line 286, in mpi4py.MPI.Comm.Bsend (src/mpi4py.MPI.c:64922)
comm.Bsend(msg, (rank+1)%2, tag)
mpi4py.MPI.Exception: MPI_ERR_BUFFER: invalid buffer pointer
File "Comm.pyx", line 286, in mpi4py.MPI.Comm.Bsend (src/mpi4py.MPI.c:64922)
mpi4py.MPI.Exception: MPI_ERR_BUFFER: invalid buffer pointer
-------------------------------------------------------
Primary job terminated normally, but 1 process returned
a non-zero exit code.. Per user-direction, the job has been aborted.
-------------------------------------------------------
--------------------------------------------------------------------------
mpiexec detected that one or more processes exited with non-zero status, thus causing
the job to be terminated. The first process to do so was:
Process name: [[45613,1],0]
Exit code: 1
--------------------------------------------------------------------------
可以看出吸重,當(dāng)我們提供一個大的緩沖區(qū)時就能夠成功地收發(fā)大的消息互拾,但是當(dāng)我們卸載掉這個緩沖區(qū)后,再發(fā)送大的消息時就出錯了嚎幸。
上面我們介紹 mpi4py 中緩沖阻塞通信模式颜矿,在下一篇中我們將介紹就緒阻塞通信模式。