# main.py
# -*- coding: utf-8 -*-
import os
import json
import random
import threading
import time
from typing import Union

import requests
import schedule
import uvicorn
import hydra
from bs4 import BeautifulSoup
from omegaconf import DictConfig
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware

from utils.spider import AVSpider, HacgSpider
from utils.logger import setup_logger
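
# A minimal sketch of the Hydra config this app expects (data/config.yaml).
# The keys are exactly the ones referenced below; the values are illustrative
# assumptions, not the project's actual settings:
#
#   app:
#     cors_origins: ["*"]
#     cors_credentials: true
#     cors_methods: ["*"]
#     cors_headers: ["*"]
#   files:
#     hacg_json_path: data/hacg.json        # hypothetical path
#     video_urls_txt_path: data/videos.txt  # hypothetical path
#   av_spider:
#     source_url: ...
#     proxy_url: ...
#     use_proxy: true
#   hacg_spider:
#     source_url: ...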
@hydra.main(config_path='data/', config_name='config', version_base=None)
def main(cfg: DictConfig):
    # Initialize the logger
    logger = setup_logger(cfg)

    app = FastAPI()

    @app.on_event("startup")
    async def startup_event():
        # Re-initialize the logger once the app has started; nonlocal makes the
        # handlers below see the refreshed instance
        nonlocal logger
        logger = setup_logger(cfg)

    app.add_middleware(
        CORSMiddleware,
        allow_origins=cfg.app.cors_origins,
        allow_credentials=cfg.app.cors_credentials,
        allow_methods=cfg.app.cors_methods,
        allow_headers=cfg.app.cors_headers,
    )

    def get_image_url(video_url: str) -> Union[str, None]:
        """Derive an image directory from a video URL and return one image link."""
        try:
            # Build the image directory URL from the video URL
            image_dir_url = video_url.replace('index.m3u8', 'image/')
            # Fetch the directory listing
            response = requests.get(image_dir_url, timeout=20)  # Timeout guards against hanging requests
            response.raise_for_status()  # Raise HTTPError for non-200 responses
            # Parse the HTML and extract the links
            soup = BeautifulSoup(response.text, 'html.parser')
            a_tags = soup.find_all('a', href=True)  # Only <a> tags with an href attribute
            # Collect links, excluding the parent-directory entry, and split out .webp files
            links = [image_dir_url + tag['href'] for tag in a_tags if tag['href'] != '../']
            webp_links = [link for link in links if link.endswith('.webp')]
            # Prefer .webp links; otherwise pick randomly from the remaining ones
            if not links:
                logger.warning("No image links found.")
                return None
            return random.choice(webp_links or links)
        except Exception as e:
            logger.error(f"Failed to obtain the image URL: {str(e)}")
            return None
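
    # Example of the URL rewrite performed above (host is hypothetical):
    #   https://example.com/video/123/index.m3u8 -> https://example.com/video/123/image/
    # get_image_url then picks one image link out of that directory listing,
    # preferring .webp files.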

    def read_random_line(file_path: str) -> tuple[str, str]:
        """Reads a random line from a given file and returns video URL and image URL."""
        if not os.path.isfile(file_path):
            logger.error("File not found")
            raise HTTPException(status_code=404, detail="File not found")
        with open(file_path, 'r') as file:
            lines = file.readlines()
        if not lines:
            logger.error("File is empty")
            raise HTTPException(status_code=400, detail="File is empty")
        random_line = random.choice(lines).strip()
        img_url = get_image_url(random_line)
        return random_line, img_url
@app.get("/v1/hacg")
async def read_hacg():
try:
with open(cfg.files.hacg_json_path, 'r', encoding='utf-8') as file:
data = json.load(file)
logger.info("HACG data fetched successfully")
return JSONResponse({"data": data}, headers={'content-type': 'application/json;charset=utf-8'})
except Exception as e:
logger.error(f"Failed to fetch HACG data: {str(e)}")
raise HTTPException(status_code=500, detail="Internal Server Error")
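
    # Usage sketch (host/port are illustrative; the server binds 0.0.0.0:8000 below):
    #   curl http://localhost:8000/v1/hacg
    # responds with {"data": <contents of hacg_json_path>}.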
@app.get("/v1/avcode/{code_str}")
async def crawl_av(code_str: str):
crawler = AVSpider(av_code=code_str,
source_url=cfg.av_spider.source_url,
proxy_url=cfg.av_spider.proxy_url,
use_proxy=cfg.av_spider.use_proxy,
cfg=cfg)
video_links = crawler.get_video_url()
all_magnet_links = []
for link in video_links:
magnet_links = crawler.get_magnet_links(link)
all_magnet_links.extend(magnet_links)
if not all_magnet_links:
logger.error("No magnet links found for AV code: %s", code_str)
raise HTTPException(status_code=404, detail="No magnet links found")
logger.info("Magnet links found for AV code: %s", code_str)
return {"status": "succeed", "data": [str(item) for item in all_magnet_links]}
@app.get("/v1/get_video")
async def get_random_video_url():
"""Returns a random video URL and its corresponding image URL."""
try:
file_path = cfg.files.video_urls_txt_path
video_url, img_url = read_random_line(file_path)
logger.info("Random video URL and image URL fetched successfully")
return {
"url": video_url,
"img_url": img_url or ""
}
except Exception as e:
logger.error(f"Failed to fetch random video URL: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
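
    # Usage sketch:
    #   curl http://localhost:8000/v1/get_video
    # returns {"url": <video URL>, "img_url": <image URL or "">}.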

    def run_hacg_spider():
        hacg_spider = HacgSpider(url=cfg.hacg_spider.source_url, filepath=cfg.files.hacg_json_path, cfg=cfg)
        hacg_spider.update_json_file()
        logger.info("HacgSpider task completed.")

    # Schedule the HacgSpider task to run daily at 1 AM
    schedule.every().day.at("01:00").do(run_hacg_spider)

    # Keep the scheduler running in the background
    def run_scheduler():
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

    # Run the scheduler in a daemon thread so it does not block shutdown
    scheduler_thread = threading.Thread(target=run_scheduler)
    scheduler_thread.daemon = True
    scheduler_thread.start()

    uvicorn.run(app, host="0.0.0.0", port=8000)

if __name__ == "__main__":
    main()
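
# Run with `python main.py`. Hydra loads data/config.yaml and lets any key be
# overridden from the command line, e.g. (the value shown is illustrative):
#   python main.py av_spider.use_proxy=false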