Commit

added celery functionality

added CRUD base

tikazyq committed Feb 11, 2019
1 parent 426fb19 commit 7e8531a
Showing 16 changed files with 126 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,3 +1,5 @@
.idea/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
13 changes: 13 additions & 0 deletions app.py
@@ -0,0 +1,13 @@
from celery import Celery
from flask import Flask
from flask_restful import Api

# TODO: load config from a file (http://www.pythondoc.com/flask/config.html)
app = Flask(__name__)
app.config['DEBUG'] = True

# init flask api instance
api = Api(app)

# start the flask app only when this module is run directly, so that
# importing it (e.g. from route.py) does not block on the dev server
if __name__ == '__main__':
    app.run()
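
Note that route.py (below) registers its resources on the Api instance imported from this module, so the registration import has to happen before the server starts. A minimal entry-point sketch, assuming a hypothetical run.py that is not part of this commit:

# run.py (hypothetical entry point, not part of this commit)
from app import app
import route  # registers the /spider resource on the shared Api instance

if __name__ == '__main__':
    app.run()
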
1 change: 1 addition & 0 deletions bin/start_flower.sh
@@ -0,0 +1 @@
celery flower --broker=redis://localhost:6379/0 --backend=redis://localhost:6379/1
2 changes: 2 additions & 0 deletions config/__init__.py
@@ -0,0 +1,2 @@
MONGO_HOST = 'localhost'
MONGO_DATABASE = 'test'
11 changes: 11 additions & 0 deletions config/celery.py
@@ -0,0 +1,11 @@
# BROKER_URL = 'redis://localhost:6379/0'
BROKER_URL = 'mongodb://localhost:27017/'
CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
# CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# CELERY_TASK_SERIALIZER = 'json'
# CELERY_RESULT_SERIALIZER = 'json'
# CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # task result expiry time
CELERY_MONGODB_BACKEND_SETTINGS = {
'database': 'crawlab_test',
'taskmeta_collection': 'tasks_celery',
}
6 changes: 6 additions & 0 deletions config/db.py
@@ -0,0 +1,6 @@
# database settings
MONGO_HOST = 'localhost'
MONGO_PORT = 27017  # must be an int: pymongo's MongoClient rejects a string port
# MONGO_USER = 'test'
# MONGO_PASS = 'test'
MONGO_DB = 'crawlab_test'
2 changes: 2 additions & 0 deletions config/flask.py
@@ -0,0 +1,2 @@
DEBUG = True

Empty file added db/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions db/manager.py
@@ -0,0 +1,31 @@
from pymongo import MongoClient
from config.db import MONGO_HOST, MONGO_PORT, MONGO_DB


class DbManager(object):
    def __init__(self):
        self.mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
        self.db = self.mongo[MONGO_DB]

    # TODO: CRUD
    def save(self, col_name: str, item, **kwargs):
        col = self.db[col_name]
        col.save(item, **kwargs)

    def remove(self, col_name: str, cond: dict, **kwargs):
        col = self.db[col_name]
        col.remove(cond, **kwargs)

    def update(self, col_name: str, cond: dict, values: dict, **kwargs):
        col = self.db[col_name]
        col.update(cond, {'$set': values}, **kwargs)

    def list(self, col_name: str, cond: dict, skip: int, limit: int, **kwargs):
        # an optional 'page' argument overrides skip (page size = limit)
        if kwargs.get('page') is not None:
            try:
                page = int(kwargs.get('page'))
                skip = page * limit
            except (TypeError, ValueError):
                pass
        col = self.db[col_name]
        return list(col.find(cond).skip(skip).limit(limit))
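
A quick usage sketch for the manager above; the collection and field names are hypothetical, and a local MongoDB instance is assumed:

from db.manager import DbManager

manager = DbManager()
manager.save('spiders', {'name': 'example_spider', 'status': 'idle'})
manager.update('spiders', {'name': 'example_spider'}, {'status': 'running'})
print(manager.list('spiders', {}, skip=0, limit=10))
manager.remove('spiders', {'name': 'example_spider'})
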
Empty file added logger/__init__.py
Empty file.
Empty file added model/__init__.py
Empty file.
4 changes: 4 additions & 0 deletions route.py
@@ -0,0 +1,4 @@
from app import api
from routes.spider import SpiderApi, SpiderExecutorApi

api.add_resource(SpiderExecutorApi, '/spider')
5 changes: 5 additions & 0 deletions routes/__init__.py
@@ -0,0 +1,5 @@
from app import app


# api.add_resource(SpiderApi, '/spider')
# print(SpiderExecutorApi)
23 changes: 23 additions & 0 deletions routes/spider.py
@@ -0,0 +1,23 @@
from celery.utils.log import get_logger
from flask_restful import reqparse, Resource
from tasks.spider import execute_spider

logger = get_logger('tasks')
parser = reqparse.RequestParser()
parser.add_argument('spider_name', type=str)


class SpiderApi(Resource):
    pass


class SpiderExecutorApi(Resource):
    def get(self):
        args = parser.parse_args()
        job = execute_spider.delay(args.spider_name)
        return {
            'id': job.id,
            'status': job.status,
            'spider_name': args.spider_name,
            'result': job.get(timeout=5)
        }
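
With the Flask app and a Celery worker running, the executor endpoint can be exercised over HTTP. A sketch assuming the Flask development server's default address and a hypothetical spider name:

import requests

res = requests.get('http://localhost:5000/spider',
                   params={'spider_name': 'example_spider'})
print(res.json())  # {'id': ..., 'status': ..., 'spider_name': ..., 'result': ...}
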
9 changes: 9 additions & 0 deletions tasks/__init__.py
@@ -0,0 +1,9 @@
from celery import Celery

app = Celery(__name__)
app.config_from_object('config.celery')

# importing the task module registers its @app.task functions with this app
import tasks.spider

if __name__ == '__main__':
    # start a worker in-process; the first argv entry acts as the program name
    app.start(argv=['tasks.spider', 'worker', '-P', 'eventlet', '-E', '-l', 'INFO'])
17 changes: 17 additions & 0 deletions tasks/spider.py
@@ -0,0 +1,17 @@
import requests
from celery.utils.log import get_logger
from tasks import app

logger = get_logger(__name__)


@app.task
def execute_spider(spider_name: str):
    logger.info('spider_name: %s' % spider_name)
    return spider_name


@app.task
def get_baidu_html(keyword: str):
    # fetch Baidu search results for the keyword ('wd' is Baidu's query parameter)
    res = requests.get('http://www.baidu.com/s', params={'wd': keyword})
    return res.content.decode('utf-8')
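
Both tasks can also be queued directly, without going through the HTTP API; a minimal sketch, assuming a worker is consuming from the configured broker:

from tasks.spider import execute_spider, get_baidu_html

job = execute_spider.delay('example_spider')
print(job.get(timeout=10))  # -> 'example_spider'

html_job = get_baidu_html.delay('crawlab')
print(len(html_job.get(timeout=30)))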
