-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunDataCleaning.py
84 lines (61 loc) · 2.46 KB
/
runDataCleaning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# encoding: UTF-8
import json
from datetime import datetime, timedelta, time
from pymongo import MongoClient
from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME, TICK_DB_NAME
# 这里以商品期货为例
MORNING_START = time(9, 0)
MORNING_REST = time(10, 15)
MORNING_RESTART = time(10, 30)
MORNING_END = time(11, 30)
AFTERNOON_START = time(13, 30)
AFTERNOON_END = time(15, 0)
NIGHT_START = time(21, 0)
NIGHT_END = time(2, 30)
#----------------------------------------------------------------------
def cleanData(dbName, collectionName, start):
"""清洗数据"""
print u'\n清洗数据库:%s, 集合:%s, 起始日:%s' %(dbName, collectionName, start)
mc = MongoClient('localhost', 27017) # 创建MongoClient
cl = mc[dbName][collectionName] # 获取数据集合
d = {'datetime':{'$gte':start}} # 只过滤从start开始的数据
cx = cl.find(d) # 获取数据指针
# 遍历数据
for data in cx:
# 获取时间戳对象
dt = data['datetime'].time()
# 默认需要清洗
cleanRequired = True
# 如果在交易事件内,则为有效数据,无需清洗
if ((MORNING_START <= dt < MORNING_REST) or
(MORNING_RESTART <= dt < MORNING_END) or
(AFTERNOON_START <= dt < AFTERNOON_END) or
(dt >= NIGHT_START) or
(dt < NIGHT_END)):
cleanRequired = False
# 如果需要清洗
if cleanRequired:
print u'删除无效数据,时间戳:%s' %data['datetime']
cl.delete_one(data)
print u'清洗完成,数据库:%s, 集合:%s' %(dbName, collectionName)
#----------------------------------------------------------------------
def runDataCleaning():
"""运行数据清洗"""
print u'开始数据清洗工作'
# 加载配置
setting = {}
with open("DR_setting.json") as f:
setting = json.load(f)
# 遍历执行清洗
today = datetime.now()
start = today - timedelta(10) # 清洗过去10天数据
start.replace(hour=0, minute=0, second=0, microsecond=0)
for l in setting['tick']:
symbol = l[0]
cleanData(TICK_DB_NAME, symbol, start)
for l in setting['bar']:
symbol = l[0]
cleanData(MINUTE_DB_NAME, symbol, start)
print u'数据清洗工作完成'
if __name__ == '__main__':
runDataCleaning()