تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

مرحبا يا هابر!

هل تحب أن تطير الطائرات؟ أنا أحب ذلك ، ولكن على العزلة الذاتية ، أحببت أيضًا تحليل بيانات السفر الجوي لمورد واحد معروف - Aviasales.

اليوم سنقوم بتحليل عمل Amazon Kinesis ، وبناء نظام دفق مع تحليلات في الوقت الحقيقي ، ووضع قاعدة بيانات Amazon DynamoDB NoSQL كمخزن بيانات رئيسي وإعداد تنبيهات SMS لتذاكر مثيرة للاهتمام.

كل التفاصيل تحت القطع! اذهب!



المقدمة


على سبيل المثال ، نحتاج إلى الوصول إلى Aviasales API . يتم توفير الوصول إليه مجانًا وبدون قيود ، ما عليك سوى التسجيل في قسم "المطورين" للحصول على رمز API الخاص بك للوصول إلى البيانات.

الغرض الرئيسي من هذه المقالة هو إعطاء فهم عام لاستخدام معلومات التدفق في AWS ، ونجعل من غير المستبعد أن البيانات التي يتم إرجاعها بواسطة واجهة برمجة التطبيقات المستخدمة ليست ذات صلة صارمة ويتم نقلها من ذاكرة التخزين المؤقت ، والتي يتم إنشاؤها استنادًا إلى عمليات البحث عن مستخدمي مواقع Aviasales.ru و Jetradar.com آخر 48 ساعة.

سيتم تحليل بيانات تذكرة طيران وكيل Kinesis المستلمة من خلال واجهة برمجة التطبيقات المثبتة على جهاز المنتج تلقائيًا ونقلها إلى التدفق المطلوب من خلال Kinesis Data Analytics. ستتم كتابة نسخة غير معالجة من هذا الدفق مباشرة إلى المستودع. النشر في DynamoDB تخزين البيانات الخام سيسمح بتحليل أكثر تعمقا للتذاكر من خلال أدوات BI ، مثل AWS Quick Sight.

سننظر في خيارين لنشر البنية التحتية بالكامل:

  • دليل - من خلال وحدة تحكم إدارة AWS ؛
  • البنية التحتية من كود Terraform - لمهندسي الأتمتة البطيئة ؛

بنية النظام قيد التطوير



المكونات المستخدمة:

  • Aviasales API - سيتم استخدام البيانات التي يتم إرجاعها بواسطة واجهة برمجة التطبيقات هذه لجميع الأعمال اللاحقة ؛
  • مثيل منتج EC2 - جهاز افتراضي منتظم في السحابة سيتم إنشاء دفق بيانات الإدخال عليه:

    • وكيل Kinesis هو تطبيق Java مثبت محليًا يوفر طريقة سهلة لجمع البيانات وإرسالها إلى Kinesis (Kinesis Data Streams أو Kinesis Firehose). يراقب الوكيل باستمرار مجموعة من الملفات في الدلائل المحددة ويرسل بيانات جديدة إلى Kinesis ؛
    • Caller API Script - نص برمجي Python يجعل طلبات API ويضع الاستجابة في مجلد يراقبه عامل Kinesis ؛
  • Kinesis Data Streams — ;
  • Kinesis Analytics — , . Amazon Kinesis Data Analytics ;
  • AWS Lambda — , . ;
  • Amazon DynamoDB — «‑» , 10 . DynamoDB - , . DynamoDB , . ;
  • Amazon SNS — « — » (Pub/Sub), , . SNS push-, SMS- .

تدريب اولي


لمحاكاة تدفق البيانات ، قررت استخدام معلومات الطيران التي تم إرجاعها بواسطة Aviasales API. تحتوي الوثائق على قائمة شاملة إلى حد ما من الطرق المختلفة ، خذ واحدة منها - "تقويم الأسعار للشهر" ، الذي يعيد الأسعار لكل يوم من الشهر ، مجمعة حسب عدد التحويلات. إذا لم ترسل شهر البحث في الطلب ، فسيتم إرجاع المعلومات للشهر الذي يلي الشهر الحالي.

لذا ، سجل ، احصل على رمزك.

طلب المثال أدناه:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

ستعمل الطريقة المذكورة أعلاه لتلقي البيانات من واجهة برمجة التطبيقات مع الرمز المميز في الطلب ، لكنني أفضل تمرير رمز الدخول عبر الرأس ، لذلك سنستخدم هذه الطريقة في البرنامج النصي api_caller.py.

جواب المثال:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

يوضح مثال رد واجهة برمجة التطبيقات (API) أعلاه تذكرة من سانت بطرسبرغ إلى بوك ... أوه ، ما الذي تحلم به ...
نظرًا لأنني من قازان ، وفوكيت "نحلم بنا فقط" الآن ، سنبحث عن تذاكر من سانت بطرسبرغ إلى قازان.
من المفترض أن لديك بالفعل حساب AWS. أريد أن أنتبه بشكل خاص فورًا إلى أن Kinesis وإرسال الإشعارات عبر الرسائل القصيرة لا يتم تضمينهما في المستوى المجاني السنوي (الاستخدام المجاني) . ولكن على الرغم من ذلك ، مع أخذ دولارين في الاعتبار ، فمن الممكن تمامًا بناء النظام المقترح واللعب به. وبالطبع ، لا تنس حذف جميع الموارد بعد أن تصبح غير ضرورية.
لحسن الحظ ، ستكون وظائف DynamoDb و lambda مجانية بالنسبة لنا إذا بقيت ضمن الحدود الشهرية المجانية. على سبيل المثال ، لـ DynamoDB: 25 جيجا بايت من التخزين ، 25 WCU / RCU و 100 مليون طلب. ومليون مكالمة لوظائف لامدا في الشهر.

نظام النشر اليدوي


إعداد تدفقات بيانات Kinesis


انتقل إلى خدمة Kinesis Data Streams وقم بإنشاء تدفقين جديدين ، جزء واحد لكل منهما.

ما هي القشرة؟
— Amazon Kinesis. 1 / 2 /. 1000 PUT . . , . 2 / 4 / 2000 PUT .

كلما زاد عدد الأجزاء في التدفق ، زاد معدل إنتاجها. من حيث المبدأ ، يتم قياس التدفقات بهذه الطريقة بإضافة شظايا. ولكن كلما كان لديك شظايا أكثر ، ارتفع السعر. تبلغ تكلفة كل قطعة 1.5 سنتًا في الساعة و 1.4 سنتًا إضافيًا لكل مليون وحدة حمولة PUT.

أنشئ خيطًا جديدًا باسم flight_tickets ، يكفي قطعة واحدة لذلك:


الآن قم بإنشاء دفق آخر يسمى special_stream :


إعداد المنتج


كمنتج بيانات لتحليل مهمة ، يكفي استخدام مثيل EC2 عادي. لا يجب أن تكون آلة افتراضية باهظة الثمن قوية ؛ بقعة t2.micro مناسبة تمامًا.

ملاحظة مهمة: على سبيل المثال ، يجب عليك استخدام الصورة - Amazon Linux AMI 2018.03.0 ، مع وجود إعدادات أقل لتشغيل وكيل Kinesis بسرعة.

انتقل إلى خدمة EC2 ، وأنشئ جهازًا افتراضيًا جديدًا ، وحدد AMI المطلوب مع نوع t2.micro ، والمضمن في المستوى المجاني:


لكي يتمكن الجهاز الظاهري الذي تم إنشاؤه حديثًا من التفاعل مع خدمة Kinesis ، يجب عليك منحه الحق في القيام بذلك. أفضل طريقة للقيام بذلك هي تعيين دور IAM. لذلك ، في شاشة الخطوة 3: تكوين تفاصيل المثيل ، حدد إنشاء دور IAM جديد :

إنشاء أدوار IAM لـ EC2

, , EC2 Permissions:


, : AmazonKinesisFullAccess CloudWatchFullAccess.

- , : EC2-KinesisStreams-FullAccess. , , :


, :


.

يمكنك ترك معلمات القرص الثابت افتراضيًا ، والعلامات أيضًا (على الرغم من أنه من الأفضل استخدام العلامات ، على الأقل قم بتسمية اسم المثيل وتحديد البيئة).

نحن الآن في الخطوة 6: تكوين علامة التبويب مجموعة الأمان ، حيث تحتاج إلى إنشاء واحدة جديدة أو تحديد مجموعة الأمان الموجودة لديك ، مما يسمح لك بالاتصال عبر ssh (المنفذ 22) بالمثيل. حدد المصدر -> IP الخاص بي هناك ويمكنك تشغيل المثيل.


بمجرد دخوله في حالة التشغيل ، يمكنك محاولة الاتصال به عبر ssh.

لتتمكن من العمل مع وكيل Kinesis ، بعد اتصال ناجح بالجهاز ، يجب عليك إدخال الأوامر التالية في الوحدة الطرفية:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

قم بإنشاء مجلد لحفظ استجابات API:

sudo mkdir /var/log/airline_tickets

قبل بدء تشغيل الوكيل ، تحتاج إلى تكوين التكوين الخاص به:

sudo vim /etc/aws-kinesis/agent.json

يجب أن تبدو محتويات ملف agent.json كما يلي:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

كما ترى من ملف التهيئة ، يقوم الوكيل بمراقبة الملفات ذات الامتداد .log في الدليل / var / log / airline_tickets / ، وتحليلها ونقلها إلى تدفق شركة الخطوط الجوية.

نعيد تشغيل الخدمة ونتأكد من أنها تبدأ وتعمل:

sudo service aws-kinesis-agent restart

الآن قم بتنزيل برنامج Python النصي الذي سيطلب بيانات من API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

يطلب البرنامج النصي api_caller.py بيانات من Aviasales ويحفظ الاستجابة المستلمة في الدليل الذي يقوم وكيل Kinesis بفحصه. تنفيذ هذا البرنامج النصي قياسي إلى حد ما ، هناك TicketsApi فئة ، يسمح لك بسحب API بشكل غير متزامن. في هذه الفئة ، نمرر الرأس مع الرمز المميز ومعلمات الطلب:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

لاختبار الإعدادات الصحيحة وقابلية تشغيل الوكيل ، سنجري اختبارًا للنص البرمجي api_caller.py:

sudo ./api_caller.py TOKEN


ونلقي نظرة على نتيجة العمل في سجلات الوكيل وعلى علامة التبويب المراقبة في تدفق بيانات شركة الخطوط الجوية:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log



كما ترى ، يعمل كل شيء ويرسل وكيل Kinesis البيانات إلى الدفق بنجاح. الآن تكوين المستهلك.

إعداد تحليلات بيانات Kinesis


دعنا ننتقل إلى المكون المركزي للنظام بأكمله - إنشاء تطبيق جديد في Kinesis Data Analytics يسمى kinesis_analytics_airlines_app:


يتيح Kinesis Data Analytics تحليلات البيانات في الوقت الحقيقي من Kinesis Streams باستخدام SQL. هذه خدمة قابلة للتطوير تلقائيًا بالكامل (على عكس تيارات Kinesis) ، والتي:

  1. يسمح لك بإنشاء تدفقات جديدة (دفق الإخراج) بناءً على الاستعلامات الخاصة ببيانات المصدر ؛
  2. يوفر دفق مع الأخطاء التي حدثت أثناء عملية التطبيق (دفق الخطأ) ؛
  3. يمكنه تحديد مخطط بيانات الإدخال تلقائيًا (يمكن إعادة تعريفه يدويًا إذا لزم الأمر).

هذه خدمة باهظة الثمن - 0.11 دولار أمريكي للساعة ، لذا يجب عليك استخدامها بعناية وإزالتها عند الانتهاء من العمل.

قم بتوصيل التطبيق بمصدر البيانات:


اختر المجموعة التي تريد الاتصال بها (خطوط طيران):


بعد ذلك ، تحتاج إلى إرفاق دور IAM الجديد حتى يتمكن التطبيق من القراءة من الدفق والكتابة إلى الدفق. للقيام بذلك ، يكفي عدم تغيير أي شيء في كتلة أذونات الوصول:


نطلب الآن اكتشاف مخطط البيانات في الدفق ، لذلك ننقر على الزر "اكتشاف المخطط". ونتيجة لذلك ، سيتم تحديث دور IAM (سيتم إنشاء دور جديد) وسيتم إطلاق اكتشاف المخطط من البيانات التي وصلت بالفعل في الدفق:


الآن أنت بحاجة للذهاب إلى محرر SQL. عند النقر فوق هذا الزر ، ستظهر نافذة بها سؤال حول تشغيل التطبيق - اختر ما نريد تشغيله:


في نافذة محرر SQL ، قم بإدراج مثل هذا الاستعلام البسيط وانقر فوق حفظ وتشغيل SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

في قواعد البيانات العلائقية ، تعمل مع الجداول باستخدام عبارات INSERT لإضافة السجلات وعبارة SELECT إلى بيانات الاستعلام. في Amazon Kinesis Data Analytics ، أنت تعمل مع التدفقات (STREAM) و "المضخات" (PUMP) - طلبات الإدراج المستمر التي تدرج البيانات من دفق واحد في تطبيق في دفق آخر.

في استعلام SQL أعلاه ، يتم البحث عن تذاكر Aeroflot بأسعار أقل من خمسة آلاف روبل. سيتم وضع جميع السجلات التي تندرج تحت هذه الشروط في دفق DESTINATION_SQL_STREAM.


في كتلة الوجهة ، حدد دفق special_stream ، وفي القائمة المنسدلة اسم دفق داخل التطبيق DESTINATION_SQL_STREAM:


نتيجة لجميع التلاعبات ، يجب أن يظهر شيء مشابه للصورة أدناه:



إنشاء والاشتراك في موضوع SNS


انتقل إلى خدمة الإعلام البسيط وأنشئ موضوعًا جديدًا باسم شركات الطيران:


نحن مشتركون في هذا الموضوع ، حيث نشير إلى رقم الهاتف المحمول الذي ستصل إليه إشعارات الرسائل القصيرة:


إنشاء جدول في DynamoDB


لتخزين البيانات الأولية الخاصة بتدفق تذاكر الطيران الخاصة بهم ، قم بإنشاء جدول في DynamoDB بنفس الاسم. كمفتاح أساسي ، سوف نستخدم record_id:


إنشاء وظيفة جامع لامدا


دعنا ننشئ وظيفة لامدا تسمى Collector ، ومهمتها استطلاع رأي تيار شركة الخطوط الجوية ، وإذا كانت هناك سجلات جديدة هناك ، فأدخل هذه السجلات في جدول DynamoDB. من الواضح أنه بالإضافة إلى الحقوق الافتراضية ، يجب أن يكون لدى لامدا حق الوصول لقراءة دفق بيانات Kinesis والكتابة إلى DynamoDB.

إنشاء دور IAM لوظيفة جامع لامدا
IAM Lambda-TicketsProcessingRole:


AmazonKinesisReadOnlyAccess AmazonDynamoDBFullAccess, :



يجب تشغيل لامدا هذه من خلال مشغل Kinesis عندما تصل الإدخالات الجديدة إلى تدفق شركة الخطوط الجوية ، لذلك تحتاج إلى إضافة مشغل جديد:



يبقى إدخال الكود وحفظ لامدا.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

إنشاء منبه دالة لامدا


يتم إنشاء وظيفة لامدا الثانية ، التي ستراقب الدفق الثاني (special_stream) وإرسال إشعار إلى SNS ، بنفس الطريقة. لذلك ، يجب أن يكون لدى lambda حق الوصول للقراءة من Kinesis وإرسال رسائل إلى موضوع SNS المحدد ، والذي يتم إرساله بعد ذلك بواسطة خدمة SNS إلى جميع المشتركين في هذا الموضوع (البريد الإلكتروني والرسائل القصيرة وما إلى ذلك).

إنشاء أدوار IAM
IAM Lambda-KinesisAlarm , alarm_notifier:



يجب أن تعمل لامدا هذه وفقًا للمشغل لإدخالات جديدة للدخول إلى تيار خاص ، لذلك تحتاج إلى تكوين الزناد بنفس الطريقة التي فعلنا بها لامبرت جامع.

من أجل راحة تكوين لامدا هذه ، نقدم متغير بيئة جديد - TOPIC_ARN ، حيث نضع موضوع ANR (Amazon Recourse Names) لشركات الطيران:


وأدخل رمز لامدا ، الأمر بسيط للغاية:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

يبدو أن هذا يكمل تكوين النظام اليدوي. يبقى فقط للاختبار والتأكد من تكوين كل شيء بشكل صحيح.

نشر من رمز Terraform


التحضير اللازم


Terraform هي أداة مفتوحة المصدر مريحة للغاية لنشر البنية التحتية من التعليمات البرمجية. يحتوي على بناء جملة خاص به سهل التعلم والعديد من الأمثلة على كيفية نشره وماذا. هناك العديد من المكونات الإضافية المناسبة في محرر Atom أو Visual Studio Code والتي تسهل العمل مع Terraform.

يمكنك تنزيل مجموعة التوزيع من هنا . تحليل مفصل لجميع ميزات Terraform خارج نطاق هذه المقالة ، لذلك سنقتصر على النقاط الرئيسية.

كيف تبدأ


كود المشروع الكامل موجود في مستودعي . نحن نستنسخ مستودعًا لأنفسنا. قبل البدء ، تحتاج إلى التأكد من أنك قمت بتثبيت وتكوين AWS CLI ، مثل سيبحث Terraform عن بيانات الاعتماد في ملف ~ / .aws / بيانات الاعتماد.

من الممارسات الجيدة نشر البنية التحتية بالكامل قبل تشغيل أمر الخطة لمعرفة ما يقوم Terraform بإنشائه لنا في السحابة:

terraform.exe plan

سيُطلب منك إدخال رقم هاتف لإرسال إشعارات إليه. في هذه المرحلة ، هو اختياري.


بعد تحليل خطة عمل البرنامج ، يمكننا البدء في إنشاء الموارد:

terraform.exe apply

بعد إرسال هذا الأمر ، سيُطلب منك مرة أخرى إدخال رقم هاتف ، اكتب "نعم" عندما يتم عرض السؤال حول التنفيذ الفعلي للإجراءات. سيسمح لك هذا برفع البنية التحتية بالكامل ، وتنفيذ جميع الإعدادات اللازمة لـ EC2 ، ونشر وظائف لامدا ، وما إلى ذلك.

بعد إنشاء جميع الموارد بنجاح من خلال رمز Terraform ، تحتاج إلى الدخول في تفاصيل تطبيق Kinesis Analytics (للأسف ، لم أجد كيفية القيام بذلك مباشرة من الشفرة).

بدء تطبيق:


بعد ذلك ، يجب عليك تعيين اسم الدفق داخل التطبيق بشكل صريح عن طريق الاختيار من القائمة المنسدلة:



الآن كل شيء جاهز للذهاب.

اختبار التطبيق


بغض النظر عن كيفية نشر النظام ، يدويًا أو من خلال رمز Terraform ، فسيعمل بنفس الطريقة.

ننتقل عبر SSH إلى الجهاز الظاهري EC2 حيث تم تثبيت وكيل Kinesis وتشغيل البرنامج النصي api_caller.py

sudo ./api_caller.py TOKEN

يبقى انتظار رسالة SMS إلى رقمك:


رسالة نصية قصيرة - تصل الرسالة على الهاتف في دقيقة واحدة تقريبًا:


يبقى أن نرى ما إذا تم حفظ السجلات في قاعدة بيانات DynamoDB لتحليلها لاحقًا وأكثر تفصيلاً. يحتوي جدول تذاكر الطيران على شيء مثل هذا:


استنتاج


في سياق العمل المنجز ، تم بناء نظام لمعالجة البيانات عبر الإنترنت يعتمد على Amazon Kinesis. لقد درسنا خيارات استخدام وكيل Kinesis بالاشتراك مع دفق بيانات Kinesis وتحليلات الوقت الحقيقي لـ Kinesis Analytics باستخدام أوامر SQL ، بالإضافة إلى تفاعل Amazon Kinesis مع خدمات AWS الأخرى.

نشرنا النظام أعلاه بطريقتين: يدوي طويل بما فيه الكفاية وسريع من رمز Terraform.

كود المصدر الكامل للمشروع متاح في مستودعي على GitHub ، أقترح عليك التعرف عليه.

أنا على استعداد لمناقشة المقال بسرور ، أنا في انتظار تعليقاتكم. آمل في النقد البناء.

أتمنى لك النجاح!

All Articles