ref: f5fe39ad7afa9b761821178ba57f28cea03396b8
parent: 98d01a7719188f1fae193da77ed0949d903ed764
author: cinap_lenrek <cinap_lenrek@centraldogma>
date: Sun Oct 23 20:53:27 EDT 2011
torrent: listen support
--- a/sys/src/cmd/ip/torrent.c
+++ b/sys/src/cmd/ip/torrent.c
@@ -6,6 +6,7 @@
typedef struct Dict Dict;
typedef struct Piece Piece;
typedef struct File File;
+typedef struct Stats Stats;
struct Dict
{
@@ -33,13 +34,21 @@
vlong len;
};
+struct Stats
+{
+ Lock;
+ vlong up;
+ vlong down;
+ vlong left;
+};
+
enum {
MAXIO = 16*1024,
};
int debug, sflag, pflag, vflag;
-int pidgroup = -1;
-int port = 48123;
+int killgroup = -1;
+int port = 6881;
char *mntweb = "/mnt/web";
uchar infohash[20];
uchar peerid[20];
@@ -50,8 +59,10 @@
int nhavemap;
uchar *havemap;
+int nhavepieces;
File *files;
+Stats stats;
void
freedict(Dict *d)
@@ -208,7 +219,13 @@
free(p);
if(memcmp(hash, pieces[x].hash, 20))
return 0;
- havemap[x>>3] |= m;
+ lock(&stats);
+ if((havemap[x>>3] & m) == 0){
+ havemap[x>>3] |= m;
+ nhavepieces++;
+ stats.left -= pieces[x].len;
+ }
+ unlock(&stats);
return 1;
}
@@ -309,86 +326,60 @@
return -1;
}
-void
-peer(char *ip, char *port)
-{
- static Dict *peers;
- static QLock peerslk;
+
+int
+peer(int fd, int incoming, char *addr)
+{
uchar buf[64+MAXIO], *map, *told, *p, m;
- char *addr;
- int retry, i, o, l, x, n, fd;
int mechoking, hechoking;
int mewant, hewant;
int workpiece;
- Dict *d;
+ int i, o, l, x, n;
- if(ip == nil || port == nil)
- return;
+ if(debug) fprint(2, "peer %s: %s connected\n", addr, incoming ? "incoming" : "outgoing");
- d = mallocz(sizeof(*d) + 64, 1);
- snprint(addr = d->str, 64, "tcp!%s!%s", ip, port);
- qlock(&peerslk);
- if(dlook(peers, addr)){
- qunlock(&peerslk);
- free(d);
- return;
+ for(i=0; i<2; i++){
+ if((incoming && i) || (!incoming && !i)){
+ if(debug) fprint(2, "peer %s: -> handshake\n", addr);
+ n = pack(buf, sizeof(buf), "*________**",
+ 20, "\x13BitTorrent protocol",
+ sizeof(infohash), infohash,
+ sizeof(peerid), peerid);
+ if(write(fd, buf, n) != n)
+ return 1;
+ }
+ if((incoming && !i) || (!incoming && i)){
+ n = 20 + 8 + sizeof(infohash);
+ if((n = readn(fd, buf, n)) != n)
+ return 1;
+ if(memcmp(buf, "\x13BitTorrent protocol", 20))
+ return 0;
+ if(debug) fprint(2, "peer %s: <- handshake\n", addr);
+ if(memcmp(infohash, buf + 20 + 8, sizeof(infohash)))
+ return 0;
+ }
}
- d->len = strlen(addr);
- d->typ = 'd';
- d->val = d;
- d->next = peers;
- peers = d;
- qunlock(&peerslk);
+ if(readn(fd, buf, sizeof(peerid)) != sizeof(peerid))
+ return 1;
+ if(memcmp(peerid, buf, sizeof(peerid)) == 0)
+ return 0;
+ if(debug) fprint(2, "peer %s: peerid %.*s\n", addr, sizeof(peerid), (char*)buf);
- if(rfork(RFFDG|RFPROC|RFMEM) <= 0)
- return;
-
- fd = -1;
- retry = 0;
- map = malloc(nhavemap);
+ mechoking = 1;
+ hechoking = 1;
+ mewant = 0;
+ hewant = 0;
+ workpiece = -1;
+ map = mallocz(nhavemap, 1);
told = malloc(nhavemap);
-Retry:
- if(fd >= 0){
- close(fd);
- sleep(10000 + nrand(5000));
- }
- if(++retry >= 10)
- goto Exit;
- if(debug) fprint(2, "dial %s\n", addr);
- if((fd = dial(addr, nil, nil, nil)) < 0)
- goto Retry;
-
- if(debug) fprint(2, "peer %s: -> handshake\n", addr);
- n = pack(buf, sizeof(buf), "*________**",
- 20, "\x13BitTorrent protocol",
- sizeof(infohash), infohash,
- sizeof(peerid), peerid);
- if(write(fd, buf, n) != n)
- goto Retry;
-
- if(read(fd, buf, 1) != 1)
- goto Retry;
- n = buf[0] + 8 + sizeof(infohash) + sizeof(peerid);
- if((n = readn(fd, buf+1, n)) != n)
- goto Retry;
- if(debug) fprint(2, "peer %s: <- handshake %.*s\n", addr, buf[0], (char*)buf+1);
- if(memcmp(infohash, buf + 1 + buf[0] + 8, sizeof(infohash)))
- goto Exit;
-
if(debug) fprint(2, "peer %s: -> bitfield %d\n", addr, nhavemap);
memmove(told, havemap, nhavemap);
n = pack(buf, sizeof(buf), "lb*", nhavemap+1, 0x05, nhavemap, havemap);
if(write(fd, buf, n) != n)
- goto Retry;
+ goto Out;
- mechoking = 1;
- hechoking = 1;
- mewant = 0;
- hewant = 0;
- workpiece = -1;
- memset(map, 0, nhavemap);
for(;;){
for(i=0; i<nhavemap; i++){
if(told[i] != havemap[i]){
@@ -399,7 +390,7 @@
if(debug) fprint(2, "peer %s: -> have %d\n", addr, x);
n = pack(buf, sizeof(buf), "lbl", 1+4, 0x04, x);
if(write(fd, buf, n) != n)
- goto Retry;
+ goto Out;
}
}
if(!mewant && (map[i] & ~havemap[i])){
@@ -407,7 +398,7 @@
if(debug) fprint(2, "peer %s: -> interested\n", addr);
n = pack(buf, sizeof(buf), "lb", 1, 0x02);
if(write(fd, buf, n) != n)
- goto Retry;
+ goto Out;
}
}
if(!hechoking && mewant){
@@ -423,7 +414,7 @@
if(debug) fprint(2, "peer %s: -> request %d %d %d\n", addr, x, o, l);
n = pack(buf, sizeof(buf), "lblll", 1+4+4+4, 0x06, x, o, l);
if(write(fd, buf, n) != n)
- goto Retry;
+ goto Out;
workpiece = x;
}
}
@@ -432,21 +423,21 @@
if(debug) fprint(2, "peer %s: -> unchoke\n", addr);
n = pack(buf, sizeof(buf), "lb", 1, 0x01);
if(write(fd, buf, n) != n)
- goto Retry;
+ goto Out;
}
if(readn(fd, buf, 4) != 4)
- goto Retry;
+ break;
unpack(buf, 4, "l", &n);
+ if(n < 0 || n > sizeof(buf))
+ break;
if(n == 0)
continue;
- if(n < 0 || n > sizeof(buf))
- goto Retry;
if(readn(fd, buf, n) != n)
- goto Retry;
- retry = 0;
- p = buf+1;
+ break;
+
n--;
+ p = buf+1;
switch(*buf){
case 0x00: // Choke
hechoking = 1;
@@ -467,7 +458,7 @@
break;
case 0x04: // Have <piceindex>
if(unpack(p, n, "l", &x) < 0)
- goto Retry;
+ goto Out;
if(debug) fprint(2, "peer %s: <- have %d\n", addr, x);
if(x < 0 || x >= npieces)
continue;
@@ -481,7 +472,7 @@
break;
case 0x06: // Request <index> <begin> <length>
if(unpack(p, n, "lll", &x, &o, &l) < 0)
- goto Retry;
+ goto Out;
if(debug) fprint(2, "peer %s: <- request %d %d %d\n", addr, x, o, l);
if(x < 0 || x >= npieces)
continue;
@@ -496,13 +487,19 @@
n = pack(buf, sizeof(buf), "lbll", 1+4+4+l, 0x07, x, o);
n += l;
if(write(fd, buf, n) != n)
- goto Retry;
+ goto Out;
+ lock(&stats);
+ stats.up += n;
+ unlock(&stats);
break;
case 0x07: // Piece <index> <begin> <block>
if(unpack(p, n, "ll", &x, &o) != 8)
- goto Retry;
+ goto Out;
p += 8;
n -= 8;
+ lock(&stats);
+ stats.down += n;
+ unlock(&stats);
if(debug) fprint(2, "peer %s: <- piece %d %d %d\n", addr, x, o, n);
if(x < 0 || x >= npieces)
continue;
@@ -517,22 +514,105 @@
break;
case 0x08: // Cancel <index> <begin> <length>
if(unpack(p, n, "lll", &x, &o, &l) < 0)
- goto Retry;
+ goto Out;
if(debug) fprint(2, "peer %s: <- cancel %d %d %d\n", addr, x, o, l);
break;
case 0x09: // Port <port>
if(unpack(p, n, "l", &x) < 0)
- goto Retry;
+ goto Out;
if(debug) fprint(2, "peer %s: <- port %d\n", addr, x);
break;
}
}
-Exit:
+
+Out:
free(told);
free(map);
+ return 1;
+}
+
+void
+server(void)
+{
+ char addr[64], adir[40], ldir[40];
+ int afd, lfd, dfd;
+ NetConnInfo *ni;
+
+ afd = -1;
+ for(port=6881; port<6890; port++){
+ snprint(addr, sizeof(addr), "tcp!*!%d", port);
+ if((afd = announce(addr, adir)) >= 0)
+ break;
+ }
+ if(afd < 0){
+ fprint(2, "announce: %r");
+ return;
+ }
+ if(rfork(RFFDG|RFPROC|RFMEM))
+ return;
+ for(;;){
+ if((lfd = listen(adir, ldir)) < 0){
+ fprint(2, "listen: %r");
+ break;
+ }
+ if(rfork(RFFDG|RFPROC|RFMEM)){
+ close(lfd);
+ continue;
+ }
+ if((dfd = accept(lfd, ldir)) < 0){
+ fprint(2, "accept: %r");
+ break;
+ }
+ ni = getnetconninfo(ldir, dfd);
+ peer(dfd, 1, ni ? ni->raddr : "???");
+ if(ni) freenetconninfo(ni);
+ break;
+ }
exits(0);
}
+void
+client(char *ip, char *port)
+{
+ static Dict *peers;
+ static QLock peerslk;
+ int try, fd;
+ char *addr;
+ Dict *d;
+
+ if(ip == nil || port == nil)
+ return;
+
+ d = mallocz(sizeof(*d) + 64, 1);
+ snprint(addr = d->str, 64, "tcp!%s!%s", ip, port);
+ qlock(&peerslk);
+ if(dlook(peers, addr)){
+ qunlock(&peerslk);
+ free(d);
+ return;
+ }
+ d->len = strlen(addr);
+ d->typ = 'd';
+ d->val = d;
+ d->next = peers;
+ peers = d;
+ qunlock(&peerslk);
+
+ if(debug) fprint(2, "client %s\n", addr);
+
+ if(rfork(RFFDG|RFPROC|RFMEM))
+ return;
+ for(try = 0; try < 10; try++){
+ if((fd = dial(addr, nil, nil, nil)) >= 0){
+ if(!peer(fd, 0, addr))
+ break;
+ close(fd);
+ }
+ sleep((1000<<try)+nrand(5000));
+ }
+ exits(0);
+}
+
int
hopen(char *url, ...)
{
@@ -571,9 +651,9 @@
static Dict *trackers;
static QLock trackerslk;
+ Dict *d, *l;
int n, fd;
char *p;
- Dict *d, *l;
if(url == nil)
return;
@@ -594,17 +674,32 @@
url = d->str;
qunlock(&trackerslk);
- if(rfork(RFFDG|RFPROC|RFMEM) <= 0)
+ if(debug) fprint(2, "tracker %s\n", url);
+
+ if(rfork(RFPROC|RFMEM))
return;
for(;;){
+ vlong up, down, left;
+
+ lock(&stats);
+ up = stats.up;
+ down = stats.down;
+ left = stats.left;
+ unlock(&stats);
+
d = nil;
- if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&compact=1",
- url, sizeof(infohash), infohash, sizeof(peerid), peerid, port)) >= 0){
+ if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&"
+ "uploaded=%lld&downloaded=%lld&left=%lld&"
+ "compact=1",
+ url, sizeof(infohash), infohash, sizeof(peerid), peerid, port,
+ up, down, left)) >= 0){
n = readall(fd, &p);
close(fd);
bparse(p, p+n, &d);
free(p);
+ } else {
+ if(debug) fprint(2, "tracker %s: %r\n", url);
}
if(l = dlook(d, "peers")){
if(l->typ == 's'){
@@ -617,10 +712,10 @@
snprint(ip, sizeof(ip), "%d.%d.%d.%d", b[0], b[1], b[2], b[3]);
snprint(port, sizeof(port), "%d", b[4]<<8 | b[5]);
- peer(ip, port);
+ client(ip, port);
}
} else for(; l && l->typ == 'l'; l = l->next)
- peer(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port")));
+ client(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port")));
}
n = 0;
if(p = dstr(dlook(d, "interval")))
@@ -651,34 +746,9 @@
}
int
-progress(void)
+killnote(void *, char *)
{
- int i, c;
- uchar m;
- c = 0;
- for(i=0; i<nhavemap; i++)
- for(m = 0x80; m; m>>=1)
- if(havemap[i] & m)
- c++;
- if(pflag)
- print("%d %d\n", c, npieces);
- return c == npieces;
-}
-
-void
-killcohort(void)
-{
- int i;
- for(i=0;i!=3;i++){ /* It's a long way to the kitchen */
- postnote(PNGROUP, pidgroup, "kill");
- sleep(1);
- }
-}
-
-int
-catchnote(void *, char *msg)
-{
- exits(msg);
+ postnote(PNGROUP, killgroup, "kill");
return 0;
}
@@ -788,6 +858,7 @@
else
pieces[i].len = blocksize;
len -= pieces[i].len;
+ stats.left += pieces[i].len;
}
if(len)
sysfatal("pieces do not match file length");
@@ -795,23 +866,31 @@
for(i = 0; i<npieces; i++)
havepiece(i);
- switch(i = rfork(RFPROC|RFMEM|RFNOTEG|RFNAMEG)){
+ srand(time(0));
+ atnotify(killnote, 1);
+ switch(i = rfork(RFPROC|RFMEM|RFNOTEG)){
case -1:
sysfatal("fork: %r");
case 0:
memmove(peerid, "-NF9001-", 8);
- genrandom(peerid+8, sizeof(peerid)-8);
+ for(i=8; i<sizeof(peerid); i++)
+ peerid[i] = nrand(10)+'0';
+ server();
tracker(dstr(dlook(torrent, "announce")));
for(d = dlook(torrent, "announce-list"); d && d->typ == 'l'; d = d->next)
if(d->val && d->val->typ == 'l')
tracker(dstr(d->val->val));
+ while(waitpid() != -1)
+ ;
break;
default:
- pidgroup = i;
- atexit(killcohort);
- atnotify(catchnote, 1);
- while(!progress() || sflag)
+ killgroup = i;
+ while((nhavepieces < npieces) || sflag){
+ if(pflag)
+ print("%d %d\n", nhavepieces, npieces);
sleep(1000);
+ }
}
+ postnote(PNGROUP, killgroup, "kill");
exits(0);
}