diff options
Diffstat (limited to 'psutil/tests/test_connections.py')
| -rwxr-xr-x | psutil/tests/test_connections.py | 281 |
1 files changed, 100 insertions, 181 deletions
diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py index c7fe1992..6bbf2194 100755 --- a/psutil/tests/test_connections.py +++ b/psutil/tests/test_connections.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. # Use of this source code is governed by a BSD-style license that can be @@ -6,10 +6,9 @@ """Tests for net_connections() and Process.connections() APIs.""" -import contextlib -import errno import os import socket +import sys import textwrap from contextlib import closing from socket import AF_INET @@ -31,42 +30,36 @@ from psutil._compat import PY3 from psutil.tests import AF_UNIX from psutil.tests import bind_socket from psutil.tests import bind_unix_socket -from psutil.tests import check_net_address -from psutil.tests import CIRRUS +from psutil.tests import check_connection_ntuple from psutil.tests import create_sockets -from psutil.tests import enum -from psutil.tests import get_free_port from psutil.tests import HAS_CONNECTIONS_UNIX -from psutil.tests import pyrun +from psutil.tests import PsutilTestCase from psutil.tests import reap_children -from psutil.tests import safe_rmpath +from psutil.tests import retry_on_failure +from psutil.tests import serialrun from psutil.tests import skip_on_access_denied from psutil.tests import SKIP_SYSCONS from psutil.tests import tcp_socketpair -from psutil.tests import TESTFN -from psutil.tests import TRAVIS from psutil.tests import unittest -from psutil.tests import unix_socket_path from psutil.tests import unix_socketpair from psutil.tests import wait_for_file thisproc = psutil.Process() SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object()) +PYTHON_39 = sys.version_info[:2] == (3, 9) -class Base(object): +@serialrun +class ConnectionTestCase(PsutilTestCase): def setUp(self): - safe_rmpath(TESTFN) if not (NETBSD or FREEBSD): # process opens a UNIX socket to /var/log/run. cons = thisproc.connections(kind='all') assert not cons, cons def tearDown(self): - safe_rmpath(TESTFN) - reap_children() if not (FREEBSD or NETBSD): # Make sure we closed all resources. # NetBSD opens a UNIX socket to /var/log/run. @@ -93,100 +86,27 @@ class Base(object): proc_cons.sort() self.assertEqual(proc_cons, sys_cons) - def check_connection_ntuple(self, conn): - """Check validity of a connection namedtuple.""" - def check_ntuple(conn): - has_pid = len(conn) == 7 - self.assertIn(len(conn), (6, 7)) - self.assertEqual(conn[0], conn.fd) - self.assertEqual(conn[1], conn.family) - self.assertEqual(conn[2], conn.type) - self.assertEqual(conn[3], conn.laddr) - self.assertEqual(conn[4], conn.raddr) - self.assertEqual(conn[5], conn.status) - if has_pid: - self.assertEqual(conn[6], conn.pid) - - def check_family(conn): - self.assertIn(conn.family, (AF_INET, AF_INET6, AF_UNIX)) - if enum is not None: - assert isinstance(conn.family, enum.IntEnum), conn - else: - assert isinstance(conn.family, int), conn - if conn.family == AF_INET: - # actually try to bind the local socket; ignore IPv6 - # sockets as their address might be represented as - # an IPv4-mapped-address (e.g. "::127.0.0.1") - # and that's rejected by bind() - s = socket.socket(conn.family, conn.type) - with contextlib.closing(s): - try: - s.bind((conn.laddr[0], 0)) - except socket.error as err: - if err.errno != errno.EADDRNOTAVAIL: - raise - elif conn.family == AF_UNIX: - self.assertEqual(conn.status, psutil.CONN_NONE) - - def check_type(conn): - # SOCK_SEQPACKET may happen in case of AF_UNIX socks - self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET)) - if enum is not None: - assert isinstance(conn.type, enum.IntEnum), conn - else: - assert isinstance(conn.type, int), conn - if conn.type == SOCK_DGRAM: - self.assertEqual(conn.status, psutil.CONN_NONE) - - def check_addrs(conn): - # check IP address and port sanity - for addr in (conn.laddr, conn.raddr): - if conn.family in (AF_INET, AF_INET6): - self.assertIsInstance(addr, tuple) - if not addr: - continue - self.assertIsInstance(addr.port, int) - assert 0 <= addr.port <= 65535, addr.port - check_net_address(addr.ip, conn.family) - elif conn.family == AF_UNIX: - self.assertIsInstance(addr, str) - - def check_status(conn): - self.assertIsInstance(conn.status, str) - valids = [getattr(psutil, x) for x in dir(psutil) - if x.startswith('CONN_')] - self.assertIn(conn.status, valids) - if conn.family in (AF_INET, AF_INET6) and conn.type == SOCK_STREAM: - self.assertNotEqual(conn.status, psutil.CONN_NONE) - else: - self.assertEqual(conn.status, psutil.CONN_NONE) - - check_ntuple(conn) - check_family(conn) - check_type(conn) - check_addrs(conn) - check_status(conn) - -class TestBase(Base, unittest.TestCase): +class TestBasicOperations(ConnectionTestCase): @unittest.skipIf(SKIP_SYSCONS, "requires root") def test_system(self): with create_sockets(): for conn in psutil.net_connections(kind='all'): - self.check_connection_ntuple(conn) + check_connection_ntuple(conn) def test_process(self): with create_sockets(): for conn in psutil.Process().connections(kind='all'): - self.check_connection_ntuple(conn) + check_connection_ntuple(conn) def test_invalid_kind(self): self.assertRaises(ValueError, thisproc.connections, kind='???') self.assertRaises(ValueError, psutil.net_connections, kind='???') -class TestUnconnectedSockets(Base, unittest.TestCase): +@serialrun +class TestUnconnectedSockets(ConnectionTestCase): """Tests sockets which are open but not connected to anything.""" def get_conn_from_sock(self, sock): @@ -208,7 +128,7 @@ class TestUnconnectedSockets(Base, unittest.TestCase): only (the one supposed to be checked). """ conn = self.get_conn_from_sock(sock) - self.check_connection_ntuple(conn) + check_connection_ntuple(conn) # fd, family, type if conn.fd != -1: @@ -238,7 +158,7 @@ class TestUnconnectedSockets(Base, unittest.TestCase): return conn def test_tcp_v4(self): - addr = ("127.0.0.1", get_free_port()) + addr = ("127.0.0.1", 0) with closing(bind_socket(AF_INET, SOCK_STREAM, addr=addr)) as sock: conn = self.check_socket(sock) assert not conn.raddr @@ -246,14 +166,14 @@ class TestUnconnectedSockets(Base, unittest.TestCase): @unittest.skipIf(not supports_ipv6(), "IPv6 not supported") def test_tcp_v6(self): - addr = ("::1", get_free_port()) + addr = ("::1", 0) with closing(bind_socket(AF_INET6, SOCK_STREAM, addr=addr)) as sock: conn = self.check_socket(sock) assert not conn.raddr self.assertEqual(conn.status, psutil.CONN_LISTEN) def test_udp_v4(self): - addr = ("127.0.0.1", get_free_port()) + addr = ("127.0.0.1", 0) with closing(bind_socket(AF_INET, SOCK_DGRAM, addr=addr)) as sock: conn = self.check_socket(sock) assert not conn.raddr @@ -261,7 +181,7 @@ class TestUnconnectedSockets(Base, unittest.TestCase): @unittest.skipIf(not supports_ipv6(), "IPv6 not supported") def test_udp_v6(self): - addr = ("::1", get_free_port()) + addr = ("::1", 0) with closing(bind_socket(AF_INET6, SOCK_DGRAM, addr=addr)) as sock: conn = self.check_socket(sock) assert not conn.raddr @@ -269,22 +189,23 @@ class TestUnconnectedSockets(Base, unittest.TestCase): @unittest.skipIf(not POSIX, 'POSIX only') def test_unix_tcp(self): - with unix_socket_path() as name: - with closing(bind_unix_socket(name, type=SOCK_STREAM)) as sock: - conn = self.check_socket(sock) - assert not conn.raddr - self.assertEqual(conn.status, psutil.CONN_NONE) + testfn = self.get_testfn() + with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock: + conn = self.check_socket(sock) + assert not conn.raddr + self.assertEqual(conn.status, psutil.CONN_NONE) @unittest.skipIf(not POSIX, 'POSIX only') def test_unix_udp(self): - with unix_socket_path() as name: - with closing(bind_unix_socket(name, type=SOCK_STREAM)) as sock: - conn = self.check_socket(sock) - assert not conn.raddr - self.assertEqual(conn.status, psutil.CONN_NONE) + testfn = self.get_testfn() + with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock: + conn = self.check_socket(sock) + assert not conn.raddr + self.assertEqual(conn.status, psutil.CONN_NONE) -class TestConnectedSocket(Base, unittest.TestCase): +@serialrun +class TestConnectedSocket(ConnectionTestCase): """Test socket pairs which are are actually connected to each other. """ @@ -293,7 +214,7 @@ class TestConnectedSocket(Base, unittest.TestCase): # in TIME_WAIT state. @unittest.skipIf(SUNOS, "unreliable on SUONS") def test_tcp(self): - addr = ("127.0.0.1", get_free_port()) + addr = ("127.0.0.1", 0) assert not thisproc.connections(kind='tcp4') server, client = tcp_socketpair(AF_INET, addr=addr) try: @@ -313,42 +234,39 @@ class TestConnectedSocket(Base, unittest.TestCase): @unittest.skipIf(not POSIX, 'POSIX only') def test_unix(self): - with unix_socket_path() as name: - server, client = unix_socketpair(name) - try: - cons = thisproc.connections(kind='unix') - assert not (cons[0].laddr and cons[0].raddr) - assert not (cons[1].laddr and cons[1].raddr) - if NETBSD or FREEBSD: - # On NetBSD creating a UNIX socket will cause - # a UNIX connection to /var/run/log. - cons = [c for c in cons if c.raddr != '/var/run/log'] - if CIRRUS: - cons = [c for c in cons if c.fd in - (server.fileno(), client.fileno())] - self.assertEqual(len(cons), 2, msg=cons) - if LINUX or FREEBSD or SUNOS: - # remote path is never set - self.assertEqual(cons[0].raddr, "") - self.assertEqual(cons[1].raddr, "") - # one local address should though - self.assertEqual(name, cons[0].laddr or cons[1].laddr) - elif OPENBSD: - # No addresses whatsoever here. - for addr in (cons[0].laddr, cons[0].raddr, - cons[1].laddr, cons[1].raddr): - self.assertEqual(addr, "") - else: - # On other systems either the laddr or raddr - # of both peers are set. - self.assertEqual(cons[0].laddr or cons[1].laddr, name) - self.assertEqual(cons[0].raddr or cons[1].raddr, name) - finally: - server.close() - client.close() + testfn = self.get_testfn() + server, client = unix_socketpair(testfn) + try: + cons = thisproc.connections(kind='unix') + assert not (cons[0].laddr and cons[0].raddr) + assert not (cons[1].laddr and cons[1].raddr) + if NETBSD or FREEBSD: + # On NetBSD creating a UNIX socket will cause + # a UNIX connection to /var/run/log. + cons = [c for c in cons if c.raddr != '/var/run/log'] + self.assertEqual(len(cons), 2, msg=cons) + if LINUX or FREEBSD or SUNOS: + # remote path is never set + self.assertEqual(cons[0].raddr, "") + self.assertEqual(cons[1].raddr, "") + # one local address should though + self.assertEqual(testfn, cons[0].laddr or cons[1].laddr) + elif OPENBSD: + # No addresses whatsoever here. + for addr in (cons[0].laddr, cons[0].raddr, + cons[1].laddr, cons[1].raddr): + self.assertEqual(addr, "") + else: + # On other systems either the laddr or raddr + # of both peers are set. + self.assertEqual(cons[0].laddr or cons[1].laddr, testfn) + self.assertEqual(cons[0].raddr or cons[1].raddr, testfn) + finally: + server.close() + client.close() -class TestFilters(Base, unittest.TestCase): +class TestFilters(ConnectionTestCase): def test_filters(self): def check(kind, families, types): @@ -395,10 +313,12 @@ class TestFilters(Base, unittest.TestCase): @skip_on_access_denied(only_if=MACOS) def test_combos(self): + reap_children() + def check_conn(proc, conn, family, type, laddr, raddr, status, kinds): all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6") - self.check_connection_ntuple(conn) + check_connection_ntuple(conn) self.assertEqual(conn.family, family) self.assertEqual(conn.type, type) self.assertEqual(conn.laddr, laddr) @@ -418,45 +338,45 @@ class TestFilters(Base, unittest.TestCase): tcp_template = textwrap.dedent(""" import socket, time - s = socket.socket($family, socket.SOCK_STREAM) - s.bind(('$addr', 0)) + s = socket.socket({family}, socket.SOCK_STREAM) + s.bind(('{addr}', 0)) s.listen(5) - with open('$testfn', 'w') as f: + with open('{testfn}', 'w') as f: f.write(str(s.getsockname()[:2])) time.sleep(60) - """) + """) udp_template = textwrap.dedent(""" import socket, time - s = socket.socket($family, socket.SOCK_DGRAM) - s.bind(('$addr', 0)) - with open('$testfn', 'w') as f: + s = socket.socket({family}, socket.SOCK_DGRAM) + s.bind(('{addr}', 0)) + with open('{testfn}', 'w') as f: f.write(str(s.getsockname()[:2])) time.sleep(60) - """) + """) - from string import Template - testfile = os.path.basename(TESTFN) - tcp4_template = Template(tcp_template).substitute( + # must be relative on Windows + testfile = os.path.basename(self.get_testfn(dir=os.getcwd())) + tcp4_template = tcp_template.format( family=int(AF_INET), addr="127.0.0.1", testfn=testfile) - udp4_template = Template(udp_template).substitute( + udp4_template = udp_template.format( family=int(AF_INET), addr="127.0.0.1", testfn=testfile) - tcp6_template = Template(tcp_template).substitute( + tcp6_template = tcp_template.format( family=int(AF_INET6), addr="::1", testfn=testfile) - udp6_template = Template(udp_template).substitute( + udp6_template = udp_template.format( family=int(AF_INET6), addr="::1", testfn=testfile) # launch various subprocess instantiating a socket of various # families and types to enrich psutil results - tcp4_proc = pyrun(tcp4_template) - tcp4_addr = eval(wait_for_file(testfile)) - udp4_proc = pyrun(udp4_template) - udp4_addr = eval(wait_for_file(testfile)) + tcp4_proc = self.pyrun(tcp4_template) + tcp4_addr = eval(wait_for_file(testfile, delete=True)) + udp4_proc = self.pyrun(udp4_template) + udp4_addr = eval(wait_for_file(testfile, delete=True)) if supports_ipv6(): - tcp6_proc = pyrun(tcp6_template) - tcp6_addr = eval(wait_for_file(testfile)) - udp6_proc = pyrun(udp6_template) - udp6_addr = eval(wait_for_file(testfile)) + tcp6_proc = self.pyrun(tcp6_template) + tcp6_addr = eval(wait_for_file(testfile, delete=True)) + udp6_proc = self.pyrun(udp6_template) + udp6_addr = eval(wait_for_file(testfile, delete=True)) else: tcp6_proc = None udp6_proc = None @@ -548,7 +468,7 @@ class TestFilters(Base, unittest.TestCase): @unittest.skipIf(SKIP_SYSCONS, "requires root") -class TestSystemWideConnections(Base, unittest.TestCase): +class TestSystemWideConnections(ConnectionTestCase): """Tests for net_connections().""" def test_it(self): @@ -557,7 +477,7 @@ class TestSystemWideConnections(Base, unittest.TestCase): self.assertIn(conn.family, families, msg=conn) if conn.family != AF_UNIX: self.assertIn(conn.type, types_, msg=conn) - self.check_connection_ntuple(conn) + check_connection_ntuple(conn) with create_sockets(): from psutil._common import conn_tmap @@ -570,8 +490,7 @@ class TestSystemWideConnections(Base, unittest.TestCase): self.assertEqual(len(cons), len(set(cons))) check(cons, families, types_) - # See: https://travis-ci.org/giampaolo/psutil/jobs/237566297 - @unittest.skipIf(MACOS and TRAVIS, "unreliable on MACOS + TRAVIS") + @retry_on_failure() def test_multi_sockets_procs(self): # Creates multiple sub processes, each creating different # sockets. For each process check that proc.connections() @@ -583,23 +502,23 @@ class TestSystemWideConnections(Base, unittest.TestCase): expected = len(socks) pids = [] times = 10 + fnames = [] for i in range(times): - fname = os.path.realpath(TESTFN) + str(i) + fname = self.get_testfn() + fnames.append(fname) src = textwrap.dedent("""\ import time, os from psutil.tests import create_sockets with create_sockets(): with open(r'%s', 'w') as f: - f.write(str(os.getpid())) + f.write("hello") time.sleep(60) """ % fname) - sproc = pyrun(src) + sproc = self.pyrun(src) pids.append(sproc.pid) - self.addCleanup(safe_rmpath, fname) # sync - for i in range(times): - fname = TESTFN + str(i) + for fname in fnames: wait_for_file(fname) syscons = [x for x in psutil.net_connections(kind='all') if x.pid @@ -611,7 +530,7 @@ class TestSystemWideConnections(Base, unittest.TestCase): self.assertEqual(len(p.connections('all')), expected) -class TestMisc(unittest.TestCase): +class TestMisc(PsutilTestCase): def test_connection_constants(self): ints = [] @@ -633,5 +552,5 @@ class TestMisc(unittest.TestCase): if __name__ == '__main__': - from psutil.tests.runner import run - run(__file__) + from psutil.tests.runner import run_from_name + run_from_name(__file__) |
