Diffstat (limited to 'src/MPIO.old.cc')
-rw-r--r--	src/MPIO.old.cc	344
1 file changed, 344 insertions, 0 deletions
diff --git a/src/MPIO.old.cc b/src/MPIO.old.cc
new file mode 100644
index 0000000..ef609f1
--- /dev/null
+++ b/src/MPIO.old.cc
@@ -0,0 +1,344 @@
+#include <stdio.h>
+#include "MPIutils.hh"
+#include "MPIO.hh"
+
+#define CollectTime 0 /* not collecting timing data right now */
+
+void MPIO::sendSlice(int z,float *data,MPI_Request *req){
+  int iz = z - localorigin[2];
+  if(iz<0 || iz>=localdims[2]) return;
+  if(syncsend)
+    comm.isSend(root,z,localdims[0]*localdims[1],
+                data+localdims[0]*localdims[1]*iz,req[iz]);
+  else
+    comm.iSend(root,z,localdims[0]*localdims[1],
+               data+localdims[0]*localdims[1]*iz,req[iz]);
+}
+
+void MPIO::requestSlice(int z,float *slicebuffer,MPI_Request *req){
+  // skip to z
+  // loop (foreach p of pe's)
+  if(!isRoot()) return;
+  //printf("PE(%u) request slice %u\n",myid,z);
+  for(int offset=0,p=0,reqindex=0;p<comm.numProcs();p++) {
+    // if within z-range, compute nelem based on dims at p
+    int dindex = 3*p;
+    int *idims = gdims + dindex;
+    int *iorigin = gorigins + dindex;
+    int iz = z-iorigin[2];
+    if(iz<0 || iz>=idims[2]) continue;
+    comm.iRecv(p,z,idims[0]*idims[1],slicebuffer+offset,req[reqindex++]);
+    //printf("reqslice(%u):MPI_Request=%u\n",iz,req[reqindex-1]);
+    offset+=idims[0]*idims[1];
+  }
+  // break if outside of z-range (dangerous if non-canonical proc layout)
+}
+
+void MPIO::waitForSlice(int z,float *slicebuffer,float *destbuffer,MPI_Request *req){
+  if(!isRoot()) return;
+  // could do this 8k at a time
+  // loop for each p of pe's
+  // if within z-range, compute nelem based on dims at p
+  // wait for current request
+  // re-copy based on offset & origin
+  if(CollectTime) sliceallwait.start();
+  for(int p=0,reqindex=0;p<comm.numProcs();p++) {
+    // if within z-range, compute nelem based on dims at p
+    int dindex = 3*p;
+    int *idims = gdims + dindex;
+    int *iorigin = gorigins + dindex;
+    int iz = z-iorigin[2];
+    if(iz<0 || iz>=idims[2]) continue;
+    MPI_Status stat;
+    //printf("waitfor(%u):MPI_Request=%u\n",iz,req[reqindex]);
+    if(CollectTime) slicewait.start(); // start the wait timer
+    comm.wait(req[reqindex++],stat); // frees request object too
+    if(CollectTime) slicewait.stop();
+    // get information out of the status object!!!!
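+    // copy proc p's idims[0] x idims[1] sub-slice into its (iorigin[0],iorigin[1])
+    // window of the globaldims[0] x globaldims[1] destination slice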
+    if(CollectTime) slicecollect.start();
+    int offset = iorigin[1]*globaldims[0] + iorigin[0];
+    for(int idx=0,pos=offset,j=0;j<idims[1];j++,pos+=(globaldims[0]-idims[0]))
+      for(int i=0;i<idims[0];i++)
+        destbuffer[pos++]=slicebuffer[idx++];
+    if(CollectTime) slicecollect.stop();
+  }
+  if(CollectTime) sliceallwait.stop();
+  // proclist array
+}
+
+void MPIO::sendSlice(int z,double *data,MPI_Request *req){
+  int iz = z - localorigin[2];
+  if(iz<0 || iz>=localdims[2]) return;
+  if(syncsend)
+    comm.isSend(root,z,localdims[0]*localdims[1],
+                data+localdims[0]*localdims[1]*iz,req[iz]);
+  else
+    comm.iSend(root,z,localdims[0]*localdims[1],
+               data+localdims[0]*localdims[1]*iz,req[iz]);
+}
+
+void MPIO::requestSlice(int z,double *slicebuffer,MPI_Request *req){
+  // skip to z
+  // loop (foreach p of pe's)
+  if(!isRoot()) return;
+  //printf("PE(%u) request slice %u\n",myid,z);
+  for(int offset=0,p=0,reqindex=0;p<comm.numProcs();p++) {
+    // if within z-range, compute nelem based on dims at p
+    int dindex = 3*p;
+    int *idims = gdims + dindex;
+    int *iorigin = gorigins + dindex;
+    int iz = z-iorigin[2];
+    if(iz<0 || iz>=idims[2]) continue;
+    comm.iRecv(p,z,idims[0]*idims[1],slicebuffer+offset,req[reqindex++]);
+    //printf("reqslice(%u):MPI_Request=%u\n",iz,req[reqindex-1]);
+    offset+=idims[0]*idims[1];
+  }
+  // break if outside of z-range (dangerous if non-canonical proc layout)
+}
+
+void MPIO::waitForSlice(int z,double *slicebuffer,double *destbuffer,MPI_Request *req){
+  if(!isRoot()) return;
+  // could do this 8k at a time
+  // loop for each p of pe's
+  // if within z-range, compute nelem based on dims at p
+  // wait for current request
+  // re-copy based on offset & origin
+  if(CollectTime) sliceallwait.start();
+  for(int p=0,reqindex=0;p<comm.numProcs();p++) {
+    // if within z-range, compute nelem based on dims at p
+    int dindex = 3*p;
+    int *idims = gdims + dindex;
+    int *iorigin = gorigins + dindex;
+    int iz = z-iorigin[2];
+    if(iz<0 || iz>=idims[2]) continue;
+    MPI_Status stat;
+    //printf("waitfor(%u):MPI_Request=%u\n",iz,req[reqindex]);
+    if(CollectTime) slicewait.start(); // start the wait timer
+    comm.wait(req[reqindex++],stat); // frees request object too
+    if(CollectTime) slicewait.stop();
+    // get information out of the status object!!!!
+    if(CollectTime) slicecollect.start();
+    int offset = iorigin[1]*globaldims[0] + iorigin[0];
+    for(int idx=0,pos=offset,j=0;j<idims[1];j++,pos+=(globaldims[0]-idims[0]))
+      for(int i=0;i<idims[0];i++)
+        destbuffer[pos++]=slicebuffer[idx++];
+    if(CollectTime) slicecollect.stop();
+  }
+  if(CollectTime) sliceallwait.stop();
+  // proclist array
+}
+
+MPIO::MPIO(IObase *io,MPIcomm &c):file(io),comm(c),sendreq(0){
+  for(int i=0;i<3;i++) globaldims[i]=localdims[i]=localorigin[i]=0;
+  int *rootarray=0;
+  myid = comm.rank();
+  //printf("my rank = %u\n",myid);
+  if(!myid) rootarray = new int[comm.numProcs()];
+  int rootscale = file?1:0;
+  //printf("rootscale[pid=%u] = %u\n",myid,rootscale);
+  comm.gather(0,1,&rootscale,rootarray); // gather to 0
+  globaldims[0]=globaldims[1]=globaldims[2]=0;
+  if(!myid){
+    int count,p;
+    for(count=0,p=0;p<comm.numProcs();p++){
+      //printf("rootarray[%u]=%u\n",p,rootarray[p]);
+      if(rootarray[p]) count++;
+    }
+    if(count<=0 || count>1){
+      fprintf(stderr,"MPIO only accepts a single IO node right now.\n");
+      fprintf(stderr,"You chose %d io nodes\n",count);
+      fprintf(stderr,"I will select only the first one\n");
+    }
+    for(root=-1,p=0;p<comm.numProcs() && root<0;p++)
+      if(rootarray[p]) root=p;
+    delete [] rootarray;
+  }
+  comm.bcast(0,root); // now everyone knows root
+  //printf("broadcasting root node = %u\n",root);
+}
+
+MPIO::~MPIO(){
+  char buffer[32];
+  if(CollectTime){
+    if(myid==root) {
+      sprintf(buffer,"slicewait");    slicewait.print(buffer);
+      sprintf(buffer,"slicewrite");   slicewrite.print(buffer);
+      sprintf(buffer,"slicecollect"); slicecollect.print(buffer);
+      //sprintf(buffer,"sliceselect"); sliceselect.print(buffer);
+      sprintf(buffer,"sliceallwait"); sliceallwait.print(buffer);
+    }
+  }
+}
+
+void MPIO::setLocalDims(int rank,int *origin, int *dims){
+  if(rank!=3) perror("MPIO is only for 3D IO for now (sorry)!");
+  if(sendreq) asyncFinalize();
+  if(isRoot()){
+    gorigins = new int[3*comm.numProcs()];
+    gdims = new int[3*comm.numProcs()];
+  }
+  else gdims=gorigins=0; // null for everyone else
+  for(int i=0;i<3;i++) {
+    //printf("localdims-n-origin[%u] = %u:%u\n",i,dims[i],origin[i]);
+    localdims[i]=dims[i];
+    localorigin[i]=origin[i];
+    ghostmin[i]=ghostmax[i]=0;
+  }
+  comm.gather(root,3,localdims,gdims);
+  comm.gather(root,3,localorigin,gorigins);
+  //if(isRoot()) for(i=0;i<3*comm.numProcs();i++)
+  //  printf("gdims-n-origin[%u] = %u:%u\n",i,gdims[i],gorigins[i]);
+  globaldims[0]=globaldims[1]=globaldims[2]=0;
+  if(isRoot())
+    for(int p=0,last=3*comm.numProcs();p<last;p++){
+      //printf("globaldims[%u]: gdims[%u]+gorigins[%u] = %u\n",
+      //       p%3,p,p,gdims[p],gorigins[p]);
+      if(globaldims[p%3] < gdims[p]+gorigins[p])
+        globaldims[p%3] = gdims[p]+gorigins[p];
+    }
+  // rebroadcast globaldims to all PE's
+  comm.bcast(root,3,globaldims);
+}
+
+void MPIO::setLocalDims(int rank,int *origin, int *dims,
+                        int *nghostmin,int *nghostmax){
+  setLocalDims(rank,origin,dims);
+  for(int i=0;i<rank;i++){
+    ghostmin[i]=nghostmin[i];
+    ghostmax[i]=nghostmax[i];
+  }
+}
+
+int MPIO::write(IObase::DataType type,int rank,int *dims,void *data){
+  int recalc_layout=0;
+  if(rank!=3) recalc_layout=1;
+  for(int i=0;i<rank;i++) if(localdims[i]!=dims[i]) recalc_layout=1;
+  if(recalc_layout) setLocalDims(rank,localorigin,dims);
+  return write(type,data); // for now, no error checking
+}
+
+/* no layout recompute here.
+   This is for C and F77 support.
+*/
+void MPIO::asyncWrite(IObase::DataType type,void *data){
+  switch(type){
+  case IObase::Float32:
+    asyncWrite((float*)data);
+    break;
+  case IObase::Float64:
+    asyncWrite((double*)data);
+    break;
+  default:
+    break;
+  }
+}
+
+int MPIO::write(IObase::DataType type,void *data){
+  switch(type){
+  case IObase::Float32:
+    write((float*)data);
+    break;
+  case IObase::Float64:
+    write((double*)data);
+    break;
+  default:
+    break;
+  }
+  return 1; // for now, no error checking
+}
+
+void MPIO::asyncWrite(float *data){
+  if(sendreq) return;
+  sendreq = new MPI_Request[localdims[2]];
+  typedef float *floatP;
+  // int sliceindex=0;
+  floatP slicebuffer[3]; // double-buffer the slices as they arrive
+  typedef MPI_Request *MPI_RequestP;
+  MPI_RequestP recvreq[2];
+  // the third one is a scratch buffer for reorganizing the slice
+  if(isRoot()){
+    for(int i=0;i<3;i++) slicebuffer[i] = new float[globaldims[0]*globaldims[1]];
+    for(int i=0;i<2;i++) recvreq[i] = new MPI_Request[comm.numProcs()];
+  }
+  if(isRoot()){
+    // puts("request initial slice");
+    requestSlice(0,slicebuffer[0],recvreq[0]);
+  }
+  // not good! Everyone is sending a slice here!
+  sendSlice(0,data,sendreq);
+  // hide latency behind the costly io->reserveChunk operation
+  if(isRoot()) file->reserveStream(IObase::Float32,3,globaldims);
+  int writecount=0;
+  for(int i=1;i<globaldims[2];i++){
+    // comm.barrier();
+    if(isRoot())
+      requestSlice(i,slicebuffer[i%2],recvreq[i%2]);
+    sendSlice(i,data,sendreq); // jms
+    if(isRoot()) {
+      waitForSlice(i-1,slicebuffer[(i-1)%2],slicebuffer[2],recvreq[(i-1)%2]);
+      writecount+=globaldims[0]*globaldims[1];
+      slicewrite.start();
+      file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
+      slicewrite.stop();
+    }
+  }
+  if(isRoot()){
+    int n=globaldims[2]-1;
+    waitForSlice(n,slicebuffer[(n)%2],slicebuffer[2],recvreq[(n)%2]);
+    file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
+    writecount+=globaldims[0]*globaldims[1];
+    // do requestfree (double-buffered request free-s)
+    for(int i=0;i<3;i++) delete [] slicebuffer[i]; // free everything up
+    for(int i=0;i<2;i++) delete [] recvreq[i];
+  }
+}
+
+void MPIO::asyncWrite(double *data){
+  if(sendreq) return;
+  sendreq = new MPI_Request[localdims[2]];
+  typedef double *doubleP;
+  // int sliceindex=0;
+  doubleP slicebuffer[3]; // double-buffer the slices as they arrive
+  typedef MPI_Request *MPI_RequestP;
+  MPI_RequestP recvreq[2];
+  // the third one is a scratch buffer for reorganizing the slice
+  if(isRoot()){
+    for(int i=0;i<3;i++) slicebuffer[i] = new double[globaldims[0]*globaldims[1]];
+    for(int i=0;i<2;i++) recvreq[i] = new MPI_Request[comm.numProcs()];
+  }
+  if(isRoot()){
+    // puts("request initial slice");
+    requestSlice(0,slicebuffer[0],recvreq[0]);
+  }
+  // not good! Everyone is sending a slice here!
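+  // same double-buffered pipeline as the float version: post the receive for
+  // slice i, send slice i, then drain slice i-1 and stream it to disk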
+  sendSlice(0,data,sendreq);
+  // hide latency behind the costly io->reserveChunk operation
+  if(isRoot()) file->reserveStream(IObase::Float64,3,globaldims);
+  int writecount=0;
+  for(int i=1;i<globaldims[2];i++){
+    // comm.barrier();
+    if(isRoot())
+      requestSlice(i,slicebuffer[i%2],recvreq[i%2]);
+    sendSlice(i,data,sendreq); // jms
+    if(isRoot()) {
+      waitForSlice(i-1,slicebuffer[(i-1)%2],slicebuffer[2],recvreq[(i-1)%2]);
+      writecount+=globaldims[0]*globaldims[1];
+      slicewrite.start();
+      file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
+      slicewrite.stop();
+    }
+  }
+  if(isRoot()){
+    int n = globaldims[2]-1;
+    waitForSlice(n,slicebuffer[(n)%2],slicebuffer[2],recvreq[(n)%2]);
+    file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
+    writecount+=globaldims[0]*globaldims[1];
+    // do requestfree (double-buffered request free-s)
+    for(int i=0;i<3;i++) delete [] slicebuffer[i]; // free everything up
+    for(int i=0;i<2;i++) delete [] recvreq[i];
+  }
+}
+
+void MPIO::asyncFinalize(){
+  MPI_Status stat;
+  if(!sendreq) return;
+  for(int i=0;i<localdims[2];i++) comm.wait(sendreq[i],stat);
+  delete [] sendreq;
+  sendreq=0;
+}
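For reference, below is a minimal standalone sketch of the root-gathers-slices pattern that asyncWrite pipelines above, written against the plain MPI C API rather than the MPIcomm wrapper used by this file. The 4x4 slice size, the one-slice-per-rank layout, and printing instead of streaming to an IObase file are illustrative assumptions, not the layout negotiated by setLocalDims.

#include <mpi.h>
#include <vector>
#include <cstdio>

int main(int argc,char **argv){
  MPI_Init(&argc,&argv);
  int rank,nprocs;
  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
  MPI_Comm_size(MPI_COMM_WORLD,&nprocs);

  const int nx=4, ny=4;                    // assumed: one nx*ny slice per rank
  std::vector<float> slice(nx*ny, (float)rank);

  // every rank posts a non-blocking send of its slice, tagged by its z index
  MPI_Request sendreq;
  MPI_Isend(slice.data(), nx*ny, MPI_FLOAT, 0, /*tag=z*/rank,
            MPI_COMM_WORLD, &sendreq);

  if(rank==0){
    // root drains the slices in z order; a real writer would stream each
    // completed slice to disk here instead of printing
    std::vector<float> recvbuf(nx*ny);
    for(int z=0; z<nprocs; z++){
      MPI_Recv(recvbuf.data(), nx*ny, MPI_FLOAT, z, z,
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      std::printf("slice %d: first element %g\n", z, recvbuf[0]);
    }
  }
  MPI_Wait(&sendreq, MPI_STATUS_IGNORE);   // matches asyncFinalize() above
  MPI_Finalize();
  return 0;
}

The sketch keeps only the send/receive skeleton; the production code above additionally double-buffers the receives so that slice i can arrive while slice i-1 is being reassembled and written.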