تعلم اللغة الإنجليزية باستخدام Telegram bot

صورة


لا ، هذه ليست واحدة من مئات المقالات حول كيفية كتابة أول برنامج Hello World بوت في Python. هنا لن تجد تعليمات تفصيلية حول كيفية الحصول على رمز API المميز في BotFather أو إطلاق روبوت في السحابة. في المقابل ، سأوضح لك كيفية إطلاق العنان لقوة Python الكاملة إلى أقصى حد من أجل تحقيق رمز جمالي وأجمل. نحن نؤدي أغنية عن جاذبية الهياكل المعقدة - نرقص ونرقص. تحت قطع التزامن ، يحفظ نظامها الخاص ، مجموعة من الديكورات المفيدة والكثير من الكود الجميل.


إخلاء المسؤولية: قد يتجاهل الأشخاص الذين لديهم OOP في الدماغ وأتباع الأنماط "الصحيحة" هذه المقالة.


فكرة


لفهم معنى عدم معرفة اللغة الإنجليزية في المجتمع الحديث ، تخيل أنك نبيل في القرن الثامن عشر لا يعرف اللغة الفرنسية. حتى إذا لم تكن على دراية جيدة بالتاريخ ، فلا يزال بإمكانك تخيل مدى صعوبة العيش في ظل هذه الظروف. في العالم الحديث ، أصبحت اللغة الإنجليزية ضرورة ، وليس امتيازًا ، خاصة إذا كنت تعمل في مجال تكنولوجيا المعلومات.


: , , , . , , , , — .


, :


  • youtube-/,
  • :

, . , , Telegram-.


, — . , . , , .

« , »


python-telegram-bot (ptb). loguru, . , ptb ( logging) . , , :


from loguru import logger
import sys

#   :     
config = {
    'handlers': [
        {'sink': sys.stdout, 'level': 'INFO'},
        {'sink': 'logs.log', 'serialize': False, 'level': 'DEBUG'},
    ],
}

logger.configure(**config)

# ...

updater = Updater('YOUR_TOKEN')
dp = updater.dispatcher

#    .   
updater.logger = logger
dp.logger = logger

, -, — . . — , .


from __future__ import annotations #       
from loguru import logger

import inspect
import functools
import os

def file_is_empty(path: str) -> bool:
    return os.stat(path).st_size == 0

def clear_file(path: str) -> None:
    with open(path, 'w'): pass

def cache_decorator(method):
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        res = method(self, *args, **kwargs)
        Cache.link.recess(self, {'method_name': method.__name__}) # (1)
        #       
        #  ; opt   
        logger.opt(lazy=True).debug(f'Decorator for {method.__name__} was end')
        return res
    return wrapper

class Cache:
    """
    + cache_size  - ,       
    + cache_files -  ,        
    """

    link = None

    def __init__(self, cache_size=10):
        #    .      
        self._classes = []
        # ,  
        self._cache_files = []
        # (1):  ,        
        #  ,         , 
        #      .   ,    
        #     
        self.__class__.link = self

        self._counter = 0
        self.CACHE_SIZE = cache_size

    def add(self, cls: class, file: str) -> None:
        """
            

        + cls  -  
        + file - ,    
        """

        self._cache_files.append(file)
        self._classes.append(cls)

        if file_is_empty(file): return None

        logger.opt(lazy=True).debug(f'For {cls.__class__.__name__} file {file} is not empty')

        for data in self.load(file):
            cls.save_non_caching(data)

        clear_file(file)
        self._counter = 0

    def recess(self, cls: class, data: dict) -> None:
        """
         ,     
        """

        if self._counter + 1 >= self.CACHE_SIZE:
            self.save_all()
        else: 
            self._counter += 1
            filename = self._cache_files[self._classes.index(cls)]
            self.save(data, filename=filename)

    # ...
    #    save_all, save, load 
    # ... 

, , :


@cache_decorator
def add_smth_important(*args, **kwargs) -> Any:
    # ...
    #  -    ...
    # ...

, , : . — EnglishBot, : ptb, , , - . Telegram- , . , , , . / , :


#   
from modules import EnglishBot
#  ,   Telegram-
from modules.module import start
# ...

if __name__ == '__main__':
    #  
    tbot = EnglishBot(
        # ...
    )

    #    
    tbot.add_command_handler(start, 'start')
    # ...

, .



ptb — update context, . context chat_data, . context.chat_data['data']. - , context.data. , .


from telegram.ext import CommandHandler

def a(self, key: str):
    #        
    #  ,      chat_data
    try:
        return object.__getattribute__(self, key)
    except:
        return self.chat_data[key]

def b(self, key: str, data=None, replace=True):
    #  :  replace=False   ,    
    #  ,    ,     None
    if replace or not self.chat_data.get(key, None):
        self.chat_data[key] = data

#  context,      context.data
CallbackContext.__getattribute__ = a
#       
CallbackContext.set = b

. , context.


def bind_context(func):
    def wrapper(update, context):
        context._bot.bind_user_data(update, context) # (2)
        return func(update, context)
    return wrapper

class EnglishBot:
    # ...

    def bind_user_data(self, update, context) -> dict:
        context.set('t_id', update.message.chat_id, replace=False)
        context.set('t_ln', update.message.from_user.language_code, replace=False)
        # ...
        #    ,        context
        #  -   
        # ...

context:


class EnglishBot:
    # ...

    def __init__(self, *args, **kwargs):
        # ...
        # (2):         context._bot
        CallbackContext._bot = self

.


from EnglishBot import bind_context

@bind_context
def start(update, context):
    #         
    #          
    #       
    if not context._bot.user_exist(context.t_id):
        #   -  
        context.set('push_notification', True)
        #      
        context._bot.new_user(context.t_id, context.t_ln)

        return update.message.reply_text(' ')    
    # ...


, , . .



class EnglishBot:
    # ...

    def add_command_handler(self, func: function, name=None) -> None:
        """
        ,    
        """

        name = name or func.__name__
        self.dp.add_handler(CommandHandler(name, func))

# ...

#   :
tbot.add_command_handler(start) #  tbot.add_command_handler(start, 'start')

, . bind_context, wrapper. .


import functools

def bind_context(func):
    # functools.wraps  stdlib     Python 3.4
    @functools.wraps(func)
    def wrapper(update, context):
        context._bot.bind_user_data(update, context)
        return func(update, context)
    return wrapper

, . .


import functools

END = -1

def zero_exiter(func):
    @functools.wraps(func)
    def wrapper(update, context):
        if update.to_dict()['message'].get('text', None) == '0':
            update.message.reply_text(' - ')            
            return END 

        return func(update, context)    
    return wrapper

def skip_edited(func):
    @functools.wraps(func)
    def wrapper(update, context):
        #     ,   None, 
        #   conversation_handler,     
        if not update.to_dict().get('edited_message', None):
            return func(update, context)
    return wrapper

@run_async, . .


from telegram.ext.dispatcher import run_async
from EnglishBot import skip_edited

@run_async
@skip_edited
def heavy_function(update, context):
    # ...
    #   ,    
    #     
    # ...

, — , .


, . @logger.catch, , , logger.


from loguru import logger

@logger.catch
def heavy_function2(update, context):
    # ...
    #     ,     
    # ...


/ .


from EnglishBot import bind_context
from Cache import file_is_empty
from telegram import ReplyKeyboardMarkup
from loguru import logger

LOG_FILE = 'logs.log'
SENDING, MAIN, END = range(1, -2, -1)

buttons = ReplyKeyboardMarkup(
    [(' ', 'logs'), (' ', 'clear')],
    [(' ', 'send')],
    # [...],
)

@bind_context
def admin_panel(update, context):
    #     
    if not context._bot.acess_check(context.t_id):
        #      ,     
        return update.message.reply_text(f'  {update.message.text}')

    update.message.reply_text(' :', reply_markup=buttons)

    return MAIN

#
# Logs methods
#

def get_logs(update, context):
    if file_is_empty(LOG_FILE):
        update.callback_query.reply_text(' ')
    else:
        #   
        context.bot.send_chat_action(chat_id=context.t_id, action='upload_document')
        context.bot.sendDocument(chat_id=context.t_id, document=open(LOG_FILE, 'rb'), name=LOG_FILE, timeout=1000)

    #     ,      
    update.callback_query.answer(text='')
    #  ,  .    , None    
    #      
    return MAIN

def logs_clear(update, context):
    with open(LOG_FILE, 'w') as file:
        update.callback_query.reply_text('')
        update.callback_query.answer(text='')

    return MAIN

#
# Send methods
#

def take_message(update, context):
    update.callback_query.reply_text(' ')
    update.callback_query.answer(text='')

    return SENDING

@zero_exiter
def send_all(update, context):
    count = 0

    #     
    for id in list(context._bot.get_user_ids()):
        #   ,  -     
        #         
        try:
            #     
            if id == context.t_id: continue
            #   Markdown,    
            context.bot.send_message(context.t_id, text=update.message.text_markdown, parse_mode='Markdown')
            count += 1
        except:
            pass

    update.callback_query.reply_text(f' {count} ')
    update.callback_query.answer(text='')

    return MAIN

# ...

#   :
tbot.add_conversation_handler(
    entry_points = [('admin', admin)],
    #   send_alk    ,     /
    states = [[(logs, '^logs$'), (logs_clear, '^clear$'), (send, '^send$')], [(send_alk, '@^((?!.*((^\/)+)).*)(.+)$')]]
)

add_conversation_handler :


class EnglishBot:    
    # ...

    def add_conversation_handler(self, entry_points: list, states: list, fallbacks: list) -> None:
        fallbacks = [CommandHandler(name, func) for name, func in fallbacks]
        entry_points = [CommandHandler(name, func) for name, func in entry_points]
        r_states = {}

        for i in range(len(states)):
            r_states[i] = []

            #      
            for func, pattern in states[i]:
                    #      @,     
                    #  -    
                    if pattern[0] == '@':
                        r_states[i].append(MessageHandler(Filters.regex(pattern[1:]), func))
                    else:
                        r_states[i].append(CallbackQueryHandler(func, pattern=pattern))

        conv_handler = ConversationHandler(entry_points=entry_points, states=r_states, fallbacks=fallbacks)
        dp.add_handler(conv_handler)


.


from EnglishBot import bind_context, skip_edited, zero_exiter
from youtube_transcript_api import YouTubeTranscriptApi
from telegram.ext.dispatcher import run_async

START_OVER, ADDING, END = range(1, -2, -1)

re_youtube = re.compile('^(http(s)?:\/\/)?((w){3}.)?youtu(be|.be)?(\.com)?\/.+')
re_text = re.compile('^[a-z]{3,20}$')

def is_youtube_link(link: str) -> bool:
    if re_youtube.match(link) is not None: return True

def clear_text(text: str) -> str:
    bad_symbols = '!@#%$^&*()_+1234567890-=/|\\?><.,":;`~[]{}'

    for s in bad_symbols:
        text = text.replace(s, '')

    return text.strip().lower()

@skip_edited
@bind_context
def add_words(update, context):
    update.message.reply_text(' :')    
    return ADDING

@run_async
@skip_edited
@zero_exiter
def parse_text(update, context):
    #   - ,    ,   
    message = update.message.reply_text('...')

    if is_youtube_link(update.message.text):
        #        ,    
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(get_video_id(update.message.text))
            t = transcript_list.find_transcript(['en'])
            _text = clear_text('. '.join([i['text'] for i in t.fetch()])).split()
        except:
            message.edit_text(' .   : ')
            return ADDING
    else:
        _text = clear_text(update.message.text).split()

    #        
    _words = context._bot.get_dict_words(context.t_id)
    # ,   
    good_words = []
    #  
    bad_words = []

    #    
    for word in set(_text):
        #     
        z = re_text.match(word)
        if z:
            #          
            if z.group() not in _words:
                good_words.append(word)
        else:
            bad_words.append(word)

    #     -    
    #      
    # ...

# ...

#   :
    tbot.add_conversation_handler(
        entry_points = [('add_words', add_words)],
        states = [
            [(parse_text, '@^((?!.*((^\/)+)).*)(.+)$')],
            # ...
        ])


, .


#   :

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-l', '--level', default='INFO', 
                    choices=['TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'],
                    help='     ')
parser.add_argument('-p', '--proxy', help='    ')
args = parser.parse_args()

config = {
    'handlers': [
        {'sink': sys.stdout, 'level': args.level},
        #   .       DEBUG  
        {'sink': 'logs.log', 'serialize': False, 'level': 'DEBUG'},
    ],
}

logger.configure(**config)

if args.proxy:
    t_proxy = {'proxy_url': args.proxy, 'read_timeout': 1000, 'connect_timeout': 1000}
    # ...
    #    
    # ...
else:
    t_proxy = None

Python 3.5+ . , VPS. . , : pip freeze > requirements.txt. , . pip freeze , , . — , pipreqs.


, , .pyz . py -m zipapp "__" -m "__:_" -o bot.pyz, bot.pyz . , __init__.py - , .


#  __init__.py 
# py -m zipapp "__" -m "__init__.py:main" -o bot.pyz

def main():
    # ...

if __name__ == '__main__':
    main()

zip bot.zip requirements.txt bot.pyz VPS.



Python , , . . ( ).


All Articles