Diffstat (limited to 'test')
-rw-r--r--  test/git/async/task.py          190
-rw-r--r--  test/git/async/test_channel.py    6
-rw-r--r--  test/git/async/test_graph.py     45
-rw-r--r--  test/git/async/test_pool.py     343
4 files changed, 397 insertions(+), 187 deletions(-)
diff --git a/test/git/async/task.py b/test/git/async/task.py
new file mode 100644
index 00000000..9cc3cb9d
--- /dev/null
+++ b/test/git/async/task.py
@@ -0,0 +1,190 @@
+"""Module containing task implementations used by the async tests"""
+from git.async.task import *
+
+import threading
+import weakref
+
+class _TestTaskBase(object):
+ """Note: causes a great slowdown due to the required locking of task variables"""
+ def __init__(self, *args, **kwargs):
+ super(_TestTaskBase, self).__init__(*args, **kwargs)
+ self.should_fail = False
+ self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
+ self.plock = threading.Lock()
+ self.item_count = 0
+ self.process_count = 0
+
+ def do_fun(self, item):
+ self.lock.acquire()
+ self.item_count += 1
+ self.lock.release()
+ if self.should_fail:
+ raise AssertionError("I am failing just for the fun of it")
+ return item
+
+ def process(self, count=1):
+ # must do it first, otherwise we might read and check results before
+ # the thread gets here :). It's a lesson!
+ self.plock.acquire()
+ self.process_count += 1
+ self.plock.release()
+ super(_TestTaskBase, self).process(count)
+
+ def _assert(self, pc, fc, check_scheduled=False):
+ """Assert the expected function-call count (fc); pc (the process count) is currently not verified
+ :return: self"""
+ self.lock.acquire()
+ if self.item_count != fc:
+ print self.item_count, fc
+ assert self.item_count == fc
+ self.lock.release()
+
+ # NOTE: asserting num-writers fails every now and then, implying a thread is
+ # still processing (an empty chunk) when we are checking it. This could
+ # only be prevented by checking the scheduled items, which requires locking
+ # and causes slowdowns, so we don't do it. If the num_writers
+ # counter weren't maintained properly, more tests would fail, so
+ # we can safely refrain from checking it here.
+ # self._wlock.acquire()
+ # assert self._num_writers == 0
+ # self._wlock.release()
+ return self
+
+
+class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
+ pass
+
+
+class TestThreadFailureNode(TestThreadTaskNode):
+ """Fails after X items"""
+ def __init__(self, *args, **kwargs):
+ self.fail_after = kwargs.pop('fail_after')
+ super(TestThreadFailureNode, self).__init__(*args, **kwargs)
+
+ def do_fun(self, item):
+ item = TestThreadTaskNode.do_fun(self, item)
+
+ self.lock.acquire()
+ try:
+ if self.item_count > self.fail_after:
+ raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+ finally:
+ self.lock.release()
+ # END handle fail after
+ return item
+
+
+class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
+ """Apply a transformation on items read from an input channel"""
+ def __init__(self, *args, **kwargs):
+ self.fail_after = kwargs.pop('fail_after', 0)
+ super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs)
+
+ def do_fun(self, item):
+ """Return (item, item * self.id), appending item[0] * self.id if item is already a tuple"""
+ item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
+
+ # fail after support
+ if self.fail_after:
+ self.lock.acquire()
+ try:
+ if self.item_count > self.fail_after:
+ raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+ finally:
+ self.lock.release()
+ # END handle fail-after
+
+ if isinstance(item, tuple):
+ i = item[0]
+ return item + (i * self.id, )
+ else:
+ return (item, item * self.id)
+ # END handle tuple
+
+
+class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
+ """An input channel task which verifies the result of its input channels;
+ it should be last in the chain.
+ The transformer ids in the chain must form the sequence 0, 1, 2, ..."""
+
+ def do_fun(self, item):
+ """Verify that each value appended by the transformers equals item[0] * transformer id"""
+ item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item)
+
+ # make sure the computation order matches
+ assert isinstance(item, tuple), "input was no tuple: %s" % item
+
+ base = item[0]
+ for id, num in enumerate(item[1:]):
+ assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
+ # END verify order
+
+ return item
+
+
+#{ Utilities
+
+def make_proxy_method(t):
+ """Return a proxy to t.do_fun that does not bind t: a bound method would create a reference cycle (t.fun -> t) and defeat the refcount-based task cleanup"""
+ wt = weakref.proxy(t)
+ return lambda item: wt.do_fun(item)
+
+def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
+ """Add a task chain to the pool p, consisting of a feeder, count transformers
+ and an order verifier, like t1 -> t2 -> t3
+ :param fail_setup: a list of (task_id, fail_after) pairs, i.e. [(2, 20)] would
+ make the third transformer fail after 20 items
+ :param feeder_channel: if set to a channel, it will be used as input of the
+ first transformation task. The respective first task in the return value
+ will be None.
+ :param id_offset: defines the id of the first transformation task; each
+ subsequent one increments it by one
+ :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
+ nt = p.num_tasks()
+
+ feeder = None
+ frc = feeder_channel
+ if feeder_channel is None:
+ feeder = make_iterator_task(ni)
+ frc = p.add_task(feeder)
+ # END handle specific feeder
+
+ rcs = [frc]
+ tasks = [feeder]
+
+ inrc = frc
+ for tc in xrange(count):
+ t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
+
+ t.fun = make_proxy_method(t)
+ #t.fun = t.do_fun
+ inrc = p.add_task(t)
+
+ tasks.append(t)
+ rcs.append(inrc)
+ # END create count transformers
+
+ # setup failure
+ for id, fail_after in fail_setup:
+ tasks[1+id].fail_after = fail_after
+ # END setup failure
+
+ verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
+ #verifier.fun = verifier.do_fun
+ verifier.fun = make_proxy_method(verifier)
+ vrc = p.add_task(verifier)
+
+
+ tasks.append(verifier)
+ rcs.append(vrc)
+ return tasks, rcs
+
+def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs):
+ """:return: task which yields ni items
+ :param taskcls: the actual iterator type to use
+ :param **kwargs: additional kwargs to be passed to the task"""
+ t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
+ t.fun = make_proxy_method(t)
+ return t
+
+#} END utilities
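The weakref proxy in make_proxy_method above is what keeps the pool's refcount-driven task cleanup working: a bound method such as t.do_fun carries a strong reference to the task through im_self, so storing it as t.fun would create a cycle that keeps the task alive after all channel handles are gone. A minimal standalone sketch of the difference, not part of the patch (assumes CPython refcounting):

    import weakref

    def make_proxy_method(t):
        # same helper as above: forward calls without owning the task
        wt = weakref.proxy(t)
        return lambda item: wt.do_fun(item)

    class Task(object):
        def do_fun(self, item):
            return item

    t = Task()
    watcher = weakref.ref(t)
    bound = t.do_fun                  # bound method: strong reference via im_self
    proxy_fun = make_proxy_method(t)  # weak: does not own the task

    del t
    assert watcher() is not None      # 'bound' still keeps the task alive
    del bound
    assert watcher() is None          # only the weak proxy remains; task collected
    try:
        proxy_fun(1)
    except ReferenceError:
        pass                          # the proxy never kept the task alive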
diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py
index 215081cd..a24c7c91 100644
--- a/test/git/async/test_channel.py
+++ b/test/git/async/test_channel.py
@@ -9,8 +9,8 @@ class TestChannels(TestBase):
def test_base(self):
# creating a channel yields a write and a read channel
wc, rc = mkchannel()
- assert isinstance(wc, WChannel) # default args
- assert isinstance(rc, RChannel)
+ assert isinstance(wc, Writer) # default args
+ assert isinstance(rc, Reader)
# TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
@@ -46,7 +46,7 @@ class TestChannels(TestBase):
# test callback channels
- wc, rc = mkchannel(wctype = CallbackWChannel, rctype = CallbackRChannel)
+ wc, rc = mkchannel(wtype = CallbackWriter, rtype = CallbackReader)
cb = [0, 0] # set slots to one if called
def pre_write(item):
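For orientation, a rough sketch of the FIFO contract the comment in the first hunk refers to ("writing+reading is FIFO"), written against the renamed types. Only mkchannel, Writer, Reader, CallbackWriter and CallbackReader are visible in this diff; the module path and the write()/close() calls are assumptions that mirror how the pool tests below use read():

    from git.async.channel import mkchannel, Writer, Reader  # module path assumed

    wc, rc = mkchannel()              # default args yield a Writer/Reader pair
    assert isinstance(wc, Writer) and isinstance(rc, Reader)

    for i in range(3):
        wc.write(i)                   # assumed Writer API
    wc.close()                        # assumed: closing lets read() drain without blocking
    assert rc.read() == [0, 1, 2]     # FIFO order; read() returns a list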
diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py
index 1a153e2d..7630226b 100644
--- a/test/git/async/test_graph.py
+++ b/test/git/async/test_graph.py
@@ -3,6 +3,7 @@ from test.testlib import *
from git.async.graph import *
import time
+import sys
class TestGraph(TestBase):
@@ -19,7 +20,7 @@ class TestGraph(TestBase):
# delete unconnected nodes
for n in g.nodes[:]:
- g.del_node(n)
+ g.remove_node(n)
# END del nodes
# add a chain of connected nodes
@@ -54,38 +55,26 @@ class TestGraph(TestBase):
# deleting a connected node clears its neighbour connections
assert n3.in_nodes[0] is n2
- assert g.del_node(n2) is g
- assert g.del_node(n2) is g # multi-deletion okay
+ assert g.remove_node(n2) is g
+ assert g.remove_node(n2) is g # multi-deletion okay
assert len(g.nodes) == nn - 1
assert len(n3.in_nodes) == 0
assert len(n1.out_nodes) == 0
# check the history from the last node
- last = g.nodes[-1]
- class Visitor(object):
- def __init__(self, origin):
- self.origin_seen = False
- self.origin = origin
- self.num_seen = 0
-
- def __call__(self, n):
- if n is self.origin:
- self.origin_seen = True
- else:
- assert not self.origin_seen, "should see origin last"
- # END check origin
- self.num_seen += 1
- return True
-
- def _assert(self, num_expected):
- assert self.origin_seen
- assert self.num_seen == num_expected
- # END visitor helper
-
end = g.nodes[-1]
- visitor = Visitor(end)
- g.visit_input_inclusive_depth_first(end, visitor)
-
+ dfirst_nodes = g.input_inclusive_dfirst_reversed(end)
num_nodes_seen = nn - 2 # deleted second, which leaves first one disconnected
- visitor._assert(num_nodes_seen)
+ assert len(dfirst_nodes) == num_nodes_seen
+ assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1
+
+ # test cleanup
+ # it's at least kept by its graph
+ assert sys.getrefcount(end) > 3
+ del(g)
+ del(n1); del(n2); del(n3)
+ del(dfirst_nodes)
+ del(n)
+ assert sys.getrefcount(end) == 2
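The refcount assertions above rely on a CPython convention: sys.getrefcount reports one more reference than the program actually holds, because the argument of the call is itself a temporary reference, so '== 2' means 'only my local name is left'. A quick illustration with a hypothetical Node class (CPython only):

    import sys

    class Node(object):
        pass

    n = Node()
    assert sys.getrefcount(n) == 2    # the name 'n' + the call argument

    graph = [n]                       # a container adds one more reference
    assert sys.getrefcount(n) == 3

    del graph
    assert sys.getrefcount(n) == 2    # back to the baseline the test checks for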
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 2a5e4647..40c6d66e 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -1,169 +1,43 @@
"""Channel testing"""
from test.testlib import *
+from task import *
+
from git.async.pool import *
-from git.async.task import *
from git.async.thread import terminate_threads
from git.async.util import cpu_count
+
import threading
+import weakref
import time
import sys
-class _TestTaskBase(object):
- def __init__(self, *args, **kwargs):
- super(_TestTaskBase, self).__init__(*args, **kwargs)
- self.should_fail = False
- self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
- self.plock = threading.Lock()
- self.item_count = 0
- self.process_count = 0
-
- def do_fun(self, item):
- self.lock.acquire()
- self.item_count += 1
- self.lock.release()
- if self.should_fail:
- raise AssertionError("I am failing just for the fun of it")
- return item
-
- def process(self, count=1):
- # must do it first, otherwise we might read and check results before
- # the thread gets here :). Its a lesson !
- self.plock.acquire()
- self.process_count += 1
- self.plock.release()
- super(_TestTaskBase, self).process(count)
-
- def _assert(self, pc, fc, check_scheduled=False):
- """Assert for num process counts (pc) and num function counts (fc)
- :return: self"""
- self.lock.acquire()
- if self.item_count != fc:
- print self.item_count, fc
- assert self.item_count == fc
- self.lock.release()
-
- return self
-
-class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
- pass
-
-class TestThreadFailureNode(TestThreadTaskNode):
- """Fails after X items"""
- def __init__(self, *args, **kwargs):
- self.fail_after = kwargs.pop('fail_after')
- super(TestThreadFailureNode, self).__init__(*args, **kwargs)
-
- def do_fun(self, item):
- item = TestThreadTaskNode.do_fun(self, item)
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
- return item
-
-
-class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
- """Apply a transformation on items read from an input channel"""
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
- if isinstance(item, tuple):
- i = item[0]
- return item + (i * self.id, )
- else:
- return (item, item * self.id)
- # END handle tuple
-
-
-class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
- """An input channel task, which verifies the result of its input channels,
- should be last in the chain.
- Id must be int"""
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
-
- # make sure the computation order matches
- assert isinstance(item, tuple)
-
- base = item[0]
- for num in item[1:]:
- assert num == base * 2
- base = num
- # END verify order
-
- return item
-
-
class TestThreadPool(TestBase):
max_threads = cpu_count()
- def _add_task_chain(self, p, ni, count=1):
- """Create a task chain of feeder, count transformers and order verifcator
- to the pool p, like t1 -> t2 -> t3
- :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
- nt = p.num_tasks()
-
- feeder = self._make_iterator_task(ni)
- frc = p.add_task(feeder)
-
- assert p.num_tasks() == nt + 1
-
- rcs = [frc]
- tasks = [feeder]
-
- inrc = frc
- for tc in xrange(count):
- t = TestThreadInputChannelTaskNode(inrc, tc, None)
- t.fun = t.do_fun
- inrc = p.add_task(t)
-
- tasks.append(t)
- rcs.append(inrc)
- assert p.num_tasks() == nt + 2 + tc
- # END create count transformers
-
- verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
- verifier.fun = verifier.do_fun
- vrc = p.add_task(verifier)
-
- assert p.num_tasks() == nt + tc + 3
-
- tasks.append(verifier)
- rcs.append(vrc)
- return tasks, rcs
-
- def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs):
- """:return: task which yields ni items
- :param taskcls: the actual iterator type to use
- :param **kwargs: additional kwargs to be passed to the task"""
- t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
- t.fun = t.do_fun
- return t
-
def _assert_single_task(self, p, async=False):
"""Performs testing in a synchronized environment"""
+ # return # DEBUG TODO: fixme - deactivated for now
print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size())
null_tasks = p.num_tasks() # in case we had some before
# add a simple task
# it iterates n items
- ni = 5000
+ ni = 1000
assert ni % 2 == 0, "ni needs to be divisible by 2"
assert ni % 4 == 0, "ni needs to be divisible by 4"
- make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs)
+ make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs)
task = make_task()
assert p.num_tasks() == null_tasks
rc = p.add_task(task)
assert p.num_tasks() == 1 + null_tasks
- assert isinstance(rc, RPoolChannel)
- assert task._out_wc is not None
+ assert isinstance(rc, PoolReader)
+ assert task._out_writer is not None
# pull the result completely - we should get one task, which calls its
# function once. In sync mode, the order matches
@@ -258,8 +132,13 @@ class TestThreadPool(TestBase):
assert len(items) == nri
task._assert( 5, ni)
- assert task.is_done()
+
+ # delete the handle first, causing the task to be removed and marked done.
+ # We check the done state only afterwards: depending on the timing, the
+ # task might not yet be flagged done while we check, because we could be
+ # scheduled in before the flag gets set.
del(rc)
+ assert task.is_done()
assert p.num_tasks() == null_tasks # depleted
# but this only hits if we want too many items, if we want less, it could
@@ -312,8 +191,6 @@ class TestThreadPool(TestBase):
print "read(0) with failure"
assert len(rc.read()) == 0 # failure on first item
- print >> sys.stderr, "done with everything"
-
assert isinstance(task.error(), AssertionError)
assert task.is_done() # on error, its marked done as well
del(rc)
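Condensing the failure contract this hunk exercises: a failing task delivers nothing to its reader, exposes the exception through error(), and is marked done. A hedged recap using the helpers from task.py (module paths assumed, serial pool as in the surrounding test):

    from git.async.pool import ThreadPool
    from task import make_iterator_task

    p = ThreadPool(0)                 # serial pool, no worker threads
    t = make_iterator_task(100)
    t.should_fail = True              # _TestTaskBase flag: do_fun raises AssertionError
    rc = p.add_task(t)

    assert len(rc.read()) == 0        # failure on the first item: nothing comes through
    assert isinstance(t.error(), AssertionError)
    assert t.is_done()                # on error, the task is marked done as well
    del(rc)                           # dropping the handle removes the task from the pool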
@@ -328,40 +205,181 @@ class TestThreadPool(TestBase):
assert task.is_done()
assert isinstance(task.error(), AssertionError)
+ print >> sys.stderr, "done with everything"
+
def _assert_async_dependent_tasks(self, pool):
# includes failure in center task, 'recursive' orphan cleanup
# This will also verify that the channel-close mechanism works
# t1 -> t2 -> t3
- # t1 -> x -> t3
+
+ print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
null_tasks = pool.num_tasks()
- ni = 100
- count = 1
- make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs)
+ ni = 1000
+ count = 3
+ aic = count + 2
+ make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs)
ts, rcs = make_task()
- assert len(ts) == count + 2
- assert len(rcs) == count + 2
+ assert len(ts) == aic
+ assert len(rcs) == aic
assert pool.num_tasks() == null_tasks + len(ts)
- print pool._tasks.nodes
+ # read(0)
+ #########
+ st = time.time()
+ items = rcs[-1].read()
+ elapsed = time.time() - st
+ print len(items), ni
+ assert len(items) == ni
+ del(rcs)
+ assert pool.num_tasks() == 0 # tasks depleted, all done, no handles
+ # wait a tiny moment - there could still be something unprocessed on the
+ # queue, increasing the refcount
+ time.sleep(0.15)
+ assert sys.getrefcount(ts[-1]) == 2 # ts + call
+ assert sys.getrefcount(ts[0]) == 2 # ts + call
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
- # in the end, we expect all tasks to be gone, automatically
+ # read(1)
+ #########
+ ts, rcs = make_task()
+ st = time.time()
+ for i in xrange(ni):
+ items = rcs[-1].read(1)
+ assert len(items) == 1
+ # END for each item to pull
+ elapsed_single = time.time() - st
+ # another read yields nothing, it's empty
+ assert len(rcs[-1].read()) == 0
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single)
+
+
+ # read with min-count size
+ ###########################
+ # must be faster, as it will read ni / 4 chunks
+ # It's enough to set one task, as it will force all others in the chain
+ # to min_size as well.
+ ts, rcs = make_task()
+ assert pool.num_tasks() == len(ts)
+ nri = ni / 4
+ ts[-1].min_count = nri
+ st = time.time()
+ for i in xrange(ni):
+ items = rcs[-1].read(1)
+ assert len(items) == 1
+ # END for each item to read
+ elapsed_minsize = time.time() - st
+ # it's empty
+ assert len(rcs[-1].read()) == 0
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize)
+
+ # it should have been a bit faster at least, and most of the time it is.
+ # Sometimes it's not, mainly because:
+ # * the test tasks lock a lot, hence they slow down the system
+ # * each read still triggers the pool to evaluate, causing some overhead
+ # even though there are enough items on the queue in that case. Keeping
+ # track of the scheduled items helped there, but it caused further unacceptable
+ # slowdown
+ # assert elapsed_minsize < elapsed_single
+
+
+ # read with failure
+ ###################
+ # it should recover and give at least fail_after items
+ # t1 -> x -> t3
+ fail_after = ni/2
+ ts, rcs = make_task(fail_setup=[(0, fail_after)])
+ items = rcs[-1].read()
+ assert len(items) == fail_after
+
+
+ # MULTI-POOL
+ # If two pools are connected, this should work as well.
+ # The second one runs without worker threads of its own
+ ts, rcs = make_task()
+
+ # connect verifier channel as feeder of the second pool
+ p2 = ThreadPool(0) # don't spawn new threads; they tend not to wake up on mutexes
+ assert p2.size() == 0
+ p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ assert p2ts[0] is None # we have no feeder task
+ assert rcs[-1].pool_ref()() is pool # it didn't change the pool
+ assert rcs[-1] is p2ts[1].reader()
+ assert p2.num_tasks() == len(p2ts)-1 # first is None
+
+ # reading from the last one will evaluate all pools correctly
+ print "read(0) multi-pool"
+ st = time.time()
+ items = p2rcs[-1].read()
+ elapsed = time.time() - st
+ assert len(items) == ni
+
+ print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
+
+
+ # lose the handles of the second pool to allow others to go as well
+ del(p2rcs); del(p2ts)
+ assert p2.num_tasks() == 0
+
+ # now we lost our old handles as well, and the tasks go away
+ ts, rcs = make_task()
+ assert pool.num_tasks() == len(ts)
+
+ p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ assert p2.num_tasks() == len(p2ts) - 1
+
+ # Test multi-read(1)
+ print "read(1) * %i" % ni
+ reader = rcs[-1]
+ st = time.time()
+ for i in xrange(ni):
+ items = reader.read(1)
+ assert len(items) == 1
+ # END for each item to get
+ elapsed = time.time() - st
+ del(reader) # decrement refcount
+
+ print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
+
+ # another read is empty
+ assert len(rcs[-1].read()) == 0
+
+ # now that both are connected, I can drop my handle to the reader
+ # without affecting the task-count, but what's more important:
+ # they remove their tasks correctly once we drop our references in the
+ # right order
+ del(p2ts)
+ assert p2rcs[0] is rcs[-1]
+ del(p2rcs)
+ assert p2.num_tasks() == 0
+ del(p2)
+
+ assert pool.num_tasks() == null_tasks + len(ts)
- # order of deletion matters - just keep the end, then delete
- final_rc = rcs[-1]
del(ts)
del(rcs)
- del(final_rc)
- assert pool.num_tasks() == null_tasks
+
+ assert pool.num_tasks() == null_tasks
+
+
+ # ASSERTION: We already tested that one pool behaves correctly when an error
+ # occurs - if two pools handle their ref-counts correctly, which they
+ # do if we are here, then they should handle errors during task
+ # processing as expected as well. Hence we can skip that here.
+
- @terminate_threads
+ # for some reason, it sometimes has multiple worker threads already when it
+ # enters the method ... not clear why yet; pools should clean up after themselves
+ #@terminate_threads
def test_base(self):
+ assert len(threading.enumerate()) == 1
+
p = ThreadPool()
# default pools have no workers
@@ -386,17 +404,28 @@ class TestThreadPool(TestBase):
# SINGLE TASK SERIAL SYNC MODE
##############################
- # put a few unrelated tasks that we forget about
- urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
- urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
+ # put a few unrelated tasks that we forget about - check ref counts and cleanup
+ t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None)
+ urc1 = p.add_task(t1)
+ urc2 = p.add_task(t2)
assert p.num_tasks() == 2
## SINGLE TASK #################
self._assert_single_task(p, False)
assert p.num_tasks() == 2
del(urc1)
- del(urc2)
+ assert p.num_tasks() == 1
+
+ p.remove_task(t2)
+ assert p.num_tasks() == 0
+ assert sys.getrefcount(t2) == 2
+
+ t3 = TestThreadInputChannelTaskNode(urc2, "channel", None)
+ urc3 = p.add_task(t3)
+ assert p.num_tasks() == 1
+ del(urc3)
assert p.num_tasks() == 0
+ assert sys.getrefcount(t3) == 2
# DEPENDENT TASKS SYNC MODE
@@ -438,4 +467,6 @@ class TestThreadPool(TestBase):
###########################
self._assert_async_dependent_tasks(p)
-
+ print >> sys.stderr, "Done with everything"
+
+ # TODO: test multi-pool connections
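For reference, the multi-pool wiring the TODO alludes to is already exercised by _assert_async_dependent_tasks above; stripped to its core it looks like the following sketch, reusing the helpers from test/git/async/task.py added in this commit:

    from git.async.pool import ThreadPool
    from task import add_task_chain

    ni, count = 1000, 3
    p1, p2 = ThreadPool(0), ThreadPool(0)   # serial pools: no worker threads

    ts, rcs = add_task_chain(p1, ni, count)
    # feed the second pool's chain from the first pool's verifier channel
    p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
    assert p2ts[0] is None                  # no feeder task on the second pool

    items = p2rcs[-1].read()                # draining the last reader evaluates both pools
    assert len(items) == ni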