ref: 827020f6863490c649470087008ab5ca8138afc0
dir: /sys/src/cmd/venti/srv/buildindex.c/
/* * Rebuild the index from scratch, in place. */ #include "stdinc.h" #include "dat.h" #include "fns.h" enum { MinBufSize = 64*1024, MaxBufSize = 4*1024*1024, }; int dumb; int errors; char **isect; int nisect; int bloom; int zero; u32int isectmem; u64int totalbuckets; u64int totalclumps; Channel *arenadonechan; Channel *isectdonechan; Index *ix; u64int arenaentries; u64int skipentries; u64int indexentries; static int shouldprocess(ISect*); static void isectproc(void*); static void arenapartproc(void*); void usage(void) { fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n"); threadexitsall("usage"); } void threadmain(int argc, char *argv[]) { int fd, i, napart, nfinish, maxdisks; u32int bcmem, imem; Config conf; Part *p; maxdisks = 100000; ventifmtinstall(); imem = 256*1024*1024; ARGBEGIN{ case 'b': bloom = 1; break; case 'd': /* debugging - make sure to run all 3 passes */ dumb = 1; break; case 'i': isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0])); isect[nisect++] = EARGF(usage()); break; case 'M': imem = unittoull(EARGF(usage())); break; case 'm': /* temporary - might go away */ maxdisks = atoi(EARGF(usage())); break; default: usage(); break; }ARGEND if(argc != 1) usage(); if(initventi(argv[0], &conf) < 0) sysfatal("can't init venti: %r"); ix = mainindex; if(nisect == 0 && ix->bloom) bloom = 1; if(bloom && ix->bloom && resetbloom(ix->bloom) < 0) sysfatal("loadbloom: %r"); if(bloom && !ix->bloom) sysfatal("-b specified but no bloom filter"); if(!bloom) ix->bloom = nil; isectmem = imem/ix->nsects; /* * safety first - only need read access to arenas */ p = nil; for(i=0; i<ix->narenas; i++){ if(ix->arenas[i]->part != p){ p = ix->arenas[i]->part; if((fd = open(p->filename, OREAD)) < 0) sysfatal("cannot reopen %s: %r", p->filename); dup(fd, p->fd); close(fd); } } /* * need a block for every arena */ bcmem = maxblocksize * (mainindex->narenas + 16); if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem); initdcache(bcmem); totalclumps = 0; for(i=0; i<ix->narenas; i++) totalclumps += ix->arenas[i]->diskstats.clumps; totalbuckets = 0; for(i=0; i<ix->nsects; i++) totalbuckets += ix->sects[i]->blocks; fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets); /* start index procs */ fprint(2, "%T read index\n"); isectdonechan = chancreate(sizeof(void*), 0); for(i=0; i<ix->nsects; i++){ if(shouldprocess(ix->sects[i])){ ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0); vtproc(isectproc, ix->sects[i]); } } for(i=0; i<nisect; i++) if(isect[i]) fprint(2, "warning: did not find index section %s\n", isect[i]); /* start arena procs */ p = nil; napart = 0; nfinish = 0; arenadonechan = chancreate(sizeof(void*), 0); for(i=0; i<ix->narenas; i++){ if(ix->arenas[i]->part != p){ p = ix->arenas[i]->part; vtproc(arenapartproc, p); if(++napart >= maxdisks){ recvp(arenadonechan); nfinish++; } } } /* wait for arena procs to finish */ for(nfinish=0; nfinish<napart; nfinish++) recvp(arenadonechan); /* tell index procs to finish */ for(i=0; i<ix->nsects; i++) if(ix->sects[i]->writechan) send(ix->sects[i]->writechan, nil); /* wait for index procs to finish */ for(i=0; i<ix->nsects; i++) if(ix->sects[i]->writechan) recvp(isectdonechan); if(ix->bloom && writebloom(ix->bloom) < 0) fprint(2, "writing bloom filter: %r\n"); fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n", arenaentries, indexentries, skipentries); threadexitsall(nil); } static int shouldprocess(ISect *is) { int i; if(nisect == 0) return 1; for(i=0; i<nisect; i++) if(isect[i] && strcmp(isect[i], is->name) == 0){ isect[i] = nil; return 1; } return 0; } static void add(u64int *a, u64int n) { static Lock l; lock(&l); *a += n; unlock(&l); } /* * Read through an arena partition and send each of its IEntries * to the appropriate index section. When finished, send on * arenadonechan. */ enum { ClumpChunks = 32*1024, }; static void arenapartproc(void *v) { int i, j, n, nskip, x; u32int clump; u64int addr, tot; Arena *a; ClumpInfo *ci, *cis; IEntry ie; Part *p; p = v; threadsetname("arenaproc %s", p->name); nskip = 0; tot = 0; cis = MKN(ClumpInfo, ClumpChunks); for(i=0; i<ix->narenas; i++){ a = ix->arenas[i]; if(a->part != p) continue; if(a->memstats.clumps) fprint(2, "%T arena %s: %d entries\n", a->name, a->memstats.clumps); /* * Running the loop backwards accesses the * clump info blocks forwards, since they are * stored in reverse order at the end of the arena. * This speeds things slightly. */ addr = ix->amap[i].start + a->memstats.used; for(clump=a->memstats.clumps; clump > 0; clump-=n){ n = ClumpChunks; if(n > clump) n = clump; if(readclumpinfos(a, clump-n, cis, n) != n){ fprint(2, "%T arena %s: directory read: %r\n", a->name); errors = 1; break; } for(j=n-1; j>=0; j--){ ci = &cis[j]; ie.ia.type = ci->type; ie.ia.size = ci->uncsize; addr -= ci->size + ClumpSize; ie.ia.addr = addr; ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog; scorecp(ie.score, ci->score); if(ci->type == VtCorruptType) nskip++; else{ tot++; x = indexsect(ix, ie.score); assert(0 <= x && x < ix->nsects); if(ix->sects[x]->writechan) send(ix->sects[x]->writechan, &ie); if(ix->bloom) markbloomfilter(ix->bloom, ie.score); } } } if(addr != ix->amap[i].start) fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start); } add(&arenaentries, tot); add(&skipentries, nskip); sendp(arenadonechan, p); } /* * Convert score into relative bucket number in isect. * Can pass a packed ientry instead of score - score is first. */ static u32int score2bucket(ISect *is, uchar *score) { u32int b; b = hashbits(score, 32)/ix->div; if(b < is->start || b >= is->stop){ fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n", score, ix->div, b, is->start, is->stop); } assert(is->start <= b && b < is->stop); return b - is->start; } /* * Convert offset in index section to bucket number. */ static u32int offset2bucket(ISect *is, u64int offset) { u32int b; assert(is->blockbase <= offset); offset -= is->blockbase; b = offset/is->blocksize; assert(b < is->stop-is->start); return b; } /* * Convert bucket number to offset. */ static u64int bucket2offset(ISect *is, u32int b) { assert(b <= is->stop-is->start); return is->blockbase + (u64int)b*is->blocksize; } /* * IEntry buffers to hold initial round of spraying. */ typedef struct Buf Buf; struct Buf { Part *part; /* partition being written */ uchar *bp; /* current block */ uchar *ep; /* end of block */ uchar *wp; /* write position in block */ u64int boffset; /* start offset */ u64int woffset; /* next write offset */ u64int eoffset; /* end offset */ u32int nentry; /* number of entries written */ }; static void bflush(Buf *buf) { u32int bufsize; if(buf->woffset >= buf->eoffset) sysfatal("buf index chunk overflow - need bigger index"); bufsize = buf->ep - buf->bp; if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){ fprint(2, "write %s: %r\n", buf->part->name); errors = 1; } buf->woffset += bufsize; memset(buf->bp, 0, bufsize); buf->wp = buf->bp; } static void bwrite(Buf *buf, IEntry *ie) { if(buf->wp+IEntrySize > buf->ep) bflush(buf); assert(buf->bp <= buf->wp && buf->wp < buf->ep); packientry(ie, buf->wp); buf->wp += IEntrySize; assert(buf->bp <= buf->wp && buf->wp <= buf->ep); buf->nentry++; } /* * Minibuffer. In-memory data structure holds our place * in the buffer but has no block data. We are writing and * reading the minibuffers at the same time. (Careful!) */ typedef struct Minibuf Minibuf; struct Minibuf { u64int boffset; /* start offset */ u64int roffset; /* read offset */ u64int woffset; /* write offset */ u64int eoffset; /* end offset */ u32int nentry; /* # entries left to read */ u32int nwentry; /* # entries written */ }; /* * Index entry pool. Used when trying to shuffle around * the entries in a big buffer into the corresponding M minibuffers. * Sized to hold M*EntriesPerBlock entries, so that there will always * either be room in the pool for another block worth of entries * or there will be an entire block worth of sorted entries to * write out. */ typedef struct IEntryLink IEntryLink; typedef struct IPool IPool; struct IEntryLink { uchar ie[IEntrySize]; /* raw IEntry */ IEntryLink *next; /* next in chain */ }; struct IPool { ISect *isect; u32int buck0; /* first bucket in pool */ u32int mbufbuckets; /* buckets per minibuf */ IEntryLink *entry; /* all IEntryLinks */ u32int nentry; /* # of IEntryLinks */ IEntryLink *free; /* free list */ u32int nfree; /* # on free list */ Minibuf *mbuf; /* all minibufs */ u32int nmbuf; /* # of minibufs */ IEntryLink **mlist; /* lists for each minibuf */ u32int *mcount; /* # on each mlist[i] */ u32int bufsize; /* block buffer size */ uchar *rbuf; /* read buffer */ uchar *wbuf; /* write buffer */ u32int epbuf; /* entries per block buffer */ }; /* static int countsokay(IPool *p) { int i; u64int n; n = 0; for(i=0; i<p->nmbuf; i++) n += p->mcount[i]; n += p->nfree; if(n != p->nentry){ print("free %ud:", p->nfree); for(i=0; i<p->nmbuf; i++) print(" %ud", p->mcount[i]); print(" = %lld nentry: %ud\n", n, p->nentry); } return n == p->nentry; } */ static IPool* mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf, u32int mbufbuckets, u32int bufsize) { u32int i, nentry; uchar *data; IPool *p; IEntryLink *l; nentry = (nmbuf+1)*bufsize / IEntrySize; p = ezmalloc(sizeof(IPool) +nentry*sizeof(IEntry) +nmbuf*sizeof(IEntryLink*) +nmbuf*sizeof(u32int) +3*bufsize); p->isect = isect; p->mbufbuckets = mbufbuckets; p->bufsize = bufsize; p->entry = (IEntryLink*)(p+1); p->nentry = nentry; p->mlist = (IEntryLink**)(p->entry+nentry); p->mcount = (u32int*)(p->mlist+nmbuf); p->nmbuf = nmbuf; p->mbuf = mbuf; data = (uchar*)(p->mcount+nmbuf); data += bufsize - (uintptr)data%bufsize; p->rbuf = data; p->wbuf = data+bufsize; p->epbuf = bufsize/IEntrySize; for(i=0; i<p->nentry; i++){ l = &p->entry[i]; l->next = p->free; p->free = l; p->nfree++; } return p; } /* * Add the index entry ie to the pool p. * Caller must know there is room. */ static void ipoolinsert(IPool *p, uchar *ie) { u32int buck, x; IEntryLink *l; assert(p->free != nil); buck = score2bucket(p->isect, ie); x = (buck-p->buck0) / p->mbufbuckets; if(x >= p->nmbuf){ fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n", buck, p->mbufbuckets, x); } assert(x < p->nmbuf); l = p->free; p->free = l->next; p->nfree--; memmove(l->ie, ie, IEntrySize); l->next = p->mlist[x]; p->mlist[x] = l; p->mcount[x]++; } /* * Pull out a block containing as many * entries as possible for minibuffer x. */ static u32int ipoolgetbuf(IPool *p, u32int x) { uchar *bp, *ep, *wp; IEntryLink *l; u32int n; bp = p->wbuf; ep = p->wbuf + p->bufsize; n = 0; assert(x < p->nmbuf); for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){ l = p->mlist[x]; p->mlist[x] = l->next; p->mcount[x]--; memmove(wp, l->ie, IEntrySize); l->next = p->free; p->free = l; p->nfree++; n++; } memset(wp, 0, ep-wp); return n; } /* * Read a block worth of entries from the minibuf * into the pool. Caller must know there is room. */ static void ipoolloadblock(IPool *p, Minibuf *mb) { u32int i, n; assert(mb->nentry > 0); assert(mb->roffset >= mb->woffset); assert(mb->roffset < mb->eoffset); n = p->bufsize/IEntrySize; if(n > mb->nentry) n = mb->nentry; if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0) fprint(2, "readpart %s: %r\n", p->isect->part->name); else{ for(i=0; i<n; i++) ipoolinsert(p, p->rbuf+i*IEntrySize); } mb->nentry -= n; mb->roffset += p->bufsize; } /* * Write out a block worth of entries to minibuffer x. * If necessary, pick up the data there before overwriting it. */ static void ipoolflush0(IPool *pool, u32int x) { u32int bufsize; Minibuf *mb; mb = pool->mbuf+x; bufsize = pool->bufsize; mb->nwentry += ipoolgetbuf(pool, x); if(mb->nentry > 0 && mb->roffset == mb->woffset){ assert(pool->nfree >= pool->bufsize/IEntrySize); /* * There will be room in the pool -- we just * removed a block worth. */ ipoolloadblock(pool, mb); } if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0) fprint(2, "writepart %s: %r\n", pool->isect->part->name); mb->woffset += bufsize; } /* * Write out some full block of entries. * (There must be one -- the pool is almost full!) */ static void ipoolflush1(IPool *pool) { u32int i; assert(pool->nfree <= pool->epbuf); for(i=0; i<pool->nmbuf; i++){ if(pool->mcount[i] >= pool->epbuf){ ipoolflush0(pool, i); return; } } /* can't be reached - someone must be full */ sysfatal("ipoolflush1"); } /* * Flush all the entries in the pool out to disk. * Nothing more to read from disk. */ static void ipoolflush(IPool *pool) { u32int i; for(i=0; i<pool->nmbuf; i++) while(pool->mlist[i]) ipoolflush0(pool, i); assert(pool->nfree == pool->nentry); } /* * Third pass. Pick up each minibuffer from disk into * memory and then write out the buckets. */ /* * Compare two packed index entries. * Usual ordering except break ties by putting higher * index addresses first (assumes have duplicates * due to corruption in the lower addresses). */ static int ientrycmpaddr(const void *va, const void *vb) { int i; uchar *a, *b; a = (uchar*)va; b = (uchar*)vb; i = ientrycmp(a, b); if(i) return i; return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8); } static void zerorange(Part *p, u64int o, u64int e) { static uchar zero[MaxIoSize]; u32int n; for(; o<e; o+=n){ n = sizeof zero; if(o+n > e) n = e-o; if(writepart(p, o, zero, n) < 0) fprint(2, "writepart %s: %r\n", p->name); } } /* * Load a minibuffer into memory and write out the * corresponding buckets. */ static void sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize) { uchar *buckdata, *p, *q, *ep; u32int b, lastb, memsize, n; u64int o; IBucket ib; Part *part; part = is->part; buckdata = emalloc(is->blocksize); if(mb->nwentry == 0) return; /* * read entire buffer. */ assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset); assert(mb->woffset-mb->boffset <= nbuf); if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){ fprint(2, "readpart %s: %r\n", part->name); errors = 1; return; } assert(*(uint*)buf != 0xa5a5a5a5); /* * remove fragmentation due to IEntrySize * not evenly dividing Bufsize */ memsize = (bufsize/IEntrySize)*IEntrySize; for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){ memmove(p, q, memsize); p += memsize; q += bufsize; } ep = buf + mb->nwentry*IEntrySize; assert(ep <= buf+nbuf); /* * sort entries */ qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr); /* * write buckets out */ n = 0; lastb = offset2bucket(is, mb->boffset); for(p=buf; p<ep; p=q){ b = score2bucket(is, p); for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize) ; if(lastb+1 < b && zero) zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b)); if(IBucketSize+(q-p) > is->blocksize) sysfatal("bucket overflow - make index bigger"); memmove(buckdata+IBucketSize, p, q-p); ib.n = (q-p)/IEntrySize; n += ib.n; packibucket(&ib, buckdata, is->bucketmagic); if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0) fprint(2, "write %s: %r\n", part->name); lastb = b; } if(lastb+1 < is->stop-is->start && zero) zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start)); if(n != mb->nwentry) fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%zd\n", n, mb->nwentry, (ep-buf)/IEntrySize); free(buckdata); } static void isectproc(void *v) { u32int buck, bufbuckets, bufsize, epbuf, i, j; u32int mbufbuckets, n, nbucket, nn, space; u32int nbuf, nminibuf, xminiclump, prod; u64int blocksize, offset, xclump; uchar *data, *p; Buf *buf; IEntry ie; IPool *ipool; ISect *is; Minibuf *mbuf, *mb; is = v; blocksize = is->blocksize; nbucket = is->stop - is->start; /* * Three passes: * pass 1 - write index entries from arenas into * large sequential sections on index disk. * requires nbuf * bufsize memory. * * pass 2 - split each section into minibufs. * requires nminibuf * bufsize memory. * * pass 3 - read each minibuf into memory and * write buckets out. * requires entries/minibuf * IEntrySize memory. * * The larger we set bufsize the less seeking hurts us. * * The fewer sections and minibufs we have, the less * seeking hurts us. * * The fewer sections and minibufs we have, the * more entries we end up with in each minibuf * at the end. * * Shoot for using half our memory to hold each * minibuf. The chance of a random distribution * getting off by 2x is quite low. * * Once that is decided, figure out the smallest * nminibuf and nsection/biggest bufsize we can use * and still fit in the memory constraints. */ /* expected number of clump index entries we'll see */ xclump = nbucket * (double)totalclumps/totalbuckets; /* number of clumps we want to see in a minibuf */ xminiclump = isectmem/2/IEntrySize; /* total number of minibufs we need */ prod = (xclump+xminiclump-1) / xminiclump; /* if possible, skip second pass */ if(!dumb && prod*MinBufSize < isectmem){ nbuf = prod; nminibuf = 1; }else{ /* otherwise use nsection = sqrt(nmini) */ for(nbuf=1; nbuf*nbuf<prod; nbuf++) ; if(nbuf*MinBufSize > isectmem) sysfatal("not enough memory"); nminibuf = nbuf; } if (nbuf == 0) { fprint(2, "%s: brand-new index, no work to do\n", argv0); exits(0); } /* size buffer to use extra memory */ bufsize = MinBufSize; while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize) bufsize *= 2; data = emalloc(nbuf*bufsize); epbuf = bufsize/IEntrySize; fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n", is->part->name, nbucket, nbuf, nminibuf, bufsize); /* * Accept index entries from arena procs. */ buf = MKNZ(Buf, nbuf); p = data; offset = is->blockbase; bufbuckets = (nbucket+nbuf-1)/nbuf; for(i=0; i<nbuf; i++){ buf[i].part = is->part; buf[i].bp = p; buf[i].wp = p; p += bufsize; buf[i].ep = p; buf[i].boffset = offset; buf[i].woffset = offset; if(i < nbuf-1){ offset += bufbuckets*blocksize; buf[i].eoffset = offset; }else{ offset = is->blockbase + nbucket*blocksize; buf[i].eoffset = offset; } } assert(p == data+nbuf*bufsize); n = 0; while(recv(is->writechan, &ie) == 1){ if(ie.ia.addr == 0) break; buck = score2bucket(is, ie.score); i = buck/bufbuckets; assert(i < nbuf); bwrite(&buf[i], &ie); n++; } add(&indexentries, n); nn = 0; for(i=0; i<nbuf; i++){ bflush(&buf[i]); buf[i].bp = nil; buf[i].ep = nil; buf[i].wp = nil; nn += buf[i].nentry; } if(n != nn) fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn); free(data); fprint(2, "%T %s: reordering\n", is->part->name); /* * Rearrange entries into minibuffers and then * split each minibuffer into buckets. * The minibuffer must be sized so that it is * a multiple of blocksize -- ipoolloadblock assumes * that each minibuf starts aligned on a blocksize * boundary. */ mbuf = MKN(Minibuf, nminibuf); mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf; while(mbufbuckets*blocksize % bufsize) mbufbuckets++; for(i=0; i<nbuf; i++){ /* * Set up descriptors. */ n = buf[i].nentry; nn = 0; offset = buf[i].boffset; memset(mbuf, 0, nminibuf*sizeof(mbuf[0])); for(j=0; j<nminibuf; j++){ mb = &mbuf[j]; mb->boffset = offset; offset += mbufbuckets*blocksize; if(offset > buf[i].eoffset) offset = buf[i].eoffset; mb->eoffset = offset; mb->roffset = mb->boffset; mb->woffset = mb->boffset; mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize; if(mb->nentry > buf[i].nentry) mb->nentry = buf[i].nentry; buf[i].nentry -= mb->nentry; nn += mb->nentry; } if(n != nn) fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);; /* * Rearrange. */ if(!dumb && nminibuf == 1){ mbuf[0].nwentry = mbuf[0].nentry; mbuf[0].woffset = buf[i].woffset; }else{ ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize); ipool->buck0 = bufbuckets*i; for(j=0; j<nminibuf; j++){ mb = &mbuf[j]; while(mb->nentry > 0){ if(ipool->nfree < epbuf){ ipoolflush1(ipool); /* ipoolflush1 might change mb->nentry */ continue; } assert(ipool->nfree >= epbuf); ipoolloadblock(ipool, mb); } } ipoolflush(ipool); nn = 0; for(j=0; j<nminibuf; j++) nn += mbuf[j].nwentry; if(n != nn) fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i); free(ipool); } /* * Make buckets. */ space = 0; for(j=0; j<nminibuf; j++) if(space < mbuf[j].woffset - mbuf[j].boffset) space = mbuf[j].woffset - mbuf[j].boffset; data = emalloc(space); for(j=0; j<nminibuf; j++){ mb = &mbuf[j]; sortminibuffer(is, mb, data, space, bufsize); } free(data); } sendp(isectdonechan, is); }