aboutsummaryrefslogtreecommitdiff
path: root/Carpet/CarpetLib/src/commstate.cc
diff options
context:
space:
mode:
author: Thomas Radke <tradke@aei.mpg.de>, 2005-08-15 15:00:00 +0000
committer: Thomas Radke <tradke@aei.mpg.de>, 2005-08-15 15:00:00 +0000
commit: b3405326ebf651b20b4c44423df62ef23a1bf8f2 (patch)
tree: 3f28fb7697589ff69ab3ac283cd0064dba340522 /Carpet/CarpetLib/src/commstate.cc
parent: 8493c61f465169c3d52b53b5023680a0d33f898c (diff)
Carpet*: generalise the comm_state class for collective buffer communications
CarpetLib's comm_state class (actually, it's still just a struct) has been extended to handle collective buffer communications for all possible C datatypes at the same time. This makes it unnecessary for the higher-level communication routines to loop over each individual datatype separately.

darcs-hash:20050815150023-776a0-dddc1aca7ccaebae872f9f451b2c3595cd951fed.gz
Diffstat (limited to 'Carpet/CarpetLib/src/commstate.cc')
-rw-r--r-- Carpet/CarpetLib/src/commstate.cc 145
1 files changed, 77 insertions, 68 deletions
diff --git a/Carpet/CarpetLib/src/commstate.cc b/Carpet/CarpetLib/src/commstate.cc
index be3e485a2..1bd9a2a8c 100644
--- a/Carpet/CarpetLib/src/commstate.cc
+++ b/Carpet/CarpetLib/src/commstate.cc
@@ -10,54 +10,61 @@
// Communication state control
-comm_state::comm_state (int vartype_) : vartype(vartype_)
+comm_state::comm_state ()
{
- // If this comm state is created with a valid (ie. non-negative) CCTK vartype
- // then it is assumed to be used in collective communications for that type
- // later on, ie. it will step through
+ // If CarpetLib::use_collective_communication_buffers is set to true,
+ // this comm_state object will use collective communications,
+ // ie. it will step through
// state_get_buffer_sizes
// state_fill_send_buffers
// state_empty_recv_buffers
- // This path needs the size of the vartype, the corresponding MPI datatype
- // and the collective communication buffers initialized also.
//
- // If this comm state is created with a negative vartype then individual
- // communications on single components are used by stepping through
+ // If CarpetLib::use_collective_communication_buffers is false
+ // then individual communications on single components are used
+ // by stepping through
// state_post
// state_wait
DECLARE_CCTK_PARAMETERS;
- uses_collective_communication_buffers =
- use_collective_communication_buffers && vartype >= 0;
+ thestate = use_collective_communication_buffers ?
+ state_get_buffer_sizes : state_post;
- if (uses_collective_communication_buffers) {
- vartypesize = CCTK_VarTypeSize (vartype);
- assert (vartypesize > 0);
-
- thestate = state_get_buffer_sizes;
- switch (vartype) {
-#define TYPECASE(N,T) \
- case N: { T dummy; datatype = dist::datatype(dummy); } break;
-#include "carpet_typecase.hh"
-#undef TYPECASE
- default: assert (0 && "invalid datatype");
- }
- collbufs.resize (dist::size());
- rrequests.resize (dist::size(), MPI_REQUEST_NULL);
- srequests.resize (dist::size(), MPI_REQUEST_NULL);
- recvbuffers_ready.resize (dist::size());
+ typebufs.resize (dist::c_ndatatypes());
+ for (int type = 0; type < typebufs.size(); type++) {
+ typebufs[type].procbufs.resize(dist::size());
+ }
- } else {
- thestate = state_post;
+#define INSTANTIATE(T) \
+ { \
+ T dummy; \
+ int type = dist::c_datatype (dummy); \
+ typebufs.at(type).datatypesize = sizeof (dummy); \
+ typebufs.at(type).mpi_datatype = dist::datatype (dummy); \
}
+#include "instantiate"
+#undef INSTANTIATE
+
+ recvbuffers_ready.resize (dist::c_ndatatypes() * dist::size());
+ srequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL);
+ rrequests.resize (dist::c_ndatatypes() * dist::size(), MPI_REQUEST_NULL);
}
+
void comm_state::step ()
{
assert (thestate != state_done);
- if (uses_collective_communication_buffers) {
- switch (thestate) {
+ switch (thestate) {
+ case state_post:
+ // After all sends/recvs have been posted in 'state_post'
+ // now wait for their completion in 'state_wait'.
+ thestate = state_wait;
+ break;
+
+ case state_wait:
+ thestate = state_done;
+ break;
+
case state_get_buffer_sizes:
// The sizes of the collective communication buffers are known
// so now allocate them.
@@ -65,21 +72,33 @@ void comm_state::step ()
// (a clever MPI layer may take advantage of such early posting).
num_posted_recvs = num_completed_recvs = 0;
- for (size_t i = 0; i < collbufs.size(); i++) {
- collbufs[i].sendbufbase = new char[collbufs[i].sendbufsize*vartypesize];
- collbufs[i].recvbufbase = new char[collbufs[i].recvbufsize*vartypesize];
- collbufs[i].sendbuf = collbufs[i].sendbufbase;
- collbufs[i].recvbuf = collbufs[i].recvbufbase;
+ for (int type = 0; type < typebufs.size(); type++) {
+
+ // skip unused datatype buffers
+ if (not typebufs[type].in_use) {
+ continue;
+ }
- if (collbufs[i].recvbufsize > 0) {
- MPI_Irecv (collbufs[i].recvbufbase, collbufs[i].recvbufsize,
- datatype, i, MPI_ANY_TAG, dist::comm, &rrequests[i]);
- num_posted_recvs++;
+ int& datatypesize = typebufs[type].datatypesize;
+ for (int proc = 0; proc < typebufs[type].procbufs.size(); proc++) {
+ procbufdesc& procbuf = typebufs[type].procbufs[proc];
+
+ procbuf.sendbufbase = new char[procbuf.sendbufsize*datatypesize];
+ procbuf.recvbufbase = new char[procbuf.recvbufsize*datatypesize];
+ procbuf.sendbuf = procbuf.sendbufbase;
+ procbuf.recvbuf = procbuf.recvbufbase;
+
+ if (procbuf.recvbufsize > 0) {
+ MPI_Irecv (procbuf.recvbufbase, procbuf.recvbufsize,
+ typebufs[type].mpi_datatype, proc, type,
+ dist::comm, &rrequests[dist::size()*type + proc]);
+ num_posted_recvs++;
+ }
}
}
// Now go and get the send buffers filled with data.
- // Once a buffer is full it will get posted right away
+ // Once a buffer is full it will be posted right away
// (see gdata::copy_into_sendbuffer() and
// gdata::interpolate_into_sendbuffer()).
thestate = state_fill_send_buffers;
@@ -92,14 +111,16 @@ void comm_state::step ()
case state_empty_recv_buffers:
// Finish (at least one of) the posted communications
- if (! AllPostedCommunicationsFinished ()) {
+ if (not AllPostedCommunicationsFinished ()) {
// No state change if there are still outstanding communications;
// do another comm_state loop iteration.
} else {
// Everything is done so release the collective communication buffers.
- for (size_t i = 0; i < collbufs.size(); i++) {
- delete[] collbufs[i].sendbufbase;
- delete[] collbufs[i].recvbufbase;
+ for (int type = 0; type < typebufs.size(); type++) {
+ for (int proc = 0; proc < typebufs[type].procbufs.size(); proc++) {
+ delete[] typebufs[type].procbufs[proc].sendbufbase;
+ delete[] typebufs[type].procbufs[proc].recvbufbase;
+ }
}
thestate = state_done;
}
@@ -107,33 +128,23 @@ void comm_state::step ()
default:
assert (0 && "invalid state");
- }
- } else {
- switch (thestate) {
- case state_post:
- // After all sends/recvs have been posted in 'state_post'
- // now wait for their completion in 'state_wait'.
- thestate = state_wait;
- break;
- case state_wait:
- thestate = state_done;
- break;
- default:
- assert (0 && "invalid state");
- }
}
}
+
bool comm_state::done ()
{
return thestate == state_done;
}
+
comm_state::~comm_state ()
{
- assert (thestate == state_done ||
- thestate == uses_collective_communication_buffers ?
- state_get_buffer_sizes : state_post);
+ DECLARE_CCTK_PARAMETERS;
+
+ assert (thestate == state_done or
+ thestate == (use_collective_communication_buffers ?
+ state_get_buffer_sizes : state_post));
assert (requests.empty());
}
@@ -141,14 +152,14 @@ comm_state::~comm_state ()
// wait for completion of posted collective buffer sends/receives
//
// Depending on the parameter CarpetLib::use_waitall, this function will wait
-// for all at once (true) or at least one (false) of the
-// posted receive operations to finish.
+// for all (true) or at least one (false) of the posted receive operations
+// to finish.
//
// It returns true if all posted communications have been completed.
bool comm_state::AllPostedCommunicationsFinished ()
{
DECLARE_CCTK_PARAMETERS;
-
+
// check if all outstanding receives have been completed already
if (num_posted_recvs == num_completed_recvs) {
// finalize the outstanding sends in one go
@@ -158,13 +169,11 @@ bool comm_state::AllPostedCommunicationsFinished ()
}
// reset completion flag for all receive buffers
- for (size_t i = 0; i < recvbuffers_ready.size(); i++) {
- recvbuffers_ready[i] = false;
- }
+ recvbuffers_ready.assign (recvbuffers_ready.size(), false);
if (use_waitall) {
// mark all posted receive buffers as ready
- for (size_t i = 0; i < rrequests.size(); i++) {
+ for (size_t i = 0; i < recvbuffers_ready.size(); i++) {
recvbuffers_ready[i] = rrequests[i] != MPI_REQUEST_NULL;
}