aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Carpet/Carpet/src/CarpetStartup.cc15
-rw-r--r--Carpet/CarpetLib/param.ccl4
-rw-r--r--Carpet/CarpetLib/src/commstate.cc93
-rw-r--r--Carpet/CarpetLib/src/commstate.hh13
-rw-r--r--Carpet/CarpetLib/src/gdata.cc749
-rw-r--r--Carpet/CarpetLib/src/gdata.hh89
6 files changed, 289 insertions, 674 deletions
diff --git a/Carpet/Carpet/src/CarpetStartup.cc b/Carpet/Carpet/src/CarpetStartup.cc
index d6e459845..5f2aa01ca 100644
--- a/Carpet/Carpet/src/CarpetStartup.cc
+++ b/Carpet/Carpet/src/CarpetStartup.cc
@@ -38,6 +38,7 @@ namespace Carpet {
CCTK_OverloadQueryGroupStorageB (QueryGroupStorageB);
CCTK_OverloadGroupDynamicData (GroupDynamicData);
+ // print warnings if the user set deprecated parameters in her/his parfile
if (CCTK_ParameterQueryTimesSet ("minimise_outstanding_communications",
"CarpetLib") > 0) {
CCTK_WARN (CCTK_WARN_COMPLAIN,
@@ -46,6 +47,20 @@ namespace Carpet {
" anymore. Use 'CarpetLib::use_collective_communication_buffers'"
" instead.");
}
+ if (CCTK_ParameterQueryTimesSet ("combine_recv_send",
+ "CarpetLib") > 0) {
+ CCTK_WARN (CCTK_WARN_COMPLAIN,
+ "You set the parameter 'CarpetLib::combine_recv_send'"
+ " in your parfile. This parameter is deprecated and should not be used"
+ " anymore.");
+ }
+ if (CCTK_ParameterQueryTimesSet ("use_lightweight_buffers",
+ "CarpetLib") > 0) {
+ CCTK_WARN (CCTK_WARN_COMPLAIN,
+ "You set the parameter 'CarpetLib::use_lightweight_buffers'"
+ " in your parfile. This parameter is deprecated and should not be used"
+ " anymore.");
+ }
}
} // namespace Carpet
diff --git a/Carpet/CarpetLib/param.ccl b/Carpet/CarpetLib/param.ccl
index 4cd753401..6966e1282 100644
--- a/Carpet/CarpetLib/param.ccl
+++ b/Carpet/CarpetLib/param.ccl
@@ -37,11 +37,11 @@ BOOLEAN use_waitall "Use MPI_Waitall instead individual MPI_Wait/MPI_Waitsome st
{
} "yes"
-BOOLEAN combine_recv_send "Combine MPI_Irecv and MPI_Isend calls"
+BOOLEAN combine_recv_send "Combine MPI_Irecv and MPI_Isend calls -- DEPRECATED - DO NOT USE ANYMORE"
{
} "no"
-BOOLEAN use_lightweight_buffers "Use lightweight communication buffers instead of data objects"
+BOOLEAN use_lightweight_buffers "Use lightweight communication buffers instead of data objects -- DEPRECATED - DO NOT USE ANYMORE"
{
} "no"
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc
index 887a9bcd7..be3e485a2 100644
--- a/Carpet/CarpetLib/src/commstate.cc
+++ b/Carpet/CarpetLib/src/commstate.cc
@@ -21,10 +21,9 @@ comm_state::comm_state (int vartype_) : vartype(vartype_)
// This path needs the size of the vartype, the corresponding MPI datatype
// and the collective communication buffers initialized also.
//
- // If this comm state is created with a negative vartype then it will be
- // for single-component communications and set up to step through
- // state_recv
- // state_send
+ // If this comm state is created with a negative vartype then individual
+ // communications on single components are used by stepping through
+ // state_post
// state_wait
DECLARE_CCTK_PARAMETERS;
@@ -42,7 +41,7 @@ comm_state::comm_state (int vartype_) : vartype(vartype_)
case N: { T dummy; datatype = dist::datatype(dummy); } break;
#include "carpet_typecase.hh"
#undef TYPECASE
- default: assert (0);
+ default: assert (0 && "invalid datatype");
}
collbufs.resize (dist::size());
rrequests.resize (dist::size(), MPI_REQUEST_NULL);
@@ -50,49 +49,15 @@ comm_state::comm_state (int vartype_) : vartype(vartype_)
recvbuffers_ready.resize (dist::size());
} else {
- thestate = state_recv;
+ thestate = state_post;
}
}
void comm_state::step ()
{
- DECLARE_CCTK_PARAMETERS;
-
- assert (thestate!=state_done);
- if (! uses_collective_communication_buffers && combine_recv_send) {
- switch (thestate) {
- case state_recv:
- assert (tmps1.empty());
- thestate = state_wait;
- break;
- case state_send:
- assert (0);
- case state_wait:
- assert (tmps1.empty());
- assert (tmps2.empty());
- thestate = state_done;
- break;
- case state_done:
- assert (0);
- default:
- assert (0);
- }
- } else {
+ assert (thestate != state_done);
+ if (uses_collective_communication_buffers) {
switch (thestate) {
- case state_recv:
- assert (tmps2.empty());
- thestate = state_send;
- break;
- case state_send:
- assert (tmps1.empty());
- thestate = state_wait;
- break;
- case state_wait:
- assert (tmps1.empty());
- assert (tmps2.empty());
- thestate = state_done;
- break;
-
case state_get_buffer_sizes:
// The sizes of the collective communication buffers are known
// so now allocate them.
@@ -127,7 +92,7 @@ void comm_state::step ()
case state_empty_recv_buffers:
// Finish (at least one of) the posted communications
- if (! AllPostedCommunicationsFinished (use_waitall)) {
+ if (! AllPostedCommunicationsFinished ()) {
// No state change if there are still outstanding communications;
// do another comm_state loop iteration.
} else {
@@ -140,38 +105,50 @@ void comm_state::step ()
}
break;
- case state_done:
- assert (0);
default:
- assert (0);
+ assert (0 && "invalid state");
+ }
+ } else {
+ switch (thestate) {
+ case state_post:
+ // After all sends/recvs have been posted in 'state_post'
+ // now wait for their completion in 'state_wait'.
+ thestate = state_wait;
+ break;
+ case state_wait:
+ thestate = state_done;
+ break;
+ default:
+ assert (0 && "invalid state");
}
}
}
bool comm_state::done ()
{
- return thestate==state_done;
+ return thestate == state_done;
}
comm_state::~comm_state ()
{
assert (thestate == state_done ||
thestate == uses_collective_communication_buffers ?
- state_get_buffer_sizes : state_recv);
- assert (tmps1.empty());
- assert (tmps2.empty());
+ state_get_buffer_sizes : state_post);
assert (requests.empty());
}
// wait for completion of posted collective buffer sends/receives
//
-// Depending on use_waitall, this function will wait for all at once (true)
-// or at least one (false) of the posted receive operations to finish.
+// Depending on the parameter CarpetLib::use_waitall, this function will wait
+// for all at once (true) or at least one (false) of the
+// posted receive operations to finish.
//
// It returns true if all posted communications have been completed.
-bool comm_state::AllPostedCommunicationsFinished (bool use_waitall)
+bool comm_state::AllPostedCommunicationsFinished ()
{
+ DECLARE_CCTK_PARAMETERS;
+
// check if all outstanding receives have been completed already
if (num_posted_recvs == num_completed_recvs) {
// finalize the outstanding sends in one go
@@ -215,16 +192,6 @@ bool comm_state::AllPostedCommunicationsFinished (bool use_waitall)
}
-comm_state::gcommbuf::gcommbuf ()
-{
-}
-
-comm_state::gcommbuf::~gcommbuf ()
-{
-}
-
-
-
template<typename T>
comm_state::commbuf<T>::commbuf (ibbox const & box)
{
diff --git a/Carpet/CarpetLib/src/commstate.hh b/Carpet/CarpetLib/src/commstate.hh
index 21c05320e..e120fa5e7 100644
--- a/Carpet/CarpetLib/src/commstate.hh
+++ b/Carpet/CarpetLib/src/commstate.hh
@@ -21,8 +21,8 @@ class gdata;
// Depending on how a comm state object was created,
// it will step through one of two state transitions (in the given order):
enum astate {
- // "recv -> send -> wait" are used for single-component communications
- state_recv, state_send, state_wait,
+ // these are used for communications on individual components
+ state_post, state_wait,
// these are used for collective communications
state_get_buffer_sizes, state_fill_send_buffers, state_empty_recv_buffers,
@@ -56,16 +56,13 @@ public:
// the following members are used for single-component communications
//////////////////////////////////////////////////////////////////////////
- // Lists of temporary data objects
- queue<gdata*> tmps1, tmps2;
-
// List of MPI requests for use_waitall
vector<MPI_Request> requests;
// Lists of communication buffers for use_lightweight_buffers
struct gcommbuf {
- gcommbuf ();
- virtual ~gcommbuf ();
+ gcommbuf () {};
+ virtual ~gcommbuf () {};
MPI_Request request;
virtual void const * pointer () const = 0;
virtual void * pointer () = 0;
@@ -135,7 +132,7 @@ private:
int num_completed_recvs;
// wait for completion of posted collective buffer sends/receives
- bool AllPostedCommunicationsFinished(bool use_waitall);
+ bool AllPostedCommunicationsFinished();
};
diff --git a/Carpet/CarpetLib/src/gdata.cc b/Carpet/CarpetLib/src/gdata.cc
index 31f62b38e..f5e3ac081 100644
--- a/Carpet/CarpetLib/src/gdata.cc
+++ b/Carpet/CarpetLib/src/gdata.cc
@@ -26,7 +26,7 @@ using namespace std;
static int nexttag ()
{
DECLARE_CCTK_PARAMETERS;
-
+
int const min_tag = 100;
static int last = 0;
++last;
@@ -66,29 +66,16 @@ void gdata::change_processor (comm_state& state,
const int newproc,
void* const mem)
{
- DECLARE_CCTK_PARAMETERS;
-
switch (state.thestate) {
- case state_recv:
- if (combine_recv_send) {
- change_processor_recv (state, newproc, mem);
- change_processor_send (state, newproc, mem);
- } else {
- change_processor_recv (state, newproc, mem);
- }
- break;
- case state_send:
- if (combine_recv_send) {
- // do nothing
- } else {
- change_processor_send (state, newproc, mem);
- }
+ case state_post:
+ change_processor_recv (state, newproc, mem);
+ change_processor_send (state, newproc, mem);
break;
case state_wait:
change_processor_wait (state, newproc, mem);
break;
default:
- assert(0);
+ assert(0 && "invalid state");
}
}
@@ -98,39 +85,34 @@ void gdata::change_processor (comm_state& state,
void gdata::copy_from (comm_state& state,
const gdata* src, const ibbox& box)
{
- DECLARE_CCTK_PARAMETERS;
-
assert (has_storage() && src->has_storage());
assert (all(box.lower()>=extent().lower()
- && box.lower()>=src->extent().lower()));
+ && box.lower()>=src->extent().lower()));
assert (all(box.upper()<=extent().upper()
- && box.upper()<=src->extent().upper()));
+ && box.upper()<=src->extent().upper()));
assert (all(box.stride()==extent().stride()
- && box.stride()==src->extent().stride()));
+ && box.stride()==src->extent().stride()));
assert (all((box.lower()-extent().lower())%box.stride() == 0
- && (box.lower()-src->extent().lower())%box.stride() == 0));
-
+ && (box.lower()-src->extent().lower())%box.stride() == 0));
+
if (box.empty()) return;
-
+ if (dist::rank() != proc() && dist::rank() != src->proc()) return;
+
switch (state.thestate) {
- case state_recv:
- if (combine_recv_send) {
- copy_from_recv (state, src, box);
- copy_from_send (state, src, box);
+ case state_post:
+ if (proc() == src->proc()) {
+ copy_from_innerloop (src, box);
} else {
- copy_from_recv (state, src, box);
- }
- break;
- case state_send:
- if (combine_recv_send) {
- // do nothing
- } else {
- copy_from_send (state, src, box);
+ copy_from_post (state, src, box);
}
break;
+
case state_wait:
- copy_from_wait (state, src, box);
+ if (proc() != src->proc()) {
+ copy_from_wait (state, src, box);
+ }
break;
+
case state_get_buffer_sizes:
// don't count processor-local copies
if (proc() != src->proc()) {
@@ -144,13 +126,19 @@ void gdata::copy_from (comm_state& state,
}
}
break;
+
case state_fill_send_buffers:
// if this is a source processor: copy its data into the send buffer
// (the processor-local case is also handled here)
if (src->proc() == dist::rank()) {
- copy_into_sendbuffer (state, src, box);
+ if (proc() == src->proc()) {
+ copy_from_innerloop (src, box);
+ } else {
+ copy_into_sendbuffer (state, src, box);
+ }
}
break;
+
case state_empty_recv_buffers:
// if this is a destination processor and data has already been received
// from the source processor: copy it from the recv buffer
@@ -158,286 +146,160 @@ void gdata::copy_from (comm_state& state,
copy_from_recvbuffer (state, src, box);
}
break;
+
default:
- assert(0);
+ assert(0 && "invalid state");
}
}
-
-void gdata::copy_from_nocomm (const gdata* src, const ibbox& box)
+void gdata::copy_from_post (comm_state& state,
+ const gdata* src, const ibbox& box)
{
- assert (has_storage() && src->has_storage());
- assert (proc() == src->proc());
-
- // copy on same processor
+ wtime_copyfrom_recv.start();
+
if (dist::rank() == proc()) {
- copy_from_innerloop (src, box);
- }
-}
+ // this processor receives data
+ wtime_copyfrom_recvinner_allocate.start();
+ comm_state::gcommbuf * b = make_typed_commbuf (box);
+ wtime_copyfrom_recvinner_allocate.stop();
+
+ wtime_copyfrom_recvinner_recv.start();
+ MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(),
+ tag, dist::comm, &b->request);
+ wtime_copyfrom_recvinner_recv.stop();
+ state.requests.push_back (b->request);
+ state.recvbufs.push (b);
-void gdata::copy_from_recv (comm_state& state,
- const gdata* src, const ibbox& box)
-{
- DECLARE_CCTK_PARAMETERS;
-
- wtime_copyfrom_recv.start();
-
- if (proc() == src->proc()) {
- // copy on same processor
-
} else {
- // copy to different processor
-
- if (! use_lightweight_buffers) {
-
- wtime_copyfrom_recv_maketyped.start();
- gdata* const tmp = make_typed(varindex, transport_operator);
- wtime_copyfrom_recv_maketyped.stop();
- state.tmps1.push (tmp);
- wtime_copyfrom_recv_allocate.start();
- tmp->allocate (box, src->proc());
- wtime_copyfrom_recv_allocate.stop();
- wtime_copyfrom_recv_changeproc_recv.start();
- tmp->change_processor_recv (state, proc());
- wtime_copyfrom_recv_changeproc_recv.stop();
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- wtime_copyfrom_recvinner_allocate.start();
- comm_state::gcommbuf * b = make_typed_commbuf (box);
- wtime_copyfrom_recvinner_allocate.stop();
-
- wtime_copyfrom_recvinner_recv.start();
- MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(),
- tag, dist::comm, &b->request);
- wtime_copyfrom_recvinner_recv.stop();
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.recvbufs.push (b);
-
+ // this processor sends data
+
+ wtime_copyfrom_sendinner_allocate.start();
+ comm_state::gcommbuf * b = src->make_typed_commbuf (box);
+ wtime_copyfrom_sendinner_allocate.stop();
+
+ // copy data into send buffer
+ wtime_copyfrom_sendinner_copy.start();
+ const ibbox& ext = src->extent();
+ ivect shape = ext.shape() / ext.stride();
+ ivect items = (box.upper() - box.lower()) / box.stride() + 1;
+ ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ char* send_buffer = (char*) b->pointer();
+ const int vartype = CCTK_VarTypeI(varindex);
+ const int vartypesize = CCTK_VarTypeSize(vartype);
+ assert(vartypesize >= 0);
+
+ for (int k = 0; k < items[2]; k++) {
+ for (int j = 0; j < items[1]; j++) {
+ int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
+ memcpy (send_buffer,
+ ((char*) src->storage()) + vartypesize*i,
+ vartypesize*items[0]);
+ send_buffer += vartypesize*items[0];
}
-
}
-
- }
-
- wtime_copyfrom_recv.stop();
-}
+ wtime_copyfrom_sendinner_copy.stop();
-void gdata::copy_from_send (comm_state& state,
- const gdata* src, const ibbox& box)
-{
- DECLARE_CCTK_PARAMETERS;
-
- wtime_copyfrom_send.start();
-
- if (proc() == src->proc()) {
- // copy on same processor
-
- wtime_copyfrom_send_copyfrom_nocomm1.start();
- copy_from_nocomm (src, box);
- wtime_copyfrom_send_copyfrom_nocomm1.stop();
-
- } else {
- // copy to different processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps1.front();
- state.tmps1.pop();
- state.tmps2.push (tmp);
- assert (tmp);
- wtime_copyfrom_send_copyfrom_nocomm2.start();
- tmp->copy_from_nocomm (src, box);
- wtime_copyfrom_send_copyfrom_nocomm2.stop();
- wtime_copyfrom_send_changeproc_send.start();
- tmp->change_processor_send (state, proc());
- wtime_copyfrom_send_changeproc_send.stop();
-
- } else {
-
- if (dist::rank() == src->proc()) {
- // this processor sends data
-
- wtime_copyfrom_sendinner_allocate.start();
- comm_state::gcommbuf * b = src->make_typed_commbuf (box);
- wtime_copyfrom_sendinner_allocate.stop();
-
- wtime_copyfrom_sendinner_copy.start();
- assert (src->_has_storage);
- assert (dist::rank() == src->proc());
- gdata * tmp = src->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, src->proc(), b->pointer());
- tmp->copy_from_innerloop (src, box);
- delete tmp;
- wtime_copyfrom_sendinner_copy.stop();
-
- wtime_copyfrom_sendinner_send.start();
- MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
+ wtime_copyfrom_sendinner_send.start();
+ MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
tag, dist::comm, &b->request);
- wtime_copyfrom_sendinner_send.stop();
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.sendbufs.push (b);
-
- }
- }
-
+ wtime_copyfrom_sendinner_send.stop();
+ state.requests.push_back (b->request);
+ state.sendbufs.push (b);
}
-
- wtime_copyfrom_send.stop();
+
+ wtime_copyfrom_recv.stop();
}
+
void gdata::copy_from_wait (comm_state& state,
const gdata* src, const ibbox& box)
{
- DECLARE_CCTK_PARAMETERS;
-
wtime_copyfrom_wait.start();
-
- if (proc() == src->proc()) {
- // copy on same processor
-
- } else {
- // copy to different processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps2.front();
- state.tmps2.pop();
- assert (tmp);
- wtime_copyfrom_wait_changeproc_wait.start();
- tmp->change_processor_wait (state, proc());
- wtime_copyfrom_wait_changeproc_wait.stop();
- wtime_copyfrom_wait_copyfrom_nocomm.start();
- copy_from_nocomm (tmp, box);
- wtime_copyfrom_wait_copyfrom_nocomm.stop();
- wtime_copyfrom_wait_delete.start();
- delete tmp;
- wtime_copyfrom_wait_delete.stop();
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- comm_state::gcommbuf * b = state.recvbufs.front();
- state.recvbufs.pop();
-
- wtime_copyfrom_recvwaitinner_wait.start();
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
- wtime_copyfrom_recvwaitinner_wait.stop();
-
- wtime_copyfrom_recvwaitinner_copy.start();
- assert (_has_storage);
- assert (dist::rank() == proc());
- gdata * tmp = make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, proc(), b->pointer());
- copy_from_innerloop (tmp, box);
- delete tmp;
- wtime_copyfrom_recvwaitinner_copy.stop();
-
- wtime_copyfrom_recvwaitinner_delete.start();
- delete b;
- wtime_copyfrom_recvwaitinner_delete.stop();
-
- }
-
- if (dist::rank() == src->proc()) {
- // this processor sends data
-
- comm_state::gcommbuf * b = state.sendbufs.front();
- state.sendbufs.pop();
-
- wtime_copyfrom_sendwaitinner_wait.start();
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
- wtime_copyfrom_sendwaitinner_wait.stop();
-
- wtime_copyfrom_sendwaitinner_delete.start();
- delete b;
- wtime_copyfrom_sendwaitinner_delete.stop();
-
+
+ wtime_copyfrom_recvwaitinner_wait.start();
+ if (! state.requests.empty()) {
+ // wait for all requests at once
+ MPI_Waitall (state.requests.size(), &state.requests.front(),
+ MPI_STATUSES_IGNORE);
+ state.requests.clear();
+ }
+ wtime_copyfrom_recvwaitinner_wait.stop();
+
+ queue<comm_state::gcommbuf*>* const bufs =
+ dist::rank() == proc() ? &state.recvbufs : &state.sendbufs;
+ comm_state::gcommbuf* b = bufs->front();
+
+ // copy data out of receive buffer
+ if (bufs == &state.recvbufs) {
+ wtime_copyfrom_recvwaitinner_copy.start();
+ const ibbox& ext = extent();
+ ivect shape = ext.shape() / ext.stride();
+ ivect items = (box.upper() - box.lower()) / box.stride() + 1;
+ ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ const char* recv_buffer = (const char*) b->pointer();
+ const int vartype = CCTK_VarTypeI(varindex);
+ const int vartypesize = CCTK_VarTypeSize(vartype);
+ assert(vartypesize >= 0);
+
+ for (int k = 0; k < items[2]; k++) {
+ for (int j = 0; j < items[1]; j++) {
+ int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
+ memcpy (((char*) storage()) + vartypesize*i,
+ recv_buffer,
+ vartypesize*items[0]);
+ recv_buffer += vartypesize*items[0];
}
-
}
-
+ wtime_copyfrom_recvwaitinner_copy.stop();
}
-
+
+ wtime_copyfrom_recvwaitinner_delete.start();
+ bufs->pop();
+ delete b;
+ wtime_copyfrom_recvwaitinner_delete.stop();
+
wtime_copyfrom_wait.stop();
}
// Copy processor-local source data into communication send buffer
// of the corresponding destination processor
-// The case when both source and destination are local is also handled here.
void gdata::copy_into_sendbuffer (comm_state& state,
const gdata* src, const ibbox& box)
{
- if (proc() == src->proc()) {
- // copy on same processor
- copy_from_innerloop (src, box);
- } else {
- // copy to remote processor
- assert (src->_has_storage);
- assert (proc() < state.collbufs.size());
- int fillstate = (state.collbufs[proc()].sendbuf +
- box.size()*state.vartypesize) -
- state.collbufs[proc()].sendbufbase;
- assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
-
- // copy this processor's data into the send buffer
- const ibbox& ext = src->extent();
- ivect shape = ext.shape() / ext.stride();
- ivect items = (box.upper() - box.lower()) / box.stride() + 1;
- ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ // copy to remote processor
+ assert (src->has_storage());
+ assert (proc() < state.collbufs.size());
+ int fillstate = (state.collbufs[proc()].sendbuf +
+ box.size()*state.vartypesize) -
+ state.collbufs[proc()].sendbufbase;
+ assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
+
+ // copy this processor's data into the send buffer
+ const ibbox& ext = src->extent();
+ ivect shape = ext.shape() / ext.stride();
+ ivect items = (box.upper() - box.lower()) / box.stride() + 1;
+ ivect offs = (box.lower() - ext.lower()) / ext.stride();
- for (int k = 0; k < items[2]; k++) {
- for (int j = 0; j < items[1]; j++) {
- int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
- memcpy (state.collbufs[proc()].sendbuf,
- ((const char*) src->storage()) + state.vartypesize*i,
- state.vartypesize*items[0]);
- state.collbufs[proc()].sendbuf += state.vartypesize*items[0];
- }
+ for (int k = 0; k < items[2]; k++) {
+ for (int j = 0; j < items[1]; j++) {
+ int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
+ memcpy (state.collbufs[proc()].sendbuf,
+ ((const char*) src->storage()) + state.vartypesize*i,
+ state.vartypesize*items[0]);
+ state.collbufs[proc()].sendbuf += state.vartypesize*items[0];
}
+ }
- // post the send if the buffer is full
- if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
- MPI_Isend (state.collbufs[proc()].sendbufbase,
- state.collbufs[proc()].sendbufsize,
- state.datatype, proc(), 0, dist::comm,
- &state.srequests[proc()]);
- }
+ // post the send if the buffer is full
+ if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
+ MPI_Isend (state.collbufs[proc()].sendbufbase,
+ state.collbufs[proc()].sendbufsize,
+ state.datatype, proc(), 0, dist::comm,
+ &state.srequests[proc()]);
}
}
@@ -479,8 +341,6 @@ void gdata
const int order_space,
const int order_time)
{
- DECLARE_CCTK_PARAMETERS;
-
assert (transport_operator != op_error);
if (transport_operator == op_none) return;
@@ -495,38 +355,34 @@ void gdata
assert (all(box.lower()>=srcs.at(t)->extent().lower()));
assert (all(box.upper()<=srcs.at(t)->extent().upper()));
}
-
assert (! box.empty());
- if (box.empty()) return;
-
+ const gdata* src = srcs.at(0);
+ if (dist::rank() != proc() && dist::rank() != src->proc()) return;
+
switch (state.thestate) {
- case state_recv:
- if (combine_recv_send) {
- interpolate_from_recv (state, srcs, times, box, time, order_space, order_time);
- interpolate_from_send (state, srcs, times, box, time, order_space, order_time);
- } else {
- interpolate_from_recv (state, srcs, times, box, time, order_space, order_time);
- }
- break;
- case state_send:
- if (combine_recv_send) {
- // do nothing
+ case state_post:
+ if (proc() == src->proc()) {
+ interpolate_from_innerloop(srcs, times, box, time,
+ order_space, order_time);
} else {
- interpolate_from_send (state, srcs, times, box, time, order_space, order_time);
+ interpolate_from_post(state, srcs, times, box, time,
+ order_space, order_time);
}
break;
case state_wait:
- interpolate_from_wait (state, srcs, times, box, time, order_space, order_time);
+ if (proc() != src->proc()) {
+ copy_from_wait (state, src, box);
+ }
break;
case state_get_buffer_sizes:
// don't count processor-local interpolations
- if (proc() != srcs.at(0)->proc()) {
+ if (proc() != src->proc()) {
// if this is a destination processor: increment its recv buffer size
if (proc() == dist::rank()) {
- state.collbufs.at(srcs.at(0)->proc()).recvbufsize += box.size();
+ state.collbufs.at(src->proc()).recvbufsize += box.size();
}
// if this is a source processor: increment its send buffer size
- if (srcs.at(0)->proc() == dist::rank()) {
+ if (src->proc() == dist::rank()) {
state.collbufs.at(proc()).sendbufsize += box.size();
}
}
@@ -534,242 +390,72 @@ void gdata
case state_fill_send_buffers:
// if this is a source processor: interpolate its data into the send buffer
// (the processor-local case is also handled here)
- if (srcs.at(0)->proc() == dist::rank()) {
- interpolate_into_sendbuffer (state, srcs, times, box, time,
+ if (src->proc() == dist::rank()) {
+ if (proc() == src->proc()) {
+ interpolate_from_innerloop(srcs, times, box, time,
order_space, order_time);
+ } else {
+ interpolate_into_sendbuffer(state, srcs, times, box,
+ time, order_space, order_time);
+ }
}
break;
case state_empty_recv_buffers:
// if this is a destination processor and data has already been received
// from the source processor: copy it from the recv buffer
// (the processor-local case is not handled here)
- if (proc() == dist::rank() && state.recvbuffers_ready.at(srcs.at(0)->proc())) {
- copy_from_recvbuffer (state, srcs.at(0), box);
+ if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) {
+ copy_from_recvbuffer(state, src, box);
}
break;
default:
- assert(0);
- }
-}
-
-
-
-void gdata
-::interpolate_from_nocomm (const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time)
-{
- assert (proc() == srcs.at(0)->proc());
-
- assert (transport_operator != op_error);
- assert (transport_operator != op_none);
-
- // interpolate on same processor
- if (dist::rank() == proc()) {
- interpolate_from_innerloop
- (srcs, times, box, time, order_space, order_time);
+ assert(0 && "invalid state");
}
}
-
void gdata
-::interpolate_from_recv (comm_state& state,
+::interpolate_from_post (comm_state& state,
const vector<const gdata*> srcs,
const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
+ const ibbox& box,
+ const CCTK_REAL time,
const int order_space,
const int order_time)
{
- DECLARE_CCTK_PARAMETERS;
-
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
-
- } else {
+ const gdata* src = srcs.at(0);
+ if (dist::rank() == proc()) {
// interpolate from other processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = make_typed(varindex, transport_operator);
- state.tmps1.push (tmp);
- tmp->allocate (box, srcs.at(0)->proc());
- tmp->change_processor_recv (state, proc());
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- comm_state::gcommbuf * b = make_typed_commbuf (box);
-
- MPI_Irecv (b->pointer(), b->size(), b->datatype(), srcs.at(0)->proc(),
- tag, dist::comm, &b->request);
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.recvbufs.push (b);
-
- }
-
- }
-
- }
-}
+ // this processor receives data
+ comm_state::gcommbuf * b = make_typed_commbuf (box);
-void gdata
-::interpolate_from_send (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time)
-{
- DECLARE_CCTK_PARAMETERS;
-
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
-
- interpolate_from_nocomm (srcs, times, box, time, order_space, order_time);
-
+ MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(),
+ tag, dist::comm, &b->request);
+ state.requests.push_back (b->request);
+ state.recvbufs.push (b);
} else {
- // interpolate from other processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps1.front();
- state.tmps1.pop();
- state.tmps2.push (tmp);
- assert (tmp);
- tmp->interpolate_from_nocomm
- (srcs, times, box, time, order_space, order_time);
- tmp->change_processor_send (state, proc());
-
- } else {
-
- if (dist::rank() == srcs.at(0)->proc()) {
- // this processor sends data
-
- comm_state::gcommbuf * b = srcs.at(0)->make_typed_commbuf (box);
-
- assert (srcs.at(0)->_has_storage);
- assert (dist::rank() == srcs.at(0)->proc());
- gdata * tmp
- = srcs.at(0)->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, srcs.at(0)->proc(), b->pointer());
- tmp->interpolate_from_innerloop
- (srcs, times, box, time, order_space, order_time);
- delete tmp;
-
- MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
- tag, dist::comm, &b->request);
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.sendbufs.push (b);
-
- }
-
- }
-
- }
-}
+ // this processor sends data
+ comm_state::gcommbuf * b = src->make_typed_commbuf (box);
+ gdata * tmp = src->make_typed (varindex, transport_operator, tag);
+ tmp->allocate (box, src->proc(), b->pointer());
+ tmp->interpolate_from_innerloop (srcs, times, box, time,
+ order_space, order_time);
+ delete tmp;
-void gdata
-::interpolate_from_wait (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time)
-{
- DECLARE_CCTK_PARAMETERS;
-
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
-
- } else {
- // interpolate from other processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps2.front();
- state.tmps2.pop();
- assert (tmp);
- tmp->change_processor_wait (state, proc());
- copy_from_nocomm (tmp, box);
- delete tmp;
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- comm_state::gcommbuf * b = state.recvbufs.front();
- state.recvbufs.pop();
-
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
-
- assert (_has_storage);
- assert (dist::rank() == proc());
- gdata * tmp = make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, proc(), b->pointer());
- copy_from_innerloop (tmp, box);
- delete tmp;
-
- delete b;
-
- }
-
- if (dist::rank() == srcs.at(0)->proc()) {
- // this processor sends data
-
- comm_state::gcommbuf * b = state.sendbufs.front();
- state.sendbufs.pop();
-
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
-
- delete b;
-
- }
-
- }
-
+ MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
+ tag, dist::comm, &b->request);
+ state.requests.push_back (b->request);
+ state.sendbufs.push (b);
}
}
// Interpolate processor-local source data into communication send buffer
// of the corresponding destination processor
-// The case when both source and destination are local is also handled here.
void gdata
::interpolate_into_sendbuffer (comm_state& state,
const vector<const gdata*> srcs,
@@ -779,35 +465,30 @@ void gdata
const int order_space,
const int order_time)
{
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
- interpolate_from_innerloop (srcs, times, box, time,
- order_space, order_time);
- } else {
- // interpolate to remote processor
- assert (srcs.at(0)->_has_storage);
- assert (proc() < state.collbufs.size());
- int fillstate = (state.collbufs[proc()].sendbuf +
- box.size()*state.vartypesize) -
- state.collbufs[proc()].sendbufbase;
- assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
-
- // interpolate this processor's data into the send buffer
- gdata* tmp = srcs.at(0)->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, srcs.at(0)->proc(), state.collbufs[proc()].sendbuf);
- tmp->interpolate_from_innerloop (srcs, times, box, time,
- order_space, order_time);
- delete tmp;
+ // interpolate to remote processor
+ const gdata* src = srcs.at(0);
+ assert (src->has_storage());
+ assert (proc() < state.collbufs.size());
+ int fillstate = (state.collbufs[proc()].sendbuf +
+ box.size()*state.vartypesize) -
+ state.collbufs[proc()].sendbufbase;
+ assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
+
+ // interpolate this processor's data into the send buffer
+ gdata* tmp = src->make_typed (varindex, transport_operator, tag);
+ tmp->allocate (box, src->proc(), state.collbufs[proc()].sendbuf);
+ tmp->interpolate_from_innerloop (srcs, times, box, time,
+ order_space, order_time);
+ delete tmp;
- // advance send buffer to point to the next ibbox slot
- state.collbufs[proc()].sendbuf += state.vartypesize * box.size();
+ // advance send buffer to point to the next ibbox slot
+ state.collbufs[proc()].sendbuf += state.vartypesize * box.size();
- // post the send if the buffer is full
- if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
- MPI_Isend (state.collbufs[proc()].sendbufbase,
- state.collbufs[proc()].sendbufsize,
- state.datatype, proc(), 0, dist::comm,
- &state.srequests[proc()]);
- }
+ // post the send if the buffer is full
+ if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
+ MPI_Isend (state.collbufs[proc()].sendbufbase,
+ state.collbufs[proc()].sendbufsize,
+ state.datatype, proc(), 0, dist::comm,
+ &state.srequests[proc()]);
}
}
diff --git a/Carpet/CarpetLib/src/gdata.hh b/Carpet/CarpetLib/src/gdata.hh
index 8750c4bc5..8f2b66035 100644
--- a/Carpet/CarpetLib/src/gdata.hh
+++ b/Carpet/CarpetLib/src/gdata.hh
@@ -141,14 +141,26 @@ private:
const = 0;
public:
- void copy_from (comm_state& state,
- const gdata* src, const ibbox& box);
+ void copy_from (comm_state& state,
+ const gdata* src, const ibbox& box);
+ void interpolate_from (comm_state& state,
+ const vector<const gdata*> srcs,
+ const vector<CCTK_REAL> times,
+ const ibbox& box,
+ const CCTK_REAL time,
+ const int order_space,
+ const int order_time);
+
private:
- void copy_from_nocomm (const gdata* src, const ibbox& box);
- void copy_from_recv (comm_state& state,
- const gdata* src, const ibbox& box);
- void copy_from_send (comm_state& state,
+ void copy_from_post (comm_state& state,
const gdata* src, const ibbox& box);
+ void interpolate_from_post (comm_state& state,
+ const vector<const gdata*> srcs,
+ const vector<CCTK_REAL> times,
+ const ibbox& box,
+ const CCTK_REAL time,
+ const int order_space,
+ const int order_time);
void copy_from_wait (comm_state& state,
const gdata* src, const ibbox& box);
@@ -161,63 +173,6 @@ private:
// of the corresponding source processor
void copy_from_recvbuffer (comm_state& state,
const gdata* src, const ibbox& box);
-
-#if 0
- protected:
- virtual void
- copy_from_recv_inner (comm_state& state,
- const gdata* src,
- const ibbox& box)
- = 0;
- virtual void
- copy_from_send_inner (comm_state& state,
- const gdata* src,
- const ibbox& box)
- = 0;
- virtual void
- copy_from_recv_wait_inner (comm_state& state,
- const gdata* src,
- const ibbox& box)
- = 0;
- virtual void
- copy_from_send_wait_inner (comm_state& state,
- const gdata* src,
- const ibbox& box)
- = 0;
-#endif
-
- public:
- void interpolate_from (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
- private:
- void interpolate_from_nocomm (const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
- void interpolate_from_recv (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
- void interpolate_from_send (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
- void interpolate_from_wait (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
-
// Interpolate processor-local source data into communication send buffer
// of the corresponding destination processor
// The case when both source and destination are local is also handled here.
@@ -228,15 +183,15 @@ private:
const CCTK_REAL time,
const int order_space,
const int order_time);
- public:
-
-protected:
+
+ protected:
virtual void
copy_from_innerloop (const gdata* src, const ibbox& box) = 0;
virtual void
interpolate_from_innerloop (const vector<const gdata*> srcs,
const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
+ const ibbox& box,
+ const CCTK_REAL time,
const int order_space,
const int order_time) = 0;
};