forked from mrismanaziz/File-Sharing-Man
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelper_func.py
127 lines (106 loc) · 3.81 KB
/
helper_func.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# (©)Codexbotz
# Recode by @mrismanaziz
# t.me/SharingUserbot & t.me/Lunatic0de
import asyncio
import base64
import re
from pyrogram import filters
from pyrogram.errors import FloodWait
from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant
from config import ADMINS, FORCE_SUB_CHANNEL, FORCE_SUB_GROUP
async def subschannel(filter, client, update):
    """Filter predicate: is the sender subscribed to the forced channel?

    Returns True when no ``FORCE_SUB_CHANNEL`` is configured, when the
    sender is listed in ``ADMINS``, or when the sender's status in the
    channel is creator, administrator or member. Returns False when
    the lookup raises ``UserNotParticipant`` or reports any other status.
    """
    # Nothing to enforce if the channel is not configured.
    if not FORCE_SUB_CHANNEL:
        return True

    sender_id = update.from_user.id
    # Admins bypass the subscription requirement.
    if sender_id in ADMINS:
        return True

    try:
        membership = await client.get_chat_member(
            chat_id=FORCE_SUB_CHANNEL,
            user_id=sender_id,
        )
    except UserNotParticipant:
        return False
    else:
        return membership.status in ("creator", "administrator", "member")
async def subsgroup(filter, client, update):
    """Filter predicate: is the sender a member of the forced group?

    Returns True when no ``FORCE_SUB_GROUP`` is configured, when the
    sender is listed in ``ADMINS``, or when the sender's status in the
    group is creator, administrator or member. Returns False when the
    lookup raises ``UserNotParticipant`` or reports any other status.
    """
    # Nothing to enforce if the group is not configured.
    if not FORCE_SUB_GROUP:
        return True

    sender_id = update.from_user.id
    # Admins bypass the membership requirement.
    if sender_id in ADMINS:
        return True

    try:
        membership = await client.get_chat_member(
            chat_id=FORCE_SUB_GROUP,
            user_id=sender_id,
        )
    except UserNotParticipant:
        return False
    else:
        return membership.status in ("creator", "administrator", "member")
async def is_subscribed(filter, client, update):
    """Filter predicate: is the sender in BOTH the forced group and channel?

    Returns True immediately when either ``FORCE_SUB_CHANNEL`` or
    ``FORCE_SUB_GROUP`` is not configured, or when the sender is listed in
    ``ADMINS``. Otherwise the sender must hold a creator, administrator or
    member status in the group and in the channel.

    Returns False as soon as one lookup raises ``UserNotParticipant`` or
    reports a status outside the accepted set.
    """
    if not FORCE_SUB_CHANNEL:
        return True
    if not FORCE_SUB_GROUP:
        return True

    user_id = update.from_user.id
    if user_id in ADMINS:
        return True

    accepted = ("creator", "administrator", "member")
    # Group first, then channel (same lookup order as before). The status of
    # *each* chat is validated; previously the group result was overwritten
    # by the channel result and never inspected.
    for chat_id in (FORCE_SUB_GROUP, FORCE_SUB_CHANNEL):
        try:
            member = await client.get_chat_member(chat_id=chat_id, user_id=user_id)
        except UserNotParticipant:
            return False
        if member.status not in accepted:
            return False
    return True
async def encode(string):
    """Return *string* as URL-safe base64 text with the ``=`` padding removed.

    The input is encoded as ASCII, so non-ASCII text raises
    ``UnicodeEncodeError``.
    """
    raw = string.encode("ascii")
    token = base64.urlsafe_b64encode(raw).decode("ascii")
    # Padding is dropped to keep the token short; decode() re-adds it.
    return token.strip("=")
async def decode(base64_string):
    """Decode URL-safe base64 text (padded or unpadded) back to an ASCII string."""
    # Older links may still carry "=" padding; strip it so the padding can be
    # recomputed uniformly below and never triggers a padding error.
    trimmed = base64_string.strip("=")
    pad_len = -len(trimmed) % 4
    padded = (trimmed + "=" * pad_len).encode("ascii")
    raw = base64.urlsafe_b64decode(padded)
    return raw.decode("ascii")
async def get_messages(client, message_ids):
    """Fetch messages from ``client.db_channel`` in batches of up to 200 ids.

    Args:
        client: object exposing ``db_channel.id`` and an awaitable
            ``get_messages(chat_id=..., message_ids=...)``.
        message_ids: sliceable sequence of message ids.

    Returns:
        A list with the results of every batch that could be fetched, in
        request order. A batch that fails with an ordinary exception is
        logged and skipped.

    On ``FloodWait`` the coroutine sleeps for ``e.x`` seconds and retries the
    batch once; an error raised by that retry propagates to the caller.
    """
    import logging  # function-scope import: the module does not import logging

    messages = []
    total_messages = 0
    while total_messages != len(message_ids):
        temb_ids = message_ids[total_messages : total_messages + 200]
        try:
            msgs = await client.get_messages(
                chat_id=client.db_channel.id, message_ids=temb_ids
            )
        except FloodWait as e:
            await asyncio.sleep(e.x)
            msgs = await client.get_messages(
                chat_id=client.db_channel.id, message_ids=temb_ids
            )
        except Exception:
            # Best effort: skip this batch. Resetting ``msgs`` is essential;
            # otherwise it is unbound on the first batch (NameError) or still
            # holds the previous batch, which would be appended twice.
            # ``Exception`` (not ``BaseException``) lets task cancellation and
            # KeyboardInterrupt propagate.
            logging.getLogger(__name__).exception(
                "Failed to fetch %d message(s) from the database channel",
                len(temb_ids),
            )
            msgs = []
        total_messages += len(temb_ids)
        messages.extend(msgs)
    return messages
async def get_message_id(client, message):
    """Extract the database-channel message id referenced by *message*.

    Args:
        client: object exposing ``db_channel.id`` and ``db_channel.username``.
        message: object exposing ``forward_from_chat``,
            ``forward_from_message_id``, ``forward_sender_name`` and ``text``.

    Returns:
        ``message.forward_from_message_id`` when the message was forwarded
        from the database channel; the numeric id parsed from a
        ``https://t.me/...`` link in ``message.text`` when that link points
        at the database channel (by ``-100``-prefixed id or by username);
        ``0`` in every other case.
    """
    db_channel = client.db_channel

    if message.forward_from_chat and message.forward_from_chat.id == db_channel.id:
        return message.forward_from_message_id

    # Forwarded from somewhere else, forwarded with a hidden sender, or no
    # text at all: there is nothing to parse a link from.
    if message.forward_from_chat or message.forward_sender_name or not message.text:
        return 0

    # Raw string with an escaped dot so only the literal host "t.me" matches.
    matches = re.match(r"https://t\.me/(?:c/)?(.*)/(\d+)", message.text)
    if not matches:
        return 0

    channel_id = matches.group(1)
    msg_id = int(matches.group(2))
    if channel_id.isdigit():
        # Numeric ids in links omit the "-100" prefix carried by the chat id.
        if f"-100{channel_id}" == str(db_channel.id):
            return msg_id
    elif channel_id == db_channel.username:
        return msg_id

    # Link to some other chat: return 0 like every other "no id" path
    # instead of falling off the end and returning None.
    return 0
# Filter objects built from the async predicates defined in this module
# via pyrogram's ``filters.create``.
# subsgc  -> subsgroup      (forced-group membership)
# subsch  -> subschannel    (forced-channel membership)
# subsall -> is_subscribed  (both the group and the channel)
subsgc = filters.create(subsgroup)
subsch = filters.create(subschannel)
subsall = filters.create(is_subscribed)