diff options
Diffstat (limited to 'src/Panda/Shared_IO.C')
-rw-r--r-- | src/Panda/Shared_IO.C | 237 |
1 file changed, 237 insertions, 0 deletions
diff --git a/src/Panda/Shared_IO.C b/src/Panda/Shared_IO.C new file mode 100644 index 0000000..8b5a5cd --- /dev/null +++ b/src/Panda/Shared_IO.C @@ -0,0 +1,237 @@ +#include "definitions.h" +#include "ArrayGroup.h" +#include "MPIFS.h" +#include "Chunk.h" +#include "App_Info.h" +#include "Array.h" +#include "message.h" +#include "Shared_IO.h" + + +extern MPIFS* MPIFS_global_obj; +extern int SUBCHUNK_SIZE; + +Shared_IO::Shared_IO(int *schema_string, int schema_size, int world_rank, + int comp_app_num,int comp_app_size , App_Info *app_info) +: Simple_IO(schema_string, schema_size, world_rank, comp_app_num, + comp_app_size, app_info) +{ + + compute_chunk_ = new Chunk(); + current_chunk_ = new Chunk(); + subchunk_ = new Chunk(); + current_array_id_ = -1; + if ((op_type_ == RESTART)||(op_type_ == GENERAL_READ)|| + (op_type_ == READ_TIMESTEP)) + read_op_ = YES; + else + read_op_ = NO; + + /* We need to set the following variables so that continue_io()* + * would start the I/O of the first subchunk automatically */ + contiguous_ = NO; + current_array_id_ = -1; + current_chunk_id_ = 0; + num_of_chunks_ = -1; /* This will cause get_next_chunk() to fail */ + current_subchunk_id_ = 0; + num_of_subchunks_ = -1; /* Causes get_next_subchunk() to fail */ + status_flag_ = START; + continue_io(); +} + +Shared_IO::~Shared_IO() +{ + if (subchunk_) delete subchunk_; + if (compute_chunk_) delete compute_chunk_; + subchunk_ = compute_chunk_ = NULL; +} + +Boolean Shared_IO::get_next_array(){ + current_array_id_++; + if (current_array_id_ < num_of_arrays_){ + make_subchunks_ = -1; + current_array_ = find_array(current_array_id_); + nat_chunked_ = current_array_->nat_chunked(); + sub_chunked_ = current_array_->sub_chunked(); + array_rank_ = current_array_->rank(); + + if (array_rank_ > max_rank_){ + realloc_schema_bufs(array_rank_); + } + num_of_chunks_ = current_array_->layout(IO_NODE)->total_elements(); + current_chunk_id_ = -1; + if (nat_chunked_ && !sub_chunked_) + contiguous_ = YES; /* 
No need to use derived datatypes */ + else + contiguous_ = NO; /* Have to use derived datatypes */ + + bytes_to_go_ = 0; + current_subchunk_id_ = -1; + return YES; + } else + return NO; +} + + +Boolean Shared_IO::get_next_chunk() +{ + int *ptr; + + if (!current_array_) return NO; + current_chunk_id_ = current_array_->get_next_index(current_chunk_id_, + my_io_rank_, + num_io_nodes_); + if (current_chunk_id_ < num_of_chunks_){ + current_chunk_->set_data_ptr(NULL); + current_chunk_->init(current_array_, current_chunk_id_, + IO_NODE, NO_ALLOC); + if (contiguous_){ + bytes_to_go_ = current_chunk_->total_size_in_bytes(); + current_chunk_->set_data_ptr(mem_buf_); + ptr = schema_bufs_[0]; + *ptr++ = current_array_id_; + *ptr++ = current_chunk_id_; + *ptr++ = (int) nat_chunked_; + *ptr++ = (int) contiguous_; + *ptr++ = op_type_; + *ptr++ = 0; + *ptr++ = 0; + compute_chunk_overlaps(current_array_, current_chunk_); + } + else { + if (!sub_chunked_ && (make_subchunks_ == -1)){ + current_array_->make_sub_chunks(current_chunk_); + make_subchunks_ = 1; + } + num_of_subchunks_ = current_array_->layout(SUB_CHUNK)->total_elements(); + current_subchunk_id_ = -1; + } + return YES; + } + else + return NO; +} + + +/* This should not be called for the contiguous_ case */ +Boolean Shared_IO::get_next_subchunk() +{ + current_subchunk_id_++; + if (current_subchunk_id_ < num_of_subchunks_){ + subchunk_->set_data_ptr(NULL); + subchunk_->init(current_chunk_, current_subchunk_id_, NO_ALLOC); + bytes_to_go_ = subchunk_->total_size_in_bytes(); + + if (bytes_to_go_ < mem_buf_size_) + realloc_mem_bufs(bytes_to_go_); + + subchunk_->set_data_ptr(mem_buf_); + return YES; + } + else + return NO; +} + + +void Shared_IO::start_subchunk_io() +{ + int *ptr; + + if (contiguous_){ + ptr = schema_bufs_[0]; + ptr[6] = min(SUBCHUNK_SIZE, bytes_to_go_); + + nb_send_message((void *)ptr, 7, MPI_INT, dest_ids_[0], + CHUNK_SCHEMA, MPI_COMM_WORLD, &schema_requests_[0]); + if (read_op_){ + read_data(mem_buf_, ptr[6]); 
+ nb_send_message((void *)mem_buf_, ptr[6], MPI_CHAR, dest_ids_[0], + CHUNK_DATA_FROM_IO, MPI_COMM_WORLD, &requests_[0]); + } + else + nb_receive_message((void *)mem_buf_, ptr[6], MPI_CHAR, dest_ids_[0], + CHUNK_DATA_TO_IO, MPI_COMM_WORLD, &requests_[0]); + ptr[5] += ptr[6]; /* Offset of the next subchunk */ + bytes_to_go_ -= ptr[6]; + status_flag_ = WAITING; + + } else { + compute_chunk_overlaps(current_array_, subchunk_); + + compute_schemas(current_array_, subchunk_, compute_chunk_, + current_array_id_); + if (read_op_){ + read_data(subchunk_); + send_data_to_compute_nodes(subchunk_, NULL, NULL); + } + else + receive_data_from_compute_nodes(subchunk_, NULL, NULL); + status_flag_ = WAITING; + } +} + + +Boolean Shared_IO::test_subchunk_io() +{ + int flag; + MPI_Testall(num_overlaps_, requests_, &flag, statuses_); + if (flag) { + /* Free schema request objects - Do we need to do this */ + MPI_Waitall(num_overlaps_, schema_requests_,statuses_); + status_flag_ = START; + if (!read_op_) + if (contiguous_) + write_data(mem_buf_, schema_bufs_[0][6], 1); + else + write_data(subchunk_); + + if (!contiguous_) free_datatypes(); + return YES; + } + return NO; +} + + +/* Return YES, if I/O is complete */ +Boolean Shared_IO::continue_io() +{ + if (status_flag_ == START){ + if (!start_next_subchunk_io()) return YES; /* IO completed */ + } else if (status_flag_ == WAITING){ + if (test_subchunk_io()) + if (!start_next_subchunk_io()) return YES; /* IO done */ + } else { + printf("Error - Invalid status_flag value \n"); + exit(11); + } + return NO; +} + +/* Return yes if you can start the io of another subchunk */ +Boolean Shared_IO::start_next_subchunk_io() +{ + if (contiguous_){ + if (bytes_to_go_ <= 0){ + while(!get_next_chunk()){ + if (!get_next_array()) return NO; + } + /* Since we might be looking at another array */ + if (!contiguous_) get_next_subchunk(); + } + + start_subchunk_io(); + } else { + + if (!get_next_subchunk()){ + /* We have finished this chunk */ + 
while(!get_next_chunk()){ + if (!get_next_array()) return NO; + } + if (!contiguous_) get_next_subchunk(); + } + + start_subchunk_io(); + } + return YES; +} + |