diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/pool.py | 18 | ||||
-rw-r--r-- | lib/git/async/task.py | 11 | ||||
-rw-r--r-- | lib/git/async/util.py | 6 |
3 files changed, 30 insertions, 5 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 3de98777..19fc9f6e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -67,10 +67,20 @@ class RPoolChannel(RChannel): # if we have count items, don't do any queue preparation - if someone # depletes the queue in the meanwhile, the channel will close and # we will unblock naturally + # PROBLEM: If there are multiple consumers of this channel, we might + # run out of items without being replenished == block forever in the + # worst case. task.min_count could have triggered to produce more ... + # usually per read with n items, we put n items on to the queue, + # so we wouldn't check this + # Even if we have just one consumer ( we could determine that with + # the reference count ), it could be that in one moment we don't yet + # have an item, but it's currently being produced by some worker. + # This is why we: + # * make no assumptions if there are multiple consumers + # * have_enough = False if count > 0: - # explicitly > count, as we want a certain safe range - have_enough = self._wc._queue.qsize() > count + have_enough = self._wc._queue.qsize() >= count # END risky game ########## prepare ############################## @@ -78,9 +88,11 @@ class RPoolChannel(RChannel): self._pool._prepare_channel_read(self._task, count) - ######### read data ###### + ####### read data ######## + ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) items = RChannel.read(self, count, block, timeout) + ########################## if self._post_cb: items = self._post_cb(items) diff --git a/lib/git/async/task.py b/lib/git/async/task.py index b282e371..4e8aef54 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -178,6 +178,17 @@ class InputChannelTask(OutputChannelTask): # make sure we don't trigger the pool if we read from a pool channel which # belongs to our own pool. 
Channels from different pools are fine though, # there we want to trigger its computation + # PROBLEM: if the user keeps an end, but decides to put the same end into + # a task of this pool, then all items might deplete without new ones being + # produced, causing a deadlock. Just triggering the pool would be better, + # but costs more, unnecessarily if there is just one consumer, which is + # the user. + # * could encode usage in the channel type, and fail if the refcount on + # the read-pool channel is too high + # * maybe keep track of the elements that are requested or in-production + # for each task, which would allow to precisely determine whether + # the pool has to be triggered, and bail out early. Problem would + # be the if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): self._read = self._in_rc._read diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 432d1736..85d44694 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -59,7 +59,7 @@ class SyncQueue(deque): class HSCondition(_Condition): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - delay = 0.00002 # reduces wait times, but increases overhead + delay = 0.00005 # reduces wait times, but increases overhead def wait(self, timeout=None): waiter = Lock() @@ -85,7 +85,9 @@ class HSCondition(_Condition): remaining = endtime - _time() if remaining <= 0: break - delay = min(delay * 2, remaining, .05) + # this makes 4 threads work as well as two, but of course + # it causes more frequent micro-sleeping + #delay = min(delay * 2, remaining, .05) _sleep(delay) # END endless loop if not gotit: |