author    Kevin_Zheng <zhengzhenyu@huawei.com>    2018-04-23 15:41:21 +0800
committer Matt Riedemann <mriedem.os@gmail.com>   2018-07-16 13:57:09 -0400
commit    79dac41fee178dabb547f4d7bc10609630767131 (patch)
tree      5d877e273d064d0dc845b617a8770f6fc5493b29
parent    b2dafb1ee8c125cbeca92871e9478ac412084292 (diff)
download  nova-79dac41fee178dabb547f4d7bc10609630767131.tar.gz
Use ThreadPoolExecutor for max_concurrent_live_migrations
This changes the max_concurrent_live_migrations handling to use a
ThreadPoolExecutor so that we can control a bounded pool of Futures
in order to cancel queued live migrations later in this series.

There is a slight functional difference in the unlimited case since
starting in python 3.5, ThreadPoolExecutor will default to ncpu * 5
concurrently running threads. However, max_concurrent_live_migrations
defaults to 1 and assuming compute hosts run with 32 physical CPUs on
average, you'd be looking at a maximum of 160 concurrently running
live migrations, which is probably way above what anyone would
consider sane.

Co-Authored-By: Matt Riedemann <mriedem.os@gmail.com>

Part of blueprint abort-live-migration-in-queued-status

Change-Id: Ia9ea1e164fb3b4a386405538eed58d94ad115172
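To make the mechanics concrete, here is a minimal standalone sketch of the
pattern described above. This is illustrative code only, not part of the
change; the names (fake_live_migration, waiting) are invented for the
example. It shows a bounded ThreadPoolExecutor, a tracking dict in the
spirit of _waiting_live_migrations, and cancellation of still-queued
Futures at shutdown:

    from concurrent import futures
    import time

    def fake_live_migration(name):
        time.sleep(0.5)  # stand-in for a long-running live migration
        return name

    # max_workers=1 mirrors the max_concurrent_live_migrations default;
    # omitting max_workers gives the ncpu * 5 default noted above (py3.5+).
    executor = futures.ThreadPoolExecutor(max_workers=1)
    # name -> (payload, Future), analogous to _waiting_live_migrations.
    waiting = {}

    for name in ('mig-1', 'mig-2', 'mig-3'):
        waiting[name] = (name, executor.submit(fake_live_migration, name))

    # On shutdown: refuse new work, then cancel whatever is still queued.
    # Future.cancel() only succeeds for tasks that have not started, so
    # with one worker the first task will usually report False here.
    executor.shutdown(wait=False)
    for payload, future in waiting.values():
        print(payload, 'cancelled' if future.cancel() else 'not cancellable')
    waiting.clear()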
-rw-r--r--  lower-constraints.txt                          1
-rw-r--r--  nova/compute/manager.py                       67
-rw-r--r--  nova/exception.py                              5
-rw-r--r--  nova/tests/fixtures.py                        20
-rw-r--r--  nova/tests/unit/compute/test_compute.py       3
-rw-r--r--  nova/tests/unit/compute/test_compute_mgr.py  90
-rw-r--r--  requirements.txt                               1

7 files changed, 151 insertions, 36 deletions
diff --git a/lower-constraints.txt b/lower-constraints.txt
index da681ff196..c2a27a41dd 100644
--- a/lower-constraints.txt
+++ b/lower-constraints.txt
@@ -33,6 +33,7 @@ fixtures==3.0.0
 flake8==2.5.5
 future==0.16.0
 futurist==1.6.0
+futures==3.0.0
 gabbi==1.35.0
 gitdb2==2.0.3
 GitPython==2.1.8
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index cee21c0287..d8e4abca60 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -27,6 +27,9 @@ terminating it.
 import base64
 import binascii
+# If py2, concurrent.futures comes from the futures library, otherwise it
+# comes from the py3 standard library.
+from concurrent import futures
 import contextlib
 import functools
 import inspect
@@ -521,10 +524,16 @@ class ComputeManager(manager.Manager):
         else:
             self._build_semaphore = compute_utils.UnlimitedSemaphore()
         if max(CONF.max_concurrent_live_migrations, 0) != 0:
-            self._live_migration_semaphore = eventlet.semaphore.Semaphore(
-                CONF.max_concurrent_live_migrations)
+            self._live_migration_executor = futures.ThreadPoolExecutor(
+                max_workers=CONF.max_concurrent_live_migrations)
         else:
-            self._live_migration_semaphore = compute_utils.UnlimitedSemaphore()
+            # Starting in python 3.5, this is technically bounded, but it's
+            # ncpu * 5 which is probably much higher than anyone would sanely
+            # use for concurrently running live migrations.
+            self._live_migration_executor = futures.ThreadPoolExecutor()
+        # This is a dict, keyed by instance uuid, to a two-item tuple of
+        # migration object and Future for the queued live migration.
+        self._waiting_live_migrations = {}

         super(ComputeManager, self).__init__(service_name="compute",
                                              *args, **kwargs)
@@ -1152,6 +1161,26 @@ class ComputeManager(manager.Manager):
         self.driver.register_event_listener(None)
         self.instance_events.cancel_all_events()
         self.driver.cleanup_host(host=self.host)
+        self._cleanup_live_migrations_in_pool()
+
+    def _cleanup_live_migrations_in_pool(self):
+        # Shut down the pool so we don't get new requests.
+        self._live_migration_executor.shutdown(wait=False)
+        # For any queued migrations, cancel the migration and update
+        # its status.
+        for migration, future in self._waiting_live_migrations.values():
+            # If we got here before the Future was submitted then we need
+            # to move on since there isn't anything we can do.
+            if future is None:
+                continue
+            if future.cancel():
+                self._set_migration_status(migration, 'cancelled')
+                LOG.info('Successfully cancelled queued live migration.',
+                         instance_uuid=migration.instance_uuid)
+            else:
+                LOG.warning('Unable to cancel live migration.',
+                            instance_uuid=migration.instance_uuid)
+        self._waiting_live_migrations.clear()

     def pre_start_hook(self):
         """After the service is initialized, but before we fully bring
@@ -6146,6 +6175,9 @@ class ComputeManager(manager.Manager):
         # done on source/destination. For now, this is just here for status
         # reporting
         self._set_migration_status(migration, 'preparing')
+        # NOTE(Kevin_Zheng): The migration is no longer in the `queued` status
+        # so let's remove it from the mapping.
+        self._waiting_live_migrations.pop(instance.uuid)

         class _BreakWaitForInstanceEvent(Exception):
             """Used as a signal to stop waiting for the network-vif-plugged
@@ -6257,18 +6289,23 @@ class ComputeManager(manager.Manager):
         """
         self._set_migration_status(migration, 'queued')
-
-        def dispatch_live_migration(*args, **kwargs):
-            with self._live_migration_semaphore:
-                self._do_live_migration(*args, **kwargs)
-
-        # NOTE(danms): We spawn here to return the RPC worker thread back to
-        # the pool. Since what follows could take a really long time, we don't
-        # want to tie up RPC workers.
-        utils.spawn_n(dispatch_live_migration,
-                      context, dest, instance,
-                      block_migration, migration,
-                      migrate_data)
+        # NOTE(Kevin_Zheng): Submit the live_migration job to the pool and
+        # put the returned Future object into a dict keyed by instance.uuid
+        # in order to be able to track and abort it in the future.
+        self._waiting_live_migrations[instance.uuid] = (None, None)
+        try:
+            future = self._live_migration_executor.submit(
+                self._do_live_migration, context, dest, instance,
+                block_migration, migration, migrate_data)
+            self._waiting_live_migrations[instance.uuid] = (migration, future)
+        except RuntimeError:
+            # ThreadPoolExecutor.submit will raise RuntimeError if the pool
+            # is shut down, which happens in _cleanup_live_migrations_in_pool.
+            LOG.info('Migration %s failed to submit as the compute service '
+                     'is shutting down.', migration.uuid, instance=instance)
+            self._set_migration_status(migration, 'error')
+            raise exception.LiveMigrationNotSubmitted(
+                migration_uuid=migration.uuid, instance_uuid=instance.uuid)

     @wrap_exception()
     @wrap_instance_event(prefix='compute')
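For reference, the except RuntimeError branch above guards against exactly
this standard-library behavior; a minimal sketch, again illustrative only
and not Nova code:

    from concurrent import futures

    pool = futures.ThreadPoolExecutor(max_workers=1)
    pool.shutdown(wait=False)
    try:
        pool.submit(print, 'never runs')
    except RuntimeError as err:
        # CPython reports "cannot schedule new futures after shutdown".
        print('submit rejected:', err)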
diff --git a/nova/exception.py b/nova/exception.py
index 5fb0b71db9..a0df4a1a00 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -1827,6 +1827,11 @@ class InvalidWatchdogAction(Invalid):
     msg_fmt = _("Provided watchdog action (%(action)s) is not supported.")


+class LiveMigrationNotSubmitted(NovaException):
+    msg_fmt = _("Failed to submit live migration %(migration_uuid)s for "
+                "instance %(instance_uuid)s for processing.")
+
+
 class SelectionObjectsWithOldRPCVersionNotSupported(NovaException):
     msg_fmt = _("Requests for Selection objects with alternates are not "
                 "supported in select_destinations() before RPC version 4.5; "
diff --git a/nova/tests/fixtures.py b/nova/tests/fixtures.py
index 67fe98ca47..7db87df5d3 100644
--- a/nova/tests/fixtures.py
+++ b/nova/tests/fixtures.py
@@ -1010,6 +1010,26 @@ class SpawnIsSynchronousFixture(fixtures.Fixture):
             'nova.utils.spawn', _FakeGreenThread))


+class SynchronousThreadPoolExecutorFixture(fixtures.Fixture):
+    """Make ThreadPoolExecutor.submit() synchronous.
+
+    The function passed to submit() will be executed and a mock.Mock
+    object will be returned as the Future where Future.result() will
+    return the result of the call to the submitted function.
+    """
+    def setUp(self):
+        super(SynchronousThreadPoolExecutorFixture, self).setUp()
+
+        def fake_submit(_self, fn, *args, **kwargs):
+            result = fn(*args, **kwargs)
+            future = mock.Mock(spec='concurrent.futures.Future')
+            future.return_value.result.return_value = result
+            return future
+        self.useFixture(fixtures.MonkeyPatch(
+            'concurrent.futures.ThreadPoolExecutor.submit',
+            fake_submit))
+
+
 class BannedDBSchemaOperations(fixtures.Fixture):
     """Ban some operations for migrations"""
     def __init__(self, banned_resources=None):
diff --git a/nova/tests/unit/compute/test_compute.py b/nova/tests/unit/compute/test_compute.py
index 0fb2fb08f7..f6e79ea0c1 100644
--- a/nova/tests/unit/compute/test_compute.py
+++ b/nova/tests/unit/compute/test_compute.py
@@ -1490,6 +1490,7 @@ class ComputeTestCase(BaseTestCase,
     def setUp(self):
         super(ComputeTestCase, self).setUp()
         self.useFixture(fixtures.SpawnIsSynchronousFixture())
+        self.useFixture(fixtures.SynchronousThreadPoolExecutorFixture())

         self.image_api = image_api.API()
@@ -6348,7 +6349,7 @@ class ComputeTestCase(BaseTestCase,
         mock_pre.return_value = migrate_data

         # start test
-        migration = objects.Migration()
+        migration = objects.Migration(uuid=uuids.migration)
         with mock.patch.object(self.compute.driver,
                                'cleanup') as mock_cleanup:
             mock_cleanup.side_effect = test.TestingException
diff --git a/nova/tests/unit/compute/test_compute_mgr.py b/nova/tests/unit/compute/test_compute_mgr.py
index 15cc7f2156..ef34d23495 100644
--- a/nova/tests/unit/compute/test_compute_mgr.py
+++ b/nova/tests/unit/compute/test_compute_mgr.py
@@ -697,6 +697,33 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
                 mock.call(self.compute.handle_events), mock.call(None)])
             mock_driver.cleanup_host.assert_called_once_with(host='fake-mini')

+    def test_cleanup_live_migrations_in_pool_with_record(self):
+        fake_future = mock.MagicMock()
+        fake_instance_uuid = uuids.instance
+        fake_migration = objects.Migration(
+            uuid=uuids.migration, instance_uuid=fake_instance_uuid)
+        fake_migration.save = mock.MagicMock()
+        self.compute._waiting_live_migrations[fake_instance_uuid] = (
+            fake_migration, fake_future)
+
+        with mock.patch.object(self.compute, '_live_migration_executor'
+                               ) as mock_migration_pool:
+            self.compute._cleanup_live_migrations_in_pool()
+
+            mock_migration_pool.shutdown.assert_called_once_with(wait=False)
+            self.assertEqual('cancelled', fake_migration.status)
+            fake_future.cancel.assert_called_once_with()
+            self.assertEqual({}, self.compute._waiting_live_migrations)
+
+            # Test again when the Future is None.
+            self.compute._waiting_live_migrations[fake_instance_uuid] = (
+                None, None)
+            self.compute._cleanup_live_migrations_in_pool()
+
+            mock_migration_pool.shutdown.assert_called_with(wait=False)
+            self.assertEqual(2, mock_migration_pool.shutdown.call_count)
+            self.assertEqual({}, self.compute._waiting_live_migrations)
+
     def test_init_virt_events_disabled(self):
         self.flags(handle_virt_lifecycle_events=False, group='workarounds')
         with mock.patch.object(self.compute.driver,
@@ -6238,6 +6265,7 @@ class ComputeManagerErrorsOutMigrationTestCase(test.NoDBTestCase):
         mock_obj_as_admin.assert_called_once_with()


+@ddt.ddt
 class ComputeManagerMigrationTestCase(test.NoDBTestCase):
     class TestResizeError(Exception):
         pass
@@ -6895,7 +6923,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
         @mock.patch('nova.objects.Migration.save')
         def _do_it(mock_mig_save):
             instance = objects.Instance(uuid=uuids.fake)
-            migration = objects.Migration()
+            migration = objects.Migration(uuid=uuids.migration)
             self.compute.live_migration(self.context,
                                         mock.sentinel.dest,
                                         instance,
@@ -6906,10 +6934,10 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
             migration.save.assert_called_once_with()

         with mock.patch.object(self.compute,
-                               '_live_migration_semaphore') as mock_sem:
+                               '_live_migration_executor') as mock_exc:
             for i in (1, 2, 3):
                 _do_it()
-        self.assertEqual(3, mock_sem.__enter__.call_count)
+        self.assertEqual(3, mock_exc.submit.call_count)

     def test_max_concurrent_live_limited(self):
         self.flags(max_concurrent_live_migrations=2)
@@ -6919,25 +6947,19 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
         self.flags(max_concurrent_live_migrations=0)
         self._test_max_concurrent_live()

-    def test_max_concurrent_live_semaphore_limited(self):
+    @mock.patch('concurrent.futures.ThreadPoolExecutor')
+    def test_max_concurrent_live_semaphore_limited(self, mock_executor):
         self.flags(max_concurrent_live_migrations=123)
-        self.assertEqual(
-            123,
-            manager.ComputeManager()._live_migration_semaphore.balance)
+        manager.ComputeManager()
+        mock_executor.assert_called_once_with(max_workers=123)

-    def test_max_concurrent_live_semaphore_unlimited(self):
-        self.flags(max_concurrent_live_migrations=0)
-        compute = manager.ComputeManager()
-        self.assertEqual(0, compute._live_migration_semaphore.balance)
-        self.assertIsInstance(compute._live_migration_semaphore,
-                              compute_utils.UnlimitedSemaphore)
-
-    def test_max_concurrent_live_semaphore_negative(self):
-        self.flags(max_concurrent_live_migrations=-2)
-        compute = manager.ComputeManager()
-        self.assertEqual(0, compute._live_migration_semaphore.balance)
-        self.assertIsInstance(compute._live_migration_semaphore,
-                              compute_utils.UnlimitedSemaphore)
+    @ddt.data(0, -2)
+    def test_max_concurrent_live_semaphore_unlimited(self, max_concurrent):
+        self.flags(max_concurrent_live_migrations=max_concurrent)
+        with mock.patch(
+                'concurrent.futures.ThreadPoolExecutor') as mock_executor:
+            manager.ComputeManager()
+        mock_executor.assert_called_once_with()

     def test_pre_live_migration_cinder_v3_api(self):
         # This tests that pre_live_migration with a bdm with an
@@ -7109,6 +7131,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
             network_info=network_model.NetworkInfo([
                 network_model.VIF(uuids.port1), network_model.VIF(uuids.port2)
             ]))
+        self.compute._waiting_live_migrations[self.instance.uuid] = (
+            self.migration, mock.MagicMock()
+        )
         with mock.patch.object(self.compute.virtapi,
                                'wait_for_instance_event') as wait_for_event:
             self.compute._do_live_migration(
@@ -7136,6 +7161,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
         self.instance.info_cache = objects.InstanceInfoCache(
             network_info=network_model.NetworkInfo([
                 network_model.VIF(uuids.port1)]))
+        self.compute._waiting_live_migrations[self.instance.uuid] = (
+            self.migration, mock.MagicMock()
+        )
         with mock.patch.object(
                 self.compute.virtapi, 'wait_for_instance_event'):
             self.compute._do_live_migration(
@@ -7160,6 +7188,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
         self.instance.info_cache = objects.InstanceInfoCache(
             network_info=network_model.NetworkInfo([
                 network_model.VIF(uuids.port1)]))
+        self.compute._waiting_live_migrations[self.instance.uuid] = (
+            self.migration, mock.MagicMock()
+        )
         with mock.patch.object(
                 self.compute.virtapi,
                 'wait_for_instance_event') as wait_for_event:
@@ -7187,6 +7218,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
         self.instance.info_cache = objects.InstanceInfoCache(
             network_info=network_model.NetworkInfo([
                 network_model.VIF(uuids.port1)]))
+        self.compute._waiting_live_migrations[self.instance.uuid] = (
+            self.migration, mock.MagicMock()
+        )
         with mock.patch.object(
                 self.compute.virtapi,
                 'wait_for_instance_event') as wait_for_event:
@@ -7218,6 +7252,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
         self.instance.info_cache = objects.InstanceInfoCache(
             network_info=network_model.NetworkInfo([
                 network_model.VIF(uuids.port1)]))
+        self.compute._waiting_live_migrations[self.instance.uuid] = (
+            self.migration, mock.MagicMock()
+        )
         with mock.patch.object(
                 self.compute.virtapi,
                 'wait_for_instance_event') as wait_for_event:
@@ -7229,6 +7266,19 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
             self.assertEqual('running', self.migration.status)
             mock_rollback_live_mig.assert_not_called()

+    @mock.patch.object(compute_utils, 'add_instance_fault_from_exc')
+    @mock.patch('nova.compute.utils.notify_about_instance_action')
+    def test_live_migration_submit_failed(self, mock_notify, mock_exc):
+        migration = objects.Migration(uuid=uuids.migration)
+        migration.save = mock.MagicMock()
+        with mock.patch.object(
+                self.compute._live_migration_executor, 'submit') as mock_sub:
+            mock_sub.side_effect = RuntimeError
+            self.assertRaises(exception.LiveMigrationNotSubmitted,
+                              self.compute.live_migration, self.context,
+                              'fake', self.instance, True, migration, {})
+        self.assertEqual('error', migration.status)
+
     def test_live_migration_force_complete_succeeded(self):
         migration = objects.Migration()
         migration.status = 'running'
diff --git a/requirements.txt b/requirements.txt
index 2e1f9246a8..5b48ebb5fe 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -67,3 +67,4 @@ os-service-types>=1.2.0 # Apache-2.0
 taskflow>=2.16.0 # Apache-2.0
 python-dateutil>=2.5.3 # BSD
 zVMCloudConnector>=1.1.1;sys_platform!='win32' # Apache 2.0 License
+futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # PSF