async.c 18.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
/*
 * Input async protocol.
 * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
 *
 * This file is part of FFmpeg.
 *
 * FFmpeg is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * FFmpeg is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with FFmpeg; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * Based on libavformat/cache.c by Michael Niedermayer
 */

 /**
 * @TODO
 *      support timeout
 *      support work with concatdec, hls
 */

#include "libavutil/avassert.h"
#include "libavutil/avstring.h"
#include "libavutil/error.h"
#include "libavutil/fifo.h"
#include "libavutil/log.h"
#include "libavutil/opt.h"
36
#include "libavutil/thread.h"
37 38 39 40 41 42 43 44
#include "url.h"
#include <stdint.h>

#if HAVE_UNISTD_H
#include <unistd.h>
#endif

#define BUFFER_CAPACITY         (4 * 1024 * 1024)
45
#define READ_BACK_CAPACITY      (4 * 1024 * 1024)
46 47
#define SHORT_SEEK_THRESHOLD    (256 * 1024)

48 49 50 51 52 53 54 55
typedef struct RingBuffer
{
    AVFifoBuffer *fifo;
    int           read_back_capacity;

    int           read_pos;
} RingBuffer;

56 57 58 59 60
typedef struct Context {
    AVClass        *class;
    URLContext     *inner;

    int             seek_request;
61
    int64_t         seek_pos;
62 63 64 65
    int             seek_whence;
    int             seek_completed;
    int64_t         seek_ret;

66
    int             inner_io_error;
67 68 69
    int             io_error;
    int             io_eof_reached;

70 71
    int64_t         logical_pos;
    int64_t         logical_size;
72
    RingBuffer      ring;
73 74 75 76 77 78 79 80 81 82

    pthread_cond_t  cond_wakeup_main;
    pthread_cond_t  cond_wakeup_background;
    pthread_mutex_t mutex;
    pthread_t       async_buffer_thread;

    int             abort_request;
    AVIOInterruptCB interrupt_callback;
} Context;

83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
static int ring_init(RingBuffer *ring, unsigned int capacity, int read_back_capacity)
{
    memset(ring, 0, sizeof(RingBuffer));
    ring->fifo = av_fifo_alloc(capacity + read_back_capacity);
    if (!ring->fifo)
        return AVERROR(ENOMEM);

    ring->read_back_capacity = read_back_capacity;
    return 0;
}

static void ring_destroy(RingBuffer *ring)
{
    av_fifo_freep(&ring->fifo);
}

static void ring_reset(RingBuffer *ring)
{
    av_fifo_reset(ring->fifo);
    ring->read_pos = 0;
}

static int ring_size(RingBuffer *ring)
{
    return av_fifo_size(ring->fifo) - ring->read_pos;
}

static int ring_space(RingBuffer *ring)
{
    return av_fifo_space(ring->fifo);
}

static int ring_generic_read(RingBuffer *ring, void *dest, int buf_size, void (*func)(void*, void*, int))
{
    int ret;

    av_assert2(buf_size <= ring_size(ring));
    ret = av_fifo_generic_peek_at(ring->fifo, dest, ring->read_pos, buf_size, func);
    ring->read_pos += buf_size;

    if (ring->read_pos > ring->read_back_capacity) {
        av_fifo_drain(ring->fifo, ring->read_pos - ring->read_back_capacity);
        ring->read_pos = ring->read_back_capacity;
    }

    return ret;
}

static int ring_generic_write(RingBuffer *ring, void *src, int size, int (*func)(void*, void*, int))
{
    av_assert2(size <= ring_space(ring));
    return av_fifo_generic_write(ring->fifo, src, size, func);
}

static int ring_size_of_read_back(RingBuffer *ring)
{
    return ring->read_pos;
}

static int ring_drain(RingBuffer *ring, int offset)
{
    av_assert2(offset >= -ring_size_of_read_back(ring));
    av_assert2(offset <= -ring_size(ring));
    ring->read_pos += offset;
    return 0;
}

150
static int async_check_interrupt(void *arg)
151 152 153 154
{
    URLContext *h   = arg;
    Context    *c   = h->priv_data;

155 156 157 158 159
    if (c->abort_request)
        return 1;

    if (ff_check_interrupt(&c->interrupt_callback))
        c->abort_request = 1;
160 161 162 163

    return c->abort_request;
}

164 165 166 167 168 169 170 171 172 173 174 175
static int wrapped_url_read(void *src, void *dst, int size)
{
    URLContext *h   = src;
    Context    *c   = h->priv_data;
    int         ret;

    ret = ffurl_read(c->inner, dst, size);
    c->inner_io_error = ret < 0 ? ret : 0;

    return ret;
}

176 177 178 179
static void *async_buffer_task(void *arg)
{
    URLContext   *h    = arg;
    Context      *c    = h->priv_data;
180
    RingBuffer   *ring = &c->ring;
181
    int           ret  = 0;
182
    int64_t       seek_ret;
183 184 185 186

    while (1) {
        int fifo_space, to_copy;

187
        pthread_mutex_lock(&c->mutex);
188
        if (async_check_interrupt(h)) {
189 190
            c->io_eof_reached = 1;
            c->io_error       = AVERROR_EXIT;
191
            pthread_cond_signal(&c->cond_wakeup_main);
192
            pthread_mutex_unlock(&c->mutex);
193 194 195 196
            break;
        }

        if (c->seek_request) {
197
            seek_ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
198
            if (seek_ret >= 0) {
199 200
                c->io_eof_reached = 0;
                c->io_error       = 0;
201
                ring_reset(ring);
202 203 204
            }

            c->seek_completed = 1;
205
            c->seek_ret       = seek_ret;
206 207 208 209 210 211 212 213
            c->seek_request   = 0;


            pthread_cond_signal(&c->cond_wakeup_main);
            pthread_mutex_unlock(&c->mutex);
            continue;
        }

214
        fifo_space = ring_space(ring);
215 216 217 218 219 220
        if (c->io_eof_reached || fifo_space <= 0) {
            pthread_cond_signal(&c->cond_wakeup_main);
            pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
            pthread_mutex_unlock(&c->mutex);
            continue;
        }
221
        pthread_mutex_unlock(&c->mutex);
222 223

        to_copy = FFMIN(4096, fifo_space);
224
        ret = ring_generic_write(ring, (void *)h, to_copy, wrapped_url_read);
225 226

        pthread_mutex_lock(&c->mutex);
227 228
        if (ret <= 0) {
            c->io_eof_reached = 1;
229 230
            if (c->inner_io_error < 0)
                c->io_error = c->inner_io_error;
231 232 233 234 235 236 237 238 239 240 241 242 243
        }

        pthread_cond_signal(&c->cond_wakeup_main);
        pthread_mutex_unlock(&c->mutex);
    }

    return NULL;
}

static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
{
    Context         *c = h->priv_data;
    int              ret;
244
    AVIOInterruptCB  interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
245 246 247

    av_strstart(arg, "async:", &arg);

248 249
    ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY);
    if (ret < 0)
250 251 252 253
        goto fifo_fail;

    /* wrap interrupt callback */
    c->interrupt_callback = h->interrupt_callback;
254
    ret = ffurl_open_whitelist(&c->inner, arg, flags, &interrupt_callback, options, h->protocol_whitelist, h->protocol_blacklist, h);
255
    if (ret != 0) {
256
        av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", av_err2str(ret), arg);
257 258 259 260 261 262 263 264
        goto url_fail;
    }

    c->logical_size = ffurl_size(c->inner);
    h->is_streamed  = c->inner->is_streamed;

    ret = pthread_mutex_init(&c->mutex, NULL);
    if (ret != 0) {
265
        av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret));
266 267 268 269 270
        goto mutex_fail;
    }

    ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
    if (ret != 0) {
271
        av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
272 273 274 275 276
        goto cond_wakeup_main_fail;
    }

    ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
    if (ret != 0) {
277
        av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
278 279 280 281 282
        goto cond_wakeup_background_fail;
    }

    ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
    if (ret) {
283
        av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", av_err2str(ret));
284 285 286 287 288 289 290 291 292 293 294 295 296 297
        goto thread_fail;
    }

    return 0;

thread_fail:
    pthread_cond_destroy(&c->cond_wakeup_background);
cond_wakeup_background_fail:
    pthread_cond_destroy(&c->cond_wakeup_main);
cond_wakeup_main_fail:
    pthread_mutex_destroy(&c->mutex);
mutex_fail:
    ffurl_close(c->inner);
url_fail:
298
    ring_destroy(&c->ring);
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
fifo_fail:
    return ret;
}

static int async_close(URLContext *h)
{
    Context *c = h->priv_data;
    int      ret;

    pthread_mutex_lock(&c->mutex);
    c->abort_request = 1;
    pthread_cond_signal(&c->cond_wakeup_background);
    pthread_mutex_unlock(&c->mutex);

    ret = pthread_join(c->async_buffer_thread, NULL);
    if (ret != 0)
315
        av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", av_err2str(ret));
316 317 318 319 320

    pthread_cond_destroy(&c->cond_wakeup_background);
    pthread_cond_destroy(&c->cond_wakeup_main);
    pthread_mutex_destroy(&c->mutex);
    ffurl_close(c->inner);
321
    ring_destroy(&c->ring);
322 323 324 325 326 327 328 329

    return 0;
}

static int async_read_internal(URLContext *h, void *dest, int size, int read_complete,
                               void (*func)(void*, void*, int))
{
    Context      *c       = h->priv_data;
330
    RingBuffer   *ring    = &c->ring;
331 332 333 334 335 336 337
    int           to_read = size;
    int           ret     = 0;

    pthread_mutex_lock(&c->mutex);

    while (to_read > 0) {
        int fifo_size, to_copy;
338
        if (async_check_interrupt(h)) {
339 340 341
            ret = AVERROR_EXIT;
            break;
        }
342
        fifo_size = ring_size(ring);
343 344
        to_copy   = FFMIN(to_read, fifo_size);
        if (to_copy > 0) {
345
            ring_generic_read(ring, dest, to_copy, func);
346 347 348 349 350 351 352 353 354
            if (!func)
                dest = (uint8_t *)dest + to_copy;
            c->logical_pos += to_copy;
            to_read        -= to_copy;
            ret             = size - to_read;

            if (to_read <= 0 || !read_complete)
                break;
        } else if (c->io_eof_reached) {
355 356 357 358 359 360
            if (ret <= 0) {
                if (c->io_error)
                    ret = c->io_error;
                else
                    ret = AVERROR_EOF;
            }
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
            break;
        }
        pthread_cond_signal(&c->cond_wakeup_background);
        pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
    }

    pthread_cond_signal(&c->cond_wakeup_background);
    pthread_mutex_unlock(&c->mutex);

    return ret;
}

static int async_read(URLContext *h, unsigned char *buf, int size)
{
    return async_read_internal(h, buf, size, 0, NULL);
}

static void fifo_do_not_copy_func(void* dest, void* src, int size) {
    // do not copy
}

static int64_t async_seek(URLContext *h, int64_t pos, int whence)
{
    Context      *c    = h->priv_data;
385
    RingBuffer   *ring = &c->ring;
386 387 388
    int64_t       ret;
    int64_t       new_logical_pos;
    int fifo_size;
389
    int fifo_size_of_read_back;
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405

    if (whence == AVSEEK_SIZE) {
        av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
        return c->logical_size;
    } else if (whence == SEEK_CUR) {
        av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
        new_logical_pos = pos + c->logical_pos;
    } else if (whence == SEEK_SET){
        av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
        new_logical_pos = pos;
    } else {
        return AVERROR(EINVAL);
    }
    if (new_logical_pos < 0)
        return AVERROR(EINVAL);

406 407
    fifo_size = ring_size(ring);
    fifo_size_of_read_back = ring_size_of_read_back(ring);
408 409 410
    if (new_logical_pos == c->logical_pos) {
        /* current position */
        return c->logical_pos;
411
    } else if ((new_logical_pos >= (c->logical_pos - fifo_size_of_read_back)) &&
412
               (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
413
        int pos_delta = (int)(new_logical_pos - c->logical_pos);
414 415 416 417
        /* fast seek */
        av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
                new_logical_pos, (int)c->logical_pos,
                (int)(new_logical_pos - c->logical_pos), fifo_size);
418 419 420 421 422 423 424 425 426 427

        if (pos_delta > 0) {
            // fast seek forwards
            async_read_internal(h, NULL, pos_delta, 1, fifo_do_not_copy_func);
        } else {
            // fast seek backwards
            ring_drain(ring, pos_delta);
            c->logical_pos = new_logical_pos;
        }

428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
        return c->logical_pos;
    } else if (c->logical_size <= 0) {
        /* can not seek */
        return AVERROR(EINVAL);
    } else if (new_logical_pos > c->logical_size) {
        /* beyond end */
        return AVERROR(EINVAL);
    }

    pthread_mutex_lock(&c->mutex);

    c->seek_request   = 1;
    c->seek_pos       = new_logical_pos;
    c->seek_whence    = SEEK_SET;
    c->seek_completed = 0;
    c->seek_ret       = 0;

    while (1) {
446
        if (async_check_interrupt(h)) {
447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
            ret = AVERROR_EXIT;
            break;
        }
        if (c->seek_completed) {
            if (c->seek_ret >= 0)
                c->logical_pos  = c->seek_ret;
            ret = c->seek_ret;
            break;
        }
        pthread_cond_signal(&c->cond_wakeup_background);
        pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
    }

    pthread_mutex_unlock(&c->mutex);

    return ret;
}

#define OFFSET(x) offsetof(Context, x)
#define D AV_OPT_FLAG_DECODING_PARAM

static const AVOption options[] = {
    {NULL},
};

472 473 474
#undef D
#undef OFFSET

475 476 477 478 479 480 481
static const AVClass async_context_class = {
    .class_name = "Async",
    .item_name  = av_default_item_name,
    .option     = options,
    .version    = LIBAVUTIL_VERSION_INT,
};

482
const URLProtocol ff_async_protocol = {
483 484 485 486 487 488 489 490
    .name                = "async",
    .url_open2           = async_open,
    .url_read            = async_read,
    .url_seek            = async_seek,
    .url_close           = async_close,
    .priv_data_size      = sizeof(Context),
    .priv_data_class     = &async_context_class,
};
491

492
#if 0
493 494 495 496 497 498

#define TEST_SEEK_POS    (1536)
#define TEST_STREAM_SIZE (2048)

typedef struct TestContext {
    AVClass        *class;
499 500
    int64_t         logical_pos;
    int64_t         logical_size;
501 502 503

    /* options */
    int             opt_read_error;
504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
} TestContext;

static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
{
    TestContext *c = h->priv_data;
    c->logical_pos  = 0;
    c->logical_size = TEST_STREAM_SIZE;
    return 0;
}

static int async_test_close(URLContext *h)
{
    return 0;
}

static int async_test_read(URLContext *h, unsigned char *buf, int size)
{
    TestContext *c = h->priv_data;
    int          i;
    int          read_len = 0;

525 526 527
    if (c->opt_read_error)
        return c->opt_read_error;

528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
    if (c->logical_pos >= c->logical_size)
        return AVERROR_EOF;

    for (i = 0; i < size; ++i) {
        buf[i] = c->logical_pos & 0xFF;

        c->logical_pos++;
        read_len++;

        if (c->logical_pos >= c->logical_size)
            break;
    }

    return read_len;
}

static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
{
    TestContext *c = h->priv_data;
    int64_t      new_logical_pos;

    if (whence == AVSEEK_SIZE) {
        return c->logical_size;
551
    } else if (whence == SEEK_CUR) {
552 553 554 555 556 557 558 559 560 561 562 563 564
        new_logical_pos = pos + c->logical_pos;
    } else if (whence == SEEK_SET){
        new_logical_pos = pos;
    } else {
        return AVERROR(EINVAL);
    }
    if (new_logical_pos < 0)
        return AVERROR(EINVAL);

    c->logical_pos = new_logical_pos;
    return new_logical_pos;
}

565 566 567 568 569 570 571 572 573 574 575 576
#define OFFSET(x) offsetof(TestContext, x)
#define D AV_OPT_FLAG_DECODING_PARAM

static const AVOption async_test_options[] = {
    { "async-test-read-error",      "cause read fail",
        OFFSET(opt_read_error),     AV_OPT_TYPE_INT, { .i64 = 0 }, INT_MIN, INT_MAX, .flags = D },
    {NULL},
};

#undef D
#undef OFFSET

577 578 579
static const AVClass async_test_context_class = {
    .class_name = "Async-Test",
    .item_name  = av_default_item_name,
580
    .option     = async_test_options,
581 582 583
    .version    = LIBAVUTIL_VERSION_INT,
};

584
const URLProtocol ff_async_test_protocol = {
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602
    .name                = "async-test",
    .url_open2           = async_test_open,
    .url_read            = async_test_read,
    .url_seek            = async_test_seek,
    .url_close           = async_test_close,
    .priv_data_size      = sizeof(TestContext),
    .priv_data_class     = &async_test_context_class,
};

int main(void)
{
    URLContext   *h = NULL;
    int           i;
    int           ret;
    int64_t       size;
    int64_t       pos;
    int64_t       read_len;
    unsigned char buf[4096];
603
    AVDictionary *opts = NULL;
604 605 606 607

    ffurl_register_protocol(&ff_async_protocol);
    ffurl_register_protocol(&ff_async_test_protocol);

608 609 610
    /*
     * test normal read
     */
611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644
    ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, NULL);
    printf("open: %d\n", ret);

    size = ffurl_size(h);
    printf("size: %"PRId64"\n", size);

    pos = ffurl_seek(h, 0, SEEK_CUR);
    read_len = 0;
    while (1) {
        ret = ffurl_read(h, buf, sizeof(buf));
        if (ret == AVERROR_EOF) {
            printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
            break;
        }
        else if (ret == 0)
            break;
        else if (ret < 0) {
            printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
            goto fail;
        } else {
            for (i = 0; i < ret; ++i) {
                if (buf[i] != (pos & 0xFF)) {
                    printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
                           (int)buf[i], (int)(pos & 0xFF), pos);
                    break;
                }
                pos++;
            }
        }

        read_len += ret;
    }
    printf("read: %"PRId64"\n", read_len);

645 646 647
    /*
     * test normal seek
     */
648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681
    ret = ffurl_read(h, buf, 1);
    printf("read: %d\n", ret);

    pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
    printf("seek: %"PRId64"\n", pos);

    read_len = 0;
    while (1) {
        ret = ffurl_read(h, buf, sizeof(buf));
        if (ret == AVERROR_EOF)
            break;
        else if (ret == 0)
            break;
        else if (ret < 0) {
            printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
            goto fail;
        } else {
            for (i = 0; i < ret; ++i) {
                if (buf[i] != (pos & 0xFF)) {
                    printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
                           (int)buf[i], (int)(pos & 0xFF), pos);
                    break;
                }
                pos++;
            }
        }

        read_len += ret;
    }
    printf("read: %"PRId64"\n", read_len);

    ret = ffurl_read(h, buf, 1);
    printf("read: %d\n", ret);

682 683 684 685 686 687 688 689 690 691 692
    /*
     * test read error
     */
    ffurl_close(h);
    av_dict_set_int(&opts, "async-test-read-error", -10000, 0);
    ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, &opts);
    printf("open: %d\n", ret);

    ret = ffurl_read(h, buf, 1);
    printf("read: %d\n", ret);

693
fail:
694
    av_dict_free(&opts);
695 696 697 698 699
    ffurl_close(h);
    return 0;
}

#endif