summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-02-05 14:38:21 +0000
committerGerrit Code Review <review@openstack.org>2015-02-05 14:38:21 +0000
commit0bf006f24f36824510913846ab148cc93bb2e951 (patch)
treef636ef5548316c17d55354749029a864ab53984c
parentbd52d08efaa608244254a801c9737a0aed7434e0 (diff)
parent8380ac6e661924617c53171569c1ee44dc112cea (diff)
downloadoslo-messaging-0bf006f24f36824510913846ab148cc93bb2e951.tar.gz
Merge "Fix notifications broken with ZMQ driver"
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py24
-rw-r--r--oslo_messaging/tests/drivers/test_impl_zmq.py5
2 files changed, 8 insertions, 21 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index f9cb473..23961ee 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -302,9 +302,9 @@ class InternalContext(object):
data.setdefault('args', {})
try:
- result = proxy.dispatch(
- ctx, data['version'], data['method'],
- data.get('namespace'), **data['args'])
+ if not data.get("method"):
+ raise KeyError
+ result = proxy.dispatch(ctx, data)
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
@@ -368,18 +368,13 @@ class ConsumerBase(object):
# Method starting with - are
# processed internally. (non-valid method name)
method = data.get('method')
- if not method:
- LOG.error(_("RPC message did not include method."))
- return
-
# Internal method
# uses internal context for safety.
if method == '-reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
return
- proxy.dispatch(ctx, data['version'],
- data['method'], data.get('namespace'), **data['args'])
+ proxy.dispatch(ctx, data)
class ZmqBaseReactor(ConsumerBase):
@@ -834,16 +829,7 @@ class ZmqListener(base.Listener):
super(ZmqListener, self).__init__(driver)
self.incoming_queue = moves.queue.Queue()
- def dispatch(self, ctxt, version, method, namespace, **kwargs):
- message = {
- 'method': method,
- 'args': kwargs
- }
- if version:
- message['version'] = version
- if namespace:
- message['namespace'] = namespace
-
+ def dispatch(self, ctxt, message):
incoming = ZmqIncomingMessage(self,
ctxt.to_dict(),
message)
diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py
index 173a5df..5593884 100644
--- a/oslo_messaging/tests/drivers/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/test_impl_zmq.py
@@ -368,8 +368,9 @@ class TestZmqListener(ZmqBaseTestCase):
kwargs = {'a': 1, 'b': 2}
m = mock.Mock()
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
- eventlet.spawn_n(listener.dispatch, ctxt, 0,
- m.fake_method, 'name.space', **kwargs)
+ message = {'namespace': 'name.space', 'method': m.fake_method,
+ 'args': kwargs}
+ eventlet.spawn_n(listener.dispatch, ctxt, message)
resp = listener.poll(timeout=10)
msg = {'method': m.fake_method, 'namespace': 'name.space',
'args': kwargs}