diff options
author | Erik Schnetter <schnetter@cct.lsu.edu> | 2007-04-19 01:39:00 +0000 |
---|---|---|
committer | Erik Schnetter <schnetter@cct.lsu.edu> | 2007-04-19 01:39:00 +0000 |
commit | 242efeb8f500afb5fca33220d40e9930dfb55929 (patch) | |
tree | 75050282cf94e676a6bae54317b1c26820f68402 /Carpet | |
parent | 018b9711ff2f4fbc8451d5ccf645e86ae9ac4435 (diff) |
CarpetLib: Various commstate changes
Always use collective communication buffers in commstate class.
Add functions to reserve space in a commbuf, to get a pointer into the
space, and to commit space. This encapsulates using commbufs.
darcs-hash:20070419013946-dae7b-fce3d05b5e90fb37588939d1b11dce6d48ea2ead.gz
Diffstat (limited to 'Carpet')
-rw-r--r-- | Carpet/CarpetLib/src/commstate.cc | 509 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/commstate.hh | 88 |
2 files changed, 308 insertions, 289 deletions
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc index b770a706a..34d75ac12 100644 --- a/Carpet/CarpetLib/src/commstate.cc +++ b/Carpet/CarpetLib/src/commstate.cc @@ -22,25 +22,16 @@ using namespace CarpetLib; // Communication state control comm_state::comm_state () { - // If CarpetLib::use_collective_communication_buffers is set to true, - // this comm_state object will use collective communications, - // i.e., it will step through - // state_get_buffer_sizes - // state_fill_send_buffers - // state_empty_recv_buffers - // - // If CarpetLib::use_collective_communication_buffers is false - // then individual communications on single components are used - // by stepping through - // state_post - // state_wait + // A comm_state object will step through + // state_get_buffer_sizes + // state_fill_send_buffers + // state_empty_recv_buffers DECLARE_CCTK_PARAMETERS; static Timer timer ("commstate::create"); timer.start (); - thestate = use_collective_communication_buffers ? - state_get_buffer_sizes : state_post; + thestate = state_get_buffer_sizes; typebufs.resize (dist::c_ndatatypes()); for (size_t type = 0; type < typebufs.size(); type++) { @@ -57,7 +48,6 @@ comm_state::comm_state () #include "instantiate" #undef INSTANTIATE - recvbuffers_ready.resize (dist::c_ndatatypes() * dist::size()); srequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL); rrequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL); @@ -72,165 +62,165 @@ void comm_state::step () total.start (); assert (thestate != state_done); switch (thestate) { - case state_post: - // After all sends/recvs have been posted in 'state_post' - // now wait for their completion in 'state_wait'. - thestate = state_wait; - break; - - case state_wait: - thestate = state_done; - break; - - case state_get_buffer_sizes: - // The sizes of the collective communication buffers are known - // so now allocate them. - // The receive operations are also posted here already - // (a clever MPI layer may take advantage of such early posting). - num_posted_recvs = num_completed_recvs = 0; - + + case state_get_buffer_sizes: + // The sizes of the collective communication buffers are known so + // now allocate them. + // The receive operations are also posted here already (a clever + // MPI layer may take advantage of such early posting). + num_posted_recvs = num_completed_recvs = 0; + + for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { + size_t const proc = + interleave_communications + ? (proc1 + dist::rank()) % dist::size() + : proc1; + + for (size_t type = 0; type < typebufs.size(); type++) { + + // skip unused datatype buffers + if (not typebufs[type].in_use) continue; + + int datatypesize = typebufs[type].datatypesize; + procbufdesc& procbuf = typebufs[type].procbufs[proc]; + + procbuf.sendbufbase = new char[procbuf.sendbufsize*datatypesize]; + procbuf.recvbufbase = new char[procbuf.recvbufsize*datatypesize]; + // TODO: this may be a bit extreme, and it is only for + // internal consistency checking + if (poison_new_memory) { + memset (procbuf.sendbufbase, poison_value, + procbuf.sendbufsize*datatypesize); + memset (procbuf.recvbufbase, poison_value, + procbuf.recvbufsize*datatypesize); + } + procbuf.sendbuf = procbuf.sendbufbase; + procbuf.recvbuf = procbuf.recvbufbase; + + if (procbuf.recvbufsize > 0) { + static Timer timer ("commstate_sizes_irecv"); + timer.start (); + int const tag = + vary_tags + ? (dist::rank() + dist::size() * (proc + dist::size() * type)) % 32768 + : type; + MPI_Irecv (procbuf.recvbufbase, procbuf.recvbufsize, + typebufs[type].mpi_datatype, proc, tag, + dist::comm(), &rrequests[dist::size()*type + proc]); + timer.stop (0); + num_posted_recvs++; + } + } + } + + if (barrier_between_stages) { + // Add a barrier, to try to ensure that all Irecvs are posted + // before the first Isends are made + // (Alternative: Use MPI_Alltoallv instead) + MPI_Barrier (dist::comm()); + } + + // Now go and get the send buffers filled with data. + // Once a buffer is full it will be posted right away + // (see gdata::copy_into_sendbuffer() and + // gdata::interpolate_into_sendbuffer()). + thestate = state_fill_send_buffers; + break; + + case state_fill_send_buffers: + if (combine_sends) { + // Send the data. Do not send them sequentially, but try to + // intersperse the communications for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { - size_t const proc = + int const proc = interleave_communications - ? (proc1 + dist::rank()) % dist::size() + ? (proc1 + dist::size() - dist::rank()) % dist::size() : proc1; for (size_t type = 0; type < typebufs.size(); type++) { - // skip unused datatype buffers if (not typebufs[type].in_use) continue; - - int datatypesize = typebufs[type].datatypesize; - procbufdesc& procbuf = typebufs[type].procbufs[proc]; - - procbuf.sendbufbase = new char[procbuf.sendbufsize*datatypesize]; - procbuf.recvbufbase = new char[procbuf.recvbufsize*datatypesize]; - // TODO: this may be a bit extreme, and it is only for - // internal consistency checking - if (poison_new_memory) { - memset (procbuf.sendbufbase, poison_value, procbuf.sendbufsize*datatypesize); - memset (procbuf.recvbufbase, poison_value, procbuf.recvbufsize*datatypesize); - } - procbuf.sendbuf = procbuf.sendbufbase; - procbuf.recvbuf = procbuf.recvbufbase; - - if (procbuf.recvbufsize > 0) { - static Timer timer ("commstate_sizes_irecv"); - timer.start (); - int const tag = - vary_tags - ? (dist::rank() + dist::size() * (proc + dist::size() * type)) % 32768 - : type; - MPI_Irecv (procbuf.recvbufbase, procbuf.recvbufsize, - typebufs[type].mpi_datatype, proc, tag, - dist::comm(), &rrequests[dist::size()*type + proc]); - timer.stop (0); - num_posted_recvs++; - } - } - } - - if (barrier_between_stages) { - // Add a barrier, to try to ensure that all Irecvs are posted - // before the first Isends are made - // (Alternative: Use MPI_Alltoallv instead) - MPI_Barrier (dist::comm()); - } - - // Now go and get the send buffers filled with data. - // Once a buffer is full it will be posted right away - // (see gdata::copy_into_sendbuffer() and - // gdata::interpolate_into_sendbuffer()). - thestate = state_fill_send_buffers; - break; - - case state_fill_send_buffers: - if (combine_sends) { - // Send the data. Do not send them sequentially, but try to - // intersperse the communications - for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { - int const proc = - interleave_communications - ? (proc1 + dist::size() - dist::rank()) % dist::size() - : proc1; - - for (size_t type = 0; type < typebufs.size(); type++) { - // skip unused datatype buffers - if (not typebufs[type].in_use) continue; - int const datatypesize = typebufs[type].datatypesize; - procbufdesc const & procbuf = typebufs[type].procbufs[proc]; + int const datatypesize = typebufs[type].datatypesize; + procbufdesc const & procbuf = typebufs[type].procbufs[proc]; - int const fillstate = procbuf.sendbuf - procbuf.sendbufbase; - assert (fillstate == (int)procbuf.sendbufsize * datatypesize); + int const fillstate = procbuf.sendbuf - procbuf.sendbufbase; + assert (fillstate == (int)procbuf.sendbufsize * datatypesize); - if (procbuf.sendbufsize > 0) { - int const tag = - vary_tags - ? (proc + dist::size() * (dist::rank() + dist::size() * type)) % 32768 - : type; - if (use_mpi_send) { - // use MPI_Send - static Timer timer ("commstate_send"); - timer.start (); - MPI_Send (procbuf.sendbufbase, procbuf.sendbufsize, - typebufs[type].mpi_datatype, proc, tag, - dist::comm()); - srequests[dist::size()*type + proc] = MPI_REQUEST_NULL; - timer.stop (procbuf.sendbufsize * datatypesize); - } else if (use_mpi_ssend) { - // use MPI_Ssend - static Timer timer ("commstate_ssend"); - timer.start (); - MPI_Ssend (procbuf.sendbufbase, procbuf.sendbufsize, - typebufs[type].mpi_datatype, proc, tag, - dist::comm()); - srequests[dist::size()*type + proc] = MPI_REQUEST_NULL; - timer.stop (procbuf.sendbufsize * datatypesize); - } else { - // use MPI_Isend - static Timer timer ("commstate_isend"); - timer.start (); - MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize, - typebufs[type].mpi_datatype, proc, tag, - dist::comm(), &srequests[dist::size()*type + proc]); - timer.stop (procbuf.sendbufsize * datatypesize); - } + if (procbuf.sendbufsize > 0) { + int const tag = + vary_tags + ? (proc + dist::size() * (dist::rank() + dist::size() * type)) % 32768 + : type; + if (use_mpi_send) { + // use MPI_Send + static Timer timer ("commstate_send"); + timer.start (); + MPI_Send (procbuf.sendbufbase, procbuf.sendbufsize, + typebufs[type].mpi_datatype, proc, tag, + dist::comm()); + srequests[dist::size()*type + proc] = MPI_REQUEST_NULL; + timer.stop (procbuf.sendbufsize * datatypesize); + } else if (use_mpi_ssend) { + // use MPI_Ssend + static Timer timer ("commstate_ssend"); + timer.start (); + MPI_Ssend (procbuf.sendbufbase, procbuf.sendbufsize, + typebufs[type].mpi_datatype, proc, tag, + dist::comm()); + srequests[dist::size()*type + proc] = MPI_REQUEST_NULL; + timer.stop (procbuf.sendbufsize * datatypesize); + } else { + // use MPI_Isend + static Timer timer ("commstate_isend"); + timer.start (); + MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize, + typebufs[type].mpi_datatype, proc, tag, + dist::comm(), &srequests[dist::size()*type + proc]); + timer.stop (procbuf.sendbufsize * datatypesize); } + } - } // for type + } // for type - } // for proc - } - - // Now fall through to the next state in which the recv buffers - // are emptied as soon as data has arrived. - thestate = state_empty_recv_buffers; - - case state_empty_recv_buffers: - // Finish (at least one of) the posted communications - if (not AllPostedCommunicationsFinished ()) { - // No state change if there are still outstanding communications; - // do another comm_state loop iteration. - } else { - // Everything is done so release the collective communication buffers. - for (size_t type = 0; type < typebufs.size(); type++) { - for (size_t proc = 0; proc < typebufs[type].procbufs.size(); proc++) { - delete[] typebufs[type].procbufs[proc].sendbufbase; - delete[] typebufs[type].procbufs[proc].recvbufbase; - } + } // for proc + } + + // Now fall through to the next state in which the recv buffers + // are emptied as soon as data has arrived. + thestate = state_do_some_work; + break; + + case state_do_some_work: + // Now fall through to the next state in which the recv buffers + // are emptied as soon as data has arrived. + thestate = state_empty_recv_buffers; + + case state_empty_recv_buffers: + // Finish (at least one of) the posted communications + if (not AllPostedCommunicationsFinished ()) { + // No state change if there are still outstanding + // communications; do another comm_state loop iteration. + } else { + // Everything is done so release the collective communication buffers. + for (size_t type = 0; type < typebufs.size(); type++) { + for (size_t proc = 0; proc < typebufs[type].procbufs.size(); proc++) { + delete[] typebufs[type].procbufs[proc].sendbufbase; + delete[] typebufs[type].procbufs[proc].recvbufbase; } - thestate = state_done; } - break; - - default: - assert (0 && "invalid state"); + thestate = state_done; + } + break; + + default: + assert (0 && "invalid state"); } total.stop (0); } + bool comm_state::done () { return thestate == state_done; @@ -242,17 +232,14 @@ comm_state::~comm_state () DECLARE_CCTK_PARAMETERS; assert (thestate == state_done or - thestate == (use_collective_communication_buffers ? - state_get_buffer_sizes : state_post)); - assert (requests.empty()); + thestate == state_get_buffer_sizes); } // wait for completion of posted collective buffer sends/receives // -// Depending on the parameter CarpetLib::use_waitall, this function will wait -// for all (true) or at least one (false) of the posted receive operations -// to finish. +// This function will wait for all of the posted receive operations to +// finish. // // It returns true if all posted communications have been completed. bool comm_state::AllPostedCommunicationsFinished () @@ -292,115 +279,149 @@ bool comm_state::AllPostedCommunicationsFinished () return true; } - // reset completion flag for all receive buffers - recvbuffers_ready.assign (recvbuffers_ready.size(), false); - - if (use_waitall) { - // mark all posted recveive buffers as ready - for (size_t i = 0; i < recvbuffers_ready.size(); i++) { - recvbuffers_ready.AT(i) = rrequests.AT(i) != MPI_REQUEST_NULL; - } - - // wait for completion of all posted receive operations - if (reduce_mpi_waitall) { - size_t nreqs = 0; - for (size_t i=0; i<rrequests.size(); ++i) { - if (rrequests.AT(i) != MPI_REQUEST_NULL) { - ++nreqs; - } + // wait for completion of all posted receive operations + if (reduce_mpi_waitall) { + size_t nreqs = 0; + for (size_t i=0; i<rrequests.size(); ++i) { + if (rrequests.AT(i) != MPI_REQUEST_NULL) { + ++nreqs; } - vector<MPI_Request> reqs(nreqs); - nreqs = 0; - for (size_t i=0; i<rrequests.size(); ++i) { - if (rrequests.AT(i) != MPI_REQUEST_NULL) { - reqs.AT(nreqs) = rrequests.AT(i); - ++nreqs; - } + } + vector<MPI_Request> reqs(nreqs); + nreqs = 0; + for (size_t i=0; i<rrequests.size(); ++i) { + if (rrequests.AT(i) != MPI_REQUEST_NULL) { + reqs.AT(nreqs) = rrequests.AT(i); + ++nreqs; } - assert (nreqs == reqs.size()); - static Timer timer ("commstate_waitall"); - timer.start (); - MPI_Waitall (reqs.size(), &reqs.front(), MPI_STATUSES_IGNORE); - timer.stop (0); - } else { - static Timer timer ("commstate_waitall"); - timer.start (); - MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE); - timer.stop (0); } - num_completed_recvs = num_posted_recvs; + assert (nreqs == reqs.size()); + static Timer timer ("commstate_waitall"); + timer.start (); + MPI_Waitall (reqs.size(), &reqs.front(), MPI_STATUSES_IGNORE); + timer.stop (0); } else { - int num_completed_recvs_ = 0; - vector<int> completed_recvs(rrequests.size(), -1); - - // wait for completion of at least one posted receive operation - static Timer timer ("commstate_waitsome"); + static Timer timer ("commstate_waitall"); timer.start (); - MPI_Waitsome (rrequests.size(), &rrequests.front(), &num_completed_recvs_, - &completed_recvs.front(), MPI_STATUSES_IGNORE); + MPI_Waitall (rrequests.size(), &rrequests.front(), MPI_STATUSES_IGNORE); timer.stop (0); - assert (0 < num_completed_recvs_); - num_completed_recvs += num_completed_recvs_; - - // mark the recveive buffers of completed communications as ready - for (int i = 0; i < num_completed_recvs_; i++) { - assert (rrequests.AT(completed_recvs.AT(i)) == MPI_REQUEST_NULL); - recvbuffers_ready.AT(completed_recvs.AT(i)) = true; - } } - - return (false); + num_completed_recvs = num_posted_recvs; + + return false; } -template<typename T> -comm_state::commbuf<T>::commbuf (ibbox const & box) + +void +comm_state:: +reserve_send_space (unsigned int const type, + int const proc, + int const npoints) { - data.resize (prod (box.shape() / box.stride())); + assert (type < dist::c_ndatatypes()); + assert (proc >= 0 and proc < dist::size()); + assert (npoints >= 0); + typebufdesc & typebuf = typebufs.AT(type); + procbufdesc & procbuf = typebuf.procbufs.AT(proc); + procbuf.sendbufsize += npoints; + typebuf.in_use = true; } -template<typename T> -comm_state::commbuf<T>::~commbuf () +void +comm_state:: +reserve_recv_space (unsigned int const type, + int const proc, + int const npoints) { + assert (type < dist::c_ndatatypes()); + assert (proc >= 0 and proc < dist::size()); + assert (npoints >= 0); + typebufdesc & typebuf = typebufs.AT(type); + procbufdesc & procbuf = typebuf.procbufs.AT(proc); + procbuf.recvbufsize += npoints; + typebuf.in_use = true; } -template<typename T> -void const * -comm_state::commbuf<T>::pointer () - const +void * +comm_state:: +send_buffer (unsigned int const type, + int const proc, + int const npoints) { - return &data.front(); + assert (type < dist::c_ndatatypes()); + assert (proc >= 0 and proc < dist::size()); + typebufdesc const & typebuf = typebufs.AT(type); + procbufdesc const & procbuf = typebuf.procbufs.AT(proc); + + assert (procbuf.sendbuf + npoints * typebuf.datatypesize <= + procbuf.sendbufbase + procbuf.sendbufsize * typebuf.datatypesize); + + return procbuf.sendbuf; } -template<typename T> void * -comm_state::commbuf<T>::pointer () +comm_state:: +recv_buffer (unsigned int const type, + int const proc, + int const npoints) { - return &data.front(); + assert (type < dist::c_ndatatypes()); + assert (proc >= 0 and proc < dist::size()); + typebufdesc const & typebuf = typebufs.AT(type); + procbufdesc const & procbuf = typebuf.procbufs.AT(proc); + + assert (procbuf.recvbuf + npoints * typebuf.datatypesize <= + procbuf.recvbufbase + procbuf.recvbufsize * typebuf.datatypesize); + + return procbuf.recvbuf; } -template<typename T> -int -comm_state::commbuf<T>::size () - const +void +comm_state:: +commit_send_space (unsigned int const type, + int const proc, + int const npoints) { - return data.size(); + DECLARE_CCTK_PARAMETERS; + + assert (type < dist::c_ndatatypes()); + assert (proc >= 0 and proc < dist::size()); + assert (npoints >= 0); + typebufdesc & typebuf = typebufs.AT(type); + procbufdesc & procbuf = typebuf.procbufs.AT(proc); + procbuf.sendbuf += npoints * typebuf.datatypesize; + assert (procbuf.sendbuf <= + procbuf.sendbufbase + procbuf.sendbufsize * typebuf.datatypesize); + + if (not combine_sends) { + // post the send if the buffer is full + if (procbuf.sendbuf == + procbuf.sendbufbase + procbuf.sendbufsize * typebuf.datatypesize) + { + static Timer timer ("commit_send_space::isend"); + timer.start (); + MPI_Isend (procbuf.sendbufbase, + procbuf.sendbufsize, typebuf.mpi_datatype, + proc, type, dist::comm(), + & srequests.AT(type * dist::size() + proc)); + timer.stop (procbuf.sendbufsize * typebuf.datatypesize); + } + } } -template<typename T> -MPI_Datatype -comm_state::commbuf<T>::datatype () - const +void +comm_state:: +commit_recv_space (unsigned int const type, + int const proc, + int const npoints) { - T dummy; - return dist::datatype (dummy); + assert (type < dist::c_ndatatypes()); + assert (proc >= 0 and proc < dist::size()); + assert (npoints >= 0); + typebufdesc & typebuf = typebufs.AT(type); + procbufdesc & procbuf = typebuf.procbufs.AT(proc); + procbuf.recvbuf += npoints * typebuf.datatypesize; + assert (procbuf.recvbuf <= + procbuf.recvbufbase + procbuf.recvbufsize * typebuf.datatypesize); } - - - -#define INSTANTIATE(T) \ - template class comm_state::commbuf<T>; - -#include "instantiate" - -#undef INSTANTIATE diff --git a/Carpet/CarpetLib/src/commstate.hh b/Carpet/CarpetLib/src/commstate.hh index fcce6f201..e2e90ccc8 100644 --- a/Carpet/CarpetLib/src/commstate.hh +++ b/Carpet/CarpetLib/src/commstate.hh @@ -6,24 +6,26 @@ #include <mpi.h> +#include "cctk_Parameters.h" + #include "dist.hh" +#include "timestat.hh" using namespace std; +using namespace CarpetLib; -// State information for communications -// -// Depending on how a comm state object was created, -// it will step through one of two state transitions (in the given order): -enum astate { - // these are used for collective communications - state_get_buffer_sizes, state_fill_send_buffers, state_empty_recv_buffers, - // these are used for communications on individual components - state_post, state_wait, +// State information for communications - // all transition graphs must end with here - state_done +// A comm state object will step through the state transitions in the +// given order: +enum astate { + state_get_buffer_sizes, + state_fill_send_buffers, + state_do_some_work, + state_empty_recv_buffers, + state_done, }; struct comm_state { @@ -42,37 +44,6 @@ private: public: ////////////////////////////////////////////////////////////////////////// - // the following members are used for single-component communications - ////////////////////////////////////////////////////////////////////////// - - // List of MPI requests for use_waitall - vector<MPI_Request> requests; - - // Lists of communication buffers for use_lightweight_buffers - struct gcommbuf { - gcommbuf () {}; - virtual ~gcommbuf () {}; - MPI_Request request; - virtual void const * pointer () const = 0; - virtual void * pointer () = 0; - virtual int size () const = 0; - virtual MPI_Datatype datatype () const = 0; - }; - - template<typename T> - struct commbuf : gcommbuf { - commbuf (ibbox const & box); - virtual ~commbuf (); - virtual void const * pointer () const; - virtual void * pointer (); - virtual int size () const; - virtual MPI_Datatype datatype () const; - vector<T> data; - }; - queue<gcommbuf*> recvbufs, sendbufs; - - - ////////////////////////////////////////////////////////////////////////// // the following members are used for collective communications ////////////////////////////////////////////////////////////////////////// @@ -125,12 +96,39 @@ public: // list of datatype buffers vector<typebufdesc> typebufs; // [dist::c_ndatatypes()] - // flags indicating which receive buffers are ready to be emptied - vector<bool> recvbuffers_ready; // [dist::size() * dist::c_ndatatypes()] + void + reserve_send_space (unsigned int type, + int proc, + int npoints); + + void + reserve_recv_space (unsigned int type, + int proc, + int npoints); + + void * + send_buffer (unsigned int type, + int proc, + int npoints); + void * + recv_buffer (unsigned int type, + int proc, + int npoints); + + void + commit_send_space (unsigned int type, + int proc, + int npoints); + + void + commit_recv_space (unsigned int type, + int proc, + int npoints); + +private: // lists of outstanding requests for posted send/recv communications vector<MPI_Request> srequests; // [dist::size() * dist::c_ndatatypes()] -private: vector<MPI_Request> rrequests; // [dist::size() * dist::c_ndatatypes()] // number of posted and already completed receive communications |