用于在Apache Airflow中检索电子邮件数据的ETL过程


无论开发了多少技术,一连串过时的方法总是会不断发展。这可能是由于平稳过渡,人为因素,技术需求或其他原因造成的。在数据处理领域,这部分最重要的是数据源。不管我们梦想如何摆脱这种情况,但到目前为止,一些数据已通过即时通讯工具和电子邮件发送,更不用说更古老的格式了。我邀请您减少Apache Airflow的选项之一,它说明了如何从电子邮件中获取数据。


背景


许多数据仍然通过电子邮件传输,从人际交流开始,以公司之间的交互标准结束。如果您可以编写一个界面来获取数据或将人员带到将这些信息带到更方便的来源的办公室中,这是很好的,但是通常这样的机会可能根本不存在。我遇到的具体任务是将著名的CRM系统连接到数据仓库,然后再连接到OLAP系统。历史上发生过这样的事情,对于我们公司而言,在单个业务领域中使用此系统非常方便。因此,每个人都真正希望能够同时使用该第三方系统中的数据。首先,当然,探讨了从开放的API获取数据的可能性。不幸,API没有涵盖所有必要数据的接收,并且简单来说,它有点歪斜,并且为了提供更全面的功能,技术支持人员也不愿或无法满足要求。但是,该系统提供了以链接的形式定期通过邮件接收丢失的数据的功能,以卸载档案。


, , . , , .


Apache Airflow


ETL Apache Airflow. , , , , .


Apache Airflow — , , ETL (Extract-Transform-Loading) Python. Airflow , — , — . Python , . , . :


  • — , ;
  • — ;
  • — , , ( );
  • ..

Apache Airflow . .



, , :


  • ;
  • ;
  • .

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook      

           :param imap_conn_id:          
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
              
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
                 , 
              

            :param check_seen:          
            :type check_seen:       bool
            :param box:              
            :type box:              string
            :param condition:         
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("    : " + str(mail_ids))

        if not mail_ids:
            logging.info("   ")
            return None

        mail_id = mail_ids[0]

        #    
        if len(mail_ids) > 1:
            #   
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\\Seen")

            #  
            mail_id = mail_ids[-1]

        #     
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id

: , , — . , . , , — . , .


: . , , . , , : , , , .. , , .


    def download_from_url(self, url, path, chunk_size=128):
        """
               

            :param url:               
            :type url:               string
            :param path:               
            :type path:              string
            :param chunk_size:          
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
                   

            :param mail_id:          
            :type mail_id:          string
            :param path:              
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

, . imap_conn_id. Apache Airflow (, , ), .




, . , , , , , , (API, , ..). . CRM , UUID. SIP- , UUID, . , , . , , , . .


, , , .


from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True


, . — , PythonOperator


from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

#   
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["home@home.ru"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

#  
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

#      
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

#    
...

#    
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
#    

, mail.ru, , .. 2016 , , , . , - . , (UNSEEN).


总而言之,我们按以下顺序进行:检查是否有符合条件的新字母(如果有),然后使用最后一个字母的链接下载档案。
在最后一点下,省略了该归档文件的解压缩,归档文件中的数据将被清理和处理,最后,整个过程将进一步扩展到ETL处理管道,但这已经超出了本文的范围。如果结果有趣且有用,我将很高兴继续描述Apache Airflow的ETL解决方案及其组成部分。


All Articles