summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2017-11-30 08:01:37 +0000
committerGerrit Code Review <review@openstack.org>2017-11-30 08:01:37 +0000
commit8ac97450c8e33adb3d424905528f28a3dee203bf (patch)
treeda930a60fb6d55c1d726898203fe87ccef2a4418
parentc7eb5cac660e4cbe46749491a244c821aced842e (diff)
parentc38857e1101027a734a35f4e80bc4084fabc034b (diff)
downloadoslo-messaging-8ac97450c8e33adb3d424905528f28a3dee203bf.tar.gz
Merge "rabbitmq: don't wait for message ack/requeue"
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py27
-rw-r--r--oslo_messaging/server.py12
-rw-r--r--releasenotes/notes/rabbit-no-wait-for-ack-9e5de3e1320d7660.yaml12
3 files changed, 16 insertions, 35 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 539e48b..deeaba2 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -56,13 +56,6 @@ class MessageOperationsHandler(object):
target=self._process_in_background)
self._shutdown_thread.daemon = True
- # HACK(sileht): this is set by the server.Server temporary
- # to not have to rewrite the entire internal API to pass
- # executor everywhere to make Listener aware of the server
- # executor. All this hack is only for the blocking executor.
- # And it's deprecated so...
- self._executor = None
-
def stop(self):
self._shutdown.set()
@@ -82,26 +75,14 @@ class MessageOperationsHandler(object):
while True:
try:
- task, event = self._tasks.get(block=False)
+ task = self._tasks.get(block=False)
except moves.queue.Empty:
break
- try:
- task()
- finally:
- event.set()
+ task()
def do(self, task):
- "Put the task in the queue and waits until the task is completed."
- if self._executor is None:
- raise RuntimeError("Unexpected error, no executor is setuped")
- elif self._executor == "blocking":
- # NOTE(sileht): Blocking will hang forever if we waiting the
- # polling thread
- task()
- else:
- event = threading.Event()
- self._tasks.put((task, event))
- event.wait()
+ "Put the task in the queue."
+ self._tasks.put(task)
class AMQPIncomingMessage(base.RpcIncomingMessage):
diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py
index c8e77a6..d2e50ac 100644
--- a/oslo_messaging/server.py
+++ b/oslo_messaging/server.py
@@ -417,18 +417,6 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
- # HACK(sileht): We temporary pass the executor to the rabbit
- # listener to fix a race with the deprecated blocking executor.
- # We do this hack because this is need only for 'synchronous'
- # executor like blocking. And this one is deprecated. Making
- # driver working in an sync and an async way is complicated
- # and blocking have 0% tests coverage.
- if hasattr(self.listener, '_poll_style_listener'):
- l = self.listener._poll_style_listener
- if hasattr(l, "_message_operations_handler"):
- l._message_operations_handler._executor = (
- self.executor_type)
-
self.listener.start(self._on_incoming)
@ordered(after='start')
diff --git a/releasenotes/notes/rabbit-no-wait-for-ack-9e5de3e1320d7660.yaml b/releasenotes/notes/rabbit-no-wait-for-ack-9e5de3e1320d7660.yaml
new file mode 100644
index 0000000..4b9d47a
--- /dev/null
+++ b/releasenotes/notes/rabbit-no-wait-for-ack-9e5de3e1320d7660.yaml
@@ -0,0 +1,12 @@
+---
+other:
+ - |
+ On rabbitmq, in the past, acknownlegement of messages was done within the
+ application callback thread/greenlet. This thread was blocked until the
+ message was ack. In newton, we rewrote the message acknownlegement to
+ ensure we haven't two threads writting the the socket at the same times.
+ Now all pendings ack are done by the main thread. They are no more reason
+ to block the application callback thread until the message is ack. Other
+ driver already release the application callback threads before the message
+ is acknownleged. This is also the case for rabbitmq, now.
+