shithub: gefs

Download patch

ref: 9b3d57022efef51421855243ff1c33989668f48d
parent: ff676ebbf4b493e3dd9d134e24816629ae3cd1c0
author: Ori Bernstein <[email protected]>
date: Sun Apr 30 10:41:17 EDT 2023

Shard the fid lock and remove excess serialization
in the cache.

--- a/cache.c
+++ b/cache.c
@@ -81,7 +81,6 @@
 	Bucket *bkt;
 	u32int h;
 
-	qlock(&fs->lrulk);
 	assert(b->magic == Magic);
 	h = ihash(b->bp.addr);
 	bkt = &fs->bcache[h % fs->cmax];
@@ -88,7 +87,6 @@
 	lock(bkt);
 	if(checkflag(b, Bcached)){
 		unlock(bkt);
-		qunlock(&fs->lrulk);
 		return;
 	}
 	assert(b->hnext == nil);
@@ -99,7 +97,6 @@
 	b->hnext = bkt->b;
 	bkt->b = b;
 	unlock(bkt);
-	qunlock(&fs->lrulk);
 }
 
 void
@@ -140,18 +137,18 @@
 
 	bkt = &fs->bcache[h % fs->cmax];
 
-	qlock(&fs->lrulk);
 	lock(bkt);
 	for(b = bkt->b; b != nil; b = b->hnext){
 		if(b->bp.addr == off){
  			holdblk(b);
+			qlock(&fs->lrulk);
 			lrudel(b);
+			qunlock(&fs->lrulk);
 			b->lasthold = getcallerpc(&off);
 			break;
 		}
 	}
 	unlock(bkt);
-	qunlock(&fs->lrulk);
 
 	return b;
 }
--- a/dat.h
+++ b/dat.h
@@ -390,6 +390,11 @@
 	Bptr	bp;	/* block pointer of root */
 	vlong	gen;	/* generation */
 	vlong	prev;	/* previous snapshot */
+
+	Msg	mq[64];
+	int	qsz;
+	int	nq;
+	char	buf[Bufspc];
 };
 
 struct Bfree {
@@ -441,6 +446,7 @@
 	long	roundrobin;
 	long	syncing;
 	long	nsyncers;
+	long	nreaders;
 
 	int	gotinfo;
 	QLock	synclk;
@@ -453,7 +459,7 @@
 	Conn	*conns;
 
 	Chan	*wrchan;
-	Chan	*rdchan;
+	Chan	**rdchan;
 
 	int	nworker;
 	vlong	qgen;
@@ -565,7 +571,7 @@
 	int	versioned;
 
 	/* fid hash table */
-	Lock	fidtablk;
+	Lock	fidtablk[Nfidtab];
 	Fid	*fidtab[Nfidtab];
 };
 
--- a/fs.c
+++ b/fs.c
@@ -152,7 +152,6 @@
 		c->wp = c->args;
 	unlock(&c->wl);
 	semrelease(&c->count, 1);
-
 }
 
 static void
@@ -506,9 +505,9 @@
 	Conn *c;
 
 	for(c = fs->conns; c != nil; c = c->next){
-		lock(&c->fidtablk);
 		fprint(fd, "fids:%d\n", c->rfd);
-		for(i = 0; i < Nfidtab; i++)
+		for(i = 0; i < Nfidtab; i++){
+			lock(&c->fidtablk[i]);
 			for(f = c->fidtab[i]; f != nil; f = f->next){
 				rlock(f->dent);
 				fprint(fd, "\tfid[%d]: %d [refs=%ld, k=%K, qid=%Q]\n",
@@ -515,7 +514,8 @@
 					i, f->fid, f->dent->ref, &f->dent->Key, f->dent->qid);
 				runlock(f->dent);
 			}
-		unlock(&c->fidtablk);
+			unlock(&c->fidtablk[i]);
+		}
 	}
 }
 
@@ -526,13 +526,13 @@
 	Fid *f;
 
 	h = ihash(fid) % Nfidtab;
-	lock(&c->fidtablk);
+	lock(&c->fidtablk[h]);
 	for(f = c->fidtab[h]; f != nil; f = f->next)
 		if(f->fid == fid){
 			ainc(&f->ref);
 			break;
 		}
-	unlock(&c->fidtablk);
+	unlock(&c->fidtablk[h]);
 	return f;
 }
 
@@ -562,7 +562,7 @@
 	n->mode = -1;
 	n->next = nil;
 
-	lock(&c->fidtablk);
+	lock(&c->fidtablk[h]);
 	for(o = c->fidtab[h]; o != nil; o = o->next)
 		if(o->fid == new)
 			break;
@@ -570,7 +570,7 @@
 		n->next = c->fidtab[h];
 		c->fidtab[h] = n;
 	}
-	unlock(&c->fidtablk);
+	unlock(&c->fidtablk[h]);
 
 	if(o != nil){
 		fprint(2, "fid in use: %d == %d\n", o->fid, new);
@@ -590,8 +590,8 @@
 	Fid *f, **pf;
 	u32int h;
 
-	lock(&c->fidtablk);
 	h = ihash(fid->fid) % Nfidtab;
+	lock(&c->fidtablk[h]);
 	pf = &c->fidtab[h];
 	for(f = c->fidtab[h]; f != nil; f = f->next){
 		if(f == fid){
@@ -602,7 +602,7 @@
 		pf = &f->next;
 	}
 	assert(f != nil);
-	unlock(&c->fidtablk);
+	unlock(&c->fidtablk[h]);
 }
 
 static int
@@ -2028,6 +2028,7 @@
 runfs(int, void *pc)
 {
 	Conn *c;
+	u32int h;
 	char err[128];
 	Fcall r;
 	Fmsg *m;
@@ -2054,6 +2055,7 @@
 		fs->mflush[m->tag] = m;
 		unlock(&fs->mflushlk);
 
+		h = ihash(m->fid) % fs->nreaders;
 		switch(m->type){
 		/* sync setup */
 		case Tversion:	fsversion(m);	break;
@@ -2068,10 +2070,10 @@
 		case Tremove:	chsend(fs->wrchan, m);	break;
 
 		/* reads */
-		case Tattach:	chsend(fs->rdchan, m);	break;
-		case Twalk:	chsend(fs->rdchan, m);	break;
-		case Tread:	chsend(fs->rdchan, m);	break;
-		case Tstat:	chsend(fs->rdchan, m);	break;
+		case Tattach:	chsend(fs->rdchan[h], m);	break;
+		case Twalk:	chsend(fs->rdchan[h], m);	break;
+		case Tread:	chsend(fs->rdchan[h], m);	break;
+		case Tstat:	chsend(fs->rdchan[h], m);	break;
 
 		/* both */
 		case Topen:
@@ -2079,7 +2081,7 @@
 				chsend(fs->wrchan, m);
 			else
 
-				chsend(fs->rdchan, m);
+				chsend(fs->rdchan[h], m);
 			break;
 
 		default:
@@ -2141,12 +2143,12 @@
 }
 
 void
-runread(int wid, void *)
+runread(int wid, void *ch)
 {
 	Fmsg *m;
 
 	while(1){
-		m = chrecv(fs->rdchan);
+		m = chrecv(ch);
 		epochstart(wid);
 		switch(m->type){
 		case Tattach:	fsattach(m);	break;
--- a/main.c
+++ b/main.c
@@ -219,13 +219,17 @@
 
 	rfork(RFNOTEG);
 	loadfs(dev);
-	fs->rdchan = mkchan(32);
 	fs->wrchan = mkchan(32);
 	fs->nsyncers = nproc/2;
+	fs->nreaders = 2;
 	if(fs->nsyncers > fs->narena)
 		fs->nsyncers = fs->narena;
 	for(i = 0; i < fs->nsyncers; i++)
 		qinit(&fs->syncq[i]);
+	if((fs->rdchan = malloc(fs->nreaders*sizeof(Chan*))) == nil)
+		sysfatal("malloc: %r");
+	for(i = 0; i < fs->nreaders; i++)
+		fs->rdchan[i] = mkchan(32);
 	for(i = 0; i < fs->narena; i++)
 		fs->arenas[i].sync = &fs->syncq[i%fs->nsyncers];
 	srvfd = postfd(srvname, "");
@@ -233,8 +237,8 @@
 	launch(runcons, fs->nworker++, (void*)ctlfd, "ctl");
 	launch(runwrite, fs->nworker++, nil, "mutate");
 	launch(runtasks, -1, nil, "tasks");
-	for(i = 0; i < nproc/2; i++)
-		launch(runread, fs->nworker++, nil, "readio");
+	for(i = 0; i < fs->nreaders; i++)
+		launch(runread, fs->nworker++, fs->rdchan[i], "readio");
 	for(i = 0; i < fs->nsyncers; i++)
 		launch(runsync, -1, &fs->syncq[i], "syncio");
 	for(i = 0; i < nann; i++)
@@ -247,7 +251,7 @@
 	if(stdio){
 		if((c = newconn(0, 1)) == nil)
 			sysfatal("%r");
-		runfs(-1, c);
+		launch(runfs, -1, c, "stdio");
 	}
 	exits(nil);
 }