summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2020-05-01 08:40:18 -0700
committerGitHub <noreply@github.com>2020-05-01 17:40:18 +0200
commit13ba2222971781dc8150099d1fddefb538417ba1 (patch)
tree00757a1f3961b6143f72b7ed4938fc722dc7be83
parent6f4e38220d9da33931ff9a307d20025a6916c258 (diff)
downloadpsutil-13ba2222971781dc8150099d1fddefb538417ba1.tar.gz
Refactor process test utils methods (#1745)
...in order to accomodate Cygwin implementation.
-rw-r--r--psutil/tests/__init__.py27
-rwxr-xr-xpsutil/tests/runner.py8
-rwxr-xr-xpsutil/tests/test_bsd.py6
-rwxr-xr-xpsutil/tests/test_memory_leaks.py4
-rwxr-xr-xpsutil/tests/test_osx.py8
-rwxr-xr-xpsutil/tests/test_posix.py7
-rwxr-xr-xpsutil/tests/test_process.py205
-rw-r--r--psutil/tests/test_system.py20
-rw-r--r--psutil/tests/test_testutils.py18
-rw-r--r--psutil/tests/test_unicode.py12
-rwxr-xr-xpsutil/tests/test_windows.py14
11 files changed, 151 insertions, 178 deletions
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py
index 6cdf3bc8..c2766e23 100644
--- a/psutil/tests/__init__.py
+++ b/psutil/tests/__init__.py
@@ -48,6 +48,7 @@ from psutil._compat import FileExistsError
from psutil._compat import FileNotFoundError
from psutil._compat import PY3
from psutil._compat import range
+from psutil._compat import super
from psutil._compat import u
from psutil._compat import unicode
from psutil._compat import which
@@ -81,8 +82,8 @@ __all__ = [
"HAS_SENSORS_BATTERY", "HAS_BATTERY", "HAS_SENSORS_FANS",
"HAS_SENSORS_TEMPERATURES", "HAS_MEMORY_FULL_INFO",
# subprocesses
- 'pyrun', 'terminate', 'reap_children', 'get_test_subprocess',
- 'create_zombie_proc', 'create_proc_children_pair',
+ 'pyrun', 'terminate', 'reap_children', 'spawn_testproc', 'spawn_zombie',
+ 'spawn_children_pair',
# threads
'ThreadTask'
# test utils
@@ -230,7 +231,7 @@ class ThreadTask(threading.Thread):
"""A thread task which does nothing expect staying alive."""
def __init__(self):
- threading.Thread.__init__(self)
+ super().__init__()
self._running = False
self._interval = 0.001
self._flag = threading.Event()
@@ -286,7 +287,7 @@ def _reap_children_on_err(fun):
@_reap_children_on_err
-def get_test_subprocess(cmd=None, **kwds):
+def spawn_testproc(cmd=None, **kwds):
"""Creates a python subprocess which does nothing for 60 secs and
return it as a subprocess.Popen instance.
If "cmd" is specified that is used instead of python.
@@ -326,7 +327,7 @@ def get_test_subprocess(cmd=None, **kwds):
@_reap_children_on_err
-def create_proc_children_pair():
+def spawn_children_pair():
"""Create a subprocess which creates another one as in:
A (us) -> B (child) -> C (grandchild).
Return a (child, grandchild) tuple.
@@ -364,7 +365,7 @@ def create_proc_children_pair():
safe_rmpath(tfile)
-def create_zombie_proc():
+def spawn_zombie():
"""Create a zombie process and return a (parent, zombie) process tuple.
In order to kill the zombie parent must be terminate()d first, then
zombie must be wait()ed on.
@@ -421,7 +422,7 @@ def pyrun(src, **kwds):
try:
with open(srcfile, 'wt') as f:
f.write(src)
- subp = get_test_subprocess([PYTHON_EXE, f.name], **kwds)
+ subp = spawn_testproc([PYTHON_EXE, f.name], **kwds)
wait_for_pid(subp.pid)
return (subp, srcfile)
except Exception:
@@ -857,19 +858,19 @@ class PsutilTestCase(TestCase):
self.addCleanup(safe_rmpath, fname)
return fname
- def get_test_subprocess(self, *args, **kwds):
- sproc = get_test_subprocess(*args, **kwds)
+ def spawn_testproc(self, *args, **kwds):
+ sproc = spawn_testproc(*args, **kwds)
self.addCleanup(terminate, sproc)
return sproc
- def create_proc_children_pair(self):
- child1, child2 = create_proc_children_pair()
+ def spawn_children_pair(self):
+ child1, child2 = spawn_children_pair()
self.addCleanup(terminate, child2)
self.addCleanup(terminate, child1) # executed first
return (child1, child2)
- def create_zombie_proc(self):
- parent, zombie = create_zombie_proc()
+ def spawn_zombie(self):
+ parent, zombie = spawn_zombie()
self.addCleanup(terminate, zombie)
self.addCleanup(terminate, parent) # executed first
return (parent, zombie)
diff --git a/psutil/tests/runner.py b/psutil/tests/runner.py
index a7f74964..14e33fba 100755
--- a/psutil/tests/runner.py
+++ b/psutil/tests/runner.py
@@ -200,8 +200,8 @@ class ParallelRunner(ColouredTextRunner):
@staticmethod
def _parallelize(suite):
- def fdopen(*args, **kwds):
- stream = orig_fdopen(*args, **kwds)
+ def fdopen(fd, mode, *kwds):
+ stream = orig_fdopen(fd, mode)
atexit.register(stream.close)
return stream
@@ -221,9 +221,9 @@ class ParallelRunner(ColouredTextRunner):
continue
test_class = test._tests[0].__class__
if getattr(test_class, '_serialrun', False):
- serial.addTest(loadTestsFromTestCase(test_class))
+ serial.addTest(test)
else:
- parallel.addTest(loadTestsFromTestCase(test_class))
+ parallel.addTest(test)
return (serial, parallel)
def run(self, suite):
diff --git a/psutil/tests/test_bsd.py b/psutil/tests/test_bsd.py
index 598ec0bf..cfbec71d 100755
--- a/psutil/tests/test_bsd.py
+++ b/psutil/tests/test_bsd.py
@@ -20,7 +20,7 @@ from psutil import BSD
from psutil import FREEBSD
from psutil import NETBSD
from psutil import OPENBSD
-from psutil.tests import get_test_subprocess
+from psutil.tests import spawn_testproc
from psutil.tests import HAS_BATTERY
from psutil.tests import PsutilTestCase
from psutil.tests import retry_on_failure
@@ -78,7 +78,7 @@ class BSDTestCase(PsutilTestCase):
@classmethod
def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
+ cls.pid = spawn_testproc().pid
@classmethod
def tearDownClass(cls):
@@ -153,7 +153,7 @@ class FreeBSDPsutilTestCase(PsutilTestCase):
@classmethod
def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
+ cls.pid = spawn_testproc().pid
@classmethod
def tearDownClass(cls):
diff --git a/psutil/tests/test_memory_leaks.py b/psutil/tests/test_memory_leaks.py
index 3d004f0a..d8f39035 100755
--- a/psutil/tests/test_memory_leaks.py
+++ b/psutil/tests/test_memory_leaks.py
@@ -32,7 +32,7 @@ from psutil._compat import ProcessLookupError
from psutil._compat import super
from psutil.tests import CIRRUS
from psutil.tests import create_sockets
-from psutil.tests import get_test_subprocess
+from psutil.tests import spawn_testproc
from psutil.tests import get_testfn
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_CPU_FREQ
@@ -282,7 +282,7 @@ class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
@classmethod
def setUpClass(cls):
super().setUpClass()
- p = get_test_subprocess()
+ p = spawn_testproc()
cls.proc = psutil.Process(p.pid)
cls.proc.kill()
cls.proc.wait()
diff --git a/psutil/tests/test_osx.py b/psutil/tests/test_osx.py
index c2e6ad72..1d6e1dc9 100755
--- a/psutil/tests/test_osx.py
+++ b/psutil/tests/test_osx.py
@@ -12,8 +12,8 @@ import time
import psutil
from psutil import MACOS
-from psutil.tests import create_zombie_proc
-from psutil.tests import get_test_subprocess
+from psutil.tests import spawn_zombie
+from psutil.tests import spawn_testproc
from psutil.tests import HAS_BATTERY
from psutil.tests import PsutilTestCase
from psutil.tests import retry_on_failure
@@ -81,7 +81,7 @@ class TestProcess(PsutilTestCase):
@classmethod
def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
+ cls.pid = spawn_testproc().pid
@classmethod
def tearDownClass(cls):
@@ -107,7 +107,7 @@ class TestZombieProcessAPIs(PsutilTestCase):
@classmethod
def setUpClass(cls):
- cls.parent, cls.zombie = create_zombie_proc()
+ cls.parent, cls.zombie = spawn_zombie()
@classmethod
def tearDownClass(cls):
diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py
index 9eeb5c2b..12ea6682 100755
--- a/psutil/tests/test_posix.py
+++ b/psutil/tests/test_posix.py
@@ -24,7 +24,7 @@ from psutil import POSIX
from psutil import SUNOS
from psutil.tests import CI_TESTING
from psutil.tests import get_kernel_version
-from psutil.tests import get_test_subprocess
+from psutil.tests import spawn_testproc
from psutil.tests import HAS_NET_IO_COUNTERS
from psutil.tests import mock
from psutil.tests import PsutilTestCase
@@ -132,9 +132,8 @@ class TestProcess(PsutilTestCase):
@classmethod
def setUpClass(cls):
- cls.pid = get_test_subprocess([PYTHON_EXE, "-E", "-O"],
- stdin=subprocess.PIPE).pid
- # wait_for_pid(cls.pid)
+ cls.pid = spawn_testproc([PYTHON_EXE, "-E", "-O"],
+ stdin=subprocess.PIPE).pid
@classmethod
def tearDownClass(cls):
diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py
index 11ab725b..45bac0c1 100755
--- a/psutil/tests/test_process.py
+++ b/psutil/tests/test_process.py
@@ -72,57 +72,56 @@ from psutil.tests import wait_for_pid
class TestProcess(PsutilTestCase):
"""Tests for psutil.Process class."""
+ def spawn_psproc(self, *args, **kwargs):
+ sproc = self.spawn_testproc(*args, **kwargs)
+ return psutil.Process(sproc.pid)
+
+ # ---
+
def test_pid(self):
p = psutil.Process()
self.assertEqual(p.pid, os.getpid())
- sproc = self.get_test_subprocess()
- self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
with self.assertRaises(AttributeError):
p.pid = 33
def test_kill(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.kill()
code = p.wait()
- self.assertFalse(psutil.pid_exists(sproc.pid))
+ self.assertFalse(psutil.pid_exists(p.pid))
if POSIX:
self.assertEqual(code, -signal.SIGKILL)
def test_terminate(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.terminate()
code = p.wait()
- self.assertFalse(psutil.pid_exists(sproc.pid))
+ self.assertFalse(psutil.pid_exists(p.pid))
if POSIX:
self.assertEqual(code, -signal.SIGTERM)
def test_send_signal(self):
sig = signal.SIGKILL if POSIX else signal.SIGTERM
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.send_signal(sig)
code = p.wait()
self.assertFalse(psutil.pid_exists(p.pid))
if POSIX:
self.assertEqual(code, -sig)
#
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.send_signal(sig)
with mock.patch('psutil.os.kill',
side_effect=OSError(errno.ESRCH, "")):
with self.assertRaises(psutil.NoSuchProcess):
p.send_signal(sig)
#
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.send_signal(sig)
with mock.patch('psutil.os.kill',
side_effect=OSError(errno.EPERM, "")):
with self.assertRaises(psutil.AccessDenied):
- psutil.Process().send_signal(sig)
+ p.send_signal(sig)
# Sending a signal to process with PID 0 is not allowed as
# it would affect every process in the process group of
# the calling process (os.getpid()) instead of PID 0").
@@ -132,8 +131,7 @@ class TestProcess(PsutilTestCase):
def test_wait(self):
# check exit code signal
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.kill()
code = p.wait()
if POSIX:
@@ -142,8 +140,7 @@ class TestProcess(PsutilTestCase):
self.assertEqual(code, signal.SIGTERM)
self.assertFalse(p.is_running())
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.terminate()
code = p.wait()
if POSIX:
@@ -154,8 +151,7 @@ class TestProcess(PsutilTestCase):
# check sys.exit() code
code = "import time, sys; time.sleep(0.01); sys.exit(5);"
- sproc = self.get_test_subprocess([PYTHON_EXE, "-c", code])
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc([PYTHON_EXE, "-c", code])
self.assertEqual(p.wait(), 5)
self.assertFalse(p.is_running())
@@ -163,14 +159,12 @@ class TestProcess(PsutilTestCase):
# It is not supposed to raise NSP when the process is gone.
# On UNIX this should return None, on Windows it should keep
# returning the exit code.
- sproc = self.get_test_subprocess([PYTHON_EXE, "-c", code])
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc([PYTHON_EXE, "-c", code])
self.assertEqual(p.wait(), 5)
self.assertIn(p.wait(), (5, None))
# test timeout
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.name()
self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
@@ -180,7 +174,7 @@ class TestProcess(PsutilTestCase):
def test_wait_non_children(self):
# Test wait() against a process which is not our direct
# child.
- child, grandchild = self.create_proc_children_pair()
+ child, grandchild = self.spawn_children_pair()
self.assertRaises(psutil.TimeoutExpired, child.wait, 0.01)
self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01)
# We also terminate the direct child otherwise the
@@ -199,8 +193,7 @@ class TestProcess(PsutilTestCase):
self.assertEqual(child_ret, signal.SIGTERM)
def test_wait_timeout_0(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
p.kill()
stop_at = time.time() + 2
@@ -269,9 +262,8 @@ class TestProcess(PsutilTestCase):
self.assertIn(p.cpu_num(), range(psutil.cpu_count()))
def test_create_time(self):
- sproc = self.get_test_subprocess()
+ p = self.spawn_psproc()
now = time.time()
- p = psutil.Process(sproc.pid)
create_time = p.create_time()
# Use time.time() as base value to compare our result using a
@@ -436,8 +428,7 @@ class TestProcess(PsutilTestCase):
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_set(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.rlimit(psutil.RLIMIT_NOFILE, (5, 5))
self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5))
# If pid is 0 prlimit() applies to the calling process and
@@ -449,8 +440,8 @@ class TestProcess(PsutilTestCase):
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit(self):
- testfn = self.get_testfn()
p = psutil.Process()
+ testfn = self.get_testfn()
soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
try:
p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
@@ -471,13 +462,12 @@ class TestProcess(PsutilTestCase):
def test_rlimit_infinity(self):
# First set a limit, then re-set it by specifying INFINITY
# and assume we overridden the previous limit.
- testfn = self.get_testfn()
p = psutil.Process()
soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
try:
p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard))
- with open(testfn, "wb") as f:
+ with open(self.get_testfn(), "wb") as f:
f.write(b"X" * 2048)
finally:
p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
@@ -542,8 +532,7 @@ class TestProcess(PsutilTestCase):
@skip_on_access_denied(only_if=MACOS)
@unittest.skipIf(not HAS_THREADS, 'not supported')
def test_threads_2(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
if OPENBSD:
try:
p.threads()
@@ -588,8 +577,9 @@ class TestProcess(PsutilTestCase):
self.assertGreaterEqual(getattr(mem, name), 0)
def test_memory_full_info(self):
+ p = psutil.Process()
total = psutil.virtual_memory().total
- mem = psutil.Process().memory_full_info()
+ mem = p.memory_full_info()
for name in mem._fields:
value = getattr(mem, name)
self.assertGreaterEqual(value, 0, msg=(name, value))
@@ -646,11 +636,12 @@ class TestProcess(PsutilTestCase):
@unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
def test_memory_maps_lists_lib(self):
# Make sure a newly loaded shared lib is listed.
+ p = psutil.Process()
with copyload_shared_lib() as path:
def normpath(p):
return os.path.realpath(os.path.normcase(p))
libpaths = [normpath(x.path)
- for x in psutil.Process().memory_maps()]
+ for x in p.memory_maps()]
self.assertIn(normpath(path), libpaths)
def test_memory_percent(self):
@@ -661,8 +652,7 @@ class TestProcess(PsutilTestCase):
p.memory_percent(memtype='uss')
def test_is_running(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
assert p.is_running()
assert p.is_running()
p.kill()
@@ -671,8 +661,8 @@ class TestProcess(PsutilTestCase):
assert not p.is_running()
def test_exe(self):
- sproc = self.get_test_subprocess()
- exe = psutil.Process(sproc.pid).exe()
+ p = self.spawn_psproc()
+ exe = p.exe()
try:
self.assertEqual(exe, PYTHON_EXE)
except AssertionError:
@@ -700,10 +690,9 @@ class TestProcess(PsutilTestCase):
def test_cmdline(self):
cmdline = [PYTHON_EXE, "-c", "import time; time.sleep(60)"]
- sproc = self.get_test_subprocess(cmdline)
+ p = self.spawn_psproc(cmdline)
try:
- self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()),
- ' '.join(cmdline))
+ self.assertEqual(' '.join(p.cmdline()), ' '.join(cmdline))
except AssertionError:
# XXX - most of the times the underlying sysctl() call on Net
# and Open BSD returns a truncated string.
@@ -711,8 +700,7 @@ class TestProcess(PsutilTestCase):
# like this is a kernel bug.
# XXX - AIX truncates long arguments in /proc/pid/cmdline
if NETBSD or OPENBSD or AIX:
- self.assertEqual(
- psutil.Process(sproc.pid).cmdline()[0], PYTHON_EXE)
+ self.assertEqual(p.cmdline()[0], PYTHON_EXE)
else:
raise
@@ -721,13 +709,12 @@ class TestProcess(PsutilTestCase):
testfn = self.get_testfn()
create_exe(testfn)
cmdline = [testfn] + (["0123456789"] * 20)
- sproc = self.get_test_subprocess(cmdline)
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc(cmdline)
self.assertEqual(p.cmdline(), cmdline)
def test_name(self):
- sproc = self.get_test_subprocess(PYTHON_EXE)
- name = psutil.Process(sproc.pid).name().lower()
+ p = self.spawn_psproc(PYTHON_EXE)
+ name = p.name().lower()
pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
assert pyexe.startswith(name), (pyexe, name)
@@ -735,8 +722,7 @@ class TestProcess(PsutilTestCase):
def test_long_name(self):
testfn = self.get_testfn(suffix="0123456789" * 2)
create_exe(testfn)
- sproc = self.get_test_subprocess(testfn)
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc(testfn)
self.assertEqual(p.name(), os.path.basename(testfn))
# XXX
@@ -752,8 +738,7 @@ class TestProcess(PsutilTestCase):
cmdline = [funky_path, "-c",
"import time; [time.sleep(0.01) for x in range(3000)];"
"arg1", "arg2", "", "arg3", ""]
- sproc = self.get_test_subprocess(cmdline)
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc(cmdline)
# ...in order to try to prevent occasional failures on travis
if TRAVIS:
wait_for_pid(p.pid)
@@ -836,8 +821,7 @@ class TestProcess(PsutilTestCase):
self.assertEqual(p.status(), psutil.STATUS_RUNNING)
def test_username(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
username = p.username()
if WINDOWS:
domain, username = username.split('\\')
@@ -848,15 +832,13 @@ class TestProcess(PsutilTestCase):
self.assertEqual(username, getpass.getuser())
def test_cwd(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
self.assertEqual(p.cwd(), os.getcwd())
def test_cwd_2(self):
cmd = [PYTHON_EXE, "-c",
"import os, time; os.chdir('..'); time.sleep(60)"]
- sproc = self.get_test_subprocess(cmd)
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc(cmd)
call_until(p.cwd, "ret == os.path.dirname(os.getcwd())")
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
@@ -904,8 +886,7 @@ class TestProcess(PsutilTestCase):
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity_errs(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000))
@@ -937,9 +918,8 @@ class TestProcess(PsutilTestCase):
# can't find any process file on Appveyor
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files(self):
- # current process
- testfn = self.get_testfn()
p = psutil.Process()
+ testfn = self.get_testfn()
files = p.open_files()
self.assertFalse(testfn in files)
with open(testfn, 'wb') as f:
@@ -958,8 +938,7 @@ class TestProcess(PsutilTestCase):
# another process
cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn
- sproc = self.get_test_subprocess([PYTHON_EXE, "-c", cmdline])
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline])
for x in range(100):
filenames = [os.path.normcase(x.path) for x in p.open_files()]
@@ -977,10 +956,10 @@ class TestProcess(PsutilTestCase):
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files_2(self):
# test fd and path fields
+ p = psutil.Process()
normcase = os.path.normcase
testfn = self.get_testfn()
with open(testfn, 'w') as fileobj:
- p = psutil.Process()
for file in p.open_files():
if normcase(file.path) == normcase(fileobj.name) or \
file.fd == fileobj.fileno():
@@ -1001,8 +980,8 @@ class TestProcess(PsutilTestCase):
@unittest.skipIf(not POSIX, 'POSIX only')
def test_num_fds(self):
- testfn = self.get_testfn()
p = psutil.Process()
+ testfn = self.get_testfn()
start = p.num_fds()
file = open(testfn, 'w')
self.addCleanup(file.close)
@@ -1026,78 +1005,73 @@ class TestProcess(PsutilTestCase):
self.fail("num ctx switches still the same after 50.000 iterations")
def test_ppid(self):
+ p = psutil.Process()
if hasattr(os, 'getppid'):
- self.assertEqual(psutil.Process().ppid(), os.getppid())
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ self.assertEqual(p.ppid(), os.getppid())
+ p = self.spawn_psproc()
self.assertEqual(p.ppid(), os.getpid())
if APPVEYOR:
# Occasional failures, see:
# https://ci.appveyor.com/project/giampaolo/psutil/build/
# job/0hs623nenj7w4m33
return
- for p in psutil.process_iter():
- if p.pid == sproc.pid:
- continue
- # XXX: sometimes this fails on Windows; not sure why.
- self.assertNotEqual(p.ppid(), os.getpid(), msg=p)
def test_parent(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
self.assertEqual(p.parent().pid, os.getpid())
lowest_pid = psutil.pids()[0]
self.assertIsNone(psutil.Process(lowest_pid).parent())
def test_parent_multi(self):
- child, grandchild = self.create_proc_children_pair()
+ parent = psutil.Process()
+ child, grandchild = self.spawn_children_pair()
self.assertEqual(grandchild.parent(), child)
- self.assertEqual(child.parent(), psutil.Process())
+ self.assertEqual(child.parent(), parent)
def test_parent_disappeared(self):
# Emulate a case where the parent process disappeared.
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
with mock.patch("psutil.Process",
side_effect=psutil.NoSuchProcess(0, 'foo')):
self.assertIsNone(p.parent())
@retry_on_failure()
def test_parents(self):
- assert psutil.Process().parents()
- child, grandchild = self.create_proc_children_pair()
- self.assertEqual(child.parents()[0], psutil.Process())
+ parent = psutil.Process()
+ assert parent.parents()
+ child, grandchild = self.spawn_children_pair()
+ self.assertEqual(child.parents()[0], parent)
self.assertEqual(grandchild.parents()[0], child)
- self.assertEqual(grandchild.parents()[1], psutil.Process())
+ self.assertEqual(grandchild.parents()[1], parent)
def test_children(self):
- p = psutil.Process()
- self.assertEqual(p.children(), [])
- self.assertEqual(p.children(recursive=True), [])
+ parent = psutil.Process()
+ self.assertEqual(parent.children(), [])
+ self.assertEqual(parent.children(recursive=True), [])
# On Windows we set the flag to 0 in order to cancel out the
# CREATE_NO_WINDOW flag (enabled by default) which creates
# an extra "conhost.exe" child.
- sproc = self.get_test_subprocess(creationflags=0)
- children1 = p.children()
- children2 = p.children(recursive=True)
+ child = self.spawn_psproc(creationflags=0)
+ children1 = parent.children()
+ children2 = parent.children(recursive=True)
for children in (children1, children2):
self.assertEqual(len(children), 1)
- self.assertEqual(children[0].pid, sproc.pid)
- self.assertEqual(children[0].ppid(), os.getpid())
+ self.assertEqual(children[0].pid, child.pid)
+ self.assertEqual(children[0].ppid(), parent.pid)
def test_children_recursive(self):
# Test children() against two sub processes, p1 and p2, where
# p1 (our child) spawned p2 (our grandchild).
- child, grandchild = self.create_proc_children_pair()
- p = psutil.Process()
- self.assertEqual(p.children(), [child])
- self.assertEqual(p.children(recursive=True), [child, grandchild])
+ parent = psutil.Process()
+ child, grandchild = self.spawn_children_pair()
+ self.assertEqual(parent.children(), [child])
+ self.assertEqual(parent.children(recursive=True), [child, grandchild])
# If the intermediate process is gone there's no way for
# children() to recursively find it.
child.terminate()
child.wait()
- self.assertEqual(p.children(recursive=True), [])
+ self.assertEqual(parent.children(recursive=True), [])
def test_children_duplicates(self):
# find the process which has the highest number of children
@@ -1118,21 +1092,20 @@ class TestProcess(PsutilTestCase):
self.assertEqual(len(c), len(set(c)))
def test_parents_and_children(self):
- child, grandchild = self.create_proc_children_pair()
- me = psutil.Process()
+ parent = psutil.Process()
+ child, grandchild = self.spawn_children_pair()
# forward
- children = me.children(recursive=True)
+ children = parent.children(recursive=True)
self.assertEqual(len(children), 2)
self.assertEqual(children[0], child)
self.assertEqual(children[1], grandchild)
# backward
parents = grandchild.parents()
self.assertEqual(parents[0], child)
- self.assertEqual(parents[1], me)
+ self.assertEqual(parents[1], parent)
def test_suspend_resume(self):
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.suspend()
for x in range(100):
if p.status() == psutil.STATUS_STOPPED:
@@ -1192,8 +1165,8 @@ class TestProcess(PsutilTestCase):
p.as_dict(['foo', 'bar'])
def test_oneshot(self):
+ p = psutil.Process()
with mock.patch("psutil._psplatform.Process.cpu_times") as m:
- p = psutil.Process()
with p.oneshot():
p.cpu_times()
p.cpu_times()
@@ -1207,9 +1180,9 @@ class TestProcess(PsutilTestCase):
def test_oneshot_twice(self):
# Test the case where the ctx manager is __enter__ed twice.
# The second __enter__ is supposed to resut in a NOOP.
+ p = psutil.Process()
with mock.patch("psutil._psplatform.Process.cpu_times") as m1:
with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2:
- p = psutil.Process()
with p.oneshot():
p.cpu_times()
p.cpu_times()
@@ -1228,7 +1201,7 @@ class TestProcess(PsutilTestCase):
# Make sure oneshot() cache is nonglobal. Instead it's
# supposed to be bound to the Process instance, see:
# https://github.com/giampaolo/psutil/issues/1373
- p1, p2 = self.create_proc_children_pair()
+ p1, p2 = self.spawn_children_pair()
p1_ppid = p1.ppid()
p2_ppid = p2.ppid()
self.assertNotEqual(p1_ppid, p2_ppid)
@@ -1247,8 +1220,7 @@ class TestProcess(PsutilTestCase):
# >>> time.sleep(2) # time-consuming task, process dies in meantime
# >>> proc.name()
# Refers to Issue #15
- sproc = self.get_test_subprocess()
- p = psutil.Process(sproc.pid)
+ p = self.spawn_psproc()
p.terminate()
p.wait()
if WINDOWS:
@@ -1325,7 +1297,7 @@ class TestProcess(PsutilTestCase):
except (psutil.ZombieProcess, psutil.AccessDenied):
pass
- parent, zombie = self.create_zombie_proc()
+ parent, zombie = self.spawn_zombie()
# A zombie process should always be instantiable
zproc = psutil.Process(zombie.pid)
# ...and at least its status always be querable
@@ -1485,7 +1457,7 @@ class TestProcess(PsutilTestCase):
""")
path = self.get_testfn()
create_exe(path, c_code=code)
- sproc = self.get_test_subprocess(
+ sproc = self.spawn_testproc(
[path], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
p = psutil.Process(sproc.pid)
wait_for_pid(p.pid)
@@ -1503,6 +1475,7 @@ class TestProcess(PsutilTestCase):
if POSIX and os.getuid() == 0:
+
class LimitedUserTestCase(TestProcess):
"""Repeat the previous tests by using a limited user.
Executed only on UNIX and only if the user who run the test script
@@ -1514,7 +1487,7 @@ if POSIX and os.getuid() == 0:
PROCESS_GID = os.getgid()
def __init__(self, *args, **kwargs):
- TestProcess.__init__(self, *args, **kwargs)
+ super().__init__(*args, **kwargs)
# re-define all existent test methods in order to
# ignore AccessDenied exceptions
for attr in [x for x in dir(self) if x.startswith('test')]:
@@ -1545,8 +1518,8 @@ if POSIX and os.getuid() == 0:
else:
self.fail("exception not raised")
+ @unittest.skipIf(1, "causes problem as root")
def test_zombie_process(self):
- # causes problems if test test suite is run as root
pass
diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py
index eda6db01..09f5324f 100644
--- a/psutil/tests/test_system.py
+++ b/psutil/tests/test_system.py
@@ -60,7 +60,7 @@ class TestProcessAPIs(PsutilTestCase):
def test_process_iter(self):
self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
- sproc = self.get_test_subprocess()
+ sproc = self.spawn_testproc()
self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
p = psutil.Process(sproc.pid)
p.kill()
@@ -96,15 +96,15 @@ class TestProcessAPIs(PsutilTestCase):
assert m.called
@unittest.skipIf(PYPY and WINDOWS,
- "get_test_subprocess() unreliable on PYPY + WINDOWS")
+ "spawn_testproc() unreliable on PYPY + WINDOWS")
def test_wait_procs(self):
def callback(p):
pids.append(p.pid)
pids = []
- sproc1 = self.get_test_subprocess()
- sproc2 = self.get_test_subprocess()
- sproc3 = self.get_test_subprocess()
+ sproc1 = self.spawn_testproc()
+ sproc2 = self.spawn_testproc()
+ sproc3 = self.spawn_testproc()
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
@@ -153,18 +153,18 @@ class TestProcessAPIs(PsutilTestCase):
self.assertTrue(hasattr(p, 'returncode'))
@unittest.skipIf(PYPY and WINDOWS,
- "get_test_subprocess() unreliable on PYPY + WINDOWS")
+ "spawn_testproc() unreliable on PYPY + WINDOWS")
def test_wait_procs_no_timeout(self):
- sproc1 = self.get_test_subprocess()
- sproc2 = self.get_test_subprocess()
- sproc3 = self.get_test_subprocess()
+ sproc1 = self.spawn_testproc()
+ sproc2 = self.spawn_testproc()
+ sproc3 = self.spawn_testproc()
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
for p in procs:
p.terminate()
gone, alive = psutil.wait_procs(procs)
def test_pid_exists(self):
- sproc = self.get_test_subprocess()
+ sproc = self.spawn_testproc()
self.assertTrue(psutil.pid_exists(sproc.pid))
p = psutil.Process(sproc.pid)
p.kill()
diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py
index e470c1b8..01176b7d 100644
--- a/psutil/tests/test_testutils.py
+++ b/psutil/tests/test_testutils.py
@@ -211,7 +211,7 @@ class TestFSTestUtils(PsutilTestCase):
class TestProcessUtils(PsutilTestCase):
def test_reap_children(self):
- subp = self.get_test_subprocess()
+ subp = self.spawn_testproc()
p = psutil.Process(subp.pid)
assert p.is_running()
reap_children()
@@ -219,8 +219,8 @@ class TestProcessUtils(PsutilTestCase):
assert not psutil.tests._pids_started
assert not psutil.tests._subprocesses_started
- def test_create_proc_children_pair(self):
- child, grandchild = self.create_proc_children_pair()
+ def test_spawn_children_pair(self):
+ child, grandchild = self.spawn_children_pair()
self.assertNotEqual(child.pid, grandchild.pid)
assert child.is_running()
assert grandchild.is_running()
@@ -241,18 +241,18 @@ class TestProcessUtils(PsutilTestCase):
assert not grandchild.is_running()
@unittest.skipIf(not POSIX, "POSIX only")
- def test_create_zombie_proc(self):
- parent, zombie = self.create_zombie_proc()
+ def test_spawn_zombie(self):
+ parent, zombie = self.spawn_zombie()
self.assertEqual(zombie.status(), psutil.STATUS_ZOMBIE)
def test_terminate(self):
# by subprocess.Popen
- p = self.get_test_subprocess()
+ p = self.spawn_testproc()
terminate(p)
assert not psutil.pid_exists(p.pid)
terminate(p)
# by psutil.Process
- p = psutil.Process(self.get_test_subprocess().pid)
+ p = psutil.Process(self.spawn_testproc().pid)
terminate(p)
assert not psutil.pid_exists(p.pid)
terminate(p)
@@ -263,13 +263,13 @@ class TestProcessUtils(PsutilTestCase):
assert not psutil.pid_exists(p.pid)
terminate(p)
# by PID
- pid = self.get_test_subprocess().pid
+ pid = self.spawn_testproc().pid
terminate(pid)
assert not psutil.pid_exists(pid)
terminate(pid)
# zombie
if POSIX:
- parent, zombie = self.create_zombie_proc()
+ parent, zombie = self.spawn_zombie()
terminate(parent)
terminate(zombie)
assert not psutil.pid_exists(parent.pid)
diff --git a/psutil/tests/test_unicode.py b/psutil/tests/test_unicode.py
index 3f55e797..e43d5326 100644
--- a/psutil/tests/test_unicode.py
+++ b/psutil/tests/test_unicode.py
@@ -92,7 +92,7 @@ from psutil.tests import chdir
from psutil.tests import CIRRUS
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
-from psutil.tests import get_test_subprocess
+from psutil.tests import spawn_testproc
from psutil.tests import get_testfn
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import HAS_ENVIRON
@@ -142,7 +142,7 @@ def subprocess_supports_unicode(suffix):
try:
safe_rmpath(testfn)
create_exe(testfn)
- sproc = get_test_subprocess(cmd=[testfn])
+ sproc = spawn_testproc(cmd=[testfn])
except UnicodeEncodeError:
return False
else:
@@ -177,7 +177,7 @@ class _BaseFSAPIsTests(object):
# ---
def test_proc_exe(self):
- subp = self.get_test_subprocess(cmd=[self.funky_name])
+ subp = self.spawn_testproc(cmd=[self.funky_name])
p = psutil.Process(subp.pid)
exe = p.exe()
self.assertIsInstance(exe, str)
@@ -186,14 +186,14 @@ class _BaseFSAPIsTests(object):
os.path.normcase(self.funky_name))
def test_proc_name(self):
- subp = self.get_test_subprocess(cmd=[self.funky_name])
+ subp = self.spawn_testproc(cmd=[self.funky_name])
name = psutil.Process(subp.pid).name()
self.assertIsInstance(name, str)
if self.expect_exact_path_match():
self.assertEqual(name, os.path.basename(self.funky_name))
def test_proc_cmdline(self):
- subp = self.get_test_subprocess(cmd=[self.funky_name])
+ subp = self.spawn_testproc(cmd=[self.funky_name])
p = psutil.Process(subp.pid)
cmdline = p.cmdline()
for part in cmdline:
@@ -350,7 +350,7 @@ class TestNonFSAPIS(PsutilTestCase):
env = os.environ.copy()
funky_str = UNICODE_SUFFIX if PY3 else 'รจ'
env['FUNNY_ARG'] = funky_str
- sproc = self.get_test_subprocess(env=env)
+ sproc = self.spawn_testproc(env=env)
p = psutil.Process(sproc.pid)
env = p.environ()
for k, v in env.items():
diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py
index 057c2982..7387dfb7 100755
--- a/psutil/tests/test_windows.py
+++ b/psutil/tests/test_windows.py
@@ -24,7 +24,7 @@ from psutil import WINDOWS
from psutil._compat import FileNotFoundError
from psutil._compat import super
from psutil.tests import APPVEYOR
-from psutil.tests import get_test_subprocess
+from psutil.tests import spawn_testproc
from psutil.tests import HAS_BATTERY
from psutil.tests import mock
from psutil.tests import PsutilTestCase
@@ -304,7 +304,7 @@ class TestProcess(PsutilTestCase):
@classmethod
def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
+ cls.pid = spawn_testproc().pid
@classmethod
def tearDownClass(cls):
@@ -384,7 +384,7 @@ class TestProcess(PsutilTestCase):
@unittest.skipIf(not sys.version_info >= (2, 7),
"CTRL_* signals not supported")
def test_ctrl_signals(self):
- p = psutil.Process(self.get_test_subprocess().pid)
+ p = psutil.Process(self.spawn_testproc().pid)
p.send_signal(signal.CTRL_C_EVENT)
p.send_signal(signal.CTRL_BREAK_EVENT)
p.kill()
@@ -533,7 +533,7 @@ class TestProcessWMI(TestCase):
@classmethod
def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
+ cls.pid = spawn_testproc().pid
@classmethod
def tearDownClass(cls):
@@ -607,7 +607,7 @@ class TestDualProcessImplementation(TestCase):
@classmethod
def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
+ cls.pid = spawn_testproc().pid
@classmethod
def tearDownClass(cls):
@@ -721,11 +721,11 @@ class RemoteProcessTestCase(PsutilTestCase):
super().setUp()
env = os.environ.copy()
env["THINK_OF_A_NUMBER"] = str(os.getpid())
- self.proc32 = self.get_test_subprocess(
+ self.proc32 = self.spawn_testproc(
[self.python32] + self.test_args,
env=env,
stdin=subprocess.PIPE)
- self.proc64 = self.get_test_subprocess(
+ self.proc64 = self.spawn_testproc(
[self.python64] + self.test_args,
env=env,
stdin=subprocess.PIPE)