shithub: riscv

Download patch

ref: f5fe39ad7afa9b761821178ba57f28cea03396b8
parent: 98d01a7719188f1fae193da77ed0949d903ed764
author: cinap_lenrek <cinap_lenrek@centraldogma>
date: Sun Oct 23 20:53:27 EDT 2011

torrent: listen support

--- a/sys/src/cmd/ip/torrent.c
+++ b/sys/src/cmd/ip/torrent.c
@@ -6,6 +6,7 @@
 typedef struct Dict Dict;
 typedef struct Piece Piece;
 typedef struct File File;
+typedef struct Stats Stats;
 
 struct Dict
 {
@@ -33,13 +34,21 @@
 	vlong	len;
 };
 
+struct Stats	/* transfer counters shared between processes; reported to the tracker */
+{
+	Lock;		/* embedded lock; taken as lock(&stats) around every update and read */
+	vlong	up;	/* bytes written in piece replies; sent as uploaded= */
+	vlong	down;	/* block bytes received in piece messages; sent as downloaded= */
+	vlong	left;	/* summed length of pieces not yet verified; sent as left= */
+};
+
 enum {
 	MAXIO = 16*1024,
 };
 
 int debug, sflag, pflag, vflag;
-int pidgroup = -1;
-int port = 48123;
+int killgroup = -1;
+int port = 6881;
 char *mntweb = "/mnt/web";
 uchar infohash[20];
 uchar peerid[20];
@@ -50,8 +59,10 @@
 
 int nhavemap;
 uchar *havemap;
+int nhavepieces;
 
 File *files;
+Stats stats;
 
 void
 freedict(Dict *d)
@@ -208,7 +219,13 @@
 	free(p);
 	if(memcmp(hash, pieces[x].hash, 20))
 		return 0;
-	havemap[x>>3] |= m;
+	lock(&stats);
+	if((havemap[x>>3] & m) == 0){
+		havemap[x>>3] |= m;
+		nhavepieces++;
+		stats.left -= pieces[x].len;
+	}
+	unlock(&stats);
 	return 1;
 }
 
@@ -309,86 +326,60 @@
 	return -1;
 }
 
-void
-peer(char *ip, char *port)
-{
-	static Dict *peers;
-	static QLock peerslk;
 
+
+int
+peer(int fd, int incoming, char *addr)
+{
 	uchar buf[64+MAXIO], *map, *told, *p, m;
-	char *addr;
-	int retry, i, o, l, x, n, fd;
 	int mechoking, hechoking;
 	int mewant, hewant;
 	int workpiece;
-	Dict *d;
+	int i, o, l, x, n;
 
-	if(ip == nil || port == nil)
-		return;
+	if(debug) fprint(2, "peer %s: %s connected\n", addr, incoming ? "incoming" : "outgoing");
 
-	d = mallocz(sizeof(*d) + 64, 1);
-	snprint(addr = d->str, 64, "tcp!%s!%s", ip, port);
-	qlock(&peerslk);
-	if(dlook(peers, addr)){
-		qunlock(&peerslk);
-		free(d);
-		return;
+	for(i=0; i<2; i++){
+		if((incoming && i) || (!incoming && !i)){
+			if(debug) fprint(2, "peer %s: -> handshake\n", addr);
+			n = pack(buf, sizeof(buf), "*________**", 
+				20, "\x13BitTorrent protocol",
+				sizeof(infohash), infohash,
+				sizeof(peerid), peerid);
+			if(write(fd, buf, n) != n)
+				return 1;
+		}
+		if((incoming && !i) || (!incoming && i)){
+			n = 20 + 8 + sizeof(infohash);
+			if((n = readn(fd, buf, n)) != n)
+				return 1;
+			if(memcmp(buf, "\x13BitTorrent protocol", 20))
+				return 0;
+			if(debug) fprint(2, "peer %s: <- handshake\n", addr);
+			if(memcmp(infohash, buf + 20 + 8, sizeof(infohash)))
+				return 0;
+		}
 	}
-	d->len = strlen(addr);
-	d->typ = 'd';
-	d->val = d;
-	d->next = peers;
-	peers = d;
-	qunlock(&peerslk);
+	if(readn(fd, buf, sizeof(peerid)) != sizeof(peerid))
+		return 1;
+	if(memcmp(peerid, buf, sizeof(peerid)) == 0)
+		return 0;
+	if(debug) fprint(2, "peer %s: peerid %.*s\n", addr, sizeof(peerid), (char*)buf);
 
-	if(rfork(RFFDG|RFPROC|RFMEM) <= 0)
-		return;
-
-	fd = -1;
-	retry = 0;
-	map = malloc(nhavemap);
+	mechoking = 1;
+	hechoking = 1;
+	mewant = 0;
+	hewant = 0;
+	workpiece = -1;
+	map = mallocz(nhavemap, 1);
 	told = malloc(nhavemap);
-Retry:
-	if(fd >= 0){
-		close(fd);
-		sleep(10000 + nrand(5000));
-	}
-	if(++retry >= 10)
-		goto Exit;
 
-	if(debug) fprint(2, "dial %s\n", addr);
-	if((fd = dial(addr, nil, nil, nil)) < 0)
-		goto Retry;
-
-	if(debug) fprint(2, "peer %s: -> handshake\n", addr);
-	n = pack(buf, sizeof(buf), "*________**", 
-		20, "\x13BitTorrent protocol",
-		sizeof(infohash), infohash,
-		sizeof(peerid), peerid);
-	if(write(fd, buf, n) != n)
-		goto Retry;
-
-	if(read(fd, buf, 1) != 1)
-		goto Retry;
-	n = buf[0] + 8 + sizeof(infohash) + sizeof(peerid);
-	if((n = readn(fd, buf+1, n)) != n)
-		goto Retry;
-	if(debug) fprint(2, "peer %s: <- handshake %.*s\n", addr, buf[0], (char*)buf+1);
-	if(memcmp(infohash, buf + 1 + buf[0] + 8, sizeof(infohash)))
-		goto Exit;
-
 	if(debug) fprint(2, "peer %s: -> bitfield %d\n", addr, nhavemap);
 	memmove(told, havemap, nhavemap);
 	n = pack(buf, sizeof(buf), "lb*", nhavemap+1, 0x05, nhavemap, havemap);
 	if(write(fd, buf, n) != n)
-		goto Retry;
+		goto Out;
 
-	mechoking = 1;
-	hechoking = 1;
-	mewant = 0;
-	hewant = 0;
-	workpiece = -1;
-	memset(map, 0, nhavemap);
 	for(;;){
 		for(i=0; i<nhavemap; i++){
 			if(told[i] != havemap[i]){
@@ -399,7 +390,7 @@
 					if(debug) fprint(2, "peer %s: -> have %d\n", addr, x);
 					n = pack(buf, sizeof(buf), "lbl", 1+4, 0x04, x);
 					if(write(fd, buf, n) != n)
-						goto Retry;
+						goto Out;
 				}
 			}
 			if(!mewant && (map[i] & ~havemap[i])){
@@ -407,7 +398,7 @@
 				if(debug) fprint(2, "peer %s: -> interested\n", addr);
 				n = pack(buf, sizeof(buf), "lb", 1, 0x02);
 				if(write(fd, buf, n) != n)
-					goto Retry;
+					goto Out;
 			}
 		}
 		if(!hechoking && mewant){
@@ -423,7 +414,7 @@
 				if(debug) fprint(2, "peer %s: -> request %d %d %d\n", addr, x, o, l);
 				n = pack(buf, sizeof(buf), "lblll", 1+4+4+4, 0x06, x, o, l);
 				if(write(fd, buf, n) != n)
-					goto Retry;
+					goto Out;
 				workpiece = x;
 			}
 		}
@@ -432,21 +423,21 @@
 			if(debug) fprint(2, "peer %s: -> unchoke\n", addr);
 			n = pack(buf, sizeof(buf), "lb", 1, 0x01);
 			if(write(fd, buf, n) != n)
-				goto Retry;
+				goto Out;
 		}
 
 		if(readn(fd, buf, 4) != 4)
-			goto Retry;
+			break;
 		unpack(buf, 4, "l", &n);
+		if(n < 0 || n > sizeof(buf))
+			break;
 		if(n == 0)
 			continue;
-		if(n < 0 || n > sizeof(buf))
-			goto Retry;
 		if(readn(fd, buf, n) != n)
-			goto Retry;
-		retry = 0;
-		p = buf+1;
+			break;
+
 		n--;
+		p = buf+1;
 		switch(*buf){
 		case 0x00:	// Choke
 			hechoking = 1;
@@ -467,7 +458,7 @@
 			break;
 		case 0x04:	// Have <piceindex>
 			if(unpack(p, n, "l", &x) < 0)
-				goto Retry;
+				goto Out;
 			if(debug) fprint(2, "peer %s: <- have %d\n", addr, x);
 			if(x < 0 || x >= npieces)
 				continue;
@@ -481,7 +472,7 @@
 			break;
 		case 0x06:	// Request <index> <begin> <length>
 			if(unpack(p, n, "lll", &x, &o, &l) < 0)
-				goto Retry;
+				goto Out;
 			if(debug) fprint(2, "peer %s: <- request %d %d %d\n", addr, x, o, l);
 			if(x < 0 || x >= npieces)
 				continue;
@@ -496,13 +487,19 @@
 			n = pack(buf, sizeof(buf), "lbll", 1+4+4+l, 0x07, x, o);
 			n += l;
 			if(write(fd, buf, n) != n)
-				goto Retry;
+				goto Out;
+			lock(&stats);
+			stats.up += n;
+			unlock(&stats);
 			break;
 		case 0x07:	// Piece <index> <begin> <block>
 			if(unpack(p, n, "ll", &x, &o) != 8)
-				goto Retry;
+				goto Out;
 			p += 8;
 			n -= 8;
+			lock(&stats);
+			stats.down += n;
+			unlock(&stats);
 			if(debug) fprint(2, "peer %s: <- piece %d %d %d\n", addr, x, o, n);
 			if(x < 0 || x >= npieces)
 				continue;
@@ -517,22 +514,105 @@
 			break;
 		case 0x08:	// Cancel <index> <begin> <length>
 			if(unpack(p, n, "lll", &x, &o, &l) < 0)
-				goto Retry;
+				goto Out;
 			if(debug) fprint(2, "peer %s: <- cancel %d %d %d\n", addr, x, o, l);
 			break;
 		case 0x09:	// Port <port>
 			if(unpack(p, n, "l", &x) < 0)
-				goto Retry;
+				goto Out;
 			if(debug) fprint(2, "peer %s: <- port %d\n", addr, x);
 			break;
 		}
 	}
-Exit:
+
+Out:
 	free(told);
 	free(map);
+	return 1;
+}
+
+void	/* announce a TCP port and fork a listener that hands each incoming call to peer() */
+server(void)
+{
+	char addr[64], adir[40], ldir[40];
+	int afd, lfd, dfd;
+	NetConnInfo *ni;
+
+	afd = -1;
+	for(port=6881; port<6890; port++){	/* try 6881..6889; the global port keeps the value that announced */
+		snprint(addr, sizeof(addr), "tcp!*!%d", port);
+		if((afd = announce(addr, adir)) >= 0)
+			break;
+	}
+	if(afd < 0){	/* no port could be announced; port is left at 6890 */
+		fprint(2, "announce: %r");	/* NOTE(review): message has no trailing newline */
+		return;
+	}
+	if(rfork(RFFDG|RFPROC|RFMEM))	/* nonzero result: return to the caller; the child runs the loop below */
+		return;
+	for(;;){
+		if((lfd = listen(adir, ldir)) < 0){
+			fprint(2, "listen: %r");	/* NOTE(review): message has no trailing newline */
+			break;
+		}
+		if(rfork(RFFDG|RFPROC|RFMEM)){	/* nonzero result: listener closes lfd and waits for the next call */
+			close(lfd);
+			continue;
+		}
+		if((dfd = accept(lfd, ldir)) < 0){	/* from here on this is the per-connection child */
+			fprint(2, "accept: %r");	/* NOTE(review): message has no trailing newline */
+			break;
+		}
+		ni = getnetconninfo(ldir, dfd);
+		peer(dfd, 1, ni ? ni->raddr : "???");	/* incoming=1; "???" is used when no conn info was returned */
+		if(ni) freenetconninfo(ni);	
+		break;
+	}
+	exits(0);	/* reached after a listen/accept error or when a connection child is done */
+}
 
+void	/* dial tcp!ip!port from a forked process, at most once per distinct address */
+client(char *ip, char *port)	/* the port parameter shadows the global int port in this function */
+{
+	static Dict *peers;	/* list of addresses already passed to client() */
+	static QLock peerslk;	/* held around lookups and insertions on peers */
+	int try, fd;
+	char *addr;
+	Dict *d;
+
+	if(ip == nil || port == nil)
+		return;
+
+	d = mallocz(sizeof(*d) + 64, 1);	/* 64 extra bytes are used for the address written to d->str */
+	snprint(addr = d->str, 64, "tcp!%s!%s", ip, port);
+	qlock(&peerslk);
+	if(dlook(peers, addr)){	/* lookup hit: treat the address as already handled and drop d */
+		qunlock(&peerslk);
+		free(d);
+		return;
+	}
+	d->len = strlen(addr);
+	d->typ = 'd';
+	d->val = d;
+	d->next = peers;	/* push the new entry on the front of the list */
+	peers = d;
+	qunlock(&peerslk);
+
+	if(debug) fprint(2, "client %s\n", addr);
+
+	if(rfork(RFFDG|RFPROC|RFMEM))	/* nonzero result: return to the caller; the child does the dialing */
+		return;
+	for(try = 0; try < 10; try++){	/* at most 10 attempts */
+		if((fd = dial(addr, nil, nil, nil)) >= 0){
+			if(!peer(fd, 0, addr))	/* incoming=0; peer() returning 0 ends the retries */
+				break;
+			close(fd);
+		}
+		sleep((1000<<try)+nrand(5000));	/* base delay doubles each attempt, plus an nrand(5000) term */
+	}
+	exits(0);
+}
+
 int
 hopen(char *url, ...)
 {
@@ -571,9 +651,9 @@
 	static Dict *trackers;
 	static QLock trackerslk;
 
+	Dict *d, *l;
 	int n, fd;
 	char *p;
-	Dict *d, *l;
 
 	if(url == nil)
 		return;
@@ -594,17 +674,32 @@
 	url = d->str;
 	qunlock(&trackerslk);
 
-	if(rfork(RFFDG|RFPROC|RFMEM) <= 0)
+	if(debug) fprint(2, "tracker %s\n", url);
+
+	if(rfork(RFPROC|RFMEM))
 		return;
 
 	for(;;){
+		vlong up, down, left;
+
+		lock(&stats);
+		up = stats.up;
+		down = stats.down;
+		left = stats.left;
+		unlock(&stats);
+
 		d = nil;
-		if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&compact=1",
-			url, sizeof(infohash), infohash, sizeof(peerid), peerid, port)) >= 0){
+		if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&"
+			"uploaded=%lld&downloaded=%lld&left=%lld&"
+			"compact=1",
+			url, sizeof(infohash), infohash, sizeof(peerid), peerid, port,
+			up, down, left)) >= 0){
 			n = readall(fd, &p);
 			close(fd);
 			bparse(p, p+n, &d);
 			free(p);
+		} else {
+			if(debug) fprint(2, "tracker %s: %r\n", url);
 		}
 		if(l = dlook(d, "peers")){
 			if(l->typ == 's'){
@@ -617,10 +712,10 @@
 
 					snprint(ip, sizeof(ip), "%d.%d.%d.%d", b[0], b[1], b[2], b[3]);
 					snprint(port, sizeof(port), "%d", b[4]<<8 | b[5]);
-					peer(ip, port);
+					client(ip, port);
 				}
 			} else for(; l && l->typ == 'l'; l = l->next)
-				peer(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port")));
+				client(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port")));
 		}
 		n = 0;
 		if(p = dstr(dlook(d, "interval")))
@@ -651,34 +746,9 @@
 }
 
 int
-progress(void)
+killnote(void *, char *)
 {
-	int i, c;
-	uchar m;
-	c = 0;
-	for(i=0; i<nhavemap; i++)
-		for(m = 0x80; m; m>>=1)
-			if(havemap[i] & m)
-				c++;
-	if(pflag)
-		print("%d %d\n", c, npieces);
-	return c == npieces;
-}
-
-void
-killcohort(void)
-{
-	int i;
-	for(i=0;i!=3;i++){	/* It's a long way to the kitchen */
-		postnote(PNGROUP, pidgroup, "kill");
-		sleep(1);
-	}
-}
-
-int
-catchnote(void *, char *msg)
-{
-	exits(msg);
+	postnote(PNGROUP, killgroup, "kill");
 	return 0;
 }
 
@@ -788,6 +858,7 @@
 		else
 			pieces[i].len = blocksize;
 		len -= pieces[i].len;
+		stats.left += pieces[i].len;
 	}
 	if(len)
 		sysfatal("pieces do not match file length");
@@ -795,23 +866,31 @@
 	for(i = 0; i<npieces; i++)
 		havepiece(i);
 
-	switch(i = rfork(RFPROC|RFMEM|RFNOTEG|RFNAMEG)){
+	srand(time(0));
+	atnotify(killnote, 1);
+	switch(i = rfork(RFPROC|RFMEM|RFNOTEG)){
 	case -1:
 		sysfatal("fork: %r");
 	case 0:
 		memmove(peerid, "-NF9001-", 8);
-		genrandom(peerid+8, sizeof(peerid)-8);
+		for(i=8; i<sizeof(peerid); i++)
+			peerid[i] = nrand(10)+'0';
+		server();
 		tracker(dstr(dlook(torrent, "announce")));
 		for(d = dlook(torrent, "announce-list"); d && d->typ == 'l'; d = d->next)
 			if(d->val && d->val->typ == 'l')
 				tracker(dstr(d->val->val));
+		while(waitpid() != -1)
+			;
 		break;
 	default:
-		pidgroup = i;
-		atexit(killcohort);
-		atnotify(catchnote, 1);
-		while(!progress() || sflag)
+		killgroup = i;
+		while((nhavepieces < npieces) || sflag){
+			if(pflag)
+				print("%d %d\n", nhavepieces, npieces);
 			sleep(1000);
+		}
 	}
+	postnote(PNGROUP, killgroup, "kill");
 	exits(0);
 }