summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/pool.py18
-rw-r--r--lib/git/async/task.py11
-rw-r--r--lib/git/async/util.py6
3 files changed, 30 insertions, 5 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 3de98777..19fc9f6e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -67,10 +67,20 @@ class RPoolChannel(RChannel):
# if we have count items, don't do any queue preparation - if someone
# depletes the queue in the meanwhile, the channel will close and
# we will unblock naturally
+	 	# PROBLEM: If there are multiple consumers of this channel, we might
+ # run out of items without being replenished == block forever in the
+ # worst case. task.min_count could have triggered to produce more ...
+ # usually per read with n items, we put n items on to the queue,
+ # so we wouldn't check this
+ # Even if we have just one consumer ( we could determine that with
+ # the reference count ), it could be that in one moment we don't yet
+ # have an item, but its currently being produced by some worker.
+ # This is why we:
+ # * make no assumptions if there are multiple consumers
+ # *
have_enough = False
if count > 0:
- # explicitly > count, as we want a certain safe range
- have_enough = self._wc._queue.qsize() > count
+ have_enough = self._wc._queue.qsize() >= count
# END risky game
########## prepare ##############################
@@ -78,9 +88,11 @@ class RPoolChannel(RChannel):
self._pool._prepare_channel_read(self._task, count)
- ######### read data ######
+ ####### read data ########
+ ##########################
# read actual items, tasks were setup to put their output into our channel ( as well )
items = RChannel.read(self, count, block, timeout)
+ ##########################
if self._post_cb:
items = self._post_cb(items)
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index b282e371..4e8aef54 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -178,6 +178,17 @@ class InputChannelTask(OutputChannelTask):
# make sure we don't trigger the pool if we read from a pool channel which
# belongs to our own pool. Channels from different pools are fine though,
# there we want to trigger its computation
+ # PROBLEM: if the user keeps an end, but decides to put the same end into
+ # a task of this pool, then all items might deplete without new ones being
+ # produced, causing a deadlock. Just triggering the pool would be better,
+		# but costs more, unnecessarily if there is just one consumer, which is
+ # the user.
+ # * could encode usage in the channel type, and fail if the refcount on
+ # the read-pool channel is too high
+ # * maybe keep track of the elements that are requested or in-production
+ # for each task, which would allow to precisely determine whether
+		#   the pool has to be triggered, and bail out early. Problem would
+ # be the
if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
self._read = self._in_rc._read
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 432d1736..85d44694 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -59,7 +59,7 @@ class SyncQueue(deque):
class HSCondition(_Condition):
"""An attempt to make conditions less blocking, which gains performance
in return by sleeping less"""
- delay = 0.00002 # reduces wait times, but increases overhead
+ delay = 0.00005 # reduces wait times, but increases overhead
def wait(self, timeout=None):
waiter = Lock()
@@ -85,7 +85,9 @@ class HSCondition(_Condition):
remaining = endtime - _time()
if remaining <= 0:
break
- delay = min(delay * 2, remaining, .05)
+ # this makes 4 threads working as good as two, but of course
+ # it causes more frequent micro-sleeping
+ #delay = min(delay * 2, remaining, .05)
_sleep(delay)
# END endless loop
if not gotit: