shithub: riscv

ref: 7134d62ccbb32c783de67d26907e4e856c48f29c
dir: /sys/src/libventi/send.c/

View raw version
#include <u.h>
#include <libc.h>
#include <venti.h>
#include "queue.h"

long ventisendbytes, ventisendpackets;
long ventirecvbytes, ventirecvpackets;

static int
_vtsend(VtConn *z, Packet *p)
{
	IOchunk ioc;
	int n, tot;
	uchar buf[2];

	if(z->state != VtStateConnected) {
		werrstr("session not connected");
		return -1;
	}

	/* add framing */
	n = packetsize(p);
	if(n >= (1<<16)) {
		werrstr("packet too large");
		packetfree(p);
		return -1;
	}
	buf[0] = n>>8;
	buf[1] = n;
	packetprefix(p, buf, 2);
	ventisendbytes += n+2;
	ventisendpackets++;

	tot = 0;
	for(;;){
		n = packetfragments(p, &ioc, 1, 0);
		if(n == 0)
			break;
		if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
			vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
			packetfree(p);
			return -1;
		}
		packetconsume(p, nil, ioc.len);
		tot += ioc.len;
	}
	vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
	packetfree(p);
	return 1;
}

static int
interrupted(void)
{
	char e[ERRMAX];

	rerrstr(e, sizeof e);
	return strstr(e, "interrupted") != nil;
}


static Packet*
_vtrecv(VtConn *z)
{
	uchar buf[10], *b;
	int n;
	Packet *p;
	int size, len;

	if(z->state != VtStateConnected) {
		werrstr("session not connected");
		return nil;
	}

	p = z->part;
	/* get enough for head size */
	size = packetsize(p);
	while(size < 2) {
		b = packettrailer(p, 2);
		assert(b != nil);
		if(0) fprint(2, "%d read hdr\n", getpid());
		n = read(z->infd, b, 2);
		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
		if(n==0 || (n<0 && !interrupted()))
			goto Err;
		size += n;
		packettrim(p, 0, size);
	}

	if(packetconsume(p, buf, 2) < 0)
		goto Err;
	len = (buf[0] << 8) | buf[1];
	size -= 2;

	while(size < len) {
		n = len - size;
		if(n > MaxFragSize)
			n = MaxFragSize;
		b = packettrailer(p, n);
		if(0) fprint(2, "%d read body %d\n", getpid(), n);
		n = read(z->infd, b, n);
		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
		if(n > 0)
			size += n;
		packettrim(p, 0, size);
		if(n==0 || (n<0 && !interrupted()))
			goto Err;
	}
	ventirecvbytes += len;
	ventirecvpackets++;
	p = packetsplit(p, len);
	vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
	return p;
Err:	
	vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
	return nil;	
}

/*
 * If you fork off two procs running vtrecvproc and vtsendproc,
 * then vtrecv/vtsend (and thus vtrpc) will never block except on 
 * rendevouses, which is nice when it's running in one thread of many.
 */
void
vtrecvproc(void *v)
{
	Packet *p;
	VtConn *z;
	Queue *q;

	z = v;
	q = _vtqalloc();

	qlock(&z->lk);
	z->readq = q;
	qlock(&z->inlk);
	rwakeup(&z->rpcfork);
	qunlock(&z->lk);

	while((p = _vtrecv(z)) != nil)
		if(_vtqsend(q, p) < 0){
			packetfree(p);
			break;
		}
	qunlock(&z->inlk);
	qlock(&z->lk);
	_vtqhangup(q);
	while((p = _vtnbqrecv(q)) != nil)
		packetfree(p);
	_vtqdecref(q);
	z->readq = nil;
	rwakeup(&z->rpcfork);
	qunlock(&z->lk);
	vthangup(z);
}

void
vtsendproc(void *v)
{
	Queue *q;
	Packet *p;
	VtConn *z;

	z = v;
	q = _vtqalloc();

	qlock(&z->lk);
	z->writeq = q;
	qlock(&z->outlk);
	rwakeup(&z->rpcfork);
	qunlock(&z->lk);

	while((p = _vtqrecv(q)) != nil)
		if(_vtsend(z, p) < 0)
			break;
	qunlock(&z->outlk);
	qlock(&z->lk);
	_vtqhangup(q);
	while((p = _vtnbqrecv(q)) != nil)
		packetfree(p);
	_vtqdecref(q);
	z->writeq = nil;
	rwakeup(&z->rpcfork);
	qunlock(&z->lk);
	return;
}

Packet*
vtrecv(VtConn *z)
{
	Packet *p;
	Queue *q;

	qlock(&z->lk);
	if(z->state != VtStateConnected){
		werrstr("not connected");
		qunlock(&z->lk);
		return nil;
	}
	if(z->readq){
		q = _vtqincref(z->readq);
		qunlock(&z->lk);
		p = _vtqrecv(q);
		_vtqdecref(q);
		return p;
	}

	qlock(&z->inlk);
	qunlock(&z->lk);
	p = _vtrecv(z);
	qunlock(&z->inlk);
	if(!p)
		vthangup(z);
	return p;
}

int
vtsend(VtConn *z, Packet *p)
{
	Queue *q;

	qlock(&z->lk);
	if(z->state != VtStateConnected){
		packetfree(p);
		werrstr("not connected");
		qunlock(&z->lk);
		return -1;
	}
	if(z->writeq){
		q = _vtqincref(z->writeq);
		qunlock(&z->lk);
		if(_vtqsend(q, p) < 0){
			_vtqdecref(q);
			packetfree(p);
			return -1;
		}
		_vtqdecref(q);
		return 0;
	}

	qlock(&z->outlk);
	qunlock(&z->lk);
	if(_vtsend(z, p) < 0){
		qunlock(&z->outlk);
		vthangup(z);
		return -1;	
	}
	qunlock(&z->outlk);
	return 0;
}