diff options
-rw-r--r-- | morphlib/util.py | 59 | ||||
-rw-r--r-- | morphlib/util_tests.py | 39 |
2 files changed, 92 insertions, 6 deletions
diff --git a/morphlib/util.py b/morphlib/util.py index a9c22217..ead0bafe 100644 --- a/morphlib/util.py +++ b/morphlib/util.py @@ -197,28 +197,28 @@ def copyfileobj(inputfp, outputfp, blocksize=1024*1024): # pragma: no cover while 1: inbuf = inputfp.read(blocksize) if not inbuf: break - if not buf: + if not buf: buf = inbuf else: buf += inbuf - + # Combine "short" reads if (len(buf) < blocksize): continue - + buflen = len(buf) if buf == "\x00" * buflen: outputfp.seek(buflen, os.SEEK_CUR) buf = None # flag sparse=True, that we seek()ed, but have not written yet # The filesize is wrong until we write - sparse = True + sparse = True else: outputfp.write(buf) buf = None # We wrote, so clear sparse. sparse = False - + if buf: outputfp.write(buf) elif sparse: @@ -244,7 +244,7 @@ def on_same_filesystem(path_a, path_b): # pragma: no cover def unify_space_requirements(tmp_path, tmp_min_size, cache_path, cache_min_size): # pragma: no cover """Adjust minimum sizes when paths share a disk. - + Given pairs of path and minimum size, return the minimum sizes such that when the paths are on the same disk, the sizes are added together. @@ -280,3 +280,50 @@ def check_disk_available(tmp_path, tmp_min_size, 'space or reduce the disk space required by the ' 'tempdir-min-space and cachedir-min-space ' 'configuration options.') + + + + +def find_root(dirname, subdir_name): + '''Find parent of a directory, at or above a given directory. + + The sought-after directory is indicated by the existence of a + subdirectory of the indicated name. For example, dirname might + be the current working directory of the process, and subdir_name + might be ".morph"; then the returned value would be the Morph + workspace root directory, which has a subdirectory called + ".morph". + + Return path to desired directory, or None if not found. + + ''' + + dirname = os.path.normpath(os.path.abspath(dirname)) + while not os.path.isdir(os.path.join(dirname, subdir_name)): + if dirname == '/': + return None + dirname = os.path.dirname(dirname) + return dirname + + +def find_leaf(dirname, subdir_name): + '''This is like find_root, except it looks towards leaves. + + It only looks in a subdirectory if it is the only subdirectory. + If there are no subdirectories, or more than one, fail. + (Subdirectories whose name starts with a dot are ignored for this.) + + ''' + + while True: + if os.path.exists(os.path.join(dirname, subdir_name)): + return dirname + pathnames = [ + os.path.join(dirname, x) + for x in os.listdir(dirname) + if not x.startswith('.')] + subdirs = [x for x in pathnames if os.path.isdir(x)] + if len(subdirs) != 1: + return None + dirname = subdirs[0] + diff --git a/morphlib/util_tests.py b/morphlib/util_tests.py index 89fe184e..eaff0821 100644 --- a/morphlib/util_tests.py +++ b/morphlib/util_tests.py @@ -14,6 +14,9 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import os +import shutil +import tempfile import unittest import morphlib @@ -48,3 +51,39 @@ class MakeConcurrencyTests(unittest.TestCase): def test_returns_6_for_4_cores(self): self.assertEqual(morphlib.util.make_concurrency(cores=4), 6) + + +class FindParentOfTests(unittest.TestCase): + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + os.makedirs(os.path.join(self.tempdir, 'a', 'b', 'c')) + self.a = os.path.join(self.tempdir, 'a') + self.b = os.path.join(self.tempdir, 'a', 'b') + self.c = os.path.join(self.tempdir, 'a', 'b', 'c') + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_find_root_finds_starting_directory(self): + os.mkdir(os.path.join(self.a, '.magic')) + self.assertEqual(morphlib.util.find_root(self.a, '.magic'), self.a) + + def test_find_root_finds_ancestor(self): + os.mkdir(os.path.join(self.a, '.magic')) + self.assertEqual(morphlib.util.find_root(self.c, '.magic'), self.a) + + def test_find_root_returns_none_if_not_found(self): + self.assertEqual(morphlib.util.find_root(self.c, '.magic'), None) + + def test_find_leaf_finds_starting_directory(self): + os.mkdir(os.path.join(self.a, '.magic')) + self.assertEqual(morphlib.util.find_leaf(self.a, '.magic'), self.a) + + def test_find_leaf_finds_child(self): + os.mkdir(os.path.join(self.c, '.magic')) + self.assertEqual(morphlib.util.find_leaf(self.a, '.magic'), self.c) + + def test_find_leaf_returns_none_if_not_found(self): + self.assertEqual(morphlib.util.find_leaf(self.a, '.magic'), None) + |