author    Sebastian Thiel <byronimo@gmail.com>  2015-01-20 11:43:20 +0100
committer Sebastian Thiel <byronimo@gmail.com>  2015-01-20 11:43:20 +0100
commit    ea29541e213df928d356b3c12d4d074001395d3c (patch)
tree      0971fd7abe2e542c92ed2b2ce8a130f628cc6fc3
parent    e395ac90bf088ad6b5de7dadb6531a6e7f3f4f3f (diff)
download  gitpython-ea29541e213df928d356b3c12d4d074001395d3c.tar.gz
Another take on fixing the current concurrent read implementation in git.cmd
There were rather obvious errors in there, as we forgot to unregister the file handles. Now we read from a buffer ourselves, which should be faster and, ideally, no longer produces spurious errors. Related to #232
-rw-r--r--  git/cmd.py  111
1 file changed, 77 insertions(+), 34 deletions(-)
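
The heart of the change, visible in the diff below, is to stop reading single bytes via stream.read(1) and instead pull page-sized chunks with os.read, splitting each chunk into lines and carrying any unterminated tail over to the next read. A minimal sketch of that pattern, assuming a blocking pipe descriptor (read_lines and tail are illustrative names, not the identifiers used in the patch):

    import os
    import mmap

    def read_lines(fd, tail):
        # Read one chunk from fd and split it into complete lines.
        # tail is a one-element list holding the unterminated remainder of
        # the previous read; it is updated in place, much like the patch's
        # buf_list.
        buf = tail[0] + os.read(fd, mmap.PAGESIZE)
        lines = buf.splitlines(True)  # keep endings so partial lines are detectable
        tail[0] = b''
        if lines and not lines[-1].endswith((b'\n', b'\r')):
            tail[0] = lines.pop()  # carry the partial line into the next call
        return [l.rstrip(b'\r\n') for l in lines]

Once os.read returns an empty chunk the descriptor is drained, and whatever remains in tail[0] is a final line without a terminator - which is why the patch flushes buf_list[0] at the end of deplete_buffer.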
diff --git a/git/cmd.py b/git/cmd.py
index 4ceebb4c..80a30410 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -9,6 +9,9 @@ import sys
 import select
 import logging
 import threading
+import errno
+import mmap
+
 from subprocess import (
     call,
     Popen,
@@ -26,6 +29,7 @@ from git.compat import (
     string_types,
     defenc,
     PY3,
+    bchr,
     # just to satisfy flake8 on py3
     unicode
 )
@@ -41,6 +45,13 @@ __all__ = ('Git', )
 if sys.platform != 'win32':
     WindowsError = OSError
 
+if PY3:
+    _bchr = bchr
+else:
+    def _bchr(c):
+        return c
+# get custom byte character handling
+
 
 # ==============================================================================
 ## @name Utilities
@@ -58,52 +69,73 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
     :param stdout_handler: f(stdout_line_string), or None
     :param stderr_handler: f(stderr_line_string), or None
     :param finalizer: f(proc) - wait for proc to finish"""
-    def read_line_fast(stream):
-        return stream.readline()
-
-    def read_line_slow(stream):
+    def parse_lines_from_buffer(fno, buf):
         line = b''
-        while True:
-            char = stream.read(1)  # reads individual single byte strings
-            if not char:
-                break
+        bi = 0
+        lb = len(buf)
+        while bi < lb:
+            char = _bchr(buf[bi])
+            bi += 1
 
             if char in (b'\r', b'\n') and line:
-                break
+                yield bi, line
+                line = b''
             else:
                 line += char
             # END process parsed line
         # END while file is not done reading
-        return line
     # end
 
-    def dispatch_line(stream, handler, readline):
-        # this can possibly block for a while, but since we wake-up with at least one or more lines to handle,
-        # we are good ...
-        line = readline(stream).decode(defenc)
-        if line and handler:
-            try:
-                handler(line)
-            except Exception:
-                # Keep reading, have to pump the lines empty nontheless
-                log.error("Line handler exception on line: %s", line, exc_info=True)
-            # end
-        return line
+    def read_lines_from_fno(fno, last_buf_list):
+        buf = os.read(fno, mmap.PAGESIZE)
+        buf = last_buf_list[0] + buf
+
+        bi = 0
+        for bi, line in parse_lines_from_buffer(fno, buf):
+            yield line
+        # for each line to parse from the buffer
+
+        # keep remainder
+        last_buf_list[0] = buf[bi:]
+
+    def dispatch_single_line(line, handler):
+        line = line.decode(defenc)
+        if line and handler:
+            try:
+                handler(line)
+            except Exception:
+                # Keep reading, have to pump the lines empty nonetheless
+                log.error("Line handler exception on line: %s", line, exc_info=True)
+            # end
         # end dispatch helper
+    # end single line helper
+
+    def dispatch_lines(fno, handler, buf_list):
+        lc = 0
+        for line in read_lines_from_fno(fno, buf_list):
+            dispatch_single_line(line, handler)
+            lc += 1
+        # for each line
+        return lc
     # end
 
-    def deplete_buffer(stream, handler, readline, wg=None):
+    def deplete_buffer(fno, handler, buf_list, wg=None):
         while True:
-            line = dispatch_line(stream, handler, readline)
-            if not line:
+            line_count = dispatch_lines(fno, handler, buf_list)
+            if line_count == 0:
                 break
         # end deplete buffer
+
+        if buf_list[0]:
+            dispatch_single_line(buf_list[0], handler)
+        # end
+
         if wg:
             wg.done()
     # end
 
-    fdmap = {process.stdout.fileno(): (process.stdout, stdout_handler, read_line_fast),
-             process.stderr.fileno(): (process.stderr, stderr_handler, read_line_slow)}
+    fdmap = {process.stdout.fileno(): (stdout_handler, [b'']),
+             process.stderr.fileno(): (stderr_handler, [b''])}
 
     if hasattr(select, 'poll'):
         # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
@@ -118,12 +150,20 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
         closed_streams = set()
         while True:
             # no timeout
-            poll_result = poll.poll()
+
+            try:
+                poll_result = poll.poll()
+            except select.error as e:
+                if e.args[0] == errno.EINTR:
+                    continue
+                raise
+            # end handle poll exception
+
             for fd, result in poll_result:
                 if result & CLOSED:
                     closed_streams.add(fd)
                 else:
-                    dispatch_line(*fdmap[fd])
+                    dispatch_lines(fd, *fdmap[fd])
                 # end handle closed stream
             # end for each poll-result tuple
@@ -133,19 +173,22 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
         # end endless loop
 
         # Deplete all remaining buffers
-        for stream, handler, readline in fdmap.values():
-            deplete_buffer(stream, handler, readline)
+        for fno, (handler, buf_list) in fdmap.items():
+            deplete_buffer(fno, handler, buf_list)
         # end for each file handle
+
+        for fno in fdmap.keys():
+            poll.unregister(fno)
+        # end don't forget to unregister !
     else:
         # Oh ... probably we are on windows. select.select() can only handle sockets, we have files
         # The only reliable way to do this now is to use threads and wait for both to finish
         # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive
         # NO: It's not enough unfortunately, and we will have to sync the threads
         wg = WaitGroup()
-        for fno in fdmap.keys():
+        for fno, (handler, buf_list) in fdmap.items():
             wg.add(1)
-            stream, handler, readline = fdmap[fno]
-            t = threading.Thread(target=lambda: deplete_buffer(stream, handler, readline, wg))
+            # bind the loop variables as defaults - a bare closure would only
+            # ever see the values of the last iteration
+            t = threading.Thread(target=lambda fno=fno, handler=handler, buf_list=buf_list: deplete_buffer(fno, handler, buf_list, wg))
             t.start()
         # end
         # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's
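
Two smaller fixes ride along with the rewrite: poll() is retried when a signal interrupts it, and every descriptor is unregistered from the poll object once draining is done. Before PEP 475 landed in Python 3.5, an interrupted poll() surfaced as select.error carrying errno.EINTR and had to be retried by hand, which is what the new code does. A sketch of that retry loop in isolation (poll_with_retry and poller are illustrative names standing in for the patch's inline loop):

    import errno
    import select

    def poll_with_retry(poller):
        # poll() without a timeout, transparently retried when a signal
        # interrupts the underlying system call
        while True:
            try:
                return poller.poll()
            except select.error as e:
                if e.args[0] != errno.EINTR:
                    raise
                # interrupted by a signal - just poll again

The unregister loop is the fix the commit message refers to: file descriptors stay in the poll object's interest set until they are explicitly removed, and forgetting to remove them was tied to the spurious errors tracked in #232.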