ref: c4f625f3c3c5cacce9c401254430cf74f6a68847
dir: /rtmp.c/
#include <u.h> #include <libc.h> #include <thread.h> #include <bio.h> #include <libsec.h> #include "amf.h" #include "ivf.h" #include "rtmp.h" #define min(a,b) ((a)<(b)?(a):(b)) enum { Port = 1935, Sigsz = 1536, Chunk = 128, ChanCtl = 3, SzTiny = 1, SzSmall = 4, SzMedium = 8, SzLarge = 12, PktInvoke = 20, Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */ Bufsz = 64*1024, }; typedef struct Packet Packet; struct Packet { int type; int hsz; int chan; u32int ts; u8int *data; int sz; }; struct RTMP { Biobufhdr; char *app; char *path; char *tcurl; Packet pk; u8int *b, *p, *e; int mode; int bsz; int invokes; int i; u8int biobuf[Biobufsz]; }; #define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0) #define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0) #define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0) #define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0) #define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0) #define putarr() do{ r->p = amfarr(r->p, r->e); }while(0) #define putobj() do{ r->p = amfobj(r->p, r->e); }while(0) #define putend() do{ r->p = amfend(r->p, r->e); }while(0) #define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0) #define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0) #define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0) int rtmpdump = 0; static void newpacket(RTMP *r, int type, int hsz, int chan) { memset(&r->pk, 0, sizeof(r->pk)); r->pk.type = type; r->pk.hsz = hsz; r->pk.chan = chan; r->p = r->b + hsz; r->b[0] = chan; switch(hsz){ case SzTiny: r->b[0] |= 3<<6; break; case SzSmall: r->b[0] |= 2<<6; break; case SzMedium: r->b[0] |= 1<<6; break; } } static int handshake(int f) { u8int cl[1+Sigsz], sv[1+Sigsz]; cl[0] = 3; /* no encryption */ memset(cl+1, 0, 8); prng(cl+1+8, Sigsz-8); if(write(f, cl, sizeof(cl)) != sizeof(cl)) goto err; if(readn(f, sv, sizeof(sv)) != sizeof(sv)) goto err; if(cl[0] != sv[0]){ werrstr("expected %d (no encryption), got %d", cl[0], sv[0]); goto err; } if(write(f, sv+1, Sigsz) != Sigsz) goto err; if(readn(f, sv+1, Sigsz) != Sigsz) goto err; if(memcmp(cl, sv, sizeof(cl)) != 0){ werrstr("signature mismatch"); goto err; } return 0; err: werrstr("handshake: %r"); return -1; } static int rtmpsend(RTMP *r) { int bodysz, n, hsz; u8int *p, *h, *e; assert(r->p != nil); bodysz = r->p - r->b - r->pk.hsz; /* FIXME special case when bodysz is 0 */ h = r->b; e = h + r->pk.hsz; h++; if(r->pk.hsz >= SzSmall){ h = amfi24(h, e, 0); /* FIXME proper timestamps? */ if(r->pk.hsz >= SzMedium){ h = amfi24(h, e, bodysz); h = amfbyte(h, e, r->pk.type); if(r->pk.hsz >= SzLarge) h = amfi32(h, e, 0); /* FIXME seems to be always 0? */ } } if(h == nil) goto err; memset(h, 0, e-h); p = r->b; hsz = e - r->b; for(; hsz+bodysz > 0;){ n = min(bodysz, Chunk); fprint(2, "header is %d bytes, writing %d+%d=%d\n", hsz, hsz, n, hsz+n); if(Bwrite(r, p, hsz+n) < 0) goto err; if(rtmpdump) write(1, p, hsz+n); bodysz -= n; p += hsz+n; hsz = 0; if(bodysz > 0){ *(--p) = 0xc0 | r->b[0]; hsz = 1; } } r->p = nil; return 0; err: werrstr("rtmpsend: %r"); return -1; } static int connect(RTMP *r) { newpacket(r, PktInvoke, SzLarge, ChanCtl); putstr("connect"); putnum(++r->invokes); putobj(); putkvstr("app", r->app); putkvstr("tcUrl", r->tcurl); if(r->mode & OWRITE) putkvstr("type", "nonprivate"); else{ putkvbool("fpad", 0); putkvnum("capabilities", 15); putkvnum("audioCodecs", 3191); putkvnum("videoCodecs", 252); putkvnum("videoFunction", 1); } putend(); return rtmpsend(r); } RTMP * rtmpdial(char *url, int w, int h, int withaudio) { char *s, *e, *path, *app; int f, port, ctl; RTMP *r; r = nil; f = -1; url = strdup(url); /* since we're changing it in-place */ if(memcmp(url, "rtmp://", 7) != 0){ werrstr("invalid url"); goto err; } s = url + 7; if((e = strpbrk(s, ":/")) == nil){ werrstr("no path"); goto err; } port = 1935; if(*e == ':'){ if((port = strtol(e+1, &path, 10)) < 1 || path == e+1 || *path != '/'){ werrstr("invalid port"); goto err; } }else{ path = e; } while(*(++path) == '/'); s = smprint("tcp!%.*s!%d", (int)(e-s), s, port); f = dial(s, nil, nil, &ctl); free(s); if(f < 0) goto err; app = path; if((s = strchr(path, '/')) == nil){ werrstr("no path"); goto err; } if((e = strchr(s+1, '/')) != nil){ /* at this point it can be app instance if there is another slash following */ if((s = strchr(e+1, '/')) == nil){ /* no, just path leftovers */ s = e; } *s = 0; path = s+1; }else{ path = nil; } if(handshake(f) != 0) goto err; if((r = calloc(1, sizeof(*r))) == nil || (r->b = malloc(Bufsz)) == nil) sysfatal("memory"); if((r->app = strdup(app)) == nil || (path != nil && (r->path = strdup(path)) == nil)) sysfatal("memory"); r->tcurl = url; r->bsz = Bufsz; r->e = r->b + r->bsz; Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf)); r->i = f; if(connect(r) != 0) goto err; return r; err: werrstr("rtmpdial: %r"); if(r != nil) rtmpclose(r); else if(f >= 0) close(f); free(url); return nil; } void rtmpclose(RTMP *r) { if(r == nil) return; free(r->path); free(r->b); close(r->i); Bterm(r); free(r); }