summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2021-06-15 00:47:26 +0000
committerGerrit Code Review <review@openstack.org>2021-06-15 00:47:26 +0000
commit7f83cbe9e2a0e95fc0b7bc10d3eee7528e27b943 (patch)
tree34b0895907da0e609078326b1dc9816457b6841c
parent1f6d62c2a29c96e4a081981121b6b8467ae15ad2 (diff)
parentd71dfa8b8d7f3e170a0ff66064bdf950a787a028 (diff)
downloadnova-7f83cbe9e2a0e95fc0b7bc10d3eee7528e27b943.tar.gz
Merge "Move fake_notifier impl under NotificationFixture"
-rw-r--r--nova/tests/fixtures/notifications.py179
-rw-r--r--nova/tests/unit/fake_notifier.py153
2 files changed, 171 insertions, 161 deletions
diff --git a/nova/tests/fixtures/notifications.py b/nova/tests/fixtures/notifications.py
index 7785c85798..8f9b63b70e 100644
--- a/nova/tests/fixtures/notifications.py
+++ b/nova/tests/fixtures/notifications.py
@@ -10,9 +10,158 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
+import functools
+import threading
+
import fixtures
+from oslo_log import log as logging
+import oslo_messaging
+from oslo_serialization import jsonutils
+from oslo_utils import excutils
+from oslo_utils import timeutils
+
+from nova import rpc
+
+LOG = logging.getLogger(__name__)
+
+
+class _Sub(object):
+ """Allow a subscriber to efficiently wait for an event to occur, and
+ retrieve events which have occurred.
+ """
+
+ def __init__(self):
+ self._cond = threading.Condition()
+ self._notifications = []
+
+ def received(self, notification):
+ with self._cond:
+ self._notifications.append(notification)
+ self._cond.notifyAll()
+
+ def wait_n(self, n, event, timeout):
+ """Wait until at least n notifications have been received, and return
+ them. May return less than n notifications if timeout is reached.
+ """
+
+ with timeutils.StopWatch(timeout) as timer:
+ with self._cond:
+ while len(self._notifications) < n:
+ if timer.expired():
+ # notifications = pprint.pformat(
+ # {event: sub._notifications
+ # for event, sub in VERSIONED_SUBS.items()})
+ # FIXME: tranform this to get access to all the
+ # versioned notifications
+ notifications = []
+ raise AssertionError(
+ "Notification %(event)s hasn't been "
+ "received. Received:\n%(notifications)s" % {
+ 'event': event,
+ 'notifications': notifications,
+ })
+ self._cond.wait(timer.leftover())
+
+ # Return a copy of the notifications list
+ return list(self._notifications)
+
+
+FakeMessage = collections.namedtuple(
+ 'FakeMessage',
+ ['publisher_id', 'priority', 'event_type', 'payload', 'context'])
+
+
+class FakeNotifier(object):
+
+ def __init__(
+ self, transport, publisher_id, serializer=None, parent=None,
+ ):
+ self.transport = transport
+ self.publisher_id = publisher_id
+ self._serializer = \
+ serializer or oslo_messaging.serializer.NoOpSerializer()
+ if parent:
+ self.notifications = parent.notifications
+ else:
+ self.notifications = []
+
+ for priority in ['debug', 'info', 'warn', 'error', 'critical']:
+ setattr(
+ self, priority,
+ functools.partial(self._notify, priority.upper()),
+ )
+
+ def prepare(self, publisher_id=None):
+ if publisher_id is None:
+ publisher_id = self.publisher_id
-from nova.tests.unit import fake_notifier
+ return self.__class__(
+ self.transport, publisher_id,
+ serializer=self._serializer, parent=self,
+ )
+
+ def _notify(self, priority, ctxt, event_type, payload):
+ try:
+ payload = self._serializer.serialize_entity(ctxt, payload)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.error('Error serializing payload: %s', payload)
+
+ # NOTE(sileht): simulate the kombu serializer
+ # this permit to raise an exception if something have not
+ # been serialized correctly
+ jsonutils.to_primitive(payload)
+ # NOTE(melwitt): Try to serialize the context, as the rpc would.
+ # An exception will be raised if something is wrong
+ # with the context.
+ self._serializer.serialize_context(ctxt)
+ msg = FakeMessage(
+ self.publisher_id, priority, event_type, payload, ctxt)
+ self.notifications.append(msg)
+
+ def is_enabled(self):
+ return True
+
+ def reset(self):
+ self.notifications.clear()
+
+
+class FakeVersionedNotifier(FakeNotifier):
+ def __init__(
+ self, transport, publisher_id, serializer=None, parent=None,
+ ):
+ super().__init__(transport, publisher_id, serializer)
+ if parent:
+ self.versioned_notifications = parent.versioned_notifications
+ else:
+ self.versioned_notifications = []
+
+ if parent:
+ self.subscriptions = parent.subscriptions
+ else:
+ self.subscriptions = collections.defaultdict(_Sub)
+
+ def _notify(self, priority, ctxt, event_type, payload):
+ payload = self._serializer.serialize_entity(ctxt, payload)
+ notification = {
+ 'publisher_id': self.publisher_id,
+ 'priority': priority,
+ 'event_type': event_type,
+ 'payload': payload,
+ }
+ self.versioned_notifications.append(notification)
+ self.subscriptions[event_type].received(notification)
+
+ def reset(self):
+ self.versioned_notifications.clear()
+ self.subscriptions.clear()
+
+ def wait_for_versioned_notifications(
+ self, event_type, n_events=1, timeout=10.0,
+ ):
+ return self.subscriptions[event_type].wait_n(
+ n_events, event_type, timeout)
class NotificationFixture(fixtures.Fixture):
@@ -21,23 +170,37 @@ class NotificationFixture(fixtures.Fixture):
def setUp(self):
super().setUp()
- self.addCleanup(fake_notifier.reset)
- fake_notifier.stub_notifier(self.test)
+ self.addCleanup(self.reset)
+
+ self.fake_notifier = FakeNotifier(
+ rpc.LEGACY_NOTIFIER.transport,
+ rpc.LEGACY_NOTIFIER.publisher_id,
+ serializer=getattr(
+ rpc.LEGACY_NOTIFIER, '_serializer', None))
+ self.fake_versioned_notifier = FakeVersionedNotifier(
+ rpc.NOTIFIER.transport,
+ rpc.NOTIFIER.publisher_id,
+ serializer=getattr(rpc.NOTIFIER, '_serializer', None))
+ if rpc.LEGACY_NOTIFIER and rpc.NOTIFIER:
+ self.test.stub_out('nova.rpc.LEGACY_NOTIFIER', self.fake_notifier)
+ self.test.stub_out(
+ 'nova.rpc.NOTIFIER', self.fake_versioned_notifier)
def reset(self):
- fake_notifier.reset()
+ self.fake_notifier.reset()
+ self.fake_versioned_notifier.reset()
def wait_for_versioned_notifications(
self, event_type, n_events=1, timeout=10.0,
):
- return fake_notifier.VERSIONED_SUBS[event_type].wait_n(
- n_events, event_type, timeout,
+ return self.fake_versioned_notifier.wait_for_versioned_notifications(
+ event_type, n_events, timeout,
)
@property
def versioned_notifications(self):
- return fake_notifier.VERSIONED_NOTIFICATIONS
+ return self.fake_versioned_notifier.versioned_notifications
@property
def notifications(self):
- return fake_notifier.NOTIFICATIONS
+ return self.fake_notifier.notifications
diff --git a/nova/tests/unit/fake_notifier.py b/nova/tests/unit/fake_notifier.py
deleted file mode 100644
index 62c2fa8bb5..0000000000
--- a/nova/tests/unit/fake_notifier.py
+++ /dev/null
@@ -1,153 +0,0 @@
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import functools
-import pprint
-import threading
-
-from oslo_log import log as logging
-import oslo_messaging as messaging
-from oslo_serialization import jsonutils
-from oslo_utils import excutils
-from oslo_utils import timeutils
-
-from nova import rpc
-
-LOG = logging.getLogger(__name__)
-
-
-class _Sub(object):
- """Allow a subscriber to efficiently wait for an event to occur, and
- retrieve events which have occured.
- """
-
- def __init__(self):
- self._cond = threading.Condition()
- self._notifications = []
-
- def received(self, notification):
- with self._cond:
- self._notifications.append(notification)
- self._cond.notifyAll()
-
- def wait_n(self, n, event, timeout):
- """Wait until at least n notifications have been received, and return
- them. May return less than n notifications if timeout is reached.
- """
-
- with timeutils.StopWatch(timeout) as timer:
- with self._cond:
- while len(self._notifications) < n:
- if timer.expired():
- notifications = pprint.pformat(
- {event: sub._notifications
- for event, sub in VERSIONED_SUBS.items()})
- raise AssertionError(
- "Notification %(event)s hasn't been "
- "received. Received:\n%(notifications)s" % {
- 'event': event,
- 'notifications': notifications,
- })
- self._cond.wait(timer.leftover())
-
- # Return a copy of the notifications list
- return list(self._notifications)
-
-
-VERSIONED_SUBS = collections.defaultdict(_Sub)
-VERSIONED_NOTIFICATIONS = []
-NOTIFICATIONS = []
-
-
-def reset():
- del NOTIFICATIONS[:]
- del VERSIONED_NOTIFICATIONS[:]
- VERSIONED_SUBS.clear()
-
-
-FakeMessage = collections.namedtuple('Message',
- ['publisher_id', 'priority',
- 'event_type', 'payload', 'context'])
-
-
-class FakeNotifier(object):
-
- def __init__(self, transport, publisher_id, serializer=None):
- self.transport = transport
- self.publisher_id = publisher_id
- self._serializer = serializer or messaging.serializer.NoOpSerializer()
-
- for priority in ['debug', 'info', 'warn', 'error', 'critical']:
- setattr(self, priority,
- functools.partial(self._notify, priority.upper()))
-
- def prepare(self, publisher_id=None):
- if publisher_id is None:
- publisher_id = self.publisher_id
- return self.__class__(self.transport, publisher_id,
- serializer=self._serializer)
-
- def _notify(self, priority, ctxt, event_type, payload):
- try:
- payload = self._serializer.serialize_entity(ctxt, payload)
- except Exception:
- with excutils.save_and_reraise_exception():
- LOG.error('Error serializing payload: %s', payload)
- # NOTE(sileht): simulate the kombu serializer
- # this permit to raise an exception if something have not
- # been serialized correctly
- jsonutils.to_primitive(payload)
- # NOTE(melwitt): Try to serialize the context, as the rpc would.
- # An exception will be raised if something is wrong
- # with the context.
- self._serializer.serialize_context(ctxt)
- msg = FakeMessage(self.publisher_id, priority, event_type,
- payload, ctxt)
- NOTIFICATIONS.append(msg)
-
- def is_enabled(self):
- return True
-
-
-class FakeVersionedNotifier(FakeNotifier):
- def _notify(self, priority, ctxt, event_type, payload):
- payload = self._serializer.serialize_entity(ctxt, payload)
- notification = {'publisher_id': self.publisher_id,
- 'priority': priority,
- 'event_type': event_type,
- 'payload': payload}
- VERSIONED_NOTIFICATIONS.append(notification)
- VERSIONED_SUBS[event_type].received(notification)
-
-
-def stub_notifier(test):
- test.stub_out('oslo_messaging.Notifier', FakeNotifier)
- if rpc.LEGACY_NOTIFIER and rpc.NOTIFIER:
- test.stub_out('nova.rpc.LEGACY_NOTIFIER',
- FakeNotifier(rpc.LEGACY_NOTIFIER.transport,
- rpc.LEGACY_NOTIFIER.publisher_id,
- serializer=getattr(rpc.LEGACY_NOTIFIER,
- '_serializer',
- None)))
- test.stub_out('nova.rpc.NOTIFIER',
- FakeVersionedNotifier(rpc.NOTIFIER.transport,
- rpc.NOTIFIER.publisher_id,
- serializer=getattr(rpc.NOTIFIER,
- '_serializer',
- None)))
-
-
-def wait_for_versioned_notifications(event_type, n_events=1, timeout=10.0):
- return VERSIONED_SUBS[event_type].wait_n(n_events, event_type, timeout)