diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/README | 15 | ||||
-rw-r--r-- | test/README.rst | 21 | ||||
-rw-r--r-- | test/_bsd.py | 11 | ||||
-rw-r--r-- | test/_linux.py | 162 | ||||
-rw-r--r-- | test/_osx.py | 9 | ||||
-rw-r--r-- | test/_posix.py | 7 | ||||
-rw-r--r-- | test/_sunos.py | 10 | ||||
-rw-r--r-- | test/_windows.py | 18 | ||||
-rw-r--r-- | test/test_memory_leaks.py | 39 | ||||
-rw-r--r-- | test/test_psutil.py | 131 |
10 files changed, 361 insertions, 62 deletions
diff --git a/test/README b/test/README deleted file mode 100644 index 801f93d1..00000000 --- a/test/README +++ /dev/null @@ -1,15 +0,0 @@ -- The recommended way to run tests (also on Windows) is to cd into parent - directory and run: - - make test - -- If you're on Python < 2.7 unittest2 module must be installed first: - https://pypi.python.org/pypi/unittest2 - -- The main test script is test_psutil.py, which also imports platform-specific - _*.py scripts (which should be ignored). - -- test_memory_leaks.py looks for memory leaks into C extension modules and must - be run separately with: - - make memtest diff --git a/test/README.rst b/test/README.rst new file mode 100644 index 00000000..3f2a468e --- /dev/null +++ b/test/README.rst @@ -0,0 +1,21 @@ +- The recommended way to run tests (also on Windows) is to cd into parent + directory and run ``make test`` + +- Dependencies for running tests: + - python 2.6: ipaddress, mock, unittest2 + - python 2.7: ipaddress, mock + - python 3.2: ipaddress, mock + - python 3.3: ipaddress + - python >= 3.4: no deps required + +- The main test script is ``test_psutil.py``, which also imports platform-specific + ``_*.py`` scripts (which should be ignored). + +- ``test_memory_leaks.py`` looks for memory leaks into C extension modules and must + be run separately with ``make test-memleaks``. + +- To run tests on all supported Python version install tox (pip install tox) + then run ``tox``. + +- Every time a commit is pushed tests are automatically run on Travis: + https://travis-ci.org/giampaolo/psutil/ diff --git a/test/_bsd.py b/test/_bsd.py index 9dfeb493..e4a3225d 100644 --- a/test/_bsd.py +++ b/test/_bsd.py @@ -16,7 +16,7 @@ import time import psutil from psutil._compat import PY3 -from test_psutil import (TOLERANCE, sh, get_test_subprocess, which, +from test_psutil import (TOLERANCE, BSD, sh, get_test_subprocess, which, retry_before_failing, reap_children, unittest) @@ -50,6 +50,7 @@ def muse(field): return int(line.split()[1]) +@unittest.skipUnless(BSD, "not a BSD system") class BSDSpecificTestCase(unittest.TestCase): @classmethod @@ -186,6 +187,10 @@ class BSDSpecificTestCase(unittest.TestCase): self.assertAlmostEqual(psutil.virtual_memory().buffers, syst, delta=TOLERANCE) + def test_cpu_count_logical(self): + syst = sysctl("hw.ncpu") + self.assertEqual(psutil.cpu_count(logical=True), syst) + # --- virtual_memory(); tests against muse @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") @@ -236,12 +241,12 @@ class BSDSpecificTestCase(unittest.TestCase): delta=TOLERANCE) -def test_main(): +def main(): test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) result = unittest.TextTestRunner(verbosity=2).run(test_suite) return result.wasSuccessful() if __name__ == '__main__': - if not test_main(): + if not main(): sys.exit(1) diff --git a/test/_linux.py b/test/_linux.py index 1ba8fb94..493c1491 100644 --- a/test/_linux.py +++ b/test/_linux.py @@ -7,8 +7,8 @@ """Linux specific tests. These are implicitly run by test_psutil.py.""" from __future__ import division -import array import contextlib +import errno import fcntl import os import pprint @@ -16,14 +16,22 @@ import re import socket import struct import sys +import tempfile import time +import warnings -from test_psutil import POSIX, TOLERANCE, TRAVIS +try: + from unittest import mock # py3 +except ImportError: + import mock # requires "pip install mock" + +from test_psutil import POSIX, TOLERANCE, TRAVIS, LINUX from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess, retry_before_failing, get_kernel_version, unittest, - which) + which, call_until) import psutil +import psutil._pslinux from psutil._compat import PY3 @@ -61,6 +69,7 @@ def get_mac_address(ifname): return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1] +@unittest.skipUnless(LINUX, "not a Linux system") class LinuxSpecificTestCase(unittest.TestCase): @unittest.skipIf( @@ -205,6 +214,149 @@ class LinuxSpecificTestCase(unittest.TestCase): self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( pprint.pformat(nics), out)) + @unittest.skipUnless(which("nproc"), "nproc utility not available") + def test_cpu_count_logical_w_nproc(self): + num = int(sh("nproc --all")) + self.assertEqual(psutil.cpu_count(logical=True), num) + + @unittest.skipUnless(which("lscpu"), "lscpu utility not available") + def test_cpu_count_logical_w_lscpu(self): + out = sh("lscpu -p") + num = len([x for x in out.split('\n') if not x.startswith('#')]) + self.assertEqual(psutil.cpu_count(logical=True), num) + + # --- mocked tests + + def test_virtual_memory_mocked_warnings(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + with warnings.catch_warnings(record=True) as ws: + warnings.simplefilter("always") + ret = psutil._pslinux.virtual_memory() + assert m.called + self.assertEqual(len(ws), 1) + w = ws[0] + self.assertTrue(w.filename.endswith('psutil/_pslinux.py')) + self.assertIn( + "'cached', 'active' and 'inactive' memory stats couldn't " + "be determined", str(w.message)) + self.assertEqual(ret.cached, 0) + self.assertEqual(ret.active, 0) + self.assertEqual(ret.inactive, 0) + + def test_swap_memory_mocked_warnings(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + with warnings.catch_warnings(record=True) as ws: + warnings.simplefilter("always") + ret = psutil._pslinux.swap_memory() + assert m.called + self.assertEqual(len(ws), 1) + w = ws[0] + self.assertTrue(w.filename.endswith('psutil/_pslinux.py')) + self.assertIn( + "'sin' and 'sout' swap memory stats couldn't " + "be determined", str(w.message)) + self.assertEqual(ret.sin, 0) + self.assertEqual(ret.sout, 0) + + def test_cpu_count_logical_mocked(self): + import psutil._pslinux + original = psutil._pslinux.cpu_count_logical() + with mock.patch( + 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m: + # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in + # order to test /proc/cpuinfo parsing. + # We might also test /proc/stat parsing but mocking open() + # like that is too difficult. + self.assertEqual(psutil._pslinux.cpu_count_logical(), original) + assert m.called + # Have open() return emtpy data and make sure None is returned + # ('cause we want to mimick os.cpu_count()) + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertIsNone(psutil._pslinux.cpu_count_logical()) + assert m.called + + def test_cpu_count_physical_mocked(self): + # Have open() return emtpy data and make sure None is returned + # ('cause we want to mimick os.cpu_count()) + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertIsNone(psutil._pslinux.cpu_count_physical()) + assert m.called + + def test_proc_open_files_file_gone(self): + # simulates a file which gets deleted during open_files() + # execution + p = psutil.Process() + files = p.open_files() + with tempfile.NamedTemporaryFile(): + # give the kernel some time to see the new file + call_until(p.open_files, "len(ret) != %i" % len(files)) + with mock.patch('psutil._pslinux.os.readlink', + side_effect=OSError(errno.ENOENT, "")) as m: + files = p.open_files() + assert not files + assert m.called + # also simulate the case where os.readlink() returns EINVAL + # in which case psutil is supposed to 'continue' + with mock.patch('psutil._pslinux.os.readlink', + side_effect=OSError(errno.EINVAL, "")) as m: + self.assertEqual(p.open_files(), []) + assert m.called + + def test_proc_terminal_mocked(self): + with mock.patch('psutil._pslinux._psposix._get_terminal_map', + return_value={}) as m: + self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) + assert m.called + + def test_proc_num_ctx_switches_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).num_ctx_switches) + assert m.called + + def test_proc_num_threads_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).num_threads) + assert m.called + + def test_proc_ppid_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).ppid) + assert m.called + + def test_proc_uids_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).uids) + assert m.called + + def test_proc_gids_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).gids) + assert m.called + + def test_proc_io_counters_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).io_counters) + assert m.called + + def test_boot_time_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + RuntimeError, + psutil._pslinux.boot_time) + assert m.called + # --- tests for specific kernel versions @unittest.skipUnless( @@ -241,12 +393,12 @@ class LinuxSpecificTestCase(unittest.TestCase): self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING")) -def test_main(): +def main(): test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase)) result = unittest.TextTestRunner(verbosity=2).run(test_suite) return result.wasSuccessful() if __name__ == '__main__': - if not test_main(): + if not main(): sys.exit(1) diff --git a/test/_osx.py b/test/_osx.py index 784f00e0..6e6e4380 100644 --- a/test/_osx.py +++ b/test/_osx.py @@ -15,8 +15,8 @@ import time import psutil from psutil._compat import PY3 -from test_psutil import (TOLERANCE, sh, get_test_subprocess, reap_children, - retry_before_failing, unittest) +from test_psutil import (TOLERANCE, OSX, sh, get_test_subprocess, + reap_children, retry_before_failing, unittest) PAGESIZE = os.sysconf("SC_PAGE_SIZE") @@ -47,6 +47,7 @@ def vm_stat(field): return int(re.search('\d+', line).group(0)) * PAGESIZE +@unittest.skipUnless(OSX, "not an OSX system") class OSXSpecificTestCase(unittest.TestCase): @classmethod @@ -148,12 +149,12 @@ class OSXSpecificTestCase(unittest.TestCase): self.assertEqual(tot1, tot2) -def test_main(): +def main(): test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) result = unittest.TextTestRunner(verbosity=2).run(test_suite) return result.wasSuccessful() if __name__ == '__main__': - if not test_main(): + if not main(): sys.exit(1) diff --git a/test/_posix.py b/test/_posix.py index 4439f2c7..2a263a3f 100644 --- a/test/_posix.py +++ b/test/_posix.py @@ -15,7 +15,7 @@ import time import psutil from psutil._compat import PY3, callable -from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON +from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX from test_psutil import (get_test_subprocess, skip_on_access_denied, retry_before_failing, reap_children, sh, unittest, get_kernel_version, wait_for_pid) @@ -42,6 +42,7 @@ def ps(cmd): return output +@unittest.skipUnless(POSIX, "not a POSIX system") class PosixSpecificTestCase(unittest.TestCase): """Compare psutil results against 'ps' command line utility.""" @@ -243,12 +244,12 @@ class PosixSpecificTestCase(unittest.TestCase): self.fail('\n' + '\n'.join(failures)) -def test_main(): +def main(): test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) result = unittest.TextTestRunner(verbosity=2).run(test_suite) return result.wasSuccessful() if __name__ == '__main__': - if not test_main(): + if not main(): sys.exit(1) diff --git a/test/_sunos.py b/test/_sunos.py index 7fdc50b6..3d54ccd8 100644 --- a/test/_sunos.py +++ b/test/_sunos.py @@ -7,15 +7,17 @@ """Sun OS specific tests. These are implicitly run by test_psutil.py.""" import sys +import os -from test_psutil import sh, unittest +from test_psutil import SUNOS, sh, unittest import psutil +@unittest.skipUnless(SUNOS, "not a SunOS system") class SunOSSpecificTestCase(unittest.TestCase): def test_swap_memory(self): - out = sh('swap -l -k') + out = sh('env PATH=/usr/sbin:/sbin:%s swap -l -k' % os.environ['PATH']) lines = out.strip().split('\n')[1:] if not lines: raise ValueError('no swap device(s) configured') @@ -35,12 +37,12 @@ class SunOSSpecificTestCase(unittest.TestCase): self.assertEqual(psutil_swap.free, free) -def test_main(): +def main(): test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(SunOSSpecificTestCase)) result = unittest.TextTestRunner(verbosity=2).run(test_suite) return result.wasSuccessful() if __name__ == '__main__': - if not test_main(): + if not main(): sys.exit(1) diff --git a/test/_windows.py b/test/_windows.py index a3699398..a0a22052 100644 --- a/test/_windows.py +++ b/test/_windows.py @@ -15,7 +15,7 @@ import sys import time import traceback -from test_psutil import (get_test_subprocess, reap_children, unittest) +from test_psutil import WINDOWS, get_test_subprocess, reap_children, unittest try: import wmi @@ -28,16 +28,18 @@ except ImportError: win32api = win32con = None from psutil._compat import PY3, callable, long -from psutil._pswindows import ACCESS_DENIED_SET -import psutil._psutil_windows as _psutil_windows import psutil +cext = psutil._psplatform.cext + + def wrap_exceptions(fun): def wrapper(self, *args, **kwargs): try: return fun(self, *args, **kwargs) except OSError as err: + from psutil._pswindows import ACCESS_DENIED_SET if err.errno in ACCESS_DENIED_SET: raise psutil.AccessDenied(None, None) if err.errno == errno.ESRCH: @@ -46,6 +48,7 @@ def wrap_exceptions(fun): return wrapper +@unittest.skipUnless(WINDOWS, "not a Windows system") class WindowsSpecificTestCase(unittest.TestCase): @classmethod @@ -281,6 +284,7 @@ class WindowsSpecificTestCase(unittest.TestCase): self.fail('\n' + '\n'.join(failures)) +@unittest.skipUnless(WINDOWS, "not a Windows system") class TestDualProcessImplementation(unittest.TestCase): fun_names = [ # function name, tolerance @@ -324,7 +328,7 @@ class TestDualProcessImplementation(unittest.TestCase): failures = [] for p in psutil.process_iter(): try: - nt = ntpinfo(*_psutil_windows.proc_info(p.pid)) + nt = ntpinfo(*cext.proc_info(p.pid)) except psutil.NoSuchProcess: continue assert_ge_0(nt) @@ -334,7 +338,7 @@ class TestDualProcessImplementation(unittest.TestCase): continue if name == 'proc_create_time' and p.pid in (0, 4): continue - meth = wrap_exceptions(getattr(_psutil_windows, name)) + meth = wrap_exceptions(getattr(cext, name)) try: ret = meth(p.pid) except (psutil.NoSuchProcess, psutil.AccessDenied): @@ -356,7 +360,7 @@ class TestDualProcessImplementation(unittest.TestCase): compare_with_tolerance(ret[3], nt.io_wbytes, tolerance) elif name == 'proc_memory_info': try: - rawtupl = _psutil_windows.proc_memory_info_2(p.pid) + rawtupl = cext.proc_memory_info_2(p.pid) except psutil.NoSuchProcess: continue compare_with_tolerance(ret, rawtupl, tolerance) @@ -385,7 +389,7 @@ class TestDualProcessImplementation(unittest.TestCase): # process no longer exists ZOMBIE_PID = max(psutil.pids()) + 5000 for name, _ in self.fun_names: - meth = wrap_exceptions(getattr(_psutil_windows, name)) + meth = wrap_exceptions(getattr(cext, name)) self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID) diff --git a/test/test_memory_leaks.py b/test/test_memory_leaks.py index 5a31ac1f..6f02dc0a 100644 --- a/test/test_memory_leaks.py +++ b/test/test_memory_leaks.py @@ -10,6 +10,7 @@ functions many times and compare process memory usage before and after the calls. It might produce false positives. """ +import functools import gc import os import socket @@ -20,7 +21,7 @@ import time import psutil import psutil._common -from psutil._compat import xrange +from psutil._compat import xrange, callable from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, BSD, TESTFN, RLIMIT_SUPPORT, TRAVIS) from test_psutil import (reap_children, supports_ipv6, safe_remove, @@ -92,7 +93,7 @@ class Base(unittest.TestCase): def get_mem(self): return psutil.Process().memory_info()[0] - def call(self, *args, **kwargs): + def call(self, function, *args, **kwargs): raise NotImplementedError("must be implemented in subclass") @@ -106,15 +107,25 @@ class TestProcessObjectLeaks(Base): reap_children() def call(self, function, *args, **kwargs): - meth = getattr(self.proc, function) - if '_exc' in kwargs: - exc = kwargs.pop('_exc') - self.assertRaises(exc, meth, *args, **kwargs) + if callable(function): + if '_exc' in kwargs: + exc = kwargs.pop('_exc') + self.assertRaises(exc, function, *args, **kwargs) + else: + try: + function(*args, **kwargs) + except psutil.Error: + pass else: - try: - meth(*args, **kwargs) - except psutil.Error: - pass + meth = getattr(self.proc, function) + if '_exc' in kwargs: + exc = kwargs.pop('_exc') + self.assertRaises(exc, meth, *args, **kwargs) + else: + try: + meth(*args, **kwargs) + except psutil.Error: + pass @skip_if_linux() def test_name(self): @@ -165,8 +176,10 @@ class TestProcessObjectLeaks(Base): value = psutil.Process().ionice() self.execute('ionice', value) else: + from psutil._pslinux import cext self.execute('ionice', psutil.IOPRIO_CLASS_NONE) - self.execute_w_exc(OSError, 'ionice', -1) + fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0) + self.execute_w_exc(OSError, fun) @unittest.skipIf(OSX or SUNOS, "feature not supported on this platform") @skip_if_linux() @@ -417,7 +430,7 @@ class TestModuleFunctionsLeaks(Base): self.execute('net_if_stats') -def test_main(): +def main(): test_suite = unittest.TestSuite() tests = [TestProcessObjectLeaksZombie, TestProcessObjectLeaks, @@ -428,5 +441,5 @@ def test_main(): return result.wasSuccessful() if __name__ == '__main__': - if not test_main(): + if not main(): sys.exit(1) diff --git a/test/test_psutil.py b/test/test_psutil.py index b53ff680..91423841 100644 --- a/test/test_psutil.py +++ b/test/test_psutil.py @@ -22,6 +22,7 @@ import contextlib import datetime import errno import functools +import imp import json import os import pickle @@ -45,6 +46,10 @@ try: import ipaddress # python >= 3.3 except ImportError: ipaddress = None +try: + from unittest import mock # py3 +except ImportError: + import mock # requires "pip install mock" import psutil from psutil._compat import PY3, callable, long, unicode @@ -579,6 +584,7 @@ class TestSystemAPIs(unittest.TestCase): sproc3 = get_test_subprocess() procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1) + self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1) t = time.time() gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback) @@ -674,6 +680,8 @@ class TestSystemAPIs(unittest.TestCase): self.assertFalse(psutil.pid_exists(sproc.pid)) self.assertFalse(psutil.pid_exists(-1)) self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids()) + # pid 0 + psutil.pid_exists(0) == 0 in psutil.pids() def test_pid_exists_2(self): reap_children() @@ -712,10 +720,11 @@ class TestSystemAPIs(unittest.TestCase): self.assertEqual(logical, len(psutil.cpu_times(percpu=True))) self.assertGreaterEqual(logical, 1) # - with open("/proc/cpuinfo") as fd: - cpuinfo_data = fd.read() - if "physical id" not in cpuinfo_data: - raise unittest.SkipTest("cpuinfo doesn't include physical id") + if LINUX: + with open("/proc/cpuinfo") as fd: + cpuinfo_data = fd.read() + if "physical id" not in cpuinfo_data: + raise unittest.SkipTest("cpuinfo doesn't include physical id") physical = psutil.cpu_count(logical=False) self.assertGreaterEqual(physical, 1) self.assertGreaterEqual(logical, physical) @@ -1127,13 +1136,30 @@ class TestProcess(unittest.TestCase): def test_send_signal(self): sig = signal.SIGKILL if POSIX else signal.SIGTERM sproc = get_test_subprocess() - test_pid = sproc.pid - p = psutil.Process(test_pid) + p = psutil.Process(sproc.pid) p.send_signal(sig) exit_sig = p.wait() - self.assertFalse(psutil.pid_exists(test_pid)) + self.assertFalse(psutil.pid_exists(p.pid)) if POSIX: self.assertEqual(exit_sig, sig) + # + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.send_signal(sig) + with mock.patch('psutil.os.kill', + side_effect=OSError(errno.ESRCH, "")) as fun: + with self.assertRaises(psutil.NoSuchProcess): + p.send_signal(sig) + assert fun.called + # + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.send_signal(sig) + with mock.patch('psutil.os.kill', + side_effect=OSError(errno.EPERM, "")) as fun: + with self.assertRaises(psutil.AccessDenied): + p.send_signal(sig) + assert fun.called def test_wait(self): # check exit code signal @@ -1352,7 +1378,20 @@ class TestProcess(unittest.TestCase): ioclass, value = p.ionice() self.assertEqual(ioclass, 2) self.assertEqual(value, 7) + # self.assertRaises(ValueError, p.ionice, 2, 10) + self.assertRaises(ValueError, p.ionice, 2, -1) + self.assertRaises(ValueError, p.ionice, 4) + self.assertRaises(TypeError, p.ionice, 2, "foo") + self.assertRaisesRegexp( + ValueError, "can't specify value with IOPRIO_CLASS_NONE", + p.ionice, psutil.IOPRIO_CLASS_NONE, 1) + self.assertRaisesRegexp( + ValueError, "can't specify value with IOPRIO_CLASS_IDLE", + p.ionice, psutil.IOPRIO_CLASS_IDLE, 1) + self.assertRaisesRegexp( + ValueError, "'ioclass' argument must be specified", + p.ionice, value=1) finally: p.ionice(IOPRIO_CLASS_NONE) else: @@ -1397,6 +1436,12 @@ class TestProcess(unittest.TestCase): p = psutil.Process(sproc.pid) p.rlimit(psutil.RLIMIT_NOFILE, (5, 5)) self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5)) + # If pid is 0 prlimit() applies to the calling process and + # we don't want that. + with self.assertRaises(ValueError): + psutil._psplatform.Process(0).rlimit(0) + with self.assertRaises(ValueError): + p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5)) def test_num_threads(self): # on certain platforms such as Linux we might test for exact @@ -1542,6 +1587,30 @@ class TestProcess(unittest.TestCase): pyexe = os.path.basename(os.path.realpath(sys.executable)).lower() assert pyexe.startswith(name), (pyexe, name) + @unittest.skipUnless(POSIX, "posix only") + # TODO: add support for other compilers + @unittest.skipUnless(which("gcc"), "gcc not available") + def test_prog_w_funky_name(self): + # Test that name(), exe() and cmdline() correctly handle programs + # with funky chars such as spaces and ")", see: + # https://github.com/giampaolo/psutil/issues/628 + funky_name = "/tmp/foo bar )" + _, c_file = tempfile.mkstemp(prefix='psutil-', suffix='.c', dir="/tmp") + self.addCleanup(lambda: safe_remove(c_file)) + self.addCleanup(lambda: safe_remove(funky_name)) + with open(c_file, "w") as f: + f.write("void main() { pause(); }") + subprocess.check_call(["gcc", c_file, "-o", funky_name]) + sproc = get_test_subprocess( + [funky_name, "arg1", "arg2", "", "arg3", ""]) + p = psutil.Process(sproc.pid) + # ...in order to try to prevent occasional failures on travis + wait_for_pid(p.pid) + self.assertEqual(p.name(), "foo bar )") + self.assertEqual(p.exe(), "/tmp/foo bar )") + self.assertEqual( + p.cmdline(), ["/tmp/foo bar )", "arg1", "arg2", "", "arg3", ""]) + @unittest.skipUnless(POSIX, 'posix only') def test_uids(self): p = psutil.Process() @@ -1613,6 +1682,11 @@ class TestProcess(unittest.TestCase): if POSIX: import pwd self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name) + with mock.patch("psutil.pwd.getpwuid", + side_effect=KeyError) as fun: + p.username() == str(p.uids().real) + assert fun.called + elif WINDOWS and 'USERNAME' in os.environ: expected_username = os.environ['USERNAME'] expected_domain = os.environ['USERDOMAIN'] @@ -1677,6 +1751,7 @@ class TestProcess(unittest.TestCase): files = p.open_files() self.assertFalse(TESTFN in files) with open(TESTFN, 'w'): + # give the kernel some time to see the new file call_until(p.open_files, "len(ret) != %i" % len(files)) filenames = [x.path for x in p.open_files()] self.assertIn(TESTFN, filenames) @@ -2158,6 +2233,10 @@ class TestProcess(unittest.TestCase): except psutil.AccessDenied: pass + self.assertRaisesRegexp( + ValueError, "preventing sending signal to process with PID 0", + p.send_signal(signal.SIGTERM)) + self.assertIn(p.ppid(), (0, 1)) # self.assertEqual(p.exe(), "") p.cmdline() @@ -2171,7 +2250,6 @@ class TestProcess(unittest.TestCase): except psutil.AccessDenied: pass - # username property try: if POSIX: self.assertEqual(p.username(), 'root') @@ -2198,6 +2276,7 @@ class TestProcess(unittest.TestCase): proc.stdin self.assertTrue(hasattr(proc, 'name')) self.assertTrue(hasattr(proc, 'stdin')) + self.assertTrue(dir(proc)) self.assertRaises(AttributeError, getattr, proc, 'foo') finally: proc.kill() @@ -2545,6 +2624,19 @@ class TestMisc(unittest.TestCase): p.wait() self.assertIn(str(sproc.pid), str(p)) self.assertIn("terminated", str(p)) + # test error conditions + with mock.patch.object(psutil.Process, 'name', + side_effect=psutil.ZombieProcess(1)) as meth: + self.assertIn("zombie", str(p)) + self.assertIn("pid", str(p)) + assert meth.called + with mock.patch.object(psutil.Process, 'name', + side_effect=psutil.AccessDenied) as meth: + self.assertIn("pid", str(p)) + assert meth.called + + def test__repr__(self): + repr(psutil.Process()) def test__eq__(self): p1 = psutil.Process() @@ -2635,6 +2727,29 @@ class TestMisc(unittest.TestCase): check(psutil.disk_usage(os.getcwd())) check(psutil.users()) + def test_setup_script(self): + here = os.path.abspath(os.path.dirname(__file__)) + setup_py = os.path.realpath(os.path.join(here, '..', 'setup.py')) + module = imp.load_source('setup', setup_py) + self.assertRaises(SystemExit, module.setup) + + def test_ad_on_process_creation(self): + # We are supposed to be able to instantiate Process also in case + # of zombie processes or access denied. + with mock.patch.object(psutil.Process, 'create_time', + side_effect=psutil.AccessDenied) as meth: + psutil.Process() + assert meth.called + with mock.patch.object(psutil.Process, 'create_time', + side_effect=psutil.ZombieProcess(1)) as meth: + psutil.Process() + assert meth.called + with mock.patch.object(psutil.Process, 'create_time', + side_effect=ValueError) as meth: + with self.assertRaises(ValueError): + psutil.Process() + assert meth.called + # =================================================================== # --- Example script tests |