#include #include "MPIutils.hh" #include "MPIO.hh" #define CollectTime 0 /* not collecting timing data right now */ void MPIO::sendSlice(int z,float *data,MPI_Request *req){ int iz = z - localorigin[2]; if(iz<0 || iz>=localdims[2]) return; if(syncsend) comm.isSend(root,z,localdims[0]*localdims[1], data+localdims[0]*localdims[1]*iz,req[iz]); else comm.iSend(root,z,localdims[0]*localdims[1], data+localdims[0]*localdims[1]*iz,req[iz]); } void MPIO::requestSlice(int z,float *slicebuffer,MPI_Request *req){ // skip to z //loop (foreach p of pe's) if(!isRoot()) return; //printf("PE(%u) request slice %u\n",myid,z); for(int offset=0,p=0,reqindex=0;p=idims[2]) continue; comm.iRecv(p,z,idims[0]*idims[1],slicebuffer+offset,req[reqindex++]); //printf("reqslice(%u):MPI_Request=%u\n",iz,req[reqindex-1]); offset+=idims[0]*idims[1]; } // break if outside of z-range (dangerous if non-canonical proc layout) } void MPIO::waitForSlice(int z,float *slicebuffer,float *destbuffer,MPI_Request *req){ if(!isRoot()) return; // could do this 8k at a time // loop for each p of pe's // if within z-range, compute nelem based on dims at p // wait for current request. // re-copy based on offset & origin if(CollectTime) sliceallwait.start(); for(int p=0,reqindex=0;p=idims[2]) continue; MPI_Status stat; //printf("waitfor(%u):MPI_Request=%u\n",iz,req[reqindex]); if(CollectTime) slicewait.start();// this is a timer (start the timer) comm.wait(req[reqindex++],stat); // frees request object too if(CollectTime) slicewait.stop(); // get information out of the status object!!!! if(CollectTime) slicecollect.start(); int offset = iorigin[1]*globaldims[0] + iorigin[0]; for(int idx=0,pos=offset,j=0;j=localdims[2]) return; if(syncsend) comm.isSend(root,z,localdims[0]*localdims[1], data+localdims[0]*localdims[1]*iz,req[iz]); else comm.iSend(root,z,localdims[0]*localdims[1], data+localdims[0]*localdims[1]*iz,req[iz]); } void MPIO::requestSlice(int z,double *slicebuffer,MPI_Request *req){ // skip to z //loop (foreach p of pe's) if(!isRoot()) return; //printf("PE(%u) request slice %u\n",myid,z); for(int offset=0,p=0,reqindex=0;p=idims[2]) continue; comm.iRecv(p,z,idims[0]*idims[1],slicebuffer+offset,req[reqindex++]); //printf("reqslice(%u):MPI_Request=%u\n",iz,req[reqindex-1]); offset+=idims[0]*idims[1]; } // break if outside of z-range (dangerous if non-canonical proc layout) } void MPIO::waitForSlice(int z,double *slicebuffer,double *destbuffer,MPI_Request *req){ if(!isRoot()) return; // could do this 8k at a time // loop for each p of pe's // if within z-range, compute nelem based on dims at p // wait for current request. // re-copy based on offset & origin if(CollectTime) sliceallwait.start(); for(int p=0,reqindex=0;p=idims[2]) continue; MPI_Status stat; //printf("waitfor(%u):MPI_Request=%u\n",iz,req[reqindex]); if(CollectTime) slicewait.start();// this is a timer (start the timer) comm.wait(req[reqindex++],stat); // frees request object too if(CollectTime) slicewait.stop(); // get information out of the status object!!!! 
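/*
  Worked example of the gather arithmetic in waitForSlice() (hypothetical
  numbers, not taken from the original source): with globaldims = {8,8,nz}
  and a processor whose sub-slice has idims = {4,4,...} at iorigin = {4,0,...},
  its first element lands at

      offset = iorigin[1]*globaldims[0] + iorigin[0] = 0*8 + 4 = 4

  in the root's destbuffer, and each later row of the sub-slice starts
  globaldims[0] = 8 elements further on, so the copy skips
  globaldims[0]-idims[0] = 4 elements between rows.
*/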
void MPIO::waitForSlice(int z,double *slicebuffer,double *destbuffer,MPI_Request *req){
  if(!isRoot()) return;
  // could do this 8k at a time
  // loop for each p of pe's
  //   if within z-range, compute nelem based on dims at p
  //   wait for current request.
  //   re-copy based on offset & origin
  if(CollectTime) sliceallwait.start();
  for(int p=0,reqindex=0;p<comm.numProcs();p++){
    int *iorigin,*idims; /* per-proc origin & dims (initialization missing in this source) */
    int iz = z - iorigin[2];
    if(iz<0 || iz>=idims[2]) continue;
    MPI_Status stat;
    //printf("waitfor(%u):MPI_Request=%u\n",iz,req[reqindex]);
    if(CollectTime) slicewait.start(); // this is a timer (start the timer)
    comm.wait(req[reqindex++],stat); // frees request object too
    if(CollectTime) slicewait.stop();
    // get information out of the status object!!!!
    if(CollectTime) slicecollect.start();
    int offset = iorigin[1]*globaldims[0] + iorigin[0];
    // copy this processor's packed sub-slice into its place in the global
    // slice, row by row (loop body reconstructed)
    for(int idx=0,pos=offset,j=0;j<idims[1];j++,pos+=globaldims[0]-idims[0])
      for(int i=0;i<idims[0];i++,idx++,pos++)
        destbuffer[pos]=slicebuffer[idx];
    slicebuffer+=idims[0]*idims[1]; // advance to the next processor's packed
                                    // sub-slice (matches requestSlice)
    if(CollectTime) slicecollect.stop();
  }
  if(CollectTime) sliceallwait.stop();
}

/* The routine that picks the single IO (root) node did not survive intact in
   this source; only the fragments below remain. */
#if 0
  if(count>1){
    fprintf(stderr,"MPIO only accepts a single IO node right now.\n");
    fprintf(stderr,"You chose %u io nodes\n",count);
    fprintf(stderr,"I will select only the first one\n");
  }
  for(root=-1,p=0;p /* ... select the first chosen processor as root ... */ ;p++)
#endif

void MPIO::asyncWrite(float *data){
  // (prologue reconstructed from the double-precision version below)
  if(sendreq) return;
  sendreq = new MPI_Request[localdims[2]];
  typedef float *floatP;
  // int sliceindex=0;
  floatP slicebuffer[3]; // double-buffer the slices as they arrive
  typedef MPI_Request *MPI_RequestP;
  MPI_RequestP recvreq[2];
  // the third one is a scratch buffer for reorganizing the slice
  if(isRoot()){
    for(int i=0;i<3;i++) slicebuffer[i] = new float[globaldims[0]*globaldims[1]];
    for(int i=0;i<2;i++) recvreq[i] = new MPI_Request[comm.numProcs()];
  }
  if(isRoot()){
    // puts("request initial slice");
    requestSlice(0,slicebuffer[0],recvreq[0]);
  }
  // not good! Everyone is sending a slice here!
  sendSlice(0,data,sendreq);
  // hide latency behind the costly io->reserveChunk operation
  if(isRoot()) file->reserveStream(IObase::Float32,3,globaldims);
  int writecount=0;
  for(int i=1;i<globaldims[2];i++){
    // pipeline (loop body reconstructed): request slice i, post the send for
    // slice i, then write slice i-1 while slice i is in flight
    if(isRoot()) requestSlice(i,slicebuffer[i%2],recvreq[i%2]);
    sendSlice(i,data,sendreq);
    if(isRoot()){
      waitForSlice(i-1,slicebuffer[(i-1)%2],slicebuffer[2],recvreq[(i-1)%2]);
      writecount+=globaldims[0]*globaldims[1];
      slicewrite.start();
      file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
      slicewrite.stop();
    }
  }
  if(isRoot()){
    int n=globaldims[2]-1;
    waitForSlice(n,slicebuffer[(n)%2],slicebuffer[2],recvreq[(n)%2]);
    file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
    writecount+=globaldims[0]*globaldims[1];
    // do requestfree (double-buffered request free-s)
    for(int i=0;i<3;i++) delete [] slicebuffer[i];
    // free everything up
    for(int i=0;i<2;i++) delete [] recvreq[i];
  }
}

void MPIO::asyncWrite(double *data){
  if(sendreq) return;
  sendreq = new MPI_Request[localdims[2]];
  typedef double *doubleP;
  // int sliceindex=0;
  doubleP slicebuffer[3]; // double-buffer the slices as they arrive
  typedef MPI_Request *MPI_RequestP;
  MPI_RequestP recvreq[2];
  // the third one is a scratch buffer for reorganizing the slice
  if(isRoot()){
    for(int i=0;i<3;i++) slicebuffer[i] = new double[globaldims[0]*globaldims[1]];
    for(int i=0;i<2;i++) recvreq[i] = new MPI_Request[comm.numProcs()];
  }
  if(isRoot()){
    // puts("request initial slice");
    requestSlice(0,slicebuffer[0],recvreq[0]);
  }
  // not good! Everyone is sending a slice here!
  sendSlice(0,data,sendreq);
  // hide latency behind the costly io->reserveChunk operation
  if(isRoot()) file->reserveStream(IObase::Float64,3,globaldims);
  int writecount=0;
  for(int i=1;i<globaldims[2];i++){
    // pipeline (loop body reconstructed): request slice i, post the send for
    // slice i, then write slice i-1 while slice i is in flight
    if(isRoot()) requestSlice(i,slicebuffer[i%2],recvreq[i%2]);
    sendSlice(i,data,sendreq);
    if(isRoot()){
      waitForSlice(i-1,slicebuffer[(i-1)%2],slicebuffer[2],recvreq[(i-1)%2]);
      writecount+=globaldims[0]*globaldims[1];
      slicewrite.start();
      file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
      slicewrite.stop();
    }
  }
  if(isRoot()){
    int n = globaldims[2]-1;
    waitForSlice(n,slicebuffer[(n)%2],slicebuffer[2],recvreq[(n)%2]);
    file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
    writecount+=globaldims[0]*globaldims[1];
    // do requestfree (double-buffered request free-s)
    for(int i=0;i<3;i++) delete [] slicebuffer[i];
    // free everything up
    for(int i=0;i<2;i++) delete [] recvreq[i];
  }
}

void MPIO::asyncFinalize(){
  MPI_Status stat;
  if(!sendreq) return;
  for(int i=0;i<localdims[2];i++) // wait on each outstanding slice send
    comm.wait(sendreq[i],stat);   // (tail of this function reconstructed)
  delete [] sendreq;
  sendreq=0;
}
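/*
  Hypothetical usage sketch (not part of the original file): how the
  asynchronous write path above is meant to be driven.  The MPIO constructor
  signature and the communicator class name are assumptions here; the real
  declarations live in MPIO.hh and MPIutils.hh.  IObase is the stream-IO
  interface already used by reserveStream()/writeStream() above.
*/
#if 0
void exampleWrite(MPIcomm &comm,IObase *file,           // assumed types/names
                  int localorigin[3],int localdims[3],int globaldims[3],
                  double *localdata){
  MPIO io(*file,comm,localorigin,localdims,globaldims); // assumed signature
  io.asyncWrite(localdata); // every rank posts slice sends; the root overlaps
                            // its receives with file->writeStream() calls
  io.asyncFinalize();       // wait on this rank's outstanding slice sends
}
#endif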