
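Introduction
Given the flood of panic and misinformation pouring at us from absolutely every channel (instant messengers, news sites, radio, television), we decided to show how to beat the coronavirus with a Python bot and a few other interesting ingredients in Telegram (just kidding)!
To prepare the vaccine bot at home you will need a computer, Python, Docker, the Heroku CLI, the Telegram messenger as the platform, and MongoDB as the database. Everything else can be ignored at the start of our story; at the end we give the complete list, along with a dose of critical analysis and ideas for further development.
What we want to build
First, we decided to build a Telegram bot that can show:
- COVID-19 infection statistics for any country as of the current date
- COVID-19 infection statistics by the user's geolocation
- statistics on the countries users ask about
- user activity statistics
- contact information
- help on how to use the bot
What we will get out of it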
-:
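Registering the bot in Telegram
First the bot has to be registered in Telegram. To do that:
- find “BotFather” in Telegram
- send it /start
- send it /newbot
- enter the name and username the bot will have in Telegram
- save the token that BotFather returns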
Telegram :
Next, create the project and push it to GitHub; the repository is needed later for CI.
Project structure:
tree -L 2
├── Dockerfile
├── .gitignore
├── README.md
├── common
│   ├── containers.py
│   └── tg_analytics.py
├── data
│   └── mongo_context.py
├── data.csv
├── heroku.yml
├── setup.py
├── requirements.txt
├── services
│   ├── country_service.py
│   └── statistics_service.py
└── templates
    ├── contacts.html
    ├── country_statistics.html
    ├── himydear.html
    ├── idunnocommand.html
    ├── notfound.html
    └── query_statistics.html
Create a virtual environment and activate it:
python -m venv env
source env/bin/activate
( ) , , , .
To leave the virtual environment when you are done, run:
deactivate
. , .
touch requirements.txt 
touch setup.py 
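We will use the following libraries:
- Requests: HTTP requests
- pyTelegramBotAPI: wrapper for the Telegram Bot API
- PyMongo: MongoDB driver
- Dependency Injector: dependency injection
- Geocoder: access to the Geonames API
- Jinja2: templating
- ciso8601: parsing ISO 8601 dates
- cachetools: caching
- pandas: data analysis
- Flask: web framework for Python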
, .
requests==2.23.0
pytelegrambotapi==3.6.7
pymongo==3.10.1
dependency_injector==3.15.6
geocoder==1.38.1
jinja2==2.11.1
ciso8601==2.1.3
cachetools==4.0.0
pandas==1.0.3
flask==1.1.2
. , API, ;).
Once requirements.txt is filled in, install the dependencies:
pip install -r requirements.txt
, .
setup.py
import telebot
import os
from telebot import types
token = os.getenv('API_BOT_TOKEN')
bot = telebot.TeleBot(token)
@bot.message_handler(commands=['start'])
def command_start_handler(message):
    cid = message.chat.id
    bot.send_chat_action(cid, 'typing')
    markup = types.ReplyKeyboardMarkup(row_width=1, resize_keyboard=True)
    button_geo = types.KeyboardButton(text='send location', request_location=True)
    markup.add(button_geo)
    bot.send_message(cid, 'Hello stranger, please choose commands from the menu', reply_markup=markup)
if __name__ == '__main__':
    bot.polling(none_stop=True, interval=0)
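To run the bot, set the API_BOT_TOKEN environment variable. In the PyCharm IDE this is done under “Edit configurations”, where you add the variable and paste the token as its value.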
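Since the code above reads API_BOT_TOKEN with os.getenv, a missing variable silently becomes None and the bot fails later with a less obvious error. A minimal sketch of failing early instead, assuming a hypothetical helper `read_required_env` that is not part of the original bot:

```python
import os


def read_required_env(name, environ=None):
    """Return the value of an environment variable or raise a clear error.

    `read_required_env` is a hypothetical helper, not part of the original bot.
    """
    environ = os.environ if environ is None else environ
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError("Environment variable %s is not set" % name)
    return value


# An explicit mapping is used here so the example does not depend on the real environment.
example_env = {"API_BOT_TOKEN": "123456:placeholder-token"}
token = read_required_env("API_BOT_TOKEN", example_env)
```

In the bot itself the call would presumably be `read_required_env('API_BOT_TOKEN')`, placed before `telebot.TeleBot(token)`.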


Telegram .

Besides the “send location” button, let's add handlers for the «contacts» and «help» commands.
Telegram messages can be formatted with HTML or Markdown. We will use HTML.
setup.py
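import codecs
from functools import wraps
from jinja2 import Template
# Added to the setup.py shown earlier; `bot` is the TeleBot instance created there.
known_users = []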
user_steps = {}
commands = {
    'start': 'Start using this bot',
    'help': 'Useful information about this bot',
    'contacts': 'Contacts'
}
def send_action(action):
    """Sends `action` while processing func command."""
    def decorator(func):
        @wraps(func)
        def command_func(message, *args, **kwargs):
            bot.send_chat_action(chat_id=message.chat.id, action=action)
            return func(message, *args, **kwargs)
        return command_func
    return decorator
@bot.message_handler(commands=['help'])
@send_action('typing')
def command_help_handler(message):
    help_text = 'The following commands are available: \n'
    for key in commands:
        help_text += '/' + key + ': '
        help_text += commands[key] + '\n'
    help_text += 'ANTICOVID19BOT speaks english, be careful and take care!'
    bot.send_message(message.chat.id, help_text)
@bot.message_handler(commands=['contacts'])
@send_action('typing')
def command_contacts_handler(message):
    # Read the template as UTF-8 and render it with the sender's username.
    with codecs.open('templates/contacts.html', 'r', encoding='UTF-8') as file:
        template = Template(file.read())
        bot.send_message(message.chat.id, template.render(user_name=message.chat.username), parse_mode='HTML')
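The help handler above assembles its reply by string concatenation inside the handler. One possible variation, shown only as a sketch, moves that into a pure function that can be checked without Telegram; `build_help_text` is a name introduced here for illustration.

```python
def build_help_text(commands):
    """Build the /help message from a {command: description} mapping."""
    lines = ['The following commands are available:']
    for name, description in commands.items():
        lines.append('/%s: %s' % (name, description))
    return '\n'.join(lines)


example_commands = {
    'start': 'Start using this bot',
    'help': 'Useful information about this bot',
}
help_text = build_help_text(example_commands)
```

The handler would then only need to call `bot.send_message(message.chat.id, build_help_text(commands))`.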
Next, add the class that works with the database.
mongo_context.py
import os
from datetime import datetime
from pymongo import MongoClient
class MongoDBContext:
    """Mongo database context class"""
    
    def __init__(self):
        try:
            self.connection_string = os.getenv('CONNECTION_STRING')
            self.client = MongoClient(self.connection_string)
        except Exception as e:
            raise e
    
    def save_query(self, country, user_name):
        db = self.client[os.getenv('DB_NAME')]
        countries_stats = db.country_stats
        result = countries_stats.insert_one({'date': datetime.now(), 'country': country, 'username': user_name})
    
    def get_users_queries(self):
        db = self.client[os.getenv('DB_NAME')]
        countries_stats = db.country_stats
        queries = countries_stats.aggregate([
            {'$group': {'_id': '$country', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
            {'$limit': 5}
        ])
        users = countries_stats.aggregate([
            {'$group': {'_id': '$username', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
            {'$limit': 5}
        ])
        return {'queries': list(queries), 'users': list(users)}
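To make the two aggregation pipelines above easier to read: each groups the saved queries by one field, counts them, sorts by the count in descending order and keeps the top five. The same computation in plain Python, as a sketch on made-up in-memory data (`top_counts` is a hypothetical name, and no MongoDB is involved):

```python
from collections import Counter


def top_counts(documents, field, limit=5):
    """Count documents per value of `field` and return the most frequent ones.

    Mirrors the shape of the pipeline result: [{'_id': value, 'count': n}, ...].
    """
    counter = Counter(doc[field] for doc in documents)
    return [{'_id': value, 'count': count} for value, count in counter.most_common(limit)]


# Made-up sample documents in the shape written by save_query (date omitted).
sample = [
    {'country': 'Italy', 'username': 'alice'},
    {'country': 'Spain', 'username': 'bob'},
    {'country': 'Italy', 'username': 'bob'},
]
top_countries = top_counts(sample, 'country')
top_users = top_counts(sample, 'username')
```

One difference worth keeping in mind: `Counter.most_common` keeps insertion order for ties, while the order of tied groups after `$sort` in MongoDB is not something to rely on.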
Then the dependency injection container.
containers.py
import dependency_injector.containers as containers
import dependency_injector.providers as providers
from data.mongo_context import MongoDBContext
class DBContext(containers.DeclarativeContainer):
    """di for future development"""
    mongo_db_context = providers.Singleton(MongoDBContext)
 API. API , , (, ) Telegram.
country_service.py
import os
import requests
from cachetools import cached, TTLCache
class CountryService:
    """This class provide country information"""
    
    cache = TTLCache(maxsize=100, ttl=10800)
    
    @cached(cache)
    def get_country_information(self, latitude, longitude):
        url = 'http://api.geonames.org/countrySubdivisionJSON'
        query_string = {'lat': latitude, 'lng': longitude, 'username': os.getenv('GEO_NAME_API_KEY')}
        geo_result = requests.request('GET', url, params=query_string)
        return geo_result.json()
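The service above wraps the geonames lookup in `TTLCache(maxsize=100, ttl=10800)`, so a repeated lookup for the same coordinates within three hours does not hit the API again. The idea can be sketched in a few lines of plain Python; `SimpleTTLCache` is a toy written for this illustration and is not how cachetools is implemented.

```python
import time


class SimpleTTLCache:
    """Tiny time-based cache for illustration; not the cachetools implementation."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._items = {}  # key -> (expires_at, value)

    def get_or_compute(self, key, compute):
        now = self.clock()
        entry = self._items.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]  # still fresh, skip the expensive call
        value = compute()
        self._items[key] = (now + self.ttl, value)
        return value


# Fake clock and fake lookup so the behaviour can be observed without waiting.
fake_now = [0.0]
calls = []


def fake_lookup():
    calls.append(1)
    return {'countryName': 'Italy'}


cache = SimpleTTLCache(ttl=10800, clock=lambda: fake_now[0])
first = cache.get_or_compute((41.9, 12.5), fake_lookup)
second = cache.get_or_compute((41.9, 12.5), fake_lookup)  # served from the cache
fake_now[0] = 10801.0  # just past three hours
third = cache.get_or_compute((41.9, 12.5), fake_lookup)  # entry expired, looked up again
```

Because the key is the exact coordinate pair, two users standing a few metres apart would not share a cache entry; rounding the coordinates before the lookup is one possible refinement.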
 statistics_service.py
import os
import requests
import codecs
import ciso8601
from jinja2 import Template
from cachetools import cached, TTLCache
from common.containers import DBContext
class StatisticsService:
    """This class provide information about statistics"""
    
    cache = TTLCache(maxsize=100, ttl=10800)
    def __init__(self):
        try:
            self.covid_api_token = os.getenv('COVID_STAT_API_TOKEN')
            self.db_context = DBContext.mongo_db_context()
        except Exception as e:
            raise e
    
    def __get_statistics_by_country_from_api(self, country_name):
        url = "https://covid-193.p.rapidapi.com/statistics"
        query_string = {'country': country_name}
        headers = {
            'x-rapidapi-host': "covid-193.p.rapidapi.com",
            # Use the key loaded from COVID_STAT_API_TOKEN instead of hardcoding a secret.
            'x-rapidapi-key': self.covid_api_token
        }
        response = requests.request("GET", url, headers=headers, params=query_string)
        return response.json()
    
    @cached(cache)
    def __get_statistics_by_country_as_html(self, country_name):
        try:
            statistics_json = self.__get_statistics_by_country_from_api(country_name)
            if len(statistics_json['response']) == 0:
                with codecs.open('templates/idunnocommand.html', 'r', encoding='UTF-8') as file:
                    template = Template(file.read())
                    return template.render(text_command=country_name)
            else:
                with codecs.open('templates/country_statistics.html', 'r', encoding='UTF-8') as file:
                    template = Template(file.read())
                    return template.render(date=ciso8601.parse_datetime(statistics_json['response'][0]['time']).date(),
                                           country=statistics_json['response'][0]['country'].upper(),
                                           new_cases=statistics_json['response'][0]['cases']['new'],
                                           active_cases=statistics_json['response'][0]['cases']['active'],
                                           critical_cases=statistics_json['response'][0]['cases']['critical'],
                                           recovered_cases=statistics_json['response'][0]['cases']['recovered'],
                                           total_cases=statistics_json['response'][0]['cases']['total'],
                                           new_deaths=statistics_json['response'][0]['deaths']['new'],
                                           total_deaths=statistics_json['response'][0]['deaths']['total'])
        except Exception as e:
            raise e
    
    def get_statistics_by_country_name(self, country_name, user_name):
        self.db_context.save_query(country_name, user_name)
        return self.__get_statistics_by_country_as_html(country_name)
    
    def get_statistics_of_users_queries(self):
        query_statistics = self.db_context.get_users_queries()
        with codecs.open('templates/query_statistics.html', 'r', encoding='UTF-8') as file:
            template = Template(file.read())
            return template.render(queries=query_statistics['queries'], users=query_statistics['users'])
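The render call in the service repeats `statistics_json['response'][0]` for every field. A sketch of pulling the fields out once, in a function that can be tested without the API: `extract_country_stats` is a hypothetical helper, the sample values are made up, and `datetime.fromisoformat` is used here as a standard-library stand-in for `ciso8601.parse_datetime`.

```python
from datetime import datetime


def extract_country_stats(statistics_json):
    """Flatten the first entry of the API response into template variables.

    Returns None when the response list is empty (country not found).
    """
    entries = statistics_json.get('response', [])
    if not entries:
        return None
    entry = entries[0]
    cases, deaths = entry['cases'], entry['deaths']
    return {
        # Stdlib stand-in for ciso8601.parse_datetime used in the service.
        'date': datetime.fromisoformat(entry['time']).date(),
        'country': entry['country'].upper(),
        'new_cases': cases['new'],
        'active_cases': cases['active'],
        'critical_cases': cases['critical'],
        'recovered_cases': cases['recovered'],
        'total_cases': cases['total'],
        'new_deaths': deaths['new'],
        'total_deaths': deaths['total'],
    }


# Made-up numbers; only the field layout follows the service code.
sample_response = {'response': [{
    'country': 'Italy',
    'time': '2020-04-09T12:15:05+00:00',
    'cases': {'new': '+10', 'active': 100, 'critical': 5, 'recovered': 50, 'total': 160},
    'deaths': {'new': '+1', 'total': 10},
}]}
stats = extract_country_stats(sample_response)
```

The resulting dictionary can be passed to the template as `template.render(**stats)`.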
Next comes usage analytics. Instead of chatbase, the bot logs activity to a csv file; the code is based on work by alekskram.
tg_analytics.py
import csv
import datetime
import os
import pandas as pd
users_type = {
    1: '',
    2: '',
    3: '',
    4: ''
}
day_type = {
    1: '',
    2: '',
    3: '',
    4: ''
}
def remove(user_id):
    path = os.getcwd() + '/%s.txt' % user_id
    os.remove(path)
def statistics(user_id, command):
    data = datetime.datetime.today().strftime("%Y-%m-%d")
    with open('data.csv', 'a', newline="") as fil:
        wr = csv.writer(fil, delimiter=';')
        wr.writerow([data, user_id, command])
def analysis(bid, user_id):
    season = int(bid[1])
    df = pd.read_csv('data.csv', delimiter=';', encoding='utf8')
    number_of_users = len(df['id'].unique())
    number_of_days = len(df['data'].unique())
    message_to_user = '    %s %s: \n' % (season, day_type.get(season, ''))
    message_to_user += '    %s %s \n' % (number_of_days, day_type.get(season, ''))
    if season > number_of_days:
        season = number_of_days
        message_to_user += '    , \n' \
                           '      \n'
    df_user = df.groupby(['data', 'id']).count().reset_index().groupby('data').count().reset_index()
    list_of_dates_in_df_user = list(df_user['data'])
    list_of_number_of_user_in_df_user = list(df_user['id'])
    list_of_dates_in_df_user = list_of_dates_in_df_user[-season:]
    list_of_number_of_user_in_df_user = list_of_number_of_user_in_df_user[-season:]
    df_command = df.groupby(['data', 'command']).count().reset_index()
    unique_commands = df['command'].unique()
    commands_in_each_day = []
    list_of_dates_in_df_command = list(df_command['data'])
    list_of_number_of_user_in_df_command = list(df_command['id'])
    list_of_name_of_command_in_df_command = list(df_command['command'])
    commands_in_this_day = dict()
    for i in range(len(list_of_dates_in_df_command)):
        commands_in_this_day[list_of_name_of_command_in_df_command[i]] = list_of_number_of_user_in_df_command[i]
        if i + 1 >= len(list_of_dates_in_df_command) or list_of_dates_in_df_command[i] != list_of_dates_in_df_command[
            i + 1]:
            commands_in_each_day.append(commands_in_this_day)
            commands_in_this_day = dict()
    commands_in_each_day = commands_in_each_day[-season:]
    if '' in bid:
        message_to_user += '     ' + '%s' % number_of_users \
                           + ' %s ' % users_type.get(number_of_users, '') + '\n' \
                                                                                         '   %s %s: \n' % (
                               season, day_type.get(season, ''))
        for days, number, comm_day in zip(list_of_dates_in_df_user, list_of_number_of_user_in_df_user,
                                          commands_in_each_day):
            message_to_user += ':%s :%d   :%s\n' % (days, number, comm_day.get('/start', 0))
    if '' in bid:
        message_to_user += '    %s %s: \n' % (season, day_type.get(season, ''))
        for days, commands in zip(list_of_dates_in_df_user, commands_in_each_day):
            message_to_user += ':%s\n' % days
            for i in unique_commands:
                if i in commands:
                    message_to_user += '%s - %s \n' % (i, commands.get(i))
                else:
                    message_to_user += '%s - 0 \n' % i
    if 'txt' in bid or '' in bid:
        with open('%s.txt' % user_id, 'w', encoding='UTF-8') as fil:
            # The with block closes the file on exit.
            fil.write(message_to_user)
    else:
        return message_to_user
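Note that `analysis` reads the columns `data`, `id` and `command` by name, while `statistics` only appends rows, so data.csv has to begin with a matching header line. A sketch of a logger that writes the header itself when the file is new or empty, assuming data.csv is not shipped pre-filled (`log_command` is a hypothetical name):

```python
import csv
import datetime
import os
import tempfile

HEADER = ['data', 'id', 'command']  # column names that analysis() reads


def log_command(path, user_id, command):
    """Append one row to the activity log, writing the header first if the file is new or empty."""
    is_new = not os.path.exists(path) or os.path.getsize(path) == 0
    today = datetime.date.today().strftime('%Y-%m-%d')
    with open(path, 'a', newline='', encoding='utf8') as log_file:
        writer = csv.writer(log_file, delimiter=';')
        if is_new:
            writer.writerow(HEADER)
        writer.writerow([today, user_id, command])


# Demonstration against a temporary file instead of the real data.csv.
demo_path = os.path.join(tempfile.mkdtemp(), 'data.csv')
log_command(demo_path, 42, '/start')
log_command(demo_path, 42, '/help')
with open(demo_path, newline='', encoding='utf8') as log_file:
    rows = list(csv.reader(log_file, delimiter=';'))
```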
Now let's put everything together.
setup.py
import telebot
import os
import codecs
import common.tg_analytics as tga
from functools import wraps
from telebot import types
from jinja2 import Template
from services.country_service import CountryService
from services.statistics_service import StatisticsService
token = os.getenv('API_BOT_TOKEN')
bot = telebot.TeleBot(token)
user_steps = {}
known_users = []
stats_service = StatisticsService()
country_service = CountryService()
commands = {'start': 'Start using this bot',
            'country': 'Please, write a country name',
            'statistics': 'Statistics by users queries',
            'help': 'Useful information about this bot',
            'contacts': 'Developer contacts'}
def get_user_step(uid):
    if uid in user_steps:
        return user_steps[uid]
    else:
        known_users.append(uid)
        user_steps[uid] = 0
        return user_steps[uid]
def send_action(action):
    def decorator(func):
        @wraps(func)
        def command_func(message, *args, **kwargs):
            bot.send_chat_action(chat_id=message.chat.id, action=action)
            return func(message, *args, **kwargs)
        return command_func
    return decorator
def save_user_activity():
    def decorator(func):
        @wraps(func)
        def command_func(message, *args, **kwargs):
            tga.statistics(message.chat.id, message.text)
            return func(message, *args, **kwargs)
        return command_func
    return decorator
@bot.message_handler(commands=['start'])
@send_action('typing')
@save_user_activity()
def start_command_handler(message):
    cid = message.chat.id
    markup = types.ReplyKeyboardMarkup(row_width=1, resize_keyboard=True)
    button_geo = types.KeyboardButton(text='send location', request_location=True)
    markup.add(button_geo)
    bot.send_message(cid, 'Hello, {0}, please choose command from the menu'.format(message.chat.username),
                     reply_markup=markup)
    help_command_handler(message)
@bot.message_handler(commands=['country'])
@send_action('typing')
@save_user_activity()
def country_command_handler(message):
    cid = message.chat.id
    user_steps[cid] = 1
    bot.send_message(cid, '{0}, write name of country please'.format(message.chat.username))
@bot.message_handler(content_types=['location'])
@send_action('typing')
@save_user_activity()
def geo_command_handler(message):
    cid = message.chat.id
    geo_result = country_service.get_country_information(message.location.latitude, message.location.longitude)
    statistics = stats_service.get_statistics_by_country_name(geo_result['countryName'], message.chat.username)
    user_steps[cid] = 0
    bot.send_message(cid, statistics, parse_mode='HTML')
@bot.message_handler(func=lambda message: get_user_step(message.chat.id) == 1)
@send_action('typing')
@save_user_activity()
def country_statistics_command_handler(message):
    cid = message.chat.id
    country_name = message.text.strip()
    try:
        statistics = stats_service.get_statistics_by_country_name(country_name, message.chat.username)
    except Exception as e:
        raise e
    user_steps[cid] = 0
    bot.send_message(cid, statistics, parse_mode='HTML')
@bot.message_handler(commands=['statistics'])
@send_action('typing')
@save_user_activity()
def statistics_command_handler(message):
    cid = message.chat.id
    bot.send_message(cid, stats_service.get_statistics_of_users_queries(), parse_mode='HTML')
@bot.message_handler(commands=['contacts'])
@send_action('typing')
@save_user_activity()
def contacts_command_handler(message):
    cid = message.chat.id
    with codecs.open('templates/contacts.html', 'r', encoding='UTF-8') as file:
        template = Template(file.read())
        bot.send_message(cid, template.render(user_name=message.chat.username), parse_mode='HTML')
@bot.message_handler(commands=['help'])
@send_action('typing')
@save_user_activity()
def help_command_handler(message):
    cid = message.chat.id
    help_text = 'The following commands are available \n'
    for key in commands:
        help_text += '/' + key + ': '
        help_text += commands[key] + '\n'
    help_text += 'ANTI_COVID_19_BOT speaks english, be careful and take care'
    bot.send_message(cid, help_text)
@bot.message_handler(func=lambda message: message.text.lower() == 'hi')
@send_action('typing')
@save_user_activity()
def hi_command_handler(message):
    cid = message.chat.id
    with codecs.open('templates/himydear.html', 'r', encoding='UTF-8') as file:
        template = Template(file.read())
        bot.send_message(cid, template.render(user_name=message.chat.username), parse_mode='HTML')
@bot.message_handler(func=lambda message: True, content_types=['text'])
@send_action('typing')
@save_user_activity()
def default_command_handler(message):
    cid = message.chat.id
    if message.text[:int(os.getenv('PASS_CHAR_COUNT'))] == os.getenv('STAT_KEY'):
        st = message.text.split(' ')
        if 'txt' in st:
            tga.analysis(st, cid)
            with codecs.open('%s.txt' % cid, 'r', encoding='UTF-8') as file:
                bot.send_document(cid, file)
                tga.remove(cid)
        else:
            messages = tga.analysis(st, cid)
            bot.send_message(cid, messages)
    else:
        with codecs.open('templates/idunnocommand.html', 'r', encoding='UTF-8') as file:
            template = Template(file.read())
            bot.send_message(cid, template.render(text_command=message.text), parse_mode='HTML')
if __name__ == '__main__':
    bot.polling(none_stop=True, interval=0)
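The handlers above keep a per-chat step in the `user_steps` dictionary: /country sets it to 1, the next text message is then treated as a country name, and the step goes back to 0. The same small state machine, sketched as a class with named states so it can be exercised without Telegram (`UserSteps` and the constant names are introduced here for illustration):

```python
AWAITING_NOTHING = 0
AWAITING_COUNTRY = 1


class UserSteps:
    """Tracks which input the bot expects next from each chat."""

    def __init__(self):
        self._steps = {}

    def get(self, chat_id):
        # Unknown chats start in the idle state.
        return self._steps.setdefault(chat_id, AWAITING_NOTHING)

    def expect_country(self, chat_id):
        self._steps[chat_id] = AWAITING_COUNTRY

    def reset(self, chat_id):
        self._steps[chat_id] = AWAITING_NOTHING


steps = UserSteps()
before = steps.get(1001)       # new chat, nothing expected
steps.expect_country(1001)     # what /country does
during = steps.get(1001)
steps.reset(1001)              # what happens once the country name is handled
after = steps.get(1001)
```

Like the original dictionary, this state lives in process memory, so it is lost whenever the bot restarts.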
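The HTML templates live in the «templates» folder. The bot reads its settings from these environment variables:
- API_BOT_TOKEN: Telegram bot token
- COVID_STAT_API_TOKEN: key for the COVID-19 statistics API
- GEO_NAME_API_KEY: username for the geonames API
- CONNECTION_STRING: MongoDB connection string
- STAT_KEY: keyword that unlocks the usage analytics
- DB_NAME: MongoDB database name
- PASS_CHAR_COUNT: number of leading characters of a message compared with STAT_KEY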
Telegram BotFather . . , , Botfather . BotFather , , .
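Long polling vs. webhook
When deploying to heroku there are two options:
- buy the “Hobby” plan, since on the free plan the app goes to sleep after 30 minutes without traffic
- switch the Telegram bot from long polling to a webhook, which needs one more environment variable, HEROKU_URL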
from flask import Flask, request
server = Flask(__name__)
@server.route('/' + token, methods=['POST'])
def get_message():
    bot.process_new_updates([telebot.types.Update.de_json(request.stream.read().decode("utf-8"))])
    return "!", 200
@server.route("/")
def web_hook():
    bot.remove_webhook()
    bot.set_webhook(url=os.getenv('HEROKU_URL') + token)
    return "!", 200
if __name__ == "__main__":
    server.run(host="0.0.0.0", port=int(os.environ.get('PORT', 8443)))
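In the webhook code above the address is built as `os.getenv('HEROKU_URL') + token`, while the Flask route is `'/' + token`, so the two only match when HEROKU_URL ends with a slash. A small sketch that tolerates both spellings; `build_webhook_url` is a hypothetical helper, and the URL and token are placeholders.

```python
def build_webhook_url(base_url, token):
    """Join the public app URL and the bot token into the webhook address."""
    return base_url.rstrip('/') + '/' + token


url_with_slash = build_webhook_url('https://example-app.herokuapp.com/', '123456:placeholder')
url_without_slash = build_webhook_url('https://example-app.herokuapp.com', '123456:placeholder')
```

With such a helper the call would presumably become `bot.set_webhook(url=build_webhook_url(os.getenv('HEROKU_URL'), token))`.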
Now let's package the bot with Docker, deploy it to the Heroku PaaS, and set up CI from GitHub to Heroku.
Docker
Create a Dockerfile in the project root. The Dockerfile looks like this:
FROM python:3.7  
COPY /requirements.txt /app/requirements.txt 
WORKDIR /app  
RUN pip install -r /app/requirements.txt
COPY . /app
CMD python /app/setup.py
PAAS Heroku
Heroku . , :
MongoDb , (connection string) . Heroku, Heroku Docker .
cd project_folder_name 
heroku container:login 
heroku apps
heroku container:push web --app anticovid19bot 
heroku container:release web --app anticovid19bot 
heroku logs --tail --app anticovid19bot 
Heroku , CI.
CI Github Heroku
For CI, add a heroku.yml file. It tells Heroku to build the docker image for the web process from the Dockerfile.
build:
  docker:
    web: Dockerfile
Commit heroku.yml to GitHub. Then, in the heroku dashboard, open the “Deploy” tab, choose GitHub and use “connect to github” to link the repository to the heroku app.
, COVID-19 c Telegram.
, , . , .
, 10 000 , , 10 000 , , ? , “10000 ” , , , , , « ».
, .
, Telegram, .
. !
P.S...
, .
- .