/*
 * Declares class MPIFS, which derives from VirtFS and uses MPI types
 * (MPI_Comm, MPI_Status, MPI_Datatype) in its interface.
 */
#ifndef MPIFS_dot_h
#define MPIFS_dot_h
#include "definitions.h"
#include "VirtFS.h"
#include "mpi.h"
#include "App_Info.h"
#include "List.h"
#include "Attribute.h"
class Collective_IO;
class Array;
/*
 * MPIFS -- a VirtFS subclass whose state is expressed in terms of MPI ranks
 * and communicators.  The members below are grouped into general node
 * identity, state used by the IO nodes, and state required for part-time
 * nodes.
 *
 * NOTE(review): only declarations are visible in this header.  Parameter
 * meanings and ownership of the pointer members must be confirmed against
 * the implementation file.
 */
class MPIFS : public VirtFS {
private:
  /* ---- Identity of this node ---- */
  int        node_type_;             /* compute,io,part_time .. */
  int        world_rank_;            /* rank in MPI_COMM_WORLD */
  int        app_num_;               /* for io-nodes this should be 0 */
  int        app_rank_;              /* rank within the applications */
  int        app_size_;              /* size of the application */
  int        master_io_node_;        /* presumably the rank of the master io node -- confirm */
  MPI_Comm  *comm_;
  App_Info  *app_info_;              /* rank --> world mapping */

  /* ---- Information used by the IO nodes ---- */
  int        num_apps_;              /* # of compute apps */
  int        num_apps_alive_;
  int        current_max_app_num_;
  int        global_barrier_count_;
  App_Info **compute_apps_info_;
  char      *mem_buf_;
  int        mem_buf_size_;

  /*
   * Three fixed-size tables of 1000 entries each.
   * NOTE(review): presumably parallel arrays with num_open_files_ as the
   * number of used slots -- confirm in the implementation.
   */
  int        num_open_files_;
  IOFile     open_file_ptrs_[1000];
  char      *open_file_names_[1000];
  Boolean    is_new_file_[1000];

  /* ---- Information required for part-time nodes ---- */
  int        io_app_num_;
  int        io_app_rank_;
  int        io_app_size_;
  MPI_Comm  *io_comm_;
  App_Info  *io_app_info_;

  /* ---- Internal helpers ---- */

  /* Two initialisation overloads; their argument lists match the */
  /* (int,int,int,int,int*) and (int,int,int,int*,int,int,int*)   */
  /* constructors declared in the public section.                 */
  void      do_init(int, int, int, int, int *);
  void      do_init(int, int, int, int *, int, int, int *);

  void      wait_for_next_message(int *, int *, int *, MPI_Status *);
  void      process_io_special_message(int, int, MPI_Status *);

  void      cleanfiles(int);
  void      createfiles(int);
  void      flushfiles(int);

  void      insert_compute_app(int, App_Info *);
  App_Info *find_compute_app(int);

  Boolean   received_quit_message(int, int, MPI_Status *);

public:
  /* ---- Construction / destruction ---- */
  MPIFS(int, int, int, int, int *);
  MPIFS(int, int, int, int, int *, Boolean);
  MPIFS(int, int, int, int *, int, int, int *);
  virtual ~MPIFS();

  /* ---- Node-role predicates ---- */
  Boolean   am_master_compute_node();
  Boolean   am_compute_node();
  Boolean   am_master_io_node();
  Boolean   am_io_node();

  void      Broadcast(int, void *, int, MPI_Datatype, int);

  /* ---- IO loops and IO start-up ---- */
  void      io_node_main_loop();
  void      start_collective_io(int, int, MPI_Status *);
  void      start_attribute_io(int, int, MPI_Status *);
  void      part_time_io_node_loop(int *, int, Array *);
  void      compute_node_io_loop(Array *);

  int       app_size(int);
  int       my_rank(int);

  /* ---- Schema and attribute transfer ---- */
  void      send_array_schema(Array *);
  void      send_attr_schema(Attribute *, char *, int);
  void      receive_attr_schema();
  void      send_attr_data(Attribute *);
  void      receive_attr_data(Attribute *);

  int       node_type();
  void      user_commands(int);
  void      release_compute_nodes(int);
  void      compute_side_io_done();

  /* ---- Accessors ---- */
  /* NOTE(review): by name these pair with io_app_info_,          */
  /* master_io_node_, mem_buf_size_ and mem_buf_ -- confirm.      */
  App_Info *io_app_info();
  int       master_io_node();
  int       mem_buf_size();
  void      set_mem_buf_size(int);
  char     *mem_buf();
  void      set_mem_buf(char *);

  /* ---- Open-file lookup ---- */
  IOFile    open_file(char *, int);
  Boolean   is_new_file(char *);
};
#endif
/* end of include guard MPIFS_dot_h */