diff options
Diffstat (limited to 'CarpetDev/CarpetIOF5/src/distribute.cc')
-rw-r--r-- | CarpetDev/CarpetIOF5/src/distribute.cc | 627 |
1 files changed, 627 insertions, 0 deletions
diff --git a/CarpetDev/CarpetIOF5/src/distribute.cc b/CarpetDev/CarpetIOF5/src/distribute.cc new file mode 100644 index 000000000..66ad0715f --- /dev/null +++ b/CarpetDev/CarpetIOF5/src/distribute.cc @@ -0,0 +1,627 @@ +#include "distribute.hh" + +#include <cctk_Parameters.h> + +#include <carpet.hh> + +#include <cassert> +#include <cstdlib> +#include <cstring> +#include <vector> + + + +namespace CarpetIOF5 { + + using namespace std; + + + + /*** fragdesc_t *************************************************************/ + + int fragdesc_t::npoints() const + { + int np = 1; + for (int d=0; d<dim; ++d) { + np *= imax[d] - imin[d] + 1; + } + assert(np>0); + return np; + } + + int fragdesc_t::vartypesize() const + { + assert(varindex>=0); + int const vartype = CCTK_VarTypeI(varindex); + assert(vartype>=0); + int const size = CCTK_VarTypeSize(vartype); + assert(size>0); + return size; + } + + MPI_Datatype fragdesc_t::datatype() const + { + assert(varindex>=0); + int const vartype = CCTK_VarTypeI(varindex); + assert(vartype>=0); + switch (vartype) { + case CCTK_VARIABLE_BYTE : return dist::mpi_datatype<CCTK_BYTE >(); + case CCTK_VARIABLE_INT : return dist::mpi_datatype<CCTK_INT >(); + case CCTK_VARIABLE_REAL : return dist::mpi_datatype<CCTK_REAL >(); + case CCTK_VARIABLE_COMPLEX: return dist::mpi_datatype<CCTK_COMPLEX>(); + default: assert(0); + } + } + + + + /*** scatter_t **************************************************************/ + + scatter_t::scatter_t(cGH const *const cctkGH_) + : cctkGH(cctkGH_), + num_received(0), num_sent(0), bytes_allocated(0), + did_send_all(false), + did_receive_sent_all(0), did_receive_all_sent_all(false) + { + DECLARE_CCTK_PARAMETERS; + if (verbose) { + CCTK_INFO("Creating global scatter object"); + } + post_public_recvs(); + } + + + + scatter_t::~scatter_t() + { + DECLARE_CCTK_PARAMETERS; + if (verbose) { + CCTK_INFO("Shutting down global scatter object"); + } + + // Wait until everything has been sent + bool did_send_everything; + for (;;) { + if (verbose) { + CCTK_INFO("Waiting until something has been transmitted..."); + } + did_send_everything = sends.empty(); + if (did_send_everything) break; + do_some_work(true); + } + if (verbose) { + CCTK_INFO("We sent all our data"); + } + + // Notify others: we sent all our data + set_did_send_all(); + + // Wait until everything has been transmitted + bool did_transmit_everything; + for (;;) { + if (verbose) { + CCTK_INFO("Waiting until something has been transmitted..."); + } + did_transmit_everything = + did_receive_all_sent_all and recvs.empty() and sends.empty(); + if (did_transmit_everything) break; + do_some_work(true); + } + if (verbose) { + CCTK_INFO("Everything has been transmitted"); + } + + int const my_difference = num_sent - num_received; + int total_difference; + MPI_Allreduce(const_cast<int*>(&my_difference), &total_difference, + 1, MPI_INT, MPI_SUM, + dist::comm()); + if (total_difference < 0) { + CCTK_WARN(CCTK_WARN_ABORT, + "More received messages than sent messages -- impossible!"); + } + if (total_difference > 0) { + CCTK_WARN(CCTK_WARN_ABORT, "Not all sent messages have been received"); + } + if (verbose) { + CCTK_INFO("Global number of sent and received messages is consistent"); + } + + // Cancel the public receives + if (verbose) { + CCTK_INFO("Cancelling all public receives..."); + } + while (not public_recvs.empty()) { + list<transmission_t*>::iterator const tmi = public_recvs.begin(); + transmission_t *const tm = *tmi; + MPI_Cancel(&tm->request); + public_recvs.erase(tmi); + } + + if (verbose) { + CCTK_INFO("Destroying down global scatter object"); + } + + assert(public_recvs.empty()); + assert(recvs.empty()); + assert(sends.empty()); + } + + + + // Communication tree + int nsiblings() { return 2; } + // Get process id of my first child (the other children are + // consecutive) + int child() { return dist::rank()*nsiblings() + 1; } + // Get process id if my parent + int parent() { return (dist::rank()-1) / nsiblings(); } + + // Check whether we should tell our parent that all our (ours and + // our childrens') messages have been sent + void scatter_t::maybe_send_did_send() + { + DECLARE_CCTK_PARAMETERS; + int need_receive = 0; + for (int p = child(); p<child()+nsiblings(); ++p) { + if (p < dist::size()) ++need_receive; + } + if (did_send_all and did_receive_sent_all == need_receive) { + if (dist::rank() == 0) { + // We are root; now broadcast this to all + set_did_receive_all_sent_all(); + } else { + if (verbose) { + CCTK_INFO("[Telling our parent that we and our children sent all our data]"); + } + to_parent.state = fragdesc_t::state_sent_all; + int const p = parent(); + MPI_Request request; + MPI_Isend(&to_parent, to_parent.num_ints(), MPI_INT, p, tag_desc, + dist::comm(), &request); + } + } + } + + // Broadcast "all message have been sent" to all our children + void scatter_t::send_all_did_send() + { + DECLARE_CCTK_PARAMETERS; + if (verbose) { + CCTK_INFO("[Telling our children that all data have been sent]"); + } + to_children.state = fragdesc_t::state_all_sent_all; + for (int p = child(); p<child()+nsiblings(); ++p) { + if (p < dist::size()) { + MPI_Request request; + MPI_Isend(&to_children, to_children.num_ints(), MPI_INT, p, tag_desc, + dist::comm(), &request); + } + } + } + + // We (this process) sent all our messages + void scatter_t::set_did_send_all() + { + DECLARE_CCTK_PARAMETERS; + if (verbose) { + CCTK_INFO("[We sent all our data]"); + } + assert(not did_send_all); + did_send_all = true; + maybe_send_did_send(); + } + + // One of our children (and all its children) sent all their + // messages + void scatter_t::set_did_receive_sent_all() + { + DECLARE_CCTK_PARAMETERS; + if (verbose) { + CCTK_INFO("[One of our children sent all their data]"); + } + assert(did_receive_sent_all < 2); + ++did_receive_sent_all; + maybe_send_did_send(); + } + + // Our parent broadcast that all messages have been sent + void scatter_t::set_did_receive_all_sent_all() + { + DECLARE_CCTK_PARAMETERS; + if (verbose) { + CCTK_INFO("[Everything has been sent]"); + } + assert(not did_receive_all_sent_all); + did_receive_all_sent_all = true; + send_all_did_send(); + } + + // Dispatch upon a state change message + void scatter_t::handle_state_transition(fragdesc_t const& fd) + { + switch (fd.state) { + case fragdesc_t::state_sent_all: set_did_receive_sent_all(); break; + case fragdesc_t::state_all_sent_all: set_did_receive_all_sent_all(); break; + default: assert(0); + } + } + + + + void scatter_t::send(fragdesc_t const& fd, void const *const data) + { + DECLARE_CCTK_PARAMETERS; + + assert(data); + + if (verbose) { + char *const fullname = CCTK_FullName(fd.varindex); + CCTK_VInfo(CCTK_THORNSTRING, + "Sending data read from variable %s, reflevel %d, map %d, component %d, timelevel %d", + fullname, fd.reflevel, fd.map, fd.src_component, fd.timelevel); + free(fullname); + } + + list<transmission_t*> tosend = split_for_sending(fd, data); + + while (not tosend.empty()) { + list<transmission_t*>::iterator const tmi = tosend.begin(); + transmission_t *const tm = *tmi; + + if (verbose) { + CCTK_VInfo(CCTK_THORNSTRING, + " Sending to process %d...", tm->fragdesc.process); + } + + // Send descriptor and data + MPI_Request req; + MPI_Isend(&tm->fragdesc, tm->fragdesc.num_ints(), MPI_INT, + tm->fragdesc.process, tag_desc, + dist::comm(), &req); + MPI_Isend(&tm->data[0], tm->fragdesc.npoints(), tm->fragdesc.datatype(), + tm->fragdesc.process, tag_data, + dist::comm(), &tm->request); + + tosend.erase(tmi); + sends.push_back(tm); + } + + // Do some work (if some is available) + do_some_work(); + + if (verbose) { + CCTK_INFO("Done sending"); + } + } + + + + void scatter_t::post_public_recvs() + { + DECLARE_CCTK_PARAMETERS; + + if (verbose) { + CCTK_INFO("Posting public receives"); + } + + // Post public receives + while ((int)public_recvs.size() < num_public_recvs) { + transmission_t *const tm = new transmission_t; + MPI_Irecv(&tm->fragdesc, tm->fragdesc.num_ints(), MPI_INT, + MPI_ANY_SOURCE, tag_desc, + dist::comm(), &tm->request); + public_recvs.push_back(tm); + } + } + + + + void scatter_t::do_some_work(bool const do_wait) + { + DECLARE_CCTK_PARAMETERS; + + if (verbose) { + CCTK_INFO("Checking for progress"); + } + + // Set up an array of all open requests + vector<MPI_Request> requests; + vector<list<transmission_t*>::iterator> iterators; + int const npublic_recvs = public_recvs.size(); + int const nrecvs = recvs.size(); + int const nsends = sends.size(); + int const nrequests = npublic_recvs + nrecvs + nsends; + requests.reserve(nrequests); + iterators.reserve(nrequests); + for (list<transmission_t*>::iterator + tmi = public_recvs.begin(); tmi != public_recvs.end(); ++tmi) + { + transmission_t const *const tm = *tmi; + requests.push_back(tm->request); + iterators.push_back(tmi); + } + for (list<transmission_t*>::iterator + tmi = recvs.begin(); tmi != recvs.end(); ++tmi) + { + transmission_t const *const tm = *tmi; + requests.push_back(tm->request); + iterators.push_back(tmi); + } + for (list<transmission_t*>::iterator + tmi = sends.begin(); tmi != sends.end(); ++tmi) + { + transmission_t const *const tm = *tmi; + requests.push_back(tm->request); + iterators.push_back(tmi); + } + + // Wait for (or test for) some open requests + int outcount; + vector<int> indices(nrequests); + vector<MPI_Status> statuses(nrequests); + if (do_wait) { + if (verbose) { + CCTK_INFO("Waiting for some progress..."); + } + MPI_Waitsome(requests.size(), &requests[0], + &outcount, &indices[0], &statuses[0]); + } else { + if (verbose) { + CCTK_INFO("Testing for some progress..."); + } + MPI_Testsome(requests.size(), &requests[0], + &outcount, &indices[0], &statuses[0]); + } + if (verbose) { + CCTK_VInfo(CCTK_THORNSTRING, "Completed %d transmissions", outcount); + } + + // Process all completed requests + for (int n=0; n<outcount; ++n) { + int const idx = indices.at(n); + + if (idx < npublic_recvs) { + + // We received a new descriptor + int const source = statuses.at(idx).MPI_SOURCE; + + if (verbose) { + CCTK_VInfo(CCTK_THORNSTRING, + "Receiving data from process %d", source); + } + + list<transmission_t*>::iterator const tmi = iterators.at(idx); + transmission_t *const tm = *tmi; + public_recvs.erase(tmi); + + if (tm->fragdesc.state != fragdesc_t::state_normal) { + handle_state_transition(tm->fragdesc); + } else { + + // Prepare receiving the data + assert(tm->fragdesc.process == dist::rank()); + tm->data.resize(tm->fragdesc.npoints() * tm->fragdesc.vartypesize()); + bytes_allocated += tm->data.size(); + MPI_Irecv(&tm->data[0], + tm->fragdesc.npoints(), tm->fragdesc.datatype(), + source, tag_data, + dist::comm(), &tm->request); + recvs.push_back(tm); + if (verbose) { + CCTK_VInfo(CCTK_THORNSTRING, + " Current buffer size: %td bytes", bytes_allocated); + } + + post_public_recvs(); + } + + } else if (idx < npublic_recvs + nrecvs) { + + // We completed receiving a dataset; process it + list<transmission_t*>::iterator const tmi = iterators.at(idx); + transmission_t *const tm = *tmi; + + if (verbose) { + char *const fullname = CCTK_FullName(tm->fragdesc.varindex); + CCTK_VInfo(CCTK_THORNSTRING, + "Completed receiving data for variable %s", fullname); + free(fullname); + } + + write_data(tm); + bytes_allocated -= tm->data.size(); + delete tm; + if (verbose) { + CCTK_VInfo(CCTK_THORNSTRING, + " Current buffer size: %td bytes", bytes_allocated); + } + recvs.erase(tmi); + ++num_received; + + } else { + + // We completed sending a dataset; forget it + list<transmission_t*>::iterator const tmi = iterators.at(idx); + transmission_t *const tm = *tmi; + + if (verbose) { + char *const fullname = CCTK_FullName(tm->fragdesc.varindex); + CCTK_VInfo(CCTK_THORNSTRING, + "Completed sending data for variable %s", fullname); + free(fullname); + } + + bytes_allocated -= tm->data.size(); + delete tm; + if (verbose) { + CCTK_VInfo(CCTK_THORNSTRING, + " Current buffer size: %td bytes", bytes_allocated); + } + sends.erase(tmi); + ++num_sent; + + } + } + } + + + + list<scatter_t::transmission_t*> + scatter_t::split_for_sending(fragdesc_t const& fd, void const *const data) + { + DECLARE_CCTK_PARAMETERS; + + int const groupindex = CCTK_GroupIndexFromVarI(fd.varindex); + assert(groupindex>=0); + int const varoffset = fd.varindex - CCTK_FirstVarIndexI(groupindex); + assert(varoffset>=0 and varoffset<=fd.varindex); + + gh const& hh = *Carpet::arrdata.at(groupindex).at(fd.map).hh; + dh const& dd = *Carpet::arrdata.at(groupindex).at(fd.map).dd; + th const& tt = *Carpet::arrdata.at(groupindex).at(fd.map).tt; + + ibbox const& baseext = + hh.baseextents.AT(fd.mglevel).AT(fd.reflevel); + + ibbox const mybox(baseext.lower() + fd.imin * baseext.stride(), + baseext.lower() + fd.imax * baseext.stride(), + baseext.stride()); + + ibset done; + list<transmission_t*> tosend; + dh::light_cboxes const& light_cbox = + dd.light_boxes.at(fd.mglevel).at(fd.reflevel); + for (int c=0; c<hh.components(fd.reflevel); ++c) { + dh::light_dboxes const& light_box = light_cbox.at(c); + ibbox const& intr = light_box.interior; + ibbox const ovlp = mybox & intr; + assert((ovlp & done).empty()); + done += ovlp; + + if (not ovlp.empty()) { + ibbox const& box = ovlp; + transmission_t *const tm = new transmission_t; + tm->fragdesc = fd; + tm->fragdesc.imin = (box.lower() - baseext.lower()) / baseext.stride(); + tm->fragdesc.imax = (box.upper() - baseext.lower()) / baseext.stride(); + tm->fragdesc.component = c; + tm->fragdesc.process = hh.processor(fd.reflevel, c); + + ptrdiff_t const vartypesize = tm->fragdesc.vartypesize(); + ptrdiff_t const ni = tm->fragdesc.imax[0] - tm->fragdesc.imin[0] + 1; + ptrdiff_t const nj = tm->fragdesc.imax[1] - tm->fragdesc.imin[1] + 1; + ptrdiff_t const nk = tm->fragdesc.imax[2] - tm->fragdesc.imin[2] + 1; + ptrdiff_t const di = 1; + ptrdiff_t const dj = ni; + ptrdiff_t const dk = ni * nj; + ptrdiff_t const np = ni * nj * nk; + tm->data.resize(np * vartypesize); + bytes_allocated += tm->data.size(); + if (verbose) { + CCTK_VInfo(CCTK_THORNSTRING, + " Current buffer size: %td bytes", bytes_allocated); + } + char *const dst = &tm->data[0]; + + ptrdiff_t const nis = fd.imax[0] - fd.imin[0] + 1; + ptrdiff_t const njs = fd.imax[1] - fd.imin[1] + 1; + ptrdiff_t const nks = fd.imax[2] - fd.imin[2] + 1; + ptrdiff_t const dis = 1; + ptrdiff_t const djs = nis; + ptrdiff_t const dks = nis * njs; + ptrdiff_t const nps = nis * njs * nks; + ptrdiff_t const i0s = tm->fragdesc.imin[0] - fd.imin[0]; + ptrdiff_t const j0s = tm->fragdesc.imin[1] - fd.imin[1]; + ptrdiff_t const k0s = tm->fragdesc.imin[2] - fd.imin[2]; + ptrdiff_t const ind0s = i0s * dis + j0s * djs + k0s * dks; + char const *const src = &((char const*)data)[vartypesize * ind0s]; + +#pragma omp parallel for collapse(2) + for (ptrdiff_t k=0; k<nk; ++k) { + for (ptrdiff_t j=0; j<nj; ++j) { + ptrdiff_t const ind = j*dj + k*dk; + ptrdiff_t const inds = j*djs + k*dks; + memcpy(&dst[ind], &src[inds], ni*vartypesize); + } + } + + tosend.push_back(tm); + } + } + // Don't enforce this -- mesh refinement boundaries have ghosts + // that do not overlap with any interior + // assert(done == mybox); + + return tosend; + } + + + + void scatter_t::write_data(transmission_t *const tm) + { + fragdesc_t const& fd = tm->fragdesc; + int const groupindex = CCTK_GroupIndexFromVarI(fd.varindex); + assert(groupindex>=0); + int const varoffset = fd.varindex - CCTK_FirstVarIndexI(groupindex); + assert(varoffset>=0 and varoffset<=fd.varindex); + + gh const& hh = *Carpet::arrdata.at(groupindex).at(fd.map).hh; + dh const& dd = *Carpet::arrdata.at(groupindex).at(fd.map).dd; + th const& tt = *Carpet::arrdata.at(groupindex).at(fd.map).tt; + ggf const& ff = + *Carpet::arrdata.at(groupindex).at(fd.map).data.at(varoffset); + int const lc = hh.get_local_component(fd.reflevel, fd.component); + gdata const& data = + *ff.data_pointer(fd.timelevel, fd.reflevel, lc, fd.mglevel); + + ibbox const& baseext = + hh.baseextents.AT(fd.mglevel).AT(fd.reflevel); + + ibbox const mybox(baseext.lower() + fd.imin * baseext.stride(), + baseext.lower() + fd.imax * baseext.stride(), + baseext.stride()); + + dh::light_dboxes const& light_box = + dd.light_boxes.at(fd.mglevel).at(fd.reflevel).at(fd.component); + ibbox const& intr = light_box.interior; + assert(mybox.is_contained_in(intr)); + ibbox const& extr = light_box.exterior; + + ptrdiff_t const vartypesize = fd.vartypesize(); + ptrdiff_t const ni = fd.imax[0] - fd.imin[0] + 1; + ptrdiff_t const nj = fd.imax[1] - fd.imin[1] + 1; + ptrdiff_t const nk = fd.imax[2] - fd.imin[2] + 1; + ptrdiff_t const di = 1; + ptrdiff_t const dj = ni; + ptrdiff_t const dk = ni * nj; + ptrdiff_t const np = ni * nj * nk; + char const *const src = (char const*)&tm->data[0]; + + ivect const lbnd = (extr.lower() - baseext.lower()) / baseext.stride(); + ivect const lsh = extr.shape() / baseext.stride(); + ptrdiff_t const nid = lsh[0]; + ptrdiff_t const njd = lsh[1]; + ptrdiff_t const nkd = lsh[2]; + ptrdiff_t const did = 1; + ptrdiff_t const djd = nid; + ptrdiff_t const dkd = nid * njd; + ptrdiff_t const npd = nid * njd * nkd; + ptrdiff_t const i0d = fd.imin[0] - lbnd[0]; + ptrdiff_t const j0d = fd.imin[1] - lbnd[1]; + ptrdiff_t const k0d = fd.imin[2] - lbnd[2]; + ptrdiff_t const ind0d = i0d * did + j0d * djd + k0d * dkd; + assert(data.has_storage()); + char *const dst = &((char*)data.storage())[ind0d]; + +#pragma omp parallel for collapse(2) + for (ptrdiff_t k=0; k<nk; ++k) { + for (ptrdiff_t j=0; j<nj; ++j) { + ptrdiff_t const indd = j*djd + k*dkd; + ptrdiff_t const ind = j*dj + k*dk; + memcpy(&dst[indd], &src[ind], ni*vartypesize); + } + } + } + +} // end namespace CarpetIOF5 |