summaryrefslogtreecommitdiff
path: root/psutil/tests/test_contracts.py
diff options
context:
space:
mode:
Diffstat (limited to 'psutil/tests/test_contracts.py')
-rwxr-xr-xpsutil/tests/test_contracts.py396
1 files changed, 212 insertions, 184 deletions
diff --git a/psutil/tests/test_contracts.py b/psutil/tests/test_contracts.py
index 85f9c00d..51845662 100755
--- a/psutil/tests/test_contracts.py
+++ b/psutil/tests/test_contracts.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
@@ -10,11 +10,13 @@ Some of these are duplicates of tests test_system.py and test_process.py
"""
import errno
+import multiprocessing
import os
+import signal
import stat
+import sys
import time
import traceback
-import warnings
from psutil import AIX
from psutil import BSD
@@ -27,22 +29,28 @@ from psutil import OSX
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
+from psutil._compat import FileNotFoundError
from psutil._compat import long
+from psutil._compat import range
+from psutil.tests import APPVEYOR
+from psutil.tests import check_connection_ntuple
+from psutil.tests import CI_TESTING
from psutil.tests import create_sockets
from psutil.tests import enum
-from psutil.tests import get_kernel_version
+from psutil.tests import GITHUB_ACTIONS
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_NET_IO_COUNTERS
-from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import is_namedtuple
-from psutil.tests import safe_rmpath
+from psutil.tests import kernel_version
+from psutil.tests import process_namespace
+from psutil.tests import PsutilTestCase
+from psutil.tests import PYPY
+from psutil.tests import serialrun
from psutil.tests import SKIP_SYSCONS
-from psutil.tests import TESTFN
from psutil.tests import unittest
from psutil.tests import VALID_PROC_STATUSES
-from psutil.tests import warn
import psutil
@@ -53,7 +61,7 @@ import psutil
# Make sure code reflects what doc promises in terms of APIs
# availability.
-class TestAvailConstantsAPIs(unittest.TestCase):
+class TestAvailConstantsAPIs(PsutilTestCase):
def test_PROCFS_PATH(self):
self.assertEqual(hasattr(psutil, "PROCFS_PATH"),
@@ -82,32 +90,41 @@ class TestAvailConstantsAPIs(unittest.TestCase):
ae(hasattr(psutil, "IOPRIO_LOW"), WINDOWS)
ae(hasattr(psutil, "IOPRIO_VERYLOW"), WINDOWS)
- def test_linux_rlimit(self):
+ @unittest.skipIf(GITHUB_ACTIONS and LINUX,
+ "unsupported on GITHUB_ACTIONS + LINUX")
+ def test_rlimit(self):
ae = self.assertEqual
- hasit = LINUX and get_kernel_version() >= (2, 6, 36)
- ae(hasattr(psutil.Process, "rlimit"), hasit)
- ae(hasattr(psutil, "RLIM_INFINITY"), hasit)
- ae(hasattr(psutil, "RLIMIT_AS"), hasit)
- ae(hasattr(psutil, "RLIMIT_CORE"), hasit)
- ae(hasattr(psutil, "RLIMIT_CPU"), hasit)
- ae(hasattr(psutil, "RLIMIT_DATA"), hasit)
- ae(hasattr(psutil, "RLIMIT_FSIZE"), hasit)
- ae(hasattr(psutil, "RLIMIT_LOCKS"), hasit)
- ae(hasattr(psutil, "RLIMIT_MEMLOCK"), hasit)
- ae(hasattr(psutil, "RLIMIT_NOFILE"), hasit)
- ae(hasattr(psutil, "RLIMIT_NPROC"), hasit)
- ae(hasattr(psutil, "RLIMIT_RSS"), hasit)
- ae(hasattr(psutil, "RLIMIT_STACK"), hasit)
-
- hasit = LINUX and get_kernel_version() >= (3, 0)
- ae(hasattr(psutil, "RLIMIT_MSGQUEUE"), hasit)
- ae(hasattr(psutil, "RLIMIT_NICE"), hasit)
- ae(hasattr(psutil, "RLIMIT_RTPRIO"), hasit)
- ae(hasattr(psutil, "RLIMIT_RTTIME"), hasit)
- ae(hasattr(psutil, "RLIMIT_SIGPENDING"), hasit)
-
-
-class TestAvailSystemAPIs(unittest.TestCase):
+ ae(hasattr(psutil, "RLIM_INFINITY"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_AS"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_CORE"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_CPU"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_DATA"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_FSIZE"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_MEMLOCK"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_NOFILE"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_NPROC"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_RSS"), LINUX or FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_STACK"), LINUX or FREEBSD)
+
+ ae(hasattr(psutil, "RLIMIT_LOCKS"), LINUX)
+ if POSIX:
+ if kernel_version() >= (2, 6, 8):
+ ae(hasattr(psutil, "RLIMIT_MSGQUEUE"), LINUX)
+ if kernel_version() >= (2, 6, 12):
+ ae(hasattr(psutil, "RLIMIT_NICE"), LINUX)
+ if kernel_version() >= (2, 6, 12):
+ ae(hasattr(psutil, "RLIMIT_RTPRIO"), LINUX)
+ if kernel_version() >= (2, 6, 25):
+ ae(hasattr(psutil, "RLIMIT_RTTIME"), LINUX)
+ if kernel_version() >= (2, 6, 8):
+ ae(hasattr(psutil, "RLIMIT_SIGPENDING"), LINUX)
+
+ ae(hasattr(psutil, "RLIMIT_SWAP"), FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_SBSIZE"), FREEBSD)
+ ae(hasattr(psutil, "RLIMIT_NPTS"), FREEBSD)
+
+
+class TestAvailSystemAPIs(PsutilTestCase):
def test_win_service_iter(self):
self.assertEqual(hasattr(psutil, "win_service_iter"), WINDOWS)
@@ -116,11 +133,8 @@ class TestAvailSystemAPIs(unittest.TestCase):
self.assertEqual(hasattr(psutil, "win_service_get"), WINDOWS)
def test_cpu_freq(self):
- linux = (LINUX and
- (os.path.exists("/sys/devices/system/cpu/cpufreq") or
- os.path.exists("/sys/devices/system/cpu/cpu0/cpufreq")))
self.assertEqual(hasattr(psutil, "cpu_freq"),
- linux or MACOS or WINDOWS or FREEBSD)
+ LINUX or MACOS or WINDOWS or FREEBSD)
def test_sensors_temperatures(self):
self.assertEqual(
@@ -138,11 +152,12 @@ class TestAvailSystemAPIs(unittest.TestCase):
self.assertEqual(hasattr(psutil, "disk_swaps"), hasit)
-class TestAvailProcessAPIs(unittest.TestCase):
+class TestAvailProcessAPIs(PsutilTestCase):
def test_environ(self):
self.assertEqual(hasattr(psutil.Process, "environ"),
- LINUX or MACOS or WINDOWS or AIX or SUNOS)
+ LINUX or MACOS or WINDOWS or AIX or SUNOS or
+ FREEBSD or OPENBSD or NETBSD)
def test_uids(self):
self.assertEqual(hasattr(psutil.Process, "uids"), POSIX)
@@ -156,8 +171,10 @@ class TestAvailProcessAPIs(unittest.TestCase):
def test_ionice(self):
self.assertEqual(hasattr(psutil.Process, "ionice"), LINUX or WINDOWS)
+ @unittest.skipIf(GITHUB_ACTIONS and LINUX,
+ "unsupported on GITHUB_ACTIONS + LINUX")
def test_rlimit(self):
- self.assertEqual(hasattr(psutil.Process, "rlimit"), LINUX)
+ self.assertEqual(hasattr(psutil.Process, "rlimit"), LINUX or FREEBSD)
def test_io_counters(self):
hasit = hasattr(psutil.Process, "io_counters")
@@ -184,27 +201,11 @@ class TestAvailProcessAPIs(unittest.TestCase):
# ===================================================================
-# --- Test deprecations
+# --- API types
# ===================================================================
-class TestDeprecations(unittest.TestCase):
-
- def test_memory_info_ex(self):
- with warnings.catch_warnings(record=True) as ws:
- psutil.Process().memory_info_ex()
- w = ws[0]
- self.assertIsInstance(w.category(), DeprecationWarning)
- self.assertIn("memory_info_ex() is deprecated", str(w.message))
- self.assertIn("use memory_info() instead", str(w.message))
-
-
-# ===================================================================
-# --- System API types
-# ===================================================================
-
-
-class TestSystemAPITypes(unittest.TestCase):
+class TestSystemAPITypes(PsutilTestCase):
"""Check the return types of system related APIs.
Mainly we want to test we never return unicode on Python 2, see:
https://github.com/giampaolo/psutil/issues/1039
@@ -214,9 +215,6 @@ class TestSystemAPITypes(unittest.TestCase):
def setUpClass(cls):
cls.proc = psutil.Process()
- def tearDown(self):
- safe_rmpath(TESTFN)
-
def assert_ntuple_of_nums(self, nt, type_=float, gezero=True):
assert is_namedtuple(nt)
for n in nt:
@@ -259,6 +257,8 @@ class TestSystemAPITypes(unittest.TestCase):
self.assertIsInstance(disk.mountpoint, str)
self.assertIsInstance(disk.fstype, str)
self.assertIsInstance(disk.opts, str)
+ self.assertIsInstance(disk.maxfile, int)
+ self.assertIsInstance(disk.maxpath, int)
@unittest.skipIf(SKIP_SYSCONS, "requires root")
def test_net_connections(self):
@@ -273,10 +273,10 @@ class TestSystemAPITypes(unittest.TestCase):
for ifname, addrs in psutil.net_if_addrs().items():
self.assertIsInstance(ifname, str)
for addr in addrs:
- if enum is not None:
- assert isinstance(addr.family, enum.IntEnum), addr
+ if enum is not None and not PYPY:
+ self.assertIsInstance(addr.family, enum.IntEnum)
else:
- assert isinstance(addr.family, int), addr
+ self.assertIsInstance(addr.family, int)
self.assertIsInstance(addr.address, str)
self.assertIsInstance(addr.netmask, (str, type(None)))
self.assertIsInstance(addr.broadcast, (str, type(None)))
@@ -332,105 +332,115 @@ class TestSystemAPITypes(unittest.TestCase):
self.assertIsInstance(user.pid, (int, type(None)))
+class TestProcessWaitType(PsutilTestCase):
+
+ @unittest.skipIf(not POSIX, "not POSIX")
+ def test_negative_signal(self):
+ p = psutil.Process(self.spawn_testproc().pid)
+ p.terminate()
+ code = p.wait()
+ self.assertEqual(code, -signal.SIGTERM)
+ if enum is not None:
+ self.assertIsInstance(code, enum.IntEnum)
+ else:
+ self.assertIsInstance(code, int)
+
+
# ===================================================================
# --- Featch all processes test
# ===================================================================
-class TestFetchAllProcesses(unittest.TestCase):
+def proc_info(pid):
+ tcase = PsutilTestCase()
+
+ def check_exception(exc, proc, name, ppid):
+ tcase.assertEqual(exc.pid, pid)
+ tcase.assertEqual(exc.name, name)
+ if isinstance(exc, psutil.ZombieProcess):
+ if exc.ppid is not None:
+ tcase.assertGreaterEqual(exc.ppid, 0)
+ tcase.assertEqual(exc.ppid, ppid)
+ elif isinstance(exc, psutil.NoSuchProcess):
+ tcase.assertProcessGone(proc)
+ str(exc)
+
+ def do_wait():
+ if pid != 0:
+ try:
+ proc.wait(0)
+ except psutil.Error as exc:
+ check_exception(exc, proc, name, ppid)
+
+ try:
+ proc = psutil.Process(pid)
+ d = proc.as_dict(['ppid', 'name'])
+ except psutil.NoSuchProcess:
+ return {}
+
+ name, ppid = d['name'], d['ppid']
+ info = {'pid': proc.pid}
+ ns = process_namespace(proc)
+ # We don't use oneshot() because in order not to fool
+ # check_exception() in case of NSP.
+ for fun, fun_name in ns.iter(ns.getters, clear_cache=False):
+ try:
+ info[fun_name] = fun()
+ except psutil.Error as exc:
+ check_exception(exc, proc, name, ppid)
+ continue
+ do_wait()
+ return info
+
+
+@serialrun
+class TestFetchAllProcesses(PsutilTestCase):
"""Test which iterates over all running processes and performs
some sanity checks against Process API's returned values.
+ Uses a process pool to get info about all processes.
"""
- def get_attr_names(self):
- excluded_names = set([
- 'send_signal', 'suspend', 'resume', 'terminate', 'kill', 'wait',
- 'as_dict', 'parent', 'parents', 'children', 'memory_info_ex',
- 'oneshot',
- ])
- if LINUX and not HAS_RLIMIT:
- excluded_names.add('rlimit')
- attrs = []
- for name in dir(psutil.Process):
- if name.startswith("_"):
- continue
- if name in excluded_names:
- continue
- attrs.append(name)
- return attrs
-
- def iter_procs(self):
- attrs = self.get_attr_names()
- for p in psutil.process_iter():
- with p.oneshot():
- for name in attrs:
- yield (p, name)
-
- def call_meth(self, p, name):
- args = ()
- kwargs = {}
- attr = getattr(p, name, None)
- if attr is not None and callable(attr):
- if name == 'rlimit':
- args = (psutil.RLIMIT_NOFILE,)
- elif name == 'memory_maps':
- kwargs = {'grouped': False}
- return attr(*args, **kwargs)
- else:
- return attr
+ def setUp(self):
+ self.pool = multiprocessing.Pool()
- def test_fetch_all(self):
- valid_procs = 0
- default = object()
+ def tearDown(self):
+ self.pool.terminate()
+ self.pool.join()
+
+ def iter_proc_info(self):
+ # Fixes "can't pickle <function proc_info>: it's not the
+ # same object as test_contracts.proc_info".
+ from psutil.tests.test_contracts import proc_info
+ return self.pool.imap_unordered(proc_info, psutil.pids())
+
+ def test_all(self):
failures = []
- for p, name in self.iter_procs():
- ret = default
- try:
- ret = self.call_meth(p, name)
- except NotImplementedError:
- msg = "%r was skipped because not implemented" % (
- self.__class__.__name__ + '.test_' + name)
- warn(msg)
- except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
- self.assertEqual(err.pid, p.pid)
- if err.name:
- # make sure exception's name attr is set
- # with the actual process name
- self.assertEqual(err.name, p.name())
- assert str(err)
- assert err.msg
- except Exception:
- s = '\n' + '=' * 70 + '\n'
- s += "FAIL: test_%s (proc=%s" % (name, p)
- if ret != default:
- s += ", ret=%s)" % repr(ret)
- s += ')\n'
- s += '-' * 70
- s += "\n%s" % traceback.format_exc()
- s = "\n".join((" " * 4) + i for i in s.splitlines())
- s += '\n'
- failures.append(s)
- break
- else:
- valid_procs += 1
- if ret not in (0, 0.0, [], None, '', {}):
- assert ret, ret
+ for info in self.iter_proc_info():
+ for name, value in info.items():
meth = getattr(self, name)
- meth(ret, p)
-
+ try:
+ meth(value, info)
+ except AssertionError:
+ s = '\n' + '=' * 70 + '\n'
+ s += "FAIL: test_%s pid=%s, ret=%s\n" % (
+ name, info['pid'], repr(value))
+ s += '-' * 70
+ s += "\n%s" % traceback.format_exc()
+ s = "\n".join((" " * 4) + i for i in s.splitlines())
+ s += '\n'
+ failures.append(s)
+ else:
+ if value not in (0, 0.0, [], None, '', {}):
+ assert value, value
if failures:
- self.fail(''.join(failures))
-
- # we should always have a non-empty list, not including PID 0 etc.
- # special cases.
- assert valid_procs
+ raise self.fail(''.join(failures))
- def cmdline(self, ret, proc):
+ def cmdline(self, ret, info):
self.assertIsInstance(ret, list)
for part in ret:
self.assertIsInstance(part, str)
- def exe(self, ret, proc):
+ def exe(self, ret, info):
self.assertIsInstance(ret, (str, type(None)))
if not ret:
self.assertEqual(ret, '')
@@ -443,30 +453,36 @@ class TestFetchAllProcesses(unittest.TestCase):
# http://stackoverflow.com/questions/3112546/os-path-exists-lies
if POSIX and os.path.isfile(ret):
if hasattr(os, 'access') and hasattr(os, "X_OK"):
- # XXX may fail on MACOS
- assert os.access(ret, os.X_OK)
-
- def pid(self, ret, proc):
+ # XXX: may fail on MACOS
+ try:
+ assert os.access(ret, os.X_OK)
+ except AssertionError:
+ if os.path.exists(ret) and not CI_TESTING:
+ raise
+
+ def pid(self, ret, info):
self.assertIsInstance(ret, int)
self.assertGreaterEqual(ret, 0)
- def ppid(self, ret, proc):
+ def ppid(self, ret, info):
self.assertIsInstance(ret, (int, long))
self.assertGreaterEqual(ret, 0)
- def name(self, ret, proc):
+ def name(self, ret, info):
self.assertIsInstance(ret, str)
+ if APPVEYOR and not ret and info['status'] == 'stopped':
+ return
# on AIX, "<exiting>" processes don't have names
if not AIX:
assert ret
- def create_time(self, ret, proc):
+ def create_time(self, ret, info):
self.assertIsInstance(ret, float)
try:
self.assertGreaterEqual(ret, 0)
except AssertionError:
# XXX
- if OPENBSD and proc.status() == psutil.STATUS_ZOMBIE:
+ if OPENBSD and info['status'] == psutil.STATUS_ZOMBIE:
pass
else:
raise
@@ -476,13 +492,13 @@ class TestFetchAllProcesses(unittest.TestCase):
# with strftime
time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))
- def uids(self, ret, proc):
+ def uids(self, ret, info):
assert is_namedtuple(ret)
for uid in ret:
self.assertIsInstance(uid, int)
self.assertGreaterEqual(uid, 0)
- def gids(self, ret, proc):
+ def gids(self, ret, info):
assert is_namedtuple(ret)
# note: testing all gids as above seems not to be reliable for
# gid == 30 (nodoby); not sure why.
@@ -491,24 +507,24 @@ class TestFetchAllProcesses(unittest.TestCase):
if not MACOS and not NETBSD:
self.assertGreaterEqual(gid, 0)
- def username(self, ret, proc):
+ def username(self, ret, info):
self.assertIsInstance(ret, str)
assert ret
- def status(self, ret, proc):
+ def status(self, ret, info):
self.assertIsInstance(ret, str)
assert ret
self.assertNotEqual(ret, '?') # XXX
self.assertIn(ret, VALID_PROC_STATUSES)
- def io_counters(self, ret, proc):
+ def io_counters(self, ret, info):
assert is_namedtuple(ret)
for field in ret:
self.assertIsInstance(field, (int, long))
if field != -1:
self.assertGreaterEqual(field, 0)
- def ionice(self, ret, proc):
+ def ionice(self, ret, info):
if LINUX:
self.assertIsInstance(ret.ioclass, int)
self.assertIsInstance(ret.value, int)
@@ -524,11 +540,13 @@ class TestFetchAllProcesses(unittest.TestCase):
self.assertGreaterEqual(ret, 0)
self.assertIn(ret, choices)
- def num_threads(self, ret, proc):
+ def num_threads(self, ret, info):
self.assertIsInstance(ret, int)
+ if APPVEYOR and not ret and info['status'] == 'stopped':
+ return
self.assertGreaterEqual(ret, 1)
- def threads(self, ret, proc):
+ def threads(self, ret, info):
self.assertIsInstance(ret, list)
for t in ret:
assert is_namedtuple(t)
@@ -538,18 +556,18 @@ class TestFetchAllProcesses(unittest.TestCase):
for field in t:
self.assertIsInstance(field, (int, float))
- def cpu_times(self, ret, proc):
+ def cpu_times(self, ret, info):
assert is_namedtuple(ret)
for n in ret:
self.assertIsInstance(n, float)
self.assertGreaterEqual(n, 0)
# TODO: check ntuple fields
- def cpu_percent(self, ret, proc):
+ def cpu_percent(self, ret, info):
self.assertIsInstance(ret, float)
assert 0.0 <= ret <= 100.0, ret
- def cpu_num(self, ret, proc):
+ def cpu_num(self, ret, info):
self.assertIsInstance(ret, int)
if FREEBSD and ret == -1:
return
@@ -558,7 +576,7 @@ class TestFetchAllProcesses(unittest.TestCase):
self.assertEqual(ret, 0)
self.assertIn(ret, list(range(psutil.cpu_count())))
- def memory_info(self, ret, proc):
+ def memory_info(self, ret, info):
assert is_namedtuple(ret)
for value in ret:
self.assertIsInstance(value, (int, long))
@@ -569,7 +587,7 @@ class TestFetchAllProcesses(unittest.TestCase):
self.assertGreaterEqual(ret.peak_nonpaged_pool, ret.nonpaged_pool)
self.assertGreaterEqual(ret.peak_pagefile, ret.pagefile)
- def memory_full_info(self, ret, proc):
+ def memory_full_info(self, ret, info):
assert is_namedtuple(ret)
total = psutil.virtual_memory().total
for name in ret._fields:
@@ -585,7 +603,7 @@ class TestFetchAllProcesses(unittest.TestCase):
if LINUX:
self.assertGreaterEqual(ret.pss, ret.uss)
- def open_files(self, ret, proc):
+ def open_files(self, ret, info):
self.assertIsInstance(ret, list)
for f in ret:
self.assertIsInstance(f.fd, int)
@@ -603,19 +621,25 @@ class TestFetchAllProcesses(unittest.TestCase):
# XXX see: https://github.com/giampaolo/psutil/issues/595
continue
assert os.path.isabs(f.path), f
- assert os.path.isfile(f.path), f
+ try:
+ st = os.stat(f.path)
+ except FileNotFoundError:
+ pass
+ else:
+ assert stat.S_ISREG(st.st_mode), f
- def num_fds(self, ret, proc):
+ def num_fds(self, ret, info):
self.assertIsInstance(ret, int)
self.assertGreaterEqual(ret, 0)
- def connections(self, ret, proc):
+ def connections(self, ret, info):
with create_sockets():
self.assertEqual(len(ret), len(set(ret)))
for conn in ret:
assert is_namedtuple(conn)
+ check_connection_ntuple(conn)
- def cwd(self, ret, proc):
+ def cwd(self, ret, info):
if ret: # 'ret' can be None or empty
self.assertIsInstance(ret, str)
assert os.path.isabs(ret), ret
@@ -631,28 +655,28 @@ class TestFetchAllProcesses(unittest.TestCase):
else:
assert stat.S_ISDIR(st.st_mode)
- def memory_percent(self, ret, proc):
+ def memory_percent(self, ret, info):
self.assertIsInstance(ret, float)
assert 0 <= ret <= 100, ret
- def is_running(self, ret, proc):
+ def is_running(self, ret, info):
self.assertIsInstance(ret, bool)
- def cpu_affinity(self, ret, proc):
+ def cpu_affinity(self, ret, info):
self.assertIsInstance(ret, list)
assert ret != [], ret
- cpus = range(psutil.cpu_count())
+ cpus = list(range(psutil.cpu_count()))
for n in ret:
self.assertIsInstance(n, int)
self.assertIn(n, cpus)
- def terminal(self, ret, proc):
+ def terminal(self, ret, info):
self.assertIsInstance(ret, (str, type(None)))
if ret is not None:
assert os.path.isabs(ret), ret
assert os.path.exists(ret), ret
- def memory_maps(self, ret, proc):
+ def memory_maps(self, ret, info):
for nt in ret:
self.assertIsInstance(nt.addr, str)
self.assertIsInstance(nt.perms, str)
@@ -674,11 +698,11 @@ class TestFetchAllProcesses(unittest.TestCase):
self.assertIsInstance(value, (int, long))
self.assertGreaterEqual(value, 0)
- def num_handles(self, ret, proc):
+ def num_handles(self, ret, info):
self.assertIsInstance(ret, int)
self.assertGreaterEqual(ret, 0)
- def nice(self, ret, proc):
+ def nice(self, ret, info):
self.assertIsInstance(ret, int)
if POSIX:
assert -20 <= ret <= 20, ret
@@ -686,20 +710,24 @@ class TestFetchAllProcesses(unittest.TestCase):
priorities = [getattr(psutil, x) for x in dir(psutil)
if x.endswith('_PRIORITY_CLASS')]
self.assertIn(ret, priorities)
+ if sys.version_info > (3, 4):
+ self.assertIsInstance(ret, enum.IntEnum)
+ else:
+ self.assertIsInstance(ret, int)
- def num_ctx_switches(self, ret, proc):
+ def num_ctx_switches(self, ret, info):
assert is_namedtuple(ret)
for value in ret:
self.assertIsInstance(value, (int, long))
self.assertGreaterEqual(value, 0)
- def rlimit(self, ret, proc):
+ def rlimit(self, ret, info):
self.assertIsInstance(ret, tuple)
self.assertEqual(len(ret), 2)
self.assertGreaterEqual(ret[0], -1)
self.assertGreaterEqual(ret[1], -1)
- def environ(self, ret, proc):
+ def environ(self, ret, info):
self.assertIsInstance(ret, dict)
for k, v in ret.items():
self.assertIsInstance(k, str)
@@ -707,5 +735,5 @@ class TestFetchAllProcesses(unittest.TestCase):
if __name__ == '__main__':
- from psutil.tests.runner import run
- run(__file__)
+ from psutil.tests.runner import run_from_name
+ run_from_name(__file__)