aboutsummaryrefslogtreecommitdiff
path: root/Carpet/CarpetLib/src/gdata.cc
diff options
context:
space:
mode:
authorThomas Radke <tradke@aei.mpg.de>2005-08-15 15:00:00 +0000
committerThomas Radke <tradke@aei.mpg.de>2005-08-15 15:00:00 +0000
commitb3405326ebf651b20b4c44423df62ef23a1bf8f2 (patch)
tree3f28fb7697589ff69ab3ac283cd0064dba340522 /Carpet/CarpetLib/src/gdata.cc
parent8493c61f465169c3d52b53b5023680a0d33f898c (diff)
Carpet*: generalise the comm_state class for collective buffer communications
CarpetLib's comm_state class (actually, it's still just a struct) has been extended to handle collective buffer communications for all possible C datatypes at the same time. This makes it unnecessary for the higher-level communication routines to loop over each individual datatype separately. darcs-hash:20050815150023-776a0-dddc1aca7ccaebae872f9f451b2c3595cd951fed.gz
Diffstat (limited to 'Carpet/CarpetLib/src/gdata.cc')
-rw-r--r--Carpet/CarpetLib/src/gdata.cc163
1 files changed, 84 insertions, 79 deletions
diff --git a/Carpet/CarpetLib/src/gdata.cc b/Carpet/CarpetLib/src/gdata.cc
index 3b50abbee..0d85915d2 100644
--- a/Carpet/CarpetLib/src/gdata.cc
+++ b/Carpet/CarpetLib/src/gdata.cc
@@ -37,9 +37,11 @@ static int nexttag ()
// Constructors
-gdata::gdata (const int varindex_, const operator_type transport_operator_,
+gdata::gdata (const int varindex_,
+ const operator_type transport_operator_,
const int tag_)
- : varindex(varindex_), transport_operator(transport_operator_),
+ : varindex(varindex_),
+ transport_operator(transport_operator_),
_has_storage(false),
comm_active(false),
tag(tag_ >= 0 ? tag_ : nexttag())
@@ -75,7 +77,7 @@ void gdata::change_processor (comm_state& state,
change_processor_wait (state, newproc, mem);
break;
default:
- assert(0 && "invalid state");
+ assert(0 and "invalid state");
}
}
@@ -85,18 +87,18 @@ void gdata::change_processor (comm_state& state,
void gdata::copy_from (comm_state& state,
const gdata* src, const ibbox& box)
{
- assert (has_storage() && src->has_storage());
+ assert (has_storage() and src->has_storage());
assert (all(box.lower()>=extent().lower()
- && box.lower()>=src->extent().lower()));
+ and box.lower()>=src->extent().lower()));
assert (all(box.upper()<=extent().upper()
- && box.upper()<=src->extent().upper()));
+ and box.upper()<=src->extent().upper()));
assert (all(box.stride()==extent().stride()
- && box.stride()==src->extent().stride()));
+ and box.stride()==src->extent().stride()));
assert (all((box.lower()-extent().lower())%box.stride() == 0
- && (box.lower()-src->extent().lower())%box.stride() == 0));
+ and (box.lower()-src->extent().lower())%box.stride() == 0));
if (box.empty()) return;
- if (dist::rank() != proc() && dist::rank() != src->proc()) return;
+ if (dist::rank() != proc() and dist::rank() != src->proc()) return;
switch (state.thestate) {
case state_post:
@@ -117,12 +119,16 @@ void gdata::copy_from (comm_state& state,
// don't count processor-local copies
if (proc() != src->proc()) {
// if this is a destination processor: advance its recv buffer size
+ vector<comm_state::procbufdesc>& procbufs =
+ state.typebufs.at(c_datatype()).procbufs;
if (proc() == dist::rank()) {
- state.collbufs.at(src->proc()).recvbufsize += box.size();
+ procbufs.at(src->proc()).recvbufsize += box.size();
+ state.typebufs.at(c_datatype()).in_use = true;
}
// if this is a source processor: increment its send buffer size
if (src->proc() == dist::rank()) {
- state.collbufs.at(proc()).sendbufsize += box.size();
+ procbufs.at(proc()).sendbufsize += box.size();
+ state.typebufs.at(c_datatype()).in_use = true;
}
}
break;
@@ -142,13 +148,14 @@ void gdata::copy_from (comm_state& state,
case state_empty_recv_buffers:
// if this is a destination processor and data has already been received
// from the source processor: copy it from the recv buffer
- if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) {
+ if (proc() == dist::rank() and
+ state.recvbuffers_ready.at(dist::size()*c_datatype() + src->proc())) {
copy_from_recvbuffer (state, src, box);
}
break;
default:
- assert(0 && "invalid state");
+ assert(0 and "invalid state");
}
}
@@ -187,24 +194,21 @@ void gdata::copy_from_post (comm_state& state,
ivect items = (box.upper() - box.lower()) / box.stride() + 1;
ivect offs = (box.lower() - ext.lower()) / ext.stride();
char* send_buffer = (char*) b->pointer();
- const int vartype = CCTK_VarTypeI(varindex);
- const int vartypesize = CCTK_VarTypeSize(vartype);
- assert(vartypesize >= 0);
+ int& datatypesize = state.typebufs.at(c_datatype()).datatypesize;
for (int k = 0; k < items[2]; k++) {
for (int j = 0; j < items[1]; j++) {
int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
- memcpy (send_buffer,
- ((char*) src->storage()) + vartypesize*i,
- vartypesize*items[0]);
- send_buffer += vartypesize*items[0];
+ memcpy (send_buffer, ((char*) src->storage()) + datatypesize*i,
+ datatypesize * items[0]);
+ send_buffer += datatypesize * items[0];
}
}
wtime_copyfrom_sendinner_copy.stop();
wtime_copyfrom_sendinner_send.start();
MPI_Isend (b->pointer(), b->size(), b->datatype(), proc(),
- tag, dist::comm, &b->request);
+ tag, dist::comm, &b->request);
wtime_copyfrom_sendinner_send.stop();
state.requests.push_back (b->request);
state.sendbufs.push (b);
@@ -220,7 +224,7 @@ void gdata::copy_from_wait (comm_state& state,
wtime_copyfrom_wait.start();
wtime_copyfrom_recvwaitinner_wait.start();
- if (! state.requests.empty()) {
+ if (not state.requests.empty()) {
// wait for all requests at once
MPI_Waitall (state.requests.size(), &state.requests.front(),
MPI_STATUSES_IGNORE);
@@ -240,17 +244,14 @@ void gdata::copy_from_wait (comm_state& state,
ivect items = (box.upper() - box.lower()) / box.stride() + 1;
ivect offs = (box.lower() - ext.lower()) / ext.stride();
const char* recv_buffer = (const char*) b->pointer();
- const int vartype = CCTK_VarTypeI(varindex);
- const int vartypesize = CCTK_VarTypeSize(vartype);
- assert(vartypesize >= 0);
+ int& datatypesize = state.typebufs.at(c_datatype()).datatypesize;
for (int k = 0; k < items[2]; k++) {
for (int j = 0; j < items[1]; j++) {
int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
- memcpy (((char*) storage()) + vartypesize*i,
- recv_buffer,
- vartypesize*items[0]);
- recv_buffer += vartypesize*items[0];
+ memcpy (((char*) storage()) + datatypesize*i, recv_buffer,
+ datatypesize * items[0]);
+ recv_buffer += datatypesize * items[0];
}
}
wtime_copyfrom_recvwaitinner_copy.stop();
@@ -276,15 +277,14 @@ void gdata::copy_into_sendbuffer (comm_state& state,
} else {
// copy to remote processor
assert (src->_has_storage);
- assert (state.collbufs.at(proc()).sendbuf -
- state.collbufs.at(proc()).sendbufbase <=
- (state.collbufs.at(proc()).sendbufsize - box.size()) *
- state.vartypesize);
- assert (proc() < state.collbufs.size());
- int fillstate = (state.collbufs[proc()].sendbuf +
- box.size()*state.vartypesize) -
- state.collbufs[proc()].sendbufbase;
- assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
+ int& datatypesize = state.typebufs.at(c_datatype()).datatypesize;
+ comm_state::procbufdesc& procbuf =
+ state.typebufs.at(c_datatype()).procbufs.at(proc());
+ assert (procbuf.sendbuf - procbuf.sendbufbase <=
+ (procbuf.sendbufsize - box.size()) * datatypesize);
+ int fillstate = procbuf.sendbuf + box.size()*datatypesize -
+ procbuf.sendbufbase;
+ assert (fillstate <= procbuf.sendbufsize * datatypesize);
// copy this processor's data into the send buffer
const ibbox& ext = src->extent();
@@ -292,22 +292,23 @@ void gdata::copy_into_sendbuffer (comm_state& state,
ivect items = (box.upper() - box.lower()) / box.stride() + 1;
ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ assert (dim == 3);
for (int k = 0; k < items[2]; k++) {
for (int j = 0; j < items[1]; j++) {
int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
- memcpy (state.collbufs[proc()].sendbuf,
- ((const char*) src->storage()) + state.vartypesize*i,
- state.vartypesize*items[0]);
- state.collbufs[proc()].sendbuf += state.vartypesize*items[0];
+ memcpy (procbuf.sendbuf,
+ ((const char*) src->storage()) + datatypesize*i,
+ datatypesize * items[0]);
+ procbuf.sendbuf += datatypesize * items[0];
}
}
// post the send if the buffer is full
- if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
- MPI_Isend (state.collbufs[proc()].sendbufbase,
- state.collbufs[proc()].sendbufsize,
- state.datatype, proc(), 0, dist::comm,
- &state.srequests[proc()]);
+ if (fillstate == procbuf.sendbufsize * datatypesize) {
+ MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize,
+ state.typebufs.at(c_datatype()).mpi_datatype,
+ proc(), c_datatype(), dist::comm,
+ &state.srequests.at(dist::size()*c_datatype() + proc()));
}
}
}
@@ -318,11 +319,11 @@ void gdata::copy_into_sendbuffer (comm_state& state,
void gdata::copy_from_recvbuffer (comm_state& state,
const gdata* src, const ibbox& box)
{
- assert (src->proc() < state.collbufs.size());
- assert (state.collbufs[src->proc()].recvbuf -
- state.collbufs[src->proc()].recvbufbase <=
- (state.collbufs[src->proc()].recvbufsize-box.size()) *
- state.vartypesize);
+ int& datatypesize = state.typebufs.at(c_datatype()).datatypesize;
+ comm_state::procbufdesc& procbuf =
+ state.typebufs.at(c_datatype()).procbufs.at(src->proc());
+ assert (procbuf.recvbuf - procbuf.recvbufbase <=
+ (procbuf.recvbufsize-box.size()) * datatypesize);
// copy this processor's data from the recv buffer
const ibbox& ext = extent();
@@ -330,13 +331,13 @@ void gdata::copy_from_recvbuffer (comm_state& state,
ivect items = (box.upper() - box.lower()) / box.stride() + 1;
ivect offs = (box.lower() - ext.lower()) / ext.stride();
+ assert (dim == 3);
for (int k = 0; k < items[2]; k++) {
for (int j = 0; j < items[1]; j++) {
int i = offs[0] + shape[0]*((j+offs[1]) + shape[1]*(k+offs[2]));
- memcpy (((char*) storage()) + state.vartypesize*i,
- state.collbufs[src->proc()].recvbuf,
- state.vartypesize*items[0]);
- state.collbufs[src->proc()].recvbuf += state.vartypesize*items[0];
+ memcpy (((char*) storage()) + datatypesize*i,
+ procbuf.recvbuf, datatypesize * items[0]);
+ procbuf.recvbuf += datatypesize * items[0];
}
}
}
@@ -358,15 +359,15 @@ void gdata
assert (all(box.upper()<=extent().upper()));
assert (all(box.stride()==extent().stride()));
assert (all((box.lower()-extent().lower())%box.stride() == 0));
- assert (srcs.size() == times.size() && srcs.size()>0);
+ assert (srcs.size() == times.size() and srcs.size()>0);
for (int t=0; t<(int)srcs.size(); ++t) {
assert (srcs.at(t)->has_storage());
assert (all(box.lower()>=srcs.at(t)->extent().lower()));
assert (all(box.upper()<=srcs.at(t)->extent().upper()));
}
- assert (! box.empty());
+ assert (not box.empty());
const gdata* src = srcs.at(0);
- if (dist::rank() != proc() && dist::rank() != src->proc()) return;
+ if (dist::rank() != proc() and dist::rank() != src->proc()) return;
switch (state.thestate) {
case state_post:
@@ -387,12 +388,16 @@ void gdata
// don't count processor-local interpolations
if (proc() != src->proc()) {
// if this is a destination processor: increment its recv buffer size
+ vector<comm_state::procbufdesc>& procbufs =
+ state.typebufs.at(c_datatype()).procbufs;
if (proc() == dist::rank()) {
- state.collbufs.at(src->proc()).recvbufsize += box.size();
+ procbufs.at(src->proc()).recvbufsize += box.size();
+ state.typebufs.at(c_datatype()).in_use = true;
}
// if this is a source processor: increment its send buffer size
if (src->proc() == dist::rank()) {
- state.collbufs.at(proc()).sendbufsize += box.size();
+ procbufs.at(proc()).sendbufsize += box.size();
+ state.typebufs.at(c_datatype()).in_use = true;
}
}
break;
@@ -413,12 +418,13 @@ void gdata
// if this is a destination processor and data has already been received
// from the source processor: copy it from the recv buffer
// (the processor-local case is not handled here)
- if (proc() == dist::rank() && state.recvbuffers_ready.at(src->proc())) {
+ if (proc() == dist::rank() and
+ state.recvbuffers_ready.at(dist::size()*c_datatype() + src->proc())) {
copy_from_recvbuffer(state, src, box);
}
break;
default:
- assert(0 && "invalid state");
+ assert(0 and "invalid state");
}
}
@@ -482,33 +488,32 @@ void gdata
// interpolate to remote processor
const gdata* src = srcs.at(0);
assert (src->_has_storage);
- assert (state.collbufs.at(proc()).sendbuf -
- state.collbufs.at(proc()).sendbufbase <=
- (state.collbufs.at(proc()).sendbufsize - box.size()) *
- state.vartypesize);
+ int& datatypesize = state.typebufs.at(c_datatype()).datatypesize;
+ comm_state::procbufdesc& procbuf =
+ state.typebufs.at(c_datatype()).procbufs.at(proc());
+ assert (procbuf.sendbuf - procbuf.sendbufbase <=
+ (procbuf.sendbufsize - box.size()) * datatypesize);
assert (src->has_storage());
- assert (proc() < state.collbufs.size());
- int fillstate = (state.collbufs[proc()].sendbuf +
- box.size()*state.vartypesize) -
- state.collbufs[proc()].sendbufbase;
- assert (fillstate <= state.collbufs[proc()].sendbufsize*state.vartypesize);
+ int fillstate = (procbuf.sendbuf + box.size()*datatypesize) -
+ procbuf.sendbufbase;
+ assert (fillstate <= procbuf.sendbufsize * datatypesize);
// interpolate this processor's data into the send buffer
gdata* tmp = src->make_typed (varindex, transport_operator, tag);
- tmp->allocate (box, src->proc(), state.collbufs[proc()].sendbuf);
+ tmp->allocate (box, src->proc(), procbuf.sendbuf);
tmp->interpolate_from_innerloop (srcs, times, box, time,
order_space, order_time);
delete tmp;
// advance send buffer to point to the next ibbox slot
- state.collbufs[proc()].sendbuf += state.vartypesize * box.size();
+ procbuf.sendbuf += datatypesize * box.size();
// post the send if the buffer is full
- if (fillstate == state.collbufs[proc()].sendbufsize*state.vartypesize) {
- MPI_Isend (state.collbufs[proc()].sendbufbase,
- state.collbufs[proc()].sendbufsize,
- state.datatype, proc(), 0, dist::comm,
- &state.srequests[proc()]);
+ if (fillstate == procbuf.sendbufsize*datatypesize) {
+ MPI_Isend (procbuf.sendbufbase, procbuf.sendbufsize,
+ state.typebufs.at(c_datatype()).mpi_datatype,
+ proc(), c_datatype(), dist::comm,
+ &state.srequests.at(dist::size()*c_datatype() + proc()));
}
}
}