diff options
Diffstat (limited to 'src/SockIOreader.cc')
-rw-r--r-- | src/SockIOreader.cc | 346 |
1 files changed, 346 insertions, 0 deletions
diff --git a/src/SockIOreader.cc b/src/SockIOreader.cc new file mode 100644 index 0000000..392d28a --- /dev/null +++ b/src/SockIOreader.cc @@ -0,0 +1,346 @@ +#include <iostream.h> +#include <stdio.h> +#include <string.h> +#include <signal.h> +#include <setjmp.h> +#include <sys/types.h> +#include <sys/wait.h> +#include "FlexArrayTmpl.H" + +#include "SockIOreader.hh" + +#if defined(T3E) || defined(__hpux) +#define SIGNAL_CAST +#else +#define SIGNAL_CAST (SIG_PF) +#endif + +// Globals +FlexArray<SockIOreader*> sock_io_readers; +void IOsigHandler(int sig); +void reaper(int sig); + +// enable signals (needs a refcount) +class SigMgr { + static int sigrefcount; + static int nwatchers; +public: + static void AddObserver(){ + nwatchers++; + } + static void RemoveObserver(){ + nwatchers--; + // should check refcount status change + } + static void SigOn(){ + //puts("sigOn"); + sigrefcount++; + if(sigrefcount>nwatchers){ + // error + sigrefcount=nwatchers; + puts("SigMgr: Error... the Signal refcount is > numwatchers"); + puts("correcting"); + } + if(sigrefcount==nwatchers){ + // call the handlertest just in case + //IOsigHandler((int)SIGIO); + for(int i=0; i<sock_io_readers.getSize();i++){ + //printf("poll reader %u\n",i); + sock_io_readers[i]->sigHandler(SIGIO); + } + signal(SIGIO,SIGNAL_CAST IOsigHandler); // reenable the signal handler + signal(SIGURG,SIGNAL_CAST IOsigHandler);// reenable the signal handler + } + } + // disable signals until elemental IO operation completes + static void SigOff(){ + //puts("sigOff"); + sigrefcount--; + if(sigrefcount<0){ + puts("SigMgr::SigOff(): Signal refcount < 0... correcting"); + sigrefcount=0; + } + signal(SIGIO,SIG_DFL); // prevent double-interrupt + signal(SIGURG,SIG_DFL); + } +}; + +int SigMgr::sigrefcount=0; +int SigMgr::nwatchers=0; + +// If any IO event occurs, this will call handlers for all availible SockIOreaders +// to see if they have an event to take care of. +void IOsigHandler(int sig){ + //puts("SigHandler: IO Interrupt"); + SigMgr::SigOff(); + // So here we look at the global list of SockIOreaders and pass the event on + for(int i=0; i<sock_io_readers.getSize();i++){ + //printf("poll reader %u\n",i); + sock_io_readers[i]->sigHandler(sig); + } + SigMgr::SigOn(); +} + +// This gobbles up zombie children... kind of like Night of the Living Dead +void reaper(int sig){ + puts("Reaper: Child Interrupt"); +#ifndef T3E + // Eat dead children :b + struct rusage status; + int statptr; + while(wait3(&statptr,WNOHANG,&status)>=0) {} // get all the deadsters +#endif + + signal(SIGCHLD,SIGNAL_CAST reaper); // reenable the signal handler +} + +// Alarm handler to escape from hanging socket reads (crash in mid-send) +jmp_buf senv,tenv; +void timeout(int sig){ + signal(SIGALRM,SIGNAL_CAST timeout); // reset the handler + longjmp(tenv,1); // jump to the error recovery vector +} + +/* +void InterruptRecovery(int sig){ + SockIOwriter::halt(); // kill app on first opportunity + if(SockIOwriter::sendInProgress()>0){ + signal(SIGALRM,SIGNAL_CAST timeout); // reset the handler + longjmp(tenv,1); + } + else + exit(0); +} +*/ + +// kludge +int SockIOreader::blockingRead(void *buffer,int size){ + int nremaining,nreceived; + if(!connected_client) return 0; + // read in 1k blocks (perhaps have 15-30sec timeout) + // should be able to set variable blocksize + for(nremaining=size;nremaining>0;nremaining-=nreceived) + { + nreceived=connected_client->read((char*)buffer,nremaining); + if (nreceived<1) // read error + return size-nremaining; + } + return size; // can return alt value if timeout occurs +} +//======================================================================================= + +void SockIOreader::sigHandler(int sig){ + // do a select... + RawPort *port = master_mux.select(); + if(port){ + if(connected_client) + //pending++; // (if we didn't prefer new clients....) but.... + delete connected_client; // has preference for new clients!! + // pref for new clients is good for error recovery. (eg. death of client program) + puts("\nOpen New Client Port\n"); + connected_client = ServerPort.accept(); + mux.addInport(connected_client); // add to select mux list + // close and reopent the file trunc + delete scratchfile; + scratchfile=new IEEEIO(scratchfilename,IObase::Write); + delete scratchfile; + scratchfile=new IEEEIO(scratchfilename,IObase::Append); // kludge to purge file + } + scratchfile->seek(scratchfile->nDatasets()-1); + while(mux.select()>0){ + int dims[5]; + char name[128]; + int bufsize; + void *current_buffer; + blockingRead(¤t_rec,sizeof(RecordHdr)); + switch(current_rec.recordtype){ + case 1: // data record + blockingRead(¤t_data_rec,sizeof(DataRecordHdr)); + blockingRead(dims,sizeof(int)*current_data_rec.rank); + bufsize=current_data_rec.datasize; + break; + case 2: // annotation record + // do nothing for now. + bufsize=current_rec.recordsize; + break; + case 3: // attribrecord + blockingRead(¤t_att_rec,sizeof(AttributeRecordHdr)); + blockingRead(name,current_att_rec.namesize); + bufsize=current_att_rec.datasize; + break; + } + current_buffer = new char[bufsize]; + blockingRead(current_buffer,bufsize); + // now commit it to disk + switch(current_rec.recordtype){ + case 1: + scratchfile->write(IObase::Int2DataType(current_data_rec.numbertype), + current_data_rec.rank, + dims, + current_buffer); + break; + case 2: + scratchfile->writeAnnotation((char*)current_buffer); + break; + case 3: + scratchfile->writeAttribute(name,IObase::Int2DataType(current_att_rec.numbertype), + bufsize/IObase::sizeOf(IObase::Int2DataType(current_att_rec.numbertype)), + current_buffer); + break; + } + delete current_buffer; + current_buffer=0; + } + if (current_ds > 0) + scratchfile->seek(current_ds); +} + +//------------------------core stuff..................... +SockIOreader::SockIOreader(CONST char *scratchfilenm,int port):IObase(scratchfilenm,IObase::Read), + scratchfile(0),ServerPort(port), + pending(0),connected_client(0){ + current_ds = -1; + strcpy(scratchfilename,scratchfilenm); + scratchfile=new IEEEIO(scratchfilename,IObase::Write); + delete scratchfile;// kludge to purge file + scratchfile=new IEEEIO(scratchfilename,IObase::Append); + // sockport is open, now set up the select + fprintf(stderr,"ServerPort validity=%u\n",ServerPort.isAlive()); + master_mux.addInport(&ServerPort); + // the master mux should always return immediately + // (a non-blocking select()) + master_mux.setTimeout(0,0); // seconds and microseconds =0 + mux.setTimeout(0,0); + // set signals for capturing events + sock_io_readers.append(this); // append to handler list + SigMgr::AddObserver(); + puts("Signals On"); + SigMgr::SigOn(); + signal(SIGCHLD,SIGNAL_CAST reaper); // reenable the signal reaper handler + puts("construction completed"); +} + +SockIOreader::SockIOreader(CONST char *scratchfilenm,int port,int windowsize): + IObase(scratchfilenm,IObase::Read), + scratchfile(0),ServerPort(port,windowsize), + pending(0),connected_client(0){ + current_ds = -1; + strcpy(scratchfilename,scratchfilenm); + scratchfile=new IEEEIO(scratchfilename,IObase::Write); + delete scratchfile;// kludge to purge file + scratchfile=new IEEEIO(scratchfilename,IObase::Append); + // sockport is open, now set up the select + fprintf(stderr,"ServerPort validity=%u\n",ServerPort.isAlive()); + master_mux.addInport(&ServerPort); + // the master mux should always return immediately + // (a non-blocking select()) + master_mux.setTimeout(0,0); // seconds and microseconds =0 + mux.setTimeout(0,0); + // set signals for capturing events + sock_io_readers.append(this); // append to handler list + SigMgr::AddObserver(); + puts("Signals On"); + SigMgr::SigOn(); + signal(SIGCHLD,SIGNAL_CAST reaper); // reenable the signal reaper handler + puts("construction completed"); +} + +SockIOreader::~SockIOreader(){ + SigMgr::SigOff(); + SigMgr::RemoveObserver(); + // shut all of the sockets down + if(connected_client) + delete connected_client; // need to remove from mux list + connected_client=0; +} + +// should check server status too +int SockIOreader::isValid() { + if(scratchfile && connected_client) return 1; + return 0; +} + +// could use overloading to differentiate type here... (but I'm going simple) +int SockIOreader::readInfo(IObase::DataType &typeID,int &rank,int *dims,int maxdims){ + // just hand this off to the datafile (and hope that APPEND mode works correctly) + SigMgr::SigOff(); + int retval = scratchfile->readInfo(typeID,rank,dims,maxdims); + SigMgr::SigOn(); + return retval; +} +int SockIOreader::read(void *data){ + SigMgr::SigOff(); + int retval = scratchfile->read(data); + SigMgr::SigOn(); + return retval; +} +int SockIOreader::seek(int dataset_index){ + SigMgr::SigOff(); + current_ds=scratchfile->seek(dataset_index); + int retval = current_ds; + SigMgr::SigOn(); + return retval; +} +int SockIOreader::nDatasets(){ + SigMgr::SigOff(); + int retval = scratchfile->nDatasets(); // must not interrupt these operations... + SigMgr::SigOn(); + return retval; +} +int SockIOreader::readAnnotationInfo(int number,int &length){ // returns length (-1 if none left) + SigMgr::SigOff(); + scratchfile->seek(current_ds); + int retval = scratchfile->readAnnotationInfo(number,length); + SigMgr::SigOn(); + return retval; +} +int SockIOreader::readAnnotation(int number,char *annotation,int maxsize){ + SigMgr::SigOff(); + scratchfile->seek(current_ds); + int retval = scratchfile->readAnnotation(number,annotation,maxsize); + SigMgr::SigOn(); + return retval; +} +int SockIOreader::nAnnotations(){ + SigMgr::SigOff(); + scratchfile->seek(current_ds); + int retval = scratchfile->nAnnotations(); + SigMgr::SigOn(); + return retval; +} +int SockIOreader::readAttributeInfo(int number,char *name,IObase::DataType &typeID,Long &nelem,int maxnamelen){ + SigMgr::SigOff(); + scratchfile->seek(current_ds); + int retval = scratchfile->readAttributeInfo(number,name,typeID,nelem,maxnamelen); + SigMgr::SigOn(); + return retval; +} +int SockIOreader::readAttributeInfo(CONST char *name,IObase::DataType &typeID,Long &nelem){ // returns number + SigMgr::SigOff(); + scratchfile->seek(current_ds); + int retval = scratchfile->readAttributeInfo(name,typeID,nelem); + SigMgr::SigOn(); + return retval; +} +int SockIOreader::readAttribute(int number,void *data){ + SigMgr::SigOff(); + scratchfile->seek(current_ds); + int retval = scratchfile->readAttribute(number,data); + SigMgr::SigOn(); + return retval; +} +int SockIOreader::nAttributes(){ + SigMgr::SigOff(); + scratchfile->seek(current_ds); + int retval = scratchfile->nAttributes(); + SigMgr::SigOn(); + return retval; +} +//-----------------Chunking Utilities.................. +int SockIOreader::readChunk(CONST int *chunkdims,CONST int *chunkorigin,void *data){ + SigMgr::SigOff(); + scratchfile->seek(current_ds); + int retval = scratchfile->readChunk(chunkdims,chunkorigin,data); + SigMgr::SigOn(); + return retval; +} |