@@ -53,6 +53,10 @@ class TimeoutError(Error):
53
53
"""The operation exceeded the given deadline."""
54
54
pass
55
55
56
+ class InvalidStateError (Error ):
57
+ """The operation is not allowed in this state."""
58
+ pass
59
+
56
60
class _Waiter (object ):
57
61
"""Provides the event that wait() and as_completed() block on."""
58
62
def __init__ (self ):
@@ -170,6 +174,29 @@ def _create_and_install_waiters(fs, return_when):
170
174
171
175
return waiter
172
176
177
+
178
+ def _yield_finished_futures (fs , waiter , ref_collect ):
179
+ """
180
+ Iterate on the list *fs*, yielding finished futures one by one in
181
+ reverse order.
182
+ Before yielding a future, *waiter* is removed from its waiters
183
+ and the future is removed from each set in the collection of sets
184
+ *ref_collect*.
185
+
186
+ The aim of this function is to avoid keeping stale references after
187
+ the future is yielded and before the iterator resumes.
188
+ """
189
+ while fs :
190
+ f = fs [- 1 ]
191
+ for futures_set in ref_collect :
192
+ futures_set .remove (f )
193
+ with f ._condition :
194
+ f ._waiters .remove (waiter )
195
+ del f
196
+ # Careful not to keep a reference to the popped value
197
+ yield fs .pop ()
198
+
199
+
173
200
def as_completed (fs , timeout = None ):
174
201
"""An iterator over the given futures that yields each as it completes.
175
202
@@ -189,28 +216,30 @@ def as_completed(fs, timeout=None):
189
216
before the given timeout.
190
217
"""
191
218
if timeout is not None :
192
- end_time = timeout + time .time ()
219
+ end_time = timeout + time .monotonic ()
193
220
194
221
fs = set (fs )
222
+ total_futures = len (fs )
195
223
with _AcquireFutures (fs ):
196
224
finished = set (
197
225
f for f in fs
198
226
if f ._state in [CANCELLED_AND_NOTIFIED , FINISHED ])
199
227
pending = fs - finished
200
228
waiter = _create_and_install_waiters (fs , _AS_COMPLETED )
201
-
229
+ finished = list ( finished )
202
230
try :
203
- yield from finished
231
+ yield from _yield_finished_futures (finished , waiter ,
232
+ ref_collect = (fs ,))
204
233
205
234
while pending :
206
235
if timeout is None :
207
236
wait_timeout = None
208
237
else :
209
- wait_timeout = end_time - time .time ()
238
+ wait_timeout = end_time - time .monotonic ()
210
239
if wait_timeout < 0 :
211
240
raise TimeoutError (
212
241
'%d (of %d) futures unfinished' % (
213
- len (pending ), len ( fs ) ))
242
+ len (pending ), total_futures ))
214
243
215
244
waiter .event .wait (wait_timeout )
216
245
@@ -219,11 +248,13 @@ def as_completed(fs, timeout=None):
219
248
waiter .finished_futures = []
220
249
waiter .event .clear ()
221
250
222
- for future in finished :
223
- yield future
224
- pending .remove (future )
251
+ # reverse to keep finishing order
252
+ finished .reverse ()
253
+ yield from _yield_finished_futures (finished , waiter ,
254
+ ref_collect = (fs , pending ))
225
255
226
256
finally :
257
+ # Remove waiter from unfinished futures
227
258
for f in fs :
228
259
with f ._condition :
229
260
f ._waiters .remove (waiter )
@@ -373,7 +404,10 @@ def add_done_callback(self, fn):
373
404
if self ._state not in [CANCELLED , CANCELLED_AND_NOTIFIED , FINISHED ]:
374
405
self ._done_callbacks .append (fn )
375
406
return
376
- fn (self )
407
+ try :
408
+ fn (self )
409
+ except Exception :
410
+ LOGGER .exception ('exception calling callback for %r' , self )
377
411
378
412
def result (self , timeout = None ):
379
413
"""Return the result of the call that the future represents.
@@ -486,6 +520,8 @@ def set_result(self, result):
486
520
Should only be used by Executor implementations and unit tests.
487
521
"""
488
522
with self ._condition :
523
+ if self ._state in {CANCELLED , CANCELLED_AND_NOTIFIED , FINISHED }:
524
+ raise InvalidStateError ('{}: {!r}' .format (self ._state , self ))
489
525
self ._result = result
490
526
self ._state = FINISHED
491
527
for waiter in self ._waiters :
@@ -499,6 +535,8 @@ def set_exception(self, exception):
499
535
Should only be used by Executor implementations and unit tests.
500
536
"""
501
537
with self ._condition :
538
+ if self ._state in {CANCELLED , CANCELLED_AND_NOTIFIED , FINISHED }:
539
+ raise InvalidStateError ('{}: {!r}' .format (self ._state , self ))
502
540
self ._exception = exception
503
541
self ._state = FINISHED
504
542
for waiter in self ._waiters :
@@ -509,7 +547,7 @@ def set_exception(self, exception):
509
547
class Executor (object ):
510
548
"""This is an abstract base class for concrete asynchronous executors."""
511
549
512
- def submit (self , fn , * args , ** kwargs ):
550
+ def submit (* args , ** kwargs ):
513
551
"""Submits a callable to be executed with the given arguments.
514
552
515
553
Schedules the callable to be executed as fn(*args, **kwargs) and returns
@@ -518,7 +556,21 @@ def submit(self, fn, *args, **kwargs):
518
556
Returns:
519
557
A Future representing the given call.
520
558
"""
559
+ if len (args ) >= 2 :
560
+ pass
561
+ elif not args :
562
+ raise TypeError ("descriptor 'submit' of 'Executor' object "
563
+ "needs an argument" )
564
+ elif 'fn' in kwargs :
565
+ import warnings
566
+ warnings .warn ("Passing 'fn' as keyword argument is deprecated" ,
567
+ DeprecationWarning , stacklevel = 2 )
568
+ else :
569
+ raise TypeError ('submit expected at least 1 positional argument, '
570
+ 'got %d' % (len (args )- 1 ))
571
+
521
572
raise NotImplementedError ()
573
+ submit .__text_signature__ = '($self, fn, /, *args, **kwargs)'
522
574
523
575
def map (self , fn , * iterables , timeout = None , chunksize = 1 ):
524
576
"""Returns an iterator equivalent to map(fn, iter).
@@ -543,19 +595,22 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
543
595
Exception: If fn(*args) raises for any values.
544
596
"""
545
597
if timeout is not None :
546
- end_time = timeout + time .time ()
598
+ end_time = timeout + time .monotonic ()
547
599
548
600
fs = [self .submit (fn , * args ) for args in zip (* iterables )]
549
601
550
602
# Yield must be hidden in closure so that the futures are submitted
551
603
# before the first iterator value is required.
552
604
def result_iterator ():
553
605
try :
554
- for future in fs :
606
+ # reverse to keep finishing order
607
+ fs .reverse ()
608
+ while fs :
609
+ # Careful not to keep a reference to the popped future
555
610
if timeout is None :
556
- yield future .result ()
611
+ yield fs . pop () .result ()
557
612
else :
558
- yield future . result (end_time - time .time ())
613
+ yield fs . pop (). result (end_time - time .monotonic ())
559
614
finally :
560
615
for future in fs :
561
616
future .cancel ()
@@ -580,3 +635,9 @@ def __enter__(self):
580
635
def __exit__ (self , exc_type , exc_val , exc_tb ):
581
636
self .shutdown (wait = True )
582
637
return False
638
+
639
+
640
+ class BrokenExecutor (RuntimeError ):
641
+ """
642
+ Raised when a executor has become non-functional after a severe failure.
643
+ """
0 commit comments