Skip to content

Commit

Permalink
添加了patent_status插入到数据库的功能,目前并未测试
Browse files Browse the repository at this point in the history
  • Loading branch information
sky94520 committed Aug 19, 2020
1 parent 115c4bd commit d23e6f4
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 225 deletions.
19 changes: 19 additions & 0 deletions CNKIPaSearch/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,25 @@ def close_spider(self, spider):
self.db_pool.close()


class MySQLPatentStatusPipeline(object):

def __init__(self):
self.db_pool = adbapi.ConnectionPool('pymysql', cursorclass=cursors.DictCursor, **MYSQL_CONFIG)

def process_item(self, item, spdier):
copy = dict(item)
query = self.db_pool.runInteraction(import_patent, copy)
query.addErrback(self.handle_error)
return DropItem()

def handle_error(self, failure):
"""插入数据库失败回调函数"""
logger.error(failure)

def close_spider(self, spider):
self.db_pool.close()


class SaveNumberCsvPipeline(object):
@classmethod
def from_crawler(cls, crawler):
Expand Down
86 changes: 74 additions & 12 deletions CNKIPaSearch/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import logging
from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


def date2str(date=None, year=None):
Expand All @@ -16,14 +14,78 @@ def str2date(text):
return date


# 创建对象的基类:
Base = declarative_base()
def _select(cursor, sql, first, *args):
"""
select语句
:param sql: SQL语句 内部变量使用?
:param first: 是否只获取一个
:param args: SQL语句中要使用的变量
:return: 返回查询的结果
"""
logging.info('SQL: %s %s' % (sql, args if len(args) > 0 else ""))
sql = sql.replace('?', '%s')
# 利用本身的 execute 函数的特性,传入两个参数:sql语句与tuple类型的参数,以避免sql注入
cursor.execute(sql, args)
if first:
result = cursor.fetchone()
return result
else:
results = cursor.fetchall()
return results


# 返回数据库会话
def load_session(uri):
# 初始化数据库连接:
engine = create_engine(uri)
Session = sessionmaker(bind=engine)
session = Session()
return session
def select_one(cursor, sql, *args):
return _select(cursor, sql, True, *args)


def select(cursor, sql, *args):
return _select(cursor, sql, False, *args)


def _insert(cursor, sql, insertMany, *args):
"""
insert语句
:param sql: SQL语句 内部变量使用?
:param insertMany: 是否要插入多行
:param args: SQL语句中要使用的变量
:return: 返回插入的结果 插入失败则返回-1
"""
logging.info('SQL: %s %s' % (sql, args if len(args) > 0 else ""))
sql = sql.replace('?', '%s')
# 利用本身的 execute 函数的特性,传入两个参数:sql语句与tuple类型的参数,以避免sql注入
if insertMany:
# 插入多行
cursor.executemany(sql, args)
else:
cursor.execute(sql, args)
# 返回最后插入行的主键ID
return cursor.lastrowid


def insert(cursor, sql, *args):
return _insert(cursor, sql, False, *args)


def insert_many(cursor, sql, *args):
return _insert(cursor, sql, True, *args)


def _delete(cursor, sql, *args):
"""
delete语句
:param cursor: 游标
:param sql: SQL语句 内部变量使用?
:param args: SQL语句中要使用的变量
:return: 返回
"""
logging.info('SQL: %s %s' % (sql, args if len(args) > 0 else ""))
sql = sql.replace('?', '%s')
# 利用本身的 execute 函数的特性,传入两个参数:sql语句与tuple类型的参数,以避免sql注入
cursor.execute(sql, args)
# 获取影响的行
row_count = cursor.rowcount
return row_count


def delete(cursor, sql, *args):
return _delete(cursor, sql, *args)
95 changes: 39 additions & 56 deletions CNKIPaSearch/utils/batch.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,5 @@
import logging


def _select(cursor, sql, first, *args):
"""
select语句
:param sql: SQL语句 内部变量使用?
:param first: 是否只获取一个
:param args: SQL语句中要使用的变量
:return: 返回查询的结果
"""
logging.info('SQL: %s %s' % (sql, args if len(args) > 0 else ""))
sql = sql.replace('?', '%s')
# 利用本身的 execute 函数的特性,传入两个参数:sql语句与tuple类型的参数,以避免sql注入
cursor.execute(sql, args)
if first:
result = cursor.fetchone()
return result
else:
results = cursor.fetchall()
return results


def select_one(cursor, sql, *args):
return _select(cursor, sql, True, *args)


def select(cursor, sql, *args):
return _select(cursor, sql, False, *args)


def _insert(cursor, sql, insertMany, *args):
"""
insert语句
:param sql: SQL语句 内部变量使用?
:param insertMany: 是否要插入多行
:param args: SQL语句中要使用的变量
:return: 返回插入的结果 插入失败则返回-1
"""
logging.info('SQL: %s %s' % (sql, args if len(args) > 0 else ""))
sql = sql.replace('?', '%s')
# 利用本身的 execute 函数的特性,传入两个参数:sql语句与tuple类型的参数,以避免sql注入
if insertMany:
# 插入多行
cursor.executemany(sql, args)
else:
cursor.execute(sql, args)
# 返回最后插入行的主键ID
return cursor.lastrowid


def insert(cursor, sql, *args):
return _insert(cursor, sql, False, *args)


def insert_many(cursor, sql, *args):
return _insert(cursor, sql, True, *args)
from . import select_one, select, insert_many, insert, delete


def import_patent(cursor, item, success_callback):
Expand Down Expand Up @@ -108,3 +53,41 @@ def import_patent(cursor, item, success_callback):
insert(cursor, insert_text_sql, summary, sovereignty, patent_id)

return success_callback(item)


def import_patent_status(cursor, item):
"""
插入专利的“状态”到数据库中
:param cursor: 游标,通过调用该方法增删改查数据
:param item: {"publication_number": str, "array": [{"date": date, "status": "", "information": ""}]}
:return:
"""
publication_number = item['publication_number']
sql = """select id from patent where publication_number=?"""
patent = select_one(cursor, sql, publication_number)
# 不存在对应的patent
if patent is None:
logging.warning('patent: publication_number:{} not found in database'.format(publication_number))
return
# 判断数据库中的状态数量
sql = """select count(*) count from patent_status WHERE patent_id=?"""
result = select_one(cursor, sql, patent['id'])
if result['count'] >= len(item['array']):
logging.warning('patent: publication_number:{}\'s status length {} >= {}'.format(publication_number, result['count'], len(item['array'])))
return
# 删除原先的数据
elif result['count'] != 0:
sql = """delete from patent_status where patent_id=?"""
rowcount = delete(cursor, sql, patent['id'])
logging.info('delete patent_status %d, effected row count %d' % (patent['id'], rowcount))
# 把{status, date, information, patent_id} 存入到buffer
buffer = []
for datum in item['array']:
status = datum['status']
date = datum['date']
information = datum['information']
buffer.append((status, date, information, patent['id']))
# 插入到数据库中
insert_sql = """insert into patent_status(status,publication_date, information, patent_id) values(?,?,?,?)"""
insert_many(cursor, insert_sql, *buffer)
buffer.clear()
156 changes: 0 additions & 156 deletions CNKIPaSearch/utils/models.py

This file was deleted.

Loading

0 comments on commit d23e6f4

Please sign in to comment.