Diffstat (limited to 'src/Panda/Shared_IO.C')
-rw-r--r--  src/Panda/Shared_IO.C | 237
1 file changed, 237 insertions, 0 deletions
diff --git a/src/Panda/Shared_IO.C b/src/Panda/Shared_IO.C
new file mode 100644
index 0000000..8b5a5cd
--- /dev/null
+++ b/src/Panda/Shared_IO.C
@@ -0,0 +1,237 @@
+#include "definitions.h"
+#include "ArrayGroup.h"
+#include "MPIFS.h"
+#include "Chunk.h"
+#include "App_Info.h"
+#include "Array.h"
+#include "message.h"
+#include "Shared_IO.h"
+
+
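+/* Defined elsewhere in Panda: the global file-system object and the maximum
+ * number of bytes shipped per message on the contiguous fast path. */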
+extern MPIFS* MPIFS_global_obj;
+extern int SUBCHUNK_SIZE;
+
+Shared_IO::Shared_IO(int *schema_string, int schema_size, int world_rank,
+                     int comp_app_num, int comp_app_size, App_Info *app_info)
+: Simple_IO(schema_string, schema_size, world_rank, comp_app_num,
+            comp_app_size, app_info)
+{
+
+ compute_chunk_ = new Chunk();
+ current_chunk_ = new Chunk();
+ subchunk_ = new Chunk();
+ if ((op_type_ == RESTART)||(op_type_ == GENERAL_READ)||
+ (op_type_ == READ_TIMESTEP))
+ read_op_ = YES;
+ else
+ read_op_ = NO;
+
+  /* Initialize the following state so that the first call to continue_io()
+   * starts the I/O of the first subchunk automatically. */
+ contiguous_ = NO;
+ current_array_id_ = -1;
+ current_chunk_id_ = 0;
+ num_of_chunks_ = -1; /* This will cause get_next_chunk() to fail */
+ current_subchunk_id_ = 0;
+ num_of_subchunks_ = -1; /* Causes get_next_subchunk() to fail */
+ status_flag_ = START;
+ continue_io();
+}
+
+Shared_IO::~Shared_IO()
+{
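+  /* current_chunk_ is presumably owned and released by the Simple_IO base
+   * class; only the chunks unique to Shared_IO are freed here. */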
+ if (subchunk_) delete subchunk_;
+ if (compute_chunk_) delete compute_chunk_;
+ subchunk_ = compute_chunk_ = NULL;
+}
+
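+/* Advance to the next array in the schema.  Caches the array's chunking
+ * attributes, grows the schema buffers if the new array has a higher rank,
+ * and decides whether I/O can take the contiguous fast path (naturally
+ * chunked and not sub-chunked, so no derived datatypes are needed). */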
+Boolean Shared_IO::get_next_array()
+{
+ current_array_id_++;
+ if (current_array_id_ < num_of_arrays_){
+ make_subchunks_ = -1;
+ current_array_ = find_array(current_array_id_);
+ nat_chunked_ = current_array_->nat_chunked();
+ sub_chunked_ = current_array_->sub_chunked();
+ array_rank_ = current_array_->rank();
+
+ if (array_rank_ > max_rank_){
+ realloc_schema_bufs(array_rank_);
+ }
+ num_of_chunks_ = current_array_->layout(IO_NODE)->total_elements();
+ current_chunk_id_ = -1;
+ if (nat_chunked_ && !sub_chunked_)
+ contiguous_ = YES; /* No need to use derived datatypes */
+ else
+ contiguous_ = NO; /* Have to use derived datatypes */
+
+ bytes_to_go_ = 0;
+ current_subchunk_id_ = -1;
+ return YES;
+ } else
+ return NO;
+}
+
+
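+/* Advance to the next chunk of the current array assigned to this I/O node.
+ * In the contiguous case the chunk's schema message is staged and its
+ * overlaps with the compute-node chunks are computed up front; otherwise
+ * the chunk is split into subchunks to be processed one at a time. */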
+Boolean Shared_IO::get_next_chunk()
+{
+ int *ptr;
+
+ if (!current_array_) return NO;
+ current_chunk_id_ = current_array_->get_next_index(current_chunk_id_,
+ my_io_rank_,
+ num_io_nodes_);
+ if (current_chunk_id_ < num_of_chunks_){
+ current_chunk_->set_data_ptr(NULL);
+ current_chunk_->init(current_array_, current_chunk_id_,
+ IO_NODE, NO_ALLOC);
+ if (contiguous_){
+ bytes_to_go_ = current_chunk_->total_size_in_bytes();
+ current_chunk_->set_data_ptr(mem_buf_);
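+      /* Stage the 7-int schema message that describes this chunk:
+       *   [0] array id           [1] chunk id
+       *   [2] nat_chunked flag   [3] contiguous flag
+       *   [4] operation type     [5] subchunk byte offset (advanced per piece)
+       *   [6] subchunk byte count (filled in by start_subchunk_io()) */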
+ ptr = schema_bufs_[0];
+ *ptr++ = current_array_id_;
+ *ptr++ = current_chunk_id_;
+ *ptr++ = (int) nat_chunked_;
+ *ptr++ = (int) contiguous_;
+ *ptr++ = op_type_;
+ *ptr++ = 0;
+ *ptr++ = 0;
+ compute_chunk_overlaps(current_array_, current_chunk_);
+ }
+ else {
+ if (!sub_chunked_ && (make_subchunks_ == -1)){
+ current_array_->make_sub_chunks(current_chunk_);
+ make_subchunks_ = 1;
+ }
+ num_of_subchunks_ = current_array_->layout(SUB_CHUNK)->total_elements();
+ current_subchunk_id_ = -1;
+ }
+ return YES;
+ }
+ else
+ return NO;
+}
+
+
+/* Advance to the next subchunk of the current chunk and size the staging
+ * buffer to fit it.  This must not be called in the contiguous_ case. */
+Boolean Shared_IO::get_next_subchunk()
+{
+ current_subchunk_id_++;
+ if (current_subchunk_id_ < num_of_subchunks_){
+ subchunk_->set_data_ptr(NULL);
+ subchunk_->init(current_chunk_, current_subchunk_id_, NO_ALLOC);
+ bytes_to_go_ = subchunk_->total_size_in_bytes();
+
+    /* Grow the staging buffer if this subchunk does not fit in it. */
+    if (bytes_to_go_ > mem_buf_size_)
+      realloc_mem_bufs(bytes_to_go_);
+
+ subchunk_->set_data_ptr(mem_buf_);
+ return YES;
+ }
+ else
+ return NO;
+}
+
+
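+/* Post the nonblocking transfers for the current subchunk.  In the
+ * contiguous case the chunk is streamed through mem_buf_ in pieces of at
+ * most SUBCHUNK_SIZE bytes; otherwise derived datatypes are built and data
+ * is exchanged with every overlapping compute-node chunk. */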
+void Shared_IO::start_subchunk_io()
+{
+ int *ptr;
+
+ if (contiguous_){
+ ptr = schema_bufs_[0];
+ ptr[6] = min(SUBCHUNK_SIZE, bytes_to_go_);
+
+ nb_send_message((void *)ptr, 7, MPI_INT, dest_ids_[0],
+ CHUNK_SCHEMA, MPI_COMM_WORLD, &schema_requests_[0]);
+ if (read_op_){
+ read_data(mem_buf_, ptr[6]);
+ nb_send_message((void *)mem_buf_, ptr[6], MPI_CHAR, dest_ids_[0],
+ CHUNK_DATA_FROM_IO, MPI_COMM_WORLD, &requests_[0]);
+ }
+ else
+ nb_receive_message((void *)mem_buf_, ptr[6], MPI_CHAR, dest_ids_[0],
+ CHUNK_DATA_TO_IO, MPI_COMM_WORLD, &requests_[0]);
+ ptr[5] += ptr[6]; /* Offset of the next subchunk */
+ bytes_to_go_ -= ptr[6];
+ status_flag_ = WAITING;
+
+ } else {
+ compute_chunk_overlaps(current_array_, subchunk_);
+
+ compute_schemas(current_array_, subchunk_, compute_chunk_,
+ current_array_id_);
+ if (read_op_){
+ read_data(subchunk_);
+ send_data_to_compute_nodes(subchunk_, NULL, NULL);
+ }
+ else
+ receive_data_from_compute_nodes(subchunk_, NULL, NULL);
+ status_flag_ = WAITING;
+ }
+}
+
+
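+/* Nonblocking completion test for the current subchunk's messages.  Once
+ * they have all finished, the schema sends are drained, pending data is
+ * written to disk (for write operations), and any derived datatypes built
+ * for the subchunk are freed. */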
+Boolean Shared_IO::test_subchunk_io()
+{
+ int flag;
+ MPI_Testall(num_overlaps_, requests_, &flag, statuses_);
+ if (flag) {
+    /* Free the schema request objects - do we need to do this? */
+ MPI_Waitall(num_overlaps_, schema_requests_,statuses_);
+ status_flag_ = START;
+    if (!read_op_) {
+      if (contiguous_)
+        write_data(mem_buf_, schema_bufs_[0][6], 1);
+      else
+        write_data(subchunk_);
+    }
+
+ if (!contiguous_) free_datatypes();
+ return YES;
+ }
+ return NO;
+}
+
+
+/* Return YES if the whole I/O operation is complete. */
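+/* This is the driver of a two-state machine: in the START state it launches
+ * the I/O of the next subchunk; in the WAITING state it first tests the
+ * outstanding messages and, once they complete, launches the next one. */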
+Boolean Shared_IO::continue_io()
+{
+ if (status_flag_ == START){
+ if (!start_next_subchunk_io()) return YES; /* IO completed */
+ } else if (status_flag_ == WAITING){
+ if (test_subchunk_io())
+ if (!start_next_subchunk_io()) return YES; /* IO done */
+ } else {
+    printf("Error: invalid status_flag_ value\n");
+ exit(11);
+ }
+ return NO;
+}
+
+/* Return YES if the I/O of another subchunk could be started. */
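+/* Advancement order is: next subchunk of the current chunk, then next chunk
+ * of the current array, then first chunk of the next array.  NO means every
+ * array in the schema has been processed. */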
+Boolean Shared_IO::start_next_subchunk_io()
+{
+ if (contiguous_){
+ if (bytes_to_go_ <= 0){
+ while(!get_next_chunk()){
+ if (!get_next_array()) return NO;
+ }
+      /* get_next_array() may have switched us to a different array, which
+       * need not be contiguous. */
+ if (!contiguous_) get_next_subchunk();
+ }
+
+ start_subchunk_io();
+ } else {
+
+ if (!get_next_subchunk()){
+ /* We have finished this chunk */
+ while(!get_next_chunk()){
+ if (!get_next_array()) return NO;
+ }
+ if (!contiguous_) get_next_subchunk();
+ }
+
+ start_subchunk_io();
+ }
+ return YES;
+}
+