summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/amqp1_driver/controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/amqp1_driver/controller.py')
-rw-r--r--oslo_messaging/_drivers/amqp1_driver/controller.py97
1 files changed, 76 insertions, 21 deletions
diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py
index 0583d0e..6ef60fd 100644
--- a/oslo_messaging/_drivers/amqp1_driver/controller.py
+++ b/oslo_messaging/_drivers/amqp1_driver/controller.py
@@ -110,19 +110,24 @@ class SendTask(Task):
self._retry = None if retry is None or retry < 0 else retry
self._wakeup = threading.Event()
self._error = None
+ self._sender = None
def wait(self):
self._wakeup.wait()
return self._error
def _execute(self, controller):
+ if self.deadline:
+ # time out the send
+ self.timer = controller.processor.alarm(self._on_timeout,
+ self.deadline)
controller.send(self)
def _prepare(self, sender):
"""Called immediately before the message is handed off to the i/o
system. This implies that the sender link is up.
"""
- pass
+ self._sender = sender
def _on_ack(self, state, info):
"""If wait_for_ack is True, this is called by the eventloop thread when
@@ -143,10 +148,10 @@ class SendTask(Task):
self._wakeup.set()
def _on_timeout(self):
- """Invoked by the eventloop when the send fails to complete before the
- timeout is reached.
+ """Invoked by the eventloop when our timer expires
"""
self.timer = None
+ self._sender and self._sender.cancel_send(self)
msg = ("{name} message sent to {target} failed: timed"
" out".format(name=self.name, target=self.target))
LOG.warning("%s", msg)
@@ -172,6 +177,7 @@ class SendTask(Task):
self._wakeup.set()
def _cleanup(self):
+ self._sender = None
if self.timer:
self.timer.cancel()
self.timer = None
@@ -202,6 +208,7 @@ class RPCCallTask(SendTask):
return error or self._reply_msg
def _prepare(self, sender):
+ super(RPCCallTask, self)._prepare(sender)
# reserve a message id for mapping the received response
if self._msg_id:
# already set so this is a re-transmit. To be safe cancel the old
@@ -214,7 +221,6 @@ class RPCCallTask(SendTask):
def _on_reply(self, message):
# called if/when the reply message arrives
self._reply_msg = message
- self._msg_id = None # to prevent _cleanup() from cancelling it
self._cleanup()
self._wakeup.set()
@@ -226,10 +232,60 @@ class RPCCallTask(SendTask):
def _cleanup(self):
if self._msg_id:
self._reply_link.cancel_response(self._msg_id)
+ self._msg_id = None
self._reply_link = None
super(RPCCallTask, self)._cleanup()
+class RPCMonitoredCallTask(RPCCallTask):
+ """An RPC call which expects a periodic heartbeat until the response is
+ received. There are two timeouts:
+ deadline - overall hard timeout, implemented in RPCCallTask
+ call_monitor_timeout - keep alive timeout, reset when heartbeat arrives
+ """
+ def __init__(self, target, message, deadline, call_monitor_timeout,
+ retry, wait_for_ack):
+ super(RPCMonitoredCallTask, self).__init__(target, message, deadline,
+ retry, wait_for_ack)
+ assert call_monitor_timeout is not None # nosec
+ self._monitor_timeout = call_monitor_timeout
+ self._monitor_timer = None
+ self._set_alarm = None
+
+ def _execute(self, controller):
+ self._set_alarm = controller.processor.defer
+ self._monitor_timer = self._set_alarm(self._call_timeout,
+ self._monitor_timeout)
+ super(RPCMonitoredCallTask, self)._execute(controller)
+
+ def _call_timeout(self):
+ # monitor_timeout expired
+ self._monitor_timer = None
+ self._sender and self._sender.cancel_send(self)
+ msg = ("{name} message sent to {target} failed: call monitor timed"
+ " out".format(name=self.name, target=self.target))
+ LOG.warning("%s", msg)
+ self._error = exceptions.MessagingTimeout(msg)
+ self._cleanup()
+ self._wakeup.set()
+
+ def _on_reply(self, message):
+ # if reply is null, then this is the call monitor heartbeat
+ if message.body is None:
+ self._monitor_timer.cancel()
+ self._monitor_timer = self._set_alarm(self._call_timeout,
+ self._monitor_timeout)
+ else:
+ super(RPCMonitoredCallTask, self)._on_reply(message)
+
+ def _cleanup(self):
+ self._set_alarm = None
+ if self._monitor_timer:
+ self._monitor_timer.cancel()
+ self._monitor_timer = None
+ super(RPCMonitoredCallTask, self)._cleanup()
+
+
class MessageDispositionTask(Task):
"""A task that updates the message disposition as accepted or released
for a Server
@@ -329,23 +385,22 @@ class Sender(pyngus.SenderEventHandler):
def send_message(self, send_task):
"""Send a message out the link.
"""
- if send_task.deadline:
- def timer_callback():
- # may be in either list, or none
- self._unacked.discard(send_task)
- try:
- self._pending_sends.remove(send_task)
- except ValueError:
- pass
- send_task._on_timeout()
- send_task.timer = self._scheduler.alarm(timer_callback,
- send_task.deadline)
-
if not self._can_send or self._pending_sends:
self._pending_sends.append(send_task)
else:
self._send(send_task)
+ def cancel_send(self, send_task):
+ """Attempts to cancel a send request. It is possible that the send has
+ already completed, so this is best-effort.
+ """
+ # may be in either list, or none
+ self._unacked.discard(send_task)
+ try:
+ self._pending_sends.remove(send_task)
+ except ValueError:
+ pass
+
# Pyngus callbacks:
def sender_active(self, sender_link):
@@ -537,10 +592,12 @@ class Replies(pyngus.ReceiverEventHandler):
def prepare_for_response(self, request, callback):
"""Apply a unique message identifier to this request message. This will
- be used to identify messages sent in reply. The identifier is placed
- in the 'id' field of the request message. It is expected that the
- identifier will appear in the 'correlation-id' field of the
+ be used to identify messages received in reply. The identifier is
+ placed in the 'id' field of the request message. It is expected that
+ the identifier will appear in the 'correlation-id' field of the
corresponding response message.
+
+ When the caller is done receiving replies, it must call cancel_response
"""
request.id = uuid.uuid4().hex
# reply is placed on reply_queue
@@ -597,8 +654,6 @@ class Replies(pyngus.ReceiverEventHandler):
key = message.correlation_id
try:
self._correlation[key](message)
- # cleanup (only need one response per request)
- del self._correlation[key]
receiver.message_accepted(handle)
except KeyError:
LOG.warning(_LW("Can't find receiver for response msg id=%s, "