diff options
author | Thomas Radke <tradke@aei.mpg.de> | 2005-06-27 14:17:00 +0000 |
---|---|---|
committer | Thomas Radke <tradke@aei.mpg.de> | 2005-06-27 14:17:00 +0000 |
commit | d2552f96b45a95001e024a05923373344e851f1a (patch) | |
tree | cee8146af232b658deeb8c36bf83709852c3b60d /Carpet/CarpetIOStreamedHDF5/src | |
parent | c6bcafe52e9dc27e6958b6e3a1f3cf524ad4f0da (diff) |
CarpetIOStreamedHDF5: new thorn which provides streamed HDF5 data output
darcs-hash:20050627141701-776a0-cba926db30167a725a6dad6b584a31fa5476b5e0.gz
Diffstat (limited to 'Carpet/CarpetIOStreamedHDF5/src')
4 files changed, 500 insertions, 0 deletions
diff --git a/Carpet/CarpetIOStreamedHDF5/src/CarpetIOStreamedHDF5.cc b/Carpet/CarpetIOStreamedHDF5/src/CarpetIOStreamedHDF5.cc new file mode 100644 index 000000000..65e8ccdbf --- /dev/null +++ b/Carpet/CarpetIOStreamedHDF5/src/CarpetIOStreamedHDF5.cc @@ -0,0 +1,442 @@ +#include <assert.h> + +#include "cctk.h" +#include "cctk_Arguments.h" +#include "cctk_Parameters.h" +#include "util_String.h" +#include "util_Network.h" + +#ifdef HAVE_SYS_TIME_H +#include <sys/time.h> +#endif +#ifdef HAVE_SYS_TYPES_H +#include <sys/types.h> +#endif +#ifdef HAVE_UNISTD_H +#include <unistd.h> +#endif + +#include "CactusBase/IOUtil/src/ioGH.h" + +#include "CarpetIOStreamedHDF5.hh" + + +namespace CarpetIOStreamedHDF5 +{ + +using namespace std; +using namespace Carpet; +using namespace CarpetIOHDF5; + + +// Variable definitions +static vector<vector<vector<int> > > last_output; // [ml][rl][var] + +// registered GH extension setup routine +static void* SetupGH (tFleshConfig* const fleshconfig, + const int convLevel, cGH* const cctkGH); + +// callbacks for CarpetIOStreamedHDF5's I/O method +static int OutputGH (const cGH* const cctkGH); +static int TimeToOutput (const cGH* const cctkGH, const int vindex); +static int OutputVar (const cGH* const cctkGH, const ioRequest* const request, + hid_t file); + +static void CheckSteerableParameters (const cGH *const cctkGH, + CarpetIOStreamedHDF5GH *myGH); + +////////////////////////////////////////////////////////////////////////////// +// public routines +////////////////////////////////////////////////////////////////////////////// + +void CarpetIOStreamedHDF5_Startup (void) +{ + CCTK_RegisterBanner ("AMR streamed HDF5 output " + "provided by CarpetIOStreamedHDF5"); + + const int GHExtension = CCTK_RegisterGHExtension (CCTK_THORNSTRING); + CCTK_RegisterGHExtensionSetupGH (GHExtension, SetupGH); +} + + +void CarpetIOStreamedHDF5_Init (const cGH* const cctkGH) +{ + DECLARE_CCTK_ARGUMENTS; + + *this_iteration = -1; + *next_output_iteration = 0; + *next_output_time = cctk_time; +} + + +// close the socket used for HDF5 streaming +void CarpetIOStreamedHDF5_Terminate (const cGH* const cctkGH) +{ + CarpetIOStreamedHDF5GH* myGH = + (CarpetIOStreamedHDF5GH*) CCTK_GHExtension (cctkGH, CCTK_THORNSTRING); + + Socket_CloseSocket (myGH->socket); +} + + +////////////////////////////////////////////////////////////////////////////// +// private routines +////////////////////////////////////////////////////////////////////////////// +static void* SetupGH (tFleshConfig* const fleshconfig, + const int convLevel, cGH* const cctkGH) +{ + DECLARE_CCTK_PARAMETERS; + + // processor 0 opens a socket on the given port to listen for clients + unsigned int real_port; + SOCKET real_socket = INVALID_SOCKET; + if (dist::rank() == 0) { + + real_socket = Socket_TCPOpenServerSocket (port, &real_port, 1); + + if (real_socket == INVALID_SOCKET) { + + CCTK_VWarn (1, __LINE__, __FILE__, CCTK_THORNSTRING, + "Couldn't open TCP server socket on output port %d. " + "No HDF5 streaming output will be available !", port); + + } else if (Socket_SetNonBlocking (real_socket) < 0) { + + CCTK_WARN (1, "Couldn't set output socket into non-blocking mode. " + "No HDF5 streaming output will be available !"); + Socket_CloseSocket (real_socket); + real_socket = INVALID_SOCKET; + + } + } + + // broadcast success of socket operation to all processors + int have_socket = real_socket != INVALID_SOCKET; + MPI_Bcast (&have_socket, 1, MPI_INT, 0, dist::comm); + + // do not continue here if no socket is available + if (not have_socket) { + return (NULL); + } + + if (dist::rank() == 0) { + char hostname[256]; + Util_GetHostName (hostname, sizeof (hostname)); + CCTK_VInfo (CCTK_THORNSTRING, + "data streaming service started on '%s:%u'", hostname, port); + } + + // register CarpetIOStreamedHDF5's routines as a new I/O method + const int IOMethod = CCTK_RegisterIOMethod ("IOStreamedHDF5"); + CCTK_RegisterIOMethodOutputGH (IOMethod, OutputGH); +#if 0 + // for now only register the OutputGH() callback + CCTK_RegisterIOMethodOutputVarAs (IOMethod, OutputVarAs); + CCTK_RegisterIOMethodTimeToOutput (IOMethod, TimeToOutput); + CCTK_RegisterIOMethodTriggerOutput (IOMethod, TriggerOutput); +#endif + + if (not CCTK_Equals (verbose, "none")) { + CCTK_INFO ("I/O Method 'IOStreamedHDF5' registered: AMR streamed HDF5 " + "output of grid variables"); + } + + const int numvars = CCTK_NumVars (); + + // allocate a new GH extension structure + CarpetIOStreamedHDF5GH* myGH = new CarpetIOStreamedHDF5GH; + + myGH->out_last.resize(numvars); + for (int i = 0; i < numvars; i++) { + myGH->out_last[i] = -1; + } + myGH->requests.resize(numvars); + myGH->out_vars = strdup (""); + myGH->out_every_default = out_every - 1; + + // initial I/O parameter check + myGH->stop_on_parse_errors = strict_io_parameter_check; + CheckSteerableParameters (cctkGH, myGH); + myGH->stop_on_parse_errors = 0; + + // no iterations have yet been output + last_output.resize(mglevels); + for (int i = 0; i < mglevels; i++) { + last_output[i].resize (maxreflevels); + for (int j = 0; j < maxreflevels; j++) { + last_output[i][j].resize (numvars, INT_MIN); + } + } + + myGH->socket = real_socket; + myGH->port = real_port; + + return (myGH); +} + + +static void CheckSteerableParameters (const cGH *const cctkGH, + CarpetIOStreamedHDF5GH *myGH) +{ + DECLARE_CCTK_PARAMETERS; + + // re-parse the 'IOHDF5::out_vars' parameter if it has changed + if (strcmp (out_vars, myGH->out_vars)) { + IOUtil_ParseVarsForOutput (cctkGH, CCTK_THORNSTRING, + "IOStreamedHDF5::out_vars", + myGH->stop_on_parse_errors, out_vars, + -1, &myGH->requests[0]); + + // notify the user about the new setting + if (not CCTK_Equals (verbose, "none")) { + char *msg = NULL; + for (int i = CCTK_NumVars () - 1; i >= 0; i--) { + if (myGH->requests[i]) { + char *fullname = CCTK_FullName (i); + if (not msg) { + Util_asprintf (&msg, "Periodic streamed HDF5 output requested " + "for '%s'", fullname); + } else { + Util_asprintf (&msg, "%s, '%s'", msg, fullname); + } + free (fullname); + } + } + if (msg) { + CCTK_INFO (msg); + free (msg); + } + } + + // save the last setting of 'IOHDF5::out_vars' parameter + free (myGH->out_vars); + myGH->out_vars = strdup (out_vars); + } +} + + +static int OutputGH (const cGH* const cctkGH) +{ + DECLARE_CCTK_PARAMETERS; + + CarpetIOStreamedHDF5GH *myGH = + (CarpetIOStreamedHDF5GH *) CCTK_GHExtension (cctkGH, CCTK_THORNSTRING); + + static hid_t file = -1; + static int client_is_ready = 0; + + // loop over all variables + for (int vindex = CCTK_NumVars () - 1; vindex >= 0; vindex--) { + if (TimeToOutput (cctkGH, vindex)) { + + // check if any client is ready to receive streamed data + // + // Even though this requires a broadcast operation, it should be + // by far less expensive than gathering all the data on processor 0 + // and find out afterwards that no client wants to have it. + if (not client_is_ready) { + if (file < 0 and dist::rank() == 0) { + fd_set read_set; + FD_ZERO (&read_set); + FD_SET (myGH->socket, &read_set); + + struct timeval timeout = {0, 0}; + client_is_ready = + select (FD_SETSIZE, &read_set, NULL, NULL, &timeout) == 1; + } + MPI_Bcast (&client_is_ready, 1, MPI_INT, 0, dist::comm); + } + + // short cut if there is nothing to do + if (not client_is_ready) { + continue; + } + + // when called the first time during the current iteration, + // open the file on processor 0 + if (file < 0 and dist::rank() == 0) { + + if (CCTK_Equals (verbose, "full")) { + CCTK_VInfo (CCTK_THORNSTRING, "Opening HDF5 output file on output " + "port %u", myGH->port); + } + + // set file access property list to use the Stream VFD and open the file + H5FD_stream_fapl_t fapl; + fapl.increment = 0; + fapl.socket = myGH->socket; + fapl.do_socket_io = 1; + fapl.backlog = 5; + fapl.broadcast_fn = NULL; + fapl.broadcast_arg = NULL; + + hid_t plist; + HDF5_ERROR (plist = H5Pcreate (H5P_FILE_ACCESS)); + HDF5_ERROR (H5Pset_fapl_stream (plist, &fapl)); + + // a filename is not used by Stream VFD + assert (file < 0); + HDF5_ERROR (file = H5Fcreate ("unused", H5F_ACC_TRUNC, H5P_DEFAULT, + plist)); + HDF5_ERROR (H5Pclose (plist)); + } + assert (dist::rank() or file >= 0); + + OutputVar (cctkGH, myGH->requests[vindex], file); + } + } + + // close an open file if the finest level has been output + if (reflevel == reflevels - 1) { + if (dist::rank() == 0 and file >= 0) { + if (CCTK_Equals (verbose, "full")) { + CCTK_VInfo (CCTK_THORNSTRING, "Closing HDF5 output file on port %u", + myGH->port); + } + HDF5_ERROR (H5Fclose (file)); + file = -1; + } + client_is_ready = 0; + } + + return (0); +} + + +static int TimeToOutput (const cGH* const cctkGH, const int vindex) +{ + DECLARE_CCTK_ARGUMENTS; + DECLARE_CCTK_PARAMETERS; + + const int numvars = CCTK_NumVars(); + assert (vindex>=0 and vindex<numvars); + + if (CCTK_GroupTypeFromVarI (vindex) != CCTK_GF and not do_global_mode) { + return 0; + } + + CarpetIOStreamedHDF5GH *myGH = + (CarpetIOStreamedHDF5GH *) CCTK_GHExtension (cctkGH, CCTK_THORNSTRING); + CheckSteerableParameters (cctkGH, myGH); + + // check if output for this variable was requested + if (not myGH->requests[vindex]) { + return (0); + } + + // check whether this refinement level should be output + if (not (myGH->requests[vindex]->refinement_levels & (1 << reflevel))) { + return (0); + } + + // check if output for this variable was requested individually + // by a "<varname>{ out_every = <number> }" option string + // this will overwrite the output criterion setting + const char *myoutcriterion = CCTK_EQUALS (out_criterion, "default") ? + io_out_criterion : out_criterion; + if (myGH->requests[vindex]->out_every >= 0) { + myoutcriterion = "divisor"; + } + + if (CCTK_EQUALS (myoutcriterion, "never")) { + return (0); + } + + // check whether to output at this iteration + bool output_this_iteration = false; + + if (CCTK_EQUALS (myoutcriterion, "iteration")) { + int myoutevery = out_every == -2 ? io_out_every : out_every; + if (myoutevery > 0) { + if (*this_iteration == cctk_iteration) { + // we already decided to output this iteration + output_this_iteration = true; + } else if (cctk_iteration >= *next_output_iteration) { + // it is time for the next output + output_this_iteration = true; + *this_iteration = cctk_iteration; + *next_output_iteration = cctk_iteration + myoutevery; + } + } + } else if (CCTK_EQUALS (myoutcriterion, "divisor")) { + int myoutevery = out_every == -2 ? io_out_every : out_every; + if (myGH->requests[vindex]->out_every >= 0) { + myoutevery = myGH->requests[vindex]->out_every; + } + if (myoutevery > 0 and (cctk_iteration % myoutevery) == 0) { + // we already decided to output this iteration + output_this_iteration = true; + } + } else if (CCTK_EQUALS (myoutcriterion, "time")) { + CCTK_REAL myoutdt = out_dt == -2 ? io_out_dt : out_dt; + if (myoutdt == 0 or *this_iteration == cctk_iteration) { + output_this_iteration = true; + } else if (myoutdt > 0 and (cctk_time / cctk_delta_time + >= *next_output_time / cctk_delta_time - 1.0e-12)) { + // it is time for the next output + output_this_iteration = true; + *this_iteration = cctk_iteration; + *next_output_time = cctk_time + myoutdt; + } + } + + if (not output_this_iteration) { + return 0; + } + + if (last_output.at(mglevel).at(reflevel).at(vindex) == cctk_iteration) { + // Has already been output during this iteration + char* varname = CCTK_FullName(vindex); + CCTK_VWarn (5, __LINE__, __FILE__, CCTK_THORNSTRING, + "Skipping output for variable \"%s\", because this variable " + "has already been output during the current iteration -- " + "probably via a trigger during the analysis stage", + varname); + free (varname); + return 0; + } + + assert (last_output.at(mglevel).at(reflevel).at(vindex) < cctk_iteration); + + // Should be output during this iteration + return 1; +} + + +static int OutputVar (const cGH* const cctkGH, const ioRequest* const request, + hid_t file) +{ + DECLARE_CCTK_PARAMETERS; + + const int group = CCTK_GroupIndexFromVarI (request->vindex); + assert (group >= 0); + cGroup groupdata; + CCTK_GroupData (group, &groupdata); + if (groupdata.grouptype == CCTK_SCALAR or groupdata.grouptype == CCTK_ARRAY) { + assert (do_global_mode); + } + + // check for storage + if (not CCTK_QueryGroupStorageI (cctkGH, group)) { + char* fullname = CCTK_FullName (request->vindex); + CCTK_VWarn (1, __LINE__, __FILE__, CCTK_THORNSTRING, + "Cannot output variable '%s' because it has no storage", + fullname); + free (fullname); + return (0); + } + + if (out_unchunked) { + WriteVarUnchunked (cctkGH, file, request, false); + } else { + WriteVarChunkedSequential (cctkGH, file, request, false); + } + + last_output.at(mglevel).at(reflevel).at(request->vindex) = + cctkGH->cctk_iteration; + + return (0); +} + + +} // namespace CarpetIOStreamedHDF5 diff --git a/Carpet/CarpetIOStreamedHDF5/src/CarpetIOStreamedHDF5.hh b/Carpet/CarpetIOStreamedHDF5/src/CarpetIOStreamedHDF5.hh new file mode 100644 index 000000000..9f8038e2b --- /dev/null +++ b/Carpet/CarpetIOStreamedHDF5/src/CarpetIOStreamedHDF5.hh @@ -0,0 +1,48 @@ +#ifndef CARPETIOSTREAMEDHDF5_HH +#define CARPETIOSTREAMEDHDF5_HH + +#include "CarpetIOHDF5.hh" +#include "SocketUtils.h" + + +// CarpetIOStreamed GH extension structure +typedef struct +{ + // port for clients to connect to + unsigned int port; + + // socket to stream data over a TCP connection + SOCKET socket; + + // default number of times to output + int out_every_default; + + // the last iteration output for each variable + vector<int> out_last; + + // list of variables to output + char *out_vars; + + // stop on I/O parameter parsing errors ? + int stop_on_parse_errors; + + // I/O request description list (for all variables) + vector<ioRequest*> requests; + +} CarpetIOStreamedHDF5GH; + + +namespace CarpetIOStreamedHDF5 +{ + // scheduled routines (must be declared as C according to schedule.ccl) + extern "C" { + + void CarpetIOStreamedHDF5_Startup (void); + void CarpetIOStreamedHDF5_Init (const cGH* const); + void CarpetIOStreamedHDF5_Terminate (const cGH* const); + + } // extern "C" + +} // namespace CarpetIOStreamedHDF5 + +#endif // !defined(CARPETIOSTREAMEDHDF5_HH) diff --git a/Carpet/CarpetIOStreamedHDF5/src/make.code.defn b/Carpet/CarpetIOStreamedHDF5/src/make.code.defn new file mode 100644 index 000000000..11bf54377 --- /dev/null +++ b/Carpet/CarpetIOStreamedHDF5/src/make.code.defn @@ -0,0 +1,4 @@ +# Main make.code.defn file for thorn CarpetIOStreamedHDF5 + +# Source files in this directory +SRCS = CarpetIOStreamedHDF5.cc diff --git a/Carpet/CarpetIOStreamedHDF5/src/make.configuration.defn b/Carpet/CarpetIOStreamedHDF5/src/make.configuration.defn new file mode 100644 index 000000000..68f89cb2c --- /dev/null +++ b/Carpet/CarpetIOStreamedHDF5/src/make.configuration.defn @@ -0,0 +1,6 @@ +# make.configuration.defn for IOStreamedHDF5 + +# make sure that the HDF5 Stream Virtual File Driver is available +ifeq ($(strip $(HAVE_HDF5_STREAM_VFD)),) + $(error 'CarpetIOStreamedHDF5 requires an HDF5 installation with built-in Stream Virtual File Driver. Please reconfigure with an appropriate HDF5 installation or remove CarpetIOStreamedHDF5 from ThornList !') +endif |