diff options
author | Thomas Radke <tradke@aei.mpg.de> | 2005-08-15 15:00:00 +0000 |
---|---|---|
committer | Thomas Radke <tradke@aei.mpg.de> | 2005-08-15 15:00:00 +0000 |
commit | b3405326ebf651b20b4c44423df62ef23a1bf8f2 (patch) | |
tree | 3f28fb7697589ff69ab3ac283cd0064dba340522 /Carpet/CarpetLib/src/gdata.cc | |
parent | 8493c61f465169c3d52b53b5023680a0d33f898c (diff) |
Carpet*: generalise the comm_state class for collective buffer communications
CarpetLib's comm_state class (actually, it's still just a struct) has been
extended to handle collective buffer communications for all possible C datatypes
at the same time. This makes it unnecessary for the higher-level communication
routines to loop over each individual datatype separately.
darcs-hash:20050815150023-776a0-dddc1aca7ccaebae872f9f451b2c3595cd951fed.gz
Diffstat (limited to 'Carpet/CarpetLib/src/gdata.cc')
-rw-r--r-- | Carpet/CarpetLib/src/gdata.cc | 163 |
1 files changed, 84 insertions, 79 deletions
diff --git a/Carpet/CarpetLib/src/gdata.cc b/Carpet/CarpetLib/src/gdata.cc index 3b50abbee..0d85915d2 100644 --- a/Carpet/CarpetLib/src/gdata.cc +++ b/Carpet/CarpetLib/src/gdata.cc @@ -37,9 +37,11 @@ static int nexttag () // Constructors -gdata::gdata (const int varindex_, const operator_type transport_operator_, +gdata::gdata (const int varindex_, + const operator_type transport_operator_, const int tag_) - : varindex(varindex_), transport_operator(transport_operator_), + : varindex(varindex_), + transport_operator(transport_operator_), _has_storage(false), comm_active(false), tag(tag_ >= 0 ? tag_ : nexttag()) @@ -75,7 +77,7 @@ void gdata::change_processor (comm_state& state, change_processor_wait (state, newproc, mem); break; default: - assert(0 && "invalid state"); + assert(0 and "invalid state"); } } @@ -85,18 +87,18 @@ void gdata::change_processor (comm_state& state, void gdata::copy_from (comm_state& state, const gdata* src, const ibbox& box) { - assert (has_storage() && src->has_storage()); + assert (has_storage() and src->has_storage()); assert (all(box.lower()>=extent().lower() - && box.lower()>=src->extent().lower())); + and box.lower()>=src->extent().lower())); assert (all(box.upper()<=extent().upper() - && box.upper()<=src->extent().upper())); + and box.upper()<=src->extent().upper())); assert (all(box.stride()==extent().stride() - && box.stride()==src->extent().stride())); + and box.stride()==src->extent().stride())); assert (all((box.lower()-extent().lower())%box.stride() == 0 - && (box.lower()-src->extent().lower())%box.stride() == 0)); + and (box.lower()-src->extent().lower())%box.stride() == 0)); if (box.empty()) return; - if (dist::rank() != proc() && dist::rank() != src->proc()) return; + if (dist::rank() != proc() and dist::rank() != src->proc()) return; switch (state.thestate) { case state_post: @@ -117,12 +119,16 @@ void gdata::copy_from (comm_state& state, // don't count processor-local copies if (proc() != src->proc()) { // if this is a destination processor: advance its recv buffer size + vector<comm_state::procbufdesc>& procbufs = + state.typebufs.at(c_datatype()).procbufs; if (proc() == dist::rank()) { - state.collbufs.at(src->proc()).recvbufsize += box.size(); + procbufs.at(src->proc()).recvbufsize += box.size(); + state.typebufs.at(c_datatype()).in_use = true; } // if this is a source processor: increment its send buffer size if (src->proc() == dist::rank()) { - state.collbufs.at(proc()).sendbufsize += box.size(); + procbufs.at(proc()).sendbufsize += box.size(); + state.typebufs.at(c_datatype()).in_use = true; } } break; @@ -142,13 +148,14 @@ void gdata::copy_from (comm_state& state, case state_empty_recv_buffers: // if this is a destination processor and data has already been received // from the source processor: copy it from the recv buffer - if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) { + if (proc() == dist::rank() and + state.recvbuffers_ready.at(dist::size()*c_datatype() + src->proc())) { copy_from_recvbuffer (state, src, box); } break; default: - assert(0 && "invalid state"); + assert(0 and "invalid state"); } } @@ -187,24 +194,21 @@ void gdata::copy_from_post (comm_state& state, ivect items = (box.upper() - box.lower()) / box.stride() + 1; ivect offs = (box.lower() - ext.lower()) / ext.stride(); char* send_buffer = (char*) b->pointer(); - const int vartype = CCTK_VarTypeI(varindex); - const int vartypesize = CCTK_VarTypeSize(vartype); - assert(vartypesize >= 0); + int& datatypesize = state.typebufs.at(c_datatype()).datatypesize; for (int k = 0; k < items[2]; k++) { for (int j = 0; j < items[1]; j++) { int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2])); - memcpy (send_buffer, - ((char*) src->storage()) + vartypesize*i, - vartypesize*items[0]); - send_buffer += vartypesize*items[0]; + memcpy (send_buffer, ((char*) src->storage()) + datatypesize*i, + datatypesize * items[0]); + send_buffer += datatypesize * items[0]; } } wtime_copyfrom_sendinner_copy.stop(); wtime_copyfrom_sendinner_send.start(); MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(), - tag, dist::comm, &b->request); + tag, dist::comm, &b->request); wtime_copyfrom_sendinner_send.stop(); state.requests.push_back (b->request); state.sendbufs.push (b); @@ -220,7 +224,7 @@ void gdata::copy_from_wait (comm_state& state, wtime_copyfrom_wait.start(); wtime_copyfrom_recvwaitinner_wait.start(); - if (! state.requests.empty()) { + if (not state.requests.empty()) { // wait for all requests at once MPI_Waitall (state.requests.size(), &state.requests.front(), MPI_STATUSES_IGNORE); @@ -240,17 +244,14 @@ void gdata::copy_from_wait (comm_state& state, ivect items = (box.upper() - box.lower()) / box.stride() + 1; ivect offs = (box.lower() - ext.lower()) / ext.stride(); const char* recv_buffer = (const char*) b->pointer(); - const int vartype = CCTK_VarTypeI(varindex); - const int vartypesize = CCTK_VarTypeSize(vartype); - assert(vartypesize >= 0); + int& datatypesize = state.typebufs.at(c_datatype()).datatypesize; for (int k = 0; k < items[2]; k++) { for (int j = 0; j < items[1]; j++) { int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2])); - memcpy (((char*) storage()) + vartypesize*i, - recv_buffer, - vartypesize*items[0]); - recv_buffer += vartypesize*items[0]; + memcpy (((char*) storage()) + datatypesize*i, recv_buffer, + datatypesize * items[0]); + recv_buffer += datatypesize * items[0]; } } wtime_copyfrom_recvwaitinner_copy.stop(); @@ -276,15 +277,14 @@ void gdata::copy_into_sendbuffer (comm_state& state, } else { // copy to remote processor assert (src->_has_storage); - assert (state.collbufs.at(proc()).sendbuf - - state.collbufs.at(proc()).sendbufbase <= - (state.collbufs.at(proc()).sendbufsize - box.size()) * - state.vartypesize); - assert (proc() < state.collbufs.size()); - int fillstate = (state.collbufs[proc()].sendbuf + - box.size()*state.vartypesize) - - state.collbufs[proc()].sendbufbase; - assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize); + int& datatypesize = state.typebufs.at(c_datatype()).datatypesize; + comm_state::procbufdesc& procbuf = + state.typebufs.at(c_datatype()).procbufs.at(proc()); + assert (procbuf.sendbuf - procbuf.sendbufbase <= + (procbuf.sendbufsize - box.size()) * datatypesize); + int fillstate = procbuf.sendbuf + box.size()*datatypesize - + procbuf.sendbufbase; + assert (fillstate <= procbuf.sendbufsize * datatypesize); // copy this processor's data into the send buffer const ibbox& ext = src->extent(); @@ -292,22 +292,23 @@ void gdata::copy_into_sendbuffer (comm_state& state, ivect items = (box.upper() - box.lower()) / box.stride() + 1; ivect offs = (box.lower() - ext.lower()) / ext.stride(); + assert (dim == 3); for (int k = 0; k < items[2]; k++) { for (int j = 0; j < items[1]; j++) { int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2])); - memcpy (state.collbufs[proc()].sendbuf, - ((const char*) src->storage()) + state.vartypesize*i, - state.vartypesize*items[0]); - state.collbufs[proc()].sendbuf += state.vartypesize*items[0]; + memcpy (procbuf.sendbuf, + ((const char*) src->storage()) + datatypesize*i, + datatypesize * items[0]); + procbuf.sendbuf += datatypesize * items[0]; } } // post the send if the buffer is full - if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) { - MPI_Isend (state.collbufs[proc()].sendbufbase, - state.collbufs[proc()].sendbufsize, - state.datatype, proc(), 0, dist::comm, - &state.srequests[proc()]); + if (fillstate == procbuf.sendbufsize * datatypesize) { + MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize, + state.typebufs.at(c_datatype()).mpi_datatype, + proc(), c_datatype(), dist::comm, + &state.srequests.at(dist::size()*c_datatype() + proc())); } } } @@ -318,11 +319,11 @@ void gdata::copy_into_sendbuffer (comm_state& state, void gdata::copy_from_recvbuffer (comm_state& state, const gdata* src, const ibbox& box) { - assert (src->proc() < state.collbufs.size()); - assert (state.collbufs[src->proc()].recvbuf - - state.collbufs[src->proc()].recvbufbase <= - (state.collbufs[src->proc()].recvbufsize-box.size()) * - state.vartypesize); + int& datatypesize = state.typebufs.at(c_datatype()).datatypesize; + comm_state::procbufdesc& procbuf = + state.typebufs.at(c_datatype()).procbufs.at(src->proc()); + assert (procbuf.recvbuf - procbuf.recvbufbase <= + (procbuf.recvbufsize-box.size()) * datatypesize); // copy this processor's data from the recv buffer const ibbox& ext = extent(); @@ -330,13 +331,13 @@ void gdata::copy_from_recvbuffer (comm_state& state, ivect items = (box.upper() - box.lower()) / box.stride() + 1; ivect offs = (box.lower() - ext.lower()) / ext.stride(); + assert (dim == 3); for (int k = 0; k < items[2]; k++) { for (int j = 0; j < items[1]; j++) { int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2])); - memcpy (((char*) storage()) + state.vartypesize*i, - state.collbufs[src->proc()].recvbuf, - state.vartypesize*items[0]); - state.collbufs[src->proc()].recvbuf += state.vartypesize*items[0]; + memcpy (((char*) storage()) + datatypesize*i, + procbuf.recvbuf, datatypesize * items[0]); + procbuf.recvbuf += datatypesize * items[0]; } } } @@ -358,15 +359,15 @@ void gdata assert (all(box.upper()<=extent().upper())); assert (all(box.stride()==extent().stride())); assert (all((box.lower()-extent().lower())%box.stride() == 0)); - assert (srcs.size() == times.size() && srcs.size()>0); + assert (srcs.size() == times.size() and srcs.size()>0); for (int t=0; t<(int)srcs.size(); ++t) { assert (srcs.at(t)->has_storage()); assert (all(box.lower()>=srcs.at(t)->extent().lower())); assert (all(box.upper()<=srcs.at(t)->extent().upper())); } - assert (! box.empty()); + assert (not box.empty()); const gdata* src = srcs.at(0); - if (dist::rank() != proc() && dist::rank() != src->proc()) return; + if (dist::rank() != proc() and dist::rank() != src->proc()) return; switch (state.thestate) { case state_post: @@ -387,12 +388,16 @@ void gdata // don't count processor-local interpolations if (proc() != src->proc()) { // if this is a destination processor: increment its recv buffer size + vector<comm_state::procbufdesc>& procbufs = + state.typebufs.at(c_datatype()).procbufs; if (proc() == dist::rank()) { - state.collbufs.at(src->proc()).recvbufsize += box.size(); + procbufs.at(src->proc()).recvbufsize += box.size(); + state.typebufs.at(c_datatype()).in_use = true; } // if this is a source processor: increment its send buffer size if (src->proc() == dist::rank()) { - state.collbufs.at(proc()).sendbufsize += box.size(); + procbufs.at(proc()).sendbufsize += box.size(); + state.typebufs.at(c_datatype()).in_use = true; } } break; @@ -413,12 +418,13 @@ void gdata // if this is a destination processor and data has already been received // from the source processor: copy it from the recv buffer // (the processor-local case is not handled here) - if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) { + if (proc() == dist::rank() and + state.recvbuffers_ready.at(dist::size()*c_datatype() + src->proc())) { copy_from_recvbuffer(state, src, box); } break; default: - assert(0 && "invalid state"); + assert(0 and "invalid state"); } } @@ -482,33 +488,32 @@ void gdata // interpolate to remote processor const gdata* src = srcs.at(0); assert (src->_has_storage); - assert (state.collbufs.at(proc()).sendbuf - - state.collbufs.at(proc()).sendbufbase <= - (state.collbufs.at(proc()).sendbufsize - box.size()) * - state.vartypesize); + int& datatypesize = state.typebufs.at(c_datatype()).datatypesize; + comm_state::procbufdesc& procbuf = + state.typebufs.at(c_datatype()).procbufs.at(proc()); + assert (procbuf.sendbuf - procbuf.sendbufbase <= + (procbuf.sendbufsize - box.size()) * datatypesize); assert (src->has_storage()); - assert (proc() < state.collbufs.size()); - int fillstate = (state.collbufs[proc()].sendbuf + - box.size()*state.vartypesize) - - state.collbufs[proc()].sendbufbase; - assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize); + int fillstate = (procbuf.sendbuf + box.size()*datatypesize) - + procbuf.sendbufbase; + assert (fillstate <= procbuf.sendbufsize * datatypesize); // interpolate this processor's data into the send buffer gdata* tmp = src->make_typed (varindex, transport_operator, tag); - tmp->allocate (box, src->proc(), state.collbufs[proc()].sendbuf); + tmp->allocate (box, src->proc(), procbuf.sendbuf); tmp->interpolate_from_innerloop (srcs, times, box, time, order_space, order_time); delete tmp; // advance send buffer to point to the next ibbox slot - state.collbufs[proc()].sendbuf += state.vartypesize * box.size(); + procbuf.sendbuf += datatypesize * box.size(); // post the send if the buffer is full - if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) { - MPI_Isend (state.collbufs[proc()].sendbufbase, - state.collbufs[proc()].sendbufsize, - state.datatype, proc(), 0, dist::comm, - &state.srequests[proc()]); + if (fillstate == procbuf.sendbufsize*datatypesize) { + MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize, + state.typebufs.at(c_datatype()).mpi_datatype, + proc(), c_datatype(), dist::comm, + &state.srequests.at(dist::size()*c_datatype() + proc())); } } } |