From 84a125c4c28f3e3e215d2e6c32f7f0ec43bbc04c Mon Sep 17 00:00:00 2001 From: Martin Storsjö Date: Fri, 11 Oct 2013 22:16:04 +0300 Subject: rtmp: Allocate the prev_pkt arrays dynamically MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Normally, all channel ids are between 0 and 10, while they in uncommon cases can have values up to 64k. This avoids allocating two arrays for up to 64k entries (at a total of over 6 MB in size) each when most of them aren't used at all. Signed-off-by: Martin Storsjö --- libavformat/rtmppkt.c | 50 ++++++++++++++++++++++++++++++++++++++++++------- libavformat/rtmppkt.h | 22 +++++++++++++++++++--- libavformat/rtmpproto.c | 48 +++++++++++++++++++++++++++++------------------ 3 files changed, 92 insertions(+), 28 deletions(-) diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c index defe81e05e..f58f668b53 100644 --- a/libavformat/rtmppkt.c +++ b/libavformat/rtmppkt.c @@ -129,20 +129,42 @@ int ff_amf_read_null(GetByteContext *bc) return 0; } +int ff_rtmp_check_alloc_array(RTMPPacket **prev_pkt, int *nb_prev_pkt, + int channel) +{ + int nb_alloc; + RTMPPacket *ptr; + if (channel < *nb_prev_pkt) + return 0; + + nb_alloc = channel + 16; + // This can't use the av_reallocp family of functions, since we + // would need to free each element in the array before the array + // itself is freed. + ptr = av_realloc_array(*prev_pkt, nb_alloc, sizeof(**prev_pkt)); + if (!ptr) + return AVERROR(ENOMEM); + memset(ptr + *nb_prev_pkt, 0, (nb_alloc - *nb_prev_pkt) * sizeof(*ptr)); + *prev_pkt = ptr; + *nb_prev_pkt = nb_alloc; + return 0; +} + int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p, - int chunk_size, RTMPPacket *prev_pkt) + int chunk_size, RTMPPacket **prev_pkt, int *nb_prev_pkt) { uint8_t hdr; if (ffurl_read(h, &hdr, 1) != 1) return AVERROR(EIO); - return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt, hdr); + return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt, + nb_prev_pkt, hdr); } static int rtmp_packet_read_one_chunk(URLContext *h, RTMPPacket *p, - int chunk_size, RTMPPacket *prev_pkt, - uint8_t hdr) + int chunk_size, RTMPPacket **prev_pkt_ptr, + int *nb_prev_pkt, uint8_t hdr) { uint8_t buf[16]; @@ -151,6 +173,7 @@ static int rtmp_packet_read_one_chunk(URLContext *h, RTMPPacket *p, enum RTMPPacketType type; int written = 0; int ret, toread; + RTMPPacket *prev_pkt; written++; channel_id = hdr & 0x3F; @@ -162,6 +185,10 @@ static int rtmp_packet_read_one_chunk(URLContext *h, RTMPPacket *p, written += channel_id + 1; channel_id = AV_RL16(buf) + 64; } + if ((ret = ff_rtmp_check_alloc_array(prev_pkt_ptr, nb_prev_pkt, + channel_id)) < 0) + return ret; + prev_pkt = *prev_pkt_ptr; size = prev_pkt[channel_id].size; type = prev_pkt[channel_id].type; extra = prev_pkt[channel_id].extra; @@ -252,10 +279,12 @@ static int rtmp_packet_read_one_chunk(URLContext *h, RTMPPacket *p, } int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size, - RTMPPacket *prev_pkt, uint8_t hdr) + RTMPPacket **prev_pkt, int *nb_prev_pkt, + uint8_t hdr) { while (1) { - int ret = rtmp_packet_read_one_chunk(h, p, chunk_size, prev_pkt, hdr); + int ret = rtmp_packet_read_one_chunk(h, p, chunk_size, prev_pkt, + nb_prev_pkt, hdr); if (ret > 0 || ret != AVERROR(EAGAIN)) return ret; @@ -265,13 +294,20 @@ int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size, } int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt, - int chunk_size, RTMPPacket *prev_pkt) + int chunk_size, RTMPPacket **prev_pkt_ptr, + int *nb_prev_pkt) { uint8_t pkt_hdr[16], *p = pkt_hdr; int mode = RTMP_PS_TWELVEBYTES; int off = 0; int written = 0; int ret; + RTMPPacket *prev_pkt; + + if ((ret = ff_rtmp_check_alloc_array(prev_pkt_ptr, nb_prev_pkt, + pkt->channel_id)) < 0) + return ret; + prev_pkt = *prev_pkt_ptr; pkt->ts_delta = pkt->timestamp - prev_pkt[pkt->channel_id].timestamp; diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h index ce326d15db..7121d7e268 100644 --- a/libavformat/rtmppkt.h +++ b/libavformat/rtmppkt.h @@ -114,10 +114,12 @@ void ff_rtmp_packet_destroy(RTMPPacket *pkt); * @param chunk_size current chunk size * @param prev_pkt previously read packet headers for all channels * (may be needed for restoring incomplete packet header) + * @param nb_prev_pkt number of allocated elements in prev_pkt * @return number of bytes read on success, negative value otherwise */ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p, - int chunk_size, RTMPPacket *prev_pkt); + int chunk_size, RTMPPacket **prev_pkt, + int *nb_prev_pkt); /** * Read internal RTMP packet sent by the server. * @@ -126,11 +128,13 @@ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p, * @param chunk_size current chunk size * @param prev_pkt previously read packet headers for all channels * (may be needed for restoring incomplete packet header) + * @param nb_prev_pkt number of allocated elements in prev_pkt * @param c the first byte already read * @return number of bytes read on success, negative value otherwise */ int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size, - RTMPPacket *prev_pkt, uint8_t c); + RTMPPacket **prev_pkt, int *nb_prev_pkt, + uint8_t c); /** * Send RTMP packet to the server. @@ -140,10 +144,12 @@ int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size, * @param chunk_size current chunk size * @param prev_pkt previously sent packet headers for all channels * (may be used for packet header compressing) + * @param nb_prev_pkt number of allocated elements in prev_pkt * @return number of bytes written on success, negative value otherwise */ int ff_rtmp_packet_write(URLContext *h, RTMPPacket *p, - int chunk_size, RTMPPacket *prev_pkt); + int chunk_size, RTMPPacket **prev_pkt, + int *nb_prev_pkt); /** * Print information and contents of RTMP packet. @@ -153,6 +159,16 @@ int ff_rtmp_packet_write(URLContext *h, RTMPPacket *p, */ void ff_rtmp_packet_dump(void *ctx, RTMPPacket *p); +/** + * Enlarge the prev_pkt array to fit the given channel + * + * @param prev_pkt array with previously sent packet headers + * @param nb_prev_pkt number of allocated elements in prev_pkt + * @param channel the channel number that needs to be allocated + */ +int ff_rtmp_check_alloc_array(RTMPPacket **prev_pkt, int *nb_prev_pkt, + int channel); + /** * @name Functions used to work with the AMF format (which is also used in .flv) * @see amf_* funcs in libavformat/flvdec.c diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c index 3dbfc92c48..027f6f6941 100644 --- a/libavformat/rtmpproto.c +++ b/libavformat/rtmpproto.c @@ -77,7 +77,8 @@ typedef struct TrackedMethod { typedef struct RTMPContext { const AVClass *class; URLContext* stream; ///< TCP stream used in interactions with RTMP server - RTMPPacket prev_pkt[2][RTMP_CHANNELS]; ///< packet history used when reading and sending packets ([0] for reading, [1] for writing) + RTMPPacket *prev_pkt[2]; ///< packet history used when reading and sending packets ([0] for reading, [1] for writing) + int nb_prev_pkt[2]; ///< number of elements in prev_pkt int in_chunk_size; ///< size of the chunks incoming RTMP packets are divided into int out_chunk_size; ///< size of the chunks outgoing RTMP packets are divided into int is_input; ///< input/output flag @@ -238,7 +239,7 @@ static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track) } ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); fail: ff_rtmp_packet_destroy(pkt); return ret; @@ -408,7 +409,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) GetByteContext gbc; if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->in_chunk_size, - rt->prev_pkt[0])) < 0) + &rt->prev_pkt[0], &rt->nb_prev_pkt[0])) < 0) return ret; cp = pkt.data; bytestream2_init(&gbc, cp, pkt.size); @@ -444,7 +445,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) bytestream_put_be32(&p, rt->server_bw); pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -457,7 +458,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) bytestream_put_byte(&p, 2); // dynamic pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -471,7 +472,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) bytestream_put_be16(&p, 0); // 0 -> Stream Begin bytestream_put_be32(&p, 0); ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -484,7 +485,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) p = pkt.data; bytestream_put_be32(&p, rt->out_chunk_size); ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -519,7 +520,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -534,7 +535,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) ff_amf_write_number(&p, 8192); pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); return ret; @@ -1455,7 +1456,7 @@ static int handle_chunk_size(URLContext *s, RTMPPacket *pkt) /* Send the same chunk size change packet back to the server, * setting the outgoing chunk size to the same as the incoming one. */ if ((ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size, - rt->prev_pkt[1])) < 0) + &rt->prev_pkt[1], &rt->nb_prev_pkt[1])) < 0) return ret; rt->out_chunk_size = AV_RB32(pkt->data); } @@ -1784,7 +1785,7 @@ static int write_begin(URLContext *s) bytestream2_put_be32(&pbc, rt->nb_streamid); ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&spkt); @@ -1831,7 +1832,7 @@ static int write_status(URLContext *s, RTMPPacket *pkt, spkt.size = pp - spkt.data; ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&spkt); return ret; @@ -1932,7 +1933,7 @@ static int send_invoke_response(URLContext *s, RTMPPacket *pkt) } spkt.size = pp - spkt.data; ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&spkt); return ret; } @@ -2262,7 +2263,8 @@ static int get_packet(URLContext *s, int for_header) for (;;) { RTMPPacket rpkt = { 0 }; if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt, - rt->in_chunk_size, rt->prev_pkt[0])) <= 0) { + rt->in_chunk_size, &rt->prev_pkt[0], + &rt->nb_prev_pkt[0])) <= 0) { if (ret == 0) { return AVERROR(EAGAIN); } else { @@ -2343,9 +2345,11 @@ static int rtmp_close(URLContext *h) } if (rt->state > STATE_HANDSHAKED) ret = gen_delete_stream(h, rt); - for (i = 0; i < 2; i++) - for (j = 0; j < RTMP_CHANNELS; j++) + for (i = 0; i < 2; i++) { + for (j = 0; j < rt->nb_prev_pkt[i]; j++) ff_rtmp_packet_destroy(&rt->prev_pkt[i][j]); + av_freep(&rt->prev_pkt[i]); + } free_tracked_methods(rt); av_freep(&rt->flv_data); @@ -2562,11 +2566,14 @@ reconnect: goto fail; if (rt->do_reconnect) { + int i; ffurl_close(rt->stream); rt->stream = NULL; rt->do_reconnect = 0; rt->nb_invokes = 0; - memset(rt->prev_pkt, 0, sizeof(rt->prev_pkt)); + for (i = 0; i < 2; i++) + memset(rt->prev_pkt[i], 0, + sizeof(**rt->prev_pkt) * rt->nb_prev_pkt[i]); free_tracked_methods(rt); goto reconnect; } @@ -2687,6 +2694,10 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) pkttype == RTMP_PT_NOTIFY) { if (pkttype == RTMP_PT_NOTIFY) pktsize += 16; + if ((ret = ff_rtmp_check_alloc_array(&rt->prev_pkt[1], + &rt->nb_prev_pkt[1], + channel)) < 0) + return ret; rt->prev_pkt[1][channel].channel_id = 0; } @@ -2747,7 +2758,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) if ((ret = ff_rtmp_packet_read_internal(rt->stream, &rpkt, rt->in_chunk_size, - rt->prev_pkt[0], c)) <= 0) + &rt->prev_pkt[0], + &rt->nb_prev_pkt[0], c)) <= 0) return ret; if ((ret = rtmp_parse_result(s, rt, &rpkt)) < 0) -- cgit v1.2.3