forked from ezz-gg/gdrive-api-verify
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
169 lines (165 loc) · 8.15 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
from pydrive.drive import GoogleDrive
from pydrive.auth import GoogleAuth
from oauth2client.service_account import ServiceAccountCredentials
import asyncio, json, re
from datetime import datetime
gauth = GoogleAuth()
scope = ["https://www.googleapis.com/auth/drive"]
gauth.credentials = ServiceAccountCredentials.from_json_keyfile_name("data/credentials.json", scope)
drive = GoogleDrive(gauth)
class FileManager:
def __init__(self, data, backup):
match = r"https://drive.google.com/file/d/([a-zA-Z0-9-]+)/"
self.data_id = re.match(match, data).group(1)
self.backup_id = re.match(match, backup).group(1)
self.upload = False
def save(self, datas):
data = json.dumps(datas)
open("data/data.json", 'w').write(data)
print("[+] アップロードを実行します、Botを停止しないでください。")
file = drive.CreateFile({'id': self.data_id})
if not file:
print("[!] URLが無効かファイルが存在しません")
return
else:
file.SetContentString(data)
file.Upload()
self.backup(data)
print("[+] 完了しました")
def backup(self, data):
file = drive.CreateFile({'id': self.backup_id})
file.SetContentString(data)
file.Upload()
def load_file(self):
print("[+] ファイルをGoogleドライブから読み込んでいます")
f = drive.CreateFile({'id': self.data_id})
data = f.GetContentString()
print("[+] 読み込みました")
if not data:
self.load_backup()
try:
json.loads(data)
open("data/data.json", "w").write(data)
except:
self.load_backup()
def load_backup(self):
print("[!] ファイルの中身がない、または破損しているためバックアップを読み込んでいます")
f = drive.CreateFile({'id': self.backup_id})
data = f.GetContentString()
print("[+] バックアップを読み込みました")
if not data:
raise Exception
try:
json.loads(data)
open("data/data.json", "w").write(data)
except:
raise Exception
class utils:
def __init__(self, token, client_id, client_secret, redirect_uri):
self.token = token
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.pause = False
async def update_token(self, session, data):
for user in data["users"]:
if datetime.utcnow().timestamp() - data["users"][user]["last_update"] >= 300000:
payload = {"client_id": self.client_id, "client_secret": self.client_secret, "grant_type": "refresh_token", "refresh_token": user["refresh_token"]}
while self.pause:
await asyncio.sleep(1)
while True:
temp = await session.post("https://discordapp.com/api/oauth2/token", data=payload,headers={"Content-Type": "application/x-www-form-urlencoded"})
data = await temp.json()
if 'message' in data:
if data['message'] == 'You are being rate limited.':
print("[!] Rate Limited. Sleeping {}s".format(data["retry_after"]))
await asyncio.sleep(data["retry_after"])
else:
break
userdata["data"][user] = data
userdata["last_update"] = datetime.utcnow().timestamp()
return data
async def get_token(self, session, code):
while True:
temp = await session.post("https://discordapp.com/api/oauth2/token",
data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"redirect_uri": self.redirect_uri,
"code": code,
"grant_type": "authorization_code"},
headers={"content-type": "application/x-www-form-urlencoded"})
data = await temp.json()
if 'message' in data:
if data['message'] == 'You are being rate limited.':
print("[!] Rate Limited. Sleeping {}s".format(data["retry_after"]))
await asyncio.sleep(data["retry_after"])
else:
return data
async def get_user(self, session, access_token):
while True:
temp = await session.get("https://discordapp.com/api/users/@me", headers={"Authorization":"Bearer {}".format(access_token)})
data = await temp.json()
if 'message' in data:
if data['message'] == 'You are being rate limited.':
print("[!] Rate Limited. Sleeping {}s".format(data["retry_after"]))
await asyncio.sleep(data["retry_after"])
else:
return data
async def add_role(self, session, guild_id, user_id, role_id):
while True:
temp = await session.put(
"https://discord.com/api/v9/guilds/{}/members/{}/roles/{}".format(guild_id, user_id, role_id),
headers={"authorization": f"Bot {self.token}"})
try:
data = await temp.json()
if 'message' in data:
if data['message'] == 'You are being rate limited.':
print("[!] Rate Limited. Sleeping {}s".format(data["retry_after"]))
await asyncio.sleep(data["retry_after"])
else:
return "Unknown Error"
else:
return "Already Added"
except:
return "Success"
async def join_guild(self, session, access_token, guild_id, user_id):
while True:
temp = await session.put("https://discord.com/api/v9/guilds/{}/members/{}".format(guild_id, user_id),
headers={"Content-Type": "application/json", "Authorization": f"Bot {self.token}"},
json={"access_token": access_token})
try:
data = await temp.json()
if 'message' in data:
if data['message'] == 'You are being rate limited.':
print("[!] Rate Limited. Sleeping {}s".format(data["retry_after"]))
await asyncio.sleep(data["retry_after"])
else:
return "Unknown Error"
else:
return "Already Joined"
except:
return "Success"
async def send_direct_message(self, session, user_id, content):
while True:
temp = await session.post("https://discord.com/api/users/@me/channels",
headers={"Authorization": f"Bot {self.token}"},
json={"recipient_id": user_id})
data = await temp.json()
if 'message' in data:
if data['message'] == 'You are being rate limited.':
print("[!] Rate Limited. Sleeping {}s".format(data["retry_after"]))
await asyncio.sleep(data["retry_after"])
else:
break
dmid = data["id"]
while True:
temp = await session.post(f"https://discord.com/api/channels/{dmid}/messages",
headers={"Authorization": f"Bot {self.token}"}, json={"content": "", "embeds": [{"title": content}]})
data = await temp.json()
if 'message' in data:
if data['message'] == 'You are being rate limited.':
print("[!] Rate Limited. Sleeping {}s".format(data["retry_after"]))
await asyncio.sleep(data["retry_after"])
else:
return data