Commit 8583b142 authored by Josh Allmann's avatar Josh Allmann Committed by Martin Storsjö

rtmp: Support reading interleaved chunks.

A given packet won't always come in contiguously; sometimes
they may be broken up on chunk boundaries by packets of another
channel.

This support primarily involves tracking information about the
data that's been read, so the reader can pick up where it left
off for a given channel.

As a side effect, we no longer over-report the bytes read if
(toread = MIN(size, chunk_size)) == size
Signed-off-by: 's avatarMartin Storsjö <martin@martin.st>
parent d4c2a374
...@@ -140,16 +140,17 @@ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p, ...@@ -140,16 +140,17 @@ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt, hdr); return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt, hdr);
} }
int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size, static int rtmp_packet_read_one_chunk(URLContext *h, RTMPPacket *p,
RTMPPacket *prev_pkt, uint8_t hdr) int chunk_size, RTMPPacket *prev_pkt,
uint8_t hdr)
{ {
uint8_t t, buf[16]; uint8_t buf[16];
int channel_id, timestamp, size, offset = 0; int channel_id, timestamp, size;
uint32_t extra = 0; uint32_t extra = 0;
enum RTMPPacketType type; enum RTMPPacketType type;
int written = 0; int written = 0;
int ret; int ret, toread;
written++; written++;
channel_id = hdr & 0x3F; channel_id = hdr & 0x3F;
...@@ -198,37 +199,69 @@ int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size, ...@@ -198,37 +199,69 @@ int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size,
if (hdr != RTMP_PS_TWELVEBYTES) if (hdr != RTMP_PS_TWELVEBYTES)
timestamp += prev_pkt[channel_id].timestamp; timestamp += prev_pkt[channel_id].timestamp;
if ((ret = ff_rtmp_packet_create(p, channel_id, type, timestamp, if (!prev_pkt[channel_id].read) {
size)) < 0) if ((ret = ff_rtmp_packet_create(p, channel_id, type, timestamp,
return ret; size)) < 0)
return ret;
p->read = written;
p->offset = 0;
prev_pkt[channel_id].ts_delta = timestamp -
prev_pkt[channel_id].timestamp;
prev_pkt[channel_id].timestamp = timestamp;
} else {
// previous packet in this channel hasn't completed reading
RTMPPacket *prev = &prev_pkt[channel_id];
p->data = prev->data;
p->size = prev->size;
p->channel_id = prev->channel_id;
p->type = prev->type;
p->ts_delta = prev->ts_delta;
p->extra = prev->extra;
p->offset = prev->offset;
p->read = prev->read + written;
p->timestamp = prev->timestamp;
prev->data = NULL;
}
p->extra = extra; p->extra = extra;
// save history // save history
prev_pkt[channel_id].channel_id = channel_id; prev_pkt[channel_id].channel_id = channel_id;
prev_pkt[channel_id].type = type; prev_pkt[channel_id].type = type;
prev_pkt[channel_id].size = size; prev_pkt[channel_id].size = size;
prev_pkt[channel_id].ts_delta = timestamp - prev_pkt[channel_id].timestamp;
prev_pkt[channel_id].timestamp = timestamp;
prev_pkt[channel_id].extra = extra; prev_pkt[channel_id].extra = extra;
while (size > 0) { size = size - p->offset;
int toread = FFMIN(size, chunk_size);
if (ffurl_read_complete(h, p->data + offset, toread) != toread) { toread = FFMIN(size, chunk_size);
ff_rtmp_packet_destroy(p); if (ffurl_read_complete(h, p->data + p->offset, toread) != toread) {
ff_rtmp_packet_destroy(p);
return AVERROR(EIO);
}
size -= toread;
p->read += toread;
p->offset += toread;
if (size > 0) {
RTMPPacket *prev = &prev_pkt[channel_id];
prev->data = p->data;
prev->read = p->read;
prev->offset = p->offset;
return AVERROR(EAGAIN);
}
prev_pkt[channel_id].read = 0; // read complete; reset if needed
return p->read;
}
int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size,
RTMPPacket *prev_pkt, uint8_t hdr)
{
while (1) {
int ret = rtmp_packet_read_one_chunk(h, p, chunk_size, prev_pkt, hdr);
if (ret > 0 || ret != AVERROR(EAGAIN))
return ret;
if (ffurl_read(h, &hdr, 1) != 1)
return AVERROR(EIO); return AVERROR(EIO);
}
size -= chunk_size;
offset += chunk_size;
written += chunk_size;
if (size > 0) {
if ((ret = ffurl_read_complete(h, &t, 1)) < 0) { // marker
ff_rtmp_packet_destroy(p);
return ret;
}
written++;
if (t != (0xC0 + channel_id))
return -1;
}
} }
return written;
} }
int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt, int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt,
......
...@@ -82,6 +82,8 @@ typedef struct RTMPPacket { ...@@ -82,6 +82,8 @@ typedef struct RTMPPacket {
uint32_t extra; ///< probably an additional channel ID used during streaming data uint32_t extra; ///< probably an additional channel ID used during streaming data
uint8_t *data; ///< packet payload uint8_t *data; ///< packet payload
int size; ///< packet payload size int size; ///< packet payload size
int offset; ///< amount of data read so far
int read; ///< amount read, including headers
} RTMPPacket; } RTMPPacket;
/** /**
......
...@@ -2309,7 +2309,7 @@ static int get_packet(URLContext *s, int for_header) ...@@ -2309,7 +2309,7 @@ static int get_packet(URLContext *s, int for_header)
static int rtmp_close(URLContext *h) static int rtmp_close(URLContext *h)
{ {
RTMPContext *rt = h->priv_data; RTMPContext *rt = h->priv_data;
int ret = 0; int ret = 0, i, j;
if (!rt->is_input) { if (!rt->is_input) {
rt->flv_data = NULL; rt->flv_data = NULL;
...@@ -2320,6 +2320,9 @@ static int rtmp_close(URLContext *h) ...@@ -2320,6 +2320,9 @@ static int rtmp_close(URLContext *h)
} }
if (rt->state > STATE_HANDSHAKED) if (rt->state > STATE_HANDSHAKED)
ret = gen_delete_stream(h, rt); ret = gen_delete_stream(h, rt);
for (i = 0; i < 2; i++)
for (j = 0; j < RTMP_CHANNELS; j++)
ff_rtmp_packet_destroy(&rt->prev_pkt[i][j]);
free_tracked_methods(rt); free_tracked_methods(rt);
av_freep(&rt->flv_data); av_freep(&rt->flv_data);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment