author    | Sebastian Thiel <byronimo@gmail.com> | 2015-01-20 11:43:20 +0100
committer | Sebastian Thiel <byronimo@gmail.com> | 2015-01-20 11:43:20 +0100
commit    | ea29541e213df928d356b3c12d4d074001395d3c (patch)
tree      | 0971fd7abe2e542c92ed2b2ce8a130f628cc6fc3
parent    | e395ac90bf088ad6b5de7dadb6531a6e7f3f4f3f (diff)
download  | gitpython-ea29541e213df928d356b3c12d4d074001395d3c.tar.gz
Another take on fixing the current concurrent read implementation in git.cmd
There were rather obvious errors in there: we forgot to unregister the
file handles from the poll object. Now we read from a buffer ourselves,
which should be faster and, ideally, no longer leads to spurious errors.
Related to #232
-rw-r--r-- | git/cmd.py | 111
1 file changed, 77 insertions(+), 34 deletions(-)
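The core idea of the fix, as described above, is to take line-splitting into our own hands: read raw chunks from the file descriptor with `os.read()`, cut complete lines out of an accumulating buffer, and carry the unterminated tail over to the next wake-up. The following is a minimal standalone sketch of that technique under the same assumptions the patch makes (one page per read, `\r` and `\n` both terminate lines); `split_lines` and `drain_lines` are illustrative names, not the helpers from `git/cmd.py`:

```python
import os
import mmap


def split_lines(buf):
    """Yield (end_offset, line) for each complete line found in `buf`."""
    line = b''
    for i in range(len(buf)):
        char = buf[i:i + 1]              # 1-byte slice: bytes on py2 and py3
        if char in (b'\r', b'\n'):
            if line:                     # also swallows the '\n' of '\r\n'
                yield i + 1, line
                line = b''
        else:
            line += char


def drain_lines(fno, remainder=b''):
    """Read one chunk from descriptor `fno` and return (lines, remainder).

    The remainder holds the bytes after the last line terminator and must
    be passed back in on the next call, once poll() reports more data."""
    buf = remainder + os.read(fno, mmap.PAGESIZE)
    end = 0
    lines = []
    for end, line in split_lines(buf):
        lines.append(line)
    return lines, buf[end:]


if __name__ == '__main__':
    import subprocess
    proc = subprocess.Popen(['git', '--version'], stdout=subprocess.PIPE)
    proc.wait()                          # tiny output, fits in one page
    print(drain_lines(proc.stdout.fileno()))
```

Compared to blocking `stream.readline()` calls, this never reads past what the kernel already has buffered, so a single poll wake-up cannot stall the loop waiting for a line terminator that has not arrived yet.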
```diff
@@ -9,6 +9,9 @@
 import sys
 import select
 import logging
 import threading
+import errno
+import mmap
+
 from subprocess import (
     call,
     Popen,
@@ -26,6 +29,7 @@
 from git.compat import (
     string_types,
     defenc,
     PY3,
+    bchr,  # just to satisfy flake8 on py3
     unicode
 )
@@ -41,6 +45,13 @@ __all__ = ('Git', )
 
 if sys.platform != 'win32':
     WindowsError = OSError
 
+if PY3:
+    _bchr = bchr
+else:
+    def _bchr(c):
+        return c
+# get custom byte character handling
+
 
 # ==============================================================================
 ## @name Utilities
@@ -58,52 +69,73 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
     :param stdout_handler: f(stdout_line_string), or None
     :param stderr_handler: f(stderr_line_string), or None
     :param finalizer: f(proc) - wait for proc to finish"""
-    def read_line_fast(stream):
-        return stream.readline()
-
-    def read_line_slow(stream):
+    def parse_lines_from_buffer(fno, buf):
         line = b''
-        while True:
-            char = stream.read(1)   # reads individual single byte strings
-            if not char:
-                break
+        bi = 0
+        lb = len(buf)
+        while bi < lb:
+            char = _bchr(buf[bi])
+            bi += 1
 
             if char in (b'\r', b'\n') and line:
-                break
+                yield bi, line
+                line = b''
             else:
                 line += char
             # END process parsed line
         # END while file is not done reading
-        return line
     # end
 
-    def dispatch_line(stream, handler, readline):
-        # this can possibly block for a while, but since we wake-up with at least one or more lines to handle,
-        # we are good ...
-        line = readline(stream).decode(defenc)
-        if line and handler:
-            try:
-                handler(line)
-            except Exception:
-                # Keep reading, have to pump the lines empty nonetheless
-                log.error("Line handler exception on line: %s", line, exc_info=True)
-            # end
-        return line
+    def read_lines_from_fno(fno, last_buf_list):
+        buf = os.read(fno, mmap.PAGESIZE)
+        buf = last_buf_list[0] + buf
+
+        bi = 0
+        for bi, line in parse_lines_from_buffer(fno, buf):
+            yield line
+        # for each line to parse from the buffer
+
+        # keep remainder
+        last_buf_list[0] = buf[bi:]
+
+    def dispatch_single_line(line, handler):
+        line = line.decode(defenc)
+        if line and handler:
+            try:
+                handler(line)
+            except Exception:
+                # Keep reading, have to pump the lines empty nonetheless
+                log.error("Line handler exception on line: %s", line, exc_info=True)
+            # end
+        # end dispatch helper
+    # end single line helper
+
+    def dispatch_lines(fno, handler, buf_list):
+        lc = 0
+        for line in read_lines_from_fno(fno, buf_list):
+            dispatch_single_line(line, handler)
+            lc += 1
+        # for each line
+        return lc
     # end
 
-    def deplete_buffer(stream, handler, readline, wg=None):
+    def deplete_buffer(fno, handler, buf_list, wg=None):
         while True:
-            line = dispatch_line(stream, handler, readline)
-            if not line:
+            line_count = dispatch_lines(fno, handler, buf_list)
+            if line_count == 0:
                 break
         # end deplete buffer
+
+        if buf_list[0]:
+            dispatch_single_line(buf_list[0], handler)
+        # end
+
         if wg:
             wg.done()
     # end
 
-    fdmap = {process.stdout.fileno(): (process.stdout, stdout_handler, read_line_fast),
-             process.stderr.fileno(): (process.stderr, stderr_handler, read_line_slow)}
+    fdmap = {process.stdout.fileno(): (stdout_handler, [b'']),
+             process.stderr.fileno(): (stderr_handler, [b''])}
 
     if hasattr(select, 'poll'):
         # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
@@ -118,12 +150,20 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
         closed_streams = set()
         while True:
             # no timeout
-            poll_result = poll.poll()
+
+            try:
+                poll_result = poll.poll()
+            except select.error as e:
+                if e.args[0] == errno.EINTR:
+                    continue
+                raise
+            # end handle poll exception
+
             for fd, result in poll_result:
                 if result & CLOSED:
                     closed_streams.add(fd)
                 else:
-                    dispatch_line(*fdmap[fd])
+                    dispatch_lines(fd, *fdmap[fd])
                 # end handle closed stream
             # end for each poll-result tuple
@@ -133,19 +173,22 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
         # end endless loop
 
         # Deplete all remaining buffers
-        for stream, handler, readline in fdmap.values():
-            deplete_buffer(stream, handler, readline)
+        for fno, (handler, buf_list) in fdmap.items():
+            deplete_buffer(fno, handler, buf_list)
         # end for each file handle
+
+        for fno in fdmap.keys():
+            poll.unregister(fno)
+        # end don't forget to unregister !
     else:
         # Oh ... probably we are on windows. select.select() can only handle sockets, we have files
         # The only reliable way to do this now is to use threads and wait for both to finish
         # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive
         # NO: It's not enough unfortunately, and we will have to sync the threads
         wg = WaitGroup()
-        for fno in fdmap.keys():
+        for fno, (handler, buf_list) in fdmap.items():
             wg.add(1)
-            stream, handler, readline = fdmap[fno]
-            t = threading.Thread(target=lambda: deplete_buffer(stream, handler, readline, wg))
+            t = threading.Thread(target=lambda: deplete_buffer(fno, handler, buf_list, wg))
             t.start()
         # end
         # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's
```
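The `_bchr` shim added at the top of the diff papers over a py2/py3 difference that the new buffer indexing runs into: indexing a `bytes` object yields a 1-character string on Python 2 but an `int` on Python 3, so `buf[bi]` cannot be compared against `b'\r'` directly on py3. A quick hypothetical demo of the difference (not repository code):

```python
import sys

buf = b'AB'
if sys.version_info[0] >= 3:
    assert buf[0] == 65                 # py3: indexing bytes gives an int
    assert bytes([buf[0]]) == b'A'      # bchr-style conversion back to bytes
else:
    assert buf[0] == b'A'               # py2: indexing gives a 1-byte str
print('byte indexing behaves as expected on this interpreter')
```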
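Two details of the poll-based branch deserve emphasis. First, `poll()` can fail with `EINTR` whenever a signal arrives, and the only correct response is to retry. Second, every descriptor registered with the poll object must be unregistered once draining is done; leaving stale registrations behind was exactly the leak motivating this commit. A minimal sketch of that lifecycle, assuming a caller-supplied `pump_fd(fd)` callback as a hypothetical stand-in for `dispatch_lines`:

```python
import errno
import select


def poll_until_closed(fds, pump_fd):
    """Drain readable fds via pump_fd(fd) until all report HUP/ERR."""
    READ = select.POLLIN | select.POLLPRI
    CLOSED = select.POLLHUP | select.POLLERR

    poller = select.poll()
    for fd in fds:
        poller.register(fd, READ | CLOSED)

    try:
        closed = set()
        while len(closed) < len(fds):
            try:
                events = poller.poll()   # blocks; a signal may interrupt it
            except select.error as e:
                if e.args[0] == errno.EINTR:
                    continue             # interrupted system call: retry
                raise
            for fd, result in events:
                if result & CLOSED:
                    closed.add(fd)
                else:
                    pump_fd(fd)
            # end for each event
        # end until every fd is closed
    finally:
        for fd in fds:
            poller.unregister(fd)        # never leak registrations
```

On Python 3.5 and later, PEP 475 retries `EINTR`-interrupted system calls automatically, so the explicit retry mainly matters for the Python 2 interpreters GitPython still supported when this was written; the final deplete pass in the patch is still required to flush any buffered remainder after the loop exits.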