Diffstat (limited to 'src/MPIO.slow.cc')
-rw-r--r--  src/MPIO.slow.cc  202
1 file changed, 202 insertions, 0 deletions
diff --git a/src/MPIO.slow.cc b/src/MPIO.slow.cc
new file mode 100644
index 0000000..8675d06
--- /dev/null
+++ b/src/MPIO.slow.cc
@@ -0,0 +1,202 @@
+#include <stdio.h>
+#include "MPIutils.hh"
+#include "MPIO.hh"
+
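+// sendSlice: post a nonblocking send (comm.isSend) of this rank's local
+// XY-plane for global slice z, using z itself as the message tag.  A no-op
+// when z falls outside this rank's local z-extent.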
+void MPIO::sendSlice(int z,float *data,MPI_Request *req){
+ int iz = z - localorigin[2];
+ if(iz<0 || iz>=localdims[2]) return;
+ //printf("PE(%u) send slice z=%u. iz=%u\n",myid,z,iz);
+ // z == the tag
+ //printf("localdims[0]*localdims[1]=%u iz=%u\n",localdims[0]*localdims[1],iz);
+ // if(isRoot())
+ //comm.iSend(root,z,localdims[0]*localdims[1],
+ // data+localdims[0]*localdims[1]*iz,req[iz]);
+ //else
+ comm.isSend(root,z,localdims[0]*localdims[1],
+ data+localdims[0]*localdims[1]*iz,req[iz]);
+  //for(int i=0;i<4;i++) printf("\tslicedata[%u]@z(%u) = %f\n",i,iz,(data+localdims[0]*localdims[1]*iz)[i]);
+ //printf("PE(%u) send %u complete\n",myid,iz);
+}
+
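+// requestSlice: on the root (IO) rank, post one nonblocking receive per
+// rank that owns part of global slice z; the pieces land back-to-back in
+// slicebuffer, in rank order.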
+void MPIO::requestSlice(int z,float *slicebuffer,MPI_Request *req){
+ // skip to z
+ //loop (foreach p of pe's)
+ if(!isRoot()) return;
+ //printf("PE(%u) request slice %u\n",myid,z);
+ for(int offset=0,p=0,reqindex=0;p<comm.numProcs();p++) {
+ // if within z-range, compute nelem based on dims at p
+ int dindex = 3*p;
+ int *idims = gdims + dindex;
+ int *iorigin = gorigins + dindex;
+ int iz = z-iorigin[2];
+ if(iz<0 || iz>=idims[2]) continue;
+ comm.iRecv(p,z,idims[0]*idims[1],slicebuffer+offset,req[reqindex++]);
+ //printf("reqslice(%u):MPI_Request=%u\n",iz,req[reqindex-1]);
+ offset+=idims[0]*idims[1];
+ }
+ // break if outside of z-range (dangerous if non-canonical proc layout)
+}
+
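+// waitForSlice: on the root rank, complete each pending receive for slice z
+// and copy each rank's contiguous sub-rectangle from slicebuffer into its
+// proper position within the global XY-plane in destbuffer.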
+void MPIO::waitForSlice(int z,float *slicebuffer,float *destbuffer,MPI_Request *req){
+ if(!isRoot()) return;
+ // could do this 8k at a time
+ // loop for each p of pe's
+ // if within z-range, compute nelem based on dims at p
+ // wait for current request.
+ // re-copy based on offset & origin
+ sliceallwait.start();
+ for(int p=0,reqindex=0;p<comm.numProcs();p++) {
+    // if within z-range, compute nelem based on dims at p
+ int dindex = 3*p;
+ int *idims = gdims + dindex;
+ int *iorigin = gorigins + dindex;
+ int iz = z-iorigin[2];
+ if(iz<0 || iz>=idims[2]) continue;
+ MPI_Status stat;
+ //printf("waitfor(%u):MPI_Request=%u\n",iz,req[reqindex]);
+ slicewait.start();
+ comm.wait(req[reqindex++],stat); // frees request object too
+ slicewait.stop();
+ // get information out of the status object!!!!
+ slicecollect.start();
+ int offset = iorigin[1]*globaldims[0] + iorigin[0];
+ for(int idx=0,pos=offset,j=0;j<idims[1];j++,pos+=(globaldims[0]-idims[0]))
+ for(int i=0;i<idims[0];i++)
+ destbuffer[pos++]=slicebuffer[idx++];
+ slicecollect.stop();
+ }
+ sliceallwait.stop();
+ // proclist array
+
+}
+
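+// The constructor gathers a flag from every rank indicating whether it was
+// handed a non-null IObase* (i.e. it opened the file), selects the first
+// such rank as the IO node, and broadcasts that rank to everyone as "root".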
+MPIO::MPIO(IObase *io,MPIcomm &c):file(io),comm(c){
+ for(int i=0;i<3;i++) globaldims[i]=localdims[i]=localorigin[i]=0;
+  int *rootarray=0; // allocated (and meaningful) only on rank 0
+ myid = comm.rank();
+ //printf("my rank = %u\n",myid);
+ if(!myid) rootarray = new int[comm.numProcs()];
+ int rootscale = file?1:0;
+ //printf("rootscale[pid=%u] = %u\n",myid,rootscale);
+ comm.gather(0,1,&rootscale,rootarray); // gather to 0
+ globaldims[0]=globaldims[1]=globaldims[2]=0;
+ if(!myid){
+    int count=0,p;
+    for(p=0;p<comm.numProcs();p++){
+      //printf("rootarray[%u]=%u\n",p,rootarray[p]);
+      if(rootarray[p]) count++;
+    }
+    if(count!=1){
+      fprintf(stderr,"MPIO only accepts a single IO node right now.\n");
+      fprintf(stderr,"You chose %d io nodes\n",count);
+      fprintf(stderr,"I will select only the first one\n");
+    }
+    for(root=-1,p=0;p<comm.numProcs() && root<0;p++)
+      if(rootarray[p]) root=p;
+    delete [] rootarray;
+ }
+ comm.bcast(0,root); // now everyone knows root
+ //printf("broadcasting root node = %u\n",root);
+}
+
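+// The destructor reports the accumulated timers on the IO node.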
+MPIO::~MPIO(){
+ char buffer[32];
+ sprintf(buffer,"PE(%3u)",comm.rank());
+ if(myid==root) {
+ sprintf(buffer+7,"slicewait"); slicewait.print(buffer);
+ sprintf(buffer+7,"slicewrite"); slicewrite.print(buffer);
+ sprintf(buffer+7,"slicecollect"); slicecollect.print(buffer);
+ //sprintf(buffer+7,"sliceselect"); sliceselect.print(buffer);
+ sprintf(buffer+7,"sliceallwait"); sliceallwait.print(buffer);
+ }
+}
+
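+// setLocalDims: gather every rank's local dims/origin to the root, compute
+// globaldims[i] as the maximum of origin[i]+dims[i] over all ranks, and
+// rebroadcast the result so every rank agrees on the global extent.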
+void MPIO::setLocalDims(int rank,int *origin, int *dims){
+  if(rank!=3) fprintf(stderr,"MPIO is only for 3D IO for now (sorry)!\n");
+ if(isRoot()){
+ gorigins = new int[3*comm.numProcs()];
+ gdims = new int[3*comm.numProcs()];
+ }
+ else gdims=gorigins=0; // null for everyone else
+ for(int i=0;i<3;i++) {
+ //printf("localdims-n-origin[%u] = %u:%u\n",i,dims[i],origin[i]);
+ localdims[i]=dims[i];
+ localorigin[i]=origin[i];
+ }
+ comm.gather(root,3,localdims,gdims);
+ comm.gather(root,3,localorigin,gorigins);
+ //if(isRoot()) for(i=0;i<3*comm.numProcs();i++)
+ //printf("gdims-n-origin[%u] = %u:%u\n",i,gdims[i],gorigins[i]);
+ globaldims[0]=globaldims[1]=globaldims[2]=0;
+ if(isRoot())
+ for(int p=0,last=3*comm.numProcs();p<last;p++){
+ //printf("globaldims[%u]: gdims[%u]+gorigins[%u] = %u\n",
+ // p%3,p,p,gdims[p],gorigins[p]);
+ if(globaldims[p%3] < gdims[p]+gorigins[p])
+ globaldims[p%3] = gdims[p]+gorigins[p];
+ }
+ // rebroadcast globaldims to all PE's
+ comm.bcast(root,3,globaldims);
+}
+
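+// Generic write entry point: recompute the data layout if the dims changed,
+// then dispatch on the datatype (only Float32 is implemented so far).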
+int MPIO::write(IObase::DataType type,int rank,int *dims,void *data){
+ int recalc_layout=0;
+ if(rank!=3) recalc_layout=1;
+ for(int i=0;i<rank;i++) if(localdims[i]!=dims[i]) recalc_layout=1;
+ if(recalc_layout) setLocalDims(rank,localorigin,dims);
+ switch(type){
+ case IObase::Float32:
+ write((float*)data);
+ break;
+ case IObase::Float64:
+ default:
+ break;
+ }
+ return 1; // for now, no error checking
+}
+
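+// write: stream the distributed 3D array to the IO node one XY-plane at a
+// time.  Receives are double-buffered so that slice i can arrive while
+// slice i-1 is being reassembled and written out.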
+void MPIO::write(float *data){
+  int i;
+  MPI_Request *sendreq = new MPI_Request[localdims[2]];
+  typedef float *floatP;
+  floatP slicebuffer[3]; // double-buffer the slices as they arrive;
+                         // the third one is a scratch buffer for reorganizing the slice
+  typedef MPI_Request *MPI_RequestP;
+  MPI_RequestP recvreq[2];
+  if(isRoot()){
+    for(i=0;i<3;i++) slicebuffer[i] = new float[globaldims[0]*globaldims[1]];
+    for(i=0;i<2;i++) recvreq[i] = new MPI_Request[comm.numProcs()];
+  }
+ if(isRoot()){
+ // puts("request initial slice");
+ requestSlice(0,slicebuffer[0],recvreq[0]);
+ }
+ // not good! Everyone is sending a slice here!
+ sendSlice(0,data,sendreq);
+  // hide latency behind the costly file->reserveStream operation
+ if(isRoot()) file->reserveStream(IObase::Float32,3,globaldims);
+ int writecount=0;
+  for(i=1;i<globaldims[2];i++){
+ // comm.barrier();
+ if(isRoot())
+ requestSlice(i,slicebuffer[i%2],recvreq[i%2]);
+ sendSlice(i,data,sendreq); // jms
+ if(isRoot()) {
+ waitForSlice(i-1,slicebuffer[(i-1)%2],slicebuffer[2],recvreq[(i-1)%2]);
+ writecount+=globaldims[0]*globaldims[1];
+ slicewrite.start();
+ file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
+ slicewrite.stop();
+ }
+ }
+ if(isRoot()){
+ waitForSlice(i-1,slicebuffer[(i-1)%2],slicebuffer[2],recvreq[(i-1)%2]);
+ file->writeStream(slicebuffer[2],globaldims[0]*globaldims[1]);
+ writecount+=globaldims[0]*globaldims[1];
+ // do requestfree (double-buffered request free-s)
+    for(i=0;i<3;i++) delete [] slicebuffer[i]; // free everything up
+    for(i=0;i<2;i++) delete [] recvreq[i];
+ }
+ MPI_Status stat;
+ for(i=0;i<localdims[2];i++) comm.wait(sendreq[i],stat);
+  delete [] sendreq;
+}
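
For orientation, a minimal usage sketch (illustrative only, not part of this
commit; MPIcomm is assumed to default-construct around MPI_COMM_WORLD, and
makeWriter() is a hypothetical stand-in for whatever IObase factory the
application uses):

    #include <mpi.h>
    #include "MPIutils.hh"
    #include "MPIO.hh"

    extern IObase *makeWriter(const char *fname); // hypothetical factory

    int main(int argc,char *argv[]){
      MPI_Init(&argc,&argv);
      MPIcomm comm; // assumed wrapper around MPI_COMM_WORLD
      // Only one rank opens the file; every other rank passes a null
      // IObase*, which is how MPIO's constructor picks the single IO node.
      IObase *file = comm.rank()==0 ? makeWriter("out.raw") : 0;
      MPIO mpio(file,comm);
      int origin[3],dims[3];
      // ... fill origin/dims with this rank's block of the global grid ...
      mpio.setLocalDims(3,origin,dims);
      float *data = new float[dims[0]*dims[1]*dims[2]];
      // ... fill data with the local block ...
      mpio.write(IObase::Float32,3,dims,data);
      delete [] data;
      delete file; // null on non-IO ranks; deleting null is safe
      MPI_Finalize();
      return 0;
    }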