shithub: riscv

ref: cb9e638c004ea7de8404a6746d12a42041ee9f74
dir: /sys/src/lib9p/queue.c/

View raw version
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <fcall.h>
#include <9p.h>

static int
_reqqueuenote(void *uregs, char *note)
{
	Reqqueue *q;

	if(strcmp(note, "flush") != 0)
		return 0;
	q = *threaddata();
	if(q != nil){
		q->cur = nil;
		notejmp(uregs, q->flush, 1);
	}
	return 1;
}

static void
_reqqueueproc(void *v)
{
	Reqqueue *q;
	Req *r;
	void (*f)(Req *);
	
	q = v;
	*threaddata() = q;
	rfork(RFNOTEG);
	threadnotify(_reqqueuenote, 1);
	for(;;){
		qlock(q);
		q->cur = nil;
		while(q->next == q)
			rsleep(q);
		r = (Req*)(((char*)q->next) - ((char*)&((Req*)0)->qu));
		r->qu.next->prev = r->qu.prev;
		r->qu.prev->next = r->qu.next;
		f = r->qu.f;
		qlock(&r->lk);
		memset(&r->qu, 0, sizeof(r->qu));
		qunlock(&r->lk);
		q->cur = r;
		if(setjmp(q->flush)){
			respond(r, "interrupted");
			continue;
		}
		qunlock(q);
		f(r);
	}
}

Reqqueue *
reqqueuecreate(void)
{
	Reqqueue *q;

	q = emalloc9p(sizeof(*q));
	memset(q, 0, sizeof(*q));
	q->l = q;
	q->next = q->prev = q;
	q->pid = threadpid(proccreate(_reqqueueproc, q, mainstacksize));
	print("%d\n", q->pid);
	return q;
}

void
reqqueuepush(Reqqueue *q, Req *r, void (*f)(Req *))
{
	qlock(q);
	r->qu.f = f;
	r->qu.next = q;
	r->qu.prev = q->prev;
	q->prev->next = &r->qu;
	q->prev = &r->qu;
	rwakeupall(q);
	qunlock(q);
}

void
reqqueueflush(Reqqueue *q, Req *r)
{
	qlock(q);
	if(q->cur == r){
		postnote(PNPROC, q->pid, "flush");
		qunlock(q);
	}else{
		if(r->qu.next != nil){
			r->qu.next->prev = r->qu.prev;
			r->qu.prev->next = r->qu.next;
		}
		qlock(&r->lk);
		memset(&r->qu, 0, sizeof(r->qu));
		qunlock(&r->lk);
		qunlock(q);
		respond(r, "interrupted");
	}
}

int
reqqueueflushed(void)
{
	Reqqueue *q;
	
	q = *threaddata();
	qlock(q);
	if(setjmp(q->flush))
		return 1;
	qunlock(q);
	return 0;
}