summaryrefslogtreecommitdiff
path: root/lib/git/async/thread.py
blob: 4240a664e20960fbcab128d2e4282f26eafb0397 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# -*- coding: utf-8 -*-
"""Module with threading utilities"""
__docformat__ = "restructuredtext"
import threading
import inspect
import Queue

#{ Decorators

def do_terminate_threads(whitelist=list()):
	"""Simple function which terminates all of our threads
	:param whitelist: If whitelist is given, only the given threads will be terminated"""
	for t in threading.enumerate():
		if not isinstance(t, TerminatableThread):
			continue
		if whitelist and t not in whitelist:
			continue
		t.stop_and_join()
	# END for each thread

def terminate_threads( func ):
	"""Kills all worker threads the method has created by sending the quit signal.
	This takes over in case of an error in the main function"""
	def wrapper(*args, **kwargs):
		cur_threads = set(threading.enumerate())
		try:
			return func(*args, **kwargs)
		finally:
			do_terminate_threads(set(threading.enumerate()) - cur_threads)
		# END finally shutdown threads
	# END wrapper 
	wrapper.__name__ = func.__name__
	return wrapper

#} END decorators

#{ Classes
	
class TerminatableThread(threading.Thread):
	"""A simple thread able to terminate itself on behalf of the user.
	
	Terminate a thread as follows:
	
	t.stop_and_join()
	
	Derived classes call _should_terminate() to determine whether they should 
	abort gracefully
	"""
	__slots__ = '_terminate'
	
	def __init__(self):
		super(TerminatableThread, self).__init__()
		self._terminate = False
		
		
	#{ Subclass Interface
	def _should_terminate(self):
		""":return: True if this thread should terminate its operation immediately"""
		return self._terminate
		
	def _terminated(self):
		"""Called once the thread terminated. Its called in the main thread
		and may perform cleanup operations"""
		pass

	def start(self):
		"""Start the thread and return self"""
		super(TerminatableThread, self).start()
		return self
	
	#} END subclass interface
		
	#{ Interface 
		
	def stop_and_join(self):
		"""Ask the thread to stop its operation and wait for it to terminate
		:note: Depending on the implenetation, this might block a moment"""
		self._terminate = True
		self.join()
		self._terminated()
	#} END interface
	

class WorkerThread(TerminatableThread):
	""" This base allows to call functions on class instances natively.
	As it is meant to work with a pool, the result of the call must be 
	handled by the callee.
	The thread runs forever unless it receives the terminate signal using 
	its task queue.
	
	Tasks could be anything, but should usually be class methods and arguments to
	allow the following:
	
	inq = Queue()
	w = WorkerThread(inq)
	w.start()
	inq.put((WorkerThread.<method>, args, kwargs))
	
	finally we call quit to terminate asap.
	
	alternatively, you can make a call more intuitively - the output is the output queue
	allowing you to get the result right away or later
	w.call(arg, kwarg='value').get()
	
	inq.put(WorkerThread.quit)
	w.join()
	
	You may provide the following tuples as task:
	t[0] = class method, function or instance method
	t[1] = optional, tuple or list of arguments to pass to the routine
	t[2] = optional, dictionary of keyword arguments to pass to the routine
	"""
	__slots__ = ('inq', '_current_routine')
	
	
	# define how often we should check for a shutdown request in case our 
	# taskqueue is empty
	shutdown_check_time_s = 0.5
	
	def __init__(self, inq = None):
		super(WorkerThread, self).__init__()
		self.inq = inq or Queue.Queue()
		self._current_routine = None				# routine we execute right now
	
	def run(self):
		"""Process input tasks until we receive the quit signal"""
		while True:
			self._current_routine = None
			if self._should_terminate():
				break
			# END check for stop request
			
			# don't wait too long, instead check for the termination request more often
			try:
				tasktuple = self.inq.get(True, 1)
			except Queue.Empty:
				continue
			# END get task with timeout
			
			# needing exactly one function, and one arg
			assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need"
			routine, arg = tasktuple
			
			self._current_routine = routine
			
			try:
				rval = None
				if inspect.ismethod(routine):
					if routine.im_self is None:
						rval = routine(self, arg)
					else:
						rval = routine(arg)
				elif inspect.isroutine(routine):
					rval = routine(arg)
				else:
					# ignore unknown items
					print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
					break
				# END make routine call
			except Exception,e:
				print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
				break	# abort ... 
			# END routine exception handling
		# END endless loop
	
	def routine(self):
		""":return: routine we are currently executing, or None if we have no task"""
		return self._current_routine
	
	
#} END classes