This repository has been archived by the owner on Sep 26, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathyoutube_util.py
89 lines (80 loc) · 3.9 KB
/
youtube_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from asyncio.events import get_event_loop
import sys
import logging
import re
import aiohttp
import asyncio
# Request headers passed to the aiohttp GET calls in this module; the
# User-Agent presents the client as a desktop Chrome browser.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.24 Safari/537.36',
}
async def getLiveVideoId(id):
    """Dispatch a channel id or a video id to the matching lookup coroutine.

    An ``id`` of exactly 24 characters is handled as a channel id: it is
    passed to ``channelId2videoId`` and both the input and the result are
    written to the ``youtube_util`` logger at DEBUG level. Any other length
    is handled as a video id and passed to ``checkIsLive``.

    Returns whatever the selected coroutine returns, unchanged.
    """
    log = logging.getLogger('youtube_util')

    # Guard clause: anything that is not 24 characters long goes straight
    # to the video-id check.
    if len(id) != 24:
        return await checkIsLive(id)

    log.debug('channelId:' + id)
    result = await channelId2videoId(id)
    log.debug('videoid:' + str(result))
    return result
async def checkIsLive(videoid):
    """Check whether the YouTube video ``videoid`` is currently live.

    Fetches ``https://www.youtube.com/watch?v=<videoid>`` and searches the
    returned HTML for the ``"isLive":true,`` marker.

    Returns:
        ``{'videoid': videoid, 'status': 'OK'}`` when the marker is found.
        ``{'status': 'None'}`` when the marker is absent or the HTTP status
        is not 200.
        ``{'status': 'Error'}`` when the request or processing raises an
        ordinary exception (the traceback is logged).

    Raises:
        asyncio.CancelledError: task cancellation is propagated, not
            converted into an ``'Error'`` result.
    """
    logger = logging.getLogger('youtube_util')
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"https://www.youtube.com/watch?v={videoid}", headers=headers) as r:
                if r.status != 200:
                    # r.status is an int: format it instead of concatenating.
                    # The old ``str + int`` raised TypeError, which turned
                    # this branch into an 'Error' result.
                    logger.error(f'checkIsLive.status:{r.status}')
                    return {'status': 'None'}
                htmlsource = await r.text()

        if re.search(r'"isLive":true,', htmlsource) is None:
            return {'status': 'None'}
        # Return the id that was asked about. Previously the regex match
        # text ('"isLive":true,') was returned under the 'videoid' key.
        return {'videoid': videoid, 'status': 'OK'}
    except asyncio.CancelledError:
        # Explicit re-raise so cancellation also propagates on Python
        # versions where CancelledError derives from Exception.
        raise
    except Exception:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are no longer swallowed. Message text kept as before.
        logger.error('youtube_util.checkIsLive.BaseException', exc_info=True)
        return {'status': 'Error'}
async def channelId2videoId(channelId):
    """Look up the live or upcoming video of a YouTube channel.

    Fetches ``https://www.youtube.com/channel/<channelId>/live`` and scrapes
    the returned HTML with regular expressions.

    Returns:
        ``{'videoid': <id>, 'status': 'OK'}`` when the page contains the
        ``"isLive":true,`` marker.
        ``{'videoid': <id>, 'status': 'LIVE_STREAM_OFFLINE',
        'scheduledStartTime': <int or the string 'None'>}`` when the page is
        not live and its ``"status"`` field is ``LIVE_STREAM_OFFLINE``.
        ``{'status': 'None'}`` when the HTTP status is not 200, when no
        ``"status"`` field is found, when the status is ``OK`` (the channel
        home page was returned), or when the status is unrecognised.
        ``{'status': 'Error'}`` when an ordinary exception is raised or an
        expected ``"videoId"`` field is missing.

    Raises:
        asyncio.CancelledError: task cancellation is propagated, not
            converted into an ``'Error'`` result.
    """
    logger = logging.getLogger('youtube_util')
    video_id_pattern = r'(?<="videoId":")(.*?)(?=",)'
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"https://www.youtube.com/channel/{channelId}/live", headers=headers) as r:
                if r.status != 200:
                    logger.error(f'channelId2videoId.status:{r.status}')
                    return {'status': 'None'}
                htmlsource = await r.text()

        if re.search(r'"isLive":true,', htmlsource) is not None:
            video_match = re.search(video_id_pattern, htmlsource)
            if video_match is None:
                # Previously surfaced as a swallowed AttributeError; the
                # caller-visible result ('Error') is kept.
                logger.error(f'channelId2videoId:[{channelId}] live page has no videoId')
                return {'status': 'Error'}
            return {'videoid': video_match.group(), 'status': 'OK'}

        # Debug aid: dump the non-live page for inspection. The encoding is
        # explicit so the platform default cannot reject page characters,
        # and a failed dump no longer turns a good fetch into 'Error'.
        try:
            with open(f'test-{channelId}.html', 'w', encoding='utf-8') as f:
                f.write(htmlsource)
        except OSError:
            logger.debug('channelId2videoId: could not write debug dump', exc_info=True)

        start_match = re.search(
            r'(?<="scheduledStartTime":")(.*?)(?=",)', htmlsource)
        # The string 'None' (not the None object) is the existing sentinel
        # for "no scheduled start time".
        scheduledStartTime = 'None' if start_match is None else int(start_match.group())

        status_match = re.search(r'(?<="status":")(.*?)(?=",)', htmlsource)
        if status_match is None:
            return {'status': 'None'}

        status = status_match.group()
        if status == 'LIVE_STREAM_OFFLINE':
            video_match = re.search(video_id_pattern, htmlsource)
            if video_match is None:
                logger.error(f'channelId2videoId:[{channelId}] offline page has no videoId')
                return {'status': 'Error'}
            return {'videoid': video_match.group(), 'status': 'LIVE_STREAM_OFFLINE', 'scheduledStartTime': scheduledStartTime}
        if status == 'OK':
            # Note: this is the normal case; the channel home page was returned.
            return {'status': 'None'}

        logger.error(
            f'channelId2videoId:[{channelId}]未知状态:{status}')
        return {'status': 'None'}
    except asyncio.CancelledError:
        # Explicit re-raise so cancellation also propagates on Python
        # versions where CancelledError derives from Exception.
        raise
    except Exception:
        # The message used to name checkIsLive (copy-paste slip); it now
        # names this function. Exception rather than BaseException, so
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        logger.error('youtube_util.channelId2videoId.BaseException', exc_info=True)
        return {'status': 'Error'}
if __name__ == "__main__":
    # test only: resolve the channel id given as the first CLI argument
    # and print the resulting dict.
    if len(sys.argv) < 2:
        # Exit with a usage hint instead of an IndexError traceback.
        sys.exit(f'usage: {sys.argv[0]} <channelId>')

    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        print(loop.run_until_complete(channelId2videoId(sys.argv[1])))
    finally:
        loop.close()