forked from hjdhnx/dr_py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsafePython.py
143 lines (126 loc) · 4.89 KB
/
safePython.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : safePython.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date : 2022/8/28
import io
import tokenize
from func_timeout import func_set_timeout
from func_timeout.exceptions import FunctionTimedOut
from urllib.parse import urljoin, quote, unquote
import requests
import time
import json
import re
from lxml import etree
import datetime
import base64
from utils.log import logger
time_out_sec = 8 # 安全执行python代码超时
class my_exception(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
message = f'函数执行超时: "{self.message}"'
return message
@func_set_timeout(time_out_sec)
def excute(*args):
exec(*args)
def check_unsafe_attributes(string):
"""
安全检测需要exec执行的python代码
:param string:
:return:
"""
g = tokenize.tokenize(io.BytesIO(string.encode('utf-8')).readline)
pre_op = ''
for toktype, tokval, _, _, _ in g:
if toktype == tokenize.NAME and pre_op == '.' and tokval.startswith('_'):
attr = tokval
msg = "access to attribute '{0}' is unsafe.".format(attr)
raise AttributeError(msg)
elif toktype == tokenize.OP:
pre_op = tokval
DEFAULT_PYTHON_CODE = """# 可用内置环境变量:
# - log: log(message): 打印日志功能
# - error: 弹出用户错误的弹窗
# 返回变量值: result = {...}\n\n
zyw_lists = env['hikerule.zyw.list'].with_context(active_test=True).sudo().search(
[('option', '=', 'zy'), ('cate_id.name', '!=', '18+'),('cate_id.is_bad', '!=', True)])
result = env['hikerule.zyw.list2data.wizard'].sudo().get_publish_value(zyw_lists)
"""
def safe_eval(code: str = '', localdict: dict = None):
code = code.strip()
logger.info('code:' + code)
if not code:
return {}
if localdict is None:
localdict = {}
builtins = __builtins__
builtins = dict(builtins).copy()
for key in ['__import__', 'eval', 'exec', 'globals', 'dir', 'copyright', 'open', 'quit']:
del builtins[key] # 删除不安全的关键字
# print(builtins)
global_dict = {'__builtins__': builtins,
'requests': requests, 'urljoin': urljoin, 'quote': quote, 'unquote': unquote,
'log': logger.info, 'json': json, 'print': print,
're': re, 'etree': etree, 'time': time, 'datetime': datetime, 'base64': base64
} # 禁用内置函数,不允许导入包
try:
check_unsafe_attributes(code)
# 待解决windows下运行超时的问题
try:
# excute(to_run_code, global_dict, localdict)
excute(code, global_dict, localdict)
return localdict
except FunctionTimedOut:
raise my_exception(f'safe_eval运行时间超过{time_out_sec}秒,疑似死循环,已被系统切断')
except Exception as e:
ret = f'执行报错:{e}'
logger.info(ret)
return ret
class safePython:
def __init__(self, name, code):
self.name = name or '未定义'
self.code = code
def action_task_exec(self, call=None, params=None):
"""
接口调用执行函数
:return:
"""
if not params:
params = []
builtins = __builtins__
builtins = dict(builtins).copy()
for key in ['__import__', 'eval', 'exec', 'globals', 'dir', 'copyright', 'open', 'quit']:
del builtins[key] # 删除不安全的关键字
# print(builtins)
global_dict = {'__builtins__': builtins,
'requests': requests, 'urljoin': urljoin, 'quote': quote, 'unquote': unquote,
'log': logger.info, 'json': json, 'print': print,
're': re, 'etree': etree, 'time': time, 'datetime': datetime, 'base64': base64
} # 禁用内置函数,不允许导入包
try:
check_unsafe_attributes(self.code)
localdict = {'result': None}
# 待解决windows下运行超时的问题
base_code = self.code.strip()
if call:
logger.info(f'开始执行:{call}')
try:
# excute(to_run_code, global_dict, localdict)
excute(base_code, global_dict, localdict)
run = localdict.get(call)
if run:
localdict['result'] = run(*params)
except FunctionTimedOut:
raise my_exception(f'函数[{self.name}]运行时间超过{time_out_sec}秒,疑似死循环,已被系统切断')
except Exception as e:
ret = f'执行报错:{e}'
logger.info(ret)
return ret
else:
# print(global_dict)
# print(localdict)
ret = localdict['result']
return ret