summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwtc%netscape.com <devnull@localhost>2000-01-08 23:05:24 +0000
committerwtc%netscape.com <devnull@localhost>2000-01-08 23:05:24 +0000
commit9f4864cbf70f2839d272775ef6c8d64536cd3417 (patch)
tree911317a67ce9481401d3eafa276f587e79b41256
parent07c5def62e944e23762c2acd2730e467fc4c16d9 (diff)
downloadnspr-hg-9f4864cbf70f2839d272775ef6c8d64536cd3417.tar.gz
Backed out the NSPR 3.5.1 merge. Mozilla client cannot exit on shutdown.
Modified files: _win95.h, _winnt.h, primpl.h, prfdcach.c, prfile.c, ntio.c, w95io.c, ptio.c, ptthread.c
-rw-r--r--pr/include/md/_win95.h1
-rw-r--r--pr/include/md/_winnt.h1
-rw-r--r--pr/include/private/primpl.h35
-rw-r--r--pr/src/io/prfdcach.c4
-rw-r--r--pr/src/io/prfile.c35
-rw-r--r--pr/src/md/windows/ntio.c10
-rw-r--r--pr/src/md/windows/w95io.c9
-rw-r--r--pr/src/pthreads/ptio.c861
-rw-r--r--pr/src/pthreads/ptthread.c7
9 files changed, 776 insertions, 187 deletions
diff --git a/pr/include/md/_win95.h b/pr/include/md/_win95.h
index 6878b152..e82ddfe3 100644
--- a/pr/include/md/_win95.h
+++ b/pr/include/md/_win95.h
@@ -245,7 +245,6 @@ extern PRInt32 _MD_CloseSocket(PRInt32 osfd);
#define _MD_SOCKET _PR_MD_SOCKET
extern PRInt32 _MD_SocketAvailable(PRFileDesc *fd);
#define _MD_SOCKETAVAILABLE _MD_SocketAvailable
-#define _MD_PIPEAVAILABLE _PR_MD_PIPEAVAILABLE
#define _MD_CONNECT _PR_MD_CONNECT
extern PRInt32 _MD_Accept(PRFileDesc *fd, PRNetAddr *raddr, PRUint32 *rlen,
PRIntervalTime timeout);
diff --git a/pr/include/md/_winnt.h b/pr/include/md/_winnt.h
index 85ddd505..77db2ddb 100644
--- a/pr/include/md/_winnt.h
+++ b/pr/include/md/_winnt.h
@@ -247,7 +247,6 @@ extern int _PR_NTFiberSafeSelect(int, fd_set *, fd_set *, fd_set *,
const struct timeval *);
#define _MD_FSYNC _PR_MD_FSYNC
#define _MD_SOCKETAVAILABLE _PR_MD_SOCKETAVAILABLE
-#define _MD_PIPEAVAILABLE _PR_MD_PIPEAVAILABLE
#define _MD_SET_FD_INHERITABLE _PR_MD_SET_FD_INHERITABLE
#define _MD_INIT_ATOMIC()
diff --git a/pr/include/private/primpl.h b/pr/include/private/primpl.h
index 8b96c424..ad468d2c 100644
--- a/pr/include/private/primpl.h
+++ b/pr/include/private/primpl.h
@@ -186,6 +186,11 @@ struct _PT_Notified
typedef struct PTDebug
{
PRTime timeStarted;
+ PRUintn predictionsFoiled;
+ PRUintn pollingListMax;
+ PRUintn continuationsServed;
+ PRUintn recyclesNeeded;
+ PRUintn quiescentIO;
PRUintn locks_created, locks_destroyed;
PRUintn locks_acquired, locks_released;
PRUintn cvars_created, cvars_destroyed;
@@ -1416,7 +1421,9 @@ struct PRThread {
#if defined(_PR_PTHREADS)
pthread_t id; /* pthread identifier for the thread */
PRBool okToDelete; /* ok to delete the PRThread struct? */
+ PRCondVar *io_cv; /* a condition used to run i/o */
PRCondVar *waiting; /* where the thread is waiting | NULL */
+ PRIntn io_tq_index; /* the io-queue index for this thread */
void *sp; /* recorded sp for garbage collection */
PRThread *next, *prev; /* simple linked list of all threads */
PRUint32 suspend; /* used to store suspend and resume flags */
@@ -1542,8 +1549,36 @@ struct PRFilePrivate {
PRFileDesc *next;
PRIntn lockCount;
_MDFileDesc md;
+#ifdef _PR_PTHREADS
+ PRIntn eventMask[1]; /* An array of _pt_tq_count bitmasks.
+ * eventMask[i] is only accessed by
+ * the i-th i/o continuation thread.
+ * A 0 in a bitmask means the event
+ * should be igored in the revents
+ * bitmask returned by poll.
+ *
+ * poll's revents bitmask is a short,
+ * but we need to declare eventMask
+ * as an array of PRIntn's so that
+ * each bitmask can be updated
+ * individually without disturbing
+ * adjacent memory. Only the lower
+ * 16 bits of each PRIntn are used. */
+#endif
+/* IMPORTANT: eventMask MUST BE THE LAST FIELD OF THIS STRUCTURE */
};
+/*
+ * The actual size of the PRFilePrivate structure,
+ * including the eventMask array at the end
+ */
+#ifdef _PR_PTHREADS
+extern PRIntn _pt_tq_count;
+#define PRFILEPRIVATE_SIZE (sizeof(PRFilePrivate) + (_pt_tq_count-1) * sizeof(PRIntn))
+#else
+#define PRFILEPRIVATE_SIZE sizeof(PRFilePrivate)
+#endif
+
struct PRDir {
PRDirEntry d;
_MDDir md;
diff --git a/pr/src/io/prfdcach.c b/pr/src/io/prfdcach.c
index 00947d31..6ad2e84c 100644
--- a/pr/src/io/prfdcach.c
+++ b/pr/src/io/prfdcach.c
@@ -115,14 +115,14 @@ finished:
fd->dtor = NULL;
fd->lower = fd->higher = NULL;
fd->identity = PR_NSPR_IO_LAYER;
- memset(fd->secret, 0, sizeof(PRFilePrivate));
+ memset(fd->secret, 0, PRFILEPRIVATE_SIZE);
return fd;
allocate:
fd = PR_NEW(PRFileDesc);
if (NULL != fd)
{
- fd->secret = PR_NEW(PRFilePrivate);
+ fd->secret = (PRFilePrivate *) PR_MALLOC(PRFILEPRIVATE_SIZE);
if (NULL == fd->secret) PR_DELETE(fd);
}
if (NULL != fd) goto finished;
diff --git a/pr/src/io/prfile.c b/pr/src/io/prfile.c
index 31540e6a..ab16e27a 100644
--- a/pr/src/io/prfile.c
+++ b/pr/src/io/prfile.c
@@ -155,7 +155,7 @@ static PRInt64 PR_CALLBACK FileAvailable64(PRFileDesc *fd)
return result;
}
-#if defined(XP_UNIX) || defined(WIN32)
+#ifndef WIN32
static PRInt32 PR_CALLBACK PipeAvailable(PRFileDesc *fd)
{
PRInt32 rv;
@@ -169,29 +169,8 @@ static PRInt64 PR_CALLBACK PipeAvailable64(PRFileDesc *fd)
LL_I2L(rv, _PR_MD_PIPEAVAILABLE(fd));
return rv;
}
-#else
-static PRInt32 PR_CALLBACK PipeAvailable(PRFileDesc *fd)
-{
- return -1;
-}
-
-static PRInt64 PR_CALLBACK PipeAvailable64(PRFileDesc *fd)
-{
- PRInt64 rv;
- LL_I2L(rv, -1);
- return rv;
-}
#endif
-static PRStatus PR_CALLBACK PipeSync(PRFileDesc *fd)
-{
-#if defined(XP_MAC)
-#pragma unused (fd)
-#endif
-
- return PR_SUCCESS;
-}
-
static PRStatus PR_CALLBACK FileInfo(PRFileDesc *fd, PRFileInfo *info)
{
PRInt32 rv;
@@ -304,9 +283,14 @@ static PRIOMethods _pr_pipeMethods = {
FileClose,
FileRead,
FileWrite,
+#ifdef WIN32
+ FileAvailable,
+ FileAvailable64,
+#else
PipeAvailable,
PipeAvailable64,
- PipeSync,
+#endif
+ FileSync,
(PRSeekFN)_PR_InvalidInt,
(PRSeek64FN)_PR_InvalidInt64,
(PRFileInfoFN)_PR_InvalidStatus,
@@ -338,11 +322,6 @@ static PRIOMethods _pr_pipeMethods = {
(PRReservedFN)_PR_InvalidInt
};
-PR_IMPLEMENT(const PRIOMethods*) PR_GetPipeMethods(void)
-{
- return &_pr_pipeMethods;
-}
-
PR_IMPLEMENT(PRFileDesc*) PR_Open(const char *name, PRIntn flags, PRIntn mode)
{
PRInt32 osfd;
diff --git a/pr/src/md/windows/ntio.c b/pr/src/md/windows/ntio.c
index 1b60a380..201bee2f 100644
--- a/pr/src/md/windows/ntio.c
+++ b/pr/src/md/windows/ntio.c
@@ -2475,16 +2475,6 @@ _PR_MD_SOCKETAVAILABLE(PRFileDesc *fd)
return result;
}
-PRInt32
-_PR_MD_PIPEAVAILABLE(PRFileDesc *fd)
-{
- if (NULL == fd)
- PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
- else
- PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
- return -1;
-}
-
PROffset32
_PR_MD_LSEEK(PRFileDesc *fd, PROffset32 offset, int whence)
{
diff --git a/pr/src/md/windows/w95io.c b/pr/src/md/windows/w95io.c
index 6e969a98..c2553fc7 100644
--- a/pr/src/md/windows/w95io.c
+++ b/pr/src/md/windows/w95io.c
@@ -923,12 +923,3 @@ _PR_MD_UNLOCKFILE(PRInt32 f)
}
} /* end _PR_MD_UNLOCKFILE() */
-PRInt32
-_PR_MD_PIPEAVAILABLE(PRFileDesc *fd)
-{
- if (NULL == fd)
- PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
- else
- PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
- return -1;
-}
diff --git a/pr/src/pthreads/ptio.c b/pr/src/pthreads/ptio.c
index f1e6a255..accca747 100644
--- a/pr/src/pthreads/ptio.c
+++ b/pr/src/pthreads/ptio.c
@@ -190,7 +190,7 @@ static PRBool IsValidNetAddrLen(const PRNetAddr *addr, PRInt32 addr_len)
* The polling interval defines the maximum amount of time that a thread
* might hang up before an interrupt is noticed.
*/
-#define PT_DEFAULT_POLL_MSEC 5000
+#define PT_DEFAULT_POLL_MSEC 100
/*
* pt_SockLen is the type for the length of a socket address
@@ -217,11 +217,17 @@ typedef PRBool (*ContinuationFn)(pt_Continuation *op, PRInt16 revents);
typedef enum pr_ContuationStatus
{
pt_continuation_pending,
+ pt_continuation_recycle,
+ pt_continuation_abort,
pt_continuation_done
} pr_ContuationStatus;
struct pt_Continuation
{
+ /* These objects are linked in ascending timeout order */
+ pt_Continuation *next, *prev; /* self linked list of these things */
+
+ PRFileDesc *fd;
/* The building of the continuation operation */
ContinuationFn function; /* what function to continue */
union { PRIntn osfd; } arg1; /* #1 - the op's fd */
@@ -252,6 +258,7 @@ struct pt_Continuation
#endif /* HPUX11 */
PRIntervalTime timeout; /* client (relative) timeout */
+ PRIntervalTime absolute; /* internal (absolute) timeout */
PRInt16 event; /* flags for poll()'s events */
@@ -264,8 +271,27 @@ struct pt_Continuation
PRIntn syserrno; /* in case it failed, why (errno) */
pr_ContuationStatus status; /* the status of the operation */
+ PRCondVar *complete; /* to notify the initiating thread */
+ PRIntn io_tq_index; /* io-queue index */
};
+static struct pt_TimedQueue
+{
+ PRLock *ml; /* a little protection */
+ PRThread *thread; /* internal thread's identification */
+ PRUintn op_count; /* number of operations in the list */
+ pt_Continuation *head, *tail; /* head/tail of list of operations */
+
+ pt_Continuation *op; /* timed operation furthest in future */
+ struct pollfd *pollingList; /* list built for polling */
+ PRIntn pollingSlotsAllocated; /* # entries available in list */
+ pt_Continuation **pollingOps; /* list paralleling polling list */
+} *pt_tqp; /* an array */
+
+static PRIntn _pt_num_cpus;
+PRIntn _pt_tq_count; /* size of the pt_tqp array */
+static PRInt32 _pt_tq_index; /* atomically incremented */
+
#if defined(DEBUG)
PTDebug pt_debug; /* this is shared between several modules */
@@ -290,6 +316,16 @@ PR_IMPLEMENT(void) PT_FPrintStats(PRFileDesc *debug_out, const char *msg)
PR_fprintf(
debug_out, "\tstarted: %s[%lld]\n", buffer, elapsed);
PR_fprintf(
+ debug_out, "\tmissed predictions: %u\n", stats.predictionsFoiled);
+ PR_fprintf(
+ debug_out, "\tpollingList max: %u\n", stats.pollingListMax);
+ PR_fprintf(
+ debug_out, "\tcontinuations served: %u\n", stats.continuationsServed);
+ PR_fprintf(
+ debug_out, "\trecycles needed: %u\n", stats.recyclesNeeded);
+ PR_fprintf(
+ debug_out, "\tquiescent IO: %u\n", stats.quiescentIO);
+ PR_fprintf(
debug_out, "\tlocks [created: %u, destroyed: %u]\n",
stats.locks_created, stats.locks_destroyed);
PR_fprintf(
@@ -305,135 +341,547 @@ PR_IMPLEMENT(void) PT_FPrintStats(PRFileDesc *debug_out, const char *msg)
#endif /* DEBUG */
-static void pt_poll_now(pt_Continuation *op)
+/*
+ * The following two functions, pt_InsertTimedInternal and
+ * pt_FinishTimedInternal, are always called with the tqp->ml
+ * lock held. The "internal" in the functions' names come from
+ * the Mesa programming language. Internal functions are always
+ * called from inside a monitor.
+ */
+
+static void pt_InsertTimedInternal(pt_Continuation *op)
{
- PRInt32 msecs;
- PRIntervalTime epoch, now, elapsed, remaining;
- PRThread *self = PR_GetCurrentThread();
+ pt_Continuation *t_op = NULL;
+ PRIntervalTime now = PR_IntervalNow();
+ struct pt_TimedQueue *tqp = &pt_tqp[op->io_tq_index];
+
+#if defined(DEBUG)
+ {
+ PRIntn count;
+ pt_Continuation *tmp;
+ PRThread *self = PR_GetCurrentThread();
+
+ PR_ASSERT(tqp == &pt_tqp[self->io_tq_index]);
+ PR_ASSERT((tqp->head == NULL) == (tqp->tail == NULL));
+ PR_ASSERT((tqp->head == NULL) == (tqp->op_count == 0));
+ for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1;
+ PR_ASSERT(count == tqp->op_count);
+ for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1;
+ PR_ASSERT(count == tqp->op_count);
+ for (tmp = tqp->head; tmp != NULL; tmp = tmp->next)
+ PR_ASSERT(tmp->io_tq_index == op->io_tq_index);
+ }
+#endif /* defined(DEBUG) */
+
+ /*
+ * If this element operation isn't timed, it gets queued at the
+ * end of the list (just after tqp->tail) and we're
+ * finishd early.
+ */
+ if (PR_INTERVAL_NO_TIMEOUT == op->timeout)
+ {
+ t_op = tqp->tail; /* put it at the end */
+ goto done;
+ }
+
+ /*
+ * The portion of this routine deals with timed ops.
+ */
+ op->absolute = now + op->timeout; /* absolute ticks */
+ if (NULL == tqp->op) tqp->op = op;
+ else
+ {
+ /*
+ * To find where in the list to put the new operation, based
+ * on the absolute time the operation in question will expire.
+ *
+ * The new operation ('op') will expire at now() + op->timeout.
+ *
+ * This should be easy!
+ */
+
+ for (t_op = tqp->op; NULL != t_op; t_op = t_op->prev)
+ {
+ /*
+ * If 'op' expires later than t_op, then insert 'op' just
+ * ahead of t_op. Otherwise, compute when operation[n-1]
+ * expires and try again.
+ *
+ * The actual difference between the expiriation of 'op'
+ * and the current operation what becomes the new operaton's
+ * timeout interval. That interval is also subtracted from
+ * the interval of the operation immediately following where
+ * we stick 'op' (unless the next one isn't timed). The new
+ * timeout assigned to 'op' takes into account the values of
+ * now() and when the previous intervals were computed.
+ */
+ if ((PRInt32)(op->absolute - t_op->absolute) >= 0)
+ {
+ if (t_op == tqp->op) tqp->op = op;
+ break;
+ }
+ }
+ }
+
+done:
+
+ /*
+ * Insert 'op' into the queue just after t_op or if t_op is null,
+ * at the head of the list.
+ *
+ * We need to set up the 'next' and 'prev' pointers of 'op'
+ * correctly before inserting 'op' into the queue. Also, we insert
+ * 'op' by updating tqp->head or op->prev->next first, and then
+ * updating op->next->prev. We want to make sure that the 'next'
+ * pointers are linked up correctly at all times so that we can
+ * traverse the queue by starting with tqp->head and following
+ * the 'next' pointers, without having to acquire the tqp->ml lock.
+ * (we do that in pt_ContinuationThreadInternal). We traverse the 'prev'
+ * pointers only in this function, which is called with the lock held.
+ *
+ * Similar care is taken in pt_FinishTimedInternal where we remove
+ * an op from the queue.
+ */
+ if (NULL == t_op)
+ {
+ op->prev = NULL;
+ op->next = tqp->head;
+ tqp->head = op;
+ if (NULL == tqp->tail) tqp->tail = op;
+ else op->next->prev = op;
+ }
+ else
+ {
+ op->prev = t_op;
+ op->next = t_op->next;
+ if (NULL != op->prev)
+ op->prev->next = op;
+ if (NULL != op->next)
+ op->next->prev = op;
+ if (t_op == tqp->tail)
+ tqp->tail = op;
+ }
+
+ tqp->op_count += 1;
+
+#if defined(DEBUG)
+ {
+ PRIntn count;
+ pt_Continuation *tmp;
+ PR_ASSERT(tqp->head != NULL);
+ PR_ASSERT(tqp->tail != NULL);
+ PR_ASSERT(tqp->op_count != 0);
+ PR_ASSERT(tqp->head->prev == NULL);
+ PR_ASSERT(tqp->tail->next == NULL);
+ if (tqp->op_count > 1)
+ {
+ PR_ASSERT(tqp->head->next != NULL);
+ PR_ASSERT(tqp->tail->prev != NULL);
+ }
+ else
+ {
+ PR_ASSERT(tqp->head->next == NULL);
+ PR_ASSERT(tqp->tail->prev == NULL);
+ }
+ for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1;
+ PR_ASSERT(count == tqp->op_count);
+ for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1;
+ PR_ASSERT(count == tqp->op_count);
+ }
+#endif /* defined(DEBUG) */
+
+} /* pt_InsertTimedInternal */
+
+/*
+ * function: pt_FinishTimedInternal
+ *
+ * Takes the finished operation out of the timed queue. It
+ * notifies the initiating thread that the opertions is
+ * complete and returns to the caller the value of the next
+ * operation in the list (or NULL).
+ */
+static pt_Continuation *pt_FinishTimedInternal(pt_Continuation *op)
+{
+ pt_Continuation *next;
+ struct pt_TimedQueue *tqp = &pt_tqp[op->io_tq_index];
+
+#if defined(DEBUG)
+ {
+ PRIntn count;
+ pt_Continuation *tmp;
+ PR_ASSERT(tqp->head != NULL);
+ PR_ASSERT(tqp->tail != NULL);
+ PR_ASSERT(tqp->op_count != 0);
+ PR_ASSERT(tqp->head->prev == NULL);
+ PR_ASSERT(tqp->tail->next == NULL);
+ if (tqp->op_count > 1)
+ {
+ PR_ASSERT(tqp->head->next != NULL);
+ PR_ASSERT(tqp->tail->prev != NULL);
+ }
+ else
+ {
+ PR_ASSERT(tqp->head->next == NULL);
+ PR_ASSERT(tqp->tail->prev == NULL);
+ }
+ for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1;
+ PR_ASSERT(count == tqp->op_count);
+ for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1;
+ PR_ASSERT(count == tqp->op_count);
+ }
+#endif /* defined(DEBUG) */
+
+ /* remove this one from the list */
+ if (NULL == op->prev) tqp->head = op->next;
+ else op->prev->next = op->next;
+ if (NULL == op->next) tqp->tail = op->prev;
+ else op->next->prev = op->prev;
+
+ /* did we happen to hit the timed op? */
+ if (op == tqp->op) tqp->op = op->prev;
+
+ next = op->next;
+ op->next = op->prev = NULL;
+ op->status = pt_continuation_done;
+
+ tqp->op_count -= 1;
+
+#if defined(DEBUG)
+ pt_debug.continuationsServed += 1;
+#endif
+ PR_NotifyCondVar(op->complete);
+
+#if defined(DEBUG)
+ {
+ PRIntn count;
+ pt_Continuation *tmp;
+ PR_ASSERT((tqp->head == NULL) == (tqp->tail == NULL));
+ PR_ASSERT((tqp->head == NULL) == (tqp->op_count == 0));
+ for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1;
+ PR_ASSERT(count == tqp->op_count);
+ for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1;
+ PR_ASSERT(count == tqp->op_count);
+ }
+#endif /* defined(DEBUG) */
+
+ return next;
+} /* pt_FinishTimedInternal */
+
+static void pt_ContinuationThreadInternal(pt_Continuation *my_op)
+{
+ /* initialization */
+ PRInt32 msecs, mx_poll_ticks;
+ PRThreadPriority priority; /* used to save caller's prio */
+ PRIntn pollingListUsed; /* # entries used in the list */
+ PRIntn pollingListNeeded; /* # entries needed this time */
+ PRIntn io_tq_index = my_op->io_tq_index;
+ struct pt_TimedQueue *tqp = &pt_tqp[my_op->io_tq_index];
+ struct pollfd *pollingList = tqp->pollingList;
+ PRIntn pollingSlotsAllocated = tqp->pollingSlotsAllocated;
+ pt_Continuation **pollingOps = tqp->pollingOps;
- PR_ASSERT(PR_INTERVAL_NO_WAIT != op->timeout);
-
- switch (op->timeout) {
- case PR_INTERVAL_NO_TIMEOUT:
- msecs = PT_DEFAULT_POLL_MSEC;
- do
- {
- PRIntn rv;
- struct pollfd tmp_pfd;
-
- tmp_pfd.revents = 0;
- tmp_pfd.fd = op->arg1.osfd;
- tmp_pfd.events = op->event;
-
- rv = poll(&tmp_pfd, 1, msecs);
-
- if (self->state & PT_THREAD_ABORTED)
- {
- self->state &= ~PT_THREAD_ABORTED;
- op->result.code = -1;
- op->syserrno = EINTR;
- op->status = pt_continuation_done;
- return;
- }
-
- if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN)))
- continue; /* go around the loop again */
-
- if (rv > 0)
- {
- PRIntn fd = tmp_pfd.fd;
- PRInt16 events = tmp_pfd.events;
- PRInt16 revents = tmp_pfd.revents;
-
- if ((revents & POLLNVAL) /* busted in all cases */
- || ((events & POLLOUT) && (revents & POLLHUP)))
- /* write op & hup */
- {
- op->result.code = -1;
- if (POLLNVAL & revents) op->syserrno = EBADF;
- else if (POLLHUP & revents) op->syserrno = EPIPE;
- op->status = pt_continuation_done;
- } else {
- if (op->function(op, revents))
- op->status = pt_continuation_done;
- }
- } else if (rv == -1) {
- op->result.code = -1;
- op->syserrno = errno;
- op->status = pt_continuation_done;
- }
- /* else, poll timed out */
- } while (pt_continuation_done != op->status);
- break;
- default:
- now = epoch = PR_IntervalNow();
- remaining = op->timeout;
- do
- {
- PRIntn rv;
- struct pollfd tmp_pfd;
-
- tmp_pfd.revents = 0;
- tmp_pfd.fd = op->arg1.osfd;
- tmp_pfd.events = op->event;
-
- msecs = (PRInt32)PR_IntervalToMilliseconds(remaining);
- if (msecs > PT_DEFAULT_POLL_MSEC)
- msecs = PT_DEFAULT_POLL_MSEC;
- rv = poll(&tmp_pfd, 1, msecs);
-
- if (self->state & PT_THREAD_ABORTED)
- {
- self->state &= ~PT_THREAD_ABORTED;
- op->result.code = -1;
- op->syserrno = EINTR;
- op->status = pt_continuation_done;
- return;
- }
-
- if (rv > 0)
- {
- PRIntn fd = tmp_pfd.fd;
- PRInt16 events = tmp_pfd.events;
- PRInt16 revents = tmp_pfd.revents;
-
- if ((revents & POLLNVAL) /* busted in all cases */
- || ((events & POLLOUT) && (revents & POLLHUP)))
- /* write op & hup */
- {
- op->result.code = -1;
- if (POLLNVAL & revents) op->syserrno = EBADF;
- else if (POLLHUP & revents) op->syserrno = EPIPE;
- op->status = pt_continuation_done;
- } else {
- if (op->function(op, revents))
- {
- op->status = pt_continuation_done;
- }
- }
- } else if ((rv == 0) ||
- ((errno == EINTR) || (errno == EAGAIN))) {
- if (rv == 0) /* poll timed out */
- now += PR_MillisecondsToInterval(msecs);
- else
- now = PR_IntervalNow();
- elapsed = (PRIntervalTime) (now - epoch);
- if (elapsed >= op->timeout) {
- op->result.code = -1;
- op->syserrno = ETIMEDOUT;
- op->status = pt_continuation_done;
- } else
- remaining = op->timeout - elapsed;
- } else {
- op->result.code = -1;
- op->syserrno = errno;
- op->status = pt_continuation_done;
- }
- } while (pt_continuation_done != op->status);
- break;
- }
-
-} /* pt_poll_now */
+ PR_Unlock(tqp->ml); /* don't need that silly lock for a bit */
+
+ priority = PR_GetThreadPriority(tqp->thread);
+ PR_SetThreadPriority(tqp->thread, PR_PRIORITY_HIGH);
+
+ mx_poll_ticks = (PRInt32)PR_MillisecondsToInterval(PT_DEFAULT_POLL_MSEC);
+
+ /* do some real work */
+ while (PR_TRUE)
+ {
+ PRIntn rv;
+ PRInt32 timeout;
+ PRIntn pollIndex;
+ PRIntervalTime now;
+ pt_Continuation *op, *next_op;
+
+ PR_ASSERT(NULL != tqp->head);
+
+ pollingListNeeded = tqp->op_count;
+
+ /*
+ * We are not holding the tqp->ml lock now, so more items may
+ * get added to pt_tq during this window of time. We hope
+ * that 10 more spaces in the polling list should be enough.
+ *
+ * The space allocated is for both a vector that parallels the
+ * polling list, providing pointers directly into the operation's
+ * table and the polling list itself. There is a guard element
+ * between the two structures.
+ */
+ pollingListNeeded += 10;
+ if (pollingListNeeded > pollingSlotsAllocated)
+ {
+ if (NULL != pollingOps) PR_Free(pollingOps);
+ pollingOps = (pt_Continuation**)PR_Malloc(
+ sizeof(pt_Continuation**) + pollingListNeeded *
+ (sizeof(struct pollfd) + sizeof(pt_Continuation*)));
+ PR_ASSERT(NULL != pollingOps);
+ tqp->pollingOps = pollingOps;
+ pollingSlotsAllocated = pollingListNeeded;
+ tqp->pollingSlotsAllocated = pollingSlotsAllocated;
+ pollingOps[pollingSlotsAllocated] = (pt_Continuation*)-1;
+ pollingList = (struct pollfd*)(&pollingOps[pollingSlotsAllocated + 1]);
+ tqp->pollingList = pollingList;
+
+ }
+
+#if defined(DEBUG)
+ if (pollingListNeeded > pt_debug.pollingListMax)
+ pt_debug.pollingListMax = pollingListNeeded;
+#endif
+
+ /*
+ ** This is interrupt processing. If this thread was interrupted,
+ ** the thread state will have the PT_THREAD_ABORTED bit set. This
+ ** overrides good completions as well as timeouts.
+ **
+ ** BTW, it does no good to hold the lock here. This lock doesn't
+ ** protect the thread structure in any way. Testing the bit and
+ ** (perhaps) resetting it are safe 'cause it's the only modifiable
+ ** bit in that word.
+ */
+ if (_PT_THREAD_INTERRUPTED(tqp->thread))
+ {
+ my_op->status = pt_continuation_abort;
+ tqp->thread->state &= ~PT_THREAD_ABORTED;
+ }
+
+
+ /*
+ * Build up a polling list.
+ * This list is sorted on time. Operations that have been
+ * interrupted are completed and not included in the list.
+ * There is an assertion that the operation is in progress.
+ */
+ pollingListUsed = 0;
+ PR_Lock(tqp->ml);
+
+ for (op = tqp->head; NULL != op;)
+ {
+ if (pt_continuation_abort == op->status)
+ {
+ op->result.code = -1;
+ op->syserrno = EINTR;
+ next_op = pt_FinishTimedInternal(op);
+ if (op == my_op) goto recycle;
+ else op = next_op;
+ PR_ASSERT(NULL != tqp->head);
+ }
+ else
+ {
+ op->status = pt_continuation_pending;
+ if (pollingListUsed >= pollingSlotsAllocated)
+ {
+#if defined(DEBUG)
+ pt_debug.predictionsFoiled += 1;
+#endif
+ break;
+ }
+ PR_ASSERT((pt_Continuation*)-1 == pollingOps[pollingSlotsAllocated]);
+ /*
+ * eventMask bitmasks are declared as PRIntn so that
+ * each bitmask can be updated individually without
+ * disturbing adjacent memory, but only the lower 16
+ * bits of a bitmask are used.
+ */
+ op->fd->secret->eventMask[io_tq_index] = 0xffff;
+ pollingOps[pollingListUsed] = op;
+ pollingList[pollingListUsed].revents = 0;
+ pollingList[pollingListUsed].fd = op->arg1.osfd;
+ pollingList[pollingListUsed].events = op->event;
+ pollingListUsed += 1;
+ op = op->next;
+ }
+ }
+
+ /*
+ * We don't want to wait forever on this poll. So keep
+ * the interval down. The operations, if they are timed,
+ * still have to timeout, while those that are not timed
+ * should persist forever. But they may be aborted. That's
+ * what this anxiety is all about.
+ */
+ if (PR_INTERVAL_NO_TIMEOUT == tqp->head->timeout)
+ msecs = PT_DEFAULT_POLL_MSEC;
+ else
+ {
+ timeout = tqp->head->absolute - PR_IntervalNow();
+ if (timeout <= 0) msecs = 0; /* already timed out */
+ else if (timeout >= mx_poll_ticks) msecs = PT_DEFAULT_POLL_MSEC;
+ else msecs = (PRInt32)PR_IntervalToMilliseconds(timeout);
+ }
+
+ PR_Unlock(tqp->ml);
+
+ /*
+ * If 'op' isn't NULL at this point, then we didn't get to
+ * the end of the list. That means that more items got added
+ * to the list than we anticipated. So, forget this iteration,
+ * go around the horn again.
+ *
+ * One would hope this doesn't happen all that often.
+ */
+ if (NULL != op) continue; /* make it rethink things */
+
+ PR_ASSERT((pt_Continuation*)-1 == pollingOps[pollingSlotsAllocated]);
+
+ rv = poll(pollingList, pollingListUsed, msecs);
+
+ if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN)))
+ continue; /* go around the loop again */
+
+ if (rv > 0)
+ {
+ /*
+ * poll() says that something in our list is ready for some more
+ * action. Find it, load up the operation and see what happens.
+ */
+
+ /*
+ * This may work out okay. The rule is that only this thread,
+ * the continuation thread, can remove elements from the list.
+ * Therefore, the list is at worst, longer than when we built
+ * the polling list.
+ */
+
+ for (pollIndex = 0; pollIndex < pollingListUsed; ++pollIndex)
+ {
+ PRInt16 events = pollingList[pollIndex].events;
+ PRInt16 revents = pollingList[pollIndex].revents;
+
+ op = pollingOps[pollIndex]; /* this is the operation */
+
+ /* (ref: Bug #153459)
+ ** In case of POLLERR we let the operation retry in hope
+ ** of getting a more definitive OS error.
+ */
+ if ((revents & POLLNVAL) /* busted in all cases */
+ || ((events & POLLOUT) && (revents & POLLHUP))) /* write op & hup */
+ {
+ PR_Lock(tqp->ml);
+ op->result.code = -1;
+ if (POLLNVAL & revents) op->syserrno = EBADF;
+ else if (POLLHUP & revents) op->syserrno = EPIPE;
+ (void)pt_FinishTimedInternal(op);
+ if (op == my_op) goto recycle;
+ PR_Unlock(tqp->ml);
+ }
+ else if ((0 != (revents & op->fd->secret->eventMask[io_tq_index]))
+ && (pt_continuation_pending == op->status))
+ {
+ /*
+ * Only good?(?) revents left. Operations not pending
+ * will be pruned next time we build a list. This operation
+ * will be pruned if the continuation indicates it is
+ * finished.
+ */
+
+ if (op->function(op, revents))
+ {
+ PR_Lock(tqp->ml);
+ (void)pt_FinishTimedInternal(op);
+ if (op == my_op) goto recycle;
+ PR_Unlock(tqp->ml);
+ }
+ else
+ {
+ /*
+ * If the continuation function returns
+ * PR_FALSE, it means available data have
+ * been read, output buffer space has been
+ * filled, or pending connections have been
+ * accepted by prior calls. If the
+ * continuation function is immediately
+ * invoked again, it will most likely
+ * return PR_FALSE. So turn off these
+ * events in the event mask for this fd so
+ * that if this fd is encountered again in
+ * the polling list with these events on,
+ * we won't invoke the continuation
+ * function again.
+ */
+ op->fd->secret->eventMask[io_tq_index] &= ~revents;
+ }
+ }
+ }
+ }
+
+ /*
+ * This is timeout processing. It is done after checking
+ * for good completions. Those that just made it under the
+ * wire are lucky, but none the less, valid.
+ */
+ now = PR_IntervalNow();
+ PR_Lock(tqp->ml);
+ while ((NULL != tqp->head)
+ && (PR_INTERVAL_NO_TIMEOUT != tqp->head->timeout))
+ {
+ op = tqp->head; /* get a copy of this before finishing it */
+ if ((PRInt32)(op->absolute - now) > 0) break;
+ /*
+ * The head element of the timed queue has timed out. Record
+ * the reason for completion and take it out of the list.
+ */
+ op->result.code = -1;
+ op->syserrno = ETIMEDOUT;
+ (void)pt_FinishTimedInternal(op);
+
+ /*
+ * If it's 'my_op' then we have to switch threads. Exit w/o
+ * finishing the scan. The scan will be completed when another
+ * thread calls in acting as the continuation thread.
+ */
+ if (op == my_op) goto recycle; /* exit w/o unlocking */
+ }
+ PR_Unlock(tqp->ml);
+ }
+
+ PR_NOT_REACHED("This is a while(true) loop /w no breaks");
+
+recycle:
+ /*
+ ** Recycling the continuation thread.
+ **
+ ** The thread we were using for I/O continuations just completed
+ ** the I/O it submitted. It has to return to it's caller. We need
+ ** another thread to act in the continuation role. We can do that
+ ** by taking any operation off the timed queue, setting its state
+ ** to 'recycle' and notifying the condition.
+ **
+ ** Selecting a likely thread seems like magic. I'm going to try
+ ** using one that has the longest (or no) timeout, tqp->tail.
+ ** If that slot's empty, then there's no outstanding I/O and we
+ ** don't need a thread at all.
+ **
+ ** BTW, we're locked right now, and we'll be returning with the
+ ** the lock held as well. Seems odd, doesn't it?
+ */
+
+ /* $$$ should this be called with the lock held? $$$ */
+ PR_SetThreadPriority(tqp->thread, priority); /* reset back to caller's */
+
+ PR_ASSERT((NULL == tqp->head) == (0 == tqp->op_count));
+ PR_ASSERT((NULL == tqp->head) == (NULL == tqp->tail));
+ PR_ASSERT(pt_continuation_done == my_op->status);
+
+ if (NULL != tqp->tail)
+ {
+ if (tqp->tail->status != pt_continuation_abort)
+ {
+ tqp->tail->status = pt_continuation_recycle;
+ }
+ PR_NotifyCondVar(tqp->tail->complete);
+#if defined(DEBUG)
+ pt_debug.recyclesNeeded += 1;
+#endif
+ }
+#if defined(DEBUG)
+ else pt_debug.quiescentIO += 1;
+#endif
+
+} /* pt_ContinuationThreadInternal */
static PRIntn pt_Continue(pt_Continuation *op)
{
@@ -441,13 +889,99 @@ static PRIntn pt_Continue(pt_Continuation *op)
PRThread *self = PR_GetCurrentThread();
struct pt_TimedQueue *tqp;
+ /* lazy assignment of the thread's ioq */
+ if (-1 == self->io_tq_index)
+ {
+ self->io_tq_index = (PR_AtomicIncrement(&_pt_tq_index)-1) % _pt_tq_count;
+ }
+
+ PR_ASSERT(self->io_tq_index >= 0);
+ tqp = &pt_tqp[self->io_tq_index];
+
+ /* lazy allocation of the thread's cv */
+ if (NULL == self->io_cv)
+ self->io_cv = PR_NewCondVar(tqp->ml);
+ /* Finish filling in the blank slots */
+ op->complete = self->io_cv;
op->status = pt_continuation_pending; /* set default value */
- /*
- * let each thread call poll directly
- */
- pt_poll_now(op);
- PR_ASSERT(pt_continuation_done == op->status);
- return op->result.code;
+ op->io_tq_index = self->io_tq_index;
+ PR_Lock(tqp->ml); /* we provide the locking */
+
+ pt_InsertTimedInternal(op); /* insert in the structure */
+
+ /*
+ ** At this point, we try to decide whether there is a continuation
+ ** thread, or whether we should assign this one to serve in that role.
+ */
+ do
+ {
+ if (NULL == tqp->thread)
+ {
+ /*
+ ** We're the one. Call the processing function with the lock
+ ** held. It will return with it held as well, though there
+ ** will certainly be times within the function when it gets
+ ** released.
+ */
+ tqp->thread = self; /* I'm taking control */
+ pt_ContinuationThreadInternal(op); /* go slash and burn */
+ PR_ASSERT(pt_continuation_done == op->status);
+ tqp->thread = NULL; /* I'm abdicating my rule */
+ }
+ else
+ {
+ rv = PR_WaitCondVar(op->complete, PR_INTERVAL_NO_TIMEOUT);
+ /*
+ * If we get interrupted, we set state the continuation thread will
+ * see and allow it to finish the I/O operation w/ error. That way
+ * the rule that only the continuation thread is removing elements
+ * from the list is still valid.
+ *
+ * Don't call interrupt on the continuation thread. That'll just
+ * irritate him. He's cycling around at least every mx_poll_ticks
+ * anyhow and should notice the request in there. When he does
+ * notice, this operation will be finished and the op's status
+ * marked as pt_continuation_done.
+ */
+ if ((PR_FAILURE == rv) /* the wait was interrupted */
+ && (PR_PENDING_INTERRUPT_ERROR == PR_GetError()))
+ {
+ if (pt_continuation_done == op->status)
+ {
+ /*
+ * The op is done and has been removed
+ * from the timed queue. We must not
+ * change op->status, otherwise this
+ * thread will go around the loop again.
+ *
+ * It's harsh to mark the op failed with
+ * interrupt error when the io is already
+ * done, but we should indicate the fact
+ * that the thread was interrupted. So
+ * we set the aborted flag to abort the
+ * thread's next blocking call. Is this
+ * the right thing to do?
+ */
+ self->state |= PT_THREAD_ABORTED;
+ }
+ else
+ {
+ /* go around the loop again */
+ op->status = pt_continuation_abort;
+ }
+ }
+ /*
+ * If we're to recycle, continue within this loop. This will
+ * cause this thread to become the continuation thread.
+ */
+
+ }
+ } while (pt_continuation_done != op->status);
+
+
+ PR_Unlock(tqp->ml); /* we provided the locking */
+
+ return op->result.code; /* and the primary answer */
} /* pt_Continue */
/*****************************************************************************/
@@ -756,8 +1290,52 @@ static PRBool pt_hpux_sendfile_cont(pt_Continuation *op, PRInt16 revents)
}
#endif /* HPUX11 */
+#define _MD_CPUS_ONLINE 2
+
void _PR_InitIO()
{
+ PRIntn index;
+ char *num_io_queues;
+
+ if (NULL != (num_io_queues = getenv("NSPR_NUM_IO_QUEUES")))
+ {
+ _pt_tq_count = atoi(num_io_queues);
+ }
+ else
+ {
+ /*
+ * Get the number of CPUs if the pthread
+ * library has kernel-scheduled entities that
+ * can run on multiple CPUs.
+ */
+#ifdef HPUX11
+ _pt_num_cpus = pthread_num_processors_np();
+#elif defined(IRIX) || defined(OSF1)
+ _pt_num_cpus = sysconf(_SC_NPROC_ONLN);
+#elif defined(AIX) || defined(LINUX) || defined(SOLARIS)
+ _pt_num_cpus = sysconf(_SC_NPROCESSORS_ONLN);
+#else
+ /*
+ * A pure user-level (Mx1) pthread library can
+ * only use one CPU, even on a multiprocessor.
+ */
+ _pt_num_cpus = 1;
+#endif
+ if (_pt_num_cpus < 0)
+ _pt_num_cpus = _MD_CPUS_ONLINE;
+ _pt_tq_count = _pt_num_cpus;
+ }
+
+ pt_tqp = (struct pt_TimedQueue *)
+ PR_CALLOC(_pt_tq_count * sizeof(struct pt_TimedQueue));
+ PR_ASSERT(NULL != pt_tqp);
+
+ for (index = 0; index < _pt_tq_count; index++)
+ {
+ pt_tqp[index].ml = PR_NewLock();
+ PR_ASSERT(NULL != pt_tqp[index].ml);
+ }
+
#if defined(DEBUG)
memset(&pt_debug, 0, sizeof(PTDebug));
pt_debug.timeStarted = PR_Now();
@@ -860,6 +1438,7 @@ static PRInt32 pt_Read(PRFileDesc *fd, void *buf, PRInt32 amount)
&& (!fd->secret->nonblocking))
{
pt_Continuation op;
+ op.fd = fd;
op.arg1.osfd = fd->secret->md.osfd;
op.arg2.buffer = buf;
op.arg3.amount = amount;
@@ -900,6 +1479,7 @@ static PRInt32 pt_Write(PRFileDesc *fd, const void *buf, PRInt32 amount)
if (fNeedContinue == PR_TRUE)
{
pt_Continuation op;
+ op.fd = fd;
op.arg1.osfd = fd->secret->md.osfd;
op.arg2.buffer = (void*)buf;
op.arg3.amount = amount;
@@ -993,6 +1573,7 @@ static PRInt32 pt_Writev(
{
pt_Continuation op;
+ op.fd = fd;
op.arg1.osfd = fd->secret->md.osfd;
op.arg2.buffer = (void*)osiov;
op.arg3.amount = osiov_len;
@@ -1132,6 +1713,7 @@ static PRStatus pt_Connect(
else
{
pt_Continuation op;
+ op.fd = fd;
op.arg1.osfd = fd->secret->md.osfd;
#ifdef _PR_HAVE_SOCKADDR_LEN
op.arg2.buffer = (void*)&addrCopy;
@@ -1206,6 +1788,7 @@ static PRFileDesc* pt_Accept(
else
{
pt_Continuation op;
+ op.fd = fd;
op.arg1.osfd = fd->secret->md.osfd;
op.arg2.buffer = addr;
op.arg3.addr_len = &addr_len;
@@ -1336,6 +1919,7 @@ static PRInt32 pt_Recv(
else
{
pt_Continuation op;
+ op.fd = fd;
op.arg1.osfd = fd->secret->md.osfd;
op.arg2.buffer = buf;
op.arg3.amount = amount;
@@ -1420,6 +2004,7 @@ static PRInt32 pt_Send(
if (fNeedContinue == PR_TRUE)
{
pt_Continuation op;
+ op.fd = fd;
op.arg1.osfd = fd->secret->md.osfd;
op.arg2.buffer = (void*)buf;
op.arg3.amount = amount;
@@ -1479,6 +2064,7 @@ static PRInt32 pt_SendTo(
if (fNeedContinue == PR_TRUE)
{
pt_Continuation op;
+ op.fd = fd;
op.arg1.osfd = fd->secret->md.osfd;
op.arg2.buffer = (void*)buf;
op.arg3.amount = amount;
@@ -1524,6 +2110,7 @@ static PRInt32 pt_RecvFrom(PRFileDesc *fd, void *buf, PRInt32 amount,
if (fNeedContinue == PR_TRUE)
{
pt_Continuation op;
+ op.fd = fd;
op.arg1.osfd = fd->secret->md.osfd;
op.arg2.buffer = buf;
op.arg3.amount = amount;
@@ -1651,6 +2238,7 @@ static PRInt32 pt_AIXSendFile(PRFileDesc *sd, PRSendFileData *sfd,
if ((rv == 1) || ((rv == -1) && (count == 0))) {
pt_Continuation op;
+ op.fd = sd;
op.arg1.osfd = sd->secret->md.osfd;
op.arg2.buffer = &sf_struct;
op.arg4.flags = send_flags;
@@ -1769,6 +2357,7 @@ static PRInt32 pt_HPUXSendFile(PRFileDesc *sd, PRSendFileData *sfd,
hdtrl[1].iov_len = sfd->tlen - trailer_nbytes_sent;
}
+ op.fd = sd;
op.arg1.osfd = sd->secret->md.osfd;
op.filedesc = sfd->fd->secret->md.osfd;
op.arg2.buffer = hdtrl;
@@ -2326,7 +2915,7 @@ static PRIOMethods _pr_pipe_methods = {
pt_Write,
pt_Available_s,
pt_Available64_s,
- pt_Synch,
+ pt_Fsync,
(PRSeekFN)_PR_InvalidInt,
(PRSeek64FN)_PR_InvalidInt64,
(PRFileInfoFN)_PR_InvalidStatus,
@@ -2544,7 +3133,7 @@ PR_IMPLEMENT(const PRIOMethods*) PR_GetFileMethods()
PR_IMPLEMENT(const PRIOMethods*) PR_GetPipeMethods()
{
return &_pr_pipe_methods;
-} /* PR_GetPipeMethods */
+} /* PR_GetFileMethods */
PR_IMPLEMENT(const PRIOMethods*) PR_GetTCPMethods()
{
diff --git a/pr/src/pthreads/ptthread.c b/pr/src/pthreads/ptthread.c
index 35f204f4..49f08da1 100644
--- a/pr/src/pthreads/ptthread.c
+++ b/pr/src/pthreads/ptthread.c
@@ -224,6 +224,7 @@ static PRThread* pt_AttachThread(void)
PR_ASSERT(0 == rv);
thred->state = PT_THREAD_GLOBAL | PT_THREAD_FOREIGN;
+ thred->io_tq_index = -1;
PR_Lock(pt_book.ml);
/* then put it into the list */
@@ -362,6 +363,8 @@ static PRThread* _PR_CreateThread(
thred->stack->stackSize = stackSize;
thred->stack->thr = thred;
+ thred->io_tq_index = -1;
+
#ifdef PT_NO_SIGTIMEDWAIT
pthread_mutex_init(&thred->suspendResumeMutex,NULL);
pthread_cond_init(&thred->suspendResumeCV,NULL);
@@ -739,6 +742,8 @@ static void _pt_thread_death(void *arg)
PR_Free(thred->privateData);
if (NULL != thred->errorString)
PR_Free(thred->errorString);
+ if (NULL != thred->io_cv)
+ PR_DestroyCondVar(thred->io_cv);
PR_Free(thred->stack);
#if defined(DEBUG)
memset(thred, 0xaf, sizeof(PRThread));
@@ -804,6 +809,8 @@ void _PR_InitThreads(
thred->stack->thr = thred;
_PR_InitializeStack(thred->stack);
+ thred->io_tq_index = -1;
+
/*
* Create a key for our use to store a backpointer in the pthread
* to our PRThread object. This object gets deleted when the thread