summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Lib/subprocess.py129
-rw-r--r--Lib/test/test_subprocess.py18
-rw-r--r--Misc/NEWS3
3 files changed, 120 insertions, 30 deletions
diff --git a/Lib/subprocess.py b/Lib/subprocess.py
index 368ba0e06f..f7361e1559 100644
--- a/Lib/subprocess.py
+++ b/Lib/subprocess.py
@@ -371,6 +371,7 @@ if mswindows:
error = IOError
else:
import select
+ _has_poll = hasattr(select, 'poll')
import errno
import fcntl
import pickle
@@ -383,6 +384,11 @@ try:
except:
MAXFD = 256
+# When select or poll has indicated that the file is writable,
+# we can write up to _PIPE_BUF bytes without risk of blocking.
+# POSIX defines PIPE_BUF as >= 512.
+_PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
+
_active = []
def _cleanup():
@@ -1173,19 +1179,103 @@ class Popen(object):
def _communicate(self, input):
- read_set = []
- write_set = []
- stdout = None # Return
- stderr = None # Return
-
if self.stdin:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
self.stdin.flush()
- if input:
- write_set.append(self.stdin)
- else:
+ if not input:
self.stdin.close()
+
+ if _has_poll:
+ stdout, stderr = self._communicate_with_poll(input)
+ else:
+ stdout, stderr = self._communicate_with_select(input)
+
+ # All data exchanged. Translate lists into strings.
+ if stdout is not None:
+ stdout = b''.join(stdout)
+ if stderr is not None:
+ stderr = b''.join(stderr)
+
+ # Translate newlines, if requested.
+ # This also turns bytes into strings.
+ if self.universal_newlines:
+ if stdout is not None:
+ stdout = self._translate_newlines(stdout,
+ self.stdout.encoding)
+ if stderr is not None:
+ stderr = self._translate_newlines(stderr,
+ self.stderr.encoding)
+
+ self.wait()
+ return (stdout, stderr)
+
+
+ def _communicate_with_poll(self, input):
+ stdout = None # Return
+ stderr = None # Return
+ fd2file = {}
+ fd2output = {}
+
+ poller = select.poll()
+ def register_and_append(file_obj, eventmask):
+ poller.register(file_obj.fileno(), eventmask)
+ fd2file[file_obj.fileno()] = file_obj
+
+ def close_unregister_and_remove(fd):
+ poller.unregister(fd)
+ fd2file[fd].close()
+ fd2file.pop(fd)
+
+ if self.stdin and input:
+ register_and_append(self.stdin, select.POLLOUT)
+
+ select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
+ if self.stdout:
+ register_and_append(self.stdout, select_POLLIN_POLLPRI)
+ fd2output[self.stdout.fileno()] = stdout = []
+ if self.stderr:
+ register_and_append(self.stderr, select_POLLIN_POLLPRI)
+ fd2output[self.stderr.fileno()] = stderr = []
+
+ input_offset = 0
+ while fd2file:
+ try:
+ ready = poller.poll()
+ except select.error as e:
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+
+ # XXX Rewrite these to use non-blocking I/O on the
+ # file objects; they are no longer using C stdio!
+
+ for fd, mode in ready:
+ if mode & select.POLLOUT:
+ chunk = input[input_offset : input_offset + _PIPE_BUF]
+ input_offset += os.write(fd, chunk)
+ if input_offset >= len(input):
+ close_unregister_and_remove(fd)
+ elif mode & select_POLLIN_POLLPRI:
+ data = os.read(fd, 4096)
+ if not data:
+ close_unregister_and_remove(fd)
+ fd2output[fd].append(data)
+ else:
+ # Ignore hang up or errors.
+ close_unregister_and_remove(fd)
+
+ return (stdout, stderr)
+
+
+ def _communicate_with_select(self, input):
+ read_set = []
+ write_set = []
+ stdout = None # Return
+ stderr = None # Return
+
+ if self.stdin and input:
+ write_set.append(self.stdin)
if self.stdout:
read_set.append(self.stdout)
stdout = []
@@ -1206,10 +1296,7 @@ class Popen(object):
# file objects; they are no longer using C stdio!
if self.stdin in wlist:
- # When select has indicated that the file is writable,
- # we can write up to PIPE_BUF bytes without risk
- # blocking. POSIX defines PIPE_BUF >= 512
- chunk = input[input_offset : input_offset + 512]
+ chunk = input[input_offset : input_offset + _PIPE_BUF]
bytes_written = os.write(self.stdin.fileno(), chunk)
input_offset += bytes_written
if input_offset >= len(input):
@@ -1230,25 +1317,9 @@ class Popen(object):
read_set.remove(self.stderr)
stderr.append(data)
- # All data exchanged. Translate lists into strings.
- if stdout is not None:
- stdout = b"".join(stdout)
- if stderr is not None:
- stderr = b"".join(stderr)
-
- # Translate newlines, if requested.
- # This also turns bytes into strings.
- if self.universal_newlines:
- if stdout is not None:
- stdout = self._translate_newlines(stdout,
- self.stdout.encoding)
- if stderr is not None:
- stderr = self._translate_newlines(stderr,
- self.stderr.encoding)
-
- self.wait()
return (stdout, stderr)
+
def send_signal(self, sig):
"""Send a signal to the process
"""
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 326c996ebe..f2a396cd10 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -798,8 +798,24 @@ class CommandTests(unittest.TestCase):
if dir is not None:
os.rmdir(dir)
+
+unit_tests = [ProcessTestCase, CommandTests]
+
+if subprocess._has_poll:
+ class ProcessTestCaseNoPoll(ProcessTestCase):
+ def setUp(self):
+ subprocess._has_poll = False
+ ProcessTestCase.setUp(self)
+
+ def tearDown(self):
+ subprocess._has_poll = True
+ ProcessTestCase.tearDown(self)
+
+ unit_tests.append(ProcessTestCaseNoPoll)
+
+
def test_main():
- support.run_unittest(ProcessTestCase, CommandTests)
+ support.run_unittest(*unit_tests)
support.reap_children()
if __name__ == "__main__":
diff --git a/Misc/NEWS b/Misc/NEWS
index ae39b9abc1..87949298ca 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -34,6 +34,9 @@ C-API
Library
-------
+- Issue #3392: The subprocess communicate() method no longer fails in select()
+ when file descriptors are large; communicate() now uses poll() when possible.
+
- Issue #6369: Fix an RLE decompression bug in the binhex module.
- Issue #6344: Fixed a crash of mmap.read() when passed a negative argument.