summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--git/cmd.py111
1 files changed, 77 insertions, 34 deletions
diff --git a/git/cmd.py b/git/cmd.py
index 4ceebb4c..80a30410 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -9,6 +9,9 @@ import sys
import select
import logging
import threading
+import errno
+import mmap
+
from subprocess import (
call,
Popen,
@@ -26,6 +29,7 @@ from git.compat import (
string_types,
defenc,
PY3,
+ bchr,
# just to satisfy flake8 on py3
unicode
)
@@ -41,6 +45,13 @@ __all__ = ('Git', )
if sys.platform != 'win32':
WindowsError = OSError
+if PY3:
+ _bchr = bchr
+else:
+ def _bchr(c):
+ return c
+# get custom byte character handling
+
# ==============================================================================
## @name Utilities
@@ -58,52 +69,73 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
:param stdout_handler: f(stdout_line_string), or None
:param stderr_hanlder: f(stderr_line_string), or None
:param finalizer: f(proc) - wait for proc to finish"""
- def read_line_fast(stream):
- return stream.readline()
-
- def read_line_slow(stream):
+ def parse_lines_from_buffer(fno, buf):
line = b''
- while True:
- char = stream.read(1) # reads individual single byte strings
- if not char:
- break
+ bi = 0
+ lb = len(buf)
+ while bi < lb:
+ char = _bchr(buf[bi])
+ bi += 1
if char in (b'\r', b'\n') and line:
- break
+ yield bi, line
+ line = b''
else:
line += char
# END process parsed line
# END while file is not done reading
- return line
# end
- def dispatch_line(stream, handler, readline):
- # this can possibly block for a while, but since we wake-up with at least one or more lines to handle,
- # we are good ...
- line = readline(stream).decode(defenc)
- if line and handler:
- try:
- handler(line)
- except Exception:
- # Keep reading, have to pump the lines empty nontheless
- log.error("Line handler exception on line: %s", line, exc_info=True)
- # end
- return line
+ def read_lines_from_fno(fno, last_buf_list):
+ buf = os.read(fno, mmap.PAGESIZE)
+ buf = last_buf_list[0] + buf
+
+ bi = 0
+ for bi, line in parse_lines_from_buffer(fno, buf):
+ yield line
+ # for each line to parse from the buffer
+
+ # keep remainder
+ last_buf_list[0] = buf[bi:]
+
+ def dispatch_single_line(line, handler):
+ line = line.decode(defenc)
+ if line and handler:
+ try:
+ handler(line)
+ except Exception:
+ # Keep reading, have to pump the lines empty nontheless
+ log.error("Line handler exception on line: %s", line, exc_info=True)
+ # end
# end dispatch helper
+ # end single line helper
+
+ def dispatch_lines(fno, handler, buf_list):
+ lc = 0
+ for line in read_lines_from_fno(fno, buf_list):
+ dispatch_single_line(line, handler)
+ lc += 1
+ # for each line
+ return lc
# end
- def deplete_buffer(stream, handler, readline, wg=None):
+ def deplete_buffer(fno, handler, buf_list, wg=None):
while True:
- line = dispatch_line(stream, handler, readline)
- if not line:
+ line_count = dispatch_lines(fno, handler, buf_list)
+ if line_count == 0:
break
# end deplete buffer
+
+ if buf_list[0]:
+ dispatch_single_line(buf_list[0], handler)
+ # end
+
if wg:
wg.done()
# end
- fdmap = {process.stdout.fileno(): (process.stdout, stdout_handler, read_line_fast),
- process.stderr.fileno(): (process.stderr, stderr_handler, read_line_slow)}
+ fdmap = {process.stdout.fileno(): (stdout_handler, [b'']),
+ process.stderr.fileno(): (stderr_handler, [b''])}
if hasattr(select, 'poll'):
# poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
@@ -118,12 +150,20 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
closed_streams = set()
while True:
# no timeout
- poll_result = poll.poll()
+
+ try:
+ poll_result = poll.poll()
+ except select.error as e:
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+ # end handle poll exception
+
for fd, result in poll_result:
if result & CLOSED:
closed_streams.add(fd)
else:
- dispatch_line(*fdmap[fd])
+ dispatch_lines(fd, *fdmap[fd])
# end handle closed stream
# end for each poll-result tuple
@@ -133,19 +173,22 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
# end endless loop
# Depelete all remaining buffers
- for stream, handler, readline in fdmap.values():
- deplete_buffer(stream, handler, readline)
+ for fno, (handler, buf_list) in fdmap.items():
+ deplete_buffer(fno, handler, buf_list)
# end for each file handle
+
+ for fno in fdmap.keys():
+ poll.unregister(fno)
+ # end don't forget to unregister !
else:
# Oh ... probably we are on windows. select.select() can only handle sockets, we have files
# The only reliable way to do this now is to use threads and wait for both to finish
# Since the finalizer is expected to wait, we don't have to introduce our own wait primitive
# NO: It's not enough unfortunately, and we will have to sync the threads
wg = WaitGroup()
- for fno in fdmap.keys():
+ for fno, (handler, buf_list) in fdmap.items():
wg.add(1)
- stream, handler, readline = fdmap[fno]
- t = threading.Thread(target=lambda: deplete_buffer(stream, handler, readline, wg))
+ t = threading.Thread(target=lambda: deplete_buffer(fno, handler, buf_list, wg))
t.start()
# end
# NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's