
Tidak, ini bukan salah satu dari ratusan artikel tentang cara menulis bot Hello World pertama Anda dengan Python. Di sini Anda tidak akan menemukan petunjuk terperinci tentang cara mendapatkan token API di BotFather atau meluncurkan bot di cloud. Sebagai imbalannya, saya akan menunjukkan kepada Anda bagaimana melepaskan kekuatan penuh Python secara maksimal untuk mencapai kode yang paling estetis dan indah. Kami menampilkan lagu tentang daya tarik struktur yang kompleks - kami menari dan menari. Di bawah asynchrony cut, sistem menyimpan sendiri, banyak dekorator yang berguna dan banyak kode yang indah.
Penafian: Orang dengan OOP otak dan penganut pola "benar" mungkin mengabaikan artikel ini.
Ide
Untuk memahami bagaimana rasanya tidak mengenal bahasa Inggris di masyarakat modern, bayangkan Anda adalah bangsawan abad ke-18 yang tidak mengenal bahasa Prancis. Bahkan jika Anda tidak berpengalaman dalam sejarah, Anda masih bisa membayangkan betapa sulitnya hidup dalam keadaan seperti itu. Di dunia modern, bahasa Inggris telah menjadi kebutuhan, bukan hak istimewa, terutama jika Anda berada di industri TI.
: , , , . , , , , โ .
, :
, . , , Telegram-.
, โ . , . , , .
ยซ , ยป
python-telegram-bot (ptb). loguru, . , ptb ( logging) . , , :
from loguru import logger
import sys
config = {
'handlers': [
{'sink': sys.stdout, 'level': 'INFO'},
{'sink': 'logs.log', 'serialize': False, 'level': 'DEBUG'},
],
}
logger.configure(**config)
updater = Updater('YOUR_TOKEN')
dp = updater.dispatcher
updater.logger = logger
dp.logger = logger
, -, โ . . โ , .
from __future__ import annotations
from loguru import logger
import inspect
import functools
import os
def file_is_empty(path: str) -> bool:
return os.stat(path).st_size == 0
def clear_file(path: str) -> None:
with open(path, 'w'): pass
def cache_decorator(method):
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
res = method(self, *args, **kwargs)
Cache.link.recess(self, {'method_name': method.__name__})
logger.opt(lazy=True).debug(f'Decorator for {method.__name__} was end')
return res
return wrapper
class Cache:
"""
+ cache_size - ,
+ cache_files - ,
"""
link = None
def __init__(self, cache_size=10):
self._classes = []
self._cache_files = []
self.__class__.link = self
self._counter = 0
self.CACHE_SIZE = cache_size
def add(self, cls: class, file: str) -> None:
"""
+ cls -
+ file - ,
"""
self._cache_files.append(file)
self._classes.append(cls)
if file_is_empty(file): return None
logger.opt(lazy=True).debug(f'For {cls.__class__.__name__} file {file} is not empty')
for data in self.load(file):
cls.save_non_caching(data)
clear_file(file)
self._counter = 0
def recess(self, cls: class, data: dict) -> None:
"""
,
"""
if self._counter + 1 >= self.CACHE_SIZE:
self.save_all()
else:
self._counter += 1
filename = self._cache_files[self._classes.index(cls)]
self.save(data, filename=filename)
, , :
@cache_decorator
def add_smth_important(*args, **kwargs) -> Any:
, , : . โ EnglishBot, : ptb, , , - . Telegram- , . , , , . / , :
from modules import EnglishBot
from modules.module import start
if __name__ == '__main__':
tbot = EnglishBot(
)
tbot.add_command_handler(start, 'start')
, .
ptb โ update context, . context chat_data, . context.chat_data['data']
. - , context.data
. , .
from telegram.ext import CommandHandler
def a(self, key: str):
try:
return object.__getattribute__(self, key)
except:
return self.chat_data[key]
def b(self, key: str, data=None, replace=True):
if replace or not self.chat_data.get(key, None):
self.chat_data[key] = data
CallbackContext.__getattribute__ = a
CallbackContext.set = b
. , context.
def bind_context(func):
def wrapper(update, context):
context._bot.bind_user_data(update, context)
return func(update, context)
return wrapper
class EnglishBot:
def bind_user_data(self, update, context) -> dict:
context.set('t_id', update.message.chat_id, replace=False)
context.set('t_ln', update.message.from_user.language_code, replace=False)
context:
class EnglishBot:
def __init__(self, *args, **kwargs):
CallbackContext._bot = self
.
from EnglishBot import bind_context
@bind_context
def start(update, context):
if not context._bot.user_exist(context.t_id):
context.set('push_notification', True)
context._bot.new_user(context.t_id, context.t_ln)
return update.message.reply_text(' ')
โ
, , . .
class EnglishBot:
def add_command_handler(self, func: function, name=None) -> None:
"""
,
"""
name = name or func.__name__
self.dp.add_handler(CommandHandler(name, func))
tbot.add_command_handler(start)
, . bind_context, wrapper. .
import functools
def bind_context(func):
@functools.wraps(func)
def wrapper(update, context):
context._bot.bind_user_data(update, context)
return func(update, context)
return wrapper
, . .
import functools
END = -1
def zero_exiter(func):
@functools.wraps(func)
def wrapper(update, context):
if update.to_dict()['message'].get('text', None) == '0':
update.message.reply_text(' - ')
return END
return func(update, context)
return wrapper
def skip_edited(func):
@functools.wraps(func)
def wrapper(update, context):
if not update.to_dict().get('edited_message', None):
return func(update, context)
return wrapper
โ @run_async
, . .
from telegram.ext.dispatcher import run_async
from EnglishBot import skip_edited
@run_async
@skip_edited
def heavy_function(update, context):
, โ , .
, . @logger.catch
, , , logger.
from loguru import logger
@logger.catch
def heavy_function2(update, context):
/ .
from EnglishBot import bind_context
from Cache import file_is_empty
from telegram import ReplyKeyboardMarkup
from loguru import logger
LOG_FILE = 'logs.log'
SENDING, MAIN, END = range(1, -2, -1)
buttons = ReplyKeyboardMarkup(
[(' ', 'logs'), (' ', 'clear')],
[(' ', 'send')],
)
@bind_context
def admin_panel(update, context):
if not context._bot.acess_check(context.t_id):
return update.message.reply_text(f' {update.message.text}')
update.message.reply_text(' :', reply_markup=buttons)
return MAIN
def get_logs(update, context):
if file_is_empty(LOG_FILE):
update.callback_query.reply_text(' ')
else:
context.bot.send_chat_action(chat_id=context.t_id, action='upload_document')
context.bot.sendDocument(chat_id=context.t_id, document=open(LOG_FILE, 'rb'), name=LOG_FILE, timeout=1000)
update.callback_query.answer(text='')
return MAIN
def logs_clear(update, context):
with open(LOG_FILE, 'w') as file:
update.callback_query.reply_text('')
update.callback_query.answer(text='')
return MAIN
def take_message(update, context):
update.callback_query.reply_text(' ')
update.callback_query.answer(text='')
return SENDING
@zero_exiter
def send_all(update, context):
count = 0
for id in list(context._bot.get_user_ids()):
try:
if id == context.t_id: continue
context.bot.send_message(context.t_id, text=update.message.text_markdown, parse_mode='Markdown')
count += 1
except:
pass
update.callback_query.reply_text(f' {count} ')
update.callback_query.answer(text='')
return MAIN
tbot.add_conversation_handler(
entry_points = [('admin', admin)],
states = [[(logs, '^logs$'), (logs_clear, '^clear$'), (send, '^send$')], [(send_alk, '@^((?!.*((^\/)+)).*)(.+)$')]]
)
add_conversation_handler :
class EnglishBot:
def add_conversation_handler(self, entry_points: list, states: list, fallbacks: list) -> None:
fallbacks = [CommandHandler(name, func) for name, func in fallbacks]
entry_points = [CommandHandler(name, func) for name, func in entry_points]
r_states = {}
for i in range(len(states)):
r_states[i] = []
for func, pattern in states[i]:
if pattern[0] == '@':
r_states[i].append(MessageHandler(Filters.regex(pattern[1:]), func))
else:
r_states[i].append(CallbackQueryHandler(func, pattern=pattern))
conv_handler = ConversationHandler(entry_points=entry_points, states=r_states, fallbacks=fallbacks)
dp.add_handler(conv_handler)
.
from EnglishBot import bind_context, skip_edited, zero_exiter
from youtube_transcript_api import YouTubeTranscriptApi
from telegram.ext.dispatcher import run_async
START_OVER, ADDING, END = range(1, -2, -1)
re_youtube = re.compile('^(http(s)?:\/\/)?((w){3}.)?youtu(be|.be)?(\.com)?\/.+')
re_text = re.compile('^[a-z]{3,20}$')
def is_youtube_link(link: str) -> bool:
if re_youtube.match(link) is not None: return True
def clear_text(text: str) -> str:
bad_symbols = '!@#%$^&*()_+1234567890-=/|\\?><.,":;`~[]{}'
for s in bad_symbols:
text = text.replace(s, '')
return text.strip().lower()
@skip_edited
@bind_context
def add_words(update, context):
update.message.reply_text(' :')
return ADDING
@run_async
@skip_edited
@zero_exiter
def parse_text(update, context):
message = update.message.reply_text('...')
if is_youtube_link(update.message.text):
try:
transcript_list = YouTubeTranscriptApi.list_transcripts(get_video_id(update.message.text))
t = transcript_list.find_transcript(['en'])
_text = clear_text('. '.join([i['text'] for i in t.fetch()])).split()
except:
message.edit_text(' . : ')
return ADDING
else:
_text = clear_text(update.message.text).split()
_words = context._bot.get_dict_words(context.t_id)
good_words = []
bad_words = []
for word in set(_text):
z = re_text.match(word)
if z:
if z.group() not in _words:
good_words.append(word)
else:
bad_words.append(word)
tbot.add_conversation_handler(
entry_points = [('add_words', add_words)],
states = [
[(parse_text, '@^((?!.*((^\/)+)).*)(.+)$')],
])
, .
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-l', '--level', default='INFO',
choices=['TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'],
help=' ')
parser.add_argument('-p', '--proxy', help=' ')
args = parser.parse_args()
config = {
'handlers': [
{'sink': sys.stdout, 'level': args.level},
{'sink': 'logs.log', 'serialize': False, 'level': 'DEBUG'},
],
}
logger.configure(**config)
if args.proxy:
t_proxy = {'proxy_url': args.proxy, 'read_timeout': 1000, 'connect_timeout': 1000}
else:
t_proxy = None
Python 3.5+ . , VPS. . , : pip freeze > requirements.txt
. , . pip freeze
, , . โ , pipreqs.
, , .pyz
. py -m zipapp "__" -m "__:_" -o bot.pyz
, bot.pyz . , __init__.py - , .
def main():
if __name__ == '__main__':
main()
zip bot.zip requirements.txt bot.pyz
VPS.
Python , , . . ( ).