summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2019-12-18 23:58:11 +0000
committerGerrit Code Review <review@openstack.org>2019-12-18 23:58:11 +0000
commit4e49a286e7da174d2e778c030d4ed2dba1218d3f (patch)
tree7f600896e642bc8302f1c4a445b8060546ef5bf5
parent524f72261c1d0578d3cbc4ea9e7a959e60bc51fe (diff)
parentd873c0d8f5722d5e2a08b5fc49d58b63dbe061a0 (diff)
downloadoslo-messaging-4e49a286e7da174d2e778c030d4ed2dba1218d3f.tar.gz
Merge "Do not use threading.Event"
-rw-r--r--lower-constraints.txt2
-rw-r--r--oslo_messaging/_drivers/amqp1_driver/controller.py5
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py9
-rw-r--r--oslo_messaging/_drivers/impl_fake.py3
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py2
-rw-r--r--oslo_messaging/rpc/dispatcher.py4
-rw-r--r--oslo_messaging/tests/drivers/test_amqp_driver.py9
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py3
-rw-r--r--oslo_messaging/tests/rpc/test_server.py27
-rw-r--r--oslo_messaging/tests/utils.py5
-rw-r--r--requirements.txt2
11 files changed, 40 insertions, 31 deletions
diff --git a/lower-constraints.txt b/lower-constraints.txt
index ff23649..abf4267 100644
--- a/lower-constraints.txt
+++ b/lower-constraints.txt
@@ -48,7 +48,7 @@ oslo.log==3.36.0
oslo.middleware==3.31.0
oslo.serialization==2.18.0
oslo.service==1.24.0
-oslo.utils==3.33.0
+oslo.utils==3.37.0
oslotest==3.2.0
Paste==2.0.2
PasteDeploy==1.5.0
diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py
index 6803c91..24dfc99 100644
--- a/oslo_messaging/_drivers/amqp1_driver/controller.py
+++ b/oslo_messaging/_drivers/amqp1_driver/controller.py
@@ -35,6 +35,7 @@ import threading
import time
import uuid
+from oslo_utils import eventletutils
import proton
import pyngus
from six import iteritems
@@ -85,7 +86,7 @@ class SubscribeTask(Task):
self._subscriber_id = listener.id
self._in_queue = listener.incoming
self._service = SERVICE_NOTIFY if notifications else SERVICE_RPC
- self._wakeup = threading.Event()
+ self._wakeup = eventletutils.Event()
def wait(self):
self._wakeup.wait()
@@ -112,7 +113,7 @@ class SendTask(Task):
self.service = SERVICE_NOTIFY if notification else SERVICE_RPC
self.timer = None
self._retry = None if retry is None or retry < 0 else retry
- self._wakeup = threading.Event()
+ self._wakeup = eventletutils.Event()
self._error = None
self._sender = None
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index ccab0ce..b4d0a23 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -19,6 +19,7 @@ import time
import uuid
import cachetools
+from oslo_utils import eventletutils
from oslo_utils import timeutils
from six import moves
@@ -49,7 +50,7 @@ class MessageOperationsHandler(object):
self.name = "%s (%s)" % (name, hex(id(self)))
self._tasks = moves.queue.Queue()
- self._shutdown = threading.Event()
+ self._shutdown = eventletutils.Event()
self._shutdown_thread = threading.Thread(
target=self._process_in_background)
self._shutdown_thread.daemon = True
@@ -270,8 +271,8 @@ class AMQPListener(base.PollStyleListener):
self.conn = conn
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.incoming = []
- self._shutdown = threading.Event()
- self._shutoff = threading.Event()
+ self._shutdown = eventletutils.Event()
+ self._shutoff = eventletutils.Event()
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
self._message_operations_handler = MessageOperationsHandler(
"AMQPListener")
@@ -434,7 +435,7 @@ class ReplyWaiter(object):
self.conn.declare_direct_consumer(reply_q, self)
- self._thread_exit_event = threading.Event()
+ self._thread_exit_event = eventletutils.Event()
self._thread = threading.Thread(target=self.poll)
self._thread.daemon = True
self._thread.start()
diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py
index c5476fd..47fa4a3 100644
--- a/oslo_messaging/_drivers/impl_fake.py
+++ b/oslo_messaging/_drivers/impl_fake.py
@@ -18,6 +18,7 @@ import threading
import time
from oslo_serialization import jsonutils
+from oslo_utils import eventletutils
from six import moves
import oslo_messaging
@@ -49,7 +50,7 @@ class FakeListener(base.PollStyleListener):
self._exchange_manager = exchange_manager
self._targets = targets
self._pool = pool
- self._stopped = threading.Event()
+ self._stopped = eventletutils.Event()
# NOTE(sileht): Ensure that all needed queues exists even the listener
# have not been polled yet
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 07b473d..88fdb7e 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -347,7 +347,7 @@ class KafkaListener(base.PollStyleListener):
def __init__(self, conn):
super(KafkaListener, self).__init__()
- self._stopped = threading.Event()
+ self._stopped = eventletutils.Event()
self.conn = conn
self.incoming_queue = []
diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py
index 0db66d0..c118931 100644
--- a/oslo_messaging/rpc/dispatcher.py
+++ b/oslo_messaging/rpc/dispatcher.py
@@ -24,6 +24,8 @@ import threading
import six
+from oslo_utils import eventletutils
+
from oslo_messaging import _utils as utils
from oslo_messaging import dispatcher
from oslo_messaging import serializer as msg_serializer
@@ -249,7 +251,7 @@ class RPCDispatcher(dispatcher.DispatcherBase):
# is executing if it runs for some time. The thread will wait
# for the event to be signaled, which we do explicitly below
# after dispatching the method call.
- completion_event = threading.Event()
+ completion_event = eventletutils.Event()
watchdog_thread = threading.Thread(target=self._watchdog,
args=(completion_event, incoming))
if incoming.client_timeout:
diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py
index 831e161..cfcc067 100644
--- a/oslo_messaging/tests/drivers/test_amqp_driver.py
+++ b/oslo_messaging/tests/drivers/test_amqp_driver.py
@@ -27,6 +27,7 @@ import threading
import time
import uuid
+from oslo_utils import eventletutils
from oslo_utils import importutils
from six import moves
from string import Template
@@ -75,8 +76,8 @@ class _ListenerThread(threading.Thread):
self._msg_ack = msg_ack
self.messages = moves.queue.Queue()
self.daemon = True
- self.started = threading.Event()
- self._done = threading.Event()
+ self.started = eventletutils.Event()
+ self._done = eventletutils.Event()
self.start()
self.started.wait()
@@ -1146,7 +1147,7 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
self._addrs = {'unicast.test-topic': 2,
'broadcast.test-topic.all': 2,
'exclusive.test-topic.server': 2}
- self._recovered = threading.Event()
+ self._recovered = eventletutils.Event()
self._count = 0
def _on_active(link):
@@ -2100,7 +2101,7 @@ class FakeBroker(threading.Thread):
self._connections = {}
self._sources = {}
- self._pause = threading.Event()
+ self._pause = eventletutils.Event()
# count of messages forwarded, by messaging pattern
self.direct_count = 0
self.topic_count = 0
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index ce6d11d..b7ab4f3 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -23,6 +23,7 @@ import fixtures
import kombu
import kombu.transport.memory
from oslo_serialization import jsonutils
+from oslo_utils import eventletutils
import testscenarios
import oslo_messaging
@@ -49,7 +50,7 @@ class TestHeartbeat(test_utils.BaseTestCase):
fake_logger, heartbeat_side_effect=None,
info=None):
- event = threading.Event()
+ event = eventletutils.Event()
def heartbeat_check(rate=2):
event.set()
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index 693e88a..b4ec519 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -19,6 +19,7 @@ import warnings
import eventlet
import fixtures
from oslo_config import cfg
+from oslo_utils import eventletutils
from six.moves import mock
import testscenarios
@@ -61,7 +62,7 @@ class ServerSetupMixin(object):
class ServerController(object):
def __init__(self):
- self.stopped = threading.Event()
+ self.stopped = eventletutils.Event()
def stop(self, ctxt):
self.stopped.set()
@@ -704,11 +705,11 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that if 2 threads call a method simultaneously, both will wait,
# but only 1 will call the underlying executor method.
- start_event = threading.Event()
- finish_event = threading.Event()
+ start_event = eventletutils.Event()
+ finish_event = eventletutils.Event()
- running_event = threading.Event()
- done_event = threading.Event()
+ running_event = eventletutils.Event()
+ done_event = eventletutils.Event()
_runner = [None]
@@ -734,7 +735,7 @@ class TestServerLocking(test_utils.BaseTestCase):
runner = _runner[0]
waiter = start2 if runner == start1 else start2
- waiter_finished = threading.Event()
+ waiter_finished = eventletutils.Event()
waiter.link(lambda _: waiter_finished.set())
# At this point, runner is running start(), and waiter() is waiting for
@@ -783,8 +784,8 @@ class TestServerLocking(test_utils.BaseTestCase):
# Ensure that if 2 threads wait for the completion of 'start', the
# first will wait until complete_event is signalled, but the second
# will continue
- complete_event = threading.Event()
- complete_waiting_callback = threading.Event()
+ complete_event = eventletutils.Event()
+ complete_waiting_callback = eventletutils.Event()
start_state = self.server._states['start']
old_wait_for_completion = start_state.wait_for_completion
@@ -801,7 +802,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# thread1 will wait for start to complete until we signal it
thread1 = eventlet.spawn(self.server.stop)
- thread1_finished = threading.Event()
+ thread1_finished = eventletutils.Event()
thread1.link(lambda _: thread1_finished.set())
self.server.start()
@@ -847,7 +848,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that we generate a log message if we wait longer than
# DEFAULT_LOG_AFTER
- log_event = threading.Event()
+ log_event = eventletutils.Event()
mock_log.warning.side_effect = lambda _, __: log_event.set()
# Call stop without calling start. We should log a wait after 1 second
@@ -863,7 +864,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that we generate a log message if we wait longer than
# the number of seconds passed to log_after
- log_event = threading.Event()
+ log_event = eventletutils.Event()
mock_log.warning.side_effect = lambda _, __: log_event.set()
# Call stop without calling start. We should log a wait after 1 second
@@ -879,7 +880,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that we log a message after log_after seconds if we've also
# specified an absolute timeout
- log_event = threading.Event()
+ log_event = eventletutils.Event()
mock_log.warning.side_effect = lambda _, __: log_event.set()
# Call stop without calling start. We should log a wait after 1 second
@@ -904,7 +905,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Start the server, which will also instantiate an executor
self.server.start()
self.server.stop()
- shutdown_called = threading.Event()
+ shutdown_called = eventletutils.Event()
# Patch the executor's stop method to be very slow
def slow_shutdown(wait):
diff --git a/oslo_messaging/tests/utils.py b/oslo_messaging/tests/utils.py
index c5b25e3..0577966 100644
--- a/oslo_messaging/tests/utils.py
+++ b/oslo_messaging/tests/utils.py
@@ -22,6 +22,7 @@
import threading
from oslo_config import cfg
+from oslo_utils import eventletutils
from oslotest import base
@@ -63,8 +64,8 @@ class ServerThreadHelper(threading.Thread):
super(ServerThreadHelper, self).__init__()
self.daemon = True
self._server = server
- self._stop_event = threading.Event()
- self._start_event = threading.Event()
+ self._stop_event = eventletutils.Event()
+ self._start_event = eventletutils.Event()
def start(self):
super(ServerThreadHelper, self).start()
diff --git a/requirements.txt b/requirements.txt
index 6de25a4..ad19a38 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,7 +7,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
futurist>=1.2.0 # Apache-2.0
oslo.config>=5.2.0 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
-oslo.utils>=3.33.0 # Apache-2.0
+oslo.utils>=3.37.0 # Apache-2.0
oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
stevedore>=1.20.0 # Apache-2.0