Commit 2063f3ed authored by Jan Sebechlebsky's avatar Jan Sebechlebsky Committed by Marton Balint

avformat/tee: Handling slave failure in tee muxer

Adds per slave option 'onfail' to the tee muxer allowing an output to
fail, so other slave outputs can continue.
Reviewed-by: 's avatarNicolas George <george@nsup.org>
Signed-off-by: 's avatarJan Sebechlebsky <sebechlebskyjan@gmail.com>
Signed-off-by: 's avatarMarton Balint <cus@passwd.hu>
parent f9d7e9fe
...@@ -1453,6 +1453,12 @@ Select the streams that should be mapped to the slave output, ...@@ -1453,6 +1453,12 @@ Select the streams that should be mapped to the slave output,
specified by a stream specifier. If not specified, this defaults to specified by a stream specifier. If not specified, this defaults to
all the input streams. You may use multiple stream specifiers all the input streams. You may use multiple stream specifiers
separated by commas (@code{,}) e.g.: @code{a:0,v} separated by commas (@code{,}) e.g.: @code{a:0,v}
@item onfail
Specify behaviour on output failure. This can be set to either @code{abort} (which is
default) or @code{ignore}. @code{abort} will cause whole process to fail in case of failure
on this slave output. @code{ignore} will ignore failure on this output, so other outputs
will continue without being affected.
@end table @end table
@subsection Examples @subsection Examples
...@@ -1466,6 +1472,14 @@ ffmpeg -i ... -c:v libx264 -c:a mp2 -f tee -map 0:v -map 0:a ...@@ -1466,6 +1472,14 @@ ffmpeg -i ... -c:v libx264 -c:a mp2 -f tee -map 0:v -map 0:a
"archive-20121107.mkv|[f=mpegts]udp://10.0.1.255:1234/" "archive-20121107.mkv|[f=mpegts]udp://10.0.1.255:1234/"
@end example @end example
@item
As above, but continue streaming even if output to local file fails
(for example local drive fills up):
@example
ffmpeg -i ... -c:v libx264 -c:a mp2 -f tee -map 0:v -map 0:a
"[onfail=ignore]archive-20121107.mkv|[f=mpegts]udp://10.0.1.255:1234/"
@end example
@item @item
Use @command{ffmpeg} to encode the input, and send the output Use @command{ffmpeg} to encode the input, and send the output
to three different destinations. The @code{dump_extra} bitstream to three different destinations. The @code{dump_extra} bitstream
......
...@@ -29,10 +29,19 @@ ...@@ -29,10 +29,19 @@
#define MAX_SLAVES 16 #define MAX_SLAVES 16
typedef enum {
ON_SLAVE_FAILURE_ABORT = 1,
ON_SLAVE_FAILURE_IGNORE = 2
} SlaveFailurePolicy;
#define DEFAULT_SLAVE_FAILURE_POLICY ON_SLAVE_FAILURE_ABORT
typedef struct { typedef struct {
AVFormatContext *avf; AVFormatContext *avf;
AVBitStreamFilterContext **bsfs; ///< bitstream filters per stream AVBitStreamFilterContext **bsfs; ///< bitstream filters per stream
SlaveFailurePolicy on_fail;
/** map from input to output streams indexes, /** map from input to output streams indexes,
* disabled output streams are set to -1 */ * disabled output streams are set to -1 */
int *stream_map; int *stream_map;
...@@ -42,6 +51,7 @@ typedef struct { ...@@ -42,6 +51,7 @@ typedef struct {
typedef struct TeeContext { typedef struct TeeContext {
const AVClass *class; const AVClass *class;
unsigned nb_slaves; unsigned nb_slaves;
unsigned nb_alive;
TeeSlave slaves[MAX_SLAVES]; TeeSlave slaves[MAX_SLAVES];
} TeeContext; } TeeContext;
...@@ -136,6 +146,23 @@ end: ...@@ -136,6 +146,23 @@ end:
return ret; return ret;
} }
static inline int parse_slave_failure_policy_option(const char *opt, TeeSlave *tee_slave)
{
if (!opt) {
tee_slave->on_fail = DEFAULT_SLAVE_FAILURE_POLICY;
return 0;
} else if (!av_strcasecmp("abort", opt)) {
tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
return 0;
} else if (!av_strcasecmp("ignore", opt)) {
tee_slave->on_fail = ON_SLAVE_FAILURE_IGNORE;
return 0;
}
/* Set failure behaviour to abort, so invalid option error will not be ignored */
tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
return AVERROR(EINVAL);
}
static int close_slave(TeeSlave *tee_slave) static int close_slave(TeeSlave *tee_slave)
{ {
AVFormatContext *avf; AVFormatContext *avf;
...@@ -184,7 +211,7 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave) ...@@ -184,7 +211,7 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
AVDictionary *options = NULL; AVDictionary *options = NULL;
AVDictionaryEntry *entry; AVDictionaryEntry *entry;
char *filename; char *filename;
char *format = NULL, *select = NULL; char *format = NULL, *select = NULL, *on_fail = NULL;
AVFormatContext *avf2 = NULL; AVFormatContext *avf2 = NULL;
AVStream *st, *st2; AVStream *st, *st2;
int stream_count; int stream_count;
...@@ -204,6 +231,14 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave) ...@@ -204,6 +231,14 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
STEAL_OPTION("f", format); STEAL_OPTION("f", format);
STEAL_OPTION("select", select); STEAL_OPTION("select", select);
STEAL_OPTION("onfail", on_fail);
ret = parse_slave_failure_policy_option(on_fail, tee_slave);
if (ret < 0) {
av_log(avf, AV_LOG_ERROR,
"Invalid onfail option value, valid options are 'abort' and 'ignore'\n");
goto end;
}
ret = avformat_alloc_output_context2(&avf2, NULL, format, filename); ret = avformat_alloc_output_context2(&avf2, NULL, format, filename);
if (ret < 0) if (ret < 0)
...@@ -351,6 +386,7 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave) ...@@ -351,6 +386,7 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
end: end:
av_free(format); av_free(format);
av_free(select); av_free(select);
av_free(on_fail);
av_dict_free(&options); av_dict_free(&options);
av_freep(&tmp_select); av_freep(&tmp_select);
return ret; return ret;
...@@ -380,6 +416,28 @@ static void log_slave(TeeSlave *slave, void *log_ctx, int log_level) ...@@ -380,6 +416,28 @@ static void log_slave(TeeSlave *slave, void *log_ctx, int log_level)
} }
} }
static int tee_process_slave_failure(AVFormatContext *avf, unsigned slave_idx, int err_n)
{
TeeContext *tee = avf->priv_data;
TeeSlave *tee_slave = &tee->slaves[slave_idx];
tee->nb_alive--;
close_slave(tee_slave);
if (!tee->nb_alive) {
av_log(avf, AV_LOG_ERROR, "All tee outputs failed.\n");
return err_n;
} else if (tee_slave->on_fail == ON_SLAVE_FAILURE_ABORT) {
av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed, aborting.\n", slave_idx);
return err_n;
} else {
av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed: %s, continuing with %u/%u slaves.\n",
slave_idx, av_err2str(err_n), tee->nb_alive, tee->nb_slaves);
return 0;
}
}
static int tee_write_header(AVFormatContext *avf) static int tee_write_header(AVFormatContext *avf)
{ {
TeeContext *tee = avf->priv_data; TeeContext *tee = avf->priv_data;
...@@ -403,19 +461,24 @@ static int tee_write_header(AVFormatContext *avf) ...@@ -403,19 +461,24 @@ static int tee_write_header(AVFormatContext *avf)
filename++; filename++;
} }
tee->nb_slaves = nb_slaves; tee->nb_slaves = tee->nb_alive = nb_slaves;
for (i = 0; i < nb_slaves; i++) { for (i = 0; i < nb_slaves; i++) {
if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0) if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0) {
goto fail; ret = tee_process_slave_failure(avf, i, ret);
log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE); if (ret < 0)
goto fail;
} else {
log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE);
}
av_freep(&slaves[i]); av_freep(&slaves[i]);
} }
for (i = 0; i < avf->nb_streams; i++) { for (i = 0; i < avf->nb_streams; i++) {
int j, mapped = 0; int j, mapped = 0;
for (j = 0; j < tee->nb_slaves; j++) for (j = 0; j < tee->nb_slaves; j++)
mapped += tee->slaves[j].stream_map[i] >= 0; if (tee->slaves[j].avf)
mapped += tee->slaves[j].stream_map[i] >= 0;
if (!mapped) if (!mapped)
av_log(avf, AV_LOG_WARNING, "Input stream #%d is not mapped " av_log(avf, AV_LOG_WARNING, "Input stream #%d is not mapped "
"to any slave.\n", i); "to any slave.\n", i);
...@@ -436,9 +499,11 @@ static int tee_write_trailer(AVFormatContext *avf) ...@@ -436,9 +499,11 @@ static int tee_write_trailer(AVFormatContext *avf)
unsigned i; unsigned i;
for (i = 0; i < tee->nb_slaves; i++) { for (i = 0; i < tee->nb_slaves; i++) {
if ((ret = close_slave(&tee->slaves[i])) < 0) if ((ret = close_slave(&tee->slaves[i])) < 0) {
if (!ret_all) ret = tee_process_slave_failure(avf, i, ret);
if (!ret_all && ret < 0)
ret_all = ret; ret_all = ret;
}
} }
return ret_all; return ret_all;
} }
...@@ -454,7 +519,9 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt) ...@@ -454,7 +519,9 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
AVRational tb, tb2; AVRational tb, tb2;
for (i = 0; i < tee->nb_slaves; i++) { for (i = 0; i < tee->nb_slaves; i++) {
avf2 = tee->slaves[i].avf; if (!(avf2 = tee->slaves[i].avf))
continue;
s = pkt->stream_index; s = pkt->stream_index;
s2 = tee->slaves[i].stream_map[s]; s2 = tee->slaves[i].stream_map[s];
if (s2 < 0) if (s2 < 0)
...@@ -475,9 +542,11 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt) ...@@ -475,9 +542,11 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
if ((ret = av_apply_bitstream_filters(avf2->streams[s2]->codec, &pkt2, if ((ret = av_apply_bitstream_filters(avf2->streams[s2]->codec, &pkt2,
tee->slaves[i].bsfs[s2])) < 0 || tee->slaves[i].bsfs[s2])) < 0 ||
(ret = av_interleaved_write_frame(avf2, &pkt2)) < 0) (ret = av_interleaved_write_frame(avf2, &pkt2)) < 0) {
if (!ret_all) ret = tee_process_slave_failure(avf, i, ret);
if (!ret_all && ret < 0)
ret_all = ret; ret_all = ret;
}
} }
return ret_all; return ret_all;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment