Skip to content

Commit

Permalink
完善保存作品数据功能
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeanAmier committed Dec 13, 2023
1 parent af8e3ca commit 84a0889
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 38 deletions.
18 changes: 16 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
<h1>📑 功能清单</h1>
<ul>
<li>✅ 采集小红书图文/视频作品信息</li>
<li>✅ 提取小红书图文/视频作品文件下载地址</li>
<li>✅ 提取小红书图文/视频作品下载地址</li>
<li>✅ 下载小红书无水印图文/视频作品文件</li>
<li>✅ 自动跳过已下载的作品文件</li>
<li>✅ 作品文件完整性处理机制</li>
<li>☑️ 采集作品信息储存至文件</li>
<li>✅ 持久化储存作品信息至文件</li>
<li>☑️ 后台监听剪贴板下载作品</li>
<li>☑️ 支持 API 调用功能</li>
</ul>
<h1>📸 程序截图</h1>
<br>
Expand Down Expand Up @@ -137,6 +139,18 @@ async with XHS(path=path,
<td align="center">请求数据失败时,重试的最大次数</td>
<td align="center">5</td>
</tr>
<tr>
<td align="center">record_data</td>
<td align="center">bool</td>
<td align="center">是否记录作品数据至文件</td>
<td align="center">false</td>
</tr>
<tr>
<td align="center">image_format</td>
<td align="center">str</td>
<td align="center">图文作品文件名称后缀,例如:<code>jpg</code>、<code>png</code></td>
<td align="center">webp</td>
</tr>
</tbody>
</table>
<h1>🌐 Cookie</h1>
Expand Down
13 changes: 7 additions & 6 deletions source/Downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from aiohttp import ServerTimeoutError
from rich.text import Text

from .Html import retry
from .Html import retry as re_download

__all__ = ['Download']

Expand All @@ -26,17 +26,18 @@ def __init__(
headers={"User-Agent": manager.headers["User-Agent"]},
timeout=ClientTimeout(connect=timeout))
self.retry = manager.retry
self.image_format = manager.image_format

async def run(self, urls: list, name: str, type_: str, log, bar):
    """Dispatch download of one work's files.

    Args:
        urls: download links; a single-element list for videos, one link
            per image for image works.
        name: base file name (no extension).
        type_: work type marker — "v" for video, "n" for image works.
        log, bar: logging / progress-bar handles passed through.

    Raises:
        ValueError: if type_ is neither "v" nor "n".
    """
    if type_ == "v":
        # Video works have exactly one file; always saved as MP4.
        await self.__download(urls[0], f"{name}.mp4", log, bar)
    elif type_ == "n":
        # Image works: number each file, extension comes from the
        # configurable image_format (default "webp").
        for index, url in enumerate(urls, start=1):
            await self.__download(url, f"{name}_{index}.{self.image_format}", log, bar)
    else:
        raise ValueError(f"unknown work type marker: {type_!r}")

@retry
@re_download
async def __download(self, url: str, name: str, log, bar):
temp = self.temp.joinpath(name)
file = self.folder.joinpath(name)
Expand Down
8 changes: 3 additions & 5 deletions source/Explore.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class Explore:
explore_data = compile(
r'"currentTime":\d{13},"note":(.*?)}},"serverRequestInfo"')
time_format = "%Y-%m-%d %H:%M:%S"
explore_type = {"video": "视频", "normal": "图文"}

def run(self, html: str) -> dict:
data = self.__get_json_data(html)
Expand Down Expand Up @@ -41,14 +42,11 @@ def __extract_tags(container: dict, data: dict):
tags = data.get("tagList", [])
container["作品标签"] = [i.get("name", "") for i in tags]

def __extract_info(self, container: dict, data: dict):
    """Copy basic work metadata from the raw note data into container.

    The work type is translated through the class-level explore_type
    table ("video" -> 视频, "normal" -> 图文); anything else maps to 未知.
    """
    container["作品ID"] = data.get("noteId")
    container["作品标题"] = data.get("title")
    container["作品描述"] = data.get("desc")
    container["作品类型"] = self.explore_type.get(data.get("type"), "未知")
    container["IP归属地"] = data.get("ipLocation")

def __extract_time(self, container: dict, data: dict):
Expand Down
2 changes: 1 addition & 1 deletion source/Html.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from aiohttp import ServerDisconnectedError
from aiohttp import ServerTimeoutError

__all__ = ['Html']
__all__ = ["Html", "retry"]


def retry(function):
Expand Down
2 changes: 1 addition & 1 deletion source/Image.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __format_image_data(data: list[str]) -> list[dict]:

@staticmethod
def __generate_image_link(token: str) -> str:
    # Build a direct CDN URL from the extracted image token.
    return f"https://sns-img-bd.xhscdn.com/{token}"

def __extract_image_token(self, url: str) -> str:
return self.__generate_image_link(token.group(1)) if (
Expand Down
11 changes: 9 additions & 2 deletions source/Manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ def __init__(
folder: str,
user_agent: str,
cookie: str,
retry: int):
retry: int,
record_data: bool,
image_format: str,
):
self.root = root
self.temp = root.joinpath("./temp")
self.folder = self.__init_root(root, path, folder)
Expand All @@ -34,6 +37,8 @@ def __init__(
"-bcc2-a859e97518bf; unread={%22ub%22:%22655eb3d60000000032033955%22%2C%22ue%22:%22656"
"e9ef2000000003801ff3d%22%2C%22uc%22:29}; cache_feeds=[]"}
self.retry = retry
self.record_data = record_data
self.image_format = image_format

def __init_root(self, root: Path, path: str, folder: str) -> Path:
if path and (r := Path(path)).is_dir():
Expand Down Expand Up @@ -61,9 +66,11 @@ def clean(self):

def filter_name(self, name: str) -> str:
    """Sanitize a string for use as a file name.

    Characters matched by the class-level NAME pattern are replaced
    with "_", runs of "_" are collapsed to one, and leading/trailing
    underscores are stripped so names never start or end with "_".
    """
    name = self.NAME.sub("_", name)
    return sub(r"_+", "_", name).strip("_")

def save_data(self, name: str, data: dict):
if not self.record_data:
return
with self.folder.joinpath(f"{name}.txt").open("a", encoding="utf-8") as f:
time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
content = f"{
Expand Down
2 changes: 2 additions & 0 deletions source/Settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class Settings:
"timeout": 10,
"chunk": 1024 * 1024,
"max_retry": 5,
"record_data": False,
"image_format": "webp",
}
encode = "UTF-8-SIG" if system() == "Windows" else "UTF-8"

Expand Down
52 changes: 31 additions & 21 deletions source/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class XHS:
SHARE = compile(r"https://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
SHORT = compile(r"https://xhslink\.com/[A-Za-z0-9]+")
__INSTANCE = None
TYPE = {
"视频": "v",
"图文": "n",
}

def __new__(cls, *args, **kwargs):
if not cls.__INSTANCE:
Expand All @@ -54,6 +58,8 @@ def __init__(
timeout=10,
chunk=1024 * 1024,
max_retry=5,
record_data=False,
image_format="webp",
**kwargs,
):
self.manager = Manager(
Expand All @@ -62,7 +68,10 @@ def __init__(
folder_name,
user_agent,
cookie,
max_retry)
max_retry,
record_data,
image_format,
)
self.html = Html(
self.manager.headers,
proxy,
Expand All @@ -78,35 +87,29 @@ def __init__(
timeout, )
self.rich_log = self.download.rich_log

async def __get_image(self, container: dict, html: str, download, log, bar):
urls = self.image.get_image_link(html)
# self.rich_log(log, urls) # 调试代码
name = self.__naming_rules(container)
if download:
await self.download.run(urls, name, 1, log, bar)
container["下载地址"] = urls
self.manager.save_data(name, container)
def __extract_image(self, container: dict, html: str):
    # Resolve the image work's direct download links from the page HTML
    # and record them under the "下载地址" key.
    links = self.image.get_image_link(html)
    container["下载地址"] = links

def __extract_video(self, container: dict, html: str):
    # Resolve the video work's download link from the page HTML and
    # record it under the "下载地址" key.
    link = self.video.get_video_link(html)
    container["下载地址"] = link

async def __download_files(self, container: dict, download: bool, log, bar):
    """Download the work's files (when requested) and persist its data.

    Downloads only when download is True AND extraction produced at
    least one link; the work type string is mapped to the downloader's
    "v"/"n" marker via the class-level TYPE table. save_data is always
    called, but is itself a no-op unless record_data was enabled.
    """
    name = self.__naming_rules(container)
    if download and (u := container["下载地址"]):
        await self.download.run(u, name, self.TYPE[container["作品类型"]], log, bar)
    self.manager.save_data(name, container)

async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
    """Extract data for every XiaoHongShu work link found in url.

    Args:
        url: text possibly containing several whitespace-separated work
            links (short links are resolved by __extract_links).
        download: also download the work's files when True.
        log, bar: logging / progress-bar handles passed through.

    Returns:
        One dict per resolved link; a failed extraction yields {}.
        Returns [] when no valid link was found.
    """
    urls = await self.__extract_links(url)
    if not urls:
        self.rich_log(log, "提取小红书作品链接失败", "bright_red")
    else:
        self.rich_log(log, f"共 {len(urls)} 个小红书作品待处理")
    return [await self.__deal_extract(i, download, log, bar) for i in urls]

async def __deal_links(self, url: str) -> list:
async def __extract_links(self, url: str) -> list:
urls = []
for i in url.split():
if u := self.SHORT.search(i):
Expand All @@ -130,10 +133,14 @@ async def __deal_extract(self, url: str, download: bool, log, bar):
if not data:
self.rich_log(log, f"{url} 提取数据失败", "bright_red")
return {}
if data["作品类型"] == "视频":
await self.__get_video(data, html, download, log, bar)
else:
await self.__get_image(data, html, download, log, bar)
match data["作品类型"]:
case "视频":
self.__extract_video(data, html)
case "图文":
self.__extract_image(data, html)
case _:
data["下载地址"] = []
await self.__download_files(data, download, log, bar)
self.rich_log(log, f"完成处理:{url}")
return data

Expand All @@ -145,6 +152,9 @@ async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()

async def close(self):
self.manager.clean()
await self.html.session.close()
await self.download.session.close()
Expand Down

0 comments on commit 84a0889

Please sign in to comment.