Processo ETL para recuperar dados de email no Apache Airflow


Independentemente da quantidade de tecnologia desenvolvida, uma série de abordagens desatualizadas sempre se estende ao desenvolvimento. Isso pode ser devido a uma transição suave, fator humano, necessidades tecnológicas ou outra coisa. No campo do processamento de dados, os mais significativos nesta parte são as fontes de dados. Não importa como sonhamos em nos livrar disso, mas até agora alguns dados foram enviados em mensagens instantâneas e e-mails, sem mencionar formatos mais arcaicos. Convido você a cortar uma das opções do Apache Airflow, que ilustra como você pode obter dados de emails.


fundo


Muitos dados ainda são transmitidos por e-mail, começando com as comunicações interpessoais e terminando com os padrões de interação entre as empresas. É bom que você possa escrever uma interface para obter dados ou colocar pessoas no escritório que levarão essas informações a fontes mais convenientes, mas muitas vezes essa oportunidade pode simplesmente não existir. A tarefa específica que encontrei foi conectar o conhecido sistema CRM ao armazém de dados e depois ao sistema OLAP. Aconteceu historicamente que, para nossa empresa, o uso desse sistema era conveniente em uma única área de negócios. Portanto, todos realmente queriam poder operar com dados desse sistema de terceiros. Antes de tudo, é claro, a possibilidade de obter dados de uma API aberta foi explorada. Infelizmente,A API não cobriu o recebimento de todos os dados necessários e, em termos simples, estava um pouco torta, e o suporte técnico não quis ou não pôde se reunir para fornecer funcionalidade mais abrangente. Mas esse sistema forneceu a capacidade de receber periodicamente os dados ausentes por correio na forma de um link para descarregar o arquivo morto.


, , . , , .


Apache Airflow


ETL Apache Airflow. , , , , .


Apache Airflow — , , ETL (Extract-Transform-Loading) Python. Airflow , — , — . Python , . , . :


  • — , ;
  • — ;
  • — , , ( );
  • ..

Apache Airflow . .



, , :


  • ;
  • ;
  • .

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook      

           :param imap_conn_id:          
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
              
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
                 , 
              

            :param check_seen:          
            :type check_seen:       bool
            :param box:              
            :type box:              string
            :param condition:         
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("    : " + str(mail_ids))

        if not mail_ids:
            logging.info("   ")
            return None

        mail_id = mail_ids[0]

        #    
        if len(mail_ids) > 1:
            #   
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\\Seen")

            #  
            mail_id = mail_ids[-1]

        #     
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id

: , , — . , . , , — . , .


: . , , . , , : , , , .. , , .


    def download_from_url(self, url, path, chunk_size=128):
        """
               

            :param url:               
            :type url:               string
            :param path:               
            :type path:              string
            :param chunk_size:          
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
                   

            :param mail_id:          
            :type mail_id:          string
            :param path:              
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

, . imap_conn_id. Apache Airflow (, , ), .




, . , , , , , , (API, , ..). . CRM , UUID. SIP- , UUID, . , , . , , , . .


, , , .


from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True


, . — , PythonOperator


from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

#   
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["home@home.ru"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

#  
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

#      
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

#    
...

#    
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
#    

, mail.ru, , .. 2016 , , , . , - . , (UNSEEN).


Resumindo, temos a seguinte sequência: verificamos se há novas letras que atendem às condições, se houver, e depois baixamos o arquivo usando o link da última letra.
Sob os últimos pontos, é omitido que esse arquivo será descompactado, os dados do arquivo serão limpos e processados ​​e, como resultado, tudo isso irá além do pipeline ETL do processo, mas isso já está além do escopo do artigo. Se for interessante e útil, terei prazer em continuar descrevendo as soluções ETL e suas partes para o Apache Airflow.


All Articles