diff options
-rw-r--r-- | Carpet/Carpet/src/CarpetStartup.cc | 15 | ||||
-rw-r--r-- | Carpet/CarpetLib/param.ccl | 4 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/commstate.cc | 93 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/commstate.hh | 13 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/gdata.cc | 749 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/gdata.hh | 89 |
6 files changed, 289 insertions, 674 deletions
diff --git a/Carpet/Carpet/src/CarpetStartup.cc b/Carpet/Carpet/src/CarpetStartup.cc index d6e459845..5f2aa01ca 100644 --- a/Carpet/Carpet/src/CarpetStartup.cc +++ b/Carpet/Carpet/src/CarpetStartup.cc @@ -38,6 +38,7 @@ namespace Carpet { CCTK_OverloadQueryGroupStorageB (QueryGroupStorageB); CCTK_OverloadGroupDynamicData (GroupDynamicData); + // print warnings if the user set deprecated parameters in her/his parfile if (CCTK_ParameterQueryTimesSet ("minimise_outstanding_communications", "CarpetLib") > 0) { CCTK_WARN (CCTK_WARN_COMPLAIN, @@ -46,6 +47,20 @@ namespace Carpet { " anymore. Use 'CarpetLib::use_collective_communication_buffers'" " instead."); } + if (CCTK_ParameterQueryTimesSet ("combine_recv_send", + "CarpetLib") > 0) { + CCTK_WARN (CCTK_WARN_COMPLAIN, + "You set the parameter 'CarpetLib::combine_recv_send'" + " in your parfile. This parameter is deprecated and should not be used" + " anymore."); + } + if (CCTK_ParameterQueryTimesSet ("use_lightweight_buffers", + "CarpetLib") > 0) { + CCTK_WARN (CCTK_WARN_COMPLAIN, + "You set the parameter 'CarpetLib::use_lightweight_buffers'" + " in your parfile. This parameter is deprecated and should not be used" + " anymore."); + } } } // namespace Carpet diff --git a/Carpet/CarpetLib/param.ccl b/Carpet/CarpetLib/param.ccl index 4cd753401..6966e1282 100644 --- a/Carpet/CarpetLib/param.ccl +++ b/Carpet/CarpetLib/param.ccl @@ -37,11 +37,11 @@ BOOLEAN use_waitall "Use MPI_Waitall instead individual MPI_Wait/MPI_Waitsome st { } "yes" -BOOLEAN combine_recv_send "Combine MPI_Irecv and MPI_Isend calls" +BOOLEAN combine_recv_send "Combine MPI_Irecv and MPI_Isend calls -- DEPRECATED - DO NOT USE ANYMORE" { } "no" -BOOLEAN use_lightweight_buffers "Use lightweight communication buffers instead of data objects" +BOOLEAN use_lightweight_buffers "Use lightweight communication buffers instead of data objects -- DEPRECATED - DO NOT USE ANYMORE" { } "no" diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc index 887a9bcd7..be3e485a2 100644 --- a/Carpet/CarpetLib/src/commstate.cc +++ b/Carpet/CarpetLib/src/commstate.cc @@ -21,10 +21,9 @@ comm_state::comm_state (int vartype_) : vartype(vartype_) // This path needs the size of the vartype, the corresponding MPI datatype // and the collective communication buffers initialized also. // - // If this comm state is created with a negative vartype then it will be - // for single-component communications and set up to step through - // state_recv - // state_send + // If this comm state is created with a negative vartype then individual + // communications on single components are used by stepping through + // state_post // state_wait DECLARE_CCTK_PARAMETERS; @@ -42,7 +41,7 @@ comm_state::comm_state (int vartype_) : vartype(vartype_) case N: { T dummy; datatype = dist::datatype(dummy); } break; #include "carpet_typecase.hh" #undef TYPECASE - default: assert (0); + default: assert (0 && "invalid datatype"); } collbufs.resize (dist::size()); rrequests.resize (dist::size(), MPI_REQUEST_NULL); @@ -50,49 +49,15 @@ comm_state::comm_state (int vartype_) : vartype(vartype_) recvbuffers_ready.resize (dist::size()); } else { - thestate = state_recv; + thestate = state_post; } } void comm_state::step () { - DECLARE_CCTK_PARAMETERS; - - assert (thestate!=state_done); - if (! uses_collective_communication_buffers && combine_recv_send) { - switch (thestate) { - case state_recv: - assert (tmps1.empty()); - thestate = state_wait; - break; - case state_send: - assert (0); - case state_wait: - assert (tmps1.empty()); - assert (tmps2.empty()); - thestate = state_done; - break; - case state_done: - assert (0); - default: - assert (0); - } - } else { + assert (thestate != state_done); + if (uses_collective_communication_buffers) { switch (thestate) { - case state_recv: - assert (tmps2.empty()); - thestate = state_send; - break; - case state_send: - assert (tmps1.empty()); - thestate = state_wait; - break; - case state_wait: - assert (tmps1.empty()); - assert (tmps2.empty()); - thestate = state_done; - break; - case state_get_buffer_sizes: // The sizes of the collective communication buffers are known // so now allocate them. @@ -127,7 +92,7 @@ void comm_state::step () case state_empty_recv_buffers: // Finish (at least one of) the posted communications - if (! AllPostedCommunicationsFinished (use_waitall)) { + if (! AllPostedCommunicationsFinished ()) { // No state change if there are still outstanding communications; // do another comm_state loop iteration. } else { @@ -140,38 +105,50 @@ void comm_state::step () } break; - case state_done: - assert (0); default: - assert (0); + assert (0 && "invalid state"); + } + } else { + switch (thestate) { + case state_post: + // After all sends/recvs have been posted in 'state_post' + // now wait for their completion in 'state_wait'. + thestate = state_wait; + break; + case state_wait: + thestate = state_done; + break; + default: + assert (0 && "invalid state"); } } } bool comm_state::done () { - return thestate==state_done; + return thestate == state_done; } comm_state::~comm_state () { assert (thestate == state_done || thestate == uses_collective_communication_buffers ? - state_get_buffer_sizes : state_recv); - assert (tmps1.empty()); - assert (tmps2.empty()); + state_get_buffer_sizes : state_post); assert (requests.empty()); } // wait for completion of posted collective buffer sends/receives // -// Depending on use_waitall, this function will wait for all at once (true) -// or at least one (false) of the posted receive operations to finish. +// Depending on the parameter CarpetLib::use_waitall, this function will wait +// for all at once (true) or at least one (false) of the +// posted receive operations to finish. // // It returns true if all posted communications have been completed. -bool comm_state::AllPostedCommunicationsFinished (bool use_waitall) +bool comm_state::AllPostedCommunicationsFinished () { + DECLARE_CCTK_PARAMETERS; + // check if all outstanding receives have been completed already if (num_posted_recvs == num_completed_recvs) { // finalize the outstanding sends in one go @@ -215,16 +192,6 @@ bool comm_state::AllPostedCommunicationsFinished (bool use_waitall) } -comm_state::gcommbuf::gcommbuf () -{ -} - -comm_state::gcommbuf::~gcommbuf () -{ -} - - - template<typename T> comm_state::commbuf<T>::commbuf (ibbox const & box) { diff --git a/Carpet/CarpetLib/src/commstate.hh b/Carpet/CarpetLib/src/commstate.hh index 21c05320e..e120fa5e7 100644 --- a/Carpet/CarpetLib/src/commstate.hh +++ b/Carpet/CarpetLib/src/commstate.hh @@ -21,8 +21,8 @@ class gdata; // Depending on how a comm state object was created, // it will step through one of two state transitions (in the given order): enum astate { - // "recv -> send -> wait" are used for single-component communications - state_recv, state_send, state_wait, + // these are used for communications on individual components + state_post, state_wait, // these are used for collective communications state_get_buffer_sizes, state_fill_send_buffers, state_empty_recv_buffers, @@ -56,16 +56,13 @@ public: // the following members are used for single-component communications ////////////////////////////////////////////////////////////////////////// - // Lists of temporary data objects - queue<gdata*> tmps1, tmps2; - // List of MPI requests for use_waitall vector<MPI_Request> requests; // Lists of communication buffers for use_lightweight_buffers struct gcommbuf { - gcommbuf (); - virtual ~gcommbuf (); + gcommbuf () {}; + virtual ~gcommbuf () {}; MPI_Request request; virtual void const * pointer () const = 0; virtual void * pointer () = 0; @@ -135,7 +132,7 @@ private: int num_completed_recvs; // wait for completion of posted collective buffer sends/receives - bool AllPostedCommunicationsFinished(bool use_waitall); + bool AllPostedCommunicationsFinished(); }; diff --git a/Carpet/CarpetLib/src/gdata.cc b/Carpet/CarpetLib/src/gdata.cc index 31f62b38e..f5e3ac081 100644 --- a/Carpet/CarpetLib/src/gdata.cc +++ b/Carpet/CarpetLib/src/gdata.cc @@ -26,7 +26,7 @@ using namespace std; static int nexttag () { DECLARE_CCTK_PARAMETERS; - + int const min_tag = 100; static int last = 0; ++last; @@ -66,29 +66,16 @@ void gdata::change_processor (comm_state& state, const int newproc, void* const mem) { - DECLARE_CCTK_PARAMETERS; - switch (state.thestate) { - case state_recv: - if (combine_recv_send) { - change_processor_recv (state, newproc, mem); - change_processor_send (state, newproc, mem); - } else { - change_processor_recv (state, newproc, mem); - } - break; - case state_send: - if (combine_recv_send) { - // do nothing - } else { - change_processor_send (state, newproc, mem); - } + case state_post: + change_processor_recv (state, newproc, mem); + change_processor_send (state, newproc, mem); break; case state_wait: change_processor_wait (state, newproc, mem); break; default: - assert(0); + assert(0 && "invalid state"); } } @@ -98,39 +85,34 @@ void gdata::change_processor (comm_state& state, void gdata::copy_from (comm_state& state, const gdata* src, const ibbox& box) { - DECLARE_CCTK_PARAMETERS; - assert (has_storage() && src->has_storage()); assert (all(box.lower()>=extent().lower() - && box.lower()>=src->extent().lower())); + && box.lower()>=src->extent().lower())); assert (all(box.upper()<=extent().upper() - && box.upper()<=src->extent().upper())); + && box.upper()<=src->extent().upper())); assert (all(box.stride()==extent().stride() - && box.stride()==src->extent().stride())); + && box.stride()==src->extent().stride())); assert (all((box.lower()-extent().lower())%box.stride() == 0 - && (box.lower()-src->extent().lower())%box.stride() == 0)); - + && (box.lower()-src->extent().lower())%box.stride() == 0)); + if (box.empty()) return; - + if (dist::rank() != proc() && dist::rank() != src->proc()) return; + switch (state.thestate) { - case state_recv: - if (combine_recv_send) { - copy_from_recv (state, src, box); - copy_from_send (state, src, box); + case state_post: + if (proc() == src->proc()) { + copy_from_innerloop (src, box); } else { - copy_from_recv (state, src, box); - } - break; - case state_send: - if (combine_recv_send) { - // do nothing - } else { - copy_from_send (state, src, box); + copy_from_post (state, src, box); } break; + case state_wait: - copy_from_wait (state, src, box); + if (proc() != src->proc()) { + copy_from_wait (state, src, box); + } break; + case state_get_buffer_sizes: // don't count processor-local copies if (proc() != src->proc()) { @@ -144,13 +126,19 @@ void gdata::copy_from (comm_state& state, } } break; + case state_fill_send_buffers: // if this is a source processor: copy its data into the send buffer // (the processor-local case is also handled here) if (src->proc() == dist::rank()) { - copy_into_sendbuffer (state, src, box); + if (proc() == src->proc()) { + copy_from_innerloop (src, box); + } else { + copy_into_sendbuffer (state, src, box); + } } break; + case state_empty_recv_buffers: // if this is a destination processor and data has already been received // from the source processor: copy it from the recv buffer @@ -158,286 +146,160 @@ void gdata::copy_from (comm_state& state, copy_from_recvbuffer (state, src, box); } break; + default: - assert(0); + assert(0 && "invalid state"); } } - -void gdata::copy_from_nocomm (const gdata* src, const ibbox& box) +void gdata::copy_from_post (comm_state& state, + const gdata* src, const ibbox& box) { - assert (has_storage() && src->has_storage()); - assert (proc() == src->proc()); - - // copy on same processor + wtime_copyfrom_recv.start(); + if (dist::rank() == proc()) { - copy_from_innerloop (src, box); - } -} + // this processor receives data + wtime_copyfrom_recvinner_allocate.start(); + comm_state::gcommbuf * b = make_typed_commbuf (box); + wtime_copyfrom_recvinner_allocate.stop(); + + wtime_copyfrom_recvinner_recv.start(); + MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(), + tag, dist::comm, &b->request); + wtime_copyfrom_recvinner_recv.stop(); + state.requests.push_back (b->request); + state.recvbufs.push (b); -void gdata::copy_from_recv (comm_state& state, - const gdata* src, const ibbox& box) -{ - DECLARE_CCTK_PARAMETERS; - - wtime_copyfrom_recv.start(); - - if (proc() == src->proc()) { - // copy on same processor - } else { - // copy to different processor - - if (! use_lightweight_buffers) { - - wtime_copyfrom_recv_maketyped.start(); - gdata* const tmp = make_typed(varindex, transport_operator); - wtime_copyfrom_recv_maketyped.stop(); - state.tmps1.push (tmp); - wtime_copyfrom_recv_allocate.start(); - tmp->allocate (box, src->proc()); - wtime_copyfrom_recv_allocate.stop(); - wtime_copyfrom_recv_changeproc_recv.start(); - tmp->change_processor_recv (state, proc()); - wtime_copyfrom_recv_changeproc_recv.stop(); - - } else { - - if (dist::rank() == proc()) { - // this processor receives data - - wtime_copyfrom_recvinner_allocate.start(); - comm_state::gcommbuf * b = make_typed_commbuf (box); - wtime_copyfrom_recvinner_allocate.stop(); - - wtime_copyfrom_recvinner_recv.start(); - MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(), - tag, dist::comm, &b->request); - wtime_copyfrom_recvinner_recv.stop(); - if (use_waitall) { - state.requests.push_back (b->request); - } - state.recvbufs.push (b); - + // this processor sends data + + wtime_copyfrom_sendinner_allocate.start(); + comm_state::gcommbuf * b = src->make_typed_commbuf (box); + wtime_copyfrom_sendinner_allocate.stop(); + + // copy data into send buffer + wtime_copyfrom_sendinner_copy.start(); + const ibbox& ext = src->extent(); + ivect shape = ext.shape() / ext.stride(); + ivect items = (box.upper() - box.lower()) / box.stride() + 1; + ivect offs = (box.lower() - ext.lower()) / ext.stride(); + char* send_buffer = (char*) b->pointer(); + const int vartype = CCTK_VarTypeI(varindex); + const int vartypesize = CCTK_VarTypeSize(vartype); + assert(vartypesize >= 0); + + for (int k = 0; k < items[2]; k++) { + for (int j = 0; j < items[1]; j++) { + int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2])); + memcpy (send_buffer, + ((char*) src->storage()) + vartypesize*i, + vartypesize*items[0]); + send_buffer += vartypesize*items[0]; } - } - - } - - wtime_copyfrom_recv.stop(); -} + wtime_copyfrom_sendinner_copy.stop(); -void gdata::copy_from_send (comm_state& state, - const gdata* src, const ibbox& box) -{ - DECLARE_CCTK_PARAMETERS; - - wtime_copyfrom_send.start(); - - if (proc() == src->proc()) { - // copy on same processor - - wtime_copyfrom_send_copyfrom_nocomm1.start(); - copy_from_nocomm (src, box); - wtime_copyfrom_send_copyfrom_nocomm1.stop(); - - } else { - // copy to different processor - - if (! use_lightweight_buffers) { - - gdata* const tmp = state.tmps1.front(); - state.tmps1.pop(); - state.tmps2.push (tmp); - assert (tmp); - wtime_copyfrom_send_copyfrom_nocomm2.start(); - tmp->copy_from_nocomm (src, box); - wtime_copyfrom_send_copyfrom_nocomm2.stop(); - wtime_copyfrom_send_changeproc_send.start(); - tmp->change_processor_send (state, proc()); - wtime_copyfrom_send_changeproc_send.stop(); - - } else { - - if (dist::rank() == src->proc()) { - // this processor sends data - - wtime_copyfrom_sendinner_allocate.start(); - comm_state::gcommbuf * b = src->make_typed_commbuf (box); - wtime_copyfrom_sendinner_allocate.stop(); - - wtime_copyfrom_sendinner_copy.start(); - assert (src->_has_storage); - assert (dist::rank() == src->proc()); - gdata * tmp = src->make_typed (varindex, transport_operator, tag); - tmp->allocate (box, src->proc(), b->pointer()); - tmp->copy_from_innerloop (src, box); - delete tmp; - wtime_copyfrom_sendinner_copy.stop(); - - wtime_copyfrom_sendinner_send.start(); - MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(), + wtime_copyfrom_sendinner_send.start(); + MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(), tag, dist::comm, &b->request); - wtime_copyfrom_sendinner_send.stop(); - if (use_waitall) { - state.requests.push_back (b->request); - } - state.sendbufs.push (b); - - } - } - + wtime_copyfrom_sendinner_send.stop(); + state.requests.push_back (b->request); + state.sendbufs.push (b); } - - wtime_copyfrom_send.stop(); + + wtime_copyfrom_recv.stop(); } + void gdata::copy_from_wait (comm_state& state, const gdata* src, const ibbox& box) { - DECLARE_CCTK_PARAMETERS; - wtime_copyfrom_wait.start(); - - if (proc() == src->proc()) { - // copy on same processor - - } else { - // copy to different processor - - if (! use_lightweight_buffers) { - - gdata* const tmp = state.tmps2.front(); - state.tmps2.pop(); - assert (tmp); - wtime_copyfrom_wait_changeproc_wait.start(); - tmp->change_processor_wait (state, proc()); - wtime_copyfrom_wait_changeproc_wait.stop(); - wtime_copyfrom_wait_copyfrom_nocomm.start(); - copy_from_nocomm (tmp, box); - wtime_copyfrom_wait_copyfrom_nocomm.stop(); - wtime_copyfrom_wait_delete.start(); - delete tmp; - wtime_copyfrom_wait_delete.stop(); - - } else { - - if (dist::rank() == proc()) { - // this processor receives data - - comm_state::gcommbuf * b = state.recvbufs.front(); - state.recvbufs.pop(); - - wtime_copyfrom_recvwaitinner_wait.start(); - if (use_waitall) { - if (! state.requests.empty()) { - // wait for all requests at once - MPI_Waitall (state.requests.size(), &state.requests.front(), - MPI_STATUSES_IGNORE); - state.requests.clear(); - } - } - - if (! use_waitall) { - MPI_Wait (&b->request, MPI_STATUS_IGNORE); - } - wtime_copyfrom_recvwaitinner_wait.stop(); - - wtime_copyfrom_recvwaitinner_copy.start(); - assert (_has_storage); - assert (dist::rank() == proc()); - gdata * tmp = make_typed (varindex, transport_operator, tag); - tmp->allocate (box, proc(), b->pointer()); - copy_from_innerloop (tmp, box); - delete tmp; - wtime_copyfrom_recvwaitinner_copy.stop(); - - wtime_copyfrom_recvwaitinner_delete.start(); - delete b; - wtime_copyfrom_recvwaitinner_delete.stop(); - - } - - if (dist::rank() == src->proc()) { - // this processor sends data - - comm_state::gcommbuf * b = state.sendbufs.front(); - state.sendbufs.pop(); - - wtime_copyfrom_sendwaitinner_wait.start(); - if (use_waitall) { - if (! state.requests.empty()) { - // wait for all requests at once - MPI_Waitall (state.requests.size(), &state.requests.front(), - MPI_STATUSES_IGNORE); - state.requests.clear(); - } - } - - if (! use_waitall) { - MPI_Wait (&b->request, MPI_STATUS_IGNORE); - } - wtime_copyfrom_sendwaitinner_wait.stop(); - - wtime_copyfrom_sendwaitinner_delete.start(); - delete b; - wtime_copyfrom_sendwaitinner_delete.stop(); - + + wtime_copyfrom_recvwaitinner_wait.start(); + if (! state.requests.empty()) { + // wait for all requests at once + MPI_Waitall (state.requests.size(), &state.requests.front(), + MPI_STATUSES_IGNORE); + state.requests.clear(); + } + wtime_copyfrom_recvwaitinner_wait.stop(); + + queue<comm_state::gcommbuf*>* const bufs = + dist::rank() == proc() ? &state.recvbufs : &state.sendbufs; + comm_state::gcommbuf* b = bufs->front(); + + // copy data out of receive buffer + if (bufs == &state.recvbufs) { + wtime_copyfrom_recvwaitinner_copy.start(); + const ibbox& ext = extent(); + ivect shape = ext.shape() / ext.stride(); + ivect items = (box.upper() - box.lower()) / box.stride() + 1; + ivect offs = (box.lower() - ext.lower()) / ext.stride(); + const char* recv_buffer = (const char*) b->pointer(); + const int vartype = CCTK_VarTypeI(varindex); + const int vartypesize = CCTK_VarTypeSize(vartype); + assert(vartypesize >= 0); + + for (int k = 0; k < items[2]; k++) { + for (int j = 0; j < items[1]; j++) { + int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2])); + memcpy (((char*) storage()) + vartypesize*i, + recv_buffer, + vartypesize*items[0]); + recv_buffer += vartypesize*items[0]; } - } - + wtime_copyfrom_recvwaitinner_copy.stop(); } - + + wtime_copyfrom_recvwaitinner_delete.start(); + bufs->pop(); + delete b; + wtime_copyfrom_recvwaitinner_delete.stop(); + wtime_copyfrom_wait.stop(); } // Copy processor-local source data into communication send buffer // of the corresponding destination processor -// The case when both source and destination are local is also handled here. void gdata::copy_into_sendbuffer (comm_state& state, const gdata* src, const ibbox& box) { - if (proc() == src->proc()) { - // copy on same processor - copy_from_innerloop (src, box); - } else { - // copy to remote processor - assert (src->_has_storage); - assert (proc() < state.collbufs.size()); - int fillstate = (state.collbufs[proc()].sendbuf + - box.size()*state.vartypesize) - - state.collbufs[proc()].sendbufbase; - assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize); - - // copy this processor's data into the send buffer - const ibbox& ext = src->extent(); - ivect shape = ext.shape() / ext.stride(); - ivect items = (box.upper() - box.lower()) / box.stride() + 1; - ivect offs = (box.lower() - ext.lower()) / ext.stride(); + // copy to remote processor + assert (src->has_storage()); + assert (proc() < state.collbufs.size()); + int fillstate = (state.collbufs[proc()].sendbuf + + box.size()*state.vartypesize) - + state.collbufs[proc()].sendbufbase; + assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize); + + // copy this processor's data into the send buffer + const ibbox& ext = src->extent(); + ivect shape = ext.shape() / ext.stride(); + ivect items = (box.upper() - box.lower()) / box.stride() + 1; + ivect offs = (box.lower() - ext.lower()) / ext.stride(); - for (int k = 0; k < items[2]; k++) { - for (int j = 0; j < items[1]; j++) { - int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2])); - memcpy (state.collbufs[proc()].sendbuf, - ((const char*) src->storage()) + state.vartypesize*i, - state.vartypesize*items[0]); - state.collbufs[proc()].sendbuf += state.vartypesize*items[0]; - } + for (int k = 0; k < items[2]; k++) { + for (int j = 0; j < items[1]; j++) { + int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2])); + memcpy (state.collbufs[proc()].sendbuf, + ((const char*) src->storage()) + state.vartypesize*i, + state.vartypesize*items[0]); + state.collbufs[proc()].sendbuf += state.vartypesize*items[0]; } + } - // post the send if the buffer is full - if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) { - MPI_Isend (state.collbufs[proc()].sendbufbase, - state.collbufs[proc()].sendbufsize, - state.datatype, proc(), 0, dist::comm, - &state.srequests[proc()]); - } + // post the send if the buffer is full + if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) { + MPI_Isend (state.collbufs[proc()].sendbufbase, + state.collbufs[proc()].sendbufsize, + state.datatype, proc(), 0, dist::comm, + &state.srequests[proc()]); } } @@ -479,8 +341,6 @@ void gdata const int order_space, const int order_time) { - DECLARE_CCTK_PARAMETERS; - assert (transport_operator != op_error); if (transport_operator == op_none) return; @@ -495,38 +355,34 @@ void gdata assert (all(box.lower()>=srcs.at(t)->extent().lower())); assert (all(box.upper()<=srcs.at(t)->extent().upper())); } - assert (! box.empty()); - if (box.empty()) return; - + const gdata* src = srcs.at(0); + if (dist::rank() != proc() && dist::rank() != src->proc()) return; + switch (state.thestate) { - case state_recv: - if (combine_recv_send) { - interpolate_from_recv (state, srcs, times, box, time, order_space, order_time); - interpolate_from_send (state, srcs, times, box, time, order_space, order_time); - } else { - interpolate_from_recv (state, srcs, times, box, time, order_space, order_time); - } - break; - case state_send: - if (combine_recv_send) { - // do nothing + case state_post: + if (proc() == src->proc()) { + interpolate_from_innerloop(srcs, times, box, time, + order_space, order_time); } else { - interpolate_from_send (state, srcs, times, box, time, order_space, order_time); + interpolate_from_post(state, srcs, times, box, time, + order_space, order_time); } break; case state_wait: - interpolate_from_wait (state, srcs, times, box, time, order_space, order_time); + if (proc() != src->proc()) { + copy_from_wait (state, src, box); + } break; case state_get_buffer_sizes: // don't count processor-local interpolations - if (proc() != srcs.at(0)->proc()) { + if (proc() != src->proc()) { // if this is a destination processor: increment its recv buffer size if (proc() == dist::rank()) { - state.collbufs.at(srcs.at(0)->proc()).recvbufsize += box.size(); + state.collbufs.at(src->proc()).recvbufsize += box.size(); } // if this is a source processor: increment its send buffer size - if (srcs.at(0)->proc() == dist::rank()) { + if (src->proc() == dist::rank()) { state.collbufs.at(proc()).sendbufsize += box.size(); } } @@ -534,242 +390,72 @@ void gdata case state_fill_send_buffers: // if this is a source processor: interpolate its data into the send buffer // (the processor-local case is also handled here) - if (srcs.at(0)->proc() == dist::rank()) { - interpolate_into_sendbuffer (state, srcs, times, box, time, + if (src->proc() == dist::rank()) { + if (proc() == src->proc()) { + interpolate_from_innerloop(srcs, times, box, time, order_space, order_time); + } else { + interpolate_into_sendbuffer(state, srcs, times, box, + time, order_space, order_time); + } } break; case state_empty_recv_buffers: // if this is a destination processor and data has already been received // from the source processor: copy it from the recv buffer // (the processor-local case is not handled here) - if (proc() == dist::rank() && state.recvbuffers_ready.at(srcs.at(0)->proc())) { - copy_from_recvbuffer (state, srcs.at(0), box); + if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) { + copy_from_recvbuffer(state, src, box); } break; default: - assert(0); - } -} - - - -void gdata -::interpolate_from_nocomm (const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time) -{ - assert (proc() == srcs.at(0)->proc()); - - assert (transport_operator != op_error); - assert (transport_operator != op_none); - - // interpolate on same processor - if (dist::rank() == proc()) { - interpolate_from_innerloop - (srcs, times, box, time, order_space, order_time); + assert(0 && "invalid state"); } } - void gdata -::interpolate_from_recv (comm_state& state, +::interpolate_from_post (comm_state& state, const vector<const gdata*> srcs, const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, + const ibbox& box, + const CCTK_REAL time, const int order_space, const int order_time) { - DECLARE_CCTK_PARAMETERS; - - if (proc() == srcs.at(0)->proc()) { - // interpolate on same processor - - } else { + const gdata* src = srcs.at(0); + if (dist::rank() == proc()) { // interpolate from other processor - - if (! use_lightweight_buffers) { - - gdata* const tmp = make_typed(varindex, transport_operator); - state.tmps1.push (tmp); - tmp->allocate (box, srcs.at(0)->proc()); - tmp->change_processor_recv (state, proc()); - - } else { - - if (dist::rank() == proc()) { - // this processor receives data - - comm_state::gcommbuf * b = make_typed_commbuf (box); - - MPI_Irecv (b->pointer(), b->size(), b->datatype(), srcs.at(0)->proc(), - tag, dist::comm, &b->request); - if (use_waitall) { - state.requests.push_back (b->request); - } - state.recvbufs.push (b); - - } - - } - - } -} + // this processor receives data + comm_state::gcommbuf * b = make_typed_commbuf (box); -void gdata -::interpolate_from_send (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time) -{ - DECLARE_CCTK_PARAMETERS; - - if (proc() == srcs.at(0)->proc()) { - // interpolate on same processor - - interpolate_from_nocomm (srcs, times, box, time, order_space, order_time); - + MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(), + tag, dist::comm, &b->request); + state.requests.push_back (b->request); + state.recvbufs.push (b); } else { - // interpolate from other processor - - if (! use_lightweight_buffers) { - - gdata* const tmp = state.tmps1.front(); - state.tmps1.pop(); - state.tmps2.push (tmp); - assert (tmp); - tmp->interpolate_from_nocomm - (srcs, times, box, time, order_space, order_time); - tmp->change_processor_send (state, proc()); - - } else { - - if (dist::rank() == srcs.at(0)->proc()) { - // this processor sends data - - comm_state::gcommbuf * b = srcs.at(0)->make_typed_commbuf (box); - - assert (srcs.at(0)->_has_storage); - assert (dist::rank() == srcs.at(0)->proc()); - gdata * tmp - = srcs.at(0)->make_typed (varindex, transport_operator, tag); - tmp->allocate (box, srcs.at(0)->proc(), b->pointer()); - tmp->interpolate_from_innerloop - (srcs, times, box, time, order_space, order_time); - delete tmp; - - MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(), - tag, dist::comm, &b->request); - if (use_waitall) { - state.requests.push_back (b->request); - } - state.sendbufs.push (b); - - } - - } - - } -} + // this processor sends data + comm_state::gcommbuf * b = src->make_typed_commbuf (box); + gdata * tmp = src->make_typed (varindex, transport_operator, tag); + tmp->allocate (box, src->proc(), b->pointer()); + tmp->interpolate_from_innerloop (srcs, times, box, time, + order_space, order_time); + delete tmp; -void gdata -::interpolate_from_wait (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time) -{ - DECLARE_CCTK_PARAMETERS; - - if (proc() == srcs.at(0)->proc()) { - // interpolate on same processor - - } else { - // interpolate from other processor - - if (! use_lightweight_buffers) { - - gdata* const tmp = state.tmps2.front(); - state.tmps2.pop(); - assert (tmp); - tmp->change_processor_wait (state, proc()); - copy_from_nocomm (tmp, box); - delete tmp; - - } else { - - if (dist::rank() == proc()) { - // this processor receives data - - comm_state::gcommbuf * b = state.recvbufs.front(); - state.recvbufs.pop(); - - if (use_waitall) { - if (! state.requests.empty()) { - // wait for all requests at once - MPI_Waitall (state.requests.size(), &state.requests.front(), - MPI_STATUSES_IGNORE); - state.requests.clear(); - } - } - - if (! use_waitall) { - MPI_Wait (&b->request, MPI_STATUS_IGNORE); - } - - assert (_has_storage); - assert (dist::rank() == proc()); - gdata * tmp = make_typed (varindex, transport_operator, tag); - tmp->allocate (box, proc(), b->pointer()); - copy_from_innerloop (tmp, box); - delete tmp; - - delete b; - - } - - if (dist::rank() == srcs.at(0)->proc()) { - // this processor sends data - - comm_state::gcommbuf * b = state.sendbufs.front(); - state.sendbufs.pop(); - - if (use_waitall) { - if (! state.requests.empty()) { - // wait for all requests at once - MPI_Waitall (state.requests.size(), &state.requests.front(), - MPI_STATUSES_IGNORE); - state.requests.clear(); - } - } - - if (! use_waitall) { - MPI_Wait (&b->request, MPI_STATUS_IGNORE); - } - - delete b; - - } - - } - + MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(), + tag, dist::comm, &b->request); + state.requests.push_back (b->request); + state.sendbufs.push (b); } } // Interpolate processor-local source data into communication send buffer // of the corresponding destination processor -// The case when both source and destination are local is also handled here. void gdata ::interpolate_into_sendbuffer (comm_state& state, const vector<const gdata*> srcs, @@ -779,35 +465,30 @@ void gdata const int order_space, const int order_time) { - if (proc() == srcs.at(0)->proc()) { - // interpolate on same processor - interpolate_from_innerloop (srcs, times, box, time, - order_space, order_time); - } else { - // interpolate to remote processor - assert (srcs.at(0)->_has_storage); - assert (proc() < state.collbufs.size()); - int fillstate = (state.collbufs[proc()].sendbuf + - box.size()*state.vartypesize) - - state.collbufs[proc()].sendbufbase; - assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize); - - // interpolate this processor's data into the send buffer - gdata* tmp = srcs.at(0)->make_typed (varindex, transport_operator, tag); - tmp->allocate (box, srcs.at(0)->proc(), state.collbufs[proc()].sendbuf); - tmp->interpolate_from_innerloop (srcs, times, box, time, - order_space, order_time); - delete tmp; + // interpolate to remote processor + const gdata* src = srcs.at(0); + assert (src->has_storage()); + assert (proc() < state.collbufs.size()); + int fillstate = (state.collbufs[proc()].sendbuf + + box.size()*state.vartypesize) - + state.collbufs[proc()].sendbufbase; + assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize); + + // interpolate this processor's data into the send buffer + gdata* tmp = src->make_typed (varindex, transport_operator, tag); + tmp->allocate (box, src->proc(), state.collbufs[proc()].sendbuf); + tmp->interpolate_from_innerloop (srcs, times, box, time, + order_space, order_time); + delete tmp; - // advance send buffer to point to the next ibbox slot - state.collbufs[proc()].sendbuf += state.vartypesize * box.size(); + // advance send buffer to point to the next ibbox slot + state.collbufs[proc()].sendbuf += state.vartypesize * box.size(); - // post the send if the buffer is full - if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) { - MPI_Isend (state.collbufs[proc()].sendbufbase, - state.collbufs[proc()].sendbufsize, - state.datatype, proc(), 0, dist::comm, - &state.srequests[proc()]); - } + // post the send if the buffer is full + if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) { + MPI_Isend (state.collbufs[proc()].sendbufbase, + state.collbufs[proc()].sendbufsize, + state.datatype, proc(), 0, dist::comm, + &state.srequests[proc()]); } } diff --git a/Carpet/CarpetLib/src/gdata.hh b/Carpet/CarpetLib/src/gdata.hh index 8750c4bc5..8f2b66035 100644 --- a/Carpet/CarpetLib/src/gdata.hh +++ b/Carpet/CarpetLib/src/gdata.hh @@ -141,14 +141,26 @@ private: const = 0; public: - void copy_from (comm_state& state, - const gdata* src, const ibbox& box); + void copy_from (comm_state& state, + const gdata* src, const ibbox& box); + void interpolate_from (comm_state& state, + const vector<const gdata*> srcs, + const vector<CCTK_REAL> times, + const ibbox& box, + const CCTK_REAL time, + const int order_space, + const int order_time); + private: - void copy_from_nocomm (const gdata* src, const ibbox& box); - void copy_from_recv (comm_state& state, - const gdata* src, const ibbox& box); - void copy_from_send (comm_state& state, + void copy_from_post (comm_state& state, const gdata* src, const ibbox& box); + void interpolate_from_post (comm_state& state, + const vector<const gdata*> srcs, + const vector<CCTK_REAL> times, + const ibbox& box, + const CCTK_REAL time, + const int order_space, + const int order_time); void copy_from_wait (comm_state& state, const gdata* src, const ibbox& box); @@ -161,63 +173,6 @@ private: // of the corresponding source processor void copy_from_recvbuffer (comm_state& state, const gdata* src, const ibbox& box); - -#if 0 - protected: - virtual void - copy_from_recv_inner (comm_state& state, - const gdata* src, - const ibbox& box) - = 0; - virtual void - copy_from_send_inner (comm_state& state, - const gdata* src, - const ibbox& box) - = 0; - virtual void - copy_from_recv_wait_inner (comm_state& state, - const gdata* src, - const ibbox& box) - = 0; - virtual void - copy_from_send_wait_inner (comm_state& state, - const gdata* src, - const ibbox& box) - = 0; -#endif - - public: - void interpolate_from (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time); - private: - void interpolate_from_nocomm (const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time); - void interpolate_from_recv (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time); - void interpolate_from_send (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time); - void interpolate_from_wait (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time); - // Interpolate processor-local source data into communication send buffer // of the corresponding destination processor // The case when both source and destination are local is also handled here. @@ -228,15 +183,15 @@ private: const CCTK_REAL time, const int order_space, const int order_time); - public: - -protected: + + protected: virtual void copy_from_innerloop (const gdata* src, const ibbox& box) = 0; virtual void interpolate_from_innerloop (const vector<const gdata*> srcs, const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, + const ibbox& box, + const CCTK_REAL time, const int order_space, const int order_time) = 0; }; |