Skip to content

Commit

Permalink
Merge branch 'feature/client' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
taizan-hokuto committed Jul 24, 2021
2 parents 6d581c2 + 604c52e commit 83b10ab
Show file tree
Hide file tree
Showing 14 changed files with 327 additions and 364 deletions.
2 changes: 1 addition & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ verify_ssl = true
name = "pypi"

[packages]
httpx = {extras = ["http2"], version = "0.16.1"}
httpx = {extras = ["http2"]}

[dev-packages]
pytest-mock = "*"
Expand Down
262 changes: 112 additions & 150 deletions Pipfile.lock

Large diffs are not rendered by default.

62 changes: 34 additions & 28 deletions pytchat/core/pytchat.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ class PytchatCore:
processor : ChatProcessor
client : httpx.Client
The client for connecting youtube.
You can specify any customized httpx client (e.g. coolies, user agent).
interruptable : bool
Allows keyboard interrupts.
Set this parameter to False if your own threading program causes
Set this parameter to False if your own multi-threading program causes
the problem.
force_replay : bool
Expand All @@ -57,13 +61,15 @@ class PytchatCore:
def __init__(self, video_id,
seektime=-1,
processor=DefaultProcessor(),
client = httpx.Client(http2=True),
interruptable=True,
force_replay=False,
topchat_only=False,
hold_exception=True,
logger=config.logger(__name__),
replay_continuation=None
):
self._client = client
self._video_id = util.extract_video_id(video_id)
self.seektime = seektime
if isinstance(processor, tuple):
Expand Down Expand Up @@ -97,7 +103,7 @@ def _setup(self):
"""
self.continuation = liveparam.getparam(
self._video_id,
channel_id=util.get_channelid(httpx.Client(http2=True), self._video_id),
channel_id=util.get_channelid(self._client, self._video_id),
past_sec=3)

def _get_chat_component(self):
Expand All @@ -110,19 +116,18 @@ def _get_chat_component(self):
parameter for next chat data
'''
try:
with httpx.Client(http2=True) as client:
if self.continuation and self._is_alive:
contents = self._get_contents(self.continuation, client, headers)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000
chat_component = {
"video_id": self._video_id,
"timeout": timeout,
"chatdata": chatdata
}
self.continuation = metadata.get('continuation')
self._last_offset_ms = metadata.get('last_offset_ms', 0)
return chat_component
if self.continuation and self._is_alive:
contents = self._get_contents(self.continuation, self._client, headers)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000
chat_component = {
"video_id": self._video_id,
"timeout": timeout,
"chatdata": chatdata
}
self.continuation = metadata.get('continuation')
self._last_offset_ms = metadata.get('last_offset_ms', 0)
return chat_component
except exceptions.ChatParseException as e:
self._logger.debug(f"[{self._video_id}]{str(e)}")
self._raise_exception(e)
Expand All @@ -139,9 +144,8 @@ def _get_contents(self, continuation, client, headers):
-------
'continuationContents' which includes metadata & chat data.
'''
livechat_json = (
self._get_livechat_json(continuation, client, replay=self._is_replay, offset_ms=self._last_offset_ms)
)
livechat_json = self._get_livechat_json(
continuation, client, replay=self._is_replay, offset_ms=self._last_offset_ms)
contents, dat = self._parser.get_contents(livechat_json)
if self._dat == '' and dat:
self._dat = dat
Expand All @@ -152,7 +156,8 @@ def _get_contents(self, continuation, client, headers):
self._fetch_url = config._smr
continuation = arcparam.getparam(
self._video_id, self.seektime, self._topchat_only, util.get_channelid(client, self._video_id))
livechat_json = self._get_livechat_json(continuation, client, replay=True, offset_ms=self.seektime * 1000)
livechat_json = self._get_livechat_json(
continuation, client, replay=True, offset_ms=self.seektime * 1000)
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json)[0])
if reload_continuation:
Expand All @@ -173,15 +178,14 @@ def _get_livechat_json(self, continuation, client, replay: bool, offset_ms: int
offset_ms = 0
param = util.get_param(continuation, dat=self._dat, replay=replay, offsetms=offset_ms)
for _ in range(MAX_RETRY + 1):
with httpx.Client(http2=True) as client:
try:
response = client.post(self._fetch_url, json=param)
livechat_json = json.loads(response.text)
break
except (json.JSONDecodeError, httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError) as e:
err = e
time.sleep(2)
continue
try:
response = client.post(self._fetch_url, json=param)
livechat_json = response.json()
break
except (json.JSONDecodeError, httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError) as e:
err = e
time.sleep(2)
continue
else:
self._logger.error(f"[{self._video_id}]"
f"Exceeded retry count. Last error: {str(err)}")
Expand All @@ -202,6 +206,8 @@ def is_alive(self):
return self._is_alive

def terminate(self):
if not self.is_alive():
return
self._is_alive = False
self.processor.finalize()

Expand Down
20 changes: 13 additions & 7 deletions pytchat/core_async/livechat.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, video_id,
seektime=-1,
processor=DefaultProcessor(),
buffer=None,
client = httpx.AsyncClient(http2=True),
interruptable=True,
callback=None,
done_callback=None,
Expand All @@ -88,6 +89,7 @@ def __init__(self, video_id,
logger=config.logger(__name__),
replay_continuation=None
):
self._client:httpx.AsyncClient = client
self._video_id = util.extract_video_id(video_id)
self.seektime = seektime
if isinstance(processor, tuple):
Expand Down Expand Up @@ -152,9 +154,10 @@ async def _startlisten(self):
create and start _listen loop.
"""
if not self.continuation:
channel_id = await util.get_channelid_async(self._client, self._video_id)
self.continuation = liveparam.getparam(
self._video_id,
channel_id=util.get_channelid(httpx.Client(http2=True), self._video_id),
channel_id,
past_sec=3)

await self._listen(self.continuation)
Expand All @@ -169,10 +172,10 @@ async def _listen(self, continuation):
parameter for next chat data
'''
try:
async with httpx.AsyncClient(http2=True) as client:
async with self._client as client:
while(continuation and self._is_alive):
continuation = await self._check_pause(continuation)
contents = await self._get_contents(continuation, client, headers)
contents = await self._get_contents(continuation, client, headers) #Q#
metadata, chatdata = self._parser.parse(contents)
continuation = metadata.get('continuation')
if continuation:
Expand Down Expand Up @@ -214,9 +217,10 @@ async def _check_pause(self, continuation):
'''
self._pauser.put_nowait(None)
if not self._is_replay:
async with httpx.AsyncClient(http2=True) as client:
continuation = await liveparam.getparam(self._video_id,
channel_id=util.get_channelid_async(client, self.video_id),
async with self._client as client:
channel_id = await util.get_channelid_async(client, self.video_id)
continuation = liveparam.getparam(self._video_id,
channel_id,
past_sec=3)

return continuation
Expand Down Expand Up @@ -338,12 +342,14 @@ def _finish(self, sender):
self._logger.debug(f'[{self._video_id}] cancelled:{sender}')

def terminate(self):
if not self.is_alive():
return
if self._pauser.empty():
self._pauser.put_nowait(None)
self._is_alive = False
self._buffer.put_nowait({})
self.processor.finalize()

def _keyboard_interrupt(self):
self.exception = exceptions.ChatDataFinished()
self.terminate()
Expand Down
15 changes: 10 additions & 5 deletions pytchat/core_multithread/livechat.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class LiveChat:
def __init__(self, video_id,
seektime=-1,
processor=DefaultProcessor(),
client = httpx.Client(http2=True),
buffer=None,
interruptable=True,
callback=None,
Expand All @@ -88,6 +89,7 @@ def __init__(self, video_id,
logger=config.logger(__name__),
replay_continuation=None
):
self._client = client
self._video_id = util.extract_video_id(video_id)
self.seektime = seektime
if isinstance(processor, tuple):
Expand Down Expand Up @@ -150,7 +152,7 @@ def _startlisten(self):
if not self.continuation:
self.continuation = liveparam.getparam(
self._video_id,
channel_id=util.get_channelid(httpx.Client(http2=True), self._video_id),
channel_id=util.get_channelid(self._client, self._video_id),
past_sec=3)
self._listen(self.continuation)

Expand All @@ -164,7 +166,7 @@ def _listen(self, continuation):
parameter for next chat data
'''
try:
with httpx.Client(http2=True) as client:
with self._client as client:
while(continuation and self._is_alive):
continuation = self._check_pause(continuation)
contents = self._get_contents(continuation, client, headers)
Expand Down Expand Up @@ -224,7 +226,8 @@ def _get_contents(self, continuation, client, headers):
-------
'continuationContents' which includes metadata & chat data.
'''
livechat_json = self._get_livechat_json(continuation, client, replay=self._is_replay, offset_ms=self._last_offset_ms)
livechat_json = self._get_livechat_json(
continuation, client, replay=self._is_replay, offset_ms=self._last_offset_ms)
contents, dat = self._parser.get_contents(livechat_json)
if self._dat == '' and dat:
self._dat = dat
Expand All @@ -235,8 +238,8 @@ def _get_contents(self, continuation, client, headers):
self._fetch_url = config._smr
continuation = arcparam.getparam(
self._video_id, self.seektime, self._topchat_only, util.get_channelid(client, self._video_id))
livechat_json = (self._get_livechat_json(
continuation, client, replay=True, offset_ms=self.seektime * 1000))
livechat_json = self._get_livechat_json(
continuation, client, replay=True, offset_ms=self.seektime * 1000)
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json)[0])
if reload_continuation:
Expand Down Expand Up @@ -331,6 +334,8 @@ def _finish(self, sender):
self._logger.debug(f'[{self._video_id}] cancelled:{sender}')

def terminate(self):
if not self.is_alive():
return
if self._pauser.empty():
self._pauser.put_nowait(None)
self._is_alive = False
Expand Down
4 changes: 3 additions & 1 deletion pytchat/processors/default/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .renderer.paidsticker import LiveChatPaidStickerRenderer
from .renderer.legacypaid import LiveChatLegacyPaidMessageRenderer
from .renderer.membership import LiveChatMembershipItemRenderer
from .renderer.donation import LiveChatDonationAnnouncementRenderer
from .. chat_processor import ChatProcessor
from ... import config

Expand Down Expand Up @@ -124,7 +125,8 @@ def __init__(self):
"liveChatPaidMessageRenderer": LiveChatPaidMessageRenderer(),
"liveChatPaidStickerRenderer": LiveChatPaidStickerRenderer(),
"liveChatLegacyPaidMessageRenderer": LiveChatLegacyPaidMessageRenderer(),
"liveChatMembershipItemRenderer": LiveChatMembershipItemRenderer()
"liveChatMembershipItemRenderer": LiveChatMembershipItemRenderer(),
"liveChatDonationAnnouncementRenderer": LiveChatDonationAnnouncementRenderer(),
}

def process(self, chat_components: list):
Expand Down
6 changes: 6 additions & 0 deletions pytchat/processors/default/renderer/donation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .base import BaseRenderer


class LiveChatDonationAnnouncementRenderer(BaseRenderer):
def settype(self):
self.chat.type = "donation"
Loading

0 comments on commit 83b10ab

Please sign in to comment.