summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-09 16:38:21 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-09 16:38:21 +0200
commitea81f14dafbfb24d70373c74b5f8dabf3f2225d9 (patch)
treeb020c7b72d33df91a35f27a40ef02dfd878bf5d2 /lib/git/async/pool.py
parent07996a1a1e53ffdd2680d4bfbc2f4059687859a5 (diff)
downloadgitpython-ea81f14dafbfb24d70373c74b5f8dabf3f2225d9.tar.gz
Channel: Callbacks reviewed - they are now part of Subclasses of the default channel implementation, one of which is used as base by the Pool Read channel, releasing it of the duty to call these itself. The write channel with callback subclass allows the transformation of the item to be written
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py51
1 files changed, 6 insertions, 45 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 284c41c7..7d4e96d1 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -21,26 +21,24 @@ from channel import (
mkchannel,
WChannel,
SerialWChannel,
- RChannel
+ CallbackRChannel
)
import sys
from time import sleep
-class RPoolChannel(RChannel):
+class RPoolChannel(CallbackRChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
before and after an item is to be read.
It acts like a handle to the underlying task in the pool."""
- __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
+ __slots__ = ('_task', '_pool')
def __init__(self, wchannel, task, pool):
- RChannel.__init__(self, wchannel)
+ CallbackRChannel.__init__(self, wchannel)
self._task = task
self._pool = pool
- self._pre_cb = None
- self._post_cb = None
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
@@ -56,30 +54,10 @@ class RPoolChannel(RChannel):
self._pool.remove_task(self._task)
# END handle refcount based removal of task
- def set_pre_cb(self, fun = lambda count: None):
- """Install a callback to call with the item count to be read before any
- item is actually read from the channel. The call must be threadsafe if
- the channel is passed to more than one tasks.
- If it fails, the read will fail with an IOError
- If a function is not provided, the call is effectively uninstalled."""
- self._pre_cb = fun
-
- def set_post_cb(self, fun = lambda item: item):
- """Install a callback to call after the items were read. The function
- returns a possibly changed item list.The call must be threadsafe if
- the channel is passed to more than one tasks.
- If it raises, the exception will be propagated.
- If a function is not provided, the call is effectively uninstalled."""
- self._post_cb = fun
-
def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
input"""
- if self._pre_cb:
- self._pre_cb()
- # END pre callback
-
# NOTE: we always queue the operation that would give us count items
# as tracking the scheduled items or testing the channels size
# is in herently unsafe depending on the design of the task network
@@ -90,7 +68,7 @@ class RPoolChannel(RChannel):
# NOTE: TODO: that case is only possible if one Task could be connected
# to multiple input channels in a manner known by the system. Currently
- # this is not possible, but should be implemented at some point
+ # this is not possible, but should be implemented at some point.
# if the user tries to use us to read from a done task, we will never
# compute as all produced items are already in the channel
@@ -105,25 +83,12 @@ class RPoolChannel(RChannel):
####### read data ########
##########################
# read actual items, tasks were setup to put their output into our channel ( as well )
- items = RChannel.read(self, count, block, timeout)
+ items = CallbackRChannel.read(self, count, block, timeout)
##########################
- if self._post_cb:
- items = self._post_cb(items)
-
-
- ####### Finalize ########
- self._pool._post_channel_read(self._task)
return items
- #{ Internal
- def _read(self, count=0, block=False, timeout=None):
- """Calls the underlying channel's read directly, without triggering
- the pool"""
- return RChannel.read(self, count, block, timeout)
-
- #} END internal
class Pool(object):
@@ -296,10 +261,6 @@ class Pool(object):
# END for each task to process
- def _post_channel_read(self, task):
- """Called after we processed a read to cleanup"""
- pass
-
def _remove_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
# 1 as its stored on the task, 1 for the getrefcount call