From 754f26a97c816781e80500d98f2515ae97836145 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Wed, 14 Sep 2011 21:46:41 +0200 Subject: input_stream: non-blocking I/O Add GMutex, GCond attributes which will be used by callers to conditionally wait on the stream. Remove the (now-useless) plugin method buffer(), wait on GCond instead. Lock the input_stream before each method call. Do the same with the playlist plugins. --- src/input_stream.h | 121 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 113 insertions(+), 8 deletions(-) (limited to 'src/input_stream.h') diff --git a/src/input_stream.h b/src/input_stream.h index 7866562a..6a10831d 100644 --- a/src/input_stream.h +++ b/src/input_stream.h @@ -45,6 +45,26 @@ struct input_stream { */ char *uri; + /** + * A mutex that protects the mutable attributes of this object + * and its implementation. It must be locked before calling + * any of the public methods. + * + * This object is allocated by the client, and the client is + * responsible for freeing it. + */ + GMutex *mutex; + + /** + * A cond that gets signalled when the state of this object + * changes from the I/O thread. The client of this object may + * wait on it. Optional, may be NULL. + * + * This object is allocated by the client, and the client is + * responsible for freeing it. + */ + GCond *cond; + /** * indicates whether the stream is ready for reading and * whether the other attributes in this struct are valid @@ -76,20 +96,43 @@ struct input_stream { * Opens a new input stream. You may not access it until the "ready" * flag is set. * + * @param mutex a mutex that is used to protect this object; must be + * locked before calling any of the public methods + * @param cond a cond that gets signalled when the state of + * this object changes; may be NULL if the caller doesn't want to get + * notifications * @return an #input_stream object on success, NULL on error */ -gcc_nonnull(1) +gcc_nonnull(1, 2) G_GNUC_MALLOC struct input_stream * -input_stream_open(const char *uri, GError **error_r); +input_stream_open(const char *uri, + GMutex *mutex, GCond *cond, + GError **error_r); /** * Close the input stream and free resources. + * + * The caller must not lock the mutex. */ gcc_nonnull(1) void input_stream_close(struct input_stream *is); +gcc_nonnull(1) +static inline void +input_stream_lock(struct input_stream *is) +{ + g_mutex_lock(is->mutex); +} + +gcc_nonnull(1) +static inline void +input_stream_unlock(struct input_stream *is) +{ + g_mutex_unlock(is->mutex); +} + /** * Check for errors that may have occurred in the I/O thread. * @@ -107,10 +150,29 @@ gcc_nonnull(1) void input_stream_update(struct input_stream *is); +/** + * Wait until the stream becomes ready. + * + * The caller must lock the mutex. + */ +gcc_nonnull(1) +void +input_stream_wait_ready(struct input_stream *is); + +/** + * Wrapper for input_stream_wait_locked() which locks and unlocks the + * mutex; the caller must not be holding it already. + */ +gcc_nonnull(1) +void +input_stream_lock_wait_ready(struct input_stream *is); + /** * Seeks to the specified position in the stream. This will most * likely fail if the "seekable" flag is false. * + * The caller must lock the mutex. + * * @param is the input_stream object * @param offset the relative offset * @param whence the base of the seek, one of SEEK_SET, SEEK_CUR, SEEK_END @@ -120,16 +182,38 @@ bool input_stream_seek(struct input_stream *is, goffset offset, int whence, GError **error_r); +/** + * Wrapper for input_stream_seek() which locks and unlocks the + * mutex; the caller must not be holding it already. + */ +gcc_nonnull(1) +bool +input_stream_lock_seek(struct input_stream *is, goffset offset, int whence, + GError **error_r); + /** * Returns true if the stream has reached end-of-file. + * + * The caller must lock the mutex. */ gcc_nonnull(1) G_GNUC_PURE bool input_stream_eof(struct input_stream *is); +/** + * Wrapper for input_stream_eof() which locks and unlocks the mutex; + * the caller must not be holding it already. + */ +gcc_nonnull(1) +G_GNUC_PURE +bool +input_stream_lock_eof(struct input_stream *is); + /** * Reads the tag from the stream. * + * The caller must lock the mutex. + * * @return a tag object which must be freed with tag_free(), or NULL * if the tag has not changed since the last call */ @@ -139,20 +223,32 @@ struct tag * input_stream_tag(struct input_stream *is); /** - * Reads some of the stream into its buffer. The following return - * codes are defined: -1 = error, 1 = something was buffered, 0 = - * nothing was buffered. + * Wrapper for input_stream_tag() which locks and unlocks the + * mutex; the caller must not be holding it already. + */ +gcc_nonnull(1) +G_GNUC_MALLOC +struct tag * +input_stream_lock_tag(struct input_stream *is); + +/** + * Returns true if the next read operation will not block: either data + * is available, or end-of-stream has been reached, or an error has + * occurred. * - * The semantics of this function are not well-defined, and it will - * eventually be removed. + * The caller must lock the mutex. */ gcc_nonnull(1) -int input_stream_buffer(struct input_stream *is, GError **error_r); +G_GNUC_PURE +bool +input_stream_available(struct input_stream *is); /** * Reads data from the stream into the caller-supplied buffer. * Returns 0 on error or eof (check with input_stream_eof()). * + * The caller must lock the mutex. + * * @param is the input_stream object * @param ptr the buffer to read into * @param size the maximum number of bytes to read @@ -163,4 +259,13 @@ size_t input_stream_read(struct input_stream *is, void *ptr, size_t size, GError **error_r); +/** + * Wrapper for input_stream_tag() which locks and unlocks the + * mutex; the caller must not be holding it already. + */ +gcc_nonnull(1, 2) +size_t +input_stream_lock_read(struct input_stream *is, void *ptr, size_t size, + GError **error_r); + #endif -- cgit v1.2.3