summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo/messaging/_drivers/amqp.py19
-rw-r--r--oslo/messaging/_drivers/amqpdriver.py8
-rw-r--r--tests/test_rabbit.py4
3 files changed, 15 insertions, 16 deletions
diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py
index 6b9c88e..4f30567 100644
--- a/oslo/messaging/_drivers/amqp.py
+++ b/oslo/messaging/_drivers/amqp.py
@@ -318,16 +318,17 @@ class _MsgIdCache(object):
"""AMQP consumers may read same message twice when exceptions occur
before ack is returned. This method prevents doing it.
"""
- if UNIQUE_ID in message_data:
- msg_id = message_data.get(UNIQUE_ID)
- if msg_id in self.prev_msgids:
- raise rpc_common.DuplicateMessageError(msg_id=msg_id)
-
- def add(self, message_data):
- if UNIQUE_ID in message_data:
+ try:
msg_id = message_data.pop(UNIQUE_ID)
- if msg_id not in self.prev_msgids:
- self.prev_msgids.append(msg_id)
+ except KeyError:
+ return
+ if msg_id in self.prev_msgids:
+ raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+ return msg_id
+
+ def add(self, msg_id):
+ if msg_id and msg_id not in self.prev_msgids:
+ self.prev_msgids.append(msg_id)
def _add_unique_id(msg):
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index 8ccba92..b8b9fce 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -31,10 +31,11 @@ LOG = logging.getLogger(__name__)
class AMQPIncomingMessage(base.IncomingMessage):
- def __init__(self, listener, ctxt, message, msg_id, reply_q):
+ def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
super(AMQPIncomingMessage, self).__init__(listener, ctxt,
dict(message))
+ self.unique_id = unique_id
self.msg_id = msg_id
self.reply_q = reply_q
self.acknowledge_callback = message.acknowledge
@@ -67,7 +68,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
self._send_reply(conn, ending=True)
def acknowledge(self):
- self.listener.msg_id_cache.add(self.message)
+ self.listener.msg_id_cache.add(self.unique_id)
self.acknowledge_callback()
def requeue(self):
@@ -92,12 +93,13 @@ class AMQPListener(base.Listener):
# FIXME(markmc): logging isn't driver specific
rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
- self.msg_id_cache.check_duplicate_message(message)
+ unique_id = self.msg_id_cache.check_duplicate_message(message)
ctxt = rpc_amqp.unpack_context(self.conf, message)
self.incoming.append(AMQPIncomingMessage(self,
ctxt.to_dict(),
message,
+ unique_id,
ctxt.msg_id,
ctxt.reply_q))
diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py
index 77afd41..8c08741 100644
--- a/tests/test_rabbit.py
+++ b/tests/test_rabbit.py
@@ -206,7 +206,6 @@ class TestSendReceive(test_utils.BaseTestCase):
senders[i].start()
received = listener.poll()
- received.message.pop('_unique_id')
self.assertIsNotNone(received)
self.assertEqual(received.ctxt, self.ctxt)
self.assertEqual(received.message, {'tx_id': i})
@@ -303,14 +302,12 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
senders[0].start()
msgs.append(listener.poll())
- msgs[-1].message.pop('_unique_id')
self.assertEqual(msgs[-1].message, {'tx_id': 0})
# Start the second guy, receive his message
senders[1].start()
msgs.append(listener.poll())
- msgs[-1].message.pop('_unique_id')
self.assertEqual(msgs[-1].message, {'tx_id': 1})
# Reply to both in order, making the second thread queue
@@ -605,7 +602,6 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
producer.publish(msg)
received = listener.poll()
- received.message.pop('_unique_id')
self.assertIsNotNone(received)
self.assertEqual(self.expected_ctxt, received.ctxt)
self.assertEqual(self.expected, received.message)