diff options
-rw-r--r-- | git/cmd.py | 111 |
1 files changed, 77 insertions, 34 deletions
@@ -9,6 +9,9 @@ import sys import select import logging import threading +import errno +import mmap + from subprocess import ( call, Popen, @@ -26,6 +29,7 @@ from git.compat import ( string_types, defenc, PY3, + bchr, # just to satisfy flake8 on py3 unicode ) @@ -41,6 +45,13 @@ __all__ = ('Git', ) if sys.platform != 'win32': WindowsError = OSError +if PY3: + _bchr = bchr +else: + def _bchr(c): + return c +# get custom byte character handling + # ============================================================================== ## @name Utilities @@ -58,52 +69,73 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): :param stdout_handler: f(stdout_line_string), or None :param stderr_hanlder: f(stderr_line_string), or None :param finalizer: f(proc) - wait for proc to finish""" - def read_line_fast(stream): - return stream.readline() - - def read_line_slow(stream): + def parse_lines_from_buffer(fno, buf): line = b'' - while True: - char = stream.read(1) # reads individual single byte strings - if not char: - break + bi = 0 + lb = len(buf) + while bi < lb: + char = _bchr(buf[bi]) + bi += 1 if char in (b'\r', b'\n') and line: - break + yield bi, line + line = b'' else: line += char # END process parsed line # END while file is not done reading - return line # end - def dispatch_line(stream, handler, readline): - # this can possibly block for a while, but since we wake-up with at least one or more lines to handle, - # we are good ... - line = readline(stream).decode(defenc) - if line and handler: - try: - handler(line) - except Exception: - # Keep reading, have to pump the lines empty nontheless - log.error("Line handler exception on line: %s", line, exc_info=True) - # end - return line + def read_lines_from_fno(fno, last_buf_list): + buf = os.read(fno, mmap.PAGESIZE) + buf = last_buf_list[0] + buf + + bi = 0 + for bi, line in parse_lines_from_buffer(fno, buf): + yield line + # for each line to parse from the buffer + + # keep remainder + last_buf_list[0] = buf[bi:] + + def dispatch_single_line(line, handler): + line = line.decode(defenc) + if line and handler: + try: + handler(line) + except Exception: + # Keep reading, have to pump the lines empty nontheless + log.error("Line handler exception on line: %s", line, exc_info=True) + # end # end dispatch helper + # end single line helper + + def dispatch_lines(fno, handler, buf_list): + lc = 0 + for line in read_lines_from_fno(fno, buf_list): + dispatch_single_line(line, handler) + lc += 1 + # for each line + return lc # end - def deplete_buffer(stream, handler, readline, wg=None): + def deplete_buffer(fno, handler, buf_list, wg=None): while True: - line = dispatch_line(stream, handler, readline) - if not line: + line_count = dispatch_lines(fno, handler, buf_list) + if line_count == 0: break # end deplete buffer + + if buf_list[0]: + dispatch_single_line(buf_list[0], handler) + # end + if wg: wg.done() # end - fdmap = {process.stdout.fileno(): (process.stdout, stdout_handler, read_line_fast), - process.stderr.fileno(): (process.stderr, stderr_handler, read_line_slow)} + fdmap = {process.stdout.fileno(): (stdout_handler, [b'']), + process.stderr.fileno(): (stderr_handler, [b''])} if hasattr(select, 'poll'): # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be @@ -118,12 +150,20 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): closed_streams = set() while True: # no timeout - poll_result = poll.poll() + + try: + poll_result = poll.poll() + except select.error as e: + if e.args[0] == errno.EINTR: + continue + raise + # end handle poll exception + for fd, result in poll_result: if result & CLOSED: closed_streams.add(fd) else: - dispatch_line(*fdmap[fd]) + dispatch_lines(fd, *fdmap[fd]) # end handle closed stream # end for each poll-result tuple @@ -133,19 +173,22 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # end endless loop # Depelete all remaining buffers - for stream, handler, readline in fdmap.values(): - deplete_buffer(stream, handler, readline) + for fno, (handler, buf_list) in fdmap.items(): + deplete_buffer(fno, handler, buf_list) # end for each file handle + + for fno in fdmap.keys(): + poll.unregister(fno) + # end don't forget to unregister ! else: # Oh ... probably we are on windows. select.select() can only handle sockets, we have files # The only reliable way to do this now is to use threads and wait for both to finish # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive # NO: It's not enough unfortunately, and we will have to sync the threads wg = WaitGroup() - for fno in fdmap.keys(): + for fno, (handler, buf_list) in fdmap.items(): wg.add(1) - stream, handler, readline = fdmap[fno] - t = threading.Thread(target=lambda: deplete_buffer(stream, handler, readline, wg)) + t = threading.Thread(target=lambda: deplete_buffer(fno, handler, buf_list, wg)) t.start() # end # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's |