libavformat/rtmpproto.c
Go to the documentation of this file.
00001 /*
00002  * RTMP network protocol
00003  * Copyright (c) 2009 Kostya Shishkov
00004  *
00005  * This file is part of FFmpeg.
00006  *
00007  * FFmpeg is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  *
00012  * FFmpeg is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with FFmpeg; if not, write to the Free Software
00019  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
00020  */
00021 
00027 #include "libavcodec/bytestream.h"
00028 #include "libavutil/avstring.h"
00029 #include "libavutil/intfloat.h"
00030 #include "libavutil/lfg.h"
00031 #include "libavutil/sha.h"
00032 #include "avformat.h"
00033 #include "internal.h"
00034 
00035 #include "network.h"
00036 
00037 #include "flv.h"
00038 #include "rtmp.h"
00039 #include "rtmppkt.h"
00040 #include "url.h"
00041 
00042 //#define DEBUG
00043 
00045 typedef enum {
00046     STATE_START,      
00047     STATE_HANDSHAKED, 
00048     STATE_RELEASING,  
00049     STATE_FCPUBLISH,  
00050     STATE_CONNECTING, 
00051     STATE_READY,      
00052     STATE_PLAYING,    
00053     STATE_PUBLISHING, 
00054     STATE_STOPPED,    
00055 } ClientState;
00056 
00058 typedef struct RTMPContext {
00059     URLContext*   stream;                     
00060     RTMPPacket    prev_pkt[2][RTMP_CHANNELS]; 
00061     int           chunk_size;                 
00062     int           is_input;                   
00063     char          playpath[256];              
00064     char          app[128];                   
00065     ClientState   state;                      
00066     int           main_channel_id;            
00067     uint8_t*      flv_data;                   
00068     int           flv_size;                   
00069     int           flv_off;                    
00070     RTMPPacket    out_pkt;                    
00071     uint32_t      client_report_size;         
00072     uint32_t      bytes_read;                 
00073     uint32_t      last_bytes_read;            
00074     int           skip_bytes;                 
00075     uint8_t       flv_header[11];             
00076     int           flv_header_bytes;           
00077     int           nb_invokes;                 
00078     int           create_stream_invoke;       
00079 } RTMPContext;
00080 
00081 #define PLAYER_KEY_OPEN_PART_LEN 30   ///< length of partial key used for first client digest signing
00082 
00083 static const uint8_t rtmp_player_key[] = {
00084     'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
00085     'F', 'l', 'a', 's', 'h', ' ', 'P', 'l', 'a', 'y', 'e', 'r', ' ', '0', '0', '1',
00086 
00087     0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1, 0x02,
00088     0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB, 0x93, 0xB8,
00089     0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
00090 };
00091 
00092 #define SERVER_KEY_OPEN_PART_LEN 36   ///< length of partial key used for first server digest signing
00093 
00094 static const uint8_t rtmp_server_key[] = {
00095     'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
00096     'F', 'l', 'a', 's', 'h', ' ', 'M', 'e', 'd', 'i', 'a', ' ',
00097     'S', 'e', 'r', 'v', 'e', 'r', ' ', '0', '0', '1',
00098 
00099     0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1, 0x02,
00100     0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB, 0x93, 0xB8,
00101     0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
00102 };
00103 
00107 static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto,
00108                         const char *host, int port)
00109 {
00110     RTMPPacket pkt;
00111     uint8_t ver[64], *p;
00112     char tcurl[512];
00113 
00114     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 4096);
00115     p = pkt.data;
00116 
00117     ff_url_join(tcurl, sizeof(tcurl), proto, NULL, host, port, "/%s", rt->app);
00118     ff_amf_write_string(&p, "connect");
00119     ff_amf_write_number(&p, ++rt->nb_invokes);
00120     ff_amf_write_object_start(&p);
00121     ff_amf_write_field_name(&p, "app");
00122     ff_amf_write_string(&p, rt->app);
00123 
00124     if (rt->is_input) {
00125         snprintf(ver, sizeof(ver), "%s %d,%d,%d,%d", RTMP_CLIENT_PLATFORM, RTMP_CLIENT_VER1,
00126                  RTMP_CLIENT_VER2, RTMP_CLIENT_VER3, RTMP_CLIENT_VER4);
00127     } else {
00128         snprintf(ver, sizeof(ver), "FMLE/3.0 (compatible; %s)", LIBAVFORMAT_IDENT);
00129         ff_amf_write_field_name(&p, "type");
00130         ff_amf_write_string(&p, "nonprivate");
00131     }
00132     ff_amf_write_field_name(&p, "flashVer");
00133     ff_amf_write_string(&p, ver);
00134     ff_amf_write_field_name(&p, "tcUrl");
00135     ff_amf_write_string(&p, tcurl);
00136     if (rt->is_input) {
00137         ff_amf_write_field_name(&p, "fpad");
00138         ff_amf_write_bool(&p, 0);
00139         ff_amf_write_field_name(&p, "capabilities");
00140         ff_amf_write_number(&p, 15.0);
00141         ff_amf_write_field_name(&p, "audioCodecs");
00142         ff_amf_write_number(&p, 1639.0);
00143         ff_amf_write_field_name(&p, "videoCodecs");
00144         ff_amf_write_number(&p, 252.0);
00145         ff_amf_write_field_name(&p, "videoFunction");
00146         ff_amf_write_number(&p, 1.0);
00147     }
00148     ff_amf_write_object_end(&p);
00149 
00150     pkt.data_size = p - pkt.data;
00151 
00152     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00153     ff_rtmp_packet_destroy(&pkt);
00154 }
00155 
00160 static void gen_release_stream(URLContext *s, RTMPContext *rt)
00161 {
00162     RTMPPacket pkt;
00163     uint8_t *p;
00164 
00165     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
00166                           29 + strlen(rt->playpath));
00167 
00168     av_log(s, AV_LOG_DEBUG, "Releasing stream...\n");
00169     p = pkt.data;
00170     ff_amf_write_string(&p, "releaseStream");
00171     ff_amf_write_number(&p, ++rt->nb_invokes);
00172     ff_amf_write_null(&p);
00173     ff_amf_write_string(&p, rt->playpath);
00174 
00175     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00176     ff_rtmp_packet_destroy(&pkt);
00177 }
00178 
00183 static void gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
00184 {
00185     RTMPPacket pkt;
00186     uint8_t *p;
00187 
00188     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
00189                           25 + strlen(rt->playpath));
00190 
00191     av_log(s, AV_LOG_DEBUG, "FCPublish stream...\n");
00192     p = pkt.data;
00193     ff_amf_write_string(&p, "FCPublish");
00194     ff_amf_write_number(&p, ++rt->nb_invokes);
00195     ff_amf_write_null(&p);
00196     ff_amf_write_string(&p, rt->playpath);
00197 
00198     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00199     ff_rtmp_packet_destroy(&pkt);
00200 }
00201 
00206 static void gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
00207 {
00208     RTMPPacket pkt;
00209     uint8_t *p;
00210 
00211     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
00212                           27 + strlen(rt->playpath));
00213 
00214     av_log(s, AV_LOG_DEBUG, "UnPublishing stream...\n");
00215     p = pkt.data;
00216     ff_amf_write_string(&p, "FCUnpublish");
00217     ff_amf_write_number(&p, ++rt->nb_invokes);
00218     ff_amf_write_null(&p);
00219     ff_amf_write_string(&p, rt->playpath);
00220 
00221     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00222     ff_rtmp_packet_destroy(&pkt);
00223 }
00224 
00229 static void gen_create_stream(URLContext *s, RTMPContext *rt)
00230 {
00231     RTMPPacket pkt;
00232     uint8_t *p;
00233 
00234     av_log(s, AV_LOG_DEBUG, "Creating stream...\n");
00235     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 25);
00236 
00237     p = pkt.data;
00238     ff_amf_write_string(&p, "createStream");
00239     ff_amf_write_number(&p, ++rt->nb_invokes);
00240     ff_amf_write_null(&p);
00241     rt->create_stream_invoke = rt->nb_invokes;
00242 
00243     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00244     ff_rtmp_packet_destroy(&pkt);
00245 }
00246 
00247 
00252 static void gen_delete_stream(URLContext *s, RTMPContext *rt)
00253 {
00254     RTMPPacket pkt;
00255     uint8_t *p;
00256 
00257     av_log(s, AV_LOG_DEBUG, "Deleting stream...\n");
00258     ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 34);
00259 
00260     p = pkt.data;
00261     ff_amf_write_string(&p, "deleteStream");
00262     ff_amf_write_number(&p, ++rt->nb_invokes);
00263     ff_amf_write_null(&p);
00264     ff_amf_write_number(&p, rt->main_channel_id);
00265 
00266     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00267     ff_rtmp_packet_destroy(&pkt);
00268 }
00269 
00274 static void gen_play(URLContext *s, RTMPContext *rt)
00275 {
00276     RTMPPacket pkt;
00277     uint8_t *p;
00278 
00279     av_log(s, AV_LOG_DEBUG, "Sending play command for '%s'\n", rt->playpath);
00280     ff_rtmp_packet_create(&pkt, RTMP_VIDEO_CHANNEL, RTMP_PT_INVOKE, 0,
00281                           20 + strlen(rt->playpath));
00282     pkt.extra = rt->main_channel_id;
00283 
00284     p = pkt.data;
00285     ff_amf_write_string(&p, "play");
00286     ff_amf_write_number(&p, ++rt->nb_invokes);
00287     ff_amf_write_null(&p);
00288     ff_amf_write_string(&p, rt->playpath);
00289 
00290     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00291     ff_rtmp_packet_destroy(&pkt);
00292 
00293     // set client buffer time disguised in ping packet
00294     ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING, 1, 10);
00295 
00296     p = pkt.data;
00297     bytestream_put_be16(&p, 3);
00298     bytestream_put_be32(&p, 1);
00299     bytestream_put_be32(&p, 256); //TODO: what is a good value here?
00300 
00301     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00302     ff_rtmp_packet_destroy(&pkt);
00303 }
00304 
00308 static void gen_publish(URLContext *s, RTMPContext *rt)
00309 {
00310     RTMPPacket pkt;
00311     uint8_t *p;
00312 
00313     av_log(s, AV_LOG_DEBUG, "Sending publish command for '%s'\n", rt->playpath);
00314     ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE, 0,
00315                           30 + strlen(rt->playpath));
00316     pkt.extra = rt->main_channel_id;
00317 
00318     p = pkt.data;
00319     ff_amf_write_string(&p, "publish");
00320     ff_amf_write_number(&p, ++rt->nb_invokes);
00321     ff_amf_write_null(&p);
00322     ff_amf_write_string(&p, rt->playpath);
00323     ff_amf_write_string(&p, "live");
00324 
00325     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00326     ff_rtmp_packet_destroy(&pkt);
00327 }
00328 
00332 static void gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
00333 {
00334     RTMPPacket pkt;
00335     uint8_t *p;
00336 
00337     ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING, ppkt->timestamp + 1, 6);
00338     p = pkt.data;
00339     bytestream_put_be16(&p, 7);
00340     bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
00341     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00342     ff_rtmp_packet_destroy(&pkt);
00343 }
00344 
00348 static void gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
00349 {
00350     RTMPPacket pkt;
00351     uint8_t *p;
00352 
00353     ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_BYTES_READ, ts, 4);
00354     p = pkt.data;
00355     bytestream_put_be32(&p, rt->bytes_read);
00356     ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
00357     ff_rtmp_packet_destroy(&pkt);
00358 }
00359 
00360 //TODO: Move HMAC code somewhere. Eventually.
00361 #define HMAC_IPAD_VAL 0x36
00362 #define HMAC_OPAD_VAL 0x5C
00363 
00375 static void rtmp_calc_digest(const uint8_t *src, int len, int gap,
00376                              const uint8_t *key, int keylen, uint8_t *dst)
00377 {
00378     struct AVSHA *sha;
00379     uint8_t hmac_buf[64+32] = {0};
00380     int i;
00381 
00382     sha = av_mallocz(av_sha_size);
00383 
00384     if (keylen < 64) {
00385         memcpy(hmac_buf, key, keylen);
00386     } else {
00387         av_sha_init(sha, 256);
00388         av_sha_update(sha,key, keylen);
00389         av_sha_final(sha, hmac_buf);
00390     }
00391     for (i = 0; i < 64; i++)
00392         hmac_buf[i] ^= HMAC_IPAD_VAL;
00393 
00394     av_sha_init(sha, 256);
00395     av_sha_update(sha, hmac_buf, 64);
00396     if (gap <= 0) {
00397         av_sha_update(sha, src, len);
00398     } else { //skip 32 bytes used for storing digest
00399         av_sha_update(sha, src, gap);
00400         av_sha_update(sha, src + gap + 32, len - gap - 32);
00401     }
00402     av_sha_final(sha, hmac_buf + 64);
00403 
00404     for (i = 0; i < 64; i++)
00405         hmac_buf[i] ^= HMAC_IPAD_VAL ^ HMAC_OPAD_VAL; //reuse XORed key for opad
00406     av_sha_init(sha, 256);
00407     av_sha_update(sha, hmac_buf, 64+32);
00408     av_sha_final(sha, dst);
00409 
00410     av_free(sha);
00411 }
00412 
00420 static int rtmp_handshake_imprint_with_digest(uint8_t *buf)
00421 {
00422     int i, digest_pos = 0;
00423 
00424     for (i = 8; i < 12; i++)
00425         digest_pos += buf[i];
00426     digest_pos = (digest_pos % 728) + 12;
00427 
00428     rtmp_calc_digest(buf, RTMP_HANDSHAKE_PACKET_SIZE, digest_pos,
00429                      rtmp_player_key, PLAYER_KEY_OPEN_PART_LEN,
00430                      buf + digest_pos);
00431     return digest_pos;
00432 }
00433 
00441 static int rtmp_validate_digest(uint8_t *buf, int off)
00442 {
00443     int i, digest_pos = 0;
00444     uint8_t digest[32];
00445 
00446     for (i = 0; i < 4; i++)
00447         digest_pos += buf[i + off];
00448     digest_pos = (digest_pos % 728) + off + 4;
00449 
00450     rtmp_calc_digest(buf, RTMP_HANDSHAKE_PACKET_SIZE, digest_pos,
00451                      rtmp_server_key, SERVER_KEY_OPEN_PART_LEN,
00452                      digest);
00453     if (!memcmp(digest, buf + digest_pos, 32))
00454         return digest_pos;
00455     return 0;
00456 }
00457 
00464 static int rtmp_handshake(URLContext *s, RTMPContext *rt)
00465 {
00466     AVLFG rnd;
00467     uint8_t tosend    [RTMP_HANDSHAKE_PACKET_SIZE+1] = {
00468         3,                // unencrypted data
00469         0, 0, 0, 0,       // client uptime
00470         RTMP_CLIENT_VER1,
00471         RTMP_CLIENT_VER2,
00472         RTMP_CLIENT_VER3,
00473         RTMP_CLIENT_VER4,
00474     };
00475     uint8_t clientdata[RTMP_HANDSHAKE_PACKET_SIZE];
00476     uint8_t serverdata[RTMP_HANDSHAKE_PACKET_SIZE+1];
00477     int i;
00478     int server_pos, client_pos;
00479     uint8_t digest[32];
00480 
00481     av_log(s, AV_LOG_DEBUG, "Handshaking...\n");
00482 
00483     av_lfg_init(&rnd, 0xDEADC0DE);
00484     // generate handshake packet - 1536 bytes of pseudorandom data
00485     for (i = 9; i <= RTMP_HANDSHAKE_PACKET_SIZE; i++)
00486         tosend[i] = av_lfg_get(&rnd) >> 24;
00487     client_pos = rtmp_handshake_imprint_with_digest(tosend + 1);
00488 
00489     ffurl_write(rt->stream, tosend, RTMP_HANDSHAKE_PACKET_SIZE + 1);
00490     i = ffurl_read_complete(rt->stream, serverdata, RTMP_HANDSHAKE_PACKET_SIZE + 1);
00491     if (i != RTMP_HANDSHAKE_PACKET_SIZE + 1) {
00492         av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
00493         return -1;
00494     }
00495     i = ffurl_read_complete(rt->stream, clientdata, RTMP_HANDSHAKE_PACKET_SIZE);
00496     if (i != RTMP_HANDSHAKE_PACKET_SIZE) {
00497         av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
00498         return -1;
00499     }
00500 
00501     av_log(s, AV_LOG_DEBUG, "Server version %d.%d.%d.%d\n",
00502            serverdata[5], serverdata[6], serverdata[7], serverdata[8]);
00503 
00504     if (rt->is_input && serverdata[5] >= 3) {
00505         server_pos = rtmp_validate_digest(serverdata + 1, 772);
00506         if (!server_pos) {
00507             server_pos = rtmp_validate_digest(serverdata + 1, 8);
00508             if (!server_pos) {
00509                 av_log(s, AV_LOG_ERROR, "Server response validating failed\n");
00510                 return -1;
00511             }
00512         }
00513 
00514         rtmp_calc_digest(tosend + 1 + client_pos, 32, 0,
00515                          rtmp_server_key, sizeof(rtmp_server_key),
00516                          digest);
00517         rtmp_calc_digest(clientdata, RTMP_HANDSHAKE_PACKET_SIZE-32, 0,
00518                          digest, 32,
00519                          digest);
00520         if (memcmp(digest, clientdata + RTMP_HANDSHAKE_PACKET_SIZE - 32, 32)) {
00521             av_log(s, AV_LOG_ERROR, "Signature mismatch\n");
00522             return -1;
00523         }
00524 
00525         for (i = 0; i < RTMP_HANDSHAKE_PACKET_SIZE; i++)
00526             tosend[i] = av_lfg_get(&rnd) >> 24;
00527         rtmp_calc_digest(serverdata + 1 + server_pos, 32, 0,
00528                          rtmp_player_key, sizeof(rtmp_player_key),
00529                          digest);
00530         rtmp_calc_digest(tosend,  RTMP_HANDSHAKE_PACKET_SIZE - 32, 0,
00531                          digest, 32,
00532                          tosend + RTMP_HANDSHAKE_PACKET_SIZE - 32);
00533 
00534         // write reply back to the server
00535         ffurl_write(rt->stream, tosend, RTMP_HANDSHAKE_PACKET_SIZE);
00536     } else {
00537         ffurl_write(rt->stream, serverdata+1, RTMP_HANDSHAKE_PACKET_SIZE);
00538     }
00539 
00540     return 0;
00541 }
00542 
00549 static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
00550 {
00551     int i, t;
00552     const uint8_t *data_end = pkt->data + pkt->data_size;
00553 
00554 #ifdef DEBUG
00555     ff_rtmp_packet_dump(s, pkt);
00556 #endif
00557 
00558     switch (pkt->type) {
00559     case RTMP_PT_CHUNK_SIZE:
00560         if (pkt->data_size != 4) {
00561             av_log(s, AV_LOG_ERROR,
00562                    "Chunk size change packet is not 4 bytes long (%d)\n", pkt->data_size);
00563             return -1;
00564         }
00565         if (!rt->is_input)
00566             ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size, rt->prev_pkt[1]);
00567         rt->chunk_size = AV_RB32(pkt->data);
00568         if (rt->chunk_size <= 0) {
00569             av_log(s, AV_LOG_ERROR, "Incorrect chunk size %d\n", rt->chunk_size);
00570             return -1;
00571         }
00572         av_log(s, AV_LOG_DEBUG, "New chunk size = %d\n", rt->chunk_size);
00573         break;
00574     case RTMP_PT_PING:
00575         t = AV_RB16(pkt->data);
00576         if (t == 6)
00577             gen_pong(s, rt, pkt);
00578         break;
00579     case RTMP_PT_CLIENT_BW:
00580         if (pkt->data_size < 4) {
00581             av_log(s, AV_LOG_ERROR,
00582                    "Client bandwidth report packet is less than 4 bytes long (%d)\n",
00583                    pkt->data_size);
00584             return -1;
00585         }
00586         av_log(s, AV_LOG_DEBUG, "Client bandwidth = %d\n", AV_RB32(pkt->data));
00587         rt->client_report_size = AV_RB32(pkt->data) >> 1;
00588         break;
00589     case RTMP_PT_INVOKE:
00590         //TODO: check for the messages sent for wrong state?
00591         if (!memcmp(pkt->data, "\002\000\006_error", 9)) {
00592             uint8_t tmpstr[256];
00593 
00594             if (!ff_amf_get_field_value(pkt->data + 9, data_end,
00595                                         "description", tmpstr, sizeof(tmpstr)))
00596                 av_log(s, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
00597             return -1;
00598         } else if (!memcmp(pkt->data, "\002\000\007_result", 10)) {
00599             switch (rt->state) {
00600             case STATE_HANDSHAKED:
00601                 if (!rt->is_input) {
00602                     gen_release_stream(s, rt);
00603                     gen_fcpublish_stream(s, rt);
00604                     rt->state = STATE_RELEASING;
00605                 } else {
00606                     rt->state = STATE_CONNECTING;
00607                 }
00608                 gen_create_stream(s, rt);
00609                 break;
00610             case STATE_FCPUBLISH:
00611                 rt->state = STATE_CONNECTING;
00612                 break;
00613             case STATE_RELEASING:
00614                 rt->state = STATE_FCPUBLISH;
00615                 /* hack for Wowza Media Server, it does not send result for
00616                  * releaseStream and FCPublish calls */
00617                 if (!pkt->data[10]) {
00618                     int pkt_id = av_int2double(AV_RB64(pkt->data + 11));
00619                     if (pkt_id == rt->create_stream_invoke)
00620                         rt->state = STATE_CONNECTING;
00621                 }
00622                 if (rt->state != STATE_CONNECTING)
00623                     break;
00624             case STATE_CONNECTING:
00625                 //extract a number from the result
00626                 if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) {
00627                     av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n");
00628                 } else {
00629                     rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21));
00630                 }
00631                 if (rt->is_input) {
00632                     gen_play(s, rt);
00633                 } else {
00634                     gen_publish(s, rt);
00635                 }
00636                 rt->state = STATE_READY;
00637                 break;
00638             }
00639         } else if (!memcmp(pkt->data, "\002\000\010onStatus", 11)) {
00640             const uint8_t* ptr = pkt->data + 11;
00641             uint8_t tmpstr[256];
00642 
00643             for (i = 0; i < 2; i++) {
00644                 t = ff_amf_tag_size(ptr, data_end);
00645                 if (t < 0)
00646                     return 1;
00647                 ptr += t;
00648             }
00649             t = ff_amf_get_field_value(ptr, data_end,
00650                                        "level", tmpstr, sizeof(tmpstr));
00651             if (!t && !strcmp(tmpstr, "error")) {
00652                 if (!ff_amf_get_field_value(ptr, data_end,
00653                                             "description", tmpstr, sizeof(tmpstr)))
00654                     av_log(s, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
00655                 return -1;
00656             }
00657             t = ff_amf_get_field_value(ptr, data_end,
00658                                        "code", tmpstr, sizeof(tmpstr));
00659             if (!t && !strcmp(tmpstr, "NetStream.Play.Start")) rt->state = STATE_PLAYING;
00660             if (!t && !strcmp(tmpstr, "NetStream.Play.Stop")) rt->state = STATE_STOPPED;
00661             if (!t && !strcmp(tmpstr, "NetStream.Play.UnpublishNotify")) rt->state = STATE_STOPPED;
00662             if (!t && !strcmp(tmpstr, "NetStream.Publish.Start")) rt->state = STATE_PUBLISHING;
00663         }
00664         break;
00665     }
00666     return 0;
00667 }
00668 
00680 static int get_packet(URLContext *s, int for_header)
00681 {
00682     RTMPContext *rt = s->priv_data;
00683     int ret;
00684     uint8_t *p;
00685     const uint8_t *next;
00686     uint32_t data_size;
00687     uint32_t ts, cts, pts=0;
00688 
00689     if (rt->state == STATE_STOPPED)
00690         return AVERROR_EOF;
00691 
00692     for (;;) {
00693         RTMPPacket rpkt = { 0 };
00694         if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt,
00695                                        rt->chunk_size, rt->prev_pkt[0])) <= 0) {
00696             if (ret == 0) {
00697                 return AVERROR(EAGAIN);
00698             } else {
00699                 return AVERROR(EIO);
00700             }
00701         }
00702         rt->bytes_read += ret;
00703         if (rt->bytes_read - rt->last_bytes_read > rt->client_report_size) {
00704             av_log(s, AV_LOG_DEBUG, "Sending bytes read report\n");
00705             gen_bytes_read(s, rt, rpkt.timestamp + 1);
00706             rt->last_bytes_read = rt->bytes_read;
00707         }
00708 
00709         ret = rtmp_parse_result(s, rt, &rpkt);
00710         if (ret < 0) {//serious error in current packet
00711             ff_rtmp_packet_destroy(&rpkt);
00712             return -1;
00713         }
00714         if (rt->state == STATE_STOPPED) {
00715             ff_rtmp_packet_destroy(&rpkt);
00716             return AVERROR_EOF;
00717         }
00718         if (for_header && (rt->state == STATE_PLAYING || rt->state == STATE_PUBLISHING)) {
00719             ff_rtmp_packet_destroy(&rpkt);
00720             return 0;
00721         }
00722         if (!rpkt.data_size || !rt->is_input) {
00723             ff_rtmp_packet_destroy(&rpkt);
00724             continue;
00725         }
00726         if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO ||
00727            (rpkt.type == RTMP_PT_NOTIFY && !memcmp("\002\000\012onMetaData", rpkt.data, 13))) {
00728             ts = rpkt.timestamp;
00729 
00730             // generate packet header and put data into buffer for FLV demuxer
00731             rt->flv_off  = 0;
00732             rt->flv_size = rpkt.data_size + 15;
00733             rt->flv_data = p = av_realloc(rt->flv_data, rt->flv_size);
00734             bytestream_put_byte(&p, rpkt.type);
00735             bytestream_put_be24(&p, rpkt.data_size);
00736             bytestream_put_be24(&p, ts);
00737             bytestream_put_byte(&p, ts >> 24);
00738             bytestream_put_be24(&p, 0);
00739             bytestream_put_buffer(&p, rpkt.data, rpkt.data_size);
00740             bytestream_put_be32(&p, 0);
00741             ff_rtmp_packet_destroy(&rpkt);
00742             return 0;
00743         } else if (rpkt.type == RTMP_PT_METADATA) {
00744             // we got raw FLV data, make it available for FLV demuxer
00745             rt->flv_off  = 0;
00746             rt->flv_size = rpkt.data_size;
00747             rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
00748             /* rewrite timestamps */
00749             next = rpkt.data;
00750             ts = rpkt.timestamp;
00751             while (next - rpkt.data < rpkt.data_size - 11) {
00752                 next++;
00753                 data_size = bytestream_get_be24(&next);
00754                 p=next;
00755                 cts = bytestream_get_be24(&next);
00756                 cts |= bytestream_get_byte(&next) << 24;
00757                 if (pts==0)
00758                     pts=cts;
00759                 ts += cts - pts;
00760                 pts = cts;
00761                 bytestream_put_be24(&p, ts);
00762                 bytestream_put_byte(&p, ts >> 24);
00763                 next += data_size + 3 + 4;
00764             }
00765             memcpy(rt->flv_data, rpkt.data, rpkt.data_size);
00766             ff_rtmp_packet_destroy(&rpkt);
00767             return 0;
00768         }
00769         ff_rtmp_packet_destroy(&rpkt);
00770     }
00771 }
00772 
00773 static int rtmp_close(URLContext *h)
00774 {
00775     RTMPContext *rt = h->priv_data;
00776 
00777     if (!rt->is_input) {
00778         rt->flv_data = NULL;
00779         if (rt->out_pkt.data_size)
00780             ff_rtmp_packet_destroy(&rt->out_pkt);
00781         if (rt->state > STATE_FCPUBLISH)
00782             gen_fcunpublish_stream(h, rt);
00783     }
00784     if (rt->state > STATE_HANDSHAKED)
00785         gen_delete_stream(h, rt);
00786 
00787     av_freep(&rt->flv_data);
00788     ffurl_close(rt->stream);
00789     return 0;
00790 }
00791 
00801 static int rtmp_open(URLContext *s, const char *uri, int flags)
00802 {
00803     RTMPContext *rt = s->priv_data;
00804     char proto[8], hostname[256], path[1024], *fname;
00805     uint8_t buf[2048];
00806     int port;
00807     int ret;
00808 
00809     rt->is_input = !(flags & AVIO_FLAG_WRITE);
00810 
00811     av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname), &port,
00812                  path, sizeof(path), s->filename);
00813 
00814     if (port < 0)
00815         port = RTMP_DEFAULT_PORT;
00816     ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL);
00817 
00818     if (ffurl_open(&rt->stream, buf, AVIO_FLAG_READ_WRITE,
00819                    &s->interrupt_callback, NULL) < 0) {
00820         av_log(s , AV_LOG_ERROR, "Cannot open connection %s\n", buf);
00821         goto fail;
00822     }
00823 
00824     rt->state = STATE_START;
00825     if (rtmp_handshake(s, rt))
00826         goto fail;
00827 
00828     rt->chunk_size = 128;
00829     rt->state = STATE_HANDSHAKED;
00830     //extract "app" part from path
00831     if (!strncmp(path, "/ondemand/", 10)) {
00832         fname = path + 10;
00833         memcpy(rt->app, "ondemand", 9);
00834     } else {
00835         char *p = strchr(path + 1, '/');
00836         if (!p) {
00837             fname = path + 1;
00838             rt->app[0] = '\0';
00839         } else {
00840             char *c = strchr(p + 1, ':');
00841             fname = strchr(p + 1, '/');
00842             if (!fname || c < fname) {
00843                 fname = p + 1;
00844                 av_strlcpy(rt->app, path + 1, p - path);
00845             } else {
00846                 fname++;
00847                 av_strlcpy(rt->app, path + 1, fname - path - 1);
00848             }
00849         }
00850     }
00851     if (!strchr(fname, ':') &&
00852         (!strcmp(fname + strlen(fname) - 4, ".f4v") ||
00853          !strcmp(fname + strlen(fname) - 4, ".mp4"))) {
00854         memcpy(rt->playpath, "mp4:", 5);
00855     } else {
00856         rt->playpath[0] = 0;
00857     }
00858     strncat(rt->playpath, fname, sizeof(rt->playpath) - 5);
00859 
00860     rt->client_report_size = 1048576;
00861     rt->bytes_read = 0;
00862     rt->last_bytes_read = 0;
00863 
00864     av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n",
00865            proto, path, rt->app, rt->playpath);
00866     gen_connect(s, rt, proto, hostname, port);
00867 
00868     do {
00869         ret = get_packet(s, 1);
00870     } while (ret == EAGAIN);
00871     if (ret < 0)
00872         goto fail;
00873 
00874     if (rt->is_input) {
00875         // generate FLV header for demuxer
00876         rt->flv_size = 13;
00877         rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
00878         rt->flv_off  = 0;
00879         memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size);
00880     } else {
00881         rt->flv_size = 0;
00882         rt->flv_data = NULL;
00883         rt->flv_off  = 0;
00884         rt->skip_bytes = 13;
00885     }
00886 
00887     s->max_packet_size = rt->stream->max_packet_size;
00888     s->is_streamed     = 1;
00889     return 0;
00890 
00891 fail:
00892     rtmp_close(s);
00893     return AVERROR(EIO);
00894 }
00895 
00896 static int rtmp_read(URLContext *s, uint8_t *buf, int size)
00897 {
00898     RTMPContext *rt = s->priv_data;
00899     int orig_size = size;
00900     int ret;
00901 
00902     while (size > 0) {
00903         int data_left = rt->flv_size - rt->flv_off;
00904 
00905         if (data_left >= size) {
00906             memcpy(buf, rt->flv_data + rt->flv_off, size);
00907             rt->flv_off += size;
00908             return orig_size;
00909         }
00910         if (data_left > 0) {
00911             memcpy(buf, rt->flv_data + rt->flv_off, data_left);
00912             buf  += data_left;
00913             size -= data_left;
00914             rt->flv_off = rt->flv_size;
00915             return data_left;
00916         }
00917         if ((ret = get_packet(s, 0)) < 0)
00918            return ret;
00919     }
00920     return orig_size;
00921 }
00922 
00923 static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
00924 {
00925     RTMPContext *rt = s->priv_data;
00926     int size_temp = size;
00927     int pktsize, pkttype;
00928     uint32_t ts;
00929     const uint8_t *buf_temp = buf;
00930 
00931     do {
00932         if (rt->skip_bytes) {
00933             int skip = FFMIN(rt->skip_bytes, size_temp);
00934             buf_temp       += skip;
00935             size_temp      -= skip;
00936             rt->skip_bytes -= skip;
00937             continue;
00938         }
00939 
00940         if (rt->flv_header_bytes < 11) {
00941             const uint8_t *header = rt->flv_header;
00942             int copy = FFMIN(11 - rt->flv_header_bytes, size_temp);
00943             bytestream_get_buffer(&buf_temp, rt->flv_header + rt->flv_header_bytes, copy);
00944             rt->flv_header_bytes += copy;
00945             size_temp            -= copy;
00946             if (rt->flv_header_bytes < 11)
00947                 break;
00948 
00949             pkttype = bytestream_get_byte(&header);
00950             pktsize = bytestream_get_be24(&header);
00951             ts = bytestream_get_be24(&header);
00952             ts |= bytestream_get_byte(&header) << 24;
00953             bytestream_get_be24(&header);
00954             rt->flv_size = pktsize;
00955 
00956             //force 12bytes header
00957             if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) ||
00958                 pkttype == RTMP_PT_NOTIFY) {
00959                 if (pkttype == RTMP_PT_NOTIFY)
00960                     pktsize += 16;
00961                 rt->prev_pkt[1][RTMP_SOURCE_CHANNEL].channel_id = 0;
00962             }
00963 
00964             //this can be a big packet, it's better to send it right here
00965             ff_rtmp_packet_create(&rt->out_pkt, RTMP_SOURCE_CHANNEL, pkttype, ts, pktsize);
00966             rt->out_pkt.extra = rt->main_channel_id;
00967             rt->flv_data = rt->out_pkt.data;
00968 
00969             if (pkttype == RTMP_PT_NOTIFY)
00970                 ff_amf_write_string(&rt->flv_data, "@setDataFrame");
00971         }
00972 
00973         if (rt->flv_size - rt->flv_off > size_temp) {
00974             bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, size_temp);
00975             rt->flv_off += size_temp;
00976             size_temp = 0;
00977         } else {
00978             bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, rt->flv_size - rt->flv_off);
00979             size_temp   -= rt->flv_size - rt->flv_off;
00980             rt->flv_off += rt->flv_size - rt->flv_off;
00981         }
00982 
00983         if (rt->flv_off == rt->flv_size) {
00984             rt->skip_bytes = 4;
00985 
00986             ff_rtmp_packet_write(rt->stream, &rt->out_pkt, rt->chunk_size, rt->prev_pkt[1]);
00987             ff_rtmp_packet_destroy(&rt->out_pkt);
00988             rt->flv_size = 0;
00989             rt->flv_off = 0;
00990             rt->flv_header_bytes = 0;
00991         }
00992     } while (buf_temp - buf < size);
00993     return size;
00994 }
00995 
00996 URLProtocol ff_rtmp_protocol = {
00997     .name           = "rtmp",
00998     .url_open       = rtmp_open,
00999     .url_read       = rtmp_read,
01000     .url_write      = rtmp_write,
01001     .url_close      = rtmp_close,
01002     .priv_data_size = sizeof(RTMPContext),
01003     .flags          = URL_PROTOCOL_FLAG_NETWORK,
01004 };