forked from raBOTyagi-hse/movie-tg-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
13 changed files
with
606 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from configobj import ConfigObj | ||
|
||
config = ConfigObj() | ||
config.filename = "config_example.cfg" | ||
config['APIs'] = { | ||
'bot_nick': "@coolbot", | ||
'telegram_chatlink': 'https://api.telegram.org/bot', | ||
'telegram_api': "000000000:api_key" | ||
} | ||
config['admins_ids'] = [ | ||
"admin_id0", | ||
"admin_id1" | ||
] | ||
config['included_workers'] = [ | ||
#"Blacklist", | ||
"Stop", | ||
"Humanity", | ||
"Info", | ||
"Example" | ||
] | ||
config['included_service_workers'] = [ | ||
#"Autoquit" | ||
] | ||
config["user_inactivity_time_at_minutes"] = 5 | ||
config.initial_comment = ["Change values which you want, rename file to \"config.cfg\" and delete this string!"] | ||
config.final_comment = [""] | ||
config.write() |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,260 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
import requests | ||
import json | ||
from core.workers.service_workers import WorkersList as ServiceWorkersList | ||
|
||
from telegram.ext import Updater | ||
|
||
__author__ = 'NickVeld' | ||
|
||
|
||
class API: | ||
def __init__(self): | ||
self.telegram = Tg_api() | ||
self.additional = Additional() | ||
self.offset = 0 | ||
|
||
self.admin_ids = list() | ||
self.BOT_NICK = "" | ||
|
||
self.workers_list = None | ||
|
||
def get_from_config(self, cfg): | ||
self.admin_ids = list(cfg['admins_ids']) | ||
|
||
self.BOT_NICK = cfg['APIs']['bot_nick'] | ||
|
||
self.telegram.get_from_config(cfg['APIs']) | ||
self.additional.get_from_config(cfg['APIs']) | ||
|
||
self.service_workers_list = ServiceWorkersList.get_workers(ServiceWorkersList | ||
, cfg["included_service_workers"], self) | ||
|
||
def get(self, toffset=0): | ||
return self.telegram.get(toffset) | ||
|
||
def get_msg(self): | ||
while True: | ||
for worker in self.service_workers_list: | ||
if worker.is_it_for_me(): | ||
cmd = worker.run() | ||
if cmd != 1: | ||
break | ||
new_msgs = self.get(self.offset) | ||
if new_msgs is None: | ||
continue | ||
|
||
for msg in new_msgs['result']: | ||
self.offset = msg['update_id'] | ||
yield msg | ||
|
||
def send(self, message, chat_id, reply_to_message_id=0, keyboard=None): | ||
self.telegram.send(message, chat_id, reply_to_message_id) | ||
return message | ||
|
||
def send_with_id(self, message, chat_id, reply_to_message_id=0, keyboard=None): | ||
return self.telegram.send(message, chat_id, reply_to_message_id) | ||
|
||
def send_inline_keyboard(self, message, chat_id, inline_keyboard, reply_to_message_id=0, image_url = ""): | ||
self.telegram.send_inline_keyboard(message, chat_id, inline_keyboard, reply_to_message_id, image_url) | ||
return message | ||
|
||
def send_inline_keyboard_with_id(self, message, chat_id, inline_keyboard, reply_to_message_id=0): | ||
return self.telegram.send_inline_keyboard(message, chat_id, inline_keyboard, reply_to_message_id) | ||
|
||
def edit(self, message, chat_id, inline_keyboard, message_id): | ||
return self.telegram.edit(message, chat_id, inline_keyboard, message_id) | ||
|
||
def get_inline_text_keyboard(self, source): | ||
return self.telegram.get_inline_text_keyboard(source) | ||
|
||
|
||
class Tg_api: | ||
|
||
def __init__(self): | ||
pass | ||
|
||
def get_from_config(self, cfg): | ||
self.CHAT_LINK = cfg['telegram_chatlink'] | ||
self.API_KEY = cfg['telegram_api'] | ||
|
||
def get(self, toffset=0, timeout=29): | ||
method = 'getUpdates' | ||
params = { | ||
'offset': toffset + 1, | ||
'timeout': timeout | ||
} | ||
try: | ||
req = requests.request( | ||
'POST', | ||
'{link}{api_key}/{method}'.format( | ||
link=self.CHAT_LINK, | ||
api_key=self.API_KEY, | ||
method=method | ||
), | ||
params=params, | ||
timeout=timeout+1 | ||
) | ||
if req.text is "": | ||
return None | ||
|
||
new_msgs = json.loads(req.text) | ||
if new_msgs['ok'] and (len(new_msgs['result']) != 0): | ||
return new_msgs | ||
except requests.exceptions.Timeout: | ||
print("Timeout in get()!") | ||
except Exception as ex: | ||
print("Error in get()!") | ||
print(type(ex), ex.__str__()) | ||
return None | ||
|
||
def send(self, message, chat_id, reply_to_message_id=0): | ||
method = 'sendMessage' | ||
params = { | ||
'chat_id': chat_id, | ||
'disable_web_page_preview': True, | ||
'text': message | ||
} | ||
if reply_to_message_id: | ||
params['reply_to_message_id'] = reply_to_message_id | ||
try: | ||
req = requests.request( | ||
'POST', | ||
'{link}{api_key}/{method}'.format( | ||
link = self.CHAT_LINK, | ||
api_key=self.API_KEY, | ||
method=method | ||
), | ||
params=params, | ||
timeout=30 | ||
) | ||
sended = json.loads(req.text) | ||
if sended['ok']: | ||
return sended['result']['message_id'] | ||
except requests.exceptions.Timeout: | ||
print("Timeout in send()!") | ||
except Exception as ex: | ||
print("Error in send()!") | ||
print(type(ex), ex.__str__()) | ||
return 0 | ||
|
||
def get_reply_keyboard(self, source): | ||
return [["/{}".format(c) for c in s_in.split('\t')] for s_in in source.split('\n')] | ||
|
||
def send_reply_keyboard(self, message, chat_id, keyboard, reply_to_message_id=0): | ||
method = 'sendMessage' | ||
params = { | ||
'chat_id': chat_id, | ||
'disable_web_page_preview': True, | ||
'text': message | ||
} | ||
if reply_to_message_id: | ||
params['reply_to_message_id'] = reply_to_message_id | ||
if keyboard != None: | ||
params["reply_markup"] = json.dumps({ | ||
"keyboard": keyboard, | ||
"resize_keyboard": True, | ||
"one_time_keyboard": True, | ||
"selective": True | ||
}) | ||
try: | ||
req = requests.request( | ||
'POST', | ||
'{link}{api_key}/{method}'.format( | ||
link=self.CHAT_LINK, | ||
api_key=self.API_KEY, | ||
method=method | ||
), | ||
params=params, | ||
timeout=30 | ||
) | ||
sended = json.loads(req.text) | ||
if sended['ok']: | ||
return sended['result']['message_id'] | ||
except requests.exceptions.Timeout: | ||
print("Timeout in send_reply_keyboard(...)!") | ||
except Exception as ex: | ||
print("Error in send_reply_keyboard(...)!") | ||
print(type(ex), ex.__str__()) | ||
return 0 | ||
|
||
def get_inline_text_keyboard(self, source): | ||
return list([[{'text': c, | ||
'callback_data': "/{}".format(c)} for c in s_in.split('\t')] for s_in in source.split('\n')]) | ||
|
||
def send_inline_keyboard(self, message, chat_id, inline_keyboard, reply_to_message_id=0, image_url = ""): | ||
params = {'chat_id': chat_id} | ||
if (image_url == ""): | ||
method = 'sendMessage' | ||
params['disable_web_page_preview'] = True | ||
params['text'] = message | ||
else: | ||
method = 'sendPhoto' | ||
params['caption'] = message | ||
params['photo'] = image_url | ||
|
||
if reply_to_message_id: | ||
params['reply_to_message_id'] = reply_to_message_id | ||
if inline_keyboard != None: | ||
params["reply_markup"] = json.dumps({ | ||
"inline_keyboard": inline_keyboard | ||
}) | ||
try: | ||
req = requests.request( | ||
'POST', | ||
'{link}{api_key}/{method}'.format( | ||
link=self.CHAT_LINK, | ||
api_key=self.API_KEY, | ||
method=method | ||
), | ||
params=params, | ||
timeout=30 | ||
) | ||
sended = json.loads(req.text) | ||
if sended['ok']: | ||
return sended['result']['message_id'] | ||
except requests.exceptions.Timeout: | ||
print("Timeout in send_inline_keyboard(...)!") | ||
except Exception as ex: | ||
print("Error in send_inline_keyboard(...)!") | ||
print(type(ex), ex.__str__()) | ||
return 0 | ||
|
||
def edit(self, message, chat_id, inline_keyboard, message_id): | ||
method = 'editMessageText' | ||
params = { | ||
'chat_id': chat_id, | ||
'message_id': message_id, | ||
'disable_web_page_preview': True, | ||
'text': message | ||
} | ||
if inline_keyboard != None: | ||
params["reply_markup"] = json.dumps({ | ||
"inline_keyboard": inline_keyboard | ||
}) | ||
try: | ||
req = requests.request( | ||
'POST', | ||
'{link}{api_key}/{method}'.format( | ||
link=self.CHAT_LINK, | ||
api_key=self.API_KEY, | ||
method=method | ||
), | ||
params=params, | ||
timeout=30 | ||
) | ||
except requests.exceptions.Timeout: | ||
print("Timeout in edit(...)!") | ||
except Exception as ex: | ||
print("Error in edit(...)!") | ||
print(type(ex), ex.__str__()) | ||
return message | ||
|
||
#TODO: Use it if you need more APIs | ||
class Additional: | ||
def __init__(self): | ||
pass | ||
|
||
def get_from_config(self, cfg): | ||
pass |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
def autoquit_run(tapi): | ||
for user in tapi.db_shell.get_ready_for_autoquit(): | ||
pers_id = int(user['pers_id']) | ||
for worker in tapi.workers_list: | ||
worker.quit(pers_id, pers_id, "You was inactive for a long time.\n") | ||
|
||
#TODO: It is just an example of the autoquit system. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from core.services.msg_module import Msg | ||
|
||
|
||
class BotCycle: | ||
|
||
def __init__(self, tapi, workers_list): | ||
self.tapi = tapi | ||
self.workers_list = workers_list | ||
|
||
def run(self): | ||
is_running = True | ||
tmsg = None | ||
print("Guess who's back!") | ||
|
||
try: | ||
for msg in self.tapi.get_msg(): | ||
tmsg = Msg(msg, self.tapi.BOT_NICK) | ||
if (tmsg.msg == None) or tmsg.text.startswith("//"): | ||
continue | ||
if tmsg.text != "": | ||
print(tmsg.text) | ||
tmsg.textmod() | ||
try: | ||
# is_running = self.workers_list.run_list(tmsg) | ||
for worker in self.workers_list: | ||
if worker.is_it_for_me(tmsg): | ||
cmd = worker.run(tmsg) | ||
if cmd == 2: | ||
is_running = False | ||
if cmd != 1: | ||
break | ||
except UnicodeEncodeError: | ||
print(self.tapi.send("I do not like this language!", tmsg.chat_id)) | ||
if not is_running: | ||
break | ||
except Exception as ex: | ||
print(type(ex), ex.__str__()) | ||
|
||
# self.tapi.get(offset, timeout=1) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
class Msg(): | ||
def __init__(self, msg, bot_nick): | ||
self.BOT_NICK = bot_nick | ||
if "callback_query" in msg: | ||
self.msg = msg['callback_query']['message'] | ||
self.from_data = msg['callback_query']['from'] | ||
self.inline_data = msg['callback_query']['data'] | ||
elif "message" in msg: | ||
self.msg = msg['message'] | ||
self.from_data = msg['message']['from'] | ||
self.inline_data = None | ||
|
||
@property | ||
def id(self): | ||
return self.msg['message_id'] | ||
|
||
@property | ||
def chat_id(self): | ||
return self.msg['chat']['id'] | ||
|
||
@property | ||
def pers_id(self): | ||
return self.from_data['id'] | ||
|
||
@property | ||
def name(self): | ||
return self.from_data.get('first_name', 'Anonymous') | ||
|
||
@property | ||
def surname(self): | ||
return self.from_data.get('last_name', 'Anonymous') | ||
|
||
@property | ||
def text(self): | ||
if self.inline_data == None: | ||
return self.msg.get('text', '') | ||
else: | ||
return self.inline_data | ||
|
||
@property | ||
def text_of_inline_root(self): | ||
return self.msg.get('text', '') | ||
|
||
@property | ||
def is_inline(self): | ||
return self.inline_data != None | ||
|
||
def text_change_to(self, new_value): | ||
if self.inline_data == None: | ||
self.msg['text'] = new_value | ||
else: | ||
self.inline_data = new_value | ||
|
||
def text_replace(self, template, new_value, func=None): | ||
if self.inline_data == None: | ||
strk = self.msg['text'] | ||
self.msg['text'] = func(template, new_value, strk) | ||
|
||
def textmod(self): | ||
if self.inline_data == None: | ||
self.msg['text'] = self.msg['text'].strip().replace(self.BOT_NICK, "") |
Empty file.
Oops, something went wrong.