diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/channel.py | 51 | ||||
-rw-r--r-- | lib/git/async/pool.py | 51 |
2 files changed, 57 insertions, 45 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index bb118f30..abb31035 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -68,6 +68,32 @@ class WChannel(Channel): #} END interface +class CallbackWChannel(WChannel): + """The write end of a channel which allows you to setup a callback to be + called after an item was written to the channel""" + __slots__ = ('_pre_cb') + + def __init__(self): + WChannel.__init__(self) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda item: item): + """Install a callback to be called before the given item is written. + It returns a possibly altered item which will be written to the channel + instead, making it useful for pre-write item conversions. + Providing None uninstalls the current method. + :return: the previously installed function or None + :note: Must be thread-safe if the channel is used in multiple threads""" + prev = self._pre_cb + self._pre_cb = fun + return prev + + def write(self, item, block=True, timeout=None): + if self._pre_cb: + item = self._pre_cb(item) + WChannel.write(self, item, block, timeout) + + class SerialWChannel(WChannel): """A slightly faster version of a WChannel, which sacrificed thead-safety for performance""" @@ -171,7 +197,32 @@ class RChannel(Channel): return out #} END interface + +class CallbackRChannel(RChannel): + """A channel which sends a callback before items are read from the channel""" + __slots__ = "_pre_cb" + + def __init__(self, wc): + RChannel.__init__(self, wc) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda count: None): + """Install a callback to call with the item count to be read before any + item is actually read from the channel. + Exceptions will be propagated. + If a function is not provided, the call is effectively uninstalled. + :return: the previously installed callback or None + :note: The callback must be threadsafe if the channel is used by multiple threads.""" + prev = self._pre_cb + self._pre_cb = fun + return prev + def read(self, count=0, block=True, timeout=None): + if self._pre_cb: + self._pre_cb(count) + return RChannel.read(self, count, block, timeout) + + #} END classes #{ Constructors diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 284c41c7..7d4e96d1 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -21,26 +21,24 @@ from channel import ( mkchannel, WChannel, SerialWChannel, - RChannel + CallbackRChannel ) import sys from time import sleep -class RPoolChannel(RChannel): +class RPoolChannel(CallbackRChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') + __slots__ = ('_task', '_pool') def __init__(self, wchannel, task, pool): - RChannel.__init__(self, wchannel) + CallbackRChannel.__init__(self, wchannel) self._task = task self._pool = pool - self._pre_cb = None - self._post_cb = None def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -56,30 +54,10 @@ class RPoolChannel(RChannel): self._pool.remove_task(self._task) # END handle refcount based removal of task - def set_pre_cb(self, fun = lambda count: None): - """Install a callback to call with the item count to be read before any - item is actually read from the channel. The call must be threadsafe if - the channel is passed to more than one tasks. - If it fails, the read will fail with an IOError - If a function is not provided, the call is effectively uninstalled.""" - self._pre_cb = fun - - def set_post_cb(self, fun = lambda item: item): - """Install a callback to call after the items were read. The function - returns a possibly changed item list.The call must be threadsafe if - the channel is passed to more than one tasks. - If it raises, the exception will be propagated. - If a function is not provided, the call is effectively uninstalled.""" - self._post_cb = fun - def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" - if self._pre_cb: - self._pre_cb() - # END pre callback - # NOTE: we always queue the operation that would give us count items # as tracking the scheduled items or testing the channels size # is in herently unsafe depending on the design of the task network @@ -90,7 +68,7 @@ class RPoolChannel(RChannel): # NOTE: TODO: that case is only possible if one Task could be connected # to multiple input channels in a manner known by the system. Currently - # this is not possible, but should be implemented at some point + # this is not possible, but should be implemented at some point. # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel @@ -105,25 +83,12 @@ class RPoolChannel(RChannel): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = RChannel.read(self, count, block, timeout) + items = CallbackRChannel.read(self, count, block, timeout) ########################## - if self._post_cb: - items = self._post_cb(items) - - - ####### Finalize ######## - self._pool._post_channel_read(self._task) return items - #{ Internal - def _read(self, count=0, block=False, timeout=None): - """Calls the underlying channel's read directly, without triggering - the pool""" - return RChannel.read(self, count, block, timeout) - - #} END internal class Pool(object): @@ -296,10 +261,6 @@ class Pool(object): # END for each task to process - def _post_channel_read(self, task): - """Called after we processed a read to cleanup""" - pass - def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call |