ETL-Prozess zum Abrufen von E-Mail-Daten in Apache Airflow


Unabhängig davon, wie viel Technologie sich entwickelt, erstreckt sich eine Reihe veralteter Ansätze immer über die Entwicklung. Dies kann auf einen reibungslosen Übergang, den menschlichen Faktor, technologische Bedürfnisse oder etwas anderes zurückzuführen sein. Im Bereich der Datenverarbeitung sind Datenquellen in diesem Teil am wichtigsten. Egal wie wir davon geträumt haben, dies loszuwerden, bis jetzt wurden einige der Daten in Instant Messenger und E-Mail gesendet, ganz zu schweigen von archaischeren Formaten. Ich lade Sie ein, eine der Optionen für Apache Airflow zu kürzen, die zeigt, wie Sie Daten aus E-Mails entnehmen können.


Hintergrund


Viele Daten werden immer noch per E-Mail übertragen, angefangen bei der zwischenmenschlichen Kommunikation bis hin zu den Standards der Interaktion zwischen Unternehmen. Es ist gut, wenn Sie eine Schnittstelle schreiben können, um die Daten abzurufen, oder Personen ins Büro bringen, die diese Informationen an bequemere Quellen weitergeben. Oft besteht eine solche Möglichkeit jedoch einfach nicht. Die spezielle Aufgabe bestand darin, das bekannte CRM-System mit dem Data Warehouse und anschließend mit dem OLAP-System zu verbinden. Historisch gesehen war die Verwendung dieses Systems für unser Unternehmen in einem einzigen Geschäftsbereich praktisch. Daher wollte wirklich jeder in der Lage sein, auch Daten von diesem Drittanbieter-System zu verarbeiten. Zunächst wurde natürlich die Möglichkeit untersucht, Daten von einer offenen API abzurufen. Leider,Die API deckte nicht den Empfang aller erforderlichen Daten ab und war in einfachen Worten etwas schief, und der technische Support wollte oder konnte sich nicht treffen, um umfassendere Funktionen bereitzustellen. Dieses System bot jedoch die Möglichkeit, die fehlenden Daten regelmäßig per E-Mail in Form eines Links zum Entladen des Archivs zu empfangen.


, , . , , .


Apache Airflow


ETL Apache Airflow. , , , , .


Apache Airflow — , , ETL (Extract-Transform-Loading) Python. Airflow , — , — . Python , . , . :


  • — , ;
  • — ;
  • — , , ( );
  • ..

Apache Airflow . .



, , :


  • ;
  • ;
  • .

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook      

           :param imap_conn_id:          
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
              
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
                 , 
              

            :param check_seen:          
            :type check_seen:       bool
            :param box:              
            :type box:              string
            :param condition:         
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("    : " + str(mail_ids))

        if not mail_ids:
            logging.info("   ")
            return None

        mail_id = mail_ids[0]

        #    
        if len(mail_ids) > 1:
            #   
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\\Seen")

            #  
            mail_id = mail_ids[-1]

        #     
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id

: , , — . , . , , — . , .


: . , , . , , : , , , .. , , .


    def download_from_url(self, url, path, chunk_size=128):
        """
               

            :param url:               
            :type url:               string
            :param path:               
            :type path:              string
            :param chunk_size:          
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
                   

            :param mail_id:          
            :type mail_id:          string
            :param path:              
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

, . imap_conn_id. Apache Airflow (, , ), .




, . , , , , , , (API, , ..). . CRM , UUID. SIP- , UUID, . , , . , , , . .


, , , .


from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True


, . — , PythonOperator


from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

#   
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["home@home.ru"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

#  
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

#      
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

#    
...

#    
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
#    

, mail.ru, , .. 2016 , , , . , - . , (UNSEEN).


Zusammenfassend haben wir die folgende Reihenfolge: Wir prüfen, ob es neue Buchstaben gibt, die die Bedingungen erfüllen, falls vorhanden, und laden dann das Archiv über den Link aus dem letzten Buchstaben herunter.
Unter den letzten Punkten wird weggelassen, dass dieses Archiv entpackt wird, die Daten aus dem Archiv bereinigt und verarbeitet werden und infolgedessen das Ganze weiter in die ETL-Pipeline des Prozesses geht, aber dies geht bereits über den Rahmen des Artikels hinaus. Wenn es sich als interessant und nützlich herausstellte, werde ich gerne weiterhin ETL-Lösungen und deren Teile für Apache Airflow beschreiben.


All Articles