summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py51
1 files changed, 6 insertions, 45 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 284c41c7..7d4e96d1 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -21,26 +21,24 @@ from channel import (
mkchannel,
WChannel,
SerialWChannel,
- RChannel
+ CallbackRChannel
)
import sys
from time import sleep
-class RPoolChannel(RChannel):
+class RPoolChannel(CallbackRChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
before and after an item is to be read.
It acts like a handle to the underlying task in the pool."""
- __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
+ __slots__ = ('_task', '_pool')
def __init__(self, wchannel, task, pool):
- RChannel.__init__(self, wchannel)
+ CallbackRChannel.__init__(self, wchannel)
self._task = task
self._pool = pool
- self._pre_cb = None
- self._post_cb = None
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
@@ -56,30 +54,10 @@ class RPoolChannel(RChannel):
self._pool.remove_task(self._task)
# END handle refcount based removal of task
- def set_pre_cb(self, fun = lambda count: None):
- """Install a callback to call with the item count to be read before any
- item is actually read from the channel. The call must be threadsafe if
- the channel is passed to more than one tasks.
- If it fails, the read will fail with an IOError
- If a function is not provided, the call is effectively uninstalled."""
- self._pre_cb = fun
-
- def set_post_cb(self, fun = lambda item: item):
- """Install a callback to call after the items were read. The function
- returns a possibly changed item list.The call must be threadsafe if
- the channel is passed to more than one tasks.
- If it raises, the exception will be propagated.
- If a function is not provided, the call is effectively uninstalled."""
- self._post_cb = fun
-
def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
input"""
- if self._pre_cb:
- self._pre_cb()
- # END pre callback
-
# NOTE: we always queue the operation that would give us count items
# as tracking the scheduled items or testing the channels size
# is in herently unsafe depending on the design of the task network
@@ -90,7 +68,7 @@ class RPoolChannel(RChannel):
# NOTE: TODO: that case is only possible if one Task could be connected
# to multiple input channels in a manner known by the system. Currently
- # this is not possible, but should be implemented at some point
+ # this is not possible, but should be implemented at some point.
# if the user tries to use us to read from a done task, we will never
# compute as all produced items are already in the channel
@@ -105,25 +83,12 @@ class RPoolChannel(RChannel):
####### read data ########
##########################
# read actual items, tasks were setup to put their output into our channel ( as well )
- items = RChannel.read(self, count, block, timeout)
+ items = CallbackRChannel.read(self, count, block, timeout)
##########################
- if self._post_cb:
- items = self._post_cb(items)
-
-
- ####### Finalize ########
- self._pool._post_channel_read(self._task)
return items
- #{ Internal
- def _read(self, count=0, block=False, timeout=None):
- """Calls the underlying channel's read directly, without triggering
- the pool"""
- return RChannel.read(self, count, block, timeout)
-
- #} END internal
class Pool(object):
@@ -296,10 +261,6 @@ class Pool(object):
# END for each task to process
- def _post_channel_read(self, task):
- """Called after we processed a read to cleanup"""
- pass
-
def _remove_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
# 1 as its stored on the task, 1 for the getrefcount call