diff options
Diffstat (limited to 'src/MPIO.cc')
-rw-r--r-- | src/MPIO.cc | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/src/MPIO.cc b/src/MPIO.cc new file mode 100644 index 0000000..dcba3f8 --- /dev/null +++ b/src/MPIO.cc @@ -0,0 +1,199 @@ +#include <stdio.h> +#include "MPIO.hh" + +MPIO::MPIO(IEEEIO *io, MPIcomm &c):file(io),comm(c),sendreq(0){ + + for(int i=0;i<3;i++) globaldims[i]=localdims[i]=localorigin[i]=0; + myid=comm.rank(); + + int *rootarray=0; + if(!myid) rootarray=new int[comm.numProcs()]; + int hasfile= file?1:0; + comm.gather(0,1,&hasfile,rootarray); + if(!myid){ + int count,p; + for(count=0,p=0;p<comm.numProcs();p++){ + if(rootarray[p]) { count++; if(count<=1) root=p; } + } + if(count<=0 || count>1){ + fprintf(stderr,"MPIO only accepts a single IO node right now.\n"); + fprintf(stderr,"You chose %u io nodes\n",count); + fprintf(stderr,"I will select only the first one\n"); + } + } + for(int i=0;i<3;i++) slicebuffer[i]=0; + for(int i=0;i<2;i++) recvreq[i]=0; + comm.bcast(0,root); // tell everyone which proc is root + if(rootarray) {delete rootarray; rootarray=0;} +} + +MPIO::~MPIO(){} + +void MPIO::setLocalDims(int rank,int *origin, int *dims){ + if(rank!=3) perror("MPIO is only for 3D IO for now (sorry)!"); + if(sendreq) { puts("Bad move!!! Die!!!"); exit(0); } + // now check to see if anything has changed + // actually we still need to do this collection because we + // don't know if something might have changed on some other processor + // URK... + allorigins = new int[3*comm.numProcs()]; + alldims = new int[3*comm.numProcs()]; + for(int i=0;i<3;i++) { + //printf("localdims-n-origin[%u] = %u:%u\n",i,dims[i],origin[i]); + localdims[i]=dims[i]; + localorigin[i]=origin[i]; + } + for(int i=0;i<comm.numProcs()*3;i++) alldims[i]=allorigins[i]=-1; + comm.gather(0,3,localdims,alldims); + comm.gather(0,3,localorigin,allorigins); + + comm.bcast(0,comm.numProcs()*3,alldims); + comm.bcast(0,comm.numProcs()*3,allorigins); + + // now everyone figures out the global dimensions + // from max offset+dims + for(int p=0,last=3*comm.numProcs();p<last;p++){ + // if(myid) fprintf(stderr,"Proc[%u] p=%d p%3=%d size=%d\n\to:d = %d:%d\n", + // if(myid) fprintf(stderr,"Proc[%u] p=%d o:d = %d:%d\n", + // myid,p,/* p%3,alldims[p]+allorigins[p],*/ + // allorigins[p],alldims[p]); + if(globaldims[p%3] < alldims[p]+allorigins[p]) + globaldims[p%3] = alldims[p]+allorigins[p]; + } + // nothing to rebroadcast here... + //fprintf(stderr,"PE(%u): setLocalDims: globaldims=%u,%u,%d\n", + // myid,globaldims[0],globaldims[1],globaldims[2]); + // lets re-compute the requests and stuff + if(sendreq) delete sendreq; // clean out the old stuff + sendreq = new MPI_Request[localdims[2]]; + if(isRoot()){ + for(int i=0;i<3;i++) { + if(slicebuffer[i]) delete slicebuffer[i]; + slicebuffer[i] = new float[globaldims[0]*globaldims[1]*3+27]; // padded (extremely) + } + // printf("Slicebuffer size=%u\n",globaldims[0]*globaldims[1]*2); + for(int i=0;i<2;i++) { + if(recvreq[i]) delete recvreq[i]; + recvreq[i] = new MPI_Request[comm.numProcs()*4+7]; // padded + } + } +} + +void MPIO::asyncWrite(float *data){ + //puts("Begin AsyncWrite"); + // get things started by getting root to request initial slice + if(isRoot()){ + requestSlice(0,slicebuffer[0],recvreq[0]); + } + // comm.print("Requested First Slice"); + // now everyone sends their slice over + sendSlice(0,data,sendreq); // separate request for each slice + //comm.print("Sent First slice"); + if(isRoot()){ + file->reserveStream(IObase::Float32,3,globaldims); + } + for(int i=1;i<globaldims[2];i++){ + if(isRoot()) // if I'm root, then make request for nex slice + requestSlice(i,slicebuffer[i%2],recvreq[i%2]); + sendSlice(i,data,sendreq); // separate sendreq for each slice + if(isRoot()) { + // waitfor slice and assemble into the slicebuffer + waitForSlice(i-1,slicebuffer[(i-1)%2],slicebuffer[2],recvreq[(i-1)%2]); + // now write assembled slice to disk stream + // can make disk buffer huge to simulate recieving multiple + // slices + //fprintf(stderr,"WriteStream Slice %u\n",i-1); + file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]); + } + } + if(isRoot()){ + int n = globaldims[2]-1; + waitForSlice(n,slicebuffer[(n)%2],slicebuffer[2],recvreq[(n)%2]); + file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]); + // do requestfree (double-buffered request free-s) + //for(int i=0;i<3;i++) delete slicebuffer[i]; // free everything up + //for(int i=0;i<2;i++) delete recvreq[i]; + } + //**** At end (matching deletes) **** + // fprintf(stderr,"Cleanup proc %u\n",myid); + // comm.print("Cleanup"); + for(int n=0;n<localdims[2];n++){ + MPI_Status stat; + comm.wait(sendreq[n],stat); + } + // perhaps should be a persistent variable + // delete sendreq; + //sendreq=0; // null it out so we know we are done +} + +void MPIO::sendSlice(int z,float *data,MPI_Request *req){ + int lz = z - localorigin[2]; // find local z-offset here + // filter out bogus requests + // send even redundant information though... + if(lz<0 || lz >=localdims[2]) return; + // fprintf(stderr,"\tSendSlice: Proc %u Slice %u sndindex=%u\n", + // myid,z,lz); + comm.iSend(root,z, + localdims[0]*localdims[1], // size of the data + data+localdims[0]*localdims[1]*lz,req[lz]); +} + +int MPIO::requestSlice(int z,float *slicebuf, MPI_Request *req){ + if(!isRoot()) return 0; // only root performs this operation + // for every processor that intersects this slice boundary, + // make a request for the contents of that slice! + int reqindex =0; + for(int offset=0,p=0;p<comm.numProcs();p++){ + int idx = 3*p; + int *dims = alldims+idx; + int *origin = allorigins+idx; // need to pre-fix the origins here? + // thus to prevent mishandling of ghost zones here. + // or else have provision to support multi-reception of the same + // tile..... well actually we can do that already... + // we just have to have buckets for 2x more requests. + if(z>=origin[2] && z<origin[2]+dims[2]){ + // fprintf(stderr,"\tRequestSlice: Proc %u Slice %u reqindex=%u\n", + // p,z,reqindex); + comm.iRecv(p,z,dims[0]*dims[1],slicebuf+offset,req[reqindex++]); + // fprintf(stderr,"\tSlicebuf offset=%u of size %u ++ end=%u vs sbsize=%u %u:%u @ %u:%u\n", + // offset,dims[0]*dims[1], + // offset+dims[0]*dims[1],globaldims[0]*globaldims[1]*2, + // globaldims[0],globaldims[1],dims[0],dims[1]); + offset+=dims[0]*dims[1]; + } + } + return reqindex; +} + +/* +Problems: Slice size is too small for slice reception with ghost zones +Also doesn't get proper offset to each slice in the slice buffer (maybe ok) + + */ + +void MPIO::waitForSlice(int z,float *slicebuf, + float *destbuffer,MPI_Request *req){ + if(!isRoot()) return; + for(int p=0,reqindex=0;p<comm.numProcs();p++) { + MPI_Status stat; + int idx = 3*p; + int *dims = alldims+idx; + int *origin = allorigins+idx; // need to pre-fix the origins here? + // thus to prevent mishandling of ghost zones here. + if(z>=origin[2] && z<origin[2]+dims[2]){ + // wiat for the data transfer to complete + //fprintf(stderr,"\tWaitSlice: Proc %u Slice %u reqindex=%u\n", + // p,z,reqindex); + comm.wait(req[reqindex++],stat); // frees request object too + // now we integrate the slices together. (data unscrambling phase) + //fprintf(stderr,"\t****Now Unscramble\n"); + int offset = origin[1]*globaldims[0] + origin[0]; // initial offset for dst + for(int idx=0,pos=offset,j=0;j<dims[1];j++,pos+=(globaldims[0]-dims[0])){ + for(int i=0;i<dims[0];i++){ + destbuffer[pos++]=slicebuf[idx++]; // FUCK!!!! idx is questionable + } + } + // fprintf(stderr,"\t*****\n"); + } + } +} |