diff options
Diffstat (limited to 'Carpet/CarpetLib/src/commstate.cc')
-rw-r--r-- | Carpet/CarpetLib/src/commstate.cc | 118 |
1 files changed, 77 insertions, 41 deletions
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc index 2a720f059..874a2a661 100644 --- a/Carpet/CarpetLib/src/commstate.cc +++ b/Carpet/CarpetLib/src/commstate.cc @@ -45,6 +45,10 @@ comm_state::comm_state (int vartype_) : vartype(vartype_) default: assert (0); } collbufs.resize (dist::size()); + rrequests.resize (dist::size(), MPI_REQUEST_NULL); + srequests.resize (dist::size(), MPI_REQUEST_NULL); + recvbuffers_ready.resize (dist::size()); + } else { thestate = state_recv; } @@ -91,43 +95,51 @@ void comm_state::step () case state_get_buffer_sizes: // The sizes of the collective communication buffers are known - // so now allocate the send buffers. - for (int i = 0; i < collbufs.size(); i++) { -//fprintf (stderr, "proc %d: allocating buf[%d] = %d elems\n", dist::rank(), i, collbufs[i].sendbufsize); + // so now allocate them. + // The receive operations are also posted here already + // so that we can use ready mode sends (MPI_Irsend()) later. + num_posted_recvs = num_completed_recvs = 0; + + for (size_t i = 0; i < collbufs.size(); i++) { collbufs[i].sendbufbase = new char[collbufs[i].sendbufsize*vartypesize]; + collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize]; collbufs[i].sendbuf = collbufs[i].sendbufbase; + collbufs[i].recvbuf = collbufs[i].recvbufbase; + + if (collbufs[i].recvbufsize > 0) { + MPI_Irecv (collbufs[i].recvbufbase, collbufs[i].recvbufsize, + datatype, i, MPI_ANY_TAG, dist::comm, &rrequests[i]); + num_posted_recvs++; + } } // Now go and get the send buffers filled with data. + // Once a buffer is full it will get posted right away + // (see gdata::copy_into_sendbuffer() and + // gdata::interpolate_into_sendbuffer()). thestate = state_fill_send_buffers; break; case state_fill_send_buffers: - // The send buffers contain all the data so now allocate the recv buffers. - for (int i = 0; i < collbufs.size(); i++) { - collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize]; - collbufs[i].recvbuf = collbufs[i].recvbufbase; - } - - // Exchange pairs of send/recv buffers between all processors. - ExchangeBuffers (); - - // The send buffers are not needed anymore. - for (int i = 0; i < collbufs.size(); i++) { - delete[] collbufs[i].sendbufbase; - } - - // All data has been received so now go and empty the recv buffers. + // Now fall through to the next state in which the recv buffers + // are emptied as soon as data has arrived. thestate = state_empty_recv_buffers; - break; + case state_empty_recv_buffers: - // Finally release the recv buffers. - for (int i = 0; i < collbufs.size(); i++) { - delete[] collbufs[i].recvbufbase; + // Finish (at least one of) the posted communications + if (! AllPostedCommunicationsFinished (use_waitall)) { + // No state change if there are still outstanding communications; + // do another comm_state loop iteration. + } else { + // Everything is done so release the collective communication buffers. + for (size_t i = 0; i < collbufs.size(); i++) { + delete[] collbufs[i].sendbufbase; + delete[] collbufs[i].recvbufbase; + } + thestate = state_done; } - - thestate = state_done; break; + case state_done: assert (0); default: @@ -152,30 +164,54 @@ comm_state::~comm_state () } -// exchange the collective communication buffers between processors -void comm_state::ExchangeBuffers () +// wait for completion of posted collective buffer sends/receives +// +// Depending on use_waitall, this function will wait for all at once (true) +// or at least one (false) of the posted receive operations to finish. +// +// It returns true if all posted communications have been completed. +bool comm_state::AllPostedCommunicationsFinished (bool use_waitall) { - vector<MPI_Request> rrequests(collbufs.size(), MPI_REQUEST_NULL); - vector<MPI_Request> srequests(collbufs.size(), MPI_REQUEST_NULL); + // check if all outstanding receives have been completed already + if (num_posted_recvs == num_completed_recvs) { + // finalize the outstanding sends in one go + MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE); - // loop over all buffers (alias processors) and initiate the communications - for (int proc = 0; proc < collbufs.size(); proc++) { + return true; + } + + // reset completion flag for all receive buffers + for (size_t i = 0; i < recvbuffers_ready.size(); i++) { + recvbuffers_ready[i] = false; + } - // don't send zero-sized messages (this covers 'proc == rank') - if (collbufs[proc].recvbufsize > 0) { - MPI_Irecv (collbufs[proc].recvbufbase, collbufs[proc].recvbufsize, - datatype, proc, MPI_ANY_TAG, dist::comm, &rrequests[proc]); + if (use_waitall) { + // mark all posted recveive buffers as ready + for (size_t i = 0; i < rrequests.size(); i++) { + recvbuffers_ready[i] = rrequests[i] != MPI_REQUEST_NULL; } - if (collbufs[proc].sendbufsize > 0) { - MPI_Isend (collbufs[proc].sendbufbase, collbufs[proc].sendbufsize, - datatype, proc, 0, dist::comm, &srequests[proc]); + + // wait for completion of all posted receive operations + MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE); + num_completed_recvs = num_posted_recvs; + } else { + int num_completed_recvs_ = 0; + vector<int> completed_recvs(rrequests.size(), -1); + + // wait for completion of at least one posted receive operation + MPI_Waitsome (rrequests.size(), &rrequests.front(), &num_completed_recvs_, + &completed_recvs.front(), MPI_STATUSES_IGNORE); + assert (0 < num_completed_recvs_); + num_completed_recvs += num_completed_recvs_; + + // mark the recveive buffers of completed communications as ready + for (int i = 0; i < num_completed_recvs_; i++) { + assert (rrequests.at(completed_recvs.at(i)) == MPI_REQUEST_NULL); + recvbuffers_ready.at(completed_recvs.at(i)) = true; } } - // finalize the outstanding communications - // FIXME: use MPI_Waitsome() on rrequests - MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE); - MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE); + return (false); } |