diff options
author | Victor Sergeyev <vsergeyev@mirantis.com> | 2015-07-30 17:08:37 +0300 |
---|---|---|
committer | Victor Sergeyev <vsergeyev@mirantis.com> | 2015-07-30 17:18:08 +0300 |
commit | ed2d60ff011560078b3bfa6a5097f6e72af6b5dc (patch) | |
tree | 5fefc77963ab1c41513f0e658b26489f087c2159 | |
parent | c90525bfead1f495df86c5c5d795d25abad2e1d9 (diff) | |
download | oslo-messaging-ed2d60ff011560078b3bfa6a5097f6e72af6b5dc.tar.gz |
ZMQ: `Lazify` driver code
Some OpenStack services (e.g. Glance) makes a forks, so there is a sense
to initialize socket and thread related stuff `on demand`, not in
__init__().
Change-Id: Ie2012b31df86049cc841a0aaed16e6b879e0bcec
4 files changed, 8 insertions, 17 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index 72429f1..dcf9da5 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -29,12 +29,12 @@ class GreenPoller(zmq_poller.ZmqPoller): def __init__(self): self.incoming_queue = six.moves.queue.Queue() self.green_pool = eventlet.GreenPool() - self.threads = [] + self.thread_by_socket = {} def register(self, socket, recv_method=None): - self.threads.append( - self.green_pool.spawn(self._socket_receive, socket, - recv_method)) + if socket not in self.thread_by_socket: + self.thread_by_socket[socket] = self.green_pool.spawn( + self._socket_receive, socket, recv_method) def _socket_receive(self, socket, recv_method=None): while True: @@ -59,10 +59,10 @@ class GreenPoller(zmq_poller.ZmqPoller): return incoming[0], incoming[1] def close(self): - for thread in self.threads: + for thread in self.thread_by_socket.values(): thread.kill() - self.threads = [] + self.thread_by_socket = {} class HoldReplyPoller(GreenPoller): diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py index 379d8ef..f1257ba 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py @@ -67,7 +67,7 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase): if str(target) in self.outbound_sockets: dealer_socket, hosts = self.outbound_sockets[str(target)] else: - dealer_socket = self.zmq_context.socket(zmq.DEALER) + dealer_socket = zmq.Context().socket(zmq.DEALER) hosts = self.matchmaker.get_hosts(target) for host in hosts: self._connect_to_host(dealer_socket, host) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py index 0984545..38a470b 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py @@ -13,24 +13,15 @@ # under the License. import abc -import logging import six -from oslo_messaging._drivers.zmq_driver import zmq_async - - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - @six.add_metaclass(abc.ABCMeta) class CastPublisherBase(object): def __init__(self, conf): self.conf = conf - self.zmq_context = zmq.Context() self.outbound_sockets = {} super(CastPublisherBase, self).__init__() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py index 17b04e8..981966d 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py @@ -44,10 +44,10 @@ class ZmqServer(base.Listener): raise rpc_common.RPCException(errmsg) self.poller = zmq_async.get_poller() - self.poller.register(self.socket, self._receive_message) self.matchmaker = matchmaker def poll(self, timeout=None): + self.poller.register(self.socket, self._receive_message) incoming = self.poller.poll(timeout or self.conf.rpc_poll_timeout) return incoming[0] |