Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
F
ffmpeg.wasm-core
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Linshizhi
ffmpeg.wasm-core
Commits
f89584ca
Commit
f89584ca
authored
Aug 08, 2012
by
Samuel Pitoiset
Committed by
Martin Storsjö
Aug 08, 2012
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
rtmp: Add message tracking
Signed-off-by:
Martin Storsjö
<
martin@martin.st
>
parent
1243c722
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
160 additions
and
138 deletions
+160
-138
rtmpproto.c
libavformat/rtmpproto.c
+160
-138
No files found.
libavformat/rtmpproto.c
View file @
f89584ca
...
...
@@ -52,15 +52,17 @@
typedef
enum
{
STATE_START
,
///< client has not done anything yet
STATE_HANDSHAKED
,
///< client has performed handshake
STATE_RELEASING
,
///< client releasing stream before publish it (for output)
STATE_FCPUBLISH
,
///< client FCPublishing stream (for output)
STATE_CONNECTING
,
///< client connected to server successfully
STATE_READY
,
///< client has sent all needed commands and waits for server reply
STATE_PLAYING
,
///< client has started receiving multimedia data from server
STATE_PUBLISHING
,
///< client has started sending multimedia data to server (for output)
STATE_STOPPED
,
///< the broadcast has been stopped
}
ClientState
;
typedef
struct
TrackedMethod
{
char
*
name
;
int
id
;
}
TrackedMethod
;
/** protocol handler context */
typedef
struct
RTMPContext
{
const
AVClass
*
class
;
...
...
@@ -86,7 +88,6 @@ typedef struct RTMPContext {
uint8_t
flv_header
[
11
];
///< partial incoming flv packet header
int
flv_header_bytes
;
///< number of initialized bytes in flv_header
int
nb_invokes
;
///< keeps track of invoke messages
int
create_stream_invoke
;
///< invoke id for the create stream command
char
*
tcurl
;
///< url of the target stream
char
*
flashver
;
///< version of the flash plugin
char
*
swfurl
;
///< url of the swf player
...
...
@@ -96,6 +97,9 @@ typedef struct RTMPContext {
int
client_buffer_time
;
///< client buffer time in ms
int
flush_interval
;
///< number of packets flushed in the same request (RTMPT only)
int
encrypted
;
///< use an encrypted connection (RTMPE only)
TrackedMethod
*
tracked_methods
;
///< tracked methods buffer
int
nb_tracked_methods
;
///< number of tracked methods
int
tracked_methods_size
;
///< size of the tracked methods buffer
}
RTMPContext
;
#define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing
...
...
@@ -121,6 +125,72 @@ static const uint8_t rtmp_server_key[] = {
0xE6
,
0x36
,
0xCF
,
0xEB
,
0x31
,
0xAE
};
static
int
add_tracked_method
(
RTMPContext
*
rt
,
const
char
*
name
,
int
id
)
{
void
*
ptr
;
if
(
rt
->
nb_tracked_methods
+
1
>
rt
->
tracked_methods_size
)
{
rt
->
tracked_methods_size
=
(
rt
->
nb_tracked_methods
+
1
)
*
2
;
ptr
=
av_realloc
(
rt
->
tracked_methods
,
rt
->
tracked_methods_size
*
sizeof
(
*
rt
->
tracked_methods
));
if
(
!
ptr
)
return
AVERROR
(
ENOMEM
);
rt
->
tracked_methods
=
ptr
;
}
rt
->
tracked_methods
[
rt
->
nb_tracked_methods
].
name
=
av_strdup
(
name
);
if
(
!
rt
->
tracked_methods
[
rt
->
nb_tracked_methods
].
name
)
return
AVERROR
(
ENOMEM
);
rt
->
tracked_methods
[
rt
->
nb_tracked_methods
].
id
=
id
;
rt
->
nb_tracked_methods
++
;
return
0
;
}
static
void
del_tracked_method
(
RTMPContext
*
rt
,
int
index
)
{
memmove
(
&
rt
->
tracked_methods
[
index
],
&
rt
->
tracked_methods
[
index
+
1
],
sizeof
(
*
rt
->
tracked_methods
)
*
(
rt
->
nb_tracked_methods
-
index
-
1
));
rt
->
nb_tracked_methods
--
;
}
static
void
free_tracked_methods
(
RTMPContext
*
rt
)
{
int
i
;
for
(
i
=
0
;
i
<
rt
->
nb_tracked_methods
;
i
++
)
av_free
(
rt
->
tracked_methods
[
i
].
name
);
av_free
(
rt
->
tracked_methods
);
}
static
int
rtmp_send_packet
(
RTMPContext
*
rt
,
RTMPPacket
*
pkt
,
int
track
)
{
int
ret
;
if
(
pkt
->
type
==
RTMP_PT_INVOKE
&&
track
)
{
GetByteContext
gbc
;
char
name
[
128
];
double
pkt_id
;
int
len
;
bytestream2_init
(
&
gbc
,
pkt
->
data
,
pkt
->
data_size
);
if
((
ret
=
ff_amf_read_string
(
&
gbc
,
name
,
sizeof
(
name
),
&
len
))
<
0
)
goto
fail
;
if
((
ret
=
ff_amf_read_number
(
&
gbc
,
&
pkt_id
))
<
0
)
goto
fail
;
if
((
ret
=
add_tracked_method
(
rt
,
name
,
pkt_id
))
<
0
)
goto
fail
;
}
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
fail:
ff_rtmp_packet_destroy
(
pkt
);
return
ret
;
}
static
int
rtmp_write_amf_data
(
URLContext
*
s
,
char
*
param
,
uint8_t
**
p
)
{
char
*
field
,
*
value
;
...
...
@@ -269,11 +339,7 @@ static int gen_connect(URLContext *s, RTMPContext *rt)
pkt
.
data_size
=
p
-
pkt
.
data
;
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
1
);
}
/**
...
...
@@ -297,11 +363,7 @@ static int gen_release_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_null
(
&
p
);
ff_amf_write_string
(
&
p
,
rt
->
playpath
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
0
);
}
/**
...
...
@@ -325,11 +387,7 @@ static int gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_null
(
&
p
);
ff_amf_write_string
(
&
p
,
rt
->
playpath
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
0
);
}
/**
...
...
@@ -353,11 +411,7 @@ static int gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_null
(
&
p
);
ff_amf_write_string
(
&
p
,
rt
->
playpath
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
0
);
}
/**
...
...
@@ -380,13 +434,8 @@ static int gen_create_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_string
(
&
p
,
"createStream"
);
ff_amf_write_number
(
&
p
,
++
rt
->
nb_invokes
);
ff_amf_write_null
(
&
p
);
rt
->
create_stream_invoke
=
rt
->
nb_invokes
;
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
1
);
}
...
...
@@ -412,11 +461,7 @@ static int gen_delete_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_null
(
&
p
);
ff_amf_write_number
(
&
p
,
rt
->
main_channel_id
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
0
);
}
/**
...
...
@@ -437,11 +482,7 @@ static int gen_buffer_time(URLContext *s, RTMPContext *rt)
bytestream_put_be32
(
&
p
,
rt
->
main_channel_id
);
bytestream_put_be32
(
&
p
,
rt
->
client_buffer_time
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
0
);
}
/**
...
...
@@ -469,11 +510,7 @@ static int gen_play(URLContext *s, RTMPContext *rt)
ff_amf_write_string
(
&
p
,
rt
->
playpath
);
ff_amf_write_number
(
&
p
,
rt
->
live
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
1
);
}
/**
...
...
@@ -500,11 +537,7 @@ static int gen_publish(URLContext *s, RTMPContext *rt)
ff_amf_write_string
(
&
p
,
rt
->
playpath
);
ff_amf_write_string
(
&
p
,
"live"
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
1
);
}
/**
...
...
@@ -529,11 +562,8 @@ static int gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
p
=
pkt
.
data
;
bytestream_put_be16
(
&
p
,
7
);
bytestream_put_be32
(
&
p
,
AV_RB32
(
ppkt
->
data
+
2
));
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
r
et
;
return
r
tmp_send_packet
(
rt
,
&
pkt
,
0
)
;
}
/**
...
...
@@ -551,11 +581,8 @@ static int gen_server_bw(URLContext *s, RTMPContext *rt)
p
=
pkt
.
data
;
bytestream_put_be32
(
&
p
,
rt
->
server_bw
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
r
et
;
return
r
tmp_send_packet
(
rt
,
&
pkt
,
0
)
;
}
/**
...
...
@@ -576,11 +603,7 @@ static int gen_check_bw(URLContext *s, RTMPContext *rt)
ff_amf_write_number
(
&
p
,
RTMP_NOTIFICATION
);
ff_amf_write_null
(
&
p
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
0
);
}
/**
...
...
@@ -598,11 +621,8 @@ static int gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
p
=
pkt
.
data
;
bytestream_put_be32
(
&
p
,
rt
->
bytes_read
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
r
et
;
return
r
tmp_send_packet
(
rt
,
&
pkt
,
0
)
;
}
static
int
gen_fcsubscribe_stream
(
URLContext
*
s
,
RTMPContext
*
rt
,
...
...
@@ -622,11 +642,7 @@ static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
ff_amf_write_null
(
&
p
);
ff_amf_write_string
(
&
p
,
subscribe
);
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]);
ff_rtmp_packet_destroy
(
&
pkt
);
return
ret
;
return
rtmp_send_packet
(
rt
,
&
pkt
,
1
);
}
int
ff_rtmp_calc_digest
(
const
uint8_t
*
src
,
int
len
,
int
gap
,
...
...
@@ -1010,7 +1026,8 @@ static int handle_invoke(URLContext *s, RTMPPacket *pkt)
RTMPContext
*
rt
=
s
->
priv_data
;
int
i
,
t
;
const
uint8_t
*
data_end
=
pkt
->
data
+
pkt
->
data_size
;
int
ret
;
char
*
tracked_method
=
NULL
;
int
ret
=
0
;
//TODO: check for the messages sent for wrong state?
if
(
!
memcmp
(
pkt
->
data
,
"
\002\000\006
_error"
,
9
))
{
...
...
@@ -1021,68 +1038,72 @@ static int handle_invoke(URLContext *s, RTMPPacket *pkt)
av_log
(
s
,
AV_LOG_ERROR
,
"Server error: %s
\n
"
,
tmpstr
);
return
-
1
;
}
else
if
(
!
memcmp
(
pkt
->
data
,
"
\002\000\007
_result"
,
10
))
{
switch
(
rt
->
state
)
{
case
STATE_HANDSHAKED
:
if
(
!
rt
->
is_input
)
{
if
((
ret
=
gen_release_stream
(
s
,
rt
))
<
0
)
return
ret
;
if
((
ret
=
gen_fcpublish_stream
(
s
,
rt
))
<
0
)
return
ret
;
rt
->
state
=
STATE_RELEASING
;
}
else
{
if
((
ret
=
gen_server_bw
(
s
,
rt
))
<
0
)
return
ret
;
rt
->
state
=
STATE_CONNECTING
;
}
if
((
ret
=
gen_create_stream
(
s
,
rt
))
<
0
)
return
ret
;
if
(
rt
->
is_input
)
{
/* Send the FCSubscribe command when the name of live
* stream is defined by the user or if it's a live stream. */
if
(
rt
->
subscribe
)
{
if
((
ret
=
gen_fcsubscribe_stream
(
s
,
rt
,
rt
->
subscribe
))
<
0
)
return
ret
;
}
else
if
(
rt
->
live
==
-
1
)
{
if
((
ret
=
gen_fcsubscribe_stream
(
s
,
rt
,
rt
->
playpath
))
<
0
)
return
ret
;
}
}
break
;
case
STATE_FCPUBLISH
:
rt
->
state
=
STATE_CONNECTING
;
break
;
case
STATE_RELEASING
:
rt
->
state
=
STATE_FCPUBLISH
;
/* hack for Wowza Media Server, it does not send result for
* releaseStream and FCPublish calls */
if
(
!
pkt
->
data
[
10
])
{
int
pkt_id
=
av_int2double
(
AV_RB64
(
pkt
->
data
+
11
));
if
(
pkt_id
==
rt
->
create_stream_invoke
)
rt
->
state
=
STATE_CONNECTING
;
}
if
(
rt
->
state
!=
STATE_CONNECTING
)
break
;
case
STATE_CONNECTING
:
//extract a number from the result
if
(
pkt
->
data
[
10
]
||
pkt
->
data
[
19
]
!=
5
||
pkt
->
data
[
20
])
{
av_log
(
s
,
AV_LOG_WARNING
,
"Unexpected reply on connect()
\n
"
);
}
else
{
rt
->
main_channel_id
=
av_int2double
(
AV_RB64
(
pkt
->
data
+
21
));
}
if
(
rt
->
is_input
)
{
if
((
ret
=
gen_play
(
s
,
rt
))
<
0
)
return
ret
;
if
((
ret
=
gen_buffer_time
(
s
,
rt
))
<
0
)
return
ret
;
}
else
{
if
((
ret
=
gen_publish
(
s
,
rt
))
<
0
)
return
ret
;
GetByteContext
gbc
;
double
pkt_id
;
bytestream2_init
(
&
gbc
,
pkt
->
data
+
10
,
pkt
->
data_size
);
if
((
ret
=
ff_amf_read_number
(
&
gbc
,
&
pkt_id
))
<
0
)
return
ret
;
for
(
i
=
0
;
i
<
rt
->
nb_tracked_methods
;
i
++
)
{
if
(
rt
->
tracked_methods
[
i
].
id
!=
pkt_id
)
continue
;
tracked_method
=
rt
->
tracked_methods
[
i
].
name
;
del_tracked_method
(
rt
,
i
);
break
;
}
if
(
!
tracked_method
)
{
/* Ignore this reply when the current method is not tracked. */
return
0
;
}
if
(
!
memcmp
(
tracked_method
,
"connect"
,
7
))
{
if
(
!
rt
->
is_input
)
{
if
((
ret
=
gen_release_stream
(
s
,
rt
))
<
0
)
goto
invoke_fail
;
if
((
ret
=
gen_fcpublish_stream
(
s
,
rt
))
<
0
)
goto
invoke_fail
;
}
else
{
if
((
ret
=
gen_server_bw
(
s
,
rt
))
<
0
)
goto
invoke_fail
;
}
if
((
ret
=
gen_create_stream
(
s
,
rt
))
<
0
)
goto
invoke_fail
;
if
(
rt
->
is_input
)
{
/* Send the FCSubscribe command when the name of live
* stream is defined by the user or if it's a live stream. */
if
(
rt
->
subscribe
)
{
if
((
ret
=
gen_fcsubscribe_stream
(
s
,
rt
,
rt
->
subscribe
))
<
0
)
goto
invoke_fail
;
}
else
if
(
rt
->
live
==
-
1
)
{
if
((
ret
=
gen_fcsubscribe_stream
(
s
,
rt
,
rt
->
playpath
))
<
0
)
goto
invoke_fail
;
}
rt
->
state
=
STATE_READY
;
break
;
}
}
else
if
(
!
memcmp
(
tracked_method
,
"createStream"
,
12
))
{
//extract a number from the result
if
(
pkt
->
data
[
10
]
||
pkt
->
data
[
19
]
!=
5
||
pkt
->
data
[
20
])
{
av_log
(
s
,
AV_LOG_WARNING
,
"Unexpected reply on connect()
\n
"
);
}
else
{
rt
->
main_channel_id
=
av_int2double
(
AV_RB64
(
pkt
->
data
+
21
));
}
if
(
!
rt
->
is_input
)
{
if
((
ret
=
gen_publish
(
s
,
rt
))
<
0
)
goto
invoke_fail
;
}
else
{
if
((
ret
=
gen_play
(
s
,
rt
))
<
0
)
goto
invoke_fail
;
if
((
ret
=
gen_buffer_time
(
s
,
rt
))
<
0
)
goto
invoke_fail
;
}
}
}
else
if
(
!
memcmp
(
pkt
->
data
,
"
\002\000\010
onStatus"
,
11
))
{
const
uint8_t
*
ptr
=
pkt
->
data
+
11
;
...
...
@@ -1113,7 +1134,9 @@ static int handle_invoke(URLContext *s, RTMPPacket *pkt)
return
ret
;
}
return
0
;
invoke_fail:
av_free
(
tracked_method
);
return
ret
;
}
/**
...
...
@@ -1283,6 +1306,7 @@ static int rtmp_close(URLContext *h)
if
(
rt
->
state
>
STATE_HANDSHAKED
)
ret
=
gen_delete_stream
(
h
,
rt
);
free_tracked_methods
(
rt
);
av_freep
(
&
rt
->
flv_data
);
ffurl_close
(
rt
->
stream
);
return
ret
;
...
...
@@ -1570,10 +1594,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
if
(
rt
->
flv_off
==
rt
->
flv_size
)
{
rt
->
skip_bytes
=
4
;
if
((
ret
=
ff_rtmp_packet_write
(
rt
->
stream
,
&
rt
->
out_pkt
,
rt
->
chunk_size
,
rt
->
prev_pkt
[
1
]))
<
0
)
if
((
ret
=
rtmp_send_packet
(
rt
,
&
rt
->
out_pkt
,
0
))
<
0
)
return
ret
;
ff_rtmp_packet_destroy
(
&
rt
->
out_pkt
);
rt
->
flv_size
=
0
;
rt
->
flv_off
=
0
;
rt
->
flv_header_bytes
=
0
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment