summaryrefslogtreecommitdiff
path: root/lib/git/async/thread.py
blob: 7ca93c86ebe66cdcb07a5a80d0f982f06ee5cbdd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# -*- coding: utf-8 -*-
"""Module with threading utilities"""
__docformat__ = "restructuredtext"
import functools
import inspect
import Queue
import threading

#{ Decorators

def do_terminate_threads(whitelist=None):
	"""Terminate the TerminatableThread instances which are currently alive.
	
	Every thread returned by threading.enumerate() which is a TerminatableThread
	is asked to stop and is joined afterwards. WorkerThreads additionally get 
	their quit routine put onto their input queue, as they wait on that queue 
	for tasks.
	
	:param whitelist: Optional collection of threads. If it is given and not 
		empty, only the threads contained in it will be terminated. If it is 
		None or empty, all TerminatableThreads will be terminated.
	:note: The call blocks until each affected thread was joined"""
	for t in threading.enumerate():
		if not isinstance(t, TerminatableThread):
			continue
		# an empty or missing whitelist means 'no restriction'
		if whitelist and t not in whitelist:
			continue
		if isinstance(t, WorkerThread):
			# make sure the worker sees a task telling it to quit
			t.inq.put(t.quit)
		# END worker special handling
		t.stop_and_join()
	# END for each thread

def terminate_threads( func ):
	"""Decorator which terminates the threads the wrapped function left behind.
	
	The set of alive threads is recorded before func is called. Once func 
	returned or raised, every thread which was not alive before is handed to 
	do_terminate_threads(), which stops and joins the terminatable ones.
	Threads which existed before the call are never touched.
	
	:param func: callable to wrap
	:return: wrapper which calls func with unchanged arguments and passes on 
		its return value or exception"""
	@functools.wraps(func)
	def wrapper(*args, **kwargs):
		threads_before = set(threading.enumerate())
		try:
			return func(*args, **kwargs)
		finally:
			new_threads = set(threading.enumerate()) - threads_before
			# do_terminate_threads treats an empty whitelist as 'all threads', 
			# hence it must not be called if func left no thread behind
			if new_threads:
				do_terminate_threads(new_threads)
			# END handle new threads
		# END finally shutdown threads
	# END wrapper 
	return wrapper

#} END decorators

#{ Classes
	
class TerminatableThread(threading.Thread):
	"""Thread which can be asked to terminate by its user.
	
	To terminate a running thread, call:
	
	t.stop_and_join()
	
	Derived types are expected to check _should_terminate() while they work and 
	to leave their run() method gracefully once it returns True.
	"""
	__slots__ = '_terminate'
	
	def __init__(self):
		super(TerminatableThread, self).__init__()
		# flag set by stop_and_join() to request termination
		self._terminate = False
		
	#{ Interface 
	
	def stop_and_join(self):
		"""Request termination of the thread, wait until it finished and call 
		the _terminated() handler afterwards.
		:note: Blocks until the thread was joined"""
		self._terminate = True
		self.join()
		self._terminated()
		
	#} END interface
	
	#{ Subclass Interface
	
	def start(self):
		"""Start the thread
		:return: self"""
		super(TerminatableThread, self).start()
		return self
		
	def _should_terminate(self):
		""":return: True once termination was requested through stop_and_join()"""
		return self._terminate
		
	def _terminated(self):
		"""Handler invoked by stop_and_join() after the thread was joined. 
		It runs in the thread which called stop_and_join(), not in this thread.
		The default implementation does nothing, subclasses may override it 
		to perform cleanup operations"""
		pass
	
	#} END subclass interface
	

class WorkerThread(TerminatableThread):
	""" Thread which processes tasks taken from an input queue.
	As it is meant to work with a pool, the result of a task is not passed on - 
	it must be handled by the routine which was called.
	
	The thread runs until it receives the quit routine through its task queue, 
	or until termination is requested through stop_and_join().
	
	Tasks could be anything, but should usually be class methods and arguments to
	allow the following:
	
	inq = Queue()
	w = WorkerThread(inq)
	w.start()
	inq.put((WorkerThread.<method>, args, kwargs))
	
	alternatively, you can let the worker put the task onto its queue. Note 
	that call() returns None, the result of the function is not made available
	w.call(function, arg, kwarg='value')
	
	finally we send quit to terminate asap.
	
	inq.put(WorkerThread.quit)
	w.join()
	
	A task is either a routine, or a tuple or list with one to three items:
	t[0] = class method, function or instance method, or the name of a method of this instance
	t[1] = optional, tuple or list of arguments to pass to the routine
	t[2] = optional, dictionary of keyword arguments to pass to the routine
	"""
	# NOTE: 'outq' is declared, but never assigned by the code of this class
	__slots__ = ('inq', 'outq')
	
	# seconds to wait for a task before checking for a termination request again
	_poll_interval = 0.1
	
	class InvalidRoutineError(Exception):
		"""Class sent as return value in case of an error
		:note: currently not used by the code of WorkerThread"""
		
	def __init__(self, inq = None):
		"""Initialize the worker
		:param inq: queue to read tasks from. If None, a new Queue.Queue will 
			be created"""
		super(WorkerThread, self).__init__()
		# compare against None explicitly to keep queue objects which happen 
		# to evaluate to False, for instance because they are empty
		if inq is None:
			inq = Queue.Queue()
		# END handle default queue
		self.inq = inq
	
	def call(self, function, *args, **kwargs):
		"""Put a task calling the given function onto our input queue. The 
		method does not wait for the task to be processed.
		
		:param function: can be a standalone function unrelated to this class, 
			a class method of this class or any instance method.
			If it is a string, it will be considered a function residing on this instance
		:param args: arguments to pass to function
		:param kwargs: kwargs to pass to function
		:return: None - the result of function is not made available"""
		self.inq.put((function, args, kwargs))
	
	def run(self):
		"""Process input tasks until we receive the quit signal or until 
		termination was requested through stop_and_join().
		
		Exceptions raised by a task are printed and do not stop the worker, 
		except for StopIteration which ends the processing loop. A task whose 
		first item is neither callable nor the name of an attribute of this 
		instance ends the loop after printing a message. Tasks of any other 
		unsupported shape are handled as if they were the quit routine."""
		while not self._should_terminate():
			try:
				# don't wait forever, otherwise a termination request would 
				# go unnoticed while there is nothing to do
				tasktuple = self.inq.get(True, self._poll_interval)
			except Queue.Empty:
				continue
			# END wait for next task
			
			# whatever we cannot unpack below defaults to the quit routine
			routine = self.__class__.quit
			args = tuple()
			kwargs = dict()
			
			if isinstance(tasktuple, (tuple, list)):
				if len(tasktuple) == 3:
					routine, args, kwargs = tasktuple
				elif len(tasktuple) == 2:
					routine, args = tasktuple
				elif len(tasktuple) == 1:
					routine = tasktuple[0]
				# END tasktuple length check
			elif inspect.isroutine(tasktuple):
				routine = tasktuple
			# END tasktuple handling
			
			try:
				if inspect.ismethod(routine):
					if routine.im_self is None:
						# unbound method - we are the instance to call it on
						routine(self, *args, **kwargs)
					else:
						routine(*args, **kwargs)
				elif inspect.isroutine(routine):
					routine(*args, **kwargs)
				elif isinstance(routine, basestring) and hasattr(self, routine):
					getattr(self, routine)(*args, **kwargs)
				else:
					# unknown items end the processing loop
					print("%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)))
					break
				# END make routine call
			except StopIteration:
				# raised by quit() to make us stop
				break
			except Exception as e:
				print("%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)))
			# END routine exception handling
		# END processing loop
	
	def quit(self):
		"""Routine which makes run() leave its processing loop once it is 
		executed as task
		:raise StopIteration: always"""
		raise StopIteration
	
	
#} END classes