Skip to content

Commit

Permalink
test add IGNORE_ALL, join every thread, make sure ioloop always stop in
Browse files Browse the repository at this point in the history
its own thread
  • Loading branch information
binux committed Mar 6, 2016
1 parent dd8562a commit 5399c8d
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pyspider/fetcher/tornado_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def quit(self):
'''Quit fetcher'''
self._running = False
self._quit = True
self.ioloop.stop()
self.ioloop.add_callback(self.ioloop.stop)

def size(self):
return self.http_client.size()
Expand Down
2 changes: 1 addition & 1 deletion pyspider/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ def quit_pyspider():
'quit_pyspider() - Close pyspider'
)
if not is_crawled:
self.ioloop.stop()
self.ioloop.add_callback(self.ioloop.stop)

def __getattr__(self, name):
"""patch for crawl(url, callback=self.index_page) API"""
Expand Down
8 changes: 4 additions & 4 deletions pyspider/webui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ def run(self, host=None, port=None, debug=None, **options):
autoreload.start()

self.logger.info('webui running on %s:%s', hostname, port)
tornado.ioloop.IOLoop.current().start()
self.ioloop = tornado.ioloop.IOLoop.current()
self.ioloop.start()

def quit(self):
import tornado.ioloop

tornado.ioloop.IOLoop.current().stop()
if hasattr(self, 'ioloop'):
self.ioloop.add_callback(self.ioloop.stop)
self.logger.info('webui exiting...')


Expand Down
32 changes: 16 additions & 16 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def tearDownClass(self):
del self.resultdb


@unittest.skipIf(os.environ.get('IGNORE_MYSQL'), 'no mysql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MYSQL') or os.environ.get('IGNORE_ALL'), 'no mysql server for test.')
class TestMysqlTaskDB(TaskDBCase, unittest.TestCase):

@classmethod
Expand All @@ -365,7 +365,7 @@ def tearDownClass(self):
self.taskdb._execute('DROP DATABASE pyspider_test_taskdb')


@unittest.skipIf(os.environ.get('IGNORE_MYSQL'), 'no mysql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MYSQL') or os.environ.get('IGNORE_ALL'), 'no mysql server for test.')
class TestMysqlProjectDB(ProjectDBCase, unittest.TestCase):

@classmethod
Expand All @@ -379,7 +379,7 @@ def tearDownClass(self):
self.projectdb._execute('DROP DATABASE pyspider_test_projectdb')


@unittest.skipIf(os.environ.get('IGNORE_MYSQL'), 'no mysql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MYSQL') or os.environ.get('IGNORE_ALL'), 'no mysql server for test.')
class TestMysqlResultDB(ResultDBCase, unittest.TestCase):

@classmethod
Expand All @@ -393,7 +393,7 @@ def tearDownClass(self):
self.resultdb._execute('DROP DATABASE pyspider_test_resultdb')


@unittest.skipIf(os.environ.get('IGNORE_MONGODB'), 'no mongodb server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MONGODB') or os.environ.get('IGNORE_ALL'), 'no mongodb server for test.')
class TestMongoDBTaskDB(TaskDBCase, unittest.TestCase):

@classmethod
Expand All @@ -407,7 +407,7 @@ def tearDownClass(self):
self.taskdb.conn.drop_database(self.taskdb.database.name)


@unittest.skipIf(os.environ.get('IGNORE_MONGODB'), 'no mongodb server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MONGODB') or os.environ.get('IGNORE_ALL'), 'no mongodb server for test.')
class TestMongoDBProjectDB(ProjectDBCase, unittest.TestCase):

@classmethod
Expand All @@ -421,7 +421,7 @@ def tearDownClass(self):
self.projectdb.conn.drop_database(self.projectdb.database.name)


@unittest.skipIf(os.environ.get('IGNORE_MONGODB'), 'no mongodb server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MONGODB') or os.environ.get('IGNORE_ALL'), 'no mongodb server for test.')
class TestMongoDBResultDB(ResultDBCase, unittest.TestCase):

@classmethod
Expand All @@ -435,7 +435,7 @@ def tearDownClass(self):
self.resultdb.conn.drop_database(self.resultdb.database.name)


@unittest.skipIf(os.environ.get('IGNORE_MYSQL'), 'no mysql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MYSQL') or os.environ.get('IGNORE_ALL'), 'no mysql server for test.')
class TestSQLAlchemyMySQLTaskDB(TaskDBCase, unittest.TestCase):

@classmethod
Expand All @@ -449,7 +449,7 @@ def tearDownClass(self):
self.taskdb.engine.execute('DROP DATABASE pyspider_test_taskdb')


@unittest.skipIf(os.environ.get('IGNORE_MYSQL'), 'no mysql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MYSQL') or os.environ.get('IGNORE_ALL'), 'no mysql server for test.')
class TestSQLAlchemyMySQLProjectDB(ProjectDBCase, unittest.TestCase):

@classmethod
Expand All @@ -463,7 +463,7 @@ def tearDownClass(self):
self.projectdb.engine.execute('DROP DATABASE pyspider_test_projectdb')


@unittest.skipIf(os.environ.get('IGNORE_MYSQL'), 'no mysql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MYSQL') or os.environ.get('IGNORE_ALL'), 'no mysql server for test.')
class TestSQLAlchemyMySQLResultDB(ResultDBCase, unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -516,7 +516,7 @@ def tearDownClass(self):
del self.resultdb


@unittest.skipIf(os.environ.get('IGNORE_POSTGRESQL'), 'no postgresql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_POSTGRESQL') or os.environ.get('IGNORE_ALL'), 'no postgresql server for test.')
class TestPGTaskDB(TaskDBCase, unittest.TestCase):

@classmethod
Expand All @@ -532,7 +532,7 @@ def tearDownClass(self):
self.taskdb.drop(project)


@unittest.skipIf(os.environ.get('IGNORE_POSTGRESQL'), 'no postgresql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_POSTGRESQL') or os.environ.get('IGNORE_ALL'), 'no postgresql server for test.')
class TestPGProjectDB(ProjectDBCase, unittest.TestCase):

@classmethod
Expand All @@ -548,7 +548,7 @@ def tearDownClass(self):
self.projectdb.drop(project['name'])


@unittest.skipIf(os.environ.get('IGNORE_POSTGRESQL'), 'no postgresql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_POSTGRESQL') or os.environ.get('IGNORE_ALL'), 'no postgresql server for test.')
class TestPGResultDB(ResultDBCase, unittest.TestCase):

@classmethod
Expand All @@ -564,7 +564,7 @@ def tearDownClass(self):
self.resultdb.drop(project)


@unittest.skipIf(os.environ.get('IGNORE_REDIS'), 'no redis server for test.')
@unittest.skipIf(os.environ.get('IGNORE_REDIS') or os.environ.get('IGNORE_ALL'), 'no redis server for test.')
class TestRedisTaskDB(TaskDBCase, unittest.TestCase):

@classmethod
Expand All @@ -578,7 +578,7 @@ def tearDownClass(self):
self.taskdb.drop(project)


@unittest.skipIf(os.environ.get('IGNORE_ELASTICSEARCH'), 'no elasticsearch server for test.')
@unittest.skipIf(os.environ.get('IGNORE_ELASTICSEARCH') or os.environ.get('IGNORE_ALL'), 'no elasticsearch server for test.')
class TestESProjectDB(ProjectDBCase, unittest.TestCase):

@classmethod
Expand All @@ -592,7 +592,7 @@ def tearDownClass(self):
self.projectdb.es.indices.delete(index='test_pyspider', ignore=[400, 404])


@unittest.skipIf(os.environ.get('IGNORE_ELASTICSEARCH'), 'no elasticsearch server for test.')
@unittest.skipIf(os.environ.get('IGNORE_ELASTICSEARCH') or os.environ.get('IGNORE_ALL'), 'no elasticsearch server for test.')
class TestESResultDB(ResultDBCase, unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -629,7 +629,7 @@ def test_z20_update_projects(self):
self.assertIn('drop_project2', self.resultdb.projects)
self.assertNotIn('drop_project3', self.resultdb.projects)

@unittest.skipIf(os.environ.get('IGNORE_ELASTICSEARCH'), 'no elasticsearch server for test.')
@unittest.skipIf(os.environ.get('IGNORE_ELASTICSEARCH') or os.environ.get('IGNORE_ALL'), 'no elasticsearch server for test.')
class TestESTaskDB(TaskDBCase, unittest.TestCase):

@classmethod
Expand Down
19 changes: 10 additions & 9 deletions tests/test_message_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ def get(q):
for i in range(100):
q.get()

utils.run_in_thread(put, self.q3)
t = utils.run_in_thread(put, self.q3)
get(self.q3)
t.join()


class BuiltinQueue(TestMessageQueue, unittest.TestCase):
Expand All @@ -72,7 +73,7 @@ def setUpClass(self):


@unittest.skipIf(six.PY3, 'pika not suport python 3')
@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ'), 'no rabbitmq server for test.')
@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ') or os.environ.get('IGNORE_ALL'), 'no rabbitmq server for test.')
class TestPikaRabbitMQ(TestMessageQueue, unittest.TestCase):

@classmethod
Expand All @@ -95,7 +96,7 @@ def tearDownClass(self):
del self.q2
del self.q3

@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ'), 'no rabbitmq server for test.')
@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ') or os.environ.get('IGNORE_ALL'), 'no rabbitmq server for test.')
class TestAmqpRabbitMQ(TestMessageQueue, unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -123,7 +124,7 @@ def tearDownClass(self):

#@unittest.skipIf(True, "beanstalk queue can't pass the test currently")
@unittest.skipIf(six.PY3, 'beanstalkc not suport python 3')
@unittest.skipIf(os.environ.get('IGNORE_BEANSTALK'), 'no beanstalk server for test.')
@unittest.skipIf(os.environ.get('IGNORE_BEANSTALK') or os.environ.get('IGNORE_ALL'), 'no beanstalk server for test.')
class TestBeansTalkQueue(TestMessageQueue, unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -152,7 +153,7 @@ def tearDownClass(self):
while not self.q3.empty():
self.q3.get()

@unittest.skipIf(os.environ.get('IGNORE_REDIS'), 'no redis server for test.')
@unittest.skipIf(os.environ.get('IGNORE_REDIS') or os.environ.get('IGNORE_ALL'), 'no redis server for test.')
class TestRedisQueue(TestMessageQueue, unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -210,20 +211,20 @@ def tearDownClass(self):
self.q3.delete()

@unittest.skip('test cannot pass, get is buffered')
@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ'), 'no rabbitmq server for test.')
@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ') or os.environ.get('IGNORE_ALL'), 'no rabbitmq server for test.')
class TestKombuAmpqQueue(TestKombuQueue):
kombu_url = 'kombu+amqp://'

@unittest.skip('test cannot pass, put is buffered')
@unittest.skipIf(os.environ.get('IGNORE_REDIS'), 'no redis server for test.')
@unittest.skipIf(os.environ.get('IGNORE_REDIS') or os.environ.get('IGNORE_ALL'), 'no redis server for test.')
class TestKombuRedisQueue(TestKombuQueue):
kombu_url = 'kombu+redis://'

@unittest.skip('test cannot pass, get is buffered')
@unittest.skipIf(os.environ.get('IGNORE_BEANSTALK'), 'no beanstalk server for test.')
@unittest.skipIf(os.environ.get('IGNORE_BEANSTALK') or os.environ.get('IGNORE_ALL'), 'no beanstalk server for test.')
class TestKombuBeanstalkQueue(TestKombuQueue):
kombu_url = 'kombu+beanstalk://'

@unittest.skipIf(os.environ.get('IGNORE_MONGODB'), 'no rabbitmq server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MONGODB') or os.environ.get('IGNORE_ALL'), 'no rabbitmq server for test.')
class TestKombuMongoDBQueue(TestKombuQueue):
kombu_url = 'kombu+mongodb://'
12 changes: 7 additions & 5 deletions tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def test_40_cli_env(self):
finally:
del os.environ['RESULTDB']

@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ'), 'no rabbitmq server for test.')
@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ') or os.environ.get('IGNORE_ALL'), 'no rabbitmq server for test.')
def test_50_docker_rabbitmq(self):
try:
os.environ['RABBITMQ_NAME'] = 'rabbitmq'
Expand All @@ -116,7 +116,7 @@ def test_50_docker_rabbitmq(self):
del os.environ['RABBITMQ_PORT_5672_TCP_ADDR']
del os.environ['RABBITMQ_PORT_5672_TCP_PORT']

@unittest.skipIf(os.environ.get('IGNORE_MONGODB'), 'no mongodb server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MONGODB') or os.environ.get('IGNORE_ALL'), 'no mongodb server for test.')
def test_60_docker_mongodb(self):
try:
os.environ['MONGODB_NAME'] = 'mongodb'
Expand All @@ -134,7 +134,7 @@ def test_60_docker_mongodb(self):
del os.environ['MONGODB_PORT_27017_TCP_PORT']

@unittest.skip('noly available in docker')
@unittest.skipIf(os.environ.get('IGNORE_MYSQL'), 'no mysql server for test.')
@unittest.skipIf(os.environ.get('IGNORE_MYSQL') or os.environ.get('IGNORE_ALL'), 'no mysql server for test.')
def test_70_docker_mysql(self):
try:
os.environ['MYSQL_NAME'] = 'mysql'
Expand Down Expand Up @@ -310,15 +310,17 @@ def setUpClass(self):

ctx = run.scheduler.make_context('scheduler', [], self.ctx)
scheduler = run.scheduler.invoke(ctx)
utils.run_in_thread(scheduler.xmlrpc_run)
utils.run_in_thread(scheduler.run)
self.xmlrpc_thread = utils.run_in_thread(scheduler.xmlrpc_run)
self.scheduler_thread = utils.run_in_thread(scheduler.run)

time.sleep(1)

@classmethod
def tearDownClass(self):
for each in self.ctx.obj.instances:
each.quit()
self.xmlrpc_thread.join()
self.scheduler_thread.join()
time.sleep(1)

shutil.rmtree('./data/tests', ignore_errors=True)
Expand Down
3 changes: 2 additions & 1 deletion tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def run_scheduler():
scheduler.DELETE_TIME = 0
scheduler.DEFAULT_RETRY_DELAY = {'': 5}
scheduler._last_tick = int(time.time()) # not dispatch cronjob
run_in_thread(scheduler.xmlrpc_run, port=self.scheduler_xmlrpc_port)
self.xmlrpc_thread = run_in_thread(scheduler.xmlrpc_run, port=self.scheduler_xmlrpc_port)
scheduler.run()

self.process = run_in_thread(run_scheduler)
Expand All @@ -152,6 +152,7 @@ def tearDownClass(self):
if self.process.is_alive():
self.rpc._quit()
self.process.join(5)
self.xmlrpc_thread.join()
assert not self.process.is_alive()
shutil.rmtree('./data/tests', ignore_errors=True)
time.sleep(1)
Expand Down
3 changes: 2 additions & 1 deletion tests/test_webdav.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def setUpClass(self):
'--password', '4321',
], self.ctx)
self.app = run.webui.invoke(ctx)
utils.run_in_thread(self.app.run)
self.app_thread = utils.run_in_thread(self.app.run)
time.sleep(5)

self.webdav = easywebdav.connect('localhost', port=5000, path='dav')
Expand All @@ -49,6 +49,7 @@ def setUpClass(self):
def tearDownClass(self):
for each in self.ctx.obj.instances:
each.quit()
self.app_thread.join()
time.sleep(1)

shutil.rmtree('./data/tests', ignore_errors=True)
Expand Down
17 changes: 11 additions & 6 deletions tests/test_webui.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,28 @@ def setUpClass(self):
], None, obj=ObjectDict(testing_mode=True))
self.ctx = run.cli.invoke(ctx)

self.threads = []

ctx = run.scheduler.make_context('scheduler', [], self.ctx)
scheduler = run.scheduler.invoke(ctx)
run_in_thread(scheduler.xmlrpc_run)
run_in_thread(scheduler.run)
self.threads.append(run_in_thread(scheduler.xmlrpc_run))
self.threads.append(run_in_thread(scheduler.run))

ctx = run.fetcher.make_context('fetcher', [
'--xmlrpc',
'--xmlrpc-port', '24444',
], self.ctx)
fetcher = run.fetcher.invoke(ctx)
run_in_thread(fetcher.xmlrpc_run)
run_in_thread(fetcher.run)
self.threads.append(run_in_thread(fetcher.xmlrpc_run))
self.threads.append(run_in_thread(fetcher.run))

ctx = run.processor.make_context('processor', [], self.ctx)
processor = run.processor.invoke(ctx)
run_in_thread(processor.run)
self.threads.append(run_in_thread(processor.run))

ctx = run.result_worker.make_context('result_worker', [], self.ctx)
result_worker = run.result_worker.invoke(ctx)
run_in_thread(result_worker.run)
self.threads.append(run_in_thread(result_worker.run))

ctx = run.webui.make_context('webui', [
'--scheduler-rpc', 'http://localhost:23333/'
Expand All @@ -73,6 +75,9 @@ def tearDownClass(self):
each.quit()
time.sleep(1)

for thread in self.threads:
thread.join()

self.httpbin_thread.terminate()
self.httpbin_thread.join()

Expand Down
10 changes: 6 additions & 4 deletions tests/test_xmlrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ def test_3(self, obj):
application.register_function(test_1)

container = tornado.wsgi.WSGIContainer(application)
http_server = tornado.httpserver.HTTPServer(container)
self.io_loop = tornado.ioloop.IOLoop.current()
http_server = tornado.httpserver.HTTPServer(container, io_loop=self.io_loop.current())
http_server.listen(3423)
utils.run_in_thread(tornado.ioloop.IOLoop.current().start)
self.thread = utils.run_in_thread(self.io_loop.start)

@classmethod
def tearDownClass(self):
tornado.ioloop.IOLoop.current().stop()
self.io_loop.add_callback(self.io_loop.stop)
self.thread.join()

def test_xmlrpc_server(self, uri='http://localhost:3423'):
def test_xmlrpc_server(self, uri='http://127.0.0.1:3423'):
from six.moves.xmlrpc_client import ServerProxy

client = ServerProxy(uri)
Expand Down
Loading

0 comments on commit 5399c8d

Please sign in to comment.