from graph import Node
from util import ReadOnly
import threading
import weakref
import sys
import new
# names exported by ``from <module> import *``
__all__ = (
	'OutputChannelTask',
	'ThreadTaskBase',
	'InputIteratorTaskBase',
	'InputIteratorThreadTask',
	'InputChannelTask',
)
class OutputChannelTask(Node):
	"""Abstracts a named task, which contains 
	additional information on how the task should be queued and processed.
	
	Results of the item processing are sent to a write channel, which is to be 
	set by the creator using the ``set_writer`` method or the ``writer`` 
	constructor argument.
	
	* **min_count** assures that not less than min_count items will be processed per call.
	* **max_chunksize** assures that multi-threading is happening in smaller chunks. If 
	 someone wants all items to be processed, using read(0), the whole task would go to
	 one worker, as well as dependent tasks. If you want finer granularity , you can 
	 specify this here, causing chunks to be no larger than max_chunksize"""
	__slots__ = (	'_read',			# method to yield items to process 
					'_out_writer', 		# output write channel
					'_exc',				# exception caught
					'_done',			# True if we are done
					'_num_writers',		# number of concurrent writers
					'_wlock',			# lock for the above
					'fun',				# function to call with items read
					'min_count', 		# minimum amount of items to produce, None means no override
					'max_chunksize',	# maximum amount of items to process per process call
					'apply_single'		# apply single items even if multiple were read
					)
	
	def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, 
					writer=None):
		"""Initialize the task
		
		:param id: identifier handed on to ``Node.__init__``
		:param fun: callable applied to the items read, see ``process``
		:param apply_single: if True, ``fun`` is called once per item, otherwise 
			once with all items that were read
		:param min_count: stored as ``min_count``, None means no override
		:param max_chunksize: stored as ``max_chunksize``, 0 means not set
		:param writer: output write channel, may also be set later using 
			``set_writer``"""
		Node.__init__(self, id)
		self._read = None					# to be set by subclass
		self._out_writer = writer
		self._exc = None
		self._done = False
		self._num_writers = 0
		self._wlock = threading.Lock()
		self.fun = fun
		# honor the values given by the caller instead of discarding them
		self.min_count = min_count
		self.max_chunksize = max_chunksize
		self.apply_single = apply_single
	
	def is_done(self):
		""":return: True if we are finished processing"""
		return self._done
		
	def set_done(self):
		"""Set ourselves to being done, as we have completed the processing"""
		self._done = True
		
	def set_writer(self, writer):
		"""Set the write channel to the given one"""
		self._out_writer = writer
		
	def writer(self):
		""":return: our write channel or None if none is set
		:note: you must not hold a reference to our write channel when the 
			task is being processed. This would cause the write channel never 
			to be closed as the task will think there is still another instance
			being processed which can close the channel once it is done.
			In the worst case, this will block your reads."""
		return self._out_writer
		
	def close(self):
		"""A closed task will close its channel to assure the readers will wake up
		:note: its safe to call this method multiple times. If no write channel 
			is set, nothing happens"""
		writer = self._out_writer
		if writer is not None:
			writer.close()
		# END handle missing writer
		
	def is_closed(self):
		""":return: True if the task's write channel is closed
		:note: requires a write channel to be set"""
		return self._out_writer.closed()
		
	def error(self):
		""":return: Exception caught during last processing or None"""
		return self._exc
		
	def process(self, count=0):
		"""Process count items and send the result individually to the output channel
		
		Items are obtained by calling the read function with ``count``. If 
		``apply_single`` is set, ``fun`` is called once per item and its return 
		value is written, otherwise ``fun`` is called once with all items and 
		each element of its result is written.
		
		Exceptions raised by ``fun`` or while writing are not propagated: the task 
		is set done, its channel is closed and the exception is stored for 
		``error()`` unless it is a ``ReadOnly`` instance. Exceptions raised by the 
		read function itself propagate to the caller.
		
		:param count: amount of items to request from the read function"""
		# first thing: increment the writer count - other tasks must be able 
		# to respond properly ( even if it turns out we don't need it later )
		self._wlock.acquire()
		self._num_writers += 1
		self._wlock.release()
		
		try:
			items = self._read(count)
		except:
			# the read failed, so we will never write anything - give our writer 
			# slot back, otherwise the count would never return to 0 and the 
			# channel would never be closed. The error itself is passed on.
			self._wlock.acquire()
			self._num_writers -= 1
			self._wlock.release()
			raise
		# END handle read failure
		
		try:
			try:
				if items:
					write = self._out_writer.write
					if self.apply_single:
						for item in items:
							rval = self.fun(item)
							write(rval)
						# END for each item
					else:
						# shouldn't apply single be the default anyway ? 
						# The task designers should chunk them up in advance
						rvals = self.fun(items)
						for rval in rvals:
							write(rval)
					# END handle single apply
				# END if there is anything to do
			finally:
				self._wlock.acquire()
				self._num_writers -= 1
				self._wlock.release()
			# END handle writer count
		except Exception:
			# obtain the exception in a way that parses on all python versions
			e = sys.exc_info()[1]
			
			# be sure our task is not scheduled again
			self.set_done()
			
			# PROBLEM: We have failed to create at least one item, hence its not 
			# guaranteed that enough items will be produced for a possibly blocking
			# client on the other end. This is why we have no other choice but
			# to close the channel, preventing the possibility of blocking.
			# This implies that dependent tasks will go down with us, but that is
			# just the right thing to do of course - one loose link in the chain ...
			# Other chunks of our kind currently being processed will then 
			# fail to write to the channel and fail as well
			self.close()
			
			# If some other chunk of our Task had an error, the channel will be closed
			# This is not an issue, just be sure we don't overwrite the original 
			# exception with the ReadOnly error that would be emitted in that case.
			# We imply that ReadOnly is exclusive to us, as it won't be an error
			# if the user emits it
			if not isinstance(e, ReadOnly):
				self._exc = e
			# END set error flag
		# END exception handling
		
		
		# if we didn't get all demanded items, which is also the case if count is 0
		# we have depleted the input channel and are done
		# We could check our output channel for how many items we have and put that 
		# into the equation, but whats important is that we were asked to produce
		# count items.
		if not items or len(items) != count:
			self.set_done()
		# END handle done state
		
		# If we are done ( this could have been set in another thread as well ) 
		# and no other process call is currently registered as writer, make 
		# sure to close the output channel.
		# Waiting with this until the last writer is gone helps to keep the 
		# write-channel writable longer, so others can continue writing their results
		if self.is_done():
			self._wlock.acquire()
			try:
				if self._num_writers == 0:
					self.close()
				# END handle writers
			finally:
				self._wlock.release()
			# END assure lock release
		# END handle channel closure
class ThreadTaskBase(object):
	"""Marker base class for tasks which can be used with threaded pools.
	It defines no attributes or methods of its own."""
class InputIteratorTaskBase(OutputChannelTask):
	"""Implements a task which processes items from an iterable in a multi-processing 
	safe manner. Reads from the iterator are serialized using a lock of type 
	``lock_type``, which subclasses are expected to provide."""
	__slots__ = ('_iterator', '_lock', '_empty')
	# the type of the lock to use when reading from the iterator
	lock_type = None
	
	def __init__(self, iterator, *args, **kwargs):
		"""Initialize the task to read from the given iterator
		
		:param iterator: object providing a ``next()`` method
		:param args: passed on to ``OutputChannelTask.__init__``
		:param kwargs: passed on to ``OutputChannelTask.__init__``
		:raise ValueError: if iterator has no ``next`` attribute
		:note: if ``fun`` is None after the base was initialized, it defaults 
			to a function returning its item unchanged"""
		OutputChannelTask.__init__(self, *args, **kwargs)
		if not hasattr(iterator, 'next'):
			# wrap into a tuple - a tuple passed as iterator would otherwise be 
			# unpacked by the format operator and cause a TypeError
			raise ValueError("Iterator %r needs a next() function" % (iterator,))
		self._iterator = iterator
		self._lock = self.lock_type()
		
		# this is necessary to prevent a cyclic ref, preventing us from 
		# getting deleted ( and collected )
		weakself = weakref.ref(self)
		self._read = lambda count: weakself().__read(count)
		self._empty = False
		
		# defaults to returning our items unchanged - a function given by the 
		# caller must not be overwritten though
		if self.fun is None:
			self.fun = lambda item: item
		# END handle default function
		
	def __read(self, count=0):
		"""Read count items from the iterator, and return them
		
		:param count: amount of items to read, 0 reads all remaining items
		:return: list of items, which holds less than count items ( or none at all ) 
			once the iterator is depleted"""
		# not threadsafe, but worst thing that could happen is that 
		# we try to get items one more time
		if self._empty:
			return list()
		# END early abort
		
		self._lock.acquire()
		try:
			if count == 0:
				self._empty = True
				return list(self._iterator)
			# END handle read all
			
			out = list()
			it = self._iterator
			try:
				for i in xrange(count):
					out.append(it.next())
				# END for each item to take
			except StopIteration:
				self._empty = True
			# END handle empty iterator
			return out
		finally:
			self._lock.release()
		# END handle locking
		
		
class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
	"""Iterator based input task for use with threaded pools"""
	# reads from the iterator are guarded by a plain thread lock
	lock_type = threading.Lock
		
class InputChannelTask(OutputChannelTask, ThreadTaskBase):
	"""Task which obtains its items from an input channel.
	The constructor expects the input channel to read from as first argument, 
	all remaining arguments are handed on to the base class."""
	__slots__ = "_pool_ref"
	
	def __init__(self, in_reader, *args, **kwargs):
		"""Initialize the task to read from in_reader
		
		:param in_reader: input channel, its ``read`` method becomes our read function
		:param args: passed on to ``OutputChannelTask.__init__``
		:param kwargs: passed on to ``OutputChannelTask.__init__``"""
		OutputChannelTask.__init__(self, *args, **kwargs)
		# not attached to a pool until set_pool() is called
		self._pool_ref = None
		# needs to happen after the base was initialized, as it resets _read to None
		self._read = in_reader.read
		
	#{ Internal Interface 
	
	def reader(self):
		""":return: input channel from which we read"""
		# the read function is a bound method which carries its channel instance. 
		# Retrieving it from there saves us from storing another reference, 
		# keeping the refcount at one ( per consumer )
		bound_read = self._read
		return bound_read.im_self
		
	def set_read(self, read):
		"""Replace our read function with the given one"""
		self._read = read
		
	def set_pool(self, pool):
		"""Remember the given pool, we only keep a weak reference to it"""
		self._pool_ref = weakref.ref(pool)
		
	def pool(self):
		""":return: pool we are attached to, or None"""
		ref = self._pool_ref
		if ref is not None:
			# yields None as well if the pool has been deleted in the meanwhile
			return ref()
		# END handle pool reference
		return None
		
	#} END internal interface
 |