Proceso ETL para recuperar datos de correo electrónico en Apache Airflow


No importa cuánta tecnología se desarrolle, una serie de enfoques obsoletos siempre se extiende para el desarrollo. Esto puede deberse a una transición suave, el factor humano, las necesidades tecnológicas o algo más. En el campo del procesamiento de datos, lo más significativo en esta parte son las fuentes de datos. No importa cómo soñamos con deshacernos de esto, pero hasta ahora algunos de los datos se han enviado en mensajería instantánea y correos electrónicos, sin mencionar formatos más arcaicos. Te invito a que cortes una de las opciones para Apache Airflow, que ilustra cómo puedes tomar datos de correos electrónicos.


Antecedentes


Muchos datos aún se transmiten por correo electrónico, comenzando con comunicaciones interpersonales y terminando con los estándares de interacción entre empresas. Es bueno si puede escribir una interfaz para obtener los datos o poner personas en la oficina que llevarán esta información a fuentes más convenientes, pero a menudo tal oportunidad simplemente no existe. La tarea específica que encontré fue conectar el conocido sistema CRM al almacén de datos y luego al sistema OLAP. Históricamente sucedió que para nuestra compañía el uso de este sistema era conveniente en una sola área de negocios. Por lo tanto, todos realmente querían poder operar con datos de este sistema de terceros también. En primer lugar, por supuesto, se exploró la posibilidad de obtener datos de una API abierta. Desafortunadamente,La API no cubrió la recepción de todos los datos necesarios y, en términos simples, estaba un poco torcida, y el soporte técnico no quiso o no pudo avanzar para proporcionar una funcionalidad más completa. Pero este sistema proporcionó la capacidad de recibir periódicamente los datos faltantes por correo en forma de un enlace para descargar el archivo.


, , . , , .


Apache Airflow


ETL Apache Airflow. , , , , .


Apache Airflow — , , ETL (Extract-Transform-Loading) Python. Airflow , — , — . Python , . , . :


  • — , ;
  • — ;
  • — , , ( );
  • ..

Apache Airflow . .



, , :


  • ;
  • ;
  • .

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook      

           :param imap_conn_id:          
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
              
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
                 , 
              

            :param check_seen:          
            :type check_seen:       bool
            :param box:              
            :type box:              string
            :param condition:         
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("    : " + str(mail_ids))

        if not mail_ids:
            logging.info("   ")
            return None

        mail_id = mail_ids[0]

        #    
        if len(mail_ids) > 1:
            #   
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\\Seen")

            #  
            mail_id = mail_ids[-1]

        #     
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id

: , , — . , . , , — . , .


: . , , . , , : , , , .. , , .


    def download_from_url(self, url, path, chunk_size=128):
        """
               

            :param url:               
            :type url:               string
            :param path:               
            :type path:              string
            :param chunk_size:          
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
                   

            :param mail_id:          
            :type mail_id:          string
            :param path:              
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

, . imap_conn_id. Apache Airflow (, , ), .




, . , , , , , , (API, , ..). . CRM , UUID. SIP- , UUID, . , , . , , , . .


, , , .


from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True


, . — , PythonOperator


from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

#   
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["home@home.ru"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

#  
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

#      
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

#    
...

#    
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
#    

, mail.ru, , .. 2016 , , , . , - . , (UNSEEN).


Resumiendo, tenemos la siguiente secuencia: verificamos si hay nuevas letras que cumplan con las condiciones, si las hay, luego descargamos el archivo usando el enlace de la última letra.
Debajo de los últimos puntos, se omite que este archivo se desempaquetará, los datos del archivo se limpiarán y procesarán, y como resultado, todo esto irá más allá del proceso de ETL, pero esto ya está fuera del alcance del artículo. Si resultó interesante y útil, con mucho gusto continuaré describiendo las soluciones ETL y sus partes para Apache Airflow.


All Articles