Telegram bot in Python vs COVID-19


Introduction


Amid the total panic and misinformation pouring in from absolutely every channel (instant messengers, news sites, radio, television), I decided to show how you can defeat the coronavirus with a Python bot for Telegram and a few other interesting ingredients (just kidding)!



To follow along you will need Python, Docker, the Heroku CLI, a Telegram account and MongoDB.






Telegram


First, register the bot in Telegram. To do that:


  • Find "BotFather" in Telegram
  • Send /start
  • Send /newbot
  • Enter a name for the Telegram bot
  • Save the token you receive






Next, create a repository on GitHub. It will be needed later for CI.


Project structure


tree -L 2
├── Dockerfile
├── .gitignore
├── README.md
├── common
│   ├── containers.py
│   └── tg_analytics.py
├── data
│   └── mongo_context.py
├── data.csv
├── heroku.yml
├── setup.py
├── requirements.txt
├── services
│   ├── country_service.py
│   └── statistics_service.py
└── templates
    ├── contacts.html
    ├── country_statistics.html
    ├── himydear.html
    ├── idunnocommand.html
    ├── notfound.html
    └── query_statistics.html



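Create a virtual environment for the project and activate it:


python -m venv env # create the virtual environment
source env/bin/activate # activate it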



To leave the virtual environment when you are done, run:


deactivate # leave the virtual environment


Next, create the file for the dependencies and the main script.


touch requirements.txt # dependency list
touch setup.py # main Python script

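The bot uses the following libraries:


  • Requests - HTTP requests
  • Pytelegrambotapi - a wrapper around the Telegram API
  • Pymongo - the MongoDB driver
  • Dependency_injector - dependency injection
  • Geocoder - geocoding, including the Geonames API
  • Jinja2 - a template engine
  • Ciso8601 - parsing of ISO 8601 dates
  • Cachetools - caching
  • Pandas - data analysis
  • Flask - a web framework for Python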

The resulting requirements.txt, with pinned versions:


requests==2.23.0
pytelegrambotapi==3.6.7
pymongo==3.10.1
dependency_injector==3.15.6
geocoder==1.38.1
jinja2==2.11.1
ciso8601==2.1.3
cachetools==4.0.0
pandas==1.0.3
flask==1.1.2




Once requirements.txt is filled in, install the dependencies:


pip install -r requirements.txt # install the dependencies

Now write the first, minimal version of the bot.


setup.py
# /setup.py file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# dependencies
import telebot
import os

from telebot import types

# bot initialization
token = os.getenv('API_BOT_TOKEN')
bot = telebot.TeleBot(token)


# start command handler
@bot.message_handler(commands=['start'])
def command_start_handler(message):
    cid = message.chat.id
    bot.send_chat_action(cid, 'typing')
    markup = types.ReplyKeyboardMarkup(row_width=1, resize_keyboard=True)
    button_geo = types.KeyboardButton(text='send location', request_location=True)
    markup.add(button_geo)
    bot.send_message(cid, 'Hello stranger, please choose commands from the menu', reply_markup=markup)


# application entry point
if __name__ == '__main__':
    bot.polling(none_stop=True, interval=0)


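To run it, set the API_BOT_TOKEN environment variable. In the PyCharm IDE this is done in "Edit configurations": add the variable there and give it the token received from BotFather.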





Check the bot in Telegram.



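Next, besides the "send location" button, add the "contacts" and "help" commands to the bot.


Telegram messages can be formatted with HTML or Markdown. Here the replies are built from HTML templates.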
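
One detail worth keeping in mind with HTML formatting: in Telegram's HTML parse mode the characters `<`, `>` and `&` must be escaped when they are not part of a tag, and a Jinja2 `Template` created directly does not escape values by default. A minimal sketch of escaping user text before it goes into a reply follows. The helper name `render_unknown_command` and the wording of the reply are illustrative assumptions, not taken from the bot's templates.

```python
import html


def render_unknown_command(text_command):
    """Build an HTML reply that safely echoes what the user typed.

    Hypothetical helper: the reply text is a placeholder, not the
    content of the bot's real idunnocommand.html template.
    """
    # quote=False escapes only <, > and &, which is what HTML mode needs
    safe_text = html.escape(text_command, quote=False)
    return "Sorry, I don't know the command <b>%s</b>" % safe_text


print(render_unknown_command('<script> & co'))
```

The same idea applies to user names and any other value that comes from the chat.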


setup.py
# /setup.py file
#!/usr/bin/env python3
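# -*- coding: utf-8 -*-
# additional imports used by the code below
import codecs

from functools import wraps
from jinja2 import Template

known_users = []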
user_steps = {}
commands = {
    'start': 'Start using this bot',
    'help': 'Useful information about this bot',
    'contacts': 'Contacts'
}

# decorator for bot actions
def send_action(action):
    """Sends `action` while processing func command."""

    def decorator(func):
        @wraps(func)
        def command_func(message, *args, **kwargs):
            bot.send_chat_action(chat_id=message.chat.id, action=action)
            return func(message, *args, **kwargs)
        return command_func
    return decorator


# help command handler
@bot.message_handler(commands=['help'])
@send_action('typing')
def command_help_handler(message):
    help_text = 'The following commands are available: \n'
    for key in commands:
        help_text += '/' + key + ': '
        help_text += commands[key] + '\n'
    help_text += 'ANTICOVID19BOT speaks english, be careful and take care!'
    bot.send_message(message.chat.id, help_text)


# contacts command handler
@bot.message_handler(commands=['contacts'])
@send_action('typing')
def command_contacts_handler(message):
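    # read the template as UTF-8 and pass the same variable name as the final setup.py
    with codecs.open('templates/contacts.html', 'r', encoding='UTF-8') as file:
        template = Template(file.read())
        bot.send_message(message.chat.id, template.render(user_name=message.chat.username), parse_mode='HTML')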


Next comes the database context for MongoDB.


mongo_context.py
# /data/mongo_context.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# dependencies
import os

from datetime import datetime
from pymongo import MongoClient


class MongoDBContext:
    """Mongo database context class"""

    # constructor of class
    def __init__(self):
        try:
            self.connection_string = os.getenv('CONNECTION_STRING')
            self.client = MongoClient(self.connection_string)
        except Exception as e:
            raise e

    # save user query method
    def save_query(self, country, user_name):
        db = self.client[os.getenv('DB_NAME')]
        countries_stats = db.country_stats
        result = countries_stats.insert_one({'date': datetime.now(), 'country': country, 'username': user_name})

    # get users queries method
    def get_users_queries(self):
        db = self.client[os.getenv('DB_NAME')]
        countries_stats = db.country_stats
        queries = countries_stats.aggregate([
            {'$group': {'_id': '$country', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
            {'$limit': 5}
        ])
        users = countries_stats.aggregate([
            {'$group': {'_id': '$username', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
            {'$limit': 5}
        ])
        return {'queries': list(queries), 'users': list(users)}
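
To make the two aggregation pipelines easier to read: each one groups the saved queries by a field, counts the documents in every group, sorts by the count in descending order and keeps the first five. A rough pure-Python equivalent is sketched below; `top_five` and the sample documents are hypothetical and only illustrate the shape of the result.

```python
from collections import Counter


def top_five(documents, field):
    """Mimic $group by `field` + $sort by count desc + $limit 5."""
    counts = Counter(doc[field] for doc in documents)
    return [{'_id': value, 'count': count}
            for value, count in counts.most_common(5)]


# made-up documents in the shape written by save_query
docs = [
    {'country': 'Italy', 'username': 'anna'},
    {'country': 'Spain', 'username': 'anna'},
    {'country': 'Italy', 'username': 'bob'},
]
print(top_five(docs, 'country'))
print(top_five(docs, 'username'))
```

Unlike this sketch, MongoDB does not promise any particular order for groups with equal counts.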



Then the dependency injection container.


containers.py
# /common/containers.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# dependencies

import dependency_injector.containers as containers
import dependency_injector.providers as providers
from data.mongo_context import MongoDBContext


class DBContext(containers.DeclarativeContainer):
    """di for future development"""
    mongo_db_context = providers.Singleton(MongoDBContext)


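Now the services that work with external APIs. One API returns the COVID-19 statistics, the other determines the country from the coordinates (latitude, longitude) that the user sends from Telegram.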


country_service.py
# /services/country_service.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# dependencies

import os
import requests

from cachetools import cached, TTLCache


class CountryService:
    """This class provides country information"""

    # 3 hours cache time
    cache = TTLCache(maxsize=100, ttl=10800)

    # method for getting country information by lat and lng
    @cached(cache)
    def get_country_information(self, latitude, longitude):
        url = 'http://api.geonames.org/countrySubdivisionJSON'
        query_string = {'lat': latitude, 'lng': longitude, 'username': os.getenv('GEO_NAME_API_KEY')}
        geo_result = requests.request('GET', url, params=query_string)
        return geo_result.json()
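
The location handler later indexes the result as `geo_result['countryName']`, which raises `KeyError` whenever the reply has no such key, for example for a point in the open sea. A small defensive sketch follows. `extract_country_name` is a hypothetical helper, and the error reply used in the example (an object with a `status` key) is an assumption about the service's format.

```python
def extract_country_name(geo_result):
    """Return the country name from a geo lookup reply, or None."""
    if not isinstance(geo_result, dict):
        return None
    name = geo_result.get('countryName')
    if isinstance(name, str) and name.strip():
        return name.strip()
    return None


# a successful reply and an assumed error reply
print(extract_country_name({'countryName': 'Italy', 'countryCode': 'IT'}))
print(extract_country_name({'status': {'message': 'no country found', 'value': 15}}))
```

With a helper like this the handler can answer with the "not found" template instead of failing.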


statistics_service.py
# /services/statistics_service.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# dependencies

import os
import requests
import codecs
import ciso8601

from jinja2 import Template
from cachetools import cached, TTLCache
from common.containers import DBContext


class StatisticsService:
    """This class provides information about statistics"""

    # cache time 3 hours
    cache = TTLCache(maxsize=100, ttl=10800)

    def __init__(self):
        try:
            self.covid_api_token = os.getenv('COVID_STAT_API_TOKEN')
            self.db_context = DBContext.mongo_db_context()
        except Exception as e:
            raise e

    # method for getting statistics from API
    def __get_statistics_by_country_from_api(self, country_name):
        url = "https://covid-193.p.rapidapi.com/statistics"
        query_string = {'country': country_name}
        headers = {
            'x-rapidapi-host': "covid-193.p.rapidapi.com",
            # take the key from the COVID_STAT_API_TOKEN variable read in __init__
            'x-rapidapi-key': self.covid_api_token
        }
        response = requests.request("GET", url, headers=headers, params=query_string)
        return response.json()

    # method for rendering statistics as html
    @cached(cache)
    def __get_statistics_by_country_as_html(self, country_name):
        try:
            statistics_json = self.__get_statistics_by_country_from_api(country_name)
            if len(statistics_json['response']) == 0:
                with codecs.open('templates/idunnocommand.html', 'r', encoding='UTF-8') as file:
                    template = Template(file.read())
                    return template.render(text_command=country_name)
            else:
                with codecs.open('templates/country_statistics.html', 'r', encoding='UTF-8') as file:
                    template = Template(file.read())
                    return template.render(date=ciso8601.parse_datetime(statistics_json['response'][0]['time']).date(),
                                           country=statistics_json['response'][0]['country'].upper(),
                                           new_cases=statistics_json['response'][0]['cases']['new'],
                                           active_cases=statistics_json['response'][0]['cases']['active'],
                                           critical_cases=statistics_json['response'][0]['cases']['critical'],
                                           recovered_cases=statistics_json['response'][0]['cases']['recovered'],
                                           total_cases=statistics_json['response'][0]['cases']['total'],
                                           new_deaths=statistics_json['response'][0]['deaths']['new'],
                                           total_deaths=statistics_json['response'][0]['deaths']['total'])
        except Exception as e:
            raise e

    # method for getting statistics by country_name
    def get_statistics_by_country_name(self, country_name, user_name):
        self.db_context.save_query(country_name, user_name)
        return self.__get_statistics_by_country_as_html(country_name)

    # method for getting statistics of users and queries
    def get_statistics_of_users_queries(self):
        query_statistics = self.db_context.get_users_queries()
        with codecs.open('templates/query_statistics.html', 'r', encoding='UTF-8') as file:
            template = Template(file.read())
            return template.render(queries=query_statistics['queries'], users=query_statistics['users'])



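Usage analytics. There are services such as chatbase, but here user activity is simply written to a csv file and the reports are built from it. The approach is borrowed from alekskram.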


tg_analytics.py
# /common/tg_analytics.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# dependencies

import csv
import datetime
import os
import pandas as pd

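# word forms used in the report (English stand-ins for the localized labels)
users_type = {
    1: 'user',
    2: 'users',
    3: 'users',
    4: 'users'
}
day_type = {
    1: 'day',
    2: 'days',
    3: 'days',
    4: 'days'
}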


# remove txt file
def remove(user_id):
    path = os.getcwd() + '/%s.txt' % user_id
    os.remove(path)


# write data to csv
def statistics(user_id, command):
    data = datetime.datetime.today().strftime("%Y-%m-%d")
    with open('data.csv', 'a', newline="") as fil:
        wr = csv.writer(fil, delimiter=';')
        wr.writerow([data, user_id, command])


# make report
def analysis(bid, user_id):
    season = int(bid[1])
    df = pd.read_csv('data.csv', delimiter=';', encoding='utf8')
    number_of_users = len(df['id'].unique())
    number_of_days = len(df['data'].unique())

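    message_to_user = 'Bot usage statistics for the last %s %s: \n' % (season, day_type.get(season, 'days'))
    message_to_user += 'Statistics have been collected for %s %s \n' % (
        number_of_days, day_type.get(number_of_days, 'days'))
    if season > number_of_days:
        season = number_of_days
        message_to_user += 'The requested number of days is larger than what is available, \n' \
                           'so the report covers the whole period \n'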

    df_user = df.groupby(['data', 'id']).count().reset_index().groupby('data').count().reset_index()
    list_of_dates_in_df_user = list(df_user['data'])
    list_of_number_of_user_in_df_user = list(df_user['id'])
    list_of_dates_in_df_user = list_of_dates_in_df_user[-season:]
    list_of_number_of_user_in_df_user = list_of_number_of_user_in_df_user[-season:]
    df_command = df.groupby(['data', 'command']).count().reset_index()
    unique_commands = df['command'].unique()
    commands_in_each_day = []
    list_of_dates_in_df_command = list(df_command['data'])
    list_of_number_of_user_in_df_command = list(df_command['id'])
    list_of_name_of_command_in_df_command = list(df_command['command'])
    commands_in_this_day = dict()
    for i in range(len(list_of_dates_in_df_command)):
        commands_in_this_day[list_of_name_of_command_in_df_command[i]] = list_of_number_of_user_in_df_command[i]
        if i + 1 >= len(list_of_dates_in_df_command) or list_of_dates_in_df_command[i] != list_of_dates_in_df_command[
            i + 1]:
            commands_in_each_day.append(commands_in_this_day)
            commands_in_this_day = dict()
    commands_in_each_day = commands_in_each_day[-season:]

    # 'users' and 'commands' are English stand-ins for the report keywords
    if 'users' in bid:
        message_to_user += 'Users for the whole period: %s %s\n' % (
            number_of_users, users_type.get(number_of_users, 'users'))
        message_to_user += 'Users for the last %s %s: \n' % (season, day_type.get(season, 'days'))
        for days, number, comm_day in zip(list_of_dates_in_df_user, list_of_number_of_user_in_df_user,
                                          commands_in_each_day):
            message_to_user += 'Date:%s Users:%d New:%s\n' % (days, number, comm_day.get('/start', 0))
    if 'commands' in bid:
        message_to_user += 'Command statistics for the last %s %s: \n' % (season, day_type.get(season, 'days'))
        for days, commands in zip(list_of_dates_in_df_user, commands_in_each_day):
            message_to_user += 'Date:%s\n' % days
            for i in unique_commands:
                if i in commands:
                    message_to_user += '%s - %s times\n' % (i, commands.get(i))
                else:
                    message_to_user += '%s - 0 times\n' % i

    # write the report to a file when 'txt' is requested, otherwise return it
    if 'txt' in bid:
        with open('%s.txt' % user_id, 'w', encoding='UTF-8') as fil:
            fil.write(message_to_user)
    else:
        return message_to_user
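
Because `analysis` reads the columns `data`, `id` and `command` by name, data.csv presumably has to start with a `data;id;command` header row; that is an assumption drawn from the code above. The core of the report, the number of distinct users per day, can be sketched with the standard library alone. `users_per_day` and the sample rows are hypothetical.

```python
import csv
import io
from collections import defaultdict


def users_per_day(csv_text):
    """Count distinct user ids per date in 'data;id;command' rows."""
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=';')
    seen = defaultdict(set)
    for row in reader:
        seen[row['data']].add(row['id'])
    # dates are ISO formatted, so sorting the strings sorts them by date
    return {day: len(ids) for day, ids in sorted(seen.items())}


sample = (
    'data;id;command\n'
    '2020-04-01;1;/start\n'
    '2020-04-01;1;/help\n'
    '2020-04-01;2;/start\n'
    '2020-04-02;1;/country\n'
)
print(users_per_day(sample))
```

The pandas version in the module gets the same numbers with two chained `groupby` calls.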



Finally, the complete setup.py.


setup.py
# /setup.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# dependencies

import telebot
import os
import codecs
import common.tg_analytics as tga

from functools import wraps
from telebot import types
from jinja2 import Template
from services.country_service import CountryService
from services.statistics_service import StatisticsService

# bot initialization
token = os.getenv('API_BOT_TOKEN')
bot = telebot.TeleBot(token)
user_steps = {}
known_users = []
stats_service = StatisticsService()
country_service = CountryService()
commands = {'start': 'Start using this bot',
            'country': 'Please, write a country name',
            'statistics': 'Statistics by users queries',
            'help': 'Useful information about this bot',
            'contacts': 'Developer contacts'}


def get_user_step(uid):
    if uid in user_steps:
        return user_steps[uid]
    else:
        known_users.append(uid)
        user_steps[uid] = 0
        return user_steps[uid]


# decorator for bot actions
def send_action(action):

    def decorator(func):
        @wraps(func)
        def command_func(message, *args, **kwargs):
            bot.send_chat_action(chat_id=message.chat.id, action=action)
            return func(message, *args, **kwargs)
        return command_func
    return decorator


# decorator for save user activity
def save_user_activity():

    def decorator(func):
        @wraps(func)
        def command_func(message, *args, **kwargs):
            tga.statistics(message.chat.id, message.text)
            return func(message, *args, **kwargs)
        return command_func
    return decorator


# start command handler
@bot.message_handler(commands=['start'])
@send_action('typing')
@save_user_activity()
def start_command_handler(message):
    cid = message.chat.id
    markup = types.ReplyKeyboardMarkup(row_width=1, resize_keyboard=True)
    button_geo = types.KeyboardButton(text='send location', request_location=True)
    markup.add(button_geo)
    bot.send_message(cid, 'Hello, {0}, please choose command from the menu'.format(message.chat.username),
                     reply_markup=markup)
    help_command_handler(message)


# country command handler
@bot.message_handler(commands=['country'])
@send_action('typing')
@save_user_activity()
def country_command_handler(message):
    cid = message.chat.id
    user_steps[cid] = 1
    bot.send_message(cid, '{0}, write name of country please'.format(message.chat.username))


# geo command handler
@bot.message_handler(content_types=['location'])
@send_action('typing')
@save_user_activity()
def geo_command_handler(message):
    cid = message.chat.id
    geo_result = country_service.get_country_information(message.location.latitude, message.location.longitude)
    statistics = stats_service.get_statistics_by_country_name(geo_result['countryName'], message.chat.username)
    user_steps[cid] = 0
    bot.send_message(cid, statistics, parse_mode='HTML')


# country statistics command handler
@bot.message_handler(func=lambda message: get_user_step(message.chat.id) == 1)
@send_action('typing')
@save_user_activity()
def country_statistics_command_handler(message):
    cid = message.chat.id
    country_name = message.text.strip()

    try:
        statistics = stats_service.get_statistics_by_country_name(country_name, message.chat.username)
    except Exception as e:
        raise e

    user_steps[cid] = 0
    bot.send_message(cid, statistics, parse_mode='HTML')


# query statistics command handler
@bot.message_handler(commands=['statistics'])
@send_action('typing')
@save_user_activity()
def statistics_command_handler(message):
    cid = message.chat.id
    bot.send_message(cid, stats_service.get_statistics_of_users_queries(), parse_mode='HTML')


# contacts command handler
@bot.message_handler(commands=['contacts'])
@send_action('typing')
@save_user_activity()
def contacts_command_handler(message):
    cid = message.chat.id
    with codecs.open('templates/contacts.html', 'r', encoding='UTF-8') as file:
        template = Template(file.read())
        bot.send_message(cid, template.render(user_name=message.chat.username), parse_mode='HTML')


# help command handler
@bot.message_handler(commands=['help'])
@send_action('typing')
@save_user_activity()
def help_command_handler(message):
    cid = message.chat.id
    help_text = 'The following commands are available \n'
    for key in commands:
        help_text += '/' + key + ': '
        help_text += commands[key] + '\n'
    help_text += 'ANTI_COVID_19_BOT speaks english, be careful and take care'
    bot.send_message(cid, help_text)


# hi command handler
@bot.message_handler(func=lambda message: message.text.lower() == 'hi')
@send_action('typing')
@save_user_activity()
def hi_command_handler(message):
    cid = message.chat.id
    with codecs.open('templates/himydear.html', 'r', encoding='UTF-8') as file:
        template = Template(file.read())
        bot.send_message(cid, template.render(user_name=message.chat.username), parse_mode='HTML')


# default text messages and hidden statistics command handler
@bot.message_handler(func=lambda message: True, content_types=['text'])
@send_action('typing')
@save_user_activity()
def default_command_handler(message):
    cid = message.chat.id
    if message.text[:int(os.getenv('PASS_CHAR_COUNT'))] == os.getenv('STAT_KEY'):
        st = message.text.split(' ')
        if 'txt' in st:
            tga.analysis(st, cid)
            with codecs.open('%s.txt' % cid, 'r', encoding='UTF-8') as file:
                bot.send_document(cid, file)
                tga.remove(cid)
        else:
            messages = tga.analysis(st, cid)
            bot.send_message(cid, messages)
    else:
        with codecs.open('templates/idunnocommand.html', 'r', encoding='UTF-8') as file:
            template = Template(file.read())
            bot.send_message(cid, template.render(text_command=message.text), parse_mode='HTML')

# application entry point
if __name__ == '__main__':
    bot.polling(none_stop=True, interval=0)
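
The `/country` dialogue relies on a small per-chat state: the command sets the step to 1, and the next text message from that chat is treated as a country name, after which the step goes back to 0. A stripped-down sketch of that logic, without Telegram, is shown below. `handle_text` and the returned labels are hypothetical and stand in for the real handlers.

```python
user_steps = {}


def handle_text(chat_id, text):
    """Return a label for the branch that would handle this message."""
    if text == '/country':
        user_steps[chat_id] = 1  # wait for a country name
        return 'ask for country'
    if user_steps.get(chat_id, 0) == 1:
        user_steps[chat_id] = 0  # the dialogue is finished
        return 'statistics for %s' % text.strip()
    return 'unknown command'


print(handle_text(1, '/country'))
print(handle_text(1, ' Italy '))
print(handle_text(1, 'Italy'))
```

In the bot itself this works because the step-based handler is registered before the catch-all text handler.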



As you may have noticed, the HTML templates live in the "templates" folder.




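The bot reads the following environment variables:


  • API_BOT_TOKEN - the Telegram bot token
  • COVID_STAT_API_TOKEN - the key for the COVID-19 statistics API
  • GEO_NAME_API_KEY - the user name for the geonames API
  • CONNECTION_STRING - the MongoDB connection string
  • STAT_KEY - the secret word that unlocks the hidden statistics command
  • DB_NAME - the name of the database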
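
Since every service reads its settings with `os.getenv`, a missing variable only shows up later as a confusing error. A minimal sketch of a startup check follows. `missing_variables` is a hypothetical helper, and the tuple simply repeats the names listed above.

```python
import os

REQUIRED_VARIABLES = (
    'API_BOT_TOKEN',
    'COVID_STAT_API_TOKEN',
    'GEO_NAME_API_KEY',
    'CONNECTION_STRING',
    'STAT_KEY',
    'DB_NAME',
)


def missing_variables(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARIABLES if not env.get(name)]


# example with a partially filled environment
print(missing_variables({'API_BOT_TOKEN': '123:abc', 'DB_NAME': 'covid'}))
```

Calling it at the top of setup.py and stopping when the list is not empty would be one way to use it.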


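The commands can also be registered in Telegram through BotFather, so that Telegram shows them to the user as hints. Below is what the bot's replies look like.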


start, help


statistics


country


contacts


send location




Long polling webhook


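Since the bot will run on heroku, two things are worth keeping in mind:


  1. Depending on the plan (see "Hobby"), an application that gets no traffic for 30 minutes may be put to sleep
  2. The Telegram bot has to be switched from long polling to a webhook, which needs one more environment variable, HEROKU_URL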

# /setup.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# dependencies

from flask import Flask, request

# set webhook
server = Flask(__name__)


@server.route('/' + token, methods=['POST'])
def get_message():
    bot.process_new_updates([telebot.types.Update.de_json(request.stream.read().decode("utf-8"))])
    return "!", 200


@server.route("/")
def web_hook():
    bot.remove_webhook()
    bot.set_webhook(url=os.getenv('HEROKU_URL') + token)
    return "!", 200

if __name__ == "__main__":
    server.run(host="0.0.0.0", port=int(os.environ.get('PORT', 8443)))
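
Note that the webhook address is built as `os.getenv('HEROKU_URL') + token` while the Flask route is `'/' + token`, so HEROKU_URL has to end with a slash for the two to match. A small sketch that removes this dependency follows. `build_webhook_url` is a hypothetical helper, and the host name in the example is a placeholder.

```python
def build_webhook_url(base_url, token):
    """Join the public app URL and the bot token with exactly one slash."""
    return base_url.rstrip('/') + '/' + token


# both spellings of the base URL give the same webhook address
print(build_webhook_url('https://example.herokuapp.com', '123:abc'))
print(build_webhook_url('https://example.herokuapp.com/', '123:abc'))
```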

Next, we will package the bot into a Docker image, deploy it to the PAAS Heroku, and set up CI between Github and Heroku.


Docker


Create a Dockerfile in the root of the project. The contents of the Dockerfile are shown below.


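# base image with Python 3.7
FROM python:3.7
# copy the dependency list into the image
COPY /requirements.txt /app/requirements.txt
# set the working directory
WORKDIR /app
# install the dependencies
RUN pip install -r /app/requirements.txt
# copy the rest of the project
COPY . /app
# start the bot
CMD python /app/setup.py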

PAAS Heroku


Sign up on Heroku. Then create an application:



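Set up a MongoDb database and put its connection string (connection string) into the environment variables of the application. Then, using the Heroku CLI, push the Docker image to Heroku.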


cd project_folder_name # go to project folder
heroku container:login # login to Heroku
heroku apps # see available apps and copy name to next command
heroku container:push web --app anticovid19bot # push docker image to Heroku
heroku container:release web --app anticovid19bot # release docker image to Heroku
heroku logs --tail --app anticovid19bot # see what's going on (logs)


The bot is now running on Heroku, and what remains is CI.


CI Github Heroku


For CI, add a heroku.yml file to the root of the project. It tells Heroku to build the docker image from the Dockerfile.


build:
  docker:
    web: Dockerfile

Push heroku.yml to Github. Then, in the heroku dashboard, open the "Deploy" tab, choose Github and click "connect to github" to link the repository to the heroku application.


And that is it: a COVID-19 bot for Telegram is ready.



, , . , .


, 10 000 , , 10 000 , , ? , "10000 " , , , , , « ».
, .


, Telegram, .


. !


P.S...


, .


- .


