
我们的团队正在开发信息服务 Routitude,用于展示众多国家、城市和地区的全球数据。今年2月底,冠状病毒在世界范围内的迅速传播促使我们在应用程序中加入了监视疫情的附加功能。除了在Web界面中可视化数据之外,这项工作的主要组件是一个使用流行的Flask Web框架、以Python编写的微服务。
该服务会定期更新来自各种来源的数据,并按请求提供在Web界面中进行可视化所需的信息。主要数据来源是维基百科上有关病毒在各国家和地区传播情况的页面。这些页面上的统计表格更新及时,非常适合作为疫情监视服务的数据源。
在本文中,我将讨论服务的主要组件,从接收和更新数据到创建用于客户端请求的API。项目代码在github仓库中可用。
, Python Flask . , , . , COVID-19.
Flask
Python , ORM Python SQLAlchemy. Routitude PostgreSQL.
:
pip install -r requirements.txt
URI :
export COVID19API_DB_URI=< URI, : postgresql://localhost/covid19api>
. :
/api API
covid.py API c COVID-19
/datasources
/test
test_covid.py COVID-19
covid_wiki.py COVID-19
utils.py
/migrations SQLAlchemy
... Flask-Migrate (Alembic)
/test
test_app.py (API)
app.py HTTP
appvars.py
config.py
manage.py CLI
models.py ORM
requirements.txt
, :
. github , , .
. :
:
html read_html Pandas. . . , , . , , . , . Pandas DataFrame, , html .
datasources/utils.py
import pandas as pd
def get_wiki_table_df(page_url, match_string, timeout=30):
    """Download a page and return the first HTML table containing a marker.

    Args:
        page_url: URL of the page to download.
        match_string: Text searched for in the ``str()`` rendering of each
            table parsed from the page.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the update job indefinitely.

    Returns:
        The first DataFrame whose string rendering contains
        ``match_string``, or ``None`` when no table on the page matches.
    """
    response = requests.get(page_url, timeout=timeout)
    for table in pd.read_html(response.content):
        # NOTE(review): the marker is looked up in the printed form of the
        # frame, which pandas may abbreviate for large tables.
        if match_string in str(table):
            return table
    # Previously the loop fell through with the last parsed table; report
    # "not found" explicitly instead of handing back an unrelated table.
    return None
. . , COVID-19 .
datasources/covid_wiki.py
def get_report_countries():
    """Build the per-country COVID-19 report from the English Wikipedia page.

    The table is located via ``utils.get_wiki_table_df``, reduced to the
    territory name and three counters, cleaned, validated with
    ``check_report`` and returned. The ``state`` column is set to ``None``
    for every row.
    """
    page_url = (
        'https://en.wikipedia.org/wiki/'
        '2019%E2%80%9320_coronavirus_pandemic_by_country_and_territory'
    )
    stat_columns = ['confirmed', 'deaths', 'recovered']

    wiki_table = utils.get_wiki_table_df(page_url, 'Locations[b]')
    # Columns 1..4 of the wiki table are used as name + the three counters.
    report = pd.DataFrame(
        wiki_table.values[:, 1:5],
        columns=['country'] + stat_columns,
    )

    # Keep only rows that actually carry a territory name.
    report = report[report['country'].notna()]
    report['country'] = report['country'].apply(utils.clean_territory_name)

    # Discard rows whose cleaned name is longer than 40 characters.
    name_too_long = report['country'].str.len() > 40
    report.drop(report[name_too_long].index, inplace=True)

    report = utils.wiki_table_df_numeric_column_clean(report, stat_columns)
    report['state'] = None
    check_report(report)
    return report
html , , , .
datasources/test/test_covid.py
from unittest import TestCase
from datasources import covid_wiki
class TestCovid(TestCase):
    """Smoke test for the Wikipedia-backed COVID-19 data source."""

    def test_get_wiki_report(self):
        # The countries report must be non-empty and list a known country.
        report = covid_wiki.get_report_countries()
        # assertGreater / assertIn show the offending values on failure,
        # which assertTrue on a pre-evaluated boolean cannot do. The size
        # check runs first so an empty report is reported as such.
        self.assertGreater(report.shape[0], 0)
        self.assertIn('Russia', list(report['country']))
:
nosetests datasources
SQLAlchemy ORM Flask
, , . Python , , , . - (Object-Relational Mapping, ORM). Python ORM SQLAlchemy. , , , SQLAlchemy . Alembic. , Flask , Flask-Migrate. appvars.py manage.py.
ORM Model SQLAlchemy. SQLAlchemy . , . ORM .
models.py
class CovidWiki(db.Model):
    """ORM mapping for the ``covid_wiki`` table.

    Each row is keyed by ``territory_id`` and stores the update timestamp,
    the country, an optional state and three nullable integer counters.
    """

    __tablename__ = 'covid_wiki'
    # Secondary indexes on the two columns used for filtering.
    __table_args__ = (
        Index('ix_covid_wiki_country', 'country'),
        Index('ix_covid_wiki_state', 'state'),
    )

    # Primary key identifying the territory.
    territory_id = Column(
        db.VARCHAR(length=256), primary_key=True, nullable=False
    )
    # Timestamp written when the row is stored.
    update_time = Column(db.TIMESTAMP(), nullable=False)

    country = Column(db.VARCHAR(length=128), nullable=False)
    # NULL for country-level rows.
    state = Column(db.VARCHAR(length=128), nullable=True)

    # Counters; NULL when the source has no value.
    confirmed = Column(db.INTEGER(), nullable=True)
    deaths = Column(db.INTEGER(), nullable=True)
    recovered = Column(db.INTEGER(), nullable=True)
, , Alembic:
python manage.py db init
migrations . ORM :
python manage.py db migrate -m covid_wiki
, , , , , , . migrations/versions. , , python, . 2 — upgrade downgrade. ORM , , .
, . :
python manage.py db upgrade
.
COVID-19
ORM , pandas DataFrame , , , . , . , , SQLAlchemy.
models.py
def update_data_by_dataframe(self, df):
    """Upsert the rows of a report DataFrame into the ``covid_wiki`` table.

    A territory is written only when it is absent from the last stored
    report or when ``utils.get_covid_values_sum`` differs between the new
    row and the stored one. All merged rows are committed together once the
    whole report has been processed.

    Args:
        df: Report converted row by row via ``to_dict(orient='records')``;
            each row must provide ``country``, ``state`` and every name in
            ``utils.STAT_NAMES``.
    """
    last_report = self.get_wiki_last_report()
    for row in df.to_dict(orient='records'):
        territory_id = self.get_id(row['country'], row['state'])
        row['territory_id'] = territory_id

        # A territory missing from the last report (which includes the
        # empty-report case) always counts as changed.
        changed = (
            territory_id not in last_report or
            utils.get_covid_values_sum(row) !=
            utils.get_covid_values_sum(last_report[territory_id])
        )
        if not changed:
            continue

        logging.info(f"Updating data for territory: {territory_id}")
        data = dict(row)
        for name in utils.STAT_NAMES:
            stat = data[name]
            # Missing counters are stored as NULL. None is tested first
            # because np.isnan() raises TypeError on it.
            if stat is None or np.isnan(stat):
                data[name] = None
            else:
                data[name] = int(stat)
        data['update_time'] = datetime.datetime.now()
        db.session.merge(CovidWiki(**data))
    db.session.commit()
. .
models.py
def update_data(self):
    """Refresh the stored data from each report source in turn.

    For every source the progress message is logged first, then the report
    is fetched and passed to ``update_data_by_dataframe``.
    """
    sources = (
        ('Updating countries data', covid_wiki.get_report_countries),
        ('Updating Russian states data', covid_wiki.get_report_ru),
        ('Updating USA states data', covid_wiki.get_report_us),
    )
    for message, fetch_report in sources:
        logging.info(message)
        self.update_data_by_dataframe(fetch_report())
Flask Flask-Script . , , manager.command . .
manage.py
@manager.command
def update_covid_data():
    # Command-line entry point: runs CovidWiki.update_data() on a fresh
    # model instance.
    model = CovidWiki()
    model.update_data()
, , :
python manage.py update_covid_data
- , cron.
API COVID-19 Flask
. Flask . , SQLAlchemy . , COVID-19:
api/covid.py
def get_covid_countries_report():
    """Return every record whose ``state`` is NULL, each as a dict."""
    country_rows = (
        db.session.query(CovidWiki)
        .filter(CovidWiki.state.is_(None))
        .all()
    )
    return [row.to_dict() for row in country_rows]
def get_covid_states_report_by_country(country):
    """Return the state-level records of one country, each as a dict.

    Args:
        country: Country name; compared case-insensitively against the
            stored ``country`` column.
    """
    state_rows = db.session.query(CovidWiki).filter(
        and_(
            # Only rows that describe a state, not a whole country.
            CovidWiki.state.isnot(None),
            func.lower(CovidWiki.country) == country.lower(),
        )
    ).all()
    return [row.to_dict() for row in state_rows]
def get_covid_total_stats():
    """Return the summed counters over all country-level records.

    The result has two keys: ``data`` with the summed ``confirmed``,
    ``deaths`` and ``recovered`` values, and ``last_update_time`` with the
    latest ``update_time`` formatted by ``utils.datetime2string``.
    """
    totals_query = db.session.query(
        func.sum(CovidWiki.confirmed),
        func.sum(CovidWiki.deaths),
        func.sum(CovidWiki.recovered),
        func.max(CovidWiki.update_time),
    ).filter(CovidWiki.state.is_(None))
    # Exactly one aggregate row is expected back.
    confirmed, deaths, recovered, last_update = totals_query.one()
    return {
        'data': {
            'confirmed': confirmed,
            'deaths': deaths,
            'recovered': recovered,
        },
        'last_update_time': utils.datetime2string(last_update, time=True),
    }
API. , , .
app.py
@app.route('/covid/countries')
def get_covid_countries_report():
    """Serve the country-level report as JSON."""
    payload = covid_api.get_covid_countries_report()
    # check_data is defined outside this snippet; it runs before the
    # payload is serialised.
    check_data(payload)
    return jsonify(payload)
@app.route('/covid/states/<string:country>')
def get_covid_states_report_by_country(country):
    """Serve the state-level report of the country named in the URL as JSON."""
    payload = covid_api.get_covid_states_report_by_country(country)
    # check_data is defined outside this snippet; it runs before the
    # payload is serialised.
    check_data(payload)
    return jsonify(payload)
@app.route('/covid/total')
def get_covid_total_stats():
    """Serve the aggregated totals as JSON."""
    payload = covid_api.get_covid_total_stats()
    # check_data is defined outside this snippet; it runs before the
    # payload is serialised.
    check_data(payload)
    return jsonify(payload)
Flask .
python app.py
, , , curl.
curl http://localhost:5000/covid/total
. test, :
nosetests
-. Python gunicorn uWSGI.
, . API , . , .
在Routitude,我们使用此服务在地图和仪表板上显示当前的COVID-19感染率。所有源材料都可以在github存储库中找到。欢迎进行任何改进,错误修复,新功能和数据。对于本文的任何评论和改进项目的建议,我都会感到高兴。