ETL process for retrieving email data in Apache Airflow


No matter how far technology advances, a trail of outdated approaches always follows it. This may be due to gradual transitions, the human factor, technological constraints, or something else. In data processing, this is most noticeable in data sources. However much we would like to be rid of them, some data is still sent through instant messengers and email, not to mention more archaic formats. Below I walk through one approach for Apache Airflow that shows how to pull data out of emails.


Background


A lot of data is still transmitted by email, from personal communication to formal company-to-company exchange standards. It is good if you can write an interface to fetch the data, or staff an office with people who will move this information into more convenient sources, but often that option simply does not exist. The specific task I faced was connecting a well-known CRM system to the data warehouse, and then to the OLAP system. Historically, our company found this system convenient for only a single area of the business, so everyone wanted to be able to work with data from this third-party system as well. Naturally, the first thing explored was obtaining the data through the open API. Unfortunately, the API did not cover all the necessary data and, to put it simply, was rather clumsy, and technical support was unwilling or unable to meet us halfway and provide more comprehensive functionality. The system did, however, offer the option of periodically receiving the missing data by email, as a link for downloading an archive.


, , . , , .


Apache Airflow


ETL Apache Airflow. , , , , .


Apache Airflow β€” , , ETL (Extract-Transform-Loading) Python. Airflow , β€” , β€” . Python , . , . :


  • β€” , ;
  • β€” ;
  • β€” , , ( );
  • ..

Apache Airflow . .



, , :


  • ;
  • ;
  • .

import email
import imaplib
import logging

import requests
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup

class IMAPHook(BaseHook):
    """
    Hook for reading a mailbox over IMAP.

    Host, login and password are taken from the Airflow connection
    identified by ``imap_conn_id``.
    """

    def __init__(self, imap_conn_id):
        """
        Look up the Airflow connection; no network traffic happens here.

        :param imap_conn_id:    ID of the Airflow connection to use
        :type imap_conn_id:     string
        """
        self.connection = self.get_connection(imap_conn_id)
        # Authenticated imaplib client; populated by authenticate().
        self.mail = None

    def authenticate(self):
        """
        Open an IMAP-over-SSL session and log in.

        On success the client is stored in ``self.mail``.

        :raises AirflowException: if the server rejects the login
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        try:
            response, _ = mail.login(
                user=self.connection.login,
                password=self.connection.password,
            )
        except imaplib.IMAP4.error as err:
            # imaplib reports rejected credentials by raising, not by
            # returning a non-OK status, so translate it here.
            raise AirflowException("Sign in failed") from err
        if response != "OK":
            raise AirflowException("Sign in failed")
        self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
        Return the ID of the last message matching ``condition``.

        Logs in, selects ``box`` and runs an IMAP SEARCH. When more than
        one message matches, every match is flagged ``\\Seen``. When
        ``check_seen`` is false, the ``\\Seen`` flag is then removed from
        the returned message.

        :param check_seen:      keep the returned message flagged as seen
        :type check_seen:       bool
        :param box:             mailbox (folder) to search
        :type box:              string
        :param condition:       IMAP search criterion
        :type condition:        string
        :returns:               ID of the last matching message, or None
                                when nothing matches
        :raises AirflowException: if login, mailbox selection or the
                                search fails
        """
        self.authenticate()

        response, detail = self.mail.select(mailbox=box)
        if response != "OK":
            raise AirflowException(
                "Cannot select mailbox %r: %r" % (box, detail)
            )

        response, data = self.mail.search(None, condition)
        if response != "OK":
            raise AirflowException(
                "Mail search %r failed: %r" % (condition, data)
            )

        # An empty result arrives as an empty (or missing) first element.
        mail_ids = data[0].split() if data and data[0] else []
        logging.info("Found mail ids: %s", mail_ids)

        if not mail_ids:
            logging.info("No mail matching %s in %s", condition, box)
            return None

        # Presumably the newest message: servers normally list matches in
        # ascending order -- TODO confirm for the target server.
        latest_id = mail_ids[-1]

        # With several matches, flag all of them as seen so that older
        # messages are not picked up again by a later UNSEEN search.
        if len(mail_ids) > 1:
            for found_id in mail_ids:
                self.mail.store(found_id, "+FLAGS", "\\Seen")

        # Optionally leave the returned message unseen for a later reader.
        if not check_seen:
            self.mail.store(latest_id, "-FLAGS", "\\Seen")

        return latest_id

: , , β€” . , . , , β€” . , .


: . , , . , , : , , , .. , , .


    def download_from_url(self, url, path, chunk_size=128, timeout=None):
        """
        Stream the resource at ``url`` into the file ``path``.

            :param url:              address to download from
            :type url:               string
            :param path:             destination file path
            :type path:              string
            :param chunk_size:       number of bytes read per iteration
            :type chunk_size:        int
            :param timeout:          timeout in seconds handed to requests;
                                     None (the default) waits indefinitely
            :type timeout:           float
            :raises requests.HTTPError: if the server answers with 4xx/5xx
        """
        # The context manager releases the streamed connection even when
        # writing fails part-way through.
        with requests.get(url, stream=True, timeout=timeout) as response:
            # Fail loudly instead of saving an error page as the archive.
            response.raise_for_status()
            with open(path, "wb") as fd:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
        Download the file linked from the body of a message.

        Fetches the full message, parses its HTML and downloads the target
        of the last ``<a>`` element that has both an ``href`` and text.
        ``self.mail`` must already hold a session with a selected mailbox
        (get_last_mail() leaves it in that state).

            :param mail_id:         ID of the message to read
            :type mail_id:          string
            :param path:            destination file path
            :type path:             string
            :raises AirflowException: if the message cannot be fetched or
                                    contains no usable link
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        if response != "OK" or not data or not isinstance(data[0], tuple):
            raise AirflowException("Cannot fetch mail %r" % (mail_id,))
        raw_email = data[0][1]

        # Decode every text/html part according to its own transfer
        # encoding and charset; decoding the raw bytes directly breaks on
        # quoted-printable/base64 bodies and non-UTF-8 mail.
        html_parts = []
        for part in email.message_from_bytes(raw_email).walk():
            if part.get_content_type() != "text/html":
                continue
            payload = part.get_payload(decode=True)
            if not payload:
                continue
            charset = part.get_content_charset() or "utf-8"
            try:
                html_parts.append(payload.decode(charset, errors="replace"))
            except LookupError:
                # Charset label unknown to Python: fall back to UTF-8.
                html_parts.append(payload.decode("utf-8", errors="replace"))

        if not html_parts:
            # No part declared as HTML: scan the whole raw message instead.
            html_parts.append(raw_email.decode("utf-8", errors="replace"))

        raw_soup = "".join(html_parts).replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")

        links = [a["href"] for a in parse_soup.find_all("a", href=True, text=True)]
        if not links:
            raise AirflowException(
                "No download link found in mail %r" % (mail_id,)
            )

        # When several links are present, the last one is used.
        self.download_from_url(links[-1], path)

, . imap_conn_id. Apache Airflow (, , ), .




, . , , , , , , (API, , ..). . CRM , UUID. SIP- , UUID, . , , . , , , . .


, , , .


from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    """
    Sensor that succeeds once the mailbox holds a matching message.

    Each poke runs IMAPHook.get_last_mail(), which may change ``\\Seen``
    flags on the matching messages as a side effect.

    :param conn_id:         ID of the Airflow connection for the mailbox
    :type conn_id:          string
    :param check_seen:      passed through to IMAPHook.get_last_mail()
    :type check_seen:       bool
    :param box:             mailbox (folder) to search
    :type box:              string
    :param condition:       IMAP search criterion
    :type condition:        string
    """

    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        """
        Check the mailbox once.

        :returns: True when a matching message exists, False otherwise
        """
        hook = IMAPHook(self.conn_id)
        try:
            mail_id = hook.get_last_mail(
                check_seen=self.check_seen,
                box=self.box,
                condition=self.condition,
            )
        finally:
            # Every poke opens a new IMAP session; close it so repeated
            # pokes do not pile up open connections on the server.
            if hook.mail is not None:
                hook.mail.logout()

        return mail_id is not None


, . β€” , PythonOperator


from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

from datetime import datetime, timedelta

start_date = datetime(2020, 4, 4)

# Default arguments shared by the tasks of this DAG
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["home@home.ru"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Sensor that waits for a new message in the mailbox
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    # get_last_mail() flags every match as \Seen when several are found.
    # check_seen=False clears the flag on the newest one again, so the
    # UNSEEN search in the downstream task still finds it.
    check_seen=False,
    box="my_box",
    dag=dag,
    mode="poke",
)

#      
def prepare_mail():
    """
    Download the archive linked from the newest unseen message.

    Looks up the last unseen message in ``my_box`` and saves the file
    behind its link to ``./path.zip``.

    :raises AirflowException: if the mailbox has no unseen message
    """
    # Imported here because the snippet's import block does not provide it.
    from airflow.exceptions import AirflowException

    imap_hook = IMAPHook("mail_conn_id")
    try:
        mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
        if mail_id is None:
            raise AirflowException("Empty mailbox")

        # The original called this on an undefined name ``conn``.
        imap_hook.download_mail_href_attachment(mail_id, "./path.zip")
    finally:
        # Close the IMAP session opened by get_last_mail().
        if imap_hook.mail is not None:
            imap_hook.mail.logout()

prepare_mail_data = PythonOperator(
    task_id="prepare_mail_data",
    default_args=args,
    dag=dag,
    python_callable=prepare_mail,
)

# Further processing tasks are omitted in this example
...

# Task ordering: wait for a mail, then download the linked archive
mail_check_sensor >> prepare_mail_data
# Attach the omitted downstream tasks here: prepare_mail_data >> <next_task>
#    

, mail.ru, , .. 2016 , , , . , - . , (UNSEEN).


To summarize, we have the following sequence: we check whether there are new emails that meet the conditions and, if there are, download the archive using the link from the latest email.
The ellipses at the end stand for the omitted steps: the archive is unpacked, its data is cleaned and processed, and the result is passed further down the ETL pipeline, but that is beyond the scope of this article. If this turned out to be interesting and useful, I will gladly continue describing ETL solutions and their components for Apache Airflow.


All Articles