forked from binux/pyspider
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_result_worker.py
89 lines (76 loc) · 2.72 KB
/
test_result_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
# http://binux.me
# Created on 2014-11-11 20:52:53
import os
import time
import unittest2 as unittest
import logging.config
logging.config.fileConfig("pyspider/logging.conf")
import shutil
from pyspider.database.sqlite import resultdb
from pyspider.result.result_worker import ResultWorker
from pyspider.libs.multiprocessing_queue import Queue
from pyspider.libs.utils import run_in_thread
class TestProcessor(unittest.TestCase):
    """Integration tests for ``ResultWorker`` writing into a sqlite ``ResultDB``.

    ``setUpClass`` starts a single worker in a background thread that is fed
    through ``inqueue``.  Each test puts a ``(task, result)`` tuple on that
    queue, waits for the worker, and then inspects the database through a
    second ``ResultDB`` handle opened on the same file.

    The tests share one database file and are written to run in name order
    (hence the numeric prefixes): ``test_30_overwrite`` rewrites the row that
    ``test_20_insert_result`` stored.
    """

    resultdb_path = './data/tests/result.db'
    # Seconds granted to the worker thread to consume a queued item before a
    # test looks at the database.
    settle_time = 0.5

    @classmethod
    def setUpClass(cls):
        """Create a clean data directory and start the worker thread."""
        shutil.rmtree('./data/tests/', ignore_errors=True)
        os.makedirs('./data/tests/')

        def get_resultdb():
            return resultdb.ResultDB(cls.resultdb_path)

        # Handle used by the test methods for their assertions.
        cls.resultdb = get_resultdb()
        cls.inqueue = Queue(10)

        def run_result_worker():
            # The worker gets its own ResultDB handle -- presumably because a
            # sqlite connection should not be shared across threads (confirm
            # against pyspider.database.sqlite).
            cls.result_worker = ResultWorker(get_resultdb(), cls.inqueue)
            cls.result_worker.run()

        cls.process = run_in_thread(run_result_worker)
        # Give the thread time to construct the worker and enter its loop.
        time.sleep(1)

    @classmethod
    def tearDownClass(cls):
        """Stop the worker thread and delete the data directory."""
        if cls.process.is_alive():
            cls.result_worker.quit()
            cls.process.join(2)
        assert not cls.process.is_alive()
        shutil.rmtree('./data/tests/', ignore_errors=True)

    def _submit(self, task, result):
        """Queue ``(task, result)`` for the worker and wait ``settle_time``.

        The worker runs in another thread, so assertions made right after
        ``put`` would observe the database before the item was handled.
        """
        self.inqueue.put((task, result))
        time.sleep(self.settle_time)

    def test_10_bad_result(self):
        """A task with only a project and an empty result stores nothing."""
        # Waiting here matters: without it the assertions below would pass
        # even if the worker went on to persist this item.
        self._submit({'project': 'test_project'}, {})
        self.resultdb._list_project()
        self.assertEqual(len(self.resultdb.projects), 0)
        self.assertEqual(self.resultdb.count('test_project'), 0)

    def test_20_insert_result(self):
        """A dict result is stored and read back unchanged."""
        data = {
            'a': 'b'
        }
        self._submit({
            'project': 'test_project',
            'taskid': 'id1',
            'url': 'url1'
        }, data)
        self.resultdb._list_project()
        self.assertEqual(len(self.resultdb.projects), 1)
        self.assertEqual(self.resultdb.count('test_project'), 1)

        result = self.resultdb.get('test_project', 'id1')
        self.assertEqual(result['result'], data)

    def test_30_overwrite(self):
        """A second result for taskid ``id1`` replaces the stored one."""
        self._submit({
            'project': 'test_project',
            'taskid': 'id1',
            'url': 'url1'
        }, "abc")
        result = self.resultdb.get('test_project', 'id1')
        self.assertEqual(result['result'], "abc")

    def test_40_insert_list(self):
        """A list result is stored and read back unchanged."""
        self._submit({
            'project': 'test_project',
            'taskid': 'id2',
            'url': 'url1'
        }, ['a', 'b'])
        result = self.resultdb.get('test_project', 'id2')
        self.assertEqual(result['result'], ['a', 'b'])