aboutsummaryrefslogtreecommitdiff
path: root/src/SockIOwriter.hh
blob: 96a622332fe32ecd7a613410c0fef6fde1d1d103 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#ifndef __SOCKIOWRITER_HH_
#define __SOCKIOWRITER_HH_

#include "IO.hh"
#include "Arch.h"
#include "RawTCP.H"
#include "PortMux.H"

/*====================================================
class: sockIOwriter
description: This class extends the IO & IEEEIO architecture
		to write data across a network socket.  Please
		consult http://bach.ncsa.uiuc.edu/IEEEIO for details
		on how IEEEIO works.  This interface only works
		for *writing* data (write-only interface).
		
		The writer opens a socket to the destination address
		and then ships data to the destination processor
		whenever a write occurs.  Sending is immediate.  
		There is no local caching (all caching is on the 
		client/reader-side).  It could be double-buffered but
		this would cause problems for MPI jobs on distributed
		memory machines like the T3E and SP2.
problems: All sends are in the foreground.  This can hurt 
	application performance, but we can't afford double-buffering
	because of the memory limitations of distributed memory machines.
	(a background send would require double buffering but we
	 can't afford the memory to do that on a distributed memory
	 machine).
======================================================*/

class SockIOwriter : public IObase {
  enum RecordType {DataRecord=1,AnnotationRecord,AttributeRecord};

#ifdef	__hpux
// silly HPUX compiler won't give FileHdr access to btorder...
public:
#endif

  union btorder {
    char c[4]; // write as int, read as int
    Int  i;
  };

  struct FileHdr {
    // do magic twice so we can figure out which byte-order
    // the current machine is in addition (without hardcoding) in
    // addition to which byte order the file is in
    btorder magic;
    btorder byteorder;
    Long ndatasets;
    short majorversion;
    short minorversion;
    static int write(FileHdr &rec, RawTCPclient *clientsock){
#ifdef T3E
	// T3E pads structures and Unions (even if embedded in other structures).
	// So we can't simply write the entire structure in-place.  Must write
	// each component of the structure independently.
      // do it the stupid way first 
      // would be smarter to pack buffers really
      int r=0;
      r+=clientsock->write((char *)(rec.magic.c),4);
      r+=clientsock->write((char *)(rec.byteorder.c),4);
      r+=clientsock->write((char *)&(rec.ndatasets),4);
      // this may be incorrect, but it will work for now
      // It assumes the low order bytes will be first to
      // be written.
      r+=clientsock->write((char *)&(rec.majorversion),2);
      r+=clientsock->write((char *)&(rec.minorversion),2);
      return r;
#else
      // problem if this includes pointers to static methods
      // will require an extra degree of inheritance. (inherit
      // from a base structure.
      return clientsock->write((char *)&rec,sizeof(FileHdr));
#endif
    }
  };

  struct RecordHdr {
    Int recordtype;
    Long recordsize;
    Int sequenceID;

    static int write(RecordHdr &rec,RawTCPclient *clientsock){
#ifdef T3E
	// do it the stupid way first 
	// would be smarter to pack buffers really	
	int r=0;
	r+=clientsock->write((char *)&(rec.recordtype),4);
	r+=clientsock->write((char *)&(rec.recordsize),4);
	r+=clientsock->write((char *)&(rec.sequenceID),4);
	return r;
#else
	return clientsock->write((char *)&rec,sizeof(RecordHdr));
#endif
    }
  };

  struct DataRecordHdr {
    Long datasize;
    Int numbertype;
    Int rank;
    static int write(DataRecordHdr &rec, RawTCPclient *clientsock){
#ifdef T3E
      // do it the stupid way first 
      // would be smarter to pack buffers really	
      int r=0;
      r+=clientsock->write((char *)&(rec.datasize),4);
      r+=clientsock->write((char *)&(rec.numbertype),4);
      r+=clientsock->write((char *)&(rec.rank),4);
      return r;
#else
      return clientsock->write((char *)&rec,sizeof(DataRecordHdr));
#endif
    }

  };
	
  struct AttributeRecordHdr {
    Long datasize;
    Int numbertype;
    Int namesize; // attributes are named
 
	/// Alignment-save write
    	int write(RawTCPclient *clientsock)
      	{
		/// shouldn't there be some hton() conversion??
	int r = clientsock->write(&datasize  , 4);
	    r+= clientsock->write(&numbertype, 4);
	    r+= clientsock->write(&namesize  , 4);
	    return r;
	}

	static int size() { return 12; }
 };

  virtual int readInfo(IObase::DataType &typeID,int &rank,int *dims,int maxdims=3){ return 0; }
  virtual int read(void *data){ return 0; }
  virtual int seek(int dataset_index){ return 0; }
  virtual int nDatasets() { return 0; }
  virtual int readAnnotationInfo(int number,int &length){ return 0; } // returns length (-1 if none left)
  virtual int readAnnotation(int number,char *annotation,int maxsize=128){ return 0; }
  virtual int nAnnotations(){ return 0; } // returns number
  virtual int readAttributeInfo(int number,char *name,IObase::DataType &typeID,Long &nelem,int maxnamelen=128)
  { return 0; }
  virtual int readAttributeInfo(CONST char *name,IObase::DataType &typeID,Long &nelem){ return 0; } // returns number
  virtual int readAttribute(int number,void *data){ return 0; }
  virtual int nAttributes(){ return 0; }
  virtual int readChunk(CONST int *chunkdims,CONST int *chunkorigin,void *data){ return 0; }
  virtual int readStream(void *data,int length){return 0;}
  RawTCPclient *clientsock;
  int ndatasets,datasetnumber;
  RecordHdr current_rec;
public:
  //------------------------core stuff.....................
  SockIOwriter(CONST char *hostname,int port);
  SockIOwriter(CONST char *hostname,int port,int tcpwindowsize);
  virtual ~SockIOwriter();
  virtual int isValid();
  // could use overloading to differentiate type here... (but I'm going simple)
  virtual int write(IObase::DataType typeID,int rank,CONST int *dims,void *data);
  virtual int writeAnnotation(CONST char *annotation);
  virtual int writeAttribute(CONST char *name,IObase::DataType typeID,Long length,void *data);
  //-----------------Chunking Utilities..................
  virtual int reserveChunk(IObase::DataType typeID,int rank,CONST int *dims){
    return 0; // this will require a fourth data descriptor type
  } 
  virtual int reserveStream(IObase::DataType typeID,int rank,CONST int *dims){
    return 0; // this will require a fourth data descriptor type
  }
  virtual int writeChunk(CONST int *chunkdims,CONST int *chunkorigin,void *data){
  	// need to fix this
    return 0;
  }
  virtual int writeStream(void *data,int length){return 0;}

  RawTCPclient*TCPclient()  const { return clientsock; }
};

#endif