diff options
author | Giampaolo Rodola <g.rodola@gmail.com> | 2020-04-24 14:56:57 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-24 23:56:57 +0200 |
commit | bc899c3f24581aa122e1b5b18ea6e694c2faddce (patch) | |
tree | 53d54196cc39e1b32f168508f95bde8b99fc0ad0 | |
parent | 6242f7411b882d525e5d267de4bcda1079934ea2 (diff) | |
download | psutil-bc899c3f24581aa122e1b5b18ea6e694c2faddce.tar.gz |
Get rid of TESTFN global variable (#1734)
-rw-r--r-- | psutil/tests/__init__.py | 130 | ||||
-rwxr-xr-x | psutil/tests/runner.py | 6 | ||||
-rwxr-xr-x | psutil/tests/test_connections.py | 117 | ||||
-rwxr-xr-x | psutil/tests/test_contracts.py | 5 | ||||
-rwxr-xr-x | psutil/tests/test_linux.py | 75 | ||||
-rwxr-xr-x | psutil/tests/test_memory_leaks.py | 6 | ||||
-rwxr-xr-x | psutil/tests/test_process.py | 71 | ||||
-rwxr-xr-x | psutil/tests/test_system.py | 17 | ||||
-rwxr-xr-x | psutil/tests/test_testutils.py | 116 | ||||
-rwxr-xr-x | psutil/tests/test_unicode.py | 112 |
10 files changed, 300 insertions, 355 deletions
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py index d7dd42b2..187552d5 100644 --- a/psutil/tests/__init__.py +++ b/psutil/tests/__init__.py @@ -73,9 +73,9 @@ else: __all__ = [ # constants 'APPVEYOR', 'DEVNULL', 'GLOBAL_TIMEOUT', 'MEMORY_TOLERANCE', 'NO_RETRIES', - 'PYPY', 'PYTHON_EXE', 'ROOT_DIR', 'SCRIPTS_DIR', 'TESTFILE_PREFIX', - 'TESTFN', 'TESTFN_UNICODE', 'TOX', 'TRAVIS', 'CIRRUS', 'CI_TESTING', - 'VALID_PROC_STATUSES', + 'PYPY', 'PYTHON_EXE', 'ROOT_DIR', 'SCRIPTS_DIR', 'TESTFN_PREFIX', + 'UNICODE_SUFFIX', 'INVALID_UNICODE_SUFFIX', 'TOX', 'TRAVIS', 'CIRRUS', + 'CI_TESTING', 'VALID_PROC_STATUSES', "HAS_CPU_AFFINITY", "HAS_CPU_FREQ", "HAS_ENVIRON", "HAS_PROC_IO_COUNTERS", "HAS_IONICE", "HAS_MEMORY_MAPS", "HAS_PROC_CPU_NUM", "HAS_RLIMIT", "HAS_SENSORS_BATTERY", "HAS_BATTERY", "HAS_SENSORS_FANS", @@ -92,15 +92,15 @@ __all__ = [ 'install_pip', 'install_test_deps', # fs utils 'chdir', 'safe_rmpath', 'create_exe', 'decode_path', 'encode_path', - 'unique_filename', + 'get_testfn', # os 'get_winver', 'get_kernel_version', # sync primitives 'call_until', 'wait_for_pid', 'wait_for_file', # network 'check_net_address', - 'get_free_port', 'unix_socket_path', 'bind_socket', 'bind_unix_socket', - 'tcp_socketpair', 'unix_socketpair', 'create_sockets', + 'get_free_port', 'bind_socket', 'bind_unix_socket', 'tcp_socketpair', + 'unix_socketpair', 'create_sockets', # compat 'reload_module', 'import_module_by_path', # others @@ -136,21 +136,21 @@ if TRAVIS or APPVEYOR: NO_RETRIES *= 3 GLOBAL_TIMEOUT *= 3 -# --- files +# --- file names -TESTFILE_PREFIX = '$testfn' +# Disambiguate TESTFN for parallel testing. if os.name == 'java': # Jython disallows @ in module names - TESTFILE_PREFIX = '$psutil-test-' + TESTFN_PREFIX = '$psutil-%s-' % os.getpid() else: - TESTFILE_PREFIX = '@psutil-test-' -TESTFN = os.path.join(os.path.realpath(os.getcwd()), TESTFILE_PREFIX) -# Disambiguate TESTFN for parallel testing, while letting it remain a valid -# module name. -TESTFN = TESTFN + str(os.getpid()) - -_TESTFN = TESTFN + '-internal' -TESTFN_UNICODE = TESTFN + u("-ƒőő") + TESTFN_PREFIX = '@psutil-%s-' % os.getpid() +UNICODE_SUFFIX = u("-ƒőő") +# An invalid unicode string. +if PY3: + INVALID_UNICODE_SUFFIX = b"f\xc0\x80".decode('utf8', 'surrogateescape') +else: + INVALID_UNICODE_SUFFIX = "f\xc0\x80" + ASCII_FS = sys.getfilesystemencoding().lower() in ('ascii', 'us-ascii') # --- paths @@ -222,20 +222,15 @@ _pids_started = set() _testfiles_created = set() +# --- exit funs (first is executed last) + +atexit.register(DEVNULL.close) + + @atexit.register def cleanup_test_files(): - DEVNULL.close() - for name in os.listdir(u('.')): - if isinstance(name, unicode): - prefix = u(TESTFILE_PREFIX) - else: - prefix = TESTFILE_PREFIX - if name.startswith(prefix): - try: - safe_rmpath(name) - except Exception: - traceback.print_exc() - for path in _testfiles_created: + while _testfiles_created: + path = _testfiles_created.pop() try: safe_rmpath(path) except Exception: @@ -333,14 +328,15 @@ def get_test_subprocess(cmd=None, **kwds): CREATE_NO_WINDOW = 0x8000000 kwds.setdefault("creationflags", CREATE_NO_WINDOW) if cmd is None: - safe_rmpath(_TESTFN) + testfn = get_testfn() + safe_rmpath(testfn) pyline = "from time import sleep;" \ "open(r'%s', 'w').close();" \ - "sleep(60);" % _TESTFN + "sleep(60);" % testfn cmd = [PYTHON_EXE, "-c", pyline] sproc = subprocess.Popen(cmd, **kwds) _subprocesses_started.add(sproc) - wait_for_file(_TESTFN, delete=True, empty=True) + wait_for_file(testfn, delete=True, empty=True) else: sproc = subprocess.Popen(cmd, **kwds) _subprocesses_started.add(sproc) @@ -356,7 +352,8 @@ def create_proc_children_pair(): The 2 processes are fully initialized and will live for 60 secs and are registered for cleanup on reap_children(). """ - _TESTFN2 = os.path.basename(_TESTFN) + '2' # need to be relative + # must be relative on Windows + testfn = os.path.basename(get_testfn(dir=os.getcwd())) s = textwrap.dedent("""\ import subprocess, os, sys, time s = "import os, time;" @@ -366,7 +363,7 @@ def create_proc_children_pair(): s += "time.sleep(60);" p = subprocess.Popen([r'%s', '-c', s]) p.wait() - """ % (_TESTFN2, PYTHON_EXE)) + """ % (testfn, PYTHON_EXE)) # On Windows if we create a subprocess with CREATE_NO_WINDOW flag # set (which is the default) a "conhost.exe" extra process will be # spawned as a child. We don't want that. @@ -375,8 +372,8 @@ def create_proc_children_pair(): else: subp = pyrun(s) child1 = psutil.Process(subp.pid) - data = wait_for_file(_TESTFN2, delete=False, empty=False) - safe_rmpath(_TESTFN2) + data = wait_for_file(testfn, delete=False, empty=False) + safe_rmpath(testfn) child2_pid = int(data) _pids_started.add(child2_pid) child2 = psutil.Process(child2_pid) @@ -386,7 +383,7 @@ def create_proc_children_pair(): def create_zombie_proc(): """Create a zombie process and return its PID.""" assert psutil.POSIX - unix_file = tempfile.mktemp(prefix=TESTFILE_PREFIX) if MACOS else TESTFN + unix_file = get_testfn() src = textwrap.dedent("""\ import os, sys, time, socket, contextlib child_pid = os.fork() @@ -427,9 +424,7 @@ def pyrun(src, **kwds): """ kwds.setdefault("stdout", None) kwds.setdefault("stderr", None) - with tempfile.NamedTemporaryFile( - prefix=TESTFILE_PREFIX, mode="wt", delete=False) as f: - _testfiles_created.add(f.name) + with open(get_testfn(), 'wt') as f: f.write(src) f.flush() subp = get_test_subprocess([PYTHON_EXE, f.name], **kwds) @@ -772,8 +767,7 @@ def create_exe(outpath, c_code=None): } """) assert isinstance(c_code, str), c_code - with tempfile.NamedTemporaryFile( - suffix='.c', delete=False, mode='wt') as f: + with open(get_testfn(suffix='.c'), 'wt') as f: f.write(c_code) try: subprocess.check_call(["gcc", f.name, "-o", outpath]) @@ -787,8 +781,19 @@ def create_exe(outpath, c_code=None): os.chmod(outpath, st.st_mode | stat.S_IEXEC) -def unique_filename(prefix=TESTFILE_PREFIX, suffix=""): - return tempfile.mktemp(prefix=prefix, suffix=suffix) +def get_testfn(suffix="", dir=None): + """Return an absolute pathname of a file or dir that did not + exist at the time this call is made. Also schedule it for safe + deletion at interpreter exit. It's technically racy but probably + not really due to the time variant. + """ + timer = getattr(time, 'perf_counter', time.time) + while True: + prefix = "%s%.9f-" % (TESTFN_PREFIX, timer()) + name = tempfile.mktemp(prefix=prefix, suffix=suffix, dir=dir) + if not os.path.exists(name): # also include dirs + _testfiles_created.add(name) + return os.path.realpath(name) # needed for OSX # =================================================================== @@ -813,8 +818,9 @@ class TestCase(unittest.TestCase): assertRaisesRegex = unittest.TestCase.assertRaisesRegexp -# override default unittest.TestCase -unittest.TestCase = TestCase +# monkey patch default unittest.TestCase +if 'PSUTIL_TESTING' in os.environ: + unittest.TestCase = TestCase @unittest.skipIf(PYPY, "unreliable on PYPY") @@ -988,22 +994,6 @@ def get_free_port(host='127.0.0.1'): return sock.getsockname()[1] -@contextlib.contextmanager -def unix_socket_path(suffix=""): - """A context manager which returns a non-existent file name - and tries to delete it on exit. - """ - assert psutil.POSIX - path = unique_filename(suffix=suffix) - try: - yield path - finally: - try: - os.unlink(path) - except OSError: - pass - - def bind_socket(family=AF_INET, type=SOCK_STREAM, addr=None): """Binds a generic socket.""" if addr is None and family in (AF_INET, AF_INET6): @@ -1094,8 +1084,8 @@ def create_sockets(): socks.append(bind_socket(socket.AF_INET6, socket.SOCK_STREAM)) socks.append(bind_socket(socket.AF_INET6, socket.SOCK_DGRAM)) if POSIX and HAS_CONNECTIONS_UNIX: - fname1 = unix_socket_path().__enter__() - fname2 = unix_socket_path().__enter__() + fname1 = get_testfn() + fname2 = get_testfn() s1, s2 = unix_socketpair(fname1) s3 = bind_unix_socket(fname2, type=socket.SOCK_DGRAM) # self.addCleanup(safe_rmpath, fname1) @@ -1106,10 +1096,6 @@ def create_sockets(): finally: for s in socks: s.close() - if fname1 is not None: - safe_rmpath(fname1) - if fname2 is not None: - safe_rmpath(fname2) def check_net_address(addr, family): @@ -1196,14 +1182,14 @@ def is_namedtuple(x): if POSIX: @contextlib.contextmanager - def copyload_shared_lib(dst_prefix=TESTFILE_PREFIX): + def copyload_shared_lib(suffix=""): """Ctx manager which picks up a random shared CO lib used by this process, copies it in another location and loads it in memory via ctypes. Return the new absolutized path. """ exe = 'pypy' if PYPY else 'python' ext = ".so" - dst = tempfile.mktemp(prefix=dst_prefix, suffix=ext) + dst = get_testfn(suffix=suffix + ext) libs = [x.path for x in psutil.Process().memory_maps() if os.path.splitext(x.path)[1] == ext and exe in x.path.lower()] @@ -1216,7 +1202,7 @@ if POSIX: safe_rmpath(dst) else: @contextlib.contextmanager - def copyload_shared_lib(dst_prefix=TESTFILE_PREFIX): + def copyload_shared_lib(suffix=""): """Ctx manager which picks up a random shared DLL lib used by this process, copies it in another location and loads it in memory via ctypes. @@ -1225,7 +1211,7 @@ else: from ctypes import wintypes from ctypes import WinError ext = ".dll" - dst = tempfile.mktemp(prefix=dst_prefix, suffix=ext) + dst = get_testfn(suffix=suffix + ext) libs = [x.path for x in psutil.Process().memory_maps() if x.path.lower().endswith(ext) and 'python' in os.path.basename(x.path).lower() and diff --git a/psutil/tests/runner.py b/psutil/tests/runner.py index 2e9264bd..7ffbc1cf 100755 --- a/psutil/tests/runner.py +++ b/psutil/tests/runner.py @@ -28,6 +28,7 @@ import psutil from psutil._common import hilite from psutil._common import print_color from psutil._common import term_supports_colors +from psutil.tests import APPVEYOR from psutil.tests import safe_rmpath from psutil.tests import TOX @@ -134,7 +135,10 @@ def save_failed_tests(result): def run(name=None, last_failed=False): setup_tests() - runner = ColouredRunner(verbosity=VERBOSITY) + if APPVEYOR: + runner = TextTestRunner(verbosity=VERBOSITY) + else: + runner = ColouredRunner(verbosity=VERBOSITY) suite = get_suite_from_failed() if last_failed else get_suite(name) try: result = runner.run(suite) diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py index 972ac9d5..5bd71dc8 100755 --- a/psutil/tests/test_connections.py +++ b/psutil/tests/test_connections.py @@ -10,6 +10,7 @@ import contextlib import errno import os import socket +import string import textwrap from contextlib import closing from socket import AF_INET @@ -36,17 +37,15 @@ from psutil.tests import CIRRUS from psutil.tests import create_sockets from psutil.tests import enum from psutil.tests import get_free_port +from psutil.tests import get_testfn from psutil.tests import HAS_CONNECTIONS_UNIX from psutil.tests import pyrun from psutil.tests import reap_children -from psutil.tests import safe_rmpath from psutil.tests import skip_on_access_denied from psutil.tests import SKIP_SYSCONS from psutil.tests import tcp_socketpair -from psutil.tests import TESTFN from psutil.tests import TRAVIS from psutil.tests import unittest -from psutil.tests import unix_socket_path from psutil.tests import unix_socketpair from psutil.tests import wait_for_file @@ -58,14 +57,12 @@ SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object()) class Base(object): def setUp(self): - safe_rmpath(TESTFN) if not (NETBSD or FREEBSD): # process opens a UNIX socket to /var/log/run. cons = thisproc.connections(kind='all') assert not cons, cons def tearDown(self): - safe_rmpath(TESTFN) reap_children() if not (FREEBSD or NETBSD): # Make sure we closed all resources. @@ -269,19 +266,17 @@ class TestUnconnectedSockets(Base, unittest.TestCase): @unittest.skipIf(not POSIX, 'POSIX only') def test_unix_tcp(self): - with unix_socket_path() as name: - with closing(bind_unix_socket(name, type=SOCK_STREAM)) as sock: - conn = self.check_socket(sock) - assert not conn.raddr - self.assertEqual(conn.status, psutil.CONN_NONE) + with closing(bind_unix_socket(get_testfn(), type=SOCK_STREAM)) as sock: + conn = self.check_socket(sock) + assert not conn.raddr + self.assertEqual(conn.status, psutil.CONN_NONE) @unittest.skipIf(not POSIX, 'POSIX only') def test_unix_udp(self): - with unix_socket_path() as name: - with closing(bind_unix_socket(name, type=SOCK_STREAM)) as sock: - conn = self.check_socket(sock) - assert not conn.raddr - self.assertEqual(conn.status, psutil.CONN_NONE) + with closing(bind_unix_socket(get_testfn(), type=SOCK_STREAM)) as sock: + conn = self.check_socket(sock) + assert not conn.raddr + self.assertEqual(conn.status, psutil.CONN_NONE) class TestConnectedSocket(Base, unittest.TestCase): @@ -313,39 +308,39 @@ class TestConnectedSocket(Base, unittest.TestCase): @unittest.skipIf(not POSIX, 'POSIX only') def test_unix(self): - with unix_socket_path() as name: - server, client = unix_socketpair(name) - try: - cons = thisproc.connections(kind='unix') - assert not (cons[0].laddr and cons[0].raddr) - assert not (cons[1].laddr and cons[1].raddr) - if NETBSD or FREEBSD: - # On NetBSD creating a UNIX socket will cause - # a UNIX connection to /var/run/log. - cons = [c for c in cons if c.raddr != '/var/run/log'] - if CIRRUS: - cons = [c for c in cons if c.fd in - (server.fileno(), client.fileno())] - self.assertEqual(len(cons), 2, msg=cons) - if LINUX or FREEBSD or SUNOS: - # remote path is never set - self.assertEqual(cons[0].raddr, "") - self.assertEqual(cons[1].raddr, "") - # one local address should though - self.assertEqual(name, cons[0].laddr or cons[1].laddr) - elif OPENBSD: - # No addresses whatsoever here. - for addr in (cons[0].laddr, cons[0].raddr, - cons[1].laddr, cons[1].raddr): - self.assertEqual(addr, "") - else: - # On other systems either the laddr or raddr - # of both peers are set. - self.assertEqual(cons[0].laddr or cons[1].laddr, name) - self.assertEqual(cons[0].raddr or cons[1].raddr, name) - finally: - server.close() - client.close() + testfn = get_testfn() + server, client = unix_socketpair(testfn) + try: + cons = thisproc.connections(kind='unix') + assert not (cons[0].laddr and cons[0].raddr) + assert not (cons[1].laddr and cons[1].raddr) + if NETBSD or FREEBSD: + # On NetBSD creating a UNIX socket will cause + # a UNIX connection to /var/run/log. + cons = [c for c in cons if c.raddr != '/var/run/log'] + if CIRRUS: + cons = [c for c in cons if c.fd in + (server.fileno(), client.fileno())] + self.assertEqual(len(cons), 2, msg=cons) + if LINUX or FREEBSD or SUNOS: + # remote path is never set + self.assertEqual(cons[0].raddr, "") + self.assertEqual(cons[1].raddr, "") + # one local address should though + self.assertEqual(testfn, cons[0].laddr or cons[1].laddr) + elif OPENBSD: + # No addresses whatsoever here. + for addr in (cons[0].laddr, cons[0].raddr, + cons[1].laddr, cons[1].raddr): + self.assertEqual(addr, "") + else: + # On other systems either the laddr or raddr + # of both peers are set. + self.assertEqual(cons[0].laddr or cons[1].laddr, testfn) + self.assertEqual(cons[0].raddr or cons[1].raddr, testfn) + finally: + server.close() + client.close() class TestFilters(Base, unittest.TestCase): @@ -435,15 +430,15 @@ class TestFilters(Base, unittest.TestCase): time.sleep(60) """) - from string import Template - testfile = os.path.basename(TESTFN) - tcp4_template = Template(tcp_template).substitute( + # must be relative on Windows + testfile = os.path.basename(get_testfn(dir=os.getcwd())) + tcp4_template = string.Template(tcp_template).substitute( family=int(AF_INET), addr="127.0.0.1", testfn=testfile) - udp4_template = Template(udp_template).substitute( + udp4_template = string.Template(udp_template).substitute( family=int(AF_INET), addr="127.0.0.1", testfn=testfile) - tcp6_template = Template(tcp_template).substitute( + tcp6_template = string.Template(tcp_template).substitute( family=int(AF_INET6), addr="::1", testfn=testfile) - udp6_template = Template(udp_template).substitute( + udp6_template = string.Template(udp_template).substitute( family=int(AF_INET6), addr="::1", testfn=testfile) # launch various subprocess instantiating a socket of various @@ -583,23 +578,25 @@ class TestSystemWideConnections(Base, unittest.TestCase): expected = len(socks) pids = [] times = 10 + fnames = [] for i in range(times): - fname = os.path.realpath(TESTFN) + str(i) + fname = get_testfn() + fnames.append(fname) src = textwrap.dedent("""\ import time, os - from psutil.tests import create_sockets + from psutil.tests import create_sockets, cleanup_test_files with create_sockets(): with open(r'%s', 'w') as f: - f.write(str(os.getpid())) + f.write("hello") + # 2 UNIX test socket files are created + cleanup_test_files() time.sleep(60) """ % fname) sproc = pyrun(src) pids.append(sproc.pid) - self.addCleanup(safe_rmpath, fname) # sync - for i in range(times): - fname = TESTFN + str(i) + for fname in fnames: wait_for_file(fname) syscons = [x for x in psutil.net_connections(kind='all') if x.pid diff --git a/psutil/tests/test_contracts.py b/psutil/tests/test_contracts.py index 51ac3609..74c429be 100755 --- a/psutil/tests/test_contracts.py +++ b/psutil/tests/test_contracts.py @@ -37,9 +37,7 @@ from psutil.tests import HAS_RLIMIT from psutil.tests import HAS_SENSORS_FANS from psutil.tests import HAS_SENSORS_TEMPERATURES from psutil.tests import is_namedtuple -from psutil.tests import safe_rmpath from psutil.tests import SKIP_SYSCONS -from psutil.tests import TESTFN from psutil.tests import unittest from psutil.tests import VALID_PROC_STATUSES from psutil.tests import warn @@ -194,9 +192,6 @@ class TestSystemAPITypes(unittest.TestCase): def setUpClass(cls): cls.proc = psutil.Process() - def tearDown(self): - safe_rmpath(TESTFN) - def assert_ntuple_of_nums(self, nt, type_=float, gezero=True): assert is_namedtuple(nt) for n in nt: diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py index e51f8bd5..d529ae7c 100755 --- a/psutil/tests/test_linux.py +++ b/psutil/tests/test_linux.py @@ -17,7 +17,6 @@ import re import shutil import socket import struct -import tempfile import textwrap import time import warnings @@ -29,6 +28,7 @@ from psutil._compat import FileNotFoundError from psutil._compat import PY3 from psutil._compat import u from psutil.tests import call_until +from psutil.tests import get_testfn from psutil.tests import HAS_BATTERY from psutil.tests import HAS_CPU_FREQ from psutil.tests import HAS_GETLOADAVG @@ -43,7 +43,6 @@ from psutil.tests import retry_on_failure from psutil.tests import safe_rmpath from psutil.tests import sh from psutil.tests import skip_on_not_implemented -from psutil.tests import TESTFN from psutil.tests import ThreadTask from psutil.tests import TRAVIS from psutil.tests import unittest @@ -1218,7 +1217,8 @@ class TestMisc(unittest.TestCase): self.assertEqual(int(vmstat_value), int(psutil_value)) def test_no_procfs_on_import(self): - my_procfs = tempfile.mkdtemp() + my_procfs = get_testfn() + os.mkdir(my_procfs) with open(os.path.join(my_procfs, 'stat'), 'w') as f: f.write('cpu 0 0 0 0 0 0 0 0 0 0\n') @@ -1346,7 +1346,8 @@ class TestMisc(unittest.TestCase): assert m.called def test_procfs_path(self): - tdir = tempfile.mkdtemp() + tdir = get_testfn() + os.mkdir(tdir) try: psutil.PROCFS_PATH = tdir self.assertRaises(IOError, psutil.virtual_memory) @@ -1362,7 +1363,6 @@ class TestMisc(unittest.TestCase): self.assertRaises(psutil.NoSuchProcess, psutil.Process) finally: psutil.PROCFS_PATH = "/proc" - os.rmdir(tdir) def test_issue_687(self): # In case of thread ID: @@ -1643,20 +1643,16 @@ class TestSensorsFans(unittest.TestCase): @unittest.skipIf(not LINUX, "LINUX only") class TestProcess(unittest.TestCase): - def setUp(self): - safe_rmpath(TESTFN) - - tearDown = setUp - def test_memory_full_info(self): + testfn = get_testfn() src = textwrap.dedent(""" import time with open("%s", "w") as f: time.sleep(10) - """ % TESTFN) + """ % testfn) sproc = pyrun(src) self.addCleanup(reap_children) - call_until(lambda: os.listdir('.'), "'%s' not in ret" % TESTFN) + call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn) p = psutil.Process(sproc.pid) time.sleep(.1) mem = p.memory_full_info() @@ -1706,46 +1702,47 @@ class TestProcess(unittest.TestCase): # On PYPY file descriptors are not closed fast enough. @unittest.skipIf(PYPY, "unreliable on PYPY") def test_open_files_mode(self): - def get_test_file(): + def get_test_file(fname): p = psutil.Process() giveup_at = time.time() + 2 while True: for file in p.open_files(): - if file.path == os.path.abspath(TESTFN): + if file.path == os.path.abspath(fname): return file elif time.time() > giveup_at: break raise RuntimeError("timeout looking for test file") # - with open(TESTFN, "w"): - self.assertEqual(get_test_file().mode, "w") - with open(TESTFN, "r"): - self.assertEqual(get_test_file().mode, "r") - with open(TESTFN, "a"): - self.assertEqual(get_test_file().mode, "a") + testfn = get_testfn() + with open(testfn, "w"): + self.assertEqual(get_test_file(testfn).mode, "w") + with open(testfn, "r"): + self.assertEqual(get_test_file(testfn).mode, "r") + with open(testfn, "a"): + self.assertEqual(get_test_file(testfn).mode, "a") # - with open(TESTFN, "r+"): - self.assertEqual(get_test_file().mode, "r+") - with open(TESTFN, "w+"): - self.assertEqual(get_test_file().mode, "r+") - with open(TESTFN, "a+"): - self.assertEqual(get_test_file().mode, "a+") + with open(testfn, "r+"): + self.assertEqual(get_test_file(testfn).mode, "r+") + with open(testfn, "w+"): + self.assertEqual(get_test_file(testfn).mode, "r+") + with open(testfn, "a+"): + self.assertEqual(get_test_file(testfn).mode, "a+") # note: "x" bit is not supported if PY3: - safe_rmpath(TESTFN) - with open(TESTFN, "x"): - self.assertEqual(get_test_file().mode, "w") - safe_rmpath(TESTFN) - with open(TESTFN, "x+"): - self.assertEqual(get_test_file().mode, "r+") + safe_rmpath(testfn) + with open(testfn, "x"): + self.assertEqual(get_test_file(testfn).mode, "w") + safe_rmpath(testfn) + with open(testfn, "x+"): + self.assertEqual(get_test_file(testfn).mode, "r+") def test_open_files_file_gone(self): # simulates a file which gets deleted during open_files() # execution p = psutil.Process() files = p.open_files() - with tempfile.NamedTemporaryFile(): + with open(get_testfn(), 'w'): # give the kernel some time to see the new file call_until(p.open_files, "len(ret) != %i" % len(files)) with mock.patch('psutil._pslinux.os.readlink', @@ -1766,7 +1763,7 @@ class TestProcess(unittest.TestCase): # https://travis-ci.org/giampaolo/psutil/jobs/225694530 p = psutil.Process() files = p.open_files() - with tempfile.NamedTemporaryFile(): + with open(get_testfn(), 'w'): # give the kernel some time to see the new file call_until(p.open_files, "len(ret) != %i" % len(files)) patch_point = 'builtins.open' if PY3 else '__builtin__.open' @@ -2104,13 +2101,13 @@ class TestUtils(unittest.TestCase): assert m.called def test_cat(self): - fname = os.path.abspath(TESTFN) - with open(fname, "wt") as f: + testfn = get_testfn() + with open(testfn, "wt") as f: f.write("foo ") - self.assertEqual(psutil._psplatform.cat(TESTFN, binary=False), "foo") - self.assertEqual(psutil._psplatform.cat(TESTFN, binary=True), b"foo") + self.assertEqual(psutil._psplatform.cat(testfn, binary=False), "foo") + self.assertEqual(psutil._psplatform.cat(testfn, binary=True), b"foo") self.assertEqual( - psutil._psplatform.cat(TESTFN + '??', fallback="bar"), "bar") + psutil._psplatform.cat(testfn + '??', fallback="bar"), "bar") if __name__ == '__main__': diff --git a/psutil/tests/test_memory_leaks.py b/psutil/tests/test_memory_leaks.py index 3d37f35e..5cfec577 100755 --- a/psutil/tests/test_memory_leaks.py +++ b/psutil/tests/test_memory_leaks.py @@ -33,6 +33,7 @@ from psutil._compat import super from psutil.tests import CIRRUS from psutil.tests import create_sockets from psutil.tests import get_test_subprocess +from psutil.tests import get_testfn from psutil.tests import HAS_CPU_AFFINITY from psutil.tests import HAS_CPU_FREQ from psutil.tests import HAS_ENVIRON @@ -46,9 +47,7 @@ from psutil.tests import HAS_SENSORS_BATTERY from psutil.tests import HAS_SENSORS_FANS from psutil.tests import HAS_SENSORS_TEMPERATURES from psutil.tests import reap_children -from psutil.tests import safe_rmpath from psutil.tests import skip_on_access_denied -from psutil.tests import TESTFN from psutil.tests import TestMemoryLeak from psutil.tests import TRAVIS from psutil.tests import unittest @@ -223,8 +222,7 @@ class TestProcessObjectLeaks(TestMemoryLeak): @skip_if_linux() def test_open_files(self): - safe_rmpath(TESTFN) # needed after UNIX socket test has run - with open(TESTFN, 'w'): + with open(get_testfn(), 'w'): self.execute(self.proc.open_files) @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported") diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py index 987bdf38..ddb1bbba 100755 --- a/psutil/tests/test_process.py +++ b/psutil/tests/test_process.py @@ -15,7 +15,6 @@ import signal import socket import subprocess import sys -import tempfile import textwrap import time import types @@ -44,6 +43,7 @@ from psutil.tests import create_proc_children_pair from psutil.tests import create_zombie_proc from psutil.tests import enum from psutil.tests import get_test_subprocess +from psutil.tests import get_testfn from psutil.tests import HAS_CPU_AFFINITY from psutil.tests import HAS_ENVIRON from psutil.tests import HAS_IONICE @@ -57,12 +57,9 @@ from psutil.tests import PYPY from psutil.tests import PYTHON_EXE from psutil.tests import reap_children from psutil.tests import retry_on_failure -from psutil.tests import safe_rmpath from psutil.tests import sh from psutil.tests import skip_on_access_denied from psutil.tests import skip_on_not_implemented -from psutil.tests import TESTFILE_PREFIX -from psutil.tests import TESTFN from psutil.tests import ThreadTask from psutil.tests import TRAVIS from psutil.tests import unittest @@ -76,9 +73,6 @@ from psutil.tests import wait_for_pid class TestProcess(unittest.TestCase): """Tests for psutil.Process class.""" - def setUp(self): - safe_rmpath(TESTFN) - def tearDown(self): reap_children() @@ -328,7 +322,7 @@ class TestProcess(unittest.TestCase): # test writes io1 = p.io_counters() - with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f: + with open(get_testfn(), 'wb') as f: if PY3: f.write(bytes("x" * 1000000, 'ascii')) else: @@ -461,16 +455,17 @@ class TestProcess(unittest.TestCase): @unittest.skipIf(not HAS_RLIMIT, "not supported") def test_rlimit(self): + testfn = get_testfn() p = psutil.Process() soft, hard = p.rlimit(psutil.RLIMIT_FSIZE) try: p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard)) - with open(TESTFN, "wb") as f: + with open(testfn, "wb") as f: f.write(b"X" * 1024) # write() or flush() doesn't always cause the exception # but close() will. with self.assertRaises(IOError) as exc: - with open(TESTFN, "wb") as f: + with open(testfn, "wb") as f: f.write(b"X" * 1025) self.assertEqual(exc.exception.errno if PY3 else exc.exception[0], errno.EFBIG) @@ -482,12 +477,13 @@ class TestProcess(unittest.TestCase): def test_rlimit_infinity(self): # First set a limit, then re-set it by specifying INFINITY # and assume we overridden the previous limit. + testfn = get_testfn() p = psutil.Process() soft, hard = p.rlimit(psutil.RLIMIT_FSIZE) try: p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard)) p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard)) - with open(TESTFN, "wb") as f: + with open(testfn, "wb") as f: f.write(b"X" * 2048) finally: p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard)) @@ -728,9 +724,9 @@ class TestProcess(unittest.TestCase): @unittest.skipIf(PYPY, "broken on PYPY") def test_long_cmdline(self): - create_exe(TESTFN) - self.addCleanup(safe_rmpath, TESTFN) - cmdline = [TESTFN] + (["0123456789"] * 20) + testfn = get_testfn() + create_exe(testfn) + cmdline = [testfn] + (["0123456789"] * 20) sproc = get_test_subprocess(cmdline) p = psutil.Process(sproc.pid) self.assertEqual(p.cmdline(), cmdline) @@ -743,12 +739,11 @@ class TestProcess(unittest.TestCase): @unittest.skipIf(PYPY, "unreliable on PYPY") def test_long_name(self): - long_name = TESTFN + ("0123456789" * 2) - create_exe(long_name) - self.addCleanup(safe_rmpath, long_name) - sproc = get_test_subprocess(long_name) + testfn = get_testfn(suffix="0123456789" * 2) + create_exe(testfn) + sproc = get_test_subprocess(testfn) p = psutil.Process(sproc.pid) - self.assertEqual(p.name(), os.path.basename(long_name)) + self.assertEqual(p.name(), os.path.basename(testfn)) # XXX @unittest.skipIf(SUNOS, "broken on SUNOS") @@ -758,19 +753,8 @@ class TestProcess(unittest.TestCase): # Test that name(), exe() and cmdline() correctly handle programs # with funky chars such as spaces and ")", see: # https://github.com/giampaolo/psutil/issues/628 - - def rm(): - # Try to limit occasional failures on Appveyor: - # https://ci.appveyor.com/project/giampaolo/psutil/build/1350/ - # job/lbo3bkju55le850n - try: - safe_rmpath(funky_path) - except OSError: - pass - - funky_path = TESTFN + 'foo bar )' + funky_path = get_testfn(suffix='foo bar )') create_exe(funky_path) - self.addCleanup(rm) cmdline = [funky_path, "-c", "import time; [time.sleep(0.01) for x in range(3000)];" "arg1", "arg2", "", "arg3", ""] @@ -960,35 +944,36 @@ class TestProcess(unittest.TestCase): @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR") def test_open_files(self): # current process + testfn = get_testfn() p = psutil.Process() files = p.open_files() - self.assertFalse(TESTFN in files) - with open(TESTFN, 'wb') as f: + self.assertFalse(testfn in files) + with open(testfn, 'wb') as f: f.write(b'x' * 1024) f.flush() # give the kernel some time to see the new file files = call_until(p.open_files, "len(ret) != %i" % len(files)) filenames = [os.path.normcase(x.path) for x in files] - self.assertIn(os.path.normcase(TESTFN), filenames) + self.assertIn(os.path.normcase(testfn), filenames) if LINUX: for file in files: - if file.path == TESTFN: + if file.path == testfn: self.assertEqual(file.position, 1024) for file in files: assert os.path.isfile(file.path), file # another process - cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN + cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn sproc = get_test_subprocess([PYTHON_EXE, "-c", cmdline]) p = psutil.Process(sproc.pid) for x in range(100): filenames = [os.path.normcase(x.path) for x in p.open_files()] - if TESTFN in filenames: + if testfn in filenames: break time.sleep(.01) else: - self.assertIn(os.path.normcase(TESTFN), filenames) + self.assertIn(os.path.normcase(testfn), filenames) for file in filenames: assert os.path.isfile(file), file @@ -999,7 +984,8 @@ class TestProcess(unittest.TestCase): def test_open_files_2(self): # test fd and path fields normcase = os.path.normcase - with open(TESTFN, 'w') as fileobj: + testfn = get_testfn() + with open(testfn, 'w') as fileobj: p = psutil.Process() for file in p.open_files(): if normcase(file.path) == normcase(fileobj.name) or \ @@ -1021,9 +1007,10 @@ class TestProcess(unittest.TestCase): @unittest.skipIf(not POSIX, 'POSIX only') def test_num_fds(self): + testfn = get_testfn() p = psutil.Process() start = p.num_fds() - file = open(TESTFN, 'w') + file = open(testfn, 'w') self.addCleanup(file.close) self.assertEqual(p.num_fds(), start + 1) sock = socket.socket() @@ -1512,9 +1499,8 @@ class TestProcess(unittest.TestCase): return execve("/bin/cat", argv, envp); } """) - path = TESTFN + path = get_testfn() create_exe(path, c_code=code) - self.addCleanup(safe_rmpath, path) sproc = get_test_subprocess([path], stdin=subprocess.PIPE, stderr=subprocess.PIPE) @@ -1559,7 +1545,6 @@ if POSIX and os.getuid() == 0: setattr(self, attr, types.MethodType(test_, self)) def setUp(self): - safe_rmpath(TESTFN) TestProcess.setUp(self) os.setegid(1000) os.seteuid(1000) diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py index 3834209f..73401b3c 100755 --- a/psutil/tests/test_system.py +++ b/psutil/tests/test_system.py @@ -15,7 +15,6 @@ import shutil import signal import socket import sys -import tempfile import time import psutil @@ -37,6 +36,7 @@ from psutil.tests import CI_TESTING from psutil.tests import DEVNULL from psutil.tests import enum from psutil.tests import get_test_subprocess +from psutil.tests import get_testfn from psutil.tests import HAS_BATTERY from psutil.tests import HAS_CPU_FREQ from psutil.tests import HAS_GETLOADAVG @@ -48,10 +48,8 @@ from psutil.tests import mock from psutil.tests import PYPY from psutil.tests import reap_children from psutil.tests import retry_on_failure -from psutil.tests import safe_rmpath -from psutil.tests import TESTFN -from psutil.tests import TESTFN_UNICODE from psutil.tests import TRAVIS +from psutil.tests import UNICODE_SUFFIX from psutil.tests import unittest @@ -62,9 +60,6 @@ from psutil.tests import unittest class TestProcessAPIs(unittest.TestCase): - def setUp(self): - safe_rmpath(TESTFN) - def tearDown(self): reap_children() @@ -591,15 +586,15 @@ class TestDiskAPIs(unittest.TestCase): # if path does not exist OSError ENOENT is expected across # all platforms - fname = tempfile.mktemp() + fname = get_testfn() with self.assertRaises(FileNotFoundError): psutil.disk_usage(fname) + @unittest.skipIf(not ASCII_FS, "not an ASCII fs") def test_disk_usage_unicode(self): # See: https://github.com/giampaolo/psutil/issues/416 - if ASCII_FS: - with self.assertRaises(UnicodeEncodeError): - psutil.disk_usage(TESTFN_UNICODE) + with self.assertRaises(UnicodeEncodeError): + psutil.disk_usage(UNICODE_SUFFIX) def test_disk_usage_bytes(self): psutil.disk_usage(b'.') diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py index 7d3d6675..55c5d3df 100755 --- a/psutil/tests/test_testutils.py +++ b/psutil/tests/test_testutils.py @@ -34,6 +34,7 @@ from psutil.tests import create_sockets from psutil.tests import create_zombie_proc from psutil.tests import get_free_port from psutil.tests import get_test_subprocess +from psutil.tests import get_testfn from psutil.tests import HAS_CONNECTIONS_UNIX from psutil.tests import is_namedtuple from psutil.tests import mock @@ -43,10 +44,8 @@ from psutil.tests import retry_on_failure from psutil.tests import safe_mkdir from psutil.tests import safe_rmpath from psutil.tests import tcp_socketpair -from psutil.tests import TESTFN from psutil.tests import TestMemoryLeak from psutil.tests import unittest -from psutil.tests import unix_socket_path from psutil.tests import unix_socketpair from psutil.tests import wait_for_file from psutil.tests import wait_for_pid @@ -126,9 +125,6 @@ class TestRetryDecorator(unittest.TestCase): class TestSyncTestUtils(unittest.TestCase): - def tearDown(self): - safe_rmpath(TESTFN) - def test_wait_for_pid(self): wait_for_pid(os.getpid()) nopid = max(psutil.pids()) + 99999 @@ -136,26 +132,30 @@ class TestSyncTestUtils(unittest.TestCase): self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid) def test_wait_for_file(self): - with open(TESTFN, 'w') as f: + testfn = get_testfn() + with open(testfn, 'w') as f: f.write('foo') - wait_for_file(TESTFN) - assert not os.path.exists(TESTFN) + wait_for_file(testfn) + assert not os.path.exists(testfn) def test_wait_for_file_empty(self): - with open(TESTFN, 'w'): + testfn = get_testfn() + with open(testfn, 'w'): pass - wait_for_file(TESTFN, empty=True) - assert not os.path.exists(TESTFN) + wait_for_file(testfn, empty=True) + assert not os.path.exists(testfn) def test_wait_for_file_no_file(self): + testfn = get_testfn() with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): - self.assertRaises(IOError, wait_for_file, TESTFN) + self.assertRaises(IOError, wait_for_file, testfn) def test_wait_for_file_no_delete(self): - with open(TESTFN, 'w') as f: + testfn = get_testfn() + with open(testfn, 'w') as f: f.write('foo') - wait_for_file(TESTFN, delete=False) - assert os.path.exists(TESTFN) + wait_for_file(testfn, delete=False) + assert os.path.exists(testfn) def test_call_until(self): ret = call_until(lambda: 1, "ret == 1") @@ -164,11 +164,6 @@ class TestSyncTestUtils(unittest.TestCase): class TestFSTestUtils(unittest.TestCase): - def setUp(self): - safe_rmpath(TESTFN) - - tearDown = setUp - def test_open_text(self): with open_text(__file__) as f: self.assertEqual(f.mode, 'rt') @@ -178,34 +173,37 @@ class TestFSTestUtils(unittest.TestCase): self.assertEqual(f.mode, 'rb') def test_safe_mkdir(self): - safe_mkdir(TESTFN) - assert os.path.isdir(TESTFN) - safe_mkdir(TESTFN) - assert os.path.isdir(TESTFN) + testfn = get_testfn() + safe_mkdir(testfn) + assert os.path.isdir(testfn) + safe_mkdir(testfn) + assert os.path.isdir(testfn) def test_safe_rmpath(self): # test file is removed - open(TESTFN, 'w').close() - safe_rmpath(TESTFN) - assert not os.path.exists(TESTFN) + testfn = get_testfn() + open(testfn, 'w').close() + safe_rmpath(testfn) + assert not os.path.exists(testfn) # test no exception if path does not exist - safe_rmpath(TESTFN) + safe_rmpath(testfn) # test dir is removed - os.mkdir(TESTFN) - safe_rmpath(TESTFN) - assert not os.path.exists(TESTFN) + os.mkdir(testfn) + safe_rmpath(testfn) + assert not os.path.exists(testfn) # test other exceptions are raised with mock.patch('psutil.tests.os.stat', side_effect=OSError(errno.EINVAL, "")) as m: with self.assertRaises(OSError): - safe_rmpath(TESTFN) + safe_rmpath(testfn) assert m.called def test_chdir(self): + testfn = get_testfn() base = os.getcwd() - os.mkdir(TESTFN) - with chdir(TESTFN): - self.assertEqual(os.getcwd(), os.path.join(base, TESTFN)) + os.mkdir(testfn) + with chdir(testfn): + self.assertEqual(os.getcwd(), os.path.join(base, testfn)) self.assertEqual(os.getcwd(), base) @@ -256,19 +254,19 @@ class TestNetUtils(unittest.TestCase): @unittest.skipIf(not POSIX, "POSIX only") def test_bind_unix_socket(self): - with unix_socket_path() as name: - sock = bind_unix_socket(name) - with contextlib.closing(sock): - self.assertEqual(sock.family, socket.AF_UNIX) - self.assertEqual(sock.type, socket.SOCK_STREAM) - self.assertEqual(sock.getsockname(), name) - assert os.path.exists(name) - assert stat.S_ISSOCK(os.stat(name).st_mode) + name = get_testfn() + sock = bind_unix_socket(name) + with contextlib.closing(sock): + self.assertEqual(sock.family, socket.AF_UNIX) + self.assertEqual(sock.type, socket.SOCK_STREAM) + self.assertEqual(sock.getsockname(), name) + assert os.path.exists(name) + assert stat.S_ISSOCK(os.stat(name).st_mode) # UDP - with unix_socket_path() as name: - sock = bind_unix_socket(name, type=socket.SOCK_DGRAM) - with contextlib.closing(sock): - self.assertEqual(sock.type, socket.SOCK_DGRAM) + name = get_testfn() + sock = bind_unix_socket(name, type=socket.SOCK_DGRAM) + with contextlib.closing(sock): + self.assertEqual(sock.type, socket.SOCK_DGRAM) def tcp_tcp_socketpair(self): addr = ("127.0.0.1", get_free_port()) @@ -288,18 +286,18 @@ class TestNetUtils(unittest.TestCase): p = psutil.Process() num_fds = p.num_fds() assert not p.connections(kind='unix') - with unix_socket_path() as name: - server, client = unix_socketpair(name) - try: - assert os.path.exists(name) - assert stat.S_ISSOCK(os.stat(name).st_mode) - self.assertEqual(p.num_fds() - num_fds, 2) - self.assertEqual(len(p.connections(kind='unix')), 2) - self.assertEqual(server.getsockname(), name) - self.assertEqual(client.getpeername(), name) - finally: - client.close() - server.close() + name = get_testfn() + server, client = unix_socketpair(name) + try: + assert os.path.exists(name) + assert stat.S_ISSOCK(os.stat(name).st_mode) + self.assertEqual(p.num_fds() - num_fds, 2) + self.assertEqual(len(p.connections(kind='unix')), 2) + self.assertEqual(server.getsockname(), name) + self.assertEqual(client.getpeername(), name) + finally: + client.close() + server.close() def test_create_sockets(self): with create_sockets() as socks: diff --git a/psutil/tests/test_unicode.py b/psutil/tests/test_unicode.py index ac2d4f69..6fbaa43f 100755 --- a/psutil/tests/test_unicode.py +++ b/psutil/tests/test_unicode.py @@ -86,27 +86,27 @@ from psutil import WINDOWS from psutil._compat import PY3 from psutil._compat import u from psutil.tests import APPVEYOR -from psutil.tests import CIRRUS from psutil.tests import ASCII_FS from psutil.tests import bind_unix_socket from psutil.tests import chdir +from psutil.tests import CIRRUS from psutil.tests import copyload_shared_lib from psutil.tests import create_exe from psutil.tests import get_test_subprocess +from psutil.tests import get_testfn from psutil.tests import HAS_CONNECTIONS_UNIX from psutil.tests import HAS_ENVIRON from psutil.tests import HAS_MEMORY_MAPS +from psutil.tests import INVALID_UNICODE_SUFFIX from psutil.tests import PYPY from psutil.tests import reap_children from psutil.tests import safe_mkdir from psutil.tests import safe_rmpath as _safe_rmpath from psutil.tests import skip_on_access_denied -from psutil.tests import TESTFILE_PREFIX -from psutil.tests import TESTFN -from psutil.tests import TESTFN_UNICODE +from psutil.tests import TESTFN_PREFIX from psutil.tests import TRAVIS +from psutil.tests import UNICODE_SUFFIX from psutil.tests import unittest -from psutil.tests import unix_socket_path import psutil @@ -130,12 +130,13 @@ def safe_rmpath(path): return _safe_rmpath(path) -def subprocess_supports_unicode(name): +def subprocess_supports_unicode(suffix): """Return True if both the fs and the subprocess module can deal with a unicode file name. """ if PY3: return True + name = get_testfn(suffix=suffix) try: safe_rmpath(name) create_exe(name) @@ -148,38 +149,28 @@ def subprocess_supports_unicode(name): reap_children() -# An invalid unicode string. -if PY3: - INVALID_NAME = (TESTFN.encode('utf8') + b"f\xc0\x80").decode( - 'utf8', 'surrogateescape') -else: - INVALID_NAME = TESTFN + "f\xc0\x80" - - # =================================================================== # FS APIs # =================================================================== class _BaseFSAPIsTests(object): - funky_name = None + funky_suffix = None @classmethod def setUpClass(cls): - safe_rmpath(cls.funky_name) + cls.funky_name = get_testfn(suffix=cls.funky_suffix) create_exe(cls.funky_name) @classmethod def tearDownClass(cls): reap_children() - safe_rmpath(cls.funky_name) - - def tearDown(self): - reap_children() def expect_exact_path_match(self): raise NotImplementedError("must be implemented in subclass") + # --- + def test_proc_exe(self): subp = get_test_subprocess(cmd=[self.funky_name]) p = psutil.Process(subp.pid) @@ -234,20 +225,20 @@ class _BaseFSAPIsTests(object): @unittest.skipIf(not POSIX, "POSIX only") def test_proc_connections(self): suffix = os.path.basename(self.funky_name) - with unix_socket_path(suffix=suffix) as name: - try: - sock = bind_unix_socket(name) - except UnicodeEncodeError: - if PY3: - raise - else: - raise unittest.SkipTest("not supported") - with closing(sock): - conn = psutil.Process().connections('unix')[0] - self.assertIsInstance(conn.laddr, str) - # AF_UNIX addr not set on OpenBSD - if not OPENBSD and not CIRRUS: # XXX - self.assertEqual(conn.laddr, name) + name = get_testfn(suffix=suffix) + try: + sock = bind_unix_socket(name) + except UnicodeEncodeError: + if PY3: + raise + else: + raise unittest.SkipTest("not supported") + with closing(sock): + conn = psutil.Process().connections('unix')[0] + self.assertIsInstance(conn.laddr, str) + # AF_UNIX addr not set on OpenBSD + if not OPENBSD and not CIRRUS: # XXX + self.assertEqual(conn.laddr, name) @unittest.skipIf(not POSIX, "POSIX only") @unittest.skipIf(not HAS_CONNECTIONS_UNIX, "can't list UNIX sockets") @@ -255,26 +246,26 @@ class _BaseFSAPIsTests(object): def test_net_connections(self): def find_sock(cons): for conn in cons: - if os.path.basename(conn.laddr).startswith(TESTFILE_PREFIX): + if os.path.basename(conn.laddr).startswith(TESTFN_PREFIX): return conn raise ValueError("connection not found") suffix = os.path.basename(self.funky_name) - with unix_socket_path(suffix=suffix) as name: - try: - sock = bind_unix_socket(name) - except UnicodeEncodeError: - if PY3: - raise - else: - raise unittest.SkipTest("not supported") - with closing(sock): - cons = psutil.net_connections(kind='unix') - # AF_UNIX addr not set on OpenBSD - if not OPENBSD: - conn = find_sock(cons) - self.assertIsInstance(conn.laddr, str) - self.assertEqual(conn.laddr, name) + name = get_testfn(suffix=suffix) + try: + sock = bind_unix_socket(name) + except UnicodeEncodeError: + if PY3: + raise + else: + raise unittest.SkipTest("not supported") + with closing(sock): + cons = psutil.net_connections(kind='unix') + # AF_UNIX addr not set on OpenBSD + if not OPENBSD: + conn = find_sock(cons) + self.assertIsInstance(conn.laddr, str) + self.assertEqual(conn.laddr, name) def test_disk_usage(self): dname = self.funky_name + "2" @@ -289,13 +280,13 @@ class _BaseFSAPIsTests(object): def test_memory_maps(self): # XXX: on Python 2, using ctypes.CDLL with a unicode path # opens a message box which blocks the test run. - with copyload_shared_lib(dst_prefix=self.funky_name) as funky_path: + with copyload_shared_lib(suffix=self.funky_suffix) as funky_path: def normpath(p): return os.path.realpath(os.path.normcase(p)) libpaths = [normpath(x.path) for x in psutil.Process().memory_maps()] # ...just to have a clearer msg in case of failure - libpaths = [x for x in libpaths if TESTFILE_PREFIX in x] + libpaths = [x for x in libpaths if TESTFN_PREFIX in x] self.assertIn(normpath(funky_path), libpaths) for path in libpaths: self.assertIsInstance(path, str) @@ -305,30 +296,29 @@ class _BaseFSAPIsTests(object): @unittest.skipIf(PYPY and TRAVIS, "unreliable on PYPY + TRAVIS") @unittest.skipIf(MACOS and TRAVIS, "unreliable on TRAVIS") # TODO @unittest.skipIf(ASCII_FS, "ASCII fs") -@unittest.skipIf(not subprocess_supports_unicode(TESTFN_UNICODE), +@unittest.skipIf(not subprocess_supports_unicode(UNICODE_SUFFIX), "subprocess can't deal with unicode") class TestFSAPIs(_BaseFSAPIsTests, unittest.TestCase): """Test FS APIs with a funky, valid, UTF8 path name.""" - funky_name = TESTFN_UNICODE + funky_suffix = UNICODE_SUFFIX - @classmethod - def expect_exact_path_match(cls): + def expect_exact_path_match(self): # Do not expect psutil to correctly handle unicode paths on # Python 2 if os.listdir() is not able either. - here = '.' if isinstance(cls.funky_name, str) else u('.') + here = '.' if isinstance(self.funky_name, str) else u('.') with warnings.catch_warnings(): warnings.simplefilter("ignore") - return cls.funky_name in os.listdir(here) + return self.funky_name in os.listdir(here) @unittest.skipIf(PYPY and TRAVIS, "unreliable on PYPY + TRAVIS") @unittest.skipIf(MACOS and TRAVIS, "unreliable on TRAVIS") # TODO @unittest.skipIf(PYPY, "unreliable on PYPY") -@unittest.skipIf(not subprocess_supports_unicode(INVALID_NAME), +@unittest.skipIf(not subprocess_supports_unicode(INVALID_UNICODE_SUFFIX), "subprocess can't deal with invalid unicode") class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, unittest.TestCase): """Test FS APIs with a funky, invalid path name.""" - funky_name = INVALID_NAME + funky_suffix = INVALID_UNICODE_SUFFIX @classmethod def expect_exact_path_match(cls): @@ -356,7 +346,7 @@ class TestNonFSAPIS(unittest.TestCase): # we use "è", which is part of the extended ASCII table # (unicode point <= 255). env = os.environ.copy() - funky_str = TESTFN_UNICODE if PY3 else 'è' + funky_str = UNICODE_SUFFIX if PY3 else 'è' env['FUNNY_ARG'] = funky_str sproc = get_test_subprocess(env=env) p = psutil.Process(sproc.pid) |