From 754f26a97c816781e80500d98f2515ae97836145 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Wed, 14 Sep 2011 21:46:41 +0200 Subject: input_stream: non-blocking I/O Add GMutex, GCond attributes which will be used by callers to conditionally wait on the stream. Remove the (now-useless) plugin method buffer(), wait on GCond instead. Lock the input_stream before each method call. Do the same with the playlist plugins. --- test/dump_playlist.c | 25 ++++++++++--------------- test/read_tags.c | 11 +++++++++-- test/run_decoder.c | 10 ++++++++-- test/run_input.c | 30 ++++++++++++++++++------------ 4 files changed, 45 insertions(+), 31 deletions(-) (limited to 'test') diff --git a/test/dump_playlist.c b/test/dump_playlist.c index bf3fed7c..079fdeac 100644 --- a/test/dump_playlist.c +++ b/test/dump_playlist.c @@ -92,11 +92,14 @@ int main(int argc, char **argv) /* open the playlist */ - playlist = playlist_list_open_uri(uri); + GMutex *mutex = g_mutex_new(); + GCond *cond = g_cond_new(); + + playlist = playlist_list_open_uri(uri, mutex, cond); if (playlist == NULL) { /* open the stream and wait until it becomes ready */ - is = input_stream_open(uri, &error); + is = input_stream_open(uri, mutex, cond, &error); if (is == NULL) { if (error != NULL) { g_warning("%s", error->message); @@ -106,19 +109,7 @@ int main(int argc, char **argv) return 2; } - while (!is->ready) { - int ret = input_stream_buffer(is, &error); - if (ret < 0) { - /* error */ - g_warning("%s", error->message); - g_error_free(error); - return 2; - } - - if (ret == 0) - /* nothing was buffered - wait */ - g_usleep(10000); - } + input_stream_lock_wait_ready(is); /* open the playlist */ @@ -157,6 +148,10 @@ int main(int argc, char **argv) playlist_plugin_close(playlist); if (is != NULL) input_stream_close(is); + + g_cond_free(cond); + g_mutex_free(mutex); + playlist_list_global_finish(); input_stream_global_finish(); io_thread_deinit(); diff --git a/test/read_tags.c b/test/read_tags.c index c2e3b2ca..1d742e9e 100644 --- a/test/read_tags.c +++ b/test/read_tags.c @@ -91,7 +91,7 @@ decoder_read(G_GNUC_UNUSED struct decoder *decoder, struct input_stream *is, void *buffer, size_t length) { - return input_stream_read(is, buffer, length, NULL); + return input_stream_lock_read(is, buffer, length, NULL); } void @@ -193,7 +193,11 @@ int main(int argc, char **argv) tag = decoder_plugin_tag_dup(plugin, path); if (tag == NULL && plugin->stream_tag != NULL) { - struct input_stream *is = input_stream_open(path, &error); + GMutex *mutex = g_mutex_new(); + GCond *cond = g_cond_new(); + + struct input_stream *is = + input_stream_open(path, mutex, cond, &error); if (is == NULL) { g_printerr("Failed to open %s: %s\n", @@ -204,6 +208,9 @@ int main(int argc, char **argv) tag = decoder_plugin_stream_tag(plugin, is); input_stream_close(is); + + g_cond_free(cond); + g_mutex_free(mutex); } decoder_plugin_deinit_all(); diff --git a/test/run_decoder.c b/test/run_decoder.c index efc246f5..455b73ce 100644 --- a/test/run_decoder.c +++ b/test/run_decoder.c @@ -112,7 +112,7 @@ decoder_read(G_GNUC_UNUSED struct decoder *decoder, struct input_stream *is, void *buffer, size_t length) { - return input_stream_read(is, buffer, length, NULL); + return input_stream_lock_read(is, buffer, length, NULL); } void @@ -209,8 +209,11 @@ int main(int argc, char **argv) decoder_plugin_file_decode(decoder.plugin, &decoder, decoder.uri); } else if (decoder.plugin->stream_decode != NULL) { + GMutex *mutex = g_mutex_new(); + GCond *cond = g_cond_new(); + struct input_stream *is = - input_stream_open(decoder.uri, &error); + input_stream_open(decoder.uri, mutex, cond, &error); if (is == NULL) { if (error != NULL) { g_warning("%s", error->message); @@ -224,6 +227,9 @@ int main(int argc, char **argv) decoder_plugin_stream_decode(decoder.plugin, &decoder, is); input_stream_close(is); + + g_cond_free(cond); + g_mutex_free(mutex); } else { g_printerr("Decoder plugin is not usable\n"); return 1; diff --git a/test/run_input.c b/test/run_input.c index 651d3648..0fe5a01f 100644 --- a/test/run_input.c +++ b/test/run_input.c @@ -53,20 +53,17 @@ dump_input_stream(struct input_stream *is) size_t num_read; ssize_t num_written; + g_mutex_lock(is->mutex); + /* wait until the stream becomes ready */ - while (!is->ready) { - int ret = input_stream_buffer(is, &error); - if (ret < 0) { - /* error */ - g_warning("%s", error->message); - g_error_free(error); - return 2; - } + input_stream_wait_ready(is); - if (ret == 0) - /* nothing was buffered - wait */ - g_usleep(10000); + if (!input_stream_check(is, &error)) { + g_warning("%s", error->message); + g_error_free(error); + g_mutex_unlock(is->mutex); + return EXIT_FAILURE; } /* print meta data */ @@ -103,9 +100,12 @@ dump_input_stream(struct input_stream *is) if (!input_stream_check(is, &error)) { g_warning("%s", error->message); g_error_free(error); + g_mutex_unlock(is->mutex); return EXIT_FAILURE; } + g_mutex_unlock(is->mutex); + return 0; } @@ -149,7 +149,10 @@ int main(int argc, char **argv) /* open the stream and dump it */ - is = input_stream_open(argv[1], &error); + GMutex *mutex = g_mutex_new(); + GCond *cond = g_cond_new(); + + is = input_stream_open(argv[1], mutex, cond, &error); if (is != NULL) { ret = dump_input_stream(is); input_stream_close(is); @@ -162,6 +165,9 @@ int main(int argc, char **argv) ret = 2; } + g_cond_free(cond); + g_mutex_free(mutex); + /* deinitialize everything */ input_stream_global_finish(); -- cgit v1.2.3