diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-02-05 14:38:21 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-02-05 14:38:21 +0000 |
commit | 0bf006f24f36824510913846ab148cc93bb2e951 (patch) | |
tree | f636ef5548316c17d55354749029a864ab53984c | |
parent | bd52d08efaa608244254a801c9737a0aed7434e0 (diff) | |
parent | 8380ac6e661924617c53171569c1ee44dc112cea (diff) | |
download | oslo-messaging-0bf006f24f36824510913846ab148cc93bb2e951.tar.gz |
Merge "Fix notifications broken with ZMQ driver"
-rw-r--r-- | oslo_messaging/_drivers/impl_zmq.py | 24 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_zmq.py | 5 |
2 files changed, 8 insertions, 21 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index f9cb473..23961ee 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -302,9 +302,9 @@ class InternalContext(object): data.setdefault('args', {}) try: - result = proxy.dispatch( - ctx, data['version'], data['method'], - data.get('namespace'), **data['args']) + if not data.get("method"): + raise KeyError + result = proxy.dispatch(ctx, data) return ConsumerBase.normalize_reply(result, ctx.replies) except greenlet.GreenletExit: # ignore these since they are just from shutdowns @@ -368,18 +368,13 @@ class ConsumerBase(object): # Method starting with - are # processed internally. (non-valid method name) method = data.get('method') - if not method: - LOG.error(_("RPC message did not include method.")) - return - # Internal method # uses internal context for safety. if method == '-reply': self.private_ctx.reply(ctx, proxy, **data['args']) return - proxy.dispatch(ctx, data['version'], - data['method'], data.get('namespace'), **data['args']) + proxy.dispatch(ctx, data) class ZmqBaseReactor(ConsumerBase): @@ -834,16 +829,7 @@ class ZmqListener(base.Listener): super(ZmqListener, self).__init__(driver) self.incoming_queue = moves.queue.Queue() - def dispatch(self, ctxt, version, method, namespace, **kwargs): - message = { - 'method': method, - 'args': kwargs - } - if version: - message['version'] = version - if namespace: - message['namespace'] = namespace - + def dispatch(self, ctxt, message): incoming = ZmqIncomingMessage(self, ctxt.to_dict(), message) diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py index 173a5df..5593884 100644 --- a/oslo_messaging/tests/drivers/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/test_impl_zmq.py @@ -368,8 +368,9 @@ class TestZmqListener(ZmqBaseTestCase): kwargs = {'a': 1, 'b': 2} m = mock.Mock() ctxt = mock.Mock(autospec=impl_zmq.RpcContext) - eventlet.spawn_n(listener.dispatch, ctxt, 0, - m.fake_method, 'name.space', **kwargs) + message = {'namespace': 'name.space', 'method': m.fake_method, + 'args': kwargs} + eventlet.spawn_n(listener.dispatch, ctxt, message) resp = listener.poll(timeout=10) msg = {'method': m.fake_method, 'namespace': 'name.space', 'args': kwargs} |