
Unser Team erstellt einen Informationsdienst zur Anzeige globaler Daten für viele Länder, Städte und Gebiete - Routitude . Ende Februar dieses Jahres veranlasste uns die rasche Verbreitung des Coronavirus auf der ganzen Welt, zusätzliche Funktionen zur Überwachung der Situation in unserer Anwendung einzuführen. Neben der Visualisierung von Daten in der Weboberfläche war die Hauptkomponente dieser Aufgabe ein in Python geschriebener Microservice, der das beliebte Flask-Webframework verwendet.
Der Dienst aktualisiert regelmäßig Daten aus verschiedenen Quellen und stellt auf Anfrage die erforderlichen Informationen für die Visualisierung in der Weboberfläche bereit. Die Hauptdatenquelle sind Wikipedia-Seiten zur Verbreitung des Virus in Ländern und Gebieten. Die Tabellen mit Indikatoren auf diesen Seiten werden schnell aktualisiert und eignen sich hervorragend als Datenquelle für den Dienst zur Überwachung der Ausbreitung von Infektionen.
In diesem Artikel werde ich auf die Hauptkomponenten des Dienstes eingehen, vom Empfangen und Aktualisieren von Daten bis zum Erstellen von APIs für Clientanforderungen. Projektcode ist im Github-Repository verfügbar .
, Python Flask . , , . , COVID-19.
Flask
Python , ORM Python SQLAlchemy. Routitude PostgreSQL.
:
pip install requirements.txt
URI :
export COVID19API_DB_URI=< URI, : postgresql://localhost/covid19api>
. :
/api API
covid.py API c COVID-19
/datasources
/test
test_covid.py COVID-19
covid_wiki.py COVID-19
utils.py
/mirgations SQLAlchemy
... Flask-Migrate (Alembic)
/test
test_app.py (API)
app.py HTTP
appvars.py
config.py
manage.py CLI
models.py ORM
requirements.txt
, :
. github , , .
. :
:
html read_html Pandas. . . , , . , , . , . Pandas DataFrame, , html .
datasources/utils.py
import pandas as pd
def get_wiki_table_df(page_url, match_string):
response = requests.get(page_url)
tables = pd.read_html(response.content)
df = None
for table in tables:
df = table
if match_string in str(df):
break
return df
. . , COVID-19 .
datasources/covid_wiki.py
def get_report_countries():
url = (
'https://en.wikipedia.org/wiki/'
'2019%E2%80%9320_coronavirus_pandemic_by_country_and_territory'
)
df = utils.get_wiki_table_df(url, 'Locations[b]')
df = pd.DataFrame(
df.values[:, 1:5],
columns=['country', 'confirmed', 'deaths', 'recovered']
)
df = df[~df['country'].isna()]
df['country'] = df['country'].apply(lambda x: utils.clean_territory_name(x))
df.drop(df[df['country'].str.len() > 40].index, inplace=True)
df = utils.wiki_table_df_numeric_column_clean(df, [
'confirmed', 'deaths', 'recovered'
])
df['state'] = None
check_report(df)
return df
html , , , .
datasources/test/test_covid.py
from unittest import TestCase
from datasources import covid_wiki
class TestCovid(TestCase):
def test_get_wiki_report(self):
report = covid_wiki.get_report_countries()
self.assertTrue('Russia' in list(report['country']))
self.assertTrue(report.shape[0] > 0)
:
nosetests datasources
SQLAlchemy ORM Flask
, , . Python , , , . - (Object-Relational Mapping, ORM). Python ORM SQLAlchemy. , , , SQLAchemy . Alembic. , Flask , Flask-Migrate. appvars.py manage.py.
ORM Model SQLAlchemy. SQLAchemy . , . ORM .
models.py
class CovidWiki(db.Model):
__tablename__ = 'covid_wiki'
territory_id = Column(
db.VARCHAR(length=256), nullable=False, primary_key=True
)
update_time = Column(db.TIMESTAMP(), nullable=False)
country = Column(db.VARCHAR(length=128), nullable=False)
state = Column(db.VARCHAR(length=128), nullable=True)
confirmed = Column(db.INTEGER(), nullable=True)
deaths = Column(db.INTEGER(), nullable=True)
recovered = Column(db.INTEGER(), nullable=True)
Index('ix_covid_wiki_country', CovidWiki.country)
Index('ix_covid_wiki_state', CovidWiki.state)
, , Alembic:
python manage.py db init
migrations . ORM :
python manage.py db migrate -m covid_wiki
, , , , , , . migrations/versions. , , python, . 2 — upgrate downgrade. ORM , , .
, . :
python manage.py db upgrade
.
COVID-19
ORM , pandas DataFrame , , , . , . , , SQLAlchemy.
models.py
def update_data_by_dataframe(self, df):
report = df.to_dict(orient='records')
report_last = self.get_wiki_last_report()
for value in report:
territory_id = self.get_id(value['country'], value['state'])
value['territory_id'] = territory_id
changed = (
(len(report_last) == 0) or
(territory_id not in report_last) or
(utils.get_covid_values_sum(value) !=
utils.get_covid_values_sum(report_last[territory_id]))
)
if not changed:
continue
logging.info(f"Updating data for territory: {territory_id}")
data = dict(value)
for name in utils.STAT_NAMES:
value = data[name]
if np.isnan(value):
data[name] = None
continue
data[name] = int(value)
data['update_time'] = datetime.datetime.now()
report = CovidWiki(**data)
db.session.merge(report)
db.session.commit()
. .
models.py
def update_data(self):
logging.info('Updating countries data')
self.update_data_by_dataframe(covid_wiki.get_report_countries())
logging.info('Updating Russian states data')
self.update_data_by_dataframe(covid_wiki.get_report_ru())
logging.info('Updating USA states data')
self.update_data_by_dataframe(covid_wiki.get_report_us())
Flask Flask-Script . , , manager.command . .
manage.py
@manager.command
def update_covid_data():
CovidWiki().update_data()
, , :
python manage.py update_covid_data
- , cron.
API COVID-19 Flask
. Flask . , SQLAlchemy . , COVID-19:
api/covid.py
def get_covid_countries_report():
data = db.session.query(CovidWiki).filter(CovidWiki.state.is_(None)).all()
return [v.to_dict() for v in data]
def get_covid_states_report_by_country(country):
data = db.session.query(CovidWiki).filter(and_(
CovidWiki.state.isnot(None),
func.lower(CovidWiki.country) == country.lower(),
)).all()
return [v.to_dict() for v in data]
def get_covid_total_stats():
def to_dict(v):
return {'confirmed': v[0], 'deaths': v[1], 'recovered': v[2]}
curr = db.session.query(
func.sum(CovidWiki.confirmed),
func.sum(CovidWiki.deaths),
func.sum(CovidWiki.recovered),
func.max(CovidWiki.update_time)
).filter(CovidWiki.state.is_(None)).one()
return {
'data': to_dict(curr),
'last_update_time': utils.datetime2string(curr[3], time=True)
}
API. , , .
app.py
@app.route('/covid/countries')
def get_covid_countries_report():
report = covid_api.get_covid_countries_report()
check_data(report)
return jsonify(report)
@app.route('/covid/states/<string:country>')
def get_covid_states_report_by_country(country):
report = covid_api.get_covid_states_report_by_country(country)
check_data(report)
return jsonify(report)
@app.route('/covid/total')
def get_covid_total_stats():
report = covid_api.get_covid_total_stats()
check_data(report)
return jsonify(report)
Flask .
python app.py
, , , curl.
curl http://localhost:5000/covid/total
. test, :
nosetests
-. Python gunicorn uWSGI.
, . API , . , .
In Routitude verwenden wir diesen Dienst, um die aktuellen COVID-19-Infektionsraten auf der Karte und in den Dashboards anzuzeigen. Alle Quellmaterialien sind im Github-Repository verfügbar . Verbesserungen, Fehlerbehebungen, neue Funktionen und Daten sind willkommen. Ich freue mich über Kommentare zu dem Artikel und Vorschläge zur Verbesserung des Projekts.