Integrasi Aviasales API dengan Amazon Kinesis dan kesederhanaan tanpa server

Halo, Habr!

Apakah Anda suka menerbangkan pesawat? Saya menyukainya, tetapi dengan isolasi sendiri, saya juga suka menganalisis data tiket pesawat dari satu sumber daya terkenal - Aviasales.

Hari ini kita akan menganalisis karya Amazon Kinesis, membangun sistem streaming dengan analisis waktu nyata, menempatkan basis data Amazon DynamoDB NoSQL sebagai gudang data utama dan mengatur peringatan SMS untuk tiket menarik.

Semua detail di bawah cut! Pergilah!



pengantar


Misalnya, kita memerlukan akses ke API Aviasales . Akses ke sana disediakan gratis dan tanpa batasan, Anda hanya perlu mendaftar di bagian "Pengembang" untuk mendapatkan token API Anda untuk mengakses data.

Tujuan utama artikel ini adalah untuk memberikan pemahaman umum tentang penggunaan informasi streaming di AWS, kami membuatnya keluar dari pertanyaan bahwa data yang dikembalikan oleh API yang digunakan tidak sepenuhnya relevan dan ditransfer dari cache, yang dihasilkan berdasarkan pencarian pengguna situs Aviasales.ru dan Jetradar.com untuk 48 jam terakhir.

Data tiket maskapai agen Kinesis yang diterima melalui API yang dipasang pada mesin produsen akan secara otomatis diuraikan dan ditransfer ke aliran yang diinginkan melalui Kinesis Data Analytics. Versi aliran ini yang belum diproses akan ditulis langsung ke repositori. Digunakan dalam penyimpanan data mentah DynamoDB akan memungkinkan untuk analisis tiket yang lebih mendalam melalui alat BI, seperti AWS Quick Sight.

Kami akan mempertimbangkan dua opsi untuk menggunakan seluruh infrastruktur:

  • Manual - melalui Konsol Manajemen AWS;
  • Infrastruktur dari kode Terraform - untuk insinyur otomatisasi malas;

Arsitektur sistem dalam pengembangan



Komponen yang Digunakan:

  • Aviasales API - data yang dikembalikan oleh API ini akan digunakan untuk semua pekerjaan selanjutnya;
  • Mesin Virtual Produser EC2 - mesin virtual biasa di cloud tempat aliran data input akan dihasilkan:

    • Agen Kinesis adalah aplikasi Java yang diinstal secara lokal yang menyediakan cara mudah untuk mengumpulkan dan mengirim data ke Kinesis (Kinesis Data Stream atau Kinesis Firehose). Agen terus memantau satu set file di direktori yang ditentukan dan mengirimkan data baru ke Kinesis;
    • Caller API Script - skrip Python yang membuat permintaan API dan menempatkan respons dalam folder yang dipantau oleh Kinesis Agent;
  • Kinesis Data Streams β€” ;
  • Kinesis Analytics β€” , . Amazon Kinesis Data Analytics ;
  • AWS Lambda β€” , . ;
  • Amazon DynamoDB β€” «‑» , 10 . DynamoDB - , . DynamoDB , . ;
  • Amazon SNS β€” Β« β€” Β» (Pub/Sub), , . SNS push-, SMS- .

Pelatihan awal


Untuk meniru aliran data, saya memutuskan untuk menggunakan informasi tiket pesawat yang dikembalikan oleh Aviasales API. The dokumentasi memiliki daftar yang cukup luas metode yang berbeda, mengambil salah satu dari mereka - β€œHarga Kalender untuk Bulan”, yang kembali harga untuk setiap hari bulan, dikelompokkan dengan jumlah transfer. Jika Anda tidak mengirimkan bulan pencarian dalam permintaan, informasi akan dikembalikan untuk bulan berikutnya.

Jadi, daftar, dapatkan token Anda.

Contoh permintaan di bawah ini:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Metode menerima data dari API dengan token di dalam permintaan akan berfungsi, tetapi saya lebih suka meneruskan token akses melalui header, jadi kami akan menggunakan metode ini dalam skrip api_caller.py.

Contoh jawaban:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Contoh respons API di atas menunjukkan tiket dari St. Petersburg ke Phuk ... Oh, impian apa ...
Karena saya dari Kazan, dan Phuket "hanya bermimpi tentang kami" sekarang, kami akan mencari tiket dari St. Petersburg ke Kazan.
Diasumsikan bahwa Anda sudah memiliki akun AWS. Saya ingin memberikan perhatian khusus segera bahwa Kinesis dan mengirim pemberitahuan melalui SMS tidak termasuk dalam Free Tier tahunan (penggunaan gratis) . Tetapi meskipun demikian, memiliki beberapa dolar dalam pikiran, sangat mungkin untuk membangun sistem yang diusulkan dan bermain dengannya. Dan, tentu saja, jangan lupa untuk menghapus semua sumber daya setelah mereka menjadi tidak perlu.
Untungnya, fungsi DynamoDb dan lambda akan menjadi shareware bagi kami jika Anda tetap dalam batas gratis bulanan. Misalnya, untuk DynamoDB: penyimpanan 25 GB, 25 WCU / RCU dan 100 juta permintaan. Dan sejuta panggilan ke fungsi lambda per bulan.

Sistem penyebaran manual


Menyiapkan Streaming Data Kinesis


Pergi ke layanan Streaming Data Kinesis dan buat dua aliran baru, masing-masing satu beling.

Apa itu beling?
β€” Amazon Kinesis. 1 / 2 /. 1000 PUT . . , . 2 / 4 / 2000 PUT .

Semakin banyak pecahan dalam aliran Anda, semakin besar throughputnya. Pada prinsipnya, aliran ditingkatkan dengan cara ini dengan menambahkan pecahan. Tetapi semakin banyak pecahan yang Anda miliki, semakin tinggi harganya. Setiap pecahan harganya 1,5 sen per jam dan tambahan 1,4 sen untuk setiap juta unit muatan PUT.

Buat utas baru dengan nama airline_tickets , 1 shard akan cukup untuk itu:


Sekarang buat aliran lain yang disebut special_stream :


Pengaturan Produser


Sebagai produsen data untuk mengurai tugas, cukup menggunakan contoh EC2 biasa. Itu tidak harus menjadi mesin virtual mahal yang kuat; spot t2.micro cukup cocok.

Catatan penting: misalnya, Anda harus menggunakan gambar - Amazon Linux AMI 2018.03.0, dengan itu ada lebih sedikit pengaturan untuk meluncurkan Agen Kinesis dengan cepat.

Pergi ke layanan EC2, buat mesin virtual baru, pilih AMI yang diinginkan dengan tipe t2.micro, yang termasuk dalam Free Tier:


Agar mesin virtual yang baru dibuat untuk dapat berinteraksi dengan layanan Kinesis, Anda harus memberikannya hak untuk melakukannya. Cara terbaik untuk melakukan ini adalah dengan menetapkan Peran IAM. Oleh karena itu, pada Langkah 3: Konfigurasikan detail Contoh, pilih Buat Peran IAM baru :

Membuat Peran IAM untuk EC2

, , EC2 Permissions:


, : AmazonKinesisFullAccess CloudWatchFullAccess.

- , : EC2-KinesisStreams-FullAccess. , , :


, :


.

Anda dapat membiarkan parameter hard disk secara default, juga tag (meskipun praktik yang baik untuk menggunakan tag, setidaknya berikan nama instance dan tentukan lingkungan).

Sekarang kita berada di Langkah 6: Konfigurasikan tab Grup Keamanan, di mana Anda perlu membuat yang baru atau menentukan grup Keamanan yang ada, yang memungkinkan Anda untuk terhubung melalui ssh (port 22) ke instance. Pilih Sumber -> IP saya di sana dan Anda dapat menjalankan instance.


Setelah memasuki status berjalan, Anda dapat mencoba menghubungkannya melalui ssh.

Agar dapat bekerja dengan Agen Kinesis, setelah koneksi yang berhasil ke mesin, Anda harus memasukkan perintah berikut di terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Buat folder untuk menyimpan respons API:

sudo mkdir /var/log/airline_tickets

Sebelum memulai agen, Anda perlu mengkonfigurasi konfigurasinya:

sudo vim /etc/aws-kinesis/agent.json

Isi file agent.json akan terlihat seperti ini:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Seperti yang dapat Anda lihat dari file konfigurasi, agen akan memantau file dengan ekstensi .log di direktori / var / log / airline_tickets /, parsing dan transfer ke aliran airline_tickets.

Kami memulai kembali layanan dan memastikan bahwa itu dimulai dan berfungsi:

sudo service aws-kinesis-agent restart

Sekarang unduh skrip Python yang akan meminta data dari API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skrip api_caller.py meminta data dari Aviasales dan menyimpan respons yang diterima di direktori yang dipindai oleh agen Kinesis. Implementasi skrip ini cukup standar, ada kelas TicketsApi, memungkinkan Anda untuk menarik API secara tidak sinkron. Di kelas ini, kami meneruskan tajuk dengan token dan parameter permintaan:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Untuk menguji pengaturan dan operabilitas agen yang benar, kami akan melakukan uji coba script api_caller.py:

sudo ./api_caller.py TOKEN


Dan kami melihat hasil pekerjaan di log Agen dan pada tab Pemantauan dalam aliran data airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log



Seperti yang Anda lihat, semuanya berfungsi dan Agen Kinesis berhasil mengirim data ke aliran. Sekarang konfigurasikan konsumen.

Menyiapkan Analisis Data Kinesis


Mari kita beralih ke komponen utama dari keseluruhan sistem - buat aplikasi baru di Kinesis Data Analytics yang disebut kinesis_analytics_airlines_app:


Analisis Data Kinesis memungkinkan analisis data real-time dari Streaming Kinesis menggunakan SQL. Ini adalah layanan yang sepenuhnya dapat diskalakan-otomatis (tidak seperti Kinesis Streams), yang:

  1. memungkinkan Anda untuk membuat aliran baru (Output Stream) berdasarkan permintaan ke sumber data;
  2. menyediakan aliran dengan kesalahan yang terjadi selama operasi aplikasi (Stream Kesalahan);
  3. Secara otomatis dapat menentukan skema input data (dapat secara manual didefinisikan ulang jika perlu).

Ini adalah layanan mahal - 0,11 USD per jam, jadi Anda harus menggunakannya dengan hati-hati dan menghapusnya saat Anda menyelesaikan pekerjaan.

Hubungkan aplikasi ke sumber data:


Pilih aliran yang ingin Anda hubungkan (airline_tickets):


Selanjutnya, Anda harus melampirkan Peran IAM baru sehingga aplikasi dapat membaca dari aliran dan menulis ke aliran. Untuk melakukan ini, cukup untuk tidak mengubah apa pun di blok izin akses:


Sekarang kami meminta penemuan skema data dalam aliran, untuk ini kami klik pada tombol "Temukan skema". Akibatnya, peran IAM akan diperbarui (yang baru akan dibuat) dan penemuan skema dari data yang telah tiba di aliran akan diluncurkan:


Sekarang Anda harus pergi ke editor SQL. Ketika Anda mengklik tombol ini, sebuah jendela dengan pertanyaan tentang meluncurkan aplikasi akan muncul - pilih apa yang ingin kita jalankan:


Di jendela editor SQL, masukkan kueri sederhana seperti itu dan klik Simpan dan Jalankan SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Di database relasional, Anda bekerja dengan tabel menggunakan pernyataan INSERT untuk menambahkan catatan dan pernyataan SELECT untuk meminta data. Di Amazon Kinesis Data Analytics, Anda bekerja dengan stream (STREAM) dan "pompa" (PUMP) - permintaan penyisipan berkelanjutan yang memasukkan data dari satu aliran dalam aplikasi ke aliran lain.

Dalam kueri SQL di atas, tiket Aeroflot dicari dengan harga di bawah lima ribu rubel. Semua catatan yang termasuk dalam kondisi ini akan ditempatkan dalam aliran DESTINATION_SQL_STREAM.


Di blok Tujuan, pilih aliran special_stream, dan dalam nama aliran dalam aplikasi DESTINATION_SQL_STREAM daftar turun bawah:


Sebagai hasil dari semua manipulasi, sesuatu yang mirip dengan gambar di bawah ini akan berubah:



Membuat dan berlangganan topik SNS


Buka Layanan Pemberitahuan Sederhana dan buat topik baru bernama Maskapai:


Kami berlangganan topik ini, di dalamnya kami menunjukkan nomor ponsel yang akan menerima pemberitahuan SMS:


Membuat tabel di DynamoDB


Untuk menyimpan data mentah dari stream airline_tickets mereka, buat tabel di DynamoDB dengan nama yang sama. Sebagai kunci utama, kami akan menggunakan record_id:


Membuat fungsi kolektor lambda


Mari kita membuat fungsi lambda yang disebut Collector, yang tugasnya adalah polling stream airline_tickets dan, jika ada catatan baru di sana, masukkan catatan ini ke dalam tabel DynamoDB. Jelas, selain hak default, lambda ini harus memiliki akses untuk membaca aliran data Kinesis dan menulis ke DynamoDB.

Membuat peran IAM untuk fungsi kolektor lambda
IAM Lambda-TicketsProcessingRole:


AmazonKinesisReadOnlyAccess AmazonDynamoDBFullAccess, :



Lambda ini harus dipicu oleh pemicu Kinesis ketika entri baru mengenai aliran airline_stream, jadi Anda perlu menambahkan pemicu baru:



Tetap memasukkan kode dan menyimpan lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Membuat notifikasi fungsi lambda


Fungsi lambda kedua, yang akan memantau aliran kedua (special_stream) dan mengirim pemberitahuan ke SNS, dibuat dengan cara yang sama. Oleh karena itu, lambda ini harus memiliki akses baca dari Kinesis dan mengirim pesan ke topik SNS yang ditentukan, yang kemudian dikirim oleh layanan SNS ke semua pelanggan topik ini (email, SMS, dll.).

Buat Peran IAM
IAM Lambda-KinesisAlarm , alarm_notifier:



Lambda ini harus bekerja sesuai dengan pemicu untuk entri baru untuk memasuki special_stream, jadi Anda perlu mengkonfigurasi pemicu dengan cara yang sama seperti yang kami lakukan untuk lambda Kolektor.

Untuk kenyamanan mengkonfigurasi lambda ini, kami memperkenalkan variabel lingkungan baru - TOPIC_ARN, tempat kami menempatkan topik ANR (Amazon Recourse Names) dari Airlines:


Dan masukkan kode lambda, sangat sederhana:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Tampaknya ini melengkapi konfigurasi sistem manual. Tetap hanya untuk menguji dan memastikan bahwa kami mengkonfigurasi semuanya dengan benar.

Menyebarkan dari kode Terraform


Persiapan yang diperlukan


Terraform adalah alat open-source yang sangat nyaman untuk menyebarkan infrastruktur dari kode. Ini memiliki sintaksnya sendiri yang mudah dipelajari dan banyak contoh tentang bagaimana dan apa yang harus digunakan. Ada banyak plugin yang nyaman di editor Atom atau Visual Studio Code yang membuatnya lebih mudah untuk bekerja dengan Terraform.

Anda dapat mengunduh kit distribusi dari sini . Analisis terperinci dari semua fitur Terraform berada di luar cakupan artikel ini, jadi kami akan membatasi diri pada poin utama.

Bagaimana memulainya


Kode proyek lengkap ada di repositori saya . Kami mengkloning repositori ke diri kami sendiri. Sebelum memulai, Anda harus memastikan bahwa Anda telah menginstal dan mengkonfigurasi AWS CLI, sebagai Terraform akan mencari kredensial di file ~ / .aw / credentials.

Merupakan praktik yang baik untuk mengerahkan seluruh infrastruktur sebelum meluncurkan perintah rencana untuk melihat apa yang Terraform ciptakan bagi kita di cloud:

terraform.exe plan

Anda akan diminta memasukkan nomor telepon untuk mengirim pemberitahuan. Pada tahap ini, itu opsional.


Setelah menganalisis rencana kerja program, kami dapat memulai pembuatan sumber daya:

terraform.exe apply

Setelah mengirim perintah ini, Anda akan kembali diminta untuk memasukkan nomor telepon, ketik "ya" ketika pertanyaan tentang pelaksanaan tindakan yang sebenarnya ditampilkan. Ini akan memungkinkan Anda untuk meningkatkan seluruh infrastruktur, melakukan semua pengaturan yang diperlukan untuk EC2, menyebarkan fungsi lambda, dll.

Setelah semua sumber daya berhasil dibuat melalui kode Terraform, Anda perlu masuk ke detail aplikasi Kinesis Analytics (sayangnya, saya tidak menemukan cara melakukan ini langsung dari kode).

Luncurkan aplikasi:


Setelah itu, Anda harus secara eksplisit mengatur nama aliran dalam aplikasi dengan memilih dari daftar turun bawah:



Sekarang semuanya sudah siap.

Pengujian Aplikasi


Terlepas dari bagaimana Anda menggunakan sistem, secara manual atau melalui kode Terraform, itu akan bekerja sama.

Kami melewati SSH ke mesin virtual EC2 tempat Agen Kinesis diinstal dan menjalankan skrip api_caller.py

sudo ./api_caller.py TOKEN

Tetap menunggu SMS ke nomor Anda:


SMS - pesan tiba di telepon dalam hampir 1 menit:


Masih harus dilihat apakah catatan disimpan dalam database DynamoDB untuk analisis selanjutnya yang lebih terperinci. Tabel airline_tickets berisi sesuatu seperti ini:


Kesimpulan


Dalam proses pengerjaan, sistem pemrosesan data online berbasis Amazon Kinesis dibangun. Kami memeriksa opsi untuk menggunakan Agen Kinesis dalam hubungannya dengan Streaming Data Kinesis dan analitik real-time dari Kinesis Analytics menggunakan perintah SQL, serta interaksi Amazon Kinesis dengan layanan AWS lainnya.

Kami menggunakan sistem di atas dalam dua cara: manual yang cukup panjang dan cepat dari kode Terraform.

Seluruh kode sumber proyek tersedia di repositori saya di GitHub , saya sarankan Anda membiasakan diri dengannya.

Saya siap membahas artikel ini dengan senang hati, saya menunggu komentar Anda. Saya berharap kritik yang membangun.

Semoga tercapai!

All Articles