r = requests.get('http://baidu.com', params=data)
r.text     # text content of the response (HTML as a string)
r.content  # binary content (audio, video, images)
r.json()   # parse the body as JSON and return a dict
requests-cache
Useful when many duplicate requests are made, or when a crawl is interrupted and must be restarted: after the first fetch, responses are cached to a local store.
import requests_cache
# creates a database file for the cache
session = requests_cache.CachedSession('demo')
for i in range(10):
    session.get('http://baidu.com')
import requests_cache
import requests
# creates a cache directory on disk; install_cache patches requests globally
requests_cache.install_cache('demo', backend='filesystem')
session = requests.Session()
for i in range(10):
    session.get('http://baidu.com')
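By default the cache never expires; when stale pages matter, `install_cache` also accepts an `expire_after` argument. A minimal sketch (the cache name is illustrative):

import requests_cache
from datetime import timedelta

# cache responses for one hour; expired entries are refetched on the next request
requests_cache.install_cache('demo_expiring', backend='filesystem', expire_after=timedelta(hours=1))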
XPath is a language for locating information in XML/HTML documents.
lxml is an HTML/XML parsing library; after parsing, elements are located with XPath. Returned element type: <class 'lxml.etree._Element'>
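A minimal sketch of the parse-then-query flow (the HTML string is made up for illustration):

from lxml import etree

html = etree.HTML('<div class="txt"><p>hello</p><p>world</p></div>')
nodes = html.xpath('//div[@class="txt"]/p')         # a list of lxml.etree._Element
texts = html.xpath('//div[@class="txt"]/p/text()')  # a list of plain strings
print(type(nodes[0]), texts)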
## Downloading text
import requests
from lxml import etree

url = 'https://www.mingyannet.com/mingyan/257311586'
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
    'Referer': 'www.mingyannet.com',
}
response = requests.get(url, headers=header, verify=False)
response.encoding = 'UTF-8'  # set to match the page's actual encoding
v = etree.HTML(response.text).xpath('//div[@class="txt"]/p/text()')
print(v)
Like lxml, BeautifulSoup is an HTML/XML parser.
lxml traverses only the parts of the document it needs;
BeautifulSoup4 loads the whole document and builds a complete DOM tree, so it has higher overhead and lower performance.
from bs4 import BeautifulSoup
import requests

response = requests.get(url, headers=header, verify=False)  # url/header as defined above
response.encoding = 'UTF-8'  # set to match the page's actual encoding
soup = BeautifulSoup(response.text, 'html.parser')
# soup.select('#hh')  # CSS selector
# soup.prettify()     # return nicely indented markup
# soup.get('alt')     # get an attribute (on a tag)
print(soup.find_all('li', attrs={'class': 'pager2'})[0].find_all('a'))
Use multiprocessing to bypass the GIL (Global Interpreter Lock).
import multiprocessing

def a():
    pass

# on Windows this loop must run under `if __name__ == '__main__':`
for i in range(5):
    p = multiprocessing.Process(target=a)
    p.daemon = True  # daemon flag: the child process dies with the main process
    p.start()
    p.join()
Process pool:
from multiprocessing import Pool

def a(index):
    pass

pool = Pool(processes=3)
pool.map(a, [1, 2, 5])
pool.close()
pool.join()
A thread pool caps the number of threads:
from concurrent.futures import ThreadPoolExecutor

def a():
    pass

with ThreadPoolExecutor(10) as f:
    for i in range(100):
        f.submit(a)
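When the results matter, `submit` returns a Future and `as_completed` yields them as they finish. A minimal sketch with a made-up task function:

from concurrent.futures import ThreadPoolExecutor, as_completed

def square(n):
    return n * n

with ThreadPoolExecutor(10) as pool:
    futures = [pool.submit(square, i) for i in range(100)]
    for fut in as_completed(futures):
        fut.result()  # re-raises here if the task raised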
Thread locks
from threading import Thread
import threading

lock = threading.Lock()       # mutex
lock = threading.Condition()  # a mutex plus two extra operations, lock.wait() and lock.notify_all() (see the sketch after this example)
a = 1

def doing(name):
    lock.acquire()
    global a
    a += 1
    print(a)
    lock.release()

def multi_thread():
    join_list = []
    for i in range(10):
        th = Thread(target=doing, args=["nihao"])
        th.start()
        join_list.append(th)
    for i in join_list:
        i.join()

multi_thread()
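The Condition variant mentioned above adds wait()/notify_all() for producer-consumer coordination; a minimal sketch (the shared list is illustrative):

import threading

cond = threading.Condition()
items = []

def producer():
    with cond:
        items.append(1)
        cond.notify_all()  # wake every thread blocked in wait()

def consumer():
    with cond:
        while not items:   # re-check the predicate after each wakeup
            cond.wait()    # releases the lock while waiting
        items.pop()

threading.Thread(target=consumer).start()
threading.Thread(target=producer).start()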
import requests
import os
import time
from collections import deque
from lxml import etree
import threading

titles = list()
finish_flag = False

def gen_url():
    global titles
    global finish_flag
    url = 'https://www.3bqg.cc/book/10597/'
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
        'Referer': 'https://www.3bqg.cc/',
    }
    response = requests.get(url, headers=header, verify=False)
    response.encoding = 'UTF-8'  # set to match the page's actual encoding
    charts = etree.HTML(response.text).xpath("//div[@class='listmain']//dd/a")
    for i in charts[1:]:
        if 'javascript' in i.get('href'):
            continue
        titles.append(i.text)
        q.append('https://www.3bqg.cc' + i.get('href'))
    finish_flag = True

def download():
    while True:
        if len(q) == 0 and finish_flag:
            print('all done')
            break
        elif len(q) == 0:
            time.sleep(0.1)  # the producer has not filled the queue yet
            continue
        else:
            url = q.popleft()
            header = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
                'Referer': 'https://www.3bqg.cc/',
            }
            response = requests.get(url, headers=header, verify=False)
            response.encoding = 'UTF-8'
            title = etree.HTML(response.text).xpath("//div[@class='content']/h1[@class='wap_none']/text()")[0]
            content = etree.HTML(response.text).xpath("//div[@id='chaptercontent']/text()")
            content = ''.join(content).replace('\u3000\u3000', '\n').replace('请收藏本站:https://www.3bqg.cc。笔趣阁手机版:https://m.3bqg.cc ', '')
            with open(f'book/{title}.txt', 'w', encoding='utf-8') as f:
                f.write(content)
            print(f'{title} --- download finished')

if __name__ == '__main__':
    os.makedirs('book', exist_ok=True)
    q = deque(maxlen=10000)
    th1 = threading.Thread(target=gen_url)  # one producer thread
    th1.start()
    for i in range(10):                     # ten consumer threads
        th2 = threading.Thread(target=download)
        th2.start()
The asyncio coroutine library
Switches between functions instead of threads; everything runs in a single thread, so no lock machinery is needed, but one blocking call stalls the whole program.
- Concurrency: at any instant only one task runs; tasks are switched rapidly (multithreading)
- Parallelism: several tasks run at the same instant on multiple cores (multiprocessing)
- Synchronous: later code waits for earlier code to finish
- Asynchronous: no waiting (coroutines)
aiohttp is an asynchronous request library; requests is synchronous.
import aiohttp
import asyncio

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            await r.text()
            await r.json()
            await r.content.read()
Multithreading means a program runs several threads at once, each executing tasks independently; each thread is an independent stream of execution from the CPU scheduler's point of view. For network requests this means several requests can be in flight at once, with each thread handling one or more of them. Multithreading can exploit multicore CPUs and speeds up compute-heavy work, but it requires careful synchronization and communication between threads to avoid race conditions and deadlocks.
Asynchronous downloading, or asynchronous I/O, is a non-blocking programming model. When a task such as a network request has to wait on I/O (e.g. for a response), it does not block the current thread; it hands control back to the event loop so other tasks can keep running. Once the I/O completes, the event loop resumes the corresponding callback. This suits I/O-bound work and improves responsiveness and throughput. In Python, the asyncio and aiohttp libraries are the usual tools for asynchronous network requests.
async: declares a coroutine function
await: suspends the current coroutine at an expensive operation so others can run
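A minimal sketch of the two keywords before the full crawler below (asyncio.sleep stands in for any awaitable I/O):

import asyncio

async def job(i):            # async: declares a coroutine function
    await asyncio.sleep(1)   # await: suspend here so other coroutines can run
    return i

async def main():
    results = await asyncio.gather(*(job(i) for i in range(3)))
    print(results)           # all three finish in about 1s total, not 3s

asyncio.run(main())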
import aiofiles
import aiohttp
import asyncio
import os
import requests
from lxml import etree

url = 'http://www.netbian.com'
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
    'Referer': 'https://cn.bing.com/',
}

async def get_index_url(pageurl, semaphore):
    # collect the image titles and URLs on one index page
    urls = ()
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(pageurl, headers=header) as r:
                html = await r.text(encoding='gbk')
                titlelist = etree.HTML(html).xpath('//div[@class="list"]//a//b/text()')
                imglist = [i.get('src') for i in etree.HTML(html).xpath('//div[@class="list"]//a/img')]
                urls = (titlelist, imglist)
    return urls

async def download(title, imgurl):
    async with aiohttp.ClientSession() as session:
        async with session.get(imgurl, headers=header) as r:
            async with aiofiles.open(f'imgs/{title}.jpg', 'wb') as f:
                await f.write(await r.content.read())
            print(f'{title} --- download finished')

async def main():
    semaphore = asyncio.Semaphore(10)  # cap on concurrent coroutines
    task = []
    response = requests.get(url, headers=header)
    # the total page count comes from the pager, though this demo only crawls 4 pages
    totalpage = etree.HTML(response.text).xpath('//div[@class="page"]/a[last()-1]/text()')[0]
    for i in range(1, 5):
        if i == 1:
            index_page = 'http://www.netbian.com/'
        else:
            index_page = f'http://www.netbian.com/index_{i}.htm'
        task.append(asyncio.create_task(get_index_url(index_page, semaphore)))
    done, pending = await asyncio.wait(task)
    alltitle = []
    allimgurl = []
    for item in done:
        titlelist, imglist = item.result()
        alltitle += titlelist
        allimgurl += imglist
    task2 = list()
    for i in range(len(alltitle)):
        task2.append(asyncio.create_task(download(alltitle[i], allimgurl[i])))
    done, pending = await asyncio.wait(task2)

if __name__ == '__main__':
    os.makedirs('imgs', exist_ok=True)
    asyncio.run(main())
Returned element type: WebElement
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.action_chains import ActionChains

opt = Options()
opt.add_argument('--disable-blink-features=AutomationControlled')  # hide browser automation traces
opt.add_experimental_option('excludeSwitches', ['enable-automation'])  # hide the automation banner
opt.add_experimental_option('detach', True)  # keep the browser open after the script exits
opt.page_load_strategy = 'eager'  # page-load strategy
browser = webdriver.Chrome(executable_path=r'E:\chromedriver-win64\chromedriver.exe', options=opt)
browser.implicitly_wait(10)  # implicit wait: poll for elements for up to 10 seconds, then raise
browser.maximize_window()
# browser.set_window_position(100, 100)
browser.get('https://www.3bqg.cc')
# browser.close()  # close one tab
# browser.quit()   # quit the whole browser
page_info = browser.page_source  # page HTML
# WebDriverWait(browser, 10).until(<condition>)  # explicit wait for a single element; see the sketch after this block
element = browser.find_element(by=By.XPATH, value='//div[@id="chaptercontent"]')  # the XPath must select an element, not text()
# When click() opens a new page the focus stays on the old one, so switch window handles
# (switch_to.new_window() focuses the new window automatically).
element.click()
browser.switch_to.window(browser.window_handles[-1])
# An iframe also needs a focus switch before its inner elements can be used
iframe = browser.find_element(by=By.XPATH, value='//div[@id="chaptercontent"]/iframe')
browser.switch_to.frame(iframe)
browser.switch_to.default_content()  # switch back to the main document
# fill a form field
# element.send_keys('1244212')
# cookies
browser.get_cookies()
browser.add_cookie({'name': 'tom', 'value': '1'})  # add_cookie takes one dict with both name and value
browser.delete_all_cookies()
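The explicit-wait condition elided above is normally one of the expected_conditions helpers; a minimal sketch:

from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By

# block up to 10s until this one element exists, independent of the implicit wait
elem = WebDriverWait(browser, 10).until(
    EC.presence_of_element_located((By.ID, 'chaptercontent'))
)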
Erasing selenium's fingerprint
Method 1:
Download the script from: https://github.com/requireCool/stealth.min.js?tab=readme-ov-file
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

opt = Options()
# opt.add_argument('--headless')  # headless: a browser without a GUI
# opt.add_argument(
#     'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36')
opt.add_argument('--disable-blink-features=AutomationControlled')  # hide browser automation traces
opt.add_experimental_option('excludeSwitches', ['enable-automation'])  # hide the automation banner
opt.add_experimental_option('detach', True)  # keep the browser open after the script exits
opt.page_load_strategy = 'eager'  # page-load strategy
browser = webdriver.Chrome(executable_path=r'E:\chromedriver-win64\chromedriver.exe', options=opt)
# path to the stealth.min.js file
with open('./stealth.min.js') as f:
    js = f.read()
browser.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
    "source": js
})
browser.get('https://bot.sannysoft.com/')  # fingerprint-check site
Method 2: attach to a manually launched Chrome over a remote-debugging port
cd C:\Program Files\Google\Chrome\Application  # enter the Chrome install directory
chrome.exe --remote-debugging-port=9992 --user-data-dir="E:\selenium_files"  # the user-data directory must not live inside the Chrome install directory; any free port works
To make a debug-mode shortcut: copy the normal shortcut and append the flags after the exe path under Properties -> Target.
Open the browser from the terminal the first time; afterwards the shortcut can be used directly.
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

opt = Options()
opt.debugger_address = '127.0.0.1:9992'
browser = webdriver.Chrome(executable_path=r'E:\chromedriver-win64\chromedriver.exe', options=opt)
browser.get('https://bot.sannysoft.com/')  # fingerprint-check site
Method 3:
Use pyppeteer to drive Chromium (Google's development build) for page rendering, which reduces how often the driver must chase browser updates.
Supports asyncio.
pip install pyppeteer==1.0.2
import pyppeteer
print(pyppeteer.__version__)
print(pyppeteer.executablePath())
import pyppeteer
from pyppeteer import launch
import asyncio

async def main():
    # launch the browser
    browser = await launch(headless=False, args=['--disable-infobars', '--window-size=1680,1080'])
    # open a new page
    page = await browser.newPage()
    # go to Baidu
    await page.goto('https://www.baidu.com')
    # select the search box and type
    await page.type('#kw', 'pyppeteer')
    await page.click('#su')
    await asyncio.sleep(2)  # time.sleep() would block the event loop

# run the coroutine
asyncio.get_event_loop().run_until_complete(main())
# run JS in the page (inside a coroutine)
dimensions = await page.evaluate('''
    () => {
        return {cookie: window.document.cookie}
    }
''')
dimensions.get('cookie')
await page.evaluate('window.scrollBy(0, 800)')
import pyppeteer
from pyppeteer import launch
import asyncio
from pyquery import PyQuery as pq

async def main():
    # launch the browser
    browser = await launch(headless=False, args=['--disable-infobars', '--window-size=1680,1080'])
    # open a new page
    page = await browser.newPage()
    # go to Baidu
    await page.goto('https://www.baidu.com')
    # wait for a selector to appear
    await page.waitForXPath('//div/a/p')
    # snapshot the rendered page as static HTML
    doc = pq(await page.content())
    title = doc('#kw').text()
    await page.close()
    return 1

num = asyncio.get_event_loop().run_until_complete(main())
from loguru import logger
logger.info('213323')
logger.warning('213323')
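loguru can also log to a file in one call; rotation and level are standard logger.add parameters (the file name is illustrative):

from loguru import logger

logger.add('crawler.log', rotation='10 MB', level='INFO')  # start a new file once it reaches 10 MB
logger.info('starting crawl')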
A proxy server sends requests to the target server in the browser's place and intercepts the data the server returns.
Fiddler, Charles, mitmproxy
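Of the three, mitmproxy is scriptable in Python; a minimal addon sketch (run with `mitmproxy -s addon.py`) that logs every response passing through the proxy:

# addon.py
from mitmproxy import http

def response(flow: http.HTTPFlow) -> None:
    # called once for each completed response
    print(flow.request.pretty_url, flow.response.status_code)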
Captchas can be handled with an OCR library,
or with a paid captcha-solving service (e.g. Chaojiying, 超级鹰).
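For plain text captchas, a hedged sketch with pytesseract (assumes the Tesseract binary is installed; the image path is illustrative):

import pytesseract
from PIL import Image

# noisy captchas usually need preprocessing (grayscale, threshold) before OCR
text = pytesseract.image_to_string(Image.open('captcha.png'))
print(text.strip())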
Slider captchas: use OpenCV to locate the gap and compute the slide distance.
import cv2

class CalculateDistance:
    def __init__(self, bgimg_path, slide_img_path, offset_x, offset_y, display):
        self.bgimg = cv2.imread(bgimg_path)
        self.offset_x = offset_x
        self.offset_y = offset_y
        self.slideimg = cv2.imread(slide_img_path, cv2.IMREAD_UNCHANGED)
        # compute scale factors to normalize the slider to a 50x50 image
        # (shape is (height, width); fx scales width, fy scales height)
        scale_x = 50 / self.slideimg.shape[1]
        scale_y = 50 / self.slideimg.shape[0]
        self.slide_scale_img = cv2.resize(self.slideimg, (0, 0), fx=scale_x, fy=scale_y)
        self.bg_cut_img = None

    def get_distance(self):
        slide_img_gray = cv2.cvtColor(self.slide_scale_img, cv2.COLOR_BGR2GRAY)
        slide_edg = cv2.Canny(slide_img_gray, 100, 200)
        bg_img = cv2.cvtColor(self.bg_cut_img, cv2.COLOR_BGR2GRAY)
        bgimg_edg = cv2.Canny(bg_img, 100, 300)
        h, w = slide_edg.shape
        # template-match the slider edges against the background to find the gap
        result = cv2.matchTemplate(bgimg_edg, slide_edg, cv2.TM_CCOEFF_NORMED)
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
        # top-left corner of the gap
        top_left = (max_loc[0], max_loc[1])
        bottom_right = (max_loc[0] + w, max_loc[1] + h)
        print(top_left)
        slide_distance = top_left[0] + w + 10
        return slide_distance

    def cut_bg_img(self):
        print(self.slide_scale_img.shape)
        h, w, _ = self.slide_scale_img.shape
        # crop the background search region, padded 10px above and below
        self.bg_cut_img = self.bgimg[self.offset_y - 10:self.offset_y + h + 10, self.offset_x + w + 10:]

    def run(self):
        self.cut_bg_img()
        return self.get_distance()
from random import uniform
import pyautogui
# When a site defends against selenium action chains, pyautogui can drive the real mouse and keyboard.
# pyautogui can also locate an element's screen coordinates from a screenshot of it.
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def press_button():
    # press the login button
    pic_add = './2.png'
    loc = pyautogui.locateOnScreen(pic_add, confidence=0.3)  # locate on the primary screen
    loc = pyautogui.center(loc)
    pyautogui.moveTo(loc)
    pyautogui.doubleClick()

def input_info():
    # an iframe needs a focus switch before its inner elements can be used
    iframe = browser.find_element(by=By.XPATH, value='//*[@id="anony-reg-new"]/div/div[1]/iframe')
    browser.switch_to.frame(iframe)
    login_elem = browser.find_element(by=By.XPATH, value='/html/body/div[1]/div[1]/ul[1]/li[2]')
    login_elem.click()
    time.sleep(2)
    browser.find_element(by=By.XPATH, value='//*[@id="username"]').send_keys('18432180331')
    time.sleep(2)
    browser.find_element(by=By.XPATH, value='//*[@id="password"]').send_keys('wangyubu35')
    time.sleep(2)

def handle_distance(distance):
    # break the straight-line distance into small irregular steps to evade detection
    import random
    slow_distance = []
    while sum(slow_distance) < distance:
        slow_distance.append(random.randint(-2, 15))
    if sum(slow_distance) > distance:
        slow_distance.append(distance - sum(slow_distance))
    return slow_distance

def drag_slide(tracks):
    # drag the slider
    slide_img_path = './3.png'
    loc = pyautogui.locateOnScreen(slide_img_path, confidence=0.3)  # locate on the primary screen
    loc = pyautogui.center(loc)
    pyautogui.moveTo(loc)
    pyautogui.mouseDown()
    for track in tracks:
        pyautogui.move(track, uniform(-2, 2), duration=0.1)  # y jitters within [-2, 2]
    pyautogui.mouseUp()
if __name__ == '__main__':
    opt = Options()
    opt.debugger_address = '127.0.0.1:9992'
    browser = webdriver.Chrome(executable_path=r'E:\chromedriver-win64\chromedriver.exe', options=opt)
    browser.get('https://www.douban.com/')
    input_info()
    press_button()
    ## enter the captcha iframe, then screenshot the background and the slider
    WebDriverWait(browser, 4).until(EC.visibility_of_element_located((By.XPATH, '//*[@id="tcaptcha_iframe_dy"]')))
    iframe = browser.find_element(by=By.XPATH, value='//*[@id="tcaptcha_iframe_dy"]')
    browser.switch_to.frame(iframe)
    bg_img = browser.find_element(by=By.XPATH, value='//*[@id="slideBg"]')
    bg_location = bg_img.location
    print(bg_location)
    bg_img = bg_img.screenshot_as_png
    with open('./bgcode.png', 'wb') as f:
        f.write(bg_img)
    slide_img1 = browser.find_element(by=By.XPATH, value='//*[@id="tcOperation"]/div[7]')
    slide_img2 = browser.find_element(by=By.XPATH, value='//*[@id="tcOperation"]/div[8]')
    slide_img = slide_img2 if slide_img1.size['width'] / slide_img1.size['height'] > 6 else slide_img1
    slide_img_location = slide_img.location
    # slide_img.size is a dict like {'height': ..., 'width': ...}
    print(slide_img_location)
    slide_img = slide_img.screenshot_as_png
    with open('./slide.png', 'wb') as f:
        f.write(slide_img)
    time.sleep(2)
    offset_x = slide_img_location['x'] - bg_location['x']
    offset_y = slide_img_location['y'] - bg_location['y']
    distance = CalculateDistance('./bgcode.png', './slide.png', offset_x, offset_y, None).run()
    print(distance)
    tracks = handle_distance(distance)
    drag_slide(tracks)
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
import time
from selenium.webdriver.support.select import Select

opt = Options()
opt.add_argument('--disable-blink-features=AutomationControlled')  # hide browser automation traces
opt.add_experimental_option('excludeSwitches', ['enable-automation'])  # hide the automation banner
opt.add_experimental_option('detach', True)  # keep the browser open after the script exits
opt.page_load_strategy = 'eager'  # page-load strategy
browser = webdriver.Chrome(executable_path=r'E:\chromedriver-win64\chromedriver.exe', options=opt)
browser.implicitly_wait(10)  # implicit wait: poll for elements for up to 10 seconds, then raise
browser.maximize_window()
browser.get('https://www.12306.cn/index/')
ticket_element = browser.find_element(by=By.XPATH, value='//*[@id="J-chepiao"]/a')
# hover over the ticket menu
ActionChains(browser).move_to_element(ticket_element).perform()
# click "one-way"
one_way_element = browser.find_element(by=By.XPATH, value='//*[@id="megamenu-3"]/div[1]/ul/li[1]/a')
ActionChains(browser).click(one_way_element).perform()
time.sleep(2)
# pick the departure station
input1 = browser.find_element(by=By.XPATH, value='//*[@id="fromStationText"]')
ActionChains(browser).click(input1).pause(1).send_keys('西安').send_keys(Keys.ARROW_DOWN).pause(1).send_keys(Keys.ENTER).perform()
# pick the destination station
input2 = browser.find_element(by=By.XPATH, value='//*[@id="toStationText"]')
ActionChains(browser).click(input2).pause(1).send_keys('北京').send_keys(Keys.ARROW_DOWN, Keys.ARROW_DOWN).pause(1).send_keys(Keys.ENTER).perform()
# enter the departure date
time.sleep(2)
gotime = browser.find_element(by=By.XPATH, value='//*[@id="train_date"]')
ActionChains(browser).click(gotime).pause(1).send_keys(Keys.ARROW_RIGHT, Keys.ARROW_RIGHT).pause(1).send_keys(Keys.BACKSPACE * 10).send_keys('2024-11-13', Keys.ENTER).perform()
# tick "high-speed rail"
time.sleep(1)
browser.find_element(by=By.XPATH, value='//*[@id="_ul_station_train_code"]/li[2]').click()
# choose the departure time window, then search
start_time = browser.find_element(by=By.XPATH, value='//*[@id="cc_start_time"]')
# ActionChains(browser).click(start_time).pause(1).send_keys(Keys.ARROW_DOWN * 2).send_keys(Keys.ENTER).perform()  # action-chain style
select_start_time = Select(start_time).select_by_visible_text('12:00--18:00')  # or pick the option directly with the Select class
browser.find_element(by=By.XPATH, value='//*[@id="query_ticket"]').click()
![](./images/Snipaste_2024-11-07_15-07-25.png)
Build a User-Agent pool with the fake_useragent library.
from fake_useragent import UserAgent
ua = UserAgent()
user_agent = ua.random  # pick a random UA
headers = {'User-Agent': user_agent}
import requests

url = 'https://www.bilibili.com/'
# set a proxy IP (add an 'https' key too when requesting https URLs)
proxy = {
    'http': '218.87.205.142:21987'
}
res = requests.get(url, proxies=proxy)
print(res.text)
Reuse cookies to skip the tedious login flow.
douban_cookie = browser.get_cookies()
# save the cookies from a logged-in browser,
# then attach them directly in a browser that is not logged in
for cookie in douban_cookie:
    browser.add_cookie({
        'domain': '.douban.com',
        'name': cookie.get('name'),
        'value': cookie.get('value'),  # add_cookie needs the value as well
    })
browser.refresh()
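The same cookies can seed a requests.Session outside the browser, assuming the site checks only cookies and not JS fingerprints:

import requests

session = requests.Session()
for cookie in douban_cookie:  # the list exported from the logged-in browser above
    session.cookies.set(cookie['name'], cookie['value'], domain=cookie.get('domain'))
r = session.get('https://www.douban.com/')  # now carries the login cookies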
ffmpeg download: https://www.gyan.dev/ffmpeg/builds/
Add ffmpeg's bin directory to the PATH environment variable.
import requests
from lxml import etree
import re
import json
import os

url = 'https://www.bilibili.com/video/BV1BpDmYcExg/?spm_id_from=333.1007.tianma.2-1-4.click'
# a logged-in cookie must be included
headers = {
    'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
    "Referer":"https://www.bilibili.com/",
"cookie":"buvid3=2878D0CA-F866-5482-34C2-CDFFE834A28583693infoc; b_nut=1721786983; _uuid=7177EBEF-10157-99CE-1516-103BC8EF3F48E82945infoc; enable_web_push=DISABLE; buvid4=A4D446C8-2AC7-EED9-B160-60701CFB42B484375-024072402-x1ApGtTycEXtrsb8Jknm9g%3D%3D; rpdid=|(JY)R|lRJll0J'u~ku~)Ym)m; header_theme_version=CLOSE; DedeUserID=3493109184858837; DedeUserID__ckMd5=77f3a5db5cef9566; home_feed_column=5; buvid_fp_plain=undefined; fingerprint=e61065e3a3f57edad592cb8e395a8ea9; buvid_fp=e61065e3a3f57edad592cb8e395a8ea9; is-2022-channel=1; CURRENT_QUALITY=80; LIVE_BUVID=AUTO6817301802525650; PVID=1; CURRENT_FNVAL=4048; bp_t_offset_3493109184858837=996700689214210048; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MzEyMzIzMDgsImlhdCI6MTczMDk3MzA0OCwicGx0IjotMX0.sH4afzxjqcHyazPzMAxeKfx_ETMDul6vBJPFWUO7OJw; bili_ticket_expires=1731232248; SESSDATA=a169b73e%2C1746525110%2C50d44%2Ab1CjCDoXFozPf2mTkINe69QCLXhBJrr5VSfFKtmVGrIni0QLmL3Vl6OXkmvW2kXd_1xDUSVmw1elpvT3lYVi05dXhOZ3FLYVc1TzJheVVPWnRyNno5elJCRHV5Z1pTWHdvQ0pJdFlkTm9wM3hoSGxhSmxncGZFNWRpRFpjNUZkMnZsVmtqUnY4cnB3IIEC; bili_jct=b17f7276217ee6085fc492a31879a3fe; sid=6a4eoyfl; bmg_af_switch=1; bmg_src_def_domain=i0.hdslb.com; browser_resolution=1536-483; b_lsid=2E5E4B1A_1930F674675"
}
# extract the video and audio stream URLs
response = requests.get(url, headers=headers)
info = re.findall('window.__playinfo__=(.*?)</script>', response.text)
json_info = json.loads(info[0])
video_url = json_info['data']['dash']['video'][0]['baseUrl']
audio_url = json_info['data']['dash']['audio'][0]['baseUrl']
title = etree.HTML(response.text).xpath('//h1/text()')[0]
# download both streams
video_content = requests.get(video_url, headers=headers).content
audio_content = requests.get(audio_url, headers=headers).content
with open(f'{title}.mp4', 'wb') as f:
    f.write(video_content)
print(f'{title}.mp4 download finished')
with open(f'{title}.mp3', 'wb') as f:
    f.write(audio_content)
# merge the video and audio with ffmpeg
cmd = fr'C:\Users\a\ffmpeg-7.1-full_build\bin\ffmpeg -i C:\Users\a\PycharmProjects\myModel_kgag_plus\celery_test\{title}.mp4 -i C:\Users\a\PycharmProjects\myModel_kgag_plus\celery_test\{title}.mp3 -c:v copy -c:a aac -strict experimental C:\Users\a\PycharmProjects\myModel_kgag_plus\celery_test\final.mp4'
os.system(cmd)
import requests
import json

url = 'https://liveapi.huya.com/moment/getMomentContent'
headers = {
    'referer':'https://www.huya.com/',
    'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
}
data = {
    # 'callback': 'jQuery11240486672588610648_1731137320619',
    'videoId': '687097727',
    'uid': '',
    '_': '1731137320625',
}
response = requests.get(url, headers=headers, params=data)
info = response.json()['data']['moment']['videoInfo']
title = info['videoTitle']
mp4 = info['definitions'][0]['url']
m3u8 = info['definitions'][0]['m3u8']
with open(f'{title}.mp4', 'wb') as f:
    f.write(requests.get(mp4, headers=headers).content)
An m3u8 file is a playlist of video slices: requesting the m3u8 URL returns the address of every segment, and concatenating all the segment responses in order yields the complete video.
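A minimal sketch of that stitching logic, independent of any one site (the playlist URL is illustrative; real playlists may be nested or AES-encrypted, which this ignores):

import requests

playlist_url = 'https://example.com/video/index.m3u8'  # illustrative
base = playlist_url.rsplit('/', 1)[0]
lines = requests.get(playlist_url).text.splitlines()
# keep only segment lines; '#'-prefixed lines are playlist metadata
segments = [l if l.startswith('http') else f'{base}/{l}' for l in lines if l and not l.startswith('#')]
with open('out.mp4', 'wb') as f:
    for seg in segments:  # download and append each slice in order
        f.write(requests.get(seg).content)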
import requests
import re
import os

url = 'https://www.kuaishou.com/graphql'
data ={"operationName":"visionSearchPhoto","variables":{"keyword":"美食","pcursor":"","page":"search"},"query":"fragment photoContent on PhotoEntity {\n __typename\n id\n duration\n caption\n originCaption\n likeCount\n viewCount\n commentCount\n realLikeCount\n coverUrl\n photoUrl\n photoH265Url\n manifest\n manifestH265\n videoResource\n coverUrls {\n url\n __typename\n }\n timestamp\n expTag\n animatedCoverUrl\n distance\n videoRatio\n liked\n stereoType\n profileUserTopPhoto\n musicBlocked\n riskTagContent\n riskTagUrl\n}\n\nfragment recoPhotoFragment on recoPhotoEntity {\n __typename\n id\n duration\n caption\n originCaption\n likeCount\n viewCount\n commentCount\n realLikeCount\n coverUrl\n photoUrl\n photoH265Url\n manifest\n manifestH265\n videoResource\n coverUrls {\n url\n __typename\n }\n timestamp\n expTag\n animatedCoverUrl\n distance\n videoRatio\n liked\n stereoType\n profileUserTopPhoto\n musicBlocked\n riskTagContent\n riskTagUrl\n}\n\nfragment feedContent on Feed {\n type\n author {\n id\n name\n headerUrl\n following\n headerUrls {\n url\n __typename\n }\n __typename\n }\n photo {\n ...photoContent\n ...recoPhotoFragment\n __typename\n }\n canAddComment\n llsid\n status\n currentPcursor\n tags {\n type\n name\n __typename\n }\n __typename\n}\n\nquery visionSearchPhoto($keyword: String, $pcursor: String, $searchSessionId: String, $page: String, $webPageArea: String) {\n visionSearchPhoto(keyword: $keyword, pcursor: $pcursor, searchSessionId: $searchSessionId, page: $page, webPageArea: $webPageArea) {\n result\n llsid\n webPageArea\n feeds {\n ...feedContent\n __typename\n }\n searchSessionId\n pcursor\n aladdinBanner {\n imgUrl\n link\n __typename\n }\n __typename\n }\n}\n"}
headers = {
    'content-type':'application/json',
    'cookie':'kpf=PC_WEB; clientid=3; did=web_edcdda2d8a443107d46e7723f10ebac6; kpn=KUAISHOU_VISION',
    'host':'www.kuaishou.com',
    'origin':'https://www.kuaishou.com',
    'referer':'https://www.kuaishou.com/search/video?searchKey=%E7%BE%8E%E9%A3%9F',
    'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
}
r = requests.post(url, json=data, headers=headers)
feeds = r.json()['data']['visionSearchPhoto']['feeds']
# map cleaned caption -> video URL, stripping trailing '#hashtags' from captions
all_info = {re.sub("(#.*)", '', item['photo']['caption']).strip(): item['photo']['manifest']['adaptationSet'][0]['representation'][0]['url'] for item in feeds}
for key, value in all_info.items():
    if not os.path.exists('./videos'):
        os.mkdir('./videos')
    if '.m3u8?' in value:
        pre_url = '/'.join(value.split('/')[:-1])
        m3u8_list = requests.get(value).text
        m3u8_list = [pre_url + '/' + i for i in re.sub('#.*', '', m3u8_list).split()]
        with open(f'./videos/{key}.mp4', 'ab') as f:
            for item in m3u8_list:
                f.write(requests.get(item).content)
        print(key, 'long video (m3u8) downloaded')
    else:
        with open(f'./videos/{key}.mp4', 'wb') as f:
            f.write(requests.get(value).content)
        print(key, 'short video downloaded')
import hashlib
import requests
import re
import json

headers = {
    'origin':'https://www.kugou.com',
    'referer':'https://www.kugou.com/',
    'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
}
# fetch all song info from the ranking homepage
def get_index_list(url='https://www.kugou.com/yy/rank/home/1-4681.html?from=rank'):
    r = requests.get(url, headers=headers).text
    r = re.findall('global.features = (.*?)</script>', r, re.S)[0].strip()[:-1]  # re.S lets '.' match newlines
    r = json.loads(r)
    return r

all_info = get_index_list()
# fetch the download URL for one song
url = 'https://wwwapi.kugou.com/play/songinfo'
data = {
    'srcappid':'2919',
    'clientver':'20000',
    'clienttime':'1731153630447',
    'mid':'028177588c060f09007637efca4501f4',
    'uuid':'028177588c060f09007637efca4501f4',
    'dfid':'3ewLTP1E4zgo0vZ5E70yuuUj',
    'appid':'1014',
    'platid':'4',
    'encode_album_audio_id':'aaglo114',
    'token':'',
    'userid':'0',
    'signature':''
    # 'signature':'77a59c534e05ed31dd03f2d43749d718',
}
# NOTE: this signature function is known to be problematic
def signature_param(data, appid):
    # drop the 'signature' key from data and sort the rest by key
    sorted_params = sorted((k, v) for k, v in data.items() if k != 'signature')
    # join the sorted key-value pairs into one string
    param_str = ''.join(f'{k}{v}' for k, v in sorted_params)
    # prepend and append the appid
    param_str_with_appid = appid + param_str + appid
    # MD5-hash the result
    signature = hashlib.md5(param_str_with_appid.encode()).hexdigest()
    return signature
data['signature'] = signature_param(data,data['appid'])
# print(data)
r = requests.get(url,headers=headers,params=data)
json.loads(r.text)
from datetime import datetime
import time
from random import uniform
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys  # keyboard events
from selenium.webdriver.support.select import Select  # dropdown events
class GetTicket:
    def __init__(self):
        # departure station, arrival station, departure date
        self.departure_station = '北京南'
        self.arrival_station = '石家庄'
        self.departure_time = '2024-11-11'
        # train number
        self.train_num = 'G689'
        # time at which booking opens
        self.schedule_time = datetime.strptime('2024-11-10 12:14:00', '%Y-%m-%d %H:%M:%S')
        # seat class
        self.seat_type = '一等座'
        self.login_url = 'https://kyfw.12306.cn/otn/resources/login.html'
        self.index_url = 'https://www.12306.cn/index/index.html'  # page shown after a successful login
        self.query_url = 'https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&fs=%E5%8C%97%E4%BA%AC%E5%8D%97,VNP&ts=%E7%9F%B3%E5%AE%B6%E5%BA%84%E5%8C%97,VVP&date=2024-11-11&flag=N,Y,Y'
    # scheduler: wait until shortly before booking opens
    def timing(self):
        flag = True
        while True:
            now = datetime.now()
            diff_time = (self.schedule_time - now).total_seconds()
            if diff_time > 60:
                print('waiting...')
                time.sleep(10)
            elif diff_time < 30:
                print('less than 30 seconds left before booking opens')
                flag = False
                break
            else:
                print('60 seconds to go; starting')
                break
        return flag
    # simulated login
    def login(self):
        browser.get(self.login_url)
        time.sleep(2)
        # enter username and password
        browser.find_element(by=By.XPATH, value='//*[@id="J-userName"]').send_keys('18332180338')
        browser.find_element(by=By.XPATH, value='//*[@id="J-password"]').send_keys('donghui3664')
        time.sleep(1)
        browser.find_element(by=By.XPATH, value='//*[@id="J-login"]').click()
        # wait for the popup
        WebDriverWait(browser, 4).until(EC.visibility_of_element_located((By.XPATH, '//*[@id="modal"]')))
        # enter the last four ID-card digits and request the SMS code
        browser.find_element(by=By.XPATH, value='//*[@id="id_card"]').send_keys('0011')
        time.sleep(2)
        browser.find_element(by=By.XPATH, value='//*[@id="verification_code"]').click()
        code = input('enter the SMS code: ')
        browser.find_element(by=By.XPATH, value='//*[@id="code"]').send_keys(code)
        browser.find_element(by=By.XPATH, value='//*[@id="sureClick"]').click()
        # check a landing-page element to confirm login
        try:
            WebDriverWait(browser, 4).until(EC.visibility_of_element_located((By.XPATH, '//*[@id="js-minHeight"]/div[1]/div[2]/p')))
            print('login succeeded')
        except Exception as e:
            print(e)
            print('login failed')
        browser.find_element(by=By.XPATH, value='//*[@id="J-index"]/a').click()
    # query tickets
    def query_tickets(self):
        browser.get(self.query_url)
        time.sleep(2)
        # enter departure, destination, and date (action-chain style)
        ActionChains(browser).click(browser.find_element(by=By.XPATH, value='//*[@id="fromStationText"]')).pause(1).send_keys(self.departure_station).send_keys(Keys.ENTER).perform()
        time.sleep(2)
        ActionChains(browser).click(browser.find_element(by=By.XPATH, value='//*[@id="toStationText"]')).pause(1).send_keys(Keys.BACKSPACE * 10).send_keys(self.arrival_station).send_keys(Keys.ENTER).perform()
        time.sleep(2)
        ActionChains(browser).click(browser.find_element(by=By.XPATH, value='//*[@id="train_date"]')).pause(1).send_keys(Keys.BACKSPACE * 10).send_keys(self.departure_time).send_keys(Keys.ENTER).perform()
        time.sleep(2)
        browser.find_element(by=By.XPATH, value='//*[@id="toolbar_Div"]/div[3]/div[2]/ul/li[1]/a').click()
        time.sleep(1)
        browser.find_element(by=By.XPATH, value='//*[@id="isHighDan"]').click()
        browser.find_element(by=By.XPATH, value='//*[@id="search_one"]').click()
        # switch to the new window handle
        browser.switch_to.window(browser.window_handles[-1])
        time.sleep(2)
    # keep the session alive
    def stay_login(self):
        # go to the user page and refresh repeatedly
        browser.find_element(by=By.XPATH, value='//*[@id="login_user"]/span').click()
        time.sleep(2)
        while True:
            diff_time = (self.schedule_time - datetime.now()).total_seconds()
            if diff_time > 20:
                browser.refresh()
                print('keeping the login alive')
                time.sleep(2)
            else:
                print('under 20 seconds left; moving to the booking page')
                break
        browser.back()
        time.sleep(2)
    # grab the ticket
    def booking_ticket(self):
        success_flag = False
        browser.get(self.query_url)
        browser.find_element(by=By.XPATH, value='//*[@id="query_ticket"]').click()
        time.sleep(2)
        train_list = [i.text for i in browser.find_elements(by=By.XPATH, value='//*[@class="train"]//a')]
        for i in range(len(train_list)):
            if self.train_num == train_list[i]:
                print('found the train')
                browser.find_element(by=By.XPATH, value=f'/html/body/div[2]/div[8]/div[13]/table/tbody//tr[{i+1}]//*[@class="btn72"]').click()
                # tick the passenger and fill in the details
                # ...
                success_flag = True
                return success_flag
        print('train not found')
        return success_flag
    # run
    def run(self):
        self.timing()
        # self.login()
        # self.query_tickets()
        # self.stay_login()
        max_try = 3  # maximum booking attempts
        try_num = 0
        while True:
            if datetime.now() > self.schedule_time:
                try_num += 1
                if try_num > max_try:
                    print('maximum booking attempts exceeded')
                    break
                if self.booking_ticket():
                    print('ticket booked; pay as soon as possible')
                    break
            else:
                time.sleep(0.1)  # booking has not opened yet; keep polling

if __name__ == '__main__':
    opt = Options()
    opt.debugger_address = '127.0.0.1:9992'
    opt.page_load_strategy = 'eager'  # page-load strategy
    browser = webdriver.Chrome(executable_path=r'E:\chromedriver-win64\chromedriver.exe', options=opt)
    getTicket = GetTicket()
    getTicket.run()
from datetime import datetime
import time
from random import uniform
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys  # keyboard events
from selenium.webdriver.support.select import Select  # dropdown events
# wait until just before the purchase time
def timing():
    flag = True
    while True:
        now = datetime.now()
        diff_time = (buy_time - now).total_seconds()
        if diff_time > 60:
            print('waiting...')
            time.sleep(2)
        elif diff_time < 30:
            print('less than 30 seconds left before purchase opens')
            flag = False
            break
        else:
            print('60 seconds to go; starting')
            break
    return flag
# purchase
def buy():
    timing()
    url = 'https://trade.jd.com/shopping/order/getOrderInfo.action'
    browser.get(url)
    while True:
        if datetime.now() > buy_time:
            browser.find_element(by=By.XPATH, value='//*[@id="order-submit"]').click()

if __name__ == '__main__':
    opt = Options()
    opt.debugger_address = '127.0.0.1:9992'
    opt.page_load_strategy = 'eager'  # page-load strategy
    browser = webdriver.Chrome(executable_path=r'E:\chromedriver-win64\chromedriver.exe', options=opt)
    buy_time = datetime.strptime('2024-11-11 09:59:59.8', '%Y-%m-%d %H:%M:%S.%f')
    buy()
Scroll-to-load pages, saving the results to Excel.
def scroll_down():
    import random
    browser.execute_script(f'document.documentElement.scrollBy(0, document.body.scrollHeight * {random.uniform(0, 1)})')
    time.sleep(random.uniform(0, 1))
from datetime import datetime
import time
from random import uniform
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys  # keyboard events
from selenium.webdriver.support.select import Select  # dropdown events
import pyautogui
from lxml import etree
from openpyxl import load_workbook  # Excel writing
# open the search page
def get_page_info(search_key, page_num):
    url = 'https://world.taobao.com/'
    browser.get(url)
    browser.find_element(by=By.XPATH, value='//*[@id="q"]').send_keys(search_key)
    browser.find_element(by=By.XPATH, value='//*[@id="J_TSearchForm"]/div[1]/button').click()
    # wait for the result list to finish loading
    WebDriverWait(browser, 5).until(EC.presence_of_all_elements_located((By.XPATH, '//*[@id="search-content-leftWrap"]/div[3]/div[3]/div')))
    # infinite scroll: scrollTo jumps to an absolute position, scrollBy moves relative to the current one
    for i in range(1):
        # browser.execute_script('window.scrollTo({top: 800, behavior: "smooth"});')
        # time.sleep(2)
        total_scroll = 800
        step_size = 100
        pause = 0.03  # pause between steps, in seconds
        for i in range(0, total_scroll, step_size):
            browser.execute_script('window.scrollBy(0, %d)' % step_size)
            time.sleep(pause)
        # # the last scroll should reach the bottom, otherwise lazy-loaded attributes stay empty
        # browser.execute_script('window.scrollTo({top: document.body.scrollHeight, behavior: "smooth"});')
    # grab the page
    page_text = browser.page_source
    html = etree.HTML(page_text)
    products = html.xpath('//*[@id="search-content-leftWrap"]/div[3]/div[3]/div/a[@class="doubleCardWrapper--_6NpK_ey"]')
    get_product_info(products)
# extract product info and save it
def get_product_info(products):
    # write to Excel
    wb = load_workbook('./template1.xlsx')
    ws = wb['Sheet1']
    for i in products:
        try:
            img_url = i.xpath('.//div[@class=" mainPicWrapper--qRLTAeii"]/img/@src')[0]
        except IndexError:
            # element missing: the rest of the page never loaded
            print('later page elements were not loaded')
            break
        title = ''.join(i.xpath('.//*[@class="title--qJ7Xg_90 "]//span/text()'))
        price = ''.join(i.xpath('.//*[@class="priceWrapper--dBtPZ2K1 "]/div[1]//span/text()'))
        location = i.xpath('.//*[@class="procity--wlcT2xH9"]//span')[0].text
        ws.append([title, price, location, img_url])
    wb.save('./template1.xlsx')
if __name__ == '__main__':
    opt = Options()
    opt.debugger_address = '127.0.0.1:9992'
    opt.page_load_strategy = 'eager'  # page-load strategy
    browser = webdriver.Chrome(executable_path=r'E:\chromedriver-win64\chromedriver.exe', options=opt)
    search_key = '充电宝'
    page_num = 1
    wb = load_workbook('./template.xlsx')  # copy the blank template to a working file
    wb.save('./template1.xlsx')
    get_page_info(search_key, page_num)