diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 17:25:43 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 17:25:43 +0200 |
commit | 13dd59ba5b3228820841682b59bad6c22476ff66 (patch) | |
tree | 7b324d19f031112c78cafea469ed10ed689c57e1 /lib/git/async/pool.py | |
parent | 619c11787742ce00a0ee8f841cec075897873c79 (diff) | |
download | gitpython-13dd59ba5b3228820841682b59bad6c22476ff66.tar.gz |
task: now deletes itself once its done - for the test this doesn't change a thing as the task deletes itself too late - its time for a paradigm change, the task should be deleted with its RPoolChannel or explicitly by the user. The test needs to adapt, and shouldn't assume anything unless the RPoolChannel is gone
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 57 |
1 files changed, 15 insertions, 42 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index cf1c2199..fce5e424 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -78,14 +78,17 @@ class RPoolChannel(RChannel): # have an item, but its currently being produced by some worker. # This is why we: # * make no assumptions if there are multiple consumers - # * - have_enough = False + # * + + # if the user tries to use us to read from a done task, we will never + # compute as all produced items are already in the channel + skip_compute = self._task.is_done() or self._task.error() #if count > 0: - # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count # END ########## prepare ############################## - if not have_enough: + if not skip_compute: self._pool._prepare_channel_read(self._task, count) # END prepare pool scheduling @@ -134,7 +137,6 @@ class Pool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks '_taskorder_cache', # map task id -> ordered dependent tasks @@ -157,7 +159,6 @@ class Pool(object): def __init__(self, size=0): self._tasks = Graph() - self._consumed_tasks = None self._workers = list() self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() @@ -224,8 +225,10 @@ class Pool(object): # requested one last for task in dfirst_tasks: if task.error() or task.is_done(): - self._consumed_tasks.put(task) - continue + # in theory, the should never be consumed task in the pool, right ? + # They delete themselves once they are done. + raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") + #continue # END skip processing # if the task does not have the required output on its queue, schedule @@ -297,26 +300,8 @@ class Pool(object): def _post_channel_read(self, task): """Called after we processed a read to cleanup""" - # check whether we consumed the task, and schedule it for deletion - # This could have happend after the read returned ( even though the pre-read - # checks it as well ) - if task.error() or task.is_done(): - self._consumed_tasks.put(task) - # END handle consumption - - self._handle_consumed_tasks() - - def _handle_consumed_tasks(self): - """Remove all consumed tasks from our queue by deleting them""" - try: - while True: - ct = self._consumed_tasks.get(False) - self.del_task(ct) - # END for each task to delete - except Empty: - pass - # END pop queue empty - + pass + def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call @@ -347,11 +332,6 @@ class Pool(object): # ourselves cur_count = len(self._workers) if cur_count < size: - # make sure we have a real queue, and can store our consumed tasks properly - if not isinstance(self._consumed_tasks, self.TaskQueueCls): - self._consumed_tasks = Queue() - # END init queue - for i in range(size - cur_count): worker = self.WorkerCls(self._queue) worker.start() @@ -377,9 +357,6 @@ class Pool(object): continue # END while there are tasks on the queue - if self._consumed_tasks and not self._consumed_tasks.empty(): - self._handle_consumed_tasks() - # END assure consumed tasks are empty self._consumed_tasks = SyncQueue() # END process queue return self @@ -437,11 +414,7 @@ class Pool(object): wc, rc = Channel() rc = RPoolChannel(wc, task, self) task.set_wc(wc) - - has_input_channel = isinstance(task, InputChannelTask) - if has_input_channel: - task.set_pool(self) - # END init input channel task + task.set_pool(self) self._taskgraph_lock.acquire() try: @@ -452,7 +425,7 @@ class Pool(object): # END sync task addition # If the input channel is one of our read channels, we add the relation - if has_input_channel: + if isinstance(task, InputChannelTask): ic = task.in_rc if isinstance(ic, RPoolChannel) and ic._pool is self: self._taskgraph_lock.acquire() |