forked from V-I-C-T-O-R/12306
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNetUtils.py
101 lines (86 loc) · 3.61 KB
/
NetUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import collections
import random
import time
import requests
from conf.urls_conf import queryUrls
from define.UserAgent import USER_AGENT
def sendLogic(func):
def wrapper(*args, **kw):
for count in range(10):
response = func(*args, **kw)
if response:
return response
else:
time.sleep(0.1)
return None
return wrapper
class EasyHttp(object):
__session = requests.Session()
@staticmethod
def updateHeaders(headers):
EasyHttp.__session.headers.update(headers)
@staticmethod
def resetHeaders():
EasyHttp.__session.headers.clear()
EasyHttp.__session.headers.update({
'Host': r'kyfw.12306.cn',
'Referer': 'https://kyfw.12306.cn/otn/login/init',
'User-Agent': random.choice(USER_AGENT),
"Accept-Encoding": "gzip, deflate, br",
"X-Requested-With": "XMLHttpRequest"
})
@staticmethod
def setCookies(**kwargs):
for k, v in kwargs.items():
EasyHttp.__session.cookies.set(k, v)
@staticmethod
def removeCookies(key=None):
EasyHttp.__session.cookies.set(key, None) if key else EasyHttp.__session.cookies.clear()
@staticmethod
@sendLogic
def send(urlInfo, params=None, data=None, **kwargs):
EasyHttp.resetHeaders()
if 'headers' in urlInfo and urlInfo['headers']:
EasyHttp.updateHeaders(urlInfo['headers'])
try:
response = EasyHttp.__session.request(method=urlInfo['method'],
url=urlInfo['url'],
params=params,
data=data,
timeout=10,
allow_redirects=False,
**kwargs)
if response.status_code == requests.codes.ok:
if 'response' in urlInfo:
if urlInfo['response'] == 'binary':
return response.content
if urlInfo['response'] == 'html':
response.encoding = response.apparent_encoding
return response.text
return response.json()
except:
pass
return None
if __name__ == '__main__':
dic = collections.OrderedDict()
dic['leftTicketDTO.train_date'] = '2019-01-01'
dic['leftTicketDTO.from_station'] = 'SHH'
dic['leftTicketDTO.to_station'] = 'GZQ'
dic['purpose_codes'] = 'ADULT'
# params = {
# r'leftTicketDTO.train_date': '2019-01-01',
# r'leftTicketDTO.from_station': 'SHH',
# r'leftTicketDTO.to_station': 'GZQ',
# r'purpose_codes': "ADULT"
# }
headers = {"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.162 Safari/537.36",
"Accept-Encoding": "gzip, deflate, br",
"Content-Type": "application/json;charset=utf-8"}
url = queryUrls['query']['url'] + "?leftTicketDTO.train_date=" + '2019-01-01' + "&leftTicketDTO.from_station=" + 'SHH' + "&leftTicketDTO.to_station=" + 'GZQ' + "&purpose_codes=ADULT"
result = requests.get(url,headers=headers, timeout=15)
print("---------------------------------------------------------------------")
print(result.content)
print("---------------------------------------------------------------------")
print(EasyHttp.send(queryUrls['query'],params=dic))
pass