shithub: drawterm

Download patch

ref: 401b33384215ea3f7e6a89e61f75e8acb35a3cc4
parent: edcf7d821652f17d23641dc193f769eb1e02be77
author: cinap_lenrek <[email protected]>
date: Sun Mar 31 12:11:31 EDT 2024

qio: sync with 9front

--- a/kern/fns.h
+++ b/kern/fns.h
@@ -179,7 +179,6 @@
 void		qreopen(Queue*);
 void		qsetlimit(Queue*, int);
 void		qunlock(QLock*);
-int		qwindow(Queue*);
 int		qwrite(Queue*, void*, int);
 void		qnoblock(Queue*, int);
 void		randominit(void);
--- a/kern/qio.c
+++ b/kern/qio.c
@@ -4,13 +4,6 @@
 #include	"fns.h"
 #include	"error.h"
 
-static ulong padblockcnt;
-static ulong concatblockcnt;
-static ulong pullupblockcnt;
-static ulong copyblockcnt;
-static ulong consumecnt;
-static ulong producecnt;
-
 #define QDEBUG	if(0)
 
 /*
@@ -17,28 +10,28 @@
  *  IO queues
  */
 typedef struct Queue	Queue;
-
 struct Queue
 {
 	Lock	lk;
 
+	int	state;
+	int	dlen;		/* data length in bytes */
+	uint	rp, wp;		/* read/write position (counting BALLOC() bytes) */
+	int	limit;		/* max BALLOC() bytes in queue */
+	int	inilim;		/* initial limit */
+	uchar	noblock;	/* true if writes return immediately when q full */
+	uchar	eof;		/* number of eofs read by user */
+
 	Block*	bfirst;		/* buffer */
 	Block*	blast;
 
-	int	len;		/* bytes allocated to queue */
-	int	dlen;		/* data bytes in queue */
-	int	limit;		/* max bytes in queue */
-	int	inilim;		/* initial limit */
-	int	state;
-	int	noblock;	/* true if writes return immediately when q full */
-	int	eof;		/* number of eofs read by user */
-
+	void*	arg;		/* argument to kick and bypass */
 	void	(*kick)(void*);	/* restart output */
 	void	(*bypass)(void*, Block*);	/* bypass queue altogether */
-	void*	arg;		/* argument to kick */
 
 	QLock	rlock;		/* mutex for reading processes */
 	Rendez	rr;		/* process waiting to read */
+
 	QLock	wlock;		/* mutex for writing processes */
 	Rendez	wr;		/* process waiting to write */
 
@@ -68,44 +61,6 @@
 }
 
 /*
- *  pad a block to the front (or the back if size is negative)
- */
-Block*
-padblock(Block *bp, int size)
-{
-	int n;
-	Block *nbp;
-
-	QDEBUG checkb(bp, "padblock 0");
-	if(size >= 0){
-		if(bp->rp - bp->base >= size){
-			bp->rp -= size;
-			return bp;
-		}
-		n = BLEN(bp);
-		nbp = allocb(size+n);
-		nbp->rp += size;
-		nbp->wp = nbp->rp;
-		memmove(nbp->wp, bp->rp, n);
-		nbp->wp += n;
-		nbp->rp -= size;
-	} else {
-		size = -size;
-		if(bp->lim - bp->wp >= size)
-			return bp;
-		n = BLEN(bp);
-		nbp = allocb(n+size);
-		memmove(nbp->wp, bp->rp, n);
-		nbp->wp += n;
-	}
-	nbp->next = bp->next;
-	freeb(bp);
-	padblockcnt++;
-	QDEBUG checkb(nbp, "padblock 1");
-	return nbp;
-}
-
-/*
  *  return count of bytes in a string of blocks
  */
 int
@@ -122,19 +77,33 @@
 }
 
 /*
- * return count of space in blocks
+ *  copy the contents of a string of blocks into
+ *  memory from an offset. blocklist kept unchanged.
+ *  return number of copied bytes.
  */
-int
-blockalloclen(Block *bp)
+long
+readblist(Block *b, uchar *p, long n, ulong o)
 {
-	int len;
+	ulong m, r;
 
-	len = 0;
-	while(bp != nil) {
-		len += BALLOC(bp);
-		bp = bp->next;
+	r = 0;
+	while(n > 0 && b != nil){
+		m = BLEN(b);
+		if(o >= m)
+			o -= m;
+		else {
+			m -= o;
+			if(n < m)
+				m = n;
+			memmove(p, b->rp + o, m);
+			p += m;
+			r += m;
+			n -= m;
+			o = 0;
+		}
+		b = b->next;
 	}
-	return len;
+	return r;
 }
 
 /*
@@ -149,7 +118,6 @@
 	if(bp->next == nil)
 		return bp;
 	len = blocklen(bp);
-	concatblockcnt += len;
 	return pullupblock(bp, len);
 }
 
@@ -162,6 +130,8 @@
 	Block *nbp;
 	int i;
 
+	assert(n >= 0);
+
 	/*
 	 *  this should almost always be true, it's
 	 *  just to avoid every caller checking.
@@ -184,7 +154,6 @@
 	 */
 	n -= BLEN(bp);
 	while((nbp = bp->next) != nil){
-		pullupblockcnt++;
 		i = BLEN(nbp);
 		if(i > n) {
 			memmove(bp->wp, nbp->rp, n);
@@ -223,6 +192,8 @@
 {
 	Block *b;
 
+	assert(n >= 0);
+
 	if(BLEN(q->bfirst) >= n)
 		return q->bfirst;
 	q->bfirst = pullupblock(q->bfirst, n);
@@ -241,8 +212,14 @@
 	ulong l;
 	Block *nb, *startb;
 
+	assert(len >= 0);
+	assert(offset >= 0);
+
 	QDEBUG checkb(bp, "trimblock 1");
-	if(blocklen(bp) < offset+len) {
+	l = blocklen(bp);
+	if(offset == 0 && len == l)
+		return bp;
+	if(l < offset+len) {
 		freeblist(bp);
 		return nil;
 	}
@@ -274,6 +251,43 @@
 }
 
 /*
+ *  pad a block to the front (or the back if size is negative)
+ */
+Block*
+padblock(Block *bp, int size)
+{
+	int n;
+	Block *nbp;
+
+	QDEBUG checkb(bp, "padblock 0");
+	if(size >= 0){
+		if(bp->rp - bp->base >= size){
+			bp->rp -= size;
+			return bp;
+		}
+		n = BLEN(bp);
+		nbp = allocb(size+n);
+		nbp->rp += size;
+		nbp->wp = nbp->rp;
+		memmove(nbp->wp, bp->rp, n);
+		nbp->wp += n;
+		nbp->rp -= size;
+	} else {
+		size = -size;
+		if(bp->lim - bp->wp >= size)
+			return bp;
+		n = BLEN(bp);
+		nbp = allocb(n+size);
+		memmove(nbp->wp, bp->rp, n);
+		nbp->wp += n;
+	}
+	nbp->next = bp->next;
+	freeb(bp);
+	QDEBUG checkb(nbp, "padblock 1");
+	return nbp;
+}
+
+/*
  *  copy 'count' bytes into a new block
  */
 Block*
@@ -282,6 +296,8 @@
 	int l;
 	Block *nbp;
 
+	assert(count >= 0);
+
 	QDEBUG checkb(bp, "copyblock 0");
 	nbp = allocb(count);
 	for(; count > 0 && bp != nil; bp = bp->next){
@@ -296,7 +312,6 @@
 		memset(nbp->wp, 0, count);
 		nbp->wp += count;
 	}
-	copyblockcnt++;
 	QDEBUG checkb(nbp, "copyblock 1");
 
 	return nbp;
@@ -330,7 +345,30 @@
 	return bp;
 }
 
+/*
+ *  if the allocated space is way out of line with the used
+ *  space, reallocate to a smaller block
+ */
+Block*
+packblock(Block *bp)
+{
+	Block **l, *nbp;
+	int n;
 
+	for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
+		n = BLEN(nbp);
+		if((n<<2) < BALLOC(nbp)){
+			*l = allocb(n);
+			memmove((*l)->wp, nbp->rp, n);
+			(*l)->wp += n;
+			(*l)->next = nbp->next;
+			freeb(nbp);
+		}
+	}
+
+	return bp;
+}
+
 /*
  *  throw away up to count bytes from a
  *  list of blocks.  Return count of bytes
@@ -346,8 +384,8 @@
 	if(bph == nil)
 		return 0;
 
-	while(*bph != nil && count != 0) {
-		bp = *bph;
+	while((bp = *bph) != nil && count > 0) {
+		QDEBUG checkb(bp, "pullblock ");
 		n = BLEN(bp);
 		if(count < n)
 			n = count;
@@ -354,7 +392,6 @@
 		bytes += n;
 		count -= n;
 		bp->rp += n;
-		QDEBUG checkb(bp, "pullblock ");
 		if(BLEN(bp) == 0) {
 			*bph = bp->next;
 			bp->next = nil;
@@ -365,103 +402,145 @@
 }
 
 /*
- *  get next block from a queue, return null if nothing there
+ *  remove a block from the front of the queue
  */
 Block*
-qget(Queue *q)
+qremove(Queue *q)
 {
-	int dowakeup;
 	Block *b;
 
-	/* sync with qwrite */
-	ilock(&q->lk);
-
 	b = q->bfirst;
-	if(b == nil){
-		q->state |= Qstarve;
-		iunlock(&q->lk);
+	if(b == nil)
 		return nil;
-	}
-	QDEBUG checkb(b, "qget");
+	QDEBUG checkb(b, "qremove");
 	q->bfirst = b->next;
 	b->next = nil;
-	q->len -= BALLOC(b);
 	q->dlen -= BLEN(b);
+	q->rp += BALLOC(b);
+	return b;
+}
 
-	/* if writer flow controlled, restart */
-	if((q->state & Qflow) && q->len < q->limit/2){
-		q->state &= ~Qflow;
-		dowakeup = 1;
-	} else
-		dowakeup = 0;
+/*
+ *  put a block back to the front of the queue
+ */
+void
+qputback(Queue *q, Block *b)
+{
+	QDEBUG checkb(b, "qputback");
+	b->next = q->bfirst;
+	if(q->bfirst == nil)
+		q->blast = b;
+	q->bfirst = b;
+	q->dlen += BLEN(b);
+	q->rp -= BALLOC(b);
+}
 
+/*
+ *  after removing data from the queue,
+ *  unlock queue and wakeup blocked writer.
+ *  called at interrupt level.
+ */
+static int
+iunlock_consumer(Queue *q)
+{
+	int s = q->state;
+
+	/* stop flow control when back at or below the limit */
+	if((int)(q->wp - q->rp) <= q->limit)
+		q->state = s & ~Qflow;
+
 	iunlock(&q->lk);
 
-	if(dowakeup)
+	if(s & Qflow){
+		/*
+		 * wakeup flow controlled writers.
+		 * note that this is done even when q->state
+		 * still has Qflow set, as the unblocking
+		 * condition depends on the writers local queuing
+		 * position, not on the global queue length.
+		 */
 		wakeup(&q->wr);
-
-	return b;
+	}
+	return s;
 }
 
 /*
- *  throw away the next 'len' bytes in the queue
+ *  after removing data from the queue,
+ *  unlock queue and wakeup blocked writer.
+ *  get output going again when it was blocked.
+ *  called at process level.
  */
-int
-qdiscard(Queue *q, int len)
+static int
+iunlock_reader(Queue *q)
 {
-	Block *b, *tofree = nil;
-	int dowakeup, n, sofar;
+	int s = iunlock_consumer(q);
 
-	ilock(&q->lk);
-	for(sofar = 0; sofar < len; sofar += n){
-		b = q->bfirst;
-		if(b == nil)
-			break;
-		QDEBUG checkb(b, "qdiscard");
-		n = BLEN(b);
-		if(n <= len - sofar){
-			q->bfirst = b->next;
-			q->len -= BALLOC(b);
-			q->dlen -= BLEN(b);
+	if(q->kick != nil && s & Qflow)
+		(*q->kick)(q->arg);
 
-			/* remember to free this */
-			b->next = tofree;
-			tofree = b;
-		} else {
-			n = len - sofar;
-			b->rp += n;
-			q->dlen -= n;
-		}
-	}
+	return s;
+}
 
-	/*
-	 *  if writer flow controlled, restart
-	 *
-	 *  This used to be
-	 *	q->len < q->limit/2
-	 *  but it slows down tcp too much for certain write sizes.
-	 *  I really don't understand it completely.  It may be
-	 *  due to the queue draining so fast that the transmission
-	 *  stalls waiting for the app to produce more data.  - presotto
-	 */
-	if((q->state & Qflow) && q->len < q->limit){
-		q->state &= ~Qflow;
-		dowakeup = 1;
-	} else
-		dowakeup = 0;
+/*
+ *  after inserting into queue,
+ *  unlock queue and wakeup starved reader.
+ *  called at interrupt level.
+ */
+static int
+iunlock_producer(Queue *q)
+{
+	int s = q->state;
 
+	/* start flow control when above the limit */
+	if((int)(q->wp - q->rp) > q->limit)
+		s |= Qflow;
+
+	q->state = s & ~Qstarve;
 	iunlock(&q->lk);
 
-	if(dowakeup)
-		wakeup(&q->wr);
+	if(s & Qstarve)
+		wakeup(&q->rr);
 
-	if(tofree != nil)
-		freeblist(tofree);
+	return s;
+}
 
-	return sofar;
+/*
+ *  unlock queue and wakeup starved reader.
+ *  get output going again when it was starved.
+ *  called at process level.
+ */
+static int
+iunlock_writer(Queue *q)
+{
+	int s = iunlock_producer(q);
+
+	if(q->kick != nil && s & (Qstarve|Qkick))
+		(*q->kick)(q->arg);
+
+	return s;
 }
 
 /*
+ *  get next block from a queue, return null if nothing there
+ *  called at interrupt level.
+ */
+Block*
+qget(Queue *q)
+{
+	Block *b;
+
+	ilock(&q->lk);
+	if((b = qremove(q)) == nil){
+		q->state |= Qstarve;
+		iunlock(&q->lk);
+		return nil;
+	}
+	iunlock_consumer(q);
+
+	return b;
+}
+
+/*
  *  Interrupt level copy out of a queue, return # bytes copied.
  */
 int
@@ -468,12 +547,11 @@
 qconsume(Queue *q, void *vp, int len)
 {
 	Block *b, *tofree = nil;
-	int n, dowakeup;
-	uchar *p = vp;
+	int n;
 
-	/* sync with qwrite */
-	ilock(&q->lk);
+	assert(len >= 0);
 
+	ilock(&q->lk);
 	for(;;) {
 		b = q->bfirst;
 		if(b == nil){
@@ -486,8 +564,10 @@
 		n = BLEN(b);
 		if(n > 0)
 			break;
+
+		/* get rid of zero-length blocks */
 		q->bfirst = b->next;
-		q->len -= BALLOC(b);
+		q->rp += BALLOC(b);
 
 		/* remember to free this */
 		b->next = tofree;
@@ -494,10 +574,9 @@
 		tofree = b;
 	};
 
-	consumecnt += n;
 	if(n < len)
 		len = n;
-	memmove(p, b->rp, len);
+	memmove(vp, b->rp, len);
 	b->rp += len;
 	q->dlen -= len;
 
@@ -504,7 +583,7 @@
 	/* discard the block if we're done with it */
 	if((q->state & Qmsg) || len == n){
 		q->bfirst = b->next;
-		q->len -= BALLOC(b);
+		q->rp += BALLOC(b);
 		q->dlen -= BLEN(b);
 
 		/* remember to free this */
@@ -511,23 +590,40 @@
 		b->next = tofree;
 		tofree = b;
 	}
-
 out:
-	/* if writer flow controlled, restart */
-	if((q->state & Qflow) && q->len < q->limit/2){
-		q->state &= ~Qflow;
-		dowakeup = 1;
-	} else
-		dowakeup = 0;
+	iunlock_consumer(q);
 
-	iunlock(&q->lk);
+	freeblist(tofree);
 
-	if(dowakeup)
-		wakeup(&q->wr);
+	return len;
+}
 
-	if(tofree != nil)
-		freeblist(tofree);
+/*
+ *  add a block list to a queue, return bytes added
+ */
+int
+qaddlist(Queue *q, Block *b)
+{
+	int len;
 
+	QDEBUG checkb(b, "qaddlist 1");
+
+	/* queue the block */
+	if(q->bfirst != nil)
+		q->blast->next = b;
+	else
+		q->bfirst = b;
+
+	len = BLEN(b);
+	q->wp += BALLOC(b);
+	while(b->next != nil){
+		b = b->next;
+		QDEBUG checkb(b, "qaddlist 2");
+		len += BLEN(b);
+		q->wp += BALLOC(b);
+	}
+	q->dlen += len;
+	q->blast = b;
 	return len;
 }
 
@@ -534,36 +630,22 @@
 int
 qpass(Queue *q, Block *b)
 {
-	int len, dowakeup;
+	int len;
 
-	/* sync with qread */
-	dowakeup = 0;
 	ilock(&q->lk);
-	if(q->len >= q->limit){
+	if(q->state & Qclosed){
 		iunlock(&q->lk);
 		freeblist(b);
-		return -1;
+		return 0;
 	}
-	if(q->state & Qclosed){
+	if(q->state & Qflow){
 		iunlock(&q->lk);
 		freeblist(b);
-		return 0;
+		return -1;
 	}
-
 	len = qaddlist(q, b);
+	iunlock_producer(q);
 
-	if(q->len >= q->limit/2)
-		q->state |= Qflow;
-
-	if(q->state & Qstarve){
-		q->state &= ~Qstarve;
-		dowakeup = 1;
-	}
-	iunlock(&q->lk);
-
-	if(dowakeup)
-		wakeup(&q->rr);
-
 	return len;
 }
 
@@ -570,101 +652,36 @@
 int
 qpassnolim(Queue *q, Block *b)
 {
-	int len, dowakeup;
+	int len;
 
-	/* sync with qread */
-	dowakeup = 0;
 	ilock(&q->lk);
-
 	if(q->state & Qclosed){
 		iunlock(&q->lk);
 		freeblist(b);
 		return 0;
 	}
-
 	len = qaddlist(q, b);
+	iunlock_producer(q);
 
-	if(q->len >= q->limit/2)
-		q->state |= Qflow;
-
-	if(q->state & Qstarve){
-		q->state &= ~Qstarve;
-		dowakeup = 1;
-	}
-	iunlock(&q->lk);
-
-	if(dowakeup)
-		wakeup(&q->rr);
-
 	return len;
 }
 
-/*
- *  if the allocated space is way out of line with the used
- *  space, reallocate to a smaller block
- */
-Block*
-packblock(Block *bp)
-{
-	Block **l, *nbp;
-	int n;
-
-	for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
-		n = BLEN(nbp);
-		if((n<<2) < BALLOC(nbp)){
-			*l = allocb(n);
-			memmove((*l)->wp, nbp->rp, n);
-			(*l)->wp += n;
-			(*l)->next = nbp->next;
-			freeb(nbp);
-		}
-	}
-
-	return bp;
-}
-
 int
 qproduce(Queue *q, void *vp, int len)
 {
 	Block *b;
-	int dowakeup;
-	uchar *p = vp;
 
+	assert(len >= 0);
+
 	b = iallocb(len);
 	if(b == nil)
 		return 0;
 
-	/* sync with qread */
-	dowakeup = 0;
-	ilock(&q->lk);
-
-	/* no waiting receivers, room in buffer? */
-	if(q->len >= q->limit){
-		q->state |= Qflow;
-		iunlock(&q->lk);
-		freeb(b);
-		return -1;
-	}
-	producecnt += len;
-
 	/* save in buffer */
-	memmove(b->wp, p, len);
+	memmove(b->wp, vp, len);
 	b->wp += len;
-	qaddlist(q, b);
 
-	if(q->state & Qstarve){
-		q->state &= ~Qstarve;
-		dowakeup = 1;
-	}
-
-	if(q->len >= q->limit)
-		q->state |= Qflow;
-	iunlock(&q->lk);
-
-	if(dowakeup)
-		wakeup(&q->rr);
-
-	return len;
+	return qpass(q, b);
 }
 
 /*
@@ -675,6 +692,8 @@
 {
 	Block *b;
 
+	assert(len >= 0);
+
 	b = allocb(len);
 	ilock(&q->lk);
 	b->wp += readblist(q->bfirst, b->wp, len, offset);
@@ -690,16 +709,18 @@
 {
 	Queue *q;
 
+	assert(limit >= 0);
+
 	q = malloc(sizeof(Queue));
 	if(q == nil)
 		return nil;
 
+	q->dlen = 0;
+	q->wp = q->rp = 0;
 	q->limit = q->inilim = limit;
 	q->kick = kick;
 	q->arg = arg;
-	q->state = msg;
-	
-	q->state |= Qstarve;
+	q->state = msg | Qstarve;
 	q->eof = 0;
 	q->noblock = 0;
 
@@ -716,10 +737,14 @@
 	if(q == nil)
 		return nil;
 
+	q->dlen = 0;
+	q->wp = q->rp = 0;
 	q->limit = 0;
 	q->arg = arg;
 	q->bypass = bypass;
 	q->state = 0;
+	q->eof = 0;
+	q->noblock = 0;
 
 	return q;
 }
@@ -729,7 +754,7 @@
 {
 	Queue *q = a;
 
-	return (q->state & Qclosed) || q->bfirst != nil;
+	return q->bfirst != nil || (q->state & Qclosed);
 }
 
 /*
@@ -745,10 +770,9 @@
 			break;
 
 		if(q->state & Qclosed){
-			if(++q->eof > 3)
+			if(q->eof >= 3 || (*q->err && strcmp(q->err, Ehungup) != 0))
 				return -1;
-			if(*q->err && strcmp(q->err, Ehungup) != 0)
-				return -1;
+			q->eof++;
 			return 0;
 		}
 
@@ -761,101 +785,6 @@
 }
 
 /*
- * add a block list to a queue, return bytes added
- */
-int
-qaddlist(Queue *q, Block *b)
-{
-	int len, dlen;
-
-	QDEBUG checkb(b, "qaddlist 1");
-
-	/* queue the block */
-	if(q->bfirst != nil)
-		q->blast->next = b;
-	else
-		q->bfirst = b;
-
-	len = BALLOC(b);
-	dlen = BLEN(b);
-	while(b->next != nil){
-		b = b->next;
-		QDEBUG checkb(b, "qaddlist 2");
-
-		len += BALLOC(b);
-		dlen += BLEN(b);
-	}
-	q->blast = b;
-	q->len += len;
-	q->dlen += dlen;
-	return dlen;
-}
-
-/*
- *  called with q ilocked
- */
-Block*
-qremove(Queue *q)
-{
-	Block *b;
-
-	b = q->bfirst;
-	if(b == nil)
-		return nil;
-	QDEBUG checkb(b, "qremove");
-	q->bfirst = b->next;
-	b->next = nil;
-	q->dlen -= BLEN(b);
-	q->len -= BALLOC(b);
-	return b;
-}
-
-/*
- *  copy the contents of a string of blocks into
- *  memory from an offset. blocklist kept unchanged.
- *  return number of copied bytes.
- */
-long
-readblist(Block *b, uchar *p, long n, ulong o)
-{
-	ulong m, r;
-
-	r = 0;
-	while(n > 0 && b != nil){
-		m = BLEN(b);
-		if(o >= m)
-			o -= m;
-		else {
-			m -= o;
-			if(n < m)
-				m = n;
-			memmove(p, b->rp + o, m);
-			p += m;
-			r += m;
-			n -= m;
-			o = 0;
-		}
-		b = b->next;
-	}
-	return r;
-}
-
-/*
- *  put a block back to the front of the queue
- *  called with q ilocked
- */
-void
-qputback(Queue *q, Block *b)
-{
-	b->next = q->bfirst;
-	if(q->bfirst == nil)
-		q->blast = b;
-	q->bfirst = b;
-	q->len += BALLOC(b);
-	q->dlen += BLEN(b);
-}
-
-/*
  *  cut off n bytes from the end of *h. return a new
  *  block with the tail and change *h to refer to the
  *  head.
@@ -885,31 +814,6 @@
 }
 
 /*
- *  flow control, get producer going again
- *  called with q ilocked
- */
-static void
-qwakeup_iunlock(Queue *q)
-{
-	int dowakeup = 0;
-
-	/* if writer flow controlled, restart */
-	if((q->state & Qflow) && q->len < q->limit/2){
-		q->state &= ~Qflow;
-		dowakeup = 1;
-	}
-
-	iunlock(&q->lk);
-
-	/* wakeup flow controlled writers */
-	if(dowakeup){
-		if(q->kick != nil)
-			q->kick(q->arg);
-		wakeup(&q->wr);
-	}
-}
-
-/*
  *  get next block from a queue (up to a limit)
  */
 Block*
@@ -918,6 +822,8 @@
 	Block *b;
 	int n;
 
+	assert(len >= 0);
+
 	qlock(&q->rlock);
 	if(waserror()){
 		qunlock(&q->rlock);
@@ -950,10 +856,8 @@
 		else
 			b->wp -= n;
 	}
+	iunlock_reader(q);
 
-	/* restart producer */
-	qwakeup_iunlock(q);
-
 	qunlock(&q->rlock);
 	poperror();
 
@@ -970,6 +874,8 @@
 	Block *b, *first, **last;
 	int m, n;
 
+	assert(len >= 0);
+
 	qlock(&q->rlock);
 	if(waserror()){
 		qunlock(&q->rlock);
@@ -1001,8 +907,8 @@
 			freeb(qremove(q));
 			goto again;
 		}
-
-		/*  grab the first block plus as many
+		/*
+		 *  grab the first block plus as many
 		 *  following blocks as will partially
 		 *  fit in the read.
 		 */
@@ -1025,8 +931,7 @@
 	if(n > len && (q->state & Qmsg) == 0)
 		qputback(q, splitblock(last, n - len));
 
-	/* restart producer */
-	qwakeup_iunlock(q);
+	iunlock_reader(q);
 
 	qunlock(&q->rlock);
 	poperror();
@@ -1042,34 +947,39 @@
 	return n;
 }
 
+/*
+ *  a Flow represents a flow controlled
+ *  writer on queue q with position p.
+ */
+typedef struct {
+	Queue*	q;
+	uint	p;
+} Flow;
+
 static int
-qnotfull(void *a)
+unblocked(void *a)
 {
-	Queue *q = a;
+	Flow *f = a;
+	Queue *q = f->q;
 
-	return q->len < q->limit || (q->state & Qclosed);
+	return q->noblock || (int)(f->p - q->rp) <= q->limit || (q->state & Qclosed);
 }
 
 /*
- *  flow control, wait for queue to get below the limit
+ *  flow control, wait for queue to drain back to the limit
  */
 static void
-qflow(Queue *q)
+qflow(Flow *f)
 {
-	for(;;){
-		if(q->noblock || qnotfull(q))
-			break;
+	Queue *q = f->q;
 
-		ilock(&q->lk);
-		q->state |= Qflow;
-		iunlock(&q->lk);
-
+	while(!unblocked(f)){
 		qlock(&q->wlock);
 		if(waserror()){
 			qunlock(&q->wlock);
 			nexterror();
 		}
-		sleep(&q->wr, qnotfull, q);
+		sleep(&q->wr, unblocked, f);
 		qunlock(&q->wlock);
 		poperror();
 	}
@@ -1081,7 +991,8 @@
 long
 qbwrite(Queue *q, Block *b)
 {
-	int len, dowakeup;
+	Flow flow;
+	int len;
 
 	if(q->bypass != nil){
 		len = blocklen(b);
@@ -1089,7 +1000,6 @@
 		return len;
 	}
 
-	dowakeup = 0;
 	if(waserror()){
 		freeblist(b);
 		nexterror();
@@ -1101,9 +1011,11 @@
 		iunlock(&q->lk);
 		error(q->err);
 	}
-
-	/* don't queue over the limit */
-	if(q->len >= q->limit && q->noblock){
+	/*
+	 * if the queue is full,
+	 * silently discard when non-blocking
+	 */
+	if(q->state & Qflow && q->noblock){
 		iunlock(&q->lk);
 		poperror();
 		len = blocklen(b);
@@ -1110,33 +1022,25 @@
 		freeblist(b);
 		return len;
 	}
-
 	len = qaddlist(q, b);
-
-	/* make sure other end gets awakened */
-	if(q->state & Qstarve){
-		q->state &= ~Qstarve;
-		dowakeup = 1;
-	}
-	iunlock(&q->lk);
 	poperror();
 
-	/*  get output going again */
-	if(q->kick != nil && (dowakeup || (q->state&Qkick)))
-		q->kick(q->arg);
-
-	/* wakeup anyone consuming at the other end */
-	if(dowakeup)
-		wakeup(&q->rr);
-
 	/*
-	 *  flow control, before allowing the process to continue and
-	 *  queue more. We do this here so that postnote can only
-	 *  interrupt us after the data has been queued.  This means that
-	 *  things like 9p flushes and ssl messages will not be disrupted
-	 *  by software interrupts.
+	 * save our current position in queue
+	 * for flow control below.
 	 */
-	qflow(q);
+	flow.q = q;
+	flow.p = q->wp;
+	if(iunlock_writer(q) & Qflow){
+		/*
+		 *  flow control, before allowing the process to continue and
+		 *  queue more. We do this here so that postnote can only
+		 *  interrupt us after the data has been queued.  This means that
+		 *  things like 9p flushes and ssl messages will not be disrupted
+		 *  by software interrupts.
+		 */
+		qflow(&flow);
+	}
 
 	return len;
 }
@@ -1151,17 +1055,11 @@
 	Block *b;
 	uchar *p = vp;
 
+	assert(len >= 0);
+
 	QDEBUG if(!islo())
 		print("qwrite hi %#p\n", getcallerpc(&q));
 
-	/* stop queue bloat before allocating blocks */
-	if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
-		while(waserror())
-			;
-		qflow(q);
-		poperror();
-	}
-
 	sofar = 0;
 	do {
 		n = len-sofar;
@@ -1190,11 +1088,11 @@
 int
 qiwrite(Queue *q, void *vp, int len)
 {
-	int n, sofar, dowakeup;
+	int n, sofar;
 	Block *b;
 	uchar *p = vp;
 
-	dowakeup = 0;
+	assert(len >= 0);
 
 	sofar = 0;
 	do {
@@ -1209,46 +1107,72 @@
 		b->wp += n;
 
 		ilock(&q->lk);
-
-		/* we use an artificially high limit for kernel prints since anything
-		 * over the limit gets dropped
-		 */
-		if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
+		if(q->state & (Qflow|Qclosed)){
 			iunlock(&q->lk);
 			freeb(b);
 			break;
 		}
+		sofar += qaddlist(q, b);
+		iunlock_writer(q);
+	} while(sofar < len && (q->state & Qmsg) == 0);
 
-		qaddlist(q, b);
+	return sofar;
+}
 
-		if(q->state & Qstarve){
-			q->state &= ~Qstarve;
-			dowakeup = 1;
-		}
+/*
+ *  throw away the next 'len' bytes in the queue
+ */
+int
+qdiscard(Queue *q, int len)
+{
+	Block *b, *tofree = nil;
+	int n, sofar;
 
-		iunlock(&q->lk);
+	assert(len >= 0);
 
-		if(dowakeup){
-			if(q->kick != nil)
-				q->kick(q->arg);
-			wakeup(&q->rr);
+	ilock(&q->lk);
+	for(sofar = 0; sofar < len; sofar += n){
+		b = q->bfirst;
+		if(b == nil)
+			break;
+		QDEBUG checkb(b, "qdiscard");
+		n = BLEN(b);
+		if(n <= len - sofar){
+			q->bfirst = b->next;
+			q->rp += BALLOC(b);
+
+			/* remember to free this */
+			b->next = tofree;
+			tofree = b;
+		} else {
+			n = len - sofar;
+			b->rp += n;
 		}
+		q->dlen -= n;
+	}
+	iunlock_consumer(q);
 
-		sofar += n;
-	} while(sofar < len && (q->state & Qmsg) == 0);
+	freeblist(tofree);
 
 	return sofar;
 }
 
 /*
- *  be extremely careful when calling this,
- *  as there is no reference accounting
+ *  flush the output queue
  */
 void
-qfree(Queue *q)
+qflush(Queue *q)
 {
-	qclose(q);
-	free(q);
+	Block *tofree;
+
+	ilock(&q->lk);
+	tofree = q->bfirst;
+	q->bfirst = nil;
+	q->rp = q->wp;
+	q->dlen = 0;
+	iunlock_consumer(q);
+
+	freeblist(tofree);
 }
 
 /*
@@ -1258,32 +1182,42 @@
 void
 qclose(Queue *q)
 {
-	Block *bfirst;
+	Block *tofree;
 
 	if(q == nil)
 		return;
 
-	/* mark it */
 	ilock(&q->lk);
 	q->state |= Qclosed;
 	q->state &= ~(Qflow|Qstarve);
 	kstrcpy(q->err, Ehungup, ERRMAX);
-	bfirst = q->bfirst;
+	tofree = q->bfirst;
 	q->bfirst = nil;
-	q->len = 0;
+	q->rp = q->wp;
 	q->dlen = 0;
 	q->noblock = 0;
 	iunlock(&q->lk);
 
-	/* free queued blocks */
-	freeblist(bfirst);
-
 	/* wake up readers/writers */
 	wakeup(&q->rr);
 	wakeup(&q->wr);
+
+	/* free queued blocks */
+	freeblist(tofree);
 }
 
 /*
+ *  be extremely careful when calling this,
+ *  as there is no reference accounting
+ */
+void
+qfree(Queue *q)
+{
+	qclose(q);
+	free(q);
+}
+
+/*
  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
  *  blocks.
  */
@@ -1290,7 +1224,6 @@
 void
 qhangup(Queue *q, char *msg)
 {
-	/* mark it */
 	ilock(&q->lk);
 	q->state |= Qclosed;
 	if(msg == nil || *msg == '\0')
@@ -1336,26 +1269,21 @@
 }
 
 /*
- * return space remaining before flow control
+ *  return true if we can read without blocking
  */
 int
-qwindow(Queue *q)
+qcanread(Queue *q)
 {
-	int l;
-
-	l = q->limit - q->len;
-	if(l < 0)
-		l = 0;
-	return l;
+	return q->bfirst != nil;
 }
 
 /*
- *  return true if we can read without blocking
+ *  return non-zero when the queue is full
  */
 int
-qcanread(Queue *q)
+qfull(Queue *q)
 {
-	return q->bfirst != nil;
+	return q->state & Qflow;
 }
 
 /*
@@ -1364,7 +1292,11 @@
 void
 qsetlimit(Queue *q, int limit)
 {
+	assert(limit >= 0);
+
+	ilock(&q->lk);
 	q->limit = limit;
+	iunlock_consumer(q);
 }
 
 /*
@@ -1373,34 +1305,7 @@
 void
 qnoblock(Queue *q, int onoff)
 {
-	q->noblock = onoff;
-}
-
-/*
- *  flush the output queue
- */
-void
-qflush(Queue *q)
-{
-	Block *bfirst;
-
-	/* mark it */
 	ilock(&q->lk);
-	bfirst = q->bfirst;
-	q->bfirst = nil;
-	q->len = 0;
-	q->dlen = 0;
-	iunlock(&q->lk);
-
-	/* free queued blocks */
-	freeblist(bfirst);
-
-	/* wake up writers */
-	wakeup(&q->wr);
-}
-
-int
-qfull(Queue *q)
-{
-	return q->state & Qflow;
+	q->noblock = onoff;
+	iunlock_consumer(q);
 }