summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2020-04-30 14:45:08 -0700
committerGitHub <noreply@github.com>2020-04-30 23:45:08 +0200
commit3480e1b05f3e98744a1b6aa6fe286caac86e6bbd (patch)
tree4c3dc33e7102e60b3a49555a3c91c4a7493d1556
parent5f56983c2195ff6c20c8066749fa8b28d47dbd2e (diff)
downloadpsutil-3480e1b05f3e98744a1b6aa6fe286caac86e6bbd.tar.gz
Per-test file cleanup and new PsutilTestCase (#1743)
Test files/dirs are now removed after each test. when invoked via self.get_testfn(). Until now test files were stored in a global variable and were removed at process exit, via atexit.register(), but this didn't work with parallel tests because the fork()ed workers use os._exit(0), preventing cleanup functions to run. All test classes now inherit from PsutilTestCase class, which provides the most important methods requiring an automatic cleanup (get_test_subprocess() and others).
-rw-r--r--Makefile2
-rw-r--r--psutil/tests/__init__.py139
-rwxr-xr-xpsutil/tests/test_aix.py3
-rwxr-xr-xpsutil/tests/test_bsd.py11
-rwxr-xr-xpsutil/tests/test_connections.py23
-rwxr-xr-xpsutil/tests/test_contracts.py11
-rwxr-xr-xpsutil/tests/test_linux.py59
-rwxr-xr-xpsutil/tests/test_memory_leaks.py6
-rwxr-xr-xpsutil/tests/test_misc.py7
-rwxr-xr-xpsutil/tests/test_osx.py9
-rwxr-xr-xpsutil/tests/test_posix.py5
-rwxr-xr-xpsutil/tests/test_process.py99
-rwxr-xr-xpsutil/tests/test_sunos.py3
-rw-r--r--[-rwxr-xr-x]psutil/tests/test_system.py19
-rw-r--r--[-rwxr-xr-x]psutil/tests/test_testutils.py65
-rw-r--r--[-rwxr-xr-x]psutil/tests/test_unicode.py38
-rwxr-xr-xpsutil/tests/test_windows.py19
-rwxr-xr-xscripts/internal/winmake.py13
18 files changed, 262 insertions, 269 deletions
diff --git a/Makefile b/Makefile
index f6e8ba70..86727ca7 100644
--- a/Makefile
+++ b/Makefile
@@ -59,7 +59,7 @@ clean: ## Remove all build files.
rm -rf \
*.core \
*.egg-info \
- *\$testfn* \
+ *\@psutil-* \
.coverage \
.failed-tests.txt \
.tox \
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py
index fc4bff01..8ce76304 100644
--- a/psutil/tests/__init__.py
+++ b/psutil/tests/__init__.py
@@ -30,7 +30,6 @@ import tempfile
import textwrap
import threading
import time
-import traceback
import warnings
from socket import AF_INET
from socket import AF_INET6
@@ -88,7 +87,7 @@ __all__ = [
'ThreadTask'
# test utils
'unittest', 'skip_on_access_denied', 'skip_on_not_implemented',
- 'retry_on_failure', 'TestMemoryLeak', 'ProcessTestCase',
+ 'retry_on_failure', 'TestMemoryLeak', 'PsutilTestCase',
# install utils
'install_pip', 'install_test_deps',
# fs utils
@@ -220,7 +219,6 @@ AF_UNIX = getattr(socket, "AF_UNIX", object())
_subprocesses_started = set()
_pids_started = set()
-_testfiles_created = set()
# ===================================================================
@@ -309,14 +307,17 @@ def get_test_subprocess(cmd=None, **kwds):
kwds.setdefault("creationflags", CREATE_NO_WINDOW)
if cmd is None:
testfn = get_testfn()
- safe_rmpath(testfn)
- pyline = "from time import sleep;" \
- "open(r'%s', 'w').close();" \
- "sleep(60);" % testfn
- cmd = [PYTHON_EXE, "-c", pyline]
- sproc = subprocess.Popen(cmd, **kwds)
- _subprocesses_started.add(sproc)
- wait_for_file(testfn, delete=True, empty=True)
+ try:
+ safe_rmpath(testfn)
+ pyline = "from time import sleep;" \
+ "open(r'%s', 'w').close();" \
+ "sleep(60);" % testfn
+ cmd = [PYTHON_EXE, "-c", pyline]
+ sproc = subprocess.Popen(cmd, **kwds)
+ _subprocesses_started.add(sproc)
+ wait_for_file(testfn, delete=True, empty=True)
+ finally:
+ safe_rmpath(testfn)
else:
sproc = subprocess.Popen(cmd, **kwds)
_subprocesses_started.add(sproc)
@@ -332,30 +333,35 @@ def create_proc_children_pair():
The 2 processes are fully initialized and will live for 60 secs
and are registered for cleanup on reap_children().
"""
- # must be relative on Windows
- testfn = os.path.basename(get_testfn(dir=os.getcwd()))
- s = textwrap.dedent("""\
- import subprocess, os, sys, time
- s = "import os, time;"
- s += "f = open('%s', 'w');"
- s += "f.write(str(os.getpid()));"
- s += "f.close();"
- s += "time.sleep(60);"
- p = subprocess.Popen([r'%s', '-c', s])
- p.wait()
- """ % (testfn, PYTHON_EXE))
- # On Windows if we create a subprocess with CREATE_NO_WINDOW flag
- # set (which is the default) a "conhost.exe" extra process will be
- # spawned as a child. We don't want that.
- if WINDOWS:
- subp = pyrun(s, creationflags=0)
- else:
- subp = pyrun(s)
- child1 = psutil.Process(subp.pid)
- child2_pid = int(wait_for_file(testfn, delete=True, empty=False))
- _pids_started.add(child2_pid)
- child2 = psutil.Process(child2_pid)
- return (child1, child2)
+ tfile = None
+ testfn = get_testfn(dir=os.getcwd())
+ try:
+ s = textwrap.dedent("""\
+ import subprocess, os, sys, time
+ s = "import os, time;"
+ s += "f = open('%s', 'w');"
+ s += "f.write(str(os.getpid()));"
+ s += "f.close();"
+ s += "time.sleep(60);"
+ p = subprocess.Popen([r'%s', '-c', s])
+ p.wait()
+ """ % (os.path.basename(testfn), PYTHON_EXE))
+ # On Windows if we create a subprocess with CREATE_NO_WINDOW flag
+ # set (which is the default) a "conhost.exe" extra process will be
+ # spawned as a child. We don't want that.
+ if WINDOWS:
+ subp, tfile = pyrun(s, creationflags=0)
+ else:
+ subp, tfile = pyrun(s)
+ child = psutil.Process(subp.pid)
+ grandchild_pid = int(wait_for_file(testfn, delete=True, empty=False))
+ _pids_started.add(grandchild_pid)
+ grandchild = psutil.Process(grandchild_pid)
+ return (child, grandchild)
+ finally:
+ safe_rmpath(testfn)
+ if tfile is not None:
+ safe_rmpath(tfile)
def create_zombie_proc():
@@ -381,10 +387,11 @@ def create_zombie_proc():
pid = bytes(str(os.getpid()), 'ascii')
s.sendall(pid)
""" % unix_file)
+ tfile = None
sock = bind_unix_socket(unix_file)
- with contextlib.closing(sock):
+ try:
sock.settimeout(GLOBAL_TIMEOUT)
- parent = pyrun(src)
+ parent, tfile = pyrun(src)
conn, _ = sock.accept()
try:
select.select([conn.fileno()], [], [], GLOBAL_TIMEOUT)
@@ -395,21 +402,31 @@ def create_zombie_proc():
return (parent, zombie)
finally:
conn.close()
+ finally:
+ sock.close()
+ safe_rmpath(unix_file)
+ if tfile is not None:
+ safe_rmpath(tfile)
@_reap_children_on_err
def pyrun(src, **kwds):
"""Run python 'src' code string in a separate interpreter.
- Returns a subprocess.Popen instance.
+ Returns a subprocess.Popen instance and the test file where the source
+ code was written.
"""
kwds.setdefault("stdout", None)
kwds.setdefault("stderr", None)
- with open(get_testfn(), 'wt') as f:
- f.write(src)
- f.flush()
+ srcfile = get_testfn()
+ try:
+ with open(srcfile, 'wt') as f:
+ f.write(src)
subp = get_test_subprocess([PYTHON_EXE, f.name], **kwds)
wait_for_pid(subp.pid)
- return subp
+ return (subp, srcfile)
+ except Exception:
+ safe_rmpath(srcfile)
+ raise
@_reap_children_on_err
@@ -772,7 +789,6 @@ def chdir(dirname):
def create_exe(outpath, c_code=None):
"""Creates an executable file in the given location."""
assert not os.path.exists(outpath), outpath
- _testfiles_created.add(outpath)
if c_code:
if not which("gcc"):
raise ValueError("gcc is not installed")
@@ -811,7 +827,6 @@ def get_testfn(suffix="", dir=None):
prefix = "%s%.9f-" % (TESTFN_PREFIX, timer())
name = tempfile.mktemp(prefix=prefix, suffix=suffix, dir=dir)
if not os.path.exists(name): # also include dirs
- _testfiles_created.add(name)
return os.path.realpath(name) # needed for OSX
@@ -841,11 +856,16 @@ class TestCase(unittest.TestCase):
unittest.TestCase = TestCase
-class ProcessTestCase(TestCase):
+class PsutilTestCase(TestCase):
"""Test class providing auto-cleanup wrappers on top of process
test utilities.
"""
+ def get_testfn(self, suffix="", dir=None):
+ fname = get_testfn(suffix=suffix, dir=dir)
+ self.addCleanup(safe_rmpath, fname)
+ return fname
+
def get_test_subprocess(self, *args, **kwds):
sproc = get_test_subprocess(*args, **kwds)
self.addCleanup(terminate, sproc)
@@ -853,8 +873,8 @@ class ProcessTestCase(TestCase):
def create_proc_children_pair(self):
child1, child2 = create_proc_children_pair()
- self.addCleanup(terminate, child1)
self.addCleanup(terminate, child2)
+ self.addCleanup(terminate, child1) # executed first
return (child1, child2)
def create_zombie_proc(self):
@@ -864,18 +884,14 @@ class ProcessTestCase(TestCase):
return (parent, zombie)
def pyrun(self, *args, **kwds):
- sproc = pyrun(*args, **kwds)
- self.addCleanup(terminate, sproc)
+ sproc, srcfile = pyrun(*args, **kwds)
+ self.addCleanup(safe_rmpath, srcfile)
+ self.addCleanup(terminate, sproc) # executed first
return sproc
- def get_testfn(self, suffix="", dir=None):
- fname = get_testfn(suffix=suffix, dir=suffix)
- self.addCleanup(safe_rmpath(fname))
- return fname
-
@unittest.skipIf(PYPY, "unreliable on PYPY")
-class TestMemoryLeak(unittest.TestCase):
+class TestMemoryLeak(PsutilTestCase):
"""Test framework class for detecting function memory leaks (typically
functions implemented in C).
It does so by calling a function many times, and checks whether the
@@ -1149,14 +1165,15 @@ def create_sockets():
fname2 = get_testfn()
s1, s2 = unix_socketpair(fname1)
s3 = bind_unix_socket(fname2, type=socket.SOCK_DGRAM)
- # self.addCleanup(safe_rmpath, fname1)
- # self.addCleanup(safe_rmpath, fname2)
for s in (s1, s2, s3):
socks.append(s)
yield socks
finally:
for s in socks:
s.close()
+ for fname in (fname1, fname2):
+ if fname is not None:
+ safe_rmpath(fname)
def check_net_address(addr, family):
@@ -1309,16 +1326,6 @@ else:
atexit.register(DEVNULL.close)
-@atexit.register
-def cleanup_test_files():
- while _testfiles_created:
- path = _testfiles_created.pop()
- try:
- safe_rmpath(path)
- except Exception:
- traceback.print_exc()
-
-
# this is executed first
@atexit.register
def cleanup_test_procs():
diff --git a/psutil/tests/test_aix.py b/psutil/tests/test_aix.py
index 889526ad..caf20357 100755
--- a/psutil/tests/test_aix.py
+++ b/psutil/tests/test_aix.py
@@ -11,13 +11,14 @@
import re
from psutil import AIX
+from psutil.tests import PsutilTestCase
from psutil.tests import sh
from psutil.tests import unittest
import psutil
@unittest.skipIf(not AIX, "AIX only")
-class AIXSpecificTestCase(unittest.TestCase):
+class AIXSpecificTestCase(PsutilTestCase):
def test_virtual_memory(self):
out = sh('/usr/bin/svmon -O unit=KB')
diff --git a/psutil/tests/test_bsd.py b/psutil/tests/test_bsd.py
index 427b9219..598ec0bf 100755
--- a/psutil/tests/test_bsd.py
+++ b/psutil/tests/test_bsd.py
@@ -22,6 +22,7 @@ from psutil import NETBSD
from psutil import OPENBSD
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_BATTERY
+from psutil.tests import PsutilTestCase
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import SYSMEM_TOLERANCE
@@ -72,7 +73,7 @@ def muse(field):
@unittest.skipIf(not BSD, "BSD only")
-class BSDTestCase(unittest.TestCase):
+class BSDTestCase(PsutilTestCase):
"""Generic tests common to all BSD variants."""
@classmethod
@@ -148,7 +149,7 @@ class BSDTestCase(unittest.TestCase):
@unittest.skipIf(not FREEBSD, "FREEBSD only")
-class FreeBSDProcessTestCase(unittest.TestCase):
+class FreeBSDPsutilTestCase(PsutilTestCase):
@classmethod
def setUpClass(cls):
@@ -238,7 +239,7 @@ class FreeBSDProcessTestCase(unittest.TestCase):
@unittest.skipIf(not FREEBSD, "FREEBSD only")
-class FreeBSDSystemTestCase(unittest.TestCase):
+class FreeBSDSystemTestCase(PsutilTestCase):
@staticmethod
def parse_swapinfo():
@@ -479,7 +480,7 @@ class FreeBSDSystemTestCase(unittest.TestCase):
@unittest.skipIf(not OPENBSD, "OPENBSD only")
-class OpenBSDTestCase(unittest.TestCase):
+class OpenBSDTestCase(PsutilTestCase):
def test_boot_time(self):
s = sysctl('kern.boottime')
@@ -494,7 +495,7 @@ class OpenBSDTestCase(unittest.TestCase):
@unittest.skipIf(not NETBSD, "NETBSD only")
-class NetBSDTestCase(unittest.TestCase):
+class NetBSDTestCase(PsutilTestCase):
@staticmethod
def parse_meminfo(look_for):
diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py
index 9c1bbe8e..7fcc2f98 100755
--- a/psutil/tests/test_connections.py
+++ b/psutil/tests/test_connections.py
@@ -37,9 +37,8 @@ from psutil.tests import CIRRUS
from psutil.tests import create_sockets
from psutil.tests import enum
from psutil.tests import get_free_port
-from psutil.tests import get_testfn
from psutil.tests import HAS_CONNECTIONS_UNIX
-from psutil.tests import ProcessTestCase
+from psutil.tests import PsutilTestCase
from psutil.tests import serialrun
from psutil.tests import skip_on_access_denied
from psutil.tests import SKIP_SYSCONS
@@ -55,7 +54,7 @@ SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
@serialrun
-class _ConnTestCase(ProcessTestCase):
+class _ConnTestCase(PsutilTestCase):
def setUp(self):
if not (NETBSD or FREEBSD):
@@ -267,14 +266,16 @@ class TestUnconnectedSockets(_ConnTestCase):
@unittest.skipIf(not POSIX, 'POSIX only')
def test_unix_tcp(self):
- with closing(bind_unix_socket(get_testfn(), type=SOCK_STREAM)) as sock:
+ testfn = self.get_testfn()
+ with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
conn = self.check_socket(sock)
assert not conn.raddr
self.assertEqual(conn.status, psutil.CONN_NONE)
@unittest.skipIf(not POSIX, 'POSIX only')
def test_unix_udp(self):
- with closing(bind_unix_socket(get_testfn(), type=SOCK_STREAM)) as sock:
+ testfn = self.get_testfn()
+ with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
conn = self.check_socket(sock)
assert not conn.raddr
self.assertEqual(conn.status, psutil.CONN_NONE)
@@ -310,7 +311,7 @@ class TestConnectedSocket(_ConnTestCase):
@unittest.skipIf(not POSIX, 'POSIX only')
def test_unix(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
server, client = unix_socketpair(testfn)
try:
cons = thisproc.connections(kind='unix')
@@ -433,7 +434,7 @@ class TestFilters(_ConnTestCase):
""")
# must be relative on Windows
- testfile = os.path.basename(get_testfn(dir=os.getcwd()))
+ testfile = os.path.basename(self.get_testfn(dir=os.getcwd()))
tcp4_template = string.Template(tcp_template).substitute(
family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
udp4_template = string.Template(udp_template).substitute(
@@ -582,16 +583,14 @@ class TestSystemWideConnections(_ConnTestCase):
times = 10
fnames = []
for i in range(times):
- fname = get_testfn()
+ fname = self.get_testfn()
fnames.append(fname)
src = textwrap.dedent("""\
import time, os
- from psutil.tests import create_sockets, cleanup_test_files
+ from psutil.tests import create_sockets
with create_sockets():
with open(r'%s', 'w') as f:
f.write("hello")
- # 2 UNIX test socket files are created
- cleanup_test_files()
time.sleep(60)
""" % fname)
sproc = self.pyrun(src)
@@ -610,7 +609,7 @@ class TestSystemWideConnections(_ConnTestCase):
self.assertEqual(len(p.connections('all')), expected)
-class TestMisc(unittest.TestCase):
+class TestMisc(PsutilTestCase):
def test_connection_constants(self):
ints = []
diff --git a/psutil/tests/test_contracts.py b/psutil/tests/test_contracts.py
index 69bb0b2f..a80a6b78 100755
--- a/psutil/tests/test_contracts.py
+++ b/psutil/tests/test_contracts.py
@@ -37,6 +37,7 @@ from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import is_namedtuple
+from psutil.tests import PsutilTestCase
from psutil.tests import SKIP_SYSCONS
from psutil.tests import unittest
from psutil.tests import VALID_PROC_STATUSES
@@ -51,7 +52,7 @@ import psutil
# Make sure code reflects what doc promises in terms of APIs
# availability.
-class TestAvailConstantsAPIs(unittest.TestCase):
+class TestAvailConstantsAPIs(PsutilTestCase):
def test_PROCFS_PATH(self):
self.assertEqual(hasattr(psutil, "PROCFS_PATH"),
@@ -105,7 +106,7 @@ class TestAvailConstantsAPIs(unittest.TestCase):
ae(hasattr(psutil, "RLIMIT_SIGPENDING"), hasit)
-class TestAvailSystemAPIs(unittest.TestCase):
+class TestAvailSystemAPIs(PsutilTestCase):
def test_win_service_iter(self):
self.assertEqual(hasattr(psutil, "win_service_iter"), WINDOWS)
@@ -132,7 +133,7 @@ class TestAvailSystemAPIs(unittest.TestCase):
LINUX or WINDOWS or FREEBSD or MACOS)
-class TestAvailProcessAPIs(unittest.TestCase):
+class TestAvailProcessAPIs(PsutilTestCase):
def test_environ(self):
self.assertEqual(hasattr(psutil.Process, "environ"),
@@ -182,7 +183,7 @@ class TestAvailProcessAPIs(unittest.TestCase):
# ===================================================================
-class TestSystemAPITypes(unittest.TestCase):
+class TestSystemAPITypes(PsutilTestCase):
"""Check the return types of system related APIs.
Mainly we want to test we never return unicode on Python 2, see:
https://github.com/giampaolo/psutil/issues/1039
@@ -312,7 +313,7 @@ class TestSystemAPITypes(unittest.TestCase):
# ===================================================================
-class TestFetchAllProcesses(unittest.TestCase):
+class TestFetchAllProcesses(PsutilTestCase):
"""Test which iterates over all running processes and performs
some sanity checks against Process API's returned values.
"""
diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py
index bac20b05..fcc9d5db 100755
--- a/psutil/tests/test_linux.py
+++ b/psutil/tests/test_linux.py
@@ -28,13 +28,12 @@ from psutil._compat import FileNotFoundError
from psutil._compat import PY3
from psutil._compat import u
from psutil.tests import call_until
-from psutil.tests import get_testfn
from psutil.tests import HAS_BATTERY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_GETLOADAVG
from psutil.tests import HAS_RLIMIT
from psutil.tests import mock
-from psutil.tests import ProcessTestCase
+from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
from psutil.tests import reload_module
from psutil.tests import retry_on_failure
@@ -188,7 +187,7 @@ def mock_open_exception(for_path, exc):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemVirtualMemory(unittest.TestCase):
+class TestSystemVirtualMemory(PsutilTestCase):
def test_total(self):
# free_value = free_physmem().total
@@ -494,7 +493,7 @@ class TestSystemVirtualMemory(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemSwapMemory(unittest.TestCase):
+class TestSystemSwapMemory(PsutilTestCase):
@staticmethod
def meminfo_has_swap_info():
@@ -588,7 +587,7 @@ class TestSystemSwapMemory(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemCPUTimes(unittest.TestCase):
+class TestSystemCPUTimes(PsutilTestCase):
@unittest.skipIf(TRAVIS, "unknown failure on travis")
def test_fields(self):
@@ -610,7 +609,7 @@ class TestSystemCPUTimes(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemCPUCountLogical(unittest.TestCase):
+class TestSystemCPUCountLogical(PsutilTestCase):
@unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
"/sys/devices/system/cpu/online does not exist")
@@ -674,7 +673,7 @@ class TestSystemCPUCountLogical(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemCPUCountPhysical(unittest.TestCase):
+class TestSystemCPUCountPhysical(PsutilTestCase):
@unittest.skipIf(not which("lscpu"), "lscpu utility not available")
def test_against_lscpu(self):
@@ -695,7 +694,7 @@ class TestSystemCPUCountPhysical(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemCPUFrequency(unittest.TestCase):
+class TestSystemCPUFrequency(PsutilTestCase):
@unittest.skipIf(TRAVIS, "fails on Travis")
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
@@ -843,7 +842,7 @@ class TestSystemCPUFrequency(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemCPUStats(unittest.TestCase):
+class TestSystemCPUStats(PsutilTestCase):
@unittest.skipIf(TRAVIS, "fails on Travis")
def test_ctx_switches(self):
@@ -859,7 +858,7 @@ class TestSystemCPUStats(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestLoadAvg(unittest.TestCase):
+class TestLoadAvg(PsutilTestCase):
@unittest.skipIf(not HAS_GETLOADAVG, "not supported")
def test_getloadavg(self):
@@ -878,7 +877,7 @@ class TestLoadAvg(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemNetIfAddrs(unittest.TestCase):
+class TestSystemNetIfAddrs(PsutilTestCase):
def test_ips(self):
for name, addrs in psutil.net_if_addrs().items():
@@ -907,7 +906,7 @@ class TestSystemNetIfAddrs(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemNetIfStats(unittest.TestCase):
+class TestSystemNetIfStats(PsutilTestCase):
def test_against_ifconfig(self):
for name, stats in psutil.net_if_stats().items():
@@ -923,7 +922,7 @@ class TestSystemNetIfStats(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemNetIOCounters(unittest.TestCase):
+class TestSystemNetIOCounters(PsutilTestCase):
@retry_on_failure()
def test_against_ifconfig(self):
@@ -969,7 +968,7 @@ class TestSystemNetIOCounters(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemNetConnections(unittest.TestCase):
+class TestSystemNetConnections(PsutilTestCase):
@mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
@mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
@@ -1002,7 +1001,7 @@ class TestSystemNetConnections(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemDiskPartitions(unittest.TestCase):
+class TestSystemDiskPartitions(PsutilTestCase):
@unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
@skip_on_not_implemented()
@@ -1067,7 +1066,7 @@ class TestSystemDiskPartitions(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSystemDiskIoCounters(unittest.TestCase):
+class TestSystemDiskIoCounters(PsutilTestCase):
def test_emulate_kernel_2_4(self):
# Tests /proc/diskstats parsing format for 2.4 kernels, see:
@@ -1208,7 +1207,7 @@ class TestSystemDiskIoCounters(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestMisc(unittest.TestCase):
+class TestMisc(PsutilTestCase):
def test_boot_time(self):
vmstat_value = vmstat('boot time')
@@ -1216,7 +1215,7 @@ class TestMisc(unittest.TestCase):
self.assertEqual(int(vmstat_value), int(psutil_value))
def test_no_procfs_on_import(self):
- my_procfs = get_testfn()
+ my_procfs = self.get_testfn()
os.mkdir(my_procfs)
with open(os.path.join(my_procfs, 'stat'), 'w') as f:
@@ -1345,7 +1344,7 @@ class TestMisc(unittest.TestCase):
assert m.called
def test_procfs_path(self):
- tdir = get_testfn()
+ tdir = self.get_testfn()
os.mkdir(tdir)
try:
psutil.PROCFS_PATH = tdir
@@ -1397,7 +1396,7 @@ class TestMisc(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
@unittest.skipIf(not HAS_BATTERY, "no battery")
-class TestSensorsBattery(unittest.TestCase):
+class TestSensorsBattery(PsutilTestCase):
@unittest.skipIf(not which("acpi"), "acpi utility not available")
def test_percent(self):
@@ -1545,7 +1544,7 @@ class TestSensorsBattery(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSensorsTemperatures(unittest.TestCase):
+class TestSensorsTemperatures(PsutilTestCase):
def test_emulate_class_hwmon(self):
def open_mock(name, *args, **kwargs):
@@ -1611,7 +1610,7 @@ class TestSensorsTemperatures(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestSensorsFans(unittest.TestCase):
+class TestSensorsFans(PsutilTestCase):
def test_emulate_data(self):
def open_mock(name, *args, **kwargs):
@@ -1640,11 +1639,11 @@ class TestSensorsFans(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestProcess(ProcessTestCase):
+class TestProcess(PsutilTestCase):
@retry_on_failure()
def test_memory_full_info(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
src = textwrap.dedent("""
import time
with open("%s", "w") as f:
@@ -1713,7 +1712,7 @@ class TestProcess(ProcessTestCase):
raise RuntimeError("timeout looking for test file")
#
- testfn = get_testfn()
+ testfn = self.get_testfn()
with open(testfn, "w"):
self.assertEqual(get_test_file(testfn).mode, "w")
with open(testfn, "r"):
@@ -1741,7 +1740,7 @@ class TestProcess(ProcessTestCase):
# execution
p = psutil.Process()
files = p.open_files()
- with open(get_testfn(), 'w'):
+ with open(self.get_testfn(), 'w'):
# give the kernel some time to see the new file
call_until(p.open_files, "len(ret) != %i" % len(files))
with mock.patch('psutil._pslinux.os.readlink',
@@ -1762,7 +1761,7 @@ class TestProcess(ProcessTestCase):
# https://travis-ci.org/giampaolo/psutil/jobs/225694530
p = psutil.Process()
files = p.open_files()
- with open(get_testfn(), 'w'):
+ with open(self.get_testfn(), 'w'):
# give the kernel some time to see the new file
call_until(p.open_files, "len(ret) != %i" % len(files))
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
@@ -2009,7 +2008,7 @@ class TestProcess(ProcessTestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestProcessAgainstStatus(unittest.TestCase):
+class TestProcessAgainstStatus(PsutilTestCase):
"""/proc/pid/stat and /proc/pid/status have many values in common.
Whenever possible, psutil uses /proc/pid/stat (it's faster).
For all those cases we check that the value found in
@@ -2092,7 +2091,7 @@ class TestProcessAgainstStatus(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestUtils(unittest.TestCase):
+class TestUtils(PsutilTestCase):
def test_readlink(self):
with mock.patch("os.readlink", return_value="foo (deleted)") as m:
@@ -2100,7 +2099,7 @@ class TestUtils(unittest.TestCase):
assert m.called
def test_cat(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
with open(testfn, "wt") as f:
f.write("foo ")
self.assertEqual(psutil._psplatform.cat(testfn, binary=False), "foo")
diff --git a/psutil/tests/test_memory_leaks.py b/psutil/tests/test_memory_leaks.py
index 9069b1a3..3d004f0a 100755
--- a/psutil/tests/test_memory_leaks.py
+++ b/psutil/tests/test_memory_leaks.py
@@ -46,7 +46,6 @@ from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
-from psutil.tests import reap_children
from psutil.tests import skip_on_access_denied
from psutil.tests import TestMemoryLeak
from psutil.tests import TRAVIS
@@ -288,11 +287,6 @@ class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
cls.proc.kill()
cls.proc.wait()
- @classmethod
- def tearDownClass(cls):
- super().tearDownClass()
- reap_children()
-
def _call(self, fun):
try:
fun()
diff --git a/psutil/tests/test_misc.py b/psutil/tests/test_misc.py
index 18781e75..4fb8ba5a 100755
--- a/psutil/tests/test_misc.py
+++ b/psutil/tests/test_misc.py
@@ -37,6 +37,7 @@ from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import import_module_by_path
from psutil.tests import mock
+from psutil.tests import PsutilTestCase
from psutil.tests import PYTHON_EXE
from psutil.tests import reload_module
from psutil.tests import ROOT_DIR
@@ -53,7 +54,7 @@ import psutil.tests
# ===================================================================
-class TestMisc(unittest.TestCase):
+class TestMisc(PsutilTestCase):
def test_process__repr__(self, func=repr):
p = psutil.Process()
@@ -386,7 +387,7 @@ class TestMisc(unittest.TestCase):
nt = collections.namedtuple('foo', 'a b c')
-class TestWrapNumbers(unittest.TestCase):
+class TestWrapNumbers(PsutilTestCase):
def setUp(self):
wrap_numbers.cache_clear()
@@ -627,7 +628,7 @@ class TestWrapNumbers(unittest.TestCase):
@unittest.skipIf(not os.path.exists(SCRIPTS_DIR),
"can't locate scripts directory")
-class TestScripts(unittest.TestCase):
+class TestScripts(PsutilTestCase):
"""Tests for scripts in the "scripts" directory."""
@staticmethod
diff --git a/psutil/tests/test_osx.py b/psutil/tests/test_osx.py
index 4df6a884..c2e6ad72 100755
--- a/psutil/tests/test_osx.py
+++ b/psutil/tests/test_osx.py
@@ -15,9 +15,10 @@ from psutil import MACOS
from psutil.tests import create_zombie_proc
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_BATTERY
-from psutil.tests import SYSMEM_TOLERANCE
+from psutil.tests import PsutilTestCase
from psutil.tests import retry_on_failure
from psutil.tests import sh
+from psutil.tests import SYSMEM_TOLERANCE
from psutil.tests import terminate
from psutil.tests import unittest
@@ -76,7 +77,7 @@ def human2bytes(s):
@unittest.skipIf(not MACOS, "MACOS only")
-class TestProcess(unittest.TestCase):
+class TestProcess(PsutilTestCase):
@classmethod
def setUpClass(cls):
@@ -102,7 +103,7 @@ class TestProcess(unittest.TestCase):
# TODO: probably needs removal (duplicate)
@unittest.skipIf(not MACOS, "MACOS only")
-class TestZombieProcessAPIs(unittest.TestCase):
+class TestZombieProcessAPIs(PsutilTestCase):
@classmethod
def setUpClass(cls):
@@ -160,7 +161,7 @@ class TestZombieProcessAPIs(unittest.TestCase):
@unittest.skipIf(not MACOS, "MACOS only")
-class TestSystemAPIs(unittest.TestCase):
+class TestSystemAPIs(PsutilTestCase):
# --- disk
diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py
index 1b37fa2f..9eeb5c2b 100755
--- a/psutil/tests/test_posix.py
+++ b/psutil/tests/test_posix.py
@@ -27,6 +27,7 @@ from psutil.tests import get_kernel_version
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_NET_IO_COUNTERS
from psutil.tests import mock
+from psutil.tests import PsutilTestCase
from psutil.tests import PYTHON_EXE
from psutil.tests import retry_on_failure
from psutil.tests import sh
@@ -126,7 +127,7 @@ def ps_vsz(pid):
@unittest.skipIf(not POSIX, "POSIX only")
-class TestProcess(unittest.TestCase):
+class TestProcess(PsutilTestCase):
"""Compare psutil results against 'ps' command line utility (mainly)."""
@classmethod
@@ -326,7 +327,7 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(not POSIX, "POSIX only")
-class TestSystemAPIs(unittest.TestCase):
+class TestSystemAPIs(PsutilTestCase):
"""Test some system APIs."""
@retry_on_failure()
diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py
index c87df1b1..11ab725b 100755
--- a/psutil/tests/test_process.py
+++ b/psutil/tests/test_process.py
@@ -34,13 +34,13 @@ from psutil import WINDOWS
from psutil._common import open_text
from psutil._compat import long
from psutil._compat import PY3
+from psutil._compat import super
from psutil.tests import APPVEYOR
from psutil.tests import call_until
from psutil.tests import CIRRUS
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
from psutil.tests import enum
-from psutil.tests import get_testfn
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_IONICE
@@ -50,7 +50,7 @@ from psutil.tests import HAS_PROC_IO_COUNTERS
from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_THREADS
from psutil.tests import mock
-from psutil.tests import ProcessTestCase
+from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
from psutil.tests import PYTHON_EXE
from psutil.tests import reap_children
@@ -69,7 +69,7 @@ from psutil.tests import wait_for_pid
# ===================================================================
-class TestProcess(ProcessTestCase):
+class TestProcess(PsutilTestCase):
"""Tests for psutil.Process class."""
def test_pid(self):
@@ -180,23 +180,23 @@ class TestProcess(ProcessTestCase):
def test_wait_non_children(self):
# Test wait() against a process which is not our direct
# child.
- p1, p2 = self.create_proc_children_pair()
- self.assertRaises(psutil.TimeoutExpired, p1.wait, 0.01)
- self.assertRaises(psutil.TimeoutExpired, p2.wait, 0.01)
+ child, grandchild = self.create_proc_children_pair()
+ self.assertRaises(psutil.TimeoutExpired, child.wait, 0.01)
+ self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01)
# We also terminate the direct child otherwise the
# grandchild will hang until the parent is gone.
- p1.terminate()
- p2.terminate()
- ret1 = p1.wait()
- ret2 = p2.wait()
+ child.terminate()
+ grandchild.terminate()
+ child_ret = child.wait()
+ grandchild_ret = grandchild.wait()
if POSIX:
- self.assertEqual(ret1, -signal.SIGTERM)
+ self.assertEqual(child_ret, -signal.SIGTERM)
# For processes which are not our children we're supposed
# to get None.
- self.assertEqual(ret2, None)
+ self.assertEqual(grandchild_ret, None)
else:
- self.assertEqual(ret1, signal.SIGTERM)
- self.assertEqual(ret1, signal.SIGTERM)
+ self.assertEqual(child_ret, signal.SIGTERM)
+ self.assertEqual(child_ret, signal.SIGTERM)
def test_wait_timeout_0(self):
sproc = self.get_test_subprocess()
@@ -316,7 +316,7 @@ class TestProcess(ProcessTestCase):
# test writes
io1 = p.io_counters()
- with open(get_testfn(), 'wb') as f:
+ with open(self.get_testfn(), 'wb') as f:
if PY3:
f.write(bytes("x" * 1000000, 'ascii'))
else:
@@ -449,7 +449,7 @@ class TestProcess(ProcessTestCase):
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
p = psutil.Process()
soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
try:
@@ -471,7 +471,7 @@ class TestProcess(ProcessTestCase):
def test_rlimit_infinity(self):
# First set a limit, then re-set it by specifying INFINITY
# and assume we overridden the previous limit.
- testfn = get_testfn()
+ testfn = self.get_testfn()
p = psutil.Process()
soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
try:
@@ -718,7 +718,7 @@ class TestProcess(ProcessTestCase):
@unittest.skipIf(PYPY, "broken on PYPY")
def test_long_cmdline(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
create_exe(testfn)
cmdline = [testfn] + (["0123456789"] * 20)
sproc = self.get_test_subprocess(cmdline)
@@ -733,7 +733,7 @@ class TestProcess(ProcessTestCase):
@unittest.skipIf(PYPY, "unreliable on PYPY")
def test_long_name(self):
- testfn = get_testfn(suffix="0123456789" * 2)
+ testfn = self.get_testfn(suffix="0123456789" * 2)
create_exe(testfn)
sproc = self.get_test_subprocess(testfn)
p = psutil.Process(sproc.pid)
@@ -747,7 +747,7 @@ class TestProcess(ProcessTestCase):
# Test that name(), exe() and cmdline() correctly handle programs
# with funky chars such as spaces and ")", see:
# https://github.com/giampaolo/psutil/issues/628
- funky_path = get_testfn(suffix='foo bar )')
+ funky_path = self.get_testfn(suffix='foo bar )')
create_exe(funky_path)
cmdline = [funky_path, "-c",
"import time; [time.sleep(0.01) for x in range(3000)];"
@@ -938,7 +938,7 @@ class TestProcess(ProcessTestCase):
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files(self):
# current process
- testfn = get_testfn()
+ testfn = self.get_testfn()
p = psutil.Process()
files = p.open_files()
self.assertFalse(testfn in files)
@@ -978,7 +978,7 @@ class TestProcess(ProcessTestCase):
def test_open_files_2(self):
# test fd and path fields
normcase = os.path.normcase
- testfn = get_testfn()
+ testfn = self.get_testfn()
with open(testfn, 'w') as fileobj:
p = psutil.Process()
for file in p.open_files():
@@ -1001,7 +1001,7 @@ class TestProcess(ProcessTestCase):
@unittest.skipIf(not POSIX, 'POSIX only')
def test_num_fds(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
p = psutil.Process()
start = p.num_fds()
file = open(testfn, 'w')
@@ -1028,12 +1028,9 @@ class TestProcess(ProcessTestCase):
def test_ppid(self):
if hasattr(os, 'getppid'):
self.assertEqual(psutil.Process().ppid(), os.getppid())
- this_parent = os.getpid()
sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
- self.assertEqual(p.ppid(), this_parent)
- # no other process is supposed to have us as parent
- reap_children(recursive=True)
+ self.assertEqual(p.ppid(), os.getpid())
if APPVEYOR:
# Occasional failures, see:
# https://ci.appveyor.com/project/giampaolo/psutil/build/
@@ -1043,21 +1040,20 @@ class TestProcess(ProcessTestCase):
if p.pid == sproc.pid:
continue
# XXX: sometimes this fails on Windows; not sure why.
- self.assertNotEqual(p.ppid(), this_parent, msg=p)
+ self.assertNotEqual(p.ppid(), os.getpid(), msg=p)
def test_parent(self):
- this_parent = os.getpid()
sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
- self.assertEqual(p.parent().pid, this_parent)
+ self.assertEqual(p.parent().pid, os.getpid())
lowest_pid = psutil.pids()[0]
self.assertIsNone(psutil.Process(lowest_pid).parent())
def test_parent_multi(self):
- p1, p2 = self.create_proc_children_pair()
- self.assertEqual(p2.parent(), p1)
- self.assertEqual(p1.parent(), psutil.Process())
+ child, grandchild = self.create_proc_children_pair()
+ self.assertEqual(grandchild.parent(), child)
+ self.assertEqual(child.parent(), psutil.Process())
def test_parent_disappeared(self):
# Emulate a case where the parent process disappeared.
@@ -1070,13 +1066,12 @@ class TestProcess(ProcessTestCase):
@retry_on_failure()
def test_parents(self):
assert psutil.Process().parents()
- p1, p2 = self.create_proc_children_pair()
- self.assertEqual(p1.parents()[0], psutil.Process())
- self.assertEqual(p2.parents()[0], p1)
- self.assertEqual(p2.parents()[1], psutil.Process())
+ child, grandchild = self.create_proc_children_pair()
+ self.assertEqual(child.parents()[0], psutil.Process())
+ self.assertEqual(grandchild.parents()[0], child)
+ self.assertEqual(grandchild.parents()[1], psutil.Process())
def test_children(self):
- reap_children(recursive=True)
p = psutil.Process()
self.assertEqual(p.children(), [])
self.assertEqual(p.children(recursive=True), [])
@@ -1094,14 +1089,14 @@ class TestProcess(ProcessTestCase):
def test_children_recursive(self):
# Test children() against two sub processes, p1 and p2, where
# p1 (our child) spawned p2 (our grandchild).
- p1, p2 = self.create_proc_children_pair()
+ child, grandchild = self.create_proc_children_pair()
p = psutil.Process()
- self.assertEqual(p.children(), [p1])
- self.assertEqual(p.children(recursive=True), [p1, p2])
+ self.assertEqual(p.children(), [child])
+ self.assertEqual(p.children(recursive=True), [child, grandchild])
# If the intermediate process is gone there's no way for
# children() to recursively find it.
- p1.terminate()
- p1.wait()
+ child.terminate()
+ child.wait()
self.assertEqual(p.children(recursive=True), [])
def test_children_duplicates(self):
@@ -1123,16 +1118,16 @@ class TestProcess(ProcessTestCase):
self.assertEqual(len(c), len(set(c)))
def test_parents_and_children(self):
- p1, p2 = self.create_proc_children_pair()
+ child, grandchild = self.create_proc_children_pair()
me = psutil.Process()
# forward
children = me.children(recursive=True)
self.assertEqual(len(children), 2)
- self.assertEqual(children[0], p1)
- self.assertEqual(children[1], p2)
+ self.assertEqual(children[0], child)
+ self.assertEqual(children[1], grandchild)
# backward
- parents = p2.parents()
- self.assertEqual(parents[0], p1)
+ parents = grandchild.parents()
+ self.assertEqual(parents[0], child)
self.assertEqual(parents[1], me)
def test_suspend_resume(self):
@@ -1488,7 +1483,7 @@ class TestProcess(ProcessTestCase):
return execve("/bin/cat", argv, envp);
}
""")
- path = get_testfn()
+ path = self.get_testfn()
create_exe(path, c_code=code)
sproc = self.get_test_subprocess(
[path], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -1533,14 +1528,14 @@ if POSIX and os.getuid() == 0:
setattr(self, attr, types.MethodType(test_, self))
def setUp(self):
- TestProcess.setUp(self)
+ super().setUp()
os.setegid(1000)
os.seteuid(1000)
def tearDown(self):
os.setegid(self.PROCESS_UID)
os.seteuid(self.PROCESS_GID)
- TestProcess.tearDown(self)
+ super().tearDown()
def test_nice(self):
try:
@@ -1560,7 +1555,7 @@ if POSIX and os.getuid() == 0:
# ===================================================================
-class TestPopen(unittest.TestCase):
+class TestPopen(PsutilTestCase):
"""Tests for psutil.Popen class."""
@classmethod
diff --git a/psutil/tests/test_sunos.py b/psutil/tests/test_sunos.py
index bac1a212..ad94f774 100755
--- a/psutil/tests/test_sunos.py
+++ b/psutil/tests/test_sunos.py
@@ -10,12 +10,13 @@ import os
import psutil
from psutil import SUNOS
+from psutil.tests import PsutilTestCase
from psutil.tests import sh
from psutil.tests import unittest
@unittest.skipIf(not SUNOS, "SUNOS only")
-class SunOSSpecificTestCase(unittest.TestCase):
+class SunOSSpecificTestCase(PsutilTestCase):
def test_swap_memory(self):
out = sh('env PATH=/usr/sbin:/sbin:%s swap -l' % os.environ['PATH'])
diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py
index d0817459..eda6db01 100755..100644
--- a/psutil/tests/test_system.py
+++ b/psutil/tests/test_system.py
@@ -35,7 +35,6 @@ from psutil.tests import check_net_address
from psutil.tests import CI_TESTING
from psutil.tests import DEVNULL
from psutil.tests import enum
-from psutil.tests import get_testfn
from psutil.tests import HAS_BATTERY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_GETLOADAVG
@@ -44,7 +43,7 @@ from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import mock
-from psutil.tests import ProcessTestCase
+from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
from psutil.tests import retry_on_failure
from psutil.tests import TRAVIS
@@ -57,7 +56,7 @@ from psutil.tests import unittest
# ===================================================================
-class TestProcessAPIs(ProcessTestCase):
+class TestProcessAPIs(PsutilTestCase):
def test_process_iter(self):
self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
@@ -190,7 +189,7 @@ class TestProcessAPIs(ProcessTestCase):
self.assertFalse(psutil.pid_exists(pid), msg=pid)
-class TestMiscAPIs(unittest.TestCase):
+class TestMiscAPIs(PsutilTestCase):
def test_boot_time(self):
bt = psutil.boot_time()
@@ -272,7 +271,7 @@ class TestMiscAPIs(unittest.TestCase):
self.assertIs(getattr(psutil, name), False, msg=name)
-class TestMemoryAPIs(unittest.TestCase):
+class TestMemoryAPIs(PsutilTestCase):
def test_virtual_memory(self):
mem = psutil.virtual_memory()
@@ -309,7 +308,7 @@ class TestMemoryAPIs(unittest.TestCase):
assert mem.sout >= 0, mem
-class TestCpuAPIs(unittest.TestCase):
+class TestCpuAPIs(PsutilTestCase):
def test_cpu_count_logical(self):
logical = psutil.cpu_count()
@@ -550,7 +549,7 @@ class TestCpuAPIs(unittest.TestCase):
self.assertGreaterEqual(load, 0.0)
-class TestDiskAPIs(unittest.TestCase):
+class TestDiskAPIs(PsutilTestCase):
def test_disk_usage(self):
usage = psutil.disk_usage(os.getcwd())
@@ -574,7 +573,7 @@ class TestDiskAPIs(unittest.TestCase):
# if path does not exist OSError ENOENT is expected across
# all platforms
- fname = get_testfn()
+ fname = self.get_testfn()
with self.assertRaises(FileNotFoundError):
psutil.disk_usage(fname)
@@ -684,7 +683,7 @@ class TestDiskAPIs(unittest.TestCase):
assert m.called
-class TestNetAPIs(unittest.TestCase):
+class TestNetAPIs(PsutilTestCase):
@unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
def test_net_io_counters(self):
@@ -829,7 +828,7 @@ class TestNetAPIs(unittest.TestCase):
assert m.called
-class TestSensorsAPIs(unittest.TestCase):
+class TestSensorsAPIs(PsutilTestCase):
@unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
def test_sensors_temperatures(self):
diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py
index 85b61aea..e470c1b8 100755..100644
--- a/psutil/tests/test_testutils.py
+++ b/psutil/tests/test_testutils.py
@@ -32,11 +32,10 @@ from psutil.tests import call_until
from psutil.tests import chdir
from psutil.tests import create_sockets
from psutil.tests import get_free_port
-from psutil.tests import get_testfn
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import is_namedtuple
from psutil.tests import mock
-from psutil.tests import ProcessTestCase
+from psutil.tests import PsutilTestCase
from psutil.tests import PYTHON_EXE
from psutil.tests import reap_children
from psutil.tests import retry
@@ -59,7 +58,7 @@ import psutil.tests
# ===================================================================
-class TestRetryDecorator(unittest.TestCase):
+class TestRetryDecorator(PsutilTestCase):
@mock.patch('time.sleep')
def test_retry_success(self, sleep):
@@ -125,7 +124,7 @@ class TestRetryDecorator(unittest.TestCase):
self.assertRaises(ValueError, retry, retries=5, timeout=1)
-class TestSyncTestUtils(unittest.TestCase):
+class TestSyncTestUtils(PsutilTestCase):
def test_wait_for_pid(self):
wait_for_pid(os.getpid())
@@ -134,26 +133,26 @@ class TestSyncTestUtils(unittest.TestCase):
self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
def test_wait_for_file(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
with open(testfn, 'w') as f:
f.write('foo')
wait_for_file(testfn)
assert not os.path.exists(testfn)
def test_wait_for_file_empty(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
with open(testfn, 'w'):
pass
wait_for_file(testfn, empty=True)
assert not os.path.exists(testfn)
def test_wait_for_file_no_file(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
self.assertRaises(IOError, wait_for_file, testfn)
def test_wait_for_file_no_delete(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
with open(testfn, 'w') as f:
f.write('foo')
wait_for_file(testfn, delete=False)
@@ -164,7 +163,7 @@ class TestSyncTestUtils(unittest.TestCase):
self.assertEqual(ret, 1)
-class TestFSTestUtils(unittest.TestCase):
+class TestFSTestUtils(PsutilTestCase):
def test_open_text(self):
with open_text(__file__) as f:
@@ -175,7 +174,7 @@ class TestFSTestUtils(unittest.TestCase):
self.assertEqual(f.mode, 'rb')
def test_safe_mkdir(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
safe_mkdir(testfn)
assert os.path.isdir(testfn)
safe_mkdir(testfn)
@@ -183,7 +182,7 @@ class TestFSTestUtils(unittest.TestCase):
def test_safe_rmpath(self):
# test file is removed
- testfn = get_testfn()
+ testfn = self.get_testfn()
open(testfn, 'w').close()
safe_rmpath(testfn)
assert not os.path.exists(testfn)
@@ -201,7 +200,7 @@ class TestFSTestUtils(unittest.TestCase):
assert m.called
def test_chdir(self):
- testfn = get_testfn()
+ testfn = self.get_testfn()
base = os.getcwd()
os.mkdir(testfn)
with chdir(testfn):
@@ -209,7 +208,7 @@ class TestFSTestUtils(unittest.TestCase):
self.assertEqual(os.getcwd(), base)
-class TestProcessUtils(ProcessTestCase):
+class TestProcessUtils(PsutilTestCase):
def test_reap_children(self):
subp = self.get_test_subprocess()
@@ -221,23 +220,25 @@ class TestProcessUtils(ProcessTestCase):
assert not psutil.tests._subprocesses_started
def test_create_proc_children_pair(self):
- p1, p2 = self.create_proc_children_pair()
- self.assertNotEqual(p1.pid, p2.pid)
- assert p1.is_running()
- assert p2.is_running()
+ child, grandchild = self.create_proc_children_pair()
+ self.assertNotEqual(child.pid, grandchild.pid)
+ assert child.is_running()
+ assert grandchild.is_running()
+ children = psutil.Process().children()
+ self.assertEqual(children, [child])
children = psutil.Process().children(recursive=True)
self.assertEqual(len(children), 2)
- self.assertIn(p1, children)
- self.assertIn(p2, children)
- self.assertEqual(p1.ppid(), os.getpid())
- self.assertEqual(p2.ppid(), p1.pid)
+ self.assertIn(child, children)
+ self.assertIn(grandchild, children)
+ self.assertEqual(child.ppid(), os.getpid())
+ self.assertEqual(grandchild.ppid(), child.pid)
- # make sure both of them are cleaned up
- reap_children()
- assert not p1.is_running()
- assert not p2.is_running()
- assert not psutil.tests._pids_started
- assert not psutil.tests._subprocesses_started
+ terminate(child)
+ assert not child.is_running()
+ assert grandchild.is_running()
+
+ terminate(grandchild)
+ assert not grandchild.is_running()
@unittest.skipIf(not POSIX, "POSIX only")
def test_create_zombie_proc(self):
@@ -275,7 +276,7 @@ class TestProcessUtils(ProcessTestCase):
assert not psutil.pid_exists(zombie.pid)
-class TestNetUtils(unittest.TestCase):
+class TestNetUtils(PsutilTestCase):
def bind_socket(self):
port = get_free_port()
@@ -284,7 +285,7 @@ class TestNetUtils(unittest.TestCase):
@unittest.skipIf(not POSIX, "POSIX only")
def test_bind_unix_socket(self):
- name = get_testfn()
+ name = self.get_testfn()
sock = bind_unix_socket(name)
with contextlib.closing(sock):
self.assertEqual(sock.family, socket.AF_UNIX)
@@ -293,7 +294,7 @@ class TestNetUtils(unittest.TestCase):
assert os.path.exists(name)
assert stat.S_ISSOCK(os.stat(name).st_mode)
# UDP
- name = get_testfn()
+ name = self.get_testfn()
sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
with contextlib.closing(sock):
self.assertEqual(sock.type, socket.SOCK_DGRAM)
@@ -316,7 +317,7 @@ class TestNetUtils(unittest.TestCase):
p = psutil.Process()
num_fds = p.num_fds()
assert not p.connections(kind='unix')
- name = get_testfn()
+ name = self.get_testfn()
server, client = unix_socketpair(name)
try:
assert os.path.exists(name)
@@ -417,7 +418,7 @@ class TestMemLeakClass(TestMemoryLeak):
self.execute_w_exc(ZeroDivisionError, fun)
-class TestOtherUtils(unittest.TestCase):
+class TestOtherUtils(PsutilTestCase):
def test_is_namedtuple(self):
assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3))
diff --git a/psutil/tests/test_unicode.py b/psutil/tests/test_unicode.py
index ae9f7f51..3f55e797 100755..100644
--- a/psutil/tests/test_unicode.py
+++ b/psutil/tests/test_unicode.py
@@ -98,11 +98,10 @@ from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_MEMORY_MAPS
from psutil.tests import INVALID_UNICODE_SUFFIX
-from psutil.tests import ProcessTestCase
+from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
-from psutil.tests import reap_children
from psutil.tests import safe_mkdir
-from psutil.tests import safe_rmpath as _safe_rmpath
+from psutil.tests import safe_rmpath
from psutil.tests import serialrun
from psutil.tests import skip_on_access_denied
from psutil.tests import terminate
@@ -113,8 +112,8 @@ from psutil.tests import unittest
import psutil
-def safe_rmpath(path):
- if APPVEYOR:
+if APPVEYOR:
+ def safe_rmpath(path): # NOQA
# TODO - this is quite random and I'm not sure why it happens,
# nor I can reproduce it locally:
# https://ci.appveyor.com/project/giampaolo/psutil/build/job/
@@ -125,12 +124,11 @@ def safe_rmpath(path):
# https://github.com/giampaolo/psutil/blob/
# 68c7a70728a31d8b8b58f4be6c4c0baa2f449eda/psutil/arch/
# windows/process_info.c#L146
+ from psutil.tests import safe_rmpath as _rm
try:
- return _safe_rmpath(path)
+ return _rm(path)
except WindowsError:
traceback.print_exc()
- else:
- return _safe_rmpath(path)
def subprocess_supports_unicode(suffix):
@@ -139,12 +137,12 @@ def subprocess_supports_unicode(suffix):
"""
if PY3:
return True
- name = get_testfn(suffix=suffix)
sproc = None
+ testfn = get_testfn(suffix=suffix)
try:
- safe_rmpath(name)
- create_exe(name)
- sproc = get_test_subprocess(cmd=[name])
+ safe_rmpath(testfn)
+ create_exe(testfn)
+ sproc = get_test_subprocess(cmd=[testfn])
except UnicodeEncodeError:
return False
else:
@@ -152,6 +150,7 @@ def subprocess_supports_unicode(suffix):
finally:
if sproc is not None:
terminate(sproc)
+ safe_rmpath(testfn)
# ===================================================================
@@ -170,7 +169,7 @@ class _BaseFSAPIsTests(object):
@classmethod
def tearDownClass(cls):
- reap_children()
+ safe_rmpath(cls.funky_name)
def expect_exact_path_match(self):
raise NotImplementedError("must be implemented in subclass")
@@ -231,7 +230,7 @@ class _BaseFSAPIsTests(object):
@unittest.skipIf(not POSIX, "POSIX only")
def test_proc_connections(self):
suffix = os.path.basename(self.funky_name)
- name = get_testfn(suffix=suffix)
+ name = self.get_testfn(suffix=suffix)
try:
sock = bind_unix_socket(name)
except UnicodeEncodeError:
@@ -257,7 +256,7 @@ class _BaseFSAPIsTests(object):
raise ValueError("connection not found")
suffix = os.path.basename(self.funky_name)
- name = get_testfn(suffix=suffix)
+ name = self.get_testfn(suffix=suffix)
try:
sock = bind_unix_socket(name)
except UnicodeEncodeError:
@@ -304,7 +303,7 @@ class _BaseFSAPIsTests(object):
@unittest.skipIf(ASCII_FS, "ASCII fs")
@unittest.skipIf(not subprocess_supports_unicode(UNICODE_SUFFIX),
"subprocess can't deal with unicode")
-class TestFSAPIs(_BaseFSAPIsTests, ProcessTestCase):
+class TestFSAPIs(_BaseFSAPIsTests, PsutilTestCase):
"""Test FS APIs with a funky, valid, UTF8 path name."""
funky_suffix = UNICODE_SUFFIX
@@ -322,7 +321,7 @@ class TestFSAPIs(_BaseFSAPIsTests, ProcessTestCase):
@unittest.skipIf(PYPY, "unreliable on PYPY")
@unittest.skipIf(not subprocess_supports_unicode(INVALID_UNICODE_SUFFIX),
"subprocess can't deal with invalid unicode")
-class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, ProcessTestCase):
+class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, PsutilTestCase):
"""Test FS APIs with a funky, invalid path name."""
funky_suffix = INVALID_UNICODE_SUFFIX
@@ -337,12 +336,9 @@ class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, ProcessTestCase):
# ===================================================================
-class TestNonFSAPIS(ProcessTestCase):
+class TestNonFSAPIS(PsutilTestCase):
"""Unicode tests for non fs-related APIs."""
- def tearDown(self):
- reap_children()
-
@unittest.skipIf(not HAS_ENVIRON, "not supported")
@unittest.skipIf(PYPY and WINDOWS, "segfaults on PYPY + WINDOWS")
def test_proc_environ(self):
diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py
index a51c9c15..057c2982 100755
--- a/psutil/tests/test_windows.py
+++ b/psutil/tests/test_windows.py
@@ -22,14 +22,14 @@ import warnings
import psutil
from psutil import WINDOWS
from psutil._compat import FileNotFoundError
+from psutil._compat import super
from psutil.tests import APPVEYOR
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_BATTERY
from psutil.tests import mock
-from psutil.tests import ProcessTestCase
+from psutil.tests import PsutilTestCase
from psutil.tests import PY3
from psutil.tests import PYPY
-from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import terminate
@@ -66,7 +66,7 @@ def wrap_exceptions(fun):
@unittest.skipIf(PYPY, "pywin32 not available on PYPY") # skip whole module
-class TestCase(unittest.TestCase):
+class TestCase(PsutilTestCase):
pass
@@ -300,7 +300,7 @@ class TestSensorsBattery(TestCase):
@unittest.skipIf(not WINDOWS, "WINDOWS only")
-class TestProcess(ProcessTestCase):
+class TestProcess(PsutilTestCase):
@classmethod
def setUpClass(cls):
@@ -537,7 +537,7 @@ class TestProcessWMI(TestCase):
@classmethod
def tearDownClass(cls):
- reap_children()
+ terminate(cls.pid)
def test_name(self):
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
@@ -677,7 +677,7 @@ class TestDualProcessImplementation(TestCase):
@unittest.skipIf(not WINDOWS, "WINDOWS only")
-class RemoteProcessTestCase(ProcessTestCase):
+class RemoteProcessTestCase(PsutilTestCase):
"""Certain functions require calling ReadProcessMemory.
This trivially works when called on the current process.
Check that this works on other processes, especially when they
@@ -696,6 +696,7 @@ class RemoteProcessTestCase(ProcessTestCase):
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output, _ = proc.communicate()
+ proc.wait()
if output == str(not IS_64_BIT):
return filename
@@ -717,6 +718,7 @@ class RemoteProcessTestCase(ProcessTestCase):
test_args = ["-c", "import sys; sys.stdin.read()"]
def setUp(self):
+ super().setUp()
env = os.environ.copy()
env["THINK_OF_A_NUMBER"] = str(os.getpid())
self.proc32 = self.get_test_subprocess(
@@ -729,13 +731,10 @@ class RemoteProcessTestCase(ProcessTestCase):
stdin=subprocess.PIPE)
def tearDown(self):
+ super().tearDown()
self.proc32.communicate()
self.proc64.communicate()
- @classmethod
- def tearDownClass(cls):
- reap_children()
-
def test_cmdline_32(self):
p = psutil.Process(self.proc32.pid)
self.assertEqual(len(p.cmdline()), 3)
diff --git a/scripts/internal/winmake.py b/scripts/internal/winmake.py
index f39d45ac..eebd1692 100755
--- a/scripts/internal/winmake.py
+++ b/scripts/internal/winmake.py
@@ -31,7 +31,7 @@ if APPVEYOR:
PYTHON = sys.executable
else:
PYTHON = os.getenv('PYTHON', sys.executable)
-TEST_SCRIPT = 'psutil\\tests\\runner.py'
+RUNNER_PY = 'psutil\\tests\\runner.py'
GET_PIP_URL = "https://bootstrap.pypa.io/get-pip.py"
PY3 = sys.version_info[0] == 3
HERE = os.path.abspath(os.path.dirname(__file__))
@@ -397,13 +397,11 @@ def lint():
sh("%s -m flake8 %s" % (PYTHON, py_files), nolog=True)
-def test(script=TEST_SCRIPT):
+def test(name=""):
"""Run tests"""
install()
test_setup()
- cmdline = "%s %s" % (PYTHON, script)
- safe_print(cmdline)
- sh(cmdline)
+ sh("%s %s %s" % (PYTHON, RUNNER_PY, name))
def coverage():
@@ -411,7 +409,7 @@ def coverage():
# Note: coverage options are controlled by .coveragerc file
install()
test_setup()
- sh("%s -m coverage run %s" % (PYTHON, TEST_SCRIPT))
+ sh("%s -m coverage run %s" % (PYTHON, RUNNER_PY))
sh("%s -m coverage report" % PYTHON)
sh("%s -m coverage html" % PYTHON)
sh("%s -m webbrowser -t htmlcov/index.html" % PYTHON)
@@ -477,8 +475,7 @@ def test_failed():
"""Re-run tests which failed on last run."""
install()
test_setup()
- sh('%s -c "import psutil.tests.runner as r; r.run(last_failed=True)"' % (
- PYTHON))
+ sh("%s %s --last-failed" % (PYTHON, RUNNER_PY))
def test_memleaks():