diff options
Diffstat (limited to 'oslo_messaging/_drivers/amqp1_driver/controller.py')
-rw-r--r-- | oslo_messaging/_drivers/amqp1_driver/controller.py | 97 |
1 files changed, 76 insertions, 21 deletions
diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index 0583d0e..6ef60fd 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -110,19 +110,24 @@ class SendTask(Task): self._retry = None if retry is None or retry < 0 else retry self._wakeup = threading.Event() self._error = None + self._sender = None def wait(self): self._wakeup.wait() return self._error def _execute(self, controller): + if self.deadline: + # time out the send + self.timer = controller.processor.alarm(self._on_timeout, + self.deadline) controller.send(self) def _prepare(self, sender): """Called immediately before the message is handed off to the i/o system. This implies that the sender link is up. """ - pass + self._sender = sender def _on_ack(self, state, info): """If wait_for_ack is True, this is called by the eventloop thread when @@ -143,10 +148,10 @@ class SendTask(Task): self._wakeup.set() def _on_timeout(self): - """Invoked by the eventloop when the send fails to complete before the - timeout is reached. + """Invoked by the eventloop when our timer expires """ self.timer = None + self._sender and self._sender.cancel_send(self) msg = ("{name} message sent to {target} failed: timed" " out".format(name=self.name, target=self.target)) LOG.warning("%s", msg) @@ -172,6 +177,7 @@ class SendTask(Task): self._wakeup.set() def _cleanup(self): + self._sender = None if self.timer: self.timer.cancel() self.timer = None @@ -202,6 +208,7 @@ class RPCCallTask(SendTask): return error or self._reply_msg def _prepare(self, sender): + super(RPCCallTask, self)._prepare(sender) # reserve a message id for mapping the received response if self._msg_id: # already set so this is a re-transmit. To be safe cancel the old @@ -214,7 +221,6 @@ class RPCCallTask(SendTask): def _on_reply(self, message): # called if/when the reply message arrives self._reply_msg = message - self._msg_id = None # to prevent _cleanup() from cancelling it self._cleanup() self._wakeup.set() @@ -226,10 +232,60 @@ class RPCCallTask(SendTask): def _cleanup(self): if self._msg_id: self._reply_link.cancel_response(self._msg_id) + self._msg_id = None self._reply_link = None super(RPCCallTask, self)._cleanup() +class RPCMonitoredCallTask(RPCCallTask): + """An RPC call which expects a periodic heartbeat until the response is + received. There are two timeouts: + deadline - overall hard timeout, implemented in RPCCallTask + call_monitor_timeout - keep alive timeout, reset when heartbeat arrives + """ + def __init__(self, target, message, deadline, call_monitor_timeout, + retry, wait_for_ack): + super(RPCMonitoredCallTask, self).__init__(target, message, deadline, + retry, wait_for_ack) + assert call_monitor_timeout is not None # nosec + self._monitor_timeout = call_monitor_timeout + self._monitor_timer = None + self._set_alarm = None + + def _execute(self, controller): + self._set_alarm = controller.processor.defer + self._monitor_timer = self._set_alarm(self._call_timeout, + self._monitor_timeout) + super(RPCMonitoredCallTask, self)._execute(controller) + + def _call_timeout(self): + # monitor_timeout expired + self._monitor_timer = None + self._sender and self._sender.cancel_send(self) + msg = ("{name} message sent to {target} failed: call monitor timed" + " out".format(name=self.name, target=self.target)) + LOG.warning("%s", msg) + self._error = exceptions.MessagingTimeout(msg) + self._cleanup() + self._wakeup.set() + + def _on_reply(self, message): + # if reply is null, then this is the call monitor heartbeat + if message.body is None: + self._monitor_timer.cancel() + self._monitor_timer = self._set_alarm(self._call_timeout, + self._monitor_timeout) + else: + super(RPCMonitoredCallTask, self)._on_reply(message) + + def _cleanup(self): + self._set_alarm = None + if self._monitor_timer: + self._monitor_timer.cancel() + self._monitor_timer = None + super(RPCMonitoredCallTask, self)._cleanup() + + class MessageDispositionTask(Task): """A task that updates the message disposition as accepted or released for a Server @@ -329,23 +385,22 @@ class Sender(pyngus.SenderEventHandler): def send_message(self, send_task): """Send a message out the link. """ - if send_task.deadline: - def timer_callback(): - # may be in either list, or none - self._unacked.discard(send_task) - try: - self._pending_sends.remove(send_task) - except ValueError: - pass - send_task._on_timeout() - send_task.timer = self._scheduler.alarm(timer_callback, - send_task.deadline) - if not self._can_send or self._pending_sends: self._pending_sends.append(send_task) else: self._send(send_task) + def cancel_send(self, send_task): + """Attempts to cancel a send request. It is possible that the send has + already completed, so this is best-effort. + """ + # may be in either list, or none + self._unacked.discard(send_task) + try: + self._pending_sends.remove(send_task) + except ValueError: + pass + # Pyngus callbacks: def sender_active(self, sender_link): @@ -537,10 +592,12 @@ class Replies(pyngus.ReceiverEventHandler): def prepare_for_response(self, request, callback): """Apply a unique message identifier to this request message. This will - be used to identify messages sent in reply. The identifier is placed - in the 'id' field of the request message. It is expected that the - identifier will appear in the 'correlation-id' field of the + be used to identify messages received in reply. The identifier is + placed in the 'id' field of the request message. It is expected that + the identifier will appear in the 'correlation-id' field of the corresponding response message. + + When the caller is done receiving replies, it must call cancel_response """ request.id = uuid.uuid4().hex # reply is placed on reply_queue @@ -597,8 +654,6 @@ class Replies(pyngus.ReceiverEventHandler): key = message.correlation_id try: self._correlation[key](message) - # cleanup (only need one response per request) - del self._correlation[key] receiver.message_accepted(handle) except KeyError: LOG.warning(_LW("Can't find receiver for response msg id=%s, " |