aboutsummaryrefslogtreecommitdiff
path: root/Carpet/CarpetLib/src/commstate.cc
diff options
context:
space:
mode:
Diffstat (limited to 'Carpet/CarpetLib/src/commstate.cc')
-rw-r--r--Carpet/CarpetLib/src/commstate.cc118
1 files changed, 77 insertions, 41 deletions
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc
index 2a720f059..874a2a661 100644
--- a/Carpet/CarpetLib/src/commstate.cc
+++ b/Carpet/CarpetLib/src/commstate.cc
@@ -45,6 +45,10 @@ comm_state::comm_state (int vartype_) : vartype(vartype_)
default: assert (0);
}
collbufs.resize (dist::size());
+ rrequests.resize (dist::size(), MPI_REQUEST_NULL);
+ srequests.resize (dist::size(), MPI_REQUEST_NULL);
+ recvbuffers_ready.resize (dist::size());
+
} else {
thestate = state_recv;
}
@@ -91,43 +95,51 @@ void comm_state::step ()
case state_get_buffer_sizes:
// The sizes of the collective communication buffers are known
- // so now allocate the send buffers.
- for (int i = 0; i < collbufs.size(); i++) {
-//fprintf (stderr, "proc %d: allocating buf[%d] = %d elems\n", dist::rank(), i, collbufs[i].sendbufsize);
+ // so now allocate them.
+ // The receive operations are also posted here already
+ // so that we can use ready mode sends (MPI_Irsend()) later.
+ num_posted_recvs = num_completed_recvs = 0;
+
+ for (size_t i = 0; i < collbufs.size(); i++) {
collbufs[i].sendbufbase = new char[collbufs[i].sendbufsize*vartypesize];
+ collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize];
collbufs[i].sendbuf = collbufs[i].sendbufbase;
+ collbufs[i].recvbuf = collbufs[i].recvbufbase;
+
+ if (collbufs[i].recvbufsize > 0) {
+ MPI_Irecv (collbufs[i].recvbufbase, collbufs[i].recvbufsize,
+ datatype, i, MPI_ANY_TAG, dist::comm, &rrequests[i]);
+ num_posted_recvs++;
+ }
}
// Now go and get the send buffers filled with data.
+ // Once a buffer is full it will get posted right away
+ // (see gdata::copy_into_sendbuffer() and
+ // gdata::interpolate_into_sendbuffer()).
thestate = state_fill_send_buffers;
break;
case state_fill_send_buffers:
- // The send buffers contain all the data so now allocate the recv buffers.
- for (int i = 0; i < collbufs.size(); i++) {
- collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize];
- collbufs[i].recvbuf = collbufs[i].recvbufbase;
- }
-
- // Exchange pairs of send/recv buffers between all processors.
- ExchangeBuffers ();
-
- // The send buffers are not needed anymore.
- for (int i = 0; i < collbufs.size(); i++) {
- delete[] collbufs[i].sendbufbase;
- }
-
- // All data has been received so now go and empty the recv buffers.
+ // Now fall through to the next state in which the recv buffers
+ // are emptied as soon as data has arrived.
thestate = state_empty_recv_buffers;
- break;
+
case state_empty_recv_buffers:
- // Finally release the recv buffers.
- for (int i = 0; i < collbufs.size(); i++) {
- delete[] collbufs[i].recvbufbase;
+ // Finish (at least one of) the posted communications
+ if (! AllPostedCommunicationsFinished (use_waitall)) {
+ // No state change if there are still outstanding communications;
+ // do another comm_state loop iteration.
+ } else {
+ // Everything is done so release the collective communication buffers.
+ for (size_t i = 0; i < collbufs.size(); i++) {
+ delete[] collbufs[i].sendbufbase;
+ delete[] collbufs[i].recvbufbase;
+ }
+ thestate = state_done;
}
-
- thestate = state_done;
break;
+
case state_done:
assert (0);
default:
@@ -152,30 +164,54 @@ comm_state::~comm_state ()
}
-// exchange the collective communication buffers between processors
-void comm_state::ExchangeBuffers ()
+// wait for completion of posted collective buffer sends/receives
+//
+// Depending on use_waitall, this function will wait for all at once (true)
+// or at least one (false) of the posted receive operations to finish.
+//
+// It returns true if all posted communications have been completed.
+bool comm_state::AllPostedCommunicationsFinished (bool use_waitall)
{
- vector<MPI_Request> rrequests(collbufs.size(), MPI_REQUEST_NULL);
- vector<MPI_Request> srequests(collbufs.size(), MPI_REQUEST_NULL);
+ // check if all outstanding receives have been completed already
+ if (num_posted_recvs == num_completed_recvs) {
+ // finalize the outstanding sends in one go
+ MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE);
- // loop over all buffers (alias processors) and initiate the communications
- for (int proc = 0; proc < collbufs.size(); proc++) {
+ return true;
+ }
+
+ // reset completion flag for all receive buffers
+ for (size_t i = 0; i < recvbuffers_ready.size(); i++) {
+ recvbuffers_ready[i] = false;
+ }
- // don't send zero-sized messages (this covers 'proc == rank')
- if (collbufs[proc].recvbufsize > 0) {
- MPI_Irecv (collbufs[proc].recvbufbase, collbufs[proc].recvbufsize,
- datatype, proc, MPI_ANY_TAG, dist::comm, &rrequests[proc]);
+ if (use_waitall) {
+ // mark all posted receive buffers as ready
+ for (size_t i = 0; i < rrequests.size(); i++) {
+ recvbuffers_ready[i] = rrequests[i] != MPI_REQUEST_NULL;
}
- if (collbufs[proc].sendbufsize > 0) {
- MPI_Isend (collbufs[proc].sendbufbase, collbufs[proc].sendbufsize,
- datatype, proc, 0, dist::comm, &srequests[proc]);
+
+ // wait for completion of all posted receive operations
+ MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE);
+ num_completed_recvs = num_posted_recvs;
+ } else {
+ int num_completed_recvs_ = 0;
+ vector<int> completed_recvs(rrequests.size(), -1);
+
+ // wait for completion of at least one posted receive operation
+ MPI_Waitsome (rrequests.size(), &rrequests.front(), &num_completed_recvs_,
+ &completed_recvs.front(), MPI_STATUSES_IGNORE);
+ assert (0 < num_completed_recvs_);
+ num_completed_recvs += num_completed_recvs_;
+
+ // mark the receive buffers of completed communications as ready
+ for (int i = 0; i < num_completed_recvs_; i++) {
+ assert (rrequests.at(completed_recvs.at(i)) == MPI_REQUEST_NULL);
+ recvbuffers_ready.at(completed_recvs.at(i)) = true;
}
}
- // finalize the outstanding communications
- // FIXME: use MPI_Waitsome() on rrequests
- MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE);
- MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE);
+ return (false);
}