diff options
author | Kostis Anagnostopoulos <ankostis@gmail.com> | 2016-09-28 17:56:21 +0200 |
---|---|---|
committer | Kostis Anagnostopoulos <ankostis@gmail.com> | 2016-09-28 18:10:34 +0200 |
commit | 0574b8b921dbfe1b39de68be7522b248b8404892 (patch) | |
tree | 1ba59391b9a21502a29fe567ddea4b1daed534c4 /git/cmd.py | |
parent | 6e98416791566f44a407dcac07a1e1f1b0483544 (diff) | |
download | gitpython-0574b8b921dbfe1b39de68be7522b248b8404892.tar.gz |
ABANDON select/poll
Diffstat (limited to 'git/cmd.py')
-rw-r--r-- | git/cmd.py | 233 |
1 files changed, 48 insertions, 185 deletions
@@ -4,48 +4,43 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php -import os -import sys -import select -import logging -import threading -import errno -import mmap - -from git.odict import OrderedDict from contextlib import contextmanager +import io +import logging +import os import signal -import subprocess from subprocess import ( call, Popen, PIPE ) +import subprocess +import sys +import threading - -from .util import ( - LazyMixin, - stream_copy, -) -from .exc import ( - GitCommandError, - GitCommandNotFound -) from git.compat import ( string_types, defenc, force_bytes, PY3, - bchr, # just to satisfy flake8 on py3 unicode, safe_decode, is_posix, is_win, ) -import io -from _io import UnsupportedOperation from git.exc import CommandError +from git.odict import OrderedDict + +from .exc import ( + GitCommandError, + GitCommandNotFound +) +from .util import ( + LazyMixin, + stream_copy, +) + execute_kwargs = set(('istream', 'with_keep_cwd', 'with_extended_output', 'with_exceptions', 'as_process', 'stdout_as_string', @@ -57,13 +52,6 @@ log.addHandler(logging.NullHandler()) __all__ = ('Git',) -if PY3: - _bchr = bchr -else: - def _bchr(c): - return c -# get custom byte character handling - # ============================================================================== ## @name Utilities @@ -73,8 +61,7 @@ else: def handle_process_output(process, stdout_handler, stderr_handler, finalizer, decode_streams=True): """Registers for notifications to lean that process output is ready to read, and dispatches lines to - the respective line handlers. We are able to handle carriage returns in case progress is sent by that - mean. For performance reasons, we only apply this to stderr. + the respective line handlers. This function returns once the finalizer returns :return: result of finalizer @@ -88,160 +75,36 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer, de Set it to False if `universal_newline == True` (then streams are in text-mode) or if decoding must happen later (i.e. for Diffs). """ - if decode_streams: - ZERO = b'' - LF = b'\n' - CR = b'\r' - else: - ZERO = u'' - LF = u'\n' - CR = u'\r' - - def _parse_lines_from_buffer(buf): - line = ZERO - bi = 0 - lb = len(buf) - while bi < lb: - char = buf[bi] - bi += 1 - - if char in (LF, CR) and line: - yield bi, line + LF - line = ZERO - else: - line += char - # END process parsed line - # END while file is not done reading - # end - - def _read_lines_from_fno(fno, last_buf_list): - buf = fno.read(mmap.PAGESIZE) - buf = last_buf_list[0] + buf - - bi = 0 - for bi, line in _parse_lines_from_buffer(buf): - yield line - # for each line to parse from the buffer - - # keep remainder - last_buf_list[0] = buf[bi:] - - def _dispatch_single_line(line, handler, decode): - if decode: - line = line.decode(defenc) - if line and handler: - handler(line) - # end dispatch helper - # end single line helper - - def _dispatch_lines(fno, handler, buf_list, decode): - lc = 0 - for line in _read_lines_from_fno(fno, buf_list): - _dispatch_single_line(line, handler, decode) - lc += 1 - # for each line - return lc - # end - - def _deplete_buffer(fno, handler, buf_list, decode): - lc = 0 - while True: - line_count = _dispatch_lines(fno, handler, buf_list, decode) - lc += line_count - if line_count == 0: - break - # end deplete buffer - - if buf_list[0]: - _dispatch_single_line(buf_list[0], handler, decode) - lc += 1 - # end - - return lc - # end - - try: - outfn = process.stdout.fileno() - errfn = process.stderr.fileno() - poll = select.poll() # @UndefinedVariable - except (UnsupportedOperation, AttributeError): - # Oh ... probably we are on windows. or TC mockap provided for streams. - # Anyhow, select.select() can only handle sockets, we have files - # The only reliable way to do this now is to use threads and wait for both to finish - def pump_stream(cmdline, name, stream, is_decode, handler): - try: - for line in stream: - if handler: - if is_decode: - line = line.decode(defenc) - handler(line) - except Exception as ex: - log.error("Pumping %r of cmd(%s) failed due to: %r", name, cmdline, ex) - raise CommandError(['<%s-pump>' % name] + cmdline, ex) - finally: - stream.close() - - cmdline = getattr(process, 'args', '') # PY3+ only - if not isinstance(cmdline, (tuple, list)): - cmdline = cmdline.split() - threads = [] - for name, stream, handler in ( - ('stdout', process.stdout, stdout_handler), - ('stderr', process.stderr, stderr_handler), - ): - t = threading.Thread(target=pump_stream, - args=(cmdline, name, stream, decode_streams, handler)) - t.setDaemon(True) - t.start() - threads.append(t) - - for t in threads: - t.join() - else: - # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be - # an issue for us, as it matters how many handles our own process has - fdmap = {outfn: (process.stdout, stdout_handler, [ZERO], decode_streams), - errfn: (process.stderr, stderr_handler, [ZERO], decode_streams)} - - READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR # @UndefinedVariable - CLOSED = select.POLLHUP | select.POLLERR # @UndefinedVariable - - poll.register(process.stdout, READ_ONLY) - poll.register(process.stderr, READ_ONLY) - - closed_streams = set() - while True: - # no timeout - - try: - poll_result = poll.poll() - except select.error as e: - if e.args[0] == errno.EINTR: - continue - raise - # end handle poll exception - - for fd, result in poll_result: - if result & CLOSED: - closed_streams.add(fd) - else: - _dispatch_lines(*fdmap[fd]) - # end handle closed stream - # end for each poll-result tuple - - if len(closed_streams) == len(fdmap): - break - # end its all done - # end endless loop - - # Depelete all remaining buffers - for fno, args in fdmap.items(): - _deplete_buffer(*args) - # end for each file handle - - for fno in fdmap.keys(): - poll.unregister(fno) - # end don't forget to unregister ! + # Use 2 "pupm" threads and wait for both to finish. + def pump_stream(cmdline, name, stream, is_decode, handler): + try: + for line in stream: + if handler: + if is_decode: + line = line.decode(defenc) + handler(line) + except Exception as ex: + log.error("Pumping %r of cmd(%s) failed due to: %r", name, cmdline, ex) + raise CommandError(['<%s-pump>' % name] + cmdline, ex) + finally: + stream.close() + + cmdline = getattr(process, 'args', '') # PY3+ only + if not isinstance(cmdline, (tuple, list)): + cmdline = cmdline.split() + threads = [] + for name, stream, handler in ( + ('stdout', process.stdout, stdout_handler), + ('stderr', process.stderr, stderr_handler), + ): + t = threading.Thread(target=pump_stream, + args=(cmdline, name, stream, decode_streams, handler)) + t.setDaemon(True) + t.start() + threads.append(t) + + for t in threads: + t.join() return finalizer(process) |