diff options
| author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 14:34:09 +0200 | 
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 14:34:09 +0200 | 
| commit | 86ea63504f3e8a74cfb1d533be9d9602d2d17e27 (patch) | |
| tree | a2c59af267666a4b44bda748b806585c46faae99 /lib/git/async/util.py | |
| parent | f91495e271597034226f1b9651345091083172c4 (diff) | |
| download | gitpython-86ea63504f3e8a74cfb1d533be9d9602d2d17e27.tar.gz | |
Removed async from tree
Diffstat (limited to 'lib/git/async/util.py')
| -rw-r--r-- | lib/git/async/util.py | 268 | 
1 files changed, 0 insertions, 268 deletions
| diff --git a/lib/git/async/util.py b/lib/git/async/util.py deleted file mode 100644 index 4c4f3929..00000000 --- a/lib/git/async/util.py +++ /dev/null @@ -1,268 +0,0 @@ -"""Module with utilities related to async operations""" - -from threading import ( -	Lock, -	_allocate_lock, -	_Condition,  -	_sleep, -	_time, -	) - -from Queue import ( -		Empty, -		) - -from collections import deque -import sys -import os - -#{ Routines  - -def cpu_count(): -	""":return:number of CPUs in the system -	:note: inspired by multiprocessing""" -	num = 0 -	try: -		if sys.platform == 'win32': -			num = int(os.environ['NUMBER_OF_PROCESSORS']) -		elif 'bsd' in sys.platform or sys.platform == 'darwin': -			num = int(os.popen('sysctl -n hw.ncpu').read()) -		else: -			num = os.sysconf('SC_NPROCESSORS_ONLN') -	except (ValueError, KeyError, OSError, AttributeError): -		pass -	# END exception handling -	 -	if num == 0: -		raise NotImplementedError('cannot determine number of cpus') -	 -	return num -	 -#} END routines - - - -class DummyLock(object): -	"""An object providing a do-nothing lock interface for use in sync mode""" -	__slots__ = tuple() -	 -	def acquire(self): -		pass -	 -	def release(self): -		pass -	 - -class SyncQueue(deque): -	"""Adapter to allow using a deque like a queue, without locking""" -	def get(self, block=True, timeout=None): -		try: -			return self.popleft() -		except IndexError: -			raise Empty -		# END raise empty - -	def empty(self): -		return len(self) == 0 -		 -	def set_writable(self, state): -		pass -	 -	def writable(self): -		return True - -	def put(self, item, block=True, timeout=None): -		self.append(item) -	 - -class HSCondition(deque): -	"""Cleaned up code of the original condition object in order  -	to make it run and respond faster.""" -	__slots__ = ("_lock") -	delay = 0.0002					# reduces wait times, but increases overhead -	 -	def __init__(self, lock=None): -		if lock is None: -			lock = Lock() -		self._lock = lock - -	def release(self): -		self._lock.release() -		 -	def acquire(self, block=None): -		if block is None: -			self._lock.acquire() -		else: -			self._lock.acquire(block) - -	def wait(self, timeout=None): -		waiter = _allocate_lock() -		waiter.acquire()				# get it the first time, no blocking -		self.append(waiter) -		 -		 -		try: -			# restore state no matter what (e.g., KeyboardInterrupt) -			# now we block, as we hold the lock already -			# in the momemnt we release our lock, someone else might actually resume -			self._lock.release() -			if timeout is None: -				waiter.acquire() -			else: -				# Balancing act:  We can't afford a pure busy loop, because of the  -				# GIL, so we have to sleep -				# We try to sleep only tiny amounts of time though to be very responsive -				# NOTE: this branch is not used by the async system anyway, but  -				# will be hit when the user reads with timeout  -				endtime = _time() + timeout -				delay = self.delay -				acquire = waiter.acquire -				while True: -					gotit = acquire(0) -					if gotit: -						break -					remaining = endtime - _time() -					if remaining <= 0: -						break -					# this makes 4 threads working as good as two, but of course -					# it causes more frequent micro-sleeping -					#delay = min(delay * 2, remaining, .05) -					_sleep(delay) -				# END endless loop -				if not gotit: -					try: -						self.remove(waiter) -					except ValueError: -						pass -				# END didn't ever get it -		finally: -			# reacquire the lock  -			self._lock.acquire() -		# END assure release lock -			 -	def notify(self, n=1): -		"""Its vital that this method is threadsafe - we absolutely have to  -		get a lock at the beginning of this method to be sure we get the  -		correct amount of waiters back. If we bail out, although a waiter -		is about to be added, it will miss its wakeup notification, and block -		forever (possibly)""" -		self._lock.acquire() -		try: -			if not self:	# len(self) == 0, but this should be faster -				return -			if n == 1: -				try: -					self.popleft().release() -				except IndexError: -					pass -			else: -				for i in range(min(n, len(self))): -					self.popleft().release() -				# END for each waiter to resume -			# END handle n = 1 case faster -		finally: -			self._lock.release() -		# END assure lock is released -	 -	def notify_all(self): -		self.notify(len(self)) -		 - -class ReadOnly(Exception): -	"""Thrown when trying to write to a read-only queue""" - -class AsyncQueue(deque): -	"""A queue using different condition objects to gain multithreading performance. -	Additionally it has a threadsafe writable flag, which will alert all readers -	that there is nothing more to get here. -	All default-queue code was cleaned up for performance.""" -	__slots__ = ('mutex', 'not_empty', '_writable') -	 -	def __init__(self, maxsize=0): -		self.mutex = Lock() -		self.not_empty = HSCondition(self.mutex) -		self._writable = True -		 -	def qsize(self): -		self.mutex.acquire() -		try: -			return len(self) -		finally: -			self.mutex.release() - -	def writable(self): -		self.mutex.acquire() -		try: -			return self._writable -		finally: -			self.mutex.release() - -	def set_writable(self, state): -		"""Set the writable flag of this queue to True or False -		:return: The previous state""" -		self.mutex.acquire() -		try: -			old = self._writable -			self._writable = state -			return old -		finally: -			self.mutex.release() -			# if we won't receive anymore items, inform the getters -			if not state: -				self.not_empty.notify_all() -			# END tell everyone -		# END handle locking - -	def empty(self): -		self.mutex.acquire() -		try: -			return not len(self) -		finally: -			self.mutex.release() - -	def put(self, item, block=True, timeout=None): -		self.mutex.acquire() -		# NOTE: we explicitly do NOT check for our writable state -		# Its just used as a notification signal, and we need to be able  -		# to continue writing to prevent threads ( easily ) from failing -		# to write their computed results, which we want in fact -		# NO: we want them to fail and stop processing, as the one who caused -		# the channel to close had a reason and wants the threads to  -		# stop on the task as soon as possible -		if not self._writable: -			self.mutex.release() -			raise ReadOnly -		# END handle read-only -		self.append(item) -		self.mutex.release() -		self.not_empty.notify() -		 -	def get(self, block=True, timeout=None): -		self.mutex.acquire() -		try: -			if block: -				if timeout is None: -					while not len(self) and self._writable: -						self.not_empty.wait() -				else: -					endtime = _time() + timeout -					while not len(self) and self._writable: -						remaining = endtime - _time() -						if remaining <= 0.0: -							raise Empty -						self.not_empty.wait(remaining) -				# END handle timeout mode -			# END handle block -			 -			# can throw if we woke up because we are not writable anymore -			try: -				return self.popleft() -			except IndexError: -				raise Empty -			# END handle unblocking reason -		finally: -			self.mutex.release() -		# END assure lock is released - - -#} END utilities | 
