ref: e61ca3be6f93135afc575df89d146e946e2b6ec4
parent: 0646450b2051b5b34f80cdf64810474b256248be
author: cinap_lenrek <[email protected]>
date: Thu Feb 2 15:49:05 EST 2017
aan: fix ack logic, handle connection shutdown
--- a/aan.c
+++ b/aan.c
@@ -138,8 +138,8 @@
aanreader(void *arg)
{
Client *c = (Client*)arg;
- Buf *b, *x, **l;
- long a, m;
+ long a, m, lastacked = 0;
+ Buf *b, *x;
int n;
Restart:
@@ -150,34 +150,42 @@
a = GBIT32(b->hdr.acked);
m = GBIT32(b->hdr.msg);
n = GBIT32(b->hdr.nb);
- if(n > Bufsize)
- break;
+ if(n == 0){
+ if(m < 0)
+ continue;
+ goto Closed;
+ } else if(n < 0 || n > Bufsize)
+ goto Closed;
- qlock(&c->lk);
- l = &c->unackedhead;
- for(x = c->unackedhead; x != nil; x = *l){
- if(a >= GBIT32(x->hdr.msg)){
- if((*l = x->next) == nil)
- c->unackedtail = l;
- free(x);
- } else {
- l = &x->next;
- }
- }
- qunlock(&c->lk);
-
if(readn(c->netfd, b->buf, n) != n)
break;
if(m < c->inmsg)
continue;
c->inmsg++;
+
+ if(lastacked != a){
+ qlock(&c->lk);
+ while((x = c->unackedhead) != nil){
+ assert(GBIT32(x->hdr.msg) == lastacked);
+ c->unackedhead = x->next;
+ free(x);
+ if(++lastacked == a)
+ break;
+ }
+ qunlock(&c->lk);
+ }
+
if(c->pipefd < 0)
- return;
+ goto Closed;
write(c->pipefd, b->buf, n);
}
free(b);
reconnect(c);
goto Restart;
+Closed:
+ free(b);
+ if(c->pipefd >= 0)
+ write(c->pipefd, "", 0);
}
int