aboutsummaryrefslogtreecommitdiff
path: root/src/MPIO.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/MPIO.cc')
-rw-r--r--src/MPIO.cc199
1 files changed, 199 insertions, 0 deletions
diff --git a/src/MPIO.cc b/src/MPIO.cc
new file mode 100644
index 0000000..dcba3f8
--- /dev/null
+++ b/src/MPIO.cc
@@ -0,0 +1,199 @@
+#include <stdio.h>
+#include "MPIO.hh"
+
+MPIO::MPIO(IEEEIO *io, MPIcomm &c):file(io),comm(c),sendreq(0){
+
+ for(int i=0;i<3;i++) globaldims[i]=localdims[i]=localorigin[i]=0;
+ myid=comm.rank();
+
+ int *rootarray=0;
+ if(!myid) rootarray=new int[comm.numProcs()];
+ int hasfile= file?1:0;
+ comm.gather(0,1,&hasfile,rootarray);
+ if(!myid){
+ int count,p;
+ for(count=0,p=0;p<comm.numProcs();p++){
+ if(rootarray[p]) { count++; if(count<=1) root=p; }
+ }
+ if(count<=0 || count>1){
+ fprintf(stderr,"MPIO only accepts a single IO node right now.\n");
+ fprintf(stderr,"You chose %u io nodes\n",count);
+ fprintf(stderr,"I will select only the first one\n");
+ }
+ }
+ for(int i=0;i<3;i++) slicebuffer[i]=0;
+ for(int i=0;i<2;i++) recvreq[i]=0;
+ comm.bcast(0,root); // tell everyone which proc is root
+ if(rootarray) {delete rootarray; rootarray=0;}
+}
+
+MPIO::~MPIO(){}
+
+void MPIO::setLocalDims(int rank,int *origin, int *dims){
+ if(rank!=3) perror("MPIO is only for 3D IO for now (sorry)!");
+ if(sendreq) { puts("Bad move!!! Die!!!"); exit(0); }
+ // now check to see if anything has changed
+ // actually we still need to do this collection because we
+ // don't know if something might have changed on some other processor
+ // URK...
+ allorigins = new int[3*comm.numProcs()];
+ alldims = new int[3*comm.numProcs()];
+ for(int i=0;i<3;i++) {
+ //printf("localdims-n-origin[%u] = %u:%u\n",i,dims[i],origin[i]);
+ localdims[i]=dims[i];
+ localorigin[i]=origin[i];
+ }
+ for(int i=0;i<comm.numProcs()*3;i++) alldims[i]=allorigins[i]=-1;
+ comm.gather(0,3,localdims,alldims);
+ comm.gather(0,3,localorigin,allorigins);
+
+ comm.bcast(0,comm.numProcs()*3,alldims);
+ comm.bcast(0,comm.numProcs()*3,allorigins);
+
+ // now everyone figures out the global dimensions
+ // from max offset+dims
+ for(int p=0,last=3*comm.numProcs();p<last;p++){
+ // if(myid) fprintf(stderr,"Proc[%u] p=%d p%3=%d size=%d\n\to:d = %d:%d\n",
+ // if(myid) fprintf(stderr,"Proc[%u] p=%d o:d = %d:%d\n",
+ // myid,p,/* p%3,alldims[p]+allorigins[p],*/
+ // allorigins[p],alldims[p]);
+ if(globaldims[p%3] < alldims[p]+allorigins[p])
+ globaldims[p%3] = alldims[p]+allorigins[p];
+ }
+ // nothing to rebroadcast here...
+ //fprintf(stderr,"PE(%u): setLocalDims: globaldims=%u,%u,%d\n",
+ // myid,globaldims[0],globaldims[1],globaldims[2]);
+ // lets re-compute the requests and stuff
+ if(sendreq) delete sendreq; // clean out the old stuff
+ sendreq = new MPI_Request[localdims[2]];
+ if(isRoot()){
+ for(int i=0;i<3;i++) {
+ if(slicebuffer[i]) delete slicebuffer[i];
+ slicebuffer[i] = new float[globaldims[0]*globaldims[1]*3+27]; // padded (extremely)
+ }
+ // printf("Slicebuffer size=%u\n",globaldims[0]*globaldims[1]*2);
+ for(int i=0;i<2;i++) {
+ if(recvreq[i]) delete recvreq[i];
+ recvreq[i] = new MPI_Request[comm.numProcs()*4+7]; // padded
+ }
+ }
+}
+
+void MPIO::asyncWrite(float *data){
+ //puts("Begin AsyncWrite");
+ // get things started by getting root to request initial slice
+ if(isRoot()){
+ requestSlice(0,slicebuffer[0],recvreq[0]);
+ }
+ // comm.print("Requested First Slice");
+ // now everyone sends their slice over
+ sendSlice(0,data,sendreq); // separate request for each slice
+ //comm.print("Sent First slice");
+ if(isRoot()){
+ file->reserveStream(IObase::Float32,3,globaldims);
+ }
+ for(int i=1;i<globaldims[2];i++){
+ if(isRoot()) // if I'm root, then make request for nex slice
+ requestSlice(i,slicebuffer[i%2],recvreq[i%2]);
+ sendSlice(i,data,sendreq); // separate sendreq for each slice
+ if(isRoot()) {
+ // waitfor slice and assemble into the slicebuffer
+ waitForSlice(i-1,slicebuffer[(i-1)%2],slicebuffer[2],recvreq[(i-1)%2]);
+ // now write assembled slice to disk stream
+ // can make disk buffer huge to simulate recieving multiple
+ // slices
+ //fprintf(stderr,"WriteStream Slice %u\n",i-1);
+ file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
+ }
+ }
+ if(isRoot()){
+ int n = globaldims[2]-1;
+ waitForSlice(n,slicebuffer[(n)%2],slicebuffer[2],recvreq[(n)%2]);
+ file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
+ // do requestfree (double-buffered request free-s)
+ //for(int i=0;i<3;i++) delete slicebuffer[i]; // free everything up
+ //for(int i=0;i<2;i++) delete recvreq[i];
+ }
+ //**** At end (matching deletes) ****
+ // fprintf(stderr,"Cleanup proc %u\n",myid);
+ // comm.print("Cleanup");
+ for(int n=0;n<localdims[2];n++){
+ MPI_Status stat;
+ comm.wait(sendreq[n],stat);
+ }
+ // perhaps should be a persistent variable
+ // delete sendreq;
+ //sendreq=0; // null it out so we know we are done
+}
+
+void MPIO::sendSlice(int z,float *data,MPI_Request *req){
+ int lz = z - localorigin[2]; // find local z-offset here
+ // filter out bogus requests
+ // send even redundant information though...
+ if(lz<0 || lz >=localdims[2]) return;
+ // fprintf(stderr,"\tSendSlice: Proc %u Slice %u sndindex=%u\n",
+ // myid,z,lz);
+ comm.iSend(root,z,
+ localdims[0]*localdims[1], // size of the data
+ data+localdims[0]*localdims[1]*lz,req[lz]);
+}
+
+int MPIO::requestSlice(int z,float *slicebuf, MPI_Request *req){
+ if(!isRoot()) return 0; // only root performs this operation
+ // for every processor that intersects this slice boundary,
+ // make a request for the contents of that slice!
+ int reqindex =0;
+ for(int offset=0,p=0;p<comm.numProcs();p++){
+ int idx = 3*p;
+ int *dims = alldims+idx;
+ int *origin = allorigins+idx; // need to pre-fix the origins here?
+ // thus to prevent mishandling of ghost zones here.
+ // or else have provision to support multi-reception of the same
+ // tile..... well actually we can do that already...
+ // we just have to have buckets for 2x more requests.
+ if(z>=origin[2] && z<origin[2]+dims[2]){
+ // fprintf(stderr,"\tRequestSlice: Proc %u Slice %u reqindex=%u\n",
+ // p,z,reqindex);
+ comm.iRecv(p,z,dims[0]*dims[1],slicebuf+offset,req[reqindex++]);
+ // fprintf(stderr,"\tSlicebuf offset=%u of size %u ++ end=%u vs sbsize=%u %u:%u @ %u:%u\n",
+ // offset,dims[0]*dims[1],
+ // offset+dims[0]*dims[1],globaldims[0]*globaldims[1]*2,
+ // globaldims[0],globaldims[1],dims[0],dims[1]);
+ offset+=dims[0]*dims[1];
+ }
+ }
+ return reqindex;
+}
+
+/*
+Problems: Slice size is too small for slice reception with ghost zones
+Also doesn't get proper offset to each slice in the slice buffer (maybe ok)
+
+ */
+
+void MPIO::waitForSlice(int z,float *slicebuf,
+ float *destbuffer,MPI_Request *req){
+ if(!isRoot()) return;
+ for(int p=0,reqindex=0;p<comm.numProcs();p++) {
+ MPI_Status stat;
+ int idx = 3*p;
+ int *dims = alldims+idx;
+ int *origin = allorigins+idx; // need to pre-fix the origins here?
+ // thus to prevent mishandling of ghost zones here.
+ if(z>=origin[2] && z<origin[2]+dims[2]){
+ // wiat for the data transfer to complete
+ //fprintf(stderr,"\tWaitSlice: Proc %u Slice %u reqindex=%u\n",
+ // p,z,reqindex);
+ comm.wait(req[reqindex++],stat); // frees request object too
+ // now we integrate the slices together. (data unscrambling phase)
+ //fprintf(stderr,"\t****Now Unscramble\n");
+ int offset = origin[1]*globaldims[0] + origin[0]; // initial offset for dst
+ for(int idx=0,pos=offset,j=0;j<dims[1];j++,pos+=(globaldims[0]-dims[0])){
+ for(int i=0;i<dims[0];i++){
+ destbuffer[pos++]=slicebuf[idx++]; // FUCK!!!! idx is questionable
+ }
+ }
+ // fprintf(stderr,"\t*****\n");
+ }
+ }
+}