
Nuestro equipo está creando un servicio de información para mostrar datos globales de muchos países, ciudades y territorios: Rutitud . A fines de febrero de este año, la rápida propagación del coronavirus en todo el mundo nos impulsó a introducir funcionalidades adicionales para monitorear la situación en nuestra aplicación. Además de visualizar datos en la interfaz web, el componente principal de esta tarea fue un microservicio escrito en Python usando el popular marco web Flask.
El servicio actualiza periódicamente los datos de varias fuentes y, previa solicitud, proporciona la información necesaria para su visualización en la interfaz web. La principal fuente de datos son las páginas de Wikipedia sobre la propagación del virus en países y territorios. Las tablas con indicadores en estas páginas se actualizan rápidamente y son excelentes como fuente de datos para el servicio de monitoreo de la propagación de la infección.
En el artículo, hablaré sobre los componentes principales del servicio, desde recibir y actualizar datos hasta crear API para solicitudes de clientes. El código del proyecto está disponible en el repositorio de github .
, Python Flask . , , . , COVID-19.
Flask
Python , ORM Python SQLAlchemy. Routitude PostgreSQL.
:
pip install requirements.txt
URI :
export COVID19API_DB_URI=< URI, : postgresql://localhost/covid19api>
. :
/api API
covid.py API c COVID-19
/datasources
/test
test_covid.py COVID-19
covid_wiki.py COVID-19
utils.py
/mirgations SQLAlchemy
... Flask-Migrate (Alembic)
/test
test_app.py (API)
app.py HTTP
appvars.py
config.py
manage.py CLI
models.py ORM
requirements.txt
, :
. github , , .
. :
:
html read_html Pandas. . . , , . , , . , . Pandas DataFrame, , html .
datasources/utils.py
import pandas as pd
def get_wiki_table_df(page_url, match_string):
response = requests.get(page_url)
tables = pd.read_html(response.content)
df = None
for table in tables:
df = table
if match_string in str(df):
break
return df
. . , COVID-19 .
datasources/covid_wiki.py
def get_report_countries():
url = (
'https://en.wikipedia.org/wiki/'
'2019%E2%80%9320_coronavirus_pandemic_by_country_and_territory'
)
df = utils.get_wiki_table_df(url, 'Locations[b]')
df = pd.DataFrame(
df.values[:, 1:5],
columns=['country', 'confirmed', 'deaths', 'recovered']
)
df = df[~df['country'].isna()]
df['country'] = df['country'].apply(lambda x: utils.clean_territory_name(x))
df.drop(df[df['country'].str.len() > 40].index, inplace=True)
df = utils.wiki_table_df_numeric_column_clean(df, [
'confirmed', 'deaths', 'recovered'
])
df['state'] = None
check_report(df)
return df
html , , , .
datasources/test/test_covid.py
from unittest import TestCase
from datasources import covid_wiki
class TestCovid(TestCase):
def test_get_wiki_report(self):
report = covid_wiki.get_report_countries()
self.assertTrue('Russia' in list(report['country']))
self.assertTrue(report.shape[0] > 0)
:
nosetests datasources
SQLAlchemy ORM Flask
, , . Python , , , . - (Object-Relational Mapping, ORM). Python ORM SQLAlchemy. , , , SQLAchemy . Alembic. , Flask , Flask-Migrate. appvars.py manage.py.
ORM Model SQLAlchemy. SQLAchemy . , . ORM .
models.py
class CovidWiki(db.Model):
__tablename__ = 'covid_wiki'
territory_id = Column(
db.VARCHAR(length=256), nullable=False, primary_key=True
)
update_time = Column(db.TIMESTAMP(), nullable=False)
country = Column(db.VARCHAR(length=128), nullable=False)
state = Column(db.VARCHAR(length=128), nullable=True)
confirmed = Column(db.INTEGER(), nullable=True)
deaths = Column(db.INTEGER(), nullable=True)
recovered = Column(db.INTEGER(), nullable=True)
Index('ix_covid_wiki_country', CovidWiki.country)
Index('ix_covid_wiki_state', CovidWiki.state)
, , Alembic:
python manage.py db init
migrations . ORM :
python manage.py db migrate -m covid_wiki
, , , , , , . migrations/versions. , , python, . 2 — upgrate downgrade. ORM , , .
, . :
python manage.py db upgrade
.
COVID-19
ORM , pandas DataFrame , , , . , . , , SQLAlchemy.
models.py
def update_data_by_dataframe(self, df):
report = df.to_dict(orient='records')
report_last = self.get_wiki_last_report()
for value in report:
territory_id = self.get_id(value['country'], value['state'])
value['territory_id'] = territory_id
changed = (
(len(report_last) == 0) or
(territory_id not in report_last) or
(utils.get_covid_values_sum(value) !=
utils.get_covid_values_sum(report_last[territory_id]))
)
if not changed:
continue
logging.info(f"Updating data for territory: {territory_id}")
data = dict(value)
for name in utils.STAT_NAMES:
value = data[name]
if np.isnan(value):
data[name] = None
continue
data[name] = int(value)
data['update_time'] = datetime.datetime.now()
report = CovidWiki(**data)
db.session.merge(report)
db.session.commit()
. .
models.py
def update_data(self):
logging.info('Updating countries data')
self.update_data_by_dataframe(covid_wiki.get_report_countries())
logging.info('Updating Russian states data')
self.update_data_by_dataframe(covid_wiki.get_report_ru())
logging.info('Updating USA states data')
self.update_data_by_dataframe(covid_wiki.get_report_us())
Flask Flask-Script . , , manager.command . .
manage.py
@manager.command
def update_covid_data():
CovidWiki().update_data()
, , :
python manage.py update_covid_data
- , cron.
API COVID-19 Flask
. Flask . , SQLAlchemy . , COVID-19:
api/covid.py
def get_covid_countries_report():
data = db.session.query(CovidWiki).filter(CovidWiki.state.is_(None)).all()
return [v.to_dict() for v in data]
def get_covid_states_report_by_country(country):
data = db.session.query(CovidWiki).filter(and_(
CovidWiki.state.isnot(None),
func.lower(CovidWiki.country) == country.lower(),
)).all()
return [v.to_dict() for v in data]
def get_covid_total_stats():
def to_dict(v):
return {'confirmed': v[0], 'deaths': v[1], 'recovered': v[2]}
curr = db.session.query(
func.sum(CovidWiki.confirmed),
func.sum(CovidWiki.deaths),
func.sum(CovidWiki.recovered),
func.max(CovidWiki.update_time)
).filter(CovidWiki.state.is_(None)).one()
return {
'data': to_dict(curr),
'last_update_time': utils.datetime2string(curr[3], time=True)
}
API. , , .
app.py
@app.route('/covid/countries')
def get_covid_countries_report():
report = covid_api.get_covid_countries_report()
check_data(report)
return jsonify(report)
@app.route('/covid/states/<string:country>')
def get_covid_states_report_by_country(country):
report = covid_api.get_covid_states_report_by_country(country)
check_data(report)
return jsonify(report)
@app.route('/covid/total')
def get_covid_total_stats():
report = covid_api.get_covid_total_stats()
check_data(report)
return jsonify(report)
Flask .
python app.py
, , , curl.
curl http://localhost:5000/covid/total
. test, :
nosetests
-. Python gunicorn uWSGI.
, . API , . , .
En Routitude, utilizamos este servicio para mostrar las tasas actuales de infección por COVID-19 en el mapa y los paneles. Todos los materiales de origen están disponibles en el repositorio de github . Cualquier mejora, corrección de errores, nuevas características y datos son bienvenidos. Estaré encantado de cualquier comentario sobre el artículo y sugerencias para mejorar el proyecto.