From 11c4d98017cbb86d08e15fd1b549180184b58a26 Mon Sep 17 00:00:00 2001 From: Erik Schnetter Date: Thu, 3 Sep 2009 16:19:15 -0500 Subject: Import Carpet Ignore-this: 309b4dd613f4af2b84aa5d6743fdb6b3 --- Carpet/CarpetLib/src/commstate.cc | 710 +++++++++++++++++++++++--------------- 1 file changed, 433 insertions(+), 277 deletions(-) (limited to 'Carpet/CarpetLib/src/commstate.cc') diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc index 7a00157c2..ef1b64cdb 100644 --- a/Carpet/CarpetLib/src/commstate.cc +++ b/Carpet/CarpetLib/src/commstate.cc @@ -21,241 +21,463 @@ using namespace CarpetLib; +char const * tostring (astate const & thestate) +{ + switch (thestate) { + case state_get_buffer_sizes: return "state_get_buffer_sizes"; + case state_fill_send_buffers: return "state_fill_send_buffers"; + case state_do_some_work: return "state_do_some_work"; + case state_empty_recv_buffers: return "state_empty_recv_buffers"; + case state_done: return "state_done"; + default: assert(0); abort(); + } + return NULL; +} + + + // Communication state control comm_state::comm_state () { - // A comm_state object will step through - // state_get_buffer_sizes - // state_fill_send_buffers - // state_empty_recv_buffers - DECLARE_CCTK_PARAMETERS; - + static Timer timer ("commstate::create"); timer.start (); thestate = state_get_buffer_sizes; - + typebufs.resize (dist::c_ndatatypes()); -#define INSTANTIATE(T) \ - { \ - T dummy; \ - int const type = dist::c_datatype (dummy); \ - assert (typebufs.AT(type).datatypesize == 0); \ - typebufs.AT(type).datatypesize = sizeof dummy; \ - typebufs.AT(type).mpi_datatype = dist::datatype (dummy); \ - typebufs.AT(type).procbufs.resize (dist::size()); \ +#define INSTANTIATE(T) \ + { \ + T dummy; \ + unsigned const type = dist::c_datatype (dummy); \ + typebufs.AT(type).mpi_datatype = dist::mpi_datatype (dummy); \ + typebufs.AT(type).datatypesize = sizeof dummy; \ } #include "instantiate" #undef INSTANTIATE - - srequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL); - rrequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL); + + srequests.reserve (dist::c_ndatatypes() * dist::size()); + rrequests.reserve (dist::c_ndatatypes() * dist::size()); timer.stop (0); } + void comm_state::step () { DECLARE_CCTK_PARAMETERS; static Timer total ("commstate::step"); total.start (); - assert (thestate != state_done); + + if (barrier_between_stages) { + // Add a barrier, ensuring e.g. that all Irecvs are posted before + // the first Isends are made + if (commstate_verbose) { + CCTK_VInfo (CCTK_THORNSTRING, + "before MPI_Barrier; state=%s", tostring(thestate)); + } + MPI_Barrier (dist::comm()); + if (commstate_verbose) { + CCTK_INFO ("after MPI_Barrier"); + } + } + switch (thestate) { - case state_get_buffer_sizes: + + + case state_get_buffer_sizes: { + + if (check_communication_schedule) { + vector sendcount(dist::size() * dist::c_ndatatypes()); + for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) { + for (int proc = 0; proc < dist::size(); ++ proc) { + sendcount.AT(proc * dist::c_ndatatypes() + type) = + typebufs.AT(type).in_use ? + typebufs.AT(type).procbufs.AT(proc).sendbufsize : + 0; + } + assert (sendcount.AT(dist::rank() * dist::c_ndatatypes() + type) == 0); + } + vector recvcount(dist::size() * dist::c_ndatatypes()); + if (commstate_verbose) { + CCTK_INFO ("before MPI_Alltoall"); + } + MPI_Alltoall (&sendcount.front(), dist::c_ndatatypes(), MPI_INT, + &recvcount.front(), dist::c_ndatatypes(), MPI_INT, + dist::comm()); + if (commstate_verbose) { + CCTK_INFO ("after MPI_Alltoall"); + } + for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) { + for (int proc = 0; proc < dist::size(); ++ proc) { + assert (recvcount.AT(proc * dist::c_ndatatypes() + type) == + (typebufs.AT(type).in_use ? + int (typebufs.AT(type).procbufs.AT(proc).recvbufsize) : + 0)); + } + assert (recvcount.AT(dist::rank() * dist::c_ndatatypes() + type) == 0); + } + } + // The sizes of the collective communication buffers are known so // now allocate them. // The receive operations are also posted here already (a clever // MPI layer may take advantage of such early posting). - num_posted_recvs = num_completed_recvs = 0; - for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { - size_t const proc = - interleave_communications - ? (proc1 + dist::rank()) % dist::size() - : proc1; - - for (size_t type = 0; type < typebufs.size(); type++) { + for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) { + if (typebufs.AT(type).in_use) { - // skip unused datatype buffers - if (not typebufs.AT(type).in_use) continue; - - int datatypesize = typebufs.AT(type).datatypesize; - procbufdesc& procbuf = typebufs.AT(type).procbufs.AT(proc); - - assert (procbuf.sendbufbase.empty()); - assert (procbuf.recvbufbase.empty()); - procbuf.sendbufbase.resize (procbuf.sendbufsize*datatypesize); - procbuf.recvbufbase.resize (procbuf.recvbufsize*datatypesize); - // TODO: this may be a bit extreme, and it is only for - // internal consistency checking - if (poison_new_memory) { - memset (&procbuf.sendbufbase.front(), poison_value, - procbuf.sendbufsize*datatypesize); - memset (&procbuf.recvbufbase.front(), poison_value, - procbuf.recvbufsize*datatypesize); - } - procbuf.sendbuf = &procbuf.sendbufbase.front(); - procbuf.recvbuf = &procbuf.recvbufbase.front(); - - if (procbuf.recvbufsize > 0) { - static Timer timer ("commstate_sizes_irecv"); - timer.start (); - int const tag = - vary_tags - ? (dist::rank() + dist::size() * (proc + dist::size() * type)) % 32768 - : type; - if (commstate_verbose) { - CCTK_VInfo (CCTK_THORNSTRING, - "About to MPI_Irecv from %d", (int)proc); + for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { + int const proc = + interleave_communications ? + (proc1 + dist::rank()) % dist::size() : + proc1; + + int const datatypesize = typebufs.AT(type).datatypesize; + procbufdesc & procbuf = typebufs.AT(type).procbufs.AT(proc); + + assert (procbuf.sendbufbase.empty()); + assert (procbuf.recvbufbase.empty()); + procbuf.sendbufbase.resize + (procbuf.sendbufsize * datatypesize * message_size_multiplier); + procbuf.recvbufbase.resize + (procbuf.recvbufsize * datatypesize * message_size_multiplier); + // TODO: this may be a bit extreme, and it is only for + // internal consistency checking + if (poison_new_memory) { + memset (&procbuf.sendbufbase.front(), poison_value, + procbuf.sendbufsize * datatypesize * message_size_multiplier); + memset (&procbuf.recvbufbase.front(), poison_value, + procbuf.recvbufsize * datatypesize * message_size_multiplier); } - MPI_Irecv (&procbuf.recvbufbase.front(), procbuf.recvbufsize, - typebufs.AT(type).mpi_datatype, proc, tag, - dist::comm(), &rrequests.AT(dist::size()*type + proc)); - if (commstate_verbose) { - CCTK_INFO ("Finished MPI_Irecv"); + procbuf.sendbuf = &procbuf.sendbufbase.front(); + procbuf.recvbuf = &procbuf.recvbufbase.front(); + + if (procbuf.recvbufsize > 0) { + static Timer timer ("commstate::sizes_irecv"); + timer.start (); + int const tag = type; + if (commstate_verbose) { + CCTK_VInfo (CCTK_THORNSTRING, + "About to MPI_Irecv from processor %d for type %s", + proc, dist::c_datatype_name(type)); + } + MPI_Irecv (&procbuf.recvbufbase.front(), + procbuf.recvbufsize * message_size_multiplier, + typebufs.AT(type).mpi_datatype, proc, tag, + dist::comm(), &push_back(rrequests)); + if (commstate_verbose) { + CCTK_INFO ("Finished MPI_Irecv"); + } + assert (not procbuf.did_post_recv); + procbuf.did_post_recv = true; + timer.stop (procbuf.recvbufsize * datatypesize); } - timer.stop (procbuf.recvbufsize * datatypesize); - num_posted_recvs++; - } + + } // for proc + } - } + } // for type - if (barrier_between_stages) { - // Add a barrier, to try to ensure that all Irecvs are posted - // before the first Isends are made - // (Alternative: Use MPI_Alltoallv instead) - MPI_Barrier (dist::comm()); + if (check_communication_schedule) { + for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) { + if (typebufs.AT(type).in_use) { + for (int proc = 0; proc < dist::size(); ++ proc) { + procbufdesc const & procbuf = typebufs.AT(type).procbufs.AT(proc); + assert (procbuf.did_post_recv == (procbuf.recvbufsize > 0)); + } + } + } } - // Now go and get the send buffers filled with data. - // Once a buffer is full it will be posted right away - // (see gdata::copy_into_sendbuffer() and - // gdata::interpolate_into_sendbuffer()). thestate = state_fill_send_buffers; break; + } + - case state_fill_send_buffers: + + case state_fill_send_buffers: { if (combine_sends) { - // Send the data. Do not send them sequentially, but try to - // intersperse the communications - for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { - int const proc = - interleave_communications - ? (proc1 + dist::size() - dist::rank()) % dist::size() - : proc1; - - for (size_t type = 0; type < typebufs.size(); type++) { - // skip unused datatype buffers - if (not typebufs.AT(type).in_use) continue; + for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) { + if (typebufs.AT(type).in_use) { - int const datatypesize = typebufs.AT(type).datatypesize; - procbufdesc const & procbuf = typebufs.AT(type).procbufs.AT(proc); - - size_t const fillstate = - procbuf.sendbuf - &procbuf.sendbufbase.front(); - assert (fillstate == procbuf.sendbufsize * datatypesize); - - if (procbuf.sendbufsize > 0) { - int const tag = - vary_tags - ? (proc + dist::size() * (dist::rank() + dist::size() * type)) % 32768 - : type; - if (use_mpi_send) { - // use MPI_Send - static Timer timer ("commstate_send"); - timer.start (); - if (commstate_verbose) { - CCTK_VInfo (CCTK_THORNSTRING, - "About to MPI_Send to %d", (int)proc); + for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { + int const proc = + interleave_communications + ? (proc1 + dist::size() - dist::rank()) % dist::size() + : proc1; + + procbufdesc & procbuf = typebufs.AT(type).procbufs.AT(proc); + if (procbuf.sendbufsize > 0) { + + int const datatypesize = typebufs.AT(type).datatypesize; + + size_t const fillstate = + procbuf.sendbuf - &procbuf.sendbufbase.front(); + assert (fillstate == procbuf.sendbufsize * datatypesize); + + // Enlarge messages for performance testing + if (message_size_multiplier > 1) { + size_t const nbytes = + procbuf.sendbufsize * datatypesize * + (message_size_multiplier - 1); +#warning "TODO" + // memset (procbuf.sendbuf, poison_value, nbytes); + memset (procbuf.sendbuf, 0, nbytes); } - MPI_Send (const_cast(&procbuf.sendbufbase.front()), - procbuf.sendbufsize, - typebufs.AT(type).mpi_datatype, proc, tag, - dist::comm()); - if (commstate_verbose) { - CCTK_INFO ("Finished MPI_Send"); + + int const tag = type; + if (use_mpi_send) { + // use MPI_Send + static Timer timer ("commstate::send"); + timer.start (); + if (commstate_verbose) { + CCTK_VInfo (CCTK_THORNSTRING, + "About to MPI_Send to processor %d for type %s", + proc, dist::c_datatype_name(type)); + } + MPI_Send (const_cast(&procbuf.sendbufbase.front()), + procbuf.sendbufsize * message_size_multiplier, + typebufs.AT(type).mpi_datatype, proc, tag, + dist::comm()); + assert (not procbuf.did_post_send); + procbuf.did_post_send = true; + if (commstate_verbose) { + CCTK_INFO ("Finished MPI_Send"); + } + timer.stop (procbuf.sendbufsize * datatypesize); + } else if (use_mpi_ssend) { + // use MPI_Ssend + static Timer timer ("commstate::ssend"); + timer.start (); + if (commstate_verbose) { + CCTK_VInfo (CCTK_THORNSTRING, + "About to MPI_Ssend to processor %d for type %s", + proc, dist::c_datatype_name(type)); + } + MPI_Ssend (const_cast(&procbuf.sendbufbase.front()), + procbuf.sendbufsize * message_size_multiplier, + typebufs.AT(type).mpi_datatype, proc, tag, + dist::comm()); + assert (not procbuf.did_post_send); + procbuf.did_post_send = true; + if (commstate_verbose) { + CCTK_INFO ("Finished MPI_Ssend"); + } + timer.stop (procbuf.sendbufsize * datatypesize); + } else { + // use MPI_Isend + static Timer timer ("commstate::isend"); + timer.start (); + if (commstate_verbose) { + CCTK_VInfo (CCTK_THORNSTRING, + "About to MPI_Isend to processor %d for type %s", + proc, dist::c_datatype_name(type)); + } + MPI_Isend (const_cast(&procbuf.sendbufbase.front()), + procbuf.sendbufsize * message_size_multiplier, + typebufs.AT(type).mpi_datatype, proc, tag, + dist::comm(), &push_back(srequests)); + assert (not procbuf.did_post_send); + procbuf.did_post_send = true; + if (commstate_verbose) { + CCTK_INFO ("Finished MPI_Isend"); + } + timer.stop (procbuf.sendbufsize * datatypesize); } - srequests.AT(dist::size()*type + proc) = MPI_REQUEST_NULL; - timer.stop (procbuf.sendbufsize * datatypesize); - } else if (use_mpi_ssend) { - // use MPI_Ssend - static Timer timer ("commstate_ssend"); + + } + } // for proc + + } + } // for type + } // if combine_sends + + if (check_communication_schedule) { + for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) { + if (typebufs.AT(type).in_use) { + for (int proc = 0; proc < dist::size(); ++ proc) { + procbufdesc const & procbuf = typebufs.AT(type).procbufs.AT(proc); + assert (procbuf.did_post_send == (procbuf.sendbufsize > 0)); + } + } + } + } + + thestate = state_do_some_work; + break; + } + + + + case state_do_some_work: { + static Timer timer ("commstate::do_some_work::waitall"); + timer.start (); + if (commstate_verbose) { + CCTK_INFO ("About to MPI_Waitall"); + } + MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE); + if (commstate_verbose) { + CCTK_INFO ("Finished MPI_Waitall"); + } + timer.stop (0); + + thestate = state_empty_recv_buffers; + break; + } + + + + case state_empty_recv_buffers: { + static Timer timer ("commstate::empty_recv_buffers::waitall"); + timer.start (); + if (commstate_verbose) { + CCTK_INFO ("About to MPI_Waitall"); + } + MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE); + if (commstate_verbose) { + CCTK_INFO ("Finished MPI_Waitall"); + } + timer.stop (0); + + // Transfer messages again for performance testing + for (int n = 1; n < message_count_multiplier; ++ n) { + + srequests.clear(); + srequests.reserve (dist::c_ndatatypes() * dist::size()); + rrequests.clear(); + rrequests.reserve (dist::c_ndatatypes() * dist::size()); + + // Irecv + for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) { + if (typebufs.AT(type).in_use) { + + for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { + int const proc = + interleave_communications ? + (proc1 + dist::rank()) % dist::size() : + proc1; + + procbufdesc & procbuf = typebufs.AT(type).procbufs.AT(proc); + + if (procbuf.recvbufsize > 0) { + static Timer timer ("commstate::message_count_multiplier::irecv"); timer.start (); + int const tag = type; if (commstate_verbose) { CCTK_VInfo (CCTK_THORNSTRING, - "About to MPI_Ssend to %d", (int)proc); + "About to MPI_Irecv from processor %d for type %s", + proc, dist::c_datatype_name(type)); } - MPI_Ssend (const_cast(&procbuf.sendbufbase.front()), - procbuf.sendbufsize, + MPI_Irecv (&procbuf.recvbufbase.front(), + procbuf.recvbufsize * message_size_multiplier, typebufs.AT(type).mpi_datatype, proc, tag, - dist::comm()); + dist::comm(), &push_back(rrequests)); if (commstate_verbose) { - CCTK_INFO ("Finished MPI_Ssend"); + CCTK_INFO ("Finished MPI_Irecv"); } - srequests.AT(dist::size()*type + proc) = MPI_REQUEST_NULL; - timer.stop (procbuf.sendbufsize * datatypesize); - } else { - // use MPI_Isend - static Timer timer ("commstate_isend"); + timer.stop (procbuf.recvbufsize * typebufs.AT(type).datatypesize); + } + + } // for proc + + } + } // for type + + // Isend + for (unsigned type = 0; type < dist::c_ndatatypes(); ++ type) { + if (typebufs.AT(type).in_use) { + + for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { + int const proc = + interleave_communications + ? (proc1 + dist::size() - dist::rank()) % dist::size() + : proc1; + + procbufdesc & procbuf = typebufs.AT(type).procbufs.AT(proc); + + if (procbuf.sendbufsize > 0) { + int const tag = type; + assert (not use_mpi_send); + assert (not use_mpi_ssend); + static Timer timer ("commstate::message_count_multiplier::isend"); timer.start (); if (commstate_verbose) { - CCTK_VWarn (3, __LINE__, __FILE__, CCTK_THORNSTRING, - "About to MPI_Isend to %d", (int)proc); + CCTK_VInfo (CCTK_THORNSTRING, + "About to MPI_Isend to processor %d for type %s", + proc, dist::c_datatype_name(type)); } MPI_Isend (const_cast(&procbuf.sendbufbase.front()), - procbuf.sendbufsize, + procbuf.sendbufsize * message_size_multiplier, typebufs.AT(type).mpi_datatype, proc, tag, - dist::comm(), &srequests.AT(dist::size()*type + proc)); + dist::comm(), &push_back(srequests)); if (commstate_verbose) { CCTK_INFO ("Finished MPI_Isend"); } - timer.stop (procbuf.sendbufsize * datatypesize); + timer.stop (procbuf.sendbufsize * typebufs.AT(type).datatypesize); } - } + + } // for proc - } // for type - - } // for proc - } + } + } // for type + + // Waitall + { + static Timer timer ("commstate::message_count_multiplier::waitall(irecv)"); + timer.start (); + if (commstate_verbose) { + CCTK_INFO ("About to MPI_Waitall"); + } + MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE); + if (commstate_verbose) { + CCTK_INFO ("Finished MPI_Waitall"); + } + timer.stop (0); + } + + // Waitall + { + static Timer timer ("commstate::message_count_multiplier::waitall(isend)"); + timer.start (); + if (commstate_verbose) { + CCTK_INFO ("About to MPI_Waitall"); + } + MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE); + if (commstate_verbose) { + CCTK_INFO ("Finished MPI_Waitall"); + } + timer.stop (0); + } + + } // for n - // Now fall through to the next state in which the recv buffers - // are emptied as soon as data has arrived. - thestate = state_do_some_work; + thestate = state_done; break; + } + + + + case state_done: { + assert (0); abort(); + } - case state_do_some_work: - // Now fall through to the next state in which the recv buffers - // are emptied as soon as data has arrived. - thestate = state_empty_recv_buffers; - case state_empty_recv_buffers: - // Finish (at least one of) the posted communications - if (not AllPostedCommunicationsFinished ()) { - // No state change if there are still outstanding - // communications; do another comm_state loop iteration. - } else { - // Everything is done so release the collective communication buffers. - for (size_t type = 0; type < typebufs.size(); type++) { - for (size_t proc = 0; proc < typebufs.AT(type).procbufs.size(); proc++) { - typebufs.AT(type).procbufs.AT(proc).sendbufbase.clear(); - typebufs.AT(type).procbufs.AT(proc).recvbufbase.clear(); - } - } - thestate = state_done; - } - break; default: - assert (0 && "invalid state"); + assert (0); abort(); } + + + total.stop (0); } -bool comm_state::done () +bool comm_state::done () const { return thestate == state_done; } @@ -264,116 +486,16 @@ bool comm_state::done () comm_state::~comm_state () { DECLARE_CCTK_PARAMETERS; - + assert (thestate == state_done or thestate == state_get_buffer_sizes); } -// wait for completion of posted collective buffer sends/receives -// -// This function will wait for all of the posted receive operations to -// finish. -// -// It returns true if all posted communications have been completed. -bool comm_state::AllPostedCommunicationsFinished () -{ - DECLARE_CCTK_PARAMETERS; - - // check if all outstanding receives have been completed already - if (num_posted_recvs == num_completed_recvs) { - // finalize the outstanding sends in one go - if (reduce_mpi_waitall) { - size_t nreqs = 0; - for (size_t i=0; i reqs(nreqs); - nreqs = 0; - for (size_t i=0; i reqs(nreqs); - nreqs = 0; - for (size_t i=0; i= 0 and proc < dist::size()); assert (npoints >= 0); typebufdesc & typebuf = typebufs.AT(type); + if (not typebuf.in_use) { + typebuf.procbufs.resize (dist::size()); + typebuf.in_use = true; + } procbufdesc & procbuf = typebuf.procbufs.AT(proc); procbuf.sendbufsize += npoints; - typebuf.in_use = true; } + + void comm_state:: -reserve_recv_space (unsigned int const type, +reserve_recv_space (unsigned const type, int const proc, int const npoints) { @@ -396,19 +523,25 @@ reserve_recv_space (unsigned int const type, assert (proc >= 0 and proc < dist::size()); assert (npoints >= 0); typebufdesc & typebuf = typebufs.AT(type); + if (not typebuf.in_use) { + typebuf.procbufs.resize (dist::size()); + typebuf.in_use = true; + } procbufdesc & procbuf = typebuf.procbufs.AT(proc); procbuf.recvbufsize += npoints; - typebuf.in_use = true; } + + void * comm_state:: -send_buffer (unsigned int const type, +send_buffer (unsigned const type, int const proc, int const npoints) { assert (type < dist::c_ndatatypes()); assert (proc >= 0 and proc < dist::size()); + assert (npoints > 0); typebufdesc const & typebuf = typebufs.AT(type); procbufdesc const & procbuf = typebuf.procbufs.AT(proc); @@ -419,14 +552,17 @@ send_buffer (unsigned int const type, return procbuf.sendbuf; } + + void * comm_state:: -recv_buffer (unsigned int const type, +recv_buffer (unsigned const type, int const proc, int const npoints) { assert (type < dist::c_ndatatypes()); assert (proc >= 0 and proc < dist::size()); + assert (npoints > 0); typebufdesc const & typebuf = typebufs.AT(type); procbufdesc const & procbuf = typebuf.procbufs.AT(proc); @@ -437,9 +573,11 @@ recv_buffer (unsigned int const type, return procbuf.recvbuf; } + + void comm_state:: -commit_send_space (unsigned int const type, +commit_send_space (unsigned const type, int const proc, int const npoints) { @@ -448,6 +586,7 @@ commit_send_space (unsigned int const type, assert (type < dist::c_ndatatypes()); assert (proc >= 0 and proc < dist::size()); assert (npoints >= 0); + assert (npoints > 0); typebufdesc & typebuf = typebufs.AT(type); procbufdesc & procbuf = typebuf.procbufs.AT(proc); procbuf.sendbuf += npoints * typebuf.datatypesize; @@ -461,16 +600,30 @@ commit_send_space (unsigned int const type, &procbuf.sendbufbase.front() + procbuf.sendbufsize * typebuf.datatypesize) { + if (message_size_multiplier > 1) { + size_t const nbytes = + procbuf.sendbufsize * typebuf.datatypesize * + (message_size_multiplier - 1); + memset (procbuf.sendbuf, poison_value, nbytes); + } + static Timer timer ("commit_send_space::isend"); timer.start (); if (commstate_verbose) { CCTK_VInfo (CCTK_THORNSTRING, - "About to MPI_Isend to %d", (int)proc); + "About to MPI_Isend to processor %d for type %s", + proc, dist::c_datatype_name(type)); } + int const tag = type; + assert (procbuf.sendbufsize > 0); + assert (not use_mpi_send); + assert (not use_mpi_ssend); MPI_Isend (&procbuf.sendbufbase.front(), - procbuf.sendbufsize, typebuf.mpi_datatype, - proc, type, dist::comm(), - & srequests.AT(type * dist::size() + proc)); + procbuf.sendbufsize * message_size_multiplier, + typebuf.mpi_datatype, proc, tag, + dist::comm(), &push_back(srequests)); + assert (not procbuf.did_post_send); + procbuf.did_post_send = true; if (commstate_verbose) { CCTK_INFO ("Finished MPI_Isend"); } @@ -479,15 +632,18 @@ commit_send_space (unsigned int const type, } } + + void comm_state:: -commit_recv_space (unsigned int const type, +commit_recv_space (unsigned const type, int const proc, int const npoints) { assert (type < dist::c_ndatatypes()); assert (proc >= 0 and proc < dist::size()); assert (npoints >= 0); + assert (npoints > 0); typebufdesc & typebuf = typebufs.AT(type); procbufdesc & procbuf = typebuf.procbufs.AT(proc); procbuf.recvbuf += npoints * typebuf.datatypesize; -- cgit v1.2.3