ref: 827020f6863490c649470087008ab5ca8138afc0
dir: /sys/src/cmd/venti/srv/lumpqueue.c/
#include "stdinc.h" #include "dat.h" #include "fns.h" typedef struct LumpQueue LumpQueue; typedef struct WLump WLump; enum { MaxLumpQ = 1 << 3 /* max. lumps on a single write queue, must be pow 2 */ }; struct WLump { Lump *u; Packet *p; int creator; int gen; uint ms; }; struct LumpQueue { QLock lock; Rendez flush; Rendez full; Rendez empty; WLump q[MaxLumpQ]; int w; int r; }; static LumpQueue *lumpqs; static int nqs; static QLock glk; static int gen; static void queueproc(void *vq); int initlumpqueues(int nq) { LumpQueue *q; int i; nqs = nq; lumpqs = MKNZ(LumpQueue, nq); for(i = 0; i < nq; i++){ q = &lumpqs[i]; q->full.l = &q->lock; q->empty.l = &q->lock; q->flush.l = &q->lock; if(vtproc(queueproc, q) < 0){ seterr(EOk, "can't start write queue slave: %r"); return -1; } } return 0; } /* * queue a lump & it's packet data for writing */ int queuewrite(Lump *u, Packet *p, int creator, uint ms) { LumpQueue *q; int i; trace(TraceProc, "queuewrite"); i = indexsect(mainindex, u->score); if(i < 0 || i >= nqs){ seterr(EBug, "internal error: illegal index section in queuewrite"); return -1; } q = &lumpqs[i]; qlock(&q->lock); while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){ trace(TraceProc, "queuewrite sleep"); rsleep(&q->full); } q->q[q->w].u = u; q->q[q->w].p = p; q->q[q->w].creator = creator; q->q[q->w].ms = ms; q->q[q->w].gen = gen; q->w = (q->w + 1) & (MaxLumpQ - 1); trace(TraceProc, "queuewrite wakeup"); rwakeup(&q->empty); qunlock(&q->lock); return 0; } void flushqueue(void) { int i; LumpQueue *q; if(!lumpqs) return; trace(TraceProc, "flushqueue"); qlock(&glk); gen++; qunlock(&glk); for(i=0; i<mainindex->nsects; i++){ q = &lumpqs[i]; qlock(&q->lock); while(q->w != q->r && gen - q->q[q->r].gen > 0){ trace(TraceProc, "flushqueue sleep q%d", i); rsleep(&q->flush); } qunlock(&q->lock); } } static void queueproc(void *vq) { LumpQueue *q; Lump *u; Packet *p; int creator; uint ms; threadsetname("queueproc"); q = vq; for(;;){ qlock(&q->lock); while(q->w == q->r){ trace(TraceProc, "queueproc sleep empty"); rsleep(&q->empty); } u = q->q[q->r].u; p = q->q[q->r].p; creator = q->q[q->r].creator; ms = q->q[q->r].ms; q->r = (q->r + 1) & (MaxLumpQ - 1); trace(TraceProc, "queueproc wakeup flush"); rwakeupall(&q->flush); trace(TraceProc, "queueproc wakeup full"); rwakeup(&q->full); qunlock(&q->lock); trace(TraceProc, "queueproc writelump %V", u->score); if(writeqlump(u, p, creator, ms) < 0) fprint(2, "failed to write lump for %V: %r", u->score); trace(TraceProc, "queueproc wrotelump %V", u->score); putlump(u); } }