#include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef HAVE_DIRENT_H # include #endif #include #include "CactusBase/IOUtil/src/ioGH.h" #include "CactusBase/IOUtil/src/ioutil_CheckpointRecovery.h" #include #include "iof5.hh" namespace CarpetIOF5 { using namespace std; using namespace Carpet; // Checkpointing state static int last_checkpoint_iteration = -1; // Scheduled startup routine int CarpetIOF5_Startup() { CCTK_RegisterBanner("AMR F5 I/O provided by CarpetIOF5"); int const GHExtension = CCTK_RegisterGHExtension(CCTK_THORNSTRING); CCTK_RegisterGHExtensionSetupGH(GHExtension, SetupGH); return 0; } // Registered GH extension setup routine void *SetupGH(tFleshConfig *const fleshconfig, int const convLevel, cGH *const cctkGH) { DECLARE_CCTK_PARAMETERS; // register I/O method int const ierr = IOUtil_RegisterRecover("CarpetIOF5 recovery", Input); assert(not ierr); int const IOMethod = CCTK_RegisterIOMethod("IOF5"); CCTK_RegisterIOMethodOutputGH (IOMethod, OutputGH ); CCTK_RegisterIOMethodTimeToOutput (IOMethod, TimeToOutput ); CCTK_RegisterIOMethodTriggerOutput(IOMethod, TriggerOutput); CCTK_RegisterIOMethodOutputVarAs (IOMethod, OutputVarAs ); // there no actual extension data structure return NULL; } // Scheduled initialisation routine void CarpetIOF5_Init(CCTK_ARGUMENTS) { DECLARE_CCTK_ARGUMENTS; *this_iteration = -1; *next_output_iteration = 0; last_checkpoint_iteration = cctk_iteration; } // A mechanism to keep the HDF5 output file open across multiple // write operations: hid_t file = H5I_INVALID_HID; int keep_file_open = 0; void enter_keep_file_open() { ++keep_file_open; } void leave_keep_file_open() { assert(keep_file_open > 0); --keep_file_open; if (keep_file_open==0 and file>=0) { herr_t const herr = H5Fclose(file); assert(not herr); file = H5I_INVALID_HID; } } // Interpret Cactus parameters to decide which variables have been // selected for output class selection_t { int iteration; int times_set; vector selected; vector last_output_iteration; static void do_output(int const vindex, char const *const optstring, void *const callback_arg) { static_cast(callback_arg)->selected.at(vindex) = true; } public: selection_t(): iteration(-1), times_set(-1) { } bool is_selected(cGH const *const cctkGH, int const vindex) { DECLARE_CCTK_PARAMETERS; // Check whether the parameter out_vars changed only once per // iteration if (cctkGH->cctk_iteration != iteration) { iteration = cctkGH->cctk_iteration; int const current_times_set = CCTK_ParameterQueryTimesSet("out_vars", CCTK_THORNSTRING); // Re-scan out_vars (which is somewhat expensive) only if it // has changed if (current_times_set > times_set) { times_set = current_times_set; selected.resize(CCTK_NumVars()); fill(selected.begin(), selected.end(), false); // for (int n=0; ncctk_iteration; } void did_output(cGH const *const cctkGH, int const vindex) { last_output_iteration.resize(CCTK_NumVars(), -1); last_output_iteration.at(vindex) = cctkGH->cctk_iteration; } }; selection_t output_variables; // Callbacks for CarpetIOHDF5's I/O method int OutputGH(cGH const *const cctkGH) { DECLARE_CCTK_ARGUMENTS; DECLARE_CCTK_PARAMETERS; static Carpet::Timer timer("F5::OutputGH"); timer.start(); enter_keep_file_open(); for (int vindex=0; vindex=0 and vindex 0) { if (*this_iteration == cctk_iteration) { // we already decided to output this iteration output_this_iteration = true; } else if (cctk_iteration >= *next_output_iteration) { // it is time for the next output output_this_iteration = true; *this_iteration = cctk_iteration; *next_output_iteration = cctk_iteration + my_out_every; } } return output_this_iteration ? 1 : 0; } int TriggerOutput(cGH const *const cctkGH, int const vindex) { DECLARE_CCTK_PARAMETERS; char *const fullname = CCTK_FullName(vindex); int const gindex = CCTK_GroupIndexFromVarI(vindex); char *const groupname = CCTK_GroupName(gindex); for (char *p=groupname; *p; ++p) *p=tolower(*p); int const retval = OutputVarAs(cctkGH, fullname, groupname); free(groupname); free(fullname); output_variables.did_output(cctkGH, vindex); return retval; } int OutputVarAs(cGH const *const cctkGH, const char *const varname, const char *const alias) { DECLARE_CCTK_PARAMETERS; assert(is_level_mode()); BEGIN_GLOBAL_MODE(cctkGH) { DECLARE_CCTK_ARGUMENTS; CCTK_VInfo(CCTK_THORNSTRING, "F5::OutputVarAs: iteration=%d, variable=%s", cctk_iteration, varname); // We don't know how to open multiple files yet assert(CCTK_EQUALS(file_content, "everything")); // Determine number of I/O processes int const myproc = CCTK_MyProc(cctkGH); int const nprocs = CCTK_nProcs(cctkGH); int const ioproc_every = max_nioprocs == 0 ? 1 : (nprocs + max_nioprocs - 1) / max_nioprocs; assert(ioproc_every > 0); int const nioprocs = nprocs / ioproc_every; assert(nioprocs > 0 and nioprocs <= max_nioprocs); int const myioproc = myproc / ioproc_every * ioproc_every; // We split processes into "I/O groups" which range from // myioproc to myioproc+ioproc_every-1 (inclusive). Within each // group, at most one process can perform I/O. // If I am not the first process in my I/O group, wait for a // token from my predecessor if (myproc > myioproc) { MPI_Recv(NULL, 0, MPI_INT, myproc-1, 0, dist::comm(), MPI_STATUS_IGNORE); } // Open file static bool first_time = true; // The file name doesn't matter since we currently write // everything into a single file int const vindex = CCTK_VarIndex(varname); assert(vindex >= 0); string const basename = generate_basename(cctkGH, vindex); int const ioproc = myioproc / ioproc_every; string const name = create_filename(cctkGH, basename, cctkGH->cctk_iteration, ioproc, io_dir_output, first_time); indent_t indent; cout << indent << "I/O process=" << ioproc << "\n"; enter_keep_file_open(); bool const truncate_file = first_time and IO_TruncateOutputFiles(cctkGH) and myproc == myioproc; if (file < 0) { // Reuse file hid if file is already open file = truncate_file ? H5Fcreate(name.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT) : H5Fopen (name.c_str(), H5F_ACC_RDWR , H5P_DEFAULT); assert(file >= 0); } first_time = false; vector output_var(CCTK_NumVars()); output_var.at(vindex) = true; output(cctkGH, file, output_var, false, true); // Close file leave_keep_file_open(); // If I am not the last process in my I/O group, send a token to // my successor if (myproc < max(myioproc + ioproc_every, nprocs) - 1) { MPI_Send(NULL, 0, MPI_INT, myproc+1-1, 0, dist::comm()); } } END_GLOBAL_MODE; return 0; // no error } // Checkpointing void Checkpoint(cGH const *const cctkGH, int const called_from) { DECLARE_CCTK_PARAMETERS; assert(is_global_mode()); CCTK_VInfo(CCTK_THORNSTRING, "F5::Checkpoint: iteration=%d", cctkGH->cctk_iteration); #if 0 // generate filenames for both the temporary and real checkpoint // files int const ioproc = CCTK_MyProc(cctkGH); int const parallel_io = 1; char *const filename = IOUtil_AssembleFilename(cctkGH, NULL, "", ".f5", called_from, ioproc, not parallel_io); char *const tempname = IOUtil_AssembleFilename(cctkGH, NULL, ".tmp", ".f5", called_from, ioproc, not parallel_io); #endif int const myproc = CCTK_MyProc(cctkGH); int const proc = myproc; string const name = create_filename(cctkGH, "checkpoint", cctkGH->cctk_iteration, proc, io_dir_checkpoint, true); string const tempname = create_filename(cctkGH, "checkpoint.tmp", cctkGH->cctk_iteration, proc, io_dir_checkpoint, true); hid_t const file = H5Fcreate(tempname.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT); assert(file >= 0); vector output_var(CCTK_NumVars(), false); for (int gindex=0; gindex 0) { char buf[1000]; Util_TableGetString(gdata.tagstable, sizeof buf, buf, "checkpoint"); if (CCTK_EQUALS(buf, "no")) continue; assert(CCTK_EQUALS(buf, "yes")); } int const first_vindex = CCTK_FirstVarIndexI(gindex); int const num_vars = CCTK_NumVarsInGroupI(gindex); if (num_vars > 0) { for (int vindex=first_vindex; vindex 0 and cctk_iteration >= last_checkpoint_iteration + checkpoint_every; bool const do_checkpoint = checkpoint and (checkpoint_by_iteration or checkpoint_next); if (do_checkpoint) { Checkpoint(cctkGH, CP_EVOLUTION_DATA); } } void CarpetIOF5_TerminationCheckpoint(CCTK_ARGUMENTS) { DECLARE_CCTK_ARGUMENTS; DECLARE_CCTK_PARAMETERS; if (checkpoint and checkpoint_on_terminate) { bool const did_checkpoint = cctk_iteration == last_checkpoint_iteration; bool const do_checkpoint = not did_checkpoint; if (do_checkpoint) { Checkpoint(cctkGH, CP_EVOLUTION_DATA); } } } // Recovery information static int recovery_iteration = -1; int Input(cGH *const cctkGH, char const *const basefilename, int const called_from) { DECLARE_CCTK_PARAMETERS; herr_t herr; BEGIN_GLOBAL_MODE(cctkGH) { DECLARE_CCTK_ARGUMENTS; assert(called_from == CP_RECOVER_PARAMETERS or called_from == CP_RECOVER_DATA or called_from == FILEREADER_DATA); bool const in_recovery = called_from == CP_RECOVER_PARAMETERS or called_from == CP_RECOVER_DATA; if (in_recovery) { CCTK_VInfo(CCTK_THORNSTRING, "F5::Input: recovering iteration %d", recovery_iteration); } else { CCTK_VInfo(CCTK_THORNSTRING, "F5::Input: reading iteration %d", cctk_iteration); } cout << "called_from=" << (called_from == CP_RECOVER_PARAMETERS ? "CP_RECOVER_PARAMETERS" : called_from == CP_RECOVER_DATA ? "CP_RECOVER_DATA" : called_from == FILEREADER_DATA ? "FILEREADER_DATA" : NULL) << "\n"; // Determine which variables to read ioGH const *const ioUtilGH = (ioGH const*)CCTK_GHExtension(cctkGH, "IO"); vector input_var(CCTK_NumVars(), true); if (ioUtilGH->do_inVars) { for (int n=0; ndo_inVars[n]; } } scatter_t scatter(cctkGH); // Open file // string const basename = // in_recovery // ? "checkpoint" // : generate_basename(cctkGH, CCTK_VarIndex("grid::r")); string const basename = basefilename; // Keep track of which files could be read, and which could not int foundproc = -1, notfoundproc = -1; // TODO: Store how many processes contributed to the output, and // expect exactly that many files int const myproc = CCTK_MyProc(cctkGH); int const nprocs = CCTK_nProcs(cctkGH); int const ioproc_every = max_nioprocs == 0 ? 1 : (nprocs + max_nioprocs - 1) / max_nioprocs; assert(ioproc_every > 0); int const nioprocs = nprocs / ioproc_every; assert(nioprocs > 0 and nioprocs <= max_nioprocs); int const myioproc = myproc / ioproc_every * ioproc_every; if (myproc == myioproc) { // Loop over all (possible) files for (int ioproc = myioproc / ioproc_every; ; ioproc += nioprocs) { string const name = create_filename(cctkGH, basename, cctkGH->cctk_iteration, ioproc, in_recovery ? io_dir_recover : io_dir_input, false); bool file_exists; H5E_BEGIN_TRY { file_exists = H5Fis_hdf5(name.c_str()) > 0; } H5E_END_TRY; if (not file_exists) { notfoundproc = ioproc; break; } foundproc = ioproc; indent_t indent; cout << indent << "I/O process=" << ioproc << "\n"; hid_t const file = H5Fopen(name.c_str(), H5F_ACC_RDONLY, H5P_DEFAULT); assert(file >= 0); // Iterate over all time slices bool const input_past_timelevels = in_recovery; // TODO: read metadata when recoverying parameters bool const input_metadata = false; input(cctkGH, file, input_var, input_past_timelevels, input_metadata, scatter); // Close file herr = H5Fclose(file); assert(not herr); } } { int maxfoundproc; MPI_Allreduce(&foundproc, &maxfoundproc, 1, MPI_INT, MPI_MAX, dist::comm()); if (maxfoundproc == -1) { string const name = create_filename(cctkGH, basename, cctkGH->cctk_iteration, notfoundproc, in_recovery ? io_dir_recover : io_dir_input, false); CCTK_VWarn(CCTK_WARN_ALERT, __LINE__, __FILE__, CCTK_THORNSTRING, "Could not read input file \"%s\"", name.c_str()); return 1; } if (notfoundproc > -1 and notfoundproc <= maxfoundproc) { CCTK_VWarn(CCTK_WARN_ALERT, __LINE__, __FILE__, CCTK_THORNSTRING, "Could not read file of process %d (but could read file of process %d)", notfoundproc, maxfoundproc); } } // The scatter object is implicitly destroyed here } END_GLOBAL_MODE; return 0; // no error } int CarpetIOF5_RecoverParameters() { #if 0 return IOUtil_RecoverParameters(Input, ".f5", "F5"); #endif DECLARE_CCTK_PARAMETERS; char const *const IO_dir = IO_recover_dir; char const *const F5_dir = recover_dir; bool const use_IO_dir = strcmp(F5_dir, "") == 0; char const *const my_recover_dir = use_IO_dir ? IO_dir : F5_dir; DIR *const dir = opendir(my_recover_dir); if (not dir) { // The recovery directory does not exist if (CCTK_Equals(recover, "autoprobe")) { // This is harmless when "autoprobe" is used CCTK_VInfo(CCTK_THORNSTRING, "Recovery directory \"%s\" doesn't exist", my_recover_dir); return 0; } else { // This is an error when "auto" is used CCTK_VWarn(CCTK_WARN_ALERT, __LINE__, __FILE__, CCTK_THORNSTRING, "Recovery directory \"%s\" doesn't exist", my_recover_dir); return -2; } } // Get the list of potential recovery files char const *const my_recover_file = "checkpoint"; string const prefix = string(my_recover_file) + ".i"; string const infix = ".p"; string const suffix = ".f5"; assert(recovery_iteration < 0); while (dirent *const file = readdir(dir)) { char *p = file->d_name; // First check the file prefix if (prefix.compare(0, prefix.length(), p, prefix.length()) != 0) continue; p += prefix.length(); // Now check if there is an iteration number following the file // prefix int const iter = strtol(p, &p, 10); if (!*p) continue; // Read the process number if (infix.compare(0, infix.length(), p, infix.length()) != 0) continue; p += infix.length(); int const proc = strtol(p, &p, 10); if (!*p) continue; // Finally check the file extension if (suffix.compare(0, suffix.length(), p, suffix.length()) != 0) continue; p += suffix.length(); // Check whether we read the whole string if (*p) continue; // Found a recovery file by that basename recovery_iteration = max(recovery_iteration, iter); } closedir(dir); // There is no recovery file if (recovery_iteration < 0) { if (CCTK_Equals(recover, "autoprobe")) { // This is harmless when "autoprobe" is used CCTK_VInfo(CCTK_THORNSTRING, "No F5 checkpoint files with basefilename \"%s\" found in " "recovery directory \"%s\"", my_recover_file, my_recover_dir); return 0; } else { // This is an error when "auto" is used CCTK_VWarn(CCTK_WARN_ALERT, __LINE__, __FILE__, CCTK_THORNSTRING, "No F5 checkpoint files with basefilename \"%s\" found in " "recovery directory \"%s\"", my_recover_file, my_recover_dir); return -1; } } return Input(NULL, "checkpoint", CP_RECOVER_PARAMETERS); } } // end namespace CarpetIOF5