#include #include #include #include #include #include #include #include "FlexArrayTmpl.H" #include "SockIOreader.hh" #if defined(T3E) || defined(__hpux) #define SIGNAL_CAST #else #define SIGNAL_CAST (SIG_PF) #endif // Globals FlexArray sock_io_readers; void IOsigHandler(int sig); void reaper(int sig); // enable signals (needs a refcount) class SigMgr { static int sigrefcount; static int nwatchers; public: static void AddObserver(){ nwatchers++; } static void RemoveObserver(){ nwatchers--; // should check refcount status change } static void SigOn(){ //puts("sigOn"); sigrefcount++; if(sigrefcount>nwatchers){ // error sigrefcount=nwatchers; puts("SigMgr: Error... the Signal refcount is > numwatchers"); puts("correcting"); } if(sigrefcount==nwatchers){ // call the handlertest just in case //IOsigHandler((int)SIGIO); for(int i=0; isigHandler(SIGIO); } signal(SIGIO,SIGNAL_CAST IOsigHandler); // reenable the signal handler signal(SIGURG,SIGNAL_CAST IOsigHandler);// reenable the signal handler } } // disable signals until elemental IO operation completes static void SigOff(){ //puts("sigOff"); sigrefcount--; if(sigrefcount<0){ puts("SigMgr::SigOff(): Signal refcount < 0... correcting"); sigrefcount=0; } signal(SIGIO,SIG_DFL); // prevent double-interrupt signal(SIGURG,SIG_DFL); } }; int SigMgr::sigrefcount=0; int SigMgr::nwatchers=0; // If any IO event occurs, this will call handlers for all availible SockIOreaders // to see if they have an event to take care of. void IOsigHandler(int sig){ //puts("SigHandler: IO Interrupt"); SigMgr::SigOff(); // So here we look at the global list of SockIOreaders and pass the event on for(int i=0; isigHandler(sig); } SigMgr::SigOn(); } // This gobbles up zombie children... kind of like Night of the Living Dead void reaper(int sig){ puts("Reaper: Child Interrupt"); #ifndef T3E // Eat dead children :b struct rusage status; int statptr; while(wait3(&statptr,WNOHANG,&status)>=0) {} // get all the deadsters #endif signal(SIGCHLD,SIGNAL_CAST reaper); // reenable the signal handler } // Alarm handler to escape from hanging socket reads (crash in mid-send) jmp_buf senv,tenv; void timeout(int sig){ signal(SIGALRM,SIGNAL_CAST timeout); // reset the handler longjmp(tenv,1); // jump to the error recovery vector } /* void InterruptRecovery(int sig){ SockIOwriter::halt(); // kill app on first opportunity if(SockIOwriter::sendInProgress()>0){ signal(SIGALRM,SIGNAL_CAST timeout); // reset the handler longjmp(tenv,1); } else exit(0); } */ // kludge int SockIOreader::blockingRead(void *buffer,int size){ int nremaining,nreceived; if(!connected_client) return 0; // read in 1k blocks (perhaps have 15-30sec timeout) // should be able to set variable blocksize for(nremaining=size;nremaining>0;nremaining-=nreceived) { nreceived=connected_client->read((char*)buffer,nremaining); if (nreceived<1) // read error return size-nremaining; } return size; // can return alt value if timeout occurs } //======================================================================================= void SockIOreader::sigHandler(int sig){ // do a select... RawPort *port = master_mux.select(); if(port){ if(connected_client) //pending++; // (if we didn't prefer new clients....) but.... delete connected_client; // has preference for new clients!! // pref for new clients is good for error recovery. (eg. death of client program) puts("\nOpen New Client Port\n"); connected_client = ServerPort.accept(); mux.addInport(connected_client); // add to select mux list // close and reopent the file trunc delete scratchfile; scratchfile=new IEEEIO(scratchfilename,IObase::Write); delete scratchfile; scratchfile=new IEEEIO(scratchfilename,IObase::Append); // kludge to purge file } scratchfile->seek(scratchfile->nDatasets()-1); while(mux.select()>0){ int dims[5]; char name[128]; int bufsize; void *current_buffer; blockingRead(¤t_rec,sizeof(RecordHdr)); switch(current_rec.recordtype){ case 1: // data record blockingRead(¤t_data_rec,sizeof(DataRecordHdr)); blockingRead(dims,sizeof(int)*current_data_rec.rank); bufsize=current_data_rec.datasize; break; case 2: // annotation record // do nothing for now. bufsize=current_rec.recordsize; break; case 3: // attribrecord blockingRead(¤t_att_rec,sizeof(AttributeRecordHdr)); blockingRead(name,current_att_rec.namesize); bufsize=current_att_rec.datasize; break; } current_buffer = new char[bufsize]; blockingRead(current_buffer,bufsize); // now commit it to disk switch(current_rec.recordtype){ case 1: scratchfile->write(IObase::Int2DataType(current_data_rec.numbertype), current_data_rec.rank, dims, current_buffer); break; case 2: scratchfile->writeAnnotation((char*)current_buffer); break; case 3: scratchfile->writeAttribute(name,IObase::Int2DataType(current_att_rec.numbertype), bufsize/IObase::sizeOf(IObase::Int2DataType(current_att_rec.numbertype)), current_buffer); break; } delete current_buffer; current_buffer=0; } if (current_ds > 0) scratchfile->seek(current_ds); } //------------------------core stuff..................... SockIOreader::SockIOreader(CONST char *scratchfilenm,int port):IObase(scratchfilenm,IObase::Read), scratchfile(0),ServerPort(port), pending(0),connected_client(0){ current_ds = -1; strcpy(scratchfilename,scratchfilenm); scratchfile=new IEEEIO(scratchfilename,IObase::Write); delete scratchfile;// kludge to purge file scratchfile=new IEEEIO(scratchfilename,IObase::Append); // sockport is open, now set up the select fprintf(stderr,"ServerPort validity=%u\n",ServerPort.isAlive()); master_mux.addInport(&ServerPort); // the master mux should always return immediately // (a non-blocking select()) master_mux.setTimeout(0,0); // seconds and microseconds =0 mux.setTimeout(0,0); // set signals for capturing events sock_io_readers.append(this); // append to handler list SigMgr::AddObserver(); puts("Signals On"); SigMgr::SigOn(); signal(SIGCHLD,SIGNAL_CAST reaper); // reenable the signal reaper handler puts("construction completed"); } SockIOreader::SockIOreader(CONST char *scratchfilenm,int port,int windowsize): IObase(scratchfilenm,IObase::Read), scratchfile(0),ServerPort(port,windowsize), pending(0),connected_client(0){ current_ds = -1; strcpy(scratchfilename,scratchfilenm); scratchfile=new IEEEIO(scratchfilename,IObase::Write); delete scratchfile;// kludge to purge file scratchfile=new IEEEIO(scratchfilename,IObase::Append); // sockport is open, now set up the select fprintf(stderr,"ServerPort validity=%u\n",ServerPort.isAlive()); master_mux.addInport(&ServerPort); // the master mux should always return immediately // (a non-blocking select()) master_mux.setTimeout(0,0); // seconds and microseconds =0 mux.setTimeout(0,0); // set signals for capturing events sock_io_readers.append(this); // append to handler list SigMgr::AddObserver(); puts("Signals On"); SigMgr::SigOn(); signal(SIGCHLD,SIGNAL_CAST reaper); // reenable the signal reaper handler puts("construction completed"); } SockIOreader::~SockIOreader(){ SigMgr::SigOff(); SigMgr::RemoveObserver(); // shut all of the sockets down if(connected_client) delete connected_client; // need to remove from mux list connected_client=0; } // should check server status too int SockIOreader::isValid() { if(scratchfile && connected_client) return 1; return 0; } // could use overloading to differentiate type here... (but I'm going simple) int SockIOreader::readInfo(IObase::DataType &typeID,int &rank,int *dims,int maxdims){ // just hand this off to the datafile (and hope that APPEND mode works correctly) SigMgr::SigOff(); int retval = scratchfile->readInfo(typeID,rank,dims,maxdims); SigMgr::SigOn(); return retval; } int SockIOreader::read(void *data){ SigMgr::SigOff(); int retval = scratchfile->read(data); SigMgr::SigOn(); return retval; } int SockIOreader::seek(int dataset_index){ SigMgr::SigOff(); current_ds=scratchfile->seek(dataset_index); int retval = current_ds; SigMgr::SigOn(); return retval; } int SockIOreader::nDatasets(){ SigMgr::SigOff(); int retval = scratchfile->nDatasets(); // must not interrupt these operations... SigMgr::SigOn(); return retval; } int SockIOreader::readAnnotationInfo(int number,int &length){ // returns length (-1 if none left) SigMgr::SigOff(); scratchfile->seek(current_ds); int retval = scratchfile->readAnnotationInfo(number,length); SigMgr::SigOn(); return retval; } int SockIOreader::readAnnotation(int number,char *annotation,int maxsize){ SigMgr::SigOff(); scratchfile->seek(current_ds); int retval = scratchfile->readAnnotation(number,annotation,maxsize); SigMgr::SigOn(); return retval; } int SockIOreader::nAnnotations(){ SigMgr::SigOff(); scratchfile->seek(current_ds); int retval = scratchfile->nAnnotations(); SigMgr::SigOn(); return retval; } int SockIOreader::readAttributeInfo(int number,char *name,IObase::DataType &typeID,Long &nelem,int maxnamelen){ SigMgr::SigOff(); scratchfile->seek(current_ds); int retval = scratchfile->readAttributeInfo(number,name,typeID,nelem,maxnamelen); SigMgr::SigOn(); return retval; } int SockIOreader::readAttributeInfo(CONST char *name,IObase::DataType &typeID,Long &nelem){ // returns number SigMgr::SigOff(); scratchfile->seek(current_ds); int retval = scratchfile->readAttributeInfo(name,typeID,nelem); SigMgr::SigOn(); return retval; } int SockIOreader::readAttribute(int number,void *data){ SigMgr::SigOff(); scratchfile->seek(current_ds); int retval = scratchfile->readAttribute(number,data); SigMgr::SigOn(); return retval; } int SockIOreader::nAttributes(){ SigMgr::SigOff(); scratchfile->seek(current_ds); int retval = scratchfile->nAttributes(); SigMgr::SigOn(); return retval; } //-----------------Chunking Utilities.................. int SockIOreader::readChunk(CONST int *chunkdims,CONST int *chunkorigin,void *data){ SigMgr::SigOff(); scratchfile->seek(current_ds); int retval = scratchfile->readChunk(chunkdims,chunkorigin,data); SigMgr::SigOn(); return retval; }