diff options
| author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 19:12:44 +0200 | 
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 19:12:44 +0200 | 
| commit | 583cd8807259a69fc01874b798f657c1f9ab7828 (patch) | |
| tree | 046847c4dcd33f5b30c00ff65770039fc72dd148 /lib/git/async/util.py | |
| parent | edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 (diff) | |
| download | gitpython-583cd8807259a69fc01874b798f657c1f9ab7828.tar.gz | |
Moved pool utilities into util module, fixed critical issue that caused havok - lets call this a safe-state
Diffstat (limited to 'lib/git/async/util.py')
| -rw-r--r-- | lib/git/async/util.py | 106 | 
1 files changed, 106 insertions, 0 deletions
| diff --git a/lib/git/async/util.py b/lib/git/async/util.py index dabd8a42..432d1736 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -1,8 +1,23 @@  """Module with utilities related to async operations""" +from threading import ( +	Lock, +	_Condition,  +	_sleep, +	_time, +	) + +from Queue import ( +		Queue,  +		Empty, +		) + +from collections import deque  import sys  import os +#{ Routines  +  def cpu_count():  	""":return:number of CPUs in the system  	:note: inspired by multiprocessing""" @@ -22,3 +37,94 @@ def cpu_count():  		raise NotImplementedError('cannot determine number of cpus')  	return num +	 +#} END routines + + +class SyncQueue(deque): +	"""Adapter to allow using a deque like a queue, without locking""" +	def get(self, block=True, timeout=None): +		try: +			return self.pop() +		except IndexError: +			raise Empty +		# END raise empty +			 +	def empty(self): +		return len(self) == 0 +		 +	put = deque.append +	 +	 +class HSCondition(_Condition): +	"""An attempt to make conditions less blocking, which gains performance  +	in return by sleeping less""" +	delay = 0.00002		# reduces wait times, but increases overhead +	 +	def wait(self, timeout=None): +		waiter = Lock() +		waiter.acquire() +		self.__dict__['_Condition__waiters'].append(waiter) +		saved_state = self._release_save() +		try:	# restore state no matter what (e.g., KeyboardInterrupt) +			if timeout is None: +				waiter.acquire() +			else: +				# Balancing act:  We can't afford a pure busy loop, so we +				# have to sleep; but if we sleep the whole timeout time, +				# we'll be unresponsive.  The scheme here sleeps very +				# little at first, longer as time goes on, but never longer +				# than 20 times per second (or the timeout time remaining). +				endtime = _time() + timeout +				delay = self.delay +				acquire = waiter.acquire +				while True: +					gotit = acquire(0) +					if gotit: +						break +					remaining = endtime - _time() +					if remaining <= 0: +						break +					delay = min(delay * 2, remaining, .05) +					_sleep(delay) +				# END endless loop +				if not gotit: +					try: +						self.__dict__['_Condition__waiters'].remove(waiter) +					except ValueError: +						pass +				# END didn't ever get it +		finally: +			self._acquire_restore(saved_state) +			 +	def notify(self, n=1): +		__waiters = self.__dict__['_Condition__waiters'] +		if not __waiters: +			return +		if n == 1: +			__waiters[0].release() +			try: +				__waiters.pop(0) +			except IndexError: +				pass +		else: +			waiters = __waiters[:n] +			for waiter in waiters: +				waiter.release() +				try: +					__waiters.remove(waiter) +				except ValueError: +					pass +		# END handle n = 1 case faster +	 +class AsyncQueue(Queue): +	"""A queue using different condition objects to gain multithreading performance""" +	def __init__(self, maxsize=0): +		Queue.__init__(self, maxsize) +		 +		self.not_empty = HSCondition(self.mutex) +		self.not_full = HSCondition(self.mutex) +		self.all_tasks_done = HSCondition(self.mutex) +		 +	 +#} END utilities | 
