summaryrefslogtreecommitdiff
path: root/nova/rpc.py
diff options
context:
space:
mode:
authorChris Behrens <cbehrens@codestud.com>2011-05-25 15:42:24 -0700
committertermie <github@anarkystic.com>2011-05-25 15:42:24 -0700
commitb44c1fe9561ee8754137d2700bab295f20a4032b (patch)
tree61c0d97212cb57a06948c201d7032c9e62dca1a2 /nova/rpc.py
parent7622e854ef68fbdbfc531690cf74916301956c8e (diff)
downloadnova-b44c1fe9561ee8754137d2700bab295f20a4032b.tar.gz
Add a connection pool for rpc cast/call
Use the same rabbit connection for all topic listening and wait to be notified vs doing a 0.1 second poll for each.
Diffstat (limited to 'nova/rpc.py')
-rw-r--r--nova/rpc.py96
1 files changed, 72 insertions, 24 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index f43291c4b7..62590ca928 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -35,9 +35,9 @@ from carrot import messaging
import eventlet
from eventlet import greenpool
from eventlet import greenthread
+from eventlet import pools
from eventlet import queue
-
from nova import context
from nova import exception
from nova import fakerabbit
@@ -92,6 +92,11 @@ class Connection(carrot_connection.BrokerConnection):
pass
return cls.instance()
+class Pool(pools.Pool):
+ def create(self):
+ return Connection.instance(new=True)
+
+ConnectionPool = Pool(max_size=20)
class Consumer(messaging.Consumer):
"""Consumer base class.
@@ -163,21 +168,9 @@ class AdapterConsumer(Consumer):
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
super(AdapterConsumer, self).__init__(connection=connection,
topic=topic)
+ self.register_callback(self.process_data)
- def receive(self, *args, **kwargs):
- self.pool.spawn_n(self._receive, *args, **kwargs)
-
- @exception.wrap_exception
- def _receive(self, message_data, message):
- """Magically looks for a method on the proxy object and calls it.
-
- Message data should be a dictionary with two keys:
- method: string representing the method to call
- args: dictionary of arg: value
-
- Example: {'method': 'echo', 'args': {'value': 42}}
-
- """
+ def process_data(self, message_data, message):
LOG.debug(_('received %s') % message_data)
msg_id = message_data.pop('_msg_id', None)
@@ -194,6 +187,19 @@ class AdapterConsumer(Consumer):
LOG.warn(_('no method for message: %s') % message_data)
msg_reply(msg_id, _('No method for message: %s') % message_data)
return
+ self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
+
+ @exception.wrap_exception
+ def _process_data(self, msg_id, ctxt, method, args):
+ """Magically looks for a method on the proxy object and calls it.
+
+ Message data should be a dictionary with two keys:
+ method: string representing the method to call
+ args: dictionary of arg: value
+
+ Example: {'method': 'echo', 'args': {'value': 42}}
+
+ """
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
@@ -214,11 +220,6 @@ class AdapterConsumer(Consumer):
return
-class Publisher(messaging.Publisher):
- """Publisher base class."""
- pass
-
-
class TopicAdapterConsumer(AdapterConsumer):
"""Consumes messages on a specific topic."""
@@ -251,6 +252,50 @@ class FanoutAdapterConsumer(AdapterConsumer):
topic=topic, proxy=proxy)
+class ConsumerSet(object):
+ """Groups consumers to listen on together on a single connection"""
+
+ def __init__(self, conn, consumer_list):
+ self.consumer_list = set(consumer_list)
+ self.consumer_set = None
+ self.init(conn)
+
+ def init(self, conn):
+ if not conn:
+ conn = Connection.instance(new=True)
+ if self.consumer_set:
+ self.consumer_set.close()
+ self.consumer_set = messaging.ConsumerSet(conn)
+ for consumer in self.consumer_list:
+ consumer.connection = conn
+ # consumer.backend is set for us
+ self.consumer_set.add_consumer(consumer)
+
+ def reconnect(self):
+ self.init(None)
+
+ def wait(self, limit=None):
+ while True:
+ it = self.consumer_set.iterconsume(limit=limit)
+ while True:
+ try:
+ it.next()
+ except StopIteration:
+ return
+ except Exception as e:
+ LOG.error(_("Received exception %s " % str(e) + \
+ "while processing consumer"))
+ fuck
+ self.reconnect()
+ # Break to outer loop
+ break
+
+
+class Publisher(messaging.Publisher):
+ """Publisher base class."""
+ pass
+
+
class TopicPublisher(Publisher):
"""Publishes messages on a specific topic."""
@@ -315,7 +360,7 @@ def msg_reply(msg_id, reply=None, failure=None):
LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
- conn = Connection.instance()
+ conn = ConnectionPool.get()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
@@ -324,7 +369,9 @@ def msg_reply(msg_id, reply=None, failure=None):
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure})
+
publisher.close()
+ ConnectionPool.put(conn)
class RemoteError(exception.Error):
@@ -393,12 +440,11 @@ def multicall(context, topic, msg):
LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context)
- conn = Connection.instance()
+ conn = ConnectionPool.get()
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
wait_msg = MulticallWaiter(consumer)
consumer.register_callback(wait_msg)
- conn = Connection.instance()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
@@ -462,10 +508,11 @@ def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_pack_context(msg, context)
- conn = Connection.instance()
+ conn = ConnectionPool.get()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
+ ConnectionPool.put(conn)
def fanout_cast(context, topic, msg):
@@ -511,6 +558,7 @@ def send_message(topic, message, wait=True):
if wait:
consumer.wait()
+ consumer.close()
if __name__ == '__main__':