aboutsummaryrefslogtreecommitdiff
path: root/src/SockIOreader.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/SockIOreader.cc')
-rw-r--r--src/SockIOreader.cc346
1 files changed, 346 insertions, 0 deletions
diff --git a/src/SockIOreader.cc b/src/SockIOreader.cc
new file mode 100644
index 0000000..392d28a
--- /dev/null
+++ b/src/SockIOreader.cc
@@ -0,0 +1,346 @@
+#include <iostream.h>
+#include <stdio.h>
+#include <string.h>
+#include <signal.h>
+#include <setjmp.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include "FlexArrayTmpl.H"
+
+#include "SockIOreader.hh"
+
+#if defined(T3E) || defined(__hpux)
+#define SIGNAL_CAST
+#else
+#define SIGNAL_CAST (SIG_PF)
+#endif
+
+// Globals
+FlexArray<SockIOreader*> sock_io_readers;
+void IOsigHandler(int sig);
+void reaper(int sig);
+
+// enable signals (needs a refcount)
+class SigMgr {
+ static int sigrefcount;
+ static int nwatchers;
+public:
+ static void AddObserver(){
+ nwatchers++;
+ }
+ static void RemoveObserver(){
+ nwatchers--;
+ // should check refcount status change
+ }
+ static void SigOn(){
+ //puts("sigOn");
+ sigrefcount++;
+ if(sigrefcount>nwatchers){
+ // error
+ sigrefcount=nwatchers;
+ puts("SigMgr: Error... the Signal refcount is > numwatchers");
+ puts("correcting");
+ }
+ if(sigrefcount==nwatchers){
+ // call the handlertest just in case
+ //IOsigHandler((int)SIGIO);
+ for(int i=0; i<sock_io_readers.getSize();i++){
+ //printf("poll reader %u\n",i);
+ sock_io_readers[i]->sigHandler(SIGIO);
+ }
+ signal(SIGIO,SIGNAL_CAST IOsigHandler); // reenable the signal handler
+ signal(SIGURG,SIGNAL_CAST IOsigHandler);// reenable the signal handler
+ }
+ }
+ // disable signals until elemental IO operation completes
+ static void SigOff(){
+ //puts("sigOff");
+ sigrefcount--;
+ if(sigrefcount<0){
+ puts("SigMgr::SigOff(): Signal refcount < 0... correcting");
+ sigrefcount=0;
+ }
+ signal(SIGIO,SIG_DFL); // prevent double-interrupt
+ signal(SIGURG,SIG_DFL);
+ }
+};
+
+int SigMgr::sigrefcount=0;
+int SigMgr::nwatchers=0;
+
+// If any IO event occurs, this will call handlers for all availible SockIOreaders
+// to see if they have an event to take care of.
+void IOsigHandler(int sig){
+ //puts("SigHandler: IO Interrupt");
+ SigMgr::SigOff();
+ // So here we look at the global list of SockIOreaders and pass the event on
+ for(int i=0; i<sock_io_readers.getSize();i++){
+ //printf("poll reader %u\n",i);
+ sock_io_readers[i]->sigHandler(sig);
+ }
+ SigMgr::SigOn();
+}
+
+// This gobbles up zombie children... kind of like Night of the Living Dead
+void reaper(int sig){
+ puts("Reaper: Child Interrupt");
+#ifndef T3E
+ // Eat dead children :b
+ struct rusage status;
+ int statptr;
+ while(wait3(&statptr,WNOHANG,&status)>=0) {} // get all the deadsters
+#endif
+
+ signal(SIGCHLD,SIGNAL_CAST reaper); // reenable the signal handler
+}
+
+// Alarm handler to escape from hanging socket reads (crash in mid-send)
+jmp_buf senv,tenv;
+void timeout(int sig){
+ signal(SIGALRM,SIGNAL_CAST timeout); // reset the handler
+ longjmp(tenv,1); // jump to the error recovery vector
+}
+
+/*
+void InterruptRecovery(int sig){
+ SockIOwriter::halt(); // kill app on first opportunity
+ if(SockIOwriter::sendInProgress()>0){
+ signal(SIGALRM,SIGNAL_CAST timeout); // reset the handler
+ longjmp(tenv,1);
+ }
+ else
+ exit(0);
+}
+*/
+
+// kludge
+int SockIOreader::blockingRead(void *buffer,int size){
+ int nremaining,nreceived;
+ if(!connected_client) return 0;
+ // read in 1k blocks (perhaps have 15-30sec timeout)
+ // should be able to set variable blocksize
+ for(nremaining=size;nremaining>0;nremaining-=nreceived)
+ {
+ nreceived=connected_client->read((char*)buffer,nremaining);
+ if (nreceived<1) // read error
+ return size-nremaining;
+ }
+ return size; // can return alt value if timeout occurs
+}
+//=======================================================================================
+
+void SockIOreader::sigHandler(int sig){
+ // do a select...
+ RawPort *port = master_mux.select();
+ if(port){
+ if(connected_client)
+ //pending++; // (if we didn't prefer new clients....) but....
+ delete connected_client; // has preference for new clients!!
+ // pref for new clients is good for error recovery. (eg. death of client program)
+ puts("\nOpen New Client Port\n");
+ connected_client = ServerPort.accept();
+ mux.addInport(connected_client); // add to select mux list
+ // close and reopent the file trunc
+ delete scratchfile;
+ scratchfile=new IEEEIO(scratchfilename,IObase::Write);
+ delete scratchfile;
+ scratchfile=new IEEEIO(scratchfilename,IObase::Append); // kludge to purge file
+ }
+ scratchfile->seek(scratchfile->nDatasets()-1);
+ while(mux.select()>0){
+ int dims[5];
+ char name[128];
+ int bufsize;
+ void *current_buffer;
+ blockingRead(&current_rec,sizeof(RecordHdr));
+ switch(current_rec.recordtype){
+ case 1: // data record
+ blockingRead(&current_data_rec,sizeof(DataRecordHdr));
+ blockingRead(dims,sizeof(int)*current_data_rec.rank);
+ bufsize=current_data_rec.datasize;
+ break;
+ case 2: // annotation record
+ // do nothing for now.
+ bufsize=current_rec.recordsize;
+ break;
+ case 3: // attribrecord
+ blockingRead(&current_att_rec,sizeof(AttributeRecordHdr));
+ blockingRead(name,current_att_rec.namesize);
+ bufsize=current_att_rec.datasize;
+ break;
+ }
+ current_buffer = new char[bufsize];
+ blockingRead(current_buffer,bufsize);
+ // now commit it to disk
+ switch(current_rec.recordtype){
+ case 1:
+ scratchfile->write(IObase::Int2DataType(current_data_rec.numbertype),
+ current_data_rec.rank,
+ dims,
+ current_buffer);
+ break;
+ case 2:
+ scratchfile->writeAnnotation((char*)current_buffer);
+ break;
+ case 3:
+ scratchfile->writeAttribute(name,IObase::Int2DataType(current_att_rec.numbertype),
+ bufsize/IObase::sizeOf(IObase::Int2DataType(current_att_rec.numbertype)),
+ current_buffer);
+ break;
+ }
+ delete current_buffer;
+ current_buffer=0;
+ }
+ if (current_ds > 0)
+ scratchfile->seek(current_ds);
+}
+
+//------------------------core stuff.....................
+SockIOreader::SockIOreader(CONST char *scratchfilenm,int port):IObase(scratchfilenm,IObase::Read),
+ scratchfile(0),ServerPort(port),
+ pending(0),connected_client(0){
+ current_ds = -1;
+ strcpy(scratchfilename,scratchfilenm);
+ scratchfile=new IEEEIO(scratchfilename,IObase::Write);
+ delete scratchfile;// kludge to purge file
+ scratchfile=new IEEEIO(scratchfilename,IObase::Append);
+ // sockport is open, now set up the select
+ fprintf(stderr,"ServerPort validity=%u\n",ServerPort.isAlive());
+ master_mux.addInport(&ServerPort);
+ // the master mux should always return immediately
+ // (a non-blocking select())
+ master_mux.setTimeout(0,0); // seconds and microseconds =0
+ mux.setTimeout(0,0);
+ // set signals for capturing events
+ sock_io_readers.append(this); // append to handler list
+ SigMgr::AddObserver();
+ puts("Signals On");
+ SigMgr::SigOn();
+ signal(SIGCHLD,SIGNAL_CAST reaper); // reenable the signal reaper handler
+ puts("construction completed");
+}
+
+SockIOreader::SockIOreader(CONST char *scratchfilenm,int port,int windowsize):
+ IObase(scratchfilenm,IObase::Read),
+ scratchfile(0),ServerPort(port,windowsize),
+ pending(0),connected_client(0){
+ current_ds = -1;
+ strcpy(scratchfilename,scratchfilenm);
+ scratchfile=new IEEEIO(scratchfilename,IObase::Write);
+ delete scratchfile;// kludge to purge file
+ scratchfile=new IEEEIO(scratchfilename,IObase::Append);
+ // sockport is open, now set up the select
+ fprintf(stderr,"ServerPort validity=%u\n",ServerPort.isAlive());
+ master_mux.addInport(&ServerPort);
+ // the master mux should always return immediately
+ // (a non-blocking select())
+ master_mux.setTimeout(0,0); // seconds and microseconds =0
+ mux.setTimeout(0,0);
+ // set signals for capturing events
+ sock_io_readers.append(this); // append to handler list
+ SigMgr::AddObserver();
+ puts("Signals On");
+ SigMgr::SigOn();
+ signal(SIGCHLD,SIGNAL_CAST reaper); // reenable the signal reaper handler
+ puts("construction completed");
+}
+
+SockIOreader::~SockIOreader(){
+ SigMgr::SigOff();
+ SigMgr::RemoveObserver();
+ // shut all of the sockets down
+ if(connected_client)
+ delete connected_client; // need to remove from mux list
+ connected_client=0;
+}
+
+// should check server status too
+int SockIOreader::isValid() {
+ if(scratchfile && connected_client) return 1;
+ return 0;
+}
+
+// could use overloading to differentiate type here... (but I'm going simple)
+int SockIOreader::readInfo(IObase::DataType &typeID,int &rank,int *dims,int maxdims){
+ // just hand this off to the datafile (and hope that APPEND mode works correctly)
+ SigMgr::SigOff();
+ int retval = scratchfile->readInfo(typeID,rank,dims,maxdims);
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::read(void *data){
+ SigMgr::SigOff();
+ int retval = scratchfile->read(data);
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::seek(int dataset_index){
+ SigMgr::SigOff();
+ current_ds=scratchfile->seek(dataset_index);
+ int retval = current_ds;
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::nDatasets(){
+ SigMgr::SigOff();
+ int retval = scratchfile->nDatasets(); // must not interrupt these operations...
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::readAnnotationInfo(int number,int &length){ // returns length (-1 if none left)
+ SigMgr::SigOff();
+ scratchfile->seek(current_ds);
+ int retval = scratchfile->readAnnotationInfo(number,length);
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::readAnnotation(int number,char *annotation,int maxsize){
+ SigMgr::SigOff();
+ scratchfile->seek(current_ds);
+ int retval = scratchfile->readAnnotation(number,annotation,maxsize);
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::nAnnotations(){
+ SigMgr::SigOff();
+ scratchfile->seek(current_ds);
+ int retval = scratchfile->nAnnotations();
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::readAttributeInfo(int number,char *name,IObase::DataType &typeID,Long &nelem,int maxnamelen){
+ SigMgr::SigOff();
+ scratchfile->seek(current_ds);
+ int retval = scratchfile->readAttributeInfo(number,name,typeID,nelem,maxnamelen);
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::readAttributeInfo(CONST char *name,IObase::DataType &typeID,Long &nelem){ // returns number
+ SigMgr::SigOff();
+ scratchfile->seek(current_ds);
+ int retval = scratchfile->readAttributeInfo(name,typeID,nelem);
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::readAttribute(int number,void *data){
+ SigMgr::SigOff();
+ scratchfile->seek(current_ds);
+ int retval = scratchfile->readAttribute(number,data);
+ SigMgr::SigOn();
+ return retval;
+}
+int SockIOreader::nAttributes(){
+ SigMgr::SigOff();
+ scratchfile->seek(current_ds);
+ int retval = scratchfile->nAttributes();
+ SigMgr::SigOn();
+ return retval;
+}
+//-----------------Chunking Utilities..................
+int SockIOreader::readChunk(CONST int *chunkdims,CONST int *chunkorigin,void *data){
+ SigMgr::SigOff();
+ scratchfile->seek(current_ds);
+ int retval = scratchfile->readChunk(chunkdims,chunkorigin,data);
+ SigMgr::SigOn();
+ return retval;
+}