summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Sergeyev <vsergeyev@mirantis.com>2015-07-30 17:08:37 +0300
committerVictor Sergeyev <vsergeyev@mirantis.com>2015-07-30 17:18:08 +0300
commited2d60ff011560078b3bfa6a5097f6e72af6b5dc (patch)
tree5fefc77963ab1c41513f0e658b26489f087c2159
parentc90525bfead1f495df86c5c5d795d25abad2e1d9 (diff)
downloadoslo-messaging-ed2d60ff011560078b3bfa6a5097f6e72af6b5dc.tar.gz
ZMQ: `Lazify` driver code
Some OpenStack services (e.g. Glance) makes a forks, so there is a sense to initialize socket and thread related stuff `on demand`, not in __init__(). Change-Id: Ie2012b31df86049cc841a0aaed16e6b879e0bcec
-rw-r--r--oslo_messaging/_drivers/zmq_driver/poller/green_poller.py12
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py9
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py2
4 files changed, 8 insertions, 17 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
index 72429f1..dcf9da5 100644
--- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
+++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
@@ -29,12 +29,12 @@ class GreenPoller(zmq_poller.ZmqPoller):
def __init__(self):
self.incoming_queue = six.moves.queue.Queue()
self.green_pool = eventlet.GreenPool()
- self.threads = []
+ self.thread_by_socket = {}
def register(self, socket, recv_method=None):
- self.threads.append(
- self.green_pool.spawn(self._socket_receive, socket,
- recv_method))
+ if socket not in self.thread_by_socket:
+ self.thread_by_socket[socket] = self.green_pool.spawn(
+ self._socket_receive, socket, recv_method)
def _socket_receive(self, socket, recv_method=None):
while True:
@@ -59,10 +59,10 @@ class GreenPoller(zmq_poller.ZmqPoller):
return incoming[0], incoming[1]
def close(self):
- for thread in self.threads:
+ for thread in self.thread_by_socket.values():
thread.kill()
- self.threads = []
+ self.thread_by_socket = {}
class HoldReplyPoller(GreenPoller):
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
index 379d8ef..f1257ba 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
@@ -67,7 +67,7 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
if str(target) in self.outbound_sockets:
dealer_socket, hosts = self.outbound_sockets[str(target)]
else:
- dealer_socket = self.zmq_context.socket(zmq.DEALER)
+ dealer_socket = zmq.Context().socket(zmq.DEALER)
hosts = self.matchmaker.get_hosts(target)
for host in hosts:
self._connect_to_host(dealer_socket, host)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py
index 0984545..38a470b 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py
@@ -13,24 +13,15 @@
# under the License.
import abc
-import logging
import six
-from oslo_messaging._drivers.zmq_driver import zmq_async
-
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
@six.add_metaclass(abc.ABCMeta)
class CastPublisherBase(object):
def __init__(self, conf):
self.conf = conf
- self.zmq_context = zmq.Context()
self.outbound_sockets = {}
super(CastPublisherBase, self).__init__()
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
index 17b04e8..981966d 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
@@ -44,10 +44,10 @@ class ZmqServer(base.Listener):
raise rpc_common.RPCException(errmsg)
self.poller = zmq_async.get_poller()
- self.poller.register(self.socket, self._receive_message)
self.matchmaker = matchmaker
def poll(self, timeout=None):
+ self.poller.register(self.socket, self._receive_message)
incoming = self.poller.poll(timeout or self.conf.rpc_poll_timeout)
return incoming[0]