Commit 413c842a authored by Pavel Nikiforov's avatar Pavel Nikiforov Committed by Michael Niedermayer

avformat/udp: Add a delay between packets for streaming to clients with short buffer

This commit enables sending UDP packets in a background thread with specified delay.
When sending packets without a delay some devices with small RX buffer
( MAG200 STB, for example) will drop tail packets in bursts causing
decoding errors.

To use it specify "fifo_size" with "packet_gap" .

The output url will looks like udp://xxx:yyy?fifo_size=<output fifo
size>&packet_gap=<delay in usecs>
Signed-off-by: 's avatarMichael Niedermayer <michael@niedermayer.cc>
parent 49640ae3
...@@ -1285,6 +1285,9 @@ Set the UDP maximum socket buffer size in bytes. This is used to set either ...@@ -1285,6 +1285,9 @@ Set the UDP maximum socket buffer size in bytes. This is used to set either
the receive or send buffer size, depending on what the socket is used for. the receive or send buffer size, depending on what the socket is used for.
Default is 64KB. See also @var{fifo_size}. Default is 64KB. See also @var{fifo_size}.
@item packet_gap=@var{seconds}
Delay between packets
@item localport=@var{port} @item localport=@var{port}
Override the local UDP port to bind with. Override the local UDP port to bind with.
......
...@@ -92,6 +92,7 @@ typedef struct UDPContext { ...@@ -92,6 +92,7 @@ typedef struct UDPContext {
int circular_buffer_size; int circular_buffer_size;
AVFifoBuffer *fifo; AVFifoBuffer *fifo;
int circular_buffer_error; int circular_buffer_error;
int64_t packet_gap; /* delay between transmitted packets */
#if HAVE_PTHREAD_CANCEL #if HAVE_PTHREAD_CANCEL
pthread_t circular_buffer_thread; pthread_t circular_buffer_thread;
pthread_mutex_t mutex; pthread_mutex_t mutex;
...@@ -112,6 +113,7 @@ typedef struct UDPContext { ...@@ -112,6 +113,7 @@ typedef struct UDPContext {
#define E AV_OPT_FLAG_ENCODING_PARAM #define E AV_OPT_FLAG_ENCODING_PARAM
static const AVOption options[] = { static const AVOption options[] = {
{ "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
{ "packet_gap", "Delay between packets", OFFSET(packet_gap), AV_OPT_TYPE_DURATION, { .i64 = 0 }, 0, INT_MAX, .flags = E },
{ "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E }, { "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E },
{ "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, { "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
{ "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E }, { "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
...@@ -486,7 +488,7 @@ static int udp_get_file_handle(URLContext *h) ...@@ -486,7 +488,7 @@ static int udp_get_file_handle(URLContext *h)
} }
#if HAVE_PTHREAD_CANCEL #if HAVE_PTHREAD_CANCEL
static void *circular_buffer_task( void *_URLContext) static void *circular_buffer_task_rx( void *_URLContext)
{ {
URLContext *h = _URLContext; URLContext *h = _URLContext;
UDPContext *s = h->priv_data; UDPContext *s = h->priv_data;
...@@ -542,6 +544,81 @@ end: ...@@ -542,6 +544,81 @@ end:
pthread_mutex_unlock(&s->mutex); pthread_mutex_unlock(&s->mutex);
return NULL; return NULL;
} }
static void do_udp_write(void *arg, void *buf, int size) {
URLContext *h = arg;
UDPContext *s = h->priv_data;
int ret;
if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
ret = ff_network_wait_fd(s->udp_fd, 1);
if (ret < 0) {
s->circular_buffer_error = ret;
return;
}
}
if (!s->is_connected) {
ret = sendto (s->udp_fd, buf, size, 0,
(struct sockaddr *) &s->dest_addr,
s->dest_addr_len);
} else
ret = send(s->udp_fd, buf, size, 0);
s->circular_buffer_error=ret;
}
static void *circular_buffer_task_tx( void *_URLContext)
{
URLContext *h = _URLContext;
UDPContext *s = h->priv_data;
int old_cancelstate;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
for(;;) {
int len;
uint8_t tmp[4];
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
av_usleep(s->packet_gap);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
pthread_mutex_lock(&s->mutex);
len=av_fifo_size(s->fifo);
while (len<4) {
if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
goto end;
}
len=av_fifo_size(s->fifo);
}
av_fifo_generic_peek(s->fifo, tmp, 4, NULL);
len=AV_RL32(tmp);
if (len>0 && av_fifo_size(s->fifo)>=len+4) {
av_fifo_drain(s->fifo, 4); /* skip packet length */
av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */
if (s->circular_buffer_error == len) {
/* all ok - reset error */
s->circular_buffer_error=0;
}
}
pthread_mutex_unlock(&s->mutex);
}
end:
pthread_mutex_unlock(&s->mutex);
return NULL;
}
#endif #endif
static int parse_source_list(char *buf, char **sources, int *num_sources, static int parse_source_list(char *buf, char **sources, int *num_sources,
...@@ -650,6 +727,16 @@ static int udp_open(URLContext *h, const char *uri, int flags) ...@@ -650,6 +727,16 @@ static int udp_open(URLContext *h, const char *uri, int flags)
"'circular_buffer_size' option was set but it is not supported " "'circular_buffer_size' option was set but it is not supported "
"on this build (pthread support is required)\n"); "on this build (pthread support is required)\n");
} }
if (av_find_info_tag(buf, sizeof(buf), "packet_gap", p)) {
if (av_parse_time(&s->packet_gap, buf, 1)<0) {
av_log(h, AV_LOG_ERROR, "Can't parse 'packet_gap'");
goto fail;
}
if (!HAVE_PTHREAD_CANCEL)
av_log(h, AV_LOG_WARNING,
"'packet_gap' option was set but it is not supported "
"on this build (pthread support is required)\n");
}
if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) { if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
av_strlcpy(localaddr, buf, sizeof(localaddr)); av_strlcpy(localaddr, buf, sizeof(localaddr));
} }
...@@ -829,7 +916,18 @@ static int udp_open(URLContext *h, const char *uri, int flags) ...@@ -829,7 +916,18 @@ static int udp_open(URLContext *h, const char *uri, int flags)
s->udp_fd = udp_fd; s->udp_fd = udp_fd;
#if HAVE_PTHREAD_CANCEL #if HAVE_PTHREAD_CANCEL
if (!is_output && s->circular_buffer_size) { /*
Create thread in case of:
1. Input and circular_buffer_size is set
2. Output and packet_gap and circular_buffer_size is set
*/
if (is_output && s->packet_gap && !s->circular_buffer_size) {
/* Warn user in case of 'circular_buffer_size' is not set */
av_log(h, AV_LOG_WARNING,"'packet_gap' option was set but 'circular_buffer_size' is not, but required\n");
}
if ((!is_output && s->circular_buffer_size) || (is_output && s->packet_gap && s->circular_buffer_size)) {
int ret; int ret;
/* start the task going */ /* start the task going */
...@@ -844,7 +942,7 @@ static int udp_open(URLContext *h, const char *uri, int flags) ...@@ -844,7 +942,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret)); av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
goto cond_fail; goto cond_fail;
} }
ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h); ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
if (ret != 0) { if (ret != 0) {
av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret)); av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
goto thread_fail; goto thread_fail;
...@@ -945,6 +1043,36 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size) ...@@ -945,6 +1043,36 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
UDPContext *s = h->priv_data; UDPContext *s = h->priv_data;
int ret; int ret;
#if HAVE_PTHREAD_CANCEL
if (s->fifo) {
uint8_t tmp[4];
pthread_mutex_lock(&s->mutex);
/*
Return error if last tx failed.
Here we can't know on which packet error was, but it needs to know that error exists.
*/
if (s->circular_buffer_error<0) {
int err=s->circular_buffer_error;
s->circular_buffer_error=0;
pthread_mutex_unlock(&s->mutex);
return err;
}
if(av_fifo_space(s->fifo) < size + 4) {
/* What about a partial packet tx ? */
pthread_mutex_unlock(&s->mutex);
return AVERROR(ENOMEM);
}
AV_WL32(tmp, size);
av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
pthread_cond_signal(&s->cond);
pthread_mutex_unlock(&s->mutex);
return size;
}
#endif
if (!(h->flags & AVIO_FLAG_NONBLOCK)) { if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
ret = ff_network_wait_fd(s->udp_fd, 1); ret = ff_network_wait_fd(s->udp_fd, 1);
if (ret < 0) if (ret < 0)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment