diff options
author | Guido van Rossum <guido@python.org> | 2012-10-31 13:48:43 -0700 |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2012-10-31 13:48:43 -0700 |
commit | eab0bafc76eadc7bb41d616098611baeba0fa479 (patch) | |
tree | be298b86d3bdae81e222e1a73f9dbae703d5cbda | |
parent | 45882e471ea3556ed99981f0d09edbbde54176a4 (diff) | |
download | trollius-eab0bafc76eadc7bb41d616098611baeba0fa479.tar.gz |
Fix race condition in call_in_thread().
-rw-r--r-- | TODO | 12 | ||||
-rw-r--r-- | scheduling.py | 54 |
2 files changed, 43 insertions, 23 deletions
@@ -1,3 +1,5 @@ +# -*- Mode: text -*- + TO DO SMALLER TASKS - Make Task more like Future; getting result() should re-raise exception. @@ -13,6 +15,9 @@ TO DO LARGER TASKS - Write up a tutorial for the scheduling API. +- More systematic approach to logging. Logger objects? What about + heavy-duty logging, tracking essentially all task state changes? + TO DO LATER @@ -126,3 +131,10 @@ MISTAKES I MADE - Forgot to set the connection socket returned by accept() in nonblocking mode. + +- Nastiest so far (cost me about a day): A race condition in + call_in_thread() where the Future's done_callback (which was + task.unblock()) would run immediately at the time when + add_done_callback() was called, and this screwed over the task + state. Solution: wrap the callback in eventloop.call_later(). + Ironically, I had a comment stating there might be a race condition. diff --git a/scheduling.py b/scheduling.py index 057613c..4848165 100644 --- a/scheduling.py +++ b/scheduling.py @@ -94,17 +94,21 @@ class Task: def __repr__(self): parts = [self.name] - if self.alive: - is_current = (self is context.current_task) - if self.blocked: - parts.append('blocking' if is_current else 'blocked') - else: - parts.append('running' if is_current else 'runnable') - else: - if self.exception: - parts.append('exception=%r' % self.exception) - else: - parts.append('result=%r' % self.result) + is_current = (self is context.current_task) + if self.blocked: + parts.append('blocking' if is_current else 'blocked') + elif self.alive: + parts.append('running' if is_current else 'runnable') + if self.must_cancel: + parts.append('must_cancel') + if self.cancelled: + parts.append('cancelled') + if self.exception: + parts.append('exception=%r' % self.exception) + elif not self.alive: + parts.append('result=%r' % self.result) + if self.timeout is not None: + parts.append('timeout=%.3f' % self.timeout) return 'Task<' + ', '.join(parts) + '>' def cancel(self): @@ -113,7 +117,7 @@ class Task: self.unblock() def step(self): - assert self.alive + assert self.alive, self try: context.current_task = self if self.must_cancel: @@ -128,7 +132,7 @@ class Task: except Exception as exc: self.alive = False self.exception = exc - logging.debug('Uncaught exception in task %r', self.name, + logging.debug('Uncaught exception in %s', self, exc_info=True, stack_info=True) except BaseException as exc: self.alive = False @@ -148,20 +152,20 @@ class Task: self.eventloop.call_soon(callback, self) def start(self): - assert self.alive + assert self.alive, self self.eventloop.call_soon(self.step) def block(self, unblock_callback=None, *unblock_args): - assert self is context.current_task - assert self.alive - assert not self.blocked + assert self is context.current_task, self + assert self.alive, self + assert not self.blocked, self self.blocked = True self.unblocker = (unblock_callback, unblock_args) def unblock(self, unused=None): # Ignore optional argument so we can be a Future's done_callback. - assert self.alive - assert self.blocked + assert self.alive, self + assert self.blocked, self self.blocked = False unblock_callback, unblock_args = self.unblocker if unblock_callback is not None: @@ -187,7 +191,7 @@ class Task: def wait(self): """COROUTINE: Wait until this task is finished.""" current_task = context.current_task - assert self is not current_task # How confusing! + assert self is not current_task, (self, current_task) # How confusing! if not self.alive: return current_task.block() @@ -238,11 +242,15 @@ def block_w(fd): def call_in_thread(func, *args, executor=None): """COROUTINE: Run a function in a thread.""" - # TODO: Prove there is no race condition here. - future = context.threadrunner.submit(func, *args, executor=executor) task = context.current_task + eventloop = context.eventloop + future = context.threadrunner.submit(func, *args, executor=executor) task.block(future.cancel) - future.add_done_callback(task.unblock) + # If the thread managed to complete before we get here, + # add_done_callback() will call the callback right now. Make sure + # the unblock() call doesn't happen until later. + # TODO: Make unblock() robust so this doesn't hurt? + future.add_done_callback(lambda _: eventloop.call_soon(task.unblock)) yield assert future.done() return future.result() |