Skip to content

Commit

Permalink
Fixes for IOLoop.run_in_executor
Browse files Browse the repository at this point in the history
Correctly transform the concurrent Future into a Tornado Future
and test that the result is usable with await.
  • Loading branch information
bdarnell committed Oct 22, 2017
1 parent fa34091 commit a3b44cd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 24 deletions.
10 changes: 7 additions & 3 deletions tornado/ioloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import traceback
import math

from tornado.concurrent import TracebackFuture, is_future
from tornado.concurrent import TracebackFuture, is_future, chain_future
from tornado.log import app_log, gen_log
from tornado.platform.auto import set_close_exec, Waker
from tornado import stack_context
Expand Down Expand Up @@ -655,8 +655,12 @@ def run_in_executor(self, executor, func, *args):
from tornado.process import cpu_count
self._executor = ThreadPoolExecutor(max_workers=(cpu_count() * 5))
executor = self._executor

return executor.submit(func, *args)
c_future = executor.submit(func, *args)
# Concurrent Futures are not usable with await. Wrap this in a
# Tornado Future instead, using self.add_future for thread-safety.
t_future = TracebackFuture()
self.add_future(c_future, lambda f: chain_future(f, t_future))
return t_future

def set_default_executor(self, executor):
"""Sets the default executor to use with :meth:`run_in_executor`."""
Expand Down
56 changes: 35 additions & 21 deletions tornado/test/ioloop_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,56 +604,70 @@ def test_run_in_executor_gen(self):
event1 = threading.Event()
event2 = threading.Event()

def callback(self_event, other_event):
def sync_func(self_event, other_event):
self_event.set()
time.sleep(0.01)
self.assertTrue(other_event.is_set())
other_event.wait()
# Note that return value doesn't actually do anything,
# it is just passed through to our final assertion to
# make sure it is passed through properly.
return self_event

# Run two synchronous functions, which would deadlock if not
# run in parallel.
res = yield [
IOLoop.current().run_in_executor(None, callback, event1, event2),
IOLoop.current().run_in_executor(None, callback, event2, event1)
IOLoop.current().run_in_executor(None, sync_func, event1, event2),
IOLoop.current().run_in_executor(None, sync_func, event2, event1)
]

self.assertEqual([event1, event2], res)

@skipBefore35
@gen_test
def test_run_in_executor_native(self):
event1 = threading.Event()
event2 = threading.Event()

def callback(self_event, other_event):
def sync_func(self_event, other_event):
self_event.set()
time.sleep(0.01)
self.assertTrue(other_event.is_set())
other_event.wait()
return self_event

# Go through an async wrapper to ensure that the result of
# run_in_executor works with await and not just gen.coroutine
# (simply passing the underlying concurrrent future would do that).
namespace = exec_test(globals(), locals(), """
async def main():
res = await gen.multi([
IOLoop.current().run_in_executor(None, callback, event1, event2),
IOLoop.current().run_in_executor(None, callback, event2, event1)
])
self.assertEqual([event1, event2], res)
async def async_wrapper(self_event, other_event):
return await IOLoop.current().run_in_executor(
None, sync_func, self_event, other_event)
""")
IOLoop.current().run_sync(namespace['main'])

res = yield [
namespace["async_wrapper"](event1, event2),
namespace["async_wrapper"](event2, event1)
]

self.assertEqual([event1, event2], res)

@gen_test
def test_set_default_executor(self):
class MyExecutor(futures.Executor):
count = [0]

class MyExecutor(futures.ThreadPoolExecutor):
def submit(self, func, *args):
return Future()
count[0] += 1
return super(MyExecutor, self).submit(func, *args)

event = threading.Event()

def future_func():
def sync_func():
event.set()

executor = MyExecutor()
executor = MyExecutor(1)
loop = IOLoop.current()
loop.set_default_executor(executor)
loop.run_in_executor(None, future_func)
loop.add_timeout(0.01, lambda: self.assertFalse(event.is_set()))
yield loop.run_in_executor(None, sync_func)
self.assertEqual(1, count[0])
self.assertTrue(event.is_set())


class TestIOLoopRunSync(unittest.TestCase):
Expand Down

0 comments on commit a3b44cd

Please sign in to comment.