diff options
Diffstat (limited to 'CarpetDev/CarpetIOF5/src/distribute.cc')
-rw-r--r-- | CarpetDev/CarpetIOF5/src/distribute.cc | 125 |
1 files changed, 104 insertions, 21 deletions
diff --git a/CarpetDev/CarpetIOF5/src/distribute.cc b/CarpetDev/CarpetIOF5/src/distribute.cc index 5fd8e5a29..ab7183df3 100644 --- a/CarpetDev/CarpetIOF5/src/distribute.cc +++ b/CarpetDev/CarpetIOF5/src/distribute.cc @@ -2,6 +2,7 @@ #include <cctk_Parameters.h> +#include <cacheinfo.hh> #include <carpet.hh> #include <cassert> @@ -52,10 +53,48 @@ namespace CarpetIOF5 { + /*** scatter_t::busy_tags_t *************************************************/ + + scatter_t::busy_tags_t::busy_tags_t() + { + // do nothing + } + + scatter_t::busy_tags_t::~busy_tags_t() + { + // Ensure all tags are free + for (int tag=0; tag<(int)tags.size(); ++tag) { + assert(not tags.at(tag)); + } + } + + int scatter_t::busy_tags_t::allocate_tag() + { + if (tags.empty()) { + tags.resize(max_concurrent_sends, false); + } + for (int tag=0; tag<(int)tags.size(); ++tag) { + if (not tags.at(tag)) { + tags.at(tag) = true; + return tag; + } + } + return -1; // no free tag available + } + + void scatter_t::busy_tags_t::free_tag(int const tag) + { + assert(tags.at(tag)); + tags.at(tag) = false; + } + + + /*** scatter_t **************************************************************/ scatter_t::scatter_t(cGH const *const cctkGH_) : cctkGH(cctkGH_), + busy_tags(CCTK_nProcs(cctkGH)), num_received(0), num_sent(0), bytes_allocated(0), did_send_all(false), did_receive_sent_all(0), did_receive_all_sent_all(false) @@ -136,7 +175,7 @@ namespace CarpetIOF5 { } if (verbose) { - CCTK_INFO("Destroying down global scatter object"); + CCTK_INFO("Destroying global scatter object"); } assert(public_recvs.empty()); @@ -152,7 +191,7 @@ namespace CarpetIOF5 { // consecutive) int child() { return dist::rank()*nsiblings() + 1; } // Get process id if my parent - int parent() { return (dist::rank()-1) / nsiblings(); } + int parent() { return dist::rank()==0 ? -1 : (dist::rank()-1) / nsiblings(); } // Check whether we should tell our parent that all our (ours and // our childrens') messages have been sent @@ -164,7 +203,8 @@ namespace CarpetIOF5 { if (p < dist::size()) ++need_receive; } if (did_send_all and did_receive_sent_all == need_receive) { - if (dist::rank() == 0) { + int const p = parent(); + if (p < 0) { // We are root; now broadcast this to all set_did_receive_all_sent_all(); } else { @@ -172,7 +212,6 @@ namespace CarpetIOF5 { CCTK_INFO("[Telling our parent that we and our children sent all our data]"); } to_parent.state = fragdesc_t::state_sent_all; - int const p = parent(); MPI_Request request; MPI_Isend(&to_parent, to_parent.num_ints(), MPI_INT, p, tag_desc, dist::comm(), &request); @@ -265,10 +304,25 @@ namespace CarpetIOF5 { while (not tosend.empty()) { list<transmission_t*>::iterator const tmi = tosend.begin(); transmission_t *const tm = *tmi; + tosend.erase(tmi); + + // Choose tag + int const proc = tm->fragdesc.process; + int tag; + while (true) { + tag = busy_tags.at(proc).allocate_tag(); + if (tag >= 0) break; + // No tag was available; wait for some progress before trying + // again (until the receiver has received some of our + // messages) + do_some_work(true); + } + tm->fragdesc.tag = tag_data_min + tag; if (verbose) { CCTK_VInfo(CCTK_THORNSTRING, - " Sending to process %d...", tm->fragdesc.process); + " Sending data to process %d with tag %d...", + tm->fragdesc.process, tm->fragdesc.tag); } // Send descriptor and data @@ -276,11 +330,10 @@ namespace CarpetIOF5 { MPI_Isend(&tm->fragdesc, tm->fragdesc.num_ints(), MPI_INT, tm->fragdesc.process, tag_desc, dist::comm(), &req); + assert(tm->request == MPI_REQUEST_NULL); MPI_Isend(&tm->data[0], tm->fragdesc.npoints(), tm->fragdesc.datatype(), - tm->fragdesc.process, tag_data, + tm->fragdesc.process, tm->fragdesc.tag, dist::comm(), &tm->request); - - tosend.erase(tmi); sends.push_back(tm); } @@ -352,8 +405,9 @@ namespace CarpetIOF5 { requests.push_back(tm->request); iterators.push_back(tmi); } + assert((int)requests.size() == nrequests); - // Wait for (or test for) some open requests + // Wait (or test) for some open requests int outcount; vector<int> indices(nrequests); vector<MPI_Status> statuses(nrequests); @@ -377,32 +431,38 @@ namespace CarpetIOF5 { // Process all completed requests for (int n=0; n<outcount; ++n) { int const idx = indices.at(n); + MPI_Status const& status = statuses.at(n); if (idx < npublic_recvs) { // We received a new descriptor - int const source = statuses.at(idx).MPI_SOURCE; - - if (verbose) { - CCTK_VInfo(CCTK_THORNSTRING, - "Receiving data from process %d", source); - } + int const source = status.MPI_SOURCE; + assert(source>=0 and source<dist::size()); list<transmission_t*>::iterator const tmi = iterators.at(idx); transmission_t *const tm = *tmi; + assert(tm->request != MPI_REQUEST_NULL); + tm->request = MPI_REQUEST_NULL; public_recvs.erase(tmi); if (tm->fragdesc.state != fragdesc_t::state_normal) { handle_state_transition(tm->fragdesc); } else { + if (verbose) { + CCTK_VInfo(CCTK_THORNSTRING, + "Receiving data from process %d with tag %d", + source, tm->fragdesc.tag); + } + // Prepare receiving the data assert(tm->fragdesc.process == dist::rank()); tm->data.resize(tm->fragdesc.npoints() * tm->fragdesc.vartypesize()); bytes_allocated += tm->data.size(); + assert(tm->request == MPI_REQUEST_NULL); MPI_Irecv(&tm->data[0], tm->fragdesc.npoints(), tm->fragdesc.datatype(), - source, tag_data, + source, tm->fragdesc.tag, dist::comm(), &tm->request); recvs.push_back(tm); if (verbose) { @@ -418,6 +478,8 @@ namespace CarpetIOF5 { // We completed receiving a dataset; process it list<transmission_t*>::iterator const tmi = iterators.at(idx); transmission_t *const tm = *tmi; + assert(tm->request != MPI_REQUEST_NULL); + tm->request = MPI_REQUEST_NULL; if (verbose) { char *const fullname = CCTK_FullName(tm->fragdesc.varindex); @@ -441,6 +503,8 @@ namespace CarpetIOF5 { // We completed sending a dataset; forget it list<transmission_t*>::iterator const tmi = iterators.at(idx); transmission_t *const tm = *tmi; + assert(tm->request != MPI_REQUEST_NULL); + tm->request = MPI_REQUEST_NULL; if (verbose) { char *const fullname = CCTK_FullName(tm->fragdesc.varindex); @@ -449,6 +513,10 @@ namespace CarpetIOF5 { free(fullname); } + int const proc = tm->fragdesc.process; + int const tag = tm->fragdesc.tag - tag_data_min; + busy_tags.at(proc).free_tag(tag); + bytes_allocated -= tm->data.size(); delete tm; if (verbose) { @@ -484,11 +552,28 @@ namespace CarpetIOF5 { baseext.lower() + fd.imax * baseext.stride(), baseext.stride()); + // Distribute load: don't start with sending to process 0, instead + // start with sending to (self+1) + int coffset; + { + int const nlc = hh.local_components(fd.reflevel); + if (nlc > 0) { + // find component of next processor + int c0 = hh.get_component(fd.reflevel, 0); + coffset = c0 + nlc; + } else { + // estimate component of next processor + int const nc = hh.components(fd.reflevel); + coffset = nc * dist::rank() / dist::size(); + } + } + ibset done; list<transmission_t*> tosend; dh::light_cboxes const& light_cbox = dd.light_boxes.at(fd.mglevel).at(fd.reflevel); - for (int c=0; c<hh.components(fd.reflevel); ++c) { + for (int ci=0; ci<hh.components(fd.reflevel); ++ci) { + int const c = (ci + coffset) % hh.components(fd.reflevel); dh::light_dboxes const& light_box = light_cbox.at(c); ibbox const& intr = light_box.interior; ibbox const ovlp = mybox & intr; @@ -567,11 +652,9 @@ namespace CarpetIOF5 { gh const& hh = *Carpet::arrdata.at(groupindex).at(fd.map).hh; dh const& dd = *Carpet::arrdata.at(groupindex).at(fd.map).dd; - ggf const& ff = - *Carpet::arrdata.at(groupindex).at(fd.map).data.at(varoffset); + ggf& ff = *Carpet::arrdata.at(groupindex).at(fd.map).data.at(varoffset); int const lc = hh.get_local_component(fd.reflevel, fd.component); - gdata const& data = - *ff.data_pointer(fd.timelevel, fd.reflevel, lc, fd.mglevel); + gdata& data = *ff.data_pointer(fd.timelevel, fd.reflevel, lc, fd.mglevel); ibbox const& baseext = hh.baseextents.AT(fd.mglevel).AT(fd.reflevel); |