diff options
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 51 |
1 files changed, 6 insertions, 45 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 284c41c7..7d4e96d1 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -21,26 +21,24 @@ from channel import ( mkchannel, WChannel, SerialWChannel, - RChannel + CallbackRChannel ) import sys from time import sleep -class RPoolChannel(RChannel): +class RPoolChannel(CallbackRChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') + __slots__ = ('_task', '_pool') def __init__(self, wchannel, task, pool): - RChannel.__init__(self, wchannel) + CallbackRChannel.__init__(self, wchannel) self._task = task self._pool = pool - self._pre_cb = None - self._post_cb = None def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -56,30 +54,10 @@ class RPoolChannel(RChannel): self._pool.remove_task(self._task) # END handle refcount based removal of task - def set_pre_cb(self, fun = lambda count: None): - """Install a callback to call with the item count to be read before any - item is actually read from the channel. The call must be threadsafe if - the channel is passed to more than one tasks. - If it fails, the read will fail with an IOError - If a function is not provided, the call is effectively uninstalled.""" - self._pre_cb = fun - - def set_post_cb(self, fun = lambda item: item): - """Install a callback to call after the items were read. The function - returns a possibly changed item list.The call must be threadsafe if - the channel is passed to more than one tasks. - If it raises, the exception will be propagated. - If a function is not provided, the call is effectively uninstalled.""" - self._post_cb = fun - def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" - if self._pre_cb: - self._pre_cb() - # END pre callback - # NOTE: we always queue the operation that would give us count items # as tracking the scheduled items or testing the channels size # is in herently unsafe depending on the design of the task network @@ -90,7 +68,7 @@ class RPoolChannel(RChannel): # NOTE: TODO: that case is only possible if one Task could be connected # to multiple input channels in a manner known by the system. Currently - # this is not possible, but should be implemented at some point + # this is not possible, but should be implemented at some point. # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel @@ -105,25 +83,12 @@ class RPoolChannel(RChannel): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = RChannel.read(self, count, block, timeout) + items = CallbackRChannel.read(self, count, block, timeout) ########################## - if self._post_cb: - items = self._post_cb(items) - - - ####### Finalize ######## - self._pool._post_channel_read(self._task) return items - #{ Internal - def _read(self, count=0, block=False, timeout=None): - """Calls the underlying channel's read directly, without triggering - the pool""" - return RChannel.read(self, count, block, timeout) - - #} END internal class Pool(object): @@ -296,10 +261,6 @@ class Pool(object): # END for each task to process - def _post_channel_read(self, task): - """Called after we processed a read to cleanup""" - pass - def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call |