summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Shepelev <temotor@gmail.com>2018-01-11 13:23:11 +0300
committerSergey Shepelev <temotor@gmail.com>2018-01-11 13:23:11 +0300
commit4afa3f46fbf961941d34c58bd61888d9388a4f9c (patch)
tree462b7fb575083906245da14943738b9e5d7f79b7
parentc32684b644eaaf649693d0c9ccd6264eb858f0cd (diff)
downloadeventlet-147-multiprocessing.tar.gz
WIP multiprocessing.ThreadPool AttributeError _state147-multiprocessing
Attempted to fix by patching multiprocessing.pool with non-green threading and queue modules. Does not work yet. Maybe I am working in wrong direction.
-rw-r--r--eventlet/green/multiprocessing/__init__.py1
-rw-r--r--eventlet/green/multiprocessing/pool.py21
-rw-r--r--eventlet/patcher.py16
-rw-r--r--eventlet/queue.py54
-rw-r--r--tests/isolated/patcher_multiprocessing_mainthread_state.py18
-rw-r--r--tests/patcher_test.py4
6 files changed, 111 insertions, 3 deletions
diff --git a/eventlet/green/multiprocessing/__init__.py b/eventlet/green/multiprocessing/__init__.py
new file mode 100644
index 0000000..301bba4
--- /dev/null
+++ b/eventlet/green/multiprocessing/__init__.py
@@ -0,0 +1 @@
+from . import pool
diff --git a/eventlet/green/multiprocessing/pool.py b/eventlet/green/multiprocessing/pool.py
new file mode 100644
index 0000000..d0607cf
--- /dev/null
+++ b/eventlet/green/multiprocessing/pool.py
@@ -0,0 +1,21 @@
+# Force non-green threading for multiprocessing.pool
+from eventlet import patcher
+from eventlet.green import time
+# from eventlet.green import Queue
+from eventlet.support import six
+
+__patched__ = ['Pool', 'ThreadPool']
+
+__orig_threading = patcher.original('threading')
+__orig_queue_name = 'Queue' if six.PY2 else 'queue'
+__orig_queue = patcher.original(__orig_queue_name)
+
+patcher.inject(
+ 'multiprocessing.pool',
+ globals(),
+ (__orig_queue_name, __orig_queue),
+ ('threading', __orig_threading),
+ ('time', time),
+)
+
+del patcher
diff --git a/eventlet/patcher.py b/eventlet/patcher.py
index 82a9cee..25d1fb3 100644
--- a/eventlet/patcher.py
+++ b/eventlet/patcher.py
@@ -233,9 +233,13 @@ def monkey_patch(**on):
# the hub calls into monkey-patched modules.
eventlet.hubs.get_hub()
- accepted_args = set(('os', 'select', 'socket',
- 'thread', 'time', 'psycopg', 'MySQLdb',
- 'builtins', 'subprocess'))
+ accepted_args = frozenset((
+ 'builtins', 'os',
+ 'MySQLdb', 'psycopg',
+ 'multiprocessing',
+ 'select', 'socket', 'subprocess',
+ 'thread', 'time',
+ ))
# To make sure only one of them is passed here
assert not ('__builtin__' in on and 'builtins' in on)
try:
@@ -274,6 +278,7 @@ def monkey_patch(**on):
('MySQLdb', _green_MySQLdb),
('builtins', _green_builtins),
('subprocess', _green_subprocess_modules),
+ ('multiprocessing', _green_multiprocessing_modules),
]:
if on[name] and not already_patched.get(name):
modules_to_patch += modules_function()
@@ -454,6 +459,11 @@ def _green_builtins():
return []
+def _green_multiprocessing_modules():
+ from eventlet.green import multiprocessing
+ return [('multiprocessing.pool', multiprocessing.pool)]
+
+
def slurp_properties(source, destination, ignore=[], srckeys=None):
"""Copy properties from *source* (assumed to be a module) to
*destination* (assumed to be a dict).
diff --git a/eventlet/queue.py b/eventlet/queue.py
index 34d9d08..49846ae 100644
--- a/eventlet/queue.py
+++ b/eventlet/queue.py
@@ -46,6 +46,7 @@ import heapq
import collections
import traceback
+import eventlet
from eventlet.event import Event
from eventlet.greenthread import getcurrent
from eventlet.hubs import get_hub
@@ -143,6 +144,51 @@ class Waiter(object):
self.greenlet = None
+class MockCondition(object):
+ def __init__(self, lock=None):
+ self.lock = lock
+ self._e = Event()
+ self._w = 0
+
+ def acquire(self, *args):
+ # self.lock.acquire(*args)
+ pass
+
+ def release(self):
+ # self.lock.release()
+ pass
+
+ def notify(self, n=1):
+ # assert lock
+ self._w -= n
+ self._e.send()
+ eventlet.sleep(0)
+ # It is bad to use private Event._waiters.
+ # I needed to ensure all waiters are notified before resetting event.
+ while self._e._waiters:
+ eventlet.sleep(0)
+ if self._w <= 0:
+ self._e.reset()
+ self._w = 0
+
+ def notify_all(self):
+ # assert lock
+ self.notify(self._w)
+ notifyAll = notify_all
+
+ def wait(self, timeout=None):
+ # assert lock
+ self._w += 1
+ with eventlet.Timeout(timeout, False):
+ self._e.wait()
+
+ def __enter__(self):
+ self.acquire()
+
+ def __exit__(self, typ, val, tb):
+ self.release()
+
+
class LightQueue(object):
"""
This is a variant of Queue that behaves mostly like the standard
@@ -166,6 +212,10 @@ class LightQueue(object):
def _init(self, maxsize):
self.queue = collections.deque()
+ # Half hearted support of Queue.not_{empty,full} API (for compatibility with multiprocessing.pool)
+ # FIXME: implement proper acquire/release blocking, not only notifications
+ self.not_empty = MockCondition()
+ self.not_full = MockCondition()
def _get(self):
return self.queue.popleft()
@@ -270,6 +320,7 @@ class LightQueue(object):
self.putters.discard(waiter)
else:
raise Full
+ self.not_empty.notify()
def put_nowait(self, item):
"""Put an item into the queue without blocking.
@@ -292,6 +343,7 @@ class LightQueue(object):
if self.qsize():
if self.putters:
self._schedule_unlock()
+ self.not_full.notify()
return self._get()
elif not block and get_hub().greenlet is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
@@ -301,6 +353,7 @@ class LightQueue(object):
if putter:
putter.switch(putter)
if self.qsize():
+ self.not_full.notify()
return self._get()
raise Empty
elif block:
@@ -310,6 +363,7 @@ class LightQueue(object):
self.getters.add(waiter)
if self.putters:
self._schedule_unlock()
+ self.not_full.notify()
return waiter.wait()
finally:
self.getters.discard(waiter)
diff --git a/tests/isolated/patcher_multiprocessing_mainthread_state.py b/tests/isolated/patcher_multiprocessing_mainthread_state.py
new file mode 100644
index 0000000..3b81da8
--- /dev/null
+++ b/tests/isolated/patcher_multiprocessing_mainthread_state.py
@@ -0,0 +1,18 @@
+__test__ = False
+
+if __name__ == '__main__':
+ import eventlet
+ eventlet.monkey_patch()
+ import multiprocessing.pool
+ multiprocessing.util.log_to_stderr(level=1)
+
+ def fun():
+ eventlet.sleep(0.01)
+ return 42
+
+ pool = multiprocessing.pool.ThreadPool(2)
+ result = pool.apply_async(fun, []).get()
+ assert result == 42
+ pool.close()
+ pool.join()
+ print('pass')
diff --git a/tests/patcher_test.py b/tests/patcher_test.py
index ff59400..f719182 100644
--- a/tests/patcher_test.py
+++ b/tests/patcher_test.py
@@ -520,3 +520,7 @@ def test_blocking_select_methods_are_deleted():
def test_regular_file_readall():
tests.run_isolated('regular_file_readall.py')
+
+
+def test_multiprocessing_mainthread_state():
+ tests.run_isolated('patcher_multiprocessing_mainthread_state.py')