
不,这不是那数百篇讲解如何用 Python 编写第一个 Hello World 机器人的文章之一。在这里,您找不到关于如何在 BotFather 中获取 API 令牌或如何在云端启动机器人的详细说明。相反,我将向您展示如何最大限度地发挥 Python 的全部能力,写出尽可能优雅、漂亮的代码。让我们唱一首关于复杂结构之美的歌,并随之起舞。本文内容包括:异步处理、自有的保存系统、一堆有用的装饰器,以及大量漂亮的代码。
免责声明:满脑子 OOP、只认“正确”设计模式的读者可以跳过本文。
理念
要了解现代社会不懂英语的感觉,请想象您是一个不懂法语的18世纪贵族。即使您不太了解历史,您仍然可以想象在这种情况下生活会有多困难。在现代世界中,英语已经成为一种必需品,而不是特权,尤其是在IT行业中。
: , , , . , , , , — .
, :
, . , , Telegram-.
, — . , . , , .
« , »
python-telegram-bot (ptb). loguru, . , ptb ( logging) . , , :
from loguru import logger
import sys
# Logging setup: two loguru sinks — stdout at INFO, and the file 'logs.log' at DEBUG.
config = {
'handlers': [
{'sink': sys.stdout, 'level': 'INFO'},
{'sink': 'logs.log', 'serialize': False, 'level': 'DEBUG'},
],
}
logger.configure(**config)
# NOTE(review): Updater is not imported in this snippet — presumably
# telegram.ext.Updater; confirm.
updater = Updater('YOUR_TOKEN')
dp = updater.dispatcher
# Replace the ``logger`` attribute of both the updater and its dispatcher
# with the loguru logger configured above.
updater.logger = logger
dp.logger = logger
, -, — . . — , .
from __future__ import annotations
from loguru import logger
import inspect
import functools
import os
def file_is_empty(path: str) -> bool:
    """Return True when the file at *path* has a size of zero bytes.

    Raises:
        OSError: if *path* cannot be stat'ed (for example, it does not exist).
    """
    return os.stat(path).st_size == 0
def clear_file(path: str) -> None:
    """Truncate the file at *path* to zero length."""
    # Opening in 'w' mode truncates the file; nothing has to be written.
    with open(path, 'w'):
        pass
def cache_decorator(method):
    """Wrap *method* so that every call is also reported to the active ``Cache``.

    After *method* returns, ``Cache.link.recess`` is called with the instance
    and a dict holding the method's name under ``'method_name'``. The original
    return value is passed back to the caller unchanged.
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        res = method(self, *args, **kwargs)
        # Cache.link is the Cache instance that was created last.
        Cache.link.recess(self, {'method_name': method.__name__})
        # Let loguru do the formatting instead of building an f-string up
        # front; opt(lazy=True) had no effect on an already-formatted string.
        logger.debug('Decorator for {} was end', method.__name__)
        return res
    return wrapper
class Cache:
    """Registry of cached objects and the cache file that belongs to each.

    Attributes:
        link: class-level reference to the most recently created instance.
        CACHE_SIZE: the ``cache_size`` passed to the constructor; ``recess``
            calls ``save_all`` once the record counter is about to reach it.

    NOTE(review): ``load``, ``save`` and ``save_all`` are called below but are
    not defined in this excerpt — presumably provided elsewhere; confirm.
    """

    link = None

    def __init__(self, cache_size=10):
        # Parallel lists: _classes[i] is the object whose records are written
        # to the file _cache_files[i].
        self._classes = []
        self._cache_files = []
        self.__class__.link = self
        self._counter = 0
        self.CACHE_SIZE = cache_size

    # ``class`` is a keyword and cannot be used as an annotation, so the
    # registered object is annotated as ``object``.
    def add(self, cls: object, file: str) -> None:
        """Register *cls* together with its cache *file*.

        If *file* already holds data, every record returned by
        ``self.load(file)`` is passed to ``cls.save_non_caching``, the file is
        then emptied and the record counter is reset to zero.

        Args:
            cls: object to register; must provide ``save_non_caching``.
            file: path of the cache file that belongs to *cls*.
        """
        self._cache_files.append(file)
        self._classes.append(cls)
        if file_is_empty(file):
            return None
        logger.debug('For {} file {} is not empty', type(cls).__name__, file)
        for data in self.load(file):
            cls.save_non_caching(data)
        clear_file(file)
        self._counter = 0

    def recess(self, cls: object, data: dict) -> None:
        """Record *data* for the registered object *cls*.

        When one more record would reach ``CACHE_SIZE``, ``save_all`` is
        called instead; otherwise the counter is incremented and *data* is
        saved to the cache file registered for *cls*.

        Raises:
            ValueError: if *cls* was never registered through ``add``.
        """
        if self._counter + 1 >= self.CACHE_SIZE:
            self.save_all()
        else:
            # NOTE(review): nesting reconstructed from a flattened snippet —
            # the save is assumed to belong to this branch; confirm.
            self._counter += 1
            filename = self._cache_files[self._classes.index(cls)]
            self.save(data, filename=filename)
, , :
@cache_decorator
def add_smth_important(*args, **kwargs) -> Any:
, , : . — EnglishBot, : ptb, , , - . Telegram- , . , , , . / , :
from modules import EnglishBot
from modules.module import start
if __name__ == '__main__':
    # Create the bot and register ``start`` under the explicit name 'start'.
    tbot = EnglishBot()
    tbot.add_command_handler(start, 'start')
, .
ptb — update context, . context chat_data, . context.chat_data['data']
. - , context.data
. , .
from telegram.ext import CommandHandler
def a(self, key: str):
    """Attribute lookup that falls back to the ``chat_data`` mapping.

    Intended to be installed as ``__getattribute__``: a regular attribute
    wins; if there is none, *key* is looked up in ``chat_data``.

    Raises:
        KeyError: if *key* is neither an attribute nor a key of ``chat_data``.
        AttributeError: if the object has no ``chat_data`` attribute at all.
    """
    try:
        return object.__getattribute__(self, key)
    except AttributeError:
        # Only a missing attribute triggers the fallback; any other error
        # raised while reading the attribute propagates to the caller.
        # chat_data is fetched through object.__getattribute__ so that a
        # missing chat_data cannot re-enter this hook and recurse forever.
        return object.__getattribute__(self, 'chat_data')[key]
def b(self, key: str, data=None, replace=True):
    """Store *data* under *key* in ``self.chat_data``.

    Args:
        key: name of the chat-data entry.
        data: value to store.
        replace: when False, an existing truthy value is kept. A missing or
            falsy value (``None``, ``0``, ``''``, ``False``) is still
            overwritten.
    """
    if replace or not self.chat_data.get(key):
        self.chat_data[key] = data
# Monkey-patch CallbackContext: attribute access is routed through ``a`` and
# a ``set`` method (``b``) is added.
# NOTE(review): CallbackContext is not imported in this snippet — presumably
# telegram.ext.CallbackContext; confirm.
CallbackContext.__getattribute__ = a
CallbackContext.set = b
. , context.
def bind_context(func):
    """Decorator: call ``context._bot.bind_user_data`` before running *func*.

    The wrapper is returned as-is (no ``functools.wraps``), so the decorated
    handler's ``__name__`` becomes ``'wrapper'``.
    """
    def wrapper(update, context):
        context._bot.bind_user_data(update, context)
        return func(update, context)
    return wrapper
class EnglishBot:
    def bind_user_data(self, update, context) -> None:
        """Copy basic user data from *update* into *context*.

        Stores the chat id as ``t_id`` and the sender's language code as
        ``t_ln`` through ``context.set(..., replace=False)``.
        Nothing is returned.
        """
        context.set('t_id', update.message.chat_id, replace=False)
        context.set('t_ln', update.message.from_user.language_code, replace=False)
context:
class EnglishBot:
    def __init__(self, *args, **kwargs):
        """Create the bot and publish it on ``CallbackContext``."""
        # Stored as a class attribute so that handlers can reach the bot
        # through ``context._bot``.
        CallbackContext._bot = self
.
from EnglishBot import bind_context
@bind_context
def start(update, context):
    """Register the user if the bot does not know them yet, then reply.

    Unknown users get ``push_notification`` set to True in the context and
    are passed to ``context._bot.new_user`` together with their language
    code.
    """
    if not context._bot.user_exist(context.t_id):
        context.set('push_notification', True)
        context._bot.new_user(context.t_id, context.t_ln)
    # NOTE(review): nesting reconstructed from a flattened snippet — the
    # reply is assumed to be sent to new and returning users alike; confirm.
    return update.message.reply_text(' ')
—
, , . .
class EnglishBot:
    # ``function`` is not a defined name, so it cannot be used as an
    # annotation. The annotation is quoted so that this snippet needs no
    # extra import at definition time.
    def add_command_handler(self, func: "Callable", name=None) -> None:
        """Register *func* as a command handler on ``self.dp``.

        Args:
            func: handler callable passed to ``CommandHandler``.
            name: command name; defaults to ``func.__name__`` when omitted
                or empty.
        """
        name = name or func.__name__
        self.dp.add_handler(CommandHandler(name, func))
# No explicit name is passed, so add_command_handler falls back to start.__name__.
tbot.add_command_handler(start)
, . bind_context, wrapper. .
import functools
def bind_context(func):
    """Decorator: call ``context._bot.bind_user_data`` before running *func*.

    ``functools.wraps`` keeps the handler's ``__name__`` and docstring, so
    code that derives a name from the decorated function keeps working.
    """
    @functools.wraps(func)
    def wrapper(update, context):
        context._bot.bind_user_data(update, context)
        return func(update, context)
    return wrapper
, . .
import functools
END = -1


def zero_exiter(func):
    """Decorator: answer a message whose text is exactly ``'0'`` and return END.

    Any other update is passed on to *func* unchanged. Updates that carry no
    message (``update.message`` is None) are passed on as well instead of
    raising ``KeyError``.
    """
    @functools.wraps(func)
    def wrapper(update, context):
        # Read the attribute directly; serializing the whole update with
        # to_dict() only to inspect one field is unnecessary.
        message = update.message
        if message is not None and message.text == '0':
            message.reply_text(' - ')
            return END
        return func(update, context)
    return wrapper
def skip_edited(func):
    """Decorator: ignore updates that carry an edited message.

    For such updates the wrapper returns None without calling *func*;
    every other update is passed on to *func*.
    """
    @functools.wraps(func)
    def wrapper(update, context):
        # Check the attribute directly rather than serializing the whole
        # update with to_dict().
        if update.edited_message is not None:
            return None
        return func(update, context)
    return wrapper
— @run_async
, . .
from telegram.ext.dispatcher import run_async
from EnglishBot import skip_edited
@run_async
@skip_edited
def heavy_function(update, context):
, — , .
, . @logger.catch
, , , logger.
from loguru import logger
@logger.catch
def heavy_function2(update, context):
/ .
from EnglishBot import bind_context
from Cache import file_is_empty
from telegram import ReplyKeyboardMarkup
from loguru import logger
# Path of the log file handled by the admin panel.
LOG_FILE = 'logs.log'
# range(1, -2, -1) yields 1, 0, -1, so SENDING = 1, MAIN = 0, END = -1.
SENDING, MAIN, END = range(1, -2, -1)
# NOTE(review): the two row lists are passed as two separate positional
# arguments, so only the first one is the keyboard argument. Presumably both
# rows were meant to sit inside one outer list, and the (label, data) tuples
# look like inline buttons — confirm against the ReplyKeyboardMarkup signature.
buttons = ReplyKeyboardMarkup(
[(' ', 'logs'), (' ', 'clear')],
[(' ', 'send')],
)
@bind_context
def admin_panel(update, context):
    """Entry point of the admin conversation.

    Users rejected by ``context._bot.acess_check`` get a reply and the
    conversation ends. Everyone else is shown the admin keyboard and moved
    to the ``MAIN`` state.
    """
    if not context._bot.acess_check(context.t_id):
        update.message.reply_text(f' {update.message.text}')
        # Return END instead of the sent Message: the return value is used as
        # the conversation state, and a Message object is not one of them.
        return END
    update.message.reply_text(' :', reply_markup=buttons)
    return MAIN
def get_logs(update, context):
    """Send the log file to the current chat, or a text reply if it is empty.

    Returns:
        ``MAIN``, so the conversation stays in the admin menu.
    """
    query = update.callback_query
    if file_is_empty(LOG_FILE):
        # CallbackQuery has no reply_text; reply through the message that
        # carried the pressed button.
        query.message.reply_text(' ')
    else:
        context.bot.send_chat_action(chat_id=context.t_id, action='upload_document')
        # The context manager closes the file once the upload has finished.
        with open(LOG_FILE, 'rb') as log_file:
            context.bot.send_document(
                chat_id=context.t_id,
                document=log_file,
                filename=LOG_FILE,  # the file-name parameter is ``filename``, not ``name``
                timeout=1000,
            )
    query.answer(text='')
    return MAIN
def logs_clear(update, context):
    """Empty the log file and confirm it to the admin.

    Returns:
        ``MAIN``, so the conversation stays in the admin menu.
    """
    # Opening in 'w' mode truncates the file; nothing has to be written.
    with open(LOG_FILE, 'w'):
        pass
    query = update.callback_query
    # CallbackQuery has no reply_text; reply through the message that
    # carried the pressed button.
    query.message.reply_text('')
    query.answer(text='')
    return MAIN
def take_message(update, context):
    """Prompt the admin and switch the conversation to the ``SENDING`` state."""
    query = update.callback_query
    # CallbackQuery has no reply_text; reply through the message that
    # carried the pressed button.
    query.message.reply_text(' ')
    query.answer(text='')
    return SENDING
@zero_exiter
def send_all(update, context):
    """Send the admin's message to every known user except the admin.

    Delivery is best-effort: a failure for one user is logged and the
    remaining users are still processed. The number of successful deliveries
    is reported back to the admin.

    Returns:
        ``MAIN``, so the conversation goes back to the admin menu.
    """
    count = 0
    text = update.message.text_markdown
    # Snapshot of the ids, as in the original code.
    for user_id in list(context._bot.get_user_ids()):
        if user_id == context.t_id:
            continue
        try:
            # Send to the recipient, not back to the admin's own chat.
            context.bot.send_message(user_id, text=text, parse_mode='Markdown')
        except Exception:
            logger.warning('Could not deliver the message to user {}', user_id)
        else:
            count += 1
    # This handler is triggered by a text message, so there is no callback
    # query to reply through or to answer.
    update.message.reply_text(f' {count} ')
    return MAIN
# Register the admin conversation. The position of each list in ``states`` is
# used as the state key, so index 0 is MAIN and index 1 is SENDING.
tbot.add_conversation_handler(
    entry_points=[('admin', admin_panel)],
    states=[
        [(get_logs, '^logs$'), (logs_clear, '^clear$'), (take_message, '^send$')],
        # The leading '@' marks a text-message pattern rather than callback data.
        [(send_all, r'@^((?!.*((^\/)+)).*)(.+)$')],
    ],
    fallbacks=[],
)
add_conversation_handler :
class EnglishBot:
    def add_conversation_handler(self, entry_points: list, states: list, fallbacks: list = None) -> None:
        """Build a ``ConversationHandler`` from compact descriptions and register it.

        Args:
            entry_points: ``(command_name, callback)`` pairs, each turned
                into a ``CommandHandler``.
            states: list of states; the index of each item is its state key.
                Each item is a list of ``(callback, pattern)`` pairs. A
                pattern that starts with ``'@'`` becomes a ``MessageHandler``
                filtered by the regex after the ``'@'``; any other pattern
                becomes a ``CallbackQueryHandler``.
            fallbacks: ``(command_name, callback)`` pairs; optional.
        """
        fallbacks = [CommandHandler(name, func) for name, func in (fallbacks or [])]
        entry_points = [CommandHandler(name, func) for name, func in entry_points]
        r_states = {}
        for state, handlers in enumerate(states):
            r_states[state] = []
            for func, pattern in handlers:
                # startswith() also copes with an empty pattern.
                if pattern.startswith('@'):
                    r_states[state].append(MessageHandler(Filters.regex(pattern[1:]), func))
                else:
                    r_states[state].append(CallbackQueryHandler(func, pattern=pattern))
        conv_handler = ConversationHandler(entry_points=entry_points, states=r_states, fallbacks=fallbacks)
        # Use the bot's own dispatcher, as add_command_handler does.
        self.dp.add_handler(conv_handler)
.
from EnglishBot import bind_context, skip_edited, zero_exiter
from youtube_transcript_api import YouTubeTranscriptApi
from telegram.ext.dispatcher import run_async
# range(1, -2, -1) yields 1, 0, -1, so START_OVER = 1, ADDING = 0, END = -1.
START_OVER, ADDING, END = range(1, -2, -1)
# Raw strings keep the backslashes for the regex engine and avoid
# invalid-escape warnings; the patterns themselves are unchanged.
re_youtube = re.compile(r'^(http(s)?:\/\/)?((w){3}.)?youtu(be|.be)?(\.com)?\/.+')
re_text = re.compile(r'^[a-z]{3,20}$')


def is_youtube_link(link: str) -> bool:
    """Return True if *link* matches ``re_youtube``, otherwise False."""
    return re_youtube.match(link) is not None
def clear_text(text: str) -> str:
    """Remove digits and punctuation from *text*, then strip and lowercase it.

    The characters listed in ``bad_symbols`` are deleted; every other
    character, including inner whitespace, is kept.
    """
    bad_symbols = '!@#%$^&*()_+1234567890-=/|\\?><.,":;`~[]{}'
    # One translate() pass deletes all listed characters at once.
    return text.translate(str.maketrans('', '', bad_symbols)).strip().lower()
@skip_edited
@bind_context
def add_words(update, context):
    """Entry point of the add-words conversation: prompt the user.

    Returns:
        ``ADDING``, the state in which the user's next message is parsed.
    """
    update.message.reply_text(' :')
    return ADDING
@run_async
@skip_edited
@zero_exiter
def parse_text(update, context):
    """Extract candidate words from the user's message or a YouTube transcript.

    If the message is a YouTube link, the English transcript is fetched and
    cleaned; otherwise the message text itself is cleaned. The resulting
    words are then split into ``good_words`` and ``bad_words``.

    Returns:
        ``ADDING`` if the transcript could not be obtained, so the user can
        try again.

    NOTE(review): the function appears to continue beyond this excerpt —
    ``good_words`` and ``bad_words`` are built but not used here.
    NOTE(review): ``get_video_id`` is not defined in this excerpt — confirm.
    """
    message = update.message.reply_text('...')
    if is_youtube_link(update.message.text):
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(get_video_id(update.message.text))
            t = transcript_list.find_transcript(['en'])
            _text = clear_text('. '.join(i['text'] for i in t.fetch())).split()
        except Exception:
            # Any failure while fetching the transcript is reported to the
            # user; KeyboardInterrupt/SystemExit are no longer swallowed.
            message.edit_text(' . : ')
            return ADDING
    else:
        _text = clear_text(update.message.text).split()
    _words = context._bot.get_dict_words(context.t_id)
    good_words = []
    bad_words = []
    for word in set(_text):
        z = re_text.match(word)
        if z:
            if z.group() not in _words:
                good_words.append(word)
        else:
            # NOTE(review): nesting reconstructed from a flattened snippet —
            # this ``else`` is assumed to pair with ``if z`` (words that fail
            # the pattern); confirm.
            bad_words.append(word)
# Register the add-words conversation. The position of each list in ``states``
# is used as the state key, so index 0 is ADDING.
tbot.add_conversation_handler(
    entry_points=[('add_words', add_words)],
    states=[
        # The leading '@' marks a text-message pattern rather than callback data.
        [(parse_text, r'@^((?!.*((^\/)+)).*)(.+)$')],
    ],
    fallbacks=[],
)
, .
import argparse
# Command-line options: console log level and an optional proxy URL.
parser = argparse.ArgumentParser()
parser.add_argument(
    '-l', '--level', default='INFO',
    choices=['TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'],
    help=' ',
)
parser.add_argument('-p', '--proxy', help=' ')
args = parser.parse_args()

# The console sink follows --level; the file sink always records from DEBUG up.
config = {
    'handlers': [
        {'sink': sys.stdout, 'level': args.level},
        {'sink': 'logs.log', 'serialize': False, 'level': 'DEBUG'},
    ],
}
logger.configure(**config)

# Proxy settings are built only when --proxy was given.
if args.proxy:
    t_proxy = {'proxy_url': args.proxy, 'read_timeout': 1000, 'connect_timeout': 1000}
else:
    t_proxy = None
Python 3.5+ . , VPS. . , : pip freeze > requirements.txt
. , . pip freeze
, , . — , pipreqs.
, , .pyz
. py -m zipapp "__" -m "__:_" -o bot.pyz
, bot.pyz . , __init__.py - , .
def main():
if __name__ == '__main__':
main()
zip bot.zip requirements.txt bot.pyz
VPS.
Python , , . . ( ).