#include "definitions.h" #include "ArrayGroup.h" #include "MPIFS.h" #include "Chunk.h" #include "App_Info.h" #include "Array.h" #include "message.h" #include "Shared_IO.h" extern MPIFS* MPIFS_global_obj; extern int SUBCHUNK_SIZE; Shared_IO::Shared_IO(int *schema_string, int schema_size, int world_rank, int comp_app_num,int comp_app_size , App_Info *app_info) : Simple_IO(schema_string, schema_size, world_rank, comp_app_num, comp_app_size, app_info) { compute_chunk_ = new Chunk(); current_chunk_ = new Chunk(); subchunk_ = new Chunk(); current_array_id_ = -1; if ((op_type_ == RESTART)||(op_type_ == GENERAL_READ)|| (op_type_ == READ_TIMESTEP)) read_op_ = YES; else read_op_ = NO; /* We need to set the following variables so that continue_io()* * would start the I/O of the first subchunk automatically */ contiguous_ = NO; current_array_id_ = -1; current_chunk_id_ = 0; num_of_chunks_ = -1; /* This will cause get_next_chunk() to fail */ current_subchunk_id_ = 0; num_of_subchunks_ = -1; /* Causes get_next_subchunk() to fail */ status_flag_ = START; continue_io(); } Shared_IO::~Shared_IO() { if (subchunk_) delete subchunk_; if (compute_chunk_) delete compute_chunk_; subchunk_ = compute_chunk_ = NULL; } Boolean Shared_IO::get_next_array(){ current_array_id_++; if (current_array_id_ < num_of_arrays_){ make_subchunks_ = -1; current_array_ = find_array(current_array_id_); nat_chunked_ = current_array_->nat_chunked(); sub_chunked_ = current_array_->sub_chunked(); array_rank_ = current_array_->rank(); if (array_rank_ > max_rank_){ realloc_schema_bufs(array_rank_); } num_of_chunks_ = current_array_->layout(IO_NODE)->total_elements(); current_chunk_id_ = -1; if (nat_chunked_ && !sub_chunked_) contiguous_ = YES; /* No need to use derived datatypes */ else contiguous_ = NO; /* Have to use derived datatypes */ bytes_to_go_ = 0; current_subchunk_id_ = -1; return YES; } else return NO; } Boolean Shared_IO::get_next_chunk() { int *ptr; if (!current_array_) return NO; current_chunk_id_ = current_array_->get_next_index(current_chunk_id_, my_io_rank_, num_io_nodes_); if (current_chunk_id_ < num_of_chunks_){ current_chunk_->set_data_ptr(NULL); current_chunk_->init(current_array_, current_chunk_id_, IO_NODE, NO_ALLOC); if (contiguous_){ bytes_to_go_ = current_chunk_->total_size_in_bytes(); current_chunk_->set_data_ptr(mem_buf_); ptr = schema_bufs_[0]; *ptr++ = current_array_id_; *ptr++ = current_chunk_id_; *ptr++ = (int) nat_chunked_; *ptr++ = (int) contiguous_; *ptr++ = op_type_; *ptr++ = 0; *ptr++ = 0; compute_chunk_overlaps(current_array_, current_chunk_); } else { if (!sub_chunked_ && (make_subchunks_ == -1)){ current_array_->make_sub_chunks(current_chunk_); make_subchunks_ = 1; } num_of_subchunks_ = current_array_->layout(SUB_CHUNK)->total_elements(); current_subchunk_id_ = -1; } return YES; } else return NO; } /* This should not be called for the contiguous_ case */ Boolean Shared_IO::get_next_subchunk() { current_subchunk_id_++; if (current_subchunk_id_ < num_of_subchunks_){ subchunk_->set_data_ptr(NULL); subchunk_->init(current_chunk_, current_subchunk_id_, NO_ALLOC); bytes_to_go_ = subchunk_->total_size_in_bytes(); if (bytes_to_go_ < mem_buf_size_) realloc_mem_bufs(bytes_to_go_); subchunk_->set_data_ptr(mem_buf_); return YES; } else return NO; } void Shared_IO::start_subchunk_io() { int *ptr; if (contiguous_){ ptr = schema_bufs_[0]; ptr[6] = min(SUBCHUNK_SIZE, bytes_to_go_); nb_send_message((void *)ptr, 7, MPI_INT, dest_ids_[0], CHUNK_SCHEMA, MPI_COMM_WORLD, &schema_requests_[0]); if (read_op_){ 
      read_data(mem_buf_, ptr[6]);
      nb_send_message((void *)mem_buf_, ptr[6], MPI_CHAR, dest_ids_[0],
                      CHUNK_DATA_FROM_IO, MPI_COMM_WORLD, &requests_[0]);
    } else
      nb_receive_message((void *)mem_buf_, ptr[6], MPI_CHAR, dest_ids_[0],
                         CHUNK_DATA_TO_IO, MPI_COMM_WORLD, &requests_[0]);
    ptr[5] += ptr[6];         /* Offset of the next subchunk */
    bytes_to_go_ -= ptr[6];
    status_flag_ = WAITING;
  } else {
    compute_chunk_overlaps(current_array_, subchunk_);
    compute_schemas(current_array_, subchunk_, compute_chunk_,
                    current_array_id_);
    if (read_op_){
      read_data(subchunk_);
      send_data_to_compute_nodes(subchunk_, NULL, NULL);
    } else
      receive_data_from_compute_nodes(subchunk_, NULL, NULL);
    status_flag_ = WAITING;
  }
}

Boolean Shared_IO::test_subchunk_io() {
  int flag;

  MPI_Testall(num_overlaps_, requests_, &flag, statuses_);
  if (flag) {
    /* Free schema request objects - Do we need to do this */
    MPI_Waitall(num_overlaps_, schema_requests_, statuses_);
    status_flag_ = START;
    if (!read_op_)
      if (contiguous_)
        write_data(mem_buf_, schema_bufs_[0][6], 1);
      else
        write_data(subchunk_);
    if (!contiguous_) free_datatypes();
    return YES;
  }
  return NO;
}

/* Return YES, if I/O is complete */
Boolean Shared_IO::continue_io() {
  if (status_flag_ == START){
    if (!start_next_subchunk_io())
      return YES;             /* IO completed */
  } else if (status_flag_ == WAITING){
    if (test_subchunk_io())
      if (!start_next_subchunk_io())
        return YES;           /* IO done */
  } else {
    printf("Error - Invalid status_flag value \n");
    exit(11);
  }
  return NO;
}

/* Return yes if you can start the io of another subchunk */
Boolean Shared_IO::start_next_subchunk_io() {
  if (contiguous_){
    if (bytes_to_go_ <= 0){
      while (!get_next_chunk()){
        if (!get_next_array()) return NO;
      }
      /* Since we might be looking at another array */
      if (!contiguous_) get_next_subchunk();
    }
    start_subchunk_io();
  } else {
    if (!get_next_subchunk()){
      /* We have finished this chunk */
      while (!get_next_chunk()){
        if (!get_next_array()) return NO;
      }
      if (!contiguous_) get_next_subchunk();
    }
    start_subchunk_io();
  }
  return YES;
}
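
/*
 * Usage sketch (illustrative only, not part of the original source):
 * the constructor starts the I/O of the first subchunk itself, and the
 * owning I/O-node dispatch loop is then expected to call continue_io()
 * each time a pending message completes, until it returns YES.  The
 * names `req', `schema', and `io_done' below are hypothetical.
 *
 *   Shared_IO *req = new Shared_IO(schema, schema_size, world_rank,
 *                                  comp_app_num, comp_app_size, app_info);
 *   Boolean io_done = NO;
 *   while (!io_done) {
 *       // ... wait for an outstanding send/receive to complete ...
 *       io_done = req->continue_io();   // YES => all subchunks done
 *   }
 *   delete req;
 */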