From b6e43af669f61a37a29d8ff0785455108e6bc29d Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Mon, 29 Jan 2018 22:37:58 +0100 Subject: bpo-28134: Auto-detect socket values from file descriptor (#1349) Fix socket(fileno=fd) by auto-detecting the socket's family, type, and proto from the file descriptor. The auto-detection can be overruled by passing in family, type, and proto explicitly. Without the fix, all socket except for TCP/IP over IPv4 are basically broken: >>> s = socket.create_connection(('www.python.org', 443)) >>> s >>> socket.socket(fileno=s.fileno()) Signed-off-by: Christian Heimes --- Lib/test/test_socket.py | 45 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 365da36c42..89cf1fa520 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -20,6 +20,7 @@ import math import pickle import struct import random +import shutil import string import _thread as thread import threading @@ -1284,6 +1285,7 @@ class GeneralModuleTests(unittest.TestCase): def testCloseException(self): sock = socket.socket() + sock.bind((socket._LOCALHOST, 0)) socket.socket(fileno=sock.fileno()).close() try: sock.close() @@ -1636,9 +1638,11 @@ class GeneralModuleTests(unittest.TestCase): ) + 1 with socket.socket( - family=unknown_family, type=unknown_type, fileno=fd) as s: + family=unknown_family, type=unknown_type, proto=23, + fileno=fd) as s: self.assertEqual(s.family, unknown_family) self.assertEqual(s.type, unknown_type) + self.assertEqual(s.proto, 23) @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()') def test__sendfile_use_sendfile(self): @@ -1658,6 +1662,45 @@ class GeneralModuleTests(unittest.TestCase): with self.assertRaises(TypeError): sock._sendfile_use_sendfile(File(None)) + def _test_socket_fileno(self, s, family, stype): + self.assertEqual(s.family, family) + self.assertEqual(s.type, stype) + + fd = s.fileno() + s2 = socket.socket(fileno=fd) + self.addCleanup(s2.close) + # detach old fd to avoid double close + s.detach() + self.assertEqual(s2.family, family) + self.assertEqual(s2.type, stype) + self.assertEqual(s2.fileno(), fd) + + def test_socket_fileno(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(s.close) + s.bind((support.HOST, 0)) + self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM) + + if hasattr(socket, "SOCK_DGRAM"): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.addCleanup(s.close) + s.bind((support.HOST, 0)) + self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM) + + if support.IPV6_ENABLED: + s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + self.addCleanup(s.close) + s.bind((support.HOSTv6, 0, 0, 0)) + self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM) + + if hasattr(socket, "AF_UNIX"): + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.addCleanup(s.close) + s.bind(os.path.join(tmpdir, 'socket')) + self._test_socket_fileno(s, socket.AF_UNIX, socket.SOCK_STREAM) + @unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') class BasicCANTest(unittest.TestCase): -- cgit v1.2.1