forked from hjdhnx/dr_py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfiles.py
152 lines (137 loc) · 5.46 KB
/
files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : files.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date : 2022/9/6
import os
import shutil
from utils.system import getHost
from controllers.service import storage_service
from utils.encode import base64Encode,parseText
from flask import render_template_string
from utils.log import logger
def getPics(path='images'):
base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
img_path = os.path.join(base_path, f'{path}')
os.makedirs(img_path,exist_ok=True)
file_name = os.listdir(img_path)
# file_name = list(filter(lambda x: str(x).endswith('.js') and str(x).find('模板') < 0, file_name))
# print(file_name)
pic_list = [img_path+'/'+file for file in file_name]
# pic_list = file_name
# print(type(pic_list))
return pic_list
def get_live_url(new_conf,mode):
host = getHost(mode)
lsg = storage_service()
# t1 = time()
# live_url = host + '/lives' if new_conf.get('LIVE_MODE',1) == 0 else lsg.getItem('LIVE_URL',getHost(2)+'/lives')
live_url = host + '/lives' if lsg.getItem('LIVE_MODE',1) == 0 else lsg.getItem('LIVE_URL',getHost(2)+'/lives')
live_url = base64Encode(live_url)
# print(f'{get_interval(t1)}毫秒')
return live_url
def getAlist():
base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
alist_path = os.path.join(base_path, 'js/alist.conf')
alist_cpath = os.path.join(base_path, 'base/alist.conf')
try:
if not os.path.exists(alist_cpath):
shutil.copy(alist_path, alist_cpath) # 复制文件
with open(alist_cpath,encoding='utf-8') as f:
data = f.read().strip()
alists = []
for i in data.split('\n'):
i = i.strip()
dt = i.split(',')
if not i.strip().startswith('#'):
obj = {
'name': dt[0],
'server': dt[1],
'type':"alist",
}
if len(dt) > 2:
obj.update({
'password': dt[2]
})
alists.append(obj)
print(f'共计{len(alists)}条alist记录')
return alists
except Exception as e:
print(f'获取alist列表失败:{e}')
return []
def get_jar_list():
base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
jar_path = os.path.join(base_path, 'libs/jar')
if not os.path.exists(jar_path):
os.makedirs(jar_path, exist_ok=True)
logger.info(f'初始化{jar_path}目录')
jars = os.listdir(jar_path)
jars = list(filter(lambda x: str(x).endswith('.jar') and str(x).find('base') < 0, jars))
# print(jars)
# jar_list = [file.replace('.jar', '') for file in jars]
return jars
def get_drop_js(jsd_list):
base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
js_path = os.path.join(base_path, 'js')
js_list = [os.path.join(js_path, jsd.replace('jsd','js')) for jsd in jsd_list]
return js_list
def get_jsd_list():
base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
js_path = os.path.join(base_path, 'js')
if not os.path.exists(js_path):
os.makedirs(js_path, exist_ok=True)
logger.info(f'初始化{js_path}目录')
jsds = os.listdir(js_path)
jsds = list(filter(lambda x: str(x).endswith('.jsd'), jsds))
return jsds
def custom_merge(original:dict,custom:dict):
"""
合并用户配置
:param original: 原始配置
:param custom: 自定义配置
:return:
"""
if not custom or len(custom.keys()) < 1:
return original
new_keys = custom.keys()
updateObj = {}
extend_obj = {}
for key in ['wallpaper','spider','homepage','lives','hotSearch','sniffer','recommend','rating','rules']:
if key in new_keys:
updateObj[key] = custom[key]
for key in ['drives','sites','flags','ads','parses']:
if key in new_keys:
extend_obj[key] = custom[key]
original.update(updateObj)
for key in extend_obj.keys():
# original[key].extend(extend_obj[key])
# print(key,original.get(key))
if original.get(key) and isinstance(original[key],list):
original[key].extend(extend_obj[key])
else:
original[key] = extend_obj[key]
logger.info(f'合并配置共有解析数量:{len(original.get("parses"))}')
return original
def getCustonDict(host,ali_token='',js0_password=''):
customFile = 'base/custom.conf'
if not os.path.exists(customFile):
with open(customFile, 'w+', encoding='utf-8') as f:
f.write('{}')
customConfig = False
try:
with open(customFile,'r',encoding='utf-8') as f:
text = f.read()
customConfig = parseText(render_template_string(text,host=host,ali_token=ali_token,js0_password=js0_password))
print(customConfig)
except Exception as e:
logger.info(f'用户自定义配置加载失败:{e}')
return customConfig
def get_multi_rules(rules):
lsg = storage_service()
multi_mode = lsg.getItem('MULTI_MODE',0)
fix_multi = ['drpy']
if not multi_mode or str(multi_mode)=='0':
rules['list'] = list(filter(lambda x: x['name'] in fix_multi or x.get('multi'), rules['list']))
rules['count'] = len(rules['list'])
# print(rules)
return rules