diff options
Diffstat (limited to 'src/Startup.c')
-rw-r--r-- | src/Startup.c | 353 |
1 files changed, 213 insertions, 140 deletions
diff --git a/src/Startup.c b/src/Startup.c index 779a051..7404bda 100644 --- a/src/Startup.c +++ b/src/Startup.c @@ -1,213 +1,286 @@ /*@@ @file Startup.c - @date Fri May 21 1999 + @date Mon Jun 19 2000 @author Thomas Radke @desc - Startup routines for StreamedHDF5. + Startup and termination routines for IOStreamedHDF5. @enddesc - @history - @hauthor Thomas Radke @hdate May 21 1999 - @hdesc Just copied from thorn FlexIO. - @endhistory + @version $Id$ @@*/ -#include <stdio.h> #include "cctk.h" #include "cctk_Parameters.h" -#include "CactusBase/IOUtil/src/ioGH.h" #include "BetaThorns/Socket/src/SocketUtils.h" -#include "StreamedHDF5GH.h" +#include "CactusBase/IOUtil/src/ioutil_CheckpointRecovery.h" +#include "ioStreamedHDF5GH.h" #ifdef HAVE_UNISTD_H #include <unistd.h> #endif +/* the rcs ID and its dummy function to use it */ +static char *rcsid = "$Id$"; +CCTK_FILEVERSION(BetaThorns_IOStreamedHDF5_Startup_c) -/* prototypes of functions to be registered */ -int StreamedHDF5_OutputGH(cGH *GH); -int StreamedHDF5_TriggerOutput(cGH *GH, int); -int StreamedHDF5_TimeFor(cGH *GH, int); -int StreamedHDF5_OutputVarAs(cGH *GH, const char *var, const char *alias); /* local function prototypes */ -static void *StreamedHDF5_SetupGH (tFleshConfig *config, - int convergence_level, - cGH *GH); -static int StreamedHDF5_InitGH (cGH *GH); +static void *IOStreamedHDF5_SetupGH (tFleshConfig *config, + int convergence_level, + cGH *GH); /*@@ - @routine StreamedHDF5_Startup - @date Fri May 21 1999 + @routine IOStreamedHDF5_Startup + @date Mon Jun 19 2000 @author Thomas Radke @desc - The startup registration routine for StreamedHDF5. - Registers the GH extensions needed for StreamedHDF5 and - the registerable routines used for each method of StreamedHDF5. - StreamedHDF5 does not overload any functions. + The startup registration routine for IOStreamedHDF5. + Registers the GH extensions needed for IOStreamedHDF5 + along with its setup routine. @enddesc - @calls - @calledby CCTK scheduler at STARTUP - @history - - @endhistory + + @calls CCTK_RegisterGHExtensionSetupGH @@*/ -void StreamedHDF5_Startup (void) +void IOStreamedHDF5_Startup (void) { - int IO_GHExtension; - - - if (CCTK_GHExtensionHandle ("IO") < 0) + /* check that thorn IOHDF5Util was activated */ + if (CCTK_GHExtensionHandle ("IOHDF5Util") < 0) { - CCTK_WARN (1, "Thorn IOUtil was not activated. " - "No StreamedHDF5 IO methods will be enabled."); + CCTK_WARN (1, "Thorn IOHDF5Util was not activated. " + "No IOStreamedHDF5 IO methods will be registered."); return; } - /* this check can go as soon as the Stream VFD - comes with the default HDF5 installation */ -#ifndef H5FDstream_H - CCTK_WARN (1, "The Stream VFD is not included in the configured HDF5 " - "installation. No HDF5 stream output will be available !"); + /* FIXME: this check can go as soon as the Stream VFD + comes with the default HDF5 installation */ +#ifndef H5_HAVE_STREAM + CCTK_WARN (1, "The Stream VFD was not configured in your HDF5 installation. " + "No HDF5 streaming IO will be available !"); #else - IO_GHExtension = CCTK_RegisterGHExtension ("StreamedHDF5"); - CCTK_RegisterGHExtensionSetupGH (IO_GHExtension, StreamedHDF5_SetupGH); - CCTK_RegisterGHExtensionInitGH (IO_GHExtension, StreamedHDF5_InitGH); + CCTK_RegisterGHExtensionSetupGH (CCTK_RegisterGHExtension ("IOStreamedHDF5"), + IOStreamedHDF5_SetupGH); #endif } /*@@ - @routine StreamedHDF5_Terminate + @routine IOStreamedHDF5_Terminate @date Mon Jun 19 2000 @author Thomas Radke @desc - StreamedHDF5's termination routine - Closes the socket if it was opened before. + IOStreamedHDF5's termination routine + Closes all open sockets and destroys the timers. @enddesc - @calls - @calledby CCTK scheduler at TERMINATE - @history - - @endhistory + + @calls CCTK_TimerDestroyI + + @var GH + @vdesc Pointer to CCTK grid hierarchy + @vtype cGH * + @vio in + @endvar @@*/ -void StreamedHDF5_Terminate (cGH *GH) +void IOStreamedHDF5_Terminate (cGH *GH) { - int handle; - StreamedHDF5GH *myGH; + int i; + ioStreamedHDF5GH *myGH; - handle = CCTK_GHExtensionHandle ("StreamedHDF5"); - myGH = handle >= 0 ? (StreamedHDF5GH *) GH->extensions [handle] : NULL; + myGH = (ioStreamedHDF5GH *) CCTK_GHExtension (GH, "IOStreamedHDF5"); if (myGH) { - if (myGH->socket >= 0) + /* close the data and checkpoint output sockets */ + if (myGH->data_socket >= 0) { - close (myGH->socket); + close (myGH->data_socket); + } + if (myGH->checkpoint_socket >= 0) + { + close (myGH->checkpoint_socket); + } + + /* release allocated timers */ + if (myGH->print_timing_info) + { + for (i = 0; i < IOHDF5_NUM_TIMERS; i++) + { + CCTK_TimerDestroyI (myGH->timers[i]); + } } } } -/*************************** local routines *********************************/ -static void *StreamedHDF5_SetupGH (tFleshConfig *config, int convergence_level, cGH *GH) +/****************************************************************************/ +/* local routines */ +/****************************************************************************/ + /*@@ + @routine IOStreamedHDF5_SetupGH + @date Mon Jun 19 2000 + @author Thomas Radke + @desc + Allocates and sets up IOStreamedHDF5's GH extension structure + @enddesc + + @calls CCTK_RegisterIOMethod + CCTK_RegisterIOMethodOutputGH + CCTK_RegisterIOMethodOutputVarAs + CCTK_RegisterIOMethodTimeToOutput + CCTK_RegisterIOMethodTriggerOutput + Socket_TCPOpenServerSock + Socket_SetNonBlocking + CCTK_TimerCreateI + CCTK_TimerDestroyI + CCTK_TimerResetI + + @var config + @vdesc the CCTK configuration as provided by the flesh + @vtype tFleshConfig * + @vio unused + @endvar + @var convergence_level + @vdesc the convergence level + @vtype int + @vio unused + @endvar + @var GH + @vdesc Pointer to CCTK grid hierarchy + @vtype cGH * + @vio in + @endvar + + @returntype void * + @returndesc + pointer to the allocated GH extension structure + @endreturndesc +@@*/ +static void *IOStreamedHDF5_SetupGH (tFleshConfig *config, + int convergence_level, + cGH *GH) { DECLARE_CCTK_PARAMETERS - int IOMethod; - int numvars, socket; - StreamedHDF5GH *newGH; + int i; + int myproc; + int numvars; + ioStreamedHDF5GH *myGH; - if (CCTK_MyProc (GH) == 0) - { - socket = Socket_TCPOpenServerSock (port); - if (socket < 0) - { - CCTK_VWarn (1, __LINE__, __FILE__, CCTK_THORNSTRING, - "Couldn't open TCP server socket on port %d. No HDF5 stream " - "output available !", port); - return (NULL); - } + myproc = CCTK_MyProc (GH); - if (Socket_SetNonBlocking (socket) < 0) - { - CCTK_WARN (1, "Couldn't set socket into non-blocking mode. No HDF5 " - "stream output available !"); - close (socket); - return (NULL); - } - } - else + /* Register IOStreamedHDF5 routines as output methods */ + i = CCTK_RegisterIOMethod ("IOStreamedHDF5"); + CCTK_RegisterIOMethodOutputGH (i, IOStreamedHDF5_OutputGH); + CCTK_RegisterIOMethodOutputVarAs (i, IOStreamedHDF5_OutputVarAs); + CCTK_RegisterIOMethodTimeToOutput (i, IOStreamedHDF5_TimeFor); + CCTK_RegisterIOMethodTriggerOutput (i, IOStreamedHDF5_TriggerOutput); + + /* Register the IOStreamedHDF5 recovery routine to thorn IOUtil */ + if (IOUtil_RegisterRecover ("IOStreamedHDF5 recovery", + IOStreamedHDF5_Recover) < 0) { - socket = -1; + CCTK_WARN (1, "Failed to register IOStreamedHDF5 recovery routine"); } - /* Register StreamedHDF5 routines as output methods */ - IOMethod = CCTK_RegisterIOMethod ("StreamedHDF5"); - CCTK_RegisterIOMethodOutputGH (IOMethod, StreamedHDF5_OutputGH); - CCTK_RegisterIOMethodOutputVarAs (IOMethod, StreamedHDF5_OutputVarAs); - CCTK_RegisterIOMethodTimeToOutput (IOMethod, StreamedHDF5_TimeFor); - CCTK_RegisterIOMethodTriggerOutput (IOMethod, StreamedHDF5_TriggerOutput); - numvars = CCTK_NumVars (); + myGH = (ioStreamedHDF5GH *) malloc (sizeof (ioStreamedHDF5GH)); + myGH->do_output = (char *) malloc (numvars * sizeof (char)); + myGH->geo_output = (ioHDF5Geo_t*) malloc (numvars * sizeof (ioHDF5Geo_t)); + myGH->out_last = (int *) malloc (numvars * sizeof (int)); - newGH = (StreamedHDF5GH *) malloc (sizeof (StreamedHDF5GH)); - newGH->do_output = (char *) malloc (numvars * sizeof (char)); - newGH->geo_output = (StreamGeo_t*) malloc (numvars * sizeof (StreamGeo_t)); - newGH->out_last = (int *) malloc (numvars * sizeof (int)); - newGH->socket = socket; - - /* save the original error printing routine and its argument */ - CACTUS_IOHDF5_ERROR (H5Eget_auto (&newGH->printErrorFn, - &newGH->printErrorFnArg)); - - /* predefine dataspaces for writing scalar and array attributes */ - /* the dimension of the array dataspace is set when used */ - CACTUS_IOHDF5_ERROR (newGH->scalarDataspace = H5Screate (H5S_SCALAR)); - CACTUS_IOHDF5_ERROR (newGH->arrayDataspace = H5Screate (H5S_SIMPLE)); - - /* predefine a IOHDF5_COMPLEX datatype */ - CACTUS_IOHDF5_ERROR (newGH->IOHDF5_COMPLEX = - H5Tcreate (H5T_COMPOUND, sizeof (CCTK_COMPLEX))); - CACTUS_IOHDF5_ERROR (H5Tinsert (newGH->IOHDF5_COMPLEX, "real", - offsetof (CCTK_COMPLEX, Re), IOHDF5_REAL)); - CACTUS_IOHDF5_ERROR (H5Tinsert (newGH->IOHDF5_COMPLEX, "imag", - offsetof (CCTK_COMPLEX, Im), IOHDF5_REAL)); - - /* predefine a C string datatype */ - CACTUS_IOHDF5_ERROR (newGH->IOHDF5_STRING = H5Tcopy (H5T_C_S1)); - - return (newGH); -} - - -static int StreamedHDF5_InitGH (cGH *GH) -{ - DECLARE_CCTK_PARAMETERS - int i, handle; - StreamedHDF5GH *myGH; - + for (i = 0; i < numvars; i++) + { + myGH->out_last[i] = -1; + } - handle = CCTK_GHExtensionHandle ("StreamedHDF5"); - myGH = handle >= 0 ? (StreamedHDF5GH *) GH->extensions [handle] : NULL; - if (handle < 0 || myGH == NULL) - { - CCTK_WARN (1, "Couldn't access GH extension handle\n"); - return (-1); + /* only processor 0 is doing socket I/O */ + myGH->data_socket = -1; + if (myproc == 0) + { + myGH->data_socket = Socket_TCPOpenServerSock (data_port); + if (myGH->data_socket < 0) + { + CCTK_VWarn (1, __LINE__, __FILE__, CCTK_THORNSTRING, + "Couldn't open TCP server socket on output port %d. " + "No HDF5 streaming output will be available !", data_port); + } + else if (Socket_SetNonBlocking (myGH->data_socket) < 0) + { + CCTK_WARN (1, "Couldn't set output socket into non-blocking mode. " + "No HDF5 streaming output will be available !"); + close (myGH->data_socket); + myGH->data_socket = -1; + } } - /* How often to output */ - myGH->out_every = out_every > 0 ? out_every : -1; - if (outHDF5_every > 0) + /* create timers if timing info was requested */ + myGH->print_timing_info = print_timing_info; + if (myGH->print_timing_info) { - myGH->out_every = outHDF5_every; + for (i = 0; i < IOHDF5_NUM_TIMERS; i++) + { + if ((myGH->timers[i] = CCTK_TimerCreateI ()) < 0) + { + break; + } + } + if (i != IOHDF5_NUM_TIMERS) + { + CCTK_WARN (1, "Could not create timers for checkpoint/recovery ! " + "No timer information will be available."); + while (--i >= 0) + { + CCTK_TimerDestroyI (myGH->timers[i]); + } + myGH->print_timing_info = 0; + } + else + { + CCTK_TimerResetI (myGH->timers[CP_TOTAL_TIMER]); + CCTK_TimerResetI (myGH->timers[RECOVERY_TIMER]); + } } - for (i = 0; i < CCTK_NumVars (); i++) + /* FIXME: this check can go as soon as the Stream VFD + comes with the default HDF5 installation */ +#ifdef H5_HAVE_STREAM + /* only processor 0 is doing socket I/O */ + myGH->checkpoint_socket = -1; + myGH->checkpoint_fapl = -1; + if (myproc == 0) { - myGH->out_last [i] = -1; + myGH->checkpoint_socket = Socket_TCPOpenServerSock (checkpoint_port); + if (myGH->checkpoint_socket < 0) + { + CCTK_VWarn (1, __LINE__, __FILE__, CCTK_THORNSTRING, + "Couldn't open TCP server socket on checkpoint port %d. " + "No IOStreamedHDF5 checkpointing will be available !", + checkpoint_port); + } + else if (Socket_SetNonBlocking (myGH->checkpoint_socket) < 0) + { + CCTK_WARN (1, "Couldn't set checkpoint socket into non-blocking mode. " + "No IOStreamedHDF5 checkpointing will be available !"); + close (myGH->checkpoint_socket); + myGH->checkpoint_socket = -1; + } + else + { + H5FD_stream_fapl_t fapl; + + + /* setup file access property list and select Stream VFD */ + fapl.increment = 0; + fapl.socket = myGH->checkpoint_socket; + fapl.do_socket_io = 1; + fapl.backlog = 5; + fapl.broadcast_fn = NULL; + fapl.broadcast_arg = NULL; + IOHDF5_ERROR (myGH->checkpoint_fapl = H5Pcreate (H5P_FILE_ACCESS)); + IOHDF5_ERROR (H5Pset_fapl_stream (myGH->checkpoint_fapl, &fapl)); + } } +#endif - return (0); + return (myGH); } |