diff options
author | Thomas Radke <tradke@aei.mpg.de> | 2005-08-15 15:00:00 +0000 |
---|---|---|
committer | Thomas Radke <tradke@aei.mpg.de> | 2005-08-15 15:00:00 +0000 |
commit | b3405326ebf651b20b4c44423df62ef23a1bf8f2 (patch) | |
tree | 3f28fb7697589ff69ab3ac283cd0064dba340522 /Carpet/CarpetLib/src/commstate.cc | |
parent | 8493c61f465169c3d52b53b5023680a0d33f898c (diff) |
Carpet*: generalise the comm_state class for collective buffer communications
CarpetLib's comm_state class (actually, it's still just a struct) has been
extended to handle collective buffer communications for all possible C datatypes
at the same time. This makes it unnecessary for the higher-level communication
routines to loop over each individual datatype separately.
darcs-hash:20050815150023-776a0-dddc1aca7ccaebae872f9f451b2c3595cd951fed.gz
Diffstat (limited to 'Carpet/CarpetLib/src/commstate.cc')
-rw-r--r-- | Carpet/CarpetLib/src/commstate.cc | 145 |
1 files changed, 77 insertions, 68 deletions
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc
index be3e485a2..1bd9a2a8c 100644
--- a/Carpet/CarpetLib/src/commstate.cc
+++ b/Carpet/CarpetLib/src/commstate.cc
@@ -10,54 +10,61 @@
 // Communication state control
-comm_state::comm_state (int vartype_) : vartype(vartype_)
+comm_state::comm_state ()
 {
-  // If this comm state is created with a valid (ie. non-negative) CCTK vartype
-  // then it is assumed to be used in collective communications for that type
-  // later on, ie. it will step through
+  // If CarpetLib::use_collective_communication_buffers is set to true,
+  // this comm_state object will use collective communications,
+  // ie. it will step through
   // state_get_buffer_sizes
   // state_fill_send_buffers
   // state_empty_recv_buffers
-  // This path needs the size of the vartype, the corresponding MPI datatype
-  // and the collective communication buffers initialized also.
   //
-  // If this comm state is created with a negative vartype then individual
-  // communications on single components are used by stepping through
+  // If CarpetLib::use_collective_communication_buffers is false
+  // then individual communications on single components are used
+  // by stepping through
   // state_post
   // state_wait
   DECLARE_CCTK_PARAMETERS;
-  uses_collective_communication_buffers =
-    use_collective_communication_buffers && vartype >= 0;
+  thestate = use_collective_communication_buffers ?
+    state_get_buffer_sizes : state_post;
-  if (uses_collective_communication_buffers) {
-    vartypesize = CCTK_VarTypeSize (vartype);
-    assert (vartypesize > 0);
-
-    thestate = state_get_buffer_sizes;
-    switch (vartype) {
-#define TYPECASE(N,T) \
-      case N: { T dummy; datatype = dist::datatype(dummy); } break;
-#include "carpet_typecase.hh"
-#undef TYPECASE
-      default: assert (0 && "invalid datatype");
-    }
-    collbufs.resize (dist::size());
-    rrequests.resize (dist::size(), MPI_REQUEST_NULL);
-    srequests.resize (dist::size(), MPI_REQUEST_NULL);
-    recvbuffers_ready.resize (dist::size());
+  typebufs.resize (dist::c_ndatatypes());
+  for (int type = 0; type < typebufs.size(); type++) {
+    typebufs[type].procbufs.resize(dist::size());
+  }
-  } else {
-    thestate = state_post;
+#define INSTANTIATE(T) \
+  { \
+    T dummy; \
+    int type = dist::c_datatype (dummy); \
+    typebufs.at(type).datatypesize = sizeof (dummy); \
+    typebufs.at(type).mpi_datatype = dist::datatype (dummy); \
   }
+#include "instantiate"
+#undef INSTANTIATE
+
+  recvbuffers_ready.resize (dist::c_ndatatypes() * dist::size());
+  srequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL);
+  rrequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL);
 }
+
 void comm_state::step ()
 {
   assert (thestate != state_done);
-  if (uses_collective_communication_buffers) {
-    switch (thestate) {
+  switch (thestate) {
+    case state_post:
+      // After all sends/recvs have been posted in 'state_post'
+      // now wait for their completion in 'state_wait'.
+      thestate = state_wait;
+      break;
+
+    case state_wait:
+      thestate = state_done;
+      break;
+
     case state_get_buffer_sizes:
       // The sizes of the collective communication buffers are known
       // so now allocate them.
@@ -65,21 +72,33 @@ void comm_state::step ()
       // (a clever MPI layer may take advantage of such early posting).
       num_posted_recvs = num_completed_recvs = 0;
-      for (size_t i = 0; i < collbufs.size(); i++) {
-        collbufs[i].sendbufbase = new char[collbufs[i].sendbufsize*vartypesize];
-        collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize];
-        collbufs[i].sendbuf = collbufs[i].sendbufbase;
-        collbufs[i].recvbuf = collbufs[i].recvbufbase;
+      for (int type = 0; type < typebufs.size(); type++) {
+
+        // skip unused datatype buffers
+        if (not typebufs[type].in_use) {
+          continue;
+        }
-        if (collbufs[i].recvbufsize > 0) {
-          MPI_Irecv (collbufs[i].recvbufbase, collbufs[i].recvbufsize,
-                     datatype, i, MPI_ANY_TAG, dist::comm, &rrequests[i]);
-          num_posted_recvs++;
+        int& datatypesize = typebufs[type].datatypesize;
+        for (int proc = 0; proc < typebufs[type].procbufs.size(); proc++) {
+          procbufdesc& procbuf = typebufs[type].procbufs[proc];
+
+          procbuf.sendbufbase = new char[procbuf.sendbufsize*datatypesize];
+          procbuf.recvbufbase = new char[procbuf.recvbufsize*datatypesize];
+          procbuf.sendbuf = procbuf.sendbufbase;
+          procbuf.recvbuf = procbuf.recvbufbase;
+
+          if (procbuf.recvbufsize > 0) {
+            MPI_Irecv (procbuf.recvbufbase, procbuf.recvbufsize,
+                       typebufs[type].mpi_datatype, proc, type,
+                       dist::comm, &rrequests[dist::size()*type + proc]);
+            num_posted_recvs++;
+          }
         }
       }
       // Now go and get the send buffers filled with data.
-      // Once a buffer is full it will get posted right away
+      // Once a buffer is full it will be posted right away
       // (see gdata::copy_into_sendbuffer() and
       // gdata::interpolate_into_sendbuffer()).
       thestate = state_fill_send_buffers;
@@ -92,14 +111,16 @@ void comm_state::step ()
     case state_empty_recv_buffers:
       // Finish (at least one of) the posted communications
-      if (! AllPostedCommunicationsFinished ()) {
+      if (not AllPostedCommunicationsFinished ()) {
         // No state change if there are still outstanding communications;
         // do another comm_state loop iteration.
       } else {
         // Everything is done so release the collective communication buffers.
-        for (size_t i = 0; i < collbufs.size(); i++) {
-          delete[] collbufs[i].sendbufbase;
-          delete[] collbufs[i].recvbufbase;
+        for (int type = 0; type < typebufs.size(); type++) {
+          for (int proc = 0; proc < typebufs[type].procbufs.size(); proc++) {
+            delete[] typebufs[type].procbufs[proc].sendbufbase;
+            delete[] typebufs[type].procbufs[proc].recvbufbase;
+          }
         }
         thestate = state_done;
       }
@@ -107,33 +128,23 @@ void comm_state::step ()
     default:
       assert (0 && "invalid state");
-    }
-  } else {
-    switch (thestate) {
-      case state_post:
-        // After all sends/recvs have been posted in 'state_post'
-        // now wait for their completion in 'state_wait'.
-        thestate = state_wait;
-        break;
-      case state_wait:
-        thestate = state_done;
-        break;
-      default:
-        assert (0 && "invalid state");
-    }
   }
 }
+
 bool comm_state::done ()
 {
   return thestate == state_done;
 }
+
 comm_state::~comm_state ()
 {
-  assert (thestate == state_done ||
-          thestate == uses_collective_communication_buffers ?
-          state_get_buffer_sizes : state_post);
+  DECLARE_CCTK_PARAMETERS;
+
+  assert (thestate == state_done or
+          thestate == (use_collective_communication_buffers ?
+                       state_get_buffer_sizes : state_post));
   assert (requests.empty());
 }
@@ -141,14 +152,14 @@ comm_state::~comm_state ()
 // wait for completion of posted collective buffer sends/receives
 //
 // Depending on the parameter CarpetLib::use_waitall, this function will wait
-// for all at once (true) or at least one (false) of the
-// posted receive operations to finish.
+// for all (true) or at least one (false) of the posted receive operations
+// to finish.
 //
 // It returns true if all posted communications have been completed.
 bool comm_state::AllPostedCommunicationsFinished ()
 {
   DECLARE_CCTK_PARAMETERS;
-
+
   // check if all outstanding receives have been completed already
   if (num_posted_recvs == num_completed_recvs) {
     // finalize the outstanding sends in one go
@@ -158,13 +169,11 @@ bool comm_state::AllPostedCommunicationsFinished ()
   }
   // reset completion flag for all receive buffers
-  for (size_t i = 0; i < recvbuffers_ready.size(); i++) {
-    recvbuffers_ready[i] = false;
-  }
+  recvbuffers_ready.assign (recvbuffers_ready.size(), false);
   if (use_waitall) {
     // mark all posted recveive buffers as ready
-    for (size_t i = 0; i < rrequests.size(); i++) {
+    for (size_t i = 0; i < recvbuffers_ready.size(); i++) {
       recvbuffers_ready[i] = rrequests[i] != MPI_REQUEST_NULL;
     } |