-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproxy_broker.py
238 lines (187 loc) · 7.91 KB
/
proxy_broker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import socket
from struct import pack, unpack
import asyncio
from aisle import LogMixin
from client import Client, RemoteClientError
from xybase import StreamBase
from config_parse import PyxyConfig
# from memory_profiler import profile
# SOCKS protocol version this module speaks (SOCKS5, see RFC 1928); every
# version byte received from a client is compared against it.
SOCKS_VERSION = 5
class SocksError(Exception):
    """Signal a SOCKS protocol error (bad version, unsupported feature, ...)."""

    def __init__(self, msg: str = None):
        # Keep the raw text on the instance so __str__ can prefix it.
        self.message = msg
        super().__init__(msg)

    def __str__(self) -> str:
        return "SocksError: {}".format(self.message)
class SockRelay(StreamBase, LogMixin):
    """Local SOCKS5 proxy that forwards every accepted request to a remote peer.

    Only username/password authentication (method 2) and the IPv4 / domain-name
    address types are accepted. For each local connection a ``Client`` is
    created towards ``remote_addr:remote_port`` and, for CONNECT requests, the
    two streams are bridged with ``Client.exchange_stream``.
    """

    def __init__(
        self,
        config_all: PyxyConfig,
        remote_addr: str,
        remote_port: int,
        name: str = None,
    ) -> None:
        """Read the proxy settings and remember the remote endpoint.

        Args:
            config_all: Parsed configuration. ``general["key"]`` and the
                ``client`` section (``username``, ``password``,
                ``socks5_address``, ``socks5_port``; ``backlog`` is read later
                when the server starts) are used.
            remote_addr: Address passed to every ``Client`` created per request.
            remote_port: Port passed to every ``Client`` created per request.
            name: Forwarded unchanged to ``StreamBase.__init__``.
        """
        self.key_string = config_all.general["key"]
        super().__init__(self.key_string, name=name)
        self.config = config_all.client
        self.username = self.config["username"]
        self.password = self.config["password"]
        self.sock_proxy_addr = self.config["socks5_address"]
        self.sock_proxy_port = self.config["socks5_port"]
        self.remote_addr = remote_addr
        self.remote_port = remote_port

    def run(self):
        """Start the server synchronously; return quietly on Ctrl-C."""
        try:
            asyncio.run(self.start_sock_server())
        except KeyboardInterrupt:
            return

    async def start_sock_server(self) -> None:
        """Bind the local SOCKS5 listener and serve until cancelled."""
        # TODO: add TLS encryption to the SOCKS connection as well
        server = await asyncio.start_server(
            self.local_sock_handle,
            self.sock_proxy_addr,
            self.sock_proxy_port,
            backlog=self.config["backlog"],
        )
        addr = server.sockets[0].getsockname()
        self.logger.warning(f"服务器启动, 端口:{addr[1]}")
        async with server:
            await server.serve_forever()

    async def _socks_authenticate(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Run method negotiation and username/password authentication.

        Raises:
            SocksError: On a wrong protocol version, an empty method list, a
                client that does not offer method 2, a wrong sub-negotiation
                version, or wrong credentials.
        """
        version, nmethods = unpack("!BB", await reader.readexactly(2))
        # Explicit raises instead of ``assert``: asserts are removed under
        # ``python -O`` and would surface as AssertionError, not SocksError.
        if version != SOCKS_VERSION:
            raise SocksError("不支持的Socks版本")
        if nmethods < 1:
            raise SocksError("Socks请求包协议头错误,认证方式的数量不能小于0")

        # One read for the whole method list; iterating bytes yields ints.
        methods = await reader.readexactly(nmethods)
        # Only username/password (method 2) is supported for now.
        if 2 not in methods:
            # 0xFF tells the client that none of its methods is acceptable.
            writer.write(pack("!BB", SOCKS_VERSION, 0xFF))
            await writer.drain()
            raise SocksError("不支持的身份验证方式")
        writer.write(pack("!BB", SOCKS_VERSION, 2))  # 2 = username/password
        await writer.drain()

        # Username/password sub-negotiation:
        # VER | ULEN | UNAME | PLEN | PASSWD
        auth_version = ord(await reader.readexactly(1))
        if auth_version != 1:
            raise SocksError("不支持的身份验证版本")
        username_len = ord(await reader.readexactly(1))
        username = (await reader.readexactly(username_len)).decode("utf-8")
        password_len = ord(await reader.readexactly(1))
        password = (await reader.readexactly(password_len)).decode("utf-8")

        if username == self.username and password == self.password:
            writer.write(pack("!BB", auth_version, 0))  # 0 = success
            await writer.drain()
            return
        writer.write(pack("!BB", auth_version, 0xFF))  # non-zero = failure
        await writer.drain()
        raise SocksError("身份验证失败")

    async def _socks_read_request(self, reader: asyncio.StreamReader) -> tuple:
        """Parse the SOCKS5 request and return ``(cmd, ip, domain, port)``.

        Exactly one of ``ip`` / ``domain`` is non-empty, depending on the
        address type sent by the client.

        Raises:
            SocksError: On a wrong protocol version or an address type other
                than 1 (IPv4) or 3 (domain name).
        """
        version, cmd, _, address_type = unpack("!BBBB", await reader.readexactly(4))
        if version != SOCKS_VERSION:
            raise SocksError("不支持的Socks版本")

        if address_type == 1:  # IPv4
            true_domain = ""
            # readexactly either returns 4 bytes or raises, so no emptiness
            # check is needed here.
            true_ip = socket.inet_ntoa(await reader.readexactly(4))
        elif address_type == 3:  # domain name, prefixed by a one-byte length
            true_ip = ""
            domain_length = (await reader.readexactly(1))[0]
            true_domain = (await reader.readexactly(domain_length)).decode("utf-8")
        else:
            raise SocksError(f"不支持的地址类型{address_type}")

        true_port = unpack("!H", await reader.readexactly(2))[0]
        return cmd, true_ip, true_domain, true_port

    @StreamBase.handlerDeco
    async def local_sock_handle(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Serve one local SOCKS5 connection from handshake to teardown.

        Steps: authenticate the client, parse its request, perform the remote
        handshake through a new ``Client``, send the SOCKS reply and, for
        CONNECT (cmd 1), bridge both streams. Every failure is logged rather
        than propagated, and both ends are closed in all cases.

        Reference: RFC 1928 (SOCKS5).
        """
        # NOTE(review): total_conn_count is not defined in this class; it is
        # presumably maintained by StreamBase / handlerDeco - confirm.
        request_id = self.total_conn_count - 1
        logger = self.logger.get_child(str(request_id))
        logger.debug(f'接收来自{writer.get_extra_info("peername")}的连接')

        # Bound up front so the cleanup below can tell whether a remote
        # client was ever created.
        remote_client = None
        try:
            await self._socks_authenticate(reader, writer)
            cmd, true_ip, true_domain, true_port = await self._socks_read_request(
                reader
            )
            logger.info(f"客户端请求 > {true_ip}|{true_domain}:{true_port}")

            # Create the real connection on the remote side.
            remote_client = Client(
                self.key_string,
                self.remote_addr,
                self.remote_port,
                tag=request_id,
            )
            response = await remote_client.remote_handshake(
                payload={
                    "ip": true_ip,
                    "domain": true_domain,
                    "port": true_port,
                }
            )
            bind_address, bind_port = response
            if bind_address is None or bind_port is None:
                raise RemoteClientError("远程的客户端错误")

            # Reply to the SOCKS client: VER, REP=0 (success), RSV, ATYP=1
            # (IPv4), then the 4-byte bound address and the 2-byte port.
            reply = pack(
                "!BBBB4sH",
                SOCKS_VERSION,
                0,
                0,
                1,
                socket.inet_aton(bind_address),
                bind_port,
            )
            writer.write(reply)
            await writer.drain()

            # Start relaying data.
            if not remote_client.remote_reader or not remote_client.remote_writer:
                raise RemoteClientError("连接未建立")
            # The reply above always carries REP=0, so only the command
            # decides whether to relay; 1 is CONNECT.
            if cmd == 1:
                await remote_client.exchange_stream(
                    reader,
                    writer,
                    remote_client.remote_reader,
                    remote_client.remote_writer,
                )
        except RemoteClientError as error:
            logger.warning(f"远程创建连接失败 > {error}")
        except SocksError as error:
            logger.warning(f"Socks错误 > {error}")
        except OSError as error:
            logger.warning(f"OS错误 > {error}")
        except Exception as error:
            # Boundary of the per-connection task: log and carry on so one bad
            # client cannot take the handler down.
            logger.warning(f"未知错误 > {type(error)}|{error}")
        finally:
            try:
                writer.close()
                await writer.wait_closed()
                logger.debug("本地连接已关闭")
            except Exception as error:
                logger.debug(f"关闭本地连接失败,连接可能已断开 > {type(error)}|{error}")
            # Skip the remote teardown when the handshake failed before a
            # Client was created.
            if remote_client is not None:
                try:
                    await remote_client.remote_close()
                except Exception as error:
                    logger.debug(f"关闭远程连接失败,连接可能已断开 > {type(error)}|{error}")
            logger.info("请求处理结束")
# DEBUG
# objgraph.show_growth()
if __name__ == "__main__":
    # Script entry point: load the configuration, build the relay towards the
    # configured remote endpoint and serve until interrupted.
    config = PyxyConfig()
    proxy_server = SockRelay(
        config,
        config.general["domain"],
        config.server["port"],
        name="1",
    )
    proxy_server.run()