diff options
author | Thomas Radke <tradke@aei.mpg.de> | 2005-04-11 12:22:00 +0000 |
---|---|---|
committer | Thomas Radke <tradke@aei.mpg.de> | 2005-04-11 12:22:00 +0000 |
commit | 5386023def644841cad93ade7380e088faecb0f3 (patch) | |
tree | 5968b02987063f38d8c789d5f08222d2119f3383 /Carpet | |
parent | 9e4e8bcf147a2c84b40a6836e3e1a27bd772d6de (diff) |
CarpetLib: some optimisations for the collective buffers communication scheme
* Receive operations are posted earlier now (don't wait until send buffers
are filled).
* A send operation is posted as soon as its send buffer is full (don't wait
until all send buffers have been filled).
* MPI_Irsend() is used instead of MPI_Isend()
This probably doesn't make a difference with most MPI implementations.
* Use MPI_Waitsome() to allow for overlapping of communication and computation
to some extent: data from already finished receive operations can be
copied back while active receive operations are still going on.
MPI_Waitsome() is now called (instead of MPI_Waitall()) to wait for
(one or more) posted receive operations to finish. The receive buffers
for those operations are then flagged as ready for data copying.
The drawback of this overlapping communication/computation scheme is
that the comm_state loop may be iterated more often now. My benchmarks on
up to 16 processors showed no performance win compared to using MPI_Waitall()
(in fact, the performance decreased). Maybe it performs better on larger
numbers of processors when there is more potential for network congestion.
The feature can be turned on/off by setting CarpetLib::use_waitall to no/yes (use_waitall = "yes" selects the plain MPI_Waitall() scheme without overlapping).
For now I recommend using CarpetLib::use_waitall = "yes" (which is not the
default setting).
darcs-hash:20050411122235-776a0-e4f4179f46fce120572231b19cacb69c940f7b82.gz
Diffstat (limited to 'Carpet')
-rw-r--r-- | Carpet/CarpetLib/src/commstate.cc | 118 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/commstate.hh | 24 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/gdata.cc | 65 |
3 files changed, 137 insertions, 70 deletions
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc index 2a720f059..874a2a661 100644 --- a/Carpet/CarpetLib/src/commstate.cc +++ b/Carpet/CarpetLib/src/commstate.cc @@ -45,6 +45,10 @@ comm_state::comm_state (int vartype_) : vartype(vartype_) default: assert (0); } collbufs.resize (dist::size()); + rrequests.resize (dist::size(), MPI_REQUEST_NULL); + srequests.resize (dist::size(), MPI_REQUEST_NULL); + recvbuffers_ready.resize (dist::size()); + } else { thestate = state_recv; } @@ -91,43 +95,51 @@ void comm_state::step () case state_get_buffer_sizes: // The sizes of the collective communication buffers are known - // so now allocate the send buffers. - for (int i = 0; i < collbufs.size(); i++) { -//fprintf (stderr, "proc %d: allocating buf[%d] = %d elems\n", dist::rank(), i, collbufs[i].sendbufsize); + // so now allocate them. + // The receive operations are also posted here already + // so that we can use ready mode sends (MPI_Irsend()) later. + num_posted_recvs = num_completed_recvs = 0; + + for (size_t i = 0; i < collbufs.size(); i++) { collbufs[i].sendbufbase = new char[collbufs[i].sendbufsize*vartypesize]; + collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize]; collbufs[i].sendbuf = collbufs[i].sendbufbase; + collbufs[i].recvbuf = collbufs[i].recvbufbase; + + if (collbufs[i].recvbufsize > 0) { + MPI_Irecv (collbufs[i].recvbufbase, collbufs[i].recvbufsize, + datatype, i, MPI_ANY_TAG, dist::comm, &rrequests[i]); + num_posted_recvs++; + } } // Now go and get the send buffers filled with data. + // Once a buffer is full it will get posted right away + // (see gdata::copy_into_sendbuffer() and + // gdata::interpolate_into_sendbuffer()). thestate = state_fill_send_buffers; break; case state_fill_send_buffers: - // The send buffers contain all the data so now allocate the recv buffers. 
- for (int i = 0; i < collbufs.size(); i++) { - collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize]; - collbufs[i].recvbuf = collbufs[i].recvbufbase; - } - - // Exchange pairs of send/recv buffers between all processors. - ExchangeBuffers (); - - // The send buffers are not needed anymore. - for (int i = 0; i < collbufs.size(); i++) { - delete[] collbufs[i].sendbufbase; - } - - // All data has been received so now go and empty the recv buffers. + // Now fall through to the next state in which the recv buffers + // are emptied as soon as data has arrived. thestate = state_empty_recv_buffers; - break; + case state_empty_recv_buffers: - // Finally release the recv buffers. - for (int i = 0; i < collbufs.size(); i++) { - delete[] collbufs[i].recvbufbase; + // Finish (at least one of) the posted communications + if (! AllPostedCommunicationsFinished (use_waitall)) { + // No state change if there are still outstanding communications; + // do another comm_state loop iteration. + } else { + // Everything is done so release the collective communication buffers. + for (size_t i = 0; i < collbufs.size(); i++) { + delete[] collbufs[i].sendbufbase; + delete[] collbufs[i].recvbufbase; + } + thestate = state_done; } - - thestate = state_done; break; + case state_done: assert (0); default: @@ -152,30 +164,54 @@ comm_state::~comm_state () } -// exchange the collective communication buffers between processors -void comm_state::ExchangeBuffers () +// wait for completion of posted collective buffer sends/receives +// +// Depending on use_waitall, this function will wait for all at once (true) +// or at least one (false) of the posted receive operations to finish. +// +// It returns true if all posted communications have been completed. 
+bool comm_state::AllPostedCommunicationsFinished (bool use_waitall) { - vector<MPI_Request> rrequests(collbufs.size(), MPI_REQUEST_NULL); - vector<MPI_Request> srequests(collbufs.size(), MPI_REQUEST_NULL); + // check if all outstanding receives have been completed already + if (num_posted_recvs == num_completed_recvs) { + // finalize the outstanding sends in one go + MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE); - // loop over all buffers (alias processors) and initiate the communications - for (int proc = 0; proc < collbufs.size(); proc++) { + return true; + } + + // reset completion flag for all receive buffers + for (size_t i = 0; i < recvbuffers_ready.size(); i++) { + recvbuffers_ready[i] = false; + } - // don't send zero-sized messages (this covers 'proc == rank') - if (collbufs[proc].recvbufsize > 0) { - MPI_Irecv (collbufs[proc].recvbufbase, collbufs[proc].recvbufsize, - datatype, proc, MPI_ANY_TAG, dist::comm, &rrequests[proc]); + if (use_waitall) { + // mark all posted receive buffers as ready + for (size_t i = 0; i < rrequests.size(); i++) { + recvbuffers_ready[i] = rrequests[i] != MPI_REQUEST_NULL; } - if (collbufs[proc].sendbufsize > 0) { - MPI_Isend (collbufs[proc].sendbufbase, collbufs[proc].sendbufsize, - datatype, proc, 0, dist::comm, &srequests[proc]); + + // wait for completion of all posted receive operations + MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE); + num_completed_recvs = num_posted_recvs; + } else { + int num_completed_recvs_ = 0; + vector<int> completed_recvs(rrequests.size(), -1); + + // wait for completion of at least one posted receive operation + MPI_Waitsome (rrequests.size(), &rrequests.front(), &num_completed_recvs_, + &completed_recvs.front(), MPI_STATUSES_IGNORE); + assert (0 < num_completed_recvs_); + num_completed_recvs += num_completed_recvs_; + + // mark the receive buffers of completed communications as ready + for (int i = 0; i < num_completed_recvs_; i++) { + 
assert (rrequests.at(completed_recvs.at(i)) == MPI_REQUEST_NULL); + recvbuffers_ready.at(completed_recvs.at(i)) = true; } } - // finalize the outstanding communications - // FIXME: use MPI_Waitsome() on rrequests - MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE); - MPI_Waitall (srequests.size(), &srequests.front(), MPI_STATUSES_IGNORE); + return (false); } diff --git a/Carpet/CarpetLib/src/commstate.hh b/Carpet/CarpetLib/src/commstate.hh index 9c5239b21..21c05320e 100644 --- a/Carpet/CarpetLib/src/commstate.hh +++ b/Carpet/CarpetLib/src/commstate.hh @@ -90,6 +90,7 @@ public: // the following members are used for collective communications ////////////////////////////////////////////////////////////////////////// +public: // CCTK vartype used for this comm_state object int vartype; @@ -97,9 +98,6 @@ public: // (used as stride for advancing the char-based buffer pointers) int vartypesize; - // MPI datatype corresponding to CCTK vartype - MPI_Datatype datatype; - // buffers for collective communications struct collbufdesc { // the sizes of communication buffers (in elements of type <vartype>) @@ -115,17 +113,29 @@ public: sendbuf(NULL), recvbuf(NULL), sendbufbase(NULL), recvbufbase(NULL) {} -// FIXME: why can't these be made private ?? -//private: // the allocated communication buffers char* sendbufbase; char* recvbufbase; }; vector<collbufdesc> collbufs; // [nprocs] + // flags indicating which receive buffers are ready to be emptied + vector<bool> recvbuffers_ready; // [nprocs] + + // MPI datatype corresponding to CCTK vartype + MPI_Datatype datatype; + + // lists of outstanding requests for posted send/recv communications + vector<MPI_Request> srequests; // [nprocs] private: - // Exchange pairs of send/recv buffers between all processors. 
- void ExchangeBuffers(); + vector<MPI_Request> rrequests; // [nprocs] + + // number of posted and already completed receive communications + int num_posted_recvs; + int num_completed_recvs; + + // wait for completion of posted collective buffer sends/receives + bool AllPostedCommunicationsFinished(bool use_waitall); }; diff --git a/Carpet/CarpetLib/src/gdata.cc b/Carpet/CarpetLib/src/gdata.cc index 670a08db3..7bab32fa5 100644 --- a/Carpet/CarpetLib/src/gdata.cc +++ b/Carpet/CarpetLib/src/gdata.cc @@ -158,9 +158,9 @@ void gdata::copy_from (comm_state& state, } break; case state_empty_recv_buffers: - // if this is a destination processor: copy its data from the recv buffer - // (the processor-local case is not handled here) - if (proc() == dist::rank() && src->proc() != proc()) { + // if this is a destination processor and data has already been received + // from the source processor: copy it from the recv buffer + if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) { copy_from_recvbuffer (state, src, box); } break; @@ -416,19 +416,28 @@ void gdata::copy_into_sendbuffer (comm_state& state, // copy to remote processor assert (src->_has_storage); assert (src->_owns_storage); - assert (state.collbufs.at(proc()).sendbuf - - state.collbufs.at(proc()).sendbufbase <= - (state.collbufs.at(proc()).sendbufsize - box.size()) * - state.vartypesize); + assert (proc() < state.collbufs.size()); + int fillstate = (state.collbufs[proc()].sendbuf + + box.size()*state.vartypesize) - + state.collbufs[proc()].sendbufbase; + assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize); // copy this processor's data into the send buffer gdata* tmp = src->make_typed (varindex, transport_operator, tag); - tmp->allocate (box, src->proc(), state.collbufs.at(proc()).sendbuf); + tmp->allocate (box, src->proc(), state.collbufs[proc()].sendbuf); tmp->copy_from_innerloop (src, box); delete tmp; // advance send buffer to point to the next ibbox slot - 
state.collbufs.at(proc()).sendbuf += state.vartypesize * box.size(); + state.collbufs[proc()].sendbuf += state.vartypesize * box.size(); + + // post the send if the buffer is full + if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) { + MPI_Irsend (state.collbufs[proc()].sendbufbase, + state.collbufs[proc()].sendbufsize, + state.datatype, proc(), 0, dist::comm, + &state.srequests[proc()]); + } } } @@ -438,18 +447,20 @@ void gdata::copy_into_sendbuffer (comm_state& state, void gdata::copy_from_recvbuffer (comm_state& state, const gdata* src, const ibbox& box) { - assert (state.collbufs.at(proc()).recvbuf - - state.collbufs.at(proc()).recvbufbase <= - (state.collbufs.at(proc()).recvbufsize-box.size()) * state.vartypesize); + assert (src->proc() < state.collbufs.size()); + assert (state.collbufs[src->proc()].recvbuf - + state.collbufs[src->proc()].recvbufbase <= + (state.collbufs[src->proc()].recvbufsize-box.size()) * + state.vartypesize); // copy this processor's data from the recv buffer gdata* tmp = make_typed (varindex, transport_operator, tag); - tmp->allocate (box, proc(), state.collbufs.at(src->proc()).recvbuf); + tmp->allocate (box, proc(), state.collbufs[src->proc()].recvbuf); copy_from_innerloop (tmp, box); delete tmp; // advance recv buffer to point to the next ibbox slot - state.collbufs.at(src->proc()).recvbuf += state.vartypesize * box.size(); + state.collbufs[src->proc()].recvbuf += state.vartypesize * box.size(); } @@ -523,9 +534,10 @@ void gdata } break; case state_empty_recv_buffers: - // if this is a destination processor: copy its data from the recv buffer + // if this is a destination processor and data has already been received + // from the source processor: copy it from the recv buffer // (the processor-local case is not handled here) - if (proc() == dist::rank() && srcs.at(0)->proc() != proc()) { + if (proc() == dist::rank() && state.recvbuffers_ready.at(srcs.at(0)->proc())) { copy_from_recvbuffer (state, srcs.at(0), box); 
} break; @@ -769,19 +781,28 @@ void gdata // interpolate to remote processor assert (srcs.at(0)->_has_storage); assert (srcs.at(0)->_owns_storage); - assert (state.collbufs.at(proc()).sendbuf - - state.collbufs.at(proc()).sendbufbase <= - (state.collbufs.at(proc()).sendbufsize - box.size()) * - state.vartypesize); + assert (proc() < state.collbufs.size()); + int fillstate = (state.collbufs[proc()].sendbuf + + box.size()*state.vartypesize) - + state.collbufs[proc()].sendbufbase; + assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize); // copy this processor's data into the send buffer gdata* tmp = srcs.at(0)->make_typed (varindex, transport_operator, tag); - tmp->allocate (box, srcs.at(0)->proc(), state.collbufs.at(proc()).sendbuf); + tmp->allocate (box, srcs.at(0)->proc(), state.collbufs[proc()].sendbuf); tmp->interpolate_from_innerloop (srcs, times, box, time, order_space, order_time); delete tmp; // advance send buffer to point to the next ibbox slot - state.collbufs.at(proc()).sendbuf += state.vartypesize * box.size(); + state.collbufs[proc()].sendbuf += state.vartypesize * box.size(); + + // post the send if the buffer is full + if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) { + MPI_Irsend (state.collbufs[proc()].sendbufbase, + state.collbufs[proc()].sendbufsize, + state.datatype, proc(), 0, dist::comm, + &state.srequests[proc()]); + } } } |