summaryrefslogtreecommitdiff
path: root/virtinst/diskbackend.py
diff options
context:
space:
mode:
authorCole Robinson <crobinso@redhat.com>2020-01-29 07:30:51 -0500
committerCole Robinson <crobinso@redhat.com>2020-01-29 08:38:18 -0500
commite7bb021c4c243cdcfbf1db472c45220a2e2ee58f (patch)
tree54619021e2b981bc7d39f78c899b7d92d026dc81 /virtinst/diskbackend.py
parent04c0d48ef75c0675f4ec6654668df62085779645 (diff)
downloadvirt-manager-e7bb021c4c243cdcfbf1db472c45220a2e2ee58f.tar.gz
diskbackend: Add lots of test coverage
Signed-off-by: Cole Robinson <crobinso@redhat.com>
Diffstat (limited to 'virtinst/diskbackend.py')
-rw-r--r--virtinst/diskbackend.py202
1 files changed, 94 insertions, 108 deletions
diff --git a/virtinst/diskbackend.py b/virtinst/diskbackend.py
index 80f900e0..6567b096 100644
--- a/virtinst/diskbackend.py
+++ b/virtinst/diskbackend.py
@@ -50,29 +50,29 @@ def _lookup_vol_by_basename(pool, path):
return pool.storageVolLookupByName(name)
-def _stat_disk(path):
- """
- Returns the tuple (isreg, size)
- """
- if not os.path.exists(path):
- return True, 0
+def _get_block_size(path):
+ try:
+ fd = os.open(path, os.O_RDONLY)
+ # os.SEEK_END is not present on all systems
+ size = os.lseek(fd, 0, 2) # pragma: no cover
+ os.close(fd) # pragma: no cover
+ except Exception:
+ size = 0
+ return size
- mode = os.stat(path)[stat.ST_MODE]
- # os.path.getsize('/dev/..') can be zero on some platforms
- if stat.S_ISBLK(mode):
- try:
- fd = os.open(path, os.O_RDONLY)
- # os.SEEK_END is not present on all systems
- size = os.lseek(fd, 0, 2)
- os.close(fd)
- except Exception:
- size = 0
- return False, size
- elif stat.S_ISREG(mode):
- return True, os.path.getsize(path)
+def _get_size(path):
+ if not os.path.exists(path):
+ return 0
+ if _stat_is_block(path):
+ return _get_block_size(path)
+ return os.path.getsize(path)
- return True, 0
+
+def _stat_is_block(path):
+ if not os.path.exists(path):
+ return False
+ return stat.S_ISBLK(os.stat(path)[stat.ST_MODE])
def _check_if_path_managed(conn, path):
@@ -98,14 +98,14 @@ def _check_if_path_managed(conn, path):
if verr:
try:
vol = _lookup_vol_by_basename(pool, path)
- except Exception:
+ except Exception: # pragma: no cover
pass
- except Exception as e:
+ except Exception as e: # pragma: no cover
vol = None
pool = None
verr = str(e)
- if not vol and not pool and verr:
+ if not vol and not pool and verr: # pragma: no cover
raise ValueError(_("Cannot use storage %(path)s: %(err)s") %
{'path': path, 'err': verr})
@@ -178,18 +178,19 @@ def _get_dev_type(path, vol_xml, vol_object, pool_xml, remote):
"""
Try to get device type for volume.
"""
- if vol_xml:
- if vol_xml.type:
- return vol_xml.type
+ if vol_xml and vol_xml.type:
+ return vol_xml.type
- # If vol_xml.type is None the vol_xml.file_type can return only
- # these types: block, network or file
- if vol_xml.file_type == libvirt.VIR_STORAGE_VOL_BLOCK:
+ if pool_xml:
+ t = pool_xml.get_disk_type()
+ if t == StorageVolume.TYPE_BLOCK:
return "block"
- elif vol_xml.file_type == libvirt.VIR_STORAGE_VOL_NETWORK:
+ elif t == StorageVolume.TYPE_NETWORK:
return "network"
- if vol_object:
+ if vol_object: # pragma: no cover
+ # This path is hard to test, because test suite XML always has
+ # the vol_xml.type set
t = vol_object.info()[0]
if t == StorageVolume.TYPE_FILE:
return "file"
@@ -198,13 +199,6 @@ def _get_dev_type(path, vol_xml, vol_object, pool_xml, remote):
elif t == StorageVolume.TYPE_NETWORK:
return "network"
- if pool_xml:
- t = pool_xml.get_disk_type()
- if t == StorageVolume.TYPE_BLOCK:
- return "block"
- elif t == StorageVolume.TYPE_NETWORK:
- return "network"
-
if path:
if path_is_url(path):
return "network"
@@ -212,9 +206,7 @@ def _get_dev_type(path, vol_xml, vol_object, pool_xml, remote):
if not remote:
if os.path.isdir(path):
return "dir"
- elif _stat_disk(path)[0]:
- return "file"
- else:
+ elif _stat_is_block(path):
return "block"
return "file"
@@ -246,8 +238,11 @@ def path_definitely_exists(conn, path):
# ACL/path perm helpers #
#########################
+SETFACL = "setfacl"
+
+
def _fix_perms_acl(dirname, username):
- cmd = ["setfacl", "--modify", "user:%s:x" % username, dirname]
+ cmd = [SETFACL, "--modify", "user:%s:x" % username, dirname]
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -269,8 +264,8 @@ def _fix_perms_chmod(dirname):
if os.stat(dirname).st_mode != newmode:
# Trying to change perms on vfat at least doesn't work
# but also doesn't seem to error. Try and detect that
- raise ValueError(_("Permissions on '%s' did not stick") %
- dirname)
+ raise ValueError( # pragma: no cover
+ _("Permissions on '%s' did not stick") % dirname)
def set_dirs_searchable(dirlist, username):
@@ -289,7 +284,7 @@ def set_dirs_searchable(dirlist, username):
try:
# If we reach here, ACL setting failed, try chmod
_fix_perms_chmod(dirname)
- except Exception as e:
+ except Exception as e: # pragma: no cover
errdict[dirname] = str(e)
return errdict
@@ -304,13 +299,13 @@ def _is_dir_searchable(dirname, uid, username):
try:
statinfo = os.stat(dirname)
- except OSError:
+ except OSError: # pragma: no cover
return False
if uid == statinfo.st_uid:
flag = stat.S_IXUSR
elif uid == statinfo.st_gid:
- flag = stat.S_IXGRP
+ flag = stat.S_IXGRP # pragma: no cover
else:
flag = stat.S_IXOTH
@@ -324,11 +319,11 @@ def _is_dir_searchable(dirname, uid, username):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = proc.communicate()
- except OSError:
+ except OSError: # pragma: no cover
log.debug("Didn't find the getfacl command.")
return False
- if proc.returncode != 0:
+ if proc.returncode != 0: # pragma: no cover
log.debug("Cmd '%s' failed: %s", cmd, err)
return False
@@ -386,7 +381,7 @@ class _StorageBase(object):
self._parent_pool_xml = StoragePool(self._conn,
parsexml=self.get_parent_pool().XMLDesc(0))
return self._parent_pool_xml
- def validate(self, disk):
+ def validate(self):
raise NotImplementedError()
def get_path(self):
raise NotImplementedError()
@@ -424,7 +419,11 @@ class _StorageCreator(_StorageBase):
##############
def create(self, progresscb):
- raise NotImplementedError()
+ raise NotImplementedError
+ def validate(self):
+ raise NotImplementedError
+ def get_size(self):
+ raise NotImplementedError
def get_path(self):
if self._vol_install and not self._path:
@@ -445,12 +444,6 @@ class _StorageCreator(_StorageBase):
def get_vol_xml(self):
return self._vol_install
- def get_size(self):
- if self._size is None:
- self._size = (float(self._vol_install.capacity) /
- 1024.0 / 1024.0 / 1024.0)
- return self._size
-
def get_dev_type(self):
if not self._dev_type:
self._dev_type = _get_dev_type(self._path, self._vol_install, None,
@@ -464,25 +457,6 @@ class _StorageCreator(_StorageBase):
return self._vol_install.format
return "raw"
- def validate(self, disk):
- if disk.device in ["floppy", "cdrom"]:
- raise ValueError(_("Cannot create storage for %s device.") %
- disk.device)
-
- if self._vol_install:
- self._vol_install.validate()
- return
-
- if self._size is None:
- raise ValueError(_("size is required for non-existent disk "
- "'%s'" % self.get_path()))
-
- err, msg = self.is_size_conflict()
- if err:
- raise ValueError(msg)
- if msg:
- log.warning(msg)
-
def will_create_storage(self):
return True
def get_vol_object(self):
@@ -495,6 +469,28 @@ class _StorageCreator(_StorageBase):
return False
+class ManagedStorageCreator(_StorageCreator):
+ """
+ Handles storage creation via libvirt APIs. All the actual creation
+ logic lives in StorageVolume, this is mostly about pulling out bits
+ from that class and mapping them to DeviceDisk elements
+ """
+ def __init__(self, conn, vol_install):
+ _StorageCreator.__init__(self, conn)
+
+ self._pool = vol_install.pool
+ self._vol_install = vol_install
+
+ def create(self, progresscb):
+ return self._vol_install.install(meter=progresscb)
+ def is_size_conflict(self):
+ return self._vol_install.is_size_conflict()
+ def validate(self):
+ return self._vol_install.validate()
+ def get_size(self):
+ return float(self._vol_install.capacity) / 1024.0 / 1024.0 / 1024.0
+
+
class CloneStorageCreator(_StorageCreator):
"""
Handles manually copying local files for Cloner
@@ -511,11 +507,14 @@ class CloneStorageCreator(_StorageCreator):
self._size = size
self._sparse = sparse
+ def get_size(self):
+ return self._size
+
def is_size_conflict(self):
ret = False
msg = None
if self.get_dev_type() == "block":
- avail = _stat_disk(self._path)[1]
+ avail = _get_size(self._path)
else:
vfs = os.statvfs(os.path.dirname(self._path))
avail = vfs.f_frsize * vfs.f_bavail
@@ -535,6 +534,17 @@ class CloneStorageCreator(_StorageCreator):
((need // (1024 * 1024)), (avail // (1024 * 1024))))
return (ret, msg)
+ def validate(self):
+ if self._size is None:
+ raise ValueError(_("size is required for non-existent disk "
+ "'%s'" % self.get_path()))
+
+ err, msg = self.is_size_conflict()
+ if err:
+ raise ValueError(msg)
+ if msg:
+ log.warning(msg)
+
def create(self, progresscb):
text = (_("Cloning %(srcfile)s") %
{'srcfile': os.path.basename(self._input_path)})
@@ -547,7 +557,7 @@ class CloneStorageCreator(_StorageCreator):
self._clone_local(progresscb, size_bytes)
def _clone_local(self, meter, size_bytes):
- if self._input_path == "/dev/null":
+ if self._input_path == "/dev/null": # pragma: no cover
# Not really sure why this check is here,
# but keeping for compat
log.debug("Source dev was /dev/null. Skipping")
@@ -605,7 +615,7 @@ class CloneStorageCreator(_StorageCreator):
i += s
if i < size_bytes:
meter.update(i)
- except OSError as e:
+ except OSError as e: # pragma: no cover
raise RuntimeError(_("Error cloning diskimage %s to %s: %s") %
(self._input_path, self._output_path, str(e)))
finally:
@@ -615,24 +625,6 @@ class CloneStorageCreator(_StorageCreator):
os.close(dst_fd)
-class ManagedStorageCreator(_StorageCreator):
- """
- Handles storage creation via libvirt APIs. All the actual creation
- logic lives in StorageVolume, this is mostly about pulling out bits
- from that class and mapping them to DeviceDisk elements
- """
- def __init__(self, conn, vol_install):
- _StorageCreator.__init__(self, conn)
-
- self._pool = vol_install.pool
- self._vol_install = vol_install
-
- def create(self, progresscb):
- return self._vol_install.install(meter=progresscb)
- def is_size_conflict(self):
- return self._vol_install.is_size_conflict()
-
-
class StorageBackendStub(_StorageBase):
"""
Class representing a storage path for a parsed XML disk, that we
@@ -662,8 +654,7 @@ class StorageBackendStub(_StorageBase):
def get_driver_type(self):
return self._driver_type
- def validate(self, disk):
- ignore = disk
+ def validate(self):
return
def get_vol_install(self):
return None
@@ -732,15 +723,15 @@ class StorageBackend(_StorageBase):
if self._vol_object:
ret = self.get_vol_xml().capacity
elif self._path:
- ret = _stat_disk(self._path)[1]
+ ret = _get_size(self._path)
self._size = (float(ret) / 1024.0 / 1024.0 / 1024.0)
return self._size
def exists(self):
if self._exists is None:
- if self._path is None:
+ if self._vol_object:
self._exists = True
- elif self._vol_object:
+ elif self._path is None:
self._exists = True
elif (not self.get_dev_type() == "network" and
not self._conn.is_remote() and
@@ -781,8 +772,7 @@ class StorageBackend(_StorageBase):
return ret
return None
- def validate(self, disk):
- ignore = disk
+ def validate(self):
return
def get_vol_install(self):
return None
@@ -790,7 +780,3 @@ class StorageBackend(_StorageBase):
return (False, None)
def will_create_storage(self):
return False
- def create(self, progresscb):
- ignore = progresscb
- raise RuntimeError("programming error: %s can't create storage" %
- self.__class__.__name__)