Skip to content

Commit

Permalink
修复并发bug
Browse files Browse the repository at this point in the history
  • Loading branch information
jooooock committed Nov 14, 2024
1 parent 870f8d0 commit 5096c71
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 148 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
"lucide-vue-next": "^0.441.0",
"mime": "^4.0.4",
"nuxt": "^3.12.3",
"nuxt-gtag": "^2.1.0",
"p-queue": "^8.0.1",
"uuid": "^11.0.3",
"v-calendar": "^3.1.2",
"vue": "latest",
"vue-chartjs": "^5.3.1",
Expand Down
107 changes: 41 additions & 66 deletions utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,19 @@ async function downloadAssetWithProxy<T extends Blob | string>(url: string, prox
}
let targetURL = proxy ? `${proxy}?url=${encodeURIComponent(url)}&headers=${encodeURIComponent(JSON.stringify(headers))}` : url
targetURL = targetURL.replace(/^http:\/\//, 'https://')
const result = await $fetch<T>(targetURL, {

return await $fetch<T>(targetURL, {
retry: 0,
timeout: timeout * 1000,
})

// 统计代理下载资源流量
if (proxy) {
if (result instanceof Blob) {
pool.pool.incrementTraffic(proxy, result.size)
} else {
pool.pool.incrementTraffic(proxy, new Blob([result]).size)
}
}

return result
}

async function measureExecutionTime(label: string, taskFn: () => Promise<DownloadResult | DownloadResult[]>) {
async function measureExecutionTime(label: string, taskFn: () => Promise<undefined>) {
const start = Date.now()
const result = await taskFn()
const end = Date.now()
const total = (end - start) / 1000;

pool.formatDownloadResult(label, result, total)

return result
}

Expand Down Expand Up @@ -110,9 +98,7 @@ export async function downloadArticleHTML(articleURL: string, title?: string) {
return new Blob([html]).size
}

await measureExecutionTime('html下载结果:', async () => {
return await pool.downloads([articleURL], htmlDownloadFn)
})
await pool.downloads([articleURL], htmlDownloadFn)

if (!html) {
throw new Error('下载html失败,请稍后重试')
Expand Down Expand Up @@ -155,9 +141,7 @@ export async function downloadArticleHTMLs(articles: DownloadableArticle[], call
return new Blob([fullHTML]).size
}

await measureExecutionTime('html下载结果:', async () => {
return await pool.downloads(articles, htmlDownloadFn)
})
await pool.downloads(articles, htmlDownloadFn)

return results
}
Expand Down Expand Up @@ -433,14 +417,13 @@ export async function packHTMLAssets(html: string, title: string, zip?: JSZip) {
videoURLMap.set(url, `./assets/${uuid}.${ext}`)
return videoData.size
}
await measureExecutionTime('视频资源下载结果:', async () => {
const urls: string[] = []
if (poster) {
urls.push(poster)
}
urls.push(videoUrl)
return await pool.downloads<string>(urls, resourceDownloadFn, false)
})

const urls: string[] = []
if (poster) {
urls.push(poster)
}
urls.push(videoUrl)
await pool.downloads<string>(urls, resourceDownloadFn, false)

const div = document.createElement('div')
div.style.cssText = 'height: 381px;background: #000;border-radius: 4px; overflow: hidden;margin-bottom: 12px;'
Expand Down Expand Up @@ -478,27 +461,26 @@ export async function packHTMLAssets(html: string, title: string, zip?: JSZip) {

return audioData.size
}
await measureExecutionTime('音频资源下载结果:', async () => {
const assets: AudioResource[] = []
mpAudioEls.forEach(mpAudioEl => {
const uuid = new Date().getTime() + Math.random().toString()
mpAudioEl.setAttribute('data-uuid', uuid)
const cover = mpAudioEl.getAttribute('cover')!
const voice_encode_fileid = mpAudioEl.getAttribute('voice_encode_fileid')!
assets.push({
uuid: uuid,
type: 'cover',
url: cover,
})
assets.push({
uuid: uuid,
type: 'audio',
url: 'https://res.wx.qq.com/voice/getvoice?mediaid=' + voice_encode_fileid,
})
})

return await pool.downloads<AudioResource>(assets, audioResourceDownloadFn, false)
const assets: AudioResource[] = []
mpAudioEls.forEach(mpAudioEl => {
const uuid = new Date().getTime() + Math.random().toString()
mpAudioEl.setAttribute('data-uuid', uuid)
const cover = mpAudioEl.getAttribute('cover')!
const voice_encode_fileid = mpAudioEl.getAttribute('voice_encode_fileid')!
assets.push({
uuid: uuid,
type: 'cover',
url: cover,
})
assets.push({
uuid: uuid,
type: 'audio',
url: 'https://res.wx.qq.com/voice/getvoice?mediaid=' + voice_encode_fileid,
})
})

await pool.downloads<AudioResource>(assets, audioResourceDownloadFn, false)
}

// 下载内嵌视频
Expand All @@ -524,16 +506,15 @@ export async function packHTMLAssets(html: string, title: string, zip?: JSZip) {
videoURLMap.set(url, `./assets/${uuid}.${ext}`)
return videoData.size
}
await measureExecutionTime('视频资源下载结果:', async () => {
const urls: string[] = []
videoPageInfos.forEach(videoPageInfo => {
urls.push(videoPageInfo.cover_url)
if (videoPageInfo.is_mp_video === 1 && videoPageInfo.mp_video_trans_info.length > 0) {
urls.push(videoPageInfo.mp_video_trans_info[0].url)
}
})
return await pool.downloads<string>(urls, resourceDownloadFn, false)

const urls: string[] = []
videoPageInfos.forEach(videoPageInfo => {
urls.push(videoPageInfo.cover_url)
if (videoPageInfo.is_mp_video === 1 && videoPageInfo.mp_video_trans_info.length > 0) {
urls.push(videoPageInfo.mp_video_trans_info[0].url)
}
})
await pool.downloads<string>(urls, resourceDownloadFn, false)

const videoIframes = $jsArticleContent.querySelectorAll('iframe.video_iframe')
videoIframes.forEach(videoIframe => {
Expand Down Expand Up @@ -578,9 +559,7 @@ export async function packHTMLAssets(html: string, title: string, zip?: JSZip) {
}
const imgs = $jsArticleContent.querySelectorAll<HTMLImageElement>('img')
if (imgs.length > 0) {
await measureExecutionTime('图片下载结果:', async () => {
return await pool.downloads<HTMLImageElement>([...imgs], imgDownloadFn)
})
await pool.downloads<HTMLImageElement>([...imgs], imgDownloadFn)
}


Expand All @@ -607,9 +586,7 @@ export async function packHTMLAssets(html: string, title: string, zip?: JSZip) {
}
const url2pathMap = new Map<string, string>()

await measureExecutionTime('背景图片下载结果:', async () => {
return await pool.downloads<string>([...bgImageURLs], bgImgDownloadFn)
})
await pool.downloads<string>([...bgImageURLs], bgImgDownloadFn)

// 替换背景图片路径
pageContentHTML = pageContentHTML.replaceAll(/((?:background|background-image): url\((?:&quot;)?)((?:https?|\/\/)[^)]+?)((?:&quot;)?\))/gs, (_, p1, url, p3) => {
Expand Down Expand Up @@ -648,9 +625,7 @@ export async function packHTMLAssets(html: string, title: string, zip?: JSZip) {
let localLinks: string = ''
const links = document.querySelectorAll<HTMLLinkElement>('head link[rel="stylesheet"]')
if (links.length > 0) {
await measureExecutionTime('样式下载结果:', async () => {
return await pool.downloads<HTMLLinkElement>([...links], linkDownloadFn, false)
})
await pool.downloads<HTMLLinkElement>([...links], linkDownloadFn, false)
}

// 处理自定义组件
Expand Down
98 changes: 27 additions & 71 deletions utils/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ import dayjs from "dayjs"
import {AVAILABLE_PROXY_LIST} from '~/config'
import type {DownloadableArticle} from "~/types/types"
import type {AudioResource, VideoResource} from "~/types/video"
import {v4 as uuid} from 'uuid'
import PQueue from 'p-queue'

/**
* 代理实例
*/
export interface ProxyInstance {
// 唯一标识
id: string

// 代理地址
address: string

Expand All @@ -30,7 +35,7 @@ export interface ProxyInstance {
traffic: number
}

// 代理下载的资源
// 使用代理下载的资源类型
type DownloadResource =
| string
| HTMLLinkElement
Expand Down Expand Up @@ -83,6 +88,7 @@ class ProxyPool {

constructor(proxyUrls: string[]) {
this.proxies = proxyUrls.map(url => ({
id: uuid(),
address: url,
busy: false,
cooldown: false,
Expand All @@ -96,11 +102,11 @@ class ProxyPool {
/**
* 初始化代理池
* 可以传入新的代理地址列表(私有代理地址)
* @param proxyUrls
*/
init(proxyUrls: string[] = []) {
if (proxyUrls.length > 0) {
this.proxies = proxyUrls.map(url => ({
id: uuid(),
address: url,
busy: false,
cooldown: false,
Expand All @@ -113,10 +119,16 @@ class ProxyPool {
this.proxies.forEach(proxy => {
proxy.busy = false
proxy.cooldown = false
proxy.usageCount = 0
proxy.successCount = 0
proxy.failureCount = 0
})
}
}

/**
* 获取可用代理
*/
async getAvailableProxy() {
let time = 0
while (true) {
Expand All @@ -137,6 +149,11 @@ class ProxyPool {
}
}

/**
* 释放代理
* @param proxy 代理对象
* @param success 使用当前代理的本次下载是否成功
*/
releaseProxy(proxy: ProxyInstance, success: boolean) {
proxy.busy = false

Expand All @@ -146,12 +163,12 @@ class ProxyPool {
proxy.failureCount++
proxy.cooldown = true

// 5秒冷却时间
// 2秒冷却时间
setTimeout(() => {
proxy.cooldown = false;
}, 5_000);
}, 2_000);

if (proxy.failureCount >= 5 && proxy.successCount === 0) {
if (proxy.failureCount >= 10 && proxy.successCount === 0) {
// 代理被识别为不可用,从代理池中移除
console.warn(`代理 ${proxy.address} 不可用,将被移除`)
this.removeProxy(proxy)
Expand All @@ -163,42 +180,7 @@ class ProxyPool {
* 移除代理
*/
removeProxy(proxy: ProxyInstance) {
this.proxies = this.proxies.filter(p => p.address !== proxy.address)
}

printProxyUsage() {
console.debug('代理使用情况:')
let traffic = 0
const usageData = this.proxies.map(proxy => {
traffic += proxy.traffic
return {
'代理': proxy.address,
'使用次数': proxy.usageCount,
'下载流量': formatTraffic(proxy.traffic),
'成功次数': proxy.successCount,
'失败次数': proxy.failureCount,
'成功率': proxy.usageCount === 0 ? '-' : ((proxy.successCount / proxy.usageCount) * 100).toFixed(2) + '%',
}
});
// 增加总计
usageData.push({
'代理': '总计',
'使用次数': usageData.reduce((total, item) => total + item['使用次数'], 0),
'下载流量': formatTraffic(traffic),
'成功次数': usageData.reduce((total, item) => total + item['成功次数'], 0),
'失败次数': usageData.reduce((total, item) => total + item['失败次数'], 0),
'成功率': '-',
})
console.table(usageData);
}

incrementTraffic(address: string, bytes: number) {
const proxy = this.proxies.find(proxy => proxy.address === address)
if (proxy) {
proxy.traffic += bytes
} else {
console.warn(`代理${address}未找到`)
}
this.proxies = this.proxies.filter(p => p.id !== proxy.id)
}
}

Expand Down Expand Up @@ -231,7 +213,7 @@ async function downloadResource<T extends DownloadResource>(proxy: ProxyInstance
* @param useProxy
* @param maxRetries
*/
async function downloadWithRetry<T extends DownloadResource>(pool: ProxyPool, resource: T, downloadFn: DownloadFn<T>, useProxy = true, maxRetries = 30): Promise<DownloadResult> {
async function downloadWithRetry<T extends DownloadResource>(pool: ProxyPool, resource: T, downloadFn: DownloadFn<T>, useProxy = true, maxRetries = 10): Promise<DownloadResult> {
let attempts = 0;
let isSuccess = false;
let size: number = 0;
Expand Down Expand Up @@ -322,34 +304,8 @@ export async function downloads<T extends DownloadResource>(resources: T[], down
// 初始化 pool
pool.init(privateProxy)

console.debug('本次下载使用代理为: ', pool.proxies)

const tasks = resources.map(resource => download<T>(resource, downloadFn, useProxy));
return await Promise.all(tasks)
}

/**
* 打印代理使用次数
*/
export function usage() {
pool.printProxyUsage();
}

export function formatDownloadResult(label: string, results: DownloadResult | DownloadResult[], total: number) {
if (!Array.isArray(results)) {
results = [results]
}
const queue = new PQueue({concurrency: pool.proxies.length})

console.debug(label)
console.debug(`总耗时: ${total.toFixed(2)}s`);

// 打印下载耗时明细
const downloadResults = results.map(result => ({
URL: result.url,
size: result.size,
'耗时': result.totalTime,
'重试次数': result.attempts,
'是否下载成功': result.success,
}))
console.table(downloadResults)
const tasks = resources.map(resource => queue.add(() => download<T>(resource, downloadFn, useProxy)))
await Promise.all(tasks)
}
Loading

0 comments on commit 5096c71

Please sign in to comment.