diff options
author | Martin Panter <vadmium+py@gmail.com> | 2016-10-22 03:21:36 +0000 |
---|---|---|
committer | Martin Panter <vadmium+py@gmail.com> | 2016-10-22 03:21:36 +0000 |
commit | 73a0af3545f0e9b62ac149367c0cfbdbfe40afe4 (patch) | |
tree | 9db0bac0a71ca7296ff714745c0f4eee0b07f6a5 /Lib/test/test_os.py | |
parent | 6f5021addeb1d9ddd3610409d2461ec79870f9a5 (diff) | |
parent | 6fec8a2a83b0f93a80bb83e76dceeda1cf9d00dc (diff) | |
download | cpython-73a0af3545f0e9b62ac149367c0cfbdbfe40afe4.tar.gz |
Issue #28435: Merge urllib test fixes from 3.5 into 3.6
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r-- | Lib/test/test_os.py | 692 |
1 files changed, 490 insertions, 202 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 97202109ee..1746ca4525 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -15,7 +15,6 @@ import locale import mmap import os import pickle -import platform import re import shutil import signal @@ -65,6 +64,8 @@ except ImportError: INT_MAX = PY_SSIZE_T_MAX = sys.maxsize from test.support.script_helper import assert_python_ok +from test.support import unix_shell + root_in_posix = False if hasattr(os, 'geteuid'): @@ -82,6 +83,32 @@ else: # Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group. HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 + +@contextlib.contextmanager +def ignore_deprecation_warnings(msg_regex, quiet=False): + with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet): + yield + + +def requires_os_func(name): + return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name) + + +class _PathLike(os.PathLike): + + def __init__(self, path=""): + self.path = path + + def __str__(self): + return str(self.path) + + def __fspath__(self): + if isinstance(self.path, BaseException): + raise self.path + else: + return self.path + + def create_file(filename, content=b'content'): with open(filename, "xb", 0) as fp: fp.write(content) @@ -145,9 +172,8 @@ class FileTests(unittest.TestCase): "needs INT_MAX < PY_SSIZE_T_MAX") @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) def test_large_read(self, size): - with open(support.TESTFN, "wb") as fp: - fp.write(b'test') self.addCleanup(support.unlink, support.TESTFN) + create_file(support.TESTFN, b'test') # Issue #21932: Make sure that os.read() does not raise an # OverflowError for size larger than INT_MAX @@ -204,11 +230,12 @@ class FileTests(unittest.TestCase): def test_replace(self): TESTFN2 = support.TESTFN + ".2" - with open(support.TESTFN, 'w') as f: - f.write("1") - with open(TESTFN2, 'w') as f: - f.write("2") - self.addCleanup(os.unlink, TESTFN2) + self.addCleanup(support.unlink, support.TESTFN) + self.addCleanup(support.unlink, TESTFN2) + + create_file(support.TESTFN, b"1") + create_file(TESTFN2, b"2") + os.replace(support.TESTFN, TESTFN2) self.assertRaises(FileNotFoundError, os.stat, support.TESTFN) with open(TESTFN2, 'r') as f: @@ -309,9 +336,7 @@ class StatAttributeTests(unittest.TestCase): fname = self.fname.encode(sys.getfilesystemencoding()) except UnicodeEncodeError: self.skipTest("cannot encode %a for the filesystem" % self.fname) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - self.check_stat_attributes(fname) + self.check_stat_attributes(fname) def test_stat_result_pickle(self): result = os.stat(self.fname) @@ -462,19 +487,14 @@ class UtimeTests(unittest.TestCase): self.addCleanup(support.rmtree, self.dirname) os.mkdir(self.dirname) - with open(self.fname, 'wb') as fp: - fp.write(b"ABC") + create_file(self.fname) def restore_float_times(state): - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - + with ignore_deprecation_warnings('stat_float_times'): os.stat_float_times(state) # ensure that st_atime and st_mtime are float - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - + with ignore_deprecation_warnings('stat_float_times'): old_float_times = os.stat_float_times(-1) self.addCleanup(restore_float_times, old_float_times) @@ -566,7 +586,7 @@ class UtimeTests(unittest.TestCase): "fd support for utime required for this test.") def test_utime_fd(self): def set_time(filename, ns): - with open(filename, 'wb') as fp: + with open(filename, 'wb', 0) as fp: # use a file descriptor to test futimens(timespec) # or futimes(timeval) os.utime(fp.fileno(), ns=ns) @@ -678,18 +698,20 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol): return os.environ # Bug 1110478 - @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh') + @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), + 'requires a shell') def test_update2(self): os.environ.clear() os.environ.update(HELLO="World") - with os.popen("/bin/sh -c 'echo $HELLO'") as popen: + with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: value = popen.read().strip() self.assertEqual(value, "World") - @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh') + @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), + 'requires a shell') def test_os_popen_iter(self): - with os.popen( - "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen: + with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" + % unix_shell) as popen: it = iter(popen) self.assertEqual(next(it), "line1\n") self.assertEqual(next(it), "line2\n") @@ -820,6 +842,7 @@ class WalkTests(unittest.TestCase): def setUp(self): join = os.path.join + self.addCleanup(support.rmtree, support.TESTFN) # Build: # TESTFN/ @@ -852,9 +875,8 @@ class WalkTests(unittest.TestCase): os.makedirs(t2_path) for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path: - f = open(path, "w") - f.write("I'm " + path + " and proud of it. Blame test_os.\n") - f.close() + with open(path, "x") as f: + f.write("I'm " + path + " and proud of it. Blame test_os.\n") if support.can_symlink(): os.symlink(os.path.abspath(t2_path), self.link_path) @@ -879,10 +901,12 @@ class WalkTests(unittest.TestCase): self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) - def test_walk_prune(self): + def test_walk_prune(self, walk_path=None): + if walk_path is None: + walk_path = self.walk_path # Prune the search. all = [] - for root, dirs, files in self.walk(self.walk_path): + for root, dirs, files in self.walk(walk_path): all.append((root, dirs, files)) # Don't descend into SUB1. if 'SUB1' in dirs: @@ -891,16 +915,19 @@ class WalkTests(unittest.TestCase): self.assertEqual(len(all), 2) self.assertEqual(all[0], - (self.walk_path, ["SUB2"], ["tmp1"])) + (str(walk_path), ["SUB2"], ["tmp1"])) all[1][-1].sort() self.assertEqual(all[1], self.sub2_tree) + def test_file_like_path(self): + self.test_walk_prune(_PathLike(self.walk_path)) + def test_walk_bottom_up(self): # Walk bottom-up. all = list(self.walk(self.walk_path, topdown=False)) - self.assertEqual(len(all), 4) + self.assertEqual(len(all), 4, all) # We can't know which order SUB1 and SUB2 will appear in. # Not flipped: SUB11, SUB1, SUB2, TESTFN # flipped: SUB2, SUB11, SUB1, TESTFN @@ -930,22 +957,6 @@ class WalkTests(unittest.TestCase): else: self.fail("Didn't follow symlink with followlinks=True") - def tearDown(self): - # Tear everything down. This is a decent use for bottom-up on - # Windows, which doesn't have a recursive delete command. The - # (not so) subtlety is that rmdir will fail unless the dir's - # kids are removed first, so bottom up is essential. - for root, dirs, files in os.walk(support.TESTFN, topdown=False): - for name in files: - os.remove(os.path.join(root, name)) - for name in dirs: - dirname = os.path.join(root, name) - if not os.path.islink(dirname): - os.rmdir(dirname) - else: - os.remove(dirname) - os.rmdir(support.TESTFN) - def test_walk_bad_dir(self): # Walk top-down. errors = [] @@ -1028,27 +1039,11 @@ class FwalkTests(WalkTests): self.addCleanup(os.close, newfd) self.assertEqual(newfd, minfd) - def tearDown(self): - # cleanup - for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False): - for name in files: - os.unlink(name, dir_fd=rootfd) - for name in dirs: - st = os.stat(name, dir_fd=rootfd, follow_symlinks=False) - if stat.S_ISDIR(st.st_mode): - os.rmdir(name, dir_fd=rootfd) - else: - os.unlink(name, dir_fd=rootfd) - os.rmdir(support.TESTFN) - class BytesWalkTests(WalkTests): """Tests for os.walk() with bytes.""" def setUp(self): super().setUp() self.stack = contextlib.ExitStack() - if os.name == 'nt': - self.stack.enter_context(warnings.catch_warnings()) - warnings.simplefilter("ignore", DeprecationWarning) def tearDown(self): self.stack.close() @@ -1225,8 +1220,7 @@ class RemoveDirsTests(unittest.TestCase): os.mkdir(dira) dirb = os.path.join(dira, 'dirb') os.mkdir(dirb) - with open(os.path.join(dira, 'file.txt'), 'w') as f: - f.write('text') + create_file(os.path.join(dira, 'file.txt')) os.removedirs(dirb) self.assertFalse(os.path.exists(dirb)) self.assertTrue(os.path.exists(dira)) @@ -1237,8 +1231,7 @@ class RemoveDirsTests(unittest.TestCase): os.mkdir(dira) dirb = os.path.join(dira, 'dirb') os.mkdir(dirb) - with open(os.path.join(dirb, 'file.txt'), 'w') as f: - f.write('text') + create_file(os.path.join(dirb, 'file.txt')) with self.assertRaises(OSError): os.removedirs(dirb) self.assertTrue(os.path.exists(dirb)) @@ -1248,7 +1241,7 @@ class RemoveDirsTests(unittest.TestCase): class DevNullTests(unittest.TestCase): def test_devnull(self): - with open(os.devnull, 'wb') as f: + with open(os.devnull, 'wb', 0) as f: f.write(b'hello') f.close() with open(os.devnull, 'rb') as f: @@ -1265,6 +1258,7 @@ class URandomTests(unittest.TestCase): def test_urandom_value(self): data1 = os.urandom(16) + self.assertIsInstance(data1, bytes) data2 = os.urandom(16) self.assertNotEqual(data1, data2) @@ -1285,6 +1279,49 @@ class URandomTests(unittest.TestCase): self.assertNotEqual(data1, data2) +@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()') +class GetRandomTests(unittest.TestCase): + @classmethod + def setUpClass(cls): + try: + os.getrandom(1) + except OSError as exc: + if exc.errno == errno.ENOSYS: + # Python compiled on a more recent Linux version + # than the current Linux kernel + raise unittest.SkipTest("getrandom() syscall fails with ENOSYS") + else: + raise + + def test_getrandom_type(self): + data = os.getrandom(16) + self.assertIsInstance(data, bytes) + self.assertEqual(len(data), 16) + + def test_getrandom0(self): + empty = os.getrandom(0) + self.assertEqual(empty, b'') + + def test_getrandom_random(self): + self.assertTrue(hasattr(os, 'GRND_RANDOM')) + + # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare + # resource /dev/random + + def test_getrandom_nonblock(self): + # The call must not fail. Check also that the flag exists + try: + os.getrandom(1, os.GRND_NONBLOCK) + except BlockingIOError: + # System urandom is not initialized yet + pass + + def test_getrandom_value(self): + data1 = os.getrandom(16) + data2 = os.getrandom(16) + self.assertNotEqual(data1, data2) + + # os.urandom() doesn't use a file descriptor when it is implemented with the # getentropy() function, the getrandom() function or the getrandom() syscall OS_URANDOM_DONT_USE_FD = ( @@ -1335,9 +1372,9 @@ class URandomFDTests(unittest.TestCase): def test_urandom_fd_reopened(self): # Issue #21207: urandom() should detect its fd to /dev/urandom # changed to something else, and reopen it. - with open(support.TESTFN, 'wb') as f: - f.write(b"x" * 256) - self.addCleanup(os.unlink, support.TESTFN) + self.addCleanup(support.unlink, support.TESTFN) + create_file(support.TESTFN, b"x" * 256) + code = """if 1: import os import sys @@ -1399,6 +1436,7 @@ def _execvpe_mockup(defpath=None): os.execve = orig_execve os.defpath = orig_defpath + class ExecTests(unittest.TestCase): @unittest.skipIf(USING_LINUXTHREADS, "avoid triggering a linuxthreads bug: see issue #4970") @@ -1466,6 +1504,18 @@ class ExecTests(unittest.TestCase): @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32ErrorTests(unittest.TestCase): + def setUp(self): + try: + os.stat(support.TESTFN) + except FileNotFoundError: + exists = False + except OSError as exc: + exists = True + self.fail("file %s must not exist; os.stat failed with %s" + % (support.TESTFN, exc)) + else: + self.fail("file %s must not exist" % support.TESTFN) + def test_rename(self): self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak") @@ -1476,12 +1526,10 @@ class Win32ErrorTests(unittest.TestCase): self.assertRaises(OSError, os.chdir, support.TESTFN) def test_mkdir(self): - f = open(support.TESTFN, "w") - try: + self.addCleanup(support.unlink, support.TESTFN) + + with open(support.TESTFN, "x") as f: self.assertRaises(OSError, os.mkdir, support.TESTFN) - finally: - f.close() - os.unlink(support.TESTFN) def test_utime(self): self.assertRaises(OSError, os.utime, support.TESTFN, None) @@ -1489,6 +1537,7 @@ class Win32ErrorTests(unittest.TestCase): def test_chmod(self): self.assertRaises(OSError, os.chmod, support.TESTFN, 0) + class TestInvalidFD(unittest.TestCase): singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] @@ -1600,12 +1649,9 @@ class LinkTests(unittest.TestCase): os.unlink(file) def _test_link(self, file1, file2): - with open(file1, "w") as f1: - f1.write("test") + create_file(file1) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - os.link(file1, file2) + os.link(file1, file2) with open(file1, "r") as f1, open(file2, "r") as f2: self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno())) @@ -1896,6 +1942,7 @@ class Win32ListdirTests(unittest.TestCase): self.assertEqual( sorted(os.listdir(support.TESTFN)), self.created_paths) + # bytes self.assertEqual( sorted(os.listdir(os.fsencode(support.TESTFN))), @@ -1909,6 +1956,7 @@ class Win32ListdirTests(unittest.TestCase): self.assertEqual( sorted(os.listdir(path)), self.created_paths) + # bytes path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) self.assertEqual( @@ -1988,51 +2036,43 @@ class Win32SymlinkTests(unittest.TestCase): self.assertNotEqual(os.lstat(link), os.stat(link)) bytes_link = os.fsencode(link) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - self.assertEqual(os.stat(bytes_link), os.stat(target)) - self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) + self.assertEqual(os.stat(bytes_link), os.stat(target)) + self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) def test_12084(self): level1 = os.path.abspath(support.TESTFN) level2 = os.path.join(level1, "level2") level3 = os.path.join(level2, "level3") - try: - os.mkdir(level1) - os.mkdir(level2) - os.mkdir(level3) + self.addCleanup(support.rmtree, level1) - file1 = os.path.abspath(os.path.join(level1, "file1")) + os.mkdir(level1) + os.mkdir(level2) + os.mkdir(level3) - with open(file1, "w") as f: - f.write("file1") + file1 = os.path.abspath(os.path.join(level1, "file1")) + create_file(file1) - orig_dir = os.getcwd() - try: - os.chdir(level2) - link = os.path.join(level2, "link") - os.symlink(os.path.relpath(file1), "link") - self.assertIn("link", os.listdir(os.getcwd())) - - # Check os.stat calls from the same dir as the link - self.assertEqual(os.stat(file1), os.stat("link")) - - # Check os.stat calls from a dir below the link - os.chdir(level1) - self.assertEqual(os.stat(file1), - os.stat(os.path.relpath(link))) - - # Check os.stat calls from a dir above the link - os.chdir(level3) - self.assertEqual(os.stat(file1), - os.stat(os.path.relpath(link))) - finally: - os.chdir(orig_dir) - except OSError as err: - self.fail(err) + orig_dir = os.getcwd() + try: + os.chdir(level2) + link = os.path.join(level2, "link") + os.symlink(os.path.relpath(file1), "link") + self.assertIn("link", os.listdir(os.getcwd())) + + # Check os.stat calls from the same dir as the link + self.assertEqual(os.stat(file1), os.stat("link")) + + # Check os.stat calls from a dir below the link + os.chdir(level1) + self.assertEqual(os.stat(file1), + os.stat(os.path.relpath(link))) + + # Check os.stat calls from a dir above the link + os.chdir(level3) + self.assertEqual(os.stat(file1), + os.stat(os.path.relpath(link))) finally: - os.remove(file1) - shutil.rmtree(level1) + os.chdir(orig_dir) @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") @@ -2070,7 +2110,7 @@ class Win32JunctionTests(unittest.TestCase): class NonLocalSymlinkTests(unittest.TestCase): def setUp(self): - """ + r""" Create this structure: base @@ -2141,11 +2181,110 @@ class PidTests(unittest.TestCase): def test_waitpid(self): args = [sys.executable, '-c', 'pass'] - pid = os.spawnv(os.P_NOWAIT, args[0], args) + # Add an implicit test for PyUnicode_FSConverter(). + pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args) status = os.waitpid(pid, 0) self.assertEqual(status, (pid, 0)) +class SpawnTests(unittest.TestCase): + def create_args(self, *, with_env=False, use_bytes=False): + self.exitcode = 17 + + filename = support.TESTFN + self.addCleanup(support.unlink, filename) + + if not with_env: + code = 'import sys; sys.exit(%s)' % self.exitcode + else: + self.env = dict(os.environ) + # create an unique key + self.key = str(uuid.uuid4()) + self.env[self.key] = self.key + # read the variable from os.environ to check that it exists + code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)' + % (self.key, self.exitcode)) + + with open(filename, "w") as fp: + fp.write(code) + + args = [sys.executable, filename] + if use_bytes: + args = [os.fsencode(a) for a in args] + self.env = {os.fsencode(k): os.fsencode(v) + for k, v in self.env.items()} + + return args + + @requires_os_func('spawnl') + def test_spawnl(self): + args = self.create_args() + exitcode = os.spawnl(os.P_WAIT, args[0], *args) + self.assertEqual(exitcode, self.exitcode) + + @requires_os_func('spawnle') + def test_spawnle(self): + args = self.create_args(with_env=True) + exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env) + self.assertEqual(exitcode, self.exitcode) + + @requires_os_func('spawnlp') + def test_spawnlp(self): + args = self.create_args() + exitcode = os.spawnlp(os.P_WAIT, args[0], *args) + self.assertEqual(exitcode, self.exitcode) + + @requires_os_func('spawnlpe') + def test_spawnlpe(self): + args = self.create_args(with_env=True) + exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env) + self.assertEqual(exitcode, self.exitcode) + + @requires_os_func('spawnv') + def test_spawnv(self): + args = self.create_args() + exitcode = os.spawnv(os.P_WAIT, args[0], args) + self.assertEqual(exitcode, self.exitcode) + + @requires_os_func('spawnve') + def test_spawnve(self): + args = self.create_args(with_env=True) + exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) + self.assertEqual(exitcode, self.exitcode) + + @requires_os_func('spawnvp') + def test_spawnvp(self): + args = self.create_args() + exitcode = os.spawnvp(os.P_WAIT, args[0], args) + self.assertEqual(exitcode, self.exitcode) + + @requires_os_func('spawnvpe') + def test_spawnvpe(self): + args = self.create_args(with_env=True) + exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env) + self.assertEqual(exitcode, self.exitcode) + + @requires_os_func('spawnv') + def test_nowait(self): + args = self.create_args() + pid = os.spawnv(os.P_NOWAIT, args[0], args) + result = os.waitpid(pid, 0) + self.assertEqual(result[0], pid) + status = result[1] + if hasattr(os, 'WIFEXITED'): + self.assertTrue(os.WIFEXITED(status)) + self.assertEqual(os.WEXITSTATUS(status), self.exitcode) + else: + self.assertEqual(status, self.exitcode << 8) + + @requires_os_func('spawnve') + def test_spawnve_bytes(self): + # Test bytes handling in parse_arglist and parse_envlist (#28114) + args = self.create_args(with_env=True, use_bytes=True) + exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) + self.assertEqual(exitcode, self.exitcode) + + # The introduction of this TestCase caused at least two different errors on # *nix buildbots. Temporarily skip this to let the buildbots move along. @unittest.skip("Skip due to platform/environment differences on *NIX buildbots") @@ -2168,8 +2307,8 @@ class ProgramPriorityTests(unittest.TestCase): try: new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) if base >= 19 and new_prio <= 19: - raise unittest.SkipTest( - "unable to reliably test setpriority at current nice level of %s" % base) + raise unittest.SkipTest("unable to reliably test setpriority " + "at current nice level of %s" % base) else: self.assertEqual(new_prio, base + 1) finally: @@ -2279,8 +2418,7 @@ class TestSendfile(unittest.TestCase): @classmethod def setUpClass(cls): cls.key = support.threading_setup() - with open(support.TESTFN, "wb") as f: - f.write(cls.DATA) + create_file(support.TESTFN, cls.DATA) @classmethod def tearDownClass(cls): @@ -2430,10 +2568,11 @@ class TestSendfile(unittest.TestCase): def test_trailers(self): TESTFN2 = support.TESTFN + "2" file_data = b"abcdef" - with open(TESTFN2, 'wb') as f: - f.write(file_data) - with open(TESTFN2, 'rb')as f: - self.addCleanup(os.remove, TESTFN2) + + self.addCleanup(support.unlink, TESTFN2) + create_file(TESTFN2, file_data) + + with open(TESTFN2, 'rb') as f: os.sendfile(self.sockno, f.fileno(), 0, len(file_data), trailers=[b"1234"]) self.client.close() @@ -2456,35 +2595,37 @@ class TestSendfile(unittest.TestCase): def supports_extended_attributes(): if not hasattr(os, "setxattr"): return False + try: - with open(support.TESTFN, "wb") as fp: + with open(support.TESTFN, "xb", 0) as fp: try: os.setxattr(fp.fileno(), b"user.test", b"") except OSError: return False finally: support.unlink(support.TESTFN) - # Kernels < 2.6.39 don't respect setxattr flags. - kernel_version = platform.release() - m = re.match("2.6.(\d{1,2})", kernel_version) - return m is None or int(m.group(1)) >= 39 + + return True @unittest.skipUnless(supports_extended_attributes(), "no non-broken extended attribute support") +# Kernels < 2.6.39 don't respect setxattr flags. +@support.requires_linux_version(2, 6, 39) class ExtendedAttributeTests(unittest.TestCase): - def tearDown(self): - support.unlink(support.TESTFN) - def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): fn = support.TESTFN - open(fn, "wb").close() + self.addCleanup(support.unlink, fn) + create_file(fn) + with self.assertRaises(OSError) as cm: getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) + init_xattr = listxattr(fn) self.assertIsInstance(init_xattr, list) + setxattr(fn, s("user.test"), b"", **kwargs) xattr = set(init_xattr) xattr.add("user.test") @@ -2492,19 +2633,24 @@ class ExtendedAttributeTests(unittest.TestCase): self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") + with self.assertRaises(OSError) as cm: setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) self.assertEqual(cm.exception.errno, errno.EEXIST) + with self.assertRaises(OSError) as cm: setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) + setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) xattr.add("user.test2") self.assertEqual(set(listxattr(fn)), xattr) removexattr(fn, s("user.test"), **kwargs) + with self.assertRaises(OSError) as cm: getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) + xattr.remove("user.test") self.assertEqual(set(listxattr(fn)), xattr) self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") @@ -2517,11 +2663,11 @@ class ExtendedAttributeTests(unittest.TestCase): self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) def _check_xattrs(self, *args, **kwargs): - def make_bytes(s): - return bytes(s, "ascii") self._check_xattrs_str(str, *args, **kwargs) support.unlink(support.TESTFN) - self._check_xattrs_str(make_bytes, *args, **kwargs) + + self._check_xattrs_str(os.fsencode, *args, **kwargs) + support.unlink(support.TESTFN) def test_simple(self): self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, @@ -2536,10 +2682,10 @@ class ExtendedAttributeTests(unittest.TestCase): with open(path, "rb") as fp: return os.getxattr(fp.fileno(), *args) def setxattr(path, *args): - with open(path, "wb") as fp: + with open(path, "wb", 0) as fp: os.setxattr(fp.fileno(), *args) def removexattr(path, *args): - with open(path, "wb") as fp: + with open(path, "wb", 0) as fp: os.removexattr(fp.fileno(), *args) def listxattr(path, *args): with open(path, "rb") as fp: @@ -2547,43 +2693,6 @@ class ExtendedAttributeTests(unittest.TestCase): self._check_xattrs(getxattr, setxattr, removexattr, listxattr) -@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") -class Win32DeprecatedBytesAPI(unittest.TestCase): - def test_deprecated(self): - import nt - filename = os.fsencode(support.TESTFN) - with warnings.catch_warnings(): - warnings.simplefilter("error", DeprecationWarning) - for func, *args in ( - (nt._getfullpathname, filename), - (nt._isdir, filename), - (os.access, filename, os.R_OK), - (os.chdir, filename), - (os.chmod, filename, 0o777), - (os.getcwdb,), - (os.link, filename, filename), - (os.listdir, filename), - (os.lstat, filename), - (os.mkdir, filename), - (os.open, filename, os.O_RDONLY), - (os.rename, filename, filename), - (os.rmdir, filename), - (os.startfile, filename), - (os.stat, filename), - (os.unlink, filename), - (os.utime, filename), - ): - self.assertRaises(DeprecationWarning, func, *args) - - @support.skip_unless_symlink - def test_symlink(self): - filename = os.fsencode(support.TESTFN) - with warnings.catch_warnings(): - warnings.simplefilter("error", DeprecationWarning) - self.assertRaises(DeprecationWarning, - os.symlink, filename, filename) - - @unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size") class TermsizeTests(unittest.TestCase): def test_does_not_crash(self): @@ -2646,6 +2755,7 @@ class OSErrorTests(unittest.TestCase): else: encoded = os.fsencode(support.TESTFN) self.bytes_filenames.append(encoded) + self.bytes_filenames.append(bytearray(encoded)) self.bytes_filenames.append(memoryview(encoded)) self.filenames = self.bytes_filenames + self.unicode_filenames @@ -2666,16 +2776,7 @@ class OSErrorTests(unittest.TestCase): (self.bytes_filenames, os.replace, b"dst"), (self.unicode_filenames, os.rename, "dst"), (self.unicode_filenames, os.replace, "dst"), - # Issue #16414: Don't test undecodable names with listdir() - # because of a Windows bug. - # - # With the ANSI code page 932, os.listdir(b'\xe7') return an - # empty list (instead of failing), whereas os.listdir(b'\xff') - # raises a FileNotFoundError. It looks like a Windows bug: - # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7') - # fails with ERROR_FILE_NOT_FOUND (2), instead of - # ERROR_PATH_NOT_FOUND (3). - (self.unicode_filenames, os.listdir,), + (self.unicode_filenames, os.listdir, ), )) else: funcs.extend(( @@ -2716,12 +2817,24 @@ class OSErrorTests(unittest.TestCase): else: funcs.append((self.filenames, os.readlink,)) + for filenames, func, *func_args in funcs: for name in filenames: try: - func(name, *func_args) + if isinstance(name, (str, bytes)): + func(name, *func_args) + else: + with self.assertWarnsRegex(DeprecationWarning, 'should be'): + func(name, *func_args) except OSError as err: - self.assertIs(err.filename, name) + self.assertIs(err.filename, name, str(func)) + except RuntimeError as err: + if sys.platform != 'win32': + raise + + # issue27781: undecodable bytes currently raise RuntimeError + # by 3.6.0b4 this will become UnicodeDecodeError or nothing + self.assertIsInstance(err.__context__, UnicodeDecodeError) else: self.fail("No exception thrown by {}".format(func)) @@ -2819,6 +2932,63 @@ class FDInheritanceTests(unittest.TestCase): self.assertEqual(os.get_inheritable(slave_fd), False) +class PathTConverterTests(unittest.TestCase): + # tuples of (function name, allows fd arguments, additional arguments to + # function, cleanup function) + functions = [ + ('stat', True, (), None), + ('lstat', False, (), None), + ('access', False, (os.F_OK,), None), + ('chflags', False, (0,), None), + ('lchflags', False, (0,), None), + ('open', False, (0,), getattr(os, 'close', None)), + ] + + def test_path_t_converter(self): + str_filename = support.TESTFN + if os.name == 'nt': + bytes_fspath = bytes_filename = None + else: + bytes_filename = support.TESTFN.encode('ascii') + bytes_fspath = _PathLike(bytes_filename) + fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT) + self.addCleanup(support.unlink, support.TESTFN) + self.addCleanup(os.close, fd) + + int_fspath = _PathLike(fd) + str_fspath = _PathLike(str_filename) + + for name, allow_fd, extra_args, cleanup_fn in self.functions: + with self.subTest(name=name): + try: + fn = getattr(os, name) + except AttributeError: + continue + + for path in (str_filename, bytes_filename, str_fspath, + bytes_fspath): + if path is None: + continue + with self.subTest(name=name, path=path): + result = fn(path, *extra_args) + if cleanup_fn is not None: + cleanup_fn(result) + + with self.assertRaisesRegex( + TypeError, 'should be string, bytes'): + fn(int_fspath, *extra_args) + + if allow_fd: + result = fn(fd, *extra_args) # should not fail + if cleanup_fn is not None: + cleanup_fn(result) + else: + with self.assertRaisesRegex( + TypeError, + 'os.PathLike'): + fn(fd, *extra_args) + + @unittest.skipUnless(hasattr(os, 'get_blocking'), 'needs os.get_blocking() and os.set_blocking()') class BlockingTests(unittest.TestCase): @@ -2842,15 +3012,18 @@ class ExportsTests(unittest.TestCase): class TestScandir(unittest.TestCase): + check_no_resource_warning = support.check_no_resource_warning + def setUp(self): self.path = os.path.realpath(support.TESTFN) + self.bytes_path = os.fsencode(self.path) self.addCleanup(support.rmtree, self.path) os.mkdir(self.path) def create_file(self, name="file.txt"): - filename = os.path.join(self.path, name) - with open(filename, "wb") as fp: - fp.write(b'python') + path = self.bytes_path if isinstance(name, bytes) else self.path + filename = os.path.join(path, name) + create_file(filename, b'python') return filename def get_entries(self, names): @@ -2873,6 +3046,7 @@ class TestScandir(unittest.TestCase): self.assertEqual(stat1, stat2) def check_entry(self, entry, name, is_dir, is_file, is_symlink): + self.assertIsInstance(entry, os.DirEntry) self.assertEqual(entry.name, name) self.assertEqual(entry.path, os.path.join(self.path, name)) self.assertEqual(entry.inode(), @@ -2938,15 +3112,16 @@ class TestScandir(unittest.TestCase): self.check_entry(entry, 'symlink_file.txt', False, True, True) def get_entry(self, name): - entries = list(os.scandir(self.path)) + path = self.bytes_path if isinstance(name, bytes) else self.path + entries = list(os.scandir(path)) self.assertEqual(len(entries), 1) entry = entries[0] self.assertEqual(entry.name, name) return entry - def create_file_entry(self): - filename = self.create_file() + def create_file_entry(self, name='file.txt'): + filename = self.create_file(name=name) return self.get_entry(os.path.basename(filename)) def test_current_directory(self): @@ -2967,6 +3142,18 @@ class TestScandir(unittest.TestCase): entry = self.create_file_entry() self.assertEqual(repr(entry), "<DirEntry 'file.txt'>") + def test_fspath_protocol(self): + entry = self.create_file_entry() + self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt')) + + def test_fspath_protocol_bytes(self): + bytes_filename = os.fsencode('bytesfile.txt') + bytes_entry = self.create_file_entry(name=bytes_filename) + fspath = os.fspath(bytes_entry) + self.assertIsInstance(fspath, bytes) + self.assertEqual(fspath, + os.path.join(os.fsencode(self.path),bytes_filename)) + def test_removed_dir(self): path = os.path.join(self.path, 'dir') @@ -3030,11 +3217,6 @@ class TestScandir(unittest.TestCase): entry.stat(follow_symlinks=False) def test_bytes(self): - if os.name == "nt": - # On Windows, os.scandir(bytes) must raise an exception - self.assertRaises(TypeError, os.scandir, b'.') - return - self.create_file("file.txt") path_bytes = os.fsencode(self.path) @@ -3064,6 +3246,112 @@ class TestScandir(unittest.TestCase): for obj in [1234, 1.234, {}, []]: self.assertRaises(TypeError, os.scandir, obj) + def test_close(self): + self.create_file("file.txt") + self.create_file("file2.txt") + iterator = os.scandir(self.path) + next(iterator) + iterator.close() + # multiple closes + iterator.close() + with self.check_no_resource_warning(): + del iterator + + def test_context_manager(self): + self.create_file("file.txt") + self.create_file("file2.txt") + with os.scandir(self.path) as iterator: + next(iterator) + with self.check_no_resource_warning(): + del iterator + + def test_context_manager_close(self): + self.create_file("file.txt") + self.create_file("file2.txt") + with os.scandir(self.path) as iterator: + next(iterator) + iterator.close() + + def test_context_manager_exception(self): + self.create_file("file.txt") + self.create_file("file2.txt") + with self.assertRaises(ZeroDivisionError): + with os.scandir(self.path) as iterator: + next(iterator) + 1/0 + with self.check_no_resource_warning(): + del iterator + + def test_resource_warning(self): + self.create_file("file.txt") + self.create_file("file2.txt") + iterator = os.scandir(self.path) + next(iterator) + with self.assertWarns(ResourceWarning): + del iterator + support.gc_collect() + # exhausted iterator + iterator = os.scandir(self.path) + list(iterator) + with self.check_no_resource_warning(): + del iterator + + +class TestPEP519(unittest.TestCase): + + # Abstracted so it can be overridden to test pure Python implementation + # if a C version is provided. + fspath = staticmethod(os.fspath) + + def test_return_bytes(self): + for b in b'hello', b'goodbye', b'some/path/and/file': + self.assertEqual(b, self.fspath(b)) + + def test_return_string(self): + for s in 'hello', 'goodbye', 'some/path/and/file': + self.assertEqual(s, self.fspath(s)) + + def test_fsencode_fsdecode(self): + for p in "path/like/object", b"path/like/object": + pathlike = _PathLike(p) + + self.assertEqual(p, self.fspath(pathlike)) + self.assertEqual(b"path/like/object", os.fsencode(pathlike)) + self.assertEqual("path/like/object", os.fsdecode(pathlike)) + + def test_pathlike(self): + self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil'))) + self.assertTrue(issubclass(_PathLike, os.PathLike)) + self.assertTrue(isinstance(_PathLike(), os.PathLike)) + + def test_garbage_in_exception_out(self): + vapor = type('blah', (), {}) + for o in int, type, os, vapor(): + self.assertRaises(TypeError, self.fspath, o) + + def test_argument_required(self): + self.assertRaises(TypeError, self.fspath) + + def test_bad_pathlike(self): + # __fspath__ returns a value other than str or bytes. + self.assertRaises(TypeError, self.fspath, _PathLike(42)) + # __fspath__ attribute that is not callable. + c = type('foo', (), {}) + c.__fspath__ = 1 + self.assertRaises(TypeError, self.fspath, c()) + # __fspath__ raises an exception. + self.assertRaises(ZeroDivisionError, self.fspath, + _PathLike(ZeroDivisionError())) + +# Only test if the C version is provided, otherwise TestPEP519 already tested +# the pure Python implementation. +if hasattr(os, "_fspath"): + class TestPEP519PurePython(TestPEP519): + + """Explicitly test the pure Python implementation of os.fspath().""" + + fspath = staticmethod(os._fspath) + if __name__ == "__main__": unittest.main() |