author    Sebastian Thiel <byronimo@gmail.com>  2010-06-09 14:01:51 +0200
committer Sebastian Thiel <byronimo@gmail.com>  2010-06-09 14:01:51 +0200
commit    4e6bece08aea01859a232e99a1e1ad8cc1eb7d36 (patch)
tree      33a0ebf9c5510b191de219029c8cfedb9df97ab3
parent    a988e6985849e4f6a561b4a5468d525c25ce74fe (diff)
download  gitpython-4e6bece08aea01859a232e99a1e1ad8cc1eb7d36.tar.gz
HSCondition: Fixed a terrible bug it inherited from the default python Condition implementation, related to the notify method not being threadsafe. Although I was aware of it, I missed the first check, which tests the size - its result could be incorrect if the whole method wasn't locked.
Testing runs stably now, allowing me to move on!
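The race the message describes, as a minimal standalone sketch (not gitpython code): the waiter count is tested before the lock is taken, so a waiter that registers between the test and the wakeup is never released and may block forever.

import threading
from collections import deque

class RacyCondition(deque):
    # illustrative only: reproduces the broken check-then-act ordering
    def __init__(self):
        deque.__init__(self)
        self._lock = threading.Lock()

    def notify(self, n=1):
        if not self:            # unlocked size check: a waiter appended right
            return              # after this test misses its release()
        self._lock.acquire()
        try:
            for _ in range(min(n, len(self))):
                self.popleft().release()
        finally:
            self._lock.release()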
-rw-r--r--  lib/git/async/pool.py        7
-rw-r--r--  lib/git/async/task.py        2
-rw-r--r--  lib/git/async/util.py       47
-rw-r--r--  test/git/async/test_pool.py 17
4 files changed, 36 insertions, 37 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 7ed6fd8e..66a2a105 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -58,14 +58,17 @@ class RPoolChannel(RChannel):
def set_pre_cb(self, fun = lambda count: None):
"""Install a callback to call with the item count to be read before any
- item is actually read from the channel.
+ item is actually read from the channel. The call must be threadsafe if
+ the channel is passed to more than one task.
If it fails, the read will fail with an IOError
If a function is not provided, the call is effectively uninstalled."""
self._pre_cb = fun
def set_post_cb(self, fun = lambda item: item):
"""Install a callback to call after the items were read. The function
- returns a possibly changed item list. If it raises, the exception will be propagated.
+ returns a possibly changed item list. The call must be threadsafe if
+ the channel is passed to more than one task.
+ If it raises, the exception will be propagated.
If a function is not provided, the call is effectively uninstalled."""
self._post_cb = fun
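A hedged usage sketch for the two callbacks above. The helper name install_counters and its statistics dict are illustrative, and the channel argument is assumed to be the RPoolChannel returned by the pool's add_task() (as used in the tests further down); only set_pre_cb and set_post_cb themselves come from this file. Because the channel may be shared by several tasks, the callbacks guard their own state with a lock.

import threading

def install_counters(rc):
    # rc: assumed to be the RPoolChannel returned by pool.add_task(task)
    lock = threading.Lock()
    stats = {'requested': 0, 'received': 0}

    def pre_read(count):
        # called with the item count about to be read from the channel
        lock.acquire()
        try:
            stats['requested'] += count
        finally:
            lock.release()

    def post_read(items):
        # may return a possibly changed item list
        lock.acquire()
        try:
            stats['received'] += len(items)
        finally:
            lock.release()
        return items

    rc.set_pre_cb(pre_read)
    rc.set_post_cb(post_read)
    return stats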
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index dd2bd351..d18cedca 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -138,7 +138,7 @@ class OutputChannelTask(Node):
# just the right thing to do of course - one loose link in the chain ...
# Other chunks of our kind currently being processed will then
# fail to write to the channel and fail as well
- # self.close()
+ self.close()
# If some other chunk of our Task had an error, the channel will be closed
# This is not an issue, just be sure we don't overwrite the original
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 008e60a3..2f46d55f 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -133,45 +133,28 @@ class HSCondition(deque):
# END assure release lock
def notify(self, n=1):
- """Its vital that this method is threadsafe - to be fast we don'd get a lock,
- but instead rely on pseudo-atomic operations that come with the GIL.
- Hence we use pop in the n=1 case to be truly atomic.
- In the multi-notify case, we acquire a lock just for safety, as otherwise
- we might pop too much of someone else notifies n waiters as well, which
- would in the worst case lead to double-releases of locks."""
- if not self:
- return
- if n == 1:
- # so here we assume this is thead-safe ! It wouldn't be in any other
- # language, but python it is.
- # But ... its two objects here - first the popleft, then the relasecall.
- # If the timing is really really bad, and that happens if you let it
- # run often enough ( its a matter of statistics ), this will fail,
- # which is why we lock it.
- # And yes, this causes some slow down, as single notifications happen
- # alot
- self._lock.acquire()
- try:
+ """Its vital that this method is threadsafe - we absolutely have to
+ get a lock at the beginning of this method to be sure we get the
+ correct amount of waiters back. If we bail out, although a waiter
+ is about to be added, it will miss its wakeup notification, and block
+ forever (possibly)"""
+ self._lock.acquire()
+ try:
+ if not self: # len(self) == 0, but this should be faster
+ return
+ if n == 1:
try:
self.popleft().release()
except IndexError:
pass
- finally:
- self._lock.release()
- # END assure lock is released
- else:
- self._lock.acquire()
- # once the waiter resumes, he will want to acquire the lock
- # and waits again, but only until we are done, which is important
- # to do that in a thread-safe fashion
- try:
+ else:
for i in range(min(n, len(self))):
self.popleft().release()
# END for each waiter to resume
- finally:
- self._lock.release()
- # END assure we release our lock
- # END handle n = 1 case faster
+ # END handle n = 1 case faster
+ finally:
+ self._lock.release()
+ # END assure lock is released
def notify_all(self):
self.notify(len(self))
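For contrast with the racy sketch near the top, the same helper with the corrected ordering from the hunk above: every access to the waiter deque, including the emptiness test, happens under the lock. The wait() side shown here is an assumption made only to keep the example self-contained - this commit touches notify() alone.

import threading
from collections import deque

class LockedCondition(deque):
    def __init__(self):
        deque.__init__(self)
        self._lock = threading.Lock()

    def wait(self):
        # assumed counterpart: register a pre-acquired lock as the waiter
        # token, then block on it until notify() releases it
        waiter = threading.Lock()
        waiter.acquire()
        self._lock.acquire()
        try:
            self.append(waiter)
        finally:
            self._lock.release()
        waiter.acquire()    # blocks until a notify() call releases it

    def notify(self, n=1):
        self._lock.acquire()
        try:
            if not self:    # safe: the size check now happens under the lock
                return
            for _ in range(min(n, len(self))):
                self.popleft().release()
        finally:
            self._lock.release()

A wakeup can still arrive before wait() has registered its token, which is why condition variables are normally paired with a predicate that is re-checked under an outer mutex; the sketch only demonstrates the locking order fixed in this commit.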
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index dacbf0be..cccafddc 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -98,7 +98,8 @@ class TestThreadPool(TestBase):
items = rc.read()
assert len(items) == ni
task._assert(1, ni)
- assert items[0] == 0 and items[-1] == ni-1
+ if not async:
+ assert items[0] == 0 and items[-1] == ni-1
# as the task is done, it should have been removed - we have read everything
assert task.is_done()
@@ -152,8 +153,14 @@ class TestThreadPool(TestBase):
assert p.num_tasks() == null_tasks
task._assert(2, ni) # two chunks, ni calls
- # its already done, gives us no more
+ # it's already done and gives us no more, but it's still okay to use it,
+ # as a task doesn't have to be in the graph to allow reading its produced
+ # items
print "read(0) on closed"
+ # it can happen that a thread closes the channel just a tiny fraction of time
+ # after we check it, so this test would fail even though the channel is about to close.
+ # When we start reading, we should wake up once it sends its signal
+ # assert task.is_closed()
assert len(rc.read()) == 0
# test chunking
@@ -231,12 +238,18 @@ class TestThreadPool(TestBase):
rc = p.add_task(task)
print "read(0) with failure"
assert len(rc.read()) == 0 # failure on first item
+
print >> sys.stderr, "done with everything"
+
assert isinstance(task.error(), AssertionError)
assert task.is_done() # on error, its marked done as well
del(rc)
assert p.num_tasks() == null_tasks
+ # test failure after ni / 2 items
+ # This makes sure it correctly closes the channel on failure to prevent blocking
+
+
def _assert_async_dependent_tasks(self, p):
# includes failure in center task, 'recursive' orphan cleanup