
Quelle que soit la technologie développée, une série d'approches dépassées s'étire toujours pour le développement. Cela peut être dû à une transition en douceur, au facteur humain, aux besoins technologiques ou à autre chose. Dans le domaine du traitement des données, les plus importantes dans cette partie sont les sources de données. Peu importe comment nous rêvions de nous en débarrasser, mais jusqu'à présent, certaines des données ont été envoyées dans des messageries instantanées et des e-mails, sans parler de formats plus archaïques. Je vous invite à supprimer l'une des options d'Apache Airflow, qui illustre comment vous pouvez prendre des données à partir d'e-mails.
Contexte
, . , , , . , , — CRM , — OLAP. , . . , , API. , API , , , , . .
, , . , , .
Apache Airflow
ETL Apache Airflow. , , , , .
Apache Airflow — , , ETL (Extract-Transform-Loading) Python. Airflow , — , — . Python , . , . :
- — , ;
- — ;
- — , , ( );
- ..
Apache Airflow . .
, , :
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook
:param imap_conn_id:
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
,
:param check_seen:
:type check_seen: bool
:param box:
:type box: string
:param condition:
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info(" : " + str(mail_ids))
if not mail_ids:
logging.info(" ")
return None
mail_id = mail_ids[0]
if len(mail_ids) > 1:
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\\Seen")
mail_id = mail_ids[-1]
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\\Seen")
return mail_id
: , , — . , . , , — . , .
: . , , . , , : , , , .. , , .
def download_from_url(self, url, path, chunk_size=128):
"""
:param url:
:type url: string
:param path:
:type path: string
:param chunk_size:
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
:param mail_id:
:type mail_id: string
:param path:
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
, . imap_conn_id. Apache Airflow (, , ), .

, . , , , , , , (API, , ..). . CRM , UUID. SIP- , UUID, . , , . , , , . .
, , , .
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
, . — , PythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
args = {
"owner": "example",
"start_date": start_date,
"email": ["home@home.ru"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
...
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
, mail.ru, , .. 2016 , , , . , - . , (UNSEEN).
En résumé, nous avons la séquence suivante: nous vérifions s'il y a de nouvelles lettres qui remplissent les conditions, le cas échéant, puis téléchargeons l'archive en utilisant le lien de la dernière lettre.
Sous les derniers points, il est omis que cette archive soit décompressée, les données de l'archive soient nettoyées et traitées, et à la fin, tout cela ira plus loin dans le pipeline de processus ETL, mais cela dépasse déjà le cadre de l'article. Si cela s'avère intéressant et utile, je continuerai volontiers à décrire les solutions ETL et leurs composants pour Apache Airflow.