Skip to content

Commit

Permalink
Merge pull request #33 from pjsek-ai/feature/client-3.5.0
Browse files Browse the repository at this point in the history
Multiple changes according to client 3.5.0
  • Loading branch information
erikchan002 authored May 14, 2024
2 parents dfd5b79 + db28f39 commit e25ad7b
Show file tree
Hide file tree
Showing 18 changed files with 353 additions and 188 deletions.
14 changes: 11 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "hatchling.build"
name = "pypjsekai"
description = "Reverse engineered Python client for the mobile rhythm game Project SEKAI COLORFUL STAGE! feat. Hatsune Miku"
readme = "README.md"
requires-python = ">=3.11"
requires-python = ">=3.8"
license = "MIT"
keywords = []
authors = [
Expand All @@ -15,15 +15,18 @@ authors = [
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"msgpack==1.0.8",
"pycryptodome==3.20.0",
"requests==2.31.0",
"pydantic==2.7.0",
"PyJWT==2.8.0",
"requests==2.31.0",
]
dynamic = ["version"]

Expand All @@ -40,15 +43,20 @@ path = "src/pjsekai/__about__.py"

[tool.hatch.envs.default]
dependencies = [
"msgpack-types==0.2.0",
"pytest",
"pytest-cov",
"types-requests==2.31.0",
]
[tool.hatch.envs.default.scripts]
cov = "pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=src/pjsekai --cov=tests {args}"
no-cov = "cov --no-cov {args}"

[tool.hatch.envs.minimum]
python = "3.8"

[[tool.hatch.envs.test.matrix]]
python = ["311", "312"]
python = ["3.12", "3.11", "3.10", "3.9", "3.8"]

[tool.coverage.run]
branch = true
Expand Down
118 changes: 56 additions & 62 deletions src/pjsekai/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import operator
from typing import Dict, List, Optional, Union
from uuid import uuid4
import warnings

from requests import Session, HTTPError
from jwt import encode as jwtEncode
Expand Down Expand Up @@ -183,7 +184,7 @@ def _pack(self, plaintext_dict: Optional[dict], enable_encryption: bool = True)
plaintext: bytes = msgpack(plaintext_dict)
return encrypt(plaintext, self.key or b"", self.iv or b"") if enable_encryption else plaintext

def _unpack(self, ciphertext: bytes, enable_decryption: bool = True) -> dict:
def _unpack(self, ciphertext: bytes, enable_decryption: bool = True) -> Optional[dict]:
plaintext: bytes = decrypt(
ciphertext, self.key or b"", self.iv or b"") if enable_decryption else ciphertext
return unmsgpack(plaintext)
Expand Down Expand Up @@ -245,7 +246,7 @@ def get_game_version(
game_version_domain: Optional[str] = None,
enable_game_version_encryption: Optional[bool] = None,
system_info: Optional[SystemInfo] = None,
) -> dict:
) -> Optional[dict]:
if game_version_domain is None:
game_version_domain = self.game_version_domain
if enable_game_version_encryption is None:
Expand Down Expand Up @@ -285,7 +286,7 @@ def get_game_version(
response=response, unpacked=unpacked) from e
else:
raise
if unpacked is None:
if unpack_exception is not None:
raise UnpackException(response.content) from unpack_exception
else:
return unpacked
Expand All @@ -296,7 +297,7 @@ def get_asset_bundle_info(
asset_bundle_info_domain: Optional[str] = None,
enable_asset_bundle_info_encryption: Optional[bool] = None,
system_info: Optional[SystemInfo] = None,
) -> dict:
) -> Optional[dict]:
if asset_bundle_info_domain is None:
asset_bundle_info_domain = self.asset_bundle_info_domain
if enable_asset_bundle_info_encryption is None:
Expand All @@ -315,8 +316,8 @@ def get_asset_bundle_info(
try:
unpacked = self._unpack(
response.content, enable_asset_bundle_info_encryption)
except ValueError:
pass
except ValueError as e:
unpack_exception = e
try:
response.raise_for_status()
except HTTPError as e:
Expand All @@ -328,7 +329,7 @@ def get_asset_bundle_info(
response=response, unpacked=unpacked) from e
else:
raise
if unpacked is None:
if unpack_exception is not None:
raise UnpackException(response.content) from unpack_exception
else:
return unpacked
Expand Down Expand Up @@ -391,7 +392,7 @@ def request(
api_domain: Optional[str] = None,
enable_api_encryption: Optional[bool] = None,
system_info: Optional[SystemInfo] = None,
) -> dict:
) -> Optional[dict]:
if api_domain is None:
api_domain = self.api_domain
if enable_api_encryption is None:
Expand All @@ -415,8 +416,8 @@ def request(
try:
unpacked = self._unpack(
response.content, enable_api_encryption)
except ValueError:
pass
except ValueError as e:
unpack_exception = e
try:
response.raise_for_status()
except HTTPError as e:
Expand All @@ -430,63 +431,67 @@ def request(
raise
self._session_token = response.headers.get(
"X-Session-Token", self._session_token)
if unpacked is None:
if unpack_exception is not None:
raise UnpackException(response.content) from unpack_exception
else:
return unpacked

def ping(self) -> dict:
def ping(self) -> Optional[dict]:
return self.request("GET", "")

def get_system(self) -> dict:
def get_system(self) -> Optional[dict]:
return self.request("GET", "system")

def register(self) -> dict:
def register(self) -> Optional[dict]:
return self.request("POST", "user", data=self.platform.info)

def authenticate(self, user_id: Union[int, str], credential: str) -> dict:
responseDict: dict = self.request(
def authenticate(self, user_id: Union[int, str], credential: str) -> Optional[dict]:
responseDict: Optional[dict] = self.request(
"PUT", f"user/{user_id}/auth", data={"credential": credential})
if "sessionToken" in responseDict:
if responseDict is not None and "sessionToken" in responseDict:
self._session_token = responseDict["sessionToken"]
return responseDict

def get_master_data(self, suite_master_split_path: Optional[List[str]] = None) -> dict:
def get_master_data(self, suite_master_split_path: Optional[List[str]] = None) -> Optional[dict]:
if suite_master_split_path is None:
suite_master_split_path = self.system_info.suite_master_split_path or []
if len(suite_master_split_path) > 0:
return functools.reduce(operator.or_, [self.request("GET", path) for path in suite_master_split_path])
return functools.reduce(lambda x,y: {
**x,
**y,
}, [self.request("GET", path) or {} for path in suite_master_split_path])
# return functools.reduce(operator.or_, [self.request("GET", path) or {} for path in suite_master_split_path]) # dict | dict not available in Python 3.8
else:
return self.request("GET", f"suite/master")

def get_notices(self) -> dict:
def get_notices(self) -> Optional[dict]:
return self.request("GET", f"information")

def get_user_data(self, user_id: Union[int, str], name: Optional[str] = None) -> dict:
def get_user_data(self, user_id: Union[int, str], name: Optional[str] = None) -> Optional[dict]:
params = {
"isForceAllReload": name is None,
"name": name,
}
return self.request("GET", f"suite/user/{user_id}", params=params)

def get_login_bonus(self, user_id: Union[int, str]) -> dict:
def get_login_bonus(self, user_id: Union[int, str]) -> Optional[dict]:
return self.request("PUT", f"user/{user_id}/home/refresh", data={
"refreshableTypes": [
"new_pending_friend_request",
"login_bonus"
]
})

def get_profile(self, user_id: Union[int, str]) -> dict:
def get_profile(self, user_id: Union[int, str]) -> Optional[dict]:
return self.request("GET", f"user/{user_id}/profile")

def set_tutorial_status(self, user_id: Union[int, str], tutorial_status: TutorialStatus) -> dict:
def set_tutorial_status(self, user_id: Union[int, str], tutorial_status: TutorialStatus) -> Optional[dict]:
return self.request("PATCH", f"user/{user_id}/tutorial", data={"tutorialStatus": tutorial_status.value})

def generate_transfer_code(self, user_id: Union[int, str], password: str) -> dict:
def generate_transfer_code(self, user_id: Union[int, str], password: str) -> Optional[dict]:
return self.request("PUT", f"user/{user_id}/inherit", data={"password": password})

def checkTransferCode(self, transfer_code: str, password: str) -> dict:
def check_transfer_code(self, transfer_code: str, password: str) -> Optional[dict]:
if self.jwt_secret is None:
raise MissingJWTScecret
token_payload = {
Expand All @@ -501,7 +506,7 @@ def checkTransferCode(self, transfer_code: str, password: str) -> dict:
}
return self.request("POST", f"inherit/user/{transfer_code}", params=params, headers=header)

def generate_credential(self, transfer_code: str, password: str) -> dict:
def generate_credential(self, transfer_code: str, password: str) -> Optional[dict]:
if self.jwt_secret is None:
raise MissingJWTScecret
token_payload = {
Expand All @@ -516,10 +521,10 @@ def generate_credential(self, transfer_code: str, password: str) -> dict:
}
return self.request("POST", f"inherit/user/{transfer_code}", params=params, headers=header)

def gacha(self, user_id: Union[int, str], gacha_id: int, gacha_behavior_id: int) -> dict:
def gacha(self, user_id: Union[int, str], gacha_id: int, gacha_behavior_id: int) -> Optional[dict]:
return self.request("PUT", f"user/{user_id}/gacha/{gacha_id}/gachaBehaviorId/{gacha_behavior_id}")

def receive_presents(self, user_id: Union[int, str], present_ids: List[str]) -> dict:
def receive_presents(self, user_id: Union[int, str], present_ids: List[str]) -> Optional[dict]:
return self.request("POST", f"user/{user_id}/present", data={
"presentIds": present_ids
})
Expand All @@ -533,7 +538,7 @@ def start_solo_live(
deck_id: int,
boost_count: int,
is_auto: bool
) -> dict:
) -> Optional[dict]:
return self.request("POST", f"user/{user_id}/live", data={
"musicId": music_id,
"musicDifficultyId": music_difficulty_id,
Expand All @@ -557,7 +562,7 @@ def end_solo_live(
life: int,
tap_count: int,
continue_count: int
) -> dict:
) -> Optional[dict]:
return self.request("PUT", f"user/{user_id}/live/{live_id}", data={
"score": score,
"perfectCount": perfect_count,
Expand All @@ -575,67 +580,56 @@ def get_event_rankings(
self,
user_id: Union[int, str],
event_id: int,
ranking_view_type: Optional[RankingViewType] = None,
# target_user_id: Optional[Union[int, str]] = None,
# target_rank: Optional[int] = None,
# higher_limit: Optional[int] = None,
# lower_limit: Optional[int] = None
) -> dict:
# if target_user_id is None and target_rank is None:
# target_user_id = user_id
ranking_view_type: Union[RankingViewType,str],
) -> Optional[dict]:
params = {
# "targetUserId": target_user_id,
# "targetRank": target_rank,
# "higherLimit": higher_limit,
# "lowerLimit": lower_limit,
"rankingViewType": ranking_view_type,
"rankingViewType": ranking_view_type if isinstance(ranking_view_type,str) else ranking_view_type.value,
}
return self.request("GET", f"user/{user_id}/event/{event_id}/ranking", params=params)

def get_event_teams_player_count(self, event_id: int) -> dict:
def get_event_border_ranking_scores(
self,
event_id: int,
) -> Optional[dict]:
return self.request("GET", f"event/{event_id}/ranking-border")

def get_event_teams_player_count(self, event_id: int) -> Optional[dict]:
warnings.warn("API no longer available. Will raise HTTP 500", DeprecationWarning, stacklevel=2)
return self.request("GET", f"cheerful-carnival-team-count/{event_id}")

def get_event_teams_point(self, event_id: int) -> dict:
def get_event_teams_point(self, event_id: int) -> Optional[dict]:
return self.request("GET", f"cheerful-carnival-team-point/{event_id}")

def get_rank_match_rankings(
self,
user_id: Union[int, str],
rank_match_season_id: int,
target_user_id: Optional[Union[int, str]] = None,
target_rank: Optional[int] = None,
higher_limit: Optional[int] = None,
lower_limit: Optional[int] = None
) -> dict:
if target_user_id is None and target_rank is None:
target_user_id = user_id
ranking_view_type: Union[RankingViewType,str],
) -> Optional[dict]:
params = {
"targetUserId": target_user_id,
"targetRank": target_rank,
"higherLimit": higher_limit,
"lowerLimit": lower_limit,
"rankingViewType": ranking_view_type if isinstance(ranking_view_type,str) else ranking_view_type.value,
}
return self.request("GET", f"user/{user_id}/rank-match-season/{rank_match_season_id}/ranking", params=params)

def get_room_invitations(self, user_id: Union[int, str]) -> dict:
def get_room_invitations(self, user_id: Union[int, str]) -> Optional[dict]:
return self.request("GET", f"user/{user_id}/invitation")

def send_friend_request(self, user_id: Union[int, str], target_user_id: Union[int, str], message: Optional[str] = None) -> dict:
def send_friend_request(self, user_id: Union[int, str], target_user_id: Union[int, str], message: Optional[str] = None) -> Optional[dict]:
return self.request("POST", f"user/{user_id}/friend/{target_user_id}", data={
"message": message,
"friendRequestSentLocation": "id_search",
})

def reject_friend_request(self, user_id: Union[int, str], request_user_id: Union[int, str]) -> dict:
def reject_friend_request(self, user_id: Union[int, str], request_user_id: Union[int, str]) -> Optional[dict]:
params = {
"type": "reject_friend_request",
}
return self.request("DELETE", f"user/{user_id}/friend/{request_user_id}", params=params)

def accept_friend_request(self, user_id: Union[int, str], request_user_id: Union[int, str]) -> dict:
def accept_friend_request(self, user_id: Union[int, str], request_user_id: Union[int, str]) -> Optional[dict]:
return self.request("PUT", f"user/{user_id}/friend/{request_user_id}")

def remove_friend(self, user_id: Union[int, str], friend_user_id: Union[int, str]) -> dict:
def remove_friend(self, user_id: Union[int, str], friend_user_id: Union[int, str]) -> Optional[dict]:
params = {
"type": "release_friend",
}
Expand Down
2 changes: 1 addition & 1 deletion src/pjsekai/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ def __init__(self, version: str, hash: str, asset_directory: Optional[str] = Non

def get_asset_bundle_info(self, api_manager: APIManager) -> AssetBundleInfo:
self.asset_bundle_info = AssetBundleInfo(
**api_manager.get_asset_bundle_info(self._version))
**(api_manager.get_asset_bundle_info(self._version) or {}))
return self.asset_bundle_info
Loading

0 comments on commit e25ad7b

Please sign in to comment.