aboutsummaryrefslogtreecommitdiff
path: root/Carpet/CarpetLib/src/gdata.cc
diff options
context:
space:
mode:
authorThomas Radke <tradke@aei.mpg.de>2005-05-26 11:42:00 +0000
committerThomas Radke <tradke@aei.mpg.de>2005-05-26 11:42:00 +0000
commit6a5fdd7e19335d54e102a6e5b7211fc920c94317 (patch)
tree7bce2af36a1d658587a5957b673985ce739f425e /Carpet/CarpetLib/src/gdata.cc
parentcbe3768986ffe1b8156427751d617f124f6fabfe (diff)
CarpetLib: clean-up of communication scheme for individual sends/recvs on single components
The default communication scheme in Carpet (which does an individual send/recv operation for each component) comes with two parameters for fine tuning: CarpetLib::use_lightweight_buffers CarpetLib::combine_recv_send the defaults of which are set to use a well-tested but also slower communication pattern (as turned out during benchmark runs). This patch cleans up the implementation of this communication scheme so that the fastest communication pattern (combined posting of send/recv; use of lightweight buffers) is now always used. The above parameters therefore became obsolete and shouldn't be used anymore in parfiles. darcs-hash:20050526114253-776a0-780933a1539a260d74da8b92522fa2f48c714964.gz
Diffstat (limited to 'Carpet/CarpetLib/src/gdata.cc')
-rw-r--r--Carpet/CarpetLib/src/gdata.cc749
1 files changed, 215 insertions, 534 deletions
diff --git a/Carpet/CarpetLib/src/gdata.cc b/Carpet/CarpetLib/src/gdata.cc
index 31f62b38e..f5e3ac081 100644
--- a/Carpet/CarpetLib/src/gdata.cc
+++ b/Carpet/CarpetLib/src/gdata.cc
@@ -26,7 +26,7 @@ using namespace std;
static int nexttag ()
{
DECLARE_CCTK_PARAMETERS;
-
+
int const min_tag = 100;
static int last = 0;
++last;
@@ -66,29 +66,16 @@ void gdata::change_processor (comm_state& state,
const int newproc,
void* const mem)
{
- DECLARE_CCTK_PARAMETERS;
-
switch (state.thestate) {
- case state_recv:
- if (combine_recv_send) {
- change_processor_recv (state, newproc, mem);
- change_processor_send (state, newproc, mem);
- } else {
- change_processor_recv (state, newproc, mem);
- }
- break;
- case state_send:
- if (combine_recv_send) {
- // do nothing
- } else {
- change_processor_send (state, newproc, mem);
- }
+ case state_post:
+ change_processor_recv (state, newproc, mem);
+ change_processor_send (state, newproc, mem);
break;
case state_wait:
change_processor_wait (state, newproc, mem);
break;
default:
- assert(0);
+ assert(0 && "invalid state");
}
}
@@ -98,39 +85,34 @@ void gdata::change_processor (comm_state& state,
void gdata::copy_from (comm_state& state,
const gdata* src, const ibbox& box)
{
- DECLARE_CCTK_PARAMETERS;
-
assert (has_storage() && src->has_storage());
assert (all(box.lower()>=extent().lower()
- && box.lower()>=src->extent().lower()));
+ && box.lower()>=src->extent().lower()));
assert (all(box.upper()<=extent().upper()
- && box.upper()<=src->extent().upper()));
+ && box.upper()<=src->extent().upper()));
assert (all(box.stride()==extent().stride()
- && box.stride()==src->extent().stride()));
+ && box.stride()==src->extent().stride()));
assert (all((box.lower()-extent().lower())%box.stride() == 0
- && (box.lower()-src->extent().lower())%box.stride() == 0));
-
+ && (box.lower()-src->extent().lower())%box.stride() == 0));
+
if (box.empty()) return;
-
+ if (dist::rank() != proc() && dist::rank() != src->proc()) return;
+
switch (state.thestate) {
- case state_recv:
- if (combine_recv_send) {
- copy_from_recv (state, src, box);
- copy_from_send (state, src, box);
+ case state_post:
+ if (proc() == src->proc()) {
+ copy_from_innerloop (src, box);
} else {
- copy_from_recv (state, src, box);
- }
- break;
- case state_send:
- if (combine_recv_send) {
- // do nothing
- } else {
- copy_from_send (state, src, box);
+ copy_from_post (state, src, box);
}
break;
+
case state_wait:
- copy_from_wait (state, src, box);
+ if (proc() != src->proc()) {
+ copy_from_wait (state, src, box);
+ }
break;
+
case state_get_buffer_sizes:
// don't count processor-local copies
if (proc() != src->proc()) {
@@ -144,13 +126,19 @@ void gdata::copy_from (comm_state& state,
}
}
break;
+
case state_fill_send_buffers:
// if this is a source processor: copy its data into the send buffer
// (the processor-local case is also handled here)
if (src->proc() == dist::rank()) {
- copy_into_sendbuffer (state, src, box);
+ if (proc() == src->proc()) {
+ copy_from_innerloop (src, box);
+ } else {
+ copy_into_sendbuffer (state, src, box);
+ }
}
break;
+
case state_empty_recv_buffers:
// if this is a destination processor and data has already been received
// from the source processor: copy it from the recv buffer
@@ -158,286 +146,160 @@ void gdata::copy_from (comm_state& state,
copy_from_recvbuffer (state, src, box);
}
break;
+
default:
- assert(0);
+ assert(0 && "invalid state");
}
}
-
-void gdata::copy_from_nocomm (const gdata* src, const ibbox& box)
+void gdata::copy_from_post (comm_state& state,
+ const gdata* src, const ibbox& box)
{
- assert (has_storage() && src->has_storage());
- assert (proc() == src->proc());
-
- // copy on same processor
+ wtime_copyfrom_recv.start();
+
if (dist::rank() == proc()) {
- copy_from_innerloop (src, box);
- }
-}
+ // this processor receives data
+ wtime_copyfrom_recvinner_allocate.start();
+ comm_state::gcommbuf * b = make_typed_commbuf (box);
+ wtime_copyfrom_recvinner_allocate.stop();
+
+ wtime_copyfrom_recvinner_recv.start();
+ MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(),
+ tag, dist::comm, &b->request);
+ wtime_copyfrom_recvinner_recv.stop();
+ state.requests.push_back (b->request);
+ state.recvbufs.push (b);
-void gdata::copy_from_recv (comm_state& state,
- const gdata* src, const ibbox& box)
-{
- DECLARE_CCTK_PARAMETERS;
-
- wtime_copyfrom_recv.start();
-
- if (proc() == src->proc()) {
- // copy on same processor
-
} else {
- // copy to different processor
-
- if (! use_lightweight_buffers) {
-
- wtime_copyfrom_recv_maketyped.start();
- gdata* const tmp = make_typed(varindex, transport_operator);
- wtime_copyfrom_recv_maketyped.stop();
- state.tmps1.push (tmp);
- wtime_copyfrom_recv_allocate.start();
- tmp->allocate (box, src->proc());
- wtime_copyfrom_recv_allocate.stop();
- wtime_copyfrom_recv_changeproc_recv.start();
- tmp->change_processor_recv (state, proc());
- wtime_copyfrom_recv_changeproc_recv.stop();
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- wtime_copyfrom_recvinner_allocate.start();
- comm_state::gcommbuf * b = make_typed_commbuf (box);
- wtime_copyfrom_recvinner_allocate.stop();
-
- wtime_copyfrom_recvinner_recv.start();
- MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(),
- tag, dist::comm, &b->request);
- wtime_copyfrom_recvinner_recv.stop();
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.recvbufs.push (b);
-
+ // this processor sends data
+
+ wtime_copyfrom_sendinner_allocate.start();
+ comm_state::gcommbuf * b = src->make_typed_commbuf (box);
+ wtime_copyfrom_sendinner_allocate.stop();
+
+ // copy data into send buffer
+ wtime_copyfrom_sendinner_copy.start();
+ const ibbox& ext = src->extent();
+ ivect shape = ext.shape() / ext.stride();
+ ivect items = (box.upper() - box.lower()) / box.stride() + 1;
+ ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ char* send_buffer = (char*) b->pointer();
+ const int vartype = CCTK_VarTypeI(varindex);
+ const int vartypesize = CCTK_VarTypeSize(vartype);
+ assert(vartypesize >= 0);
+
+ for (int k = 0; k < items[2]; k++) {
+ for (int j = 0; j < items[1]; j++) {
+ int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
+ memcpy (send_buffer,
+ ((char*) src->storage()) + vartypesize*i,
+ vartypesize*items[0]);
+ send_buffer += vartypesize*items[0];
}
-
}
-
- }
-
- wtime_copyfrom_recv.stop();
-}
+ wtime_copyfrom_sendinner_copy.stop();
-void gdata::copy_from_send (comm_state& state,
- const gdata* src, const ibbox& box)
-{
- DECLARE_CCTK_PARAMETERS;
-
- wtime_copyfrom_send.start();
-
- if (proc() == src->proc()) {
- // copy on same processor
-
- wtime_copyfrom_send_copyfrom_nocomm1.start();
- copy_from_nocomm (src, box);
- wtime_copyfrom_send_copyfrom_nocomm1.stop();
-
- } else {
- // copy to different processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps1.front();
- state.tmps1.pop();
- state.tmps2.push (tmp);
- assert (tmp);
- wtime_copyfrom_send_copyfrom_nocomm2.start();
- tmp->copy_from_nocomm (src, box);
- wtime_copyfrom_send_copyfrom_nocomm2.stop();
- wtime_copyfrom_send_changeproc_send.start();
- tmp->change_processor_send (state, proc());
- wtime_copyfrom_send_changeproc_send.stop();
-
- } else {
-
- if (dist::rank() == src->proc()) {
- // this processor sends data
-
- wtime_copyfrom_sendinner_allocate.start();
- comm_state::gcommbuf * b = src->make_typed_commbuf (box);
- wtime_copyfrom_sendinner_allocate.stop();
-
- wtime_copyfrom_sendinner_copy.start();
- assert (src->_has_storage);
- assert (dist::rank() == src->proc());
- gdata * tmp = src->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, src->proc(), b->pointer());
- tmp->copy_from_innerloop (src, box);
- delete tmp;
- wtime_copyfrom_sendinner_copy.stop();
-
- wtime_copyfrom_sendinner_send.start();
- MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
+ wtime_copyfrom_sendinner_send.start();
+ MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
tag, dist::comm, &b->request);
- wtime_copyfrom_sendinner_send.stop();
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.sendbufs.push (b);
-
- }
- }
-
+ wtime_copyfrom_sendinner_send.stop();
+ state.requests.push_back (b->request);
+ state.sendbufs.push (b);
}
-
- wtime_copyfrom_send.stop();
+
+ wtime_copyfrom_recv.stop();
}
+
void gdata::copy_from_wait (comm_state& state,
const gdata* src, const ibbox& box)
{
- DECLARE_CCTK_PARAMETERS;
-
wtime_copyfrom_wait.start();
-
- if (proc() == src->proc()) {
- // copy on same processor
-
- } else {
- // copy to different processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps2.front();
- state.tmps2.pop();
- assert (tmp);
- wtime_copyfrom_wait_changeproc_wait.start();
- tmp->change_processor_wait (state, proc());
- wtime_copyfrom_wait_changeproc_wait.stop();
- wtime_copyfrom_wait_copyfrom_nocomm.start();
- copy_from_nocomm (tmp, box);
- wtime_copyfrom_wait_copyfrom_nocomm.stop();
- wtime_copyfrom_wait_delete.start();
- delete tmp;
- wtime_copyfrom_wait_delete.stop();
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- comm_state::gcommbuf * b = state.recvbufs.front();
- state.recvbufs.pop();
-
- wtime_copyfrom_recvwaitinner_wait.start();
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
- wtime_copyfrom_recvwaitinner_wait.stop();
-
- wtime_copyfrom_recvwaitinner_copy.start();
- assert (_has_storage);
- assert (dist::rank() == proc());
- gdata * tmp = make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, proc(), b->pointer());
- copy_from_innerloop (tmp, box);
- delete tmp;
- wtime_copyfrom_recvwaitinner_copy.stop();
-
- wtime_copyfrom_recvwaitinner_delete.start();
- delete b;
- wtime_copyfrom_recvwaitinner_delete.stop();
-
- }
-
- if (dist::rank() == src->proc()) {
- // this processor sends data
-
- comm_state::gcommbuf * b = state.sendbufs.front();
- state.sendbufs.pop();
-
- wtime_copyfrom_sendwaitinner_wait.start();
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
- wtime_copyfrom_sendwaitinner_wait.stop();
-
- wtime_copyfrom_sendwaitinner_delete.start();
- delete b;
- wtime_copyfrom_sendwaitinner_delete.stop();
-
+
+ wtime_copyfrom_recvwaitinner_wait.start();
+ if (! state.requests.empty()) {
+ // wait for all requests at once
+ MPI_Waitall (state.requests.size(), &state.requests.front(),
+ MPI_STATUSES_IGNORE);
+ state.requests.clear();
+ }
+ wtime_copyfrom_recvwaitinner_wait.stop();
+
+ queue<comm_state::gcommbuf*>* const bufs =
+ dist::rank() == proc() ? &state.recvbufs : &state.sendbufs;
+ comm_state::gcommbuf* b = bufs->front();
+
+ // copy data out of receive buffer
+ if (bufs == &state.recvbufs) {
+ wtime_copyfrom_recvwaitinner_copy.start();
+ const ibbox& ext = extent();
+ ivect shape = ext.shape() / ext.stride();
+ ivect items = (box.upper() - box.lower()) / box.stride() + 1;
+ ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ const char* recv_buffer = (const char*) b->pointer();
+ const int vartype = CCTK_VarTypeI(varindex);
+ const int vartypesize = CCTK_VarTypeSize(vartype);
+ assert(vartypesize >= 0);
+
+ for (int k = 0; k < items[2]; k++) {
+ for (int j = 0; j < items[1]; j++) {
+ int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
+ memcpy (((char*) storage()) + vartypesize*i,
+ recv_buffer,
+ vartypesize*items[0]);
+ recv_buffer += vartypesize*items[0];
}
-
}
-
+ wtime_copyfrom_recvwaitinner_copy.stop();
}
-
+
+ wtime_copyfrom_recvwaitinner_delete.start();
+ bufs->pop();
+ delete b;
+ wtime_copyfrom_recvwaitinner_delete.stop();
+
wtime_copyfrom_wait.stop();
}
// Copy processor-local source data into communication send buffer
// of the corresponding destination processor
-// The case when both source and destination are local is also handled here.
void gdata::copy_into_sendbuffer (comm_state& state,
const gdata* src, const ibbox& box)
{
- if (proc() == src->proc()) {
- // copy on same processor
- copy_from_innerloop (src, box);
- } else {
- // copy to remote processor
- assert (src->_has_storage);
- assert (proc() < state.collbufs.size());
- int fillstate = (state.collbufs[proc()].sendbuf +
- box.size()*state.vartypesize) -
- state.collbufs[proc()].sendbufbase;
- assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
-
- // copy this processor's data into the send buffer
- const ibbox& ext = src->extent();
- ivect shape = ext.shape() / ext.stride();
- ivect items = (box.upper() - box.lower()) / box.stride() + 1;
- ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ // copy to remote processor
+ assert (src->has_storage());
+ assert (proc() < state.collbufs.size());
+ int fillstate = (state.collbufs[proc()].sendbuf +
+ box.size()*state.vartypesize) -
+ state.collbufs[proc()].sendbufbase;
+ assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
+
+ // copy this processor's data into the send buffer
+ const ibbox& ext = src->extent();
+ ivect shape = ext.shape() / ext.stride();
+ ivect items = (box.upper() - box.lower()) / box.stride() + 1;
+ ivect offs = (box.lower() - ext.lower()) / ext.stride();
- for (int k = 0; k < items[2]; k++) {
- for (int j = 0; j < items[1]; j++) {
- int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
- memcpy (state.collbufs[proc()].sendbuf,
- ((const char*) src->storage()) + state.vartypesize*i,
- state.vartypesize*items[0]);
- state.collbufs[proc()].sendbuf += state.vartypesize*items[0];
- }
+ for (int k = 0; k < items[2]; k++) {
+ for (int j = 0; j < items[1]; j++) {
+ int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
+ memcpy (state.collbufs[proc()].sendbuf,
+ ((const char*) src->storage()) + state.vartypesize*i,
+ state.vartypesize*items[0]);
+ state.collbufs[proc()].sendbuf += state.vartypesize*items[0];
}
+ }
- // post the send if the buffer is full
- if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
- MPI_Isend (state.collbufs[proc()].sendbufbase,
- state.collbufs[proc()].sendbufsize,
- state.datatype, proc(), 0, dist::comm,
- &state.srequests[proc()]);
- }
+ // post the send if the buffer is full
+ if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
+ MPI_Isend (state.collbufs[proc()].sendbufbase,
+ state.collbufs[proc()].sendbufsize,
+ state.datatype, proc(), 0, dist::comm,
+ &state.srequests[proc()]);
}
}
@@ -479,8 +341,6 @@ void gdata
const int order_space,
const int order_time)
{
- DECLARE_CCTK_PARAMETERS;
-
assert (transport_operator != op_error);
if (transport_operator == op_none) return;
@@ -495,38 +355,34 @@ void gdata
assert (all(box.lower()>=srcs.at(t)->extent().lower()));
assert (all(box.upper()<=srcs.at(t)->extent().upper()));
}
-
assert (! box.empty());
- if (box.empty()) return;
-
+ const gdata* src = srcs.at(0);
+ if (dist::rank() != proc() && dist::rank() != src->proc()) return;
+
switch (state.thestate) {
- case state_recv:
- if (combine_recv_send) {
- interpolate_from_recv (state, srcs, times, box, time, order_space, order_time);
- interpolate_from_send (state, srcs, times, box, time, order_space, order_time);
- } else {
- interpolate_from_recv (state, srcs, times, box, time, order_space, order_time);
- }
- break;
- case state_send:
- if (combine_recv_send) {
- // do nothing
+ case state_post:
+ if (proc() == src->proc()) {
+ interpolate_from_innerloop(srcs, times, box, time,
+ order_space, order_time);
} else {
- interpolate_from_send (state, srcs, times, box, time, order_space, order_time);
+ interpolate_from_post(state, srcs, times, box, time,
+ order_space, order_time);
}
break;
case state_wait:
- interpolate_from_wait (state, srcs, times, box, time, order_space, order_time);
+ if (proc() != src->proc()) {
+ copy_from_wait (state, src, box);
+ }
break;
case state_get_buffer_sizes:
// don't count processor-local interpolations
- if (proc() != srcs.at(0)->proc()) {
+ if (proc() != src->proc()) {
// if this is a destination processor: increment its recv buffer size
if (proc() == dist::rank()) {
- state.collbufs.at(srcs.at(0)->proc()).recvbufsize += box.size();
+ state.collbufs.at(src->proc()).recvbufsize += box.size();
}
// if this is a source processor: increment its send buffer size
- if (srcs.at(0)->proc() == dist::rank()) {
+ if (src->proc() == dist::rank()) {
state.collbufs.at(proc()).sendbufsize += box.size();
}
}
@@ -534,242 +390,72 @@ void gdata
case state_fill_send_buffers:
// if this is a source processor: interpolate its data into the send buffer
// (the processor-local case is also handled here)
- if (srcs.at(0)->proc() == dist::rank()) {
- interpolate_into_sendbuffer (state, srcs, times, box, time,
+ if (src->proc() == dist::rank()) {
+ if (proc() == src->proc()) {
+ interpolate_from_innerloop(srcs, times, box, time,
order_space, order_time);
+ } else {
+ interpolate_into_sendbuffer(state, srcs, times, box,
+ time, order_space, order_time);
+ }
}
break;
case state_empty_recv_buffers:
// if this is a destination processor and data has already been received
// from the source processor: copy it from the recv buffer
// (the processor-local case is not handled here)
- if (proc() == dist::rank() && state.recvbuffers_ready.at(srcs.at(0)->proc())) {
- copy_from_recvbuffer (state, srcs.at(0), box);
+ if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) {
+ copy_from_recvbuffer(state, src, box);
}
break;
default:
- assert(0);
- }
-}
-
-
-
-void gdata
-::interpolate_from_nocomm (const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time)
-{
- assert (proc() == srcs.at(0)->proc());
-
- assert (transport_operator != op_error);
- assert (transport_operator != op_none);
-
- // interpolate on same processor
- if (dist::rank() == proc()) {
- interpolate_from_innerloop
- (srcs, times, box, time, order_space, order_time);
+ assert(0 && "invalid state");
}
}
-
void gdata
-::interpolate_from_recv (comm_state& state,
+::interpolate_from_post (comm_state& state,
const vector<const gdata*> srcs,
const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
+ const ibbox& box,
+ const CCTK_REAL time,
const int order_space,
const int order_time)
{
- DECLARE_CCTK_PARAMETERS;
-
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
-
- } else {
+ const gdata* src = srcs.at(0);
+ if (dist::rank() == proc()) {
// interpolate from other processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = make_typed(varindex, transport_operator);
- state.tmps1.push (tmp);
- tmp->allocate (box, srcs.at(0)->proc());
- tmp->change_processor_recv (state, proc());
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- comm_state::gcommbuf * b = make_typed_commbuf (box);
-
- MPI_Irecv (b->pointer(), b->size(), b->datatype(), srcs.at(0)->proc(),
- tag, dist::comm, &b->request);
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.recvbufs.push (b);
-
- }
-
- }
-
- }
-}
+ // this processor receives data
+ comm_state::gcommbuf * b = make_typed_commbuf (box);
-void gdata
-::interpolate_from_send (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time)
-{
- DECLARE_CCTK_PARAMETERS;
-
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
-
- interpolate_from_nocomm (srcs, times, box, time, order_space, order_time);
-
+ MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(),
+ tag, dist::comm, &b->request);
+ state.requests.push_back (b->request);
+ state.recvbufs.push (b);
} else {
- // interpolate from other processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps1.front();
- state.tmps1.pop();
- state.tmps2.push (tmp);
- assert (tmp);
- tmp->interpolate_from_nocomm
- (srcs, times, box, time, order_space, order_time);
- tmp->change_processor_send (state, proc());
-
- } else {
-
- if (dist::rank() == srcs.at(0)->proc()) {
- // this processor sends data
-
- comm_state::gcommbuf * b = srcs.at(0)->make_typed_commbuf (box);
-
- assert (srcs.at(0)->_has_storage);
- assert (dist::rank() == srcs.at(0)->proc());
- gdata * tmp
- = srcs.at(0)->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, srcs.at(0)->proc(), b->pointer());
- tmp->interpolate_from_innerloop
- (srcs, times, box, time, order_space, order_time);
- delete tmp;
-
- MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
- tag, dist::comm, &b->request);
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.sendbufs.push (b);
-
- }
-
- }
-
- }
-}
+ // this processor sends data
+ comm_state::gcommbuf * b = src->make_typed_commbuf (box);
+ gdata * tmp = src->make_typed (varindex, transport_operator, tag);
+ tmp->allocate (box, src->proc(), b->pointer());
+ tmp->interpolate_from_innerloop (srcs, times, box, time,
+ order_space, order_time);
+ delete tmp;
-void gdata
-::interpolate_from_wait (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time)
-{
- DECLARE_CCTK_PARAMETERS;
-
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
-
- } else {
- // interpolate from other processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps2.front();
- state.tmps2.pop();
- assert (tmp);
- tmp->change_processor_wait (state, proc());
- copy_from_nocomm (tmp, box);
- delete tmp;
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- comm_state::gcommbuf * b = state.recvbufs.front();
- state.recvbufs.pop();
-
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
-
- assert (_has_storage);
- assert (dist::rank() == proc());
- gdata * tmp = make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, proc(), b->pointer());
- copy_from_innerloop (tmp, box);
- delete tmp;
-
- delete b;
-
- }
-
- if (dist::rank() == srcs.at(0)->proc()) {
- // this processor sends data
-
- comm_state::gcommbuf * b = state.sendbufs.front();
- state.sendbufs.pop();
-
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
-
- delete b;
-
- }
-
- }
-
+ MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
+ tag, dist::comm, &b->request);
+ state.requests.push_back (b->request);
+ state.sendbufs.push (b);
}
}
// Interpolate processor-local source data into communication send buffer
// of the corresponding destination processor
-// The case when both source and destination are local is also handled here.
void gdata
::interpolate_into_sendbuffer (comm_state& state,
const vector<const gdata*> srcs,
@@ -779,35 +465,30 @@ void gdata
const int order_space,
const int order_time)
{
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
- interpolate_from_innerloop (srcs, times, box, time,
- order_space, order_time);
- } else {
- // interpolate to remote processor
- assert (srcs.at(0)->_has_storage);
- assert (proc() < state.collbufs.size());
- int fillstate = (state.collbufs[proc()].sendbuf +
- box.size()*state.vartypesize) -
- state.collbufs[proc()].sendbufbase;
- assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
-
- // interpolate this processor's data into the send buffer
- gdata* tmp = srcs.at(0)->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, srcs.at(0)->proc(), state.collbufs[proc()].sendbuf);
- tmp->interpolate_from_innerloop (srcs, times, box, time,
- order_space, order_time);
- delete tmp;
+ // interpolate to remote processor
+ const gdata* src = srcs.at(0);
+ assert (src->has_storage());
+ assert (proc() < state.collbufs.size());
+ int fillstate = (state.collbufs[proc()].sendbuf +
+ box.size()*state.vartypesize) -
+ state.collbufs[proc()].sendbufbase;
+ assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
+
+ // interpolate this processor's data into the send buffer
+ gdata* tmp = src->make_typed (varindex, transport_operator, tag);
+ tmp->allocate (box, src->proc(), state.collbufs[proc()].sendbuf);
+ tmp->interpolate_from_innerloop (srcs, times, box, time,
+ order_space, order_time);
+ delete tmp;
- // advance send buffer to point to the next ibbox slot
- state.collbufs[proc()].sendbuf += state.vartypesize * box.size();
+ // advance send buffer to point to the next ibbox slot
+ state.collbufs[proc()].sendbuf += state.vartypesize * box.size();
- // post the send if the buffer is full
- if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
- MPI_Isend (state.collbufs[proc()].sendbufbase,
- state.collbufs[proc()].sendbufsize,
- state.datatype, proc(), 0, dist::comm,
- &state.srequests[proc()]);
- }
+ // post the send if the buffer is full
+ if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
+ MPI_Isend (state.collbufs[proc()].sendbufbase,
+ state.collbufs[proc()].sendbufsize,
+ state.datatype, proc(), 0, dist::comm,
+ &state.srequests[proc()]);
}
}