/*@@ @file Startup.c @date Mon Jun 19 2000 @author Thomas Radke @desc Startup and termination routines for IOStreamedHDF5. @enddesc @version $Id$ @@*/ #include #include "cctk.h" #include "cctk_Parameters.h" #include "util_Network.h" #include "SocketUtils.h" #include "CactusBase/IOUtil/src/ioutil_AdvertisedFiles.h" #include "CactusBase/IOUtil/src/ioutil_CheckpointRecovery.h" #include "ioStreamedHDF5GH.h" /* the rcs ID and its dummy function to use it */ static const char *rcsid = "$Header$"; CCTK_FILEVERSION(CactusPUGHIO_IOStreamedHDF5_Startup_c) /******************************************************************** ******************** Macro Definitions ************************ ********************************************************************/ #define FILENAME_TEMPLATE "fileXXXXXX" /******************************************************************** ******************** External Routines ************************ ********************************************************************/ void IOStreamedHDF5_Startup (void); void IOStreamedHDF5_Terminate (const cGH *GH); /******************************************************************** ******************** Internal Routines ************************ ********************************************************************/ static void *SetupGH (tFleshConfig *config, int conv_level, cGH *GH); /*@@ @routine IOStreamedHDF5_Startup @date Mon Jun 19 2000 @author Thomas Radke @desc The startup registration routine for IOStreamedHDF5. Registers the GH extensions needed for IOStreamedHDF5 along with its setup routine. @enddesc @calls CCTK_RegisterGHExtension CCTK_RegisterGHExtensionSetupGH @@*/ void IOStreamedHDF5_Startup (void) { CCTK_RegisterGHExtensionSetupGH (CCTK_RegisterGHExtension ("IOStreamedHDF5"), SetupGH); } /*@@ @routine IOStreamedHDF5_Terminate @date Mon Jun 19 2000 @author Thomas Radke @desc IOStreamedHDF5's termination routine Closes all open sockets and destroys the timers. @enddesc @calls CCTK_TimerDestroyI @var GH @vdesc Pointer to CCTK grid hierarchy @vtype const cGH * @vio in @endvar @@*/ void IOStreamedHDF5_Terminate (const cGH *GH) { int i; ioStreamedHDF5GH *myGH; myGH = CCTK_GHExtension (GH, "IOStreamedHDF5"); if (myGH) { /* close the data and checkpoint output sockets */ if (myGH->data_socket != INVALID_SOCKET) { Socket_CloseSocket (myGH->data_socket); } if (myGH->checkpoint_socket != INVALID_SOCKET) { Socket_CloseSocket (myGH->checkpoint_socket); } /* release allocated timers */ if (myGH->print_timing_info) { for (i = 0; i < IOHDF5_NUM_TIMERS; i++) { CCTK_TimerDestroyI (myGH->timers[i]); } } /* remove advertised file */ if (strcmp (myGH->advertised_filename, FILENAME_TEMPLATE)) { remove (myGH->advertised_filename); } } } /******************************************************************** ******************** Internal Routines ************************ ********************************************************************/ /*@@ @routine SetupGH @date Mon Jun 19 2000 @author Thomas Radke @desc Allocates and sets up IOStreamedHDF5's GH extension structure @enddesc @calls CCTK_RegisterIOMethod CCTK_RegisterIOMethodOutputGH CCTK_RegisterIOMethodOutputVarAs CCTK_RegisterIOMethodTimeToOutput CCTK_RegisterIOMethodTriggerOutput Socket_TCPOpenServerSocket Socket_SetNonBlocking IOUtil_AdvertiseFile Util_GetHostName CCTK_TimerCreate CCTK_TimerDestroyI CCTK_TimerResetI @var config @vdesc the CCTK configuration as provided by the flesh @vtype tFleshConfig * @vio unused @endvar @var conv_level @vdesc the convergence level @vtype int @vio unused @endvar @var GH @vdesc Pointer to CCTK grid hierarchy @vtype cGH * @vio in @endvar @returntype void * @returndesc pointer to the allocated GH extension structure @endreturndesc @@*/ static void *SetupGH (tFleshConfig *config, int conv_level, cGH *GH) { int i, myproc, numvars; char hostname[256]; ioStreamedHDF5GH *myGH; H5FD_stream_fapl_t fapl; FILE *advertised_file_fd; ioAdvertisedFileDesc advertised_file; const char *timer_names[4] = {"IOStreamedHDF5 time to dump parameters", "IOStreamedHDF5 time to dump variables", "IOStreamedHDF5 total time to checkpoint", "IOStreamedHDF5 time to recover"}; DECLARE_CCTK_PARAMETERS /* suppress compiler warnings about unused variables */ (void) (config + 0); (void) (conv_level + 0); /* register IOStreamedHDF5 routines as a new I/O method */ i = CCTK_RegisterIOMethod ("IOStreamedHDF5"); CCTK_RegisterIOMethodOutputGH (i, IOStreamedHDF5_OutputGH); CCTK_RegisterIOMethodOutputVarAs (i, IOStreamedHDF5_OutputVarAs); CCTK_RegisterIOMethodTimeToOutput (i, IOStreamedHDF5_TimeFor); CCTK_RegisterIOMethodTriggerOutput (i, IOStreamedHDF5_TriggerOutput); /* register the IOStreamedHDF5 recovery routine to thorn IOUtil */ if (IOUtil_RegisterRecover ("IOStreamedHDF5 recovery", IOStreamedHDF5_Recover) < 0) { CCTK_WARN (1, "Failed to register IOStreamedHDF5 recovery routine"); } /* allocate a new GH extension structure */ numvars = CCTK_NumVars (); myGH = malloc (sizeof (ioStreamedHDF5GH)); myGH->requests = calloc (numvars, sizeof (ioRequest *)); myGH->out_last = malloc (numvars * sizeof (int)); strcpy (myGH->advertised_filename, FILENAME_TEMPLATE); myGH->out_vars = strdup (""); myGH->out_every_default = out_every - 1; for (i = 0; i < numvars; i++) { myGH->out_last[i] = -1; } myGH->stop_on_parse_errors = strict_io_parameter_check; if (! CCTK_Equals (verbose, "none")) { CCTK_INFO ("I/O Method 'IOStreamedHDF5' registered: streamed HDF5 output " "of grid variables and hyperslabs thereof"); } IOStreamedHDF5_CheckSteerableParameters (GH, myGH); myGH->stop_on_parse_errors = 0; /* only processor 0 is doing socket I/O */ myGH->data_socket = INVALID_SOCKET; myproc = CCTK_MyProc (GH); if (myproc == 0) { myGH->data_socket = Socket_TCPOpenServerSocket (data_port, &myGH->data_port, 1); if (myGH->data_socket == INVALID_SOCKET) { CCTK_VWarn (1, __LINE__, __FILE__, CCTK_THORNSTRING, "Couldn't open TCP server socket on output port %d. " "No HDF5 streaming output will be available !", data_port); } else if (Socket_SetNonBlocking (myGH->data_socket) < 0) { CCTK_WARN (1, "Couldn't set output socket into non-blocking mode. " "No HDF5 streaming output will be available !"); Socket_CloseSocket (myGH->data_socket); myGH->data_socket = INVALID_SOCKET; } else { Util_GetHostName (hostname, sizeof (hostname)); CCTK_VInfo (CCTK_THORNSTRING, "HDF5 data streaming service started on\n" " %s:%u", hostname, myGH->data_port); /* write the hostname/portnumber information in a temporary file which gets advertised and then can be downloaded */ #ifdef HAVE_MKSTEMP advertised_file_fd = fdopen (mkstemp (myGH->advertised_filename), "w"); #else advertised_file_fd = tmpnam (myGH->advertised_filename) ? fopen (myGH->advertised_filename, "w") : NULL; #endif if (advertised_file_fd) { fprintf (advertised_file_fd, "Hostname: %s\n", hostname); fprintf (advertised_file_fd, "Data port: %d", data_port); fclose (advertised_file_fd); advertised_file.slice = ""; advertised_file.thorn = CCTK_THORNSTRING; advertised_file.varname = "All variables"; advertised_file.description = "Streamed HDF5 data"; advertised_file.mimetype = "data/x-streamed-hdf5"; IOUtil_AdvertiseFile (GH, myGH->advertised_filename,&advertised_file); } else { CCTK_WARN (2, "Couldn't create unique temporary file ! " "HDF5 data streaming was not advertised."); } } } /* create timers if timing info was requested */ myGH->print_timing_info = print_timing_info; if (myGH->print_timing_info) { for (i = 0; i < IOHDF5_NUM_TIMERS; i++) { if ((myGH->timers[i] = CCTK_TimerCreate (timer_names[i])) < 0) { break; } } if (i != IOHDF5_NUM_TIMERS) { CCTK_WARN (1, "Could not create timers for checkpoint/recovery ! " "No timer information will be available."); while (--i >= 0) { CCTK_TimerDestroyI (myGH->timers[i]); } myGH->print_timing_info = 0; } else { CCTK_TimerResetI (myGH->timers[CP_TOTAL_TIMER]); CCTK_TimerResetI (myGH->timers[RECOVERY_TIMER]); } } /* only processor 0 is doing socket I/O */ myGH->checkpoint_socket = INVALID_SOCKET; myGH->checkpoint_fapl = -1; if (myproc == 0) { myGH->checkpoint_socket = Socket_TCPOpenServerSocket (checkpoint_port, &myGH->checkpoint_port, 1); if (myGH->checkpoint_socket == INVALID_SOCKET) { CCTK_VWarn (1, __LINE__, __FILE__, CCTK_THORNSTRING, "Couldn't open TCP server socket on checkpoint port %d. " "No IOStreamedHDF5 checkpointing will be available !", checkpoint_port); } else if (Socket_SetNonBlocking (myGH->checkpoint_socket) < 0) { CCTK_WARN (1, "Couldn't set checkpoint socket into non-blocking mode. " "No IOStreamedHDF5 checkpointing will be available !"); Socket_CloseSocket (myGH->checkpoint_socket); myGH->checkpoint_socket = INVALID_SOCKET; } else { /* setup file access property list and select Stream VFD */ fapl.increment = 0; fapl.socket = myGH->checkpoint_socket; fapl.do_socket_io = 1; fapl.backlog = 5; fapl.broadcast_fn = NULL; fapl.broadcast_arg = NULL; HDF5_ERROR (myGH->checkpoint_fapl = H5Pcreate (H5P_FILE_ACCESS)); HDF5_ERROR (H5Pset_fapl_stream (myGH->checkpoint_fapl, &fapl)); Util_GetHostName (hostname, sizeof (hostname)); CCTK_VInfo (CCTK_THORNSTRING, "HDF5 checkpoint streaming service started on\n" " %s:%u", hostname, myGH->checkpoint_port); } } return (myGH); }