summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoyce <qiaojian@awcloud.com>2015-01-27 14:34:18 +0800
committerjoyce <qiaojian@awcloud.com>2015-02-02 12:04:24 +0800
commit8380ac6e661924617c53171569c1ee44dc112cea (patch)
tree35ddd10ba7a8265414d149a06875a0a803a0e1d6
parenteb9251173c30013616ae109b5b945e87e5347e18 (diff)
downloadoslo-messaging-8380ac6e661924617c53171569c1ee44dc112cea.tar.gz
Fix notifications broken with ZMQ driver
The notification message did not contain a method keyword, so it was ignored by consumers. Another in dispatch, the notification msg confilict with other method. Change-Id: I7c60c6f80006222d3cbe818d32288d043b55a7b8 Closes-Bug:#1368154
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py24
-rw-r--r--oslo_messaging/tests/drivers/test_impl_zmq.py5
2 files changed, 8 insertions, 21 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index f9cb473..23961ee 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -302,9 +302,9 @@ class InternalContext(object):
data.setdefault('args', {})
try:
- result = proxy.dispatch(
- ctx, data['version'], data['method'],
- data.get('namespace'), **data['args'])
+ if not data.get("method"):
+ raise KeyError
+ result = proxy.dispatch(ctx, data)
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
@@ -368,18 +368,13 @@ class ConsumerBase(object):
# Method starting with - are
# processed internally. (non-valid method name)
method = data.get('method')
- if not method:
- LOG.error(_("RPC message did not include method."))
- return
-
# Internal method
# uses internal context for safety.
if method == '-reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
return
- proxy.dispatch(ctx, data['version'],
- data['method'], data.get('namespace'), **data['args'])
+ proxy.dispatch(ctx, data)
class ZmqBaseReactor(ConsumerBase):
@@ -834,16 +829,7 @@ class ZmqListener(base.Listener):
super(ZmqListener, self).__init__(driver)
self.incoming_queue = moves.queue.Queue()
- def dispatch(self, ctxt, version, method, namespace, **kwargs):
- message = {
- 'method': method,
- 'args': kwargs
- }
- if version:
- message['version'] = version
- if namespace:
- message['namespace'] = namespace
-
+ def dispatch(self, ctxt, message):
incoming = ZmqIncomingMessage(self,
ctxt.to_dict(),
message)
diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py
index 6d287fa..b9a878f 100644
--- a/oslo_messaging/tests/drivers/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/test_impl_zmq.py
@@ -416,8 +416,9 @@ class TestZmqListener(test_utils.BaseTestCase):
kwargs = {'a': 1, 'b': 2}
m = mock.Mock()
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
- eventlet.spawn_n(listener.dispatch, ctxt, 0,
- m.fake_method, 'name.space', **kwargs)
+ message = {'namespace': 'name.space', 'method': m.fake_method,
+ 'args': kwargs}
+ eventlet.spawn_n(listener.dispatch, ctxt, message)
resp = listener.poll(timeout=10)
msg = {'method': m.fake_method, 'namespace': 'name.space',
'args': kwargs}