forked from erma0/douyin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi.py
318 lines (263 loc) · 8.79 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# -*- encoding: utf-8 -*-
'''
@File : api.py
@Time : 2023年06月19日 20:20:58 星期一
@Author : erma0
@Version : 0.1
@Link : https://erma0.cn
@Desc : 抖音爬虫封装API
'''
import logging
import os
import shutil
import stat
import threading
import time
from copy import deepcopy
from enum import Enum
from typing import List, Optional, Union

import ujson as json
import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, UJSONResponse
from pydantic import BaseModel
from uvicorn.config import LOGGING_CONFIG

from browser import Browser
from spider import Douyin
# Timestamp layout shared by uvicorn's "default" and "access" log formatters.
date_fmt = "%Y-%m-%d %H:%M:%S"

# Prefix every uvicorn log record with its timestamp.
LOGGING_CONFIG["formatters"]["default"].update(
    fmt="%(asctime)s %(levelprefix)s %(message)s",
    datefmt=date_fmt,
)
LOGGING_CONFIG["formatters"]["access"].update(
    fmt='%(asctime)s %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s',
    datefmt=date_fmt,
)
# Service metadata: passed to FastAPI for the generated docs and used as
# defaults of the index response model.
title: str = '抖音爬虫API'
version: str = "0.1.0"
update: str = "20230619"
author: str = "erma0"
web: str = "https://douyin.erma0.cn/"
github: str = "https://github.com/erma0/douyin"
doc: str = "https://douyin.erma0.cn/docs"

# Markdown shown as the API description (feature list and usage disclaimer).
description: str = """
### ❤️开源不易,欢迎star⭐
- 支持采集账号主页作品、喜欢作品、音乐原声作品、搜索作品、关注列表、粉丝列表、合集作品、单个作品
### 📢声明
> 本仓库为学习`playwright`爬虫、命令行调用`Aria2`及`FastAPI/AMIS/Eel`实现`WEBUI`的案例,仅用于测试和学习研究,禁止用于商业用途或任何非法用途。
> 任何用户直接或间接使用、传播本仓库内容时责任自负,本仓库的贡献者不对该等行为产生的任何后果负责。
> 如果相关方认为该项目的代码可能涉嫌侵犯其权利,请及时通知删除相关代码。
"""

contact = dict(author=author, url=github)

# Route groups in the docs: "同步" (synchronous: fast, result returned
# directly) and "异步" (asynchronous: slow, a polling URL is returned).
tags_metadata = [
    dict(name="同步", description="耗时短的接口,直接返回结果"),
    dict(name="异步", description="耗时长的接口,无法直接返回结果,返回一个轮询接口"),
]
# Shared Browser instance; stays None until start_browser() creates it.
edge = None
# Keys ("<type>_<url>") of the crawls currently running as background tasks.
running_ls: List[str] = []
# Directory for the per-task status/result files; also handed to the spider
# as its download directory.
download_path: str = './下载_API'
# The ASGI application, described by the module-level metadata constants.
app = FastAPI(
    title=title,
    version=version,
    description=description,
    contact=contact,
    openapi_tags=tags_metadata,
)

# Allow cross-origin requests from any origin, with any method and header.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
def _write_progress(spider) -> None:
    """Overwrite the spider's status file with a "still running" snapshot.

    The file is ``{download_path}/{type}_{id}.json`` and receives
    ``{'code': 1, 'num': <number of results collected so far>}``.
    """
    status_file = f"{download_path}/{spider.type}_{spider.id}.json"
    with open(status_file, 'w', encoding='utf-8') as f:
        json.dump({'code': 1, 'num': len(spider.results)}, f, ensure_ascii=False)


class DouyinAPI(Douyin):
    """Spider that refreshes its status file after every appended batch."""

    def _append_awemes(self, aweme_list: List[dict]):
        # Let the base class handle the batch, then publish the new count.
        super()._append_awemes(aweme_list)
        _write_progress(self)

    def _append_users(self, user_list: List[dict]):
        # Same as above, for batches of users.
        super()._append_users(user_list)
        _write_progress(self)
def running(a: DouyinAPI, target: str):
    """Run a crawl as a background task and publish its final result.

    Args:
        a: The configured spider to run.
        target: Key under which the task was registered in ``running_ls``.

    On success, ``{download_path}/{a.type}_{a.id}.json`` is overwritten with
    ``code`` 0, the number of results and the results themselves.

    Raises:
        Whatever ``a.run()`` or writing the result file raises; the key is
        released from ``running_ls`` first.
    """
    try:
        a.run()
        with open(f"{download_path}/{a.type}_{a.id}.json", 'w', encoding='utf-8') as f:
            json.dump({
                'code': 0,
                'num': len(a.results),
                'data': a.results,
            }, f, ensure_ascii=False)
    finally:
        # Always release the key: without this, a crawl that raised would be
        # reported as "running" forever and could never be started again.
        running_ls.remove(target)
# Response model of the index route (`GET /`): service metadata whose
# defaults come from the module-level constants.
class API(BaseModel):
    Version: str = version
    Update: str = update
    Web: str = web
    GitHub: str = github
    Doc: str = doc
    # NOTE(review): time.ctime() is evaluated once, when this class body runs,
    # so the default is the import time rather than the time of each request;
    # confirm that this is intended.
    Time: str = time.ctime()
# Response model for a single video item.
# Only `id`, `desc` and `download_addr` are required. Every other field may be
# absent and is declared Optional explicitly, instead of relying on the
# implicit-Optional treatment of `int = None`, so that None is an accepted
# value for type checkers and for validators that do not infer it.
class DataVideo(BaseModel):
    id: str
    desc: str
    download_addr: Union[str, List[str]]  # one address or a list of addresses
    time: Optional[int] = None
    # Counters exist in snake_case and camelCase variants; presumably they
    # mirror different upstream payloads (TODO confirm against the spider).
    digg_count: Optional[int] = None
    share_count: Optional[int] = None
    collect_count: Optional[int] = None
    comment_count: Optional[int] = None
    diggCount: Optional[int] = None
    shareCount: Optional[int] = None
    collectCount: Optional[int] = None
    commentCount: Optional[int] = None
    liveWatchCount: Optional[int] = None
    music_title: Optional[str] = None
    music_url: Optional[str] = None
    cover: Optional[str] = None
    tags: Optional[List[dict]] = None
# Response model for a single user.
# Only `nickname` and `sec_uid` are required. Every other field may be absent
# and is declared Optional explicitly, instead of relying on the
# implicit-Optional treatment of `int = None`, so that None is an accepted
# value for type checkers and for validators that do not infer it.
class DataUser(BaseModel):
    nickname: str
    sec_uid: str
    uid: Optional[str] = None
    signature: Optional[str] = None
    avatar: Optional[str] = None
    short_id: Optional[str] = None
    unique_id: Optional[str] = None
    unique_id_modify_time: Optional[int] = None
    aweme_count: Optional[int] = None
    favoriting_count: Optional[int] = None
    follower_count: Optional[int] = None
    following_count: Optional[int] = None
    constellation: Optional[int] = None
    create_time: Optional[int] = None
    enterprise_verify_reason: Optional[str] = None
    is_gov_media_vip: Optional[bool] = None
    total_favorited: Optional[int] = None
    share_qrcode_uri: Optional[str] = None
# Response model declared for the polling route: status of a crawl plus,
# once it has finished, the collected items.
class DataSync(BaseModel):
    code: int = 0  # 0 = finished; 1 = still running
    num: int = 0  # number of items collected so far
    # Absent while the crawl is running, hence explicitly Optional rather
    # than the implicit-Optional `List[...] = None`.
    data: Optional[List[Union[DataVideo, DataUser]]] = None
# Response model of the asynchronous start route: tells the client whether
# the task was accepted and where to poll for its result.
class DataAsync(BaseModel):
    code: int = 0  # 0 = task accepted; 1 = already running
    url: str  # polling URL for the task's status/result
# Crawl types accepted by the asynchronous routes. The member value appears
# in the URL path (`/api/{type_async}`) and is passed to the spider as `type`.
class TypeAsync(str, Enum):
    post = 'post'
    like = 'like'
    favorite = 'favorite'
    music = 'music'
    search = 'search'
    collection = 'collection'
    follow = 'follow'
    fans = 'fans'
# Index route: returns the service metadata. `Web` and `Doc` are derived from
# the URL the request came in on; the remaining fields fall back to the
# defaults of the `API` response model.
@app.get(
    "/",
    response_class=UJSONResponse,
    response_model=API,
)
def api(req: Request):
    # str() is the public way to render the URL object; this avoids reaching
    # into its private `_url` attribute.
    base_url = str(req.base_url)
    return {'Web': base_url, 'Doc': f'{base_url}docs'}
# Synchronous route: crawl a single video URL and return the first result.
# Responds with 404 when the crawl produced no result.
@app.get(
    "/api/video",
    response_class=UJSONResponse,
    response_model=DataVideo,
    tags=['同步'],
    response_model_exclude_unset=True,
    response_model_exclude_defaults=True,
)
def get_video(url: str):
    start_browser()
    a = DouyinAPI(context=edge.context, url=url, type='video', down_path=download_path)
    a.run()
    if not a.results:
        # Previously an IndexError (HTTP 500); use the same 404 response as
        # the polling route does for a missing target.
        raise HTTPException(status_code=404, detail="目标不存在")
    # Return a copy so the response does not share state with the spider.
    return deepcopy(a.results[0])
# Synchronous route: run an `id`-type crawl for the given value and return
# the first result. Responds with 404 when the crawl produced no result.
@app.get(
    "/api/user",
    response_class=UJSONResponse,
    response_model=DataUser,
    tags=['同步'],
    response_model_exclude_unset=True,
    response_model_exclude_defaults=True,
)
def get_user(id: str):
    start_browser()
    a = DouyinAPI(context=edge.context, url=id, type='id', down_path=download_path)
    a.run()
    if not a.results:
        # Previously an IndexError (HTTP 500); use the same 404 response as
        # the polling route does for a missing target.
        raise HTTPException(status_code=404, detail="目标不存在")
    # Return a copy so the response does not share state with the spider.
    return deepcopy(a.results[0])
# Synchronous route: run the spider on `url` without an explicit type and
# return a copy of its `info` attribute.
@app.get("/api/info", response_class=UJSONResponse, tags=['同步'])
def get_info(url: str):
    start_browser()
    spider = DouyinAPI(context=edge.context, url=url, down_path=download_path)
    # Presumably stops the spider from paging through further results, since
    # only `info` is needed here (TODO confirm against the spider).
    spider.has_more = False
    spider.run()
    return deepcopy(spider.info)
# Asynchronous route: schedule a crawl of the given type as a background task
# (unless the same type/URL pair is already running) and return the URL to
# poll for its result. `code` is 0 when the task was scheduled and 1 when it
# was already running.
@app.get(
    "/api/{type_async}",
    response_class=UJSONResponse,
    response_model=DataAsync,
    tags=['异步'],
    status_code=201,
)
def start_async(type_async: TypeAsync, url: str, background_tasks: BackgroundTasks, req: Request):
    start_browser()
    target = f"{type_async.value}_{url.strip()}"
    # The spider is built even for a duplicate request because its `id` is
    # needed to form the polling URL.
    a = DouyinAPI(context=edge.context, url=url, type=type_async.value, down_path=download_path)
    if target in running_ls:
        code = 1
    else:
        code = 0
        running_ls.append(target)
        background_tasks.add_task(running, a, target)
    # str() is the public way to render the URL object; this avoids reaching
    # into its private `_url` attribute.
    base_url = str(req.base_url)
    return {'code': code, 'url': f"{base_url}api/{type_async.value}/{a.id}"}
# Polling route: serve the file stored for a task, i.e.
# `{download_path}/{type}_{id}.json`, or the `.txt` file of the same name
# when `down` is true. Responds with 404 when that file does not exist.
@app.get(
    "/api/{type_async}/{id}",
    response_class=UJSONResponse,
    response_model=DataSync,
    tags=['同步'],
    response_model_exclude_unset=True,
    response_model_exclude_defaults=True,
)
def async_result(type_async: TypeAsync, id: str, down: bool = False):
    if down:
        suffix = 'txt'
    else:
        suffix = 'json'
    file = f"{download_path}/{type_async.value}_{id}.{suffix}"
    if os.path.exists(file):
        return FileResponse(file)
    raise HTTPException(status_code=404, detail="目标不存在")
# Test route: prints a marker, blocks for three seconds, prints another
# marker, then returns a fixed payload.
@app.get("/t", response_class=UJSONResponse)
def tt():
    print('start')
    time.sleep(3)
    print('end')
    payload = {"url": "type"}
    return payload
# Create the shared Browser on first use. Exposed as `GET /init` and also
# called directly by the crawl routes before they touch `edge`.
@app.get("/init")
def start_browser():
    global edge
    if isinstance(edge, Browser):
        # Already started; nothing to do.
        return
    edge = Browser(headless=False)
# Deleting read-only entries
def remove_readonly(func, path, _):
    """Error hook for ``shutil.rmtree``: make *path* writable and retry.

    Args:
        func: The removal function to call again on *path*.
        path: The path whose removal is being retried.
        _: Third argument supplied by the caller (unused).
    """
    os.chmod(path, stat.S_IWRITE)
    func(path)
def _purge_expired(root, max_age):
    """Delete entries of *root* last modified more than *max_age* seconds ago."""
    now = time.time()
    for name in os.listdir(root):
        entry = os.path.join(root, name)
        try:
            if now - os.path.getmtime(entry) <= max_age:
                continue
            if os.path.isfile(entry):
                os.remove(entry)
            else:
                shutil.rmtree(entry, onerror=remove_readonly)
        except OSError as err:
            # The entry may have vanished or be locked; skip it so that one
            # bad entry cannot stop the whole cleanup.
            logging.getLogger(__name__).warning("cleanup skipped %s: %s", entry, err)


# Clean the download directory every 10 minutes, deleting entries older than 1 hour
def clean_download_path():
    """Purge stale entries from the download directory, forever.

    Every 10 minutes this makes sure ``download_path`` exists and deletes
    each file or directory in it that was last modified more than one hour
    ago. It never returns, so it is meant to run in a background thread.

    Filesystem errors are logged and the next pass is attempted anyway;
    previously the first ``OSError`` ended the loop for good.
    """
    root = download_path
    while True:
        try:
            # exist_ok avoids the check-then-create race and recreates the
            # directory if it was removed while the service is running.
            os.makedirs(root, exist_ok=True)
            _purge_expired(root, 60 * 60 * 1)
        except OSError as err:
            logging.getLogger(__name__).warning("cleanup of %s failed: %s", root, err)
        time.sleep(60 * 10)
# Runs when the application starts
@app.on_event("startup")
def startup():
    """Start the download-directory cleaner in a daemon thread."""
    cleaner = threading.Thread(target=clean_download_path, daemon=True)
    cleaner.start()
if __name__ == '__main__':
    # Command-line equivalent, with auto-reload for development:
    #   uvicorn api:app --host '0.0.0.0' --port 567 --reload
    # Auto-reload is left off here.
    uvicorn.run(
        "api:app",
        host='0.0.0.0',
        port=567,
        limit_concurrency=5,
        use_colors=True,
    )