diff options
| author | Charles-François Natali <cf.natali@gmail.com> | 2013-11-08 19:56:59 +0100 | 
|---|---|---|
| committer | Charles-François Natali <cf.natali@gmail.com> | 2013-11-08 19:56:59 +0100 | 
| commit | 3a4586a9f97c997fbdb0de297ed75374015e69bf (patch) | |
| tree | a65e4cc91d8ef3b15c25899c422cb2461c066707 | |
| parent | 2ce6c44ae4c1055092a8cfb31d8d804c9b6458f6 (diff) | |
| download | cpython-git-3a4586a9f97c997fbdb0de297ed75374015e69bf.tar.gz | |
Issue #18923: Update subprocess to use the new selectors module.
| -rw-r--r-- | Lib/subprocess.py | 238 | ||||
| -rw-r--r-- | Lib/test/test_subprocess.py | 10 | 
2 files changed, 75 insertions, 173 deletions
diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 5b2811f929..15d95bd2c6 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -404,15 +404,23 @@ if mswindows:          hStdError = None          wShowWindow = 0  else: -    import select -    _has_poll = hasattr(select, 'poll')      import _posixsubprocess +    import select +    import selectors      # When select or poll has indicated that the file is writable,      # we can write up to _PIPE_BUF bytes without risk of blocking.      # POSIX defines PIPE_BUF as >= 512.      _PIPE_BUF = getattr(select, 'PIPE_BUF', 512) +    # poll/select have the advantage of not requiring any extra file +    # descriptor, contrarily to epoll/kqueue (also, they require a single +    # syscall). +    if hasattr(selectors, 'PollSelector'): +        _PopenSelector = selectors.PollSelector +    else: +        _PopenSelector = selectors.SelectSelector +  __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",             "getoutput", "check_output", "CalledProcessError", "DEVNULL"] @@ -1530,12 +1538,65 @@ class Popen(object):                  if not input:                      self.stdin.close() -            if _has_poll: -                stdout, stderr = self._communicate_with_poll(input, endtime, -                                                             orig_timeout) -            else: -                stdout, stderr = self._communicate_with_select(input, endtime, -                                                               orig_timeout) +            stdout = None +            stderr = None + +            # Only create this mapping if we haven't already. +            if not self._communication_started: +                self._fileobj2output = {} +                if self.stdout: +                    self._fileobj2output[self.stdout] = [] +                if self.stderr: +                    self._fileobj2output[self.stderr] = [] + +            if self.stdout: +                stdout = self._fileobj2output[self.stdout] +            if self.stderr: +                stderr = self._fileobj2output[self.stderr] + +            self._save_input(input) + +            with _PopenSelector() as selector: +                if self.stdin and input: +                    selector.register(self.stdin, selectors.EVENT_WRITE) +                if self.stdout: +                    selector.register(self.stdout, selectors.EVENT_READ) +                if self.stderr: +                    selector.register(self.stderr, selectors.EVENT_READ) + +                while selector.get_map(): +                    timeout = self._remaining_time(endtime) +                    if timeout is not None and timeout < 0: +                        raise TimeoutExpired(self.args, orig_timeout) + +                    ready = selector.select(timeout) +                    self._check_timeout(endtime, orig_timeout) + +                    # XXX Rewrite these to use non-blocking I/O on the file +                    # objects; they are no longer using C stdio! + +                    for key, events in ready: +                        if key.fileobj is self.stdin: +                            chunk = self._input[self._input_offset : +                                                self._input_offset + _PIPE_BUF] +                            try: +                                self._input_offset += os.write(key.fd, chunk) +                            except OSError as e: +                                if e.errno == errno.EPIPE: +                                    selector.unregister(key.fileobj) +                                    key.fileobj.close() +                                else: +                                    raise +                            else: +                                if self._input_offset >= len(self._input): +                                    selector.unregister(key.fileobj) +                                    key.fileobj.close() +                        elif key.fileobj in (self.stdout, self.stderr): +                            data = os.read(key.fd, 4096) +                            if not data: +                                selector.unregister(key.fileobj) +                                key.fileobj.close() +                            self._fileobj2output[key.fileobj].append(data)              self.wait(timeout=self._remaining_time(endtime)) @@ -1569,167 +1630,6 @@ class Popen(object):                      self._input = self._input.encode(self.stdin.encoding) -        def _communicate_with_poll(self, input, endtime, orig_timeout): -            stdout = None # Return -            stderr = None # Return - -            if not self._communication_started: -                self._fd2file = {} - -            poller = select.poll() -            def register_and_append(file_obj, eventmask): -                poller.register(file_obj.fileno(), eventmask) -                self._fd2file[file_obj.fileno()] = file_obj - -            def close_unregister_and_remove(fd): -                poller.unregister(fd) -                self._fd2file[fd].close() -                self._fd2file.pop(fd) - -            if self.stdin and input: -                register_and_append(self.stdin, select.POLLOUT) - -            # Only create this mapping if we haven't already. -            if not self._communication_started: -                self._fd2output = {} -                if self.stdout: -                    self._fd2output[self.stdout.fileno()] = [] -                if self.stderr: -                    self._fd2output[self.stderr.fileno()] = [] - -            select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI -            if self.stdout: -                register_and_append(self.stdout, select_POLLIN_POLLPRI) -                stdout = self._fd2output[self.stdout.fileno()] -            if self.stderr: -                register_and_append(self.stderr, select_POLLIN_POLLPRI) -                stderr = self._fd2output[self.stderr.fileno()] - -            self._save_input(input) - -            while self._fd2file: -                timeout = self._remaining_time(endtime) -                if timeout is not None and timeout < 0: -                    raise TimeoutExpired(self.args, orig_timeout) -                try: -                    ready = poller.poll(timeout) -                except OSError as e: -                    if e.args[0] == errno.EINTR: -                        continue -                    raise -                self._check_timeout(endtime, orig_timeout) - -                # XXX Rewrite these to use non-blocking I/O on the -                # file objects; they are no longer using C stdio! - -                for fd, mode in ready: -                    if mode & select.POLLOUT: -                        chunk = self._input[self._input_offset : -                                            self._input_offset + _PIPE_BUF] -                        try: -                            self._input_offset += os.write(fd, chunk) -                        except OSError as e: -                            if e.errno == errno.EPIPE: -                                close_unregister_and_remove(fd) -                            else: -                                raise -                        else: -                            if self._input_offset >= len(self._input): -                                close_unregister_and_remove(fd) -                    elif mode & select_POLLIN_POLLPRI: -                        data = os.read(fd, 4096) -                        if not data: -                            close_unregister_and_remove(fd) -                        self._fd2output[fd].append(data) -                    else: -                        # Ignore hang up or errors. -                        close_unregister_and_remove(fd) - -            return (stdout, stderr) - - -        def _communicate_with_select(self, input, endtime, orig_timeout): -            if not self._communication_started: -                self._read_set = [] -                self._write_set = [] -                if self.stdin and input: -                    self._write_set.append(self.stdin) -                if self.stdout: -                    self._read_set.append(self.stdout) -                if self.stderr: -                    self._read_set.append(self.stderr) - -            self._save_input(input) - -            stdout = None # Return -            stderr = None # Return - -            if self.stdout: -                if not self._communication_started: -                    self._stdout_buff = [] -                stdout = self._stdout_buff -            if self.stderr: -                if not self._communication_started: -                    self._stderr_buff = [] -                stderr = self._stderr_buff - -            while self._read_set or self._write_set: -                timeout = self._remaining_time(endtime) -                if timeout is not None and timeout < 0: -                    raise TimeoutExpired(self.args, orig_timeout) -                try: -                    (rlist, wlist, xlist) = \ -                        select.select(self._read_set, self._write_set, [], -                                      timeout) -                except OSError as e: -                    if e.args[0] == errno.EINTR: -                        continue -                    raise - -                # According to the docs, returning three empty lists indicates -                # that the timeout expired. -                if not (rlist or wlist or xlist): -                    raise TimeoutExpired(self.args, orig_timeout) -                # We also check what time it is ourselves for good measure. -                self._check_timeout(endtime, orig_timeout) - -                # XXX Rewrite these to use non-blocking I/O on the -                # file objects; they are no longer using C stdio! - -                if self.stdin in wlist: -                    chunk = self._input[self._input_offset : -                                        self._input_offset + _PIPE_BUF] -                    try: -                        bytes_written = os.write(self.stdin.fileno(), chunk) -                    except OSError as e: -                        if e.errno == errno.EPIPE: -                            self.stdin.close() -                            self._write_set.remove(self.stdin) -                        else: -                            raise -                    else: -                        self._input_offset += bytes_written -                        if self._input_offset >= len(self._input): -                            self.stdin.close() -                            self._write_set.remove(self.stdin) - -                if self.stdout in rlist: -                    data = os.read(self.stdout.fileno(), 1024) -                    if not data: -                        self.stdout.close() -                        self._read_set.remove(self.stdout) -                    stdout.append(data) - -                if self.stderr in rlist: -                    data = os.read(self.stderr.fileno(), 1024) -                    if not data: -                        self.stderr.close() -                        self._read_set.remove(self.stderr) -                    stderr.append(data) - -            return (stdout, stderr) - -          def send_signal(self, sig):              """Send a signal to the process              """ diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index cdcee9fd0b..e12f593b19 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -11,6 +11,7 @@ import errno  import tempfile  import time  import re +import selectors  import sysconfig  import warnings  import select @@ -2179,15 +2180,16 @@ class CommandTests(unittest.TestCase):                  os.rmdir(dir) -@unittest.skipUnless(getattr(subprocess, '_has_poll', False), -                     "poll system call not supported") +@unittest.skipUnless(hasattr(selectors, 'PollSelector'), +                     "Test needs selectors.PollSelector")  class ProcessTestCaseNoPoll(ProcessTestCase):      def setUp(self): -        subprocess._has_poll = False +        self.orig_selector = subprocess._PopenSelector +        subprocess._PopenSelector = selectors.SelectSelector          ProcessTestCase.setUp(self)      def tearDown(self): -        subprocess._has_poll = True +        subprocess._PopenSelector = self.orig_selector          ProcessTestCase.tearDown(self)  | 
