summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libavformat/udp.c100
1 files changed, 58 insertions, 42 deletions
diff --git a/libavformat/udp.c b/libavformat/udp.c
index 70dc98e4de..58e7498845 100644
--- a/libavformat/udp.c
+++ b/libavformat/udp.c
@@ -29,6 +29,7 @@
#include "avformat.h"
#include "avio_internal.h"
+#include "libavutil/avassert.h"
#include "libavutil/parseutils.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
@@ -93,6 +94,7 @@ typedef struct UDPContext {
AVFifoBuffer *fifo;
int circular_buffer_error;
int64_t packet_gap; /* delay between transmitted packets */
+ int close_req;
#if HAVE_PTHREAD_CANCEL
pthread_t circular_buffer_thread;
pthread_mutex_t mutex;
@@ -545,30 +547,6 @@ end:
return NULL;
}
-static void do_udp_write(void *arg, void *buf, int size) {
- URLContext *h = arg;
- UDPContext *s = h->priv_data;
-
- int ret;
-
- if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
- ret = ff_network_wait_fd(s->udp_fd, 1);
- if (ret < 0) {
- s->circular_buffer_error = ret;
- return;
- }
- }
-
- if (!s->is_connected) {
- ret = sendto (s->udp_fd, buf, size, 0,
- (struct sockaddr *) &s->dest_addr,
- s->dest_addr_len);
- } else
- ret = send(s->udp_fd, buf, size, 0);
-
- s->circular_buffer_error=ret;
-}
-
static void *circular_buffer_task_tx( void *_URLContext)
{
URLContext *h = _URLContext;
@@ -576,41 +554,67 @@ static void *circular_buffer_task_tx( void *_URLContext)
int old_cancelstate;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+ pthread_mutex_lock(&s->mutex);
+
+ if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
+ av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
+ s->circular_buffer_error = AVERROR(EIO);
+ goto end;
+ }
for(;;) {
int len;
+ const uint8_t *p;
uint8_t tmp[4];
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
-
- av_usleep(s->packet_gap);
-
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
-
- pthread_mutex_lock(&s->mutex);
-
len=av_fifo_size(s->fifo);
while (len<4) {
+ if (s->close_req)
+ goto end;
if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
goto end;
}
len=av_fifo_size(s->fifo);
}
- av_fifo_generic_peek(s->fifo, tmp, 4, NULL);
+ av_fifo_generic_read(s->fifo, tmp, 4, NULL);
len=AV_RL32(tmp);
- if (len>0 && av_fifo_size(s->fifo)>=len+4) {
- av_fifo_drain(s->fifo, 4); /* skip packet length */
- av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */
- if (s->circular_buffer_error == len) {
- /* all ok - reset error */
- s->circular_buffer_error=0;
+ av_assert0(len >= 0);
+ av_assert0(len <= sizeof(s->tmp));
+
+ av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
+
+ pthread_mutex_unlock(&s->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+
+ p = s->tmp;
+ while (len) {
+ int ret;
+ av_assert0(len > 0);
+ if (!s->is_connected) {
+ ret = sendto (s->udp_fd, p, len, 0,
+ (struct sockaddr *) &s->dest_addr,
+ s->dest_addr_len);
+ } else
+ ret = send(s->udp_fd, p, len, 0);
+ if (ret >= 0) {
+ len -= ret;
+ p += ret;
+ } else {
+ ret = ff_neterrno();
+ if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
+ s->circular_buffer_error = ret;
+ return NULL;
+ }
}
}
- pthread_mutex_unlock(&s->mutex);
+ av_usleep(s->packet_gap);
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+ pthread_mutex_lock(&s->mutex);
}
end:
@@ -1055,7 +1059,6 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
*/
if (s->circular_buffer_error<0) {
int err=s->circular_buffer_error;
- s->circular_buffer_error=0;
pthread_mutex_unlock(&s->mutex);
return err;
}
@@ -1093,13 +1096,26 @@ static int udp_close(URLContext *h)
{
UDPContext *s = h->priv_data;
+#if HAVE_PTHREAD_CANCEL
+ // Request close once writing is finished
+ if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
+ int ret;
+ pthread_mutex_lock(&s->mutex);
+ s->close_req = 1;
+ pthread_cond_signal(&s->cond);
+ pthread_mutex_unlock(&s->mutex);
+ }
+#endif
+
if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
closesocket(s->udp_fd);
#if HAVE_PTHREAD_CANCEL
if (s->thread_started) {
int ret;
- pthread_cancel(s->circular_buffer_thread);
+ // Cancel only read, as write has been signaled as success to the user
+ if (h->flags & AVIO_FLAG_READ)
+ pthread_cancel(s->circular_buffer_thread);
ret = pthread_join(s->circular_buffer_thread, NULL);
if (ret != 0)
av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));