/*
 * UDP prototype streaming system
 * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
 *
 * This file is part of FFmpeg.
 *
 * FFmpeg is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * FFmpeg is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with FFmpeg; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

/**
 * @file
 * UDP protocol
 */

27
#define _DEFAULT_SOURCE
28
#define _BSD_SOURCE     /* Needed for using struct ip_mreq with recent glibc */
29

Fabrice Bellard's avatar
Fabrice Bellard committed
30
#include "avformat.h"
31
#include "avio_internal.h"
32
#include "libavutil/parseutils.h"
33
#include "libavutil/fifo.h"
34
#include "libavutil/intreadwrite.h"
35
#include "libavutil/avstring.h"
36 37
#include "libavutil/opt.h"
#include "libavutil/log.h"
38
#include "libavutil/time.h"
39
#include "internal.h"
40
#include "network.h"
41
#include "os_support.h"
42
#include "url.h"
43

44 45 46 47 48 49 50 51 52 53 54 55 56 57
#if HAVE_UDPLITE_H
#include "udplite.h"
#else
/* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite.
 * So, we provide a fallback here.
 */
#define UDPLITE_SEND_CSCOV                               10
#define UDPLITE_RECV_CSCOV                               11
#endif

#ifndef IPPROTO_UDPLITE
#define IPPROTO_UDPLITE                                  136
#endif

/* Give the feature macro a value before it is first tested, so the #if
 * below (and every later one) never evaluates an undefined identifier. */
#ifndef HAVE_PTHREAD_CANCEL
#define HAVE_PTHREAD_CANCEL 0
#endif

#if HAVE_PTHREAD_CANCEL
#include <pthread.h>
#endif

/* Fall back to the JOIN/LEAVE spelling when the ADD/DROP one is missing. */
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
#endif

/* Default buffer_size when opened for output / for input respectively;
 * UDP_MAX_PKT_SIZE also sizes the per-datagram scratch buffer. */
#define UDP_TX_BUF_SIZE 32768
#define UDP_MAX_PKT_SIZE 65536
#define UDP_HEADER_SIZE 8
74

75
typedef struct UDPContext {
76
    const AVClass *class;
77 78
    int udp_fd;
    int ttl;
79
    int udplite_coverage;
80
    int buffer_size;
Luca Barbato's avatar
Luca Barbato committed
81
    int pkt_size;
82
    int is_multicast;
83
    int is_broadcast;
84
    int local_port;
85
    int reuse_socket;
86
    int overrun_nonfatal;
87
    struct sockaddr_storage dest_addr;
88
    int dest_addr_len;
89
    int is_connected;
90 91 92

    /* Circular Buffer variables for use in UDP receive code */
    int circular_buffer_size;
93
    AVFifoBuffer *fifo;
94
    int circular_buffer_error;
95
#if HAVE_PTHREAD_CANCEL
96
    pthread_t circular_buffer_thread;
97 98
    pthread_mutex_t mutex;
    pthread_cond_t cond;
99
    int thread_started;
100
#endif
101 102
    uint8_t tmp[UDP_MAX_PKT_SIZE+4];
    int remaining_in_dg;
Luca Barbato's avatar
Luca Barbato committed
103
    char *localaddr;
104
    int timeout;
105
    struct sockaddr_storage local_addr_storage;
Luca Barbato's avatar
Luca Barbato committed
106 107
    char *sources;
    char *block;
Fabrice Bellard's avatar
Fabrice Bellard committed
108 109
} UDPContext;

110 111 112 113
#define OFFSET(x) offsetof(UDPContext, x)
#define D AV_OPT_FLAG_DECODING_PARAM
#define E AV_OPT_FLAG_ENCODING_PARAM
static const AVOption options[] = {
Luca Barbato's avatar
Luca Barbato committed
114
    { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
115
    { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
Luca Barbato's avatar
Luca Barbato committed
116 117
    { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
    { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
118 119 120 121 122 123 124 125 126 127
    { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
    { "pkt_size",       "Maximum UDP packet size",                         OFFSET(pkt_size),       AV_OPT_TYPE_INT,    { .i64 = 1472 },  -1, INT_MAX, .flags = D|E },
    { "reuse",          "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, 1,       D|E },
    { "reuse_socket",   "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, 1,       .flags = D|E },
    { "broadcast", "explicitly allow or disallow broadcast destination",   OFFSET(is_broadcast),   AV_OPT_TYPE_INT,    { .i64 = 0  },     0, 1,       E },
    { "ttl",            "Time to live (multicast only)",                   OFFSET(ttl),            AV_OPT_TYPE_INT,    { .i64 = 16 },     0, INT_MAX, E },
    { "connect",        "set if connect() should be called on socket",     OFFSET(is_connected),   AV_OPT_TYPE_INT,    { .i64 =  0 },     0, 1,       .flags = D|E },
    { "fifo_size",      "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
    { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1,    D },
    { "timeout",        "set raise error timeout (only in read mode)",     OFFSET(timeout),        AV_OPT_TYPE_INT,    { .i64 = 0 },      0, INT_MAX, D },
Luca Barbato's avatar
Luca Barbato committed
128 129 130
    { "sources",        "Source list",                                     OFFSET(sources),        AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
    { "block",          "Block list",                                      OFFSET(block),          AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
    { NULL }
131 132
};

Luca Barbato's avatar
Luca Barbato committed
133 134 135 136 137
/* AVClass describing the "udp" protocol context (logging name + options). */
static const AVClass udp_class = {
    .class_name = "udp",
    .item_name  = av_default_item_name,
    .option     = options,
    .version    = LIBAVUTIL_VERSION_INT,
};

140 141 142 143 144 145 146
/* AVClass for the "udplite" protocol; it shares the option table with "udp". */
static const AVClass udplite_context_class = {
    .version    = LIBAVUTIL_VERSION_INT,
    .option     = options,
    .item_name  = av_default_item_name,
    .class_name = "udplite",
};

147 148 149 150 151 152 153
/**
 * Log the most recent network error as "<prefix>: <error text>".
 *
 * @param ctx    logging context handed to av_log() (may be NULL)
 * @param level  av_log() level
 * @param prefix text printed before the error description
 */
static void log_net_error(void *ctx, int level, const char* prefix)
{
    char msg[100];
    const int net_err = ff_neterrno();

    av_strerror(net_err, msg, sizeof(msg));
    av_log(ctx, level, "%s: %s\n", prefix, msg);
}

154 155 156
static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
                                 struct sockaddr *addr)
{
157
#ifdef IP_MULTICAST_TTL
158 159
    if (addr->sa_family == AF_INET) {
        if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
160
            log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)");
161 162 163
            return -1;
        }
    }
164
#endif
165
#if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
166 167
    if (addr->sa_family == AF_INET6) {
        if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
168
            log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)");
169 170 171
            return -1;
        }
    }
172
#endif
173 174 175
    return 0;
}

176
static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
177
{
178
#ifdef IP_ADD_MEMBERSHIP
179
    if (addr->sa_family == AF_INET) {
180 181
        struct ip_mreq mreq;

182
        mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
183 184 185 186
        if (local_addr)
            mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
        else
            mreq.imr_interface.s_addr= INADDR_ANY;
187
        if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
188
            log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
189 190 191
            return -1;
        }
    }
192
#endif
193
#if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
194
    if (addr->sa_family == AF_INET6) {
195 196
        struct ipv6_mreq mreq6;

197 198 199
        memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
        mreq6.ipv6mr_interface= 0;
        if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
200
            log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
201 202 203
            return -1;
        }
    }
204
#endif
205 206 207
    return 0;
}

208
static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
209
{
210
#ifdef IP_DROP_MEMBERSHIP
211
    if (addr->sa_family == AF_INET) {
212 213
        struct ip_mreq mreq;

214
        mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
215 216 217 218
        if (local_addr)
            mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
        else
            mreq.imr_interface.s_addr= INADDR_ANY;
219
        if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
220
            log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
221 222 223
            return -1;
        }
    }
224
#endif
225
#if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
226
    if (addr->sa_family == AF_INET6) {
227 228
        struct ipv6_mreq mreq6;

229 230 231
        memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
        mreq6.ipv6mr_interface= 0;
        if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
232
            log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
233 234 235
            return -1;
        }
    }
236
#endif
237 238 239
    return 0;
}

240 241 242
/**
 * Resolve a host/port pair with getaddrinfo().
 *
 * @param hostname host to resolve; NULL, an empty string or a string starting
 *                 with '?' is passed to getaddrinfo() as a NULL node
 * @param port     port number; values <= 0 are resolved as service "0"
 * @param type     value for hints.ai_socktype
 * @param family   value for hints.ai_family
 * @param flags    value for hints.ai_flags
 * @return the result list (to be released with freeaddrinfo()), or NULL on
 *         failure, in which case the resolver error is logged
 */
static struct addrinfo* udp_resolve_host(const char *hostname, int port,
                                         int type, int family, int flags)
{
    struct addrinfo hints = { 0 }, *res = NULL;
    int error;
    char sport[16];
    const char *node = NULL, *service = "0";

    if (port > 0) {
        snprintf(sport, sizeof(sport), "%d", port);
        service = sport;
    }
    if ((hostname) && (hostname[0] != '\0') && (hostname[0] != '?')) {
        node = hostname;
    }
    hints.ai_socktype = type;
    hints.ai_family   = family;
    hints.ai_flags = flags;
    if ((error = getaddrinfo(node, service, &hints, &res))) {
        res = NULL;
        av_log(NULL, AV_LOG_ERROR, "udp_resolve_host: %s\n", gai_strerror(error));
    }

    return res;
}

266 267 268 269 270 271 272 273 274 275 276 277 278
/**
 * Apply a list of multicast source filters to a socket.
 *
 * Each entry of sources is resolved and then either joined as an allowed
 * source (include != 0) or blocked (include == 0) for the group addr.
 * Processing stops at the first entry that fails.
 *
 * @param sockfd     socket to configure
 * @param addr       multicast group address
 * @param addr_len   length of addr in bytes
 * @param sources    array of host names / addresses
 * @param nb_sources number of entries in sources
 * @param include    non-zero to add sources, zero to block them
 * @return 0 on success, a negative AVERROR code otherwise
 *         (AVERROR(ENOSYS) when the platform has no suitable API)
 */
static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr,
                                     int addr_len, char **sources,
                                     int nb_sources, int include)
{
#if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32)
    /* These ones are available in the microsoft SDK, but don't seem to work
     * as on linux, so just prefer the v4-only approach there for now. */
    int i;
    for (i = 0; i < nb_sources; i++) {
        struct group_source_req mreqs;
        int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
        struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
                                                       SOCK_DGRAM, AF_UNSPEC,
                                                       0);
        if (!sourceaddr)
            return AVERROR(ENOENT);

        mreqs.gsr_interface = 0;
        memcpy(&mreqs.gsr_group, addr, addr_len);
        memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
        freeaddrinfo(sourceaddr);

        if (setsockopt(sockfd, level,
                       include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
                       (const void *)&mreqs, sizeof(mreqs)) < 0) {
            /* Capture the error first: logging may overwrite errno. */
            int err = ff_neterrno();
            if (include)
                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
            else
                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
            return err;
        }
    }
#elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
    int i;
    if (addr->sa_family != AF_INET) {
        av_log(NULL, AV_LOG_ERROR,
               "Setting multicast sources only supported for IPv4\n");
        return AVERROR(EINVAL);
    }
    for (i = 0; i < nb_sources; i++) {
        struct ip_mreq_source mreqs;
        struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
                                                       SOCK_DGRAM, AF_UNSPEC,
                                                       0);
        if (!sourceaddr)
            return AVERROR(ENOENT);
        if (sourceaddr->ai_addr->sa_family != AF_INET) {
            freeaddrinfo(sourceaddr);
            av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n",
                   sources[i]);
            return AVERROR(EINVAL);
        }

        mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
        mreqs.imr_interface.s_addr = INADDR_ANY;
        mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr;
        freeaddrinfo(sourceaddr);

        if (setsockopt(sockfd, IPPROTO_IP,
                       include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
                       (const void *)&mreqs, sizeof(mreqs)) < 0) {
            /* Capture the error first: logging may overwrite errno. */
            int err = ff_neterrno();
            if (include)
                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
            else
                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
            return err;
        }
    }
#else
    return AVERROR(ENOSYS);
#endif
    return 0;
}
339 340 341
static int udp_set_url(struct sockaddr_storage *addr,
                       const char *hostname, int port)
{
342
    struct addrinfo *res0;
343 344
    int addr_len;

345
    res0 = udp_resolve_host(hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
346
    if (!res0) return AVERROR(EIO);
347 348
    memcpy(addr, res0->ai_addr, res0->ai_addrlen);
    addr_len = res0->ai_addrlen;
349
    freeaddrinfo(res0);
350 351

    return addr_len;
352 353
}

354
/**
 * Create the datagram socket and work out the local address to bind it to.
 *
 * The local address is resolved with AI_PASSIVE for s->local_port, limited
 * to the family of s->dest_addr when one has already been set. The first
 * resolved entry for which a socket can be created is used; a UDP-Lite
 * socket is requested when s->udplite_coverage is non-zero. The socket is
 * not bound here.
 *
 * @param s         protocol context (reads dest_addr, local_port,
 *                  udplite_coverage)
 * @param addr      receives the local address matching the created socket
 * @param addr_len  receives the length of addr
 * @param localaddr local host name/address, or NULL/"" for any
 * @return the socket descriptor, or -1 on failure
 */
static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr,
                             socklen_t *addr_len, const char *localaddr)
{
    int udp_fd = -1;
    struct addrinfo *res0, *res;
    int family = AF_UNSPEC;

    if (((struct sockaddr *) &s->dest_addr)->sa_family)
        family = ((struct sockaddr *) &s->dest_addr)->sa_family;
    res0 = udp_resolve_host((localaddr && localaddr[0]) ? localaddr : NULL, s->local_port,
                            SOCK_DGRAM, family, AI_PASSIVE);
    if (!res0)
        goto fail;
    for (res = res0; res; res=res->ai_next) {
        if (s->udplite_coverage)
            udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, IPPROTO_UDPLITE);
        else
            udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0);
        if (udp_fd != -1) break;
        log_net_error(NULL, AV_LOG_ERROR, "socket");
    }

    if (udp_fd < 0)
        goto fail;

    memcpy(addr, res->ai_addr, res->ai_addrlen);
    *addr_len = res->ai_addrlen;

    freeaddrinfo(res0);

    return udp_fd;

 fail:
    if (udp_fd >= 0)
        closesocket(udp_fd);
    if(res0)
        freeaddrinfo(res0);
    return -1;
}

394 395
static int udp_port(struct sockaddr_storage *addr, int addr_len)
{
396
    char sbuf[sizeof(int)*3+1];
397
    int error;
398

399 400
    if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,  sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
        av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
401 402 403 404 405 406
        return -1;
    }

    return strtol(sbuf, NULL, 10);
}

407

408 409 410 411 412 413
/**
 * If no filename is given to av_open_input_file because you want to
 * get the local port first, then you must call this function to set
 * the remote server address.
 *
 * url syntax: udp://host:port[?option=val...]
414
 * option: 'ttl=n'       : set the ttl value (for multicast only)
415
 *         'localport=n' : set the local port
416
 *         'pkt_size=n'  : set max packet size
417
 *         'reuse=1'     : enable reusing the socket
418
 *         'overrun_nonfatal=1': survive in case of circular buffer overrun
419
 *
420
 * @param h media file context
421 422 423
 * @param uri of the remote server
 * @return zero if no error.
 */
424
int ff_udp_set_remote_url(URLContext *h, const char *uri)
425 426
{
    UDPContext *s = h->priv_data;
427
    char hostname[256], buf[10];
428
    int port;
429
    const char *p;
430

431
    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
Fabrice Bellard's avatar
Fabrice Bellard committed
432

433
    /* set the destination address */
434 435
    s->dest_addr_len = udp_set_url(&s->dest_addr, hostname, port);
    if (s->dest_addr_len < 0) {
436
        return AVERROR(EIO);
437
    }
438
    s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
439 440
    p = strchr(uri, '?');
    if (p) {
441
        if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
442 443 444 445 446 447
            int was_connected = s->is_connected;
            s->is_connected = strtol(buf, NULL, 10);
            if (s->is_connected && !was_connected) {
                if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
                            s->dest_addr_len)) {
                    s->is_connected = 0;
448
                    log_net_error(h, AV_LOG_ERROR, "connect");
449 450 451 452 453
                    return AVERROR(EIO);
                }
            }
        }
    }
454

455 456 457 458
    return 0;
}

/**
459
 * Return the local port used by the UDP connection
460
 * @param h media file context
461 462
 * @return the local port number
 */
463
int ff_udp_get_local_port(URLContext *h)
464 465 466 467 468 469 470 471 472 473
{
    UDPContext *s = h->priv_data;
    return s->local_port;
}

/**
 * Return the udp file handle for select() usage to wait for several RTP
 * streams at the same time.
 * @param h media file context
 */
474
static int udp_get_file_handle(URLContext *h)
475 476 477 478 479
{
    UDPContext *s = h->priv_data;
    return s->udp_fd;
}

480
#if HAVE_PTHREAD_CANCEL
/**
 * Receive thread: read datagrams from the socket and queue them in s->fifo.
 *
 * Each datagram is stored as a 32-bit length (written with AV_WL32)
 * followed by the payload. s->mutex is held whenever the FIFO or
 * s->circular_buffer_error is touched and released around the blocking
 * recv(). Cancellation is enabled only while blocked in recv(), so the
 * thread is never cancelled while holding the mutex. s->cond is signalled
 * after every queued datagram and once more when the thread exits.
 *
 * On a fatal error the code is left in s->circular_buffer_error.
 *
 * @param opaque the URLContext owning the UDPContext
 * @return always NULL
 */
static void *circular_buffer_task(void *opaque)
{
    URLContext *h = opaque;
    UDPContext *s = h->priv_data;
    int old_cancelstate;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
    pthread_mutex_lock(&s->mutex);
    if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode\n");
        s->circular_buffer_error = AVERROR(EIO);
        goto end;
    }
    while(1) {
        int len;

        pthread_mutex_unlock(&s->mutex);
        /* Blocking operations are always cancellation points;
           see "General Information" / "Thread Cancelation Overview"
           in Single Unix. */
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
        len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
        pthread_mutex_lock(&s->mutex);
        if (len < 0) {
            /* EAGAIN / EINTR are transient: just retry the recv() */
            if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
                s->circular_buffer_error = ff_neterrno();
                goto end;
            }
            continue;
        }
        AV_WL32(s->tmp, len);

        if(av_fifo_space(s->fifo) < len + 4) {
            /* No Space left */
            if (s->overrun_nonfatal) {
                /* drop this datagram and keep receiving */
                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
                        "Surviving due to overrun_nonfatal option\n");
                continue;
            } else {
                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
                        "To avoid, increase fifo_size URL option. "
                        "To survive in such case, use overrun_nonfatal option\n");
                s->circular_buffer_error = AVERROR(EIO);
                goto end;
            }
        }
        av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
        pthread_cond_signal(&s->cond);
    }

end:
    /* wake any waiter so it can observe circular_buffer_error */
    pthread_cond_signal(&s->cond);
    pthread_mutex_unlock(&s->mutex);
    return NULL;
}
#endif
538

539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559
/**
 * Split a comma-separated list in place and append a copy of each item to
 * sources.
 *
 * buf is modified: every ',' that is consumed is overwritten with '\0'.
 * Items are appended starting at index *num_sources, so the function may be
 * called several times on the same array. Parsing stops silently once
 * max_sources entries are stored; nothing is written if the array is already
 * full on entry.
 *
 * @param buf         NUL-terminated list to split (modified)
 * @param sources     array receiving av_strdup()ed items
 * @param num_sources in: entries already stored; out: updated count
 * @param max_sources capacity of sources
 * @return 0 on success, AVERROR(ENOMEM) if an item could not be duplicated
 */
static int parse_source_list(char *buf, char **sources, int *num_sources,
                             int max_sources)
{
    char *source_start = buf;

    /* Check the bound before writing: the array may already be full when
     * this is called a second time for the same list. */
    while (*num_sources < max_sources) {
        char *next = strchr(source_start, ',');
        if (next)
            *next = '\0';
        sources[*num_sources] = av_strdup(source_start);
        if (!sources[*num_sources])
            return AVERROR(ENOMEM);
        (*num_sources)++;
        if (!next)
            break;
        source_start = next + 1;
    }
    return 0;
}

560
/* put it in UDP context */
Fabrice Bellard's avatar
Fabrice Bellard committed
561 562 563
/* return non zero if error */
static int udp_open(URLContext *h, const char *uri, int flags)
{
564
    char hostname[1024], localaddr[1024] = "";
565
    int port, udp_fd = -1, tmp, bind_ret = -1, dscp = -1;
566
    UDPContext *s = h->priv_data;
567
    int is_output;
568 569
    const char *p;
    char buf[256];
570
    struct sockaddr_storage my_addr;
571
    socklen_t len;
572 573
    int i, num_include_sources = 0, num_exclude_sources = 0;
    char *include_sources[32], *exclude_sources[32];
Fabrice Bellard's avatar
Fabrice Bellard committed
574 575 576

    h->is_streamed = 1;

577
    is_output = !(flags & AVIO_FLAG_READ);
Luca Barbato's avatar
Luca Barbato committed
578
    if (s->buffer_size < 0)
579
        s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
580

Luca Barbato's avatar
Luca Barbato committed
581 582 583 584 585 586 587 588 589 590 591 592 593
    if (s->sources) {
        if (parse_source_list(s->sources, include_sources,
                              &num_include_sources,
                              FF_ARRAY_ELEMS(include_sources)))
            goto fail;
    }

    if (s->block) {
        if (parse_source_list(s->block, exclude_sources, &num_exclude_sources,
                              FF_ARRAY_ELEMS(exclude_sources)))
            goto fail;
    }

594
    if (s->pkt_size > 0)
Luca Barbato's avatar
Luca Barbato committed
595
        h->max_packet_size = s->pkt_size;
596

597 598
    p = strchr(uri, '?');
    if (p) {
599
        if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
600
            char *endptr = NULL;
601 602 603 604 605
            s->reuse_socket = strtol(buf, &endptr, 10);
            /* assume if no digits were found it is a request to enable it */
            if (buf == endptr)
                s->reuse_socket = 1;
        }
606 607 608 609 610 611
        if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
            char *endptr = NULL;
            s->overrun_nonfatal = strtol(buf, &endptr, 10);
            /* assume if no digits were found it is a request to enable it */
            if (buf == endptr)
                s->overrun_nonfatal = 1;
612 613 614 615
            if (!HAVE_PTHREAD_CANCEL)
                av_log(h, AV_LOG_WARNING,
                       "'overrun_nonfatal' option was set but it is not supported "
                       "on this build (pthread support is required)\n");
616
        }
617
        if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
618 619
            s->ttl = strtol(buf, NULL, 10);
        }
620 621 622
        if (av_find_info_tag(buf, sizeof(buf), "udplite_coverage", p)) {
            s->udplite_coverage = strtol(buf, NULL, 10);
        }
623
        if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
624 625
            s->local_port = strtol(buf, NULL, 10);
        }
626
        if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
627
            s->pkt_size = strtol(buf, NULL, 10);
628
        }
629
        if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
630 631
            s->buffer_size = strtol(buf, NULL, 10);
        }
632
        if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
633 634
            s->is_connected = strtol(buf, NULL, 10);
        }
635 636 637
        if (av_find_info_tag(buf, sizeof(buf), "dscp", p)) {
            dscp = strtol(buf, NULL, 10);
        }
638
        if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
639
            s->circular_buffer_size = strtol(buf, NULL, 10);
640 641 642 643
            if (!HAVE_PTHREAD_CANCEL)
                av_log(h, AV_LOG_WARNING,
                       "'circular_buffer_size' option was set but it is not supported "
                       "on this build (pthread support is required)\n");
644
        }
645 646 647
        if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
            av_strlcpy(localaddr, buf, sizeof(localaddr));
        }
648 649 650 651 652 653 654 655 656
        if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
            if (parse_source_list(buf, include_sources, &num_include_sources,
                                  FF_ARRAY_ELEMS(include_sources)))
                goto fail;
        }
        if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
            if (parse_source_list(buf, exclude_sources, &num_exclude_sources,
                                  FF_ARRAY_ELEMS(exclude_sources)))
                goto fail;
657
        }
658
        if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
659
            s->timeout = strtol(buf, NULL, 10);
660 661
        if (is_output && av_find_info_tag(buf, sizeof(buf), "broadcast", p))
            s->is_broadcast = strtol(buf, NULL, 10);
Fabrice Bellard's avatar
Fabrice Bellard committed
662
    }
663 664
    /* handling needed to support options picking from both AVOption and URL */
    s->circular_buffer_size *= 188;
665
    if (flags & AVIO_FLAG_WRITE) {
666
        h->max_packet_size = s->pkt_size;
667 668 669
    } else {
        h->max_packet_size = UDP_MAX_PKT_SIZE;
    }
670
    h->rw_timeout = s->timeout;
671 672

    /* fill the dest addr */
673
    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
674

675
    /* XXX: fix av_url_split */
676 677
    if (hostname[0] == '\0' || hostname[0] == '?') {
        /* only accepts null hostname if input */
678
        if (!(flags & AVIO_FLAG_READ))
679 680
            goto fail;
    } else {
681
        if (ff_udp_set_remote_url(h, uri) < 0)
682
            goto fail;
683
    }
Fabrice Bellard's avatar
Fabrice Bellard committed
684

685
    if ((s->is_multicast || s->local_port <= 0) && (h->flags & AVIO_FLAG_READ))
686
        s->local_port = port;
Luca Barbato's avatar
Luca Barbato committed
687 688 689 690 691

    if (localaddr[0])
        udp_fd = udp_socket_create(s, &my_addr, &len, localaddr);
    else
        udp_fd = udp_socket_create(s, &my_addr, &len, s->localaddr);
692 693 694
    if (udp_fd < 0)
        goto fail;

695 696
    s->local_addr_storage=my_addr; //store for future multicast join

697
    /* Follow the requested reuse option, unless it's multicast in which
698
     * case enable reuse unless explicitly disabled.
699
     */
Luca Barbato's avatar
Luca Barbato committed
700
    if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) {
701
        s->reuse_socket = 1;
702 703
        if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
            goto fail;
704
    }
705

706
    if (s->is_broadcast) {
707
#ifdef SO_BROADCAST
708
        if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0)
709
#endif
710 711 712
           goto fail;
    }

713 714 715 716 717 718 719 720 721 722 723 724
    /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
     * The receiver coverage has to be less than or equal to the sender coverage.
     * Otherwise, the receiver will drop all packets.
     */
    if (s->udplite_coverage) {
        if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
            av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available");

        if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
            av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available");
    }

725 726 727 728 729 730
    if (dscp >= 0) {
        dscp <<= 2;
        if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0)
            goto fail;
    }

731 732
    /* If multicast, try binding the multicast address first, to avoid
     * receiving UDP packets from other sources aimed at the same UDP
733 734 735
     * port. This fails on windows. This makes sending to the same address
     * using sendto() fail, so only do it if we're opened in read-only mode. */
    if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
736 737 738 739
        bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
    }
    /* bind to the local address if not multicast or if the multicast
     * bind failed */
Martin Storsjö's avatar
Martin Storsjö committed
740
    /* the bind is needed to give a port to the socket now */
741
    if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
742
        log_net_error(h, AV_LOG_ERROR, "bind failed");
Fabrice Bellard's avatar
Fabrice Bellard committed
743
        goto fail;
744
    }
Fabrice Bellard's avatar
Fabrice Bellard committed
745

746 747
    len = sizeof(my_addr);
    getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
748 749
    s->local_port = udp_port(&my_addr, len);

750
    if (s->is_multicast) {
751
        if (h->flags & AVIO_FLAG_WRITE) {
752
            /* output */
753
            if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
754
                goto fail;
755 756
        }
        if (h->flags & AVIO_FLAG_READ) {
757
            /* input */
758 759 760 761 762 763 764 765
            if (num_include_sources && num_exclude_sources) {
                av_log(h, AV_LOG_ERROR, "Simultaneously including and excluding multicast sources is not supported\n");
                goto fail;
            }
            if (num_include_sources) {
                if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, include_sources, num_include_sources, 1) < 0)
                    goto fail;
            } else {
766
                if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage) < 0)
767
                    goto fail;
768 769 770
            }
            if (num_exclude_sources) {
                if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, exclude_sources, num_exclude_sources, 0) < 0)
771 772
                    goto fail;
            }
773 774
        }
    }
Fabrice Bellard's avatar
Fabrice Bellard committed
775

776 777
    if (is_output) {
        /* limit the tx buf size to limit latency */
778
        tmp = s->buffer_size;
779
        if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
780
            log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
781 782
            goto fail;
        }
783
    } else {
784
        /* set udp recv buffer size to the requested value (default 64K) */
785 786
        tmp = s->buffer_size;
        if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
787
            log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
788
        }
789 790 791
        len = sizeof(tmp);
        if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) {
            log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
792
        } else {
793
            av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
794 795 796
            if(tmp < s->buffer_size)
                av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp);
        }
797

798 799
        /* make the socket non-blocking */
        ff_socket_nonblock(udp_fd, 1);
800
    }
801 802
    if (s->is_connected) {
        if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
803
            log_net_error(h, AV_LOG_ERROR, "connect");
804 805 806
            goto fail;
        }
    }
807

808 809 810 811
    for (i = 0; i < num_include_sources; i++)
        av_freep(&include_sources[i]);
    for (i = 0; i < num_exclude_sources; i++)
        av_freep(&exclude_sources[i]);
812

813
    s->udp_fd = udp_fd;
814

815
#if HAVE_PTHREAD_CANCEL
816
    if (!is_output && s->circular_buffer_size) {
817 818
        int ret;

819
        /* start the task going */
820
        s->fifo = av_fifo_alloc(s->circular_buffer_size);
821 822 823
        ret = pthread_mutex_init(&s->mutex, NULL);
        if (ret != 0) {
            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
824 825
            goto fail;
        }
826 827 828 829 830 831 832 833 834 835 836
        ret = pthread_cond_init(&s->cond, NULL);
        if (ret != 0) {
            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
            goto cond_fail;
        }
        ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
        if (ret != 0) {
            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
            goto thread_fail;
        }
        s->thread_started = 1;
837
    }
838
#endif
839

Fabrice Bellard's avatar
Fabrice Bellard committed
840
    return 0;
841
#if HAVE_PTHREAD_CANCEL
842 843 844 845 846
 thread_fail:
    pthread_cond_destroy(&s->cond);
 cond_fail:
    pthread_mutex_destroy(&s->mutex);
#endif
Fabrice Bellard's avatar
Fabrice Bellard committed
847
 fail:
848
    if (udp_fd >= 0)
849
        closesocket(udp_fd);
Lukasz Marek's avatar
Lukasz Marek committed
850
    av_fifo_freep(&s->fifo);
851 852 853 854
    for (i = 0; i < num_include_sources; i++)
        av_freep(&include_sources[i]);
    for (i = 0; i < num_exclude_sources; i++)
        av_freep(&exclude_sources[i]);
855
    return AVERROR(EIO);
Fabrice Bellard's avatar
Fabrice Bellard committed
856 857
}

858 859 860 861 862 863 864 865 866 867
/**
 * Open callback for the "udplite" protocol.
 *
 * Preloads the checksum coverage with UDP_HEADER_SIZE and then hands over
 * to udp_open(), which applies a non-zero coverage to the socket through
 * the UDPLITE_SEND_CSCOV / UDPLITE_RECV_CSCOV options.
 *
 * @param h     URL context; its priv_data is the UDPContext to configure
 * @param uri   URL to open, forwarded unchanged to udp_open()
 * @param flags AVIO_FLAG_* open flags, forwarded unchanged to udp_open()
 * @return whatever udp_open() returns
 */
static int udplite_open(URLContext *h, const char *uri, int flags)
{
    UDPContext *ctx = h->priv_data;

    /* default checksum coverage */
    ctx->udplite_coverage = UDP_HEADER_SIZE;

    return udp_open(h, uri, flags);
}

868
/**
 * Read one datagram.
 *
 * When a circular buffer (s->fifo) exists, datagrams are taken from it:
 * each entry is a 4-byte little-endian length followed by that many payload
 * bytes. If the FIFO is empty the function returns the stored
 * circular_buffer_error, or AVERROR(EAGAIN) in non-blocking mode; otherwise
 * it waits on s->cond for up to 100 ms and then polls the FIFO once more.
 *
 * Without a circular buffer the datagram is read directly from the socket
 * with recv(), after waiting for readability unless AVIO_FLAG_NONBLOCK is
 * set.
 *
 * @param h    URL context; its priv_data is the UDPContext
 * @param buf  destination buffer
 * @param size capacity of buf in bytes; a larger datagram is truncated
 * @return number of bytes stored in buf, or a negative error code
 */
static int udp_read(URLContext *h, uint8_t *buf, int size)
{
    UDPContext *s = h->priv_data;
    int ret;
#if HAVE_PTHREAD_CANCEL
    int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;

    if (s->fifo) {
        pthread_mutex_lock(&s->mutex);
        do {
            avail = av_fifo_size(s->fifo);
            if (avail) {
                uint8_t tmp[4];
                int pkt_len;

                /* every queued datagram is prefixed by its length */
                av_fifo_generic_read(s->fifo, tmp, 4, NULL);
                pkt_len = AV_RL32(tmp);
                avail   = pkt_len;
                if (avail > size) {
                    av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
                    avail = size;
                }

                av_fifo_generic_read(s->fifo, buf, avail, NULL);
                /* discard whatever did not fit into the caller's buffer */
                av_fifo_drain(s->fifo, pkt_len - avail);
                pthread_mutex_unlock(&s->mutex);
                return avail;
            } else if (s->circular_buffer_error) {
                int err = s->circular_buffer_error;
                pthread_mutex_unlock(&s->mutex);
                return err;
            } else if (nonblock) {
                pthread_mutex_unlock(&s->mutex);
                return AVERROR(EAGAIN);
            } else {
                /* FIXME: using the monotonic clock would be better,
                   but it does not exist on all supported platforms. */
                int64_t t = av_gettime() + 100000;
                struct timespec tv = { .tv_sec  =  t / 1000000,
                                       .tv_nsec = (t % 1000000) * 1000 };
                /* pthread_cond_timedwait() reports failure through its
                 * return value (a positive error number); it never returns
                 * a negative value and does not set errno. */
                int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
                if (err && err != ETIMEDOUT) {
                    pthread_mutex_unlock(&s->mutex);
                    return AVERROR(err);
                }
                /* signalled or timed out: look at the FIFO one last time
                 * and give up with EAGAIN if it is still empty */
                nonblock = 1;
            }
        } while (1);
    }
#endif

    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
        ret = ff_network_wait_fd(s->udp_fd, 0);
        if (ret < 0)
            return ret;
    }
    ret = recv(s->udp_fd, buf, size, 0);

    return ret < 0 ? ff_neterrno() : ret;
}

927
/**
 * Send one datagram.
 *
 * Unless AVIO_FLAG_NONBLOCK is set, first waits until the socket is
 * writable. A connected socket is written with send(); otherwise the
 * datagram is addressed explicitly to s->dest_addr with sendto().
 *
 * @param h    URL context; its priv_data is the UDPContext
 * @param buf  payload to transmit
 * @param size payload length in bytes
 * @return number of bytes sent, or a negative error code
 */
static int udp_write(URLContext *h, const uint8_t *buf, int size)
{
    UDPContext *ctx = h->priv_data;
    int sent;

    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
        int wait_ret = ff_network_wait_fd(ctx->udp_fd, 1);
        if (wait_ret < 0)
            return wait_ret;
    }

    if (ctx->is_connected)
        sent = send(ctx->udp_fd, buf, size, 0);
    else
        sent = sendto(ctx->udp_fd, buf, size, 0,
                      (struct sockaddr *) &ctx->dest_addr,
                      ctx->dest_addr_len);

    if (sent < 0)
        return ff_neterrno();
    return sent;
}

/**
 * Close the UDP connection and release its resources.
 *
 * Leaves the multicast group for multicast readers, stops the receiver
 * thread if one was started, closes the socket and frees the circular
 * buffer.
 *
 * @param h URL context; its priv_data is the UDPContext
 * @return always 0
 */
static int udp_close(URLContext *h)
{
    UDPContext *s = h->priv_data;

    if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
        udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
#if HAVE_PTHREAD_CANCEL
    /* Stop the receiver thread before the socket goes away: closing a
     * descriptor while another thread may still be using it lets the
     * descriptor number be reused underneath that thread.
     * NOTE(review): assumes circular_buffer_task reads from s->udp_fd and
     * reaches a cancellation point — confirm against the thread body. */
    if (s->thread_started) {
        int ret;
        pthread_cancel(s->circular_buffer_thread);
        ret = pthread_join(s->circular_buffer_thread, NULL);
        if (ret != 0)
            av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
        pthread_mutex_destroy(&s->mutex);
        pthread_cond_destroy(&s->cond);
    }
#endif
    closesocket(s->udp_fd);
    av_fifo_freep(&s->fifo);
    return 0;
}

970
URLProtocol ff_udp_protocol = {
971 972 973 974 975
    .name                = "udp",
    .url_open            = udp_open,
    .url_read            = udp_read,
    .url_write           = udp_write,
    .url_close           = udp_close,
976
    .url_get_file_handle = udp_get_file_handle,
977
    .priv_data_size      = sizeof(UDPContext),
Luca Barbato's avatar
Luca Barbato committed
978
    .priv_data_class     = &udp_class,
979
    .flags               = URL_PROTOCOL_FLAG_NETWORK,
Fabrice Bellard's avatar
Fabrice Bellard committed
980
};
981 982 983 984 985 986 987 988 989 990 991 992

/**
 * Protocol descriptor for the "udplite" scheme. It shares every callback
 * with the "udp" descriptor except url_open, which is udplite_open, and it
 * uses its own option class.
 */
URLProtocol ff_udplite_protocol = {
    .name                = "udplite",
    .flags               = URL_PROTOCOL_FLAG_NETWORK,
    /* per-connection private state and its option class */
    .priv_data_size      = sizeof(UDPContext),
    .priv_data_class     = &udplite_context_class,
    /* callbacks */
    .url_open            = udplite_open,
    .url_close           = udp_close,
    .url_read            = udp_read,
    .url_write           = udp_write,
    .url_get_file_handle = udp_get_file_handle,
};