diff options
author | Erik Schnetter <schnetter@cct.lsu.edu> | 2007-04-19 01:43:00 +0000 |
---|---|---|
committer | Erik Schnetter <schnetter@cct.lsu.edu> | 2007-04-19 01:43:00 +0000 |
commit | 2f888b8438cc0eac248c28a0fdaa7d03a21ebdca (patch) | |
tree | b3ad88d1dd74e7960efbe415691138f964712800 /Carpet/CarpetLib/src | |
parent | c1260b1a96d4403a40764d846e633d5b9facc266 (diff) |
CarpetLib: Various changes to class data
Remove outdated communication mechanisms from data<T> and gdata
classes.
Redesign communication mechanism in data<T> and gdata classes.
darcs-hash:20070419014301-dae7b-ece41543dcb1e23ab81f103b04a75880ac68807f.gz
Diffstat (limited to 'Carpet/CarpetLib/src')
-rw-r--r-- | Carpet/CarpetLib/src/data.cc | 433 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/data.hh | 102 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/gdata.cc | 642 | ||||
-rw-r--r-- | Carpet/CarpetLib/src/gdata.hh | 123 |
4 files changed, 390 insertions, 910 deletions
diff --git a/Carpet/CarpetLib/src/data.cc b/Carpet/CarpetLib/src/data.cc index 0bb3e7924..62f724a17 100644 --- a/Carpet/CarpetLib/src/data.cc +++ b/Carpet/CarpetLib/src/data.cc @@ -204,7 +204,7 @@ void data<T>::allocate (const ibbox& extent_, _size = 1; for (int d=0; d<dim; ++d) { _stride[d] = _size; - assert (_shape[d]==0 || _size <= INT_MAX / _shape[d]); + assert (_shape[d]==0 or _size <= numeric_limits<int>::max() / _shape[d]); _size *= _shape[d]; } _proc = proc_; @@ -238,202 +238,13 @@ void data<T>::free () -// Processor management -template<typename T> -void data<T>::change_processor_recv (comm_state& state, - const int newproc, - void* const memptr) -{ - DECLARE_CCTK_PARAMETERS; - - assert (not comm_active); - comm_active = true; - - if (newproc == _proc) { - assert (not memptr); - return; - } - - static Timer total ("change_processor_recv"); - total.start (); - - assert (vectorlength == 1); - - if (_has_storage) { - if (dist::rank() == newproc) { - // copy from other processor - - assert (not _memory); - _memory = new mem<T> (1, _size, (T*)memptr); - _memory->register_client (0); - _storage = _memory->storage (0); - - static Timer timer ("irecv"); - timer.start (); - T dummy; - MPI_Irecv (_memory->storage(0), - _size, dist::datatype(dummy), proc(), - tag, dist::comm(), &request); - timer.stop (_size * sizeof(T)); - if (use_waitall) { - state.requests.push_back (request); - } - - } else if (dist::rank() == _proc) { - // copy to other processor - - } else { - assert (not memptr); - assert (not _memory); - } - } - - total.stop (0); -} - - - -template<typename T> -void data<T>::change_processor_send (comm_state& state, - const int newproc, - void* const memptr) -{ - DECLARE_CCTK_PARAMETERS; - - assert (comm_active); - - if (newproc == _proc) { - assert (not memptr); - return; - } - - static Timer total ("change_processor_send"); - total.start(); - - assert (vectorlength == 1); - - if (_has_storage) { - if (dist::rank() == newproc) { - // copy from other processor - - } else if (dist::rank() == _proc) { - // copy to other processor - - assert (not memptr); - assert (_memory); - - static Timer timer ("isend"); - timer.start (); - T dummy; - MPI_Isend (_memory->storage(0), - _size, dist::datatype(dummy), newproc, - tag, dist::comm(), &request); - timer.stop (_size * sizeof(T)); - if (use_waitall) { - state.requests.push_back (request); - } - - } else { - assert (not memptr); - assert (not _memory); - } - } - - total.stop (0); -} - - - -template<typename T> -void data<T>::change_processor_wait (comm_state& state, - const int newproc, - void* const memptr) -{ - DECLARE_CCTK_PARAMETERS; - - assert (comm_active); - comm_active = false; - - if (newproc == _proc) { - assert (not memptr); - return; - } - - static Timer total ("change_processor_wait"); - total.start (); - - assert (vectorlength == 1); - - if (use_waitall) { - if (not state.requests.empty()) { - // wait for all requests at once - static Timer timer ("irecvwait"); - timer.start (); - MPI_Waitall - (state.requests.size(), &state.requests.front(), MPI_STATUSES_IGNORE); - timer.stop (0); - state.requests.clear(); - } - } - - if (_has_storage) { - if (dist::rank() == newproc) { - // copy from other processor - - if (not use_waitall) { - static Timer timer ("irecvwait"); - timer.start (); - MPI_Wait (&request, MPI_STATUS_IGNORE); - timer.stop (0); - } - - } else if (dist::rank() == _proc) { - // copy to other processor - - assert (not memptr); - assert (_memory); - - if (not use_waitall) { - static Timer timer ("isendwait"); - timer.start (); - MPI_Wait (&request, MPI_STATUS_IGNORE); - timer.stop (0); - } - - _memory->unregister_client (0); - if (not _memory->has_clients()) delete _memory; - _memory = NULL;; - _storage = NULL; - - } else { - assert (not memptr); - assert (not _memory); - } - } - - _proc = newproc; - - total.stop (0); -} - - - // Data manipulators -template<typename T> -comm_state::gcommbuf * -data<T>:: -make_typed_commbuf (const ibbox & box) - const -{ - return new comm_state::commbuf<T> (box); -} - - template <typename T> -void data <T> -::copy_from_innerloop (gdata const * const gsrc, - ibbox const & box) +void +data <T>:: +copy_from_innerloop (gdata const * const gsrc, + ibbox const & box) { data const * const src = dynamic_cast <data const *> (gsrc); assert (has_storage() and src->has_storage()); @@ -453,107 +264,78 @@ void data <T> template <typename T> -void data <T> -::interpolate_from_innerloop (vector <gdata const *> const & gsrcs, - vector <CCTK_REAL> const & times, - ibbox const & box, - CCTK_REAL const time, - int const order_space, - int const order_time) +void +data <T>:: +transfer_from_innerloop (vector <gdata const *> const & gsrcs, + vector <CCTK_REAL> const & times, + ibbox const & box, + CCTK_REAL const time, + int const order_space, + int const order_time) { assert (has_storage()); - - vector <data const *> srcs (gsrcs.size()); - for (size_t t=0; t<srcs.size(); ++t) { - srcs.AT(t) = dynamic_cast <data const *> (gsrcs.AT(t)); - } - assert (srcs.size() == times.size() and srcs.size() > 0); - - for (size_t t=0; t<srcs.size(); ++t) { - assert (srcs.AT(t)->has_storage()); - assert (proc() == srcs.AT(t)->proc()); - } - assert (dist::rank() == proc()); + for (size_t tl=0; tl<gsrcs.size(); ++tl) { + if (gsrcs.AT(tl)) { + assert (gsrcs.AT(tl)->has_storage()); + assert (gsrcs.AT(tl)->proc() == proc()); + } + } - interpolate_time (srcs, times, box, time, order_space, order_time); + transfer_time (gsrcs, times, box, time, order_space, order_time); } template <typename T> -void data <T> -::interpolate_time (vector <data const *> const & srcs, - vector <CCTK_REAL> const & times, - ibbox const & box, - CCTK_REAL const time, - int const order_space, - int const order_time) +void +data <T>:: +transfer_time (vector <gdata const *> const & gsrcs, + vector <CCTK_REAL> const & times, + ibbox const & box, + CCTK_REAL const time, + int const order_space, + int const order_time) { - // Ensure that the times are consistent - assert (times.size() > 0); - CCTK_REAL const min_time = * min_element (times.begin(), times.end()); - CCTK_REAL const max_time = * max_element (times.begin(), times.end()); - if (transport_operator != op_copy) { - if (time < min_time - eps or time > max_time + eps) { - ostringstream buf; - buf << "Internal error: extrapolation in time." - << " time=" << time - << " times=" << times; - CCTK_WARN (0, buf.str().c_str()); - } - } - // Use this timelevel, or interpolate in time if set to -1 - int timelevel = -1; - - // Try to avoid time interpolation if possible - if (timelevel == -1) { - if (times.size() == 1) { - timelevel = 0; - } - } - if (timelevel == -1) { - if (transport_operator == op_copy) { - timelevel = 0; - } - } - if (timelevel == -1) { - for (size_t tl=0; tl<times.size(); ++tl) { - static_assert (abs(0.1) > 0, - "Function CarpetLib::abs has wrong signature"); - if (abs (times.AT(tl) - time) < eps) { - timelevel = tl; - break; - } - } - } + int timelevel0, ntimelevels; + find_source_timelevel (times, time, order_time, timelevel0, ntimelevels); - if (timelevel == -1) { + if (ntimelevels > 1) { // Time interpolation is necessary + assert (timelevel0 == 0); - vector <data *> tmps (times.size()); + assert ((int)gsrcs.size() >= ntimelevels); + assert ((int)times.size() >= ntimelevels); - for (size_t tl=0; tl<times.size(); ++tl) { - + data * const null = 0; + vector <data *> tmps (timelevel0 + ntimelevels, null); + + for (int tl = timelevel0; tl < timelevel0 + ntimelevels; ++ tl) { tmps.AT(tl) = new data (this->varindex, this->cent, this->transport_operator); tmps.AT(tl)->allocate (box, this->proc()); - tmps.AT(tl)->interpolate_p_r (srcs.AT(tl), box, order_space); - + assert (gsrcs.AT(tl)); + data const * const src = dynamic_cast <data const *> (gsrcs.AT(tl)); + tmps.AT(tl)->transfer_p_r (src, box, order_space); } time_interpolate (tmps, box, times, time, order_time); - for (size_t tl=0; tl<times.size(); ++tl) { + for (int tl = timelevel0; tl < timelevel0 + ntimelevels; ++ tl) { delete tmps.AT(tl); } } else { // No time interpolation - interpolate_p_r (srcs.AT(timelevel), box, order_space); + assert ((int)gsrcs.size() > timelevel0); + assert ((int)times.size() > timelevel0); + + data const * const src = dynamic_cast <data const *> (gsrcs.AT(timelevel0)); + + transfer_p_r (src, box, order_space); } // if } @@ -561,17 +343,23 @@ void data <T> template <typename T> -void data <T> -::interpolate_p_r (data const * const src, - ibbox const & box, - int const order_space) +void +data <T>:: +transfer_p_r (data const * const src, + ibbox const & box, + int const order_space) { - if (all (src->extent().stride() > this->extent().stride())) { + if (all (src->extent().stride() == this->extent().stride())) { + // Copy + copy_from_innerloop (src, box); + } else if (all (src->extent().stride() > this->extent().stride())) { // Prolongate - interpolate_p_vc_cc (src, box, order_space); + assert (transport_operator != op_sync); + transfer_p_vc_cc (src, box, order_space); } else if (all (src->extent().stride() < this->extent().stride())) { // Restrict - interpolate_restrict (src, box, order_space); + assert (transport_operator != op_sync); + transfer_restrict (src, box, order_space); } else { assert (0); } @@ -580,15 +368,16 @@ void data <T> template <typename T> -void data <T> -::interpolate_p_vc_cc (data const * const src, - ibbox const & box, - int const order_space) +void +data <T>:: +transfer_p_vc_cc (data const * const src, + ibbox const & box, + int const order_space) { if (cent == vertex_centered) { // Vertex centred - interpolate_prolongate (src, box, order_space); + transfer_prolongate (src, box, order_space); } else if (cent == cell_centered) { // Cell centred @@ -648,7 +437,7 @@ void data <T> newsrc->extent()); // Interpolate - newdst->interpolate_prolongate (newsrc, newdstbox, order_space); + newdst->transfer_prolongate (newsrc, newdstbox, order_space); // Convert destination to standard representation prolongate_3d_cc_rf2_prim2std @@ -669,10 +458,11 @@ void data <T> } template <> -void data <CCTK_INT> -::interpolate_p_vc_cc (data const * const src, - ibbox const & box, - int const order_space) +void +data <CCTK_INT>:: +transfer_p_vc_cc (data const * const src, + ibbox const & box, + int const order_space) { CCTK_WARN (0, "Data type not supported"); } @@ -680,10 +470,11 @@ void data <CCTK_INT> template <typename T> -void data <T> -::interpolate_prolongate (data const * const src, - ibbox const & box, - int const order_space) +void +data <T>:: +transfer_prolongate (data const * const src, + ibbox const & box, + int const order_space) { static Timer total ("prolongate"); total.start (); @@ -786,10 +577,11 @@ void data <T> } template <> -void data <CCTK_INT> -::interpolate_prolongate (data const * const src, - ibbox const & box, - int const order_space) +void +data <CCTK_INT>:: +transfer_prolongate (data const * const src, + ibbox const & box, + int const order_space) { CCTK_WARN (0, "Data type not supported"); } @@ -797,10 +589,11 @@ void data <CCTK_INT> template <typename T> -void data <T> -::interpolate_restrict (data const * const src, - ibbox const & box, - int const order_space) +void +data <T>:: +transfer_restrict (data const * const src, + ibbox const & box, + int const order_space) { static Timer total ("restrict"); total.start (); @@ -844,10 +637,11 @@ void data <T> } template <> -void data <CCTK_INT> -::interpolate_restrict (data const * const src, - ibbox const & box, - int const order_space) +void +data <CCTK_INT>:: +transfer_restrict (data const * const src, + ibbox const & box, + int const order_space) { CCTK_WARN (0, "Data type not supported"); } @@ -855,12 +649,13 @@ void data <CCTK_INT> template <typename T> -void data <T> -::time_interpolate (vector <data *> const & srcs, - ibbox const & box, - vector <CCTK_REAL> const & times, - CCTK_REAL const time, - int const order_time) +void +data <T>:: +time_interpolate (vector <data *> const & srcs, + ibbox const & box, + vector <CCTK_REAL> const & times, + CCTK_REAL const time, + int const order_time) { static Timer total ("time_interpolate"); total.start (); @@ -966,12 +761,13 @@ void data <T> } template <> -void data <CCTK_INT> -::time_interpolate (vector <data *> const & srcs, - ibbox const & box, - vector <CCTK_REAL> const & times, - CCTK_REAL const time, - int const order_time) +void +data <CCTK_INT>:: +time_interpolate (vector <data *> const & srcs, + ibbox const & box, + vector <CCTK_REAL> const & times, + CCTK_REAL const time, + int const order_time) { CCTK_WARN (0, "Data type not supported"); } @@ -980,7 +776,10 @@ void data <CCTK_INT> // Output template<typename T> -ostream& data<T>::output (ostream& os) const +ostream & +data<T>:: +output (ostream & os) + const { T Tdummy; os << "data<" << typestring(Tdummy) << ">:" @@ -990,7 +789,8 @@ ostream& data<T>::output (ostream& os) const } template<typename T> -ostream & operator << (ostream & os, const data<T> & d) +ostream & +operator << (ostream & os, data<T> const & d) { char const * space = ""; for (int i = 0; i < d.vectorlength; i++) { @@ -1002,9 +802,8 @@ ostream & operator << (ostream & os, const data<T> & d) -#define INSTANTIATE(T) \ -template class data<T>; \ -template \ -ostream & operator << <T> (ostream & os, data<T> const & d); +#define INSTANTIATE(T) \ +template class data<T>; \ +template ostream & operator << <T> (ostream & os, data<T> const & d); #include "instantiate" #undef INSTANTIATE diff --git a/Carpet/CarpetLib/src/data.hh b/Carpet/CarpetLib/src/data.hh index 529378149..1cb7cdb25 100644 --- a/Carpet/CarpetLib/src/data.hh +++ b/Carpet/CarpetLib/src/data.hh @@ -69,18 +69,7 @@ public: virtual void allocate (const ibbox& extent, const int proc, void* const memptr = NULL); virtual void free (); - - // Processor management -private: - virtual void change_processor_recv (comm_state& state, - const int newproc, - void* const memptr = NULL); - virtual void change_processor_send (comm_state& state, - const int newproc, - void* const memptr = NULL); - virtual void change_processor_wait (comm_state& state, - const int newproc, - void* const memptr = NULL); + public: // Data accessors @@ -108,51 +97,62 @@ private: } // Data manipulators + private: - virtual comm_state::gcommbuf * - make_typed_commbuf (const ibbox & box) - const; -public: - void copy_from_innerloop (const gdata* gsrc, - const ibbox& box); - void interpolate_from_innerloop (const vector<const gdata*>& gsrcs, - const vector<CCTK_REAL>& times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time); + void + copy_from_innerloop (gdata const * gsrc, + ibbox const & box); -private: - void interpolate_time (vector <data const *> const & srcs, - vector <CCTK_REAL> const & times, - ibbox const & box, - CCTK_REAL const time, - int const order_space, - int const order_time); - void interpolate_p_r (data const * const src, - ibbox const & box, - int const order_space); - void interpolate_p_vc_cc (data const * const src, - ibbox const & box, - int const order_space); - void interpolate_prolongate (data const * src, - ibbox const & box, - int order_space); - void interpolate_restrict (data const * src, - ibbox const & box, - int order_space); - void time_interpolate (vector <data *> const & srcs, - ibbox const & box, - vector <CCTK_REAL> const & times, - CCTK_REAL time, - int order_time); + void + transfer_from_innerloop (vector <gdata const *> const & gsrcs, + vector <CCTK_REAL> const & times, + ibbox const & box, + CCTK_REAL time, + int order_space, + int order_time); + + void + transfer_time (vector <gdata const *> const & gsrcs, + vector <CCTK_REAL> const & times, + ibbox const & box, + CCTK_REAL time, + int order_space, + int order_time); + + void + transfer_p_r (data const * const src, + ibbox const & box, + int order_space); + + void + transfer_p_vc_cc (data const * const src, + ibbox const & box, + int order_space); + + void + transfer_prolongate (data const * const src, + ibbox const & box, + int order_space); + + void + transfer_restrict (data const * const src, + ibbox const & box, + int order_space); + + void + time_interpolate (vector <data *> const & srcs, + ibbox const & box, + vector <CCTK_REAL> const & times, + CCTK_REAL time, + int order_time); public: - + // Output - ostream& output (ostream& os) const; - - friend ostream & operator << <T> ( ostream & os, const data<T> & d ); + ostream & output (ostream& os) const; + + friend ostream & operator<< <T> (ostream & os, data<T> const & d); }; #endif // DATA_HH diff --git a/Carpet/CarpetLib/src/gdata.cc b/Carpet/CarpetLib/src/gdata.cc index d20ed17ef..3960ecdc1 100644 --- a/Carpet/CarpetLib/src/gdata.cc +++ b/Carpet/CarpetLib/src/gdata.cc @@ -32,7 +32,7 @@ static int nexttag () int const min_tag = 100; static int last = 0; ++last; - if (last >= max_mpi_tags) last = 0; + if (last >= 30000) last = 0; return min_tag + last; } @@ -68,512 +68,228 @@ gdata::~gdata () -// Processor management -void gdata::change_processor (comm_state& state, - const int newproc, - void* const mem) +// Data manipulators + +void +gdata:: +copy_from (comm_state & state, + gdata const * const src, + ibbox const & box) { - // if this function is being called with collective commbuffers turned on, - // mimic the old state transitions here - switch (state.thestate) { - case state_post: - case state_get_buffer_sizes: - change_processor_recv (state, newproc, mem); - change_processor_send (state, newproc, mem); - break; - case state_wait: - case state_fill_send_buffers: - change_processor_wait (state, newproc, mem); - break; - case state_empty_recv_buffers: - break; - default: - assert(0 and "invalid state"); - } + vector <gdata const *> srcs (1, src); + CCTK_REAL const time = 0.0; + vector <CCTK_REAL> times (1, time); + transfer_from (state, + srcs, times, + box, box, + time, 0, 0); } -// Data manipulators -void gdata::copy_from (comm_state& state, - const gdata* src, const ibbox& box) +void +gdata:: +transfer_from (comm_state & state, + vector<gdata const *> const & srcs, + vector<CCTK_REAL> const & times, + ibbox const & dstbox, + ibbox const & srcbox, + CCTK_REAL const time, + int const order_space, + int const order_time) { - assert (has_storage() and src->has_storage()); - assert (all(box.lower()>=extent().lower() - and box.lower()>=src->extent().lower())); - assert (all(box.upper()<=extent().upper() - and box.upper()<=src->extent().upper())); - assert (all(box.stride()==extent().stride() - and box.stride()==src->extent().stride())); - assert (all((box.lower()-extent().lower())%box.stride() == 0 - and (box.lower()-src->extent().lower())%box.stride() == 0)); - - if (box.empty()) return; + assert (has_storage()); + assert (not dstbox.empty()); + assert (all(dstbox.lower() >= extent().lower())); + assert (all(dstbox.upper() <= extent().upper())); + assert (all(dstbox.stride() == extent().stride())); + assert (all((dstbox.lower() - extent().lower()) % dstbox.stride() == 0)); + + assert (not srcbox.empty()); + assert (srcs.size() == times.size() and srcs.size() > 0); + for (int t=0; t<(int)srcs.size(); ++t) { + assert (srcs.AT(t)->has_storage()); + assert (all(srcbox.lower() >= srcs.AT(t)->extent().lower())); + assert (all(srcbox.upper() <= srcs.AT(t)->extent().upper())); + } + gdata const * const src = srcs.AT(0); + + assert (transport_operator != op_error); + if (transport_operator == op_none) return; + + // Return early if this communication does not concern us if (dist::rank() != proc() and dist::rank() != src->proc()) return; - + + // Interpolate either on the source or on the destination processor, + // depending on whether this increases or reduces the amount of data + int timelevel0, ntimelevels; + find_source_timelevel (times, time, order_time, timelevel0, ntimelevels); + assert (int (srcs.size()) >= ntimelevels); + int const dstpoints = dstbox.size(); + int const srcpoints = srcbox.size() * ntimelevels; + bool const interp_on_src = dstpoints <= srcpoints; + int const npoints = interp_on_src ? dstpoints : srcpoints; + switch (state.thestate) { - case state_post: - if (proc() == src->proc()) { - copy_from_innerloop (src, box); - } else { - copy_from_post (state, src, box); - } - break; - - case state_wait: - if (proc() != src->proc()) { - copy_from_wait (state, src, box); - } - break; - + case state_get_buffer_sizes: // don't count processor-local copies if (proc() != src->proc()) { - // if this is a destination processor: advance its recv buffer size - vector<comm_state::procbufdesc>& procbufs = - state.typebufs.AT(c_datatype()).procbufs; + // if this is a destination processor: advance its recv buffer + // size if (proc() == dist::rank()) { - procbufs.AT(src->proc()).recvbufsize += box.size(); - state.typebufs.AT(c_datatype()).in_use = true; + state.reserve_recv_space (c_datatype(), src->proc(), npoints); } // if this is a source processor: increment its send buffer size if (src->proc() == dist::rank()) { - procbufs.AT(proc()).sendbufsize += box.size(); - state.typebufs.AT(c_datatype()).in_use = true; + state.reserve_send_space (c_datatype(), proc(), npoints); } } break; - + case state_fill_send_buffers: - // if this is a source processor: copy its data into the send buffer - // (the processor-local case is also handled here) - if (src->proc() == dist::rank()) { - if (proc() == src->proc()) { - copy_from_innerloop (src, box); - } else { - copy_into_sendbuffer (state, src, box); + // if this is a source processor: copy its data into the send + // buffer + if (proc() != src->proc()) { + if (src->proc() == dist::rank()) { + if (interp_on_src) { + void * const sendbuf = + state.send_buffer (c_datatype(), proc(), dstbox.size()); + gdata * const buf = + make_typed (varindex, cent, transport_operator, tag); + buf->allocate (dstbox, src->proc(), sendbuf); + buf->transfer_from_innerloop + (srcs, times, dstbox, time, order_space, order_time); + delete buf; + state.commit_send_space (c_datatype(), proc(), dstbox.size()); + } else { + for (int tl = timelevel0; tl < timelevel0 + ntimelevels; ++ tl) { + void * const sendbuf = + state.send_buffer (c_datatype(), proc(), srcbox.size()); + gdata * const buf = + make_typed (varindex, cent, transport_operator, tag); + buf->allocate (srcbox, src->proc(), sendbuf); + buf->copy_from_innerloop (srcs.AT(tl), srcbox); + delete buf; + state.commit_send_space (c_datatype(), proc(), srcbox.size()); + } + } } } break; - + + case state_do_some_work: + // handle the processor-local case + if (proc() == src->proc()) { + if (proc() == dist::rank()) { + transfer_from_innerloop + (srcs, times, dstbox, time, order_space, order_time); + } + } + break; + case state_empty_recv_buffers: - // if this is a destination processor and data has already been received - // from the source processor: copy it from the recv buffer - if (proc() == dist::rank() and - state.recvbuffers_ready.AT(dist::size()*c_datatype() + src->proc())) { - copy_from_recvbuffer (state, src, box); + // if this is a destination processor: copy it from the recv + // buffer + if (proc() != src->proc()) { + if (proc() == dist::rank()) { + if (interp_on_src) { + void * const recvbuf = + state.recv_buffer (c_datatype(), src->proc(), dstbox.size()); + gdata * const buf = + make_typed (varindex, cent, transport_operator, tag); + buf->allocate (dstbox, proc(), recvbuf); + state.commit_recv_space (c_datatype(), src->proc(), dstbox.size()); + copy_from_innerloop (buf, dstbox); + delete buf; + } else { + gdata const * const null = 0; + vector <gdata const *> bufs (timelevel0 + ntimelevels, null); + for (int tl = timelevel0; tl < timelevel0 + ntimelevels; ++ tl) { + void * const recvbuf = + state.recv_buffer (c_datatype(), src->proc(), srcbox.size()); + gdata * const buf = + make_typed (varindex, cent, transport_operator, tag); + buf->allocate (srcbox, proc(), recvbuf); + state.commit_recv_space (c_datatype(), src->proc(), srcbox.size()); + bufs.AT(tl) = buf; + } + transfer_from_innerloop + (bufs, times, dstbox, time, order_space, order_time); + for (int tl = timelevel0; tl < timelevel0 + ntimelevels; ++ tl) { + delete bufs.AT(tl); + } + } + } } break; - + default: - assert(0 and "invalid state"); + assert (0); } } -void gdata::copy_from_post (comm_state& state, - const gdata* src, const ibbox& box) -{ - static Timer total ("copy_from_post"); - total.start(); - if (dist::rank() == proc()) { - - // this processor receives data - - static Timer alloc ("copy_from_post_receive_allocate"); - alloc.start (); - comm_state::gcommbuf * b = make_typed_commbuf (box); - int typesize; - MPI_Type_size (b->datatype(), & typesize); - alloc.stop (b->size() * typesize); - - static Timer timer ("copy_from_post_receive_irecv"); - timer. start (); - MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(), - tag, dist::comm(), &b->request); - timer.stop (b->size() * typesize); - state.requests.push_back (b->request); - state.recvbufs.push (b); - - } else { - // this processor sends data - - static Timer alloc ("copy_from_post_send_allocate"); - alloc.start (); - comm_state::gcommbuf * b = src->make_typed_commbuf (box); - int typesize; - MPI_Type_size (b->datatype(), & typesize); - alloc.stop (b->size() * typesize); - - // copy data into send buffer - static Timer copy ("copy_from_post_send_memcpy"); - copy.start (); - const ibbox& ext = src->extent(); - ivect myshape = ext.shape() / ext.stride(); - ivect items = (box.upper() - box.lower()) / box.stride() + 1; - ivect offs = (box.lower() - ext.lower()) / ext.stride(); - char* send_buffer = (char*) b->pointer(); - int& datatypesize = state.typebufs.AT(c_datatype()).datatypesize; - - double bytes = 0; - for (int k = 0; k < items[2]; k++) { - for (int j = 0; j < items[1]; j++) { - int i = offs[0] + myshape[0]*((j+offs[1]) + myshape[1]*(k+offs[2])); - memcpy (send_buffer, ((char*) src->storage()) + datatypesize*i, - datatypesize * items[0]); - send_buffer += datatypesize * items[0]; - bytes += datatypesize * items[0]; - } - } - copy.stop (bytes); - - static Timer timer ("copy_from_post_send_isend"); - timer.start (); - MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(), - tag, dist::comm(), &b->request); - timer.stop (b->size() * typesize); - state.requests.push_back (b->request); - state.sendbufs.push (b); - } - - total.stop (0); -} - - -void gdata::copy_from_wait (comm_state& state, - const gdata* src, const ibbox& box) +void +gdata:: +find_source_timelevel (vector <CCTK_REAL> const & times, + CCTK_REAL const time, + int const order_time, + int & timelevel0, + int & ntimelevels) + const { - static Timer total ("copy_from_wait"); - total.start (); - - static Timer wait ("copy_from_wait_wait"); - wait.start (); - if (not state.requests.empty()) { - // wait for all requests at once - MPI_Waitall (state.requests.size(), &state.requests.front(), - MPI_STATUSES_IGNORE); - state.requests.clear(); - } - wait.stop (0); - - queue<comm_state::gcommbuf*>* const bufs = - dist::rank() == proc() ? &state.recvbufs : &state.sendbufs; - comm_state::gcommbuf* b = bufs->front(); - - // copy data out of receive buffer - if (bufs == &state.recvbufs) { - static Timer timer ("copy_from_wait_memcpy"); - timer.start (); - const ibbox& ext = extent(); - ivect myshape = ext.shape() / ext.stride(); - ivect items = (box.upper() - box.lower()) / box.stride() + 1; - ivect offs = (box.lower() - ext.lower()) / ext.stride(); - const char* recv_buffer = (const char*) b->pointer(); - int& datatypesize = state.typebufs.AT(c_datatype()).datatypesize; - - for (int k = 0; k < items[2]; k++) { - for (int j = 0; j < items[1]; j++) { - int i = offs[0] + myshape[0]*((j+offs[1]) + myshape[1]*(k+offs[2])); - memcpy (((char*) storage()) + datatypesize*i, recv_buffer, - datatypesize * items[0]); - recv_buffer += datatypesize * items[0]; - } + // Ensure that the times are consistent + assert (times.size() > 0); + assert (order_time >= 0); + + CCTK_REAL const eps = 1.0e-12; + CCTK_REAL const min_time = * min_element (times.begin(), times.end()); + CCTK_REAL const max_time = * max_element (times.begin(), times.end()); + if (transport_operator != op_copy) { + if (time < min_time - eps or time > max_time + eps) { + ostringstream buf; + buf << "Internal error: extrapolation in time." + << " time=" << time + << " times=" << times; + CCTK_WARN (0, buf.str().c_str()); } - timer.stop (0); } - - static Timer del ("copy_from_wait_delete"); - del.start (); - bufs->pop(); - delete b; - del.stop (0); - - total.stop (0); -} - - -// Copy processor-local source data into communication send buffer -// of the corresponding destination processor -void gdata::copy_into_sendbuffer (comm_state& state, - const gdata* src, const ibbox& box) -{ - DECLARE_CCTK_PARAMETERS; - - if (proc() == src->proc()) { - // copy on same processor - copy_from_innerloop (src, box); - } else { - // copy to remote processor - assert (src->_has_storage); - int const datatypesize = state.typebufs.AT(c_datatype()).datatypesize; - comm_state::procbufdesc& procbuf = - state.typebufs.AT(c_datatype()).procbufs.AT(proc()); - assert (procbuf.sendbuf - procbuf.sendbufbase <= - ((int)procbuf.sendbufsize - box.size()) * datatypesize); - int const fillstate = procbuf.sendbuf + (int)box.size()*datatypesize - - procbuf.sendbufbase; - assert (fillstate <= (int)procbuf.sendbufsize * datatypesize); - - // copy this processor's data into the send buffer - ibbox const & ext = src->extent(); - ivect const myshape = ext.shape() / ext.stride(); - ivect const items = (box.upper() - box.lower()) / box.stride() + 1; - ivect const offs = (box.lower() - ext.lower()) / ext.stride(); - static Timer copy ("copy_into_sendbuffer_memcpy"); - copy.start (); - assert (dim == 3); - for (int k = 0; k < items[2]; k++) { - for (int j = 0; j < items[1]; j++) { - int const i = - offs[0] + myshape[0]*((j+offs[1]) + myshape[1]*(k+offs[2])); - memcpy (procbuf.sendbuf, - ((const char*) src->storage()) + datatypesize*i, - datatypesize * items[0]); - procbuf.sendbuf += datatypesize * items[0]; - } - } - copy.stop (datatypesize * prod (items)); + // Use this timelevel, or interpolate in time if set to -1 + int timelevel = -1; - if (not combine_sends) { - // post the send if the buffer is full - if (fillstate == (int)procbuf.sendbufsize * datatypesize) { - static Timer timer ("copy_into_sendbuffer_isend"); - timer.start (); - MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize, - state.typebufs.AT(c_datatype()).mpi_datatype, - proc(), c_datatype(), dist::comm(), - &state.srequests.AT(dist::size()*c_datatype() + proc())); - timer.stop (procbuf.sendbufsize * datatypesize); - } + // Try to avoid time interpolation if possible + if (timelevel == -1) { + if (times.size() == 1) { + timelevel = 0; } } -} - - -// Copy processor-local destination data from communication recv buffer -// of the corresponding source processor -void gdata::copy_from_recvbuffer (comm_state& state, - const gdata* src, const ibbox& box) -{ - int& datatypesize = state.typebufs.AT(c_datatype()).datatypesize; - comm_state::procbufdesc& procbuf = - state.typebufs.AT(c_datatype()).procbufs.AT(src->proc()); - assert (procbuf.recvbuf - procbuf.recvbufbase <= - ((int)procbuf.recvbufsize-box.size()) * datatypesize); - - // copy this processor's data from the recv buffer - const ibbox& ext = extent(); - ivect myshape = ext.shape() / ext.stride(); - ivect items = (box.upper() - box.lower()) / box.stride() + 1; - ivect offs = (box.lower() - ext.lower()) / ext.stride(); - - static Timer timer ("copy_from_recvbuffer_memcpy"); - timer.start (); - double bytes = 0; - assert (dim == 3); - for (int k = 0; k < items[2]; k++) { - for (int j = 0; j < items[1]; j++) { - int i = offs[0] + myshape[0]*((j+offs[1]) + myshape[1]*(k+offs[2])); - memcpy (((char*) storage()) + datatypesize*i, - procbuf.recvbuf, datatypesize * items[0]); - procbuf.recvbuf += datatypesize * items[0]; - bytes += datatypesize * items[0]; + if (timelevel == -1) { + if (transport_operator == op_copy) { + timelevel = 0; } } - timer.stop (bytes); -} - - -void gdata -::interpolate_from (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, const CCTK_REAL time, - const int order_space, - const int order_time) -{ - assert (transport_operator != op_error); - if (transport_operator == op_none) return; - - assert (has_storage()); - assert (all(box.lower()>=extent().lower())); - assert (all(box.upper()<=extent().upper())); - assert (all(box.stride()==extent().stride())); - assert (all((box.lower()-extent().lower())%box.stride() == 0)); - assert (srcs.size() == times.size() and srcs.size()>0); - for (int t=0; t<(int)srcs.size(); ++t) { - assert (srcs.AT(t)->has_storage()); - assert (all(box.lower()>=srcs.AT(t)->extent().lower())); - assert (all(box.upper()<=srcs.AT(t)->extent().upper())); - } - assert (not box.empty()); - const gdata* src = srcs.AT(0); - if (dist::rank() != proc() and dist::rank() != src->proc()) return; - - switch (state.thestate) { - case state_post: - if (proc() == src->proc()) { - interpolate_from_innerloop(srcs, times, box, time, - order_space, order_time); - } else { - interpolate_from_post(state, srcs, times, box, time, - order_space, order_time); - } - break; - case state_wait: - if (proc() != src->proc()) { - copy_from_wait (state, src, box); - } - break; - case state_get_buffer_sizes: - // don't count processor-local interpolations - if (proc() != src->proc()) { - // if this is a destination processor: increment its recv buffer size - vector<comm_state::procbufdesc>& procbufs = - state.typebufs.AT(c_datatype()).procbufs; - if (proc() == dist::rank()) { - procbufs.AT(src->proc()).recvbufsize += box.size(); - state.typebufs.AT(c_datatype()).in_use = true; + if (timelevel == -1) { + for (size_t tl=0; tl<times.size(); ++tl) { + static_assert (abs(0.1) > 0, + "Function CarpetLib::abs has wrong signature"); + if (abs (times.AT(tl) - time) < eps) { + timelevel = tl; + break; } - // if this is a source processor: increment its send buffer size - if (src->proc() == dist::rank()) { - procbufs.AT(proc()).sendbufsize += box.size(); - state.typebufs.AT(c_datatype()).in_use = true; - } - } - break; - case state_fill_send_buffers: - // if this is a source processor: interpolate its data into the send buffer - // (the processor-local case is also handled here) - if (src->proc() == dist::rank()) { - if (proc() == src->proc()) { - interpolate_from_innerloop(srcs, times, box, time, - order_space, order_time); - } else { - interpolate_into_sendbuffer(state, srcs, times, box, - time, order_space, order_time); - } - } - break; - case state_empty_recv_buffers: - // if this is a destination processor and data has already been received - // from the source processor: copy it from the recv buffer - // (the processor-local case is not handled here) - if (proc() == dist::rank() and - state.recvbuffers_ready.AT(dist::size()*c_datatype() + src->proc())) { - copy_from_recvbuffer(state, src, box); } - break; - default: - assert(0 and "invalid state"); - } -} - - -void gdata -::interpolate_from_post (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, - const CCTK_REAL time, - const int order_space, - const int order_time) -{ - const gdata* src = srcs.AT(0); - if (dist::rank() == proc()) { - // interpolate from other processor - - // this processor receives data - - comm_state::gcommbuf * b = make_typed_commbuf (box); - int typesize; - MPI_Type_size (b->datatype(), & typesize); - - static Timer timer ("interpolate_from_post_irecv"); - timer.start (); - MPI_Irecv (b->pointer(), b->size(), b->datatype(), src->proc(), - tag, dist::comm(), &b->request); - timer.stop (b->size() * typesize); - state.requests.push_back (b->request); - state.recvbufs.push (b); - } else { - // this processor sends data - - comm_state::gcommbuf * b = src->make_typed_commbuf (box); - int typesize; - MPI_Type_size (b->datatype(), & typesize); - - gdata * tmp = src->make_typed (varindex, cent, transport_operator, tag); - tmp->allocate (box, src->proc(), b->pointer()); - tmp->interpolate_from_innerloop (srcs, times, box, time, - order_space, order_time); - delete tmp; - - static Timer timer ("interpolate_from_post_isend"); - timer.start (); - MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(), - tag, dist::comm(), &b->request); - timer.stop (b->size() * typesize); - state.requests.push_back (b->request); - state.sendbufs.push (b); } -} - - -// Interpolate processor-local source data into communication send buffer -// of the corresponding destination processor -void gdata -::interpolate_into_sendbuffer (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, - const CCTK_REAL time, - const int order_space, - const int order_time) -{ - DECLARE_CCTK_PARAMETERS; - if (proc() == srcs.AT(0)->proc()) { - // interpolate on same processor - interpolate_from_innerloop (srcs, times, box, time, - order_space, order_time); + if (timelevel >= 0) { + timelevel0 = timelevel; + ntimelevels = 1; } else { - // interpolate to remote processor - const gdata* src = srcs.AT(0); - assert (src->_has_storage); - int& datatypesize = state.typebufs.AT(c_datatype()).datatypesize; - comm_state::procbufdesc& procbuf = - state.typebufs.AT(c_datatype()).procbufs.AT(proc()); - assert (procbuf.sendbuf - procbuf.sendbufbase <= - ((int)procbuf.sendbufsize - box.size()) * datatypesize); - assert (src->has_storage()); - int const fillstate = (procbuf.sendbuf + box.size()*datatypesize) - - procbuf.sendbufbase; - assert (fillstate <= (int)procbuf.sendbufsize * datatypesize); - - // interpolate this processor's data into the send buffer - gdata* tmp = src->make_typed (varindex, cent, transport_operator, tag); - tmp->allocate (box, src->proc(), procbuf.sendbuf); - tmp->interpolate_from_innerloop (srcs, times, box, time, - order_space, order_time); - delete tmp; - - // advance send buffer to point to the next ibbox slot - procbuf.sendbuf += datatypesize * box.size(); - - if (not combine_sends) { - // post the send if the buffer is full - if (fillstate == (int)procbuf.sendbufsize*datatypesize) { - static Timer timer ("interpolate_into_sendbuffer_isend"); - timer.start (); - MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize, - state.typebufs.AT(c_datatype()).mpi_datatype, - proc(), c_datatype(), dist::comm(), - &state.srequests.AT(dist::size()*c_datatype() + proc())); - timer.stop (procbuf.sendbufsize*datatypesize); - } - } + timelevel0 = 0; + ntimelevels = order_time + 1; } + + assert (timelevel0 >= 0 and timelevel0 < (int)times.size()); + assert (ntimelevels > 0); } diff --git a/Carpet/CarpetLib/src/gdata.hh b/Carpet/CarpetLib/src/gdata.hh index 106c6ae45..2cdf7fbcd 100644 --- a/Carpet/CarpetLib/src/gdata.hh +++ b/Carpet/CarpetLib/src/gdata.hh @@ -10,10 +10,10 @@ #include "cctk.h" +#include "bbox.hh" #include "commstate.hh" #include "defs.hh" #include "dist.hh" -#include "bbox.hh" #include "operators.hh" #include "timestat.hh" #include "vect.hh" @@ -72,25 +72,6 @@ public: const operator_type transport_operator = op_error, const int tag = -1) const = 0; - // Processor management - void change_processor (comm_state& state, - const int newproc, - void* const mem=0); - protected: - virtual void change_processor_recv (comm_state& state, - const int newproc, - void* const mem=0) - = 0; - virtual void change_processor_send (comm_state& state, - const int newproc, - void* const mem=0) - = 0; - virtual void change_processor_wait (comm_state& state, - const int newproc, - void* const mem=0) - = 0; - public: - // Storage management virtual void allocate (const ibbox& extent, const int proc, void* const mem=0) = 0; @@ -154,67 +135,51 @@ private: // Datatype accessors // maps the C datatype of a data class object to a 0-based index virtual unsigned int c_datatype () const = 0; - + // Data manipulators + +public: + void + copy_from (comm_state & state, + gdata const * src, + ibbox const & box); + + void + transfer_from (comm_state & state, + vector<gdata const *> const & srcs, + vector<CCTK_REAL> const & times, + ibbox const & dstbox, + ibbox const & srcbox, + CCTK_REAL time, + int order_space, + int order_time); + +protected: + void + find_source_timelevel (vector <CCTK_REAL> const & times, + CCTK_REAL time, + int order_time, + int & timelevel0, + int & ntimelevels) + const; + private: - virtual comm_state::gcommbuf * - make_typed_commbuf (const ibbox & box) - const = 0; - - public: - void copy_from (comm_state& state, - const gdata* src, const ibbox& box); - void interpolate_from (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, - const CCTK_REAL time, - const int order_space, - const int order_time); - - private: - void copy_from_post (comm_state& state, - const gdata* src, const ibbox& box); - void interpolate_from_post (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, - const CCTK_REAL time, - const int order_space, - const int order_time); - void copy_from_wait (comm_state& state, - const gdata* src, const ibbox& box); - - // Copy processor-local source data into communication send buffer - // of the corresponding destination processor - // The case when both source and destination are local is also handled here. - void copy_into_sendbuffer (comm_state& state, - const gdata* src, const ibbox& box); - // Copy processor-local destination data from communication recv buffer - // of the corresponding source processor - void copy_from_recvbuffer (comm_state& state, - const gdata* src, const ibbox& box); - // Interpolate processor-local source data into communication send buffer - // of the corresponding destination processor - // The case when both source and destination are local is also handled here. - void interpolate_into_sendbuffer (comm_state& state, - const vector<const gdata*> srcs, - const vector<CCTK_REAL> times, - const ibbox& box, - const CCTK_REAL time, - const int order_space, - const int order_time); - - protected: - virtual void - copy_from_innerloop (const gdata* src, const ibbox& box) = 0; - virtual void - interpolate_from_innerloop (const vector<const gdata*>& srcs, - const vector<CCTK_REAL>& times, - const ibbox& box, - const CCTK_REAL time, - const int order_space, - const int order_time) = 0; + virtual + void + copy_from_innerloop (gdata const * gsrc, + ibbox const & box) + = 0; + + virtual + void + transfer_from_innerloop (vector <gdata const *> const & gsrcs, + vector <CCTK_REAL> const & times, + ibbox const & box, + CCTK_REAL time, + int order_space, + int order_time) + = 0; + }; |