summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2020-04-28 12:06:38 -0700
committerGitHub <noreply@github.com>2020-04-28 21:06:38 +0200
commit0065a3921b1cf31ad32cca5bd3ca209fe1fbceed (patch)
treec614e4f4e34eb3b12d2ffaeac1d0bd3374df6ccb
parent56db14e14797ac790094a7bd8865d63383fd93a7 (diff)
downloadpsutil-0065a3921b1cf31ad32cca5bd3ca209fe1fbceed.tar.gz
Test sub-processes cleanup and ProcessTestCase class (#1739)
-rw-r--r--psutil/tests/__init__.py196
-rwxr-xr-xpsutil/tests/runner.py2
-rwxr-xr-xpsutil/tests/test_bsd.py8
-rwxr-xr-xpsutil/tests/test_connections.py16
-rwxr-xr-xpsutil/tests/test_linux.py8
-rwxr-xr-xpsutil/tests/test_osx.py5
-rwxr-xr-xpsutil/tests/test_posix.py7
-rwxr-xr-xpsutil/tests/test_process.py104
-rwxr-xr-xpsutil/tests/test_system.py25
-rwxr-xr-xpsutil/tests/test_testutils.py55
-rwxr-xr-xpsutil/tests/test_unicode.py22
-rwxr-xr-xpsutil/tests/test_windows.py27
12 files changed, 260 insertions, 215 deletions
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py
index fea5a5d0..fc4bff01 100644
--- a/psutil/tests/__init__.py
+++ b/psutil/tests/__init__.py
@@ -45,7 +45,6 @@ from psutil import WINDOWS
from psutil._common import bytes2human
from psutil._common import print_color
from psutil._common import supports_ipv6
-from psutil._compat import ChildProcessError
from psutil._compat import FileExistsError
from psutil._compat import FileNotFoundError
from psutil._compat import PY3
@@ -89,7 +88,7 @@ __all__ = [
'ThreadTask'
# test utils
'unittest', 'skip_on_access_denied', 'skip_on_not_implemented',
- 'retry_on_failure', 'TestMemoryLeak',
+ 'retry_on_failure', 'TestMemoryLeak', 'ProcessTestCase',
# install utils
'install_pip', 'install_test_deps',
# fs utils
@@ -291,7 +290,7 @@ def _reap_children_on_err(fun):
@_reap_children_on_err
def get_test_subprocess(cmd=None, **kwds):
"""Creates a python subprocess which does nothing for 60 secs and
- return it as subprocess.Popen instance.
+ return it as a subprocess.Popen instance.
If "cmd" is specified that is used instead of python.
By default stdin and stdout are redirected to /dev/null.
It also attemps to make sure the process is in a reasonably
@@ -353,9 +352,7 @@ def create_proc_children_pair():
else:
subp = pyrun(s)
child1 = psutil.Process(subp.pid)
- data = wait_for_file(testfn, delete=False, empty=False)
- safe_rmpath(testfn)
- child2_pid = int(data)
+ child2_pid = int(wait_for_file(testfn, delete=True, empty=False))
_pids_started.add(child2_pid)
child2 = psutil.Process(child2_pid)
return (child1, child2)
@@ -458,44 +455,37 @@ def _assert_no_pid(pid):
assert 0, "%s is still alive" % p
-def terminate(proc_or_pid, sig=signal.SIGTERM, wait_w_timeout=GLOBAL_TIMEOUT):
- """Terminate and flush a psutil.Process, psutil.Popen or
- subprocess.Popen instance.
+def terminate(proc_or_pid, sig=signal.SIGTERM, wait_timeout=GLOBAL_TIMEOUT):
+ """Terminate a process and wait() for it.
+ Process can be a PID or an instance of psutil.Process(),
+ subprocess.Popen() or psutil.Popen().
+ If it's a subprocess.Popen() or psutil.Popen() instance also closes
+ its stdin / stdout / stderr fds.
+ PID is wait()ed even if the process is already gone (kills zombies).
+ Does nothing if the process does not exist.
+ Return process exit status.
"""
- def wait(proc, timeout=None):
- if sys.version_info < (3, 3) and not \
- isinstance(proc, (psutil.Process, psutil.Popen)):
- # subprocess.Popen instance + no timeout arg.
+ if POSIX:
+ from psutil._psposix import wait_pid
+
+ def wait(proc, timeout):
+ if sys.version_info < (3, 3) and \
+ isinstance(proc, subprocess.Popen) and \
+ not isinstance(proc, psutil.Popen):
+ # subprocess.Popen instance: emulate missing timeout arg.
+ ret = None
try:
ret = psutil.Process(proc.pid).wait(timeout)
except psutil.NoSuchProcess:
- pass
- else:
- proc.returncode = ret
- return ret
+ # Needed to kill zombies.
+ if POSIX:
+ ret = wait_pid(proc.pid, timeout)
+ proc.returncode = ret
+ return ret
else:
return proc.wait(timeout)
- if isinstance(proc_or_pid, int):
- try:
- proc = psutil.Process(proc_or_pid)
- except psutil.NoSuchProcess:
- return
- else:
- proc = proc_or_pid
-
- if isinstance(proc, (psutil.Process, psutil.Popen)):
- try:
- proc.send_signal(sig)
- except psutil.NoSuchProcess:
- _assert_no_pid(proc.pid)
- else:
- if wait_w_timeout:
- ret = wait(proc, wait_w_timeout)
- _assert_no_pid(proc.pid)
- return ret
- else:
- # subprocess.Popen instance
+ def term_subproc(proc, timeout):
try:
proc.send_signal(sig)
except OSError as err:
@@ -503,72 +493,80 @@ def terminate(proc_or_pid, sig=signal.SIGTERM, wait_w_timeout=GLOBAL_TIMEOUT):
pass
elif err.errno != errno.ESRCH:
raise
- except psutil.NoSuchProcess: # psutil.Popen
+ return wait(proc, timeout)
+
+ def term_psproc(proc, timeout):
+ try:
+ proc.send_signal(sig)
+ except psutil.NoSuchProcess:
pass
+ return wait(proc, timeout)
+
+ def term_pid(pid, timeout):
+ try:
+ proc = psutil.Process(pid)
+ except psutil.NoSuchProcess:
+ # Needed to kill zombies.
+ if POSIX:
+ return wait_pid(pid, timeout)
+ else:
+ return term_psproc(proc, timeout)
+
+ def flush_popen(proc):
if proc.stdout:
proc.stdout.close()
if proc.stderr:
proc.stderr.close()
- try:
- # Flushing a BufferedWriter may raise an error.
- if proc.stdin:
- proc.stdin.close()
- finally:
- if wait_w_timeout:
- try:
- return wait(proc, wait_w_timeout)
- except ChildProcessError:
- pass
- _assert_no_pid(proc.pid)
+ # Flushing a BufferedWriter may raise an error.
+ if proc.stdin:
+ proc.stdin.close()
+
+ p = proc_or_pid
+ try:
+ if isinstance(p, int):
+ return term_pid(p, wait_timeout)
+ elif isinstance(p, (psutil.Process, psutil.Popen)):
+ return term_psproc(p, wait_timeout)
+ elif isinstance(p, subprocess.Popen):
+ return term_subproc(p, wait_timeout)
+ else:
+ raise TypeError("wrong type %r" % p)
+ finally:
+ if isinstance(p, (subprocess.Popen, psutil.Popen)):
+ flush_popen(p)
+ _assert_no_pid(p if isinstance(p, int) else p.pid)
def reap_children(recursive=False):
"""Terminate and wait() any subprocess started by this test suite
- and ensure that no zombies stick around to hog resources and
- create problems when looking for refleaks.
-
+ and any children currently running, ensuring that no processes stick
+ around to hog resources.
If resursive is True it also tries to terminate and wait()
all grandchildren started by this process.
"""
- # If recursive, get the children here before terminating them, as
- # we don't want to lose the intermediate reference pointing to the
- # grandchildren.
- if recursive:
- children = set(psutil.Process().children(recursive=True))
- else:
- children = set()
+ # Get the children here before terminating them, as in case of
+ # recursive=True we don't want to lose the intermediate reference
+ # pointing to the grandchildren.
+ children = psutil.Process().children(recursive=recursive)
# Terminate subprocess.Popen.
while _subprocesses_started:
subp = _subprocesses_started.pop()
- _pids_started.add(subp.pid)
terminate(subp)
# Collect started pids.
while _pids_started:
pid = _pids_started.pop()
- try:
- p = psutil.Process(pid)
- except psutil.NoSuchProcess:
- _assert_no_pid(pid)
- else:
- children.add(p)
+ terminate(pid)
# Terminate children.
if children:
for p in children:
- terminate(p, wait_w_timeout=None)
+ terminate(p, wait_timeout=None)
gone, alive = psutil.wait_procs(children, timeout=GLOBAL_TIMEOUT)
for p in alive:
warn("couldn't terminate process %r; attempting kill()" % p)
- terminate(p, wait_w_timeout=None, sig=signal.SIGKILL)
- gone, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT)
- if alive:
- for p in alive:
- warn("process %r survived kill()" % p)
-
- for p in children:
- _assert_no_pid(p.pid)
+ terminate(p, sig=signal.SIGKILL)
# ===================================================================
@@ -774,6 +772,7 @@ def chdir(dirname):
def create_exe(outpath, c_code=None):
"""Creates an executable file in the given location."""
assert not os.path.exists(outpath), outpath
+ _testfiles_created.add(outpath)
if c_code:
if not which("gcc"):
raise ValueError("gcc is not installed")
@@ -842,14 +841,37 @@ class TestCase(unittest.TestCase):
unittest.TestCase = TestCase
-def serialrun(klass):
- """A decorator to mark a TestCase class. When running parallel tests,
- class' unit tests will be run serially (1 process).
+class ProcessTestCase(TestCase):
+ """Test class providing auto-cleanup wrappers on top of process
+ test utilities.
"""
- # assert issubclass(klass, unittest.TestCase), klass
- assert inspect.isclass(klass), klass
- klass._serialrun = True
- return klass
+
+ def get_test_subprocess(self, *args, **kwds):
+ sproc = get_test_subprocess(*args, **kwds)
+ self.addCleanup(terminate, sproc)
+ return sproc
+
+ def create_proc_children_pair(self):
+ child1, child2 = create_proc_children_pair()
+ self.addCleanup(terminate, child1)
+ self.addCleanup(terminate, child2)
+ return (child1, child2)
+
+ def create_zombie_proc(self):
+ parent, zombie = create_zombie_proc()
+ self.addCleanup(terminate, zombie)
+ self.addCleanup(terminate, parent) # executed first
+ return (parent, zombie)
+
+ def pyrun(self, *args, **kwds):
+ sproc = pyrun(*args, **kwds)
+ self.addCleanup(terminate, sproc)
+ return sproc
+
+ def get_testfn(self, suffix="", dir=None):
+ fname = get_testfn(suffix=suffix, dir=suffix)
+ self.addCleanup(safe_rmpath(fname))
+ return fname
@unittest.skipIf(PYPY, "unreliable on PYPY")
@@ -966,6 +988,16 @@ class TestMemoryLeak(unittest.TestCase):
self.execute(call, **kwargs)
+def serialrun(klass):
+ """A decorator to mark a TestCase class. When running parallel tests,
+ class' unit tests will be run serially (1 process).
+ """
+ # assert issubclass(klass, unittest.TestCase), klass
+ assert inspect.isclass(klass), klass
+ klass._serialrun = True
+ return klass
+
+
def retry_on_failure(retries=NO_RETRIES):
"""Decorator which runs a test function and retries N times before
actually failing.
diff --git a/psutil/tests/runner.py b/psutil/tests/runner.py
index d90feabd..8e4c872a 100755
--- a/psutil/tests/runner.py
+++ b/psutil/tests/runner.py
@@ -256,6 +256,8 @@ class Runner:
ser_elapsed = time.time() - t
# print
+ if not par.wasSuccessful():
+ par.printErrors() # print them again at the bottom
par_fails, par_errs, par_skips = map(len, (par.failures,
par.errors,
par.skipped))
diff --git a/psutil/tests/test_bsd.py b/psutil/tests/test_bsd.py
index d25eb877..427b9219 100755
--- a/psutil/tests/test_bsd.py
+++ b/psutil/tests/test_bsd.py
@@ -22,10 +22,10 @@ from psutil import NETBSD
from psutil import OPENBSD
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_BATTERY
-from psutil.tests import SYSMEM_TOLERANCE
-from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import sh
+from psutil.tests import SYSMEM_TOLERANCE
+from psutil.tests import terminate
from psutil.tests import unittest
from psutil.tests import which
@@ -81,7 +81,7 @@ class BSDTestCase(unittest.TestCase):
@classmethod
def tearDownClass(cls):
- reap_children()
+ terminate(cls.pid)
@unittest.skipIf(NETBSD, "-o lstart doesn't work on NETBSD")
def test_process_create_time(self):
@@ -156,7 +156,7 @@ class FreeBSDProcessTestCase(unittest.TestCase):
@classmethod
def tearDownClass(cls):
- reap_children()
+ terminate(cls.pid)
@retry_on_failure()
def test_memory_maps(self):
diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py
index f4ddc14e..9c1bbe8e 100755
--- a/psutil/tests/test_connections.py
+++ b/psutil/tests/test_connections.py
@@ -39,8 +39,7 @@ from psutil.tests import enum
from psutil.tests import get_free_port
from psutil.tests import get_testfn
from psutil.tests import HAS_CONNECTIONS_UNIX
-from psutil.tests import pyrun
-from psutil.tests import reap_children
+from psutil.tests import ProcessTestCase
from psutil.tests import serialrun
from psutil.tests import skip_on_access_denied
from psutil.tests import SKIP_SYSCONS
@@ -56,7 +55,7 @@ SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
@serialrun
-class _ConnTestCase(unittest.TestCase):
+class _ConnTestCase(ProcessTestCase):
def setUp(self):
if not (NETBSD or FREEBSD):
@@ -65,7 +64,6 @@ class _ConnTestCase(unittest.TestCase):
assert not cons, cons
def tearDown(self):
- reap_children()
if not (FREEBSD or NETBSD):
# Make sure we closed all resources.
# NetBSD opens a UNIX socket to /var/log/run.
@@ -447,14 +445,14 @@ class TestFilters(_ConnTestCase):
# launch various subprocess instantiating a socket of various
# families and types to enrich psutil results
- tcp4_proc = pyrun(tcp4_template)
+ tcp4_proc = self.pyrun(tcp4_template)
tcp4_addr = eval(wait_for_file(testfile))
- udp4_proc = pyrun(udp4_template)
+ udp4_proc = self.pyrun(udp4_template)
udp4_addr = eval(wait_for_file(testfile))
if supports_ipv6():
- tcp6_proc = pyrun(tcp6_template)
+ tcp6_proc = self.pyrun(tcp6_template)
tcp6_addr = eval(wait_for_file(testfile))
- udp6_proc = pyrun(udp6_template)
+ udp6_proc = self.pyrun(udp6_template)
udp6_addr = eval(wait_for_file(testfile))
else:
tcp6_proc = None
@@ -596,7 +594,7 @@ class TestSystemWideConnections(_ConnTestCase):
cleanup_test_files()
time.sleep(60)
""" % fname)
- sproc = pyrun(src)
+ sproc = self.pyrun(src)
pids.append(sproc.pid)
# sync
diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py
index 6d4a934a..bac20b05 100755
--- a/psutil/tests/test_linux.py
+++ b/psutil/tests/test_linux.py
@@ -34,15 +34,14 @@ from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_GETLOADAVG
from psutil.tests import HAS_RLIMIT
from psutil.tests import mock
+from psutil.tests import ProcessTestCase
from psutil.tests import PYPY
-from psutil.tests import pyrun
from psutil.tests import reload_module
from psutil.tests import retry_on_failure
from psutil.tests import safe_rmpath
from psutil.tests import sh
from psutil.tests import skip_on_not_implemented
from psutil.tests import SYSMEM_TOLERANCE
-from psutil.tests import terminate
from psutil.tests import ThreadTask
from psutil.tests import TRAVIS
from psutil.tests import unittest
@@ -1641,7 +1640,7 @@ class TestSensorsFans(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
-class TestProcess(unittest.TestCase):
+class TestProcess(ProcessTestCase):
@retry_on_failure()
def test_memory_full_info(self):
@@ -1651,8 +1650,7 @@ class TestProcess(unittest.TestCase):
with open("%s", "w") as f:
time.sleep(10)
""" % testfn)
- sproc = pyrun(src)
- self.addCleanup(terminate, sproc)
+ sproc = self.pyrun(src)
call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn)
p = psutil.Process(sproc.pid)
time.sleep(.1)
diff --git a/psutil/tests/test_osx.py b/psutil/tests/test_osx.py
index bcff0ba7..4fa8d0af 100755
--- a/psutil/tests/test_osx.py
+++ b/psutil/tests/test_osx.py
@@ -16,7 +16,6 @@ from psutil.tests import create_zombie_proc
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_BATTERY
from psutil.tests import SYSMEM_TOLERANCE
-from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import terminate
@@ -85,7 +84,7 @@ class TestProcess(unittest.TestCase):
@classmethod
def tearDownClass(cls):
- reap_children()
+ terminate(cls.pid)
def test_process_create_time(self):
output = sh("ps -o lstart -p %s" % self.pid)
@@ -111,8 +110,8 @@ class TestZombieProcessAPIs(unittest.TestCase):
@classmethod
def tearDownClass(cls):
- terminate(cls.parent)
terminate(cls.zombie)
+ terminate(cls.parent) # executed first
def test_pidtask_info(self):
self.assertEqual(self.p.status(), psutil.STATUS_ZOMBIE)
diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py
index 14ec880b..1b37fa2f 100755
--- a/psutil/tests/test_posix.py
+++ b/psutil/tests/test_posix.py
@@ -28,13 +28,12 @@ from psutil.tests import get_test_subprocess
from psutil.tests import HAS_NET_IO_COUNTERS
from psutil.tests import mock
from psutil.tests import PYTHON_EXE
-from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import skip_on_access_denied
+from psutil.tests import terminate
from psutil.tests import TRAVIS
from psutil.tests import unittest
-from psutil.tests import wait_for_pid
from psutil.tests import which
@@ -134,11 +133,11 @@ class TestProcess(unittest.TestCase):
def setUpClass(cls):
cls.pid = get_test_subprocess([PYTHON_EXE, "-E", "-O"],
stdin=subprocess.PIPE).pid
- wait_for_pid(cls.pid)
+ # wait_for_pid(cls.pid)
@classmethod
def tearDownClass(cls):
- reap_children()
+ terminate(cls.pid)
def test_ppid(self):
ppid_ps = ps('ppid', self.pid)
diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py
index ef8d245f..178b991b 100755
--- a/psutil/tests/test_process.py
+++ b/psutil/tests/test_process.py
@@ -39,10 +39,7 @@ from psutil.tests import call_until
from psutil.tests import CIRRUS
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
-from psutil.tests import create_proc_children_pair
-from psutil.tests import create_zombie_proc
from psutil.tests import enum
-from psutil.tests import get_test_subprocess
from psutil.tests import get_testfn
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_ENVIRON
@@ -53,6 +50,7 @@ from psutil.tests import HAS_PROC_IO_COUNTERS
from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_THREADS
from psutil.tests import mock
+from psutil.tests import ProcessTestCase
from psutil.tests import PYPY
from psutil.tests import PYTHON_EXE
from psutil.tests import reap_children
@@ -60,7 +58,6 @@ from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import skip_on_access_denied
from psutil.tests import skip_on_not_implemented
-from psutil.tests import terminate
from psutil.tests import ThreadTask
from psutil.tests import TRAVIS
from psutil.tests import unittest
@@ -72,22 +69,19 @@ from psutil.tests import wait_for_pid
# ===================================================================
-class TestProcess(unittest.TestCase):
+class TestProcess(ProcessTestCase):
"""Tests for psutil.Process class."""
- def tearDown(self):
- reap_children()
-
def test_pid(self):
p = psutil.Process()
self.assertEqual(p.pid, os.getpid())
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
with self.assertRaises(AttributeError):
p.pid = 33
def test_kill(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
test_pid = sproc.pid
p = psutil.Process(test_pid)
p.kill()
@@ -97,7 +91,7 @@ class TestProcess(unittest.TestCase):
self.assertEqual(sig, -signal.SIGKILL)
def test_terminate(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
test_pid = sproc.pid
p = psutil.Process(test_pid)
p.terminate()
@@ -108,7 +102,7 @@ class TestProcess(unittest.TestCase):
def test_send_signal(self):
sig = signal.SIGKILL if POSIX else signal.SIGTERM
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
exit_sig = p.wait()
@@ -116,7 +110,7 @@ class TestProcess(unittest.TestCase):
if POSIX:
self.assertEqual(exit_sig, -sig)
#
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
with mock.patch('psutil.os.kill',
@@ -124,7 +118,7 @@ class TestProcess(unittest.TestCase):
with self.assertRaises(psutil.NoSuchProcess):
p.send_signal(sig)
#
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
with mock.patch('psutil.os.kill',
@@ -140,7 +134,7 @@ class TestProcess(unittest.TestCase):
def test_wait(self):
# check exit code signal
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
p.kill()
code = p.wait()
@@ -150,7 +144,7 @@ class TestProcess(unittest.TestCase):
self.assertEqual(code, signal.SIGTERM)
self.assertFalse(p.is_running())
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
p.terminate()
code = p.wait()
@@ -162,7 +156,7 @@ class TestProcess(unittest.TestCase):
# check sys.exit() code
code = "import time, sys; time.sleep(0.01); sys.exit(5);"
- sproc = get_test_subprocess([PYTHON_EXE, "-c", code])
+ sproc = self.get_test_subprocess([PYTHON_EXE, "-c", code])
p = psutil.Process(sproc.pid)
self.assertEqual(p.wait(), 5)
self.assertFalse(p.is_running())
@@ -171,13 +165,13 @@ class TestProcess(unittest.TestCase):
# It is not supposed to raise NSP when the process is gone.
# On UNIX this should return None, on Windows it should keep
# returning the exit code.
- sproc = get_test_subprocess([PYTHON_EXE, "-c", code])
+ sproc = self.get_test_subprocess([PYTHON_EXE, "-c", code])
p = psutil.Process(sproc.pid)
self.assertEqual(p.wait(), 5)
self.assertIn(p.wait(), (5, None))
# test timeout
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
p.name()
self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
@@ -188,7 +182,7 @@ class TestProcess(unittest.TestCase):
def test_wait_non_children(self):
# Test wait() against a process which is not our direct
# child.
- p1, p2 = create_proc_children_pair()
+ p1, p2 = self.create_proc_children_pair()
self.assertRaises(psutil.TimeoutExpired, p1.wait, 0.01)
self.assertRaises(psutil.TimeoutExpired, p2.wait, 0.01)
# We also terminate the direct child otherwise the
@@ -207,7 +201,7 @@ class TestProcess(unittest.TestCase):
self.assertEqual(ret1, signal.SIGTERM)
def test_wait_timeout_0(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
p.kill()
@@ -277,7 +271,7 @@ class TestProcess(unittest.TestCase):
self.assertIn(p.cpu_num(), range(psutil.cpu_count()))
def test_create_time(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
now = time.time()
p = psutil.Process(sproc.pid)
create_time = p.create_time()
@@ -444,7 +438,7 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_set(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
p.rlimit(psutil.RLIMIT_NOFILE, (5, 5))
self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5))
@@ -550,7 +544,7 @@ class TestProcess(unittest.TestCase):
@skip_on_access_denied(only_if=MACOS)
@unittest.skipIf(not HAS_THREADS, 'not supported')
def test_threads_2(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
if OPENBSD:
try:
@@ -669,7 +663,7 @@ class TestProcess(unittest.TestCase):
p.memory_percent(memtype='uss')
def test_is_running(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
assert p.is_running()
assert p.is_running()
@@ -679,7 +673,7 @@ class TestProcess(unittest.TestCase):
assert not p.is_running()
def test_exe(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
exe = psutil.Process(sproc.pid).exe()
try:
self.assertEqual(exe, PYTHON_EXE)
@@ -708,7 +702,7 @@ class TestProcess(unittest.TestCase):
def test_cmdline(self):
cmdline = [PYTHON_EXE, "-c", "import time; time.sleep(60)"]
- sproc = get_test_subprocess(cmdline)
+ sproc = self.get_test_subprocess(cmdline)
try:
self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()),
' '.join(cmdline))
@@ -729,12 +723,12 @@ class TestProcess(unittest.TestCase):
testfn = get_testfn()
create_exe(testfn)
cmdline = [testfn] + (["0123456789"] * 20)
- sproc = get_test_subprocess(cmdline)
+ sproc = self.get_test_subprocess(cmdline)
p = psutil.Process(sproc.pid)
self.assertEqual(p.cmdline(), cmdline)
def test_name(self):
- sproc = get_test_subprocess(PYTHON_EXE)
+ sproc = self.get_test_subprocess(PYTHON_EXE)
name = psutil.Process(sproc.pid).name().lower()
pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
assert pyexe.startswith(name), (pyexe, name)
@@ -743,7 +737,7 @@ class TestProcess(unittest.TestCase):
def test_long_name(self):
testfn = get_testfn(suffix="0123456789" * 2)
create_exe(testfn)
- sproc = get_test_subprocess(testfn)
+ sproc = self.get_test_subprocess(testfn)
p = psutil.Process(sproc.pid)
self.assertEqual(p.name(), os.path.basename(testfn))
@@ -760,7 +754,7 @@ class TestProcess(unittest.TestCase):
cmdline = [funky_path, "-c",
"import time; [time.sleep(0.01) for x in range(3000)];"
"arg1", "arg2", "", "arg3", ""]
- sproc = get_test_subprocess(cmdline)
+ sproc = self.get_test_subprocess(cmdline)
p = psutil.Process(sproc.pid)
# ...in order to try to prevent occasional failures on travis
if TRAVIS:
@@ -844,7 +838,7 @@ class TestProcess(unittest.TestCase):
self.assertEqual(p.status(), psutil.STATUS_RUNNING)
def test_username(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
username = p.username()
if WINDOWS:
@@ -856,14 +850,14 @@ class TestProcess(unittest.TestCase):
self.assertEqual(username, getpass.getuser())
def test_cwd(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
self.assertEqual(p.cwd(), os.getcwd())
def test_cwd_2(self):
cmd = [PYTHON_EXE, "-c",
"import os, time; os.chdir('..'); time.sleep(60)"]
- sproc = get_test_subprocess(cmd)
+ sproc = self.get_test_subprocess(cmd)
p = psutil.Process(sproc.pid)
call_until(p.cwd, "ret == os.path.dirname(os.getcwd())")
@@ -912,7 +906,7 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity_errs(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
@@ -966,7 +960,7 @@ class TestProcess(unittest.TestCase):
# another process
cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn
- sproc = get_test_subprocess([PYTHON_EXE, "-c", cmdline])
+ sproc = self.get_test_subprocess([PYTHON_EXE, "-c", cmdline])
p = psutil.Process(sproc.pid)
for x in range(100):
@@ -1037,7 +1031,7 @@ class TestProcess(unittest.TestCase):
if hasattr(os, 'getppid'):
self.assertEqual(psutil.Process().ppid(), os.getppid())
this_parent = os.getpid()
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
self.assertEqual(p.ppid(), this_parent)
# no other process is supposed to have us as parent
@@ -1055,7 +1049,7 @@ class TestProcess(unittest.TestCase):
def test_parent(self):
this_parent = os.getpid()
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
self.assertEqual(p.parent().pid, this_parent)
@@ -1063,13 +1057,13 @@ class TestProcess(unittest.TestCase):
self.assertIsNone(psutil.Process(lowest_pid).parent())
def test_parent_multi(self):
- p1, p2 = create_proc_children_pair()
+ p1, p2 = self.create_proc_children_pair()
self.assertEqual(p2.parent(), p1)
self.assertEqual(p1.parent(), psutil.Process())
def test_parent_disappeared(self):
# Emulate a case where the parent process disappeared.
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
with mock.patch("psutil.Process",
side_effect=psutil.NoSuchProcess(0, 'foo')):
@@ -1078,7 +1072,7 @@ class TestProcess(unittest.TestCase):
@retry_on_failure()
def test_parents(self):
assert psutil.Process().parents()
- p1, p2 = create_proc_children_pair()
+ p1, p2 = self.create_proc_children_pair()
self.assertEqual(p1.parents()[0], psutil.Process())
self.assertEqual(p2.parents()[0], p1)
self.assertEqual(p2.parents()[1], psutil.Process())
@@ -1091,7 +1085,7 @@ class TestProcess(unittest.TestCase):
# On Windows we set the flag to 0 in order to cancel out the
# CREATE_NO_WINDOW flag (enabled by default) which creates
# an extra "conhost.exe" child.
- sproc = get_test_subprocess(creationflags=0)
+ sproc = self.get_test_subprocess(creationflags=0)
children1 = p.children()
children2 = p.children(recursive=True)
for children in (children1, children2):
@@ -1102,7 +1096,7 @@ class TestProcess(unittest.TestCase):
def test_children_recursive(self):
# Test children() against two sub processes, p1 and p2, where
# p1 (our child) spawned p2 (our grandchild).
- p1, p2 = create_proc_children_pair()
+ p1, p2 = self.create_proc_children_pair()
p = psutil.Process()
self.assertEqual(p.children(), [p1])
self.assertEqual(p.children(recursive=True), [p1, p2])
@@ -1131,7 +1125,7 @@ class TestProcess(unittest.TestCase):
self.assertEqual(len(c), len(set(c)))
def test_parents_and_children(self):
- p1, p2 = create_proc_children_pair()
+ p1, p2 = self.create_proc_children_pair()
me = psutil.Process()
# forward
children = me.children(recursive=True)
@@ -1144,7 +1138,7 @@ class TestProcess(unittest.TestCase):
self.assertEqual(parents[1], me)
def test_suspend_resume(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
p.suspend()
for x in range(100):
@@ -1241,7 +1235,7 @@ class TestProcess(unittest.TestCase):
# Make sure oneshot() cache is nonglobal. Instead it's
# supposed to be bound to the Process instance, see:
# https://github.com/giampaolo/psutil/issues/1373
- p1, p2 = create_proc_children_pair()
+ p1, p2 = self.create_proc_children_pair()
p1_ppid = p1.ppid()
p2_ppid = p2.ppid()
self.assertNotEqual(p1_ppid, p2_ppid)
@@ -1260,7 +1254,7 @@ class TestProcess(unittest.TestCase):
# >>> time.sleep(2) # time-consuming task, process dies in meantime
# >>> proc.name()
# Refers to Issue #15
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
p = psutil.Process(sproc.pid)
p.terminate()
p.wait()
@@ -1338,9 +1332,7 @@ class TestProcess(unittest.TestCase):
except (psutil.ZombieProcess, psutil.AccessDenied):
pass
- parent, zombie = create_zombie_proc()
- self.addCleanup(terminate, zombie)
- self.addCleanup(terminate, parent) # executed first
+ parent, zombie = self.create_zombie_proc()
# A zombie process should always be instantiable
zproc = psutil.Process(zombie.pid)
# ...and at least its status always be querable
@@ -1349,10 +1341,6 @@ class TestProcess(unittest.TestCase):
self.assertTrue(zproc.is_running())
# ...and as_dict() shouldn't crash
zproc.as_dict()
- # if cmdline succeeds it should be an empty list
- ret = succeed_or_zombie_p_exc(zproc.cmdline)
- if ret is not None:
- self.assertEqual(ret, [])
if hasattr(zproc, "rlimit"):
succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE)
@@ -1504,9 +1492,8 @@ class TestProcess(unittest.TestCase):
""")
path = get_testfn()
create_exe(path, c_code=code)
- sproc = get_test_subprocess([path],
- stdin=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ sproc = self.get_test_subprocess(
+ [path], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
p = psutil.Process(sproc.pid)
wait_for_pid(p.pid)
self.assertTrue(p.is_running())
@@ -1578,7 +1565,8 @@ if POSIX and os.getuid() == 0:
class TestPopen(unittest.TestCase):
"""Tests for psutil.Popen class."""
- def tearDown(self):
+ @classmethod
+ def tearDownClass(cls):
reap_children()
def test_misc(self):
diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py
index f5d9f49e..d0817459 100755
--- a/psutil/tests/test_system.py
+++ b/psutil/tests/test_system.py
@@ -35,7 +35,6 @@ from psutil.tests import check_net_address
from psutil.tests import CI_TESTING
from psutil.tests import DEVNULL
from psutil.tests import enum
-from psutil.tests import get_test_subprocess
from psutil.tests import get_testfn
from psutil.tests import HAS_BATTERY
from psutil.tests import HAS_CPU_FREQ
@@ -45,8 +44,8 @@ from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import mock
+from psutil.tests import ProcessTestCase
from psutil.tests import PYPY
-from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import TRAVIS
from psutil.tests import UNICODE_SUFFIX
@@ -58,14 +57,11 @@ from psutil.tests import unittest
# ===================================================================
-class TestProcessAPIs(unittest.TestCase):
-
- def tearDown(self):
- reap_children()
+class TestProcessAPIs(ProcessTestCase):
def test_process_iter(self):
self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
p = psutil.Process(sproc.pid)
p.kill()
@@ -107,9 +103,9 @@ class TestProcessAPIs(unittest.TestCase):
pids.append(p.pid)
pids = []
- sproc1 = get_test_subprocess()
- sproc2 = get_test_subprocess()
- sproc3 = get_test_subprocess()
+ sproc1 = self.get_test_subprocess()
+ sproc2 = self.get_test_subprocess()
+ sproc3 = self.get_test_subprocess()
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
@@ -160,16 +156,16 @@ class TestProcessAPIs(unittest.TestCase):
@unittest.skipIf(PYPY and WINDOWS,
"get_test_subprocess() unreliable on PYPY + WINDOWS")
def test_wait_procs_no_timeout(self):
- sproc1 = get_test_subprocess()
- sproc2 = get_test_subprocess()
- sproc3 = get_test_subprocess()
+ sproc1 = self.get_test_subprocess()
+ sproc2 = self.get_test_subprocess()
+ sproc3 = self.get_test_subprocess()
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
for p in procs:
p.terminate()
gone, alive = psutil.wait_procs(procs)
def test_pid_exists(self):
- sproc = get_test_subprocess()
+ sproc = self.get_test_subprocess()
self.assertTrue(psutil.pid_exists(sproc.pid))
p = psutil.Process(sproc.pid)
p.kill()
@@ -179,7 +175,6 @@ class TestProcessAPIs(unittest.TestCase):
self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
def test_pid_exists_2(self):
- reap_children()
pids = psutil.pids()
for pid in pids:
try:
diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py
index fd32e0b7..b2be93ff 100755
--- a/psutil/tests/test_testutils.py
+++ b/psutil/tests/test_testutils.py
@@ -16,6 +16,7 @@ import io
import os
import socket
import stat
+import subprocess
from psutil import FREEBSD
from psutil import NETBSD
@@ -29,15 +30,14 @@ from psutil.tests import bind_socket
from psutil.tests import bind_unix_socket
from psutil.tests import call_until
from psutil.tests import chdir
-from psutil.tests import create_proc_children_pair
from psutil.tests import create_sockets
-from psutil.tests import create_zombie_proc
from psutil.tests import get_free_port
-from psutil.tests import get_test_subprocess
from psutil.tests import get_testfn
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import is_namedtuple
from psutil.tests import mock
+from psutil.tests import ProcessTestCase
+from psutil.tests import PYTHON_EXE
from psutil.tests import reap_children
from psutil.tests import retry
from psutil.tests import retry_on_failure
@@ -209,10 +209,10 @@ class TestFSTestUtils(unittest.TestCase):
self.assertEqual(os.getcwd(), base)
-class TestProcessUtils(unittest.TestCase):
+class TestProcessUtils(ProcessTestCase):
def test_reap_children(self):
- subp = get_test_subprocess()
+ subp = self.get_test_subprocess()
p = psutil.Process(subp.pid)
assert p.is_running()
reap_children()
@@ -221,7 +221,7 @@ class TestProcessUtils(unittest.TestCase):
assert not psutil.tests._subprocesses_started
def test_create_proc_children_pair(self):
- p1, p2 = create_proc_children_pair()
+ p1, p2 = self.create_proc_children_pair()
self.assertNotEqual(p1.pid, p2.pid)
assert p1.is_running()
assert p2.is_running()
@@ -241,11 +241,38 @@ class TestProcessUtils(unittest.TestCase):
@unittest.skipIf(not POSIX, "POSIX only")
def test_create_zombie_proc(self):
- parent, zombie = create_zombie_proc()
- self.addCleanup(terminate, zombie)
- self.addCleanup(terminate, parent) # executed first
+ parent, zombie = self.create_zombie_proc()
self.assertEqual(zombie.status(), psutil.STATUS_ZOMBIE)
+ def test_terminate(self):
+ # by subprocess.Popen
+ p = self.get_test_subprocess()
+ terminate(p)
+ assert not psutil.pid_exists(p.pid)
+ terminate(p)
+ # by psutil.Process
+ p = psutil.Process(self.get_test_subprocess().pid)
+ terminate(p)
+ assert not psutil.pid_exists(p.pid)
+ terminate(p)
+ # by psutil.Popen
+ cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
+ p = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ terminate(p)
+ assert not psutil.pid_exists(p.pid)
+ terminate(p)
+ # by PID
+ pid = self.get_test_subprocess().pid
+ terminate(pid)
+ assert not psutil.pid_exists(pid)
+ terminate(pid)
+ # zombie
+ parent, zombie = self.create_zombie_proc()
+ terminate(parent)
+ terminate(zombie)
+ assert not psutil.pid_exists(parent.pid)
+ assert not psutil.pid_exists(zombie.pid)
+
class TestNetUtils(unittest.TestCase):
@@ -377,9 +404,9 @@ class TestMemLeakClass(TestMemoryLeak):
def test_execute_w_exc(self):
def fun():
1 / 0
- self.execute_w_exc(ZeroDivisionError, fun, times=2000, warmup_times=20,
- tolerance=4096, retry_for=3)
-
+ # XXX: use high tolerance, occasional false positive
+ self.execute_w_exc(ZeroDivisionError, fun, times=2000,
+ warmup_times=20, tolerance=200 * 1024, retry_for=3)
with self.assertRaises(ZeroDivisionError):
self.execute_w_exc(OSError, fun)
@@ -397,5 +424,5 @@ class TestOtherUtils(unittest.TestCase):
if __name__ == '__main__':
- from psutil.tests.runner import run
- run(__file__)
+ from psutil.tests.runner import run_from_name
+ run_from_name(__file__)
diff --git a/psutil/tests/test_unicode.py b/psutil/tests/test_unicode.py
index da6ec96e..ae9f7f51 100755
--- a/psutil/tests/test_unicode.py
+++ b/psutil/tests/test_unicode.py
@@ -98,12 +98,14 @@ from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_MEMORY_MAPS
from psutil.tests import INVALID_UNICODE_SUFFIX
+from psutil.tests import ProcessTestCase
from psutil.tests import PYPY
from psutil.tests import reap_children
from psutil.tests import safe_mkdir
from psutil.tests import safe_rmpath as _safe_rmpath
from psutil.tests import serialrun
from psutil.tests import skip_on_access_denied
+from psutil.tests import terminate
from psutil.tests import TESTFN_PREFIX
from psutil.tests import TRAVIS
from psutil.tests import UNICODE_SUFFIX
@@ -138,16 +140,18 @@ def subprocess_supports_unicode(suffix):
if PY3:
return True
name = get_testfn(suffix=suffix)
+ sproc = None
try:
safe_rmpath(name)
create_exe(name)
- get_test_subprocess(cmd=[name])
+ sproc = get_test_subprocess(cmd=[name])
except UnicodeEncodeError:
return False
else:
return True
finally:
- reap_children()
+ if sproc is not None:
+ terminate(sproc)
# ===================================================================
@@ -174,7 +178,7 @@ class _BaseFSAPIsTests(object):
# ---
def test_proc_exe(self):
- subp = get_test_subprocess(cmd=[self.funky_name])
+ subp = self.get_test_subprocess(cmd=[self.funky_name])
p = psutil.Process(subp.pid)
exe = p.exe()
self.assertIsInstance(exe, str)
@@ -183,14 +187,14 @@ class _BaseFSAPIsTests(object):
os.path.normcase(self.funky_name))
def test_proc_name(self):
- subp = get_test_subprocess(cmd=[self.funky_name])
+ subp = self.get_test_subprocess(cmd=[self.funky_name])
name = psutil.Process(subp.pid).name()
self.assertIsInstance(name, str)
if self.expect_exact_path_match():
self.assertEqual(name, os.path.basename(self.funky_name))
def test_proc_cmdline(self):
- subp = get_test_subprocess(cmd=[self.funky_name])
+ subp = self.get_test_subprocess(cmd=[self.funky_name])
p = psutil.Process(subp.pid)
cmdline = p.cmdline()
for part in cmdline:
@@ -300,7 +304,7 @@ class _BaseFSAPIsTests(object):
@unittest.skipIf(ASCII_FS, "ASCII fs")
@unittest.skipIf(not subprocess_supports_unicode(UNICODE_SUFFIX),
"subprocess can't deal with unicode")
-class TestFSAPIs(_BaseFSAPIsTests, unittest.TestCase):
+class TestFSAPIs(_BaseFSAPIsTests, ProcessTestCase):
"""Test FS APIs with a funky, valid, UTF8 path name."""
funky_suffix = UNICODE_SUFFIX
@@ -318,7 +322,7 @@ class TestFSAPIs(_BaseFSAPIsTests, unittest.TestCase):
@unittest.skipIf(PYPY, "unreliable on PYPY")
@unittest.skipIf(not subprocess_supports_unicode(INVALID_UNICODE_SUFFIX),
"subprocess can't deal with invalid unicode")
-class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, unittest.TestCase):
+class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, ProcessTestCase):
"""Test FS APIs with a funky, invalid path name."""
funky_suffix = INVALID_UNICODE_SUFFIX
@@ -333,7 +337,7 @@ class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, unittest.TestCase):
# ===================================================================
-class TestNonFSAPIS(unittest.TestCase):
+class TestNonFSAPIS(ProcessTestCase):
"""Unicode tests for non fs-related APIs."""
def tearDown(self):
@@ -350,7 +354,7 @@ class TestNonFSAPIS(unittest.TestCase):
env = os.environ.copy()
funky_str = UNICODE_SUFFIX if PY3 else 'รจ'
env['FUNNY_ARG'] = funky_str
- sproc = get_test_subprocess(env=env)
+ sproc = self.get_test_subprocess(env=env)
p = psutil.Process(sproc.pid)
env = p.environ()
for k, v in env.items():
diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py
index 27343ca2..a51c9c15 100755
--- a/psutil/tests/test_windows.py
+++ b/psutil/tests/test_windows.py
@@ -26,11 +26,13 @@ from psutil.tests import APPVEYOR
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_BATTERY
from psutil.tests import mock
+from psutil.tests import ProcessTestCase
from psutil.tests import PY3
from psutil.tests import PYPY
from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import sh
+from psutil.tests import terminate
from psutil.tests import unittest
@@ -298,7 +300,7 @@ class TestSensorsBattery(TestCase):
@unittest.skipIf(not WINDOWS, "WINDOWS only")
-class TestProcess(TestCase):
+class TestProcess(ProcessTestCase):
@classmethod
def setUpClass(cls):
@@ -306,7 +308,7 @@ class TestProcess(TestCase):
@classmethod
def tearDownClass(cls):
- reap_children()
+ terminate(cls.pid)
def test_issue_24(self):
p = psutil.Process(0)
@@ -382,7 +384,7 @@ class TestProcess(TestCase):
@unittest.skipIf(not sys.version_info >= (2, 7),
"CTRL_* signals not supported")
def test_ctrl_signals(self):
- p = psutil.Process(get_test_subprocess().pid)
+ p = psutil.Process(self.get_test_subprocess().pid)
p.send_signal(signal.CTRL_C_EVENT)
p.send_signal(signal.CTRL_BREAK_EVENT)
p.kill()
@@ -609,7 +611,7 @@ class TestDualProcessImplementation(TestCase):
@classmethod
def tearDownClass(cls):
- reap_children()
+ terminate(cls.pid)
def test_memory_info(self):
mem_1 = psutil.Process(self.pid).memory_info()
@@ -675,7 +677,7 @@ class TestDualProcessImplementation(TestCase):
@unittest.skipIf(not WINDOWS, "WINDOWS only")
-class RemoteProcessTestCase(TestCase):
+class RemoteProcessTestCase(ProcessTestCase):
"""Certain functions require calling ReadProcessMemory.
This trivially works when called on the current process.
Check that this works on other processes, especially when they
@@ -717,17 +719,18 @@ class RemoteProcessTestCase(TestCase):
def setUp(self):
env = os.environ.copy()
env["THINK_OF_A_NUMBER"] = str(os.getpid())
- self.proc32 = get_test_subprocess([self.python32] + self.test_args,
- env=env,
- stdin=subprocess.PIPE)
- self.proc64 = get_test_subprocess([self.python64] + self.test_args,
- env=env,
- stdin=subprocess.PIPE)
+ self.proc32 = self.get_test_subprocess(
+ [self.python32] + self.test_args,
+ env=env,
+ stdin=subprocess.PIPE)
+ self.proc64 = self.get_test_subprocess(
+ [self.python64] + self.test_args,
+ env=env,
+ stdin=subprocess.PIPE)
def tearDown(self):
self.proc32.communicate()
self.proc64.communicate()
- reap_children()
@classmethod
def tearDownClass(cls):