summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2020-04-23 05:24:01 +0200
committerGiampaolo Rodola <g.rodola@gmail.com>2020-04-23 05:24:01 +0200
commitbdb6e7b52d625097ef82742e9647532d3ec7eac4 (patch)
treea8432a83b26e39af084658b6f4bc3f4a4b8753fd
parent940a5b29b6cd44df16c4404c1dc3f50da237ac42 (diff)
parent97796454d5a14b38b1a036958ad3dfe35faa3b4a (diff)
downloadpsutil-bdb6e7b52d625097ef82742e9647532d3ec7eac4.tar.gz
Merge branch 'master' of github.com:giampaolo/psutil
-rw-r--r--MANIFEST.in1
-rw-r--r--Makefile4
-rw-r--r--psutil/__init__.py2
-rw-r--r--psutil/_common.py4
-rw-r--r--psutil/_compat.py17
-rw-r--r--psutil/_pslinux.py1
-rw-r--r--psutil/tests/__init__.py121
-rwxr-xr-xpsutil/tests/test_memory_leaks.py164
-rwxr-xr-xpsutil/tests/test_misc.py299
-rwxr-xr-xpsutil/tests/test_testutils.py400
-rwxr-xr-xscripts/internal/check_broken_links.py1
-rwxr-xr-xscripts/sensors.py1
12 files changed, 572 insertions, 443 deletions
diff --git a/MANIFEST.in b/MANIFEST.in
index 380a4fa2..d0d75240 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -103,6 +103,7 @@ include psutil/tests/test_posix.py
include psutil/tests/test_process.py
include psutil/tests/test_sunos.py
include psutil/tests/test_system.py
+include psutil/tests/test_testutils.py
include psutil/tests/test_unicode.py
include psutil/tests/test_windows.py
include scripts/battery.py
diff --git a/Makefile b/Makefile
index 5c4d5b70..fbd7ffcc 100644
--- a/Makefile
+++ b/Makefile
@@ -124,6 +124,10 @@ test-misc: ## Run miscellaneous tests.
${MAKE} install
$(TEST_PREFIX) $(PYTHON) psutil/tests/test_misc.py
+test-testutils: ## Run test utils tests.
+ ${MAKE} install
+ $(TEST_PREFIX) $(PYTHON) psutil/tests/test_testutils.py
+
test-unicode: ## Test APIs dealing with strings.
${MAKE} install
$(TEST_PREFIX) $(PYTHON) psutil/tests/test_unicode.py
diff --git a/psutil/__init__.py b/psutil/__init__.py
index 46c4a20b..e6a2da8d 100644
--- a/psutil/__init__.py
+++ b/psutil/__init__.py
@@ -226,7 +226,7 @@ __all__ = [
__all__.extend(_psplatform.__extra__all__)
__author__ = "Giampaolo Rodola'"
-__version__ = "5.7.0"
+__version__ = "5.7.1"
version_info = tuple([int(num) for num in __version__.split('.')])
_timer = getattr(time, 'monotonic', time.time)
diff --git a/psutil/_common.py b/psutil/_common.py
index 8a39de7f..b97bb01d 100644
--- a/psutil/_common.py
+++ b/psutil/_common.py
@@ -785,7 +785,7 @@ def hilite(s, color="green", bold=False):
if not term_supports_colors():
return s
attr = []
- colors = dict(green='32', red='91', brown='33')
+ colors = dict(green='32', red='91', brown='33', yellow='93')
colors[None] = '29'
try:
color = colors[color]
@@ -812,7 +812,7 @@ def print_color(s, color="green", bold=False, file=sys.stdout):
SetConsoleTextAttribute = \
ctypes.windll.Kernel32.SetConsoleTextAttribute
- colors = dict(green=2, red=4, brown=6)
+ colors = dict(green=2, red=4, brown=6, yellow=6)
colors[None] = DEFAULT_COLOR
try:
color = colors[color]
diff --git a/psutil/_compat.py b/psutil/_compat.py
index 2965fd1b..64a5761e 100644
--- a/psutil/_compat.py
+++ b/psutil/_compat.py
@@ -5,13 +5,14 @@
"""Module which provides compatibility with older Python versions."""
import collections
+import contextlib
import errno
import functools
import os
import sys
__all__ = ["PY3", "long", "xrange", "unicode", "basestring", "u", "b",
- "lru_cache", "which", "get_terminal_size",
+ "lru_cache", "which", "get_terminal_size", "redirect_stderr",
"FileNotFoundError", "PermissionError", "ProcessLookupError",
"InterruptedError", "ChildProcessError", "FileExistsError"]
@@ -343,3 +344,17 @@ except ImportError:
return (res[1], res[0])
except Exception:
return fallback
+
+
+# python 3.4
+try:
+ from contextlib import redirect_stderr
+except ImportError:
+ @contextlib.contextmanager
+ def redirect_stderr(target):
+ original = sys.stderr
+ try:
+ sys.stderr = target
+ yield
+ finally:
+ sys.stderr = original
diff --git a/psutil/_pslinux.py b/psutil/_pslinux.py
index 4fb783d1..aca5fd7d 100644
--- a/psutil/_pslinux.py
+++ b/psutil/_pslinux.py
@@ -361,7 +361,6 @@ def calculate_avail_vmem(mems):
if line.startswith(b'low'):
watermark_low += int(line.split()[1])
watermark_low *= PAGESIZE
- watermark_low = watermark_low
avail = free - watermark_low
pagecache = lru_active_file + lru_inactive_file
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py
index cd78fae6..f3358806 100644
--- a/psutil/tests/__init__.py
+++ b/psutil/tests/__init__.py
@@ -9,12 +9,12 @@ Test utilities.
"""
from __future__ import print_function
-
import atexit
import contextlib
import ctypes
import errno
import functools
+import gc
import os
import random
import re
@@ -40,7 +40,9 @@ from psutil import MACOS
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
+from psutil._common import bytes2human
from psutil._common import supports_ipv6
+from psutil._common import print_color
from psutil._compat import ChildProcessError
from psutil._compat import FileExistsError
from psutil._compat import FileNotFoundError
@@ -48,6 +50,7 @@ from psutil._compat import PY3
from psutil._compat import u
from psutil._compat import unicode
from psutil._compat import which
+from psutil._compat import xrange
if sys.version_info < (2, 7):
import unittest2 as unittest # requires "pip install unittest2"
@@ -84,7 +87,7 @@ __all__ = [
'ThreadTask'
# test utils
'unittest', 'skip_on_access_denied', 'skip_on_not_implemented',
- 'retry_on_failure',
+ 'retry_on_failure', 'TestMemoryLeak',
# install utils
'install_pip', 'install_test_deps',
# fs utils
@@ -814,6 +817,120 @@ class TestCase(unittest.TestCase):
unittest.TestCase = TestCase
+@unittest.skipIf(PYPY, "unreliable on PYPY")
+class TestMemoryLeak(unittest.TestCase):
+ """Test framework class for detecting function memory leaks (typically
+ functions implemented in C).
+ It does so by calling a function many times, and checks whether the
+ process memory usage increased before and after having called the
+ function repeadetly.
+ Note that sometimes this may produce false positives.
+ PyPy appears to be completely unstable for this framework, probably
+ because of how its JIT handles memory, so tests on PYPY are
+ automatically skipped.
+ """
+ # Configurable class attrs.
+ times = 1200
+ warmup_times = 10
+ tolerance = 4096 # memory
+ retry_for = 3.0 # seconds
+ verbose = True
+
+ def setUp(self):
+ self._thisproc = psutil.Process()
+ gc.collect()
+
+ def _get_mem(self):
+ # USS is the closest thing we have to "real" memory usage and it
+ # should be less likely to produce false positives.
+ mem = self._thisproc.memory_full_info()
+ return getattr(mem, "uss", mem.rss)
+
+ def _call(self, fun):
+ return fun()
+
+ def _itercall(self, fun, iterator):
+ """Get 2 distinct memory samples, before and after having
+ called fun repeadetly, and return the memory difference.
+ """
+ ncalls = 0
+ gc.collect()
+ mem1 = self._get_mem()
+ for x in iterator:
+ ret = self._call(fun)
+ ncalls += 1
+ del x, ret
+ gc.collect()
+ mem2 = self._get_mem()
+ self.assertEqual(gc.garbage, [])
+ diff = mem2 - mem1
+ if diff < 0:
+ self._log("negative memory diff -%s" % (bytes2human(abs(diff))))
+ return (diff, ncalls)
+
+ def _call_ntimes(self, fun, times):
+ return self._itercall(fun, xrange(times))[0]
+
+ def _call_for(self, fun, secs):
+ def iterator(secs):
+ stop_at = time.time() + secs
+ while time.time() < stop_at:
+ yield
+ return self._itercall(fun, iterator(secs))
+
+ def _log(self, msg):
+ if self.verbose:
+ print_color(msg, color="yellow", file=sys.stderr)
+
+ def execute(self, fun, times=times, warmup_times=warmup_times,
+ tolerance=tolerance, retry_for=retry_for):
+ """Test a callable."""
+ if times <= 0:
+ raise ValueError("times must be > 0")
+ if warmup_times < 0:
+ raise ValueError("warmup_times must be >= 0")
+ if tolerance is not None and tolerance < 0:
+ raise ValueError("tolerance must be >= 0")
+ if retry_for is not None and retry_for < 0:
+ raise ValueError("retry_for must be >= 0")
+
+ # warm up
+ self._call_ntimes(fun, warmup_times)
+ mem1 = self._call_ntimes(fun, times)
+
+ if mem1 > tolerance:
+ # This doesn't necessarily mean we have a leak yet.
+ # At this point we assume that after having called the
+ # function so many times the memory usage is stabilized
+ # and if there are no leaks it should not increase
+ # anymore. Let's keep calling fun for N more seconds and
+ # fail if we notice any difference (ignore tolerance).
+ msg = "+%s after %s calls; try calling fun for another %s secs" % (
+ bytes2human(mem1), times, retry_for)
+ if not retry_for:
+ raise self.fail(msg)
+ else:
+ self._log(msg)
+
+ mem2, ncalls = self._call_for(fun, retry_for)
+ if mem2 > mem1:
+ # failure
+ msg = "+%s memory increase after %s calls; " % (
+ bytes2human(mem1), times)
+ msg += "+%s after another %s calls over %s secs" % (
+ bytes2human(mem2), ncalls, retry_for)
+ raise self.fail(msg)
+
+ def execute_w_exc(self, exc, fun, **kwargs):
+ """Convenience method to test a callable while making sure it
+ raises an exception on every call.
+ """
+ def call():
+ self.assertRaises(exc, fun)
+
+ self.execute(call, **kwargs)
+
+
def retry_on_failure(retries=NO_RETRIES):
"""Decorator which runs a test function and retries N times before
actually failing.
diff --git a/psutil/tests/test_memory_leaks.py b/psutil/tests/test_memory_leaks.py
index f9cad70f..b0b4af1b 100755
--- a/psutil/tests/test_memory_leaks.py
+++ b/psutil/tests/test_memory_leaks.py
@@ -17,11 +17,7 @@ because of how its JIT handles memory, so tests are skipped.
from __future__ import print_function
import functools
-import gc
import os
-import sys
-import threading
-import time
import psutil
import psutil._common
@@ -32,9 +28,7 @@ from psutil import OPENBSD
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
-from psutil._common import bytes2human
from psutil._compat import ProcessLookupError
-from psutil._compat import xrange
from psutil.tests import CIRRUS
from psutil.tests import create_sockets
from psutil.tests import get_test_subprocess
@@ -50,21 +44,15 @@ from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
-from psutil.tests import PYPY
from psutil.tests import reap_children
from psutil.tests import safe_rmpath
from psutil.tests import skip_on_access_denied
from psutil.tests import TESTFN
+from psutil.tests import TestMemoryLeak
from psutil.tests import TRAVIS
from psutil.tests import unittest
-
-# configurable opts
-LOOPS = 1000
-MEMORY_TOLERANCE = 4096
-RETRY_FOR = 3
SKIP_PYTHON_IMPL = True
-
cext = psutil._psplatform.cext
thisproc = psutil.Process()
@@ -79,107 +67,12 @@ def skip_if_linux():
"worthless on LINUX (pure python)")
-@unittest.skipIf(PYPY, "unreliable on PYPY")
-class TestMemLeak(unittest.TestCase):
- """Base framework class which calls a function many times and
- produces a failure if process memory usage keeps increasing
- between calls or over time.
- """
- tolerance = MEMORY_TOLERANCE
- loops = LOOPS
- retry_for = RETRY_FOR
-
- def setUp(self):
- gc.collect()
-
- def execute(self, fun, *args, **kwargs):
- """Test a callable."""
- def call_many_times():
- for x in xrange(loops):
- self._call(fun, *args, **kwargs)
- del x
- gc.collect()
-
- tolerance = kwargs.pop('tolerance_', None) or self.tolerance
- loops = kwargs.pop('loops_', None) or self.loops
- retry_for = kwargs.pop('retry_for_', None) or self.retry_for
-
- # warm up
- for x in range(10):
- self._call(fun, *args, **kwargs)
- self.assertEqual(gc.garbage, [])
- self.assertEqual(threading.active_count(), 1)
- self.assertEqual(thisproc.children(), [])
-
- # Get 2 distinct memory samples, before and after having
- # called fun repeadetly.
- # step 1
- call_many_times()
- mem1 = self._get_mem()
- # step 2
- call_many_times()
- mem2 = self._get_mem()
-
- diff1 = mem2 - mem1
- if diff1 > tolerance:
- # This doesn't necessarily mean we have a leak yet.
- # At this point we assume that after having called the
- # function so many times the memory usage is stabilized
- # and if there are no leaks it should not increase
- # anymore.
- # Let's keep calling fun for 3 more seconds and fail if
- # we notice any difference.
- ncalls = 0
- stop_at = time.time() + retry_for
- while time.time() <= stop_at:
- self._call(fun, *args, **kwargs)
- ncalls += 1
-
- del stop_at
- gc.collect()
- mem3 = self._get_mem()
- diff2 = mem3 - mem2
-
- if mem3 > mem2:
- # failure
- extra_proc_mem = bytes2human(diff1 + diff2)
- print("exta proc mem: %s" % extra_proc_mem, file=sys.stderr)
- msg = "+%s after %s calls, +%s after another %s calls, "
- msg += "+%s extra proc mem"
- msg = msg % (
- bytes2human(diff1), loops, bytes2human(diff2), ncalls,
- extra_proc_mem)
- self.fail(msg)
-
- def execute_w_exc(self, exc, fun, *args, **kwargs):
- """Convenience function which tests a callable raising
- an exception.
- """
- def call():
- self.assertRaises(exc, fun, *args, **kwargs)
-
- self.execute(call)
-
- @staticmethod
- def _get_mem():
- # By using USS memory it seems it's less likely to bump
- # into false positives.
- if LINUX or WINDOWS or MACOS:
- return thisproc.memory_full_info().uss
- else:
- return thisproc.memory_info().rss
-
- @staticmethod
- def _call(fun, *args, **kwargs):
- fun(*args, **kwargs)
-
-
# ===================================================================
# Process class
# ===================================================================
-class TestProcessObjectLeaks(TestMemLeak):
+class TestProcessObjectLeaks(TestMemoryLeak):
"""Test leaks of Process class methods."""
proc = thisproc
@@ -232,7 +125,7 @@ class TestProcessObjectLeaks(TestMemLeak):
def test_nice_set(self):
niceness = thisproc.nice()
- self.execute(self.proc.nice, niceness)
+ self.execute(lambda: self.proc.nice(niceness))
@unittest.skipIf(not HAS_IONICE, "not supported")
def test_ionice_get(self):
@@ -242,9 +135,9 @@ class TestProcessObjectLeaks(TestMemLeak):
def test_ionice_set(self):
if WINDOWS:
value = thisproc.ionice()
- self.execute(self.proc.ionice, value)
+ self.execute(lambda: self.proc.ionice(value))
else:
- self.execute(self.proc.ionice, psutil.IOPRIO_CLASS_NONE)
+ self.execute(lambda: self.proc.ionice(psutil.IOPRIO_CLASS_NONE))
fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
self.execute_w_exc(OSError, fun)
@@ -322,9 +215,10 @@ class TestProcessObjectLeaks(TestMemLeak):
@unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
def test_cpu_affinity_set(self):
affinity = thisproc.cpu_affinity()
- self.execute(self.proc.cpu_affinity, affinity)
+ self.execute(lambda: self.proc.cpu_affinity(affinity))
if not TRAVIS:
- self.execute_w_exc(ValueError, self.proc.cpu_affinity, [-1])
+ self.execute_w_exc(
+ ValueError, lambda: self.proc.cpu_affinity([-1]))
@skip_if_linux()
def test_open_files(self):
@@ -340,14 +234,14 @@ class TestProcessObjectLeaks(TestMemLeak):
@unittest.skipIf(not LINUX, "LINUX only")
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_get(self):
- self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE)
+ self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE))
@unittest.skipIf(not LINUX, "LINUX only")
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_set(self):
limit = thisproc.rlimit(psutil.RLIMIT_NOFILE)
- self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE, limit)
- self.execute_w_exc(OSError, self.proc.rlimit, -1)
+ self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE, limit))
+ self.execute_w_exc(OSError, lambda: self.proc.rlimit(-1))
@skip_if_linux()
# Windows implementation is based on a single system-wide
@@ -359,7 +253,7 @@ class TestProcessObjectLeaks(TestMemLeak):
# be executed.
with create_sockets():
kind = 'inet' if SUNOS else 'all'
- self.execute(self.proc.connections, kind)
+ self.execute(lambda: self.proc.connections(kind))
@unittest.skipIf(not HAS_ENVIRON, "not supported")
def test_environ(self):
@@ -371,13 +265,13 @@ class TestProcessObjectLeaks(TestMemLeak):
@unittest.skipIf(not WINDOWS, "WINDOWS only")
-class TestProcessDualImplementation(TestMemLeak):
+class TestProcessDualImplementation(TestMemoryLeak):
def test_cmdline_peb_true(self):
- self.execute(cext.proc_cmdline, os.getpid(), use_peb=True)
+ self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=True))
def test_cmdline_peb_false(self):
- self.execute(cext.proc_cmdline, os.getpid(), use_peb=False)
+ self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=False))
class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
@@ -400,9 +294,9 @@ class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
super(TestTerminatedProcessLeaks, cls).tearDownClass()
reap_children()
- def _call(self, fun, *args, **kwargs):
+ def _call(self, fun):
try:
- fun(*args, **kwargs)
+ fun()
except psutil.NoSuchProcess:
pass
@@ -439,7 +333,7 @@ class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
# ===================================================================
-class TestModuleFunctionsLeaks(TestMemLeak):
+class TestModuleFunctionsLeaks(TestMemoryLeak):
"""Test leaks of psutil module functions."""
def test_coverage(self):
@@ -457,11 +351,11 @@ class TestModuleFunctionsLeaks(TestMemLeak):
@skip_if_linux()
def test_cpu_count_logical(self):
- self.execute(psutil.cpu_count, logical=True)
+ self.execute(lambda: psutil.cpu_count(logical=True))
@skip_if_linux()
def test_cpu_count_physical(self):
- self.execute(psutil.cpu_count, logical=False)
+ self.execute(lambda: psutil.cpu_count(logical=False))
@skip_if_linux()
def test_cpu_times(self):
@@ -469,7 +363,7 @@ class TestModuleFunctionsLeaks(TestMemLeak):
@skip_if_linux()
def test_per_cpu_times(self):
- self.execute(psutil.cpu_times, percpu=True)
+ self.execute(lambda: psutil.cpu_times(percpu=True))
@skip_if_linux()
def test_cpu_stats(self):
@@ -497,14 +391,14 @@ class TestModuleFunctionsLeaks(TestMemLeak):
@unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
"worthless on POSIX (pure python)")
def test_pid_exists(self):
- self.execute(psutil.pid_exists, os.getpid())
+ self.execute(lambda: psutil.pid_exists(os.getpid()))
# --- disk
@unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
"worthless on POSIX (pure python)")
def test_disk_usage(self):
- self.execute(psutil.disk_usage, '.')
+ self.execute(lambda: psutil.disk_usage('.'))
def test_disk_partitions(self):
self.execute(psutil.disk_partitions)
@@ -513,7 +407,7 @@ class TestModuleFunctionsLeaks(TestMemLeak):
'/proc/diskstats not available on this Linux version')
@skip_if_linux()
def test_disk_io_counters(self):
- self.execute(psutil.disk_io_counters, nowrap=False)
+ self.execute(lambda: psutil.disk_io_counters(nowrap=False))
# --- proc
@@ -528,7 +422,7 @@ class TestModuleFunctionsLeaks(TestMemLeak):
@skip_if_linux()
@unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
def test_net_io_counters(self):
- self.execute(psutil.net_io_counters, nowrap=False)
+ self.execute(lambda: psutil.net_io_counters(nowrap=False))
@skip_if_linux()
@unittest.skipIf(MACOS and os.getuid() != 0, "need root access")
@@ -539,7 +433,7 @@ class TestModuleFunctionsLeaks(TestMemLeak):
def test_net_if_addrs(self):
# Note: verified that on Windows this was a false positive.
self.execute(psutil.net_if_addrs,
- tolerance_=80 * 1024 if WINDOWS else None)
+ tolerance=80 * 1024 if WINDOWS else 4096)
@unittest.skipIf(TRAVIS, "EPERM on travis")
def test_net_if_stats(self):
@@ -584,15 +478,15 @@ class TestModuleFunctionsLeaks(TestMemLeak):
def test_win_service_get_config(self):
name = next(psutil.win_service_iter()).name()
- self.execute(cext.winservice_query_config, name)
+ self.execute(lambda: cext.winservice_query_config(name))
def test_win_service_get_status(self):
name = next(psutil.win_service_iter()).name()
- self.execute(cext.winservice_query_status, name)
+ self.execute(lambda: cext.winservice_query_status(name))
def test_win_service_get_description(self):
name = next(psutil.win_service_iter()).name()
- self.execute(cext.winservice_query_descr, name)
+ self.execute(lambda: cext.winservice_query_descr(name))
if __name__ == '__main__':
diff --git a/psutil/tests/test_misc.py b/psutil/tests/test_misc.py
index c20cd941..ca0a0433 100755
--- a/psutil/tests/test_misc.py
+++ b/psutil/tests/test_misc.py
@@ -11,7 +11,6 @@ Miscellaneous tests.
import ast
import collections
-import contextlib
import errno
import json
import os
@@ -19,58 +18,33 @@ import pickle
import socket
import stat
-from psutil import FREEBSD
from psutil import LINUX
-from psutil import NETBSD
from psutil import POSIX
from psutil import WINDOWS
from psutil._common import memoize
from psutil._common import memoize_when_activated
from psutil._common import supports_ipv6
from psutil._common import wrap_numbers
-from psutil._common import open_text
-from psutil._common import open_binary
from psutil._compat import PY3
from psutil.tests import APPVEYOR
-from psutil.tests import bind_socket
-from psutil.tests import bind_unix_socket
-from psutil.tests import call_until
-from psutil.tests import chdir
from psutil.tests import CI_TESTING
-from psutil.tests import create_proc_children_pair
-from psutil.tests import create_sockets
-from psutil.tests import create_zombie_proc
from psutil.tests import DEVNULL
-from psutil.tests import get_free_port
-from psutil.tests import get_test_subprocess
from psutil.tests import HAS_BATTERY
-from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import HAS_MEMORY_MAPS
from psutil.tests import HAS_NET_IO_COUNTERS
from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import import_module_by_path
-from psutil.tests import is_namedtuple
from psutil.tests import mock
from psutil.tests import PYTHON_EXE
-from psutil.tests import reap_children
from psutil.tests import reload_module
-from psutil.tests import retry
from psutil.tests import ROOT_DIR
-from psutil.tests import safe_mkdir
-from psutil.tests import safe_rmpath
from psutil.tests import SCRIPTS_DIR
from psutil.tests import sh
-from psutil.tests import tcp_socketpair
-from psutil.tests import TESTFN
from psutil.tests import TOX
from psutil.tests import TRAVIS
from psutil.tests import unittest
-from psutil.tests import unix_socket_path
-from psutil.tests import unix_socketpair
-from psutil.tests import wait_for_file
-from psutil.tests import wait_for_pid
import psutil
import psutil.tests
@@ -787,279 +761,6 @@ class TestScripts(unittest.TestCase):
self.assert_stdout('sensors.py')
-# ===================================================================
-# --- Unit tests for test utilities.
-# ===================================================================
-
-
-class TestRetryDecorator(unittest.TestCase):
-
- @mock.patch('time.sleep')
- def test_retry_success(self, sleep):
- # Fail 3 times out of 5; make sure the decorated fun returns.
-
- @retry(retries=5, interval=1, logfun=None)
- def foo():
- while queue:
- queue.pop()
- 1 / 0
- return 1
-
- queue = list(range(3))
- self.assertEqual(foo(), 1)
- self.assertEqual(sleep.call_count, 3)
-
- @mock.patch('time.sleep')
- def test_retry_failure(self, sleep):
- # Fail 6 times out of 5; th function is supposed to raise exc.
-
- @retry(retries=5, interval=1, logfun=None)
- def foo():
- while queue:
- queue.pop()
- 1 / 0
- return 1
-
- queue = list(range(6))
- self.assertRaises(ZeroDivisionError, foo)
- self.assertEqual(sleep.call_count, 5)
-
- @mock.patch('time.sleep')
- def test_exception_arg(self, sleep):
- @retry(exception=ValueError, interval=1)
- def foo():
- raise TypeError
-
- self.assertRaises(TypeError, foo)
- self.assertEqual(sleep.call_count, 0)
-
- @mock.patch('time.sleep')
- def test_no_interval_arg(self, sleep):
- # if interval is not specified sleep is not supposed to be called
-
- @retry(retries=5, interval=None, logfun=None)
- def foo():
- 1 / 0
-
- self.assertRaises(ZeroDivisionError, foo)
- self.assertEqual(sleep.call_count, 0)
-
- @mock.patch('time.sleep')
- def test_retries_arg(self, sleep):
-
- @retry(retries=5, interval=1, logfun=None)
- def foo():
- 1 / 0
-
- self.assertRaises(ZeroDivisionError, foo)
- self.assertEqual(sleep.call_count, 5)
-
- @mock.patch('time.sleep')
- def test_retries_and_timeout_args(self, sleep):
- self.assertRaises(ValueError, retry, retries=5, timeout=1)
-
-
-class TestSyncTestUtils(unittest.TestCase):
-
- def tearDown(self):
- safe_rmpath(TESTFN)
-
- def test_wait_for_pid(self):
- wait_for_pid(os.getpid())
- nopid = max(psutil.pids()) + 99999
- with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
- self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
-
- def test_wait_for_file(self):
- with open(TESTFN, 'w') as f:
- f.write('foo')
- wait_for_file(TESTFN)
- assert not os.path.exists(TESTFN)
-
- def test_wait_for_file_empty(self):
- with open(TESTFN, 'w'):
- pass
- wait_for_file(TESTFN, empty=True)
- assert not os.path.exists(TESTFN)
-
- def test_wait_for_file_no_file(self):
- with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
- self.assertRaises(IOError, wait_for_file, TESTFN)
-
- def test_wait_for_file_no_delete(self):
- with open(TESTFN, 'w') as f:
- f.write('foo')
- wait_for_file(TESTFN, delete=False)
- assert os.path.exists(TESTFN)
-
- def test_call_until(self):
- ret = call_until(lambda: 1, "ret == 1")
- self.assertEqual(ret, 1)
-
-
-class TestFSTestUtils(unittest.TestCase):
-
- def setUp(self):
- safe_rmpath(TESTFN)
-
- tearDown = setUp
-
- def test_open_text(self):
- with open_text(__file__) as f:
- self.assertEqual(f.mode, 'rt')
-
- def test_open_binary(self):
- with open_binary(__file__) as f:
- self.assertEqual(f.mode, 'rb')
-
- def test_safe_mkdir(self):
- safe_mkdir(TESTFN)
- assert os.path.isdir(TESTFN)
- safe_mkdir(TESTFN)
- assert os.path.isdir(TESTFN)
-
- def test_safe_rmpath(self):
- # test file is removed
- open(TESTFN, 'w').close()
- safe_rmpath(TESTFN)
- assert not os.path.exists(TESTFN)
- # test no exception if path does not exist
- safe_rmpath(TESTFN)
- # test dir is removed
- os.mkdir(TESTFN)
- safe_rmpath(TESTFN)
- assert not os.path.exists(TESTFN)
- # test other exceptions are raised
- with mock.patch('psutil.tests.os.stat',
- side_effect=OSError(errno.EINVAL, "")) as m:
- with self.assertRaises(OSError):
- safe_rmpath(TESTFN)
- assert m.called
-
- def test_chdir(self):
- base = os.getcwd()
- os.mkdir(TESTFN)
- with chdir(TESTFN):
- self.assertEqual(os.getcwd(), os.path.join(base, TESTFN))
- self.assertEqual(os.getcwd(), base)
-
-
-class TestProcessUtils(unittest.TestCase):
-
- def test_reap_children(self):
- subp = get_test_subprocess()
- p = psutil.Process(subp.pid)
- assert p.is_running()
- reap_children()
- assert not p.is_running()
- assert not psutil.tests._pids_started
- assert not psutil.tests._subprocesses_started
-
- def test_create_proc_children_pair(self):
- p1, p2 = create_proc_children_pair()
- self.assertNotEqual(p1.pid, p2.pid)
- assert p1.is_running()
- assert p2.is_running()
- children = psutil.Process().children(recursive=True)
- self.assertEqual(len(children), 2)
- self.assertIn(p1, children)
- self.assertIn(p2, children)
- self.assertEqual(p1.ppid(), os.getpid())
- self.assertEqual(p2.ppid(), p1.pid)
-
- # make sure both of them are cleaned up
- reap_children()
- assert not p1.is_running()
- assert not p2.is_running()
- assert not psutil.tests._pids_started
- assert not psutil.tests._subprocesses_started
-
- @unittest.skipIf(not POSIX, "POSIX only")
- def test_create_zombie_proc(self):
- zpid = create_zombie_proc()
- self.addCleanup(reap_children, recursive=True)
- p = psutil.Process(zpid)
- self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
-
-
-class TestNetUtils(unittest.TestCase):
-
- def bind_socket(self):
- port = get_free_port()
- with contextlib.closing(bind_socket(addr=('', port))) as s:
- self.assertEqual(s.getsockname()[1], port)
-
- @unittest.skipIf(not POSIX, "POSIX only")
- def test_bind_unix_socket(self):
- with unix_socket_path() as name:
- sock = bind_unix_socket(name)
- with contextlib.closing(sock):
- self.assertEqual(sock.family, socket.AF_UNIX)
- self.assertEqual(sock.type, socket.SOCK_STREAM)
- self.assertEqual(sock.getsockname(), name)
- assert os.path.exists(name)
- assert stat.S_ISSOCK(os.stat(name).st_mode)
- # UDP
- with unix_socket_path() as name:
- sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
- with contextlib.closing(sock):
- self.assertEqual(sock.type, socket.SOCK_DGRAM)
-
- def tcp_tcp_socketpair(self):
- addr = ("127.0.0.1", get_free_port())
- server, client = tcp_socketpair(socket.AF_INET, addr=addr)
- with contextlib.closing(server):
- with contextlib.closing(client):
- # Ensure they are connected and the positions are
- # correct.
- self.assertEqual(server.getsockname(), addr)
- self.assertEqual(client.getpeername(), addr)
- self.assertNotEqual(client.getsockname(), addr)
-
- @unittest.skipIf(not POSIX, "POSIX only")
- @unittest.skipIf(NETBSD or FREEBSD,
- "/var/run/log UNIX socket opened by default")
- def test_unix_socketpair(self):
- p = psutil.Process()
- num_fds = p.num_fds()
- assert not p.connections(kind='unix')
- with unix_socket_path() as name:
- server, client = unix_socketpair(name)
- try:
- assert os.path.exists(name)
- assert stat.S_ISSOCK(os.stat(name).st_mode)
- self.assertEqual(p.num_fds() - num_fds, 2)
- self.assertEqual(len(p.connections(kind='unix')), 2)
- self.assertEqual(server.getsockname(), name)
- self.assertEqual(client.getpeername(), name)
- finally:
- client.close()
- server.close()
-
- def test_create_sockets(self):
- with create_sockets() as socks:
- fams = collections.defaultdict(int)
- types = collections.defaultdict(int)
- for s in socks:
- fams[s.family] += 1
- # work around http://bugs.python.org/issue30204
- types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1
- self.assertGreaterEqual(fams[socket.AF_INET], 2)
- if supports_ipv6():
- self.assertGreaterEqual(fams[socket.AF_INET6], 2)
- if POSIX and HAS_CONNECTIONS_UNIX:
- self.assertGreaterEqual(fams[socket.AF_UNIX], 2)
- self.assertGreaterEqual(types[socket.SOCK_STREAM], 2)
- self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2)
-
-
-class TestOtherUtils(unittest.TestCase):
-
- def test_is_namedtuple(self):
- assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3))
- assert not is_namedtuple(tuple())
-
-
if __name__ == '__main__':
from psutil.tests.runner import run
run(__file__)
diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py
new file mode 100755
index 00000000..7d3d6675
--- /dev/null
+++ b/psutil/tests/test_testutils.py
@@ -0,0 +1,400 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Tests for testing utils (psutil.tests namespace).
+"""
+
+import collections
+import contextlib
+import errno
+import io
+import os
+import socket
+import stat
+
+from psutil import FREEBSD
+from psutil import NETBSD
+from psutil import POSIX
+from psutil._common import open_binary
+from psutil._common import open_text
+from psutil._common import supports_ipv6
+from psutil._compat import PY3
+from psutil._compat import redirect_stderr
+from psutil.tests import bind_socket
+from psutil.tests import bind_unix_socket
+from psutil.tests import call_until
+from psutil.tests import chdir
+from psutil.tests import create_proc_children_pair
+from psutil.tests import create_sockets
+from psutil.tests import create_zombie_proc
+from psutil.tests import get_free_port
+from psutil.tests import get_test_subprocess
+from psutil.tests import HAS_CONNECTIONS_UNIX
+from psutil.tests import is_namedtuple
+from psutil.tests import mock
+from psutil.tests import reap_children
+from psutil.tests import retry
+from psutil.tests import retry_on_failure
+from psutil.tests import safe_mkdir
+from psutil.tests import safe_rmpath
+from psutil.tests import tcp_socketpair
+from psutil.tests import TESTFN
+from psutil.tests import TestMemoryLeak
+from psutil.tests import unittest
+from psutil.tests import unix_socket_path
+from psutil.tests import unix_socketpair
+from psutil.tests import wait_for_file
+from psutil.tests import wait_for_pid
+import psutil
+import psutil.tests
+
+# ===================================================================
+# --- Unit tests for test utilities.
+# ===================================================================
+
+
+class TestRetryDecorator(unittest.TestCase):
+
+ @mock.patch('time.sleep')
+ def test_retry_success(self, sleep):
+ # Fail 3 times out of 5; make sure the decorated fun returns.
+
+ @retry(retries=5, interval=1, logfun=None)
+ def foo():
+ while queue:
+ queue.pop()
+ 1 / 0
+ return 1
+
+ queue = list(range(3))
+ self.assertEqual(foo(), 1)
+ self.assertEqual(sleep.call_count, 3)
+
+ @mock.patch('time.sleep')
+ def test_retry_failure(self, sleep):
+ # Fail 6 times out of 5; th function is supposed to raise exc.
+ @retry(retries=5, interval=1, logfun=None)
+ def foo():
+ while queue:
+ queue.pop()
+ 1 / 0
+ return 1
+
+ queue = list(range(6))
+ self.assertRaises(ZeroDivisionError, foo)
+ self.assertEqual(sleep.call_count, 5)
+
+ @mock.patch('time.sleep')
+ def test_exception_arg(self, sleep):
+ @retry(exception=ValueError, interval=1)
+ def foo():
+ raise TypeError
+
+ self.assertRaises(TypeError, foo)
+ self.assertEqual(sleep.call_count, 0)
+
+ @mock.patch('time.sleep')
+ def test_no_interval_arg(self, sleep):
+ # if interval is not specified sleep is not supposed to be called
+
+ @retry(retries=5, interval=None, logfun=None)
+ def foo():
+ 1 / 0
+
+ self.assertRaises(ZeroDivisionError, foo)
+ self.assertEqual(sleep.call_count, 0)
+
+ @mock.patch('time.sleep')
+ def test_retries_arg(self, sleep):
+
+ @retry(retries=5, interval=1, logfun=None)
+ def foo():
+ 1 / 0
+
+ self.assertRaises(ZeroDivisionError, foo)
+ self.assertEqual(sleep.call_count, 5)
+
+ @mock.patch('time.sleep')
+ def test_retries_and_timeout_args(self, sleep):
+ self.assertRaises(ValueError, retry, retries=5, timeout=1)
+
+
+class TestSyncTestUtils(unittest.TestCase):
+
+ def tearDown(self):
+ safe_rmpath(TESTFN)
+
+ def test_wait_for_pid(self):
+ wait_for_pid(os.getpid())
+ nopid = max(psutil.pids()) + 99999
+ with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
+ self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
+
+ def test_wait_for_file(self):
+ with open(TESTFN, 'w') as f:
+ f.write('foo')
+ wait_for_file(TESTFN)
+ assert not os.path.exists(TESTFN)
+
+ def test_wait_for_file_empty(self):
+ with open(TESTFN, 'w'):
+ pass
+ wait_for_file(TESTFN, empty=True)
+ assert not os.path.exists(TESTFN)
+
+ def test_wait_for_file_no_file(self):
+ with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
+ self.assertRaises(IOError, wait_for_file, TESTFN)
+
+ def test_wait_for_file_no_delete(self):
+ with open(TESTFN, 'w') as f:
+ f.write('foo')
+ wait_for_file(TESTFN, delete=False)
+ assert os.path.exists(TESTFN)
+
+ def test_call_until(self):
+ ret = call_until(lambda: 1, "ret == 1")
+ self.assertEqual(ret, 1)
+
+
+class TestFSTestUtils(unittest.TestCase):
+
+ def setUp(self):
+ safe_rmpath(TESTFN)
+
+ tearDown = setUp
+
+ def test_open_text(self):
+ with open_text(__file__) as f:
+ self.assertEqual(f.mode, 'rt')
+
+ def test_open_binary(self):
+ with open_binary(__file__) as f:
+ self.assertEqual(f.mode, 'rb')
+
+ def test_safe_mkdir(self):
+ safe_mkdir(TESTFN)
+ assert os.path.isdir(TESTFN)
+ safe_mkdir(TESTFN)
+ assert os.path.isdir(TESTFN)
+
+ def test_safe_rmpath(self):
+ # test file is removed
+ open(TESTFN, 'w').close()
+ safe_rmpath(TESTFN)
+ assert not os.path.exists(TESTFN)
+ # test no exception if path does not exist
+ safe_rmpath(TESTFN)
+ # test dir is removed
+ os.mkdir(TESTFN)
+ safe_rmpath(TESTFN)
+ assert not os.path.exists(TESTFN)
+ # test other exceptions are raised
+ with mock.patch('psutil.tests.os.stat',
+ side_effect=OSError(errno.EINVAL, "")) as m:
+ with self.assertRaises(OSError):
+ safe_rmpath(TESTFN)
+ assert m.called
+
+ def test_chdir(self):
+ base = os.getcwd()
+ os.mkdir(TESTFN)
+ with chdir(TESTFN):
+ self.assertEqual(os.getcwd(), os.path.join(base, TESTFN))
+ self.assertEqual(os.getcwd(), base)
+
+
+class TestProcessUtils(unittest.TestCase):
+
+ def test_reap_children(self):
+ subp = get_test_subprocess()
+ p = psutil.Process(subp.pid)
+ assert p.is_running()
+ reap_children()
+ assert not p.is_running()
+ assert not psutil.tests._pids_started
+ assert not psutil.tests._subprocesses_started
+
+ def test_create_proc_children_pair(self):
+ p1, p2 = create_proc_children_pair()
+ self.assertNotEqual(p1.pid, p2.pid)
+ assert p1.is_running()
+ assert p2.is_running()
+ children = psutil.Process().children(recursive=True)
+ self.assertEqual(len(children), 2)
+ self.assertIn(p1, children)
+ self.assertIn(p2, children)
+ self.assertEqual(p1.ppid(), os.getpid())
+ self.assertEqual(p2.ppid(), p1.pid)
+
+ # make sure both of them are cleaned up
+ reap_children()
+ assert not p1.is_running()
+ assert not p2.is_running()
+ assert not psutil.tests._pids_started
+ assert not psutil.tests._subprocesses_started
+
+ @unittest.skipIf(not POSIX, "POSIX only")
+ def test_create_zombie_proc(self):
+ zpid = create_zombie_proc()
+ self.addCleanup(reap_children, recursive=True)
+ p = psutil.Process(zpid)
+ self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
+
+
+class TestNetUtils(unittest.TestCase):
+
+ def bind_socket(self):
+ port = get_free_port()
+ with contextlib.closing(bind_socket(addr=('', port))) as s:
+ self.assertEqual(s.getsockname()[1], port)
+
+ @unittest.skipIf(not POSIX, "POSIX only")
+ def test_bind_unix_socket(self):
+ with unix_socket_path() as name:
+ sock = bind_unix_socket(name)
+ with contextlib.closing(sock):
+ self.assertEqual(sock.family, socket.AF_UNIX)
+ self.assertEqual(sock.type, socket.SOCK_STREAM)
+ self.assertEqual(sock.getsockname(), name)
+ assert os.path.exists(name)
+ assert stat.S_ISSOCK(os.stat(name).st_mode)
+ # UDP
+ with unix_socket_path() as name:
+ sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
+ with contextlib.closing(sock):
+ self.assertEqual(sock.type, socket.SOCK_DGRAM)
+
+ def tcp_tcp_socketpair(self):
+ addr = ("127.0.0.1", get_free_port())
+ server, client = tcp_socketpair(socket.AF_INET, addr=addr)
+ with contextlib.closing(server):
+ with contextlib.closing(client):
+ # Ensure they are connected and the positions are
+ # correct.
+ self.assertEqual(server.getsockname(), addr)
+ self.assertEqual(client.getpeername(), addr)
+ self.assertNotEqual(client.getsockname(), addr)
+
+ @unittest.skipIf(not POSIX, "POSIX only")
+ @unittest.skipIf(NETBSD or FREEBSD,
+ "/var/run/log UNIX socket opened by default")
+ def test_unix_socketpair(self):
+ p = psutil.Process()
+ num_fds = p.num_fds()
+ assert not p.connections(kind='unix')
+ with unix_socket_path() as name:
+ server, client = unix_socketpair(name)
+ try:
+ assert os.path.exists(name)
+ assert stat.S_ISSOCK(os.stat(name).st_mode)
+ self.assertEqual(p.num_fds() - num_fds, 2)
+ self.assertEqual(len(p.connections(kind='unix')), 2)
+ self.assertEqual(server.getsockname(), name)
+ self.assertEqual(client.getpeername(), name)
+ finally:
+ client.close()
+ server.close()
+
+ def test_create_sockets(self):
+ with create_sockets() as socks:
+ fams = collections.defaultdict(int)
+ types = collections.defaultdict(int)
+ for s in socks:
+ fams[s.family] += 1
+ # work around http://bugs.python.org/issue30204
+ types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1
+ self.assertGreaterEqual(fams[socket.AF_INET], 2)
+ if supports_ipv6():
+ self.assertGreaterEqual(fams[socket.AF_INET6], 2)
+ if POSIX and HAS_CONNECTIONS_UNIX:
+ self.assertGreaterEqual(fams[socket.AF_UNIX], 2)
+ self.assertGreaterEqual(types[socket.SOCK_STREAM], 2)
+ self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2)
+
+
+class TestMemLeakClass(TestMemoryLeak):
+
+ def test_times(self):
+ def fun():
+ cnt['cnt'] += 1
+ cnt = {'cnt': 0}
+ self.execute(fun, times=1, warmup_times=10)
+ self.assertEqual(cnt['cnt'], 11)
+ self.execute(fun, times=10, warmup_times=10)
+ self.assertEqual(cnt['cnt'], 31)
+
+ def test_warmup_times(self):
+ def fun():
+ cnt['cnt'] += 1
+ cnt = {'cnt': 0}
+ self.execute(fun, times=1, warmup_times=10)
+ self.assertEqual(cnt['cnt'], 11)
+
+ def test_param_err(self):
+ self.assertRaises(ValueError, self.execute, lambda: 0, times=0)
+ self.assertRaises(ValueError, self.execute, lambda: 0, times=-1)
+ self.assertRaises(ValueError, self.execute, lambda: 0, warmup_times=-1)
+ self.assertRaises(ValueError, self.execute, lambda: 0, tolerance=-1)
+ self.assertRaises(ValueError, self.execute, lambda: 0, retry_for=-1)
+
+ def test_leak(self):
+ def fun():
+ ls.append("x" * 24 * 1024)
+ ls = []
+ times = 100
+ self.assertRaises(AssertionError, self.execute, fun, times=times,
+ warmup_times=10, retry_for=None)
+ self.assertEqual(len(ls), times + 10)
+
+ @retry_on_failure(retries=20) # 2 secs
+ def test_leak_with_retry(self, ls=[]):
+ def fun():
+ ls.append("x" * 24 * 1024)
+ times = 100
+ f = io.StringIO() if PY3 else io.BytesIO()
+ with redirect_stderr(f):
+ self.assertRaises(AssertionError, self.execute, fun, times=times,
+ retry_for=0.1)
+ self.assertIn("try calling fun for another", f.getvalue())
+ self.assertGreater(len(ls), times)
+
+ def test_tolerance(self):
+ def fun():
+ ls.append("x" * 24 * 1024)
+ ls = []
+ times = 100
+ self.execute(fun, times=times, warmup_times=0,
+ tolerance=200 * 1024 * 1024)
+ self.assertEqual(len(ls), times)
+
+ def test_execute_w_exc(self):
+ def fun():
+ 1 / 0
+ self.execute_w_exc(ZeroDivisionError, fun, times=2000, warmup_times=20,
+ tolerance=4096, retry_for=3)
+
+ with self.assertRaises(ZeroDivisionError):
+ self.execute_w_exc(OSError, fun)
+
+ def fun():
+ pass
+ with self.assertRaises(AssertionError):
+ self.execute_w_exc(ZeroDivisionError, fun)
+
+
+class TestOtherUtils(unittest.TestCase):
+
+ def test_is_namedtuple(self):
+ assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3))
+ assert not is_namedtuple(tuple())
+
+
+if __name__ == '__main__':
+ from psutil.tests.runner import run
+ run(__file__)
diff --git a/scripts/internal/check_broken_links.py b/scripts/internal/check_broken_links.py
index 1a076116..e66448fd 100755
--- a/scripts/internal/check_broken_links.py
+++ b/scripts/internal/check_broken_links.py
@@ -40,7 +40,6 @@ Author: Himanshu Shekhar <https://github.com/himanshub16> (2017)
"""
from __future__ import print_function
-
import concurrent.futures
import functools
import os
diff --git a/scripts/sensors.py b/scripts/sensors.py
index 726af3d2..e532beba 100755
--- a/scripts/sensors.py
+++ b/scripts/sensors.py
@@ -30,7 +30,6 @@ Battery:
"""
from __future__ import print_function
-
import psutil