author    Kostis Anagnostopoulos <ankostis@gmail.com>  2016-09-28 17:56:21 +0200
committer Kostis Anagnostopoulos <ankostis@gmail.com>  2016-09-28 18:10:34 +0200
commit    0574b8b921dbfe1b39de68be7522b248b8404892 (patch)
tree      1ba59391b9a21502a29fe567ddea4b1daed534c4
parent    6e98416791566f44a407dcac07a1e1f1b0483544 (diff)
download  gitpython-0574b8b921dbfe1b39de68be7522b248b8404892.tar.gz
ABANDON select/poll
-rw-r--r--  git/cmd.py  233
1 file changed, 48 insertions(+), 185 deletions(-)
diff --git a/git/cmd.py b/git/cmd.py
index 835be605..3d9435ba 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -4,48 +4,43 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-import os
-import sys
-import select
-import logging
-import threading
-import errno
-import mmap
-
-from git.odict import OrderedDict
from contextlib import contextmanager
+import io
+import logging
+import os
import signal
-import subprocess
from subprocess import (
call,
Popen,
PIPE
)
+import subprocess
+import sys
+import threading
-
-from .util import (
- LazyMixin,
- stream_copy,
-)
-from .exc import (
- GitCommandError,
- GitCommandNotFound
-)
from git.compat import (
string_types,
defenc,
force_bytes,
PY3,
- bchr,
# just to satisfy flake8 on py3
unicode,
safe_decode,
is_posix,
is_win,
)
-import io
-from _io import UnsupportedOperation
from git.exc import CommandError
+from git.odict import OrderedDict
+
+from .exc import (
+ GitCommandError,
+ GitCommandNotFound
+)
+from .util import (
+ LazyMixin,
+ stream_copy,
+)
+
execute_kwargs = set(('istream', 'with_keep_cwd', 'with_extended_output',
'with_exceptions', 'as_process', 'stdout_as_string',
@@ -57,13 +52,6 @@ log.addHandler(logging.NullHandler())
__all__ = ('Git',)
-if PY3:
- _bchr = bchr
-else:
- def _bchr(c):
- return c
-# get custom byte character handling
-
# ==============================================================================
## @name Utilities
@@ -73,8 +61,7 @@ else:
def handle_process_output(process, stdout_handler, stderr_handler, finalizer, decode_streams=True):
"""Registers for notifications to lean that process output is ready to read, and dispatches lines to
- the respective line handlers. We are able to handle carriage returns in case progress is sent by that
- mean. For performance reasons, we only apply this to stderr.
+ the respective line handlers.
This function returns once the finalizer returns
:return: result of finalizer
@@ -88,160 +75,36 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer, de
Set it to False if `universal_newline == True` (then streams are in text-mode)
or if decoding must happen later (i.e. for Diffs).
"""
- if decode_streams:
- ZERO = b''
- LF = b'\n'
- CR = b'\r'
- else:
- ZERO = u''
- LF = u'\n'
- CR = u'\r'
-
- def _parse_lines_from_buffer(buf):
- line = ZERO
- bi = 0
- lb = len(buf)
- while bi < lb:
- char = buf[bi]
- bi += 1
-
- if char in (LF, CR) and line:
- yield bi, line + LF
- line = ZERO
- else:
- line += char
- # END process parsed line
- # END while file is not done reading
- # end
-
- def _read_lines_from_fno(fno, last_buf_list):
- buf = fno.read(mmap.PAGESIZE)
- buf = last_buf_list[0] + buf
-
- bi = 0
- for bi, line in _parse_lines_from_buffer(buf):
- yield line
- # for each line to parse from the buffer
-
- # keep remainder
- last_buf_list[0] = buf[bi:]
-
- def _dispatch_single_line(line, handler, decode):
- if decode:
- line = line.decode(defenc)
- if line and handler:
- handler(line)
- # end dispatch helper
- # end single line helper
-
- def _dispatch_lines(fno, handler, buf_list, decode):
- lc = 0
- for line in _read_lines_from_fno(fno, buf_list):
- _dispatch_single_line(line, handler, decode)
- lc += 1
- # for each line
- return lc
- # end
-
- def _deplete_buffer(fno, handler, buf_list, decode):
- lc = 0
- while True:
- line_count = _dispatch_lines(fno, handler, buf_list, decode)
- lc += line_count
- if line_count == 0:
- break
- # end deplete buffer
-
- if buf_list[0]:
- _dispatch_single_line(buf_list[0], handler, decode)
- lc += 1
- # end
-
- return lc
- # end
-
- try:
- outfn = process.stdout.fileno()
- errfn = process.stderr.fileno()
- poll = select.poll() # @UndefinedVariable
- except (UnsupportedOperation, AttributeError):
-        # Oh ... probably we are on Windows, or a TC mock was provided for the streams.
- # Anyhow, select.select() can only handle sockets, we have files
- # The only reliable way to do this now is to use threads and wait for both to finish
- def pump_stream(cmdline, name, stream, is_decode, handler):
- try:
- for line in stream:
- if handler:
- if is_decode:
- line = line.decode(defenc)
- handler(line)
- except Exception as ex:
- log.error("Pumping %r of cmd(%s) failed due to: %r", name, cmdline, ex)
- raise CommandError(['<%s-pump>' % name] + cmdline, ex)
- finally:
- stream.close()
-
- cmdline = getattr(process, 'args', '') # PY3+ only
- if not isinstance(cmdline, (tuple, list)):
- cmdline = cmdline.split()
- threads = []
- for name, stream, handler in (
- ('stdout', process.stdout, stdout_handler),
- ('stderr', process.stderr, stderr_handler),
- ):
- t = threading.Thread(target=pump_stream,
- args=(cmdline, name, stream, decode_streams, handler))
- t.setDaemon(True)
- t.start()
- threads.append(t)
-
- for t in threads:
- t.join()
- else:
- # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
- # an issue for us, as it matters how many handles our own process has
- fdmap = {outfn: (process.stdout, stdout_handler, [ZERO], decode_streams),
- errfn: (process.stderr, stderr_handler, [ZERO], decode_streams)}
-
- READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR # @UndefinedVariable
- CLOSED = select.POLLHUP | select.POLLERR # @UndefinedVariable
-
- poll.register(process.stdout, READ_ONLY)
- poll.register(process.stderr, READ_ONLY)
-
- closed_streams = set()
- while True:
- # no timeout
-
- try:
- poll_result = poll.poll()
- except select.error as e:
- if e.args[0] == errno.EINTR:
- continue
- raise
- # end handle poll exception
-
- for fd, result in poll_result:
- if result & CLOSED:
- closed_streams.add(fd)
- else:
- _dispatch_lines(*fdmap[fd])
- # end handle closed stream
- # end for each poll-result tuple
-
- if len(closed_streams) == len(fdmap):
- break
- # end its all done
- # end endless loop
-
-    # Deplete all remaining buffers
- for fno, args in fdmap.items():
- _deplete_buffer(*args)
- # end for each file handle
-
- for fno in fdmap.keys():
- poll.unregister(fno)
- # end don't forget to unregister !
+    # Use two "pump" threads and wait for both to finish.
+ def pump_stream(cmdline, name, stream, is_decode, handler):
+ try:
+ for line in stream:
+ if handler:
+ if is_decode:
+ line = line.decode(defenc)
+ handler(line)
+ except Exception as ex:
+ log.error("Pumping %r of cmd(%s) failed due to: %r", name, cmdline, ex)
+ raise CommandError(['<%s-pump>' % name] + cmdline, ex)
+ finally:
+ stream.close()
+
+ cmdline = getattr(process, 'args', '') # PY3+ only
+ if not isinstance(cmdline, (tuple, list)):
+ cmdline = cmdline.split()
+ threads = []
+ for name, stream, handler in (
+ ('stdout', process.stdout, stdout_handler),
+ ('stderr', process.stderr, stderr_handler),
+ ):
+ t = threading.Thread(target=pump_stream,
+ args=(cmdline, name, stream, decode_streams, handler))
+ t.setDaemon(True)
+ t.start()
+ threads.append(t)
+
+ for t in threads:
+ t.join()
return finalizer(process)
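
For context, here is a minimal usage sketch of the thread-only code path this commit leaves behind. It is not part of the commit: the handler names and the `git status` invocation are illustrative, and it assumes a `git` executable on PATH and that `handle_process_output` is importable from `git.cmd` as shown in the diff above.

    import subprocess

    from git.cmd import handle_process_output

    proc = subprocess.Popen(
        ['git', 'status'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    def on_stdout(line):
        print('OUT: ' + line.rstrip())

    def on_stderr(line):
        print('ERR: ' + line.rstrip())

    # Each stream is drained by its own daemon "pump" thread; with
    # decode_streams=True (the default) every line is decoded with defenc
    # before reaching its handler. The finalizer's return value (here the
    # child's exit code) is what handle_process_output() returns.
    rc = handle_process_output(proc, on_stdout, on_stderr,
                               finalizer=lambda p: p.wait(),
                               decode_streams=True)

The trade-off, per the comments removed above: on Windows select() only accepts sockets and poll() is not available at all, so the select/poll branch could never run everywhere, whereas two short-lived reader threads behave the same on every platform, at the cost of one thread per stream.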