forked from jhao104/proxy_pool
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathvalidator.py
86 lines (66 loc) · 2.48 KB
/
validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
File Name: _validators
Description : 定义proxy验证方法
Author : JHao
date: 2021/5/25
-------------------------------------------------
Change Activity:
2023/03/10: 支持带用户认证的代理格式 username:password@ip:port
-------------------------------------------------
"""
__author__ = 'JHao'
import re
from requests import head
from util.six import withMetaclass
from util.singleton import Singleton
from handler.configHandler import ConfigHandler
# Module-level configuration handler; the validators in this file read
# httpUrl, httpsUrl and verifyTimeout from it.
conf = ConfigHandler()
# Request headers passed as ``headers=HEADER`` by the HTTP/HTTPS validators.
HEADER = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0',
    'Accept': '*/*',
    'Connection': 'keep-alive',
    'Accept-Language': 'zh-CN,zh;q=0.8',
}

# Proxy address pattern: an optional "username:password@" prefix followed by
# "ip:port". Only digit counts are checked (1-3 per octet, 1-5 for the port);
# numeric ranges are not validated.
IP_REGEX = re.compile(
    r"(.*:.*@)?"                           # optional credentials prefix
    r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"  # dotted-quad address
    r":\d{1,5}"                            # port
)
class ProxyValidator(withMetaclass(Singleton)):
    """Registry of proxy validation callables, grouped into three lists.

    Each ``add*Validator`` classmethod appends the given callable to the
    matching class-level list and returns the callable unchanged, so the
    methods can be applied as decorators.
    """

    # Callables registered through addPreValidator.
    pre_validator = []
    # Callables registered through addHttpValidator.
    http_validator = []
    # Callables registered through addHttpsValidator.
    https_validator = []

    @classmethod
    def addPreValidator(cls, func):
        """Append ``func`` to ``pre_validator`` and return it unchanged."""
        registry = cls.pre_validator
        registry.append(func)
        return func

    @classmethod
    def addHttpValidator(cls, func):
        """Append ``func`` to ``http_validator`` and return it unchanged."""
        registry = cls.http_validator
        registry.append(func)
        return func

    @classmethod
    def addHttpsValidator(cls, func):
        """Append ``func`` to ``https_validator`` and return it unchanged."""
        registry = cls.https_validator
        registry.append(func)
        return func
@ProxyValidator.addPreValidator
def formatValidator(proxy):
    """Check the proxy string format.

    Returns True when the whole of ``proxy`` matches ``IP_REGEX``, otherwise
    False.
    """
    matched = IP_REGEX.fullmatch(proxy)
    return matched is not None
@ProxyValidator.addHttpValidator
def httpTimeOutValidator(proxy):
    """Check that an HTTP request through ``proxy`` succeeds within the timeout.

    Sends a HEAD request to ``conf.httpUrl`` with ``HEADER`` as headers, routed
    through ``proxy``, using ``conf.verifyTimeout`` as the timeout.

    Returns True only when the response status code is 200. Any other status,
    or any exception raised while making the request, yields False.
    """
    proxy_map = {
        "http": "http://{proxy}".format(proxy=proxy),
        "https": "https://{proxy}".format(proxy=proxy),
    }
    try:
        response = head(conf.httpUrl, headers=HEADER, proxies=proxy_map, timeout=conf.verifyTimeout)
        reachable = response.status_code == 200
    except Exception:
        # Best-effort probe: every failure simply marks the proxy as unusable.
        return False
    return reachable
@ProxyValidator.addHttpsValidator
def httpsTimeOutValidator(proxy):
    """Check that an HTTPS request through ``proxy`` succeeds within the timeout.

    Sends a HEAD request to ``conf.httpsUrl`` with ``HEADER`` as headers, routed
    through ``proxy``, using ``conf.verifyTimeout`` as the timeout. The request
    is made with ``verify=False``.

    Returns True only when the response status code is 200. Any other status,
    or any exception raised while making the request, yields False.
    """
    proxy_map = {
        "http": "http://{proxy}".format(proxy=proxy),
        "https": "https://{proxy}".format(proxy=proxy),
    }
    try:
        response = head(conf.httpsUrl, headers=HEADER, proxies=proxy_map, timeout=conf.verifyTimeout, verify=False)
        reachable = response.status_code == 200
    except Exception:
        # Best-effort probe: every failure simply marks the proxy as unusable.
        return False
    return reachable
@ProxyValidator.addHttpValidator
def customValidatorExample(proxy):
    """Example of a custom validator function.

    A validator receives the proxy, checks whether it is usable and returns
    True or False. This example always returns True, so it never rejects a
    proxy.
    """
    return True