forked from stop-covid19-hyogo/covid19-scraping
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
148 lines (126 loc) · 4.73 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import requests
import openpyxl
import codecs
import os
import shutil
import jaconv
import re
import time
from io import BytesIO
from bs4 import BeautifulSoup
from json import dumps
from datetime import datetime, timezone, timedelta
from pdfminer.high_level import extract_text
from typing import Union, Dict, List
# Scheme and host of the site being scraped; get_file() appends page paths
# to this value.
base_url = "https://web.pref.hyogo.lg.jp"

# Fixed UTC+09:00 zone labelled "JST"; print_log() and excel_date() use it.
jst = timezone(timedelta(hours=9), "JST")
# Zero-filled template of the nested summary tree. Every node carries an
# 'attr' label and a 'value'; inner nodes also list their 'children'.
# The labels are runtime data and must stay exactly as written.
SUMMARY_INIT = {
    "attr": "検査実施人数",  # people tested
    "value": 0,
    "children": [
        {
            "attr": "陽性患者数",  # positive patients
            "value": 0,
            "children": [
                {
                    "attr": "入院中",  # currently hospitalized
                    "value": 0,
                    "children": [
                        {
                            "attr": "軽症・中等症",  # mild / moderate
                            "value": 0,
                        },
                        {
                            "attr": "重症",  # severe
                            "value": 0,
                        },
                    ],
                },
                {
                    "attr": "死亡",  # deaths
                    "value": 0,
                },
                {
                    "attr": "退院",  # discharged
                    "value": 0,
                },
            ],
        },
    ],
    "last_update": "",
}
def print_log(type: str, message: str) -> None:
    """Print one timestamped log line to standard output.

    The line has the form
    ``[<YYYY-MM-DD HH:MM:SS+09:00>][covid19-scraping:<type>]: <message>``,
    where the timestamp is the current time converted to ``jst``.

    Args:
        type: Category tag placed after ``covid19-scraping:``. The name
            shadows the builtin but is part of the public signature.
        message: Text of the log entry.
    """
    now_in_jst = datetime.now().astimezone(jst)
    stamp = now_in_jst.strftime('%Y-%m-%d %H:%M:%S+09:00')
    print(f"[{stamp}][covid19-scraping:{type}]: {message}")
def get_file(path: str, file_type: str, save_file: bool = False) \
        -> Union[openpyxl.workbook.workbook.Workbook, List[str]]:
    """Locate the first link to a ``file_type`` file on a page and load it.

    The page at ``base_url + path`` is downloaded, its ``<a>`` tags are
    scanned in document order, and the first ``href`` ending with
    ``file_type`` is passed on to ``requests_file``.

    Args:
        path: Page path appended to ``base_url``.
        file_type: Suffix the link must end with, e.g. ``"xlsx"`` or ``"pdf"``.
        save_file: Forwarded unchanged to ``requests_file``.

    Returns:
        Whatever ``requests_file`` returns for the located file.

    Raises:
        Exception: The page could not be fetched (request error or empty
            body) on the initial attempt nor on any of the 5 retries.
        AssertionError: No link ending with ``file_type`` exists on the page.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urljoin

    page_url = base_url + path
    print_log("file", "get html file...")
    html_doc = ""
    failed_count = 0
    while True:
        cause = None
        try:
            html_doc = requests.get(page_url).text
        except Exception as err:
            cause = err
        if html_doc:
            break
        # An empty body is handled like a failed request so that it is
        # subject to the same delay and retry cap instead of spinning forever.
        if failed_count >= 5:
            raise Exception(f"Failed get html file from \"{page_url}\"!") from cause
        print_log("file", f"Failed get html file from \"{page_url}\". retrying...")
        failed_count += 1
        time.sleep(5)

    soup = BeautifulSoup(html_doc, 'html.parser')
    file_url = ""
    for tag in soup.find_all("a"):
        href = tag.get("href")
        # Anchors without an href attribute yield None and are skipped.
        if href and href.endswith(file_type):
            # Root-relative hrefs resolve exactly as base_url + href did;
            # absolute and page-relative hrefs now resolve correctly too.
            file_url = urljoin(page_url, href)
            break
    if not file_url:
        # Raised explicitly (same exception type as the former assert) so the
        # check still runs when Python is started with -O.
        raise AssertionError(f"Can't get {file_type} file!")
    return requests_file(file_url, file_type, save_file)
def _download_bytes(file_url: str, file_type: str) -> bytes:
    """Return the body of ``file_url``, retrying up to 5 times, 5 s apart."""
    failed_count = 0
    while True:
        cause = None
        try:
            res = requests.get(file_url)
            # Only a 200 response with a non-empty body counts as success;
            # anything else goes through the same delayed, capped retry path.
            if res.status_code == 200 and res.content:
                return res.content
        except Exception as err:
            cause = err
        if failed_count >= 5:
            raise Exception(f"Failed get {file_type} file from \"{file_url}\"!") from cause
        print_log("file", f"Failed get {file_type} file from \"{file_url}\". retrying...")
        failed_count += 1
        time.sleep(5)


def requests_file(file_url: str, file_type: str, save_file: bool = False) \
        -> Union[openpyxl.workbook.workbook.Workbook, List[str]]:
    """Download a file and parse it according to ``file_type``.

    PDFs are always written to ``./data/<basename of file_url>`` and parsed
    from that path. xlsx files are written there only when ``save_file`` is
    true; otherwise they are parsed from memory.

    Args:
        file_url: Absolute URL of the file.
        file_type: ``"pdf"`` or ``"xlsx"``.
        save_file: Keep a copy of the download under ``./data/``.

    Returns:
        For ``"pdf"``: the extracted text split on ``"\\n"``.
        For ``"xlsx"``: the loaded openpyxl workbook.

    Raises:
        Exception: The download kept failing (request error, non-200 status
            or empty body) after the retries, or ``file_type`` is not
            supported. As before, an unsupported type is reported only after
            the download (and the save, when ``save_file`` is true).
    """
    file_bin = _download_bytes(file_url, file_type)

    if save_file or file_type == "pdf":
        filename = './data/' + os.path.basename(file_url)
        with open(filename, 'wb') as f:
            f.write(file_bin)
        if file_type == "pdf":
            return extract_text(filename).split('\n')
        if file_type == "xlsx":
            return openpyxl.load_workbook(filename)
    elif file_type == "xlsx":
        return openpyxl.load_workbook(BytesIO(file_bin))

    raise Exception(f"Not support file type: {file_type}")
def excel_date(num) -> datetime:
    """Return the ``jst`` datetime lying ``num`` days after 1899-12-30 00:00.

    Args:
        num: Day count handed to ``timedelta(days=...)``; presumably an
            Excel serial date number (verify against callers).

    Returns:
        A timezone-aware ``datetime`` in ``jst``.
    """
    epoch = datetime(1899, 12, 30, tzinfo=jst)
    elapsed = timedelta(days=num)
    return epoch + elapsed
def dumps_json(file_name: str, json_data: Dict) -> None:
    """Write ``json_data`` as pretty-printed UTF-8 JSON to ``./data/<file_name>``.

    Non-ASCII characters are written as-is (``ensure_ascii=False``) with a
    4-space indent. The target directory is created when missing.

    Args:
        file_name: File name (or relative path) below ``./data/``.
        json_data: JSON-serializable mapping.

    Raises:
        TypeError: ``json_data`` contains a value ``json.dumps`` cannot
            serialize. Nothing is written in that case.
    """
    out_path = "./data/" + file_name
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    # Serialize before opening so that a serialization error cannot leave a
    # truncated or empty file behind.
    payload = dumps(json_data, ensure_ascii=False, indent=4, separators=(',', ': '))
    # newline="" disables newline translation, giving the same bytes on every
    # platform that the previous binary-mode codecs.open() call produced.
    with open(out_path, "w", encoding="utf-8", newline="") as f:
        f.write(payload)
def get_weekday(day: int) -> str:
    """Return the one-character Japanese weekday name for index ``day``.

    The lookup table starts on Monday (0 -> "月") and ends on Sunday
    (6 -> "日"), the same numbering ``datetime.weekday()`` uses.

    Raises:
        IndexError: ``day`` is outside the table (negative indices count
            from the end, as with any list).
    """
    monday_first = list("月火水木金土日")
    return monday_first[day]
def get_numbers_in_text(text: str) -> List[int]:
    """Return every run of ASCII digits in ``text`` as an int, in order.

    ``text`` is first passed through ``jaconv.z2h(text, digit=True)``
    (full-width to half-width conversion with digits enabled), so numbers
    typed with full-width digits are picked up as well.

    Args:
        text: Text to scan.

    Returns:
        The numbers found, left to right; an empty list when there are none.
    """
    half_width = jaconv.z2h(text, digit=True)
    return [int(run) for run in re.findall('[0-9]+', half_width)]