summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2012-10-31 13:48:43 -0700
committerGuido van Rossum <guido@python.org>2012-10-31 13:48:43 -0700
commiteab0bafc76eadc7bb41d616098611baeba0fa479 (patch)
treebe298b86d3bdae81e222e1a73f9dbae703d5cbda
parent45882e471ea3556ed99981f0d09edbbde54176a4 (diff)
downloadtrollius-eab0bafc76eadc7bb41d616098611baeba0fa479.tar.gz
Fix race condition in call_in_thread().
-rw-r--r--TODO12
-rw-r--r--scheduling.py54
2 files changed, 43 insertions, 23 deletions
diff --git a/TODO b/TODO
index 8444c2b..163f5d3 100644
--- a/TODO
+++ b/TODO
@@ -1,3 +1,5 @@
+# -*- Mode: text -*-
+
TO DO SMALLER TASKS
- Make Task more like Future; getting result() should re-raise exception.
@@ -13,6 +15,9 @@ TO DO LARGER TASKS
- Write up a tutorial for the scheduling API.
+- More systematic approach to logging. Logger objects? What about
+ heavy-duty logging, tracking essentially all task state changes?
+
TO DO LATER
@@ -126,3 +131,10 @@ MISTAKES I MADE
- Forgot to set the connection socket returned by accept() in
nonblocking mode.
+
+- Nastiest so far (cost me about a day): A race condition in
+ call_in_thread() where the Future's done_callback (which was
+ task.unblock()) would run immediately at the time when
+ add_done_callback() was called, and this screwed over the task
+ state. Solution: wrap the callback in eventloop.call_later().
+ Ironically, I had a comment stating there might be a race condition.
diff --git a/scheduling.py b/scheduling.py
index 057613c..4848165 100644
--- a/scheduling.py
+++ b/scheduling.py
@@ -94,17 +94,21 @@ class Task:
def __repr__(self):
parts = [self.name]
- if self.alive:
- is_current = (self is context.current_task)
- if self.blocked:
- parts.append('blocking' if is_current else 'blocked')
- else:
- parts.append('running' if is_current else 'runnable')
- else:
- if self.exception:
- parts.append('exception=%r' % self.exception)
- else:
- parts.append('result=%r' % self.result)
+ is_current = (self is context.current_task)
+ if self.blocked:
+ parts.append('blocking' if is_current else 'blocked')
+ elif self.alive:
+ parts.append('running' if is_current else 'runnable')
+ if self.must_cancel:
+ parts.append('must_cancel')
+ if self.cancelled:
+ parts.append('cancelled')
+ if self.exception:
+ parts.append('exception=%r' % self.exception)
+ elif not self.alive:
+ parts.append('result=%r' % self.result)
+ if self.timeout is not None:
+ parts.append('timeout=%.3f' % self.timeout)
return 'Task<' + ', '.join(parts) + '>'
def cancel(self):
@@ -113,7 +117,7 @@ class Task:
self.unblock()
def step(self):
- assert self.alive
+ assert self.alive, self
try:
context.current_task = self
if self.must_cancel:
@@ -128,7 +132,7 @@ class Task:
except Exception as exc:
self.alive = False
self.exception = exc
- logging.debug('Uncaught exception in task %r', self.name,
+ logging.debug('Uncaught exception in %s', self,
exc_info=True, stack_info=True)
except BaseException as exc:
self.alive = False
@@ -148,20 +152,20 @@ class Task:
self.eventloop.call_soon(callback, self)
def start(self):
- assert self.alive
+ assert self.alive, self
self.eventloop.call_soon(self.step)
def block(self, unblock_callback=None, *unblock_args):
- assert self is context.current_task
- assert self.alive
- assert not self.blocked
+ assert self is context.current_task, self
+ assert self.alive, self
+ assert not self.blocked, self
self.blocked = True
self.unblocker = (unblock_callback, unblock_args)
def unblock(self, unused=None):
# Ignore optional argument so we can be a Future's done_callback.
- assert self.alive
- assert self.blocked
+ assert self.alive, self
+ assert self.blocked, self
self.blocked = False
unblock_callback, unblock_args = self.unblocker
if unblock_callback is not None:
@@ -187,7 +191,7 @@ class Task:
def wait(self):
"""COROUTINE: Wait until this task is finished."""
current_task = context.current_task
- assert self is not current_task # How confusing!
+ assert self is not current_task, (self, current_task) # How confusing!
if not self.alive:
return
current_task.block()
@@ -238,11 +242,15 @@ def block_w(fd):
def call_in_thread(func, *args, executor=None):
"""COROUTINE: Run a function in a thread."""
- # TODO: Prove there is no race condition here.
- future = context.threadrunner.submit(func, *args, executor=executor)
task = context.current_task
+ eventloop = context.eventloop
+ future = context.threadrunner.submit(func, *args, executor=executor)
task.block(future.cancel)
- future.add_done_callback(task.unblock)
+ # If the thread managed to complete before we get here,
+ # add_done_callback() will call the callback right now. Make sure
+ # the unblock() call doesn't happen until later.
+ # TODO: Make unblock() robust so this doesn't hurt?
+ future.add_done_callback(lambda _: eventloop.call_soon(task.unblock))
yield
assert future.done()
return future.result()