1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
#ifndef __SOCKIOWRITER_HH_
#define __SOCKIOWRITER_HH_
#include "IO.hh"
#include "Arch.h"
#include "RawTCP.H"
#include "PortMux.H"
/*====================================================
class: sockIOwriter
description: This class extends the IO & IEEEIO architecture
to write data across a network socket. Please
consult http://bach.ncsa.uiuc.edu/IEEEIO for details
on how IEEEIO works. This interface only works
for *writing* data (write-only interface).
The writer opens a socket to the destination address
and then ships data to the destination processor
whenever a write occurs. Sending is immediate.
There is no local caching (all caching is on the
client/reader-side). It could be double-buffered but
this would cause problems for MPI jobs on distributed
memory machines like the T3E and SP2.
problems: All sends are in the foreground. This can hurt
application performance, but we can't afford double-buffering
because of the memory limitations of distributed memory machines.
(a background send would require double buffering but we
can't afford the memory to do that on a distributed memory
machine).
======================================================*/
/**
 * Write-only IObase implementation that sends its records through a
 * RawTCPclient connection (`clientsock`).
 *
 * The nested *Hdr structures describe the headers that precede each piece
 * of data on the wire; each one knows how to write itself to the socket.
 * All read-side virtuals are stubbed out to return 0.
 *
 * NOTE(review): the non-T3E write paths send the in-memory structure
 * verbatim (sizeof(struct) bytes), so the wire layout depends on the
 * compiler's padding and on sizeof(Int)/sizeof(Long) -- confirm that the
 * reader side expects the same layout.
 */
class SockIOwriter : public IObase {
  // Tags stored in RecordHdr::recordtype; numbering starts at 1.
  enum RecordType {DataRecord=1,AnnotationRecord,AttributeRecord};
#ifdef __hpux
  // The HPUX compiler won't give FileHdr access to btorder, so on that
  // platform everything from here down to the explicit `public:` label
  // (nested types, read stubs and data members) is public instead of private.
public:
#endif
  // Four bytes viewable either as raw characters or as one Int.
  union btorder {
    char c[4]; // write as int, read as int
    Int  i;
  };

  // Header sent once for the whole stream.
  struct FileHdr {
    // Two byte-order markers are stored so that a reader can work out,
    // without hardcoding anything, both the byte order of the current
    // machine and the byte order the data was written in.
    btorder magic;
    btorder byteorder;
    Long    ndatasets;
    short   majorversion;
    short   minorversion;

    /// Send `rec` through `clientsock`.
    /// @return the value(s) returned by RawTCPclient::write; on T3E this is
    ///         the sum over the five per-field writes.
    static int write(FileHdr &rec, RawTCPclient *clientsock){
#ifdef T3E
      // T3E pads structures and unions (even if embedded in other
      // structures), so the structure cannot simply be written in place:
      // each component is written independently.
      // Do it the stupid way first; it would be smarter to pack buffers.
      int r=0;
      r+=clientsock->write((char *)(rec.magic.c),4);
      r+=clientsock->write((char *)(rec.byteorder.c),4);
      r+=clientsock->write((char *)&(rec.ndatasets),4);
      // This may be incorrect, but it will work for now.
      // It assumes the low order bytes will be first to be written.
      r+=clientsock->write((char *)&(rec.majorversion),2);
      r+=clientsock->write((char *)&(rec.minorversion),2);
      return r;
#else
      // Problem if this includes pointers to static methods: that would
      // require an extra degree of inheritance (inherit from a base
      // structure).
      return clientsock->write((char *)&rec,sizeof(FileHdr));
#endif
    }
  };

  // Header that precedes every record.
  struct RecordHdr {
    Int  recordtype;
    Long recordsize;
    Int  sequenceID;

    /// Send `rec` through `clientsock`: field by field (4 bytes each) on
    /// T3E, as one sizeof(RecordHdr) block elsewhere.
    static int write(RecordHdr &rec,RawTCPclient *clientsock){
#ifdef T3E
      // Do it the stupid way first; it would be smarter to pack buffers.
      int r=0;
      r+=clientsock->write((char *)&(rec.recordtype),4);
      r+=clientsock->write((char *)&(rec.recordsize),4);
      r+=clientsock->write((char *)&(rec.sequenceID),4);
      return r;
#else
      return clientsock->write((char *)&rec,sizeof(RecordHdr));
#endif
    }
  };

  // Additional header for data records.
  struct DataRecordHdr {
    Long datasize;
    Int  numbertype;
    Int  rank;

    /// Send `rec` through `clientsock`: field by field (4 bytes each) on
    /// T3E, as one sizeof(DataRecordHdr) block elsewhere.
    static int write(DataRecordHdr &rec, RawTCPclient *clientsock){
#ifdef T3E
      // Do it the stupid way first; it would be smarter to pack buffers.
      int r=0;
      r+=clientsock->write((char *)&(rec.datasize),4);
      r+=clientsock->write((char *)&(rec.numbertype),4);
      r+=clientsock->write((char *)&(rec.rank),4);
      return r;
#else
      return clientsock->write((char *)&rec,sizeof(DataRecordHdr));
#endif
    }
  };

  // Additional header for attribute records.
  struct AttributeRecordHdr {
    Long datasize;
    Int  numbertype;
    Int  namesize; // attributes are named

    /// Alignment-safe write: each field is sent separately as 4 bytes, so
    /// structure padding never reaches the wire.
    /// @return the sum of the three RawTCPclient::write return values.
    int write(RawTCPclient *clientsock)
    {
      /// shouldn't there be some hton() conversion??
      int r = clientsock->write(&datasize  , 4);
      r    += clientsock->write(&numbertype, 4);
      r    += clientsock->write(&namesize  , 4);
      return r;
    }

    /// Number of bytes write() asks the socket to send (3 fields x 4 bytes).
    static int size() { return 12; }
  };

  //------------------------read interface (stubs)------------
  // This class is write-only: every read-side virtual simply returns 0.
  virtual int readInfo(IObase::DataType &typeID,int &rank,int *dims,int maxdims=3){ return 0; }
  virtual int read(void *data){ return 0; }
  virtual int seek(int dataset_index){ return 0; }
  virtual int nDatasets() { return 0; }
  virtual int readAnnotationInfo(int number,int &length){ return 0; } // returns length (-1 if none left)
  virtual int readAnnotation(int number,char *annotation,int maxsize=128){ return 0; }
  virtual int nAnnotations(){ return 0; } // returns number
  virtual int readAttributeInfo(int number,char *name,IObase::DataType &typeID,Long &nelem,int maxnamelen=128)
  { return 0; }
  virtual int readAttributeInfo(CONST char *name,IObase::DataType &typeID,Long &nelem){ return 0; } // returns number
  virtual int readAttribute(int number,void *data){ return 0; }
  virtual int nAttributes(){ return 0; }
  virtual int readChunk(CONST int *chunkdims,CONST int *chunkorigin,void *data){ return 0; }
  virtual int readStream(void *data,int length){return 0;}

  RawTCPclient *clientsock;    // connection the headers and data are written to
  int ndatasets,datasetnumber;
  RecordHdr current_rec;
public:
  //------------------------core stuff.....................
  SockIOwriter(CONST char *hostname,int port);
  SockIOwriter(CONST char *hostname,int port,int tcpwindowsize);
  virtual ~SockIOwriter();
  virtual int isValid();
  // could use overloading to differentiate type here... (but I'm going simple)
  virtual int write(IObase::DataType typeID,int rank,CONST int *dims,const void *data);
  virtual int writeAnnotation(CONST char *annotation);
  virtual int writeAttribute(CONST char *name,IObase::DataType typeID,Long length,const void *data);
  //-----------------Chunking Utilities..................
  // Chunked and streamed writes are not implemented; all four return 0.
  virtual int reserveChunk(IObase::DataType typeID,int rank,CONST int *dims){
    return 0; // this will require a fourth data descriptor type
  }
  virtual int reserveStream(IObase::DataType typeID,int rank,CONST int *dims){
    return 0; // this will require a fourth data descriptor type
  }
  virtual int writeChunk(CONST int *chunkdims,CONST int *chunkorigin,const void *data){
    // need to fix this
    return 0;
  }
  virtual int writeStream(const void *data,int length){return 0;}

  /// Accessor for the underlying connection pointer (`clientsock`).
  RawTCPclient*TCPclient() const { return clientsock; }
};
#endif
|