about summary refs log tree commit diff
path: root/Carpet
diff options
context:
space:
mode:
author Erik Schnetter <schnetter@cct.lsu.edu> 2007-04-19 01:39:00 +0000
committer Erik Schnetter <schnetter@cct.lsu.edu> 2007-04-19 01:39:00 +0000
commit 242efeb8f500afb5fca33220d40e9930dfb55929 (patch)
tree 75050282cf94e676a6bae54317b1c26820f68402 /Carpet
parent 018b9711ff2f4fbc8451d5ccf645e86ae9ac4435 (diff)
CarpetLib: Various commstate changes
Always use collective communication buffers in commstate class. Add functions to reserve space in a commbuf, to get a pointer into the space, and to commit space. This encapsulates using commbufs.
darcs-hash:20070419013946-dae7b-fce3d05b5e90fb37588939d1b11dce6d48ea2ead.gz
Diffstat (limited to 'Carpet')
-rw-r--r-- Carpet/CarpetLib/src/commstate.cc 509
-rw-r--r-- Carpet/CarpetLib/src/commstate.hh 88
2 files changed, 308 insertions, 289 deletions
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc
index b770a706a..34d75ac12 100644
--- a/Carpet/CarpetLib/src/commstate.cc
+++ b/Carpet/CarpetLib/src/commstate.cc
@@ -22,25 +22,16 @@ using namespace CarpetLib;
// Communication state control
comm_state::comm_state ()
{
- // If CarpetLib::use_collective_communication_buffers is set to true,
- // this comm_state object will use collective communications,
- // i.e., it will step through
- // state_get_buffer_sizes
- // state_fill_send_buffers
- // state_empty_recv_buffers
- //
- // If CarpetLib::use_collective_communication_buffers is false
- // then individual communications on single components are used
- // by stepping through
- // state_post
- // state_wait
+ // A comm_state object will step through
+ // state_get_buffer_sizes
+ // state_fill_send_buffers
+ // state_empty_recv_buffers
DECLARE_CCTK_PARAMETERS;
static Timer timer ("commstate::create");
timer.start ();
- thestate = use_collective_communication_buffers ?
- state_get_buffer_sizes : state_post;
+ thestate = state_get_buffer_sizes;
typebufs.resize (dist::c_ndatatypes());
for (size_t type = 0; type < typebufs.size(); type++) {
@@ -57,7 +48,6 @@ comm_state::comm_state ()
#include "instantiate"
#undef INSTANTIATE
- recvbuffers_ready.resize (dist::c_ndatatypes() * dist::size());
srequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL);
rrequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL);
@@ -72,165 +62,165 @@ void comm_state::step ()
total.start ();
assert (thestate != state_done);
switch (thestate) {
- case state_post:
- // After all sends/recvs have been posted in 'state_post'
- // now wait for their completion in 'state_wait'.
- thestate = state_wait;
- break;
-
- case state_wait:
- thestate = state_done;
- break;
-
- case state_get_buffer_sizes:
- // The sizes of the collective communication buffers are known
- // so now allocate them.
- // The receive operations are also posted here already
- // (a clever MPI layer may take advantage of such early posting).
- num_posted_recvs = num_completed_recvs = 0;
-
+
+ case state_get_buffer_sizes:
+ // The sizes of the collective communication buffers are known so
+ // now allocate them.
+ // The receive operations are also posted here already (a clever
+ // MPI layer may take advantage of such early posting).
+ num_posted_recvs = num_completed_recvs = 0;
+
+ for (int proc1 = 0; proc1 < dist::size(); ++ proc1) {
+ size_t const proc =
+ interleave_communications
+ ? (proc1 + dist::rank()) % dist::size()
+ : proc1;
+
+ for (size_t type = 0; type < typebufs.size(); type++) {
+
+ // skip unused datatype buffers
+ if (not typebufs[type].in_use) continue;
+
+ int datatypesize = typebufs[type].datatypesize;
+ procbufdesc& procbuf = typebufs[type].procbufs[proc];
+
+ procbuf.sendbufbase = new char[procbuf.sendbufsize*datatypesize];
+ procbuf.recvbufbase = new char[procbuf.recvbufsize*datatypesize];
+ // TODO: this may be a bit extreme, and it is only for
+ // internal consistency checking
+ if (poison_new_memory) {
+ memset (procbuf.sendbufbase, poison_value,
+ procbuf.sendbufsize*datatypesize);
+ memset (procbuf.recvbufbase, poison_value,
+ procbuf.recvbufsize*datatypesize);
+ }
+ procbuf.sendbuf = procbuf.sendbufbase;
+ procbuf.recvbuf = procbuf.recvbufbase;
+
+ if (procbuf.recvbufsize > 0) {
+ static Timer timer ("commstate_sizes_irecv");
+ timer.start ();
+ int const tag =
+ vary_tags
+ ? (dist::rank() + dist::size() * (proc + dist::size() * type)) % 32768
+ : type;
+ MPI_Irecv (procbuf.recvbufbase, procbuf.recvbufsize,
+ typebufs[type].mpi_datatype, proc, tag,
+ dist::comm(), &rrequests[dist::size()*type + proc]);
+ timer.stop (0);
+ num_posted_recvs++;
+ }
+ }
+ }
+
+ if (barrier_between_stages) {
+ // Add a barrier, to try to ensure that all Irecvs are posted
+ // before the first Isends are made
+ // (Alternative: Use MPI_Alltoallv instead)
+ MPI_Barrier (dist::comm());
+ }
+
+ // Now go and get the send buffers filled with data.
+ // Once a buffer is full it will be posted right away
+ // (see gdata::copy_into_sendbuffer() and
+ // gdata::interpolate_into_sendbuffer()).
+ thestate = state_fill_send_buffers;
+ break;
+
+ case state_fill_send_buffers:
+ if (combine_sends) {
+ // Send the data. Do not send them sequentially, but try to
+ // intersperse the communications
for (int proc1 = 0; proc1 < dist::size(); ++ proc1) {
- size_t const proc =
+ int const proc =
interleave_communications
- ? (proc1 + dist::rank()) % dist::size()
+ ? (proc1 + dist::size() - dist::rank()) % dist::size()
: proc1;
for (size_t type = 0; type < typebufs.size(); type++) {
-
// skip unused datatype buffers
if (not typebufs[type].in_use) continue;
-
- int datatypesize = typebufs[type].datatypesize;
- procbufdesc& procbuf = typebufs[type].procbufs[proc];
-
- procbuf.sendbufbase = new char[procbuf.sendbufsize*datatypesize];
- procbuf.recvbufbase = new char[procbuf.recvbufsize*datatypesize];
- // TODO: this may be a bit extreme, and it is only for
- // internal consistency checking
- if (poison_new_memory) {
- memset (procbuf.sendbufbase, poison_value, procbuf.sendbufsize*datatypesize);
- memset (procbuf.recvbufbase, poison_value, procbuf.recvbufsize*datatypesize);
- }
- procbuf.sendbuf = procbuf.sendbufbase;
- procbuf.recvbuf = procbuf.recvbufbase;
-
- if (procbuf.recvbufsize > 0) {
- static Timer timer ("commstate_sizes_irecv");
- timer.start ();
- int const tag =
- vary_tags
- ? (dist::rank() + dist::size() * (proc + dist::size() * type)) % 32768
- : type;
- MPI_Irecv (procbuf.recvbufbase, procbuf.recvbufsize,
- typebufs[type].mpi_datatype, proc, tag,
- dist::comm(), &rrequests[dist::size()*type + proc]);
- timer.stop (0);
- num_posted_recvs++;
- }
- }
- }
-
- if (barrier_between_stages) {
- // Add a barrier, to try to ensure that all Irecvs are posted
- // before the first Isends are made
- // (Alternative: Use MPI_Alltoallv instead)
- MPI_Barrier (dist::comm());
- }
-
- // Now go and get the send buffers filled with data.
- // Once a buffer is full it will be posted right away
- // (see gdata::copy_into_sendbuffer() and
- // gdata::interpolate_into_sendbuffer()).
- thestate = state_fill_send_buffers;
- break;
-
- case state_fill_send_buffers:
- if (combine_sends) {
- // Send the data. Do not send them sequentially, but try to
- // intersperse the communications
- for (int proc1 = 0; proc1 < dist::size(); ++ proc1) {
- int const proc =
- interleave_communications
- ? (proc1 + dist::size() - dist::rank()) % dist::size()
- : proc1;
-
- for (size_t type = 0; type < typebufs.size(); type++) {
- // skip unused datatype buffers
- if (not typebufs[type].in_use) continue;
- int const datatypesize = typebufs[type].datatypesize;
- procbufdesc const & procbuf = typebufs[type].procbufs[proc];
+ int const datatypesize = typebufs[type].datatypesize;
+ procbufdesc const & procbuf = typebufs[type].procbufs[proc];
- int const fillstate = procbuf.sendbuf - procbuf.sendbufbase;
- assert (fillstate == (int)procbuf.sendbufsize * datatypesize);
+ int const fillstate = procbuf.sendbuf - procbuf.sendbufbase;
+ assert (fillstate == (int)procbuf.sendbufsize * datatypesize);
- if (procbuf.sendbufsize > 0) {
- int const tag =
- vary_tags
- ? (proc + dist::size() * (dist::rank() + dist::size() * type)) % 32768
- : type;
- if (use_mpi_send) {
- // use MPI_Send
- static Timer timer ("commstate_send");
- timer.start ();
- MPI_Send (procbuf.sendbufbase, procbuf.sendbufsize,
- typebufs[type].mpi_datatype, proc, tag,
- dist::comm());
- srequests[dist::size()*type + proc] = MPI_REQUEST_NULL;
- timer.stop (procbuf.sendbufsize * datatypesize);
- } else if (use_mpi_ssend) {
- // use MPI_Ssend
- static Timer timer ("commstate_ssend");
- timer.start ();
- MPI_Ssend (procbuf.sendbufbase, procbuf.sendbufsize,
- typebufs[type].mpi_datatype, proc, tag,
- dist::comm());
- srequests[dist::size()*type + proc] = MPI_REQUEST_NULL;
- timer.stop (procbuf.sendbufsize * datatypesize);
- } else {
- // use MPI_Isend
- static Timer timer ("commstate_isend");
- timer.start ();
- MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize,
- typebufs[type].mpi_datatype, proc, tag,
- dist::comm(), &srequests[dist::size()*type + proc]);
- timer.stop (procbuf.sendbufsize * datatypesize);
- }
+ if (procbuf.sendbufsize > 0) {
+ int const tag =
+ vary_tags
+ ? (proc + dist::size() * (dist::rank() + dist::size() * type)) % 32768
+ : type;
+ if (use_mpi_send) {
+ // use MPI_Send
+ static Timer timer ("commstate_send");
+ timer.start ();
+ MPI_Send (procbuf.sendbufbase, procbuf.sendbufsize,
+ typebufs[type].mpi_datatype, proc, tag,
+ dist::comm());
+ srequests[dist::size()*type + proc] = MPI_REQUEST_NULL;
+ timer.stop (procbuf.sendbufsize * datatypesize);
+ } else if (use_mpi_ssend) {
+ // use MPI_Ssend
+ static Timer timer ("commstate_ssend");
+ timer.start ();
+ MPI_Ssend (procbuf.sendbufbase, procbuf.sendbufsize,
+ typebufs[type].mpi_datatype, proc, tag,
+ dist::comm());
+ srequests[dist::size()*type + proc] = MPI_REQUEST_NULL;
+ timer.stop (procbuf.sendbufsize * datatypesize);
+ } else {
+ // use MPI_Isend
+ static Timer timer ("commstate_isend");
+ timer.start ();
+ MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize,
+ typebufs[type].mpi_datatype, proc, tag,
+ dist::comm(), &srequests[dist::size()*type + proc]);
+ timer.stop (procbuf.sendbufsize * datatypesize);
}
+ }
- } // for type
+ } // for type
- } // for proc
- }
-
- // Now fall through to the next state in which the recv buffers
- // are emptied as soon as data has arrived.
- thestate = state_empty_recv_buffers;
-
- case state_empty_recv_buffers:
- // Finish (at least one of) the posted communications
- if (not AllPostedCommunicationsFinished ()) {
- // No state change if there are still outstanding communications;
- // do another comm_state loop iteration.
- } else {
- // Everything is done so release the collective communication buffers.
- for (size_t type = 0; type < typebufs.size(); type++) {
- for (size_t proc = 0; proc < typebufs[type].procbufs.size(); proc++) {
- delete[] typebufs[type].procbufs[proc].sendbufbase;
- delete[] typebufs[type].procbufs[proc].recvbufbase;
- }
+ } // for proc
+ }
+
+ // Now fall through to the next state in which the recv buffers
+ // are emptied as soon as data has arrived.
+ thestate = state_do_some_work;
+ break;
+
+ case state_do_some_work:
+ // Now fall through to the next state in which the recv buffers
+ // are emptied as soon as data has arrived.
+ thestate = state_empty_recv_buffers;
+
+ case state_empty_recv_buffers:
+ // Finish (at least one of) the posted communications
+ if (not AllPostedCommunicationsFinished ()) {
+ // No state change if there are still outstanding
+ // communications; do another comm_state loop iteration.
+ } else {
+ // Everything is done so release the collective communication buffers.
+ for (size_t type = 0; type < typebufs.size(); type++) {
+ for (size_t proc = 0; proc < typebufs[type].procbufs.size(); proc++) {
+ delete[] typebufs[type].procbufs[proc].sendbufbase;
+ delete[] typebufs[type].procbufs[proc].recvbufbase;
}
- thestate = state_done;
}
- break;
-
- default:
- assert (0 && "invalid state");
+ thestate = state_done;
+ }
+ break;
+
+ default:
+ assert (0 && "invalid state");
}
total.stop (0);
}
+
bool comm_state::done ()
{
return thestate == state_done;
@@ -242,17 +232,14 @@ comm_state::~comm_state ()
DECLARE_CCTK_PARAMETERS;
assert (thestate == state_done or
- thestate == (use_collective_communication_buffers ?
- state_get_buffer_sizes : state_post));
- assert (requests.empty());
+ thestate == state_get_buffer_sizes);
}
// wait for completion of posted collective buffer sends/receives
//
-// Depending on the parameter CarpetLib::use_waitall, this function will wait
-// for all (true) or at least one (false) of the posted receive operations
-// to finish.
+// This function will wait for all of the posted receive operations to
+// finish.
//
// It returns true if all posted communications have been completed.
bool comm_state::AllPostedCommunicationsFinished ()
@@ -292,115 +279,149 @@ bool comm_state::AllPostedCommunicationsFinished ()
return true;
}
- // reset completion flag for all receive buffers
- recvbuffers_ready.assign (recvbuffers_ready.size(), false);
-
- if (use_waitall) {
- // mark all posted recveive buffers as ready
- for (size_t i = 0; i < recvbuffers_ready.size(); i++) {
- recvbuffers_ready.AT(i) = rrequests.AT(i) != MPI_REQUEST_NULL;
- }
-
- // wait for completion of all posted receive operations
- if (reduce_mpi_waitall) {
- size_t nreqs = 0;
- for (size_t i=0; i<rrequests.size(); ++i) {
- if (rrequests.AT(i) != MPI_REQUEST_NULL) {
- ++nreqs;
- }
+ // wait for completion of all posted receive operations
+ if (reduce_mpi_waitall) {
+ size_t nreqs = 0;
+ for (size_t i=0; i<rrequests.size(); ++i) {
+ if (rrequests.AT(i) != MPI_REQUEST_NULL) {
+ ++nreqs;
}
- vector<MPI_Request> reqs(nreqs);
- nreqs = 0;
- for (size_t i=0; i<rrequests.size(); ++i) {
- if (rrequests.AT(i) != MPI_REQUEST_NULL) {
- reqs.AT(nreqs) = rrequests.AT(i);
- ++nreqs;
- }
+ }
+ vector<MPI_Request> reqs(nreqs);
+ nreqs = 0;
+ for (size_t i=0; i<rrequests.size(); ++i) {
+ if (rrequests.AT(i) != MPI_REQUEST_NULL) {
+ reqs.AT(nreqs) = rrequests.AT(i);
+ ++nreqs;
}
- assert (nreqs == reqs.size());
- static Timer timer ("commstate_waitall");
- timer.start ();
- MPI_Waitall (reqs.size(), &reqs.front(), MPI_STATUSES_IGNORE);
- timer.stop (0);
- } else {
- static Timer timer ("commstate_waitall");
- timer.start ();
- MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE);
- timer.stop (0);
}
- num_completed_recvs = num_posted_recvs;
+ assert (nreqs == reqs.size());
+ static Timer timer ("commstate_waitall");
+ timer.start ();
+ MPI_Waitall (reqs.size(), &reqs.front(), MPI_STATUSES_IGNORE);
+ timer.stop (0);
} else {
- int num_completed_recvs_ = 0;
- vector<int> completed_recvs(rrequests.size(), -1);
-
- // wait for completion of at least one posted receive operation
- static Timer timer ("commstate_waitsome");
+ static Timer timer ("commstate_waitall");
timer.start ();
- MPI_Waitsome (rrequests.size(), &rrequests.front(), &num_completed_recvs_,
- &completed_recvs.front(), MPI_STATUSES_IGNORE);
+ MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE);
timer.stop (0);
- assert (0 < num_completed_recvs_);
- num_completed_recvs += num_completed_recvs_;
-
- // mark the recveive buffers of completed communications as ready
- for (int i = 0; i < num_completed_recvs_; i++) {
- assert (rrequests.AT(completed_recvs.AT(i)) == MPI_REQUEST_NULL);
- recvbuffers_ready.AT(completed_recvs.AT(i)) = true;
- }
}
-
- return (false);
+ num_completed_recvs = num_posted_recvs;
+
+ return false;
}
-template<typename T>
-comm_state::commbuf<T>::commbuf (ibbox const & box)
+
+void
+comm_state::
+reserve_send_space (unsigned int const type,
+ int const proc,
+ int const npoints)
{
- data.resize (prod (box.shape() / box.stride()));
+ assert (type < dist::c_ndatatypes());
+ assert (proc >= 0 and proc < dist::size());
+ assert (npoints >= 0);
+ typebufdesc & typebuf = typebufs.AT(type);
+ procbufdesc & procbuf = typebuf.procbufs.AT(proc);
+ procbuf.sendbufsize += npoints;
+ typebuf.in_use = true;
}
-template<typename T>
-comm_state::commbuf<T>::~commbuf ()
+void
+comm_state::
+reserve_recv_space (unsigned int const type,
+ int const proc,
+ int const npoints)
{
+ assert (type < dist::c_ndatatypes());
+ assert (proc >= 0 and proc < dist::size());
+ assert (npoints >= 0);
+ typebufdesc & typebuf = typebufs.AT(type);
+ procbufdesc & procbuf = typebuf.procbufs.AT(proc);
+ procbuf.recvbufsize += npoints;
+ typebuf.in_use = true;
}
-template<typename T>
-void const *
-comm_state::commbuf<T>::pointer ()
- const
+void *
+comm_state::
+send_buffer (unsigned int const type,
+ int const proc,
+ int const npoints)
{
- return &data.front();
+ assert (type < dist::c_ndatatypes());
+ assert (proc >= 0 and proc < dist::size());
+ typebufdesc const & typebuf = typebufs.AT(type);
+ procbufdesc const & procbuf = typebuf.procbufs.AT(proc);
+
+ assert (procbuf.sendbuf + npoints * typebuf.datatypesize <=
+ procbuf.sendbufbase + procbuf.sendbufsize * typebuf.datatypesize);
+
+ return procbuf.sendbuf;
}
-template<typename T>
void *
-comm_state::commbuf<T>::pointer ()
+comm_state::
+recv_buffer (unsigned int const type,
+ int const proc,
+ int const npoints)
{
- return &data.front();
+ assert (type < dist::c_ndatatypes());
+ assert (proc >= 0 and proc < dist::size());
+ typebufdesc const & typebuf = typebufs.AT(type);
+ procbufdesc const & procbuf = typebuf.procbufs.AT(proc);
+
+ assert (procbuf.recvbuf + npoints * typebuf.datatypesize <=
+ procbuf.recvbufbase + procbuf.recvbufsize * typebuf.datatypesize);
+
+ return procbuf.recvbuf;
}
-template<typename T>
-int
-comm_state::commbuf<T>::size ()
- const
+void
+comm_state::
+commit_send_space (unsigned int const type,
+ int const proc,
+ int const npoints)
{
- return data.size();
+ DECLARE_CCTK_PARAMETERS;
+
+ assert (type < dist::c_ndatatypes());
+ assert (proc >= 0 and proc < dist::size());
+ assert (npoints >= 0);
+ typebufdesc & typebuf = typebufs.AT(type);
+ procbufdesc & procbuf = typebuf.procbufs.AT(proc);
+ procbuf.sendbuf += npoints * typebuf.datatypesize;
+ assert (procbuf.sendbuf <=
+ procbuf.sendbufbase + procbuf.sendbufsize * typebuf.datatypesize);
+
+ if (not combine_sends) {
+ // post the send if the buffer is full
+ if (procbuf.sendbuf ==
+ procbuf.sendbufbase + procbuf.sendbufsize * typebuf.datatypesize)
+ {
+ static Timer timer ("commit_send_space::isend");
+ timer.start ();
+ MPI_Isend (procbuf.sendbufbase,
+ procbuf.sendbufsize, typebuf.mpi_datatype,
+ proc, type, dist::comm(),
+ & srequests.AT(type * dist::size() + proc));
+ timer.stop (procbuf.sendbufsize * typebuf.datatypesize);
+ }
+ }
}
-template<typename T>
-MPI_Datatype
-comm_state::commbuf<T>::datatype ()
- const
+void
+comm_state::
+commit_recv_space (unsigned int const type,
+ int const proc,
+ int const npoints)
{
- T dummy;
- return dist::datatype (dummy);
+ assert (type < dist::c_ndatatypes());
+ assert (proc >= 0 and proc < dist::size());
+ assert (npoints >= 0);
+ typebufdesc & typebuf = typebufs.AT(type);
+ procbufdesc & procbuf = typebuf.procbufs.AT(proc);
+ procbuf.recvbuf += npoints * typebuf.datatypesize;
+ assert (procbuf.recvbuf <=
+ procbuf.recvbufbase + procbuf.recvbufsize * typebuf.datatypesize);
}
-
-
-
-#define INSTANTIATE(T) \
- template class comm_state::commbuf<T>;
-
-#include "instantiate"
-
-#undef INSTANTIATE
diff --git a/Carpet/CarpetLib/src/commstate.hh b/Carpet/CarpetLib/src/commstate.hh
index fcce6f201..e2e90ccc8 100644
--- a/Carpet/CarpetLib/src/commstate.hh
+++ b/Carpet/CarpetLib/src/commstate.hh
@@ -6,24 +6,26 @@
#include <mpi.h>
+#include "cctk_Parameters.h"
+
#include "dist.hh"
+#include "timestat.hh"
using namespace std;
+using namespace CarpetLib;
-// State information for communications
-//
-// Depending on how a comm state object was created,
-// it will step through one of two state transitions (in the given order):
-enum astate {
- // these are used for collective communications
- state_get_buffer_sizes, state_fill_send_buffers, state_empty_recv_buffers,
- // these are used for communications on individual components
- state_post, state_wait,
+// State information for communications
- // all transition graphs must end with here
- state_done
+// A comm state object will step through the state transitions in the
+// given order:
+enum astate {
+ state_get_buffer_sizes,
+ state_fill_send_buffers,
+ state_do_some_work,
+ state_empty_recv_buffers,
+ state_done,
};
struct comm_state {
@@ -42,37 +44,6 @@ private:
public:
//////////////////////////////////////////////////////////////////////////
- // the following members are used for single-component communications
- //////////////////////////////////////////////////////////////////////////
-
- // List of MPI requests for use_waitall
- vector<MPI_Request> requests;
-
- // Lists of communication buffers for use_lightweight_buffers
- struct gcommbuf {
- gcommbuf () {};
- virtual ~gcommbuf () {};
- MPI_Request request;
- virtual void const * pointer () const = 0;
- virtual void * pointer () = 0;
- virtual int size () const = 0;
- virtual MPI_Datatype datatype () const = 0;
- };
-
- template<typename T>
- struct commbuf : gcommbuf {
- commbuf (ibbox const & box);
- virtual ~commbuf ();
- virtual void const * pointer () const;
- virtual void * pointer ();
- virtual int size () const;
- virtual MPI_Datatype datatype () const;
- vector<T> data;
- };
- queue<gcommbuf*> recvbufs, sendbufs;
-
-
- //////////////////////////////////////////////////////////////////////////
// the following members are used for collective communications
//////////////////////////////////////////////////////////////////////////
@@ -125,12 +96,39 @@ public:
// list of datatype buffers
vector<typebufdesc> typebufs; // [dist::c_ndatatypes()]
- // flags indicating which receive buffers are ready to be emptied
- vector<bool> recvbuffers_ready; // [dist::size() * dist::c_ndatatypes()]
+ void
+ reserve_send_space (unsigned int type,
+ int proc,
+ int npoints);
+
+ void
+ reserve_recv_space (unsigned int type,
+ int proc,
+ int npoints);
+
+ void *
+ send_buffer (unsigned int type,
+ int proc,
+ int npoints);
+ void *
+ recv_buffer (unsigned int type,
+ int proc,
+ int npoints);
+
+ void
+ commit_send_space (unsigned int type,
+ int proc,
+ int npoints);
+
+ void
+ commit_recv_space (unsigned int type,
+ int proc,
+ int npoints);
+
+private:
// lists of outstanding requests for posted send/recv communications
vector<MPI_Request> srequests; // [dist::size() * dist::c_ndatatypes()]
-private:
vector<MPI_Request> rrequests; // [dist::size() * dist::c_ndatatypes()]
// number of posted and already completed receive communications