author    Thomas Radke <tradke@aei.mpg.de>    2005-04-11 12:22:00 +0000
committer Thomas Radke <tradke@aei.mpg.de>    2005-04-11 12:22:00 +0000
commit    5386023def644841cad93ade7380e088faecb0f3 (patch)
tree      5968b02987063f38d8c789d5f08222d2119f3383 /Carpet
parent    9e4e8bcf147a2c84b40a6836e3e1a27bd772d6de (diff)
CarpetLib: some optimisations for the collective buffers communication scheme
* Receive operations are posted earlier now (don't wait until send buffers
  are filled).
* A send operation is posted as soon as its send buffer is full (don't wait
  until all send buffers have been filled).
* MPI_Irsend() is used instead of MPI_Isend().
  This probably doesn't make a difference with most MPI implementations.
* Use MPI_Waitsome() to allow for overlapping of communication and
  computation to some extent: data from already finished receive operations
  can be copied back while active receive operations are still going on.
  MPI_Waitsome() is now called (instead of MPI_Waitall()) to wait for (one
  or more) posted receive operations to finish. The receive buffers for
  those operations are then flagged as ready for data copying.

  The drawback of this overlapping communication/computation scheme is that
  the comm_state loop may be iterated more often now. My benchmarks on up to
  16 processors showed no performance win compared to using MPI_Waitall()
  (in fact, the performance decreased). Maybe it performs better on larger
  numbers of processors when there is more potential for network congestion.

  The feature can be turned on/off by setting CarpetLib::use_waitall to
  yes/no. For now I recommend using CarpetLib::use_waitall = "yes" (which is
  not the default setting).

darcs-hash:20050411122235-776a0-e4f4179f46fce120572231b19cacb69c940f7b82.gz
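
The MPI_Waitsome() overlap described above amounts to a small wait loop:
buffers whose receives have finished are emptied while the remaining receives
are still in flight. The following is a minimal stand-alone sketch of that
loop (illustrative only; drain_receives, process_buffer and recvbufs are
made-up names, not CarpetLib code):

#include <mpi.h>
#include <vector>

// Wait-loop sketch: MPI_Waitsome() returns as soon as at least one posted
// receive has finished, so the corresponding buffers can be emptied while
// the remaining receives continue in the background.
void drain_receives (std::vector<MPI_Request>& rrequests,
                     std::vector<char*>& recvbufs,
                     void (*process_buffer) (char*))
{
  int outstanding = 0;
  for (int i = 0; i < (int)rrequests.size(); i++) {
    if (rrequests[i] != MPI_REQUEST_NULL) outstanding++;
  }

  std::vector<int> completed (rrequests.size());
  while (outstanding > 0) {
    int ncompleted = 0;
    MPI_Waitsome ((int)rrequests.size(), &rrequests.front(),
                  &ncompleted, &completed.front(), MPI_STATUSES_IGNORE);
    // empty the buffers of the receives that just finished
    for (int j = 0; j < ncompleted; j++) {
      process_buffer (recvbufs[completed[j]]);
    }
    outstanding -= ncompleted;
  }
}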
Diffstat (limited to 'Carpet')
-rw-r--r--   Carpet/CarpetLib/src/commstate.cc   118
-rw-r--r--   Carpet/CarpetLib/src/commstate.hh    24
-rw-r--r--   Carpet/CarpetLib/src/gdata.cc        65
3 files changed, 137 insertions, 70 deletions
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc
index 2a720f059..874a2a661 100644
--- a/Carpet/CarpetLib/src/commstate.cc
+++ b/Carpet/CarpetLib/src/commstate.cc
@@ -45,6 +45,10 @@ comm_state::comm_state (int vartype_) : vartype(vartype_)
default: assert (0);
}
collbufs.resize (dist::size());
+ rrequests.resize (dist::size(), MPI_REQUEST_NULL);
+ srequests.resize (dist::size(), MPI_REQUEST_NULL);
+ recvbuffers_ready.resize (dist::size());
+
} else {
thestate = state_recv;
}
@@ -91,43 +95,51 @@ void comm_state::step ()
case state_get_buffer_sizes:
// The sizes of the collective communication buffers are known
- // so now allocate the send buffers.
- for (int i = 0; i < collbufs.size(); i++) {
-//fprintf (stderr, "proc %d: allocating buf[%d] = %d elems\n", dist::rank(), i, collbufs[i].sendbufsize);
+ // so now allocate them.
+ // The receive operations are also posted here already
+ // so that we can use ready mode sends (MPI_Irsend()) later.
+ num_posted_recvs = num_completed_recvs = 0;
+
+ for (size_t i = 0; i < collbufs.size(); i++) {
collbufs[i].sendbufbase = new char[collbufs[i].sendbufsize*vartypesize];
+ collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize];
collbufs[i].sendbuf = collbufs[i].sendbufbase;
+ collbufs[i].recvbuf = collbufs[i].recvbufbase;
+
+ if (collbufs[i].recvbufsize > 0) {
+ MPI_Irecv (collbufs[i].recvbufbase, collbufs[i].recvbufsize,
+ datatype, i, MPI_ANY_TAG, dist::comm, &rrequests[i]);
+ num_posted_recvs++;
+ }
}
// Now go and get the send buffers filled with data.
+ // Once a buffer is full it will get posted right away
+ // (see gdata::copy_into_sendbuffer() and
+ // gdata::interpolate_into_sendbuffer()).
thestate = state_fill_send_buffers;
break;
case state_fill_send_buffers:
- // The send buffers contain all the data so now allocate the recv buffers.
- for (int i = 0; i < collbufs.size(); i++) {
- collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize];
- collbufs[i].recvbuf = collbufs[i].recvbufbase;
- }
-
- // Exchange pairs of send/recv buffers between all processors.
- ExchangeBuffers ();
-
- // The send buffers are not needed anymore.
- for (int i = 0; i < collbufs.size(); i++) {
- delete[] collbufs[i].sendbufbase;
- }
-
- // All data has been received so now go and empty the recv buffers.
+ // Now fall through to the next state in which the recv buffers
+ // are emptied as soon as data has arrived.
thestate = state_empty_recv_buffers;
- break;
+
case state_empty_recv_buffers:
- // Finally release the recv buffers.
- for (int i = 0; i < collbufs.size(); i++) {
- delete[] collbufs[i].recvbufbase;
+ // Finish (at least one of) the posted communications
+ if (! AllPostedCommunicationsFinished (use_waitall)) {
+ // No state change if there are still outstanding communications;
+ // do another comm_state loop iteration.
+ } else {
+ // Everything is done so release the collective communication buffers.
+ for (size_t i = 0; i < collbufs.size(); i++) {
+ delete[] collbufs[i].sendbufbase;
+ delete[] collbufs[i].recvbufbase;
+ }
+ thestate = state_done;
}
-
- thestate = state_done;
break;
+
case state_done:
assert (0);
default:
@@ -152,30 +164,54 @@ comm_state::~comm_state ()
}
-// exchange the collective communication buffers between processors
-void comm_state::ExchangeBuffers ()
+// wait for completion of posted collective buffer sends/receives
+//
+// Depending on use_waitall, this function will wait for all at once (true)
+// or at least one (false) of the posted receive operations to finish.
+//
+// It returns true if all posted communications have been completed.
+bool comm_state::AllPostedCommunicationsFinished (bool use_waitall)
{
- vector<MPI_Request> rrequests(collbufs.size(), MPI_REQUEST_NULL);
- vector<MPI_Request> srequests(collbufs.size(), MPI_REQUEST_NULL);
+ // check if all outstanding receives have been completed already
+ if (num_posted_recvs == num_completed_recvs) {
+ // finalize the outstanding sends in one go
+ MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE);
- // loop over all buffers (alias processors) and initiate the communications
- for (int proc = 0; proc < collbufs.size(); proc++) {
+ return true;
+ }
+
+ // reset completion flag for all receive buffers
+ for (size_t i = 0; i < recvbuffers_ready.size(); i++) {
+ recvbuffers_ready[i] = false;
+ }
- // don't send zero-sized messages (this covers 'proc == rank')
- if (collbufs[proc].recvbufsize > 0) {
- MPI_Irecv (collbufs[proc].recvbufbase, collbufs[proc].recvbufsize,
- datatype, proc, MPI_ANY_TAG, dist::comm, &rrequests[proc]);
+ if (use_waitall) {
+ // mark all posted receive buffers as ready
+ for (size_t i = 0; i < rrequests.size(); i++) {
+ recvbuffers_ready[i] = rrequests[i] != MPI_REQUEST_NULL;
}
- if (collbufs[proc].sendbufsize > 0) {
- MPI_Isend (collbufs[proc].sendbufbase, collbufs[proc].sendbufsize,
- datatype, proc, 0, dist::comm, &srequests[proc]);
+
+ // wait for completion of all posted receive operations
+ MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE);
+ num_completed_recvs = num_posted_recvs;
+ } else {
+ int num_completed_recvs_ = 0;
+ vector<int> completed_recvs(rrequests.size(), -1);
+
+ // wait for completion of at least one posted receive operation
+ MPI_Waitsome (rrequests.size(), &rrequests.front(), &num_completed_recvs_,
+ &completed_recvs.front(), MPI_STATUSES_IGNORE);
+ assert (0 < num_completed_recvs_);
+ num_completed_recvs += num_completed_recvs_;
+
+ // mark the receive buffers of completed communications as ready
+ for (int i = 0; i < num_completed_recvs_; i++) {
+ assert (rrequests.at(completed_recvs.at(i)) == MPI_REQUEST_NULL);
+ recvbuffers_ready.at(completed_recvs.at(i)) = true;
}
}
- // finalize the outstanding communications
- // FIXME: use MPI_Waitsome() on rrequests
- MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE);
- MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE);
+ return (false);
}
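
A note on the ready-mode sends used above: MPI_Irsend() is only valid if the
matching receive has already been posted on the destination process, which is
why the receives are now posted in state_get_buffer_sizes, before any send
buffer gets filled. The sketch below shows that ordering in isolation
(illustrative names only; a plain barrier stands in for whatever ordering
guarantee the surrounding code provides):

#include <mpi.h>
#include <vector>

// Required ordering for ready-mode sends: post every receive first, make
// sure all processes have done so, then issue the MPI_Irsend() calls.
void exchange (int nprocs, int myrank,
               std::vector<std::vector<char> >& sendbufs,   // [nprocs]
               std::vector<std::vector<char> >& recvbufs)   // [nprocs]
{
  std::vector<MPI_Request> rreq (nprocs, MPI_REQUEST_NULL);
  std::vector<MPI_Request> sreq (nprocs, MPI_REQUEST_NULL);

  // 1. post all receives
  for (int p = 0; p < nprocs; p++) {
    if (p != myrank && ! recvbufs[p].empty()) {
      MPI_Irecv (&recvbufs[p].front(), (int)recvbufs[p].size(), MPI_CHAR,
                 p, MPI_ANY_TAG, MPI_COMM_WORLD, &rreq[p]);
    }
  }

  // 2. ensure every process has posted its receives before any
  //    ready-mode send starts (a barrier is the simplest such guarantee)
  MPI_Barrier (MPI_COMM_WORLD);

  // 3. ready-mode sends are now safe
  for (int p = 0; p < nprocs; p++) {
    if (p != myrank && ! sendbufs[p].empty()) {
      MPI_Irsend (&sendbufs[p].front(), (int)sendbufs[p].size(), MPI_CHAR,
                  p, 0, MPI_COMM_WORLD, &sreq[p]);
    }
  }

  MPI_Waitall (nprocs, &rreq.front(), MPI_STATUSES_IGNORE);
  MPI_Waitall (nprocs, &sreq.front(), MPI_STATUSES_IGNORE);
}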
diff --git a/Carpet/CarpetLib/src/commstate.hh b/Carpet/CarpetLib/src/commstate.hh
index 9c5239b21..21c05320e 100644
--- a/Carpet/CarpetLib/src/commstate.hh
+++ b/Carpet/CarpetLib/src/commstate.hh
@@ -90,6 +90,7 @@ public:
// the following members are used for collective communications
//////////////////////////////////////////////////////////////////////////
+public:
// CCTK vartype used for this comm_state object
int vartype;
@@ -97,9 +98,6 @@ public:
// (used as stride for advancing the char-based buffer pointers)
int vartypesize;
- // MPI datatype corresponding to CCTK vartype
- MPI_Datatype datatype;
-
// buffers for collective communications
struct collbufdesc {
// the sizes of communication buffers (in elements of type <vartype>)
@@ -115,17 +113,29 @@ public:
sendbuf(NULL), recvbuf(NULL),
sendbufbase(NULL), recvbufbase(NULL) {}
-// FIXME: why can't these be made private ??
-//private:
// the allocated communication buffers
char* sendbufbase;
char* recvbufbase;
};
vector<collbufdesc> collbufs; // [nprocs]
+ // flags indicating which receive buffers are ready to be emptied
+ vector<bool> recvbuffers_ready; // [nprocs]
+
+ // MPI datatype corresponding to CCTK vartype
+ MPI_Datatype datatype;
+
+ // lists of outstanding requests for posted send/recv communications
+ vector<MPI_Request> srequests; // [nprocs]
private:
- // Exchange pairs of send/recv buffers between all processors.
- void ExchangeBuffers();
+ vector<MPI_Request> rrequests; // [nprocs]
+
+ // number of posted and already completed receive communications
+ int num_posted_recvs;
+ int num_completed_recvs;
+
+ // wait for completion of posted collective buffer sends/receives
+ bool AllPostedCommunicationsFinished(bool use_waitall);
};
diff --git a/Carpet/CarpetLib/src/gdata.cc b/Carpet/CarpetLib/src/gdata.cc
index 670a08db3..7bab32fa5 100644
--- a/Carpet/CarpetLib/src/gdata.cc
+++ b/Carpet/CarpetLib/src/gdata.cc
@@ -158,9 +158,9 @@ void gdata::copy_from (comm_state& state,
}
break;
case state_empty_recv_buffers:
- // if this is a destination processor: copy its data from the recv buffer
- // (the processor-local case is not handled here)
- if (proc() == dist::rank() && src->proc() != proc()) {
+ // if this is a destination processor and data has already been received
+ // from the source processor: copy it from the recv buffer
+ if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) {
copy_from_recvbuffer (state, src, box);
}
break;
@@ -416,19 +416,28 @@ void gdata::copy_into_sendbuffer (comm_state& state,
// copy to remote processor
assert (src->_has_storage);
assert (src->_owns_storage);
- assert (state.collbufs.at(proc()).sendbuf -
- state.collbufs.at(proc()).sendbufbase <=
- (state.collbufs.at(proc()).sendbufsize - box.size()) *
- state.vartypesize);
+ assert (proc() < state.collbufs.size());
+ int fillstate = (state.collbufs[proc()].sendbuf +
+ box.size()*state.vartypesize) -
+ state.collbufs[proc()].sendbufbase;
+ assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
// copy this processor's data into the send buffer
gdata* tmp = src->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, src->proc(), state.collbufs.at(proc()).sendbuf);
+ tmp->allocate (box, src->proc(), state.collbufs[proc()].sendbuf);
tmp->copy_from_innerloop (src, box);
delete tmp;
// advance send buffer to point to the next ibbox slot
- state.collbufs.at(proc()).sendbuf += state.vartypesize * box.size();
+ state.collbufs[proc()].sendbuf += state.vartypesize * box.size();
+
+ // post the send if the buffer is full
+ if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
+ MPI_Irsend (state.collbufs[proc()].sendbufbase,
+ state.collbufs[proc()].sendbufsize,
+ state.datatype, proc(), 0, dist::comm,
+ &state.srequests[proc()]);
+ }
}
}
@@ -438,18 +447,20 @@ void gdata::copy_into_sendbuffer (comm_state& state,
void gdata::copy_from_recvbuffer (comm_state& state,
const gdata* src, const ibbox& box)
{
- assert (state.collbufs.at(proc()).recvbuf -
- state.collbufs.at(proc()).recvbufbase <=
- (state.collbufs.at(proc()).recvbufsize-box.size()) * state.vartypesize);
+ assert (src->proc() < state.collbufs.size());
+ assert (state.collbufs[src->proc()].recvbuf -
+ state.collbufs[src->proc()].recvbufbase <=
+ (state.collbufs[src->proc()].recvbufsize-box.size()) *
+ state.vartypesize);
// copy this processor's data from the recv buffer
gdata* tmp = make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, proc(), state.collbufs.at(src->proc()).recvbuf);
+ tmp->allocate (box, proc(), state.collbufs[src->proc()].recvbuf);
copy_from_innerloop (tmp, box);
delete tmp;
// advance recv buffer to point to the next ibbox slot
- state.collbufs.at(src->proc()).recvbuf += state.vartypesize * box.size();
+ state.collbufs[src->proc()].recvbuf += state.vartypesize * box.size();
}
@@ -523,9 +534,10 @@ void gdata
}
break;
case state_empty_recv_buffers:
- // if this is a destination processor: copy its data from the recv buffer
+ // if this is a destination processor and data has already been received
+ // from the source processor: copy it from the recv buffer
// (the processor-local case is not handled here)
- if (proc() == dist::rank() && srcs.at(0)->proc() != proc()) {
+ if (proc() == dist::rank() && state.recvbuffers_ready.at(srcs.at(0)->proc())) {
copy_from_recvbuffer (state, srcs.at(0), box);
}
break;
@@ -769,19 +781,28 @@ void gdata
// interpolate to remote processor
assert (srcs.at(0)->_has_storage);
assert (srcs.at(0)->_owns_storage);
- assert (state.collbufs.at(proc()).sendbuf -
- state.collbufs.at(proc()).sendbufbase <=
- (state.collbufs.at(proc()).sendbufsize - box.size()) *
- state.vartypesize);
+ assert (proc() < state.collbufs.size());
+ int fillstate = (state.collbufs[proc()].sendbuf +
+ box.size()*state.vartypesize) -
+ state.collbufs[proc()].sendbufbase;
+ assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
// copy this processor's data into the send buffer
gdata* tmp = srcs.at(0)->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, srcs.at(0)->proc(), state.collbufs.at(proc()).sendbuf);
+ tmp->allocate (box, srcs.at(0)->proc(), state.collbufs[proc()].sendbuf);
tmp->interpolate_from_innerloop (srcs, times, box, time,
order_space, order_time);
delete tmp;
// advance send buffer to point to the next ibbox slot
- state.collbufs.at(proc()).sendbuf += state.vartypesize * box.size();
+ state.collbufs[proc()].sendbuf += state.vartypesize * box.size();
+
+ // post the send if the buffer is full
+ if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
+ MPI_Irsend (state.collbufs[proc()].sendbufbase,
+ state.collbufs[proc()].sendbufsize,
+ state.datatype, proc(), 0, dist::comm,
+ &state.srequests[proc()]);
+ }
}
}
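
The "post the send as soon as its buffer is full" logic in
copy_into_sendbuffer() and interpolate_into_sendbuffer() above follows a
simple pattern: advance a fill pointer with every chunk that is copied in and
hand the whole buffer to MPI_Irsend() once it holds all expected data. A
minimal stand-alone sketch of that pattern (made-up names, not CarpetLib API):

#include <mpi.h>
#include <cstring>

// Per-destination send buffer with a fill pointer.
struct sendbuf_desc {
  char* base;        // start of the allocated send buffer
  char* fill;        // next free position in the buffer
  int   total_bytes; // expected total size of the buffer
};

// Append one chunk; once the buffer is complete, post the ready-mode send.
void append_and_maybe_send (sendbuf_desc& buf, const char* data, int nbytes,
                            int dest, MPI_Request& request)
{
  std::memcpy (buf.fill, data, nbytes);
  buf.fill += nbytes;

  if (buf.fill - buf.base == buf.total_bytes) {
    MPI_Irsend (buf.base, buf.total_bytes, MPI_CHAR,
                dest, 0, MPI_COMM_WORLD, &request);
  }
}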