fix phantomjs keep-alive bug
add a processed-task cache when checking new tasks, so the same new
request is not processed twice.
update Dockerfile: make /opt/pyspider the volume, for easier debugging
fix a bug in tornado_fetcher when using async mode
debugger: load fetch options from the old task
binux committed Oct 31, 2014
1 parent 75c99c8 commit 1f85054
Showing 5 changed files with 29 additions and 11 deletions.
Dockerfile (1 addition, 1 deletion)

@@ -17,7 +17,7 @@ ADD ./ /opt/pyspider
 WORKDIR /opt/pyspider
 RUN IGNORE_MYSQL=1 IGNORE_RABBITMQ=1 IGNORE_MONGODB=1 ./runtest.py

-VOLUME ["/opt/pyspider/data"]
+VOLUME ["/opt/pyspider"]

 ENTRYPOINT ["python", "run.py"]

fetcher/phantomjs_fetcher.js (14 additions, 6 deletions)

@@ -16,7 +16,9 @@ if (system.args.length !== 2) {
 port = system.args[1];
 server = require('webserver').create();

-service = server.listen(port, function (request, response) {
+service = server.listen(port, {
+    'keepAlive': true
+}, function (request, response) {
     //console.log(JSON.stringify(request, null, 4));
     // check method
     if (request.method == 'GET') {
@@ -46,6 +48,7 @@ if (system.args.length !== 2) {

         // add callbacks
         var first_response = null,
+            finished = false,
             page_loaded = false,
             start_time = Date.now(),
             end_time = null;
@@ -98,14 +101,13 @@ if (system.args.length !== 2) {

     // make response
     function make_result(page) {
-        if (!!!end_time) {
+        if (!!!end_time || finished) {
             return;
         }
         if (end_time > Date.now()) {
            setTimeout(make_result, end_time - Date.now(), page);
             return;
         }
-        end_time = null;

         var cookies = {};
         page.cookies.forEach(function(e) {
@@ -119,19 +121,25 @@ if (system.args.length !== 2) {
             status_code: first_response.status || 599,
             url: page.url,
             cookies: cookies,
-            time: (end_time - start_time) / 1000,
+            time: (Date.now() - start_time) / 1000,
             save: fetch.save
         }
         console.log("["+result.status_code+"] "+result.orig_url+" "+result.time)

-        var body = JSON.stringify(result, null, 2);
+        var body = unescape(encodeURIComponent(JSON.stringify(result, null, 2)));
         response.statusCode = 200;
         response.headers = {
             'Cache': 'no-cache',
             'Content-Type': 'application/json',
+            'Connection': 'Keep-Alive',
+            'Keep-Alive': 'timeout=5, max=100',
+            'Content-Length': body.length
         };
+        response.setEncoding("binary");
         response.write(body);
-        response.closeGracefully();
+        response.close();
+        finished = true;
+        page.close();
     }
 });

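For reference, tornado_fetcher drives this script by POSTing a JSON-encoded fetch task to it and reading a JSON result back, so a persistent connection saves a TCP handshake per fetch. Below is a minimal sketch of such a client. The port number, the exact task fields, and the use of the requests library are illustrative assumptions, not part of this commit.

# Hypothetical client for the phantomjs fetcher above; the port and the
# task fields are assumptions for illustration.
import json
import requests

def phantomjs_fetch(url, port=25555, timeout=60):
    fetch = {
        'url': url,
        'timeout': timeout,  # seconds the fetcher may spend on the page
        'save': None,        # echoed back in the result by make_result()
    }
    # One Session reuses the TCP connection across fetches, which is what
    # the server-side 'keepAlive' option makes possible.
    with requests.Session() as session:
        resp = session.post('http://localhost:%d/' % port,
                            data=json.dumps(fetch))
        return resp.json()

result = phantomjs_fetch('http://example.com/')
print("[%s] %s %.2fs" % (result['status_code'], result['url'], result['time']))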
fetcher/tornado_fetcher.py (6 additions, 3 deletions)

@@ -293,7 +293,7 @@ def handle_response(response):
             }
         else:
             try:
-                return task, json.loads(response.body)
+                result = json.loads(response.body)
             except Exception as e:
                 result = {
                     'status_code': 599,
@@ -302,8 +302,11 @@ def handle_response(response):
                     'orig_url': url,
                     'url': url,
                 }
-        logger.exception("[599] %s, %r %.2fs",
-                         url, result['content'], result['time'])
+        if result.get('status_code', 200):
+            logger.info("[%d] %s %.2fs", result['status_code'], url, result['time'])
+        else:
+            logger.exception("[%d] %s, %r %.2fs", result['status_code'],
+                             url, result['content'], result['time'])
         callback('phantomjs', task, result)
         self.on_result('phantomjs', task, result)
         return task, result
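Viewed on its own, the new control flow in handle_response is: parse the fetcher's JSON reply, synthesize a 599 result if parsing fails, then pick the log level from the status code. A self-contained sketch of that pattern, with names simplified relative to the diff:

# Standalone sketch of the result-handling pattern above; the surrounding
# callback/on_result plumbing is omitted.
import json
import logging
import time

logger = logging.getLogger('fetcher')

def handle_phantomjs_body(url, body, start_time):
    try:
        result = json.loads(body)
    except Exception as e:
        # Bad JSON from the fetcher: report it as a 599 "fetch failed".
        result = {
            'status_code': 599,
            'error': str(e),
            'content': body,
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
    status = result.get('status_code', 200)
    if status:
        # Normal case, including 599: one concise info line per fetch.
        logger.info("[%d] %s %.2fs", status, url, result.get('time', 0))
    else:
        # Falsy status code: log loudly, with content for post-mortem.
        logger.error("[%d] %s, %r %.2fs", status, url,
                     result.get('content'), result.get('time', 0))
    return result

The diff uses logger.exception in the error branch; logger.error is used here only because the sketch may run outside an except block.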
scheduler/scheduler.py (7 additions, 0 deletions)

@@ -188,6 +188,7 @@ def _check_task_done(self):
     def _check_request(self):
         cnt = 0
         try:
+            processed_task_cache = set()
             while cnt < self.LOOP_LIMIT:
                 task = self.newtask_queue.get_nowait()
                 if not self.task_verify(task):
@@ -206,12 +207,18 @@ def _check_request(self):
                 if not task.get('schedule', {}).get('force_update', False):
                     logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                     continue
+                cache_key = "%(project)s:%(taskid)s" % task
+                if cache_key in processed_task_cache:
+                    logger.debug('processed newtask %(project)s:%(taskid)s %(url)s', task)
+                    continue
+
                 oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                                fields=self.merge_task_fields)
                 if oldtask:
                     task = self.on_old_request(task, oldtask)
                 else:
                     task = self.on_new_request(task)
+                processed_task_cache.add(cache_key)
                 cnt += 1
         except Queue.Empty:
             pass
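The scheduler change amounts to a per-pass dedup set keyed by project and taskid, so a request that is queued twice is only merged once per _check_request pass. A stripped-down sketch of the pattern, with the queue and the task shape simplified:

# Minimal sketch of the per-pass dedup cache; LOOP_LIMIT and the callback
# stand in for attributes of pyspider's Scheduler class.
import Queue  # Python 2 stdlib module, as in pyspider at the time

LOOP_LIMIT = 1000  # illustrative value

def check_request(newtask_queue, on_new_request):
    processed_task_cache = set()
    cnt = 0
    try:
        while cnt < LOOP_LIMIT:
            task = newtask_queue.get_nowait()
            cache_key = "%(project)s:%(taskid)s" % task
            if cache_key in processed_task_cache:
                continue  # same request already handled in this pass
            on_new_request(task)
            processed_task_cache.add(cache_key)
            cnt += 1
    except Queue.Empty:
        pass
    return cnt

Because the set is rebuilt on every call, it only suppresses duplicates within a single pass; a task that reappears in a later pass is still checked against the taskdb as before.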
webui/debug.py (1 addition, 1 deletion)

@@ -49,7 +49,7 @@ def debug(project):
     taskid = request.args.get('taskid')
     if taskid:
         taskdb = app.config['taskdb']
-        task = taskdb.get_task(project, taskid, ['taskid', 'project', 'url', 'process'])
+        task = taskdb.get_task(project, taskid, ['taskid', 'project', 'url', 'fetch', 'process'])
     else:
         task = default_task

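The one-field change in debug.py matters because the stored fetch dict is what carries a task's fetch options (for example fetch_type, headers, or timeout); without it the debugger always showed defaults. A hedged sketch of the lookup, where taskdb.get_task stands in for pyspider's task database API:

# Sketch of the debugger's task lookup; default_task is the template used
# when no taskid is given. Field names follow the diff.
def load_debug_task(taskdb, project, taskid, default_task):
    if not taskid:
        return default_task
    task = taskdb.get_task(project, taskid,
                           ['taskid', 'project', 'url', 'fetch', 'process'])
    # task.get('fetch'), now included, lets the debug UI pre-fill the same
    # fetch options the task originally ran with.
    return task or default_task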
