aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Thomas Radke <tradke@aei.mpg.de> 2005-05-26 11:42:00 +0000
committer: Thomas Radke <tradke@aei.mpg.de> 2005-05-26 11:42:00 +0000
commit: a9537286b3571f411b7aa5d546c678c937ca786d (patch)
tree: 7bce2af36a1d658587a5957b673985ce739f425e
parent: 36477514b8d25f89f76944d5faec5189653501f1 (diff)
CarpetLib: clean-up of communication scheme for individual sends/recvs on single components
The default communication scheme in Carpet (which does an individual send/recv operation for each component) comes with two parameters for fine tuning, CarpetLib::use_lightweight_buffers and CarpetLib::combine_recv_send, the defaults of which select a well-tested but slower communication pattern (as it turned out during benchmark runs). This patch cleans up the implementation of this communication scheme so that the fastest communication pattern (combined posting of send/recv; use of lightweight buffers) is now always used. The above parameters have therefore become obsolete and should no longer be used in parfiles. darcs-hash:20050526114253-776a0-780933a1539a260d74da8b92522fa2f48c714964.gz
-rw-r--r-- Carpet/Carpet/src/CarpetStartup.cc 15
-rw-r--r-- Carpet/CarpetLib/param.ccl 4
-rw-r--r-- Carpet/CarpetLib/src/commstate.cc 93
-rw-r--r-- Carpet/CarpetLib/src/commstate.hh 13
-rw-r--r-- Carpet/CarpetLib/src/gdata.cc 749
-rw-r--r-- Carpet/CarpetLib/src/gdata.hh 89
6 files changed, 289 insertions, 674 deletions
diff --git a/Carpet/Carpet/src/CarpetStartup.cc b/Carpet/Carpet/src/CarpetStartup.cc
index d6e459845..5f2aa01ca 100644
--- a/Carpet/Carpet/src/CarpetStartup.cc
+++ b/Carpet/Carpet/src/CarpetStartup.cc
@@ -38,6 +38,7 @@ namespace Carpet {
CCTK_OverloadQueryGroupStorageB (QueryGroupStorageB);
CCTK_OverloadGroupDynamicData (GroupDynamicData);
+ // print warnings if the user set deprecated parameters in her/his parfile
if (CCTK_ParameterQueryTimesSet ("minimise_outstanding_communications",
"CarpetLib") > 0) {
CCTK_WARN (CCTK_WARN_COMPLAIN,
@@ -46,6 +47,20 @@ namespace Carpet {
" anymore. Use 'CarpetLib::use_collective_communication_buffers'"
" instead.");
}
+ if (CCTK_ParameterQueryTimesSet ("combine_recv_send",
+ "CarpetLib") > 0) {
+ CCTK_WARN (CCTK_WARN_COMPLAIN,
+ "You set the parameter 'CarpetLib::combine_recv_send'"
+ " in your parfile. This parameter is deprecated and should not be used"
+ " anymore.");
+ }
+ if (CCTK_ParameterQueryTimesSet ("use_lightweight_buffers",
+ "CarpetLib") > 0) {
+ CCTK_WARN (CCTK_WARN_COMPLAIN,
+ "You set the parameter 'CarpetLib::use_lightweight_buffers'"
+ " in your parfile. This parameter is deprecated and should not be used"
+ " anymore.");
+ }
}
} // namespace Carpet
diff --git a/Carpet/CarpetLib/param.ccl b/Carpet/CarpetLib/param.ccl
index 4cd753401..6966e1282 100644
--- a/Carpet/CarpetLib/param.ccl
+++ b/Carpet/CarpetLib/param.ccl
@@ -37,11 +37,11 @@ BOOLEAN use_waitall "Use MPI_Waitall instead individual MPI_Wait/MPI_Waitsome st
{
} "yes"
-BOOLEAN combine_recv_send "Combine MPI_Irecv and MPI_Isend calls"
+BOOLEAN combine_recv_send "Combine MPI_Irecv and MPI_Isend calls -- DEPRECATED - DO NOT USE ANYMORE"
{
} "no"
-BOOLEAN use_lightweight_buffers "Use lightweight communication buffers instead of data objects"
+BOOLEAN use_lightweight_buffers "Use lightweight communication buffers instead of data objects -- DEPRECATED - DO NOT USE ANYMORE"
{
} "no"
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc
index 887a9bcd7..be3e485a2 100644
--- a/Carpet/CarpetLib/src/commstate.cc
+++ b/Carpet/CarpetLib/src/commstate.cc
@@ -21,10 +21,9 @@ comm_state::comm_state (int vartype_) : vartype(vartype_)
// This path needs the size of the vartype, the corresponding MPI datatype
// and the collective communication buffers initialized also.
//
- // If this comm state is created with a negative vartype then it will be
- // for single-component communications and set up to step through
- // state_recv
- // state_send
+ // If this comm state is created with a negative vartype then individual
+ // communications on single components are used by stepping through
+ // state_post
// state_wait
DECLARE_CCTK_PARAMETERS;
@@ -42,7 +41,7 @@ comm_state::comm_state (int vartype_) : vartype(vartype_)
case N: { T dummy; datatype = dist::datatype(dummy); } break;
#include "carpet_typecase.hh"
#undef TYPECASE
- default: assert (0);
+ default: assert (0 && "invalid datatype");
}
collbufs.resize (dist::size());
rrequests.resize (dist::size(), MPI_REQUEST_NULL);
@@ -50,49 +49,15 @@ comm_state::comm_state (int vartype_) : vartype(vartype_)
recvbuffers_ready.resize (dist::size());
} else {
- thestate = state_recv;
+ thestate = state_post;
}
}
void comm_state::step ()
{
- DECLARE_CCTK_PARAMETERS;
-
- assert (thestate!=state_done);
- if (! uses_collective_communication_buffers && combine_recv_send) {
- switch (thestate) {
- case state_recv:
- assert (tmps1.empty());
- thestate = state_wait;
- break;
- case state_send:
- assert (0);
- case state_wait:
- assert (tmps1.empty());
- assert (tmps2.empty());
- thestate = state_done;
- break;
- case state_done:
- assert (0);
- default:
- assert (0);
- }
- } else {
+ assert (thestate != state_done);
+ if (uses_collective_communication_buffers) {
switch (thestate) {
- case state_recv:
- assert (tmps2.empty());
- thestate = state_send;
- break;
- case state_send:
- assert (tmps1.empty());
- thestate = state_wait;
- break;
- case state_wait:
- assert (tmps1.empty());
- assert (tmps2.empty());
- thestate = state_done;
- break;
-
case state_get_buffer_sizes:
// The sizes of the collective communication buffers are known
// so now allocate them.
@@ -127,7 +92,7 @@ void comm_state::step ()
case state_empty_recv_buffers:
// Finish (at least one of) the posted communications
- if (! AllPostedCommunicationsFinished (use_waitall)) {
+ if (! AllPostedCommunicationsFinished ()) {
// No state change if there are still outstanding communications;
// do another comm_state loop iteration.
} else {
@@ -140,38 +105,50 @@ void comm_state::step ()
}
break;
- case state_done:
- assert (0);
default:
- assert (0);
+ assert (0 && "invalid state");
+ }
+ } else {
+ switch (thestate) {
+ case state_post:
+ // After all sends/recvs have been posted in 'state_post'
+ // now wait for their completion in 'state_wait'.
+ thestate = state_wait;
+ break;
+ case state_wait:
+ thestate = state_done;
+ break;
+ default:
+ assert (0 && "invalid state");
}
}
}
bool comm_state::done ()
{
- return thestate==state_done;
+ return thestate == state_done;
}
comm_state::~comm_state ()
{
assert (thestate == state_done ||
thestate == uses_collective_communication_buffers ?
- state_get_buffer_sizes : state_recv);
- assert (tmps1.empty());
- assert (tmps2.empty());
+ state_get_buffer_sizes : state_post);
assert (requests.empty());
}
// wait for completion of posted collective buffer sends/receives
//
-// Depending on use_waitall, this function will wait for all at once (true)
-// or at least one (false) of the posted receive operations to finish.
+// Depending on the parameter CarpetLib::use_waitall, this function will wait
+// for all at once (true) or at least one (false) of the
+// posted receive operations to finish.
//
// It returns true if all posted communications have been completed.
-bool comm_state::AllPostedCommunicationsFinished (bool use_waitall)
+bool comm_state::AllPostedCommunicationsFinished ()
{
+ DECLARE_CCTK_PARAMETERS;
+
// check if all outstanding receives have been completed already
if (num_posted_recvs == num_completed_recvs) {
// finalize the outstanding sends in one go
@@ -215,16 +192,6 @@ bool comm_state::AllPostedCommunicationsFinished (bool use_waitall)
}
-comm_state::gcommbuf::gcommbuf ()
-{
-}
-
-comm_state::gcommbuf::~gcommbuf ()
-{
-}
-
-
-
template<typename T>
comm_state::commbuf<T>::commbuf (ibbox const & box)
{
diff --git a/Carpet/CarpetLib/src/commstate.hh b/Carpet/CarpetLib/src/commstate.hh
index 21c05320e..e120fa5e7 100644
--- a/Carpet/CarpetLib/src/commstate.hh
+++ b/Carpet/CarpetLib/src/commstate.hh
@@ -21,8 +21,8 @@ class gdata;
// Depending on how a comm state object was created,
// it will step through one of two state transitions (in the given order):
enum astate {
- // "recv -> send -> wait" are used for single-component communications
- state_recv, state_send, state_wait,
+ // these are used for communications on individual components
+ state_post, state_wait,
// these are used for collective communications
state_get_buffer_sizes, state_fill_send_buffers, state_empty_recv_buffers,
@@ -56,16 +56,13 @@ public:
// the following members are used for single-component communications
//////////////////////////////////////////////////////////////////////////
- // Lists of temporary data objects
- queue<gdata*> tmps1, tmps2;
-
// List of MPI requests for use_waitall
vector<MPI_Request> requests;
// Lists of communication buffers for use_lightweight_buffers
struct gcommbuf {
- gcommbuf ();
- virtual ~gcommbuf ();
+ gcommbuf () {};
+ virtual ~gcommbuf () {};
MPI_Request request;
virtual void const * pointer () const = 0;
virtual void * pointer () = 0;
@@ -135,7 +132,7 @@ private:
int num_completed_recvs;
// wait for completion of posted collective buffer sends/receives
- bool AllPostedCommunicationsFinished(bool use_waitall);
+ bool AllPostedCommunicationsFinished();
};
diff --git a/Carpet/CarpetLib/src/gdata.cc b/Carpet/CarpetLib/src/gdata.cc
index 31f62b38e..f5e3ac081 100644
--- a/Carpet/CarpetLib/src/gdata.cc
+++ b/Carpet/CarpetLib/src/gdata.cc
@@ -26,7 +26,7 @@ using namespace std;
static int nexttag ()
{
DECLARE_CCTK_PARAMETERS;
-
+
int const min_tag = 100;
static int last = 0;
++last;
@@ -66,29 +66,16 @@ void gdata::change_processor (comm_state& state,
const int newproc,
void* const mem)
{
- DECLARE_CCTK_PARAMETERS;
-
switch (state.thestate) {
- case state_recv:
- if (combine_recv_send) {
- change_processor_recv (state, newproc, mem);
- change_processor_send (state, newproc, mem);
- } else {
- change_processor_recv (state, newproc, mem);
- }
- break;
- case state_send:
- if (combine_recv_send) {
- // do nothing
- } else {
- change_processor_send (state, newproc, mem);
- }
+ case state_post:
+ change_processor_recv (state, newproc, mem);
+ change_processor_send (state, newproc, mem);
break;
case state_wait:
change_processor_wait (state, newproc, mem);
break;
default:
- assert(0);
+ assert(0 && "invalid state");
}
}
@@ -98,39 +85,34 @@ void gdata::change_processor (comm_state& state,
void gdata::copy_from (comm_state& state,
const gdata* src, const ibbox& box)
{
- DECLARE_CCTK_PARAMETERS;
-
assert (has_storage() && src->has_storage());
assert (all(box.lower()>=extent().lower()
- && box.lower()>=src->extent().lower()));
+ && box.lower()>=src->extent().lower()));
assert (all(box.upper()<=extent().upper()
- && box.upper()<=src->extent().upper()));
+ && box.upper()<=src->extent().upper()));
assert (all(box.stride()==extent().stride()
- && box.stride()==src->extent().stride()));
+ && box.stride()==src->extent().stride()));
assert (all((box.lower()-extent().lower())%box.stride() == 0
- && (box.lower()-src->extent().lower())%box.stride() == 0));
-
+ && (box.lower()-src->extent().lower())%box.stride() == 0));
+
if (box.empty()) return;
-
+ if (dist::rank() != proc() && dist::rank() != src->proc()) return;
+
switch (state.thestate) {
- case state_recv:
- if (combine_recv_send) {
- copy_from_recv (state, src, box);
- copy_from_send (state, src, box);
+ case state_post:
+ if (proc() == src->proc()) {
+ copy_from_innerloop (src, box);
} else {
- copy_from_recv (state, src, box);
- }
- break;
- case state_send:
- if (combine_recv_send) {
- // do nothing
- } else {
- copy_from_send (state, src, box);
+ copy_from_post (state, src, box);
}
break;
+
case state_wait:
- copy_from_wait (state, src, box);
+ if (proc() != src->proc()) {
+ copy_from_wait (state, src, box);
+ }
break;
+
case state_get_buffer_sizes:
// don't count processor-local copies
if (proc() != src->proc()) {
@@ -144,13 +126,19 @@ void gdata::copy_from (comm_state& state,
}
}
break;
+
case state_fill_send_buffers:
// if this is a source processor: copy its data into the send buffer
// (the processor-local case is also handled here)
if (src->proc() == dist::rank()) {
- copy_into_sendbuffer (state, src, box);
+ if (proc() == src->proc()) {
+ copy_from_innerloop (src, box);
+ } else {
+ copy_into_sendbuffer (state, src, box);
+ }
}
break;
+
case state_empty_recv_buffers:
// if this is a destination processor and data has already been received
// from the source processor: copy it from the recv buffer
@@ -158,286 +146,160 @@ void gdata::copy_from (comm_state& state,
copy_from_recvbuffer (state, src, box);
}
break;
+
default:
- assert(0);
+ assert(0 && "invalid state");
}
}
-
-void gdata::copy_from_nocomm (const gdata* src, const ibbox& box)
+void gdata::copy_from_post (comm_state& state,
+ const gdata* src, const ibbox& box)
{
- assert (has_storage() && src->has_storage());
- assert (proc() == src->proc());
-
- // copy on same processor
+ wtime_copyfrom_recv.start();
+
if (dist::rank() == proc()) {
- copy_from_innerloop (src, box);
- }
-}
+ // this processor receives data
+ wtime_copyfrom_recvinner_allocate.start();
+ comm_state::gcommbuf * b = make_typed_commbuf (box);
+ wtime_copyfrom_recvinner_allocate.stop();
+
+ wtime_copyfrom_recvinner_recv.start();
+ MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(),
+ tag, dist::comm, &b->request);
+ wtime_copyfrom_recvinner_recv.stop();
+ state.requests.push_back (b->request);
+ state.recvbufs.push (b);
-void gdata::copy_from_recv (comm_state& state,
- const gdata* src, const ibbox& box)
-{
- DECLARE_CCTK_PARAMETERS;
-
- wtime_copyfrom_recv.start();
-
- if (proc() == src->proc()) {
- // copy on same processor
-
} else {
- // copy to different processor
-
- if (! use_lightweight_buffers) {
-
- wtime_copyfrom_recv_maketyped.start();
- gdata* const tmp = make_typed(varindex, transport_operator);
- wtime_copyfrom_recv_maketyped.stop();
- state.tmps1.push (tmp);
- wtime_copyfrom_recv_allocate.start();
- tmp->allocate (box, src->proc());
- wtime_copyfrom_recv_allocate.stop();
- wtime_copyfrom_recv_changeproc_recv.start();
- tmp->change_processor_recv (state, proc());
- wtime_copyfrom_recv_changeproc_recv.stop();
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- wtime_copyfrom_recvinner_allocate.start();
- comm_state::gcommbuf * b = make_typed_commbuf (box);
- wtime_copyfrom_recvinner_allocate.stop();
-
- wtime_copyfrom_recvinner_recv.start();
- MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(),
- tag, dist::comm, &b->request);
- wtime_copyfrom_recvinner_recv.stop();
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.recvbufs.push (b);
-
+ // this processor sends data
+
+ wtime_copyfrom_sendinner_allocate.start();
+ comm_state::gcommbuf * b = src->make_typed_commbuf (box);
+ wtime_copyfrom_sendinner_allocate.stop();
+
+ // copy data into send buffer
+ wtime_copyfrom_sendinner_copy.start();
+ const ibbox& ext = src->extent();
+ ivect shape = ext.shape() / ext.stride();
+ ivect items = (box.upper() - box.lower()) / box.stride() + 1;
+ ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ char* send_buffer = (char*) b->pointer();
+ const int vartype = CCTK_VarTypeI(varindex);
+ const int vartypesize = CCTK_VarTypeSize(vartype);
+ assert(vartypesize >= 0);
+
+ for (int k = 0; k < items[2]; k++) {
+ for (int j = 0; j < items[1]; j++) {
+ int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
+ memcpy (send_buffer,
+ ((char*) src->storage()) + vartypesize*i,
+ vartypesize*items[0]);
+ send_buffer += vartypesize*items[0];
}
-
}
-
- }
-
- wtime_copyfrom_recv.stop();
-}
+ wtime_copyfrom_sendinner_copy.stop();
-void gdata::copy_from_send (comm_state& state,
- const gdata* src, const ibbox& box)
-{
- DECLARE_CCTK_PARAMETERS;
-
- wtime_copyfrom_send.start();
-
- if (proc() == src->proc()) {
- // copy on same processor
-
- wtime_copyfrom_send_copyfrom_nocomm1.start();
- copy_from_nocomm (src, box);
- wtime_copyfrom_send_copyfrom_nocomm1.stop();
-
- } else {
- // copy to different processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps1.front();
- state.tmps1.pop();
- state.tmps2.push (tmp);
- assert (tmp);
- wtime_copyfrom_send_copyfrom_nocomm2.start();
- tmp->copy_from_nocomm (src, box);
- wtime_copyfrom_send_copyfrom_nocomm2.stop();
- wtime_copyfrom_send_changeproc_send.start();
- tmp->change_processor_send (state, proc());
- wtime_copyfrom_send_changeproc_send.stop();
-
- } else {
-
- if (dist::rank() == src->proc()) {
- // this processor sends data
-
- wtime_copyfrom_sendinner_allocate.start();
- comm_state::gcommbuf * b = src->make_typed_commbuf (box);
- wtime_copyfrom_sendinner_allocate.stop();
-
- wtime_copyfrom_sendinner_copy.start();
- assert (src->_has_storage);
- assert (dist::rank() == src->proc());
- gdata * tmp = src->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, src->proc(), b->pointer());
- tmp->copy_from_innerloop (src, box);
- delete tmp;
- wtime_copyfrom_sendinner_copy.stop();
-
- wtime_copyfrom_sendinner_send.start();
- MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
+ wtime_copyfrom_sendinner_send.start();
+ MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
tag, dist::comm, &b->request);
- wtime_copyfrom_sendinner_send.stop();
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.sendbufs.push (b);
-
- }
- }
-
+ wtime_copyfrom_sendinner_send.stop();
+ state.requests.push_back (b->request);
+ state.sendbufs.push (b);
}
-
- wtime_copyfrom_send.stop();
+
+ wtime_copyfrom_recv.stop();
}
+
void gdata::copy_from_wait (comm_state& state,
const gdata* src, const ibbox& box)
{
- DECLARE_CCTK_PARAMETERS;
-
wtime_copyfrom_wait.start();
-
- if (proc() == src->proc()) {
- // copy on same processor
-
- } else {
- // copy to different processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps2.front();
- state.tmps2.pop();
- assert (tmp);
- wtime_copyfrom_wait_changeproc_wait.start();
- tmp->change_processor_wait (state, proc());
- wtime_copyfrom_wait_changeproc_wait.stop();
- wtime_copyfrom_wait_copyfrom_nocomm.start();
- copy_from_nocomm (tmp, box);
- wtime_copyfrom_wait_copyfrom_nocomm.stop();
- wtime_copyfrom_wait_delete.start();
- delete tmp;
- wtime_copyfrom_wait_delete.stop();
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- comm_state::gcommbuf * b = state.recvbufs.front();
- state.recvbufs.pop();
-
- wtime_copyfrom_recvwaitinner_wait.start();
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
- wtime_copyfrom_recvwaitinner_wait.stop();
-
- wtime_copyfrom_recvwaitinner_copy.start();
- assert (_has_storage);
- assert (dist::rank() == proc());
- gdata * tmp = make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, proc(), b->pointer());
- copy_from_innerloop (tmp, box);
- delete tmp;
- wtime_copyfrom_recvwaitinner_copy.stop();
-
- wtime_copyfrom_recvwaitinner_delete.start();
- delete b;
- wtime_copyfrom_recvwaitinner_delete.stop();
-
- }
-
- if (dist::rank() == src->proc()) {
- // this processor sends data
-
- comm_state::gcommbuf * b = state.sendbufs.front();
- state.sendbufs.pop();
-
- wtime_copyfrom_sendwaitinner_wait.start();
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
- wtime_copyfrom_sendwaitinner_wait.stop();
-
- wtime_copyfrom_sendwaitinner_delete.start();
- delete b;
- wtime_copyfrom_sendwaitinner_delete.stop();
-
+
+ wtime_copyfrom_recvwaitinner_wait.start();
+ if (! state.requests.empty()) {
+ // wait for all requests at once
+ MPI_Waitall (state.requests.size(), &state.requests.front(),
+ MPI_STATUSES_IGNORE);
+ state.requests.clear();
+ }
+ wtime_copyfrom_recvwaitinner_wait.stop();
+
+ queue<comm_state::gcommbuf*>* const bufs =
+ dist::rank() == proc() ? &state.recvbufs : &state.sendbufs;
+ comm_state::gcommbuf* b = bufs->front();
+
+ // copy data out of receive buffer
+ if (bufs == &state.recvbufs) {
+ wtime_copyfrom_recvwaitinner_copy.start();
+ const ibbox& ext = extent();
+ ivect shape = ext.shape() / ext.stride();
+ ivect items = (box.upper() - box.lower()) / box.stride() + 1;
+ ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ const char* recv_buffer = (const char*) b->pointer();
+ const int vartype = CCTK_VarTypeI(varindex);
+ const int vartypesize = CCTK_VarTypeSize(vartype);
+ assert(vartypesize >= 0);
+
+ for (int k = 0; k < items[2]; k++) {
+ for (int j = 0; j < items[1]; j++) {
+ int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
+ memcpy (((char*) storage()) + vartypesize*i,
+ recv_buffer,
+ vartypesize*items[0]);
+ recv_buffer += vartypesize*items[0];
}
-
}
-
+ wtime_copyfrom_recvwaitinner_copy.stop();
}
-
+
+ wtime_copyfrom_recvwaitinner_delete.start();
+ bufs->pop();
+ delete b;
+ wtime_copyfrom_recvwaitinner_delete.stop();
+
wtime_copyfrom_wait.stop();
}
// Copy processor-local source data into communication send buffer
// of the corresponding destination processor
-// The case when both source and destination are local is also handled here.
void gdata::copy_into_sendbuffer (comm_state& state,
const gdata* src, const ibbox& box)
{
- if (proc() == src->proc()) {
- // copy on same processor
- copy_from_innerloop (src, box);
- } else {
- // copy to remote processor
- assert (src->_has_storage);
- assert (proc() < state.collbufs.size());
- int fillstate = (state.collbufs[proc()].sendbuf +
- box.size()*state.vartypesize) -
- state.collbufs[proc()].sendbufbase;
- assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
-
- // copy this processor's data into the send buffer
- const ibbox& ext = src->extent();
- ivect shape = ext.shape() / ext.stride();
- ivect items = (box.upper() - box.lower()) / box.stride() + 1;
- ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ // copy to remote processor
+ assert (src->has_storage());
+ assert (proc() < state.collbufs.size());
+ int fillstate = (state.collbufs[proc()].sendbuf +
+ box.size()*state.vartypesize) -
+ state.collbufs[proc()].sendbufbase;
+ assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
+
+ // copy this processor's data into the send buffer
+ const ibbox& ext = src->extent();
+ ivect shape = ext.shape() / ext.stride();
+ ivect items = (box.upper() - box.lower()) / box.stride() + 1;
+ ivect offs = (box.lower() - ext.lower()) / ext.stride();
- for (int k = 0; k < items[2]; k++) {
- for (int j = 0; j < items[1]; j++) {
- int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
- memcpy (state.collbufs[proc()].sendbuf,
- ((const char*) src->storage()) + state.vartypesize*i,
- state.vartypesize*items[0]);
- state.collbufs[proc()].sendbuf += state.vartypesize*items[0];
- }
+ for (int k = 0; k < items[2]; k++) {
+ for (int j = 0; j < items[1]; j++) {
+ int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
+ memcpy (state.collbufs[proc()].sendbuf,
+ ((const char*) src->storage()) + state.vartypesize*i,
+ state.vartypesize*items[0]);
+ state.collbufs[proc()].sendbuf += state.vartypesize*items[0];
}
+ }
- // post the send if the buffer is full
- if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
- MPI_Isend (state.collbufs[proc()].sendbufbase,
- state.collbufs[proc()].sendbufsize,
- state.datatype, proc(), 0, dist::comm,
- &state.srequests[proc()]);
- }
+ // post the send if the buffer is full
+ if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
+ MPI_Isend (state.collbufs[proc()].sendbufbase,
+ state.collbufs[proc()].sendbufsize,
+ state.datatype, proc(), 0, dist::comm,
+ &state.srequests[proc()]);
}
}
@@ -479,8 +341,6 @@ void gdata
const int order_space,
const int order_time)
{
- DECLARE_CCTK_PARAMETERS;
-
assert (transport_operator != op_error);
if (transport_operator == op_none) return;
@@ -495,38 +355,34 @@ void gdata
assert (all(box.lower()>=srcs.at(t)->extent().lower()));
assert (all(box.upper()<=srcs.at(t)->extent().upper()));
}
-
assert (! box.empty());
- if (box.empty()) return;
-
+ const gdata* src = srcs.at(0);
+ if (dist::rank() != proc() && dist::rank() != src->proc()) return;
+
switch (state.thestate) {
- case state_recv:
- if (combine_recv_send) {
- interpolate_from_recv (state, srcs, times, box, time, order_space, order_time);
- interpolate_from_send (state, srcs, times, box, time, order_space, order_time);
- } else {
- interpolate_from_recv (state, srcs, times, box, time, order_space, order_time);
- }
- break;
- case state_send:
- if (combine_recv_send) {
- // do nothing
+ case state_post:
+ if (proc() == src->proc()) {
+ interpolate_from_innerloop(srcs, times, box, time,
+ order_space, order_time);
} else {
- interpolate_from_send (state, srcs, times, box, time, order_space, order_time);
+ interpolate_from_post(state, srcs, times, box, time,
+ order_space, order_time);
}
break;
case state_wait:
- interpolate_from_wait (state, srcs, times, box, time, order_space, order_time);
+ if (proc() != src->proc()) {
+ copy_from_wait (state, src, box);
+ }
break;
case state_get_buffer_sizes:
// don't count processor-local interpolations
- if (proc() != srcs.at(0)->proc()) {
+ if (proc() != src->proc()) {
// if this is a destination processor: increment its recv buffer size
if (proc() == dist::rank()) {
- state.collbufs.at(srcs.at(0)->proc()).recvbufsize += box.size();
+ state.collbufs.at(src->proc()).recvbufsize += box.size();
}
// if this is a source processor: increment its send buffer size
- if (srcs.at(0)->proc() == dist::rank()) {
+ if (src->proc() == dist::rank()) {
state.collbufs.at(proc()).sendbufsize += box.size();
}
}
@@ -534,242 +390,72 @@ void gdata
case state_fill_send_buffers:
// if this is a source processor: interpolate its data into the send buffer
// (the processor-local case is also handled here)
- if (srcs.at(0)->proc() == dist::rank()) {
- interpolate_into_sendbuffer (state, srcs, times, box, time,
+ if (src->proc() == dist::rank()) {
+ if (proc() == src->proc()) {
+ interpolate_from_innerloop(srcs, times, box, time,
order_space, order_time);
+ } else {
+ interpolate_into_sendbuffer(state, srcs, times, box,
+ time, order_space, order_time);
+ }
}
break;
case state_empty_recv_buffers:
// if this is a destination processor and data has already been received
// from the source processor: copy it from the recv buffer
// (the processor-local case is not handled here)
- if (proc() == dist::rank() && state.recvbuffers_ready.at(srcs.at(0)->proc())) {
- copy_from_recvbuffer (state, srcs.at(0), box);
+ if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) {
+ copy_from_recvbuffer(state, src, box);
}
break;
default:
- assert(0);
- }
-}
-
-
-
-void gdata
-::interpolate_from_nocomm (const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time)
-{
- assert (proc() == srcs.at(0)->proc());
-
- assert (transport_operator != op_error);
- assert (transport_operator != op_none);
-
- // interpolate on same processor
- if (dist::rank() == proc()) {
- interpolate_from_innerloop
- (srcs, times, box, time, order_space, order_time);
+ assert(0 && "invalid state");
}
}
-
void gdata
-::interpolate_from_recv (comm_state& state,
+::interpolate_from_post (comm_state& state,
const vector<const gdata*> srcs,
const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
+ const ibbox& box,
+ const CCTK_REAL time,
const int order_space,
const int order_time)
{
- DECLARE_CCTK_PARAMETERS;
-
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
-
- } else {
+ const gdata* src = srcs.at(0);
+ if (dist::rank() == proc()) {
// interpolate from other processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = make_typed(varindex, transport_operator);
- state.tmps1.push (tmp);
- tmp->allocate (box, srcs.at(0)->proc());
- tmp->change_processor_recv (state, proc());
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- comm_state::gcommbuf * b = make_typed_commbuf (box);
-
- MPI_Irecv (b->pointer(), b->size(), b->datatype(), srcs.at(0)->proc(),
- tag, dist::comm, &b->request);
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.recvbufs.push (b);
-
- }
-
- }
-
- }
-}
+ // this processor receives data
+ comm_state::gcommbuf * b = make_typed_commbuf (box);
-void gdata
-::interpolate_from_send (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time)
-{
- DECLARE_CCTK_PARAMETERS;
-
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
-
- interpolate_from_nocomm (srcs, times, box, time, order_space, order_time);
-
+ MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(),
+ tag, dist::comm, &b->request);
+ state.requests.push_back (b->request);
+ state.recvbufs.push (b);
} else {
- // interpolate from other processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps1.front();
- state.tmps1.pop();
- state.tmps2.push (tmp);
- assert (tmp);
- tmp->interpolate_from_nocomm
- (srcs, times, box, time, order_space, order_time);
- tmp->change_processor_send (state, proc());
-
- } else {
-
- if (dist::rank() == srcs.at(0)->proc()) {
- // this processor sends data
-
- comm_state::gcommbuf * b = srcs.at(0)->make_typed_commbuf (box);
-
- assert (srcs.at(0)->_has_storage);
- assert (dist::rank() == srcs.at(0)->proc());
- gdata * tmp
- = srcs.at(0)->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, srcs.at(0)->proc(), b->pointer());
- tmp->interpolate_from_innerloop
- (srcs, times, box, time, order_space, order_time);
- delete tmp;
-
- MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
- tag, dist::comm, &b->request);
- if (use_waitall) {
- state.requests.push_back (b->request);
- }
- state.sendbufs.push (b);
-
- }
-
- }
-
- }
-}
+ // this processor sends data
+ comm_state::gcommbuf * b = src->make_typed_commbuf (box);
+ gdata * tmp = src->make_typed (varindex, transport_operator, tag);
+ tmp->allocate (box, src->proc(), b->pointer());
+ tmp->interpolate_from_innerloop (srcs, times, box, time,
+ order_space, order_time);
+ delete tmp;
-void gdata
-::interpolate_from_wait (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time)
-{
- DECLARE_CCTK_PARAMETERS;
-
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
-
- } else {
- // interpolate from other processor
-
- if (! use_lightweight_buffers) {
-
- gdata* const tmp = state.tmps2.front();
- state.tmps2.pop();
- assert (tmp);
- tmp->change_processor_wait (state, proc());
- copy_from_nocomm (tmp, box);
- delete tmp;
-
- } else {
-
- if (dist::rank() == proc()) {
- // this processor receives data
-
- comm_state::gcommbuf * b = state.recvbufs.front();
- state.recvbufs.pop();
-
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
-
- assert (_has_storage);
- assert (dist::rank() == proc());
- gdata * tmp = make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, proc(), b->pointer());
- copy_from_innerloop (tmp, box);
- delete tmp;
-
- delete b;
-
- }
-
- if (dist::rank() == srcs.at(0)->proc()) {
- // this processor sends data
-
- comm_state::gcommbuf * b = state.sendbufs.front();
- state.sendbufs.pop();
-
- if (use_waitall) {
- if (! state.requests.empty()) {
- // wait for all requests at once
- MPI_Waitall (state.requests.size(), &state.requests.front(),
- MPI_STATUSES_IGNORE);
- state.requests.clear();
- }
- }
-
- if (! use_waitall) {
- MPI_Wait (&b->request, MPI_STATUS_IGNORE);
- }
-
- delete b;
-
- }
-
- }
-
+ MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
+ tag, dist::comm, &b->request);
+ state.requests.push_back (b->request);
+ state.sendbufs.push (b);
}
}
// Interpolate processor-local source data into communication send buffer
// of the corresponding destination processor
-// The case when both source and destination are local is also handled here.
void gdata
::interpolate_into_sendbuffer (comm_state& state,
const vector<const gdata*> srcs,
@@ -779,35 +465,30 @@ void gdata
const int order_space,
const int order_time)
{
- if (proc() == srcs.at(0)->proc()) {
- // interpolate on same processor
- interpolate_from_innerloop (srcs, times, box, time,
- order_space, order_time);
- } else {
- // interpolate to remote processor
- assert (srcs.at(0)->_has_storage);
- assert (proc() < state.collbufs.size());
- int fillstate = (state.collbufs[proc()].sendbuf +
- box.size()*state.vartypesize) -
- state.collbufs[proc()].sendbufbase;
- assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
-
- // interpolate this processor's data into the send buffer
- gdata* tmp = srcs.at(0)->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, srcs.at(0)->proc(), state.collbufs[proc()].sendbuf);
- tmp->interpolate_from_innerloop (srcs, times, box, time,
- order_space, order_time);
- delete tmp;
+ // interpolate to remote processor
+ const gdata* src = srcs.at(0);
+ assert (src->has_storage());
+ assert (proc() < state.collbufs.size());
+ int fillstate = (state.collbufs[proc()].sendbuf +
+ box.size()*state.vartypesize) -
+ state.collbufs[proc()].sendbufbase;
+ assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
+
+ // interpolate this processor's data into the send buffer
+ gdata* tmp = src->make_typed (varindex, transport_operator, tag);
+ tmp->allocate (box, src->proc(), state.collbufs[proc()].sendbuf);
+ tmp->interpolate_from_innerloop (srcs, times, box, time,
+ order_space, order_time);
+ delete tmp;
- // advance send buffer to point to the next ibbox slot
- state.collbufs[proc()].sendbuf += state.vartypesize * box.size();
+ // advance send buffer to point to the next ibbox slot
+ state.collbufs[proc()].sendbuf += state.vartypesize * box.size();
- // post the send if the buffer is full
- if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
- MPI_Isend (state.collbufs[proc()].sendbufbase,
- state.collbufs[proc()].sendbufsize,
- state.datatype, proc(), 0, dist::comm,
- &state.srequests[proc()]);
- }
+ // post the send if the buffer is full
+ if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
+ MPI_Isend (state.collbufs[proc()].sendbufbase,
+ state.collbufs[proc()].sendbufsize,
+ state.datatype, proc(), 0, dist::comm,
+ &state.srequests[proc()]);
}
}
diff --git a/Carpet/CarpetLib/src/gdata.hh b/Carpet/CarpetLib/src/gdata.hh
index 8750c4bc5..8f2b66035 100644
--- a/Carpet/CarpetLib/src/gdata.hh
+++ b/Carpet/CarpetLib/src/gdata.hh
@@ -141,14 +141,26 @@ private:
const = 0;
public:
- void copy_from (comm_state& state,
- const gdata* src, const ibbox& box);
+ void copy_from (comm_state& state,
+ const gdata* src, const ibbox& box);
+ void interpolate_from (comm_state& state,
+ const vector<const gdata*> srcs,
+ const vector<CCTK_REAL> times,
+ const ibbox& box,
+ const CCTK_REAL time,
+ const int order_space,
+ const int order_time);
+
private:
- void copy_from_nocomm (const gdata* src, const ibbox& box);
- void copy_from_recv (comm_state& state,
- const gdata* src, const ibbox& box);
- void copy_from_send (comm_state& state,
+ void copy_from_post (comm_state& state,
const gdata* src, const ibbox& box);
+ void interpolate_from_post (comm_state& state,
+ const vector<const gdata*> srcs,
+ const vector<CCTK_REAL> times,
+ const ibbox& box,
+ const CCTK_REAL time,
+ const int order_space,
+ const int order_time);
void copy_from_wait (comm_state& state,
const gdata* src, const ibbox& box);
@@ -161,63 +173,6 @@ private:
// of the corresponding source processor
void copy_from_recvbuffer (comm_state& state,
const gdata* src, const ibbox& box);
-
-#if 0
- protected:
- virtual void
- copy_from_recv_inner (comm_state& state,
- const gdata* src,
- const ibbox& box)
- = 0;
- virtual void
- copy_from_send_inner (comm_state& state,
- const gdata* src,
- const ibbox& box)
- = 0;
- virtual void
- copy_from_recv_wait_inner (comm_state& state,
- const gdata* src,
- const ibbox& box)
- = 0;
- virtual void
- copy_from_send_wait_inner (comm_state& state,
- const gdata* src,
- const ibbox& box)
- = 0;
-#endif
-
- public:
- void interpolate_from (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
- private:
- void interpolate_from_nocomm (const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
- void interpolate_from_recv (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
- void interpolate_from_send (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
- void interpolate_from_wait (comm_state& state,
- const vector<const gdata*> srcs,
- const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
- const int order_space,
- const int order_time);
-
// Interpolate processor-local source data into communication send buffer
// of the corresponding destination processor
// The case when both source and destination are local is also handled here.
@@ -228,15 +183,15 @@ private:
const CCTK_REAL time,
const int order_space,
const int order_time);
- public:
-
-protected:
+
+ protected:
virtual void
copy_from_innerloop (const gdata* src, const ibbox& box) = 0;
virtual void
interpolate_from_innerloop (const vector<const gdata*> srcs,
const vector<CCTK_REAL> times,
- const ibbox& box, const CCTK_REAL time,
+ const ibbox& box,
+ const CCTK_REAL time,
const int order_space,
const int order_time) = 0;
};