diff options
| author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 19:12:44 +0200 | 
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 19:12:44 +0200 | 
| commit | 583cd8807259a69fc01874b798f657c1f9ab7828 (patch) | |
| tree | 046847c4dcd33f5b30c00ff65770039fc72dd148 /lib/git/async/task.py | |
| parent | edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 (diff) | |
| download | gitpython-583cd8807259a69fc01874b798f657c1f9ab7828.tar.gz | |
Moved pool utilities into util module, fixed critical issue that caused havok - lets call this a safe-state
Diffstat (limited to 'lib/git/async/task.py')
| -rw-r--r-- | lib/git/async/task.py | 26 | 
1 files changed, 25 insertions, 1 deletions
| diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 3137746c..f106c381 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,5 +1,7 @@  from graph import Node +  import threading +import weakref  import new  class OutputChannelTask(Node): @@ -17,6 +19,7 @@ class OutputChannelTask(Node):  	__slots__ = (	'_read',			# method to yield items to process   					'_out_wc', 			# output write channel  					'_exc',				# exception caught +					'_done',			# True if we are done  					'fun',				# function to call with items read  					'min_count', 		# minimum amount of items to produce, None means no override  					'max_chunksize',	# maximium amount of items to process per process call @@ -28,6 +31,7 @@ class OutputChannelTask(Node):  		self._read = None					# to be set by subclasss   		self._out_wc = None					# to be set later  		self._exc = None +		self._done = False  		self.fun = fun  		self.min_count = None  		self.max_chunksize = 0				# note set @@ -35,12 +39,28 @@ class OutputChannelTask(Node):  	def is_done(self):  		""":return: True if we are finished processing""" -		return self._out_wc.closed +		return self._done  	def set_done(self):  		"""Set ourselves to being done, has we have completed the processing""" +		self._done = True +		self.close() +		 +	def set_wc(self, wc): +		"""Set the write channel to the given one +		:note: resets it done state in order to allow proper queue handling""" +		self._done = False +		self._out_wc = wc +		 +	def close(self): +		"""A closed task will close its channel to assure the readers will wake up +		:note: its safe to call this method multiple times"""  		self._out_wc.close() +	def is_closed(self): +		""":return: True if the task's write channel is closed""" +		return self._out_wc.closed +		  	def error(self):  		""":return: Exception caught during last processing or None"""  		return self._exc @@ -148,5 +168,9 @@ class InputChannelTask(OutputChannelTask):  		# and call it   		return OutputChannelTask.process(self, count) +		 +	def set_pool(self, pool): +		"""Set our pool to the given one, it will be weakref'd""" +		self._pool_ref = weakref.ref(pool)  	#{ Configuration | 
