summaryrefslogtreecommitdiff
path: root/git/cmd.py
diff options
context:
space:
mode:
authorKostis Anagnostopoulos <ankostis@gmail.com>2016-09-28 01:05:38 +0200
committerKostis Anagnostopoulos <ankostis@gmail.com>2016-09-28 03:35:38 +0200
commita5db3d3c49ebe559cb80983d7bb855d4adf1b887 (patch)
tree8cc2b645e97c5420978ee8fcadc528e878fa99d9 /git/cmd.py
parent467416356a96148bcb01feb771f6ea20e5215727 (diff)
downloadgitpython-a5db3d3c49ebe559cb80983d7bb855d4adf1b887.tar.gz
io, dif: #519: FIX DIFF freeze when reading from GIL
+ CAUSE: In Windows, Diffs freeze while reading Popen streams, probably buffers smaller; good-thin(TM) in this case because reading a Popen-proc from the launching-thread freezes GIL. The alternative to use `proc.communicate()` also relies on big buffers. + SOLUTION: Use `cmd.handle_process_output()` to consume Diff-proc streams. + Retroffited `handle_process_output()` code to support also byte-streams, both Threading(Windows) and Select/Poll (Posix) paths updated. - TODO: Unfortunately, `Diff._index_from_patch_format()` still slurps input; need to re-phrase header-regexes linewise to resolve it.
Diffstat (limited to 'git/cmd.py')
-rw-r--r--git/cmd.py141
1 files changed, 74 insertions, 67 deletions
diff --git a/git/cmd.py b/git/cmd.py
index fb94c200..feb16e30 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -44,6 +44,7 @@ from git.compat import (
is_win,
)
import io
+from _io import UnsupportedOperation
execute_kwargs = set(('istream', 'with_keep_cwd', 'with_extended_output',
'with_exceptions', 'as_process', 'stdout_as_string',
@@ -56,7 +57,7 @@ log.addHandler(logging.NullHandler())
__all__ = ('Git',)
if is_win:
- WindowsError = OSError
+ WindowsError = OSError # @ReservedAssignment
if PY3:
_bchr = bchr
@@ -72,7 +73,8 @@ else:
# Documentation
## @{
-def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
+def handle_process_output(process, stdout_handler, stderr_handler, finalizer,
+ decode_stdout=True, decode_stderr=True):
"""Registers for notifications to lean that process output is ready to read, and dispatches lines to
the respective line handlers. We are able to handle carriage returns in case progress is sent by that
mean. For performance reasons, we only apply this to stderr.
@@ -82,8 +84,6 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
:param stdout_handler: f(stdout_line_string), or None
:param stderr_hanlder: f(stderr_line_string), or None
:param finalizer: f(proc) - wait for proc to finish"""
- fdmap = {process.stdout.fileno(): (stdout_handler, [b'']),
- process.stderr.fileno(): (stderr_handler, [b''])}
def _parse_lines_from_buffer(buf):
line = b''
@@ -94,7 +94,7 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
bi += 1
if char in (b'\r', b'\n') and line:
- yield bi, line
+ yield bi, line + b'\n'
line = b''
else:
line += char
@@ -114,105 +114,111 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
# keep remainder
last_buf_list[0] = buf[bi:]
- def _dispatch_single_line(line, handler):
- line = line.decode(defenc)
+ def _dispatch_single_line(line, handler, decode):
+ if decode:
+ line = line.decode(defenc)
if line and handler:
handler(line)
# end dispatch helper
# end single line helper
- def _dispatch_lines(fno, handler, buf_list):
+ def _dispatch_lines(fno, handler, buf_list, decode):
lc = 0
for line in _read_lines_from_fno(fno, buf_list):
- _dispatch_single_line(line, handler)
+ _dispatch_single_line(line, handler, decode)
lc += 1
# for each line
return lc
# end
- def _deplete_buffer(fno, handler, buf_list, wg=None):
+ def _deplete_buffer(fno, handler, buf_list, decode):
lc = 0
while True:
- line_count = _dispatch_lines(fno, handler, buf_list)
+ line_count = _dispatch_lines(fno, handler, buf_list, decode)
lc += line_count
if line_count == 0:
break
# end deplete buffer
if buf_list[0]:
- _dispatch_single_line(buf_list[0], handler)
+ _dispatch_single_line(buf_list[0], handler, decode)
lc += 1
# end
- if wg:
- wg.done()
-
return lc
# end
- if hasattr(select, 'poll'):
- # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
- # an issue for us, as it matters how many handles our own process has
- poll = select.poll()
- READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
- CLOSED = select.POLLHUP | select.POLLERR
-
- poll.register(process.stdout, READ_ONLY)
- poll.register(process.stderr, READ_ONLY)
-
- closed_streams = set()
- while True:
- # no timeout
-
- try:
- poll_result = poll.poll()
- except select.error as e:
- if e.args[0] == errno.EINTR:
- continue
- raise
- # end handle poll exception
-
- for fd, result in poll_result:
- if result & CLOSED:
- closed_streams.add(fd)
- else:
- _dispatch_lines(fd, *fdmap[fd])
- # end handle closed stream
- # end for each poll-result tuple
-
- if len(closed_streams) == len(fdmap):
- break
- # end its all done
- # end endless loop
-
- # Depelete all remaining buffers
- for fno, (handler, buf_list) in fdmap.items():
- _deplete_buffer(fno, handler, buf_list)
- # end for each file handle
-
- for fno in fdmap.keys():
- poll.unregister(fno)
- # end don't forget to unregister !
- else:
- # Oh ... probably we are on windows. select.select() can only handle sockets, we have files
+ try:
+ outfn = process.stdout.fileno()
+ errfn = process.stderr.fileno()
+ poll = select.poll() # @UndefinedVariable
+ except (UnsupportedOperation, AttributeError):
+ # Oh ... probably we are on windows. or TC mockap provided for streams.
+ # Anyhow, select.select() can only handle sockets, we have files
# The only reliable way to do this now is to use threads and wait for both to finish
- def _handle_lines(fd, handler):
+ def _handle_lines(fd, handler, decode):
for line in fd:
- line = line.decode(defenc)
- if line and handler:
+ if handler:
+ if decode:
+ line = line.decode(defenc)
handler(line)
threads = []
- for fd, handler in zip((process.stdout, process.stderr),
- (stdout_handler, stderr_handler)):
- t = threading.Thread(target=_handle_lines, args=(fd, handler))
+ for fd, handler, decode in zip((process.stdout, process.stderr),
+ (stdout_handler, stderr_handler),
+ (decode_stdout, decode_stderr),):
+ t = threading.Thread(target=_handle_lines, args=(fd, handler, decode))
t.setDaemon(True)
t.start()
threads.append(t)
for t in threads:
t.join()
- # end
+ else:
+ # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
+ # an issue for us, as it matters how many handles our own process has
+ fdmap = {outfn: (stdout_handler, [b''], decode_stdout),
+ errfn: (stderr_handler, [b''], decode_stderr)}
+
+ READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR # @UndefinedVariable
+ CLOSED = select.POLLHUP | select.POLLERR # @UndefinedVariable
+
+ poll.register(process.stdout, READ_ONLY)
+ poll.register(process.stderr, READ_ONLY)
+
+ closed_streams = set()
+ while True:
+ # no timeout
+
+ try:
+ poll_result = poll.poll()
+ except select.error as e:
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+ # end handle poll exception
+
+ for fd, result in poll_result:
+ if result & CLOSED:
+ closed_streams.add(fd)
+ else:
+ _dispatch_lines(fd, *fdmap[fd])
+ # end handle closed stream
+ # end for each poll-result tuple
+
+ if len(closed_streams) == len(fdmap):
+ break
+ # end its all done
+ # end endless loop
+
+ # Depelete all remaining buffers
+ for fno, (handler, buf_list, decode) in fdmap.items():
+ _deplete_buffer(fno, handler, buf_list, decode)
+ # end for each file handle
+
+ for fno in fdmap.keys():
+ poll.unregister(fno)
+ # end don't forget to unregister !
return finalizer(process)
@@ -458,6 +464,7 @@ class Git(LazyMixin):
line = self.readline()
if not line:
raise StopIteration
+
return line
def __del__(self):