Skip to content

Commit

Permalink
feat: proper signature validation telegram
Browse files Browse the repository at this point in the history
  • Loading branch information
dartt0n committed Jul 21, 2024
1 parent 98bc466 commit 16dc382
Show file tree
Hide file tree
Showing 3 changed files with 556 additions and 514 deletions.
82 changes: 55 additions & 27 deletions src/adapter/external/auth/telegram.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

import hashlib
import hmac
from collections import OrderedDict
from datetime import datetime
from hashlib import sha256
from typing import Any
from urllib.parse import parse_qsl, urlencode

import jwt
from pydantic import BaseModel, ValidationError
Expand All @@ -29,7 +31,7 @@ class TgOauthLoginCallback(BaseModel):
auth_date: datetime
first_name: str | None
last_name: str | None
username: str
username: str | None
photo_url: str | None
hash: str

Expand All @@ -49,6 +51,18 @@ def to_data_string(self) -> str:

return data_string

def to_telegram_ordered_dict(self) -> OrderedDict:
data = OrderedDict(
id=self.id,
first_name=self.first_name,
last_name=self.last_name,
username=self.username,
photo_url=self.photo_url,
auth_date=self.auth_date,
hash=self.hash,
)
return data


class TgOauthRegisterCallback(TgOauthLoginCallback):
profile: TgUserProfileMixin
Expand Down Expand Up @@ -89,22 +103,18 @@ def __init__(self, secret_token: str, jwt_secret: str, service: UserService):
self.__jwt_secret = jwt_secret
self.__service = service

def __check_hash(
self, data: TgOauthRegisterCallback | TgOauthLoginCallback
) -> None:
signing = hmac.new(
self.__secret_token.encode(),
data.to_data_string().encode(),
hashlib.sha256,
)
def _validate_hash(self, data: Any) -> bool:
init_data = sorted(parse_qsl(urlencode(data)))
data_check_string = "\n".join(f"{k}={v}" for k, v in init_data if k != "hash")
hash_ = data["hash"]

if signing.hexdigest() != data.hash:
log.error("callback data validation faiked: hash mismatch")
raise auth_exception.InvalidCredentialsException(
"data is malformed since the hash does not match"
)
secret_key = sha256(self.__secret_token.encode())

calculated_hash = hmac.new(
key=secret_key.digest(), msg=data_check_string.encode(), digestmod=sha256
).hexdigest()

log.debug("callback data validation passed")
return calculated_hash == hash_

async def register(self, data: Any) -> TgOauthContainer:
try:
Expand All @@ -113,8 +123,11 @@ async def register(self, data: Any) -> TgOauthContainer:
data, from_attributes=True
)

log.debug("checking callback data hash")
self.__check_hash(request)
if not self._validate_hash(request.to_telegram_ordered_dict()):
log.error(
"failed to parse callback data with exception: invalid aiogram hash check"
)
raise auth_exception.InvalidCredentialsException("invalid hash")

log.debug(f"creating new user with telegram_id={request.id}")
user = await self.__service.create(
Expand Down Expand Up @@ -143,25 +156,40 @@ async def register(self, data: Any) -> TgOauthContainer:

async def login(self, data: Any) -> TgOauthContainer:
try:
log.debug("building callback data from custom data")
request = TgOauthLoginCallback.model_validate(data, from_attributes=True)

log.debug("checking callback data hash")
self.__check_hash(request)
try:
if not self._validate_hash(data):
log.error(
"failed to parse callback data with exception: invalid aiogram hash check"
)
raise auth_exception.InvalidCredentialsException("invalid hash")

log.debug("callback data hash is valid")

webapp_data = dict(parse_qsl(urlencode(data)))
except ValueError as e:
log.error("failed to parse callback data with exception: {}", e)
raise auth_exception.InvalidCredentialsException("invalid hash") from e

log.debug("building callback data from custom data")
request = TgOauthLoginCallback.model_validate(webapp_data)

log.debug("searching user by telegram id")
user = await self.__service.find_by_telegram_id(request.id)
except (ValidationError, database_exception.ReflectUserException) as e:
except auth_exception.AuthException as e:
raise e
except service_exception.ReadUserException as e:
log.error("reading user failed with service exception: {}", e)
raise auth_exception.UserNotFoundException("reading user failed") from e
except ValidationError as e:
log.error(
"failed to build callback data or reflect user data to read user with exception: {}",
e,
)
raise auth_exception.InvalidCredentialsException(
"failed to reflect user data to read user"
) from e
except service_exception.ReadUserException as e:
log.error("reading user failed with service exception: {}", e)
raise auth_exception.UserNotFoundException("reading user failed") from e
except auth_exception.AuthException as e:
except Exception as e:
log.error("authentiocation failed with auth exception: {}", e)
raise e

Expand Down
37 changes: 24 additions & 13 deletions src/app/http/routes/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
)
from src.protocol.external.auth.oauth import OauthProtocol
from src.service.user import UserService
from src.utils.logger.logger import Logger

log = Logger("oauth-router")


class UserCheckDTO(BaseModel):
Expand Down Expand Up @@ -72,24 +75,32 @@ async def options_handler(self, request: web.Request) -> web.Response:
return web.Response(status=200, text="OK", headers=CORS_HEADERS)

async def callback_handler(self, request: web.Request) -> web.Response:
try:
payload = request.query
with log.activity("callback_handler"):
try:
payload = request.query

container = await self._oauth_adapter.login(payload)
except UserNotFoundException:
return web.Response(
status=302,
headers={
"Location": self._user_form_redirect_url
+ "?"
+ request.query_string
},
)
except Exception as e:
log.error("failed to login user with exception: {}", e)
return web.Response(status=403, text="Callback data is malformed")

container = await self._oauth_adapter.login(payload)
except UserNotFoundException:
return web.Response(
status=302,
headers={"Location": self._user_form_redirect_url},
headers={
"Location": self._user_profile_redirect_url,
"Set-Cookie": f"AccessToken={container.to_string()}; HttpOnly",
},
)

return web.Response(
status=302,
headers={
"Location": self._user_profile_redirect_url,
"Set-Cookie": f"AccessToken={container.to_string()}; HttpOnly",
},
)

async def register_handler(self, request: web.Request) -> web.Response:
try:
payload = await request.json()
Expand Down
Loading

0 comments on commit 16dc382

Please sign in to comment.