summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2016-10-28 02:14:37 +0200
committerGiampaolo Rodola <g.rodola@gmail.com>2016-10-28 02:14:37 +0200
commit7818a81c9e44ec9b4d102f7157c7f437a4b22cc0 (patch)
tree38fb060282fbffc3e147762d1eb2f206481d39b0
parentc37304538285289c6f2917c9ddbe705209b225c4 (diff)
parenta63974f88f2709f4aa5ad07bac10467f49d244df (diff)
downloadpsutil-7818a81c9e44ec9b4d102f7157c7f437a4b22cc0.tar.gz
Merge branch 'oneshot' into oneshot-win
-rw-r--r--psutil/_psutil_windows.c127
-rw-r--r--psutil/_pswindows.py67
-rwxr-xr-xpsutil/tests/test_windows.py415
3 files changed, 356 insertions, 253 deletions
diff --git a/psutil/_psutil_windows.c b/psutil/_psutil_windows.c
index 7464bbf9..af658bb0 100644
--- a/psutil/_psutil_windows.c
+++ b/psutil/_psutil_windows.c
@@ -766,57 +766,6 @@ psutil_proc_memory_info(PyObject *self, PyObject *args) {
}
-/*
- * Alternative implementation of the one above but bypasses ACCESS DENIED.
- */
-static PyObject *
-psutil_proc_memory_info_2(PyObject *self, PyObject *args) {
- DWORD pid;
- PSYSTEM_PROCESS_INFORMATION process;
- PVOID buffer;
- SIZE_T private;
- unsigned long pfault_count;
-
-#if defined(_WIN64)
- unsigned long long m1, m2, m3, m4, m5, m6, m7, m8;
-#else
- unsigned int m1, m2, m3, m4, m5, m6, m7, m8;
-#endif
-
- if (! PyArg_ParseTuple(args, "l", &pid))
- return NULL;
- if (! psutil_get_proc_info(pid, &process, &buffer))
- return NULL;
-
-#if (_WIN32_WINNT >= 0x0501) // Windows XP with SP2
- private = process->PrivatePageCount;
-#else
- private = 0;
-#endif
- pfault_count = process->PageFaultCount;
-
- m1 = process->PeakWorkingSetSize;
- m2 = process->WorkingSetSize;
- m3 = process->QuotaPeakPagedPoolUsage;
- m4 = process->QuotaPagedPoolUsage;
- m5 = process->QuotaPeakNonPagedPoolUsage;
- m6 = process->QuotaNonPagedPoolUsage;
- m7 = process->PagefileUsage;
- m8 = process->PeakPagefileUsage;
-
- free(buffer);
-
- // SYSTEM_PROCESS_INFORMATION values are defined as SIZE_T which on 64
- // bits is an (unsigned long long) and on 32bits is an (unsigned int).
- // "_WIN64" is defined if we're running a 64bit Python interpreter not
- // exclusively if the *system* is 64bit.
-#if defined(_WIN64)
- return Py_BuildValue("(kKKKKKKKKK)",
-#else
- return Py_BuildValue("(kIIIIIIIII)",
-#endif
- pfault_count, m1, m2, m3, m4, m5, m6, m7, m8, private);
-}
/**
* Returns the USS of the process.
@@ -2742,34 +2691,32 @@ psutil_proc_num_handles(PyObject *self, PyObject *args) {
* denied. This is slower because it iterates over all processes.
* Returned tuple includes the following process info:
*
- * - num_threads
- * - ctx_switches
- * - num_handles (fallback)
- * - user/kernel times (fallback)
- * - create time (fallback)
- * - io counters (fallback)
+ * - num_threads()
+ * - ctx_switches()
+ * - num_handles() (fallback)
+ * - cpu_times() (fallback)
+ * - create_time() (fallback)
+ * - io_counters() (fallback)
+ * - memory_info() (fallback)
*/
static PyObject *
psutil_proc_info(PyObject *self, PyObject *args) {
DWORD pid;
PSYSTEM_PROCESS_INFORMATION process;
PVOID buffer;
- ULONG num_handles;
ULONG i;
ULONG ctx_switches = 0;
double user_time;
double kernel_time;
long long create_time;
- int num_threads;
- LONGLONG io_rcount, io_wcount, io_rbytes, io_wbytes;
-
+ SIZE_T mem_private;
+ PyObject *py_retlist;
if (! PyArg_ParseTuple(args, "l", &pid))
return NULL;
if (! psutil_get_proc_info(pid, &process, &buffer))
return NULL;
- num_handles = process->HandleCount;
for (i = 0; i < process->NumberOfThreads; i++)
ctx_switches += process->Threads[i].ContextSwitches;
user_time = (double)process->UserTime.HighPart * 429.4967296 + \
@@ -2788,26 +2735,44 @@ psutil_proc_info(PyObject *self, PyObject *args) {
create_time += process->CreateTime.LowPart - 116444736000000000LL;
create_time /= 10000000;
}
- num_threads = (int)process->NumberOfThreads;
- io_rcount = process->ReadOperationCount.QuadPart;
- io_wcount = process->WriteOperationCount.QuadPart;
- io_rbytes = process->ReadTransferCount.QuadPart;
- io_wbytes = process->WriteTransferCount.QuadPart;
- free(buffer);
- return Py_BuildValue(
- "kkdddiKKKK",
- num_handles,
- ctx_switches,
- user_time,
- kernel_time,
- (double)create_time,
- num_threads,
- io_rcount,
- io_wcount,
- io_rbytes,
- io_wbytes
+#if (_WIN32_WINNT >= 0x0501) // Windows XP with SP2
+ mem_private = process->PrivatePageCount;
+#else
+ mem_private = 0;
+#endif
+
+ py_retlist = Py_BuildValue(
+#if defined(_WIN64)
+ "kkdddiKKKK" "kKKKKKKKKK",
+#else
+ "kkdddiKKKK" "kIIIIIIIII",
+#endif
+ process->HandleCount, // num handles
+ ctx_switches, // num ctx switches
+ user_time, // cpu user time
+ kernel_time, // cpu kernel time
+ (double)create_time, // create time
+ (int)process->NumberOfThreads, // num threads
+ process->ReadOperationCount.QuadPart, // io rcount
+ process->WriteOperationCount.QuadPart, // io wcount
+ process->ReadTransferCount.QuadPart, // io rbytes
+ process->WriteTransferCount.QuadPart, // io wbytes
+ // memory
+ process->PageFaultCount, // num page faults
+ process->PeakWorkingSetSize, // peak wset
+ process->WorkingSetSize, // wset
+ process->QuotaPeakPagedPoolUsage, // peak paged pool
+ process->QuotaPagedPoolUsage, // paged pool
+ process->QuotaPeakNonPagedPoolUsage, // peak non paged pool
+ process->QuotaNonPagedPoolUsage, // non paged pool
+ process->PagefileUsage, // pagefile
+ process->PeakPagefileUsage, // peak pagefile
+ mem_private // private
);
+
+ free(buffer);
+ return py_retlist;
}
@@ -3420,8 +3385,6 @@ PsutilMethods[] = {
"seconds since the epoch"},
{"proc_memory_info", psutil_proc_memory_info, METH_VARARGS,
"Return a tuple of process memory information"},
- {"proc_memory_info_2", psutil_proc_memory_info_2, METH_VARARGS,
- "Alternate implementation"},
{"proc_memory_uss", psutil_proc_memory_uss, METH_VARARGS,
"Return the USS of the process"},
{"proc_cwd", psutil_proc_cwd, METH_VARARGS,
diff --git a/psutil/_pswindows.py b/psutil/_pswindows.py
index 2272c484..df34ee09 100644
--- a/psutil/_pswindows.py
+++ b/psutil/_pswindows.py
@@ -90,6 +90,29 @@ if enum is not None:
globals().update(Priority.__members__)
+pinfo_map = dict(
+ num_handles=0,
+ ctx_switches=1,
+ user_time=2,
+ kernel_time=3,
+ create_time=4,
+ num_threads=5,
+ io_rcount=6,
+ io_wcount=7,
+ io_rbytes=8,
+ io_wbytes=9,
+ num_page_faults=10,
+ peak_wset=11,
+ wset=12,
+ peak_paged_pool=13,
+ paged_pool=14,
+ peak_non_paged_pool=15,
+ non_paged_pool=16,
+ pagefile=17,
+ peak_pagefile=18,
+ mem_private=19,
+)
+
# =====================================================================
# --- named tuples
@@ -589,6 +612,14 @@ class Process(object):
if not self._inctx:
cext.win32_CloseHandle(handle)
+ def oneshot_info(self):
+ """Return multiple information about this process as a
+ raw tuple.
+ """
+ ret = cext.proc_info(self.pid)
+ assert len(ret) == len(pinfo_map)
+ return ret
+
@wrap_exceptions
def name(self):
"""Return process name, which on Windows is always the final
@@ -646,7 +677,19 @@ class Process(object):
if err.errno in ACCESS_DENIED_SET:
# TODO: the C ext can probably be refactored in order
# to get this from cext.proc_info()
- return cext.proc_memory_info_2(self.pid)
+ info = self.oneshot_info()
+ return (
+ info[pinfo_map['num_page_faults']],
+ info[pinfo_map['peak_wset']],
+ info[pinfo_map['wset']],
+ info[pinfo_map['peak_paged_pool']],
+ info[pinfo_map['paged_pool']],
+ info[pinfo_map['peak_non_paged_pool']],
+ info[pinfo_map['non_paged_pool']],
+ info[pinfo_map['pagefile']],
+ info[pinfo_map['peak_pagefile']],
+ info[pinfo_map['mem_private']],
+ )
raise
@wrap_exceptions
@@ -717,12 +760,12 @@ class Process(object):
return cext.proc_create_time(self.pid)
except OSError as err:
if err.errno in ACCESS_DENIED_SET:
- return ntpinfo(*cext.proc_info(self.pid)).create_time
+ return self.oneshot_info()[pinfo_map['create_time']]
raise
@wrap_exceptions
def num_threads(self):
- return ntpinfo(*cext.proc_info(self.pid)).num_threads
+ return self.oneshot_info()[pinfo_map['num_threads']]
@wrap_exceptions
def threads(self):
@@ -740,8 +783,9 @@ class Process(object):
user, system = cext.proc_cpu_times(self.pid, handle)
except OSError as err:
if err.errno in ACCESS_DENIED_SET:
- nt = ntpinfo(*cext.proc_info(self.pid))
- user, system = (nt.user_time, nt.kernel_time)
+ info = self.oneshot_info()
+ user = info[pinfo_map['user_time']]
+ system = info[pinfo_map['kernel_time']]
else:
raise
# Children user/system times are not retrievable (set to 0).
@@ -823,8 +867,13 @@ class Process(object):
ret = cext.proc_io_counters(self.pid, handle)
except OSError as err:
if err.errno in ACCESS_DENIED_SET:
- nt = ntpinfo(*cext.proc_info(self.pid))
- ret = (nt.io_rcount, nt.io_wcount, nt.io_rbytes, nt.io_wbytes)
+ info = self.oneshot_info()
+ ret = (
+ info[pinfo_map['io_rcount']],
+ info[pinfo_map['io_wcount']],
+ info[pinfo_map['io_rbytes']],
+ info[pinfo_map['io_wbytes']],
+ )
else:
raise
return _common.pio(*ret)
@@ -877,11 +926,11 @@ class Process(object):
return cext.proc_num_handles(self.pid, handle)
except OSError as err:
if err.errno in ACCESS_DENIED_SET:
- return ntpinfo(*cext.proc_info(self.pid)).num_handles
+ return self.oneshot_info()[pinfo_map['num_handles']]
raise
@wrap_exceptions
def num_ctx_switches(self):
- ctx_switches = ntpinfo(*cext.proc_info(self.pid)).ctx_switches
+ ctx_switches = self.oneshot_info()[pinfo_map['ctx_switches']]
# only voluntary ctx switches are supported
return _common.pctxsw(ctx_switches, 0)
diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py
index 86910a27..40a84086 100755
--- a/psutil/tests/test_windows.py
+++ b/psutil/tests/test_windows.py
@@ -60,41 +60,13 @@ def wrap_exceptions(fun):
return wrapper
-@unittest.skipUnless(WINDOWS, "WINDOWS only")
-class WindowsSpecificTestCase(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
-
- @classmethod
- def tearDownClass(cls):
- reap_children()
+# ===================================================================
+# System APIs
+# ===================================================================
- def test_issue_24(self):
- p = psutil.Process(0)
- self.assertRaises(psutil.AccessDenied, p.kill)
- def test_special_pid(self):
- p = psutil.Process(4)
- self.assertEqual(p.name(), 'System')
- # use __str__ to access all common Process properties to check
- # that nothing strange happens
- str(p)
- p.username()
- self.assertTrue(p.create_time() >= 0.0)
- try:
- rss, vms = p.memory_info()[:2]
- except psutil.AccessDenied:
- # expected on Windows Vista and Windows 7
- if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
- raise
- else:
- self.assertTrue(rss > 0)
-
- def test_send_signal(self):
- p = psutil.Process(self.pid)
- self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
+@unittest.skipUnless(WINDOWS, "WINDOWS only")
+class TestSystemAPIs(unittest.TestCase):
def test_nic_names(self):
p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
@@ -109,70 +81,6 @@ class WindowsSpecificTestCase(unittest.TestCase):
self.fail(
"%r nic wasn't found in 'ipconfig /all' output" % nic)
- def test_exe(self):
- for p in psutil.process_iter():
- try:
- self.assertEqual(os.path.basename(p.exe()), p.name())
- except psutil.Error:
- pass
-
- # --- Process class tests
-
- def test_process_name(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- self.assertEqual(p.name(), w.Caption)
-
- def test_process_exe(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- # Note: wmi reports the exe as a lower case string.
- # Being Windows paths case-insensitive we ignore that.
- self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())
-
- def test_process_cmdline(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- self.assertEqual(' '.join(p.cmdline()),
- w.CommandLine.replace('"', ''))
-
- def test_process_username(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- domain, _, username = w.GetOwner()
- username = "%s\\%s" % (domain, username)
- self.assertEqual(p.username(), username)
-
- def test_process_rss_memory(self):
- time.sleep(0.1)
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- rss = p.memory_info().rss
- self.assertEqual(rss, int(w.WorkingSetSize))
-
- def test_process_vms_memory(self):
- time.sleep(0.1)
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- vms = p.memory_info().vms
- # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
- # ...claims that PageFileUsage is represented in Kilo
- # bytes but funnily enough on certain platforms bytes are
- # returned instead.
- wmi_usage = int(w.PageFileUsage)
- if (vms != wmi_usage) and (vms != wmi_usage * 1024):
- self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
-
- def test_process_create_time(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- wmic_create = str(w.CreationDate.split('.')[0])
- psutil_create = time.strftime("%Y%m%d%H%M%S",
- time.localtime(p.create_time()))
- self.assertEqual(wmic_create, psutil_create)
-
- # --- psutil namespace functions and constants tests
-
@unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
'NUMBER_OF_PROCESSORS env var is not available')
def test_cpu_count(self):
@@ -194,10 +102,10 @@ class WindowsSpecificTestCase(unittest.TestCase):
# wmic_create = str(w.CreationDate.split('.')[0])
# psutil_create = time.strftime("%Y%m%d%H%M%S",
# time.localtime(p.create_time()))
- #
# Note: this test is not very reliable
@unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
+ @retry_before_failing()
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
@@ -235,6 +143,65 @@ class WindowsSpecificTestCase(unittest.TestCase):
else:
self.fail("can't find partition %s" % repr(ps_part))
+ def test_net_if_stats(self):
+ ps_names = set(cext.net_if_stats())
+ wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
+ wmi_names = set()
+ for wmi_adapter in wmi_adapters:
+ wmi_names.add(wmi_adapter.Name)
+ wmi_names.add(wmi_adapter.NetConnectionID)
+ self.assertTrue(ps_names & wmi_names,
+ "no common entries in %s, %s" % (ps_names, wmi_names))
+
+
+# ===================================================================
+# Process APIs
+# ===================================================================
+
+
+@unittest.skipUnless(WINDOWS, "WINDOWS only")
+class TestProcess(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.pid = get_test_subprocess().pid
+
+ @classmethod
+ def tearDownClass(cls):
+ reap_children()
+
+ def test_issue_24(self):
+ p = psutil.Process(0)
+ self.assertRaises(psutil.AccessDenied, p.kill)
+
+ def test_special_pid(self):
+ p = psutil.Process(4)
+ self.assertEqual(p.name(), 'System')
+ # use __str__ to access all common Process properties to check
+ # that nothing strange happens
+ str(p)
+ p.username()
+ self.assertTrue(p.create_time() >= 0.0)
+ try:
+ rss, vms = p.memory_info()[:2]
+ except psutil.AccessDenied:
+ # expected on Windows Vista and Windows 7
+ if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
+ raise
+ else:
+ self.assertTrue(rss > 0)
+
+ def test_send_signal(self):
+ p = psutil.Process(self.pid)
+ self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
+
+ def test_exe(self):
+ for p in psutil.process_iter():
+ try:
+ self.assertEqual(os.path.basename(p.exe()), p.name())
+ except psutil.Error:
+ pass
+
def test_num_handles(self):
p = psutil.Process(os.getpid())
before = p.num_handles()
@@ -245,9 +212,10 @@ class WindowsSpecificTestCase(unittest.TestCase):
win32api.CloseHandle(handle)
self.assertEqual(p.num_handles(), before)
- def test_num_handles_2(self):
- # Note: this fails from time to time; I'm keen on thinking
- # it doesn't mean something is broken
+ def test_handles_leak(self):
+ # Call all Process methods and make sure no handles are left
+ # open. This is here mainly to make sure functions using
+ # OpenProcess() always call CloseHandle().
def call(p, attr):
attr = getattr(p, name, None)
if attr is not None and callable(attr):
@@ -259,9 +227,9 @@ class WindowsSpecificTestCase(unittest.TestCase):
failures = []
for name in dir(psutil.Process):
if name.startswith('_') \
- or name in ('terminate', 'kill', 'suspend', 'resume',
- 'nice', 'send_signal', 'wait', 'children',
- 'as_dict'):
+ or name in ('terminate', 'kill', 'suspend', 'resume',
+ 'nice', 'send_signal', 'wait', 'children',
+ 'as_dict'):
continue
else:
try:
@@ -302,15 +270,81 @@ class WindowsSpecificTestCase(unittest.TestCase):
self.assertRaises(psutil.NoSuchProcess,
p.send_signal, signal.CTRL_BREAK_EVENT)
- def test_net_if_stats(self):
- ps_names = set(cext.net_if_stats())
- wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
- wmi_names = set()
- for wmi_adapter in wmi_adapters:
- wmi_names.add(wmi_adapter.Name)
- wmi_names.add(wmi_adapter.NetConnectionID)
- self.assertTrue(ps_names & wmi_names,
- "no common entries in %s, %s" % (ps_names, wmi_names))
+ def test_compare_name_exe(self):
+ for p in psutil.process_iter():
+ try:
+ a = os.path.basename(p.exe())
+ b = p.name()
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
+ pass
+ else:
+ self.assertEqual(a, b)
+
+
+@unittest.skipUnless(WINDOWS, "WINDOWS only")
+class TestProcessWMI(unittest.TestCase):
+ """Compare Process API results with WMI."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.pid = get_test_subprocess().pid
+
+ @classmethod
+ def tearDownClass(cls):
+ reap_children()
+
+ def test_name(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ self.assertEqual(p.name(), w.Caption)
+
+ def test_exe(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ # Note: wmi reports the exe as a lower case string.
+ # Being Windows paths case-insensitive we ignore that.
+ self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())
+
+ def test_cmdline(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ self.assertEqual(' '.join(p.cmdline()),
+ w.CommandLine.replace('"', ''))
+
+ def test_username(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ domain, _, username = w.GetOwner()
+ username = "%s\\%s" % (domain, username)
+ self.assertEqual(p.username(), username)
+
+ def test_memory_rss(self):
+ time.sleep(0.1)
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ rss = p.memory_info().rss
+ self.assertEqual(rss, int(w.WorkingSetSize))
+
+ def test_memory_vms(self):
+ time.sleep(0.1)
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ vms = p.memory_info().vms
+ # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
+ # ...claims that PageFileUsage is represented in Kilo
+ # bytes but funnily enough on certain platforms bytes are
+ # returned instead.
+ wmi_usage = int(w.PageFileUsage)
+ if (vms != wmi_usage) and (vms != wmi_usage * 1024):
+ self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
+
+ def test_create_time(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ wmic_create = str(w.CreationDate.split('.')[0])
+ psutil_create = time.strftime("%Y%m%d%H%M%S",
+ time.localtime(p.create_time()))
+ self.assertEqual(wmic_create, psutil_create)
@unittest.skipUnless(WINDOWS, "WINDOWS only")
@@ -334,9 +368,19 @@ class TestDualProcessImplementation(unittest.TestCase):
('proc_io_counters', 0),
]
- def test_compare_values(self):
+ @classmethod
+ def setUpClass(cls):
+ cls.pid = get_test_subprocess().pid
+
+ @classmethod
+ def tearDownClass(cls):
+ reap_children()
+
+ def test_all_procs(self):
+ from psutil._pswindows import pinfo_map
+
def assert_ge_0(obj):
- if isinstance(obj, tuple):
+ if isinstance(obj, (tuple, list)):
for value in obj:
self.assertGreaterEqual(value, 0, msg=obj)
elif isinstance(obj, (int, long, float)):
@@ -356,14 +400,13 @@ class TestDualProcessImplementation(unittest.TestCase):
diff = abs(a - b)
self.assertLessEqual(diff, tolerance)
- from psutil._pswindows import ntpinfo
failures = []
for p in psutil.process_iter():
try:
- nt = ntpinfo(*cext.proc_info(p.pid))
+ raw_info = cext.proc_info(p.pid)
except psutil.NoSuchProcess:
continue
- assert_ge_0(nt)
+ assert_ge_0(raw_info)
for name, tolerance in self.fun_names:
if name == 'proc_memory_info' and p.pid == os.getpid():
@@ -378,28 +421,66 @@ class TestDualProcessImplementation(unittest.TestCase):
# compare values
try:
if name == 'proc_cpu_times':
- compare_with_tolerance(ret[0], nt.user_time, tolerance)
- compare_with_tolerance(ret[1],
- nt.kernel_time, tolerance)
+ compare_with_tolerance(
+ ret[0], raw_info[pinfo_map['user_time']],
+ tolerance)
+ compare_with_tolerance(
+ ret[1], raw_info[pinfo_map['kernel_time']],
+ tolerance)
elif name == 'proc_create_time':
- compare_with_tolerance(ret, nt.create_time, tolerance)
+ compare_with_tolerance(
+ ret, raw_info[pinfo_map['create_time']], tolerance)
elif name == 'proc_num_handles':
- compare_with_tolerance(ret, nt.num_handles, tolerance)
+ compare_with_tolerance(
+ ret, raw_info[pinfo_map['num_handles']], tolerance)
elif name == 'proc_io_counters':
- compare_with_tolerance(ret[0], nt.io_rcount, tolerance)
- compare_with_tolerance(ret[1], nt.io_wcount, tolerance)
- compare_with_tolerance(ret[2], nt.io_rbytes, tolerance)
- compare_with_tolerance(ret[3], nt.io_wbytes, tolerance)
+ compare_with_tolerance(
+ ret[0], raw_info[pinfo_map['io_rcount']],
+ tolerance)
+ compare_with_tolerance(
+ ret[1], raw_info[pinfo_map['io_wcount']],
+ tolerance)
+ compare_with_tolerance(
+ ret[2], raw_info[pinfo_map['io_rbytes']],
+ tolerance)
+ compare_with_tolerance(
+ ret[3], raw_info[pinfo_map['io_wbytes']],
+ tolerance)
elif name == 'proc_memory_info':
- try:
- rawtupl = cext.proc_memory_info_2(p.pid)
- except psutil.NoSuchProcess:
- continue
- compare_with_tolerance(ret, rawtupl, tolerance)
+ compare_with_tolerance(
+ ret[0], raw_info[pinfo_map['num_page_faults']],
+ tolerance)
+ compare_with_tolerance(
+ ret[1], raw_info[pinfo_map['peak_wset']],
+ tolerance)
+ compare_with_tolerance(
+ ret[2], raw_info[pinfo_map['wset']],
+ tolerance)
+ compare_with_tolerance(
+ ret[3], raw_info[pinfo_map['peak_paged_pool']],
+ tolerance)
+ compare_with_tolerance(
+ ret[4], raw_info[pinfo_map['paged_pool']],
+ tolerance)
+ compare_with_tolerance(
+ ret[5], raw_info[pinfo_map['peak_non_paged_pool']],
+ tolerance)
+ compare_with_tolerance(
+ ret[6], raw_info[pinfo_map['non_paged_pool']],
+ tolerance)
+ compare_with_tolerance(
+ ret[7], raw_info[pinfo_map['pagefile']],
+ tolerance)
+ compare_with_tolerance(
+ ret[8], raw_info[pinfo_map['peak_pagefile']],
+ tolerance)
+ compare_with_tolerance(
+ ret[9], raw_info[pinfo_map['mem_private']],
+ tolerance)
except AssertionError:
trace = traceback.format_exc()
msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' % (
- trace, p.pid, name, ret, nt)
+ trace, p.pid, name, ret, raw_info)
failures.append(msg)
break
@@ -409,62 +490,65 @@ class TestDualProcessImplementation(unittest.TestCase):
# ---
# same tests as above but mimicks the AccessDenied failure of
# the first (fast) method failing with AD.
- # TODO: currently does not take tolerance into account.
def test_name(self):
- name = psutil.Process().name()
+ name = psutil.Process(self.pid).name()
with mock.patch("psutil._psplatform.cext.proc_exe",
side_effect=psutil.AccessDenied(os.getpid())) as fun:
- psutil.Process().name() == name
+ self.assertEqual(psutil.Process(self.pid).name(), name)
assert fun.called
def test_memory_info(self):
- mem = psutil.Process().memory_info()
+ mem_1 = psutil.Process(self.pid).memory_info()
with mock.patch("psutil._psplatform.cext.proc_memory_info",
side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().memory_info() == mem
+ mem_2 = psutil.Process(self.pid).memory_info()
+ self.assertEqual(len(mem_1), len(mem_2))
+ for i in range(len(mem_1)):
+ self.assertGreaterEqual(mem_1[i], 0)
+ self.assertGreaterEqual(mem_2[i], 0)
+ self.assertAlmostEqual(mem_1[i], mem_2[i], delta=512)
assert fun.called
def test_create_time(self):
- ctime = psutil.Process().create_time()
+ ctime = psutil.Process(self.pid).create_time()
with mock.patch("psutil._psplatform.cext.proc_create_time",
side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().create_time() == ctime
+ self.assertEqual(psutil.Process(self.pid).create_time(), ctime)
assert fun.called
def test_cpu_times(self):
- cpu_times = psutil.Process().cpu_times()
+ cpu_times_1 = psutil.Process(self.pid).cpu_times()
with mock.patch("psutil._psplatform.cext.proc_cpu_times",
side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().cpu_times() == cpu_times
+ cpu_times_2 = psutil.Process(self.pid).cpu_times()
assert fun.called
+ self.assertAlmostEqual(
+ cpu_times_1.user, cpu_times_2.user, delta=0.01)
+ self.assertAlmostEqual(
+ cpu_times_1.system, cpu_times_2.system, delta=0.01)
def test_io_counters(self):
- io_counters = psutil.Process().io_counters()
+ io_counters_1 = psutil.Process(self.pid).io_counters()
+ print("")
+ print(io_counters_1)
with mock.patch("psutil._psplatform.cext.proc_io_counters",
side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().io_counters() == io_counters
+ io_counters_2 = psutil.Process(self.pid).io_counters()
+ for i in range(len(io_counters_1)):
+ self.assertGreaterEqual(io_counters_1[i], 0)
+ self.assertGreaterEqual(io_counters_2[i], 0)
+ self.assertAlmostEqual(
+ io_counters_1[i], io_counters_2[i], delta=5)
assert fun.called
def test_num_handles(self):
- io_counters = psutil.Process().io_counters()
- with mock.patch("psutil._psplatform.cext.proc_io_counters",
+ num_handles = psutil.Process(self.pid).num_handles()
+ with mock.patch("psutil._psplatform.cext.proc_num_handles",
side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().io_counters() == io_counters
+ psutil.Process(self.pid).num_handles() == num_handles
assert fun.called
- # --- other tests
-
- def test_compare_name_exe(self):
- for p in psutil.process_iter():
- try:
- a = os.path.basename(p.exe())
- b = p.name()
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- pass
- else:
- self.assertEqual(a, b)
-
def test_zombies(self):
# test that NPS is raised by the 2nd implementation in case a
# process no longer exists
@@ -476,9 +560,11 @@ class TestDualProcessImplementation(unittest.TestCase):
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class RemoteProcessTestCase(unittest.TestCase):
- """Certain functions require calling ReadProcessMemory. This trivially
- works when called on the current process. Check that this works on other
- processes, especially when they have a different bitness."""
+ """Certain functions require calling ReadProcessMemory.
+ This trivially works when called on the current process.
+ Check that this works on other processes, especially when they
+ have a different bitness.
+ """
@staticmethod
def find_other_interpreter():
@@ -562,6 +648,11 @@ class RemoteProcessTestCase(unittest.TestCase):
self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid()))
+# ===================================================================
+# Windows services
+# ===================================================================
+
+
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class TestServices(unittest.TestCase):