aboutsummaryrefslogtreecommitdiff
path: root/Carpet/CarpetLib/src/commstate.cc
diff options
context:
space:
mode:
Diffstat (limited to 'Carpet/CarpetLib/src/commstate.cc')
-rw-r--r--Carpet/CarpetLib/src/commstate.cc710
1 files changed, 433 insertions, 277 deletions
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc
index 7a00157c2..ef1b64cdb 100644
--- a/Carpet/CarpetLib/src/commstate.cc
+++ b/Carpet/CarpetLib/src/commstate.cc
@@ -21,241 +21,463 @@ using namespace CarpetLib;
+char const * tostring (astate const & thestate)
+{
+ switch (thestate) {
+ case state_get_buffer_sizes: return "state_get_buffer_sizes";
+ case state_fill_send_buffers: return "state_fill_send_buffers";
+ case state_do_some_work: return "state_do_some_work";
+ case state_empty_recv_buffers: return "state_empty_recv_buffers";
+ case state_done: return "state_done";
+ default: assert(0); abort();
+ }
+ return NULL;
+}
+
+
+
// Communication state control
comm_state::comm_state ()
{
- // A comm_state object will step through
- // state_get_buffer_sizes
- // state_fill_send_buffers
- // state_empty_recv_buffers
-
DECLARE_CCTK_PARAMETERS;
-
+
static Timer timer ("commstate::create");
timer.start ();
thestate = state_get_buffer_sizes;
-
+
typebufs.resize (dist::c_ndatatypes());
-#define INSTANTIATE(T) \
- { \
- T dummy; \
- int const type = dist::c_datatype (dummy); \
- assert (typebufs.AT(type).datatypesize == 0); \
- typebufs.AT(type).datatypesize = sizeof dummy; \
- typebufs.AT(type).mpi_datatype = dist::datatype (dummy); \
- typebufs.AT(type).procbufs.resize (dist::size()); \
+#define INSTANTIATE(T) \
+ { \
+ T dummy; \
+ unsigned const type = dist::c_datatype (dummy); \
+ typebufs.AT(type).mpi_datatype = dist::mpi_datatype (dummy); \
+ typebufs.AT(type).datatypesize = sizeof dummy; \
}
#include "instantiate"
#undef INSTANTIATE
-
- srequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL);
- rrequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL);
+
+ srequests.reserve (dist::c_ndatatypes() * dist::size());
+ rrequests.reserve (dist::c_ndatatypes() * dist::size());
timer.stop (0);
}
+
void comm_state::step ()
{
DECLARE_CCTK_PARAMETERS;
static Timer total ("commstate::step");
total.start ();
- assert (thestate != state_done);
+
+ if (barrier_between_stages) {
+ // Add a barrier, ensuring e.g. that all Irecvs are posted before
+ // the first Isends are made
+ if (commstate_verbose) {
+ CCTK_VInfo (CCTK_THORNSTRING,
+ "before MPI_Barrier; state=%s", tostring(thestate));
+ }
+ MPI_Barrier (dist::comm());
+ if (commstate_verbose) {
+ CCTK_INFO ("after MPI_Barrier");
+ }
+ }
+
switch (thestate) {
- case state_get_buffer_sizes:
+
+
+ case state_get_buffer_sizes: {
+
+ if (check_communication_schedule) {
+ vector<int> sendcount(dist::size() * dist::c_ndatatypes());
+ for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) {
+ for (int proc = 0; proc < dist::size(); ++ proc) {
+ sendcount.AT(proc * dist::c_ndatatypes() + type) =
+ typebufs.AT(type).in_use ?
+ typebufs.AT(type).procbufs.AT(proc).sendbufsize :
+ 0;
+ }
+ assert (sendcount.AT(dist::rank() * dist::c_ndatatypes() + type) == 0);
+ }
+ vector<int> recvcount(dist::size() * dist::c_ndatatypes());
+ if (commstate_verbose) {
+ CCTK_INFO ("before MPI_Alltoall");
+ }
+ MPI_Alltoall (&sendcount.front(), dist::c_ndatatypes(), MPI_INT,
+ &recvcount.front(), dist::c_ndatatypes(), MPI_INT,
+ dist::comm());
+ if (commstate_verbose) {
+ CCTK_INFO ("after MPI_Alltoall");
+ }
+ for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) {
+ for (int proc = 0; proc < dist::size(); ++ proc) {
+ assert (recvcount.AT(proc * dist::c_ndatatypes() + type) ==
+ (typebufs.AT(type).in_use ?
+ int (typebufs.AT(type).procbufs.AT(proc).recvbufsize) :
+ 0));
+ }
+ assert (recvcount.AT(dist::rank() * dist::c_ndatatypes() + type) == 0);
+ }
+ }
+
// The sizes of the collective communication buffers are known so
// now allocate them.
// The receive operations are also posted here already (a clever
// MPI layer may take advantage of such early posting).
- num_posted_recvs = num_completed_recvs = 0;
- for (int proc1 = 0; proc1 < dist::size(); ++ proc1) {
- size_t const proc =
- interleave_communications
- ? (proc1 + dist::rank()) % dist::size()
- : proc1;
-
- for (size_t type = 0; type < typebufs.size(); type++) {
+ for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) {
+ if (typebufs.AT(type).in_use) {
- // skip unused datatype buffers
- if (not typebufs.AT(type).in_use) continue;
-
- int datatypesize = typebufs.AT(type).datatypesize;
- procbufdesc& procbuf = typebufs.AT(type).procbufs.AT(proc);
-
- assert (procbuf.sendbufbase.empty());
- assert (procbuf.recvbufbase.empty());
- procbuf.sendbufbase.resize (procbuf.sendbufsize*datatypesize);
- procbuf.recvbufbase.resize (procbuf.recvbufsize*datatypesize);
- // TODO: this may be a bit extreme, and it is only for
- // internal consistency checking
- if (poison_new_memory) {
- memset (&procbuf.sendbufbase.front(), poison_value,
- procbuf.sendbufsize*datatypesize);
- memset (&procbuf.recvbufbase.front(), poison_value,
- procbuf.recvbufsize*datatypesize);
- }
- procbuf.sendbuf = &procbuf.sendbufbase.front();
- procbuf.recvbuf = &procbuf.recvbufbase.front();
-
- if (procbuf.recvbufsize > 0) {
- static Timer timer ("commstate_sizes_irecv");
- timer.start ();
- int const tag =
- vary_tags
- ? (dist::rank() + dist::size() * (proc + dist::size() * type)) % 32768
- : type;
- if (commstate_verbose) {
- CCTK_VInfo (CCTK_THORNSTRING,
- "About to MPI_Irecv from %d", (int)proc);
+ for (int proc1 = 0; proc1 < dist::size(); ++ proc1) {
+ int const proc =
+ interleave_communications ?
+ (proc1 + dist::rank()) % dist::size() :
+ proc1;
+
+ int const datatypesize = typebufs.AT(type).datatypesize;
+ procbufdesc & procbuf = typebufs.AT(type).procbufs.AT(proc);
+
+ assert (procbuf.sendbufbase.empty());
+ assert (procbuf.recvbufbase.empty());
+ procbuf.sendbufbase.resize
+ (procbuf.sendbufsize * datatypesize * message_size_multiplier);
+ procbuf.recvbufbase.resize
+ (procbuf.recvbufsize * datatypesize * message_size_multiplier);
+ // TODO: this may be a bit extreme, and it is only for
+ // internal consistency checking
+ if (poison_new_memory) {
+ memset (&procbuf.sendbufbase.front(), poison_value,
+ procbuf.sendbufsize * datatypesize * message_size_multiplier);
+ memset (&procbuf.recvbufbase.front(), poison_value,
+ procbuf.recvbufsize * datatypesize * message_size_multiplier);
}
- MPI_Irecv (&procbuf.recvbufbase.front(), procbuf.recvbufsize,
- typebufs.AT(type).mpi_datatype, proc, tag,
- dist::comm(), &rrequests.AT(dist::size()*type + proc));
- if (commstate_verbose) {
- CCTK_INFO ("Finished MPI_Irecv");
+ procbuf.sendbuf = &procbuf.sendbufbase.front();
+ procbuf.recvbuf = &procbuf.recvbufbase.front();
+
+ if (procbuf.recvbufsize > 0) {
+ static Timer timer ("commstate::sizes_irecv");
+ timer.start ();
+ int const tag = type;
+ if (commstate_verbose) {
+ CCTK_VInfo (CCTK_THORNSTRING,
+ "About to MPI_Irecv from processor %d for type %s",
+ proc, dist::c_datatype_name(type));
+ }
+ MPI_Irecv (&procbuf.recvbufbase.front(),
+ procbuf.recvbufsize * message_size_multiplier,
+ typebufs.AT(type).mpi_datatype, proc, tag,
+ dist::comm(), &push_back(rrequests));
+ if (commstate_verbose) {
+ CCTK_INFO ("Finished MPI_Irecv");
+ }
+ assert (not procbuf.did_post_recv);
+ procbuf.did_post_recv = true;
+ timer.stop (procbuf.recvbufsize * datatypesize);
}
- timer.stop (procbuf.recvbufsize * datatypesize);
- num_posted_recvs++;
- }
+
+ } // for proc
+
}
- }
+ } // for type
- if (barrier_between_stages) {
- // Add a barrier, to try to ensure that all Irecvs are posted
- // before the first Isends are made
- // (Alternative: Use MPI_Alltoallv instead)
- MPI_Barrier (dist::comm());
+ if (check_communication_schedule) {
+ for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) {
+ if (typebufs.AT(type).in_use) {
+ for (int proc = 0; proc < dist::size(); ++ proc) {
+ procbufdesc const & procbuf = typebufs.AT(type).procbufs.AT(proc);
+ assert (procbuf.did_post_recv == (procbuf.recvbufsize > 0));
+ }
+ }
+ }
}
- // Now go and get the send buffers filled with data.
- // Once a buffer is full it will be posted right away
- // (see gdata::copy_into_sendbuffer() and
- // gdata::interpolate_into_sendbuffer()).
thestate = state_fill_send_buffers;
break;
+ }
+
- case state_fill_send_buffers:
+
+ case state_fill_send_buffers: {
if (combine_sends) {
- // Send the data. Do not send them sequentially, but try to
- // intersperse the communications
- for (int proc1 = 0; proc1 < dist::size(); ++ proc1) {
- int const proc =
- interleave_communications
- ? (proc1 + dist::size() - dist::rank()) % dist::size()
- : proc1;
-
- for (size_t type = 0; type < typebufs.size(); type++) {
- // skip unused datatype buffers
- if (not typebufs.AT(type).in_use) continue;
+ for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) {
+ if (typebufs.AT(type).in_use) {
- int const datatypesize = typebufs.AT(type).datatypesize;
- procbufdesc const & procbuf = typebufs.AT(type).procbufs.AT(proc);
-
- size_t const fillstate =
- procbuf.sendbuf - &procbuf.sendbufbase.front();
- assert (fillstate == procbuf.sendbufsize * datatypesize);
-
- if (procbuf.sendbufsize > 0) {
- int const tag =
- vary_tags
- ? (proc + dist::size() * (dist::rank() + dist::size() * type)) % 32768
- : type;
- if (use_mpi_send) {
- // use MPI_Send
- static Timer timer ("commstate_send");
- timer.start ();
- if (commstate_verbose) {
- CCTK_VInfo (CCTK_THORNSTRING,
- "About to MPI_Send to %d", (int)proc);
+ for (int proc1 = 0; proc1 < dist::size(); ++ proc1) {
+ int const proc =
+ interleave_communications
+ ? (proc1 + dist::size() - dist::rank()) % dist::size()
+ : proc1;
+
+ procbufdesc & procbuf = typebufs.AT(type).procbufs.AT(proc);
+ if (procbuf.sendbufsize > 0) {
+
+ int const datatypesize = typebufs.AT(type).datatypesize;
+
+ size_t const fillstate =
+ procbuf.sendbuf - &procbuf.sendbufbase.front();
+ assert (fillstate == procbuf.sendbufsize * datatypesize);
+
+ // Enlarge messages for performance testing
+ if (message_size_multiplier > 1) {
+ size_t const nbytes =
+ procbuf.sendbufsize * datatypesize *
+ (message_size_multiplier - 1);
+#warning "TODO"
+ // memset (procbuf.sendbuf, poison_value, nbytes);
+ memset (procbuf.sendbuf, 0, nbytes);
}
- MPI_Send (const_cast<char*>(&procbuf.sendbufbase.front()),
- procbuf.sendbufsize,
- typebufs.AT(type).mpi_datatype, proc, tag,
- dist::comm());
- if (commstate_verbose) {
- CCTK_INFO ("Finished MPI_Send");
+
+ int const tag = type;
+ if (use_mpi_send) {
+ // use MPI_Send
+ static Timer timer ("commstate::send");
+ timer.start ();
+ if (commstate_verbose) {
+ CCTK_VInfo (CCTK_THORNSTRING,
+ "About to MPI_Send to processor %d for type %s",
+ proc, dist::c_datatype_name(type));
+ }
+ MPI_Send (const_cast<char*>(&procbuf.sendbufbase.front()),
+ procbuf.sendbufsize * message_size_multiplier,
+ typebufs.AT(type).mpi_datatype, proc, tag,
+ dist::comm());
+ assert (not procbuf.did_post_send);
+ procbuf.did_post_send = true;
+ if (commstate_verbose) {
+ CCTK_INFO ("Finished MPI_Send");
+ }
+ timer.stop (procbuf.sendbufsize * datatypesize);
+ } else if (use_mpi_ssend) {
+ // use MPI_Ssend
+ static Timer timer ("commstate::ssend");
+ timer.start ();
+ if (commstate_verbose) {
+ CCTK_VInfo (CCTK_THORNSTRING,
+ "About to MPI_Ssend to processor %d for type %s",
+ proc, dist::c_datatype_name(type));
+ }
+ MPI_Ssend (const_cast<char*>(&procbuf.sendbufbase.front()),
+ procbuf.sendbufsize * message_size_multiplier,
+ typebufs.AT(type).mpi_datatype, proc, tag,
+ dist::comm());
+ assert (not procbuf.did_post_send);
+ procbuf.did_post_send = true;
+ if (commstate_verbose) {
+ CCTK_INFO ("Finished MPI_Ssend");
+ }
+ timer.stop (procbuf.sendbufsize * datatypesize);
+ } else {
+ // use MPI_Isend
+ static Timer timer ("commstate::isend");
+ timer.start ();
+ if (commstate_verbose) {
+ CCTK_VInfo (CCTK_THORNSTRING,
+ "About to MPI_Isend to processor %d for type %s",
+ proc, dist::c_datatype_name(type));
+ }
+ MPI_Isend (const_cast<char*>(&procbuf.sendbufbase.front()),
+ procbuf.sendbufsize * message_size_multiplier,
+ typebufs.AT(type).mpi_datatype, proc, tag,
+ dist::comm(), &push_back(srequests));
+ assert (not procbuf.did_post_send);
+ procbuf.did_post_send = true;
+ if (commstate_verbose) {
+ CCTK_INFO ("Finished MPI_Isend");
+ }
+ timer.stop (procbuf.sendbufsize * datatypesize);
}
- srequests.AT(dist::size()*type + proc) = MPI_REQUEST_NULL;
- timer.stop (procbuf.sendbufsize * datatypesize);
- } else if (use_mpi_ssend) {
- // use MPI_Ssend
- static Timer timer ("commstate_ssend");
+
+ }
+ } // for proc
+
+ }
+ } // for type
+ } // if combine_sends
+
+ if (check_communication_schedule) {
+ for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) {
+ if (typebufs.AT(type).in_use) {
+ for (int proc = 0; proc < dist::size(); ++ proc) {
+ procbufdesc const & procbuf = typebufs.AT(type).procbufs.AT(proc);
+ assert (procbuf.did_post_send == (procbuf.sendbufsize > 0));
+ }
+ }
+ }
+ }
+
+ thestate = state_do_some_work;
+ break;
+ }
+
+
+
+ case state_do_some_work: {
+ static Timer timer ("commstate::do_some_work::waitall");
+ timer.start ();
+ if (commstate_verbose) {
+ CCTK_INFO ("About to MPI_Waitall");
+ }
+ MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE);
+ if (commstate_verbose) {
+ CCTK_INFO ("Finished MPI_Waitall");
+ }
+ timer.stop (0);
+
+ thestate = state_empty_recv_buffers;
+ break;
+ }
+
+
+
+ case state_empty_recv_buffers: {
+ static Timer timer ("commstate::empty_recv_buffers::waitall");
+ timer.start ();
+ if (commstate_verbose) {
+ CCTK_INFO ("About to MPI_Waitall");
+ }
+ MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE);
+ if (commstate_verbose) {
+ CCTK_INFO ("Finished MPI_Waitall");
+ }
+ timer.stop (0);
+
+ // Transfer messages again for performance testing
+ for (int n = 1; n < message_count_multiplier; ++ n) {
+
+ srequests.clear();
+ srequests.reserve (dist::c_ndatatypes() * dist::size());
+ rrequests.clear();
+ rrequests.reserve (dist::c_ndatatypes() * dist::size());
+
+ // Irecv
+ for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) {
+ if (typebufs.AT(type).in_use) {
+
+ for (int proc1 = 0; proc1 < dist::size(); ++ proc1) {
+ int const proc =
+ interleave_communications ?
+ (proc1 + dist::rank()) % dist::size() :
+ proc1;
+
+ procbufdesc & procbuf = typebufs.AT(type).procbufs.AT(proc);
+
+ if (procbuf.recvbufsize > 0) {
+ static Timer timer ("commstate::message_count_multiplier::irecv");
timer.start ();
+ int const tag = type;
if (commstate_verbose) {
CCTK_VInfo (CCTK_THORNSTRING,
- "About to MPI_Ssend to %d", (int)proc);
+ "About to MPI_Irecv from processor %d for type %s",
+ proc, dist::c_datatype_name(type));
}
- MPI_Ssend (const_cast<char*>(&procbuf.sendbufbase.front()),
- procbuf.sendbufsize,
+ MPI_Irecv (&procbuf.recvbufbase.front(),
+ procbuf.recvbufsize * message_size_multiplier,
typebufs.AT(type).mpi_datatype, proc, tag,
- dist::comm());
+ dist::comm(), &push_back(rrequests));
if (commstate_verbose) {
- CCTK_INFO ("Finished MPI_Ssend");
+ CCTK_INFO ("Finished MPI_Irecv");
}
- srequests.AT(dist::size()*type + proc) = MPI_REQUEST_NULL;
- timer.stop (procbuf.sendbufsize * datatypesize);
- } else {
- // use MPI_Isend
- static Timer timer ("commstate_isend");
+ timer.stop (procbuf.recvbufsize * typebufs.AT(type).datatypesize);
+ }
+
+ } // for proc
+
+ }
+ } // for type
+
+ // Isend
+ for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) {
+ if (typebufs.AT(type).in_use) {
+
+ for (int proc1 = 0; proc1 < dist::size(); ++ proc1) {
+ int const proc =
+ interleave_communications
+ ? (proc1 + dist::size() - dist::rank()) % dist::size()
+ : proc1;
+
+ procbufdesc & procbuf = typebufs.AT(type).procbufs.AT(proc);
+
+ if (procbuf.sendbufsize > 0) {
+ int const tag = type;
+ assert (not use_mpi_send);
+ assert (not use_mpi_ssend);
+ static Timer timer ("commstate::message_count_multiplier::isend");
timer.start ();
if (commstate_verbose) {
- CCTK_VWarn (3, __LINE__, __FILE__, CCTK_THORNSTRING,
- "About to MPI_Isend to %d", (int)proc);
+ CCTK_VInfo (CCTK_THORNSTRING,
+ "About to MPI_Isend to processor %d for type %s",
+ proc, dist::c_datatype_name(type));
}
MPI_Isend (const_cast<char*>(&procbuf.sendbufbase.front()),
- procbuf.sendbufsize,
+ procbuf.sendbufsize * message_size_multiplier,
typebufs.AT(type).mpi_datatype, proc, tag,
- dist::comm(), &srequests.AT(dist::size()*type + proc));
+ dist::comm(), &push_back(srequests));
if (commstate_verbose) {
CCTK_INFO ("Finished MPI_Isend");
}
- timer.stop (procbuf.sendbufsize * datatypesize);
+ timer.stop (procbuf.sendbufsize * typebufs.AT(type).datatypesize);
}
- }
+
+ } // for proc
- } // for type
-
- } // for proc
- }
+ }
+ } // for type
+
+ // Waitall
+ {
+ static Timer timer ("commstate::message_count_multiplier::waitall(irecv)");
+ timer.start ();
+ if (commstate_verbose) {
+ CCTK_INFO ("About to MPI_Waitall");
+ }
+ MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE);
+ if (commstate_verbose) {
+ CCTK_INFO ("Finished MPI_Waitall");
+ }
+ timer.stop (0);
+ }
+
+ // Waitall
+ {
+ static Timer timer ("commstate::message_count_multiplier::waitall(isend)");
+ timer.start ();
+ if (commstate_verbose) {
+ CCTK_INFO ("About to MPI_Waitall");
+ }
+ MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE);
+ if (commstate_verbose) {
+ CCTK_INFO ("Finished MPI_Waitall");
+ }
+ timer.stop (0);
+ }
+
+ } // for n
- // Now fall through to the next state in which the recv buffers
- // are emptied as soon as data has arrived.
- thestate = state_do_some_work;
+ thestate = state_done;
break;
+ }
+
+
+
+ case state_done: {
+ assert (0); abort();
+ }
- case state_do_some_work:
- // Now fall through to the next state in which the recv buffers
- // are emptied as soon as data has arrived.
- thestate = state_empty_recv_buffers;
- case state_empty_recv_buffers:
- // Finish (at least one of) the posted communications
- if (not AllPostedCommunicationsFinished ()) {
- // No state change if there are still outstanding
- // communications; do another comm_state loop iteration.
- } else {
- // Everything is done so release the collective communication buffers.
- for (size_t type = 0; type < typebufs.size(); type++) {
- for (size_t proc = 0; proc < typebufs.AT(type).procbufs.size(); proc++) {
- typebufs.AT(type).procbufs.AT(proc).sendbufbase.clear();
- typebufs.AT(type).procbufs.AT(proc).recvbufbase.clear();
- }
- }
- thestate = state_done;
- }
- break;
default:
- assert (0 && "invalid state");
+ assert (0); abort();
}
+
+
+
total.stop (0);
}
-bool comm_state::done ()
+bool comm_state::done () const
{
return thestate == state_done;
}
@@ -264,116 +486,16 @@ bool comm_state::done ()
comm_state::~comm_state ()
{
DECLARE_CCTK_PARAMETERS;
-
+
assert (thestate == state_done or
thestate == state_get_buffer_sizes);
}
-// wait for completion of posted collective buffer sends/receives
-//
-// This function will wait for all of the posted receive operations to
-// finish.
-//
-// It returns true if all posted communications have been completed.
-bool comm_state::AllPostedCommunicationsFinished ()
-{
- DECLARE_CCTK_PARAMETERS;
-
- // check if all outstanding receives have been completed already
- if (num_posted_recvs == num_completed_recvs) {
- // finalize the outstanding sends in one go
- if (reduce_mpi_waitall) {
- size_t nreqs = 0;
- for (size_t i=0; i<srequests.size(); ++i) {
- if (srequests.AT(i) != MPI_REQUEST_NULL) {
- ++nreqs;
- }
- }
- vector<MPI_Request> reqs(nreqs);
- nreqs = 0;
- for (size_t i=0; i<srequests.size(); ++i) {
- if (srequests.AT(i) != MPI_REQUEST_NULL) {
- reqs.AT(nreqs) = srequests.AT(i);
- ++nreqs;
- }
- }
- assert (nreqs == reqs.size());
- static Timer timer ("commstate_waitall_final");
- timer.start ();
- if (commstate_verbose) {
- CCTK_INFO ("About to MPI_Waitall");
- }
- MPI_Waitall (reqs.size(), &reqs.front(), MPI_STATUSES_IGNORE);
- if (commstate_verbose) {
- CCTK_INFO ("Finished MPI_Waitall");
- }
- timer.stop (0);
- } else {
- static Timer timer ("commstate_waitall_final");
- timer.start ();
- if (commstate_verbose) {
- CCTK_INFO ("About to MPI_Waitall");
- }
- MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE);
- if (commstate_verbose) {
- CCTK_INFO ("Finished MPI_Waitall");
- }
- timer.stop (0);
- }
-
- return true;
- }
-
- // wait for completion of all posted receive operations
- if (reduce_mpi_waitall) {
- size_t nreqs = 0;
- for (size_t i=0; i<rrequests.size(); ++i) {
- if (rrequests.AT(i) != MPI_REQUEST_NULL) {
- ++nreqs;
- }
- }
- vector<MPI_Request> reqs(nreqs);
- nreqs = 0;
- for (size_t i=0; i<rrequests.size(); ++i) {
- if (rrequests.AT(i) != MPI_REQUEST_NULL) {
- reqs.AT(nreqs) = rrequests.AT(i);
- ++nreqs;
- }
- }
- assert (nreqs == reqs.size());
- static Timer timer ("commstate_waitall");
- timer.start ();
- if (commstate_verbose) {
- CCTK_INFO ("About to MPI_Waitall");
- }
- MPI_Waitall (reqs.size(), &reqs.front(), MPI_STATUSES_IGNORE);
- if (commstate_verbose) {
- CCTK_INFO ("Finished MPI_Waitall");
- }
- timer.stop (0);
- } else {
- static Timer timer ("commstate_waitall");
- timer.start ();
- if (commstate_verbose) {
- CCTK_INFO ("About to MPI_Waitall");
- }
- MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE);
- if (commstate_verbose) {
- CCTK_INFO ("Finished MPI_Waitall");
- }
- timer.stop (0);
- }
- num_completed_recvs = num_posted_recvs;
-
- return false;
-}
-
-
void
comm_state::
-reserve_send_space (unsigned int const type,
+reserve_send_space (unsigned const type,
int const proc,
int const npoints)
{
@@ -381,14 +503,19 @@ reserve_send_space (unsigned int const type,
assert (proc >= 0 and proc < dist::size());
assert (npoints >= 0);
typebufdesc & typebuf = typebufs.AT(type);
+ if (not typebuf.in_use) {
+ typebuf.procbufs.resize (dist::size());
+ typebuf.in_use = true;
+ }
procbufdesc & procbuf = typebuf.procbufs.AT(proc);
procbuf.sendbufsize += npoints;
- typebuf.in_use = true;
}
+
+
void
comm_state::
-reserve_recv_space (unsigned int const type,
+reserve_recv_space (unsigned const type,
int const proc,
int const npoints)
{
@@ -396,19 +523,25 @@ reserve_recv_space (unsigned int const type,
assert (proc >= 0 and proc < dist::size());
assert (npoints >= 0);
typebufdesc & typebuf = typebufs.AT(type);
+ if (not typebuf.in_use) {
+ typebuf.procbufs.resize (dist::size());
+ typebuf.in_use = true;
+ }
procbufdesc & procbuf = typebuf.procbufs.AT(proc);
procbuf.recvbufsize += npoints;
- typebuf.in_use = true;
}
+
+
void *
comm_state::
-send_buffer (unsigned int const type,
+send_buffer (unsigned const type,
int const proc,
int const npoints)
{
assert (type < dist::c_ndatatypes());
assert (proc >= 0 and proc < dist::size());
+ assert (npoints > 0);
typebufdesc const & typebuf = typebufs.AT(type);
procbufdesc const & procbuf = typebuf.procbufs.AT(proc);
@@ -419,14 +552,17 @@ send_buffer (unsigned int const type,
return procbuf.sendbuf;
}
+
+
void *
comm_state::
-recv_buffer (unsigned int const type,
+recv_buffer (unsigned const type,
int const proc,
int const npoints)
{
assert (type < dist::c_ndatatypes());
assert (proc >= 0 and proc < dist::size());
+ assert (npoints > 0);
typebufdesc const & typebuf = typebufs.AT(type);
procbufdesc const & procbuf = typebuf.procbufs.AT(proc);
@@ -437,9 +573,11 @@ recv_buffer (unsigned int const type,
return procbuf.recvbuf;
}
+
+
void
comm_state::
-commit_send_space (unsigned int const type,
+commit_send_space (unsigned const type,
int const proc,
int const npoints)
{
@@ -448,6 +586,7 @@ commit_send_space (unsigned int const type,
assert (type < dist::c_ndatatypes());
assert (proc >= 0 and proc < dist::size());
assert (npoints >= 0);
+ assert (npoints > 0);
typebufdesc & typebuf = typebufs.AT(type);
procbufdesc & procbuf = typebuf.procbufs.AT(proc);
procbuf.sendbuf += npoints * typebuf.datatypesize;
@@ -461,16 +600,30 @@ commit_send_space (unsigned int const type,
&procbuf.sendbufbase.front() +
procbuf.sendbufsize * typebuf.datatypesize)
{
+ if (message_size_multiplier > 1) {
+ size_t const nbytes =
+ procbuf.sendbufsize * typebuf.datatypesize *
+ (message_size_multiplier - 1);
+ memset (procbuf.sendbuf, poison_value, nbytes);
+ }
+
static Timer timer ("commit_send_space::isend");
timer.start ();
if (commstate_verbose) {
CCTK_VInfo (CCTK_THORNSTRING,
- "About to MPI_Isend to %d", (int)proc);
+ "About to MPI_Isend to processor %d for type %s",
+ proc, dist::c_datatype_name(type));
}
+ int const tag = type;
+ assert (procbuf.sendbufsize > 0);
+ assert (not use_mpi_send);
+ assert (not use_mpi_ssend);
MPI_Isend (&procbuf.sendbufbase.front(),
- procbuf.sendbufsize, typebuf.mpi_datatype,
- proc, type, dist::comm(),
- & srequests.AT(type * dist::size() + proc));
+ procbuf.sendbufsize * message_size_multiplier,
+ typebuf.mpi_datatype, proc, tag,
+ dist::comm(), &push_back(srequests));
+ assert (not procbuf.did_post_send);
+ procbuf.did_post_send = true;
if (commstate_verbose) {
CCTK_INFO ("Finished MPI_Isend");
}
@@ -479,15 +632,18 @@ commit_send_space (unsigned int const type,
}
}
+
+
void
comm_state::
-commit_recv_space (unsigned int const type,
+commit_recv_space (unsigned const type,
int const proc,
int const npoints)
{
assert (type < dist::c_ndatatypes());
assert (proc >= 0 and proc < dist::size());
assert (npoints >= 0);
+ assert (npoints > 0);
typebufdesc & typebuf = typebufs.AT(type);
procbufdesc & procbuf = typebuf.procbufs.AT(proc);
procbuf.recvbuf += npoints * typebuf.datatypesize;