shithub: mq

Download patch

ref: 4e663b30ce677bb6ab868d83d5ac34ab094c7e3b
parent: a43e93c246dbb5743927d132cb726a61ffb9f35f
author: kvik <[email protected]>
date: Thu Sep 3 09:08:55 EDT 2020

rename Pipe -> Stream; misc refactoring

--- a/src/mq-cat.c
+++ b/src/mq-cat.c
@@ -3,15 +3,15 @@
 
 #include "util.h"
 
-typedef struct Pipe Pipe;
+typedef struct Stream Stream;
 
-struct Pipe {
+struct Stream {
 	char *name;
 	int fd;
 };
 
-int npipes;
-Pipe *pipes;
+int nstreams;
+Stream *streams;
 
 char buf[8192];
 
@@ -37,7 +37,7 @@
 {
 	int mqfd, n, ismq;
 	Dir *dirs, *d;
-	Pipe *p;
+	Stream *s;
 
 	mqfd = eopen(name, OREAD);
 	if((n = dirreadall(mqfd, &dirs)) == -1)
@@ -47,8 +47,8 @@
 	close(mqfd);
 
 	ismq = 0;
-	npipes = n - 2;
-	pipes = p = emalloc(npipes*sizeof(Pipe));
+	nstreams = n - 2;
+	streams = s = emalloc(nstreams*sizeof(Stream));
 	for(d = dirs; n--; d++){
 		if(strncmp(d->name, "ctl", 3) == 0
 		|| strncmp(d->name, "order", 5) == 0){
@@ -55,9 +55,9 @@
 			ismq++;
 			continue;
 		}
-		p->name = estrdup(d->name);
-		p->fd = eopen(d->name, OREAD);
-		p++;
+		s->name = estrdup(d->name);
+		s->fd = eopen(d->name, OREAD);
+		s++;
 	}
 	free(dirs);
 	if(ismq != 2)
@@ -84,7 +84,7 @@
 {
 	int orderfd, n, i;
 	char name[512+1];
-	Pipe *p;
+	Stream *s;
 
 	ARGBEGIN{
 	default: usage();
@@ -99,11 +99,11 @@
 		if((n = read(orderfd, name, sizeof(name)-1)) == 0)
 			break;
 		buf[n] = 0;
-		for(i = 0, p = pipes; i < npipes; i++, p++)
-			if(strcmp(p->name, name) == 0)
-			if(p->fd != -1){
-				if(rdwr(p->fd, 1) == 0)
-					p->fd = -1;
+		for(i = 0, s = streams; i < nstreams; i++, s++)
+			if(strcmp(s->name, name) == 0)
+			if(s->fd != -1){
+				if(rdwr(s->fd, 1) == 0)
+					s->fd = -1;
 				break;
 			}
 	}
--- a/src/mq.c
+++ b/src/mq.c
@@ -7,33 +7,39 @@
 #include "list.h"
 #include "util.h"
 
-typedef struct Client Client;
 typedef struct Mq Mq;
-typedef struct Pipe Pipe;
-typedef struct Write Write;
+typedef struct Stream Stream;
+typedef struct Client Client;
 typedef struct Read Read;
+typedef struct Write Write;
 
-struct Client {
-	Write *cursor; /* reader position */
-};
-
 struct Mq {
-	Pipe *pipes;
-	Pipe *order;
+	Stream *group;
+	Stream *order;
 
 	/* configuration */
 	int replay;
 };
 
-struct Pipe {
+struct Stream {
 	List;
 
-	Mq *group; /* membership */
+	Mq *mq; /* parent */
 
-	Write *history; /* stored messages */
+	Write *queue; /* stored messages */
 	Read *reads; /* readers queue */
 };
 
+struct Client {
+	Write *cursor; /* reader position */
+};
+
+struct Read {
+	List;
+
+	Req *r;
+};
+
 struct Write {
 	List;
 
@@ -43,16 +49,10 @@
 	uchar *data;
 };
 
-struct Read {
-	List;
-
-	Req *r;
-};
-
 enum {
 	Qroot,
 		Qmq,
-			Qpipe,
+			Qstream,
 			Qorder,
 			Qctl,
 };
@@ -76,14 +76,14 @@
 File*
 mqcreate(File *parent, char *name, char *uid, ulong perm)
 {
-	Pipe *pipealloc(Mq*);
-	void *pipeclose(Pipe*);
+	Stream *streamalloc(Mq*);
+	void *streamclose(Stream*);
 	File *d, *ctl, *order;
 	Mq *mq;
 
 	mq = emalloc(sizeof(Mq));
-	mq->pipes = (Pipe*)listalloc();
-	mq->order = (Pipe*)pipealloc(mq);
+	mq->group = (Stream*)listalloc();
+	mq->order = (Stream*)streamalloc(mq);
 	mq->replay = 0;
 
 	ctl = order = nil;
@@ -103,8 +103,8 @@
 
 	return d;
 err:
-	free(mq->pipes);
-	pipeclose(mq->order);
+	free(mq->group);
+	streamclose(mq->order);
 	if(d) closefile(d);
 	if(ctl) closefile(ctl);
 	if(order) closefile(order);
@@ -119,27 +119,27 @@
 	free(mq);
 }
 
-Pipe*
-pipealloc(Mq *mq)
+Stream*
+streamalloc(Mq *mq)
 {
-	Pipe *p;
+	Stream *s;
 	
-	p = emalloc(sizeof(Pipe));
-	p->group = mq;
-	p->history = (Write*)listalloc();
-	p->reads = (Read*)listalloc();
-	return p;
+	s = emalloc(sizeof(Stream));
+	s->mq = mq;
+	s->queue = (Write*)listalloc();
+	s->reads = (Read*)listalloc();
+	return s;
 }
 
 void
-pipeclose(Pipe *p)
+streamclose(Stream *s)
 {
 	Read *r;
 	Write *w;
 
-	listunlink(p);
-	if(p->reads)
-	foreach(Read*, p->reads){
+	listunlink(s);
+	if(s->reads)
+	foreach(Read*, s->reads){
 		/* eof these? */
 		r = ptr;
 		ptr = (Read*)r->tail;
@@ -146,37 +146,36 @@
 		listunlink(r);
 		free(r);
 	}
-	free(p->reads);
-	if(p->history)
-	foreach(Write*, p->history){
+	free(s->reads);
+	if(s->queue)
+	foreach(Write*, s->queue){
 		w = ptr;
 		ptr = (Write*)w->tail;
 		listunlink(w);
 		free(w);
 	}
-	free(p->history);
-	free(p);
+	free(s->queue);
+	free(s);
 }
 
 File*
-pipecreate(File *parent, char *name, char *uid, ulong perm)
+streamcreate(File *parent, char *name, char *uid, ulong perm)
 {
 	File *f;
 	Mq *mq;
-	Pipe *p;
+	Stream *s;
 
 	mq = parent->aux;
-	p = pipealloc(mq);
-	if((f = createfile(parent, name, uid, perm, p)) == nil){
-		pipeclose(p);
+	s = streamalloc(mq);
+	if((f = createfile(parent, name, uid, perm, s)) == nil){
+		streamclose(s);
 		return nil;
 	}
-	listlink(mq->pipes, p);
-	filesettype(f, Qpipe);
+	listlink(mq->group, s);
+	filesettype(f, Qstream);
 	return f;
 }
 
-
 void
 respondread(Req *r, Write *w)
 {
@@ -186,19 +185,19 @@
 }
 
 void
-piperead(Req *r)
+streamread(Req *r)
 {
 	File *f = r->fid->file;
-	Pipe *p = f->aux;
+	Stream *s = f->aux;
 	Client *c = r->fid->aux;
 	Read *rd;
 
-	/* Delay the response if there's no history
+	/* Delay the response if the queue is empty
 	 * or if we've already caught up. */
-	if(listempty(p->history) || listend(c->cursor)){
+	if(listempty(s->queue) || listend(c->cursor)){
 		rd = emalloc(sizeof(Read));
 		rd->r = r;
-		listlink(p->reads, rd);
+		listlink(s->reads, rd);
 		return;
 	}
 	c->cursor = (Write*)c->cursor->link;
@@ -219,17 +218,17 @@
 pipewrite(Req *r)
 {
 	File *f = r->fid->file;
-	Pipe *p = f->aux;
-	Mq *mq = p->group;
+	Stream *s = f->aux;
+	Mq *mq = s->mq;
 	Write *w, *o;
 	long n;
 
-	/* Commit to history */
+	/* Commit to queue */
 	w = writealloc(r->ifcall.count);
 	w->count = r->ifcall.count;
 	w->offset = r->ifcall.offset;
 	memmove(w->data, r->ifcall.data, w->count);
-	listlink(p->history->tail, w);
+	listlink(s->queue->tail, w);
 
 	/* Commit to order */
 	n = strlen(f->name)+1;
@@ -237,10 +236,10 @@
 	o->offset = 0;
 	o->count = n;
 	memmove(o->data, f->name, n);
-	listlink(mq->order->history->tail, o);
+	listlink(mq->order->queue->tail, o);
 
-	/* Kick the blocked pipe readers */
-	foreach(Read*, p->reads){
+	/* Kick the blocked stream readers */
+	foreach(Read*, s->reads){
 		Client *c = ptr->r->fid->aux;
 
 		respondread(ptr->r, w);
@@ -346,7 +345,7 @@
 		if(perm&DMDIR)
 			f = mqcreate(parent, name, uid, perm);
 		else
-			f = pipecreate(parent, name, uid, perm);
+			f = streamcreate(parent, name, uid, perm);
 		break;
 	}
 	if(f == nil)
@@ -361,16 +360,16 @@
 	File *f = r->fid->file;
 
 	switch(filetype(f)){
-	case Qpipe:
+	case Qstream:
 	case Qorder: {
-		Pipe *p = f->aux;
+		Stream *s = f->aux;
 		Client *c;
 
 		c = r->fid->aux = emalloc(sizeof(Client));
-		if(p->group->replay)
-			c->cursor = (Write*)p->history;
+		if(s->mq->replay)
+			c->cursor = (Write*)s->queue;
 		else
-			c->cursor = (Write*)p->history->tail;
+			c->cursor = (Write*)s->queue->tail;
 		break;
 	}}
 	respond(r, nil);
@@ -382,7 +381,7 @@
 	File *f = r->fid->file;
 
 	switch(filetype(f)){
-	case Qpipe:
+	case Qstream:
 		pipewrite(r);
 		break;
 	case Qctl:
@@ -400,9 +399,9 @@
 	File *f = r->fid->file;
 
 	switch(filetype(f)){
-	case Qpipe:
+	case Qstream:
 	case Qorder:
-		piperead(r);
+		streamread(r);
 		break;
 	default:
 		respond(r, "forbidden");
@@ -416,13 +415,13 @@
 	File *f = old->fid->file;
 
 	switch(filetype(f)){
-	case Qpipe:
+	case Qstream:
 	case Qorder: {
-		Pipe *p = f->aux;
+		Stream *s = f->aux;
 
 		if(old->ifcall.type != Tread)
 			break;
-		foreach(Read*, p->reads){
+		foreach(Read*, s->reads){
 			if(ptr->r == old){
 				free(listunlink(ptr));
 				break;
@@ -448,8 +447,8 @@
 	case Qmq:
 		mqclose(f);
 		break;
-	case Qpipe:
-		pipeclose(f->aux);
+	case Qstream:
+		streamclose(f->aux);
 		break;
 	}
 	return;