diff options
Diffstat (limited to 'morphlib')
-rw-r--r-- | morphlib/fsutils.py | 82 | ||||
-rw-r--r-- | morphlib/fsutils_tests.py | 74 |
2 files changed, 148 insertions, 8 deletions
diff --git a/morphlib/fsutils.py b/morphlib/fsutils.py index 9786e387..84884be8 100644 --- a/morphlib/fsutils.py +++ b/morphlib/fsutils.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012 Codethink Limited +# Copyright (C) 2012-2013 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -17,18 +17,18 @@ import os import re -def create_image(runcmd, image_name, size): +def create_image(runcmd, image_name, size): # pragma: no cover # FIXME a pure python implementation may be better runcmd(['dd', 'if=/dev/zero', 'of=' + image_name, 'bs=1', 'seek=%d' % size, 'count=0']) -def partition_image(runcmd, image_name): +def partition_image(runcmd, image_name): # pragma: no cover # FIXME make this more flexible with partitioning options runcmd(['sfdisk', image_name], feed_stdin='1,,83,*\n') -def setup_device_mapping(runcmd, image_name): +def setup_device_mapping(runcmd, image_name): # pragma: no cover findstart = re.compile(r"start=\s+(\d+),") out = runcmd(['sfdisk', '-d', image_name]) for line in out.splitlines(): @@ -43,23 +43,89 @@ def setup_device_mapping(runcmd, image_name): return device.strip() -def create_fs(runcmd, partition): +def create_fs(runcmd, partition): # pragma: no cover runcmd(['mkfs.btrfs', '-L', 'baserock', partition]) -def mount(runcmd, partition, mount_point): +def mount(runcmd, partition, mount_point): # pragma: no cover if not os.path.exists(mount_point): os.mkdir(mount_point) runcmd(['mount', partition, mount_point]) -def unmount(runcmd, mount_point): +def unmount(runcmd, mount_point): # pragma: no cover runcmd(['umount', mount_point]) -def undo_device_mapping(runcmd, image_name): +def undo_device_mapping(runcmd, image_name): # pragma: no cover out = runcmd(['losetup', '-j', image_name]) for line in out.splitlines(): i = line.find(':') device = line[:i] runcmd(['losetup', '-d', device]) + + +def invert_paths(tree_walker, paths): + '''List paths from `tree_walker` that are not in `paths`. + + Given a traversal of a tree and a set of paths separated by os.sep, + return the files and directories that are not part of the set of + paths, culling directories that do not need to be recursed into, + if the traversal supports this. + + `tree_walker` is expected to follow similar behaviour to `os.walk()`. + + This function will remove directores from the ones listed, to avoid + traversing into these subdirectories, if it doesn't need to. + + As such, if a directory is returned, it is implied that its contents + are also not in the set of paths. + + If the tree walker does not support culling the traversal this way, + such as `os.walk(root, topdown=False)`, then the contents will also + be returned. + + The purpose for this is to list the directories that can be made + read-only, such that it would leave everything in paths writable. + + Each path in `paths` is expected to begin with the same path as + yielded by the tree walker. + + ''' + + def is_subpath(prefix, path): + prefix_components = prefix.split(os.sep) + path_components = path.split(os.sep) + return path_components[:len(prefix_components)] == prefix_components + + for dirpath, dirnames, filenames in tree_walker: + + dn_copy = list(dirnames) + for subdir in dn_copy: + subdirpath = os.path.join(dirpath, subdir) + for p in paths: + # Subdir is an exact match for a given path + # Don't recurse into it, so remove from list + # Also don't yield it as we're inverting + if subdirpath == p: + dirnames.remove(subdir) + break + # This directory is a parent directory of one + # of our paths, recurse into it, but don't yield it + elif is_subpath(subdirpath, p): + break + else: + dirnames.remove(subdir) + yield subdirpath + + for filename in filenames: + fullpath = os.path.join(dirpath, filename) + for p in paths: + # The file path is a child of one of the paths + # or is equal. Don't yield because either it is + # one of the specified paths, or is a file in a + # directory specified by a path + if is_subpath(p, fullpath): + break + else: + yield fullpath diff --git a/morphlib/fsutils_tests.py b/morphlib/fsutils_tests.py new file mode 100644 index 00000000..f49e6f89 --- /dev/null +++ b/morphlib/fsutils_tests.py @@ -0,0 +1,74 @@ +# Copyright (C) 2013 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import os +import unittest + +import morphlib + + +def dummy_top_down_walker(root, treedict): + '''Something that imitates os.walk, but with a dict''' + + subdirs = [k for k in treedict if isinstance(treedict[k], dict)] + files = [k for k in treedict if not isinstance(treedict[k], dict)] + yield root, subdirs, files + for subdir in subdirs: + subwalker = dummy_top_down_walker(os.path.join(root, subdir), + treedict[subdir]) + for result in subwalker: + yield result + + +class InvertPathsTests(unittest.TestCase): + + def setUp(self): + self.flat_tree = {"foo": None, "bar": None, "baz": None} + self.nested_tree = { + "foo": { + "bar": None, + "baz": None, + }, + "fs": { + "btrfs": None, + "ext2": None, + "ext3": None, + "ext4": None, + "nfs": None, + }, + } + + def test_flat_lists_single_files(self): + walker = dummy_top_down_walker('.', self.flat_tree) + self.assertEqual(sorted(["./foo", "./bar", "./baz"]), + sorted(morphlib.fsutils.invert_paths(walker, []))) + + def test_flat_excludes_listed_files(self): + walker = dummy_top_down_walker('.', self.flat_tree) + self.assertTrue( + "./bar" not in morphlib.fsutils.invert_paths(walker, ["./bar"])) + + def test_nested_excludes_listed_files(self): + walker = dummy_top_down_walker('.', self.nested_tree) + excludes = ["./foo/bar", "./fs/nfs"] + found = frozenset(morphlib.fsutils.invert_paths(walker, excludes)) + self.assertTrue(all(path not in found for path in excludes)) + + def test_nested_excludes_whole_dir(self): + walker = dummy_top_down_walker('.', self.nested_tree) + found = frozenset(morphlib.fsutils.invert_paths(walker, ["./foo"])) + unexpected = ("./foo", "./foo/bar", "./foo/baz") + self.assertTrue(all(path not in found for path in unexpected)) |