aboutsummaryrefslogtreecommitdiff
path: root/CarpetDev/CarpetIOF5/src/distribute.cc
diff options
context:
space:
mode:
Diffstat (limited to 'CarpetDev/CarpetIOF5/src/distribute.cc')
-rw-r--r--CarpetDev/CarpetIOF5/src/distribute.cc125
1 files changed, 104 insertions, 21 deletions
diff --git a/CarpetDev/CarpetIOF5/src/distribute.cc b/CarpetDev/CarpetIOF5/src/distribute.cc
index 5fd8e5a29..ab7183df3 100644
--- a/CarpetDev/CarpetIOF5/src/distribute.cc
+++ b/CarpetDev/CarpetIOF5/src/distribute.cc
@@ -2,6 +2,7 @@
#include <cctk_Parameters.h>
+#include <cacheinfo.hh>
#include <carpet.hh>
#include <cassert>
@@ -52,10 +53,48 @@ namespace CarpetIOF5 {
+ /*** scatter_t::busy_tags_t *************************************************/
+
+ scatter_t::busy_tags_t::busy_tags_t()
+ {
+ // do nothing
+ }
+
+ scatter_t::busy_tags_t::~busy_tags_t()
+ {
+ // Ensure all tags are free
+ for (int tag=0; tag<(int)tags.size(); ++tag) {
+ assert(not tags.at(tag));
+ }
+ }
+
+ int scatter_t::busy_tags_t::allocate_tag()
+ {
+ if (tags.empty()) {
+ tags.resize(max_concurrent_sends, false);
+ }
+ for (int tag=0; tag<(int)tags.size(); ++tag) {
+ if (not tags.at(tag)) {
+ tags.at(tag) = true;
+ return tag;
+ }
+ }
+ return -1; // no free tag available
+ }
+
+ void scatter_t::busy_tags_t::free_tag(int const tag)
+ {
+ assert(tags.at(tag));
+ tags.at(tag) = false;
+ }
+
+
+
/*** scatter_t **************************************************************/
scatter_t::scatter_t(cGH const *const cctkGH_)
: cctkGH(cctkGH_),
+ busy_tags(CCTK_nProcs(cctkGH)),
num_received(0), num_sent(0), bytes_allocated(0),
did_send_all(false),
did_receive_sent_all(0), did_receive_all_sent_all(false)
@@ -136,7 +175,7 @@ namespace CarpetIOF5 {
}
if (verbose) {
- CCTK_INFO("Destroying down global scatter object");
+ CCTK_INFO("Destroying global scatter object");
}
assert(public_recvs.empty());
@@ -152,7 +191,7 @@ namespace CarpetIOF5 {
// consecutive)
int child() { return dist::rank()*nsiblings() + 1; }
// Get process id if my parent
- int parent() { return (dist::rank()-1) / nsiblings(); }
+ int parent() { return dist::rank()==0 ? -1 : (dist::rank()-1) / nsiblings(); }
// Check whether we should tell our parent that all our (ours and
// our childrens') messages have been sent
@@ -164,7 +203,8 @@ namespace CarpetIOF5 {
if (p < dist::size()) ++need_receive;
}
if (did_send_all and did_receive_sent_all == need_receive) {
- if (dist::rank() == 0) {
+ int const p = parent();
+ if (p < 0) {
// We are root; now broadcast this to all
set_did_receive_all_sent_all();
} else {
@@ -172,7 +212,6 @@ namespace CarpetIOF5 {
CCTK_INFO("[Telling our parent that we and our children sent all our data]");
}
to_parent.state = fragdesc_t::state_sent_all;
- int const p = parent();
MPI_Request request;
MPI_Isend(&to_parent, to_parent.num_ints(), MPI_INT, p, tag_desc,
dist::comm(), &request);
@@ -265,10 +304,25 @@ namespace CarpetIOF5 {
while (not tosend.empty()) {
list<transmission_t*>::iterator const tmi = tosend.begin();
transmission_t *const tm = *tmi;
+ tosend.erase(tmi);
+
+ // Choose tag
+ int const proc = tm->fragdesc.process;
+ int tag;
+ while (true) {
+ tag = busy_tags.at(proc).allocate_tag();
+ if (tag >= 0) break;
+ // No tag was available; wait for some progress before trying
+ // again (until the receiver has received some of our
+ // messages)
+ do_some_work(true);
+ }
+ tm->fragdesc.tag = tag_data_min + tag;
if (verbose) {
CCTK_VInfo(CCTK_THORNSTRING,
- " Sending to process %d...", tm->fragdesc.process);
+ " Sending data to process %d with tag %d...",
+ tm->fragdesc.process, tm->fragdesc.tag);
}
// Send descriptor and data
@@ -276,11 +330,10 @@ namespace CarpetIOF5 {
MPI_Isend(&tm->fragdesc, tm->fragdesc.num_ints(), MPI_INT,
tm->fragdesc.process, tag_desc,
dist::comm(), &req);
+ assert(tm->request == MPI_REQUEST_NULL);
MPI_Isend(&tm->data[0], tm->fragdesc.npoints(), tm->fragdesc.datatype(),
- tm->fragdesc.process, tag_data,
+ tm->fragdesc.process, tm->fragdesc.tag,
dist::comm(), &tm->request);
-
- tosend.erase(tmi);
sends.push_back(tm);
}
@@ -352,8 +405,9 @@ namespace CarpetIOF5 {
requests.push_back(tm->request);
iterators.push_back(tmi);
}
+ assert((int)requests.size() == nrequests);
- // Wait for (or test for) some open requests
+ // Wait (or test) for some open requests
int outcount;
vector<int> indices(nrequests);
vector<MPI_Status> statuses(nrequests);
@@ -377,32 +431,38 @@ namespace CarpetIOF5 {
// Process all completed requests
for (int n=0; n<outcount; ++n) {
int const idx = indices.at(n);
+ MPI_Status const& status = statuses.at(n);
if (idx < npublic_recvs) {
// We received a new descriptor
- int const source = statuses.at(idx).MPI_SOURCE;
-
- if (verbose) {
- CCTK_VInfo(CCTK_THORNSTRING,
- "Receiving data from process %d", source);
- }
+ int const source = status.MPI_SOURCE;
+ assert(source>=0 and source<dist::size());
list<transmission_t*>::iterator const tmi = iterators.at(idx);
transmission_t *const tm = *tmi;
+ assert(tm->request != MPI_REQUEST_NULL);
+ tm->request = MPI_REQUEST_NULL;
public_recvs.erase(tmi);
if (tm->fragdesc.state != fragdesc_t::state_normal) {
handle_state_transition(tm->fragdesc);
} else {
+ if (verbose) {
+ CCTK_VInfo(CCTK_THORNSTRING,
+ "Receiving data from process %d with tag %d",
+ source, tm->fragdesc.tag);
+ }
+
// Prepare receiving the data
assert(tm->fragdesc.process == dist::rank());
tm->data.resize(tm->fragdesc.npoints() * tm->fragdesc.vartypesize());
bytes_allocated += tm->data.size();
+ assert(tm->request == MPI_REQUEST_NULL);
MPI_Irecv(&tm->data[0],
tm->fragdesc.npoints(), tm->fragdesc.datatype(),
- source, tag_data,
+ source, tm->fragdesc.tag,
dist::comm(), &tm->request);
recvs.push_back(tm);
if (verbose) {
@@ -418,6 +478,8 @@ namespace CarpetIOF5 {
// We completed receiving a dataset; process it
list<transmission_t*>::iterator const tmi = iterators.at(idx);
transmission_t *const tm = *tmi;
+ assert(tm->request != MPI_REQUEST_NULL);
+ tm->request = MPI_REQUEST_NULL;
if (verbose) {
char *const fullname = CCTK_FullName(tm->fragdesc.varindex);
@@ -441,6 +503,8 @@ namespace CarpetIOF5 {
// We completed sending a dataset; forget it
list<transmission_t*>::iterator const tmi = iterators.at(idx);
transmission_t *const tm = *tmi;
+ assert(tm->request != MPI_REQUEST_NULL);
+ tm->request = MPI_REQUEST_NULL;
if (verbose) {
char *const fullname = CCTK_FullName(tm->fragdesc.varindex);
@@ -449,6 +513,10 @@ namespace CarpetIOF5 {
free(fullname);
}
+ int const proc = tm->fragdesc.process;
+ int const tag = tm->fragdesc.tag - tag_data_min;
+ busy_tags.at(proc).free_tag(tag);
+
bytes_allocated -= tm->data.size();
delete tm;
if (verbose) {
@@ -484,11 +552,28 @@ namespace CarpetIOF5 {
baseext.lower() + fd.imax * baseext.stride(),
baseext.stride());
+ // Distribute load: don't start with sending to process 0, instead
+ // start with sending to (self+1)
+ int coffset;
+ {
+ int const nlc = hh.local_components(fd.reflevel);
+ if (nlc > 0) {
+ // find component of next processor
+ int c0 = hh.get_component(fd.reflevel, 0);
+ coffset = c0 + nlc;
+ } else {
+ // estimate component of next processor
+ int const nc = hh.components(fd.reflevel);
+ coffset = nc * dist::rank() / dist::size();
+ }
+ }
+
ibset done;
list<transmission_t*> tosend;
dh::light_cboxes const& light_cbox =
dd.light_boxes.at(fd.mglevel).at(fd.reflevel);
- for (int c=0; c<hh.components(fd.reflevel); ++c) {
+ for (int ci=0; ci<hh.components(fd.reflevel); ++ci) {
+ int const c = (ci + coffset) % hh.components(fd.reflevel);
dh::light_dboxes const& light_box = light_cbox.at(c);
ibbox const& intr = light_box.interior;
ibbox const ovlp = mybox & intr;
@@ -567,11 +652,9 @@ namespace CarpetIOF5 {
gh const& hh = *Carpet::arrdata.at(groupindex).at(fd.map).hh;
dh const& dd = *Carpet::arrdata.at(groupindex).at(fd.map).dd;
- ggf const& ff =
- *Carpet::arrdata.at(groupindex).at(fd.map).data.at(varoffset);
+ ggf& ff = *Carpet::arrdata.at(groupindex).at(fd.map).data.at(varoffset);
int const lc = hh.get_local_component(fd.reflevel, fd.component);
- gdata const& data =
- *ff.data_pointer(fd.timelevel, fd.reflevel, lc, fd.mglevel);
+ gdata& data = *ff.data_pointer(fd.timelevel, fd.reflevel, lc, fd.mglevel);
ibbox const& baseext =
hh.baseextents.AT(fd.mglevel).AT(fd.reflevel);