-rw-r--r--  Authors                                |    1
-rwxr-xr-x  bin/nova-api                           |   23
-rwxr-xr-x  bin/nova-combined                      |   65
-rwxr-xr-x  bin/nova-compute                       |   18
-rwxr-xr-x  bin/nova-dhcpbridge                    |    3
-rwxr-xr-x  bin/nova-import-canonical-imagestore   |    3
-rwxr-xr-x  bin/nova-instancemonitor               |    3
-rwxr-xr-x  bin/nova-manage                        |    3
-rwxr-xr-x  bin/nova-network                       |   18
-rwxr-xr-x  bin/nova-objectstore                   |    3
-rwxr-xr-x  bin/nova-scheduler                     |   18
-rwxr-xr-x  bin/nova-volume                        |   18
-rwxr-xr-x  contrib/nova.sh                        |    2
-rw-r--r--  nova/compute/disk.py                   |   76
-rw-r--r--  nova/compute/manager.py                |   47
-rw-r--r--  nova/flags.py                          |    7
-rw-r--r--  nova/manager.py                        |    4
-rw-r--r--  nova/network/manager.py                |    4
-rw-r--r--  nova/objectstore/image.py              |   10
-rw-r--r--  nova/process.py                        |  209
-rw-r--r--  nova/rpc.py                            |   83
-rw-r--r--  nova/server.py                         |  151
-rw-r--r--  nova/service.py                        |   95
-rw-r--r--  nova/test.py                           |  100
-rw-r--r--  nova/tests/__init__.py                 |    5
-rw-r--r--  nova/tests/access_unittest.py          |    2
-rw-r--r--  nova/tests/auth_unittest.py            |    6
-rw-r--r--  nova/tests/cloud_unittest.py           |    6
-rw-r--r--  nova/tests/compute_unittest.py         |   41
-rw-r--r--  nova/tests/flags_unittest.py           |    2
-rw-r--r--  nova/tests/misc_unittest.py            |   45
-rw-r--r--  nova/tests/network_unittest.py         |    2
-rw-r--r--  nova/tests/objectstore_unittest.py     |    4
-rw-r--r--  nova/tests/process_unittest.py         |  132
-rw-r--r--  nova/tests/quota_unittest.py           |    2
-rw-r--r--  nova/tests/rpc_unittest.py             |   36
-rw-r--r--  nova/tests/scheduler_unittest.py       |   24
-rw-r--r--  nova/tests/service_unittest.py         |   46
-rw-r--r--  nova/tests/validator_unittest.py       |   42
-rw-r--r--  nova/tests/virt_unittest.py            |   11
-rw-r--r--  nova/tests/volume_unittest.py          |   58
-rw-r--r--  nova/utils.py                          |   56
-rw-r--r--  nova/validate.py                       |   94
-rw-r--r--  nova/virt/fake.py                      |   10
-rw-r--r--  nova/virt/images.py                    |   11
-rw-r--r--  nova/virt/libvirt_conn.py              |  181
-rw-r--r--  nova/virt/xenapi/network_utils.py      |   10
-rw-r--r--  nova/virt/xenapi/vm_utils.py           |   28
-rw-r--r--  nova/virt/xenapi/vmops.py              |   67
-rw-r--r--  nova/virt/xenapi_conn.py               |   45
-rw-r--r--  nova/volume/driver.py                  |  140
-rw-r--r--  nova/volume/manager.py                 |   27
-rw-r--r--  run_tests.py                           |   11
-rw-r--r--  tools/pip-requires                     |    1
54 files changed, 784 insertions(+), 1325 deletions(-)
diff --git a/Authors b/Authors
index 4a526d8491..565444ee12 100644
--- a/Authors
+++ b/Authors
@@ -6,6 +6,7 @@ Chris Behrens <cbehrens@codestud.com>
Chmouel Boudjnah <chmouel@chmouel.com>
Dean Troyer <dtroyer@gmail.com>
Devin Carlen <devin.carlen@gmail.com>
+Eldar Nugaev <enugaev@griddynamics.com>
Eric Day <eday@oddments.org>
Ewan Mellor <ewan.mellor@citrix.com>
Hisaki Ohara <hisaki.ohara@intel.com>
diff --git a/bin/nova-api b/bin/nova-api
index a9c53dbcdf..1c671201eb 100755
--- a/bin/nova-api
+++ b/bin/nova-api
@@ -17,10 +17,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-"""
-Nova API daemon.
-"""
+"""Starter script for Nova API."""
+
+import gettext
import os
import sys
@@ -32,9 +32,13 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
+from nova import api
from nova import flags
from nova import utils
-from nova import server
+from nova import wsgi
+
FLAGS = flags.FLAGS
flags.DEFINE_integer('osapi_port', 8774, 'OpenStack API port')
@@ -43,15 +47,10 @@ flags.DEFINE_integer('ec2api_port', 8773, 'EC2 API port')
flags.DEFINE_string('ec2api_host', '0.0.0.0', 'EC2 API host')
-def main(_args):
- from nova import api
- from nova import wsgi
+if __name__ == '__main__':
+ utils.default_flagfile()
+ FLAGS(sys.argv)
server = wsgi.Server()
server.start(api.API('os'), FLAGS.osapi_port, host=FLAGS.osapi_host)
server.start(api.API('ec2'), FLAGS.ec2api_port, host=FLAGS.ec2api_host)
server.wait()
-
-
-if __name__ == '__main__':
- utils.default_flagfile()
- server.serve('nova-api', main)
diff --git a/bin/nova-combined b/bin/nova-combined
new file mode 100755
index 0000000000..c6a04f7e93
--- /dev/null
+++ b/bin/nova-combined
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Combined starter script for Nova services."""
+
+import eventlet
+eventlet.monkey_patch()
+
+import os
+import sys
+
+# If ../nova/__init__.py exists, add ../ to Python search path, so that
+# it will override what happens to be installed in /usr/(local/)lib/python...
+possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+ os.pardir,
+ os.pardir))
+if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
+ sys.path.insert(0, possible_topdir)
+
+from nova import api
+from nova import flags
+from nova import service
+from nova import utils
+from nova import wsgi
+
+
+FLAGS = flags.FLAGS
+flags.DEFINE_integer('osapi_port', 8774, 'OpenStack API port')
+flags.DEFINE_string('osapi_host', '0.0.0.0', 'OpenStack API host')
+flags.DEFINE_integer('ec2api_port', 8773, 'EC2 API port')
+flags.DEFINE_string('ec2api_host', '0.0.0.0', 'EC2 API host')
+
+
+if __name__ == '__main__':
+ utils.default_flagfile()
+ FLAGS(sys.argv)
+
+ compute = service.Service.create(binary='nova-compute')
+ network = service.Service.create(binary='nova-network')
+ volume = service.Service.create(binary='nova-volume')
+ scheduler = service.Service.create(binary='nova-scheduler')
+ #objectstore = service.Service.create(binary='nova-objectstore')
+
+ service.serve(compute, network, volume, scheduler)
+
+ server = wsgi.Server()
+ server.start(api.API('os'), FLAGS.osapi_port, host=FLAGS.osapi_host)
+ server.start(api.API('ec2'), FLAGS.ec2api_port, host=FLAGS.ec2api_host)
+ server.wait()
diff --git a/bin/nova-compute b/bin/nova-compute
index ac6378f754..d2d352da20 100755
--- a/bin/nova-compute
+++ b/bin/nova-compute
@@ -17,10 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
- Twistd daemon for the nova compute nodes.
-"""
+"""Starter script for Nova Compute."""
+import eventlet
+eventlet.monkey_patch()
+
+import gettext
import os
import sys
@@ -32,14 +34,12 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
from nova import service
-from nova import twistd
from nova import utils
-
if __name__ == '__main__':
utils.default_flagfile()
- twistd.serve(__file__)
-
-if __name__ == '__builtin__':
- application = service.Service.create() # pylint: disable=C0103
+ service.serve()
+ service.wait()
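
The converted worker binaries (nova-compute, nova-network, nova-scheduler, nova-volume) all reduce to the same shape: monkey-patch eventlet first, install gettext, then hand control to nova.service. A condensed sketch of that shape follows; the ordering comments are an interpretation of the diff, not something it states explicitly.

    import eventlet
    eventlet.monkey_patch()   # patch the stdlib before any other imports so the
                              # code below uses the green (cooperative) versions

    import gettext
    gettext.install('nova', unicode=1)   # provides the _() builtin used for i18n

    from nova import service
    from nova import utils

    if __name__ == '__main__':
        utils.default_flagfile()   # pick up the default flag file
        service.serve()            # parse flags, build a Service from the binary name, start it
        service.wait()             # keep the main greenthread alive
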
diff --git a/bin/nova-dhcpbridge b/bin/nova-dhcpbridge
index 17c62da0a0..81b9b6dd3f 100755
--- a/bin/nova-dhcpbridge
+++ b/bin/nova-dhcpbridge
@@ -21,6 +21,7 @@
Handle lease database updates from DHCP servers.
"""
+import gettext
import logging
import os
import sys
@@ -33,6 +34,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
from nova import context
from nova import db
from nova import flags
diff --git a/bin/nova-import-canonical-imagestore b/bin/nova-import-canonical-imagestore
index 4ed9e8365e..036b41e48b 100755
--- a/bin/nova-import-canonical-imagestore
+++ b/bin/nova-import-canonical-imagestore
@@ -21,6 +21,7 @@
Download images from Canonical Image Store
"""
+import gettext
import json
import os
import tempfile
@@ -37,6 +38,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
from nova import flags
from nova import utils
from nova.objectstore import image
diff --git a/bin/nova-instancemonitor b/bin/nova-instancemonitor
index 9b6c40e820..5dac3ffe60 100755
--- a/bin/nova-instancemonitor
+++ b/bin/nova-instancemonitor
@@ -21,6 +21,7 @@
Daemon for Nova RRD based instance resource monitoring.
"""
+import gettext
import os
import logging
import sys
@@ -34,6 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
from nova import utils
from nova import twistd
from nova.compute import monitor
diff --git a/bin/nova-manage b/bin/nova-manage
index 62eec8353f..0c1b621ed7 100755
--- a/bin/nova-manage
+++ b/bin/nova-manage
@@ -53,6 +53,7 @@
CLI interface for nova management.
"""
+import gettext
import logging
import os
import sys
@@ -68,6 +69,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
from nova import context
from nova import db
from nova import exception
diff --git a/bin/nova-network b/bin/nova-network
index d1fb552612..0143846a7e 100755
--- a/bin/nova-network
+++ b/bin/nova-network
@@ -17,10 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
- Twistd daemon for the nova network nodes.
-"""
+"""Starter script for Nova Network."""
+import eventlet
+eventlet.monkey_patch()
+
+import gettext
import os
import sys
@@ -32,14 +34,12 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
from nova import service
-from nova import twistd
from nova import utils
-
if __name__ == '__main__':
utils.default_flagfile()
- twistd.serve(__file__)
-
-if __name__ == '__builtin__':
- application = service.Service.create() # pylint: disable-msg=C0103
+ service.serve()
+ service.wait()
diff --git a/bin/nova-objectstore b/bin/nova-objectstore
index 00ae27af93..9fbe228a2c 100755
--- a/bin/nova-objectstore
+++ b/bin/nova-objectstore
@@ -21,6 +21,7 @@
Twisted daemon for nova objectstore. Supports S3 API.
"""
+import gettext
import os
import sys
@@ -32,6 +33,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
from nova import flags
from nova import utils
from nova import twistd
diff --git a/bin/nova-scheduler b/bin/nova-scheduler
index 4d1a40cf10..f4c0eaed6d 100755
--- a/bin/nova-scheduler
+++ b/bin/nova-scheduler
@@ -17,10 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
- Twistd daemon for the nova scheduler nodes.
-"""
+"""Starter script for Nova Scheduler."""
+import eventlet
+eventlet.monkey_patch()
+
+import gettext
import os
import sys
@@ -32,14 +34,12 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
from nova import service
-from nova import twistd
from nova import utils
-
if __name__ == '__main__':
utils.default_flagfile()
- twistd.serve(__file__)
-
-if __name__ == '__builtin__':
- application = service.Service.create()
+ service.serve()
+ service.wait()
diff --git a/bin/nova-volume b/bin/nova-volume
index e7281d6c0b..ad3ddc4051 100755
--- a/bin/nova-volume
+++ b/bin/nova-volume
@@ -17,10 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
- Twistd daemon for the nova volume nodes.
-"""
+"""Starter script for Nova Volume."""
+import eventlet
+eventlet.monkey_patch()
+
+import gettext
import os
import sys
@@ -32,14 +34,12 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
+gettext.install('nova', unicode=1)
+
from nova import service
-from nova import twistd
from nova import utils
-
if __name__ == '__main__':
utils.default_flagfile()
- twistd.serve(__file__)
-
-if __name__ == '__builtin__':
- application = service.Service.create() # pylint: disable-msg=C0103
+ service.serve()
+ service.wait()
diff --git a/contrib/nova.sh b/contrib/nova.sh
index 7eb934eca9..30df4edb65 100755
--- a/contrib/nova.sh
+++ b/contrib/nova.sh
@@ -72,7 +72,7 @@ fi
# You should only have to run this once
if [ "$CMD" == "install" ]; then
sudo apt-get install -y python-software-properties
- sudo add-apt-repository ppa:nova-core/ppa
+ sudo add-apt-repository ppa:nova-core/trunk
sudo apt-get update
sudo apt-get install -y dnsmasq kpartx kvm gawk iptables ebtables
sudo apt-get install -y user-mode-linux kvm libvirt-bin
diff --git a/nova/compute/disk.py b/nova/compute/disk.py
index a77c30a19a..3d5e077240 100644
--- a/nova/compute/disk.py
+++ b/nova/compute/disk.py
@@ -26,8 +26,6 @@ import logging
import os
import tempfile
-from twisted.internet import defer
-
from nova import exception
from nova import flags
@@ -39,7 +37,6 @@ flags.DEFINE_integer('block_size', 1024 * 1024 * 256,
'block_size to use for dd')
-@defer.inlineCallbacks
def partition(infile, outfile, local_bytes=0, resize=True,
local_type='ext2', execute=None):
"""
@@ -64,10 +61,10 @@ def partition(infile, outfile, local_bytes=0, resize=True,
file_size = os.path.getsize(infile)
if resize and file_size < FLAGS.minimum_root_size:
last_sector = FLAGS.minimum_root_size / sector_size - 1
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (infile, last_sector, sector_size))
- yield execute('e2fsck -fp %s' % infile, check_exit_code=False)
- yield execute('resize2fs %s' % infile)
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (infile, last_sector, sector_size))
+ execute('e2fsck -fp %s' % infile, check_exit_code=False)
+ execute('resize2fs %s' % infile)
file_size = FLAGS.minimum_root_size
elif file_size % sector_size != 0:
logging.warn("Input partition size not evenly divisible by"
@@ -86,37 +83,34 @@ def partition(infile, outfile, local_bytes=0, resize=True,
last_sector = local_last # e
# create an empty file
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (outfile, mbr_last, sector_size))
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (outfile, mbr_last, sector_size))
# make mbr partition
- yield execute('parted --script %s mklabel msdos' % outfile)
+ execute('parted --script %s mklabel msdos' % outfile)
# append primary file
- yield execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
- % (infile, outfile, FLAGS.block_size))
+ execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
+ % (infile, outfile, FLAGS.block_size))
# make primary partition
- yield execute('parted --script %s mkpart primary %ds %ds'
- % (outfile, primary_first, primary_last))
+ execute('parted --script %s mkpart primary %ds %ds'
+ % (outfile, primary_first, primary_last))
if local_bytes > 0:
# make the file bigger
- yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
- % (outfile, last_sector, sector_size))
+ execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
+ % (outfile, last_sector, sector_size))
# make and format local partition
- yield execute('parted --script %s mkpartfs primary %s %ds %ds'
- % (outfile, local_type, local_first, local_last))
+ execute('parted --script %s mkpartfs primary %s %ds %ds'
+ % (outfile, local_type, local_first, local_last))
-@defer.inlineCallbacks
def extend(image, size, execute):
file_size = os.path.getsize(image)
if file_size >= size:
return
- yield execute('truncate -s size %s' % (image,))
-
+ return execute('truncate -s size %s' % (image,))
-@defer.inlineCallbacks
def inject_data(image, key=None, net=None, partition=None, execute=None):
"""Injects a ssh key and optionally net data into a disk image.
@@ -126,14 +120,14 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
If partition is not specified it mounts the image as a single partition.
"""
- out, err = yield execute('sudo losetup --find --show %s' % image)
+ out, err = execute('sudo losetup --find --show %s' % image)
if err:
raise exception.Error('Could not attach image to loopback: %s' % err)
device = out.strip()
try:
if not partition is None:
# create partition
- out, err = yield execute('sudo kpartx -a %s' % device)
+ out, err = execute('sudo kpartx -a %s' % device)
if err:
raise exception.Error('Failed to load partition: %s' % err)
mapped_device = '/dev/mapper/%sp%s' % (device.split('/')[-1],
@@ -149,12 +143,12 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
mapped_device)
# Configure ext2fs so that it doesn't auto-check every N boots
- out, err = yield execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
+ out, err = execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
tmpdir = tempfile.mkdtemp()
try:
# mount loopback to dir
- out, err = yield execute(
+ out, err = execute(
'sudo mount %s %s' % (mapped_device, tmpdir))
if err:
raise exception.Error('Failed to mount filesystem: %s' % err)
@@ -162,24 +156,23 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
try:
if key:
# inject key file
- yield _inject_key_into_fs(key, tmpdir, execute=execute)
+ _inject_key_into_fs(key, tmpdir, execute=execute)
if net:
- yield _inject_net_into_fs(net, tmpdir, execute=execute)
+ _inject_net_into_fs(net, tmpdir, execute=execute)
finally:
# unmount device
- yield execute('sudo umount %s' % mapped_device)
+ execute('sudo umount %s' % mapped_device)
finally:
# remove temporary directory
- yield execute('rmdir %s' % tmpdir)
+ execute('rmdir %s' % tmpdir)
if not partition is None:
# remove partitions
- yield execute('sudo kpartx -d %s' % device)
+ execute('sudo kpartx -d %s' % device)
finally:
# remove loopback
- yield execute('sudo losetup --detach %s' % device)
+ execute('sudo losetup --detach %s' % device)
-@defer.inlineCallbacks
def _inject_key_into_fs(key, fs, execute=None):
"""Add the given public ssh key to root's authorized_keys.
@@ -187,22 +180,21 @@ def _inject_key_into_fs(key, fs, execute=None):
fs is the path to the base of the filesystem into which to inject the key.
"""
sshdir = os.path.join(fs, 'root', '.ssh')
- yield execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
- yield execute('sudo chown root %s' % sshdir)
- yield execute('sudo chmod 700 %s' % sshdir)
+ execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
+ execute('sudo chown root %s' % sshdir)
+ execute('sudo chmod 700 %s' % sshdir)
keyfile = os.path.join(sshdir, 'authorized_keys')
- yield execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
+ execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
-@defer.inlineCallbacks
def _inject_net_into_fs(net, fs, execute=None):
"""Inject /etc/network/interfaces into the filesystem rooted at fs.
net is the contents of /etc/network/interfaces.
"""
netdir = os.path.join(os.path.join(fs, 'etc'), 'network')
- yield execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter
- yield execute('sudo chown root:root %s' % netdir)
- yield execute('sudo chmod 755 %s' % netdir)
+ execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter
+ execute('sudo chown root:root %s' % netdir)
+ execute('sudo chmod 755 %s' % netdir)
netfile = os.path.join(netdir, 'interfaces')
- yield execute('sudo tee %s' % netfile, net)
+ execute('sudo tee %s' % netfile, net)
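
With the inlineCallbacks decorators gone, these helpers run synchronously against whatever execute callable they are handed. A hedged usage sketch, assuming nova.utils.execute returns an (stdout, stderr) tuple as the hunks above imply; the image path and key are made up, and the commands shell out with sudo.

    from nova import utils
    from nova.compute import disk

    # Inject an ssh key into the first partition of a raw disk image.
    disk.inject_data('/tmp/instance-00000001.img',
                     key='ssh-rsa AAAAB3Nza... user@host',
                     partition=1,
                     execute=utils.execute)
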
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index dd8d41129c..7eb60e262d 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -37,8 +37,6 @@ terminating it.
import datetime
import logging
-from twisted.internet import defer
-
from nova import exception
from nova import flags
from nova import manager
@@ -78,13 +76,11 @@ class ComputeManager(manager.Manager):
state = power_state.NOSTATE
self.db.instance_set_state(context, instance_id, state)
- @defer.inlineCallbacks
@exception.wrap_exception
def refresh_security_group(self, context, security_group_id, **_kwargs):
"""This call passes stright through to the virtualization driver."""
- yield self.driver.refresh_security_group(security_group_id)
+ self.driver.refresh_security_group(security_group_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def run_instance(self, context, instance_id, **_kwargs):
"""Launch a new instance with specified options."""
@@ -105,7 +101,7 @@ class ComputeManager(manager.Manager):
'spawning')
try:
- yield self.driver.spawn(instance_ref)
+ self.driver.spawn(instance_ref)
now = datetime.datetime.utcnow()
self.db.instance_update(context,
instance_id,
@@ -119,7 +115,6 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
@@ -134,12 +129,11 @@ class ComputeManager(manager.Manager):
self.db.instance_destroy(context, instance_id)
raise exception.Error('trying to destroy already destroyed'
' instance: %s' % instance_id)
- yield self.driver.destroy(instance_ref)
+ self.driver.destroy(instance_ref)
# TODO(ja): should we keep it in a terminated state for a bit?
self.db.instance_destroy(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def reboot_instance(self, context, instance_id):
"""Reboot an instance on this server."""
@@ -159,10 +153,9 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'rebooting')
- yield self.driver.reboot(instance_ref)
+ self.driver.reboot(instance_ref)
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def rescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
@@ -175,10 +168,9 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'rescuing')
- yield self.driver.rescue(instance_ref)
+ self.driver.rescue(instance_ref)
self._update_state(context, instance_id)
- @defer.inlineCallbacks
@exception.wrap_exception
def unrescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
@@ -191,7 +183,7 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'unrescuing')
- yield self.driver.unrescue(instance_ref)
+ self.driver.unrescue(instance_ref)
self._update_state(context, instance_id)
@exception.wrap_exception
@@ -203,7 +195,6 @@ class ComputeManager(manager.Manager):
return self.driver.get_console_output(instance_ref)
- @defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance."""
@@ -211,12 +202,12 @@ class ComputeManager(manager.Manager):
logging.debug("instance %s: attaching volume %s to %s", instance_id,
volume_id, mountpoint)
instance_ref = self.db.instance_get(context, instance_id)
- dev_path = yield self.volume_manager.setup_compute_volume(context,
- volume_id)
+ dev_path = self.volume_manager.setup_compute_volume(context,
+ volume_id)
try:
- yield self.driver.attach_volume(instance_ref['name'],
- dev_path,
- mountpoint)
+ self.driver.attach_volume(instance_ref['name'],
+ dev_path,
+ mountpoint)
self.db.volume_attached(context,
volume_id,
instance_id,
@@ -227,12 +218,12 @@ class ComputeManager(manager.Manager):
# ecxception below.
logging.exception("instance %s: attach failed %s, removing",
instance_id, mountpoint)
- yield self.volume_manager.remove_compute_volume(context,
- volume_id)
+ self.volume_manager.remove_compute_volume(context,
+ volume_id)
raise exc
- defer.returnValue(True)
- @defer.inlineCallbacks
+ return True
+
@exception.wrap_exception
def detach_volume(self, context, instance_id, volume_id):
"""Detach a volume from an instance."""
@@ -246,8 +237,8 @@ class ComputeManager(manager.Manager):
logging.warn("Detaching volume from unknown instance %s",
instance_ref['name'])
else:
- yield self.driver.detach_volume(instance_ref['name'],
- volume_ref['mountpoint'])
- yield self.volume_manager.remove_compute_volume(context, volume_id)
+ self.driver.detach_volume(instance_ref['name'],
+ volume_ref['mountpoint'])
+ self.volume_manager.remove_compute_volume(context, volume_id)
self.db.volume_detached(context, volume_id)
- defer.returnValue(True)
+ return True
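
These manager methods are now plain synchronous functions invoked by the rpc dispatcher rather than Deferred-returning callables. A hedged illustration of how a caller reaches attach_volume over rpc; the per-host topic format comes from Service.start() below, while the method/args payload shape is assumed from how AdapterConsumer dispatches, and the ids and host name are made up.

    from nova import context
    from nova import rpc

    ctxt = context.get_admin_context()
    rpc.cast(ctxt,
             'compute.host-1',                 # '<topic>.<host>' routes to one compute node
             {'method': 'attach_volume',
              'args': {'instance_id': 42,
                       'volume_id': 7,
                       'mountpoint': '/dev/vdc'}})
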
diff --git a/nova/flags.py b/nova/flags.py
index 5c265f4eaa..8fa0beb7a5 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -159,6 +159,7 @@ class StrWrapper(object):
return str(val)
raise KeyError(name)
+
FLAGS = FlagValues()
gflags.FLAGS = FLAGS
gflags.DEFINE_flag(gflags.HelpFlag(), FLAGS)
@@ -183,6 +184,12 @@ DEFINE_list = _wrapper(gflags.DEFINE_list)
DEFINE_spaceseplist = _wrapper(gflags.DEFINE_spaceseplist)
DEFINE_multistring = _wrapper(gflags.DEFINE_multistring)
DEFINE_multi_int = _wrapper(gflags.DEFINE_multi_int)
+DEFINE_flag = _wrapper(gflags.DEFINE_flag)
+
+
+HelpFlag = gflags.HelpFlag
+HelpshortFlag = gflags.HelpshortFlag
+HelpXMLFlag = gflags.HelpXMLFlag
def DECLARE(name, module_string, flag_values=FLAGS):
diff --git a/nova/manager.py b/nova/manager.py
index 5b61f7a4cb..3d38504bd8 100644
--- a/nova/manager.py
+++ b/nova/manager.py
@@ -55,7 +55,6 @@ from nova import utils
from nova import flags
from nova.db import base
-from twisted.internet import defer
FLAGS = flags.FLAGS
@@ -67,10 +66,9 @@ class Manager(base.Base):
self.host = host
super(Manager, self).__init__(db_driver)
- @defer.inlineCallbacks
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval"""
- yield
+ pass
def init_host(self):
"""Do any initialization that needs to be run if this is a standalone
diff --git a/nova/network/manager.py b/nova/network/manager.py
index a7298b47f0..6a30f30b79 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -49,7 +49,6 @@ import logging
import math
import IPy
-from twisted.internet import defer
from nova import context
from nova import db
@@ -399,10 +398,9 @@ class VlanManager(NetworkManager):
instances in its subnet.
"""
- @defer.inlineCallbacks
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval."""
- yield super(VlanManager, self).periodic_tasks(context)
+ super(VlanManager, self).periodic_tasks(context)
now = datetime.datetime.utcnow()
timeout = FLAGS.fixed_ip_disassociate_timeout
time = now - datetime.timedelta(seconds=timeout)
diff --git a/nova/objectstore/image.py b/nova/objectstore/image.py
index 7292dbab8f..abc28182ef 100644
--- a/nova/objectstore/image.py
+++ b/nova/objectstore/image.py
@@ -21,7 +21,6 @@ Take uploaded bucket contents and register them as disk images (AMIs).
Requires decryption using keys in the manifest.
"""
-# TODO(jesse): Got these from Euca2ools, will need to revisit them
import binascii
import glob
@@ -29,7 +28,6 @@ import json
import os
import shutil
import tarfile
-import tempfile
from xml.etree import ElementTree
from nova import exception
@@ -199,12 +197,17 @@ class Image(object):
except:
ramdisk_id = None
+ try:
+ arch = manifest.find("machine_configuration/architecture").text
+ except:
+ arch = 'x86_64'
+
info = {
'imageId': image_id,
'imageLocation': image_location,
'imageOwnerId': context.project_id,
'isPublic': False, # FIXME: grab public from manifest
- 'architecture': 'x86_64', # FIXME: grab architecture from manifest
+ 'architecture': arch,
'imageType': image_type}
if kernel_id:
@@ -264,6 +267,7 @@ class Image(object):
if err:
raise exception.Error("Failed to decrypt initialization "
"vector: %s" % err)
+
_out, err = utils.execute(
'openssl enc -d -aes-128-cbc -in %s -K %s -iv %s -out %s'
% (encrypted_filename, key, iv, decrypted_filename),
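
The new architecture lookup falls back to x86_64 when the manifest does not carry one. A hedged, self-contained sketch of the same lookup against a toy manifest; the XML layout is inferred from the XPath in the hunk above, not from a real image bundle.

    from xml.etree import ElementTree

    manifest = ElementTree.fromstring(
        "<manifest>"
        "  <machine_configuration>"
        "    <architecture>i386</architecture>"
        "  </machine_configuration>"
        "</manifest>")

    try:
        arch = manifest.find("machine_configuration/architecture").text
    except AttributeError:      # find() returns None when the node is absent
        arch = 'x86_64'
    print arch                  # -> i386
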
diff --git a/nova/process.py b/nova/process.py
deleted file mode 100644
index b33df048bf..0000000000
--- a/nova/process.py
+++ /dev/null
@@ -1,209 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# Copyright 2010 FathomDB Inc.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Process pool using twisted threading
-"""
-
-import logging
-import StringIO
-
-from twisted.internet import defer
-from twisted.internet import error
-from twisted.internet import protocol
-from twisted.internet import reactor
-
-from nova import flags
-from nova.exception import ProcessExecutionError
-
-FLAGS = flags.FLAGS
-flags.DEFINE_integer('process_pool_size', 4,
- 'Number of processes to use in the process pool')
-
-
-# This is based on _BackRelay from twister.internal.utils, but modified to
-# capture both stdout and stderr, without odd stderr handling, and also to
-# handle stdin
-class BackRelayWithInput(protocol.ProcessProtocol):
- """
- Trivial protocol for communicating with a process and turning its output
- into the result of a L{Deferred}.
-
- @ivar deferred: A L{Deferred} which will be called back with all of stdout
- and all of stderr as well (as a tuple). C{terminate_on_stderr} is true
- and any bytes are received over stderr, this will fire with an
- L{_ProcessExecutionError} instance and the attribute will be set to
- C{None}.
-
- @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
- received over stderr, this attribute will refer to a L{Deferred} which
- will be called back when the process ends. This C{Deferred} is also
- associated with the L{_ProcessExecutionError} which C{deferred} fires
- with earlier in this case so that users can determine when the process
- has actually ended, in addition to knowing when bytes have been
- received via stderr.
- """
-
- def __init__(self, deferred, cmd, started_deferred=None,
- terminate_on_stderr=False, check_exit_code=True,
- process_input=None):
- self.deferred = deferred
- self.cmd = cmd
- self.stdout = StringIO.StringIO()
- self.stderr = StringIO.StringIO()
- self.started_deferred = started_deferred
- self.terminate_on_stderr = terminate_on_stderr
- self.check_exit_code = check_exit_code
- self.process_input = process_input
- self.on_process_ended = None
-
- def _build_execution_error(self, exit_code=None):
- return ProcessExecutionError(cmd=self.cmd,
- exit_code=exit_code,
- stdout=self.stdout.getvalue(),
- stderr=self.stderr.getvalue())
-
- def errReceived(self, text):
- self.stderr.write(text)
- if self.terminate_on_stderr and (self.deferred is not None):
- self.on_process_ended = defer.Deferred()
- self.deferred.errback(self._build_execution_error())
- self.deferred = None
- self.transport.loseConnection()
-
- def outReceived(self, text):
- self.stdout.write(text)
-
- def processEnded(self, reason):
- if self.deferred is not None:
- stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
- exit_code = reason.value.exitCode
- if self.check_exit_code and exit_code != 0:
- self.deferred.errback(self._build_execution_error(exit_code))
- else:
- try:
- if self.check_exit_code:
- reason.trap(error.ProcessDone)
- self.deferred.callback((stdout, stderr))
- except:
- # NOTE(justinsb): This logic is a little suspicious to me.
- # If the callback throws an exception, then errback will
- # be called also. However, this is what the unit tests
- # test for.
- exec_error = self._build_execution_error(exit_code)
- self.deferred.errback(exec_error)
- elif self.on_process_ended is not None:
- self.on_process_ended.errback(reason)
-
- def connectionMade(self):
- if self.started_deferred:
- self.started_deferred.callback(self)
- if self.process_input:
- self.transport.write(str(self.process_input))
- self.transport.closeStdin()
-
-
-def get_process_output(executable, args=None, env=None, path=None,
- process_reactor=None, check_exit_code=True,
- process_input=None, started_deferred=None,
- terminate_on_stderr=False):
- if process_reactor is None:
- process_reactor = reactor
- args = args and args or ()
- env = env and env and {}
- deferred = defer.Deferred()
- cmd = executable
- if args:
- cmd = " ".join([cmd] + args)
- logging.debug("Running cmd: %s", cmd)
- process_handler = BackRelayWithInput(
- deferred,
- cmd,
- started_deferred=started_deferred,
- check_exit_code=check_exit_code,
- process_input=process_input,
- terminate_on_stderr=terminate_on_stderr)
- # NOTE(vish): commands come in as unicode, but self.executes needs
- # strings or process.spawn raises a deprecation warning
- executable = str(executable)
- if not args is None:
- args = [str(x) for x in args]
- process_reactor.spawnProcess(process_handler, executable,
- (executable,) + tuple(args), env, path)
- return deferred
-
-
-class ProcessPool(object):
- """ A simple process pool implementation using Twisted's Process bits.
-
- This is pretty basic right now, but hopefully the API will be the correct
- one so that it can be optimized later.
- """
- def __init__(self, size=None):
- self.size = size and size or FLAGS.process_pool_size
- self._pool = defer.DeferredSemaphore(self.size)
-
- def simple_execute(self, cmd, **kw):
- """ Weak emulation of the old utils.execute() function.
-
- This only exists as a way to quickly move old execute methods to
- this new style of code.
-
- NOTE(termie): This will break on args with spaces in them.
- """
- parsed = cmd.split(' ')
- executable, args = parsed[0], parsed[1:]
- return self.execute(executable, args, **kw)
-
- def execute(self, *args, **kw):
- deferred = self._pool.acquire()
-
- def _associate_process(proto):
- deferred.process = proto.transport
- return proto.transport
-
- started = defer.Deferred()
- started.addCallback(_associate_process)
- kw.setdefault('started_deferred', started)
-
- deferred.process = None
- deferred.started = started
-
- deferred.addCallback(lambda _: get_process_output(*args, **kw))
- deferred.addBoth(self._release)
- return deferred
-
- def _release(self, retval=None):
- self._pool.release()
- return retval
-
-
-class SharedPool(object):
- _instance = None
-
- def __init__(self):
- if SharedPool._instance is None:
- self.__class__._instance = ProcessPool()
-
- def __getattr__(self, key):
- return getattr(self._instance, key)
-
-
-def simple_execute(cmd, **kwargs):
- return SharedPool().simple_execute(cmd, **kwargs)
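
With nova/process.py removed, the Twisted process pool is gone and former callers go through the blocking nova.utils.execute() instead, as the disk.py and image.py hunks above show. A hedged sketch of the replacement call pattern; the command is only an example, and the keyword argument mirrors the ones visible in this diff.

    from nova import utils

    # Returns an (stdout, stderr) tuple instead of a Deferred; pass
    # check_exit_code=False to tolerate a non-zero exit status.
    out, err = utils.execute('cat /etc/hostname', check_exit_code=False)
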
diff --git a/nova/rpc.py b/nova/rpc.py
index 86a29574f3..6a3f552dbd 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -25,18 +25,18 @@ import json
import logging
import sys
import time
+import traceback
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenthread
-from twisted.internet import defer
-from twisted.internet import task
+from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
-from nova import context
+from nova import utils
FLAGS = flags.FLAGS
@@ -128,17 +128,9 @@ class Consumer(messaging.Consumer):
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
- def fetch_repeatedly():
- while True:
- self.fetch(enable_callbacks=True)
- greenthread.sleep(0.1)
- greenthread.spawn(fetch_repeatedly)
-
- def attach_to_twisted(self):
- """Attach a callback to twisted that fires 10 times a second"""
- loop = task.LoopingCall(self.fetch, enable_callbacks=True)
- loop.start(interval=0.1)
- return loop
+ timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
+ timer.start(0.1)
+ return timer
class Publisher(messaging.Publisher):
@@ -196,11 +188,13 @@ class AdapterConsumer(TopicConsumer):
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
- # pylint: disable-msg=W0142
- d = defer.maybeDeferred(node_func, context=ctxt, **node_args)
- if msg_id:
- d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
- d.addErrback(lambda e: msg_reply(msg_id, None, e))
+ try:
+ rval = node_func(context=ctxt, **node_args)
+ if msg_id:
+ msg_reply(msg_id, rval, None)
+ except Exception as e:
+ if msg_id:
+ msg_reply(msg_id, None, sys.exc_info())
return
@@ -242,13 +236,15 @@ class DirectPublisher(Publisher):
def msg_reply(msg_id, reply=None, failure=None):
"""Sends a reply or an error on the channel signified by msg_id
- failure should be a twisted failure object"""
+ failure should be a sys.exc_info() tuple.
+
+ """
if failure:
- message = failure.getErrorMessage()
- traceback = failure.getTraceback()
+ message = str(failure[1])
+ tb = traceback.format_exception(*failure)
logging.error("Returning exception %s to caller", message)
- logging.error(traceback)
- failure = (failure.type.__name__, str(failure.value), traceback)
+ logging.error(tb)
+ failure = (failure[0].__name__, str(failure[1]), tb)
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
@@ -313,7 +309,6 @@ def call(context, topic, msg):
_pack_context(msg, context)
class WaitMessage(object):
-
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
@@ -337,41 +332,15 @@ def call(context, topic, msg):
except StopIteration:
pass
consumer.close()
+ # NOTE(termie): this is a little bit of a change from the original
+ # non-eventlet code where returning a Failure
+ # instance from a deferred call is very similar to
+ # raising an exception
+ if isinstance(wait_msg.result, Exception):
+ raise wait_msg.result
return wait_msg.result
-def call_twisted(context, topic, msg):
- """Sends a message on a topic and wait for a response"""
- LOG.debug("Making asynchronous call...")
- msg_id = uuid.uuid4().hex
- msg.update({'_msg_id': msg_id})
- LOG.debug("MSG_ID is %s" % (msg_id))
- _pack_context(msg, context)
-
- conn = Connection.instance()
- d = defer.Deferred()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
-
- def deferred_receive(data, message):
- """Acks message and callbacks or errbacks"""
- message.ack()
- if data['failure']:
- return d.errback(RemoteError(*data['failure']))
- else:
- return d.callback(data['result'])
-
- consumer.register_callback(deferred_receive)
- injected = consumer.attach_to_twisted()
-
- # clean up after the injected listened and return x
- d.addCallback(lambda x: injected.stop() and x or x)
-
- publisher = TopicPublisher(connection=conn, topic=topic)
- publisher.send(msg)
- publisher.close()
- return d
-
-
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")
diff --git a/nova/server.py b/nova/server.py
deleted file mode 100644
index a0ee54681a..0000000000
--- a/nova/server.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Base functionality for nova daemons - gradually being replaced with twistd.py.
-"""
-
-import daemon
-from daemon import pidlockfile
-import logging
-import logging.handlers
-import os
-import signal
-import sys
-import time
-
-from nova import flags
-
-
-FLAGS = flags.FLAGS
-flags.DEFINE_bool('daemonize', False, 'daemonize this process')
-# NOTE(termie): right now I am defaulting to using syslog when we daemonize
-# it may be better to do something else -shrug-
-# NOTE(Devin): I think we should let each process have its own log file
-# and put it in /var/logs/nova/(appname).log
-# This makes debugging much easier and cuts down on sys log
-# clutter.
-flags.DEFINE_bool('use_syslog', True, 'output to syslog when daemonizing')
-flags.DEFINE_string('logfile', None, 'log file to output to')
-flags.DEFINE_string('logdir', None, 'directory to keep log files in '
- '(will be prepended to $logfile)')
-flags.DEFINE_string('pidfile', None, 'pid file to output to')
-flags.DEFINE_string('working_directory', './', 'working directory...')
-flags.DEFINE_integer('uid', os.getuid(), 'uid under which to run')
-flags.DEFINE_integer('gid', os.getgid(), 'gid under which to run')
-
-
-def stop(pidfile):
- """
- Stop the daemon
- """
- # Get the pid from the pidfile
- try:
- pid = int(open(pidfile, 'r').read().strip())
- except IOError:
- message = "pidfile %s does not exist. Daemon not running?\n"
- sys.stderr.write(message % pidfile)
- return
-
- # Try killing the daemon process
- try:
- while 1:
- os.kill(pid, signal.SIGTERM)
- time.sleep(0.1)
- except OSError, err:
- err = str(err)
- if err.find("No such process") > 0:
- if os.path.exists(pidfile):
- os.remove(pidfile)
- else:
- print str(err)
- sys.exit(1)
-
-
-def serve(name, main):
- """Controller for server"""
- argv = FLAGS(sys.argv)
-
- if not FLAGS.pidfile:
- FLAGS.pidfile = '%s.pid' % name
-
- logging.debug("Full set of FLAGS: \n\n\n")
- for flag in FLAGS:
- logging.debug("%s : %s", flag, FLAGS.get(flag, None))
-
- action = 'start'
- if len(argv) > 1:
- action = argv.pop()
-
- if action == 'stop':
- stop(FLAGS.pidfile)
- sys.exit()
- elif action == 'restart':
- stop(FLAGS.pidfile)
- elif action == 'start':
- pass
- else:
- print 'usage: %s [options] [start|stop|restart]' % argv[0]
- sys.exit(1)
- daemonize(argv, name, main)
-
-
-def daemonize(args, name, main):
- """Does the work of daemonizing the process"""
- logging.getLogger('amqplib').setLevel(logging.WARN)
- files_to_keep = []
- if FLAGS.daemonize:
- logger = logging.getLogger()
- formatter = logging.Formatter(
- name + '(%(name)s): %(levelname)s %(message)s')
- if FLAGS.use_syslog and not FLAGS.logfile:
- syslog = logging.handlers.SysLogHandler(address='/dev/log')
- syslog.setFormatter(formatter)
- logger.addHandler(syslog)
- files_to_keep.append(syslog.socket)
- else:
- if not FLAGS.logfile:
- FLAGS.logfile = '%s.log' % name
- if FLAGS.logdir:
- FLAGS.logfile = os.path.join(FLAGS.logdir, FLAGS.logfile)
- logfile = logging.FileHandler(FLAGS.logfile)
- logfile.setFormatter(formatter)
- logger.addHandler(logfile)
- files_to_keep.append(logfile.stream)
- stdin, stdout, stderr = None, None, None
- else:
- stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr
-
- if FLAGS.verbose:
- logging.getLogger().setLevel(logging.DEBUG)
- else:
- logging.getLogger().setLevel(logging.WARNING)
-
- with daemon.DaemonContext(
- detach_process=FLAGS.daemonize,
- working_directory=FLAGS.working_directory,
- pidfile=pidlockfile.TimeoutPIDLockFile(FLAGS.pidfile,
- acquire_timeout=1,
- threaded=False),
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- uid=FLAGS.uid,
- gid=FLAGS.gid,
- files_preserve=files_to_keep):
- main(args)
diff --git a/nova/service.py b/nova/service.py
index 9454d4049b..ac30aaceba 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -17,21 +17,17 @@
# under the License.
"""
-A service is a very thin wrapper around a Manager object. It exposes the
-manager's public methods to other components of the system via rpc. It will
-report state periodically to the database and is responsible for initiating
-any periodic tasts that need to be executed on a given host.
-
-This module contains Service, a generic baseclass for all workers.
+Generic Node baseclass for all workers that run on hosts
"""
import inspect
import logging
import os
+import sys
-from twisted.internet import defer
-from twisted.internet import task
-from twisted.application import service
+from eventlet import event
+from eventlet import greenthread
+from eventlet import greenpool
from nova import context
from nova import db
@@ -50,8 +46,16 @@ flags.DEFINE_integer('periodic_interval', 60,
'seconds between running periodic tasks',
lower_bound=1)
+flags.DEFINE_string('pidfile', None,
+ 'pidfile to use for this service')
+
+
+flags.DEFINE_flag(flags.HelpFlag())
+flags.DEFINE_flag(flags.HelpshortFlag())
+flags.DEFINE_flag(flags.HelpXMLFlag())
+
-class Service(object, service.Service):
+class Service(object):
"""Base class for workers that run on hosts."""
def __init__(self, host, binary, topic, manager, report_interval=None,
@@ -64,8 +68,9 @@ class Service(object, service.Service):
self.periodic_interval = periodic_interval
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
+ self.timers = []
- def startService(self): # pylint: disable-msg C0103
+ def start(self):
manager_class = utils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *self.saved_args,
**self.saved_kwargs)
@@ -80,26 +85,29 @@ class Service(object, service.Service):
except exception.NotFound:
self._create_service_ref(ctxt)
- conn = rpc.Connection.instance()
+ conn1 = rpc.Connection.instance(new=True)
+ conn2 = rpc.Connection.instance(new=True)
if self.report_interval:
consumer_all = rpc.AdapterConsumer(
- connection=conn,
+ connection=conn1,
topic=self.topic,
proxy=self)
consumer_node = rpc.AdapterConsumer(
- connection=conn,
+ connection=conn2,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
- consumer_all.attach_to_twisted()
- consumer_node.attach_to_twisted()
+ self.timers.append(consumer_all.attach_to_eventlet())
+ self.timers.append(consumer_node.attach_to_eventlet())
- pulse = task.LoopingCall(self.report_state)
+ pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
+ self.timers.append(pulse)
if self.periodic_interval:
- pulse = task.LoopingCall(self.periodic_tasks)
- pulse.start(interval=self.periodic_interval, now=False)
+ periodic = utils.LoopingCall(self.periodic_tasks)
+ periodic.start(interval=self.periodic_interval, now=False)
+ self.timers.append(periodic)
def _create_service_ref(self, context):
service_ref = db.service_create(context,
@@ -147,25 +155,28 @@ class Service(object, service.Service):
service_obj = cls(host, binary, topic, manager,
report_interval, periodic_interval)
- # This is the parent service that twistd will be looking for when it
- # parses this file, return it so that we can get it into globals.
- application = service.Application(binary)
- service_obj.setServiceParent(application)
- return application
+ return service_obj
def kill(self):
"""Destroy the service object in the datastore"""
+ self.stop()
try:
db.service_destroy(context.get_admin_context(), self.service_id)
except exception.NotFound:
logging.warn("Service killed that has no database entry")
- @defer.inlineCallbacks
+ def stop(self):
+ for x in self.timers:
+ try:
+ x.stop()
+ except Exception:
+ pass
+ self.timers = []
+
def periodic_tasks(self):
"""Tasks to be run at a periodic interval"""
- yield self.manager.periodic_tasks(context.get_admin_context())
+ self.manager.periodic_tasks(context.get_admin_context())
- @defer.inlineCallbacks
def report_state(self):
"""Update the state of this service in the datastore."""
ctxt = context.get_admin_context()
@@ -192,4 +203,32 @@ class Service(object, service.Service):
if not getattr(self, "model_disconnected", False):
self.model_disconnected = True
logging.exception("model server went away")
- yield
+
+
+def serve(*services):
+ argv = FLAGS(sys.argv)
+
+ if not services:
+ services = [Service.create()]
+
+ name = '_'.join(x.binary for x in services)
+ logging.debug("Serving %s" % name)
+
+ logging.getLogger('amqplib').setLevel(logging.WARN)
+
+ if FLAGS.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+ else:
+ logging.getLogger().setLevel(logging.WARNING)
+
+ logging.debug("Full set of FLAGS:")
+ for flag in FLAGS:
+ logging.debug("%s : %s" % (flag, FLAGS.get(flag, None)))
+
+ for x in services:
+ x.start()
+
+
+def wait():
+ while True:
+ greenthread.sleep(5)
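
With serve() and wait() at module level, a standalone worker script no longer needs twistd at all; the bin/ scripts above reduce to a create/serve/wait triple. A hedged example of driving a single service programmatically, using only calls that appear elsewhere in this diff (the explicit FLAGS parse mirrors bin/nova-combined).

    import sys

    from nova import flags
    from nova import service
    from nova import utils

    FLAGS = flags.FLAGS

    if __name__ == '__main__':
        utils.default_flagfile()
        FLAGS(sys.argv)
        volume = service.Service.create(binary='nova-volume')
        service.serve(volume)   # starts the rpc consumers and LoopingCall timers
        service.wait()          # sleep-loop in the main greenthread
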
diff --git a/nova/test.py b/nova/test.py
index 5c2a728196..7076f1bf4e 100644
--- a/nova/test.py
+++ b/nova/test.py
@@ -25,11 +25,12 @@ and some black magic for inline callbacks.
import datetime
import sys
import time
+import unittest
import mox
import stubout
from twisted.internet import defer
-from twisted.trial import unittest
+from twisted.trial import unittest as trial_unittest
from nova import context
from nova import db
@@ -55,11 +56,11 @@ def skip_if_fake(func):
return _skipper
-class TrialTestCase(unittest.TestCase):
+class TestCase(unittest.TestCase):
"""Test case base class for all unit tests"""
def setUp(self):
"""Run before each test method to initialize test environment"""
- super(TrialTestCase, self).setUp()
+ super(TestCase, self).setUp()
# NOTE(vish): We need a better method for creating fixtures for tests
# now that we have some required db setup for the system
# to work properly.
@@ -94,7 +95,87 @@ class TrialTestCase(unittest.TestCase):
db.fixed_ip_disassociate_all_by_timeout(ctxt, FLAGS.host,
self.start)
db.network_disassociate_all(ctxt)
- rpc.Consumer.attach_to_twisted = self.originalAttach
+ rpc.Consumer.attach_to_eventlet = self.originalAttach
+ for x in self.injected:
+ try:
+ x.stop()
+ except AssertionError:
+ pass
+
+ if FLAGS.fake_rabbit:
+ fakerabbit.reset_all()
+
+ db.security_group_destroy_all(ctxt)
+ super(TestCase, self).tearDown()
+ finally:
+ self.reset_flags()
+
+ def flags(self, **kw):
+ """Override flag variables for a test"""
+ for k, v in kw.iteritems():
+ if k in self.flag_overrides:
+ self.reset_flags()
+ raise Exception(
+ 'trying to override already overriden flag: %s' % k)
+ self.flag_overrides[k] = getattr(FLAGS, k)
+ setattr(FLAGS, k, v)
+
+ def reset_flags(self):
+ """Resets all flag variables for the test. Runs after each test"""
+ FLAGS.Reset()
+ for k, v in self._original_flags.iteritems():
+ setattr(FLAGS, k, v)
+
+ def _monkey_patch_attach(self):
+ self.originalAttach = rpc.Consumer.attach_to_eventlet
+
+ def _wrapped(innerSelf):
+ rv = self.originalAttach(innerSelf)
+ self.injected.append(rv)
+ return rv
+
+ _wrapped.func_name = self.originalAttach.func_name
+ rpc.Consumer.attach_to_eventlet = _wrapped
+
+
+class TrialTestCase(trial_unittest.TestCase):
+ """Test case base class for all unit tests"""
+ def setUp(self):
+ """Run before each test method to initialize test environment"""
+ super(TrialTestCase, self).setUp()
+ # NOTE(vish): We need a better method for creating fixtures for tests
+ # now that we have some required db setup for the system
+ # to work properly.
+ self.start = datetime.datetime.utcnow()
+ ctxt = context.get_admin_context()
+ if db.network_count(ctxt) != 5:
+ network_manager.VlanManager().create_networks(ctxt,
+ FLAGS.fixed_range,
+ 5, 16,
+ FLAGS.vlan_start,
+ FLAGS.vpn_start)
+
+ # emulate some of the mox stuff, we can't use the metaclass
+ # because it screws with our generators
+ self.mox = mox.Mox()
+ self.stubs = stubout.StubOutForTesting()
+ self.flag_overrides = {}
+ self.injected = []
+ self._original_flags = FLAGS.FlagValuesDict()
+
+ def tearDown(self):
+ """Runs after each test method to finalize/tear down test
+ environment."""
+ try:
+ self.mox.UnsetStubs()
+ self.stubs.UnsetAll()
+ self.stubs.SmartUnsetAll()
+ self.mox.VerifyAll()
+ # NOTE(vish): Clean up any ips associated during the test.
+ ctxt = context.get_admin_context()
+ db.fixed_ip_disassociate_all_by_timeout(ctxt, FLAGS.host,
+ self.start)
+ db.network_disassociate_all(ctxt)
for x in self.injected:
try:
x.stop()
@@ -147,14 +228,3 @@ class TrialTestCase(unittest.TestCase):
return d
_wrapped.func_name = func.func_name
return _wrapped
-
- def _monkey_patch_attach(self):
- self.originalAttach = rpc.Consumer.attach_to_twisted
-
- def _wrapped(innerSelf):
- rv = self.originalAttach(innerSelf)
- self.injected.append(rv)
- return rv
-
- _wrapped.func_name = self.originalAttach.func_name
- rpc.Consumer.attach_to_twisted = _wrapped
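
The eventlet-based TestCase keeps the flag-override helpers, so individual tests can tweak FLAGS and rely on tearDown to restore them via reset_flags(). A short hedged sketch of the intended usage; the flag value mirrors what cloud_unittest.py below passes.

    from nova import flags
    from nova import test

    FLAGS = flags.FLAGS


    class ExampleTestCase(test.TestCase):
        def setUp(self):
            super(ExampleTestCase, self).setUp()
            self.flags(connection_type='fake')   # reverted by reset_flags() in tearDown

        def test_flag_was_overridden(self):
            self.assertEqual(FLAGS.connection_type, 'fake')
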
diff --git a/nova/tests/__init__.py b/nova/tests/__init__.py
index aaf213923e..8dc87d0e2d 100644
--- a/nova/tests/__init__.py
+++ b/nova/tests/__init__.py
@@ -29,3 +29,8 @@
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""
+
+# See http://code.google.com/p/python-nose/issues/detail?id=373
+# The code below enables nosetests to work with i18n _() blocks
+import __builtin__
+setattr(__builtin__, '_', lambda x: x)
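
The shim matters because the gettext.install('nova', unicode=1) call that normally provides the _() builtin only runs in the bin/ scripts, not under the nose test runner. A toy hedged illustration of what the shim makes importable:

    import __builtin__
    setattr(__builtin__, '_', lambda x: x)   # same shim as above

    # Any module-level translated string now resolves to itself under nose.
    print _("instance %s: terminating") % 'i-00000001'
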
diff --git a/nova/tests/access_unittest.py b/nova/tests/access_unittest.py
index 0f66c0a26b..58fdea3b5c 100644
--- a/nova/tests/access_unittest.py
+++ b/nova/tests/access_unittest.py
@@ -35,7 +35,7 @@ class Context(object):
pass
-class AccessTestCase(test.TrialTestCase):
+class AccessTestCase(test.TestCase):
def setUp(self):
super(AccessTestCase, self).setUp()
um = manager.AuthManager()
diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py
index fe891beeeb..4508d67216 100644
--- a/nova/tests/auth_unittest.py
+++ b/nova/tests/auth_unittest.py
@@ -326,12 +326,12 @@ class AuthManagerTestCase(object):
self.assertTrue(user.is_admin())
-class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
+class AuthManagerLdapTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
def __init__(self, *args, **kwargs):
AuthManagerTestCase.__init__(self)
- test.TrialTestCase.__init__(self, *args, **kwargs)
+ test.TestCase.__init__(self, *args, **kwargs)
import nova.auth.fakeldap as fakeldap
FLAGS.redis_db = 8
if FLAGS.flush_db:
@@ -343,7 +343,7 @@ class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
self.skip = True
-class AuthManagerDbTestCase(AuthManagerTestCase, test.TrialTestCase):
+class AuthManagerDbTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.dbdriver.DbDriver'
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py
index 770c942198..53a762310b 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/cloud_unittest.py
@@ -27,8 +27,6 @@ import tempfile
import time
from eventlet import greenthread
-from twisted.internet import defer
-import unittest
from xml.etree import ElementTree
from nova import context
@@ -53,7 +51,7 @@ IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images')
os.makedirs(IMAGES_PATH)
-class CloudTestCase(test.TrialTestCase):
+class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
self.flags(connection_type='fake', images_path=IMAGES_PATH)
@@ -199,7 +197,7 @@ class CloudTestCase(test.TrialTestCase):
logging.debug("Need to watch instance %s until it's running..." %
instance['instance_id'])
while True:
- rv = yield defer.succeed(time.sleep(1))
+ greenthread.sleep(1)
info = self.cloud._get_instance(instance['instance_id'])
logging.debug(info['state'])
if info['state'] == power_state.RUNNING:
diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py
index 6f3ef96cbb..c6353d3575 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -22,8 +22,6 @@ Tests For Compute
import datetime
import logging
-from twisted.internet import defer
-
from nova import context
from nova import db
from nova import exception
@@ -33,10 +31,11 @@ from nova import utils
from nova.auth import manager
from nova.compute import api as compute_api
+
FLAGS = flags.FLAGS
-class ComputeTestCase(test.TrialTestCase):
+class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
@@ -94,24 +93,22 @@ class ComputeTestCase(test.TrialTestCase):
db.security_group_destroy(self.context, group['id'])
db.instance_destroy(self.context, ref[0]['id'])
- @defer.inlineCallbacks
def test_run_terminate(self):
"""Make sure it is possible to run and terminate instance"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("Running instances: %s", instances)
self.assertEqual(len(instances), 1)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("After terminating instances: %s", instances)
self.assertEqual(len(instances), 0)
- @defer.inlineCallbacks
def test_run_terminate_timestamps(self):
"""Make sure timestamps are set for launched and destroyed"""
instance_id = self._create_instance()
@@ -119,42 +116,40 @@ class ComputeTestCase(test.TrialTestCase):
self.assertEqual(instance_ref['launched_at'], None)
self.assertEqual(instance_ref['deleted_at'], None)
launch = datetime.datetime.utcnow()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] > launch)
self.assertEqual(instance_ref['deleted_at'], None)
terminate = datetime.datetime.utcnow()
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
self.context = self.context.elevated(True)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
- @defer.inlineCallbacks
def test_reboot(self):
"""Ensure instance can be rebooted"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
- yield self.compute.reboot_instance(self.context, instance_id)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
+ self.compute.reboot_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
- @defer.inlineCallbacks
def test_console_output(self):
"""Make sure we can get console output from instance"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
- console = yield self.compute.get_console_output(self.context,
+ console = self.compute.get_console_output(self.context,
instance_id)
self.assert_(console)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
- @defer.inlineCallbacks
def test_run_instance_existing(self):
"""Ensure failure when running an instance that already exists"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
- self.assertFailure(self.compute.run_instance(self.context,
- instance_id),
- exception.Error)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
+ self.assertRaises(exception.Error,
+ self.compute.run_instance,
+ self.context,
+ instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
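
The hunks above follow one mechanical rule of the Twisted-to-eventlet conversion: manager calls now raise synchronously, so self.assertFailure(deferred, exception.Error) becomes self.assertRaises(exception.Error, callable, *args). A minimal, self-contained sketch of the new idiom (DuplicateError and run_instance are illustrative stand-ins, not Nova code):

    import unittest


    class DuplicateError(Exception):
        """Illustrative stand-in for exception.Error."""


    _RUNNING = set()


    def run_instance(instance_id):
        """Raise if asked to run an instance that is already running."""
        if instance_id in _RUNNING:
            raise DuplicateError(instance_id)
        _RUNNING.add(instance_id)


    class RunInstanceTestCase(unittest.TestCase):
        def test_run_instance_existing(self):
            run_instance(1)
            # assertRaises receives the callable plus its arguments and invokes
            # it itself, unlike Twisted's assertFailure, which inspected a
            # Deferred returned by an already-started call.
            self.assertRaises(DuplicateError, run_instance, 1)


    if __name__ == '__main__':
        unittest.main()
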
diff --git a/nova/tests/flags_unittest.py b/nova/tests/flags_unittest.py
index b97df075de..707300fcf6 100644
--- a/nova/tests/flags_unittest.py
+++ b/nova/tests/flags_unittest.py
@@ -24,7 +24,7 @@ FLAGS = flags.FLAGS
flags.DEFINE_string('flags_unittest', 'foo', 'for testing purposes only')
-class FlagsTestCase(test.TrialTestCase):
+class FlagsTestCase(test.TestCase):
def setUp(self):
super(FlagsTestCase, self).setUp()
diff --git a/nova/tests/misc_unittest.py b/nova/tests/misc_unittest.py
index 667c63ad09..3d947427a3 100644
--- a/nova/tests/misc_unittest.py
+++ b/nova/tests/misc_unittest.py
@@ -20,7 +20,7 @@ from nova import test
from nova.utils import parse_mailmap, str_dict_replace
-class ProjectTestCase(test.TrialTestCase):
+class ProjectTestCase(test.TestCase):
def test_authors_up_to_date(self):
if os.path.exists('../.bzr'):
contributors = set()
@@ -30,23 +30,26 @@ class ProjectTestCase(test.TrialTestCase):
import bzrlib.workingtree
tree = bzrlib.workingtree.WorkingTree.open('..')
tree.lock_read()
- parents = tree.get_parent_ids()
- g = tree.branch.repository.get_graph()
- for p in parents[1:]:
- rev_ids = [r for r, _ in g.iter_ancestry(parents)
- if r != "null:"]
- revs = tree.branch.repository.get_revisions(rev_ids)
- for r in revs:
- for author in r.get_apparent_authors():
- email = author.split(' ')[-1]
- contributors.add(str_dict_replace(email, mailmap))
-
- authors_file = open('../Authors', 'r').read()
-
- missing = set()
- for contributor in contributors:
- if not contributor in authors_file:
- missing.add(contributor)
-
- self.assertTrue(len(missing) == 0,
- '%r not listed in Authors' % missing)
+ try:
+ parents = tree.get_parent_ids()
+ g = tree.branch.repository.get_graph()
+ for p in parents[1:]:
+ rev_ids = [r for r, _ in g.iter_ancestry(parents)
+ if r != "null:"]
+ revs = tree.branch.repository.get_revisions(rev_ids)
+ for r in revs:
+ for author in r.get_apparent_authors():
+ email = author.split(' ')[-1]
+ contributors.add(str_dict_replace(email, mailmap))
+
+ authors_file = open('../Authors', 'r').read()
+
+ missing = set()
+ for contributor in contributors:
+ if not contributor in authors_file:
+ missing.add(contributor)
+
+ self.assertTrue(len(missing) == 0,
+ '%r not listed in Authors' % missing)
+ finally:
+ tree.unlock()
diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py
index 6f4705719c..bcac205855 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/network_unittest.py
@@ -33,7 +33,7 @@ from nova.auth import manager
FLAGS = flags.FLAGS
-class NetworkTestCase(test.TrialTestCase):
+class NetworkTestCase(test.TestCase):
"""Test cases for network code"""
def setUp(self):
super(NetworkTestCase, self).setUp()
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
index 0617999234..ceac17adb6 100644
--- a/nova/tests/objectstore_unittest.py
+++ b/nova/tests/objectstore_unittest.py
@@ -54,7 +54,7 @@ os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
-class ObjectStoreTestCase(test.TrialTestCase):
+class ObjectStoreTestCase(test.TestCase):
"""Test objectstore API directly."""
def setUp(self):
@@ -191,7 +191,7 @@ class TestSite(server.Site):
protocol = TestHTTPChannel
-class S3APITestCase(test.TrialTestCase):
+class S3APITestCase(test.TestCase):
"""Test objectstore through S3 API."""
def setUp(self):
diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py
deleted file mode 100644
index 67245af039..0000000000
--- a/nova/tests/process_unittest.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-from twisted.internet import defer
-from twisted.internet import reactor
-from xml.etree import ElementTree
-
-from nova import exception
-from nova import flags
-from nova import process
-from nova import test
-from nova import utils
-
-FLAGS = flags.FLAGS
-
-
-class ProcessTestCase(test.TrialTestCase):
- def setUp(self):
- logging.getLogger().setLevel(logging.DEBUG)
- super(ProcessTestCase, self).setUp()
-
- def test_execute_stdout(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('echo test')
-
- def _check(rv):
- self.assertEqual(rv[0], 'test\n')
- self.assertEqual(rv[1], '')
-
- d.addCallback(_check)
- d.addErrback(self.fail)
- return d
-
- def test_execute_stderr(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('cat BAD_FILE', check_exit_code=False)
-
- def _check(rv):
- self.assertEqual(rv[0], '')
- self.assert_('No such file' in rv[1])
-
- d.addCallback(_check)
- d.addErrback(self.fail)
- return d
-
- def test_execute_unexpected_stderr(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('cat BAD_FILE')
- d.addCallback(lambda x: self.fail('should have raised an error'))
- d.addErrback(lambda failure: failure.trap(IOError))
- return d
-
- def test_max_processes(self):
- pool = process.ProcessPool(2)
- d1 = pool.simple_execute('sleep 0.01')
- d2 = pool.simple_execute('sleep 0.01')
- d3 = pool.simple_execute('sleep 0.005')
- d4 = pool.simple_execute('sleep 0.005')
-
- called = []
-
- def _called(rv, name):
- called.append(name)
-
- d1.addCallback(_called, 'd1')
- d2.addCallback(_called, 'd2')
- d3.addCallback(_called, 'd3')
- d4.addCallback(_called, 'd4')
-
- # Make sure that d3 and d4 had to wait on the other two and were called
- # in order
- # NOTE(termie): there may be a race condition in this test if for some
- # reason one of the sleeps takes longer to complete
- # than it should
- d4.addCallback(lambda x: self.assertEqual(called[2], 'd3'))
- d4.addCallback(lambda x: self.assertEqual(called[3], 'd4'))
- d4.addErrback(self.fail)
- return d4
-
- def test_kill_long_process(self):
- pool = process.ProcessPool(2)
-
- d1 = pool.simple_execute('sleep 1')
- d2 = pool.simple_execute('sleep 0.005')
-
- timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
-
- # kill d1 and wait on it to end then cancel the timeout
- d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
- d2.addCallback(lambda _: d1)
- d2.addBoth(lambda _: timeout.active() and timeout.cancel())
- d2.addErrback(self.fail)
- return d2
-
- def test_process_exit_is_contained(self):
- pool = process.ProcessPool(2)
-
- d1 = pool.simple_execute('sleep 1')
- d1.addCallback(lambda x: self.fail('should have errbacked'))
- d1.addErrback(lambda fail: fail.trap(IOError))
- reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
-
- return d1
-
- def test_shared_pool_is_singleton(self):
- pool1 = process.SharedPool()
- pool2 = process.SharedPool()
- self.assertEqual(id(pool1._instance), id(pool2._instance))
-
- def test_shared_pool_works_as_singleton(self):
- d1 = process.simple_execute('sleep 1')
- d2 = process.simple_execute('sleep 0.005')
- # lp609749: would have failed with
- # exceptions.AssertionError: Someone released me too many times:
- # too many tokens!
- return d1
diff --git a/nova/tests/quota_unittest.py b/nova/tests/quota_unittest.py
index 1966b51f7b..8cf2a5e546 100644
--- a/nova/tests/quota_unittest.py
+++ b/nova/tests/quota_unittest.py
@@ -32,7 +32,7 @@ from nova.api.ec2 import cloud
FLAGS = flags.FLAGS
-class QuotaTestCase(test.TrialTestCase):
+class QuotaTestCase(test.TestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(QuotaTestCase, self).setUp()
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py
index f35b65a39c..a2495e65a2 100644
--- a/nova/tests/rpc_unittest.py
+++ b/nova/tests/rpc_unittest.py
@@ -20,8 +20,6 @@ Unit Tests for remote procedure calls using queue
"""
import logging
-from twisted.internet import defer
-
from nova import context
from nova import flags
from nova import rpc
@@ -31,7 +29,7 @@ from nova import test
FLAGS = flags.FLAGS
-class RpcTestCase(test.TrialTestCase):
+class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
@@ -40,23 +38,22 @@ class RpcTestCase(test.TrialTestCase):
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
- self.consumer.attach_to_twisted()
+ self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_call_succeed(self):
"""Get a value through rpc call"""
value = 42
- result = yield rpc.call_twisted(self.context,
- 'test', {"method": "echo",
+ result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call"""
value = 42
- result = yield rpc.call_twisted(self.context,
- 'test', {"method": "context",
- "args": {"value": value}})
+ result = rpc.call(self.context,
+ 'test', {"method": "context",
+ "args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
@@ -67,14 +64,17 @@ class RpcTestCase(test.TrialTestCase):
to an int in the test.
"""
value = 42
- self.assertFailure(rpc.call_twisted(self.context, 'test',
- {"method": "fail",
- "args": {"value": value}}),
- rpc.RemoteError)
+ self.assertRaises(rpc.RemoteError,
+ rpc.call,
+ self.context,
+ 'test',
+ {"method": "fail",
+ "args": {"value": value}})
try:
- yield rpc.call_twisted(self.context,
- 'test', {"method": "fail",
- "args": {"value": value}})
+ rpc.call(self.context,
+ 'test',
+ {"method": "fail",
+ "args": {"value": value}})
self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
@@ -89,13 +89,13 @@ class TestReceiver(object):
def echo(context, value):
"""Simply returns whatever value is sent in"""
logging.debug("Received %s", value)
- return defer.succeed(value)
+ return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context"""
logging.debug("Received %s", context)
- return defer.succeed(context.to_dict())
+ return context.to_dict()
@staticmethod
def fail(context, value):
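
One nuance in test_call_exception above: assertRaises only checks the exception type, so the test keeps a separate try/except block to inspect the value carried by the remote error. A small sketch of that two-step check (RemoteError here is a hypothetical stand-in for rpc.RemoteError):

    class RemoteError(Exception):
        """Hypothetical stand-in for rpc.RemoteError; carries the remote value."""
        def __init__(self, value):
            super(RemoteError, self).__init__(value)
            self.value = value


    def remote_fail(value):
        raise RemoteError(value)


    # Step 1: assert that the right type is raised at all (in a TestCase this
    # would be self.assertRaises(RemoteError, remote_fail, 42)).
    # Step 2: re-invoke and inspect attributes of the exception instance.
    try:
        remote_fail(42)
    except RemoteError as exc:
        assert int(exc.value) == 42
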
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py
index cb5fe6b9ca..d1756b8fb5 100644
--- a/nova/tests/scheduler_unittest.py
+++ b/nova/tests/scheduler_unittest.py
@@ -44,7 +44,7 @@ class TestDriver(driver.Scheduler):
return 'named_host'
-class SchedulerTestCase(test.TrialTestCase):
+class SchedulerTestCase(test.TestCase):
"""Test case for scheduler"""
def setUp(self):
super(SchedulerTestCase, self).setUp()
@@ -73,7 +73,7 @@ class SchedulerTestCase(test.TrialTestCase):
scheduler.named_method(ctxt, 'topic', num=7)
-class SimpleDriverTestCase(test.TrialTestCase):
+class SimpleDriverTestCase(test.TestCase):
"""Test case for simple driver"""
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
@@ -122,12 +122,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
self.assertEqual(len(hosts), 2)
compute1.kill()
@@ -139,12 +139,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
instance_id1 = self._create_instance()
compute1.run_instance(self.context, instance_id1)
instance_id2 = self._create_instance()
@@ -162,12 +162,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
instance_ids1 = []
instance_ids2 = []
for index in xrange(FLAGS.max_cores):
@@ -195,12 +195,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume1.startService()
+ volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume2.startService()
+ volume2.start()
volume_id1 = self._create_volume()
volume1.create_volume(self.context, volume_id1)
volume_id2 = self._create_volume()
@@ -218,12 +218,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume1.startService()
+ volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume2.startService()
+ volume2.start()
volume_ids1 = []
volume_ids2 = []
for index in xrange(FLAGS.max_gigabytes):
diff --git a/nova/tests/service_unittest.py b/nova/tests/service_unittest.py
index a268bc4fe4..47c092f8e2 100644
--- a/nova/tests/service_unittest.py
+++ b/nova/tests/service_unittest.py
@@ -22,9 +22,6 @@ Unit Tests for remote procedure calls using queue
import mox
-from twisted.application.app import startApplication
-from twisted.internet import defer
-
from nova import exception
from nova import flags
from nova import rpc
@@ -48,7 +45,7 @@ class ExtendedService(service.Service):
return 'service'
-class ServiceManagerTestCase(test.TrialTestCase):
+class ServiceManagerTestCase(test.TestCase):
"""Test cases for Services"""
def test_attribute_error_for_no_manager(self):
@@ -63,7 +60,7 @@ class ServiceManagerTestCase(test.TrialTestCase):
'test',
'test',
'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ serv.start()
self.assertEqual(serv.test_method(), 'manager')
def test_override_manager_method(self):
@@ -71,11 +68,11 @@ class ServiceManagerTestCase(test.TrialTestCase):
'test',
'test',
'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ serv.start()
self.assertEqual(serv.test_method(), 'service')
-class ServiceTestCase(test.TrialTestCase):
+class ServiceTestCase(test.TestCase):
"""Test cases for Services"""
def setUp(self):
@@ -94,8 +91,6 @@ class ServiceTestCase(test.TrialTestCase):
self.mox.StubOutWithMock(rpc,
'AdapterConsumer',
use_mock_anything=True)
- self.mox.StubOutWithMock(
- service.task, 'LoopingCall', use_mock_anything=True)
rpc.AdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
@@ -106,19 +101,8 @@ class ServiceTestCase(test.TrialTestCase):
proxy=mox.IsA(service.Service)).AndReturn(
rpc.AdapterConsumer)
- rpc.AdapterConsumer.attach_to_twisted()
- rpc.AdapterConsumer.attach_to_twisted()
-
- # Stub out looping call a bit needlessly since we don't have an easy
- # way to cancel it (yet) when the tests finishes
- service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
- service.task.LoopingCall)
- service.task.LoopingCall.start(interval=mox.IgnoreArg(),
- now=mox.IgnoreArg())
- service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
- service.task.LoopingCall)
- service.task.LoopingCall.start(interval=mox.IgnoreArg(),
- now=mox.IgnoreArg())
+ rpc.AdapterConsumer.attach_to_eventlet()
+ rpc.AdapterConsumer.attach_to_eventlet()
service_create = {'host': host,
'binary': binary,
@@ -136,14 +120,14 @@ class ServiceTestCase(test.TrialTestCase):
service_create).AndReturn(service_ref)
self.mox.ReplayAll()
- startApplication(app, False)
+ app.start()
+ app.stop()
self.assert_(app)
# We're testing sort of weird behavior in how report_state decides
# whether it is disconnected, it looks for a variable on itself called
# 'model_disconnected' and report_state doesn't really do much so this
# these are mostly just for coverage
- @defer.inlineCallbacks
def test_report_state_no_service(self):
host = 'foo'
binary = 'bar'
@@ -173,10 +157,9 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
- serv.startService()
- yield serv.report_state()
+ serv.start()
+ serv.report_state()
- @defer.inlineCallbacks
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
@@ -204,11 +187,10 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
- serv.startService()
- yield serv.report_state()
+ serv.start()
+ serv.report_state()
self.assert_(serv.model_disconnected)
- @defer.inlineCallbacks
def test_report_state_newly_connected(self):
host = 'foo'
binary = 'bar'
@@ -238,8 +220,8 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ serv.start()
serv.model_disconnected = True
- yield serv.report_state()
+ serv.report_state()
self.assert_(not serv.model_disconnected)
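
These service tests show the new lifecycle in miniature: construct service.Service with host, binary, topic and a manager class path, call start() where startService() used to be, and call report_state() directly since it no longer returns a Deferred. A sketch of that call shape, mirroring the diff rather than introducing new API:

    def _start_test_service(host='foo', binary='bar', topic='baz'):
        """Start a Service the way the rewritten tests above do."""
        from nova import service  # as imported by nova/tests/service_unittest.py
        serv = service.Service(host, binary, topic,
                               'nova.tests.service_unittest.FakeManager')
        serv.start()          # replaces Twisted's startService()
        serv.report_state()   # runs synchronously; nothing to yield on
        return serv
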
diff --git a/nova/tests/validator_unittest.py b/nova/tests/validator_unittest.py
deleted file mode 100644
index b5f1c0667f..0000000000
--- a/nova/tests/validator_unittest.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import unittest
-
-from nova import flags
-from nova import test
-from nova import validate
-
-
-class ValidationTestCase(test.TrialTestCase):
- def setUp(self):
- super(ValidationTestCase, self).setUp()
-
- def tearDown(self):
- super(ValidationTestCase, self).tearDown()
-
- def test_type_validation(self):
- self.assertTrue(type_case("foo", 5, 1))
- self.assertRaises(TypeError, type_case, "bar", "5", 1)
- self.assertRaises(TypeError, type_case, None, 5, 1)
-
-
-@validate.typetest(instanceid=str, size=int, number_of_instances=int)
-def type_case(instanceid, size, number_of_instances):
- return True
diff --git a/nova/tests/virt_unittest.py b/nova/tests/virt_unittest.py
index 9e9d84892f..9bbba4ba9f 100644
--- a/nova/tests/virt_unittest.py
+++ b/nova/tests/virt_unittest.py
@@ -30,7 +30,7 @@ FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
-class LibvirtConnTestCase(test.TrialTestCase):
+class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
self.manager = manager.AuthManager()
@@ -207,7 +207,7 @@ class LibvirtConnTestCase(test.TrialTestCase):
self.manager.delete_user(self.user)
-class NWFilterTestCase(test.TrialTestCase):
+class NWFilterTestCase(test.TestCase):
def setUp(self):
super(NWFilterTestCase, self).setUp()
@@ -319,7 +319,7 @@ class NWFilterTestCase(test.TrialTestCase):
'project_id': 'fake'})
inst_id = instance_ref['id']
- def _ensure_all_called(_):
+ def _ensure_all_called():
instance_filter = 'nova-instance-%s' % instance_ref['name']
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
@@ -337,7 +337,6 @@ class NWFilterTestCase(test.TrialTestCase):
instance = db.instance_get(self.context, inst_id)
d = self.fw.setup_nwfilters_for_instance(instance)
- d.addCallback(_ensure_all_called)
- d.addCallback(lambda _: self.teardown_security_group())
-
+ _ensure_all_called()
+ self.teardown_security_group()
return d
diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py
index 12321a96fb..b13455fb07 100644
--- a/nova/tests/volume_unittest.py
+++ b/nova/tests/volume_unittest.py
@@ -21,8 +21,6 @@ Tests for Volume Code.
"""
import logging
-from twisted.internet import defer
-
from nova import context
from nova import exception
from nova import db
@@ -33,7 +31,7 @@ from nova import utils
FLAGS = flags.FLAGS
-class VolumeTestCase(test.TrialTestCase):
+class VolumeTestCase(test.TestCase):
"""Test Case for volumes."""
def setUp(self):
@@ -56,51 +54,48 @@ class VolumeTestCase(test.TrialTestCase):
vol['attach_status'] = "detached"
return db.volume_create(context.get_admin_context(), vol)['id']
- @defer.inlineCallbacks
def test_create_delete_volume(self):
"""Test volume can be created and deleted."""
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
volume_id)
- @defer.inlineCallbacks
def test_too_big_volume(self):
"""Ensure failure if a too large of a volume is requested."""
# FIXME(vish): validation needs to move into the data layer in
# volume_create
- defer.returnValue(True)
+ return True
try:
volume_id = self._create_volume('1001')
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
self.fail("Should have thrown TypeError")
except TypeError:
pass
- @defer.inlineCallbacks
def test_too_many_volumes(self):
"""Ensure that NoMoreTargets is raised when we run out of volumes."""
vols = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
vols.append(volume_id)
volume_id = self._create_volume()
- self.assertFailure(self.volume.create_volume(self.context,
- volume_id),
- db.NoMoreTargets)
+ self.assertRaises(db.NoMoreTargets,
+ self.volume.create_volume,
+ self.context,
+ volume_id)
db.volume_destroy(context.get_admin_context(), volume_id)
for volume_id in vols:
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
- @defer.inlineCallbacks
def test_run_attach_detach_volume(self):
"""Make sure volume can be attached and detached from instance."""
inst = {}
@@ -115,15 +110,15 @@ class VolumeTestCase(test.TrialTestCase):
instance_id = db.instance_create(self.context, inst)['id']
mountpoint = "/dev/sdf"
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
if FLAGS.fake_tests:
db.volume_attached(self.context, volume_id, instance_id,
mountpoint)
else:
- yield self.compute.attach_volume(self.context,
- instance_id,
- volume_id,
- mountpoint)
+ self.compute.attach_volume(self.context,
+ instance_id,
+ volume_id,
+ mountpoint)
vol = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
@@ -131,25 +126,26 @@ class VolumeTestCase(test.TrialTestCase):
instance_ref = db.volume_get_instance(self.context, volume_id)
self.assertEqual(instance_ref['id'], instance_id)
- self.assertFailure(self.volume.delete_volume(self.context, volume_id),
- exception.Error)
+ self.assertRaises(exception.Error,
+ self.volume.delete_volume,
+ self.context,
+ volume_id)
if FLAGS.fake_tests:
db.volume_detached(self.context, volume_id)
else:
- yield self.compute.detach_volume(self.context,
- instance_id,
- volume_id)
+ self.compute.detach_volume(self.context,
+ instance_id,
+ volume_id)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['status'], "available")
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.Error,
db.volume_get,
self.context,
volume_id)
db.instance_destroy(self.context, instance_id)
- @defer.inlineCallbacks
def test_concurrent_volumes_get_different_targets(self):
"""Ensure multiple concurrent volumes get different targets."""
volume_ids = []
@@ -164,15 +160,11 @@ class VolumeTestCase(test.TrialTestCase):
self.assert_(iscsi_target not in targets)
targets.append(iscsi_target)
logging.debug("Target %s allocated", iscsi_target)
- deferreds = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
d = self.volume.create_volume(self.context, volume_id)
- d.addCallback(_check)
- d.addErrback(self.fail)
- deferreds.append(d)
- yield defer.DeferredList(deferreds)
+ _check(d)
for volume_id in volume_ids:
self.volume.delete_volume(self.context, volume_id)
diff --git a/nova/utils.py b/nova/utils.py
index 142584df8b..ea1f04ca79 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -31,7 +31,8 @@ import socket
import sys
from xml.sax import saxutils
-from twisted.internet.threads import deferToThread
+from eventlet import event
+from eventlet import greenthread
from nova import exception
from nova import flags
@@ -75,7 +76,7 @@ def fetchfile(url, target):
def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
- logging.debug("Running cmd: %s", cmd)
+ logging.debug("Running cmd (subprocess): %s", cmd)
env = os.environ.copy()
if addl_env:
env.update(addl_env)
@@ -95,6 +96,10 @@ def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
stdout=stdout,
stderr=stderr,
cmd=cmd)
+ # NOTE(termie): this appears to be necessary to let the subprocess call
+ # clean something up in between calls; without it, two
+ # execute calls in a row hang the second one
+ greenthread.sleep(0)
return result
@@ -123,13 +128,7 @@ def debug(arg):
def runthis(prompt, cmd, check_exit_code=True):
logging.debug("Running %s" % (cmd))
- exit_code = subprocess.call(cmd.split(" "))
- logging.debug(prompt % (exit_code))
- if check_exit_code and exit_code != 0:
- raise ProcessExecutionError(exit_code=exit_code,
- stdout=None,
- stderr=None,
- cmd=cmd)
+ rv, err = execute(cmd, check_exit_code=check_exit_code)
def generate_uid(topic, size=8):
@@ -224,10 +223,41 @@ class LazyPluggable(object):
return getattr(backend, key)
-def deferredToThread(f):
- def g(*args, **kwargs):
- return deferToThread(f, *args, **kwargs)
- return g
+class LoopingCall(object):
+ def __init__(self, f=None, *args, **kw):
+ self.args = args
+ self.kw = kw
+ self.f = f
+ self._running = False
+
+ def start(self, interval, now=True):
+ self._running = True
+ done = event.Event()
+
+ def _inner():
+ if not now:
+ greenthread.sleep(interval)
+ try:
+ while self._running:
+ self.f(*self.args, **self.kw)
+ greenthread.sleep(interval)
+ except Exception:
+ logging.exception('in looping call')
+ done.send_exception(*sys.exc_info())
+ return
+
+ done.send(True)
+
+ self.done = done
+
+ greenthread.spawn(_inner)
+ return self.done
+
+ def stop(self):
+ self._running = False
+
+ def wait(self):
+ return self.done.wait()
def xhtml_escape(value):
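
The LoopingCall added above is the eventlet replacement for twisted.internet.task.LoopingCall: start() spawns a greenthread that keeps calling f every interval seconds until stop() is called (or f raises), and it returns an eventlet Event that wait() blocks on. A minimal usage sketch, assuming the class is importable as nova.utils.LoopingCall:

    import time

    from nova import utils

    ticks = []

    def tick():
        ticks.append(time.time())
        if len(ticks) >= 3:
            timer.stop()  # stopping from inside the callback ends the loop

    timer = utils.LoopingCall(tick)
    done = timer.start(interval=0.1, now=True)  # returns an eventlet Event
    done.wait()   # blocks this greenthread until the loop has exited
    assert len(ticks) == 3

The same event is what the libvirt driver further down returns from spawn() and reboot(), so callers that used to wait on a Deferred now wait on this event instead.
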
diff --git a/nova/validate.py b/nova/validate.py
deleted file mode 100644
index 7ea27daa6e..0000000000
--- a/nova/validate.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Decorators for argument validation, courtesy of
-http://rmi.net/~lutz/rangetest.html"""
-
-
-def rangetest(**argchecks):
- """Validate ranges for both + defaults"""
-
- def onDecorator(func):
- """onCall remembers func and argchecks"""
- import sys
- code = func.__code__ if sys.version_info[0] == 3 else func.func_code
- allargs = code.co_varnames[:code.co_argcount]
- funcname = func.__name__
-
- def onCall(*pargs, **kargs):
- # all pargs match first N args by position
- # the rest must be in kargs or omitted defaults
- positionals = list(allargs)
- positionals = positionals[:len(pargs)]
-
- for (argname, (low, high)) in argchecks.items():
- # for all args to be checked
- if argname in kargs:
- # was passed by name
- if float(kargs[argname]) < low or \
- float(kargs[argname]) > high:
- errmsg = '{0} argument "{1}" not in {2}..{3}'
- errmsg = errmsg.format(funcname, argname, low, high)
- raise TypeError(errmsg)
-
- elif argname in positionals:
- # was passed by position
- position = positionals.index(argname)
- if float(pargs[position]) < low or \
- float(pargs[position]) > high:
- errmsg = '{0} argument "{1}" with value of {4} ' \
- 'not in {2}..{3}'
- errmsg = errmsg.format(funcname, argname, low, high,
- pargs[position])
- raise TypeError(errmsg)
- else:
- pass
-
- return func(*pargs, **kargs) # okay: run original call
- return onCall
- return onDecorator
-
-
-def typetest(**argchecks):
- def onDecorator(func):
- import sys
- code = func.__code__ if sys.version_info[0] == 3 else func.func_code
- allargs = code.co_varnames[:code.co_argcount]
- funcname = func.__name__
-
- def onCall(*pargs, **kargs):
- positionals = list(allargs)[:len(pargs)]
- for (argname, typeof) in argchecks.items():
- if argname in kargs:
- if not isinstance(kargs[argname], typeof):
- errmsg = '{0} argument "{1}" not of type {2}'
- errmsg = errmsg.format(funcname, argname, typeof)
- raise TypeError(errmsg)
- elif argname in positionals:
- position = positionals.index(argname)
- if not isinstance(pargs[position], typeof):
- errmsg = '{0} argument "{1}" with value of {2} ' \
- 'not of type {3}'
- errmsg = errmsg.format(funcname, argname,
- pargs[position], typeof)
- raise TypeError(errmsg)
- else:
- pass
- return func(*pargs, **kargs)
- return onCall
- return onDecorator
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index f855523d36..77bc926c20 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -25,8 +25,6 @@ semantics of real hypervisor connections.
"""
-from twisted.internet import defer
-
from nova import exception
from nova.compute import power_state
@@ -107,7 +105,6 @@ class FakeConnection(object):
fake_instance = FakeInstance()
self.instances[instance.name] = fake_instance
fake_instance._state = power_state.RUNNING
- return defer.succeed(None)
def reboot(self, instance):
"""
@@ -119,19 +116,19 @@ class FakeConnection(object):
The work will be done asynchronously. This function returns a
Deferred that allows the caller to detect when it is complete.
"""
- return defer.succeed(None)
+ pass
def rescue(self, instance):
"""
Rescue the specified instance.
"""
- return defer.succeed(None)
+ pass
def unrescue(self, instance):
"""
Unrescue the specified instance.
"""
- return defer.succeed(None)
+ pass
def destroy(self, instance):
"""
@@ -144,7 +141,6 @@ class FakeConnection(object):
Deferred that allows the caller to detect when it is complete.
"""
del self.instances[instance.name]
- return defer.succeed(None)
def attach_volume(self, instance_name, device_path, mountpoint):
"""Attach the disk at device_path to the instance at mountpoint"""
diff --git a/nova/virt/images.py b/nova/virt/images.py
index 981aa5cf36..1c9b2e0934 100644
--- a/nova/virt/images.py
+++ b/nova/virt/images.py
@@ -26,7 +26,7 @@ import time
import urlparse
from nova import flags
-from nova import process
+from nova import utils
from nova.auth import manager
from nova.auth import signer
from nova.objectstore import image
@@ -50,7 +50,7 @@ def _fetch_s3_image(image, path, user, project):
# This should probably move somewhere else, like e.g. a download_as
# method on User objects and at the same time get rewritten to use
- # twisted web client.
+ # a web client.
headers = {}
headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
@@ -63,15 +63,16 @@ def _fetch_s3_image(image, path, user, project):
cmd = ['/usr/bin/curl', '--fail', '--silent', url]
for (k, v) in headers.iteritems():
- cmd += ['-H', '%s: %s' % (k, v)]
+ cmd += ['-H', '"%s: %s"' % (k, v)]
cmd += ['-o', path]
- return process.SharedPool().execute(executable=cmd[0], args=cmd[1:])
+ cmd_out = ' '.join(cmd)
+ return utils.execute(cmd_out)
def _fetch_local_image(image, path, user, project):
source = _image_path('%s/image' % image)
- return process.simple_execute('cp %s %s' % (source, path))
+ return utils.execute('cp %s %s' % (source, path))
def _image_path(path):
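
The image fetch now builds a single command string and hands it to utils.execute instead of going through the Twisted process pool. For reference, the execute() shown in the nova/utils.py hunk above returns a (stdout, stderr) tuple and, with check_exit_code=True (the default), raises on a non-zero exit status. A small sketch of that contract (the exact exception class lives elsewhere in the nova tree; it is kept generic here):

    from nova import utils

    # execute() runs the command, yields to other greenthreads afterwards, and
    # returns captured output as a (stdout, stderr) tuple.
    out, err = utils.execute('echo hello')
    assert out.strip() == 'hello'

    # A failing command raises unless check_exit_code=False is passed, which is
    # what callers rely on instead of Twisted errbacks.
    try:
        utils.execute('cat /nonexistent-file')
    except Exception:  # ProcessExecutionError in nova; kept generic in this sketch
        pass
    out, err = utils.execute('cat /nonexistent-file', check_exit_code=False)
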
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index 3529be333f..75c8eaa4c5 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -40,16 +40,15 @@ import logging
import os
import shutil
+from eventlet import event
+from eventlet import tpool
+
import IPy
-from twisted.internet import defer
-from twisted.internet import task
-from twisted.internet import threads
from nova import context
from nova import db
from nova import exception
from nova import flags
-from nova import process
from nova import utils
#from nova.api import context
from nova.auth import manager
@@ -154,14 +153,12 @@ class LibvirtConnection(object):
except Exception as _err:
pass
# If the instance is already terminated, we're still happy
- d = defer.Deferred()
- if cleanup:
- d.addCallback(lambda _: self._cleanup(instance))
- # FIXME: What does this comment mean?
- # TODO(termie): short-circuit me for tests
- # WE'LL save this for when we do shutdown,
+
+ done = event.Event()
+
+ # We'll save this for when we do shutdown,
# instead of destroy - but destroy returns immediately
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_shutdown():
try:
@@ -170,17 +167,26 @@ class LibvirtConnection(object):
instance['id'], state)
if state == power_state.SHUTDOWN:
timer.stop()
- d.callback(None)
except Exception:
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_shutdown
- timer.start(interval=0.5, now=True)
- return d
+ timer_done = timer.start(interval=0.5, now=True)
+
+ # NOTE(termie): this is strictly superfluous (we could put the
+ # cleanup code in the timer), but this emulates the
+ # previous model so I am keeping it around until
+ # everything has been vetted a bit
+ def _wait_for_timer():
+ timer_done.wait()
+ self._cleanup(instance)
+ done.send()
+
+ greenthread.spawn(_wait_for_timer)
+ return done
def _cleanup(self, instance):
target = os.path.join(FLAGS.instances_path, instance['name'])
@@ -189,7 +195,6 @@ class LibvirtConnection(object):
if os.path.exists(target):
shutil.rmtree(target)
- @defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, instance_name, device_path, mountpoint):
virt_dom = self._conn.lookupByName(instance_name)
@@ -200,7 +205,6 @@ class LibvirtConnection(object):
<target dev='%s' bus='virtio'/>
</disk>""" % (device_path, mount_device)
virt_dom.attachDevice(xml)
- yield
def _get_disk_xml(self, xml, device):
"""Returns the xml for the disk mounted at device"""
@@ -222,7 +226,6 @@ class LibvirtConnection(object):
if doc != None:
doc.freeDoc()
- @defer.inlineCallbacks
@exception.wrap_exception
def detach_volume(self, instance_name, mountpoint):
virt_dom = self._conn.lookupByName(instance_name)
@@ -231,17 +234,13 @@ class LibvirtConnection(object):
if not xml:
raise exception.NotFound("No disk at %s" % mount_device)
virt_dom.detachDevice(xml)
- yield
- @defer.inlineCallbacks
@exception.wrap_exception
def reboot(self, instance):
- yield self.destroy(instance, False)
+ self.destroy(instance, False)
xml = self.to_xml(instance)
- yield self._conn.createXML(xml, 0)
-
- d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ self._conn.createXML(xml, 0)
+ timer = utils.LoopingCall(f=None)
def _wait_for_reboot():
try:
@@ -251,33 +250,28 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: rebooted', instance['name'])
timer.stop()
- d.callback(None)
except Exception, exn:
logging.error('_wait_for_reboot failed: %s', exn)
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_reboot
- timer.start(interval=0.5, now=True)
- yield d
+ return timer.start(interval=0.5, now=True)
- @defer.inlineCallbacks
@exception.wrap_exception
def rescue(self, instance):
- yield self.destroy(instance, False)
+ self.destroy(instance, False)
xml = self.to_xml(instance, rescue=True)
rescue_images = {'image_id': FLAGS.rescue_image_id,
'kernel_id': FLAGS.rescue_kernel_id,
'ramdisk_id': FLAGS.rescue_ramdisk_id}
- yield self._create_image(instance, xml, 'rescue-', rescue_images)
- yield self._conn.createXML(xml, 0)
+ self._create_image(instance, xml, 'rescue-', rescue_images)
+ self._conn.createXML(xml, 0)
- d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_rescue():
try:
@@ -286,27 +280,22 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: rescued', instance['name'])
timer.stop()
- d.callback(None)
except Exception, exn:
logging.error('_wait_for_rescue failed: %s', exn)
db.instance_set_state(None,
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- d.callback(None)
timer.f = _wait_for_rescue
- timer.start(interval=0.5, now=True)
- yield d
+ return timer.start(interval=0.5, now=True)
- @defer.inlineCallbacks
@exception.wrap_exception
def unrescue(self, instance):
# NOTE(vish): Because reboot destroys and recreates an instance using
# the normal xml file, we can just call reboot here
- yield self.reboot(instance)
+ self.reboot(instance)
- @defer.inlineCallbacks
@exception.wrap_exception
def spawn(self, instance):
xml = self.to_xml(instance)
@@ -314,14 +303,12 @@ class LibvirtConnection(object):
instance['id'],
power_state.NOSTATE,
'launching')
- yield NWFilterFirewall(self._conn).\
- setup_nwfilters_for_instance(instance)
- yield self._create_image(instance, xml)
- yield self._conn.createXML(xml, 0)
+ NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance)
+ self._create_image(instance, xml)
+ self._conn.createXML(xml, 0)
logging.debug("instance %s: is running", instance['name'])
- local_d = defer.Deferred()
- timer = task.LoopingCall(f=None)
+ timer = utils.LoopingCall(f=None)
def _wait_for_boot():
try:
@@ -331,7 +318,6 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: booted', instance['name'])
timer.stop()
- local_d.callback(None)
except:
logging.exception('instance %s: failed to boot',
instance['name'])
@@ -339,10 +325,9 @@ class LibvirtConnection(object):
instance['id'],
power_state.SHUTDOWN)
timer.stop()
- local_d.callback(None)
+
timer.f = _wait_for_boot
- timer.start(interval=0.5, now=True)
- yield local_d
+ return timer.start(interval=0.5, now=True)
def _flush_xen_console(self, virsh_output):
logging.info('virsh said: %r' % (virsh_output,))
@@ -350,10 +335,9 @@ class LibvirtConnection(object):
if virsh_output.startswith('/dev/'):
logging.info('cool, it\'s a device')
- d = process.simple_execute("sudo dd if=%s iflag=nonblock" %
- virsh_output, check_exit_code=False)
- d.addCallback(lambda r: r[0])
- return d
+ out, err = utils.execute("sudo dd if=%s iflag=nonblock" %
+ virsh_output, check_exit_code=False)
+ return out
else:
return ''
@@ -373,21 +357,20 @@ class LibvirtConnection(object):
def get_console_output(self, instance):
console_log = os.path.join(FLAGS.instances_path, instance['name'],
'console.log')
- d = process.simple_execute('sudo chown %d %s' % (os.getuid(),
- console_log))
+
+ utils.execute('sudo chown %d %s' % (os.getuid(), console_log))
+
if FLAGS.libvirt_type == 'xen':
- # Xen is spethial
- d.addCallback(lambda _:
- process.simple_execute("virsh ttyconsole %s" %
- instance['name']))
- d.addCallback(self._flush_xen_console)
- d.addCallback(self._append_to_file, console_log)
+ # Xen is special
+ virsh_output = utils.execute("virsh ttyconsole %s" %
+ instance['name'])
+ data = self._flush_xen_console(virsh_output)
+ fpath = self._append_to_file(data, console_log)
else:
- d.addCallback(lambda _: defer.succeed(console_log))
- d.addCallback(self._dump_file)
- return d
+ fpath = console_log
+
+ return self._dump_file(fpath)
- @defer.inlineCallbacks
def _create_image(self, inst, libvirt_xml, prefix='', disk_images=None):
# syntactic nicety
basepath = lambda fname = '', prefix = prefix: os.path.join(
@@ -396,8 +379,8 @@ class LibvirtConnection(object):
prefix + fname)
# ensure directories exist and are writable
- yield process.simple_execute('mkdir -p %s' % basepath(prefix=''))
- yield process.simple_execute('chmod 0777 %s' % basepath(prefix=''))
+ utils.execute('mkdir -p %s' % basepath(prefix=''))
+ utils.execute('chmod 0777 %s' % basepath(prefix=''))
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
@@ -418,22 +401,22 @@ class LibvirtConnection(object):
'kernel_id': inst['kernel_id'],
'ramdisk_id': inst['ramdisk_id']}
if not os.path.exists(basepath('disk')):
- yield images.fetch(inst.image_id, basepath('disk-raw'), user,
- project)
+ images.fetch(inst.image_id, basepath('disk-raw'), user,
+ project)
if inst['kernel_id']:
if not os.path.exists(basepath('kernel')):
- yield images.fetch(inst['kernel_id'], basepath('kernel'),
- user, project)
+ images.fetch(inst['kernel_id'], basepath('kernel'),
+ user, project)
if inst['ramdisk_id']:
if not os.path.exists(basepath('ramdisk')):
- yield images.fetch(inst['ramdisk_id'], basepath('ramdisk'),
- user, project)
+ images.fetch(inst['ramdisk_id'], basepath('ramdisk'),
+ user, project)
- execute = lambda cmd, process_input = None, check_exit_code = True: \
- process.simple_execute(cmd=cmd,
- process_input=process_input,
- check_exit_code=check_exit_code)
+ def execute(cmd, process_input=None, check_exit_code=True):
+ return utils.execute(cmd=cmd,
+ process_input=process_input,
+ check_exit_code=check_exit_code)
# For now, we assume that if we're not using a kernel, we're using a
# partitioned disk image where the target partition is the first
@@ -463,9 +446,9 @@ class LibvirtConnection(object):
logging.info('instance %s: injecting net into image %s',
inst['name'], inst.image_id)
try:
- yield disk.inject_data(basepath('disk-raw'), key, net,
- partition=target_partition,
- execute=execute)
+ disk.inject_data(basepath('disk-raw'), key, net,
+ partition=target_partition,
+ execute=execute)
except Exception as e:
# This could be a windows image, or a vmdk format disk
logging.warn('instance %s: ignoring error injecting data'
@@ -474,7 +457,7 @@ class LibvirtConnection(object):
if inst['kernel_id']:
if os.path.exists(basepath('disk')):
- yield process.simple_execute('rm -f %s' % basepath('disk'))
+ utils.execute('rm -f %s' % basepath('disk'))
local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type]
['local_gb']
@@ -485,15 +468,14 @@ class LibvirtConnection(object):
resize = False
if inst['kernel_id']:
- yield disk.partition(basepath('disk-raw'), basepath('disk'),
- local_bytes, resize, execute=execute)
+ disk.partition(basepath('disk-raw'), basepath('disk'),
+ local_bytes, resize, execute=execute)
else:
os.rename(basepath('disk-raw'), basepath('disk'))
- yield disk.extend(basepath('disk'), local_bytes, execute=execute)
+ disk.extend(basepath('disk'), local_bytes, execute=execute)
if FLAGS.libvirt_type == 'uml':
- yield process.simple_execute('sudo chown root %s' %
- basepath('disk'))
+ utils.execute('sudo chown root %s' % basepath('disk'))
def to_xml(self, instance, rescue=False):
# TODO(termie): cache?
@@ -758,15 +740,15 @@ class NWFilterFirewall(object):
def _define_filter(self, xml):
if callable(xml):
xml = xml()
- d = threads.deferToThread(self._conn.nwfilterDefineXML, xml)
- return d
+
+ # execute in a native thread and block current greenthread until done
+ tpool.execute(self._conn.nwfilterDefineXML, xml)
@staticmethod
def _get_net_and_mask(cidr):
net = IPy.IP(cidr)
return str(net.net()), str(net.netmask())
- @defer.inlineCallbacks
def setup_nwfilters_for_instance(self, instance):
"""
Creates an NWFilter for the given instance. In the process,
@@ -774,10 +756,10 @@ class NWFilterFirewall(object):
the base filter are all in place.
"""
- yield self._define_filter(self.nova_base_ipv4_filter)
- yield self._define_filter(self.nova_base_ipv6_filter)
- yield self._define_filter(self.nova_dhcp_filter)
- yield self._define_filter(self.nova_base_filter)
+ self._define_filter(self.nova_base_ipv4_filter)
+ self._define_filter(self.nova_base_ipv6_filter)
+ self._define_filter(self.nova_dhcp_filter)
+ self._define_filter(self.nova_base_filter)
nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \
" <filterref filter='nova-base' />\n" % \
@@ -789,20 +771,19 @@ class NWFilterFirewall(object):
net, mask = self._get_net_and_mask(network_ref['cidr'])
project_filter = self.nova_project_filter(instance['project_id'],
net, mask)
- yield self._define_filter(project_filter)
+ self._define_filter(project_filter)
nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \
instance['project_id']
for security_group in instance.security_groups:
- yield self.ensure_security_group_filter(security_group['id'])
+ self.ensure_security_group_filter(security_group['id'])
nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \
security_group['id']
nwfilter_xml += "</filter>"
- yield self._define_filter(nwfilter_xml)
- return
+ self._define_filter(nwfilter_xml)
def ensure_security_group_filter(self, security_group_id):
return self._define_filter(
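
Two patterns carry this file through the conversion. Blocking libvirt calls such as nwfilterDefineXML are pushed onto a native thread with tpool.execute so they do not stall the eventlet hub, and reboot/rescue/spawn/destroy all poll domain state with utils.LoopingCall and return the event from timer.start() instead of a Deferred. A condensed sketch of the polling pattern (get_state is a placeholder, not the real driver API):

    from nova import utils

    def wait_for_state(get_state, desired_state, interval=0.5):
        """Poll get_state() until it returns desired_state.

        Returns the eventlet event produced by LoopingCall.start(), exactly as
        spawn() and reboot() above return their timer event.
        """
        timer = utils.LoopingCall(f=None)

        def _poll():
            try:
                if get_state() == desired_state:
                    timer.stop()
            except Exception:
                # the real driver records SHUTDOWN in the db here before stopping
                timer.stop()

        timer.f = _poll
        return timer.start(interval=interval, now=True)

Callers then wait() on the returned event when they need to block until the state change completes.
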
diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py
index 8cb4cce3a7..0129543944 100644
--- a/nova/virt/xenapi/network_utils.py
+++ b/nova/virt/xenapi/network_utils.py
@@ -20,8 +20,6 @@ records and their attributes like bridges, PIFs, QoS, as well as
their lookup functions.
"""
-from twisted.internet import defer
-
class NetworkHelper():
"""
@@ -31,14 +29,12 @@ class NetworkHelper():
return
@classmethod
- @defer.inlineCallbacks
def find_network_with_bridge(cls, session, bridge):
- """ Return the network on which the bridge is attached, if found """
+ """ Return the network on which the bridge is attached, if found."""
expr = 'field "bridge" = "%s"' % bridge
- networks = yield session.call_xenapi('network.get_all_records_where',
- expr)
+ networks = session.call_xenapi('network.get_all_records_where', expr)
if len(networks) == 1:
- defer.returnValue(networks.keys()[0])
+ return networks.keys()[0]
elif len(networks) > 1:
raise Exception('Found non-unique network for bridge %s' % bridge)
else:
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
index 77edb576e8..2f5d78e759 100644
--- a/nova/virt/xenapi/vm_utils.py
+++ b/nova/virt/xenapi/vm_utils.py
@@ -21,18 +21,16 @@ their attributes like VDIs, VIFs, as well as their lookup functions.
import logging
import urllib
-
-from twisted.internet import defer
from xml.dom import minidom
from nova import flags
from nova import utils
-
from nova.auth.manager import AuthManager
from nova.compute import instance_types
from nova.compute import power_state
from nova.virt import images
+
FLAGS = flags.FLAGS
XENAPI_POWER_STATE = {
@@ -64,7 +62,6 @@ class VMHelper():
XenAPI = __import__('XenAPI')
@classmethod
- @defer.inlineCallbacks
def create_vm(cls, session, instance, kernel, ramdisk):
"""Create a VM record. Returns a Deferred that gives the new
VM reference."""
@@ -102,12 +99,11 @@ class VMHelper():
'other_config': {},
}
logging.debug('Created VM %s...', instance.name)
- vm_ref = yield session.call_xenapi('VM.create', rec)
+ vm_ref = session.call_xenapi('VM.create', rec)
logging.debug('Created VM %s as %s.', instance.name, vm_ref)
- defer.returnValue(vm_ref)
+ return vm_ref
@classmethod
- @defer.inlineCallbacks
def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable):
"""Create a VBD record. Returns a Deferred that gives the new
VBD reference."""
@@ -126,13 +122,12 @@ class VMHelper():
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
- vbd_ref = yield session.call_xenapi('VBD.create', vbd_rec)
+ vbd_ref = session.call_xenapi('VBD.create', vbd_rec)
logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
vdi_ref)
- defer.returnValue(vbd_ref)
+ return vbd_ref
@classmethod
- @defer.inlineCallbacks
def create_vif(cls, session, vm_ref, network_ref, mac_address):
"""Create a VIF record. Returns a Deferred that gives the new
VIF reference."""
@@ -148,13 +143,12 @@ class VMHelper():
vif_rec['qos_algorithm_params'] = {}
logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
network_ref)
- vif_ref = yield session.call_xenapi('VIF.create', vif_rec)
+ vif_ref = session.call_xenapi('VIF.create', vif_rec)
logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
vm_ref, network_ref)
- defer.returnValue(vif_ref)
+ return vif_ref
@classmethod
- @defer.inlineCallbacks
def fetch_image(cls, session, image, user, project, use_sr):
"""use_sr: True to put the image as a VDI in an SR, False to place
it on dom0's filesystem. The former is for VM disks, the latter for
@@ -171,12 +165,11 @@ class VMHelper():
args['password'] = user.secret
if use_sr:
args['add_partition'] = 'true'
- task = yield session.async_call_plugin('objectstore', fn, args)
- uuid = yield session.wait_for_task(task)
- defer.returnValue(uuid)
+ task = session.async_call_plugin('objectstore', fn, args)
+ uuid = session.wait_for_task(task)
+ return uuid
@classmethod
- @utils.deferredToThread
def lookup(cls, session, i):
""" Look the instance i up, and returns it if available """
return VMHelper.lookup_blocking(session, i)
@@ -194,7 +187,6 @@ class VMHelper():
return vms[0]
@classmethod
- @utils.deferredToThread
def lookup_vm_vdis(cls, session, vm):
""" Look for the VDIs that are attached to the VM """
return VMHelper.lookup_vm_vdis_blocking(session, vm)
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 9bfd072671..3034df9e1d 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -20,8 +20,6 @@ Management class for VM-related functions (spawn, reboot, etc).
import logging
-from twisted.internet import defer
-
from nova import db
from nova import context
@@ -49,10 +47,9 @@ class VMOps(object):
return [self._session.get_xenapi().VM.get_name_label(vm) \
for vm in self._session.get_xenapi().VM.get_all()]
- @defer.inlineCallbacks
def spawn(self, instance):
""" Create VM instance """
- vm = yield VMHelper.lookup(self._session, instance.name)
+ vm = VMHelper.lookup(self._session, instance.name)
if vm is not None:
raise Exception('Attempted to create non-unique name %s' %
instance.name)
@@ -60,66 +57,63 @@ class VMOps(object):
bridge = db.project_get_network(context.get_admin_context(),
instance.project_id).bridge
network_ref = \
- yield NetworkHelper.find_network_with_bridge(self._session, bridge)
+ NetworkHelper.find_network_with_bridge(self._session, bridge)
user = AuthManager().get_user(instance.user_id)
project = AuthManager().get_project(instance.project_id)
- vdi_uuid = yield VMHelper.fetch_image(self._session,
- instance.image_id, user, project, True)
- kernel = yield VMHelper.fetch_image(self._session,
- instance.kernel_id, user, project, False)
- ramdisk = yield VMHelper.fetch_image(self._session,
- instance.ramdisk_id, user, project, False)
- vdi_ref = yield self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
- vm_ref = yield VMHelper.create_vm(self._session,
- instance, kernel, ramdisk)
- yield VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
+ vdi_uuid = VMHelper.fetch_image(
+ self._session, instance.image_id, user, project, True)
+ kernel = VMHelper.fetch_image(
+ self._session, instance.kernel_id, user, project, False)
+ ramdisk = VMHelper.fetch_image(
+ self._session, instance.ramdisk_id, user, project, False)
+ vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
+ vm_ref = VMHelper.create_vm(
+ self._session, instance, kernel, ramdisk)
+ VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
if network_ref:
- yield VMHelper.create_vif(self._session, vm_ref,
- network_ref, instance.mac_address)
+ VMHelper.create_vif(self._session, vm_ref,
+ network_ref, instance.mac_address)
logging.debug('Starting VM %s...', vm_ref)
- yield self._session.call_xenapi('VM.start', vm_ref, False, False)
+ self._session.call_xenapi('VM.start', vm_ref, False, False)
logging.info('Spawning VM %s created %s.', instance.name,
vm_ref)
- @defer.inlineCallbacks
def reboot(self, instance):
""" Reboot VM instance """
instance_name = instance.name
- vm = yield VMHelper.lookup(self._session, instance_name)
+ vm = VMHelper.lookup(self._session, instance_name)
if vm is None:
raise Exception('instance not present %s' % instance_name)
- task = yield self._session.call_xenapi('Async.VM.clean_reboot', vm)
- yield self._session.wait_for_task(task)
+ task = self._session.call_xenapi('Async.VM.clean_reboot', vm)
+ self._session.wait_for_task(task)
- @defer.inlineCallbacks
def destroy(self, instance):
""" Destroy VM instance """
- vm = yield VMHelper.lookup(self._session, instance.name)
+ vm = VMHelper.lookup(self._session, instance.name)
if vm is None:
# Don't complain, just return. This lets us clean up instances
# that have already disappeared from the underlying platform.
- defer.returnValue(None)
+ return
# Get the VDIs related to the VM
- vdis = yield VMHelper.lookup_vm_vdis(self._session, vm)
+ vdis = VMHelper.lookup_vm_vdis(self._session, vm)
try:
- task = yield self._session.call_xenapi('Async.VM.hard_shutdown',
+ task = self._session.call_xenapi('Async.VM.hard_shutdown',
vm)
- yield self._session.wait_for_task(task)
+ self._session.wait_for_task(task)
except XenAPI.Failure, exc:
logging.warn(exc)
# Disk clean-up
if vdis:
for vdi in vdis:
try:
- task = yield self._session.call_xenapi('Async.VDI.destroy',
- vdi)
- yield self._session.wait_for_task(task)
+ task = self._session.call_xenapi('Async.VDI.destroy', vdi)
+ self._session.wait_for_task(task)
except XenAPI.Failure, exc:
logging.warn(exc)
try:
- task = yield self._session.call_xenapi('Async.VM.destroy', vm)
- yield self._session.wait_for_task(task)
+ task = self._session.call_xenapi('Async.VM.destroy', vm)
+ self._session.wait_for_task(task)
except XenAPI.Failure, exc:
logging.warn(exc)
@@ -131,14 +125,13 @@ class VMOps(object):
rec = self._session.get_xenapi().VM.get_record(vm)
return VMHelper.compile_info(rec)
- @defer.inlineCallbacks
def get_diagnostics(self, instance_id):
"""Return data about VM diagnostics"""
- vm = yield VMHelper.lookup(self._session, instance_id)
+ vm = VMHelper.lookup(self._session, instance_id)
if vm is None:
raise Exception("instance not present %s" % instance_id)
- rec = yield self._session.get_xenapi().VM.get_record(vm)
- defer.returnValue(VMHelper.compile_diagnostics(self._session, rec))
+ rec = self._session.get_xenapi().VM.get_record(vm)
+ return VMHelper.compile_diagnostics(self._session, rec)
def get_console_output(self, instance):
""" Return snapshot of console """
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index 2153810c85..6beb08f5e8 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -48,10 +48,11 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
"""
import logging
+import sys
import xmlrpclib
-from twisted.internet import defer
-from twisted.internet import reactor
+from eventlet import event
+from eventlet import tpool
from nova import utils
from nova import flags
@@ -159,53 +160,51 @@ class XenAPISession(object):
""" Return the xenapi host """
return self._session.xenapi.session.get_this_host(self._session.handle)
- @utils.deferredToThread
def call_xenapi(self, method, *args):
- """Call the specified XenAPI method on a background thread. Returns
- a Deferred for the result."""
+ """Call the specified XenAPI method on a background thread."""
f = self._session.xenapi
for m in method.split('.'):
f = f.__getattr__(m)
- return f(*args)
+ return tpool.execute(f, *args)
- @utils.deferredToThread
def async_call_plugin(self, plugin, fn, args):
- """Call Async.host.call_plugin on a background thread. Returns a
- Deferred with the task reference."""
- return _unwrap_plugin_exceptions(
- self._session.xenapi.Async.host.call_plugin,
- self.get_xenapi_host(), plugin, fn, args)
+ """Call Async.host.call_plugin on a background thread."""
+ return tpool.execute(_unwrap_plugin_exceptions,
+ self._session.xenapi.Async.host.call_plugin,
+ self.get_xenapi_host(), plugin, fn, args)
def wait_for_task(self, task):
"""Return a Deferred that will give the result of the given task.
The task is polled until it completes."""
- d = defer.Deferred()
- reactor.callLater(0, self._poll_task, task, d)
- return d
- @utils.deferredToThread
- def _poll_task(self, task, deferred):
+ done = event.Event()
+ loop = utils.LoopingCall(self._poll_task, task, done)
+ loop.start(FLAGS.xenapi_task_poll_interval, now=True)
+ rv = done.wait()
+ loop.stop()
+ return rv
+
+ def _poll_task(self, task, done):
"""Poll the given XenAPI task, and fire the given Deferred if we
get a result."""
try:
#logging.debug('Polling task %s...', task)
status = self._session.xenapi.task.get_status(task)
if status == 'pending':
- reactor.callLater(FLAGS.xenapi_task_poll_interval,
- self._poll_task, task, deferred)
+ return
elif status == 'success':
result = self._session.xenapi.task.get_result(task)
logging.info('Task %s status: success. %s', task, result)
- deferred.callback(_parse_xmlrpc_value(result))
+ done.send(_parse_xmlrpc_value(result))
else:
error_info = self._session.xenapi.task.get_error_info(task)
logging.warn('Task %s status: %s. %s', task, status,
error_info)
- deferred.errback(XenAPI.Failure(error_info))
- #logging.debug('Polling task %s done.', task)
+ done.send_exception(XenAPI.Failure(error_info))
+ #logging.debug('Polling task %s done.', task)
except XenAPI.Failure, exc:
logging.warn(exc)
- deferred.errback(exc)
+ done.send_exception(*sys.exc_info())
def _unwrap_plugin_exceptions(func, *args, **kwargs):
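wait_for_task is now synchronous from the caller's point of view: an eventlet.event.Event carries the outcome, a LoopingCall polls the task every xenapi_task_poll_interval seconds, and done.wait() either returns the parsed result or re-raises whatever was passed to send_exception. A self-contained sketch of the same pattern that needs only eventlet; FakeTask and the plain eventlet.spawn loop are stand-ins for a real XenAPI task and nova's utils.LoopingCall:

    import eventlet
    from eventlet import event

    class FakeTask(object):
        """Pretend XenAPI task that succeeds on the third poll."""
        def __init__(self):
            self.polls = 0

        def get_status(self):
            self.polls += 1
            return 'pending' if self.polls < 3 else 'success'

        def get_result(self):
            return 'OpaqueRef:fake'

    def wait_for_task(task, interval=0.1):
        done = event.Event()

        def poll():
            while not done.ready():
                status = task.get_status()
                if status == 'success':
                    done.send(task.get_result())
                elif status != 'pending':
                    done.send_exception(RuntimeError(status))
                eventlet.sleep(interval)

        eventlet.spawn(poll)
        return done.wait()  # blocks this greenthread; re-raises on failure

    print wait_for_task(FakeTask())  # prints OpaqueRef:fake after a few polls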
diff --git a/nova/volume/driver.py b/nova/volume/driver.py
index 156aad2a09..1cd4c1fd45 100644
--- a/nova/volume/driver.py
+++ b/nova/volume/driver.py
@@ -22,12 +22,10 @@ Drivers for volumes.
import logging
import os
-
-from twisted.internet import defer
+import time
from nova import exception
from nova import flags
-from nova import process
from nova import utils
@@ -55,14 +53,13 @@ flags.DEFINE_string('iscsi_ip_prefix', '127.0',
class VolumeDriver(object):
"""Executes commands relating to Volumes."""
- def __init__(self, execute=process.simple_execute,
+ def __init__(self, execute=utils.execute,
sync_exec=utils.execute, *args, **kwargs):
# NOTE(vish): db is set by Manager
self.db = None
self._execute = execute
self._sync_exec = sync_exec
- @defer.inlineCallbacks
def _try_execute(self, command):
# NOTE(vish): Volume commands can partially fail due to timing, but
# running them a second time on failure will usually
@@ -70,15 +67,15 @@ class VolumeDriver(object):
tries = 0
while True:
try:
- yield self._execute(command)
- defer.returnValue(True)
+ self._execute(command)
+ return True
except exception.ProcessExecutionError:
tries = tries + 1
if tries >= FLAGS.num_shell_tries:
raise
logging.exception("Recovering from a failed execute."
"Try number %s", tries)
- yield self._execute("sleep %s" % tries ** 2)
+ time.sleep(tries ** 2)
def check_for_setup_error(self):
"""Returns an error if prerequisites aren't met"""
@@ -86,53 +83,45 @@ class VolumeDriver(object):
raise exception.Error("volume group %s doesn't exist"
% FLAGS.volume_group)
- @defer.inlineCallbacks
def create_volume(self, volume):
"""Creates a logical volume."""
if int(volume['size']) == 0:
sizestr = '100M'
else:
sizestr = '%sG' % volume['size']
- yield self._try_execute("sudo lvcreate -L %s -n %s %s" %
- (sizestr,
- volume['name'],
- FLAGS.volume_group))
+ self._try_execute("sudo lvcreate -L %s -n %s %s" %
+ (sizestr,
+ volume['name'],
+ FLAGS.volume_group))
- @defer.inlineCallbacks
def delete_volume(self, volume):
"""Deletes a logical volume."""
- yield self._try_execute("sudo lvremove -f %s/%s" %
- (FLAGS.volume_group,
- volume['name']))
+ self._try_execute("sudo lvremove -f %s/%s" %
+ (FLAGS.volume_group,
+ volume['name']))
- @defer.inlineCallbacks
def local_path(self, volume):
-        yield  # NOTE(vish): stops deprecation warning
escaped_group = FLAGS.volume_group.replace('-', '--')
escaped_name = volume['name'].replace('-', '--')
- defer.returnValue("/dev/mapper/%s-%s" % (escaped_group,
- escaped_name))
+ return "/dev/mapper/%s-%s" % (escaped_group, escaped_name)
def ensure_export(self, context, volume):
"""Synchronously recreates an export for a logical volume."""
raise NotImplementedError()
- @defer.inlineCallbacks
def create_export(self, context, volume):
"""Exports the volume."""
raise NotImplementedError()
- @defer.inlineCallbacks
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
raise NotImplementedError()
- @defer.inlineCallbacks
def discover_volume(self, volume):
"""Discover volume on a remote host."""
raise NotImplementedError()
- @defer.inlineCallbacks
def undiscover_volume(self, volume):
"""Undiscover volume on a remote host."""
raise NotImplementedError()
@@ -155,14 +144,13 @@ class AOEDriver(VolumeDriver):
dev = {'shelf_id': shelf_id, 'blade_id': blade_id}
self.db.export_device_create_safe(context, dev)
- @defer.inlineCallbacks
def create_export(self, context, volume):
"""Creates an export for a logical volume."""
self._ensure_blades(context)
(shelf_id,
blade_id) = self.db.volume_allocate_shelf_and_blade(context,
volume['id'])
- yield self._try_execute(
+ self._try_execute(
"sudo vblade-persist setup %s %s %s /dev/%s/%s" %
(shelf_id,
blade_id,
@@ -176,33 +164,30 @@ class AOEDriver(VolumeDriver):
# still works for the other volumes, so we
# just wait a bit for the current volume to
# be ready and ignore any errors.
- yield self._execute("sleep 2")
- yield self._execute("sudo vblade-persist auto all",
- check_exit_code=False)
- yield self._execute("sudo vblade-persist start all",
- check_exit_code=False)
+ time.sleep(2)
+ self._execute("sudo vblade-persist auto all",
+ check_exit_code=False)
+ self._execute("sudo vblade-persist start all",
+ check_exit_code=False)
- @defer.inlineCallbacks
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
(shelf_id,
blade_id) = self.db.volume_get_shelf_and_blade(context,
volume['id'])
- yield self._try_execute("sudo vblade-persist stop %s %s" %
- (shelf_id, blade_id))
- yield self._try_execute("sudo vblade-persist destroy %s %s" %
- (shelf_id, blade_id))
+ self._try_execute("sudo vblade-persist stop %s %s" %
+ (shelf_id, blade_id))
+ self._try_execute("sudo vblade-persist destroy %s %s" %
+ (shelf_id, blade_id))
- @defer.inlineCallbacks
def discover_volume(self, _volume):
"""Discover volume on a remote host."""
- yield self._execute("sudo aoe-discover")
- yield self._execute("sudo aoe-stat", check_exit_code=False)
+ self._execute("sudo aoe-discover")
+ self._execute("sudo aoe-stat", check_exit_code=False)
- @defer.inlineCallbacks
def undiscover_volume(self, _volume):
"""Undiscover volume on a remote host."""
- yield
+ pass
class FakeAOEDriver(AOEDriver):
@@ -252,7 +237,6 @@ class ISCSIDriver(VolumeDriver):
target = {'host': host, 'target_num': target_num}
self.db.iscsi_target_create_safe(context, target)
- @defer.inlineCallbacks
def create_export(self, context, volume):
"""Creates an export for a logical volume."""
self._ensure_iscsi_targets(context, volume['host'])
@@ -261,61 +245,55 @@ class ISCSIDriver(VolumeDriver):
volume['host'])
iscsi_name = "%s%s" % (FLAGS.iscsi_target_prefix, volume['name'])
volume_path = "/dev/%s/%s" % (FLAGS.volume_group, volume['name'])
- yield self._execute("sudo ietadm --op new "
- "--tid=%s --params Name=%s" %
- (iscsi_target, iscsi_name))
- yield self._execute("sudo ietadm --op new --tid=%s "
- "--lun=0 --params Path=%s,Type=fileio" %
- (iscsi_target, volume_path))
-
- @defer.inlineCallbacks
+ self._execute("sudo ietadm --op new "
+ "--tid=%s --params Name=%s" %
+ (iscsi_target, iscsi_name))
+ self._execute("sudo ietadm --op new --tid=%s "
+ "--lun=0 --params Path=%s,Type=fileio" %
+ (iscsi_target, volume_path))
+
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
iscsi_target = self.db.volume_get_iscsi_target_num(context,
volume['id'])
- yield self._execute("sudo ietadm --op delete --tid=%s "
- "--lun=0" % iscsi_target)
- yield self._execute("sudo ietadm --op delete --tid=%s" %
- iscsi_target)
+ self._execute("sudo ietadm --op delete --tid=%s "
+ "--lun=0" % iscsi_target)
+ self._execute("sudo ietadm --op delete --tid=%s" %
+ iscsi_target)
- @defer.inlineCallbacks
def _get_name_and_portal(self, volume_name, host):
"""Gets iscsi name and portal from volume name and host."""
- (out, _err) = yield self._execute("sudo iscsiadm -m discovery -t "
- "sendtargets -p %s" % host)
+ (out, _err) = self._execute("sudo iscsiadm -m discovery -t "
+ "sendtargets -p %s" % host)
for target in out.splitlines():
if FLAGS.iscsi_ip_prefix in target and volume_name in target:
(location, _sep, iscsi_name) = target.partition(" ")
break
iscsi_portal = location.split(",")[0]
- defer.returnValue((iscsi_name, iscsi_portal))
+ return (iscsi_name, iscsi_portal)
- @defer.inlineCallbacks
def discover_volume(self, volume):
"""Discover volume on a remote host."""
- (iscsi_name,
- iscsi_portal) = yield self._get_name_and_portal(volume['name'],
- volume['host'])
- yield self._execute("sudo iscsiadm -m node -T %s -p %s --login" %
- (iscsi_name, iscsi_portal))
- yield self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
- "-n node.startup -v automatic" %
- (iscsi_name, iscsi_portal))
- defer.returnValue("/dev/iscsi/%s" % volume['name'])
-
- @defer.inlineCallbacks
+ iscsi_name, iscsi_portal = self._get_name_and_portal(volume['name'],
+ volume['host'])
+ self._execute("sudo iscsiadm -m node -T %s -p %s --login" %
+ (iscsi_name, iscsi_portal))
+ self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
+ "-n node.startup -v automatic" %
+ (iscsi_name, iscsi_portal))
+ return "/dev/iscsi/%s" % volume['name']
+
def undiscover_volume(self, volume):
"""Undiscover volume on a remote host."""
- (iscsi_name,
- iscsi_portal) = yield self._get_name_and_portal(volume['name'],
- volume['host'])
- yield self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
- "-n node.startup -v manual" %
- (iscsi_name, iscsi_portal))
- yield self._execute("sudo iscsiadm -m node -T %s -p %s --logout " %
- (iscsi_name, iscsi_portal))
- yield self._execute("sudo iscsiadm -m node --op delete "
- "--targetname %s" % iscsi_name)
+ iscsi_name, iscsi_portal = self._get_name_and_portal(volume['name'],
+ volume['host'])
+ self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
+ "-n node.startup -v manual" %
+ (iscsi_name, iscsi_portal))
+ self._execute("sudo iscsiadm -m node -T %s -p %s --logout " %
+ (iscsi_name, iscsi_portal))
+ self._execute("sudo iscsiadm -m node --op delete "
+ "--targetname %s" % iscsi_name)
class FakeISCSIDriver(ISCSIDriver):
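_try_execute keeps its retry loop but now backs off with time.sleep(tries ** 2) instead of shelling out to sleep. A minimal standalone sketch of that retry-with-quadratic-backoff pattern; NUM_SHELL_TRIES and the local ProcessExecutionError are stand-ins for the nova flag and exception so the snippet runs on its own:

    import time

    NUM_SHELL_TRIES = 3

    class ProcessExecutionError(Exception):
        pass

    def try_execute(command, execute):
        """Run execute(command), retrying with a quadratic backoff."""
        tries = 0
        while True:
            try:
                execute(command)
                return True
            except ProcessExecutionError:
                tries += 1
                if tries >= NUM_SHELL_TRIES:
                    raise
                # With eventlet's monkey patching in place, time.sleep()
                # only parks this greenthread, not the whole service.
                time.sleep(tries ** 2)

    # e.g. try_execute("sudo vblade-persist start all", my_shell_runner)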
diff --git a/nova/volume/manager.py b/nova/volume/manager.py
index 589e7d7d97..7da125cac0 100644
--- a/nova/volume/manager.py
+++ b/nova/volume/manager.py
@@ -45,7 +45,6 @@ intact.
import logging
import datetime
-from twisted.internet import defer
from nova import context
from nova import exception
@@ -86,7 +85,6 @@ class VolumeManager(manager.Manager):
for volume in volumes:
self.driver.ensure_export(ctxt, volume)
- @defer.inlineCallbacks
def create_volume(self, context, volume_id):
"""Creates and exports the volume."""
context = context.elevated()
@@ -102,19 +100,18 @@ class VolumeManager(manager.Manager):
logging.debug("volume %s: creating lv of size %sG",
volume_ref['name'], volume_ref['size'])
- yield self.driver.create_volume(volume_ref)
+ self.driver.create_volume(volume_ref)
logging.debug("volume %s: creating export", volume_ref['name'])
- yield self.driver.create_export(context, volume_ref)
+ self.driver.create_export(context, volume_ref)
now = datetime.datetime.utcnow()
self.db.volume_update(context,
volume_ref['id'], {'status': 'available',
'launched_at': now})
logging.debug("volume %s: created successfully", volume_ref['name'])
- defer.returnValue(volume_id)
+ return volume_id
- @defer.inlineCallbacks
def delete_volume(self, context, volume_id):
"""Deletes and unexports volume."""
context = context.elevated()
@@ -124,14 +121,13 @@ class VolumeManager(manager.Manager):
if volume_ref['host'] != self.host:
raise exception.Error("Volume is not local to this node")
logging.debug("volume %s: removing export", volume_ref['name'])
- yield self.driver.remove_export(context, volume_ref)
+ self.driver.remove_export(context, volume_ref)
logging.debug("volume %s: deleting", volume_ref['name'])
- yield self.driver.delete_volume(volume_ref)
+ self.driver.delete_volume(volume_ref)
self.db.volume_destroy(context, volume_id)
logging.debug("volume %s: deleted successfully", volume_ref['name'])
- defer.returnValue(True)
+ return True
- @defer.inlineCallbacks
def setup_compute_volume(self, context, volume_id):
"""Setup remote volume on compute host.
@@ -139,17 +135,16 @@ class VolumeManager(manager.Manager):
context = context.elevated()
volume_ref = self.db.volume_get(context, volume_id)
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
- path = yield self.driver.local_path(volume_ref)
+ path = self.driver.local_path(volume_ref)
else:
- path = yield self.driver.discover_volume(volume_ref)
- defer.returnValue(path)
+ path = self.driver.discover_volume(volume_ref)
+ return path
- @defer.inlineCallbacks
def remove_compute_volume(self, context, volume_id):
"""Remove remote volume on compute host."""
context = context.elevated()
volume_ref = self.db.volume_get(context, volume_id)
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
- defer.returnValue(True)
+ return True
else:
- yield self.driver.undiscover_volume(volume_ref)
+ self.driver.undiscover_volume(volume_ref)
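With the Deferred plumbing removed, the manager's methods hand results straight back to the caller and let exceptions propagate normally. A hypothetical sketch of that calling convention; FakeDriver and StubVolumeManager are illustrative only, not the real classes:

    class FakeDriver(object):
        def local_path(self, volume):
            return '/dev/mapper/nova--volumes-%s' % volume['name']

    class StubVolumeManager(object):
        def __init__(self):
            self.driver = FakeDriver()

        def setup_compute_volume(self, volume):
            # No yield, no defer.returnValue: the caller just gets the path.
            return self.driver.local_path(volume)

    manager = StubVolumeManager()
    print manager.setup_compute_volume({'name': 'volume-00000001'})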
diff --git a/run_tests.py b/run_tests.py
index 3d427d8af2..6a4b7f1aba 100644
--- a/run_tests.py
+++ b/run_tests.py
@@ -39,10 +39,16 @@ Due to our use of multiprocessing it we frequently get some ignorable
"""
+import eventlet
+eventlet.monkey_patch()
+
import __main__
+import gettext
import os
import sys
+gettext.install('nova', unicode=1)
+
from twisted.scripts import trial as trial_script
from nova import flags
@@ -56,15 +62,12 @@ from nova.tests.compute_unittest import *
from nova.tests.flags_unittest import *
from nova.tests.misc_unittest import *
from nova.tests.network_unittest import *
-from nova.tests.objectstore_unittest import *
-from nova.tests.process_unittest import *
+#from nova.tests.objectstore_unittest import *
from nova.tests.quota_unittest import *
from nova.tests.rpc_unittest import *
from nova.tests.scheduler_unittest import *
from nova.tests.service_unittest import *
from nova.tests.twistd_unittest import *
-from nova.tests.validator_unittest import *
-from nova.tests.virt_unittest import *
from nova.tests.virt_unittest import *
from nova.tests.volume_unittest import *
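run_tests.py now calls eventlet.monkey_patch() before any other import so that blocking stdlib calls made by the newly synchronous code yield to other greenthreads instead of stalling the process. A standalone illustration of the effect; the worker function is made up for the demo:

    import eventlet
    eventlet.monkey_patch()

    import time

    def worker(name):
        time.sleep(0.1)  # cooperative after monkey_patch(); other
                         # greenthreads run while this one sleeps
        return name

    pool = eventlet.GreenPool()
    print list(pool.imap(worker, ['a', 'b', 'c']))  # ~0.1s total, not 0.3s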
diff --git a/tools/pip-requires b/tools/pip-requires
index 6bdadf3ed7..e9559521b5 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -22,3 +22,4 @@ mox==0.5.0
greenlet==0.3.1
nose
bzr
+Twisted>=10.1.0
\ No newline at end of file