#include "definitions.h" #include "ArrayGroup.h" #include "MPIFS.h" #include "Chunk.h" #include "App_Info.h" #include "Array.h" #include "message.h" #include "CSDIO.h" #include "List.h" extern MPIFS* MPIFS_global_obj; extern int SUBCHUNK_SIZE; /* This code is executed on the compute nodes (excluding the part-time i/o * nodes). */ void CSDIO::compute_node_io_loop(ArrayGroup *group) { int array_idx; Boolean read_op; op_type_ = group->op_type(); if ((op_type_ == RESTART) || (op_type_ == GENERAL_READ) || (op_type_ == READ_TIMESTEP)){ read_op = YES; } else { read_op = NO; } MPI_Comm_rank(MPI_COMM_WORLD, &world_rank_); num_of_arrays_ = group->num_of_arrays(); receive_io_app_info(); num_io_nodes_ = io_app_info_->app_size(); #ifdef DEBUG printf("%d: op_type_ = %d read_op =%d\n", world_rank_, op_type_, read_op); printf("%d: Compute node - num of arrays %d - num of io_nodes %d\n", world_rank_, num_of_arrays_, num_io_nodes_); #endif comp_current_array_ = new Array(); comp_current_array_id_ = -1; for(array_idx = 0; array_idx < num_of_arrays_; array_idx++){ while(!process_compute_side_array(group, array_idx, read_op)){}; } delete comp_current_array_; comp_current_array_ = NULL; } /* An array is stored in the comp_current_array_. this must be instatntiated * before use. If the input array_id is the same as that stored in * comp_current_array_id_, then it means that all the required sends/recvs * have been posted and all we have to do is to verify its completion. If * they are different, then it means that we have to start the i/o for the * new array. */ Boolean CSDIO::process_compute_side_array(ArrayGroup *group, int array_idx, Boolean read_op) { int make_subchunks=-1, tag, tag_ctr=0, buf_size, bytes_to_go, flag, i; char *tmp_buf; void *void_buf; Chunk *compute_chunk=NULL, *io_chunk=NULL, *subchunk=NULL; if (comp_current_array_id_ != array_idx){ /* We have to post the sends/recvs for this array*/ comp_current_array_->copy(group->find_array(array_idx)); comp_array_rank_ = comp_current_array_->rank(); if (comp_array_rank_ > max_comp_rank_){ realloc_compute_schema_bufs(comp_array_rank_); } nat_chunked_ = comp_current_array_->nat_chunked(); sub_chunked_ = comp_current_array_->sub_chunked(); if (nat_chunked_ && !sub_chunked_) contiguous_ = YES; else contiguous_ = NO; compute_pending_ = 0; if (contiguous_){ /* Nat chunking with no user-specified chunking. We don't need * to use any MPI dervied datatypes. 
      comp_current_array_->list_clear();
      compute_chunk = comp_current_array_->get_next_chunk();
      while(compute_chunk != NULL){
        comp_current_chunk_id_ = compute_chunk->chunk_id();
        io_overlaps_ = 1;
        io_overlap_chunk_ids_[0] = comp_current_chunk_id_;
        io_dest_ids_[0] = io_app_info_->world_rank(comp_current_array_->which_node(
                                comp_current_chunk_id_, IO_NODE, num_io_nodes_));
        if (io_dest_ids_[0] == world_rank_){
          /* Part-time case - do nothing, the io node should take care of this */
        } else {
          bytes_to_go = compute_chunk->total_size_in_bytes();
          tmp_buf = (char *)compute_chunk->data_ptr();
          tag_ctr = 0;
          while(bytes_to_go > 0){
            buf_size = min(SUBCHUNK_SIZE, bytes_to_go);
            if (compute_pending_ >= max_pending_){
              realloc_pending_messages(compute_pending_+1);
            }
            tag = comp_current_chunk_id_ * 1000 + tag_ctr*10;
            if (read_op)
              nb_receive_message((void *) tmp_buf, buf_size, MPI_CHAR,
                                 io_dest_ids_[0], tag + CHUNK_DATA_FROM_IO,
                                 MPI_COMM_WORLD,
                                 &comp_requests_[compute_pending_]);
            else
              nb_send_message((void *) tmp_buf, buf_size, MPI_CHAR,
                              io_dest_ids_[0], tag + CHUNK_DATA_TO_IO,
                              MPI_COMM_WORLD,
                              &comp_requests_[compute_pending_]);
            tag_ctr++;
            tmp_buf += buf_size;
            bytes_to_go -= buf_size;
            compute_pending_++;
          }
        }
        compute_chunk = comp_current_array_->get_next_chunk();
      }
      comp_current_array_->list_clear();
    } /* End if contiguous */
    else {
      /* We have to use mpi-derived datatypes */
      make_subchunks = -1;
      io_chunk = new Chunk();
      subchunk = new Chunk();
      comp_current_array_->list_clear();
      compute_chunk = comp_current_array_->get_next_chunk();
      while (compute_chunk != NULL){
        comp_current_chunk_id_ = compute_chunk->chunk_id();
        /* Determine the overlapping I/O chunks */
        io_chunk_overlaps(comp_current_array_, compute_chunk);
        for(i=0; i < io_overlaps_; i++){
          if (io_dest_ids_[i] != world_rank_){
            /* Different node - so we have to post the send/recv */
            io_chunk->init(comp_current_array_, io_overlap_chunk_ids_[i],
                           IO_NODE, NO_ALLOC);
            if (!sub_chunked_ && (make_subchunks == -1)){
              comp_current_array_->make_sub_chunks(io_chunk);
              make_subchunks = 1;
            }
            tag_ctr = 0;
            comp_num_of_subchunks_ =
              comp_current_array_->layout(SUB_CHUNK)->total_elements();
#ifdef DEBUG
            printf("comp_num_of_subchunks_ = %d\n", comp_num_of_subchunks_);
#endif
            for(comp_current_subchunk_id_ = 0;
                comp_current_subchunk_id_ < comp_num_of_subchunks_;
                comp_current_subchunk_id_++){
#ifdef DEBUG
              printf("io_chunk = %d subchunk_id = %d\n", io_chunk->chunk_id(),
                     comp_current_subchunk_id_);
#endif
              subchunk->init(io_chunk, comp_current_subchunk_id_, NO_ALLOC);
              subchunk->compute_overlap(compute_chunk, comp_overlap_base_,
                                        comp_overlap_size_,
                                        comp_overlap_stride_);
              buf_size = num_elements(comp_array_rank_, comp_overlap_size_);
              if (buf_size > 0){
                /* Something to send */
                if (compute_pending_ >= max_pending_){
                  realloc_pending_messages(compute_pending_+1);
                }
                /* make_datatype presumably returns the address of the
                 * overlap region through void_buf */
                void_buf = (void *)tmp_buf;
                compute_chunk->make_datatype(comp_overlap_base_,
                                             comp_overlap_size_,
                                             comp_overlap_stride_, &void_buf,
                                             &comp_datatypes_[compute_pending_]);
                tmp_buf = (char *)void_buf;
                tag = io_chunk->chunk_id()*1000 + tag_ctr*10;
                if (read_op)
                  nb_receive_message((void *) tmp_buf, 1,
                                     comp_datatypes_[compute_pending_],
                                     io_dest_ids_[i], tag + CHUNK_DATA_FROM_IO,
                                     MPI_COMM_WORLD,
                                     &comp_requests_[compute_pending_]);
                else
                  nb_send_message((void *) tmp_buf, 1,
                                  comp_datatypes_[compute_pending_],
                                  io_dest_ids_[i], tag + CHUNK_DATA_TO_IO,
                                  MPI_COMM_WORLD,
                                  &comp_requests_[compute_pending_]);
                compute_pending_++;
              }
              tag_ctr++;
            }
          }
        }
        compute_chunk = comp_current_array_->get_next_chunk();
      }
      /* Free the scratch chunk objects */
      delete io_chunk;
      delete subchunk;
    }
    comp_current_array_id_ = array_idx;
    return NO;
  } else {
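    /* The sends/recvs for this array were already posted on an earlier
     * call. A part-time i/o node only tests for completion so that it can
     * get back to servicing i/o requests; a pure compute node blocks until
     * every pending message has completed. */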
    if (part_time_io_){
      /* Just test and get back to io-node stuff */
      MPI_Testall(compute_pending_, comp_requests_, &flag, comp_statuses_);
      if (flag){
        if (!contiguous_){
          for(i=0; i < compute_pending_; i++)
            MPI_Type_free(&comp_datatypes_[i]);
        }
        comp_current_array_->clear();
        return YES;
      }
    } else {
#ifdef DEBUG
      printf("%d: Waiting for %d messages to complete\n", world_rank_,
             compute_pending_);
#endif
      MPI_Waitall(compute_pending_, comp_requests_, comp_statuses_);
      if (!contiguous_){
        for(i=0; i < compute_pending_; i++)
          MPI_Type_free(&comp_datatypes_[i]);
      }
      comp_current_array_->clear();
      return YES;
    }
    return NO;
  }
}

void CSDIO::start_to_finish(Boolean part_time, ArrayGroup *compute_group)
{
  int array_idx, make_subchunks, bytes_to_go, buf_size, tag_ctr, tag;
  Boolean read_op, part_time_done;
  Chunk *chunk=NULL, *subchunk=NULL, *compute_chunk=NULL;

  /* Don't ask me why. Ask Szu-Wen. */
  comp_current_array_id_ = -1;

  if ((op_type_ == RESTART) || (op_type_ == GENERAL_READ) ||
      (op_type_ == READ_TIMESTEP)){
    read_op = YES;
  } else {
    read_op = NO;
  }

  part_time_io_ = part_time;
  compute_node_group_ = compute_group;
  comp_current_array_ = NULL;
  if (part_time_io_){
    comp_current_array_ = new Array();
  }

  /* Receive the i/o node information */
  receive_io_app_info();

  /* To reduce the costs associated with object creation and deletion, we
   * create dummy chunk, subchunk and compute chunk objects and
   * re-initialize them whenever necessary. */
  chunk = new Chunk();
  current_chunk_ = chunk;
  subchunk = new Chunk();
  compute_chunk = new Chunk();

  for(array_idx = 0; array_idx < num_of_arrays_; array_idx++){
    make_subchunks = -1;
    current_array_ = find_array(array_idx);  /* assumed lookup of the array
                                              * with this index */
    nat_chunked_ = current_array_->nat_chunked();
    sub_chunked_ = current_array_->sub_chunked();
    if (nat_chunked_ && !sub_chunked_)
      contiguous_ = YES;
    else
      contiguous_ = NO;
    array_rank_ = current_array_->rank();
    if (array_rank_ > max_rank_){
      realloc_schema_bufs(array_rank_);
    }
    num_of_chunks_ = current_array_->layout(IO_NODE)->total_elements();
    current_chunk_id_ = current_array_->get_next_index(-1, my_io_rank_,
                                                       num_io_nodes_);

    if (contiguous_){
      /* Natural chunked and no user-specified subchunking */
      while(current_chunk_id_ < num_of_chunks_){
        num_overlaps_ = 1;
        overlap_chunk_ids_[0] = current_chunk_id_;
        dest_ids_[0] = app_info_->world_rank(current_array_->which_node(
                                     current_chunk_id_, COMPUTE_NODE));
        if (part_time_io_ && (world_rank_ == dest_ids_[0])){
          direct_io(array_idx, current_chunk_id_, read_op, NULL, NULL);
        } else {
          chunk->init(current_array_, current_chunk_id_, IO_NODE, NO_ALLOC);
          bytes_to_go = chunk->total_size_in_bytes();
          chunk->set_data_ptr(mem_buf_);
          /* We don't have to make the schema requests - just post the
           * send/recv */
          tag_ctr = 0;
          while (bytes_to_go > 0){
            buf_size = min(SUBCHUNK_SIZE, bytes_to_go);
            tag = current_chunk_id_*1000 + tag_ctr*10;
            if (read_op) {
              read_data(mem_buf_, buf_size);
              nb_send_message((void *)mem_buf_, buf_size, MPI_CHAR,
                              dest_ids_[0], tag+CHUNK_DATA_FROM_IO,
                              MPI_COMM_WORLD, &requests_[0]);
              wait_for_completion();
            } else {
              nb_receive_message((void *)mem_buf_, buf_size, MPI_CHAR,
                                 dest_ids_[0], tag+CHUNK_DATA_TO_IO,
                                 MPI_COMM_WORLD, &requests_[0]);
              wait_for_completion();
              write_data(mem_buf_, buf_size, chunk->element_size());
            }
            bytes_to_go -= buf_size;
            tag_ctr++;
          }
          chunk->set_data_ptr(NULL);
        }
        current_chunk_id_ = current_array_->get_next_index(current_chunk_id_,
                                                           my_io_rank_,
                                                           num_io_nodes_);
      }
    } /* End if contiguous_ */
    else {
      /* Have to use MPI-derived datatypes */
      while(current_chunk_id_ < num_of_chunks_){
        chunk->init(current_array_, current_chunk_id_, IO_NODE, NO_ALLOC);
        if (!sub_chunked_ && (make_subchunks == -1)){
          current_array_->make_sub_chunks(chunk);
          make_subchunks = 1;
        }
        num_of_subchunks_ = current_array_->layout(SUB_CHUNK)->total_elements();
        tag_ctr = 0;
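        /* For every subchunk of this i/o chunk: stage it in mem_buf_
         * (growing the buffer if needed), work out which compute chunks it
         * overlaps and build the matching schemas and datatypes, then either
         * read it from disk and send it out, or receive it and write it to
         * disk. */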
        for(current_subchunk_id_ = 0;
            current_subchunk_id_ < num_of_subchunks_;
            current_subchunk_id_++){
          subchunk->init(current_chunk_, current_subchunk_id_, NO_ALLOC);
          bytes_to_go = subchunk->total_size_in_bytes();
          if (bytes_to_go > mem_buf_size_){
            realloc_mem_bufs(bytes_to_go);
          }
          subchunk->set_data_ptr(mem_buf_);
          compute_chunk_overlaps(current_array_, subchunk);
          compute_schemas(current_array_, subchunk, compute_chunk, array_idx);
          tag = current_chunk_id_ * 1000 + tag_ctr*10;
          if (read_op){
            read_data(subchunk);
            send_data_to_compute_nodes(subchunk, tag);
            wait_for_completion();
          } else {
            receive_data_from_compute_nodes(subchunk, tag);
            wait_for_completion();
            write_data(subchunk);
          }
          tag_ctr++;
          subchunk->set_data_ptr(NULL);
        }
        current_chunk_id_ = current_array_->get_next_index(current_chunk_id_,
                                                           my_io_rank_,
                                                           num_io_nodes_);
      }
    }
    if (part_time_io_)
      while (!process_compute_side_array(compute_group, array_idx, read_op)){};
  }

  /* Free the temp chunk objects */
  delete(chunk);
  delete(subchunk);
  delete(compute_chunk);
  chunk = current_chunk_ = subchunk = compute_chunk = NULL;
  if (comp_current_array_){
    delete(comp_current_array_);
    comp_current_array_ = NULL;
  }
}

/* This constructor is for pure io_nodes only */
CSDIO::CSDIO(int *schema_string, int schema_size, int world_rank,
             int comp_app_num, int comp_app_size, App_Info *app_info)
  : Simple_IO(schema_string, schema_size, world_rank, comp_app_num,
              comp_app_size, app_info)
{
  clear();
}

/* This call is for compute nodes only */
CSDIO::CSDIO()
{
  do_init();
}

CSDIO::CSDIO(int *schema_string, int schema_size, int world_rank,
             int comp_app_num, int comp_app_size, App_Info *app_info,
             Boolean part_time)
  : Simple_IO(schema_string, schema_size, world_rank, comp_app_num,
              comp_app_size, app_info)
{
  if (part_time){
    /* This is a part-time i/o node */
    do_init();
    part_time_io_ = part_time;
  } else {
    clear();
  }
}

void CSDIO::clear()
{
  comp_datatypes_ = NULL;
  comp_requests_ = NULL;
  comp_statuses_ = NULL;
  io_overlap_chunk_ids_ = io_dest_ids_ = comp_overlap_base_ = NULL;
  comp_overlap_size_ = comp_overlap_stride_ = NULL;
  io_app_info_ = NULL;
}

void CSDIO::do_init()
{
  max_pending_ = 1;
  compute_pending_ = 0;
  comp_datatypes_ = (MPI_Datatype *)malloc(sizeof(MPI_Datatype)*max_pending_);
  comp_requests_ = (MPI_Request *)malloc(sizeof(MPI_Request)*max_pending_);
  comp_statuses_ = (MPI_Status *)malloc(sizeof(MPI_Status)*max_pending_);
  io_max_overlaps_ = 1;
  io_overlaps_ = 0;
  io_overlap_chunk_ids_ = (int *) malloc(sizeof(int)*io_max_overlaps_);
  io_dest_ids_ = (int *) malloc(sizeof(int)*io_max_overlaps_);
  max_comp_rank_ = 10;
  comp_array_rank_ = 0;
  comp_overlap_base_ = (int *) malloc(sizeof(int)*max_comp_rank_);
  comp_overlap_size_ = (int *) malloc(sizeof(int)*max_comp_rank_);
  comp_overlap_stride_ = (int *) malloc(sizeof(int)*max_comp_rank_);
}

CSDIO::~CSDIO()
{
  if (part_time_io_ || dummy_){
    if (comp_datatypes_) free(comp_datatypes_);
    if (comp_requests_) free(comp_requests_);
    if (comp_statuses_) free(comp_statuses_);
    if (comp_overlap_base_) free(comp_overlap_base_);
    if (comp_overlap_size_) free(comp_overlap_size_);
    if (comp_overlap_stride_) free(comp_overlap_stride_);
    if (io_overlap_chunk_ids_) free(io_overlap_chunk_ids_);
    if (io_dest_ids_) free(io_dest_ids_);
    if (comp_current_array_) delete(comp_current_array_);
    if (io_app_info_) delete(io_app_info_);
  }
  clear();
}

void CSDIO::receive_io_app_info()
{
  int node_type = MPIFS_global_obj->node_type();
  int num_of_world_nodes, app_info_buf_size, *app_info_buf;
  int tag = APP_INFO * 10 + SPECIAL;
  App_Info *tmp_info = NULL;
  MPI_Status app_status;

  MPI_Comm_size(MPI_COMM_WORLD, &num_of_world_nodes);
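  /* The master i/o node packs its App_Info (app number, app size, and the
   * world ranks of the i/o nodes) and sends it to the master compute node;
   * the compute side then propagates the buffer with a Broadcast so that
   * every compute and part-time node can build its own io_app_info_. */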
  app_info_buf_size = num_of_world_nodes + 2;  /* Num io nodes <= total nodes */
  app_info_buf = (int *)malloc(sizeof(int)*app_info_buf_size);

  if (node_type == IO_NODE){
    /* Master i/o node sends the io app info to the master compute node */
    if (MPIFS_global_obj->am_master_io_node()){
      tmp_info = MPIFS_global_obj->io_app_info();
      app_info_buf[0] = tmp_info->app_num();
      app_info_buf[1] = tmp_info->app_size();
      tmp_info->world_ranks(&app_info_buf[2]);
      app_info_buf_size = app_info_buf[1] + 2;
#ifdef DEBUG
      printf("%d:app_num=%d app_size=%d\n", world_rank_, app_info_buf[0],
             app_info_buf[1]);
      printf("sending messages to %d\n", app_info_->get_master());
#endif
      send_message((void *) app_info_buf, app_info_buf_size, MPI_INT,
                   app_info_->get_master(), tag, MPI_COMM_WORLD);
#ifdef DEBUG
      printf("%d: %d %d %d\n", world_rank_, app_info_buf[0], app_info_buf[1],
             app_info_buf[2]);
#endif
    }
  } else if (node_type == PART_TIME_IO){
    if (MPIFS_global_obj->am_master_io_node()){
      tmp_info = MPIFS_global_obj->io_app_info();
      app_info_buf[0] = tmp_info->app_num();
      app_info_buf[1] = tmp_info->app_size();
      tmp_info->world_ranks(&app_info_buf[2]);
      app_info_buf_size = app_info_buf[1] + 2;
      if (MPIFS_global_obj->am_master_compute_node()){
        MPIFS_global_obj->Broadcast(COMPUTE_NODE, (void *) app_info_buf,
                                    app_info_buf_size, MPI_INT, tag);
      } else {
        send_message((void *)app_info_buf, app_info_buf_size, MPI_INT,
                     app_info_->get_master(), tag, MPI_COMM_WORLD);
        receive_message((void *)app_info_buf, app_info_buf_size, MPI_INT,
                        MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &app_status);
        mpi_get_count(&app_status, MPI_INT, &app_info_buf_size);
        MPIFS_global_obj->Broadcast(COMPUTE_NODE, (void *) app_info_buf,
                                    app_info_buf_size, MPI_INT, tag);
      }
    } else {
      receive_message((void *)app_info_buf, app_info_buf_size, MPI_INT,
                      MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &app_status);
      mpi_get_count(&app_status, MPI_INT, &app_info_buf_size);
      MPIFS_global_obj->Broadcast(COMPUTE_NODE, (void *) app_info_buf,
                                  app_info_buf_size, MPI_INT, tag);
    }
    io_app_info_ = new App_Info(app_info_buf[0], app_info_buf[1],
                                &app_info_buf[2]);
  } else if (node_type == COMPUTE_NODE) {
    receive_message((void *)app_info_buf, app_info_buf_size, MPI_INT,
                    MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &app_status);
    mpi_get_count(&app_status, MPI_INT, &app_info_buf_size);
#ifdef DEBUG
    printf("%d:app_info_buf_size =%d\n", world_rank_, app_info_buf_size);
#endif
    io_app_info_ = new App_Info(app_info_buf[0], app_info_buf[1],
                                &app_info_buf[2]);
    MPIFS_global_obj->Broadcast(COMPUTE_NODE, (void *) app_info_buf,
                                app_info_buf_size, MPI_INT, tag);
  } else if (node_type == PART_TIME_COMPUTE) {
    receive_message((void *)app_info_buf, app_info_buf_size, MPI_INT,
                    MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &app_status);
    mpi_get_count(&app_status, MPI_INT, &app_info_buf_size);
    io_app_info_ = new App_Info(app_info_buf[0], app_info_buf[1],
                                &app_info_buf[2]);
    MPIFS_global_obj->Broadcast(COMPUTE_NODE, (void *) app_info_buf,
                                app_info_buf_size, MPI_INT, tag);
  } else {
    printf("Error in CSDIO::receive_io_app_info - incorrect node type\n");
    exit(1);
  }
  free(app_info_buf);
  app_info_buf = NULL;
}

/* Store the schema only for the part-time i/o case. Don't send any schema
 * message. */
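/* Schema buffer layout, as packed below: array id, overlapping chunk id,
 * nat_chunked flag, contiguous flag, array rank, op type, followed by the
 * rank-sized overlap base, size and stride vectors. */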
void CSDIO::send_schema_message(int array_id, int index)
{
  int i, *ptr = schema_bufs_[index];

  if (part_time_io_ && (dest_ids_[index] == world_rank_)){
    *ptr++ = array_id;
    *ptr++ = overlap_chunk_ids_[index];
    *ptr++ = (int) nat_chunked_;
    *ptr++ = (int) contiguous_;
    *ptr++ = array_rank_;
    *ptr++ = op_type_;
    for(i=0; i < array_rank_; i++) *ptr++ = overlap_base_[i];
    for(i=0; i < array_rank_; i++) *ptr++ = overlap_size_[i];
    for(i=0; i < array_rank_; i++) *ptr++ = overlap_stride_[i];
  }
}

void CSDIO::send_data_to_compute_nodes(Chunk *subchunk, int tag)
{
  for(int i=0; i < num_overlaps_; i++){
    if (part_time_io_ && (dest_ids_[i] == world_rank_)){
      copy_data(subchunk, i, YES, NULL, NULL);
      requests_[i] = MPI_REQUEST_NULL;
    } else {
      nb_send_message((void *)data_ptrs_[i], 1, datatypes_[i], dest_ids_[i],
                      tag+CHUNK_DATA_FROM_IO, MPI_COMM_WORLD, &requests_[i]);
    }
  }
}

void CSDIO::receive_data_from_compute_nodes(Chunk *subchunk, int tag)
{
  for(int i=0; i < num_overlaps_; i++){
    if (part_time_io_ && (dest_ids_[i] == world_rank_)){
      copy_data(subchunk, i, NO, NULL, NULL);
      requests_[i] = MPI_REQUEST_NULL;
    } else {
      nb_receive_message((void *)data_ptrs_[i], 1, datatypes_[i], dest_ids_[i],
                         tag+CHUNK_DATA_TO_IO, MPI_COMM_WORLD, &requests_[i]);
    }
  }
}

void CSDIO::realloc_compute_schema_bufs(int new_max)
{
  max_comp_rank_ = new_max;
  comp_overlap_base_ = (int *) realloc(comp_overlap_base_,
                                       new_max*sizeof(int));
  comp_overlap_stride_ = (int *) realloc(comp_overlap_stride_,
                                         new_max*sizeof(int));
  comp_overlap_size_ = (int *) realloc(comp_overlap_size_,
                                       new_max*sizeof(int));
}

void CSDIO::realloc_pending_messages(int new_max)
{
  max_pending_ = new_max;
  comp_datatypes_ = (MPI_Datatype *)realloc(comp_datatypes_,
                                            new_max*sizeof(MPI_Datatype));
  comp_requests_ = (MPI_Request *)realloc(comp_requests_,
                                          new_max*sizeof(MPI_Request));
  comp_statuses_ = (MPI_Status *)realloc(comp_statuses_,
                                         new_max*sizeof(MPI_Status));
}

void CSDIO::realloc_io_buffers(int new_max)
{
  io_max_overlaps_ = new_max;
  io_overlap_chunk_ids_ = (int *)realloc(io_overlap_chunk_ids_,
                                         new_max*sizeof(int));
  io_dest_ids_ = (int *) realloc(io_dest_ids_, new_max*sizeof(int));
}

void CSDIO::io_chunk_overlaps(Array *array, Chunk *subchunk)
{
  int num_compute_chunks;

  if (nat_chunked_){
    io_overlaps_ = 1;
    io_overlap_chunk_ids_[0] = current_chunk_id_;
  } else {
    num_compute_chunks = array->layout(IO_NODE)->total_elements();
    if (num_compute_chunks > io_max_overlaps_)
      realloc_io_buffers(num_compute_chunks);
    subchunk->chunk_overlaps(array, &io_overlaps_, io_overlap_chunk_ids_,
                             IO_NODE);
  }
  for(int i=0; i < io_overlaps_; i++)
    io_dest_ids_[i] = io_app_info_->world_rank(
        array->which_node(io_overlap_chunk_ids_[i], IO_NODE, num_io_nodes_));
}

void CSDIO::wait_for_completion()
{
  MPI_Waitall(num_overlaps_, requests_, statuses_);
  if (!contiguous_)
    for(int i=0; i < num_overlaps_; i++) MPI_Type_free(&datatypes_[i]);
}

char* CSDIO::name()
{
  return name_;
}