عملية ETL لاسترداد بيانات البريد الإلكتروني في Apache Airflow


بغض النظر عن مدى تطور التكنولوجيا ، فإن سلسلة من الأساليب القديمة دائمًا ما تمتد إلى التطوير. قد يكون هذا بسبب الانتقال السلس أو العامل البشري أو الاحتياجات التكنولوجية أو أي شيء آخر. في مجال معالجة البيانات ، والأكثر أهمية في هذا الجزء هي مصادر البيانات. بغض النظر عن الكيفية التي حلمنا بها للتخلص من هذا ، ولكن حتى الآن تم إرسال بعض البيانات في المراسلات الفورية ورسائل البريد الإلكتروني ، ناهيك عن المزيد من التنسيقات القديمة. أدعوك إلى قطع أحد خيارات Apache Airflow ، والذي يوضح كيف يمكنك أخذ البيانات من رسائل البريد الإلكتروني.


خلفية


لا يزال يتم إرسال العديد من البيانات عبر البريد الإلكتروني ، بدءًا من الاتصالات الشخصية وانتهاءً بمعايير التفاعل بين الشركات. من الجيد أن تتمكن من كتابة واجهة للحصول على البيانات أو وضع الأشخاص في المكتب الذين سيقدمون هذه المعلومات إلى مصادر أكثر ملاءمة ، ولكن غالبًا ما تكون هذه الفرصة غير موجودة. كانت المهمة المحددة التي واجهتها هي ربط نظام CRM المعروف بمستودع البيانات ، ثم بنظام OLAP. حدث ذلك تاريخيًا أنه بالنسبة لشركتنا ، كان استخدام هذا النظام مناسبًا في مجال واحد من الأعمال. لذلك ، أراد الجميع حقًا أن يتمكنوا من العمل على البيانات من نظام الطرف الثالث هذا أيضًا. أولاً وقبل كل شيء ، بالطبع ، تم استكشاف إمكانية الحصول على البيانات من واجهة برمجة تطبيقات مفتوحة. للأسف،لم يقم API بتغطية استلام جميع البيانات الضرورية ، وبعبارات بسيطة ، كان معوجًا قليلاً ، ولم يكن الدعم الفني يريد أو لا يستطيع المضي قدمًا لتقديم وظائف أكثر شمولاً. لكن هذا النظام وفر القدرة على استقبال البيانات المفقودة بشكل دوري عن طريق البريد في شكل رابط لتفريغ الأرشيف.


, , . , , .


Apache Airflow


ETL Apache Airflow. , , , , .


Apache Airflow — , , ETL (Extract-Transform-Loading) Python. Airflow , — , — . Python , . , . :


  • — , ;
  • — ;
  • — , , ( );
  • ..

Apache Airflow . .



, , :


  • ;
  • ;
  • .

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook      

           :param imap_conn_id:          
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
              
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
                 , 
              

            :param check_seen:          
            :type check_seen:       bool
            :param box:              
            :type box:              string
            :param condition:         
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("    : " + str(mail_ids))

        if not mail_ids:
            logging.info("   ")
            return None

        mail_id = mail_ids[0]

        #    
        if len(mail_ids) > 1:
            #   
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\\Seen")

            #  
            mail_id = mail_ids[-1]

        #     
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id

: , , — . , . , , — . , .


: . , , . , , : , , , .. , , .


    def download_from_url(self, url, path, chunk_size=128):
        """
               

            :param url:               
            :type url:               string
            :param path:               
            :type path:              string
            :param chunk_size:          
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
                   

            :param mail_id:          
            :type mail_id:          string
            :param path:              
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

, . imap_conn_id. Apache Airflow (, , ), .




, . , , , , , , (API, , ..). . CRM , UUID. SIP- , UUID, . , , . , , , . .


, , , .


from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True


, . — , PythonOperator


from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

#   
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["home@home.ru"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

#  
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

#      
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

#    
...

#    
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
#    

, mail.ru, , .. 2016 , , , . , - . , (UNSEEN).


تلخيص ، لدينا التسلسل التالي: نتحقق مما إذا كانت هناك أحرف جديدة تستوفي الشروط ، إن وجدت ، ثم قم بتنزيل الأرشيف باستخدام الرابط من الحرف الأخير.
تحت النقاط الأخيرة ، يتم حذف أن هذا الأرشيف سيتم تفريغه ، وسيتم تنظيف البيانات من الأرشيف ومعالجتها ، ونتيجة لذلك ، سيذهب هذا الأمر برمته إلى خط أنابيب ETL للعملية ، ولكن هذا بالفعل خارج نطاق المقالة. إذا اتضح أنها مثيرة للاهتمام ومفيدة ، فسأواصل بسرور وصف حلول ETL وأجزائها لـ Apache Airflow.


All Articles