summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-09 16:38:21 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-09 16:38:21 +0200
commitea81f14dafbfb24d70373c74b5f8dabf3f2225d9 (patch)
treeb020c7b72d33df91a35f27a40ef02dfd878bf5d2 /lib/git/async
parent07996a1a1e53ffdd2680d4bfbc2f4059687859a5 (diff)
downloadgitpython-ea81f14dafbfb24d70373c74b5f8dabf3f2225d9.tar.gz
Channel: Callbacks reviewed - they are now part of Subclasses of the default channel implementation, one of which is used as base by the Pool Read channel, releasing it of the duty to call these itself. The write channel with callback subclass allows the transformation of the item to be written
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/channel.py51
-rw-r--r--lib/git/async/pool.py51
2 files changed, 57 insertions, 45 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index bb118f30..abb31035 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -68,6 +68,32 @@ class WChannel(Channel):
#} END interface
+class CallbackWChannel(WChannel):
+ """The write end of a channel which allows you to setup a callback to be
+ called after an item was written to the channel"""
+ __slots__ = ('_pre_cb')
+
+ def __init__(self):
+ WChannel.__init__(self)
+ self._pre_cb = None
+
+ def set_pre_cb(self, fun = lambda item: item):
+ """Install a callback to be called before the given item is written.
+ It returns a possibly altered item which will be written to the channel
+ instead, making it useful for pre-write item conversions.
+ Providing None uninstalls the current method.
+ :return: the previously installed function or None
+ :note: Must be thread-safe if the channel is used in multiple threads"""
+ prev = self._pre_cb
+ self._pre_cb = fun
+ return prev
+
+ def write(self, item, block=True, timeout=None):
+ if self._pre_cb:
+ item = self._pre_cb(item)
+ WChannel.write(self, item, block, timeout)
+
+
class SerialWChannel(WChannel):
"""A slightly faster version of a WChannel, which sacrificed thead-safety for
performance"""
@@ -171,7 +197,32 @@ class RChannel(Channel):
return out
#} END interface
+
+class CallbackRChannel(RChannel):
+ """A channel which sends a callback before items are read from the channel"""
+ __slots__ = "_pre_cb"
+
+ def __init__(self, wc):
+ RChannel.__init__(self, wc)
+ self._pre_cb = None
+
+ def set_pre_cb(self, fun = lambda count: None):
+ """Install a callback to call with the item count to be read before any
+ item is actually read from the channel.
+ Exceptions will be propagated.
+ If a function is not provided, the call is effectively uninstalled.
+ :return: the previously installed callback or None
+ :note: The callback must be threadsafe if the channel is used by multiple threads."""
+ prev = self._pre_cb
+ self._pre_cb = fun
+ return prev
+ def read(self, count=0, block=True, timeout=None):
+ if self._pre_cb:
+ self._pre_cb(count)
+ return RChannel.read(self, count, block, timeout)
+
+
#} END classes
#{ Constructors
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 284c41c7..7d4e96d1 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -21,26 +21,24 @@ from channel import (
mkchannel,
WChannel,
SerialWChannel,
- RChannel
+ CallbackRChannel
)
import sys
from time import sleep
-class RPoolChannel(RChannel):
+class RPoolChannel(CallbackRChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
before and after an item is to be read.
It acts like a handle to the underlying task in the pool."""
- __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
+ __slots__ = ('_task', '_pool')
def __init__(self, wchannel, task, pool):
- RChannel.__init__(self, wchannel)
+ CallbackRChannel.__init__(self, wchannel)
self._task = task
self._pool = pool
- self._pre_cb = None
- self._post_cb = None
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
@@ -56,30 +54,10 @@ class RPoolChannel(RChannel):
self._pool.remove_task(self._task)
# END handle refcount based removal of task
- def set_pre_cb(self, fun = lambda count: None):
- """Install a callback to call with the item count to be read before any
- item is actually read from the channel. The call must be threadsafe if
- the channel is passed to more than one tasks.
- If it fails, the read will fail with an IOError
- If a function is not provided, the call is effectively uninstalled."""
- self._pre_cb = fun
-
- def set_post_cb(self, fun = lambda item: item):
- """Install a callback to call after the items were read. The function
- returns a possibly changed item list.The call must be threadsafe if
- the channel is passed to more than one tasks.
- If it raises, the exception will be propagated.
- If a function is not provided, the call is effectively uninstalled."""
- self._post_cb = fun
-
def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
input"""
- if self._pre_cb:
- self._pre_cb()
- # END pre callback
-
# NOTE: we always queue the operation that would give us count items
# as tracking the scheduled items or testing the channels size
# is in herently unsafe depending on the design of the task network
@@ -90,7 +68,7 @@ class RPoolChannel(RChannel):
# NOTE: TODO: that case is only possible if one Task could be connected
# to multiple input channels in a manner known by the system. Currently
- # this is not possible, but should be implemented at some point
+ # this is not possible, but should be implemented at some point.
# if the user tries to use us to read from a done task, we will never
# compute as all produced items are already in the channel
@@ -105,25 +83,12 @@ class RPoolChannel(RChannel):
####### read data ########
##########################
# read actual items, tasks were setup to put their output into our channel ( as well )
- items = RChannel.read(self, count, block, timeout)
+ items = CallbackRChannel.read(self, count, block, timeout)
##########################
- if self._post_cb:
- items = self._post_cb(items)
-
-
- ####### Finalize ########
- self._pool._post_channel_read(self._task)
return items
- #{ Internal
- def _read(self, count=0, block=False, timeout=None):
- """Calls the underlying channel's read directly, without triggering
- the pool"""
- return RChannel.read(self, count, block, timeout)
-
- #} END internal
class Pool(object):
@@ -296,10 +261,6 @@ class Pool(object):
# END for each task to process
- def _post_channel_read(self, task):
- """Called after we processed a read to cleanup"""
- pass
-
def _remove_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
# 1 as its stored on the task, 1 for the getrefcount call