
TL;DR
本文介绍了我们如何在一个规模相当大的部署环境中引入Apache Airflow,用来管理基于QlikView构建的报表的更新作业。
依赖地狱(英文:dependency hell)是配置管理中的一种反模式,指软件产品与库之间的相互依赖关系图不断膨胀,导致新产品难以安装、旧产品难以卸载。
维基百科
, 2018 , BIA-Technologies.
QlikView " ".
Qlik QlikSense, QlikView, 12.4, .
, , .
QlikView "" (application) — , "QVW" ( ), , , .
, .
-- , , - QV. . , , . , "", ETL- , .
QV QVW , : , , . .
, , QVD. QlikView , , .
, ETL- , , QVD, , .
Qlik , , , (), - . QlikView QVD- . , QlikView, , , , .
, Qlik — NPrinting, , , , , SSRS, , QlikView, .
, QlikView:
- , , QlikView Server (QVS). .QVW .
- QlikView Distribution Service (QDS). Publisher , Reload Engine, QlikView, : , QVS, " ", , , , . , , , QlikView, , .
- , QlikView Management Service (QMS) — .
- , , - —
, MS SQL, ETL QlikView .
( ) , .QVW , . , . , , , QVD.
:
- , , .
- , - , 8-9 , . , , - , - 4 , , .
QMS , , , . , ( Reload Engine, . Publisher , rusbaron , ).
QMS :

, , .
, QMS . (, ).
, , , .
, .
, .
, .
, , , ( ), , , - , ..
, .
- , , , . , , .
, , ETL-, , , . , , , ETL- .
.
, — , , ,- — , .
- , , .
, . - , -. .
, . , , , . , , , . , , — .
, QDS . , - .
, . : , .
? . . ? .
, QlikView , , , .
, , , QlikView, . , , Hadoop ETL- .
QlikView , . , , .
QlikView : QMS QlikView Management Service API, . EDX (Event Driven Execution). , , QMS .
, " ", , , , , NPrinting.
, , — (Workflow management systems). , Apache Airflow.
Airflow — , . , , :

— , -.
Airflow , , Airflow QMS, Airflow — , , . , , : -> Airflow — QlikView.
API QMS . , .
Airflow — , . , , , , .
Airflow . , (, , ), QlikView, , , .
Airflow . , , - . , , : , . , , , , , , .
Airflow (DAG — directed acyclic graph), , — .
, "Application 7" "ETL_4" "ETL_5". "ETL_3" "TimeSensor_6_30" — , 6:30.

Python.
# Reload plan: one entry per QlikView document (.qvw file).
# Each value is a dict of optional settings:
#   'Dep'       - documents that must be reloaded before this one
#   'Pool'      - name of the Airflow pool the reload task is placed in
#   'Priority'  - priority weight assigned to the reload task
#   'StartTime' - [hour, minute] offset handed to a time sensor placed
#                 upstream of the reload task
tasksDict = {
    # ETL layer
    u'ETL_1.qvw': {},
    u'ETL_2.qvw': {'Pool': 'Heavy_ETL_pool'},
    u'ETL_3.qvw': {'StartTime': [6, 30]},
    u'ETL_4.qvw': {
        'Priority': -5,
        'Dep': [u'ETL_1.qvw', u'ETL_3.qvw'],
    },
    u'ETL_5.qvw': {
        'Dep': [u'ETL_2.qvw'],
    },
    # End-user applications built on top of the ETL documents
    u'Application_6.qvw': {
        'Dep': [u'ETL_1.qvw', u'ETL_5.qvw'],
    },
    u'Application_7.qvw': {
        'Dep': [u'ETL_4.qvw', u'ETL_5.qvw'],
    },
}
Airflow Python, , , . QlikView . , ( , , ) DAG. , , DAG .
: (5 ), (1 ) (100 ). . Airflow — — , , , , , , , , .
, DAG,
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
from airflow.contrib.operators.qds_operator import QDSReloadApplicationOperator
# Default arguments handed to the DAG below via `default_args=`.
default_args = {
    # Ownership and scheduling baseline
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 19),

    # E-mail notifications: on failure only, not on retry
    'email': ['Airflow.Administrator@mycompany.ru'],
    'email_on_failure': True,
    'email_on_retry': False,

    # Retries are switched off; the delay only matters if 'retries' is raised
    'retries': 0,
    'retry_delay': timedelta(minutes=5),

    # Pool used unless a task is given another one
    'pool': 'default_pool',
}
# The DAG that owns every reload and sensor task created below.
# The cron expression '0 21 * * 5' means 21:00 on every Friday.
dag = DAG(
    dag_id='example_qds_operator',
    default_args=default_args,
    description='Reload applications at QlikView Distribution Service',
    schedule_interval='0 21 * * 5',
    catchup=False,
)
# Reload plan: one entry per QlikView document (.qvw file).
# Each value is a dict of optional settings read by the wiring loop below:
#   'Dep'       - documents that must be reloaded before this one
#   'Pool'      - name of the Airflow pool the reload task is placed in
#   'Priority'  - priority weight assigned to the reload task
#   'StartTime' - [hour, minute] offset handed to a time sensor placed
#                 upstream of the reload task
tasksDict = {
    # ETL layer
    u'ETL_1.qvw': {},
    u'ETL_2.qvw': {'Pool': 'Heavy_ETL_pool'},
    u'ETL_3.qvw': {'StartTime': [6, 30]},
    u'ETL_4.qvw': {
        'Priority': -5,
        'Dep': [u'ETL_1.qvw', u'ETL_3.qvw'],
    },
    u'ETL_5.qvw': {
        'Dep': [u'ETL_2.qvw'],
    },
    # End-user applications built on top of the ETL documents
    u'Application_6.qvw': {
        'Dep': [u'ETL_1.qvw', u'ETL_5.qvw'],
    },
    u'Application_7.qvw': {
        'Dep': [u'ETL_4.qvw', u'ETL_5.qvw'],
    },
}
# Maps each QlikView document name to the Airflow task that reloads it.
# Time-sensor tasks created in the second pass are stored in the same dict,
# keyed by their task id, so that one sensor is shared by all documents with
# the same start time.
airflowTasksDict = {}

# Pass 1: create one reload task per document. All tasks must exist before
# dependencies are wired, because a document may depend on one listed later.
for task in tasksDict:
    # Build the task id from the file name: spaces, slashes, brackets and
    # commas become underscores, apostrophes and the ".qvw" extension are
    # dropped. The final replace is a single pass, so runs of three or more
    # underscores are shortened but not fully collapsed.
    task_id = (
        task.replace(" ", "_")
        .replace("'", "")
        .replace("/", "_")
        .replace("(", "_")
        .replace(")", "_")
        .replace(",", "_")
        .replace(".qvw", "")
        .replace("__", "_")
    )
    AirflowTask = QDSReloadApplicationOperator(
        document_name=task,
        task_id=task_id,
        qv_conn_id='qv_connection',
        dag=dag,
    )
    airflowTasksDict[task] = AirflowTask

# Pass 2: apply the per-document options (dependencies, pool, priority,
# start-time sensor).
for task, options in tasksDict.items():
    reload_task = airflowTasksDict[task]

    for dep in options.get('Dep', []):
        if dep not in airflowTasksDict:
            # Same exception type as the plain lookup would raise, but the
            # message names both ends of the broken dependency.
            raise KeyError(
                u"'{}' depends on '{}', which is not defined in tasksDict".format(task, dep)
            )
        reload_task.set_upstream(airflowTasksDict[dep])

    if 'Pool' in options:
        reload_task.pool = options['Pool']

    if 'Priority' in options:
        reload_task.priority_weight = options['Priority']

    if 'StartTime' in options:
        hour = options['StartTime'][0]
        minute = options['StartTime'][1]
        sensorTime = timedelta(hours=hour, minutes=minute)
        sensorTaskID = u'TimeSensor_{}_{}'.format(hour, minute)
        # Create the sensor only once per distinct [hour, minute] pair.
        if sensorTaskID not in airflowTasksDict:
            SensorTask = TimeDeltaSensor(
                delta=sensorTime,
                task_id=sensorTaskID,
                pool='Sensors',
                dag=dag,
            )
            airflowTasksDict[sensorTaskID] = SensorTask
        reload_task.set_upstream(airflowTasksDict[sensorTaskID])
if __name__ == '__main__':
    # When the file is executed directly: clear the DAG (also resetting its
    # DAG runs) and then run it.
    dag.clear(reset_dag_runs=True)
    dag.run()
, Airflow QlikView:

Airflow .
.
- , - , .
, ETL- , , .
, Airflow . .
, , .
, — 1 . , -, .
. , , , . , .
.
Airflow会生成非常有用的任务执行统计报告,借助它们我们可以跟踪作业耗时的增长趋势,还可以查看其他报告,例如甘特图:

我们生产环境中的Airflow运行在一台具有两个CPU内核和4 GB内存的虚拟机上,这足以保证服务平稳运行。
总的来说,我认为我们已经成功解决了QlikView报表的更新问题,而Airflow在这方面给了我们不可估量的帮助。
