#include #include #include "cctk.h" #include "cctk_Parameters.h" #include "bbox.hh" #include "defs.hh" #include "dist.hh" #include "vect.hh" #include "commstate.hh" #include "timestat.hh" using namespace std; using namespace CarpetLib; // Communication state control comm_state::comm_state () { // If CarpetLib::use_collective_communication_buffers is set to true, // this comm_state object will use collective communications, // i.e., it will step through // state_get_buffer_sizes // state_fill_send_buffers // state_empty_recv_buffers // // If CarpetLib::use_collective_communication_buffers is false // then individual communications on single components are used // by stepping through // state_post // state_wait DECLARE_CCTK_PARAMETERS; static Timer timer ("commstate::create"); timer.start (); thestate = use_collective_communication_buffers ? state_get_buffer_sizes : state_post; typebufs.resize (dist::c_ndatatypes()); for (size_t type = 0; type < typebufs.size(); type++) { typebufs[type].procbufs.resize(dist::size()); } #define INSTANTIATE(T) \ { \ T dummy; \ int type = dist::c_datatype (dummy); \ typebufs.AT(type).datatypesize = sizeof (dummy); \ typebufs.AT(type).mpi_datatype = dist::datatype (dummy); \ } #include "instantiate" #undef INSTANTIATE recvbuffers_ready.resize (dist::c_ndatatypes() * dist::size()); srequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL); rrequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL); timer.stop (0); } void comm_state::step () { DECLARE_CCTK_PARAMETERS; static Timer total ("commstate::step"); total.start (); assert (thestate != state_done); switch (thestate) { case state_post: // After all sends/recvs have been posted in 'state_post' // now wait for their completion in 'state_wait'. thestate = state_wait; break; case state_wait: thestate = state_done; break; case state_get_buffer_sizes: // The sizes of the collective communication buffers are known // so now allocate them. 
// The receive operations are also posted here already // (a clever MPI layer may take advantage of such early posting). num_posted_recvs = num_completed_recvs = 0; for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { size_t const proc = interleave_communications ? (proc1 + dist::rank()) % dist::size() : proc1; for (size_t type = 0; type < typebufs.size(); type++) { // skip unused datatype buffers if (not typebufs[type].in_use) continue; int datatypesize = typebufs[type].datatypesize; procbufdesc& procbuf = typebufs[type].procbufs[proc]; procbuf.sendbufbase = new char[procbuf.sendbufsize*datatypesize]; procbuf.recvbufbase = new char[procbuf.recvbufsize*datatypesize]; // TODO: this may be a bit extreme, and it is only for // internal consistency checking if (poison_new_memory) { memset (procbuf.sendbufbase, poison_value, procbuf.sendbufsize*datatypesize); memset (procbuf.recvbufbase, poison_value, procbuf.recvbufsize*datatypesize); } procbuf.sendbuf = procbuf.sendbufbase; procbuf.recvbuf = procbuf.recvbufbase; if (procbuf.recvbufsize > 0) { static Timer timer ("commstate_sizes_irecv"); timer.start (); int const tag = vary_tags ? (dist::rank() + dist::size() * (proc + dist::size() * type)) % 32768 : type; MPI_Irecv (procbuf.recvbufbase, procbuf.recvbufsize, typebufs[type].mpi_datatype, proc, tag, dist::comm(), &rrequests[dist::size()*type + proc]); timer.stop (0); num_posted_recvs++; } } } if (barrier_between_stages) { // Add a barrier, to try to ensure that all Irecvs are posted // before the first Isends are made // (Alternative: Use MPI_Alltoallv instead) MPI_Barrier (dist::comm()); } // Now go and get the send buffers filled with data. // Once a buffer is full it will be posted right away // (see gdata::copy_into_sendbuffer() and // gdata::interpolate_into_sendbuffer()). thestate = state_fill_send_buffers; break; case state_fill_send_buffers: if (combine_sends) { // Send the data. 
Do not send them sequentially, but try to // intersperse the communications for (int proc1 = 0; proc1 < dist::size(); ++ proc1) { int const proc = interleave_communications ? (proc1 + dist::size() - dist::rank()) % dist::size() : proc1; for (size_t type = 0; type < typebufs.size(); type++) { // skip unused datatype buffers if (not typebufs[type].in_use) continue; int const datatypesize = typebufs[type].datatypesize; procbufdesc const & procbuf = typebufs[type].procbufs[proc]; int const fillstate = procbuf.sendbuf - procbuf.sendbufbase; assert (fillstate == (int)procbuf.sendbufsize * datatypesize); if (procbuf.sendbufsize > 0) { int const tag = vary_tags ? (proc + dist::size() * (dist::rank() + dist::size() * type)) % 32768 : type; if (use_mpi_send) { // use MPI_Send static Timer timer ("commstate_send"); timer.start (); MPI_Send (procbuf.sendbufbase, procbuf.sendbufsize, typebufs[type].mpi_datatype, proc, tag, dist::comm()); srequests[dist::size()*type + proc] = MPI_REQUEST_NULL; timer.stop (procbuf.sendbufsize * datatypesize); } else if (use_mpi_ssend) { // use MPI_Ssend static Timer timer ("commstate_ssend"); timer.start (); MPI_Ssend (procbuf.sendbufbase, procbuf.sendbufsize, typebufs[type].mpi_datatype, proc, tag, dist::comm()); srequests[dist::size()*type + proc] = MPI_REQUEST_NULL; timer.stop (procbuf.sendbufsize * datatypesize); } else { // use MPI_Isend static Timer timer ("commstate_isend"); timer.start (); MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize, typebufs[type].mpi_datatype, proc, tag, dist::comm(), &srequests[dist::size()*type + proc]); timer.stop (procbuf.sendbufsize * datatypesize); } } } // for type } // for proc } // Now fall through to the next state in which the recv buffers // are emptied as soon as data has arrived. 
thestate = state_empty_recv_buffers; case state_empty_recv_buffers: // Finish (at least one of) the posted communications if (not AllPostedCommunicationsFinished ()) { // No state change if there are still outstanding communications; // do another comm_state loop iteration. } else { // Everything is done so release the collective communication buffers. for (size_t type = 0; type < typebufs.size(); type++) { for (size_t proc = 0; proc < typebufs[type].procbufs.size(); proc++) { delete[] typebufs[type].procbufs[proc].sendbufbase; delete[] typebufs[type].procbufs[proc].recvbufbase; } } thestate = state_done; } break; default: assert (0 && "invalid state"); } total.stop (0); } bool comm_state::done () { return thestate == state_done; } comm_state::~comm_state () { DECLARE_CCTK_PARAMETERS; assert (thestate == state_done or thestate == (use_collective_communication_buffers ? state_get_buffer_sizes : state_post)); assert (requests.empty()); } // wait for completion of posted collective buffer sends/receives // // Depending on the parameter CarpetLib::use_waitall, this function will wait // for all (true) or at least one (false) of the posted receive operations // to finish. // // It returns true if all posted communications have been completed. 
// Make progress on the posted collective-buffer communications.
// Returns true once all posted receives have completed (after finalizing
// the outstanding sends); otherwise waits for some receives, marks their
// buffers as ready, and returns false.
//
// NOTE(review): the text of this function is damaged in this copy.
// Everything between the first "for (size_t i=0; i" and "completed_recvs("
// has lost its loop conditions, loop bodies and template arguments (e.g. the
// element types of the "reqs" and "completed_recvs" vectors). The local
// "num_completed_recvs_" is used but its declaration is missing, and the
// braces no longer balance. It does not compile as-is; restore it from
// version control rather than by guessing.
bool comm_state::AllPostedCommunicationsFinished ()
{
  DECLARE_CCTK_PARAMETERS;
  
  // check if all outstanding receives have been completed already
  if (num_posted_recvs == num_completed_recvs) {
    // finalize the outstanding sends in one go
    if (reduce_mpi_waitall) {
      size_t nreqs = 0;
      for (size_t i=0; i reqs(nreqs);
      nreqs = 0;
      for (size_t i=0; i reqs(nreqs);
      nreqs = 0;
      for (size_t i=0; i completed_recvs(rrequests.size(), -1);
      
      // wait for completion of at least one posted receive operation
      // (MPI_Waitsome blocks until at least one active request in
      // rrequests completes and stores the indices of the completed ones
      // in completed_recvs; completed requests become MPI_REQUEST_NULL)
      static Timer timer ("commstate_waitsome");
      timer.start ();
      MPI_Waitsome (rrequests.size(), &rrequests.front(),
                    &num_completed_recvs_, &completed_recvs.front(),
                    MPI_STATUSES_IGNORE);
      timer.stop (0);
      assert (0 < num_completed_recvs_);
      num_completed_recvs += num_completed_recvs_;
      
      // mark the receive buffers of completed communications as ready
      // (indices into rrequests double as indices into recvbuffers_ready)
      for (int i = 0; i < num_completed_recvs_; i++) {
        assert (rrequests.AT(completed_recvs.AT(i)) == MPI_REQUEST_NULL);
        recvbuffers_ready.AT(completed_recvs.AT(i)) = true;
      }
    }
  return (false);
}



// NOTE(review): the template headers and class template arguments of the
// commbuf member definitions below also appear to be stripped in this copy
// ("template" with no parameter list, "commbuf" without "<T>"). They
// presumably read "template<typename T> ... comm_state::commbuf<T>::...".
// Confirm against version control.

// Size the buffer to prod(box.shape() / box.stride()) entries.
template comm_state::commbuf::commbuf (ibbox const & box)
{
  data.resize (prod (box.shape() / box.stride()));
}

template comm_state::commbuf::~commbuf ()
{
}

// Pointer to the first entry of data (read-only access).
// NOTE(review): if data is a std::vector and is empty, data.front() is
// undefined behaviour; confirm the buffer is never empty here.
template void const * comm_state::commbuf::pointer () const
{
  return &data.front();
}

// Pointer to the first entry of data (mutable access).
template void * comm_state::commbuf::pointer ()
{
  return &data.front();
}

// Number of entries in data.
template int comm_state::commbuf::size () const
{
  return data.size();
}

// MPI datatype corresponding to the element type T, via dist::datatype.
template MPI_Datatype comm_state::commbuf::datatype () const
{
  T dummy;
  return dist::datatype (dummy);
}



// Explicitly instantiate commbuf for each type that the "instantiate"
// file passes to INSTANTIATE.
#define INSTANTIATE(T)                          \
template class comm_state::commbuf;
#include "instantiate"
#undef INSTANTIATE