
TL; DR;
Este artículo describe cómo se implementó Apache Airflow para administrar trabajos de actualización de informes creados en QlikView en una implementación bastante grande.
El infierno de dependencia (infierno de dependencia en inglés) es el antipatrón de la gestión de la configuración, la expansión del gráfico de dependencias mutuas de productos de software y bibliotecas, lo que dificulta la instalación de productos nuevos y la desinstalación.
Wikipedia
, 2018 , BIA-Technologies.
QlikView " ".
Qlik QlikSense, QlikView, 12.4, .
, , .
QlikView "" (application) — , "QVW" ( ), , , .
, .
-- , , - QV. . , , . , "", ETL- , .
QV QVW , : , , . .
, , QVD. QlikView , , .
, ETL- , , QVD, , .
Qlik , , , (), - . QlikView QVD- . , QlikView, , , , .
, Qlik — NPrinting, , , , , SSRS, , QlikView, .
, QlikView:
- , , QlikView Server (QVS). .QVW .
- QlikView Distribution Service (QDS). Publisher , Reload Engine, QlikView, : , QVS, " ", , , , . , , , QlikView, , .
- , QlikView Management Service (QMS) — .
- , , - —
, MS SQL, ETL QlikView .
( ) , .QVW , . , . , , , QVD.
:
- , , .
- , - , 8-9 , . , , - , - 4 , , .
QMS , , , . , ( Reload Engine, . Publisher , rusbaron , ).
QMS :

, , .
, QMS . (, ).
, , , .
, .
, .
, .
, , , ( ), , , - , ..
, .
- , , , . , , .
, , ETL-, , , . , , , ETL- .
.
, — , , ,- — , .
- , , .
, . - , -. .
, . , , , . , , , . , , — .
, QDS . , - .
, . : , .
? . . ? .
, QlikView , , , .
, , , QlikView, . , , Hadoop ETL- .
QlikView , . , , .
QlikView : QMS QlikView Management Service API, . EDX (Event Driven Execution). , , QMS .
, " ", , , , , NPrinting.
, , — (Workflow management systems). , Apache Airflow.
Airflow — , . , , :

— , -.
Airflow , , Airflow QMS, Airflow — , , . , , : -> Airflow — QlikView.
API QMS . , .
Airflow — , . , , , , .
Airflow . , (, , ), QlikView, , , .
Airflow . , , - . , , : , . , , , , , , .
Airflow (DAG — directed acyclic graph), , — .
, "Application 7" "ETL_4" "ETL_5". "ETL_3" "TimeSensor_6_30" — , 6:30.

Python.
tasksDict = {
u'ETL_1.qvw': {},
u'ETL_2.qvw': {
'Pool': 'Heavy_ETL_pool',
},
u'ETL_3.qvw': {
'StartTime': [6, 30]},
u'ETL_4.qvw': {
'Priority': -5,
'Dep': [
u'ETL_1.qvw',
u'ETL_3.qvw', ]},
u'ETL_5.qvw': {
'Dep': [
u'ETL_2.qvw', ]},
u'Application_6.qvw': {
'Dep': [
u'ETL_1.qvw',
u'ETL_5.qvw', ]},
u'Application_7.qvw': {
'Dep': [
u'ETL_4.qvw',
u'ETL_5.qvw',
]},
}
Airflow Python, , , . QlikView . , ( , , ) DAG. , , DAG .
: (5 ), (1 ) (100 ). . Airflow — — , , , , , , , , .
, DAG,from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
from airflow.contrib.operators.qds_operator import QDSReloadApplicationOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 19),
'email': ['Airflow.Administrator@mycompany.ru'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'pool': 'default_pool',
}
dag = DAG('example_qds_operator',
description='Reload applications at QlikView Distribution Service',
catchup=False,
schedule_interval='0 21 * * 5',
default_args=default_args)
tasksDict = {
u'ETL_1.qvw': {},
u'ETL_2.qvw': {
'Pool': 'Heavy_ETL_pool',
},
u'ETL_3.qvw': {
'StartTime': [6, 30]},
u'ETL_4.qvw': {
'Priority': -5,
'Dep': [
u'ETL_1.qvw',
u'ETL_3.qvw', ]},
u'ETL_5.qvw': {
'Dep': [
u'ETL_2.qvw', ]},
u'Application_6.qvw': {
'Dep': [
u'ETL_1.qvw',
u'ETL_5.qvw', ]},
u'Application_7.qvw': {
'Dep': [
u'ETL_4.qvw',
u'ETL_5.qvw',
]},
}
airflowTasksDict = {}
for task in tasksDict.keys():
task_id = task.replace(" ", "_").replace("'", "").replace("/", "_").replace("(", "_").replace(")", "_").replace(",", "_").replace(".qvw", "").replace("__",
"_")
AirflowTask = QDSReloadApplicationOperator(document_name=task, task_id=task_id, qv_conn_id='qv_connection', dag=dag)
airflowTasksDict[task] = AirflowTask
for task in tasksDict.keys():
if 'Dep' in tasksDict[task]:
for dep in tasksDict[task]['Dep']:
airflowTasksDict[task].set_upstream(airflowTasksDict[dep])
if 'Pool' in tasksDict[task]:
airflowTasksDict[task].pool = tasksDict[task]['Pool']
if 'Priority' in tasksDict[task]:
airflowTasksDict[task].priority_weight = tasksDict[task]['Priority']
if 'StartTime' in tasksDict[task]:
hour = tasksDict[task]['StartTime'][0]
minute = tasksDict[task]['StartTime'][1]
sensorTime = timedelta(hours=hour, minutes=minute)
sensorTaskID = u'TimeSensor_{}_{}'.format(hour, minute)
if sensorTaskID not in airflowTasksDict:
SensorTask = TimeDeltaSensor(delta=sensorTime, task_id=sensorTaskID, pool='Sensors', dag=dag)
airflowTasksDict[sensorTaskID] = SensorTask
airflowTasksDict[task].set_upstream(airflowTasksDict[sensorTaskID])
if __name__ == '__main__':
dag.clear(reset_dag_runs=True)
dag.run()
, Airflow QlikView:

Airflow .
.
- , - , .
, ETL- , , .
, Airflow . .
, , .
, — 1 . , -, .
. , , , . , .
.
Airflow crea informes bastante útiles sobre estadísticas de rendimiento de tareas, y podemos rastrear la dinámica de crecimiento de la duración de la tarea, así como recibir otros informes, por ejemplo, un diagrama de Gantt:

Combat Airflow funciona en nuestra máquina virtual con dos núcleos y 4 Gb de RAM, y esto es suficiente para el buen funcionamiento del servicio.
En general, creo que hemos resuelto con éxito el problema de actualizar los informes QlikView, y Airflow nos ayudó de gran valor en esto.
