Diffstat (limited to 'lib/git/async')
| -rw-r--r-- | lib/git/async/__init__.py | 30 |
| -rw-r--r-- | lib/git/async/channel.py | 338 |
| -rw-r--r-- | lib/git/async/graph.py | 126 |
| -rw-r--r-- | lib/git/async/pool.py | 488 |
| -rw-r--r-- | lib/git/async/task.py | 237 |
| -rw-r--r-- | lib/git/async/thread.py | 201 |
| -rw-r--r-- | lib/git/async/util.py | 268 |
7 files changed, 0 insertions, 1688 deletions
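Before the file-by-file diff, a minimal usage sketch of the channel API being removed may help orient the reader. It is hypothetical and untested against this exact revision, and uses only names defined in the deleted channel.py below. Note that mkchannel() actually returns the pair as (writer, reader), although its docstring claims (reader, writer).

	# Minimal sketch (Python 2, matching this codebase) of the removed channel API.
	from git.async.channel import mkchannel

	wc, rc = mkchannel()    # despite the docstring, the writer comes first
	wc.write(1)
	wc.write(2)
	wc.close()              # wakes blocked readers; further writes raise ReadOnly
	print rc.read()         # drains the closed channel without blocking: [1, 2]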
diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py deleted file mode 100644 index e212f1b2..00000000 --- a/lib/git/async/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Initialize the multi-processing package""" - -#{ Initialization -def _init_atexit(): -	"""Setup an at-exit job to be sure our workers are shutdown correctly before -	the interpreter quits""" -	import atexit -	import thread -	atexit.register(thread.do_terminate_threads) -	 -def _init_signals(): -	"""Assure we shutdown our threads correctly when being interrupted""" -	import signal -	import thread -	 -	prev_handler = signal.getsignal(signal.SIGINT) -	def thread_interrupt_handler(signum, frame): -		thread.do_terminate_threads() -		if callable(prev_handler): -			prev_handler(signum, frame) -			raise KeyboardInterrupt() -		# END call previous handler -	# END signal handler -	signal.signal(signal.SIGINT, thread_interrupt_handler) - - -#} END init - -_init_atexit() -_init_signals() diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py deleted file mode 100644 index a29ff17c..00000000 --- a/lib/git/async/channel.py +++ /dev/null @@ -1,338 +0,0 @@ -"""Contains a queue based channel implementation""" -from Queue import ( -	Empty,  -	Full -	) - -from util import ( -		AsyncQueue,  -		SyncQueue, -		ReadOnly -		) - -from time import time -import threading -import sys - -__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', -			'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',  -			'IteratorReader') - -#{ Classes  -class Channel(object): -	"""A channel is similar to a file like object. It has a write end as well as one or -	more read ends. If Data is in the channel, it can be read, if not the read operation -	will block until data becomes available. -	If the channel is closed, any read operation will result in an exception -	 -	This base class is not instantiated directly, but instead serves as constructor -	for Rwriter pairs. -	 -	Create a new channel """ -	__slots__ = 'queue' -	 -	# The queue to use to store the actual data -	QueueCls = AsyncQueue -	 -	def __init__(self): -		"""initialize this instance with a queue holding the channel contents"""  -		self.queue = self.QueueCls() - - -class SerialChannel(Channel): -	"""A slightly faster version of a Channel, which sacrificed thead-safety for performance""" -	QueueCls = SyncQueue - - -class Writer(object): -	"""A writer is an object providing write access to a possibly blocking reading device""" -	__slots__ = tuple() -	 -	#{ Interface -	 -	def __init__(self, device): -		"""Initialize the instance with the device to write to""" -	 -	def write(self, item, block=True, timeout=None): -		"""Write the given item into the device -		:param block: True if the device may block until space for the item is available -		:param timeout: The time in seconds to wait for the device to become ready  -		in blocking mode""" -		raise NotImplementedError() -		 -	def size(self): -		""":return: number of items already in the device, they could be read with a reader""" -		raise NotImplementedError() -		 -	def close(self): -		"""Close the channel. 
Multiple close calls on a closed channel are no  -		an error""" -		raise NotImplementedError() -		 -	def closed(self): -		""":return: True if the channel was closed""" -		raise NotImplementedError() -		 -	#} END interface -	 - -class ChannelWriter(Writer): -	"""The write end of a channel, a file-like interface for a channel""" -	__slots__ = ('channel', '_put') -	 -	def __init__(self, channel): -		"""Initialize the writer to use the given channel""" -		self.channel = channel -		self._put = self.channel.queue.put -	 -	#{ Interface -	def write(self, item, block=False, timeout=None): -		return self._put(item, block, timeout) -		 -	def size(self): -		return self.channel.queue.qsize() -		 -	def close(self): -		"""Close the channel. Multiple close calls on a closed channel are no  -		an error""" -		self.channel.queue.set_writable(False) -		 -	def closed(self): -		""":return: True if the channel was closed""" -		return not self.channel.queue.writable() -	#} END interface  -	 - -class CallbackWriterMixin(object): -	"""The write end of a channel which allows you to setup a callback to be  -	called after an item was written to the channel""" -	# slots don't work with mixin's :( -	# __slots__ = ('_pre_cb') -	 -	def __init__(self, *args): -		super(CallbackWriterMixin, self).__init__(*args) -		self._pre_cb = None -	 -	def set_pre_cb(self, fun = lambda item: item): -		"""Install a callback to be called before the given item is written. -		It returns a possibly altered item which will be written to the channel -		instead, making it useful for pre-write item conversions. -		Providing None uninstalls the current method. -		:return: the previously installed function or None -		:note: Must be thread-safe if the channel is used in multiple threads""" -		prev = self._pre_cb -		self._pre_cb = fun -		return prev -	 -	def write(self, item, block=True, timeout=None): -		if self._pre_cb: -			item = self._pre_cb(item) -		super(CallbackWriterMixin, self).write(item, block, timeout) -	 -	 -class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter): -	"""Implements a channel writer with callback functionality""" -	pass -	 - -class Reader(object): -	"""Allows reading from a device""" -	__slots__ = tuple() -	 -	#{ Interface -	def __init__(self, device): -		"""Initialize the instance with the device to read from""" -		 -	def read(self, count=0, block=True, timeout=None): -		"""read a list of items read from the device. The list, as a sequence -		of items, is similar to the string of characters returned when reading from  -		file like objects. -		:param count: given amount of items to read. If < 1, all items will be read -		:param block: if True, the call will block until an item is available -		:param timeout: if positive and block is True, it will block only for the  -			given amount of seconds, returning the items it received so far. -			The timeout is applied to each read item, not for the whole operation. -		:return: single item in a list if count is 1, or a list of count items.  -			If the device was empty and count was 1, an empty list will be returned. -			If count was greater 1, a list with less than count items will be  -			returned. -			If count was < 1, a list with all items that could be read will be  -			returned.""" -		raise NotImplementedError() -		 - -class ChannelReader(Reader): -	"""Allows reading from a channel. 
The reader is thread-safe if the channel is as well""" -	__slots__ = 'channel' -	 -	def __init__(self, channel): -		"""Initialize this instance from its parent write channel""" -		self.channel = channel -		 -	#{ Interface -	 -	def read(self, count=0, block=True, timeout=None): -		# if the channel is closed for writing, we never block -		# NOTE: is handled by the queue -		# We don't check for a closed state here has it costs time - most of  -		# the time, it will not be closed, and will bail out automatically once -		# it gets closed -		 -		 -		# in non-blocking mode, its all not a problem -		out = list() -		queue = self.channel.queue -		if not block: -			# be as fast as possible in non-blocking mode, hence -			# its a bit 'unrolled' -			try: -				if count == 1: -					out.append(queue.get(False)) -				elif count < 1: -					while True: -						out.append(queue.get(False)) -					# END for each item -				else: -					for i in xrange(count): -						out.append(queue.get(False)) -					# END for each item -				# END handle count -			except Empty: -				pass -			# END handle exceptions -		else: -			# to get everything into one loop, we set the count accordingly -			if count == 0: -				count = sys.maxint -			# END handle count -			 -			i = 0 -			while i < count: -				try: -					out.append(queue.get(block, timeout)) -					i += 1 -				except Empty: -					# here we are only if  -					# someone woke us up to inform us about the queue that changed -					# its writable state -					# The following branch checks for closed channels, and pulls -					# as many items as we need and as possible, before  -					# leaving the loop. -					if not queue.writable(): -						try: -							while i < count: -								out.append(queue.get(False, None)) -								i += 1 -							# END count loop -						except Empty: -							break	# out of count loop  -						# END handle absolutely empty queue -					# END handle closed channel  -					 -					# if we are here, we woke up and the channel is not closed -					# Either the queue became writable again, which currently shouldn't -					# be able to happen in the channel, or someone read with a timeout -					# that actually timed out. -					# As it timed out, which is the only reason we are here,  -					# we have to abort -					break -				# END ignore empty -				 -			# END for each item -		# END handle blocking -		return out -		 -	#} END interface  - - -class CallbackReaderMixin(object): -	"""A channel which sends a callback before items are read from the channel""" -	# unfortunately, slots can only use direct inheritance, have to turn it off :( -	# __slots__ = "_pre_cb" -	 -	def __init__(self, *args): -		super(CallbackReaderMixin, self).__init__(*args) -		self._pre_cb = None -	 -	def set_pre_cb(self, fun = lambda count: None): -		"""Install a callback to call with the item count to be read before any  -		item is actually read from the channel.  -		Exceptions will be propagated. -		If a function is not provided, the call is effectively uninstalled. 
-		:return: the previously installed callback or None -		:note: The callback must be threadsafe if the channel is used by multiple threads.""" -		prev = self._pre_cb -		self._pre_cb = fun -		return prev -	 -	def read(self, count=0, block=True, timeout=None): -		if self._pre_cb: -			self._pre_cb(count) -		return super(CallbackReaderMixin, self).read(count, block, timeout) -		 -		 -class CallbackChannelReader(CallbackReaderMixin, ChannelReader): -	"""Implements a channel reader with callback functionality""" -	pass - - -class IteratorReader(Reader): -	"""A Reader allowing to read items from an iterator, instead of a channel. -	Reads will never block. Its thread-safe""" -	__slots__ = ("_empty", '_iter', '_lock') -	 -	# the type of the lock to use when reading from the iterator -	lock_type = threading.Lock -	 -	def __init__(self, iterator): -		self._empty = False -		if not hasattr(iterator, 'next'): -			raise ValueError("Iterator %r needs a next() function" % iterator) -		self._iter = iterator -		self._lock = self.lock_type() -		 -	def read(self, count=0, block=True, timeout=None): -		"""Non-Blocking implementation of read""" -		# not threadsafe, but worst thing that could happen is that  -		# we try to get items one more time -		if self._empty: -			return list() -		# END early abort -		 -		self._lock.acquire() -		try: -			if count == 0: -				self._empty = True -				return list(self._iter) -			else: -				out = list() -				it = self._iter -				for i in xrange(count): -					try: -						out.append(it.next()) -					except StopIteration: -						self._empty = True -						break -					# END handle empty iterator -				# END for each item to take -				return out -			# END handle count -		finally: -			self._lock.release() -		# END handle locking -		 - -#} END classes - -#{ Constructors -def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader): -	"""Create a channel, with a reader and a writer -	:return: tuple(reader, writer) -	:param ctype: Channel to instantiate -	:param wctype: The type of the write channel to instantiate -	:param rctype: The type of the read channel to instantiate""" -	c = ctype() -	wc = wtype(c) -	rc = rtype(c) -	return wc, rc -#} END constructors diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py deleted file mode 100644 index 4e14c81e..00000000 --- a/lib/git/async/graph.py +++ /dev/null @@ -1,126 +0,0 @@ -"""Simplistic implementation of a graph""" - -__all__ = ('Node', 'Graph') - -class Node(object): -	"""A Node in the graph. They know their neighbours, and have an id which should  -	resolve into a string""" -	__slots__ = ('in_nodes', 'out_nodes', 'id') -	 -	def __init__(self, id=None): -		self.id = id -		self.in_nodes = list() -		self.out_nodes = list() -		 -	def __str__(self): -		return str(self.id) -		 -	def __repr__(self): -		return "%s(%s)" % (type(self).__name__, self.id) -	 -	 -class Graph(object): -	"""A simple graph implementation, keeping nodes and providing basic access and  -	editing functions. 
The performance is only suitable for small graphs of not  -	more than 10 nodes !""" -	__slots__ = "nodes" -	 -	def __init__(self): -		self.nodes = list() - -	def __del__(self): -		"""Deletes bidericational dependencies""" -		for node in self.nodes: -			node.in_nodes = None -			node.out_nodes = None -		# END cleanup nodes -		 -		# otherwise the nodes would keep floating around -	 - -	def add_node(self, node): -		"""Add a new node to the graph -		:return: the newly added node""" -		self.nodes.append(node) -		return node -	 -	def remove_node(self, node): -		"""Delete a node from the graph -		:return: self""" -		try: -			del(self.nodes[self.nodes.index(node)]) -		except ValueError: -			return self -		# END ignore if it doesn't exist -		 -		# clear connections -		for outn in node.out_nodes: -			del(outn.in_nodes[outn.in_nodes.index(node)]) -		for inn in node.in_nodes: -			del(inn.out_nodes[inn.out_nodes.index(node)]) -		node.out_nodes = list() -		node.in_nodes = list() -		return self -	 -	def add_edge(self, u, v): -		"""Add an undirected edge between the given nodes u and v. -		 -		return: self -		:raise ValueError: If the new edge would create a cycle""" -		if u is v: -			raise ValueError("Cannot connect a node with itself") -		 -		# are they already connected ? -		if 	u in v.in_nodes and v in u.out_nodes or \ -			v in u.in_nodes and u in v.out_nodes: -			return self -		# END handle connection exists -		 -		# cycle check - if we can reach any of the two by following either ones  -		# history, its a cycle -		for start, end in ((u, v), (v,u)): -			if not start.in_nodes:  -				continue -			nodes = start.in_nodes[:] -			seen = set() -			# depth first search - its faster -			while nodes: -				n = nodes.pop() -				if n in seen: -					continue -				seen.add(n) -				if n is end: -					raise ValueError("Connecting u with v would create a cycle") -				nodes.extend(n.in_nodes) -			# END while we are searching -		# END for each direction to look -		 -		# connection is valid, set it up -		u.out_nodes.append(v) -		v.in_nodes.append(u) -		 -		return self -	 -	def input_inclusive_dfirst_reversed(self, node): -		"""Return all input nodes of the given node, depth first, -		It will return the actual input node last, as it is required -		like that by the pool""" -		stack = [node] -		seen = set() -		 -		# depth first -		out = list() -		while stack: -			n = stack.pop() -			if n in seen: -				continue -			seen.add(n) -			out.append(n) -			 -			# only proceed in that direction if visitor is fine with it -			stack.extend(n.in_nodes) -			# END call visitor -		# END while walking -		out.reverse() -		return out -		 diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py deleted file mode 100644 index 8f33a029..00000000 --- a/lib/git/async/pool.py +++ /dev/null @@ -1,488 +0,0 @@ -"""Implementation of a thread-pool working with channels""" -from thread import ( -		WorkerThread,  -		StopProcessing, -		) -from threading import Lock - -from util import ( -		AsyncQueue, -		DummyLock -	) - -from Queue import ( -	Queue,  -	Empty -	) - -from graph import Graph  -from channel import ( -		mkchannel, -		ChannelWriter,  -		Channel, -		SerialChannel, -		CallbackChannelReader -	) - -import sys -import weakref -from time import sleep -import new - - -__all__ = ('PoolReader', 'Pool', 'ThreadPool') - - -class PoolReader(CallbackChannelReader): -	"""A reader designed to read from channels which take part in pools -	It acts like a handle to the underlying task in the pool.""" -	__slots__ = ('_task_ref', '_pool_ref') -	 -	def 
__init__(self, channel, task, pool): -		CallbackChannelReader.__init__(self, channel) -		self._task_ref = weakref.ref(task) -		self._pool_ref = weakref.ref(pool) -		 -	def __del__(self): -		"""Assures that our task will be deleted if we were the last reader""" -		task = self._task_ref() -		if task is None: -			return -		 -		pool = self._pool_ref() -		if pool is None: -			return -		 -		# if this is the last reader to the wc we just handled, there  -		# is no way anyone will ever read from the task again. If so,  -		# delete the task in question, it will take care of itself and orphans -		# it might leave -		# 1 is ourselves, + 1 for the call + 1, and 3 magical ones which  -		# I can't explain, but appears to be normal in the destructor -		# On the caller side, getrefcount returns 2, as expected -		# When just calling remove_task,  -		# it has no way of knowing that the write channel is about to diminsh. -		# which is why we pass the info as a private kwarg  - not nice, but  -		# okay for now -		if sys.getrefcount(self) < 6: -			pool.remove_task(task, _from_destructor_ = True) -		# END handle refcount based removal of task - -	#{ Internal -	def _read(self, count=0, block=True, timeout=None): -		return CallbackChannelReader.read(self, count, block, timeout) -	 -	#} END internal - -	#{ Interface -	 -	def pool_ref(self): -		""":return: reference to the pool we belong to""" -		return self._pool_ref -		 -	def task_ref(self): -		""":return: reference to the task producing our items""" -		return self._task_ref -	 -	#} END interface  -	 -	def read(self, count=0, block=True, timeout=None): -		"""Read an item that was processed by one of our threads -		:note: Triggers task dependency handling needed to provide the necessary  -			input""" -		# NOTE: we always queue the operation that would give us count items -		# as tracking the scheduled items or testing the channels size -		# is in herently unsafe depending on the design of the task network -		# If we put on tasks onto the queue for every request, we are sure -		# to always produce enough items, even if the task.min_count actually -		# provided enough - its better to have some possibly empty task runs  -		# than having and empty queue that blocks. -		 -		# if the user tries to use us to read from a done task, we will never  -		# compute as all produced items are already in the channel -		task = self._task_ref() -		if task is None: -			return list() -		# END abort if task was deleted -			 -		skip_compute = task.is_done() or task.error() -		 -		########## prepare ############################## -		if not skip_compute: -			self._pool_ref()._prepare_channel_read(task, count) -		# END prepare pool scheduling -		 -		 -		####### read data ######## -		########################## -		# read actual items, tasks were setup to put their output into our channel ( as well ) -		items = CallbackChannelReader.read(self, count, block, timeout) -		########################## -		 -		 -		return items -		 -	 -	 -class Pool(object): -	"""A thread pool maintains a set of one or more worker threads, but supports  -	a fully serial mode in which case the amount of threads is zero. -	 -	Work is distributed via Channels, which form a dependency graph. The evaluation -	is lazy, as work will only be done once an output is requested. -	 -	The thread pools inherent issue is the global interpreter lock that it will hit,  -	which gets worse considering a few c extensions specifically lock their part -	globally as well. 
The only way this will improve is if custom c extensions -	are written which do some bulk work, but release the GIL once they have acquired -	their resources. -	 -	Due to the nature of having multiple objects in git, its easy to distribute  -	that work cleanly among threads. -	 -	:note: the current implementation returns channels which are meant to be  -		used only from the main thread, hence you cannot consume their results  -		from multiple threads unless you use a task for it.""" -	__slots__ = (	'_tasks',				# a graph of tasks -					'_num_workers',			# list of workers -					'_queue', 				# master queue for tasks -					'_taskorder_cache', 	# map task id -> ordered dependent tasks -					'_taskgraph_lock',		# lock for accessing the task graph -				) -	 -	# CONFIGURATION -	# The type of worker to create - its expected to provide the Thread interface,  -	# taking the taskqueue as only init argument -	# as well as a method called stop_and_join() to terminate it -	WorkerCls = None -	 -	# The type of lock to use to protect critical sections, providing the  -	# threading.Lock interface -	LockCls = None -	 -	# the type of the task queue to use - it must provide the Queue interface -	TaskQueueCls = None -	 -	 -	def __init__(self, size=0): -		self._tasks = Graph() -		self._num_workers = 0 -		self._queue = self.TaskQueueCls() -		self._taskgraph_lock = self.LockCls() -		self._taskorder_cache = dict() -		self.set_size(size) -		 -	def __del__(self): -		self.set_size(0) -	 -	#{ Internal -		 -	def _prepare_channel_read(self, task, count): -		"""Process the tasks which depend on the given one to be sure the input  -		channels are filled with data once we process the actual task -		 -		Tasks have two important states: either they are done, or they are done  -		and have an error, so they are likely not to have finished all their work. -		 -		Either way, we will put them onto a list of tasks to delete them, providng  -		information about the failed ones. -		 -		Tasks which are not done will be put onto the queue for processing, which  -		is fine as we walked them depth-first.""" -		# for the walk, we must make sure the ordering does not change. Even  -		# when accessing the cache, as it is related to graph changes -		self._taskgraph_lock.acquire() -		try: -			try: -				dfirst_tasks = self._taskorder_cache[id(task)] -			except KeyError: -				# have to retrieve the list from the graph -				dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task) -				self._taskorder_cache[id(task)] = dfirst_tasks -			# END handle cached order retrieval -		finally: -			self._taskgraph_lock.release() -		# END handle locking -		 -		# check the min count on all involved tasks, and be sure that we don't  -		# have any task which produces less than the maximum min-count of all tasks -		# The actual_count is used when chunking tasks up for the queue, whereas  -		# the count is usued to determine whether we still have enough output -		# on the queue, checking qsize ( ->revise ) -		# ABTRACT: If T depends on T-1, and the client wants 1 item, T produces -		# at least 10, T-1 goes with 1, then T will block after 1 item, which  -		# is read by the client. On the next read of 1 item, we would find T's  -		# queue empty and put in another 10, which could put another thread into  -		# blocking state. T-1 produces one more item, which is consumed right away -		# by the two threads running T. Although this works in the end, it leaves -		# many threads blocking and waiting for input, which is not desired. 
-		# Setting the min-count to the max of the min-counts of all tasks assures
-		# we have enough items for all.
-		# Addition: in serial mode, we would enter a deadlock if one task would
-		# ever wait for items!
-		actual_count = count
-		min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
-		min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)
-		if 0 < count < min_count:
-			actual_count = min_count
-		# END set actual count
-		
-		# the list includes our tasks - the first one to evaluate first, the 
-		# requested one last
-		for task in dfirst_tasks: 
-			# if task.error() or task.is_done():
-				# in theory, there should never be consumed tasks in the pool, right ?
-				# They delete themselves once they are done. But as we run asynchronously, 
-				# it can be that someone reads while a task realizes it's done, and 
-				# we get here to prepare the read although it already is done.
-				# It's not a problem though, the task will not do anything.
-				# Hence we don't waste our time with checking for it
-				# raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened ?")
-			# END skip processing
-			
-			# but use the actual count to produce the output, we may produce 
-			# more than requested
-			numchunks = 1
-			chunksize = actual_count
-			remainder = 0
-			
-			# we need the count set for this - can't chunk up unlimited items
-			# In serial mode we could do this by checking for empty input channels, 
-			# but in dispatch mode it's impossible ( == not easily possible )
-			# Only try it if we have enough demand
-			if task.max_chunksize and actual_count > task.max_chunksize:
-				numchunks = actual_count / task.max_chunksize
-				chunksize = task.max_chunksize
-				remainder = actual_count - (numchunks * chunksize)
-			# END handle chunking
-			
-			# the following loops are kind of unrolled - code duplication
-			# should make things execute faster. Putting the if statements 
-			# into the loop would be less code, but ... slower
-			if self._num_workers:
-				# respect the chunk size, and split the task up if we want 
-				# to process too much. This can be defined per task
-				qput = self._queue.put
-				if numchunks > 1:
-					for i in xrange(numchunks):
-						qput((task.process, chunksize))
-					# END for each chunk to put
-				else:
-					qput((task.process, chunksize))
-				# END try efficient looping
-				
-				if remainder:
-					qput((task.process, remainder))
-				# END handle chunksize
-			else:
-				# no workers, so we have to do the work ourselves
-				if numchunks > 1:
-					for i in xrange(numchunks):
-						task.process(chunksize)
-					# END for each chunk to put
-				else:
-					task.process(chunksize)
-				# END try efficient looping
-				
-				if remainder:
-					task.process(remainder)
-				# END handle chunksize
-			# END handle serial mode
-		# END for each task to process
-		
-		
-	def _remove_task_if_orphaned(self, task, from_destructor):
-		"""Check the task, and delete it if it is orphaned"""
-		# 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
-		# If we are getting here from the destructor of an RPool channel, 
-		# it's totally valid to virtually decrement the refcount by 1 as 
-		# we can expect it to drop once the destructor completes, which is when
-		# we finish all recursive calls
-		max_ref_count = 3 + from_destructor
-		if sys.getrefcount(task.writer().channel) < max_ref_count:
-			self.remove_task(task, from_destructor)
-	#} END internal
-	
-	#{ Interface 
-	def size(self):
-		""":return: amount of workers in the pool
-		:note: method is not threadsafe !"""
-		return self._num_workers
-	
-	def set_size(self, size=0):
-		"""Set the amount of workers to use in this pool. When reducing the size, 
-		threads will continue with their work until they are done before effectively
-		being removed.
-		
-		:return: self
-		:param size: if 0, the pool will do all work itself in the calling thread, 
-			otherwise the work will be distributed among the given amount of threads.
-			If the size is 0, newly added tasks will use channels which are NOT 
-			threadsafe to optimize item throughput.
-		
-		:note: currently NOT threadsafe !"""
-		assert size > -1, "Size cannot be negative"
-		
-		# either start new threads, or kill existing ones.
-		# If we end up with no threads, we process the remaining chunks on the queue
-		# ourselves
-		cur_count = self._num_workers
-		if cur_count < size:
-			# we can safely increase the size, even from serial mode, as we would
-			# only be able to do this if the serial ( sync ) mode finished processing.
-			# Just adding more workers is not a problem at all.
-			add_count = size - cur_count
-			for i in range(add_count):
-				self.WorkerCls(self._queue).start()
-			# END for each new worker to create
-			self._num_workers += add_count
-		elif cur_count > size:
-			# We don't care which thread exactly gets hit by our stop request
-			# On their way, they will consume remaining tasks, but new ones 
-			# could be added as we speak.
-			del_count = cur_count - size
-			for i in range(del_count):
-				self._queue.put((self.WorkerCls.stop, True))	# arg doesn't matter
-			# END for each thread to stop
-			self._num_workers -= del_count
-		# END handle count
-		
-		if size == 0:
-			# NOTE: we do not process any tasks still on the queue, as we will 
-			# naturally do that once we read the next time, only on the tasks
-			# that are actually required. The queue will keep the tasks, 
-			# and once we are deleted, they will vanish without additional
-			# time spent on them, if there shouldn't be any consumers anyway.
-			# If we should reenable some workers again, they will continue on the  -			# remaining tasks, probably with nothing to do. -			# We can't clear the task queue if we have removed workers  -			# as they will receive the termination signal through it, and if  -			# we had added workers, we wouldn't be here ;). -			pass  -		# END process queue -		return self -		 -	def num_tasks(self): -		""":return: amount of tasks""" -		self._taskgraph_lock.acquire() -		try: -			return len(self._tasks.nodes) -		finally: -			self._taskgraph_lock.release() -		 -	def remove_task(self, task, _from_destructor_ = False): -		"""Delete the task -		Additionally we will remove orphaned tasks, which can be identified if their  -		output channel is only held by themselves, so no one will ever consume  -		its items. -		 -		This method blocks until all tasks to be removed have been processed, if  -		they are currently being processed. -		:return: self""" -		self._taskgraph_lock.acquire() -		try: -			# it can be that the task is already deleted, but its chunk was on the  -			# queue until now, so its marked consumed again -			if not task in self._tasks.nodes: -				return self -			# END early abort -			 -			# the task we are currently deleting could also be processed by  -			# a thread right now. We don't care about it as its taking care about -			# its write channel itself, and sends everything it can to it. -			# For it it doesn't matter that its not part of our task graph anymore. -		 -			# now delete our actual node - be sure its done to prevent further  -			# processing in case there are still client reads on their way. -			task.set_done() -			 -			# keep its input nodes as we check whether they were orphaned -			in_tasks = task.in_nodes -			self._tasks.remove_node(task) -			self._taskorder_cache.clear() -		finally: -			self._taskgraph_lock.release() -		# END locked deletion -		 -		for t in in_tasks: -			self._remove_task_if_orphaned(t, _from_destructor_) -		# END handle orphans recursively -		 -		return self -	 -	def add_task(self, task): -		"""Add a new task to be processed. -		:return: a read channel to retrieve processed items. If that handle is lost,  -			the task will be considered orphaned and will be deleted on the next  -			occasion.""" -		# create a write channel for it -		ctype = Channel -		 -		# adjust the task with our pool ref, if it has the slot and is empty -		# For now, we don't allow tasks to be used in multiple pools, except -		# for by their channels -		if hasattr(task, 'pool'): -			their_pool = task.pool() -			if their_pool is None: -				task.set_pool(self) -			elif their_pool is not self: -				raise ValueError("Task %r is already registered to another pool" % task.id) -			# END handle pool exclusivity -		# END handle pool aware tasks -		 -		self._taskgraph_lock.acquire() -		try: -			self._taskorder_cache.clear() -			self._tasks.add_node(task) -			 -			# Use a non-threadsafe queue -			# This brings about 15% more performance, but sacrifices thread-safety -			if self.size() == 0: -				ctype = SerialChannel -			# END improve locks -			 -			# setup the tasks channel - respect the task creators choice though -			# if it is set. 
-			wc = task.writer() -			ch = None -			if wc is None: -				ch = ctype() -				wc = ChannelWriter(ch) -				task.set_writer(wc) -			else: -				ch = wc.channel -			# END create write channel ifunset -			rc = PoolReader(ch, task, self) -		finally: -			self._taskgraph_lock.release() -		# END sync task addition -		 -		# If the input channel is one of our read channels, we add the relation -		if hasattr(task, 'reader'): -			ic = task.reader() -			if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: -				self._taskgraph_lock.acquire() -				try: -					self._tasks.add_edge(ic._task_ref(), task) -					 -					# additionally, bypass ourselves when reading from the  -					# task, if possible -					if hasattr(ic, '_read'): -						task.set_read(ic._read) -					# END handle read bypass -				finally: -					self._taskgraph_lock.release() -				# END handle edge-adding -			# END add task relation -		# END handle input channels for connections -		 -		return rc -			 -	#} END interface  -	 -	 -class ThreadPool(Pool): -	"""A pool using threads as worker""" -	WorkerCls = WorkerThread -	LockCls = Lock -	TaskQueueCls = AsyncQueue diff --git a/lib/git/async/task.py b/lib/git/async/task.py deleted file mode 100644 index ac948dc0..00000000 --- a/lib/git/async/task.py +++ /dev/null @@ -1,237 +0,0 @@ -from graph import Node -from util import ReadOnly -from channel import IteratorReader - - -import threading -import weakref -import sys -import new - -__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase',  -			'IteratorThreadTask', 'ChannelThreadTask') - -class Task(Node): -	"""Abstracts a named task, which contains  -	additional information on how the task should be queued and processed. -	 -	Results of the item processing are sent to a writer, which is to be  -	set by the creator using the ``set_writer`` method. -	 -	Items are read using the internal ``_read`` callable, subclasses are meant to -	set this to a callable that supports the Reader interface's read function. -	 -	* **min_count** assures that not less than min_count items will be processed per call. -	* **max_chunksize** assures that multi-threading is happening in smaller chunks. If  -	 someone wants all items to be processed, using read(0), the whole task would go to -	 one worker, as well as dependent tasks. If you want finer granularity , you can  -	 specify this here, causing chunks to be no larger than max_chunksize -	* **apply_single** if True, default True, individual items will be given to the  -		worker function. 
If False, a list of possibly multiple items will be passed
-		instead."""
-	__slots__ = (	'_read',			# method to yield items to process 
-					'_out_writer', 			# output write channel
-					'_exc',				# exception caught
-					'_done',			# True if we are done
-					'_num_writers',		# number of concurrent writers
-					'_wlock',			# lock for the above
-					'fun',				# function to call with items read
-					'min_count', 		# minimum amount of items to produce, None means no override
-					'max_chunksize',	# maximum amount of items to process per process call
-					'apply_single'		# apply single items even if multiple were read
-					)
-	
-	def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, 
-					writer=None):
-		Node.__init__(self, id)
-		self._read = None					# to be set by subclass 
-		self._out_writer = writer
-		self._exc = None
-		self._done = False
-		self._num_writers = 0
-		self._wlock = threading.Lock()
-		self.fun = fun
-		self.min_count = min_count
-		self.max_chunksize = max_chunksize	# 0 means unlimited
-		self.apply_single = apply_single
-	
-	def is_done(self):
-		""":return: True if we are finished processing"""
-		return self._done
-		
-	def set_done(self):
-		"""Set ourselves to being done, as we have completed the processing"""
-		self._done = True
-		
-	def set_writer(self, writer):
-		"""Set the write channel to the given one"""
-		self._out_writer = writer
-		
-	def writer(self):
-		""":return: a proxy to our write channel or None if none is set
-		:note: you must not hold a reference to our write channel when the 
-			task is being processed. This would cause the write channel never 
-			to be closed as the task will think there is still another instance
-			being processed which can close the channel once it is done.
-			In the worst case, this will block your reads."""
-		if self._out_writer is None:
-			return None
-		return self._out_writer
-		
-	def close(self):
-		"""A closed task will close its channel to assure the readers will wake up
-		:note: it's safe to call this method multiple times"""
-		self._out_writer.close()
-		
-	def is_closed(self):
-		""":return: True if the task's write channel is closed"""
-		return self._out_writer.closed()
-		
-	def error(self):
-		""":return: Exception caught during last processing or None"""
-		return self._exc
-
-	def process(self, count=0):
-		"""Process count items and send the result individually to the output channel"""
-		# first thing: increment the writer count - other tasks must be able 
-		# to respond properly ( even if it turns out we don't need it later )
-		self._wlock.acquire()
-		self._num_writers += 1
-		self._wlock.release()
-		
-		items = self._read(count)
-		
-		try:
-			try:
-				if items:
-					write = self._out_writer.write
-					if self.apply_single:
-						for item in items:
-							rval = self.fun(item)
-							write(rval)
-						# END for each item
-					else:
-						# shouldn't apply single be the default anyway ? 
-						# The task designers should chunk them up in advance
-						rvals = self.fun(items)
-						for rval in rvals:
-							write(rval)
-					# END handle single apply
-				# END if there is anything to do
-			finally:
-				self._wlock.acquire()
-				self._num_writers -= 1
-				self._wlock.release()
-			# END handle writer count
-		except Exception, e:
-			# be sure our task is not scheduled again
-			self.set_done()
-			
-			# PROBLEM: We have failed to create at least one item, hence it's not 
-			# guaranteed that enough items will be produced for a possibly blocking
-			# client on the other end. This is why we have no other choice but
-			# to close the channel, preventing the possibility of blocking.
-			# This implies that dependent tasks will go down with us, but that is
-			# just the right thing to do of course - one loose link in the chain ...
-			# Other chunks of our kind currently being processed will then 
-			# fail to write to the channel and fail as well
-			self.close()
-			
-			# If some other chunk of our Task had an error, the channel will be closed
-			# This is not an issue, just be sure we don't overwrite the original 
-			# exception with the ReadOnly error that would be emitted in that case.
-			# We imply that ReadOnly is exclusive to us, as it won't be an error
-			# if the user emits it
-			if not isinstance(e, ReadOnly):
-				self._exc = e
-			# END set error flag
-		# END exception handling
-		
-		
-		# if we didn't get all demanded items, which is also the case if count is 0,
-		# we have depleted the input channel and are done.
-		# We could check our output channel for how many items we have and put that 
-		# into the equation, but what's important is that we were asked to produce
-		# count items.
-		if not items or len(items) != count:
-			self.set_done()
-		# END handle done state
-		
-		# If we appear to be the only one left with our output channel, and are 
-		# done ( this could have been set in another thread as well ), make 
-		# sure to close the output channel. 
-		# Waiting with this to be the last one helps to keep the  -		# write-channel writable longer -		# The count is: 1 = wc itself, 2 = first reader channel, + x for every  -		# thread having its copy on the stack  -		# + 1 for the instance we provide to refcount -		# Soft close, so others can continue writing their results -		if self.is_done(): -			self._wlock.acquire() -			try: -				if self._num_writers == 0: -					self.close() -				# END handle writers -			finally: -				self._wlock.release() -			# END assure lock release -		# END handle channel closure -	#{ Configuration - - -class ThreadTaskBase(object): -	"""Describes tasks which can be used with theaded pools""" -	pass - - -class IteratorTaskBase(Task): -	"""Implements a task which processes items from an iterable in a multi-processing  -	safe manner""" -	__slots__ = tuple() -	 -	 -	def __init__(self, iterator, *args, **kwargs): -		Task.__init__(self, *args, **kwargs) -		self._read = IteratorReader(iterator).read -		# defaults to returning our items unchanged -		self.fun = lambda item: item -				 -		 -class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase): -	"""An input iterator for threaded pools""" -	lock_type = threading.Lock -		 - -class ChannelThreadTask(Task, ThreadTaskBase): -	"""Uses an input channel as source for reading items -	For instantiation, it takes all arguments of its base, the first one needs -	to be the input channel to read from though.""" -	__slots__ = "_pool_ref" -	 -	def __init__(self, in_reader, *args, **kwargs): -		Task.__init__(self, *args, **kwargs) -		self._read = in_reader.read -		self._pool_ref = None - -	#{ Internal Interface  -	 -	def reader(self): -		""":return: input channel from which we read""" -		# the instance is bound in its instance method - lets use this to keep -		# the refcount at one ( per consumer ) -		return self._read.im_self -		 -	def set_read(self, read): -		"""Adjust the read method to the given one""" -		self._read = read -		 -	def set_pool(self, pool): -		self._pool_ref = weakref.ref(pool) -		 -	def pool(self): -		""":return: pool we are attached to, or None""" -		if self._pool_ref is None: -			return None -		return self._pool_ref() -		 -	#} END intenral interface diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py deleted file mode 100644 index 96b4f0c4..00000000 --- a/lib/git/async/thread.py +++ /dev/null @@ -1,201 +0,0 @@ -# -*- coding: utf-8 -*- -"""Module with threading utilities""" -__docformat__ = "restructuredtext" -import threading -import inspect -import Queue - -import sys - -__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread',  -			'WorkerThread')  -		 - -#{ Decorators - -def do_terminate_threads(whitelist=list()): -	"""Simple function which terminates all of our threads -	:param whitelist: If whitelist is given, only the given threads will be terminated""" -	for t in threading.enumerate(): -		if not isinstance(t, TerminatableThread): -			continue -		if whitelist and t not in whitelist: -			continue -		t.stop_and_join() -	# END for each thread - -def terminate_threads( func ): -	"""Kills all worker threads the method has created by sending the quit signal. 
-	This takes over in case of an error in the main function"""
-	def wrapper(*args, **kwargs):
-		cur_threads = set(threading.enumerate())
-		try:
-			return func(*args, **kwargs)
-		finally:
-			do_terminate_threads(set(threading.enumerate()) - cur_threads)
-		# END finally shutdown threads
-	# END wrapper 
-	wrapper.__name__ = func.__name__
-	return wrapper
-
-#} END decorators
-
-#{ Classes
-	
-class TerminatableThread(threading.Thread):
-	"""A simple thread able to terminate itself on behalf of the user.
-	
-	Terminate a thread as follows:
-	
-	t.stop_and_join()
-	
-	Derived classes call _should_terminate() to determine whether they should 
-	abort gracefully
-	"""
-	__slots__ = '_terminate'
-	
-	def __init__(self):
-		super(TerminatableThread, self).__init__()
-		self._terminate = False
-		
-		
-	#{ Subclass Interface
-	def _should_terminate(self):
-		""":return: True if this thread should terminate its operation immediately"""
-		return self._terminate
-		
-	def _terminated(self):
-		"""Called once the thread terminated. It's called in the main thread
-		and may perform cleanup operations"""
-		pass
-
-	def start(self):
-		"""Start the thread and return self"""
-		super(TerminatableThread, self).start()
-		return self
-	
-	#} END subclass interface
-		
-	#{ Interface 
-		
-	def stop_and_join(self):
-		"""Ask the thread to stop its operation and wait for it to terminate
-		:note: Depending on the implementation, this might block a moment"""
-		self._terminate = True
-		self.join()
-		self._terminated()
-	#} END interface
-	
-	
-class StopProcessing(Exception):
-	"""If thrown in a function processed by a WorkerThread, it will terminate"""
-	
-
-class WorkerThread(TerminatableThread):
-	"""This base class allows calling functions on class instances natively.
-	As it is meant to work with a pool, the result of the call must be 
-	handled by the callee.
-	The thread runs forever unless it receives the terminate signal using 
-	its task queue.
-	
-	Tasks could be anything, but should usually be class methods and arguments to
-	allow the following:
-	
-	inq = Queue()
-	w = WorkerThread(inq)
-	w.start()
-	inq.put((WorkerThread.<method>, args, kwargs))
-	
-	finally we call quit to terminate asap. 
-	 -	alternatively, you can make a call more intuitively - the output is the output queue -	allowing you to get the result right away or later -	w.call(arg, kwarg='value').get() -	 -	inq.put(WorkerThread.quit) -	w.join() -	 -	You may provide the following tuples as task: -	t[0] = class method, function or instance method -	t[1] = optional, tuple or list of arguments to pass to the routine -	t[2] = optional, dictionary of keyword arguments to pass to the routine -	""" -	__slots__ = ('inq') -	 -	 -	# define how often we should check for a shutdown request in case our  -	# taskqueue is empty -	shutdown_check_time_s = 0.5 -	 -	def __init__(self, inq = None): -		super(WorkerThread, self).__init__() -		self.inq = inq -		if inq is None: -			self.inq = Queue.Queue() -	 -	@classmethod -	def stop(cls, *args): -		"""If send via the inq of the thread, it will stop once it processed the function""" -		raise StopProcessing -	 -	def run(self): -		"""Process input tasks until we receive the quit signal""" -		gettask = self.inq.get -		while True: -			if self._should_terminate(): -				break -			# END check for stop request -			 -			# note: during shutdown, this turns None in the middle of waiting  -			# for an item to be put onto it - we can't du anything about it -  -			# even if we catch everything and break gracefully, the parent  -			# call will think we failed with an empty exception. -			# Hence we just don't do anything about it. Alternatively -			# we could override the start method to get our own bootstrapping,  -			# which would mean repeating plenty of code in of the threading module. -			tasktuple = gettask() -				 -			# needing exactly one function, and one arg -			routine, arg = tasktuple -			 -			try: -				try: -					rval = None -					if inspect.ismethod(routine): -						if routine.im_self is None: -							rval = routine(self, arg) -						else: -							rval = routine(arg) -					elif inspect.isroutine(routine): -						rval = routine(arg) -					else: -						# ignore unknown items -						sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple))) -						break -					# END make routine call -				finally: -					# make sure we delete the routine to release the reference as soon -					# as possible. Otherwise objects might not be destroyed  -					# while we are waiting -					del(routine) -					del(tasktuple) -			except StopProcessing: -				break -			except Exception,e: -				sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e))) -				continue	# just continue  -			# END routine exception handling -		 -			# END handle routine release -		# END endless loop -	 -	def stop_and_join(self): -		"""Send stop message to ourselves - we don't block, the thread will terminate  -		once it has finished processing its input queue to receive our termination -		event""" -		# DONT call superclass as it will try to join - join's don't work for  -		# some reason, as python apparently doesn't switch threads (so often) -		# while waiting ... 
I don't know, but the threads respond properly,  -		# but only if dear python switches to them -		self.inq.put((self.stop, None)) -#} END classes diff --git a/lib/git/async/util.py b/lib/git/async/util.py deleted file mode 100644 index 4c4f3929..00000000 --- a/lib/git/async/util.py +++ /dev/null @@ -1,268 +0,0 @@ -"""Module with utilities related to async operations""" - -from threading import ( -	Lock, -	_allocate_lock, -	_Condition,  -	_sleep, -	_time, -	) - -from Queue import ( -		Empty, -		) - -from collections import deque -import sys -import os - -#{ Routines  - -def cpu_count(): -	""":return:number of CPUs in the system -	:note: inspired by multiprocessing""" -	num = 0 -	try: -		if sys.platform == 'win32': -			num = int(os.environ['NUMBER_OF_PROCESSORS']) -		elif 'bsd' in sys.platform or sys.platform == 'darwin': -			num = int(os.popen('sysctl -n hw.ncpu').read()) -		else: -			num = os.sysconf('SC_NPROCESSORS_ONLN') -	except (ValueError, KeyError, OSError, AttributeError): -		pass -	# END exception handling -	 -	if num == 0: -		raise NotImplementedError('cannot determine number of cpus') -	 -	return num -	 -#} END routines - - - -class DummyLock(object): -	"""An object providing a do-nothing lock interface for use in sync mode""" -	__slots__ = tuple() -	 -	def acquire(self): -		pass -	 -	def release(self): -		pass -	 - -class SyncQueue(deque): -	"""Adapter to allow using a deque like a queue, without locking""" -	def get(self, block=True, timeout=None): -		try: -			return self.popleft() -		except IndexError: -			raise Empty -		# END raise empty - -	def empty(self): -		return len(self) == 0 -		 -	def set_writable(self, state): -		pass -	 -	def writable(self): -		return True - -	def put(self, item, block=True, timeout=None): -		self.append(item) -	 - -class HSCondition(deque): -	"""Cleaned up code of the original condition object in order  -	to make it run and respond faster.""" -	__slots__ = ("_lock") -	delay = 0.0002					# reduces wait times, but increases overhead -	 -	def __init__(self, lock=None): -		if lock is None: -			lock = Lock() -		self._lock = lock - -	def release(self): -		self._lock.release() -		 -	def acquire(self, block=None): -		if block is None: -			self._lock.acquire() -		else: -			self._lock.acquire(block) - -	def wait(self, timeout=None): -		waiter = _allocate_lock() -		waiter.acquire()				# get it the first time, no blocking -		self.append(waiter) -		 -		 -		try: -			# restore state no matter what (e.g., KeyboardInterrupt) -			# now we block, as we hold the lock already -			# in the momemnt we release our lock, someone else might actually resume -			self._lock.release() -			if timeout is None: -				waiter.acquire() -			else: -				# Balancing act:  We can't afford a pure busy loop, because of the  -				# GIL, so we have to sleep -				# We try to sleep only tiny amounts of time though to be very responsive -				# NOTE: this branch is not used by the async system anyway, but  -				# will be hit when the user reads with timeout  -				endtime = _time() + timeout -				delay = self.delay -				acquire = waiter.acquire -				while True: -					gotit = acquire(0) -					if gotit: -						break -					remaining = endtime - _time() -					if remaining <= 0: -						break -					# this makes 4 threads working as good as two, but of course -					# it causes more frequent micro-sleeping -					#delay = min(delay * 2, remaining, .05) -					_sleep(delay) -				# END endless loop -				if not gotit: -					try: -						self.remove(waiter) -					except ValueError: -						pass -				# END didn't 
ever get it -		finally: -			# reacquire the lock  -			self._lock.acquire() -		# END assure release lock -			 -	def notify(self, n=1): -		"""Its vital that this method is threadsafe - we absolutely have to  -		get a lock at the beginning of this method to be sure we get the  -		correct amount of waiters back. If we bail out, although a waiter -		is about to be added, it will miss its wakeup notification, and block -		forever (possibly)""" -		self._lock.acquire() -		try: -			if not self:	# len(self) == 0, but this should be faster -				return -			if n == 1: -				try: -					self.popleft().release() -				except IndexError: -					pass -			else: -				for i in range(min(n, len(self))): -					self.popleft().release() -				# END for each waiter to resume -			# END handle n = 1 case faster -		finally: -			self._lock.release() -		# END assure lock is released -	 -	def notify_all(self): -		self.notify(len(self)) -		 - -class ReadOnly(Exception): -	"""Thrown when trying to write to a read-only queue""" - -class AsyncQueue(deque): -	"""A queue using different condition objects to gain multithreading performance. -	Additionally it has a threadsafe writable flag, which will alert all readers -	that there is nothing more to get here. -	All default-queue code was cleaned up for performance.""" -	__slots__ = ('mutex', 'not_empty', '_writable') -	 -	def __init__(self, maxsize=0): -		self.mutex = Lock() -		self.not_empty = HSCondition(self.mutex) -		self._writable = True -		 -	def qsize(self): -		self.mutex.acquire() -		try: -			return len(self) -		finally: -			self.mutex.release() - -	def writable(self): -		self.mutex.acquire() -		try: -			return self._writable -		finally: -			self.mutex.release() - -	def set_writable(self, state): -		"""Set the writable flag of this queue to True or False -		:return: The previous state""" -		self.mutex.acquire() -		try: -			old = self._writable -			self._writable = state -			return old -		finally: -			self.mutex.release() -			# if we won't receive anymore items, inform the getters -			if not state: -				self.not_empty.notify_all() -			# END tell everyone -		# END handle locking - -	def empty(self): -		self.mutex.acquire() -		try: -			return not len(self) -		finally: -			self.mutex.release() - -	def put(self, item, block=True, timeout=None): -		self.mutex.acquire() -		# NOTE: we explicitly do NOT check for our writable state -		# Its just used as a notification signal, and we need to be able  -		# to continue writing to prevent threads ( easily ) from failing -		# to write their computed results, which we want in fact -		# NO: we want them to fail and stop processing, as the one who caused -		# the channel to close had a reason and wants the threads to  -		# stop on the task as soon as possible -		if not self._writable: -			self.mutex.release() -			raise ReadOnly -		# END handle read-only -		self.append(item) -		self.mutex.release() -		self.not_empty.notify() -		 -	def get(self, block=True, timeout=None): -		self.mutex.acquire() -		try: -			if block: -				if timeout is None: -					while not len(self) and self._writable: -						self.not_empty.wait() -				else: -					endtime = _time() + timeout -					while not len(self) and self._writable: -						remaining = endtime - _time() -						if remaining <= 0.0: -							raise Empty -						self.not_empty.wait(remaining) -				# END handle timeout mode -			# END handle block -			 -			# can throw if we woke up because we are not writable anymore -			try: -				return self.popleft() -			except IndexError: -				raise Empty -			# END handle 
unblocking reason
-		finally:
-			self.mutex.release()
-		# END assure lock is released
-
-
-#} END utilities
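The add_edge() implementation in the deleted graph.py above rejects edges that would close a cycle by walking in_nodes depth-first from both endpoints. A small hypothetical sketch of that behavior, untested against this exact revision (Python 2, like the code above):

	# Sketch of the removed Graph cycle check; add_edge() returns self, so calls chain.
	from git.async.graph import Graph, Node

	g = Graph()
	a, b, c = Node('a'), Node('b'), Node('c')
	for n in (a, b, c):
		g.add_node(n)
	g.add_edge(a, b).add_edge(b, c)     # a -> b -> c
	try:
		g.add_edge(c, a)                # would close the cycle a -> b -> c -> a
	except ValueError, e:
		print "rejected: %s" % e        # the check walks in_nodes depth-first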

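Finally, a hypothetical end-to-end sketch of how the removed pool and task types were wired together. Names come from the deleted pool.py and task.py above; it is untested against this revision, and the printed order assumes the work goes out as a single chunk. Note that IteratorTaskBase.__init__ resets fun to the identity function, so the worker function has to be assigned after construction.

	# Sketch of the removed pool API (Python 2): lazy evaluation through a PoolReader.
	from git.async.pool import ThreadPool
	from git.async.task import IteratorThreadTask

	pool = ThreadPool(size=2)           # two WorkerThread instances
	task = IteratorThreadTask(iter(range(4)), "square", None)
	task.fun = lambda n: n * n          # assigned here, as the constructor resets fun

	reader = pool.add_task(task)        # returns a PoolReader handle to the task
	print reader.read()                 # schedules the task lazily -> [0, 1, 4, 9]
	pool.set_size(0)                    # send the stop routine to the workers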