ref: 08c39320a46ad94fba9ba3310444f136dd9258f5
parent: 9a90e50142c792fdc06dc4faa8a582f441124aad
author: cinap_lenrek <cinap_lenrek@localhost>
date: Sun Aug 21 23:03:27 EDT 2011
libthread: reimplemented i/o procs using new interrupt ctl message
--- a/sys/man/2/ioproc
+++ b/sys/man/2/ioproc
@@ -3,6 +3,7 @@
closeioproc,
iocall,
ioclose,
+ioflush,
iointerrupt,
iodial,
ioopen,
@@ -33,6 +34,7 @@
long iowrite(Ioproc *io, int fd, void *a, long n);
int iodial(Ioproc *io, char *addr, char *local, char *dir, char *cdfp);
.XX
+int ioflush(Ioproc *io);
void iointerrupt(Ioproc *io);
void closeioproc(Ioproc *io);
.XX
@@ -74,14 +76,16 @@
.IR dial (2))
in the slave process associated with
.IR io .
-It is an error to execute more than one call
-at a time in an I/O proc.
.PP
.I Iointerrupt
-interrupts the call currently executing in the I/O proc.
-If no call is executing,
+interrupts the next or currently executing call in the I/O proc. If
+no call is executing, the interrupt stays pending and the
+next I/O call is interrupted.
+.PP
+.I Ioflush
+executes a no-op in the I/O proc. It is commonly called after
.IR iointerrupt
-is a no-op.
+to clear a pending interrupt.
.PP
.I Closeioproc
terminates the I/O proc and frees the associated
--- a/sys/src/libthread/iocall.c
+++ b/sys/src/libthread/iocall.c
@@ -6,47 +6,23 @@
long
iocall(Ioproc *io, long (*op)(va_list*), ...)
{
- int ret, inted;
- Ioproc *msg;
+ Iocall r;
- if(send(io->c, &io) == -1){
+ r.op = op;
+ va_start(r.arg, op);
+ if(sendp(io->c, &r) < 0){
werrstr("interrupted");
return -1;
}
- assert(!io->inuse);
- io->inuse = 1;
- io->op = op;
- va_start(io->arg, op);
- msg = io;
- inted = 0;
- while(send(io->creply, &msg) == -1){
- msg = nil;
- inted = 1;
+ while(recv(io->creply, nil) < 0){
+ if(canqlock(io)){
+ if(++io->intr == 1)
+ write(io->ctl, "interrupt", 9);
+ qunlock(io);
+ }
}
- if(inted){
- werrstr("interrupted");
- return -1;
- }
-
- /*
- * If we get interrupted, we have to stick around so that
- * the IO proc has someone to talk to. Send it an interrupt
- * and try again.
- */
- inted = 0;
- while(recv(io->creply, nil) == -1){
- inted = 1;
- iointerrupt(io);
- }
- USED(inted);
- va_end(io->arg);
- ret = io->ret;
- if(ret < 0)
- errstr(io->err, sizeof io->err);
- io->inuse = 0;
-
- /* release resources */
- while(send(io->creply, &io) == -1)
- ;
- return ret;
+ va_end(r.arg);
+ if(r.ret < 0)
+ errstr(r.err, sizeof r.err);
+ return r.ret;
}
--- /dev/null
+++ b/sys/src/libthread/ioflush.c
@@ -1,0 +1,16 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include "threadimpl.h"
+
+long
+_ioflush(va_list *)
+{
+ return 0;
+}
+
+int
+ioflush(Ioproc *io)
+{
+ return iocall(io, _ioflush);
+}
--- a/sys/src/libthread/ioproc.c
+++ b/sys/src/libthread/ioproc.c
@@ -11,55 +11,80 @@
void
iointerrupt(Ioproc *io)
{
- if(!io->inuse)
- return;
- threadint(io->tid);
+ qlock(io);
+ if(++io->intr == 1)
+ write(io->ctl, "interrupt", 9);
+ qunlock(io);
}
static void
xioproc(void *a)
{
- Ioproc *io, *x;
- io = a;
- /*
- * first recvp acquires the ioproc.
- * second tells us that the data is ready.
- */
+ Channel *c;
+ Ioproc *io;
+ Iocall *r;
+
+ c = a;
+ if(io = mallocz(sizeof(*io), 1)){
+ char buf[128];
+
+ snprint(buf, sizeof(buf), "/proc/%d/ctl", getpid());
+ if((io->ctl = open(buf, OWRITE)) < 0){
+ free(io);
+ io = nil;
+ } else {
+ if((io->creply = chancreate(sizeof(void*), 0)) == nil){
+ close(io->ctl);
+ free(io);
+ io = nil;
+ } else
+ io->c = c;
+ }
+ }
+ while(send(c, &io) < 0)
+ ;
+ if(io == nil)
+ return;
+
for(;;){
- while(recv(io->c, &x) == -1)
+ while(recv(io->c, &r) < 0)
;
- if(x == 0) /* our cue to leave */
+ if(r == 0)
break;
- assert(x == io);
-
- /* caller is now committed -- even if interrupted he'll return */
- while(recv(io->creply, &x) == -1)
+ if(io->intr){
+ r->ret = -1;
+ strcpy(r->err, "interrupted");
+ } else if((r->ret = r->op(&r->arg)) < 0)
+ rerrstr(r->err, sizeof r->err);
+ qlock(io);
+ if(io->intr){
+ io->intr = 0;
+ write(io->ctl, "nointerrupt", 11);
+ }
+ while(send(io->creply, &r) < 0)
;
- if(x == 0) /* caller backed out */
- continue;
- assert(x == io);
-
- io->ret = io->op(&io->arg);
- if(io->ret < 0)
- rerrstr(io->err, sizeof io->err);
- while(send(io->creply, &io) == -1)
- ;
- while(recv(io->creply, &x) == -1)
- ;
+ qunlock(io);
}
+
+ close(io->ctl);
+ chanfree(io->c);
+ chanfree(io->creply);
+ free(io);
}
Ioproc*
ioproc(void)
{
+ Channel *c;
Ioproc *io;
- io = mallocz(sizeof(*io), 1);
+ if((c = chancreate(sizeof(void*), 0)) == nil)
+ sysfatal("ioproc chancreate");
+ proccreate(xioproc, c, STACK);
+ while(recv(c, &io) < 0)
+ ;
if(io == nil)
- sysfatal("ioproc malloc: %r");
- io->c = chancreate(sizeof(void*), 0);
- io->creply = chancreate(sizeof(void*), 0);
- io->tid = proccreate(xioproc, io, STACK);
+ sysfatal("ioproc alloc");
return io;
}
@@ -69,9 +94,6 @@
if(io == nil)
return;
iointerrupt(io);
- while(send(io->c, 0) == -1)
+ while(sendp(io->c, nil) < 0)
;
- chanfree(io->c);
- chanfree(io->creply);
- free(io);
}
--- a/sys/src/libthread/mkfile
+++ b/sys/src/libthread/mkfile
@@ -21,6 +21,7 @@
ioreadn.$O\
iosleep.$O\
iowrite.$O\
+ ioflush.$O\
kill.$O\
lib.$O\
main.$O\
--- a/sys/src/libthread/threadimpl.h
+++ b/sys/src/libthread/threadimpl.h
@@ -25,6 +25,7 @@
typedef struct Thread Thread;
typedef struct Execargs Execargs;
typedef struct Proc Proc;
+typedef struct Iocall Iocall;
/* must match list in sched.c */
typedef enum
@@ -135,7 +136,8 @@
char threadint; /* tag for threadexitsall() */
};
-struct Pqueue { /* Proc queue */
+struct Pqueue /* Proc queue */
+{
Lock lock;
Proc *head;
Proc **tail;
@@ -143,14 +145,18 @@
struct Ioproc
{
- int tid;
- Channel *c, *creply;
- int inuse;
- long (*op)(va_list*);
- va_list arg;
- long ret;
- char err[ERRMAX];
- Ioproc *next;
+ QLock;
+ int intr;
+ int ctl;
+ Channel *c, *creply;
+};
+
+struct Iocall
+{
+ long (*op)(va_list*);
+ va_list arg;
+ long ret;
+ char err[ERRMAX];
};
void _freeproc(Proc*);