forked from VJBots/VJ-FILTER-BOT
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathconnection.py
148 lines (133 loc) · 5.64 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Don't Remove Credit @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot @Tech_VJ
# Ask Doubt on telegram @KingVJ01
import logging
from info import ADMINS
from pyrogram import filters, Client, enums
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from database.connections_mdb import add_connection, all_connections, if_active, delete_connection
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
@Client.on_message((filters.private | filters.group) & filters.command('connect'))
async def addconnection(client, message):
userid = message.from_user.id if message.from_user else None
if not userid:
return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")
chat_type = message.chat.type
if chat_type == enums.ChatType.PRIVATE:
try:
cmd, group_id = message.text.split(" ", 1)
except:
await message.reply_text(
"<b>Enter in correct format!</b>\n\n"
"<code>/connect groupid</code>\n\n"
"<i>Get your Group id by adding this bot to your group and use <code>/id</code></i>",
quote=True
)
return
elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
group_id = message.chat.id
try:
st = await client.get_chat_member(group_id, userid)
if (
st.status != enums.ChatMemberStatus.ADMINISTRATOR
and st.status != enums.ChatMemberStatus.OWNER
and userid not in ADMINS
):
await message.reply_text("You should be an admin in Given group!", quote=True)
return
except Exception as e:
logger.exception(e)
await message.reply_text(
"Invalid Group ID!\n\nIf correct, Make sure I'm present in your group!!",
quote=True,
)
return
try:
st = await client.get_chat_member(group_id, "me")
if st.status == enums.ChatMemberStatus.ADMINISTRATOR:
ttl = await client.get_chat(group_id)
title = ttl.title
addcon = await add_connection(str(group_id), str(userid))
if addcon:
await message.reply_text(
f"Successfully connected to **{title}**\nNow manage your group from my pm !",
quote=True,
parse_mode=enums.ParseMode.MARKDOWN
)
if chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
await client.send_message(
userid,
f"Connected to **{title}** !",
parse_mode=enums.ParseMode.MARKDOWN
)
else:
await message.reply_text(
"You're already connected to this chat!",
quote=True
)
else:
await message.reply_text("Add me as an admin in group", quote=True)
except Exception as e:
logger.exception(e)
await message.reply_text('Some error occurred! Try again later.', quote=True)
return
@Client.on_message((filters.private | filters.group) & filters.command('disconnect'))
async def deleteconnection(client, message):
userid = message.from_user.id if message.from_user else None
if not userid:
return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")
chat_type = message.chat.type
if chat_type == enums.ChatType.PRIVATE:
await message.reply_text("Run /connections to view or disconnect from groups!", quote=True)
elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
group_id = message.chat.id
st = await client.get_chat_member(group_id, userid)
if (
st.status != enums.ChatMemberStatus.ADMINISTRATOR
and st.status != enums.ChatMemberStatus.OWNER
and str(userid) not in ADMINS
):
return
delcon = await delete_connection(str(userid), str(group_id))
if delcon:
await message.reply_text("Successfully disconnected from this chat", quote=True)
else:
await message.reply_text("This chat isn't connected to me!\nDo /connect to connect.", quote=True)
@Client.on_message(filters.private & filters.command(["connections"]))
async def connections(client, message):
userid = message.from_user.id
groupids = await all_connections(str(userid))
if groupids is None:
await message.reply_text(
"There are no active connections!! Connect to some groups first.",
quote=True
)
return
buttons = []
for groupid in groupids:
try:
ttl = await client.get_chat(int(groupid))
title = ttl.title
active = await if_active(str(userid), str(groupid))
act = " - ACTIVE" if active else ""
buttons.append(
[
InlineKeyboardButton(
text=f"{title}{act}", callback_data=f"groupcb:{groupid}:{act}"
)
]
)
except:
pass
if buttons:
await message.reply_text(
"Your connected group details ;\n\n",
reply_markup=InlineKeyboardMarkup(buttons),
quote=True
)
else:
await message.reply_text(
"There are no active connections!! Connect to some groups first.",
quote=True
)