From 491440543571b07c849c0ef9c4ebf5c27f263bc0 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 7 Jan 2015 11:18:07 +0100 Subject: Implemented non-blocking operations using poll() Next up is using threads --- git/cmd.py | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) (limited to 'git/cmd.py') diff --git a/git/cmd.py b/git/cmd.py index ef370fe2..3cb334b6 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -6,6 +6,7 @@ import os import sys +import select import logging from subprocess import ( call, @@ -36,9 +37,104 @@ log = logging.getLogger('git.cmd') __all__ = ('Git', ) +# ============================================================================== +## @name Utilities +# ------------------------------------------------------------------------------ +# Documentation +## @{ + +def handle_process_output(process, stdout_handler, stderr_handler, finalizer): + """Registers for notifications to lean that process output is ready to read, and dispatches lines to + the respective line handlers. We are able to handle carriage returns in case progress is sent by that + mean. For performance reasons, we only apply this to stderr. + This function returns once the finalizer returns + :return: result of finalizer + :param process: subprocess.Popen instance + :param stdout_handler: f(stdout_line_string), or None + :param stderr_hanlder: f(stderr_line_string), or None + :param finalizer: f(proc) - wait for proc to finish""" + def read_line_fast(stream): + return stream.readline() + + def read_line_slow(stream): + line = b'' + while True: + char = stream.read(1) # reads individual single byte strings + if not char: + break + + if char in (b'\r', b'\n') and line: + break + else: + line += char + # END process parsed line + # END while file is not done reading + return line + # end + + fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), + process.stderr.fileno() : (process.stderr, stderr_handler, read_line_slow) } + + if hasattr(select, 'poll'): + def dispatch_line(fd): + stream, handler, readline = fdmap[fd] + # this can possibly block for a while, but since we wake-up with at least one or more lines to handle, + # we are good ... + line = readline(stream).decode(defenc) + if line and handler: + handler(line) + return line + # end dispatch helper + + # poll is preferred, as select is limited to file handles up to 1024 ... . Not an issue for us though, + # as we deal with relatively blank processes + poll = select.poll() + READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR + CLOSED = select.POLLHUP | select.POLLERR + + poll.register(process.stdout, READ_ONLY) + poll.register(process.stderr, READ_ONLY) + + closed_streams = set() + while True: + # no timeout + poll_result = poll.poll() + for fd, result in poll_result: + if result & CLOSED: + closed_streams.add(fd) + else: + dispatch_line(fd) + # end handle closed stream + # end for each poll-result tuple + + if len(closed_streams) == len(fdmap): + break + # end its all done + # end endless loop + + # Depelete all remaining buffers + for fno, _ in fdmap.items(): + while True: + line = dispatch_line(fno) + if not line: + break + # end deplete buffer + # end for each file handle + else: + # Oh ... probably we are on windows. 
select.select() can only handle sockets, we have files + # The only reliable way to do this now is to use threads and wait for both to finish + # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive + raise NotImplementedError() + # end + + return finalizer(process) + + def dashify(string): return string.replace('_', '-') +## -- End Utilities -- @} + class Git(LazyMixin): -- cgit v1.2.1 From c86bea60dde4016dd850916aa2e0db5260e1ff61 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 7 Jan 2015 11:41:15 +0100 Subject: Implemented threaded version of pipe-draining --- git/cmd.py | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) (limited to 'git/cmd.py') diff --git a/git/cmd.py b/git/cmd.py index 3cb334b6..5ba5edb4 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -8,6 +8,7 @@ import os import sys import select import logging +import threading from subprocess import ( call, Popen, @@ -72,12 +73,8 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): return line # end - fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), - process.stderr.fileno() : (process.stderr, stderr_handler, read_line_slow) } - - if hasattr(select, 'poll'): - def dispatch_line(fd): - stream, handler, readline = fdmap[fd] + def dispatch_line(fno): + stream, handler, readline = fdmap[fno] # this can possibly block for a while, but since we wake-up with at least one or more lines to handle, # we are good ... line = readline(stream).decode(defenc) @@ -85,9 +82,22 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): handler(line) return line # end dispatch helper + # end + + def deplete_buffer(fno): + while True: + line = dispatch_line(fno) + if not line: + break + # end deplete buffer + # end + + fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), + process.stderr.fileno() : (process.stderr, stderr_handler, read_line_slow) } - # poll is preferred, as select is limited to file handles up to 1024 ... . Not an issue for us though, - # as we deal with relatively blank processes + if hasattr(select, 'poll'): + # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be + # an issue for us, as it matters how many handles or own process has poll = select.poll() READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR CLOSED = select.POLLHUP | select.POLLERR @@ -113,18 +123,23 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # end endless loop # Depelete all remaining buffers - for fno, _ in fdmap.items(): - while True: - line = dispatch_line(fno) - if not line: - break - # end deplete buffer + for fno in fdmap.keys(): + deplete_buffer(fno) # end for each file handle else: # Oh ... probably we are on windows. 
select.select() can only handle sockets, we have files # The only reliable way to do this now is to use threads and wait for both to finish # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive - raise NotImplementedError() + # NO: It's not enough unfortunately, and we will have to sync the threads + threads = list() + for fno in fdmap.keys(): + t = threading.Thread(target = lambda: deplete_buffer(fno)) + threads.append(t) + t.start() + # end + for t in threads: + t.join() + # end # end return finalizer(process) -- cgit v1.2.1 From 763ef75d12f0ad6e4b79a7df304c7b5f1b5a11f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 7 Jan 2015 12:32:45 +0100 Subject: Using a wait-group seems to properly sync the threads for buffer depletion --- git/cmd.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) (limited to 'git/cmd.py') diff --git a/git/cmd.py b/git/cmd.py index 5ba5edb4..e03d0cdc 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -18,7 +18,8 @@ from subprocess import ( from .util import ( LazyMixin, - stream_copy + stream_copy, + WaitGroup ) from .exc import GitCommandError from git.compat import ( @@ -84,12 +85,14 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # end dispatch helper # end - def deplete_buffer(fno): + def deplete_buffer(fno, wg=None): while True: line = dispatch_line(fno) if not line: break # end deplete buffer + if wg: + wg.done() # end fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), @@ -131,15 +134,16 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # The only reliable way to do this now is to use threads and wait for both to finish # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive # NO: It's not enough unfortunately, and we will have to sync the threads - threads = list() + wg = WaitGroup() for fno in fdmap.keys(): - t = threading.Thread(target = lambda: deplete_buffer(fno)) - threads.append(t) + wg.add(1) + t = threading.Thread(target = lambda: deplete_buffer(fno, wg)) t.start() # end - for t in threads: - t.join() - # end + # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's + # actually started, which could make the wait() call to just return because the thread is not yet + # active + wg.wait() # end return finalizer(process) -- cgit v1.2.1 From 87a6ffa13ae2951a168cde5908c7a94b16562b96 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 7 Jan 2015 12:37:49 +0100 Subject: Fix flake8 --- git/cmd.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'git/cmd.py') diff --git a/git/cmd.py b/git/cmd.py index e03d0cdc..f847166c 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -46,8 +46,8 @@ __all__ = ('Git', ) ## @{ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): - """Registers for notifications to lean that process output is ready to read, and dispatches lines to - the respective line handlers. We are able to handle carriage returns in case progress is sent by that + """Registers for notifications to lean that process output is ready to read, and dispatches lines to + the respective line handlers. We are able to handle carriage returns in case progress is sent by that mean. For performance reasons, we only apply this to stderr. 
This function returns once the finalizer returns :return: result of finalizer @@ -77,7 +77,7 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): def dispatch_line(fno): stream, handler, readline = fdmap[fno] # this can possibly block for a while, but since we wake-up with at least one or more lines to handle, - # we are good ... + # we are good ... line = readline(stream).decode(defenc) if line and handler: handler(line) @@ -93,13 +93,13 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # end deplete buffer if wg: wg.done() - # end + # end - fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), - process.stderr.fileno() : (process.stderr, stderr_handler, read_line_slow) } + fdmap = {process.stdout.fileno(): (process.stdout, stdout_handler, read_line_fast), + process.stderr.fileno(): (process.stderr, stderr_handler, read_line_slow)} if hasattr(select, 'poll'): - # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be + # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be # an issue for us, as it matters how many handles or own process has poll = select.poll() READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR @@ -137,10 +137,10 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): wg = WaitGroup() for fno in fdmap.keys(): wg.add(1) - t = threading.Thread(target = lambda: deplete_buffer(fno, wg)) + t = threading.Thread(target=lambda: deplete_buffer(fno, wg)) t.start() # end - # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's + # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's # actually started, which could make the wait() call to just return because the thread is not yet # active wg.wait() @@ -148,7 +148,7 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): return finalizer(process) - + def dashify(string): return string.replace('_', '-') -- cgit v1.2.1
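
For context, the helper these patches introduce can be driven roughly as follows. This is a minimal usage sketch rather than code from the repository: the clone URL, target path and print_progress handler are made-up placeholders, and it assumes handle_process_output() is importable from the patched git.cmd module (it is not listed in __all__). Per the docstring above, either line handler may be None, and the finalizer's return value is handed back to the caller.

from subprocess import Popen, PIPE

from git.cmd import handle_process_output


def print_progress(line):
    # Hypothetical stderr handler. git writes progress to stderr and separates
    # updates with '\r', which is why the patches read stderr byte-wise.
    print('progress: %s' % line.strip())


proc = Popen(['git', 'clone', '--progress', 'https://example.com/repo.git', '/tmp/repo'],
             stdout=PIPE, stderr=PIPE)

# stdout is ignored here (the handler may be None); the finalizer waits for the
# process and its result becomes the return value of handle_process_output().
status = handle_process_output(proc,
                               stdout_handler=None,
                               stderr_handler=print_progress,
                               finalizer=lambda p: p.wait())
print('clone finished with status', status)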
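
The poll() branch itself, reduced to a self-contained sketch for readers who want the control flow without the surrounding plumbing. This is a simplification of the first commit, not the actual implementation: both pipes are read with plain readline() (so '\r'-separated progress lines are not split), errors='replace' stands in for defenc, and Python 3 on a platform that provides select.poll() is assumed.

import select
import subprocess


def drain_with_poll(proc, stdout_handler, stderr_handler):
    # Map file descriptors to (stream, handler), mirroring the fdmap in the patch.
    fdmap = {
        proc.stdout.fileno(): (proc.stdout, stdout_handler),
        proc.stderr.fileno(): (proc.stderr, stderr_handler),
    }

    poller = select.poll()
    READABLE = select.POLLIN | select.POLLPRI
    CLOSED = select.POLLHUP | select.POLLERR
    for fd in fdmap:
        poller.register(fd, READABLE | CLOSED)

    closed = set()
    while len(closed) < len(fdmap):
        # Block until at least one pipe is readable or has been closed.
        for fd, events in poller.poll():
            stream, handler = fdmap[fd]
            if events & READABLE:
                line = stream.readline()
                if line and handler:
                    handler(line.decode('utf-8', errors='replace'))
            if events & CLOSED:
                closed.add(fd)
                poller.unregister(fd)

    # Both pipes reported HUP/ERR; deplete whatever is still buffered.
    for stream, handler in fdmap.values():
        for line in stream:
            if handler:
                handler(line.decode('utf-8', errors='replace'))
    return proc.wait()


if __name__ == '__main__':
    p = subprocess.Popen(['git', 'version'],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print('exit status:', drain_with_poll(p, print, print))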
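
The threaded fallback from the second and third commits relies on git.util.WaitGroup, which is imported but not shown in these diffs. The class below is an assumed, Condition-based stand-in that only illustrates the add()/done()/wait() protocol the patches use; it is not GitPython's actual implementation. Note that the worker arguments are passed via Thread(args=...) rather than captured in a lambda: a lambda closing over the loop variable binds late, so both threads could otherwise end up draining the same pipe.

import threading
import subprocess


class WaitGroup(object):
    """Block in wait() until every registered unit of work has called done()."""

    def __init__(self):
        self._count = 0
        self._cond = threading.Condition()

    def add(self, n):
        # Call add() before starting the worker so wait() cannot return early.
        with self._cond:
            self._count += n

    def done(self):
        with self._cond:
            self._count -= 1
            if self._count <= 0:
                self._cond.notify_all()

    def wait(self):
        with self._cond:
            while self._count > 0:
                self._cond.wait()


def deplete_buffer(stream, handler, wg):
    # Read the pipe to exhaustion, then signal completion, mirroring the
    # deplete_buffer(fno, wg) helper in the patch.
    try:
        for line in stream:
            if handler:
                handler(line.decode('utf-8', errors='replace'))
    finally:
        wg.done()


if __name__ == '__main__':
    proc = subprocess.Popen(['git', 'version'],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    wg = WaitGroup()
    for stream in (proc.stdout, proc.stderr):
        wg.add(1)
        threading.Thread(target=deplete_buffer, args=(stream, print, wg)).start()
    wg.wait()
    print('exit status:', proc.wait())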