summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-02-08 15:06:15 +0000
committerGerrit Code Review <review@openstack.org>2015-02-08 15:06:15 +0000
commit8b7bba57494a1a2de275aaeaebb900cf40014e94 (patch)
tree3873738b300d962791555386227d95d886059a81
parent56fda6574618df6a19161e317854e4497af1052b (diff)
parentd8e68c365b86d35b3f83e8fca89c38130c93dbd2 (diff)
downloadoslo-messaging-8b7bba57494a1a2de275aaeaebb900cf40014e94.tar.gz
Merge "Add a new aioeventlet executor"
-rw-r--r--oslo_messaging/_executors/impl_aioeventlet.py75
-rw-r--r--oslo_messaging/_executors/impl_eventlet.py6
-rw-r--r--oslo_messaging/tests/executors/test_executor.py111
-rw-r--r--requirements.txt4
-rw-r--r--setup.cfg1
5 files changed, 181 insertions, 16 deletions
diff --git a/oslo_messaging/_executors/impl_aioeventlet.py b/oslo_messaging/_executors/impl_aioeventlet.py
new file mode 100644
index 0000000..d0fc4aa
--- /dev/null
+++ b/oslo_messaging/_executors/impl_aioeventlet.py
@@ -0,0 +1,75 @@
+# Copyright 2014 eNovance.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import aioeventlet
+import trollius
+
+from oslo_messaging._executors import impl_eventlet
+
+
+class AsyncioEventletExecutor(impl_eventlet.EventletExecutor):
+
+ """A message executor which integrates with eventlet and trollius.
+
+ The executor is based on eventlet executor and so is compatible with it.
+ The executor supports trollius coroutines, explicit asynchronous
+ programming, in addition to eventlet greenthreads, implicit asynchronous
+ programming.
+
+ To use the executor, an aioeventlet event loop must the running in the
+ thread executing the executor (usually the main thread). Example of code to
+ setup and run an aioeventlet event loop for the executor (in the main
+ thread):
+
+ import aioeventlet
+ import trollius
+
+ policy = aioeventlet.EventLoopPolicy()
+ trollius.set_event_loop_policy(policy)
+
+ def run_loop(loop):
+ loop.run_forever()
+ loop.close()
+
+ # Get the aioeventlet event loop (create it if needed)
+ loop = trollius.get_event_loop()
+
+ # run the event loop in a new greenthread,
+ # close it when it is done
+ eventlet.spawn(run_loop, loop)
+ """
+
+ def __init__(self, conf, listener, dispatcher):
+ super(AsyncioEventletExecutor, self).__init__(conf, listener,
+ dispatcher)
+ self._loop = None
+
+ def start(self):
+ # check that the event loop is an aioeventlet event loop
+ loop = trollius.get_event_loop()
+ if not isinstance(loop, aioeventlet.EventLoop):
+ raise RuntimeError("need an aioeventlet event loop")
+ self._loop = loop
+
+ super(AsyncioEventletExecutor, self).start()
+
+ def _coroutine_wrapper(self, func, *args, **kw):
+ result = func(*args, **kw)
+ if trollius.iscoroutine(result):
+ result = aioeventlet.yield_future(result, loop=self._loop)
+ return result
+
+ def _dispatch(self, incoming):
+ ctx = self.dispatcher(incoming, self._coroutine_wrapper)
+ impl_eventlet.spawn_with(ctxt=ctx, pool=self._greenpool)
diff --git a/oslo_messaging/_executors/impl_eventlet.py b/oslo_messaging/_executors/impl_eventlet.py
index 555ae03..3333fe7 100644
--- a/oslo_messaging/_executors/impl_eventlet.py
+++ b/oslo_messaging/_executors/impl_eventlet.py
@@ -82,6 +82,9 @@ class EventletExecutor(base.PooledExecutorBase):
'behavior. In the future, we will raise a '
'RuntimeException in this case.')
+ def _dispatch(self, incoming):
+ spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool)
+
def start(self):
if self._thread is not None:
return
@@ -92,8 +95,7 @@ class EventletExecutor(base.PooledExecutorBase):
while self._running:
incoming = self.listener.poll()
if incoming is not None:
- spawn_with(ctxt=self.dispatcher(incoming),
- pool=self._greenpool)
+ self._dispatch(incoming)
except greenlet.GreenletExit:
return
diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py
index cb321dd..4e3ec51 100644
--- a/oslo_messaging/tests/executors/test_executor.py
+++ b/oslo_messaging/tests/executors/test_executor.py
@@ -17,6 +17,12 @@
import contextlib
import threading
+# eventlet 0.16 with monkey patching does not work yet on Python 3,
+# so make aioeventlet, eventlet and trollius import optional
+try:
+ import aioeventlet
+except ImportError:
+ aioeventlet = None
try:
import eventlet
except ImportError:
@@ -24,7 +30,16 @@ except ImportError:
import mock
import testscenarios
import testtools
+try:
+ import trollius
+except ImportError:
+ pass
+
+try:
+ from oslo_messaging._executors import impl_aioeventlet
+except ImportError:
+ impl_aioeventlet = None
from oslo_messaging._executors import impl_blocking
try:
from oslo_messaging._executors import impl_eventlet
@@ -46,42 +61,110 @@ class TestExecutor(test_utils.BaseTestCase):
if impl_eventlet is not None:
impl.append(
('eventlet', dict(executor=impl_eventlet.EventletExecutor)))
+ if impl_aioeventlet is not None:
+ impl.append(
+ ('aioeventlet',
+ dict(executor=impl_aioeventlet.AsyncioEventletExecutor)))
cls.scenarios = testscenarios.multiply_scenarios(impl)
@staticmethod
- def _run_in_thread(executor):
- def thread():
- executor.start()
- executor.wait()
- thread = threading.Thread(target=thread)
+ def _run_in_thread(target, executor):
+ thread = threading.Thread(target=target, args=(executor,))
thread.daemon = True
thread.start()
thread.join(timeout=30)
def test_executor_dispatch(self):
- callback = mock.MagicMock(return_value='result')
+ if impl_aioeventlet is not None:
+ aioeventlet_class = impl_aioeventlet.AsyncioEventletExecutor
+ else:
+ aioeventlet_class = None
+ is_aioeventlet = (self.executor == aioeventlet_class)
+
+ if is_aioeventlet:
+ policy = aioeventlet.EventLoopPolicy()
+ trollius.set_event_loop_policy(policy)
+ self.addCleanup(trollius.set_event_loop_policy, None)
+
+ def run_loop(loop):
+ loop.run_forever()
+ loop.close()
+ trollius.set_event_loop(None)
+
+ def run_executor(executor):
+ # create an event loop in the executor thread
+ loop = trollius.new_event_loop()
+ trollius.set_event_loop(loop)
+ eventlet.spawn(run_loop, loop)
+
+ # run the executor
+ executor.start()
+ executor.wait()
+
+ # stop the event loop: run_loop() will close it
+ loop.stop()
+
+ @trollius.coroutine
+ def simple_coroutine(value):
+ yield None
+ raise trollius.Return(value)
+
+ endpoint = mock.MagicMock(return_value=simple_coroutine('result'))
+ event = eventlet.event.Event()
+ else:
+ def run_executor(executor):
+ executor.start()
+ executor.wait()
+
+ endpoint = mock.MagicMock(return_value='result')
class Dispatcher(object):
+ def __init__(self, endpoint):
+ self.endpoint = endpoint
+ self.result = "not set"
+
@contextlib.contextmanager
- def __call__(self, incoming):
- yield lambda: callback(incoming.ctxt, incoming.message)
+ def __call__(self, incoming, executor_callback=None):
+ if executor_callback is not None:
+ def callback():
+ result = executor_callback(self.endpoint,
+ incoming.ctxt,
+ incoming.message)
+ self.result = result
+ return result
+ yield callback
+ event.send()
+ else:
+ def callback():
+ result = self.endpoint(incoming.ctxt, incoming.message)
+ self.result = result
+ return result
+ yield callback
listener = mock.Mock(spec=['poll'])
- executor = self.executor(self.conf, listener, Dispatcher())
+ dispatcher = Dispatcher(endpoint)
+ executor = self.executor(self.conf, listener, dispatcher)
incoming_message = mock.MagicMock(ctxt={},
message={'payload': 'data'})
def fake_poll(timeout=None):
- if listener.poll.call_count == 1:
- return incoming_message
- executor.stop()
+ if is_aioeventlet:
+ if listener.poll.call_count == 1:
+ return incoming_message
+ event.wait()
+ executor.stop()
+ else:
+ if listener.poll.call_count == 1:
+ return incoming_message
+ executor.stop()
listener.poll.side_effect = fake_poll
- self._run_in_thread(executor)
+ self._run_in_thread(run_executor, executor)
- callback.assert_called_once_with({}, {'payload': 'data'})
+ endpoint.assert_called_once_with({}, {'payload': 'data'})
+ self.assertEqual(dispatcher.result, 'result')
TestExecutor.generate_scenarios()
diff --git a/requirements.txt b/requirements.txt
index 352b14a..e6747b0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -29,3 +29,7 @@ oslo.middleware>=0.3.0 # Apache-2.0
# for the futures based executor
futures>=2.1.6
+
+# needed by the aioeventlet executor
+aioeventlet>=0.4
+trollius>=1.0
diff --git a/setup.cfg b/setup.cfg
index 3f94cb9..0e191c4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -42,6 +42,7 @@ oslo.messaging.drivers =
fake = oslo_messaging._drivers.impl_fake:FakeDriver
oslo.messaging.executors =
+ aioeventlet = oslo_messaging._executors.impl_aioeventlet:AsyncioEventletExecutor
blocking = oslo_messaging._executors.impl_blocking:BlockingExecutor
eventlet = oslo_messaging._executors.impl_eventlet:EventletExecutor
threading = oslo_messaging._executors.impl_thread:ThreadExecutor