diff options
| author | Sergey Shepelev <temotor@gmail.com> | 2018-01-11 13:23:11 +0300 |
|---|---|---|
| committer | Sergey Shepelev <temotor@gmail.com> | 2018-01-11 13:23:11 +0300 |
| commit | 4afa3f46fbf961941d34c58bd61888d9388a4f9c (patch) | |
| tree | 462b7fb575083906245da14943738b9e5d7f79b7 | |
| parent | c32684b644eaaf649693d0c9ccd6264eb858f0cd (diff) | |
| download | eventlet-147-multiprocessing.tar.gz | |
WIP multiprocessing.ThreadPool AttributeError _state147-multiprocessing
Attempted to fix by patching multiprocessing.pool with non-green threading and queue modules.
Does not work yet. Maybe I am working in wrong direction.
| -rw-r--r-- | eventlet/green/multiprocessing/__init__.py | 1 | ||||
| -rw-r--r-- | eventlet/green/multiprocessing/pool.py | 21 | ||||
| -rw-r--r-- | eventlet/patcher.py | 16 | ||||
| -rw-r--r-- | eventlet/queue.py | 54 | ||||
| -rw-r--r-- | tests/isolated/patcher_multiprocessing_mainthread_state.py | 18 | ||||
| -rw-r--r-- | tests/patcher_test.py | 4 |
6 files changed, 111 insertions, 3 deletions
diff --git a/eventlet/green/multiprocessing/__init__.py b/eventlet/green/multiprocessing/__init__.py new file mode 100644 index 0000000..301bba4 --- /dev/null +++ b/eventlet/green/multiprocessing/__init__.py @@ -0,0 +1 @@ +from . import pool diff --git a/eventlet/green/multiprocessing/pool.py b/eventlet/green/multiprocessing/pool.py new file mode 100644 index 0000000..d0607cf --- /dev/null +++ b/eventlet/green/multiprocessing/pool.py @@ -0,0 +1,21 @@ +# Force non-green threading for multiprocessing.pool +from eventlet import patcher +from eventlet.green import time +# from eventlet.green import Queue +from eventlet.support import six + +__patched__ = ['Pool', 'ThreadPool'] + +__orig_threading = patcher.original('threading') +__orig_queue_name = 'Queue' if six.PY2 else 'queue' +__orig_queue = patcher.original(__orig_queue_name) + +patcher.inject( + 'multiprocessing.pool', + globals(), + (__orig_queue_name, __orig_queue), + ('threading', __orig_threading), + ('time', time), +) + +del patcher diff --git a/eventlet/patcher.py b/eventlet/patcher.py index 82a9cee..25d1fb3 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -233,9 +233,13 @@ def monkey_patch(**on): # the hub calls into monkey-patched modules. eventlet.hubs.get_hub() - accepted_args = set(('os', 'select', 'socket', - 'thread', 'time', 'psycopg', 'MySQLdb', - 'builtins', 'subprocess')) + accepted_args = frozenset(( + 'builtins', 'os', + 'MySQLdb', 'psycopg', + 'multiprocessing', + 'select', 'socket', 'subprocess', + 'thread', 'time', + )) # To make sure only one of them is passed here assert not ('__builtin__' in on and 'builtins' in on) try: @@ -274,6 +278,7 @@ def monkey_patch(**on): ('MySQLdb', _green_MySQLdb), ('builtins', _green_builtins), ('subprocess', _green_subprocess_modules), + ('multiprocessing', _green_multiprocessing_modules), ]: if on[name] and not already_patched.get(name): modules_to_patch += modules_function() @@ -454,6 +459,11 @@ def _green_builtins(): return [] +def _green_multiprocessing_modules(): + from eventlet.green import multiprocessing + return [('multiprocessing.pool', multiprocessing.pool)] + + def slurp_properties(source, destination, ignore=[], srckeys=None): """Copy properties from *source* (assumed to be a module) to *destination* (assumed to be a dict). diff --git a/eventlet/queue.py b/eventlet/queue.py index 34d9d08..49846ae 100644 --- a/eventlet/queue.py +++ b/eventlet/queue.py @@ -46,6 +46,7 @@ import heapq import collections import traceback +import eventlet from eventlet.event import Event from eventlet.greenthread import getcurrent from eventlet.hubs import get_hub @@ -143,6 +144,51 @@ class Waiter(object): self.greenlet = None +class MockCondition(object): + def __init__(self, lock=None): + self.lock = lock + self._e = Event() + self._w = 0 + + def acquire(self, *args): + # self.lock.acquire(*args) + pass + + def release(self): + # self.lock.release() + pass + + def notify(self, n=1): + # assert lock + self._w -= n + self._e.send() + eventlet.sleep(0) + # It is bad to use private Event._waiters. + # I needed to ensure all waiters are notified before resetting event. + while self._e._waiters: + eventlet.sleep(0) + if self._w <= 0: + self._e.reset() + self._w = 0 + + def notify_all(self): + # assert lock + self.notify(self._w) + notifyAll = notify_all + + def wait(self, timeout=None): + # assert lock + self._w += 1 + with eventlet.Timeout(timeout, False): + self._e.wait() + + def __enter__(self): + self.acquire() + + def __exit__(self, typ, val, tb): + self.release() + + class LightQueue(object): """ This is a variant of Queue that behaves mostly like the standard @@ -166,6 +212,10 @@ class LightQueue(object): def _init(self, maxsize): self.queue = collections.deque() + # Half hearted support of Queue.not_{empty,full} API (for compatibility with multiprocessing.pool) + # FIXME: implement proper acquire/release blocking, not only notifications + self.not_empty = MockCondition() + self.not_full = MockCondition() def _get(self): return self.queue.popleft() @@ -270,6 +320,7 @@ class LightQueue(object): self.putters.discard(waiter) else: raise Full + self.not_empty.notify() def put_nowait(self, item): """Put an item into the queue without blocking. @@ -292,6 +343,7 @@ class LightQueue(object): if self.qsize(): if self.putters: self._schedule_unlock() + self.not_full.notify() return self._get() elif not block and get_hub().greenlet is getcurrent(): # special case to make get_nowait() runnable in the mainloop greenlet @@ -301,6 +353,7 @@ class LightQueue(object): if putter: putter.switch(putter) if self.qsize(): + self.not_full.notify() return self._get() raise Empty elif block: @@ -310,6 +363,7 @@ class LightQueue(object): self.getters.add(waiter) if self.putters: self._schedule_unlock() + self.not_full.notify() return waiter.wait() finally: self.getters.discard(waiter) diff --git a/tests/isolated/patcher_multiprocessing_mainthread_state.py b/tests/isolated/patcher_multiprocessing_mainthread_state.py new file mode 100644 index 0000000..3b81da8 --- /dev/null +++ b/tests/isolated/patcher_multiprocessing_mainthread_state.py @@ -0,0 +1,18 @@ +__test__ = False + +if __name__ == '__main__': + import eventlet + eventlet.monkey_patch() + import multiprocessing.pool + multiprocessing.util.log_to_stderr(level=1) + + def fun(): + eventlet.sleep(0.01) + return 42 + + pool = multiprocessing.pool.ThreadPool(2) + result = pool.apply_async(fun, []).get() + assert result == 42 + pool.close() + pool.join() + print('pass') diff --git a/tests/patcher_test.py b/tests/patcher_test.py index ff59400..f719182 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -520,3 +520,7 @@ def test_blocking_select_methods_are_deleted(): def test_regular_file_readall(): tests.run_isolated('regular_file_readall.py') + + +def test_multiprocessing_mainthread_state(): + tests.run_isolated('patcher_multiprocessing_mainthread_state.py') |
