shithub: riscv

Download patch

ref: 623901ed1fce6212619537ad4a5eb01dc73881b5
parent: 0b003bb7c701037ddc69e4bb145d04dbb99b58e2
author: cinap_lenrek <[email protected]>
date: Wed Jun 27 08:36:15 EDT 2012

aan: fix bugs, initial timeout, portable network encoding

--- a/sys/src/cmd/aan.c
+++ b/sys/src/cmd/aan.c
@@ -4,13 +4,11 @@
 #include <fcall.h>
 #include <thread.h>
 
-#define NS(x)		((vlong)x)
-#define US(x)		(NS(x) * 1000LL)
-#define MS(x)		(US(x) * 1000LL)
-#define S(x)		(MS(x) * 1000LL)
+#define NS(x)	((vlong)x)
+#define US(x)	(NS(x) * 1000LL)
+#define MS(x)	(US(x) * 1000LL)
+#define S(x)	(MS(x) * 1000LL)
 
-#define LOGNAME	"aan"
-
 enum {
 	Synctime = S(8),
 	Nbuf = 10,
@@ -17,7 +15,7 @@
 	K = 1024,
 	Bufsize = 8 * K,
 	Stacksize = 8 * K,
-	Timer = 0,					// Alt channels.
+	Timer = 0,				// Alt channels.
 	Unsent = 1,
 	Maxto = 24 * 3600,			// A full day to reconnect.
 };
@@ -31,29 +29,28 @@
 };
 
 typedef struct {
-	ulong		nb;		// Number of data bytes in this message
-	ulong		msg;		// Message number
-	ulong		acked;	// Number of messages acked
+	uchar	nb[4];		// Number of data bytes in this message
+	uchar	msg[4];		// Message number
+	uchar	acked[4];	// Number of messages acked
 } Hdr;
 
 typedef struct t_Buf {
-	Hdr			hdr;
-	uchar		buf[Bufsize];
+	Hdr	hdr;
+	uchar	buf[Bufsize];
 } Buf;
 
-static char 	*progname;
 static Channel	*unsent;
 static Channel	*unacked;
 static Channel	*empty;
-static int		netfd;
-static int		inmsg;
+static int	netfd;
+static int	inmsg;
 static char	*devdir;
-static int		debug;
-static int		done;
+static int	debug;
+static int	done;
 static char	*dialstring;
-static int		maxto = Maxto;
-static char	*Logname = LOGNAME;
-static int		client;
+static int	maxto = Maxto;
+static char	*Logname = "aan";
+static int	client;
 
 static Alt a[] = {
 	/*	c	v	 op   */
@@ -64,7 +61,7 @@
 
 static void		fromnet(void*);
 static void		fromclient(void*);
-static void		reconnect(void);
+static void		reconnect(int);
 static void		synchronize(void);
 static int 		sendcommand(ulong, ulong);
 static void		showmsg(int, char *, Buf *);
@@ -72,13 +69,13 @@
 static int		getport(char *);
 static void		dmessage(int, char *, ...);
 static void		timerproc(void *);
-static Endpoints *getendpoints(char *);
+static Endpoints*	getendpoints(char *);
 static void		freeendpoints(Endpoints *);
 
 static void
 usage(void)
 {
-	fprint(2, "Usage: %s [-cd] [-m maxto] dialstring|netdir\n", progname);
+	fprint(2, "Usage: %s [-cd] [-m maxto] dialstring|netdir\n", argv0);
 	exits("usage");
 }
 
@@ -86,22 +83,46 @@
 catch(void *, char *s)
 {
 	if (!strcmp(s, "alarm")) {
-		syslog(0, Logname, "Timed out while waiting for client on %s, exiting...",
-			   devdir);
+		syslog(0, Logname, "Timed out while waiting for reconnect, exiting...");
 		threadexitsall(nil);
 	}
 	return 0;
 }
 
+static void*
+emalloc(int n)	// malloc wrapper: returns n bytes, or exits via sysfatal if malloc fails
+{
+	ulong pc;
+	void *v;
+
+	pc = getcallerpc(&n);	// pc of our caller; used in the error message and as the malloc tag
+	v = malloc(n);
+	if(v == nil)
+		sysfatal("Cannot allocate memory; pc=%lux", pc);
+	setmalloctag(v, pc);	// tag the block with the caller's pc instead of emalloc's
+	return v;
+}
+
+static char*
+estrdup(char *s)	// copy of s allocated with emalloc; caller releases it with free
+{
+	char *v;
+	int n;
+
+	n = strlen(s)+1;	// +1 so the terminating NUL is copied too
+	v = emalloc(n);	// emalloc exits on failure, so no nil check is needed here
+	memmove(v, s, n);
+	return v;
+}
+
 void
 threadmain(int argc, char **argv)
 {
-	int i, failed;
-	Buf *b;
-	Channel *timer;
 	vlong synctime;
+	int i, n, failed;
+	Channel *timer;
+	Buf *b;
 
-	progname = argv[0];
 	ARGBEGIN {
 	case 'c':
 		client++;
@@ -110,7 +131,7 @@
 		debug++;
 		break;
 	case 'm':
-		maxto = (int)strtol(EARGF(usage()), (char **)nil, 0);
+		maxto = (int)strtol(EARGF(usage()), nil, 0);
 		break;
 	default:
 		usage();
@@ -138,29 +159,33 @@
 
 	atnotify(catch, 1);
 
+	/*
+	 * Set up the initial connection. Use a short timeout
+	 * of 60 seconds so we won't hang around for too
+	 * long if there is some general connection problem
+	 * (like NAT).
+	 */
+	netfd = -1;
+	reconnect(60);
+
 	unsent = chancreate(sizeof(Buf *), Nbuf);
 	unacked = chancreate(sizeof(Buf *), Nbuf);
 	empty = chancreate(sizeof(Buf *), Nbuf);
 	timer = chancreate(sizeof(uchar *), 1);
+	if(unsent == nil || unacked == nil || empty == nil || timer == nil)
+		sysfatal("Cannot allocate channels");
 
-	for (i = 0; i != Nbuf; i++) {
-		Buf *b = malloc(sizeof(Buf));
-		sendp(empty, b);
-	}
+	for (i = 0; i < Nbuf; i++)
+		sendp(empty, emalloc(sizeof(Buf)));
 
-	netfd = -1;
-
 	if (proccreate(fromnet, nil, Stacksize) < 0)
-		sysfatal("%s; Cannot start fromnet; %r", progname);
+		sysfatal("Cannot start fromnet; %r");
 
-	reconnect();		// Set up the initial connection.
-	synchronize();
-
 	if (proccreate(fromclient, nil, Stacksize) < 0)
-		sysfatal("%s; Cannot start fromclient; %r", progname);
+		sysfatal("Cannot start fromclient; %r");
 
 	if (proccreate(timerproc, timer, Stacksize) < 0)
-		sysfatal("%s; Cannot start timerproc; %r", progname);
+		sysfatal("Cannot start timerproc; %r");
 
 	a[Timer].c = timer;
 	a[Unsent].c = unsent;
@@ -169,9 +194,6 @@
 	synctime = nsec() + Synctime;
 	failed = 0;
 	while (!done) {
-		vlong now;
-		int delta;
-
 		if (failed) {
 			// Wait for the netreader to die.
 			while (netfd >= 0) {
@@ -180,20 +202,17 @@
 			}
 
 			// the reader died; reestablish the world.
-			reconnect();
+			reconnect(maxto);
 			synchronize();
 			failed = 0;
 		}
 
-		now = nsec();
-		delta = (synctime - nsec()) / MS(1);
-
-		if (delta <= 0) {
+		if (nsec() >= synctime) {
 			Hdr hdr;
 
-			hdr.nb = 0;
-			hdr.acked = inmsg;
-			hdr.msg = -1;
+			PBIT32(hdr.nb, 0);
+			PBIT32(hdr.acked, inmsg);
+			PBIT32(hdr.msg, -1);
 
 			if (writen(netfd, (uchar *)&hdr, sizeof(Hdr)) < 0) {
 				dmessage(2, "main; writen failed; %r\n");
@@ -201,7 +220,6 @@
 				continue;
 			}
 			synctime = nsec() + Synctime;
-			assert(synctime > now);
 		}
 
 		switch (alt(a)) {
@@ -211,7 +229,7 @@
 		case Unsent:
 			sendp(unacked, b);
 
-			b->hdr.acked = inmsg;
+			PBIT32(b->hdr.acked, inmsg);
 
 			if (writen(netfd, (uchar *)&b->hdr, sizeof(Hdr)) < 0) {
 				dmessage(2, "main; writen failed; %r\n");
@@ -218,12 +236,13 @@
 				failed = 1;
 			}
 
-			if (writen(netfd, b->buf, b->hdr.nb) < 0) {
+			n = GBIT32(b->hdr.nb);
+			if (writen(netfd, b->buf, n) < 0) {
 				dmessage(2, "main; writen failed; %r\n");
 				failed = 1;
 			}
 
-			if (b->hdr.nb == 0)
+			if (n == 0)
 				done = 1;
 			break;
 		}
@@ -237,26 +256,25 @@
 fromclient(void*)
 {
 	static int outmsg;
+	int n;
+	Buf *b;
 
-	for (;;) {
-		Buf *b;
-
-		b = recvp(empty);	
-		if ((int)(b->hdr.nb = read(0, b->buf, Bufsize)) <= 0) {
-			if ((int)b->hdr.nb < 0)
+	do {
+		b = recvp(empty);
+		n = read(0, b->buf, Bufsize);
+		if (n <= 0) {
+			if (n < 0)
 				dmessage(2, "fromclient; Cannot read 9P message; %r\n");
 			else
 				dmessage(2, "fromclient; Client terminated\n");
-			b->hdr.nb = 0;
+			n = 0;
 		}
-		b->hdr.msg = outmsg++;
-
+		PBIT32(b->hdr.nb, n);
+		PBIT32(b->hdr.msg, outmsg);
 		showmsg(1, "fromclient", b);
 		sendp(unsent, b);
-		
-		if (b->hdr.nb == 0)
-			break;
-	}
+		outmsg++;
+	} while(n > 0);
 }
 
 static void
@@ -263,22 +281,21 @@
 fromnet(void*)
 {
 	static int lastacked;
+	int n, m, len, acked;
 	Buf *b;
 
-	b = (Buf *)malloc(sizeof(Buf));
-	assert(b);
-
+	b = emalloc(sizeof(Buf));
 	while (!done) {
-		int len, acked, i;
-
 		while (netfd < 0) {
-			dmessage(1, "fromnet; waiting for connection... (inmsg %d)\n", 
-					  inmsg);
+			if(done)
+				return;
+			dmessage(1, "fromnet; waiting for connection... (inmsg %d)\n", inmsg);
 			sleep(1000);
 		}
 
 		// Read the header.
-		if ((len = readn(netfd, &b->hdr, sizeof(Hdr))) <= 0) {
+		len = readn(netfd, (uchar *)&b->hdr, sizeof(Hdr));
+		if (len <= 0) {
 			if (len < 0)
 				dmessage(1, "fromnet; (hdr) network failure; %r\n");
 			else
@@ -287,18 +304,27 @@
 			netfd = -1;
 			continue;
 		}
-		dmessage(2, "fromnet: Got message, size %d, nb %d, msg %d\n", len,
-				b->hdr.nb, b->hdr.msg);
+		n = GBIT32(b->hdr.nb);
+		m = GBIT32(b->hdr.msg);
+		acked = GBIT32(b->hdr.acked);
+		dmessage(2, "fromnet: Got message, size %d, nb %d, msg %d, acked %d, lastacked %d\n",
+			len, n, m, acked, lastacked);
 
-		if (b->hdr.nb == 0) {
-			if  ((long)b->hdr.msg >= 0) {
+		if (n == 0) {
+			if (m >= 0) {
 				dmessage(1, "fromnet; network closed\n");
 				break;
 			}
 			continue;
 		}
-	
-		if ((len = readn(netfd, b->buf, b->hdr.nb)) <= 0 || len != b->hdr.nb) {
+
+		if (n > Bufsize) {
+			dmessage(1, "fromnet; message too big %d > %d\n", n, Bufsize);
+			break;
+		}
+
+		len = readn(netfd, b->buf, n);
+		if (len <= 0 || len != n) {
 			if (len == 0)
 				dmessage(1, "fromnet; network closed\n");
 			else
@@ -308,28 +334,25 @@
 			continue;
 		}
 
-		if (b->hdr.msg < inmsg) {
-			dmessage(1, "fromnet; skipping message %d, currently at %d\n",
-					 b->hdr.msg, inmsg);
+		if (m < inmsg) {
+			dmessage(1, "fromnet; skipping message %d, currently at %d\n", m, inmsg);
 			continue;
 		}			
 
 		// Process the acked list.
-		acked = b->hdr.acked - lastacked;
-		for (i = 0; i != acked; i++) {
+		while(lastacked != acked) {
 			Buf *rb;
 
 			rb = recvp(unacked);
-			if (rb->hdr.msg != lastacked + i) {
-				dmessage(1, "rb %p, msg %d, lastacked %d, i %d\n",
-						rb, rb? rb->hdr.msg: -2, lastacked, i);
-				assert(0);
+			m = GBIT32(rb->hdr.msg);
+			if (m != lastacked) {
+				dmessage(1, "fromnet; rb %p, msg %d, lastacked %d\n", rb, m, lastacked);
+				sysfatal("fromnet; bug");
 			}
-			rb->hdr.msg = -1;
+			PBIT32(rb->hdr.msg, -1);
 			sendp(empty, rb);
+			lastacked++;
 		} 
-		lastacked = b->hdr.acked;
-
 		inmsg++;
 
 		showmsg(1, "fromnet", b);
@@ -341,7 +364,7 @@
 }
 
 static void
-reconnect(void)
+reconnect(int secs)
 {
 	char ldir[40];
 	int lcfd, fd;
@@ -348,6 +371,7 @@
 
 	if (dialstring) {
 		syslog(0, Logname, "dialing %s", dialstring);
+		alarm(secs*1000);
   		while ((fd = dial(dialstring, nil, nil, nil)) < 0) {
 			char err[32];
 
@@ -360,6 +384,7 @@
 			dmessage(1, "reconnect: dialed %s; %s\n", dialstring, err);
 			sleep(1000);
 		}
+		alarm(0);
 		syslog(0, Logname, "reconnected to %s", dialstring);
 	} 
 	else {
@@ -366,10 +391,9 @@
 		Endpoints *ep;
 
 		syslog(0, Logname, "waiting for connection on %s", devdir);
-		alarm(maxto * 1000);
+		alarm(secs*1000);
  		if ((lcfd = listen(devdir, ldir)) < 0) 
 			sysfatal("reconnect; cannot listen; %r");
-	
 		if ((fd = accept(lcfd, ldir)) < 0)
 			sysfatal("reconnect; cannot accept; %r");
 		alarm(0);
@@ -381,7 +405,8 @@
 		freeendpoints(ep);
 	}
 	
-	netfd = fd;		// Wakes up the netreader.
+	// Wakes up the netreader.
+	netfd = fd;
 }
 
 static void
@@ -389,6 +414,7 @@
 {
 	Channel *tmp;
 	Buf *b;
+	int n;
 
 	// Ignore network errors here.  If we fail during 
 	// synchronization, the next alarm will pick up 
@@ -396,7 +422,9 @@
 
 	tmp = chancreate(sizeof(Buf *), Nbuf);
 	while ((b = nbrecvp(unacked)) != nil) {
-		writen(netfd, (uchar *)b, sizeof(Hdr) + b->hdr.nb);
+		n = GBIT32(b->hdr.nb);
+		writen(netfd, (uchar *)&b->hdr, sizeof(Hdr));
+		writen(netfd, b->buf, n);
 		sendp(tmp, b);
 	}
 	chanfree(unacked);
@@ -406,14 +434,14 @@
 static void
 showmsg(int level, char *s, Buf *b)
 {
+	int n;
+
 	if (b == nil) {
 		dmessage(level, "%s; b == nil\n", s);
 		return;
 	}
-
-	dmessage(level, 
-			"%s;  (len %d) %X %X %X %X %X %X %X %X %X (%p)\n", s, 
-		  	b->hdr.nb, 
+	n = GBIT32(b->hdr.nb);
+	dmessage(level, "%s;  (len %d) %X %X %X %X %X %X %X %X %X (%p)\n", s, n, 
 		  	b->buf[0], b->buf[1], b->buf[2],
 		  	b->buf[3], b->buf[4], b->buf[5],
 		  	b->buf[6], b->buf[7], b->buf[8], b);
@@ -483,16 +511,16 @@
 			serv = strchr(buf, '!');
 			if(serv){
 				*serv++ = 0;
-				serv = strdup(serv);
+				serv = estrdup(serv);
 			}
-			sys = strdup(buf);
+			sys = estrdup(buf);
 		}
 		close(fd);
 	}
 	if(serv == 0)
-		serv = strdup("unknown");
+		serv = estrdup("unknown");
 	if(sys == 0)
-		sys = strdup("unknown");
+		sys = estrdup("unknown");
 	*servp = serv;
 	*sysp = sys;
 }
@@ -502,7 +530,7 @@
 {
 	Endpoints *ep;
 
-	ep = malloc(sizeof(*ep));
+	ep = emalloc(sizeof(*ep));
 	getendpoint(dir, "local", &ep->lsys, &ep->lserv);
 	getendpoint(dir, "remote", &ep->rsys, &ep->rserv);
 	return ep;
@@ -517,4 +545,3 @@
 	free(ep->rserv);
 	free(ep);
 }
-