From a55136c15f25706c57a643a6f62a34e7bebf4559 Mon Sep 17 00:00:00 2001 From: voidptr2x Date: Wed, 15 Feb 2023 17:28:55 -0800 Subject: [PATCH 1/2] Type hinting added, removed useless ifs --- keyauth.py | 239 +++++++++++++++++++++++++---------------------------- 1 file changed, 112 insertions(+), 127 deletions(-) diff --git a/keyauth.py b/keyauth.py index f0a2d67..e276093 100644 --- a/keyauth.py +++ b/keyauth.py @@ -1,10 +1,5 @@ -import os -import json as jsond # json -import time # sleep before exit -import binascii # hex encoding +import os, time, binascii, platform, subprocess, json as jsond from uuid import uuid4 # gen random guid -import platform # check platform -import subprocess # needed for mac device try: if os.name == 'nt': @@ -14,51 +9,39 @@ from Crypto.Hash import SHA256 from Crypto.Util.Padding import pad, unpad except ModuleNotFoundError: - print("Exception when importing modules") - print("Installing necessary modules....") + print("Exception when importing modules, Installing necessary modules....") if os.path.isfile("requirements.txt"): os.system("pip install -r requirements.txt") else: - os.system("pip install pywin32") - os.system("pip install pycryptodome") - os.system("pip install requests") + os.system("pip install pywin32; pip install pycryptodome; pip install requests") print("Modules installed!") - time.sleep(1.5) - os._exit(1) + exit(1) try: # Connection check - s = requests.Session() # Session - s.get('https://google.com') + requests.get('https://google.com') except requests.exceptions.RequestException as e: print(e) - time.sleep(3) - os._exit(1) + exit(1) +msg_exit = lambda a: print(a) -class api: - name = ownerid = secret = version = hash_to_check = "" +class api: + name: str + ownerid: str + secret: str + version: str + hash_to_check: str + sessionid: str + enckey: str + initialized: bool def __init__(self, name, ownerid, secret, version, hash_to_check): - self.name = name - - self.ownerid = ownerid - - self.secret = secret + self.name, self.ownerid, self.secret, self.version, self.hash_to_check = [name, ownerid, secret, version, hash_to_check] - self.version = version - self.hash_to_check = hash_to_check - self.init() - - sessionid = enckey = "" - initialized = False def init(self): - - if self.sessionid != "": - print("You've already initialized!") - time.sleep(2) - os._exit(1) + if self.sessionid != "": return print("You've already initialized!") init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() self.enckey = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -77,7 +60,7 @@ def init(self): if response == "KeyAuth_Invalid": print("The application doesn't exist") - os._exit(1) + return response = encryption.decrypt(response, self.secret, init_iv) json = jsond.loads(response) @@ -126,11 +109,12 @@ def register(self, user, password, license, hwid=None): if json["success"]: print("successfully registered") self.__load_user_data(json["info"]) - else: - print(json["message"]) - os._exit(1) + return + + print(json["message"]) + os._exit(1) - def upgrade(self, user, license): + def upgrade(self, user: str, license: str) -> None: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -145,24 +129,20 @@ def upgrade(self, user, license): } response = self.__do_request(post_data) - response = encryption.decrypt(response, self.enckey, init_iv) - json = jsond.loads(response) if json["success"]: print("successfully upgraded user") print("please restart program and login") - time.sleep(2) - os._exit(1) - else: - print(json["message"]) os._exit(1) + + print(json["message"]) + os._exit(1) - def login(self, user, password, hwid=None): + def login(self, user, password: str, hwid=None): self.checkinit() - if hwid is None: - hwid = others.get_hwid() + if hwid is None: hwid = others.get_hwid() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -178,22 +158,20 @@ def login(self, user, password, hwid=None): } response = self.__do_request(post_data) - response = encryption.decrypt(response, self.enckey, init_iv) - json = jsond.loads(response) if json["success"]: self.__load_user_data(json["info"]) print("successfully logged in") - else: - print(json["message"]) - os._exit(1) + return + + print(json["message"]) + os._exit(1) - def license(self, key, hwid=None): + def license(self, key: str, hwid=None) -> None: self.checkinit() - if hwid is None: - hwid = others.get_hwid() + if hwid is None: hwid = others.get_hwid() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -215,11 +193,12 @@ def license(self, key, hwid=None): if json["success"]: self.__load_user_data(json["info"]) print("successfully logged into license") - else: - print(json["message"]) - os._exit(1) + return + + print(json["message"]) + os._exit(1) - def var(self, name): + def var(self, name: str) -> str: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -233,19 +212,17 @@ def var(self, name): } response = self.__do_request(post_data) - response = encryption.decrypt(response, self.enckey, init_iv) - json = jsond.loads(response) if json["success"]: return json["message"] - else: - print(json["message"]) - time.sleep(5) - os._exit(1) + + print(json["message"]) + time.sleep(5) + os._exit(1) - def getvar(self, var_name): + def getvar(self, var_name: str) -> str: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -263,14 +240,15 @@ def getvar(self, var_name): if json["success"]: return json["response"] - else: - print(json["message"]) - time.sleep(5) - os._exit(1) + + print(json["message"]) + os._exit(1) - def setvar(self, var_name, var_data): + def setvar(self, var_name: str, var_data: str) -> bool: self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + post_data = { "type": binascii.hexlify("setvar".encode()), "var": encryption.encrypt(var_name, self.enckey, init_iv), @@ -286,14 +264,16 @@ def setvar(self, var_name, var_data): if json["success"]: return True - else: - print(json["message"]) - time.sleep(5) - os._exit(1) + + print(json["message"]) + time.sleep(5) + os._exit(1) - def ban(self): + def ban(self) -> bool: self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + post_data = { "type": binascii.hexlify("ban".encode()), "sessionid": binascii.hexlify(self.sessionid.encode()), @@ -301,18 +281,19 @@ def ban(self): "ownerid": binascii.hexlify(self.ownerid.encode()), "init_iv": init_iv } + response = self.__do_request(post_data) response = encryption.decrypt(response, self.enckey, init_iv) json = jsond.loads(response) if json["success"]: return True - else: - print(json["message"]) - time.sleep(5) - os._exit(1) - def file(self, fileid): + print(json["message"]) + time.sleep(5) + os._exit(1) + + def file(self, fileid: str) -> str: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -333,11 +314,11 @@ def file(self, fileid): if not json["success"]: print(json["message"]) - time.sleep(5) - os._exit(1) + exit(1) + return binascii.unhexlify(json["contents"]) - def webhook(self, webid, param, body = "", conttype = ""): + def webhook(self, webid: str, param: str, body = "", conttype = "") -> str: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -360,12 +341,11 @@ def webhook(self, webid, param, body = "", conttype = ""): if json["success"]: return json["message"] - else: - print(json["message"]) - time.sleep(5) - os._exit(1) + + print(json["message"]) + exit(1) - def check(self): + def check(self) -> bool: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() post_data = { @@ -381,10 +361,10 @@ def check(self): json = jsond.loads(response) if json["success"]: return True - else: - return False + + return False - def checkblacklist(self): + def checkblacklist(self) -> bool: self.checkinit() hwid = others.get_hwid() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -402,10 +382,10 @@ def checkblacklist(self): json = jsond.loads(response) if json["success"]: return True - else: - return False + + return False - def log(self, message): + def log(self, message: str) -> None: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -439,14 +419,11 @@ def fetchOnline(self): json = jsond.loads(response) if json["success"]: - if len(json["users"]) == 0: - return None # THIS IS ISSUE ON KEYAUTH SERVER SIDE 6.8.2022, so it will return none if it is not an array. - else: - return json["users"] - else: - return None + if len(json["users"]) != 0: return json["users"] - def chatGet(self, channel): + return None + + def chatGet(self, channel: str) -> str|None: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -464,12 +441,11 @@ def chatGet(self, channel): json = jsond.loads(response) - if json["success"]: - return json["messages"] - else: - return None + if json["success"]: return json["messages"] + + return None - def chatSend(self, message, channel): + def chatSend(self, message: str, channel: str) -> bool: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() @@ -490,16 +466,15 @@ def chatSend(self, message, channel): if json["success"]: return True - else: - return False + + return False def checkinit(self): if not self.initialized: print("Initialize first, in order to use the functions") - time.sleep(2) - os._exit(1) + exit(0) - def changeUsername(self, username): + def changeUsername(self, username: str) -> None: self.checkinit() init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() post_data = { @@ -517,25 +492,35 @@ def changeUsername(self, username): if json["success"]: print("successfully Changed Username") - else: - print(json["message"]) - os._exit(1) + + print(json["message"]) + exitt(0) - def __do_request(self, post_data): + def __do_request(self, post_data) -> str: try: - rq_out = s.post( - "https://keyauth.win/api/1.0/", data=post_data, timeout=30 - ) + rq_out = s.post("https://keyauth.win/api/1.0/", data=post_data, timeout=30) return rq_out.text except requests.exceptions.Timeout: print("Request timed out") + return "" class application_data_class: - numUsers = numKeys = app_ver = customer_panel = onlineUsers = "" + numUsers: str + numKeys: str + app_ver: str + customer_panel: str + onlineUsers: str # region user_data class user_data_class: - username = ip = hwid = expires = createdate = lastlogin = subscription = subscriptions = "" + username: str + ip: str + hwid: str + expires: str + createdate: str + lastlogin: str + subscription: str + subscriptions: str user_data = user_data_class() app_data = application_data_class() @@ -580,7 +565,7 @@ def get_hwid(): class encryption: @staticmethod - def encrypt_string(plain_text, key, iv): + def encrypt_string(plain_text: str, key: str, iv: str) -> str: plain_text = pad(plain_text, 16) aes_instance = AES.new(key, AES.MODE_CBC, iv) @@ -590,7 +575,7 @@ def encrypt_string(plain_text, key, iv): return binascii.hexlify(raw_out) @staticmethod - def decrypt_string(cipher_text, key, iv): + def decrypt_string(cipher_text: str, key: str, iv: str) -> str: cipher_text = binascii.unhexlify(cipher_text) aes_instance = AES.new(key, AES.MODE_CBC, iv) @@ -600,7 +585,7 @@ def decrypt_string(cipher_text, key, iv): return unpad(cipher_text, 16) @staticmethod - def encrypt(message, enc_key, iv): + def encrypt(message: str, enc_key: str, iv: str) -> str: try: _key = SHA256.new(enc_key.encode()).hexdigest()[:32] @@ -612,7 +597,7 @@ def encrypt(message, enc_key, iv): os._exit(1) @staticmethod - def decrypt(message, enc_key, iv): + def decrypt(message: str, enc_key: str, iv: str) -> str: try: _key = SHA256.new(enc_key.encode()).hexdigest()[:32] From 08683a392b07982ca2e6f3aa03158bf3cbd70204 Mon Sep 17 00:00:00 2001 From: voidptr2x Date: Wed, 15 Feb 2023 18:09:25 -0800 Subject: [PATCH 2/2] Replaced useless code --- keyauth.py | 73 +++++++++++++++++++----------------------------------- 1 file changed, 25 insertions(+), 48 deletions(-) diff --git a/keyauth.py b/keyauth.py index e276093..970cfd4 100644 --- a/keyauth.py +++ b/keyauth.py @@ -2,31 +2,28 @@ from uuid import uuid4 # gen random guid try: - if os.name == 'nt': - import win32security # get sid (WIN only) + if os.name == 'nt': import win32security # get sid (WIN only) import requests # https requests from Crypto.Cipher import AES from Crypto.Hash import SHA256 from Crypto.Util.Padding import pad, unpad except ModuleNotFoundError: print("Exception when importing modules, Installing necessary modules....") + if os.path.isfile("requirements.txt"): + print("[+] Unable to find requirements file, Installing modules manually....") os.system("pip install -r requirements.txt") - else: - os.system("pip install pywin32; pip install pycryptodome; pip install requests") - print("Modules installed!") - exit(1) + sys.exit("Modules installed!") + + os.system("pip install pywin32; pip install pycryptodome; pip install requests") + sys.exit("Modules installed!") try: # Connection check requests.get('https://google.com') -except requests.exceptions.RequestException as e: - print(e) - exit(1) - -msg_exit = lambda a: print(a) - +except: + sys.exit("[!] You have no internet connectivity....") -class api: +class KeyAuth: name: str ownerid: str secret: str @@ -72,12 +69,9 @@ def init(self): os.system(f"start {download_link}") os._exit(1) else: - print("Invalid Version, Contact owner to add download link to latest app version") - os._exit(1) + sys.exit("Invalid Version, Contact owner to add download link to latest app version") - if not json["success"]: - print(json["message"]) - os._exit(1) + if not json["success"]: sys.exit(json["message"]) self.sessionid = json["sessionid"] self.initialized = True @@ -111,8 +105,7 @@ def register(self, user, password, license, hwid=None): self.__load_user_data(json["info"]) return - print(json["message"]) - os._exit(1) + sys.exit(json["message"]) def upgrade(self, user: str, license: str) -> None: self.checkinit() @@ -133,12 +126,9 @@ def upgrade(self, user: str, license: str) -> None: json = jsond.loads(response) if json["success"]: - print("successfully upgraded user") - print("please restart program and login") - os._exit(1) + sys.exit("successfully upgraded user, please restart program and login") - print(json["message"]) - os._exit(1) + sys.exit(json["message"]) def login(self, user, password: str, hwid=None): self.checkinit() @@ -166,8 +156,7 @@ def login(self, user, password: str, hwid=None): print("successfully logged in") return - print(json["message"]) - os._exit(1) + sys.exit(json["message"]) def license(self, key: str, hwid=None) -> None: self.checkinit() @@ -195,8 +184,7 @@ def license(self, key: str, hwid=None) -> None: print("successfully logged into license") return - print(json["message"]) - os._exit(1) + sys.exit(json["message"]) def var(self, name: str) -> str: self.checkinit() @@ -218,9 +206,7 @@ def var(self, name: str) -> str: if json["success"]: return json["message"] - print(json["message"]) - time.sleep(5) - os._exit(1) + sys.exit(json["message"]) def getvar(self, var_name: str) -> str: self.checkinit() @@ -241,8 +227,7 @@ def getvar(self, var_name: str) -> str: if json["success"]: return json["response"] - print(json["message"]) - os._exit(1) + sys.exit(json["message"]) def setvar(self, var_name: str, var_data: str) -> bool: self.checkinit() @@ -265,9 +250,7 @@ def setvar(self, var_name: str, var_data: str) -> bool: if json["success"]: return True - print(json["message"]) - time.sleep(5) - os._exit(1) + sys.exit(json["message"]) def ban(self) -> bool: self.checkinit() @@ -289,9 +272,7 @@ def ban(self) -> bool: if json["success"]: return True - print(json["message"]) - time.sleep(5) - os._exit(1) + sys.exit(json["message"]) def file(self, fileid: str) -> str: self.checkinit() @@ -313,8 +294,7 @@ def file(self, fileid: str) -> str: json = jsond.loads(response) if not json["success"]: - print(json["message"]) - exit(1) + sys.exit(json["message"]) return binascii.unhexlify(json["contents"]) @@ -342,8 +322,7 @@ def webhook(self, webid: str, param: str, body = "", conttype = "") -> str: if json["success"]: return json["message"] - print(json["message"]) - exit(1) + sys.exit(json["message"]) def check(self) -> bool: self.checkinit() @@ -493,8 +472,7 @@ def changeUsername(self, username: str) -> None: if json["success"]: print("successfully Changed Username") - print(json["message"]) - exitt(0) + sys.exit(json["message"]) def __do_request(self, post_data) -> str: try: @@ -605,5 +583,4 @@ def decrypt(message: str, enc_key: str, iv: str) -> str: return encryption.decrypt_string(message.encode(), _key.encode(), _iv.encode()).decode() except: - print("Invalid Application Information. Long text is secret short text is ownerid. Name is supposed to be app name not username") - os._exit(1) + sys.exit("Invalid Application Information. Long text is secret short text is ownerid. Name is supposed to be app name not username")