diff options
Diffstat (limited to 'psutil/tests')
-rw-r--r-- | psutil/tests/__init__.py | 68 | ||||
-rwxr-xr-x | psutil/tests/__main__.py | 12 | ||||
-rwxr-xr-x | psutil/tests/test_aix.py | 121 | ||||
-rwxr-xr-x | psutil/tests/test_connections.py | 4 | ||||
-rwxr-xr-x | psutil/tests/test_contracts.py | 44 | ||||
-rwxr-xr-x | psutil/tests/test_linux.py | 46 | ||||
-rwxr-xr-x | psutil/tests/test_misc.py | 27 | ||||
-rwxr-xr-x | psutil/tests/test_osx.py | 13 | ||||
-rwxr-xr-x | psutil/tests/test_posix.py | 62 | ||||
-rwxr-xr-x | psutil/tests/test_process.py | 149 | ||||
-rwxr-xr-x | psutil/tests/test_system.py | 8 | ||||
-rwxr-xr-x | psutil/tests/test_unicode.py | 41 |
12 files changed, 452 insertions, 143 deletions
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py index 7d8e5def..0890b6f9 100644 --- a/psutil/tests/__init__.py +++ b/psutil/tests/__init__.py @@ -65,7 +65,7 @@ else: __all__ = [ # constants 'APPVEYOR', 'DEVNULL', 'GLOBAL_TIMEOUT', 'MEMORY_TOLERANCE', 'NO_RETRIES', - 'PYPY', 'PYTHON', 'ROOT_DIR', 'SCRIPTS_DIR', 'TESTFILE_PREFIX', + 'PYPY', 'PYTHON_EXE', 'ROOT_DIR', 'SCRIPTS_DIR', 'TESTFILE_PREFIX', 'TESTFN', 'TESTFN_UNICODE', 'TOX', 'TRAVIS', 'VALID_PROC_STATUSES', 'VERBOSITY', "HAS_CPU_AFFINITY", "HAS_CPU_FREQ", "HAS_ENVIRON", "HAS_PROC_IO_COUNTERS", @@ -159,6 +159,7 @@ HAS_MEMORY_FULL_INFO = 'uss' in psutil.Process().memory_full_info()._fields HAS_MEMORY_MAPS = hasattr(psutil.Process, "memory_maps") HAS_PROC_CPU_NUM = hasattr(psutil.Process, "cpu_num") HAS_RLIMIT = hasattr(psutil.Process, "rlimit") +HAS_THREADS = hasattr(psutil.Process, "threads") HAS_SENSORS_BATTERY = hasattr(psutil, "sensors_battery") HAS_BATTERY = HAS_SENSORS_BATTERY and psutil.sensors_battery() HAS_SENSORS_FANS = hasattr(psutil, "sensors_fans") @@ -166,7 +167,33 @@ HAS_SENSORS_TEMPERATURES = hasattr(psutil, "sensors_temperatures") # --- misc -PYTHON = os.path.realpath(sys.executable) + +def _get_py_exe(): + def attempt(exe): + try: + subprocess.check_call( + [exe, "-V"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except Exception: + return None + else: + return exe + + if OSX: + exe = \ + attempt(sys.executable) or \ + attempt(os.path.realpath(sys.executable)) or \ + attempt(which("python%s.%s" % sys.version_info[:2])) or \ + attempt(psutil.Process().exe()) + if not exe: + raise ValueError("can't find python exe real abspath") + return exe + else: + exe = os.path.realpath(sys.executable) + assert os.path.exists(exe), exe + return exe + + +PYTHON_EXE = _get_py_exe() DEVNULL = open(os.devnull, 'r+') VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil) if x.startswith('STATUS_')] @@ -182,7 +209,11 @@ _testfiles_created = set() def _cleanup_files(): DEVNULL.close() for name in os.listdir(u('.')): - if name.startswith(u(TESTFILE_PREFIX)): + if isinstance(name, unicode): + prefix = u(TESTFILE_PREFIX) + else: + prefix = TESTFILE_PREFIX + if name.startswith(prefix): try: safe_rmpath(name) except Exception: @@ -286,7 +317,7 @@ def get_test_subprocess(cmd=None, **kwds): pyline = "from time import sleep;" \ "open(r'%s', 'w').close();" \ "sleep(60);" % _TESTFN - cmd = [PYTHON, "-c", pyline] + cmd = [PYTHON_EXE, "-c", pyline] sproc = subprocess.Popen(cmd, **kwds) _subprocesses_started.add(sproc) wait_for_file(_TESTFN, delete=True, empty=True) @@ -308,15 +339,14 @@ def create_proc_children_pair(): _TESTFN2 = os.path.basename(_TESTFN) + '2' # need to be relative s = textwrap.dedent("""\ import subprocess, os, sys, time - PYTHON = os.path.realpath(sys.executable) s = "import os, time;" s += "f = open('%s', 'w');" s += "f.write(str(os.getpid()));" s += "f.close();" s += "time.sleep(60);" - subprocess.Popen([PYTHON, '-c', s]) + subprocess.Popen(['%s', '-c', s]) time.sleep(60) - """ % _TESTFN2) + """ % (_TESTFN2, PYTHON_EXE)) # On Windows if we create a subprocess with CREATE_NO_WINDOW flag # set (which is the default) a "conhost.exe" extra process will be # spawned as a child. We don't want that. @@ -382,7 +412,7 @@ def pyrun(src, **kwds): _testfiles_created.add(f.name) f.write(src) f.flush() - subp = get_test_subprocess([PYTHON, f.name], **kwds) + subp = get_test_subprocess([PYTHON_EXE, f.name], **kwds) wait_for_pid(subp.pid) return subp @@ -689,7 +719,7 @@ def create_exe(outpath, c_code=None): if c_code: if not which("gcc"): raise ValueError("gcc is not installed") - if c_code is None: + if isinstance(c_code, bool): # c_code is True c_code = textwrap.dedent( """ #include <unistd.h> @@ -698,6 +728,7 @@ def create_exe(outpath, c_code=None): return 1; } """) + assert isinstance(c_code, str), c_code with tempfile.NamedTemporaryFile( suffix='.c', delete=False, mode='wt') as f: f.write(c_code) @@ -707,7 +738,7 @@ def create_exe(outpath, c_code=None): safe_rmpath(f.name) else: # copy python executable - shutil.copyfile(sys.executable, outpath) + shutil.copyfile(PYTHON_EXE, outpath) if POSIX: st = os.stat(outpath) os.chmod(outpath, st.st_mode | stat.S_IEXEC) @@ -742,16 +773,21 @@ unittest.TestCase = TestCase def _setup_tests(): - assert 'PSUTIL_TESTING' in os.environ - assert psutil._psplatform.cext.py_psutil_testing() + if 'PSUTIL_TESTING' not in os.environ: + # This won't work on Windows but set_testing() below will do it. + os.environ['PSUTIL_TESTING'] = '1' + psutil._psplatform.cext.set_testing() def get_suite(): - testmodules = [os.path.splitext(x)[0] for x in os.listdir(HERE) - if x.endswith('.py') and x.startswith('test_') and not - x.startswith('test_memory_leaks')] + testmods = [os.path.splitext(x)[0] for x in os.listdir(HERE) + if x.endswith('.py') and x.startswith('test_') and not + x.startswith('test_memory_leaks')] + if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ: + testmods = [x for x in testmods if not x.endswith(( + "osx", "posix", "linux"))] suite = unittest.TestSuite() - for tm in testmodules: + for tm in testmods: # ...so that the full test paths are printed on screen tm = "psutil.tests.%s" % tm suite.addTest(unittest.defaultTestLoader.loadTestsFromName(tm)) diff --git a/psutil/tests/__main__.py b/psutil/tests/__main__.py index 475e6b81..2cdf5c42 100755 --- a/psutil/tests/__main__.py +++ b/psutil/tests/__main__.py @@ -21,11 +21,11 @@ try: except ImportError: from urllib2 import urlopen +from psutil.tests import PYTHON_EXE from psutil.tests import run_suite HERE = os.path.abspath(os.path.dirname(__file__)) -PYTHON = os.path.basename(sys.executable) GET_PIP_URL = "https://bootstrap.pypa.io/get-pip.py" TEST_DEPS = [] if sys.version_info[:2] == (2, 6): @@ -54,7 +54,7 @@ def install_pip(): f.flush() print("installing pip") - code = os.system('%s %s --user' % (sys.executable, f.name)) + code = os.system('%s %s --user' % (PYTHON_EXE, f.name)) return code @@ -68,12 +68,12 @@ def install_test_deps(deps=None): opts = "--user" if not is_venv else "" install_pip() code = os.system('%s -m pip install %s --upgrade %s' % ( - sys.executable, opts, " ".join(deps))) + PYTHON_EXE, opts, " ".join(deps))) return code def main(): - usage = "%s -m psutil.tests [opts]" % PYTHON + usage = "%s -m psutil.tests [opts]" % PYTHON_EXE parser = optparse.OptionParser(usage=usage, description="run unit tests") parser.add_option("-i", "--install-deps", action="store_true", default=False, @@ -88,8 +88,8 @@ def main(): try: __import__(dep.split("==")[0]) except ImportError: - sys.exit("%r lib is not installed; run:\n" - "%s -m psutil.tests --install-deps" % (dep, PYTHON)) + sys.exit("%r lib is not installed; run %s -m psutil.tests " + "--install-deps" % (dep, PYTHON_EXE)) run_suite() diff --git a/psutil/tests/test_aix.py b/psutil/tests/test_aix.py new file mode 100755 index 00000000..7a8a4c33 --- /dev/null +++ b/psutil/tests/test_aix.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola' +# Copyright (c) 2017, Arnon Yaari +# All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""AIX specific tests.""" + +import re + +from psutil import AIX +from psutil.tests import run_test_module_by_name +from psutil.tests import sh +from psutil.tests import unittest +import psutil + + +@unittest.skipIf(not AIX, "AIX only") +class AIXSpecificTestCase(unittest.TestCase): + + def test_virtual_memory(self): + out = sh('/usr/bin/svmon -O unit=KB') + re_pattern = "memory\s*" + for field in ("size inuse free pin virtual available mmode").split(): + re_pattern += "(?P<%s>\S+)\s+" % (field,) + matchobj = re.search(re_pattern, out) + + self.assertIsNotNone( + matchobj, "svmon command returned unexpected output") + + KB = 1024 + total = int(matchobj.group("size")) * KB + available = int(matchobj.group("available")) * KB + used = int(matchobj.group("inuse")) * KB + free = int(matchobj.group("free")) * KB + + psutil_result = psutil.virtual_memory() + + # MEMORY_TOLERANCE from psutil.tests is not enough. For some reason + # we're seeing differences of ~1.2 MB. 2 MB is still a good tolerance + # when compared to GBs. + MEMORY_TOLERANCE = 2 * KB * KB # 2 MB + self.assertEqual(psutil_result.total, total) + self.assertAlmostEqual( + psutil_result.used, used, delta=MEMORY_TOLERANCE) + self.assertAlmostEqual( + psutil_result.available, available, delta=MEMORY_TOLERANCE) + self.assertAlmostEqual( + psutil_result.free, free, delta=MEMORY_TOLERANCE) + + def test_swap_memory(self): + out = sh('/usr/sbin/lsps -a') + # From the man page, "The size is given in megabytes" so we assume + # we'll always have 'MB' in the result + # TODO maybe try to use "swap -l" to check "used" too, but its units + # are not guaranteed to be "MB" so parsing may not be consistent + matchobj = re.search("(?P<space>\S+)\s+" + "(?P<vol>\S+)\s+" + "(?P<vg>\S+)\s+" + "(?P<size>\d+)MB", out) + + self.assertIsNotNone( + matchobj, "lsps command returned unexpected output") + + total_mb = int(matchobj.group("size")) + MB = 1024 ** 2 + psutil_result = psutil.swap_memory() + # we divide our result by MB instead of multiplying the lsps value by + # MB because lsps may round down, so we round down too + self.assertEqual(int(psutil_result.total / MB), total_mb) + + def test_cpu_stats(self): + out = sh('/usr/bin/mpstat -a') + + re_pattern = "ALL\s*" + for field in ("min maj mpcs mpcr dev soft dec ph cs ics bound rq " + "push S3pull S3grd S0rd S1rd S2rd S3rd S4rd S5rd " + "sysc").split(): + re_pattern += "(?P<%s>\S+)\s+" % (field,) + matchobj = re.search(re_pattern, out) + + self.assertIsNotNone( + matchobj, "mpstat command returned unexpected output") + + # numbers are usually in the millions so 1000 is ok for tolerance + CPU_STATS_TOLERANCE = 1000 + psutil_result = psutil.cpu_stats() + self.assertAlmostEqual( + psutil_result.ctx_switches, + int(matchobj.group("cs")), + delta=CPU_STATS_TOLERANCE) + self.assertAlmostEqual( + psutil_result.syscalls, + int(matchobj.group("sysc")), + delta=CPU_STATS_TOLERANCE) + self.assertAlmostEqual( + psutil_result.interrupts, + int(matchobj.group("dev")), + delta=CPU_STATS_TOLERANCE) + self.assertAlmostEqual( + psutil_result.soft_interrupts, + int(matchobj.group("soft")), + delta=CPU_STATS_TOLERANCE) + + def test_cpu_count_logical(self): + out = sh('/usr/bin/mpstat -a') + mpstat_lcpu = int(re.search("lcpu=(\d+)", out).group(1)) + psutil_lcpu = psutil.cpu_count(logical=True) + self.assertEqual(mpstat_lcpu, psutil_lcpu) + + def test_net_if_addrs_names(self): + out = sh('/etc/ifconfig -l') + ifconfig_names = set(out.split()) + psutil_names = set(psutil.net_if_addrs().keys()) + self.assertSetEqual(ifconfig_names, psutil_names) + + +if __name__ == '__main__': + run_test_module_by_name(__file__) diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py index 203ddebb..176e2664 100755 --- a/psutil/tests/test_connections.py +++ b/psutil/tests/test_connections.py @@ -152,6 +152,7 @@ class TestUnconnectedSockets(Base, unittest.TestCase): assert not conn.raddr self.assertEqual(conn.status, psutil.CONN_LISTEN) + @unittest.skipIf(not supports_ipv6(), "IPv6 not supported") def test_tcp_v6(self): addr = ("::1", get_free_port()) with closing(bind_socket(AF_INET6, SOCK_STREAM, addr=addr)) as sock: @@ -166,6 +167,7 @@ class TestUnconnectedSockets(Base, unittest.TestCase): assert not conn.raddr self.assertEqual(conn.status, psutil.CONN_NONE) + @unittest.skipIf(not supports_ipv6(), "IPv6 not supported") def test_udp_v6(self): addr = ("::1", get_free_port()) with closing(bind_socket(AF_INET6, SOCK_DGRAM, addr=addr)) as sock: @@ -418,7 +420,7 @@ class TestConnectedSocketPairs(Base, unittest.TestCase): # ===================================================================== -class TestSystemWideConnections(unittest.TestCase): +class TestSystemWideConnections(Base, unittest.TestCase): """Tests for net_connections().""" @skip_on_access_denied() diff --git a/psutil/tests/test_contracts.py b/psutil/tests/test_contracts.py index 95bf2146..f9543e57 100755 --- a/psutil/tests/test_contracts.py +++ b/psutil/tests/test_contracts.py @@ -14,8 +14,10 @@ import os import stat import time import traceback +import warnings from contextlib import closing +from psutil import AIX from psutil import BSD from psutil import FREEBSD from psutil import LINUX @@ -65,7 +67,8 @@ class TestAvailability(unittest.TestCase): self.assertEqual(hasattr(psutil, "win_service_get"), WINDOWS) def test_PROCFS_PATH(self): - self.assertEqual(hasattr(psutil, "PROCFS_PATH"), LINUX or SUNOS) + self.assertEqual(hasattr(psutil, "PROCFS_PATH"), + LINUX or SUNOS or AIX) def test_win_priority(self): ae = self.assertEqual @@ -108,7 +111,10 @@ class TestAvailability(unittest.TestCase): ae(hasattr(psutil, "RLIMIT_SIGPENDING"), hasit) def test_cpu_freq(self): - self.assertEqual(hasattr(psutil, "cpu_freq"), LINUX or OSX or WINDOWS) + linux = (LINUX and + (os.path.exists("/sys/devices/system/cpu/cpufreq") or + os.path.exists("/sys/devices/system/cpu/cpu0/cpufreq"))) + self.assertEqual(hasattr(psutil, "cpu_freq"), linux or OSX or WINDOWS) def test_sensors_temperatures(self): self.assertEqual(hasattr(psutil, "sensors_temperatures"), LINUX) @@ -118,7 +124,7 @@ class TestAvailability(unittest.TestCase): def test_battery(self): self.assertEqual(hasattr(psutil, "sensors_battery"), - LINUX or WINDOWS or FREEBSD) + LINUX or WINDOWS or FREEBSD or OSX) def test_proc_environ(self): self.assertEqual(hasattr(psutil.Process, "environ"), @@ -159,7 +165,23 @@ class TestAvailability(unittest.TestCase): def test_proc_memory_maps(self): hasit = hasattr(psutil.Process, "memory_maps") - self.assertEqual(hasit, False if OPENBSD or NETBSD else True) + self.assertEqual(hasit, False if OPENBSD or NETBSD or AIX else True) + + +# =================================================================== +# --- Test deprecations +# =================================================================== + + +class TestDeprecations(unittest.TestCase): + + def test_memory_info_ex(self): + with warnings.catch_warnings(record=True) as ws: + psutil.Process().memory_info_ex() + w = ws[0] + self.assertIsInstance(w.category(), FutureWarning) + self.assertIn("memory_info_ex() is deprecated", str(w.message)) + self.assertIn("use memory_info() instead", str(w.message)) # =================================================================== @@ -372,12 +394,14 @@ class TestFetchAllProcesses(unittest.TestCase): self.assertGreaterEqual(ret, 0) def ppid(self, ret, proc): - self.assertIsInstance(ret, int) + self.assertIsInstance(ret, (int, long)) self.assertGreaterEqual(ret, 0) def name(self, ret, proc): self.assertIsInstance(ret, str) - assert ret + # on AIX, "<exiting>" processes don't have names + if not AIX: + assert ret def create_time(self, ret, proc): self.assertIsInstance(ret, float) @@ -482,7 +506,7 @@ class TestFetchAllProcesses(unittest.TestCase): for value in ret: self.assertIsInstance(value, (int, long)) self.assertGreaterEqual(value, 0) - if POSIX and ret.vms != 0: + if POSIX and not AIX and ret.vms != 0: # VMS is always supposed to be the highest for name in ret._fields: if name != 'vms': @@ -536,8 +560,8 @@ class TestFetchAllProcesses(unittest.TestCase): check_connection_ntuple(conn) def cwd(self, ret, proc): - self.assertIsInstance(ret, str) - if ret is not None: # BSD may return None + if ret: # 'ret' can be None or empty + self.assertIsInstance(ret, str) assert os.path.isabs(ret), ret try: st = os.stat(ret) @@ -607,7 +631,7 @@ class TestFetchAllProcesses(unittest.TestCase): def num_ctx_switches(self, ret, proc): assert is_namedtuple(ret) for value in ret: - self.assertIsInstance(value, int) + self.assertIsInstance(value, (int, long)) self.assertGreaterEqual(value, 0) def rlimit(self, ret, proc): diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py index 468b3c66..6ba17b25 100755 --- a/psutil/tests/test_linux.py +++ b/psutil/tests/test_linux.py @@ -29,6 +29,7 @@ from psutil._compat import PY3 from psutil._compat import u from psutil.tests import call_until from psutil.tests import HAS_BATTERY +from psutil.tests import HAS_CPU_FREQ from psutil.tests import HAS_RLIMIT from psutil.tests import MEMORY_TOLERANCE from psutil.tests import mock @@ -607,11 +608,13 @@ class TestSystemCPU(unittest.TestCase): self.assertIsNone(psutil._pslinux.cpu_count_physical()) assert m.called + @unittest.skipIf(not HAS_CPU_FREQ, "not supported") def test_cpu_freq_no_result(self): with mock.patch("psutil._pslinux.glob.glob", return_value=[]): self.assertIsNone(psutil.cpu_freq()) @unittest.skipIf(TRAVIS, "fails on Travis") + @unittest.skipIf(not HAS_CPU_FREQ, "not supported") def test_cpu_freq_use_second_file(self): # https://github.com/giampaolo/psutil/issues/981 def glob_mock(pattern): @@ -629,6 +632,7 @@ class TestSystemCPU(unittest.TestCase): assert psutil.cpu_freq() self.assertEqual(len(flags), 2) + @unittest.skipIf(not HAS_CPU_FREQ, "not supported") def test_cpu_freq_emulate_data(self): def open_mock(name, *args, **kwargs): if name.endswith('/scaling_cur_freq'): @@ -651,6 +655,7 @@ class TestSystemCPU(unittest.TestCase): self.assertEqual(freq.min, 600.0) self.assertEqual(freq.max, 700.0) + @unittest.skipIf(not HAS_CPU_FREQ, "not supported") def test_cpu_freq_emulate_multi_cpu(self): def open_mock(name, *args, **kwargs): if name.endswith('/scaling_cur_freq'): @@ -675,6 +680,7 @@ class TestSystemCPU(unittest.TestCase): self.assertEqual(freq.max, 300.0) @unittest.skipIf(TRAVIS, "fails on Travis") + @unittest.skipIf(not HAS_CPU_FREQ, "not supported") def test_cpu_freq_no_scaling_cur_freq_file(self): # See: https://github.com/giampaolo/psutil/issues/1071 def open_mock(name, *args, **kwargs): @@ -762,21 +768,25 @@ class TestSystemNetwork(unittest.TestCase): # Not always reliable. # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out) self.assertEqual(stats.mtu, - int(re.findall(r'MTU:(\d+)', out)[0])) + int(re.findall(r'(?i)MTU[: ](\d+)', out)[0])) @retry_before_failing() def test_net_io_counters(self): def ifconfig(nic): ret = {} out = sh("ifconfig %s" % name) - ret['packets_recv'] = int(re.findall(r'RX packets:(\d+)', out)[0]) - ret['packets_sent'] = int(re.findall(r'TX packets:(\d+)', out)[0]) - ret['errin'] = int(re.findall(r'errors:(\d+)', out)[0]) - ret['errout'] = int(re.findall(r'errors:(\d+)', out)[1]) - ret['dropin'] = int(re.findall(r'dropped:(\d+)', out)[0]) - ret['dropout'] = int(re.findall(r'dropped:(\d+)', out)[1]) - ret['bytes_recv'] = int(re.findall(r'RX bytes:(\d+)', out)[0]) - ret['bytes_sent'] = int(re.findall(r'TX bytes:(\d+)', out)[0]) + ret['packets_recv'] = int( + re.findall(r'RX packets[: ](\d+)', out)[0]) + ret['packets_sent'] = int( + re.findall(r'TX packets[: ](\d+)', out)[0]) + ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0]) + ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1]) + ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0]) + ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1]) + ret['bytes_recv'] = int( + re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0]) + ret['bytes_sent'] = int( + re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0]) return ret nio = psutil.net_io_counters(pernic=True, nowrap=False) @@ -1322,7 +1332,9 @@ class TestSensorsBattery(unittest.TestCase): # Emulate a case where energy_full file does not exist. # Expected fallback on /capacity. def open_mock(name, *args, **kwargs): - if name.startswith("/sys/class/power_supply/BAT0/energy_full"): + energy_full = "/sys/class/power_supply/BAT0/energy_full" + charge_full = "/sys/class/power_supply/BAT0/charge_full" + if name.startswith(energy_full) or name.startswith(charge_full): raise IOError(errno.ENOENT, "") elif name.startswith("/sys/class/power_supply/BAT0/capacity"): return io.BytesIO(b"88") @@ -1573,6 +1585,20 @@ class TestProcess(unittest.TestCase): self.assertEqual(p.cmdline(), ['foo', 'bar', '']) assert m.called + def test_cmdline_spaces_mocked(self): + # see: https://github.com/giampaolo/psutil/issues/1179 + p = psutil.Process() + fake_file = io.StringIO(u('foo bar ')) + with mock.patch('psutil._pslinux.open', + return_value=fake_file, create=True) as m: + self.assertEqual(p.cmdline(), ['foo', 'bar']) + assert m.called + fake_file = io.StringIO(u('foo bar ')) + with mock.patch('psutil._pslinux.open', + return_value=fake_file, create=True) as m: + self.assertEqual(p.cmdline(), ['foo', 'bar', '']) + assert m.called + def test_readlink_path_deleted_mocked(self): with mock.patch('psutil._pslinux.os.readlink', return_value='/home/foo (deleted)'): diff --git a/psutil/tests/test_misc.py b/psutil/tests/test_misc.py index 85bab84c..f67c0e4c 100755 --- a/psutil/tests/test_misc.py +++ b/psutil/tests/test_misc.py @@ -18,7 +18,6 @@ import os import pickle import socket import stat -import sys from psutil import LINUX from psutil import POSIX @@ -49,6 +48,7 @@ from psutil.tests import HAS_SENSORS_TEMPERATURES from psutil.tests import import_module_by_path from psutil.tests import is_namedtuple from psutil.tests import mock +from psutil.tests import PYTHON_EXE from psutil.tests import reap_children from psutil.tests import reload_module from psutil.tests import retry @@ -365,6 +365,8 @@ class TestMisc(unittest.TestCase): def test_setup_script(self): setup_py = os.path.join(ROOT_DIR, 'setup.py') + if TRAVIS and not os.path.exists(setup_py): + return self.skipTest("can't find setup.py") module = import_module_by_path(setup_py) self.assertRaises(SystemExit, module.setup) self.assertEqual(module.get_version(), psutil.__version__) @@ -643,16 +645,20 @@ class TestWrapNumbers(unittest.TestCase): @unittest.skipIf(TOX, "can't test on TOX") +# See: https://travis-ci.org/giampaolo/psutil/jobs/295224806 +@unittest.skipIf(TRAVIS and not os.path.exists(SCRIPTS_DIR), + "can't locate scripts directory") class TestScripts(unittest.TestCase): """Tests for scripts in the "scripts" directory.""" @staticmethod - def assert_stdout(exe, args=None, **kwds): - exe = '"%s"' % os.path.join(SCRIPTS_DIR, exe) - if args: - exe = exe + ' ' + args + def assert_stdout(exe, *args, **kwargs): + exe = '%s' % os.path.join(SCRIPTS_DIR, exe) + cmd = [PYTHON_EXE, exe] + for arg in args: + cmd.append(arg) try: - out = sh(sys.executable + ' ' + exe, **kwds).strip() + out = sh(cmd, **kwargs).strip() except RuntimeError as err: if 'AccessDenied' in str(err): return str(err) @@ -700,7 +706,7 @@ class TestScripts(unittest.TestCase): self.assert_stdout('meminfo.py') def test_procinfo(self): - self.assert_stdout('procinfo.py', args=str(os.getpid())) + self.assert_stdout('procinfo.py', str(os.getpid())) # can't find users on APPVEYOR or TRAVIS @unittest.skipIf(APPVEYOR or TRAVIS and not psutil.users(), @@ -724,7 +730,7 @@ class TestScripts(unittest.TestCase): @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported") def test_pmap(self): - self.assert_stdout('pmap.py', args=str(os.getpid())) + self.assert_stdout('pmap.py', str(os.getpid())) @unittest.skipIf(not HAS_MEMORY_FULL_INFO, "not supported") def test_procsmem(self): @@ -743,7 +749,7 @@ class TestScripts(unittest.TestCase): self.assert_syntax('iotop.py') def test_pidof(self): - output = self.assert_stdout('pidof.py', args=psutil.Process().name()) + output = self.assert_stdout('pidof.py', psutil.Process().name()) self.assertIn(str(os.getpid()), output) @unittest.skipIf(not WINDOWS, "WINDOWS only") @@ -1014,7 +1020,8 @@ class TestNetUtils(unittest.TestCase): # work around http://bugs.python.org/issue30204 types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1 self.assertGreaterEqual(fams[socket.AF_INET], 2) - self.assertGreaterEqual(fams[socket.AF_INET6], 2) + if supports_ipv6(): + self.assertGreaterEqual(fams[socket.AF_INET6], 2) if POSIX and HAS_CONNECTIONS_UNIX: self.assertGreaterEqual(fams[socket.AF_UNIX], 2) self.assertGreaterEqual(types[socket.SOCK_STREAM], 2) diff --git a/psutil/tests/test_osx.py b/psutil/tests/test_osx.py index c8214f14..bcb2ba4e 100755 --- a/psutil/tests/test_osx.py +++ b/psutil/tests/test_osx.py @@ -14,6 +14,7 @@ import psutil from psutil import OSX from psutil.tests import create_zombie_proc from psutil.tests import get_test_subprocess +from psutil.tests import HAS_BATTERY from psutil.tests import MEMORY_TOLERANCE from psutil.tests import reap_children from psutil.tests import retry_before_failing @@ -285,6 +286,18 @@ class TestSystemAPIs(unittest.TestCase): self.assertEqual(stats.mtu, int(re.findall(r'mtu (\d+)', out)[0])) + # --- sensors_battery + + @unittest.skipIf(not HAS_BATTERY, "no battery") + def test_sensors_battery(self): + out = sh("pmset -g batt") + percent = re.search("(\d+)%", out).group(1) + drawing_from = re.search("Now drawing from '([^']+)'", out).group(1) + power_plugged = drawing_from == "AC Power" + psutil_result = psutil.sensors_battery() + self.assertEqual(psutil_result.power_plugged, power_plugged) + self.assertEqual(psutil_result.percent, int(percent)) + if __name__ == '__main__': run_test_module_by_name(__file__) diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py index 580abdfd..c59f9a1c 100755 --- a/psutil/tests/test_posix.py +++ b/psutil/tests/test_posix.py @@ -10,11 +10,13 @@ import datetime import errno import os +import re import subprocess import sys import time import psutil +from psutil import AIX from psutil import BSD from psutil import LINUX from psutil import OPENBSD @@ -27,7 +29,7 @@ from psutil.tests import APPVEYOR from psutil.tests import get_kernel_version from psutil.tests import get_test_subprocess from psutil.tests import mock -from psutil.tests import PYTHON +from psutil.tests import PYTHON_EXE from psutil.tests import reap_children from psutil.tests import retry_before_failing from psutil.tests import run_test_module_by_name @@ -46,8 +48,9 @@ def ps(cmd): if not LINUX: cmd = cmd.replace(" --no-headers ", " ") if SUNOS: - cmd = cmd.replace("-o command", "-o comm") cmd = cmd.replace("-o start", "-o stime") + if AIX: + cmd = cmd.replace("-o rss", "-o rssize") output = sh(cmd) if not LINUX: output = output.split('\n')[1].strip() @@ -56,6 +59,31 @@ def ps(cmd): except ValueError: return output +# ps "-o" field names differ wildly between platforms. +# "comm" means "only executable name" but is not available on BSD platforms. +# "args" means "command with all its arguments", and is also not available +# on BSD platforms. +# "command" is like "args" on most platforms, but like "comm" on AIX, +# and not available on SUNOS. +# so for the executable name we can use "comm" on Solaris and split "command" +# on other platforms. +# to get the cmdline (with args) we have to use "args" on AIX and +# Solaris, and can use "command" on all others. + + +def ps_name(pid): + field = "command" + if SUNOS: + field = "comm" + return ps("ps --no-headers -o %s -p %s" % (field, pid)).split(' ')[0] + + +def ps_args(pid): + field = "command" + if AIX or SUNOS: + field = "args" + return ps("ps --no-headers -o %s -p %s" % (field, pid)) + @unittest.skipIf(not POSIX, "POSIX only") class TestProcess(unittest.TestCase): @@ -63,7 +91,7 @@ class TestProcess(unittest.TestCase): @classmethod def setUpClass(cls): - cls.pid = get_test_subprocess([PYTHON, "-E", "-O"], + cls.pid = get_test_subprocess([PYTHON_EXE, "-E", "-O"], stdin=subprocess.PIPE).pid wait_for_pid(cls.pid) @@ -121,12 +149,14 @@ class TestProcess(unittest.TestCase): self.assertEqual(vsz_ps, vsz_psutil) def test_name(self): - # use command + arg since "comm" keyword not supported on all platforms - name_ps = ps("ps --no-headers -o command -p %s" % ( - self.pid)).split(' ')[0] + name_ps = ps_name(self.pid) # remove path if there is any, from the command name_ps = os.path.basename(name_ps).lower() name_psutil = psutil.Process(self.pid).name().lower() + # ...because of how we calculate PYTHON_EXE; on OSX this may + # be "pythonX.Y". + name_ps = re.sub(r"\d.\d", "", name_ps) + name_psutil = re.sub(r"\d.\d", "", name_psutil) self.assertEqual(name_ps, name_psutil) def test_name_long(self): @@ -179,8 +209,7 @@ class TestProcess(unittest.TestCase): self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp]) def test_exe(self): - ps_pathname = ps("ps --no-headers -o command -p %s" % - self.pid).split(' ')[0] + ps_pathname = ps_name(self.pid) psutil_pathname = psutil.Process(self.pid).exe() try: self.assertEqual(ps_pathname, psutil_pathname) @@ -195,18 +224,17 @@ class TestProcess(unittest.TestCase): self.assertEqual(ps_pathname, adjusted_ps_pathname) def test_cmdline(self): - ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid) + ps_cmdline = ps_args(self.pid) psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline()) - if SUNOS: - # ps on Solaris only shows the first part of the cmdline - psutil_cmdline = psutil_cmdline.split(" ")[0] self.assertEqual(ps_cmdline, psutil_cmdline) # On SUNOS "ps" reads niceness /proc/pid/psinfo which returns an # incorrect value (20); the real deal is getpriority(2) which # returns 0; psutil relies on it, see: # https://github.com/giampaolo/psutil/issues/1082 + # AIX has the same issue @unittest.skipIf(SUNOS, "not reliable on SUNOS") + @unittest.skipIf(AIX, "not reliable on AIX") def test_nice(self): ps_nice = ps("ps --no-headers -o nice -p %s" % self.pid) psutil_nice = psutil.Process().nice() @@ -262,7 +290,7 @@ class TestSystemAPIs(unittest.TestCase): def test_pids(self): # Note: this test might fail if the OS is starting/killing # other processes in the meantime - if SUNOS: + if SUNOS or AIX: cmd = ["ps", "-A", "-o", "pid"] else: cmd = ["ps", "ax", "-o", "pid"] @@ -285,11 +313,7 @@ class TestSystemAPIs(unittest.TestCase): # on OSX and OPENBSD ps doesn't show pid 0 if OSX or OPENBSD and 0 not in pids_ps: pids_ps.insert(0, 0) - - if pids_ps != pids_psutil: - difference = [x for x in pids_psutil if x not in pids_ps] + \ - [x for x in pids_ps if x not in pids_psutil] - self.fail("difference: " + str(difference)) + self.assertEqual(pids_ps, pids_psutil) # for some reason ifconfig -a does not report all interfaces # returned by psutil @@ -355,6 +379,8 @@ class TestSystemAPIs(unittest.TestCase): psutil._psposix.wait_pid, os.getpid()) assert m.called + # AIX can return '-' in df output instead of numbers, e.g. for /proc + @unittest.skipIf(AIX, "unreliable on AIX") def test_disk_usage(self): def df(device): out = sh("df -k %s" % device).strip() diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py index 8b88f766..3b60d38a 100755 --- a/psutil/tests/test_process.py +++ b/psutil/tests/test_process.py @@ -9,6 +9,7 @@ import collections import errno import getpass +import itertools import os import signal import socket @@ -22,6 +23,7 @@ import types import psutil +from psutil import AIX from psutil import BSD from psutil import LINUX from psutil import NETBSD @@ -49,9 +51,10 @@ from psutil.tests import HAS_MEMORY_MAPS from psutil.tests import HAS_PROC_CPU_NUM from psutil.tests import HAS_PROC_IO_COUNTERS from psutil.tests import HAS_RLIMIT +from psutil.tests import HAS_THREADS from psutil.tests import mock from psutil.tests import PYPY -from psutil.tests import PYTHON +from psutil.tests import PYTHON_EXE from psutil.tests import reap_children from psutil.tests import retry_before_failing from psutil.tests import run_test_module_by_name @@ -62,7 +65,6 @@ from psutil.tests import skip_on_not_implemented from psutil.tests import TESTFILE_PREFIX from psutil.tests import TESTFN from psutil.tests import ThreadTask -from psutil.tests import TOX from psutil.tests import TRAVIS from psutil.tests import unittest from psutil.tests import wait_for_pid @@ -151,7 +153,7 @@ class TestProcess(unittest.TestCase): if POSIX: self.assertEqual(code, -signal.SIGKILL) else: - self.assertEqual(code, 0) + self.assertEqual(code, signal.SIGTERM) self.assertFalse(p.is_running()) sproc = get_test_subprocess() @@ -161,12 +163,12 @@ class TestProcess(unittest.TestCase): if POSIX: self.assertEqual(code, -signal.SIGTERM) else: - self.assertEqual(code, 0) + self.assertEqual(code, signal.SIGTERM) self.assertFalse(p.is_running()) # check sys.exit() code code = "import time, sys; time.sleep(0.01); sys.exit(5);" - sproc = get_test_subprocess([PYTHON, "-c", code]) + sproc = get_test_subprocess([PYTHON_EXE, "-c", code]) p = psutil.Process(sproc.pid) self.assertEqual(p.wait(), 5) self.assertFalse(p.is_running()) @@ -175,7 +177,7 @@ class TestProcess(unittest.TestCase): # It is not supposed to raise NSP when the process is gone. # On UNIX this should return None, on Windows it should keep # returning the exit code. - sproc = get_test_subprocess([PYTHON, "-c", code]) + sproc = get_test_subprocess([PYTHON_EXE, "-c", code]) p = psutil.Process(sproc.pid) self.assertEqual(p.wait(), 5) self.assertIn(p.wait(), (5, None)) @@ -207,8 +209,8 @@ class TestProcess(unittest.TestCase): # to get None. self.assertEqual(ret2, None) else: - self.assertEqual(ret1, 0) - self.assertEqual(ret1, 0) + self.assertEqual(ret1, signal.SIGTERM) + self.assertEqual(ret1, signal.SIGTERM) def test_wait_timeout_0(self): sproc = get_test_subprocess() @@ -227,7 +229,7 @@ class TestProcess(unittest.TestCase): if POSIX: self.assertEqual(code, -signal.SIGKILL) else: - self.assertEqual(code, 0) + self.assertEqual(code, signal.SIGTERM) self.assertFalse(p.is_running()) def test_cpu_percent(self): @@ -316,10 +318,10 @@ class TestProcess(unittest.TestCase): # test reads io1 = p.io_counters() - with open(PYTHON, 'rb') as f: + with open(PYTHON_EXE, 'rb') as f: f.read() io2 = p.io_counters() - if not BSD: + if not BSD and not AIX: self.assertGreater(io2.read_count, io1.read_count) self.assertEqual(io2.write_count, io1.write_count) if LINUX: @@ -528,6 +530,7 @@ class TestProcess(unittest.TestCase): p = psutil.Process() self.assertGreater(p.num_handles(), 0) + @unittest.skipIf(not HAS_THREADS, 'not supported') def test_threads(self): p = psutil.Process() if OPENBSD: @@ -552,6 +555,7 @@ class TestProcess(unittest.TestCase): @retry_before_failing() @skip_on_access_denied(only_if=OSX) + @unittest.skipIf(not HAS_THREADS, 'not supported') def test_threads_2(self): sproc = get_test_subprocess() p = psutil.Process(sproc.pid) @@ -693,12 +697,12 @@ class TestProcess(unittest.TestCase): sproc = get_test_subprocess() exe = psutil.Process(sproc.pid).exe() try: - self.assertEqual(exe, PYTHON) + self.assertEqual(exe, PYTHON_EXE) except AssertionError: - if WINDOWS and len(exe) == len(PYTHON): + if WINDOWS and len(exe) == len(PYTHON_EXE): # on Windows we don't care about case sensitivity normcase = os.path.normcase - self.assertEqual(normcase(exe), normcase(PYTHON)) + self.assertEqual(normcase(exe), normcase(PYTHON_EXE)) else: # certain platforms such as BSD are more accurate returning: # "/usr/local/bin/python2.7" @@ -709,7 +713,7 @@ class TestProcess(unittest.TestCase): ver = "%s.%s" % (sys.version_info[0], sys.version_info[1]) try: self.assertEqual(exe.replace(ver, ''), - PYTHON.replace(ver, '')) + PYTHON_EXE.replace(ver, '')) except AssertionError: # Tipically OSX. Really not sure what to do here. pass @@ -718,7 +722,7 @@ class TestProcess(unittest.TestCase): self.assertEqual(out, 'hey') def test_cmdline(self): - cmdline = [PYTHON, "-c", "import time; time.sleep(60)"] + cmdline = [PYTHON_EXE, "-c", "import time; time.sleep(60)"] sproc = get_test_subprocess(cmdline) try: self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()), @@ -728,27 +732,39 @@ class TestProcess(unittest.TestCase): # and Open BSD returns a truncated string. # Also /proc/pid/cmdline behaves the same so it looks # like this is a kernel bug. - if NETBSD or OPENBSD: + # XXX - AIX truncates long arguments in /proc/pid/cmdline + if NETBSD or OPENBSD or AIX: self.assertEqual( - psutil.Process(sproc.pid).cmdline()[0], PYTHON) + psutil.Process(sproc.pid).cmdline()[0], PYTHON_EXE) else: raise def test_name(self): - sproc = get_test_subprocess(PYTHON) + sproc = get_test_subprocess(PYTHON_EXE) name = psutil.Process(sproc.pid).name().lower() pyexe = os.path.basename(os.path.realpath(sys.executable)).lower() assert pyexe.startswith(name), (pyexe, name) # XXX @unittest.skipIf(SUNOS, "broken on SUNOS") + @unittest.skipIf(AIX, "broken on AIX") def test_prog_w_funky_name(self): # Test that name(), exe() and cmdline() correctly handle programs # with funky chars such as spaces and ")", see: # https://github.com/giampaolo/psutil/issues/628 + + def rm(): + # Try to limit occasional failures on Appveyor: + # https://ci.appveyor.com/project/giampaolo/psutil/build/1350/ + # job/lbo3bkju55le850n + try: + safe_rmpath(funky_path) + except OSError: + pass + funky_path = TESTFN + 'foo bar )' create_exe(funky_path) - self.addCleanup(safe_rmpath, funky_path) + self.addCleanup(rm) cmdline = [funky_path, "-c", "import time; [time.sleep(0.01) for x in range(3000)];" "arg1", "arg2", "", "arg3", ""] @@ -853,7 +869,8 @@ class TestProcess(unittest.TestCase): self.assertEqual(p.cwd(), os.getcwd()) def test_cwd_2(self): - cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(60)"] + cmd = [PYTHON_EXE, "-c", + "import os, time; os.chdir('..'); time.sleep(60)"] sproc = get_test_subprocess(cmd) p = psutil.Process(sproc.pid) call_until(p.cwd, "ret == os.path.dirname(os.getcwd())") @@ -870,10 +887,9 @@ class TestProcess(unittest.TestCase): self.assertEqual(len(initial), len(set(initial))) all_cpus = list(range(len(psutil.cpu_percent(percpu=True)))) - # setting on travis doesn't seem to work (always return all - # CPUs on get): - # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, ... != [0] - for n in all_cpus: + # Work around travis failure: + # https://travis-ci.org/giampaolo/psutil/builds/284173194 + for n in all_cpus if not TRAVIS else initial: p.cpu_affinity([n]) self.assertEqual(p.cpu_affinity(), [n]) if hasattr(os, "sched_getaffinity"): @@ -911,6 +927,24 @@ class TestProcess(unittest.TestCase): self.assertRaises(TypeError, p.cpu_affinity, [0, "1"]) self.assertRaises(ValueError, p.cpu_affinity, [0, -1]) + @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported') + def test_cpu_affinity_all_combinations(self): + p = psutil.Process() + initial = p.cpu_affinity() + assert initial, initial + self.addCleanup(p.cpu_affinity, initial) + + # All possible CPU set combinations. + combos = [] + for l in range(0, len(initial) + 1): + for subset in itertools.combinations(initial, l): + if subset: + combos.append(list(subset)) + + for combo in combos: + p.cpu_affinity(combo) + self.assertEqual(p.cpu_affinity(), combo) + # TODO: #595 @unittest.skipIf(BSD, "broken on BSD") # can't find any process file on Appveyor @@ -937,7 +971,7 @@ class TestProcess(unittest.TestCase): # another process cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN - sproc = get_test_subprocess([PYTHON, "-c", cmdline]) + sproc = get_test_subprocess([PYTHON_EXE, "-c", cmdline]) p = psutil.Process(sproc.pid) for x in range(100): @@ -1268,7 +1302,15 @@ class TestProcess(unittest.TestCase): # set methods succeed_or_zombie_p_exc(zproc.parent) if hasattr(zproc, 'cpu_affinity'): - succeed_or_zombie_p_exc(zproc.cpu_affinity, [0]) + try: + succeed_or_zombie_p_exc(zproc.cpu_affinity, [0]) + except ValueError as err: + if TRAVIS and LINUX and "not eligible" in str(err): + # https://travis-ci.org/giampaolo/psutil/jobs/279890461 + pass + else: + raise + succeed_or_zombie_p_exc(zproc.nice, 0) if hasattr(zproc, 'ionice'): if LINUX: @@ -1294,10 +1336,14 @@ class TestProcess(unittest.TestCase): # self.assertEqual(zpid.ppid(), os.getpid()) # ...and all other APIs should be able to deal with it self.assertTrue(psutil.pid_exists(zpid)) - self.assertIn(zpid, psutil.pids()) - self.assertIn(zpid, [x.pid for x in psutil.process_iter()]) - psutil._pmap = {} - self.assertIn(zpid, [x.pid for x in psutil.process_iter()]) + if not TRAVIS and OSX: + # For some reason this started failing all of the sudden. + # Maybe they upgraded OSX version? + # https://travis-ci.org/giampaolo/psutil/jobs/310896404 + self.assertIn(zpid, psutil.pids()) + self.assertIn(zpid, [x.pid for x in psutil.process_iter()]) + psutil._pmap = {} + self.assertIn(zpid, [x.pid for x in psutil.process_iter()]) @unittest.skipIf(not POSIX, 'POSIX only') def test_zombie_process_is_running_w_exc(self): @@ -1360,28 +1406,23 @@ class TestProcess(unittest.TestCase): @unittest.skipIf(not HAS_ENVIRON, "not supported") def test_environ(self): + def clean_dict(d): + # Most of these are problematic on Travis. + d.pop("PSUTIL_TESTING", None) + d.pop("PLAT", None) + d.pop("HOME", None) + if OSX: + d.pop("__CF_USER_TEXT_ENCODING", None) + d.pop("VERSIONER_PYTHON_PREFER_32_BIT", None) + d.pop("VERSIONER_PYTHON_VERSION", None) + return dict( + [(k.rstrip("\r\n"), v.rstrip("\r\n")) for k, v in d.items()]) + self.maxDiff = None p = psutil.Process() - d = p.environ() - d2 = os.environ.copy() - - removes = [] - if 'PSUTIL_TESTING' in os.environ: - removes.append('PSUTIL_TESTING') - if OSX: - removes.extend([ - "__CF_USER_TEXT_ENCODING", - "VERSIONER_PYTHON_PREFER_32_BIT", - "VERSIONER_PYTHON_VERSION"]) - if LINUX or OSX: - removes.extend(['PLAT']) - if TOX: - removes.extend(['HOME']) - for key in removes: - d.pop(key, None) - d2.pop(key, None) - - self.assertEqual(d, d2) + d1 = clean_dict(p.environ()) + d2 = clean_dict(os.environ.copy()) + self.assertEqual(d1, d2) @unittest.skipIf(not HAS_ENVIRON, "not supported") @unittest.skipIf(not POSIX, "POSIX only") @@ -1485,7 +1526,7 @@ class TestPopen(unittest.TestCase): # XXX this test causes a ResourceWarning on Python 3 because # psutil.__subproc instance doesn't get propertly freed. # Not sure what to do though. - cmd = [PYTHON, "-c", "import time; time.sleep(60);"] + cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"] with psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: proc.name() @@ -1496,7 +1537,7 @@ class TestPopen(unittest.TestCase): proc.terminate() def test_ctx_manager(self): - with psutil.Popen([PYTHON, "-V"], + with psutil.Popen([PYTHON_EXE, "-V"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) as proc: @@ -1510,7 +1551,7 @@ class TestPopen(unittest.TestCase): # subprocess.Popen()'s terminate(), kill() and send_signal() do # not raise exception after the process is gone. psutil.Popen # diverges from that. - cmd = [PYTHON, "-c", "import time; time.sleep(60);"] + cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"] with psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: proc.terminate() diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py index e93bb6b5..20b132a9 100755 --- a/psutil/tests/test_system.py +++ b/psutil/tests/test_system.py @@ -19,6 +19,7 @@ import tempfile import time import psutil +from psutil import AIX from psutil import BSD from psutil import FREEBSD from psutil import LINUX @@ -669,7 +670,8 @@ class TestSystemAPIs(unittest.TestCase): @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'), '/proc/diskstats not available on this linux version') - @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR") # no visible disks + @unittest.skipIf(APPVEYOR and psutil.disk_io_counters() is None, + "unreliable on APPVEYOR") # no visible disks def test_disk_io_counters(self): def check_ntuple(nt): self.assertEqual(nt[0], nt.read_count) @@ -689,6 +691,7 @@ class TestSystemAPIs(unittest.TestCase): assert getattr(nt, name) >= 0, nt ret = psutil.disk_io_counters(perdisk=False) + assert ret is not None, "no disks on this system?" check_ntuple(ret) ret = psutil.disk_io_counters(perdisk=True) # make sure there are no duplicates @@ -742,7 +745,8 @@ class TestSystemAPIs(unittest.TestCase): for name in infos._fields: value = getattr(infos, name) self.assertGreaterEqual(value, 0) - if name in ('ctx_switches', 'interrupts'): + # on AIX, ctx_switches is always 0 + if not AIX and name in ('ctx_switches', 'interrupts'): self.assertGreater(value, 0) @unittest.skipIf(not HAS_CPU_FREQ, "not suported") diff --git a/psutil/tests/test_unicode.py b/psutil/tests/test_unicode.py index 9b99fdf9..c2a2f847 100755 --- a/psutil/tests/test_unicode.py +++ b/psutil/tests/test_unicode.py @@ -49,7 +49,7 @@ etc.) and make sure that: For a detailed explanation of how psutil handles unicode see: - https://github.com/giampaolo/psutil/issues/1040 -- https://pythonhosted.org/psutil/#unicode +- http://psutil.readthedocs.io/#unicode """ import os @@ -91,8 +91,6 @@ import psutil.tests def safe_rmpath(path): - # XXX - return _safe_rmpath(path) if APPVEYOR: # TODO - this is quite random and I'm not sure why it happens, # nor I can reproduce it locally: @@ -125,8 +123,9 @@ def subprocess_supports_unicode(name): except UnicodeEncodeError: return False else: - reap_children() return True + finally: + reap_children() # An invalid unicode string. @@ -145,18 +144,23 @@ else: class _BaseFSAPIsTests(object): funky_name = None - def setUp(self): - safe_rmpath(self.funky_name) + @classmethod + def setUpClass(cls): + safe_rmpath(cls.funky_name) + create_exe(cls.funky_name) + + @classmethod + def tearDownClass(cls): + reap_children() + safe_rmpath(cls.funky_name) def tearDown(self): reap_children() - safe_rmpath(self.funky_name) def expect_exact_path_match(self): raise NotImplementedError("must be implemented in subclass") def test_proc_exe(self): - create_exe(self.funky_name) subp = get_test_subprocess(cmd=[self.funky_name]) p = psutil.Process(subp.pid) exe = p.exe() @@ -165,7 +169,6 @@ class _BaseFSAPIsTests(object): self.assertEqual(exe, self.funky_name) def test_proc_name(self): - create_exe(self.funky_name) subp = get_test_subprocess(cmd=[self.funky_name]) if WINDOWS: # On Windows name() is determined from exe() first, because @@ -182,7 +185,6 @@ class _BaseFSAPIsTests(object): self.assertEqual(name, os.path.basename(self.funky_name)) def test_proc_cmdline(self): - create_exe(self.funky_name) subp = get_test_subprocess(cmd=[self.funky_name]) p = psutil.Process(subp.pid) cmdline = p.cmdline() @@ -192,18 +194,20 @@ class _BaseFSAPIsTests(object): self.assertEqual(cmdline, [self.funky_name]) def test_proc_cwd(self): - safe_mkdir(self.funky_name) - with chdir(self.funky_name): + dname = self.funky_name + "2" + self.addCleanup(safe_rmpath, dname) + safe_mkdir(dname) + with chdir(dname): p = psutil.Process() cwd = p.cwd() self.assertIsInstance(p.cwd(), str) if self.expect_exact_path_match(): - self.assertEqual(cwd, self.funky_name) + self.assertEqual(cwd, dname) def test_proc_open_files(self): p = psutil.Process() start = set(p.open_files()) - with open(self.funky_name, 'wb'): + with open(self.funky_name, 'rb'): new = set(p.open_files()) path = (new - start).pop().path self.assertIsInstance(path, str) @@ -260,8 +264,10 @@ class _BaseFSAPIsTests(object): self.assertEqual(conn.laddr, name) def test_disk_usage(self): - safe_mkdir(self.funky_name) - psutil.disk_usage(self.funky_name) + dname = self.funky_name + "2" + self.addCleanup(safe_rmpath, dname) + safe_mkdir(dname) + psutil.disk_usage(dname) @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported") @unittest.skipIf(not PY3, "ctypes does not support unicode on PY2") @@ -335,6 +341,9 @@ class TestWinProcessName(unittest.TestCase): class TestNonFSAPIS(unittest.TestCase): """Unicode tests for non fs-related APIs.""" + def tearDown(self): + reap_children() + @unittest.skipIf(not HAS_ENVIRON, "not supported") def test_proc_environ(self): # Note: differently from others, this test does not deal |