Aviasales API-Integration mit Amazon Kinesis und serverlose Einfachheit

Hallo Habr!

Fliegen Sie gerne Flugzeuge? Ich liebe es, aber aus Gründen der Selbstisolation analysierte ich auch gerne die Flugdaten einer bekannten Ressource - Aviasales.

Heute werden wir die Arbeit von Amazon Kinesis analysieren, ein Streaming-System mit Echtzeitanalyse erstellen, die Amazon DynamoDB NoSQL-Datenbank als Haupt-Data Warehouse festlegen und SMS-Benachrichtigungen für interessante Tickets einrichten.

Alle Details unter dem Schnitt! Gehen!



Einführung


Zum Beispiel benötigen wir Zugriff auf die Aviasales-API . Der Zugriff darauf ist kostenlos und Sie müssen sich ohne Einschränkungen nur im Abschnitt "Entwickler" registrieren, um Ihr API-Token für den Zugriff auf Daten zu erhalten.

Der Hauptzweck dieses Artikels besteht darin, ein allgemeines Verständnis der Verwendung von Streaming-Informationen in AWS zu vermitteln. Wir stellen nicht in Frage, dass die von der verwendeten API zurückgegebenen Daten nicht unbedingt relevant sind und aus dem Cache übertragen werden, der auf der Suche nach Benutzern der Websites Aviasales.ru und Jetradar.com für generiert wird letzte 48 Stunden.

Die über die auf dem Herstellercomputer installierte API empfangenen Kinesis-Agent-Flugticketdaten werden automatisch analysiert und über Kinesis Data Analytics in den gewünschten Stream übertragen. Eine unverarbeitete Version dieses Streams wird direkt in das Repository geschrieben. Die Bereitstellung in der DynamoDB-Speicherung von Rohdaten ermöglicht eine eingehendere Analyse von Tickets mithilfe von BI-Tools wie AWS Quick Sight.

Wir werden zwei Optionen für die Bereitstellung der gesamten Infrastruktur in Betracht ziehen:

  • Handbuch - über die AWS Management Console;
  • Infrastruktur aus Terraform-Code - für faule Automatisierungsingenieure;

Architektur des in Entwicklung befindlichen Systems



Verwendete Komponenten:

  • Aviasales-API - Die von dieser API zurückgegebenen Daten werden für alle nachfolgenden Arbeiten verwendet.
  • EC2 Producer Instance - eine reguläre virtuelle Maschine in der Cloud, auf der der Eingabedatenstrom generiert wird:

    • Kinesis Agent ist eine lokal installierte Java-Anwendung, mit der Daten auf einfache Weise erfasst und an Kinesis gesendet werden können (Kinesis Data Streams oder Kinesis Firehose). Der Agent überwacht ständig eine Reihe von Dateien in den angegebenen Verzeichnissen und sendet neue Daten an Kinesis.
    • Caller-API-Skript - Ein Python-Skript, das API-Anforderungen stellt und die Antwort in einem Ordner ablegt, den Kinesis Agent überwacht.
  • Kinesis Data Streams — ;
  • Kinesis Analytics — , . Amazon Kinesis Data Analytics ;
  • AWS Lambda — , . ;
  • Amazon DynamoDB — «‑» , 10 . DynamoDB - , . DynamoDB , . ;
  • Amazon SNS — « — » (Pub/Sub), , . SNS push-, SMS- .

Erstausbildung


Um den Datenstrom zu emulieren, habe ich mich entschieden, die von der Aviasales-API zurückgegebenen Flugpreisinformationen zu verwenden. Die Dokumentation enthält eine ziemlich umfangreiche Liste verschiedener Methoden. Nehmen Sie eine davon - den „Preiskalender für den Monat“, der die Preise für jeden Tag des Monats zurückgibt, gruppiert nach der Anzahl der Überweisungen. Wenn Sie den Suchmonat in der Anfrage nicht übermitteln, werden Informationen für den Monat zurückgegeben, der auf den aktuellen Monat folgt.

Also, registrieren Sie sich, holen Sie sich Ihren Token.

Beispielanforderung unten:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Die obige Methode zum Empfangen von Daten von der API mit dem Token in der Anforderung funktioniert, aber ich bevorzuge es, das Zugriffstoken über den Header zu übergeben, daher verwenden wir diese Methode im Skript api_caller.py.

Antwortbeispiel:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Das obige API-Antwortbeispiel zeigt ein Ticket von St. Petersburg nach Phuk ... Oh, wovon man träumen soll ...
Da ich aus Kasan komme und Phuket "nur von uns träumt", werden wir nach Tickets von St. Petersburg nach Kasan suchen.
Es wird davon ausgegangen, dass Sie bereits ein AWS-Konto haben. Ich möchte sofort besonders darauf achten, dass Kinesis und das Versenden von Benachrichtigungen per SMS nicht in der jährlichen kostenlosen Stufe (kostenlose Nutzung) enthalten sind . Trotzdem ist es mit ein paar Dollar durchaus möglich, das vorgeschlagene System aufzubauen und damit zu spielen. Und vergessen Sie natürlich nicht, alle Ressourcen zu löschen, wenn sie unnötig werden.
Glücklicherweise werden DynamoDb- und Lambda-Funktionen für uns Shareware sein, wenn Sie die monatlichen kostenlosen Limits einhalten. Zum Beispiel für DynamoDB: 25 GB Speicher, 25 WCU / RCU und 100 Millionen Anforderungen. Und eine Million Anrufe bei Lambda-Funktionen pro Monat.

Manuelles Bereitstellungssystem


Einrichten von Kinesis-Datenströmen


Rufen Sie den Kinesis Data Streams-Dienst auf und erstellen Sie zwei neue Streams, jeweils einen Shard.

Was ist eine Scherbe?
— Amazon Kinesis. 1 / 2 /. 1000 PUT . . , . 2 / 4 / 2000 PUT .

Je mehr Shards sich in Ihrem Stream befinden, desto höher ist der Durchsatz. Im Prinzip werden Streams auf diese Weise durch Hinzufügen von Shards skaliert. Aber je mehr Scherben Sie haben, desto höher ist der Preis. Jeder Shard kostet 1,5 Cent pro Stunde und zusätzlich 1,4 Cent pro Million PUT-Nutzlasteinheiten.

Erstellen Sie einen neuen Thread mit dem Namen airline_tickets . 1 Shard reicht dafür aus:


Erstellen Sie nun einen weiteren Stream mit dem Namen special_stream :


Produzenteneinstellung


Als Datenproduzent zum Parsen einer Aufgabe reicht es aus, eine reguläre EC2-Instanz zu verwenden. Es muss keine leistungsstarke, teure virtuelle Maschine sein, spot t2.micro ist durchaus geeignet.

Wichtiger Hinweis: Sie sollten beispielsweise image - Amazon Linux AMI 2018.03.0 verwenden. Daher gibt es weniger Einstellungen, um den Kinesis Agent schnell zu starten.

Gehen Sie zum EC2-Dienst, erstellen Sie eine neue virtuelle Maschine und wählen Sie die gewünschte AMI mit dem Typ t2.micro aus, der in Free Tier enthalten ist:


Damit die neu erstellte virtuelle Maschine mit dem Kinesis-Dienst interagieren kann, müssen Sie ihr das Recht dazu geben. Der beste Weg, dies zu tun, besteht darin, eine IAM-Rolle zuzuweisen. Wählen Sie daher im Bildschirm Schritt 3: Instanzdetails konfigurieren die Option Neue IAM-Rolle erstellen aus :

Erstellen von IAM-Rollen für EC2

, , EC2 Permissions:


, : AmazonKinesisFullAccess CloudWatchFullAccess.

- , : EC2-KinesisStreams-FullAccess. , , :


, :


.

Sie können standardmäßig die Parameter der Festplatte und auch die Tags belassen (obwohl es empfehlenswert ist, Tags zu verwenden, geben Sie mindestens den Namen der Instanz an und geben Sie die Umgebung an).

Jetzt befinden wir uns auf der Registerkarte Schritt 6: Konfigurieren der Sicherheitsgruppe, auf der Sie eine neue erstellen oder Ihre vorhandene Sicherheitsgruppe angeben müssen, mit der Sie über ssh (Port 22) eine Verbindung zur Instanz herstellen können. Wählen Sie dort Quelle -> Meine IP und Sie können die Instanz ausführen.


Sobald der Betriebsstatus erreicht ist, können Sie versuchen, über ssh eine Verbindung herzustellen.

Um mit Kinesis Agent arbeiten zu können, müssen Sie nach einer erfolgreichen Verbindung mit dem Computer die folgenden Befehle in das Terminal eingeben:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Erstellen Sie einen Ordner zum Speichern von API-Antworten:

sudo mkdir /var/log/airline_tickets

Bevor Sie den Agenten starten, müssen Sie seine Konfiguration konfigurieren:

sudo vim /etc/aws-kinesis/agent.json

Der Inhalt der Datei agent.json sollte folgendermaßen aussehen:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Wie Sie der Konfigurationsdatei entnehmen können, überwacht der Agent Dateien mit der Erweiterung .log im Verzeichnis / var / log / airline_tickets /, analysiert sie und überträgt sie in den Stream airline_tickets.

Wir starten den Dienst neu und stellen sicher, dass er startet und funktioniert:

sudo service aws-kinesis-agent restart

Laden Sie nun das Python-Skript herunter, das Daten von der API anfordert:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Das Skript api_caller.py fordert Daten von Aviasales an und speichert die empfangene Antwort in dem Verzeichnis, das der Kinesis-Agent scannt. Die Implementierung dieses Skripts ist ziemlich Standard, es gibt eine Klasse TicketsApi, mit der Sie die API asynchron abrufen können. In dieser Klasse übergeben wir den Header mit dem Token und den Anforderungsparametern:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Um die korrekten Einstellungen und die Funktionsfähigkeit des Agenten zu testen, führen wir einen Testlauf des Skripts api_caller.py durch:

sudo ./api_caller.py TOKEN


Und wir sehen uns das Ergebnis der Arbeit in den Protokollen des Agenten und auf der Registerkarte Überwachung im Datenstrom von Airline_Tickets an:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log



Wie Sie sehen, funktioniert alles und der Kinesis Agent sendet erfolgreich Daten an den Stream. Konfigurieren Sie nun den Verbraucher.

Einrichten von Kinesis Data Analytics


Kommen wir zur zentralen Komponente des gesamten Systems - erstellen Sie in Kinesis Data Analytics eine neue Anwendung mit dem Namen kinesis_analytics_airlines_app:


Kinesis Data Analytics ermöglicht die Echtzeitanalyse von Daten aus Kinesis Streams mithilfe von SQL. Dies ist ein vollständig automatisch skalierbarer Dienst (im Gegensatz zu Kinesis Streams), der:

  1. Ermöglicht das Erstellen neuer Streams (Output Stream) basierend auf Abfragen der Quelldaten.
  2. stellt einen Stream mit Fehlern bereit, die während des Anwendungsbetriebs aufgetreten sind (Error Stream);
  3. Es kann das Eingabedatenschema automatisch bestimmen (es kann bei Bedarf manuell neu definiert werden).

Dies ist ein teurer Service - 0,11 USD pro Stunde. Sie sollten ihn daher vorsichtig verwenden und entfernen, wenn Sie die Arbeit abgeschlossen haben.

Verbinden Sie die Anwendung mit der Datenquelle:


Wählen Sie den Stream aus, zu dem Sie eine Verbindung herstellen möchten (airline_tickets):


Als Nächstes müssen Sie die neue IAM-Rolle anhängen, damit die Anwendung aus dem Stream lesen und in den Stream schreiben kann. Dazu reicht es aus, nichts im Zugriffsberechtigungsblock zu ändern:


Nun fordern wir die Ermittlung des Datenschemas im Stream an, dazu klicken wir auf die Schaltfläche "Schema erkennen". Infolgedessen wird die IAM-Rolle aktualisiert (eine neue wird erstellt) und die Ermittlung des Schemas aus den bereits im Stream eingetroffenen Daten wird gestartet:


Jetzt müssen Sie zum SQL-Editor gehen. Wenn Sie auf diese Schaltfläche klicken, wird ein Fenster mit einer Frage zum Starten der Anwendung angezeigt. Wählen Sie aus, was ausgeführt werden soll:


Fügen Sie im SQL-Editorfenster eine so einfache Abfrage ein und klicken Sie auf SQL speichern und ausführen:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relationalen Datenbanken arbeiten Sie mit Tabellen, indem Sie INSERT-Anweisungen verwenden, um Datensätze hinzuzufügen, und eine SELECT-Anweisung, um Daten abzufragen. In Amazon Kinesis Data Analytics arbeiten Sie mit Streams (STREAM) und „Pumps“ (PUMP) - fortlaufende Einfügeanforderungen, mit denen Daten aus einem Stream in einer Anwendung in einen anderen Stream eingefügt werden.

In der obigen SQL-Abfrage werden Aeroflot-Tickets zu Preisen unter fünftausend Rubel gesucht. Alle Datensätze, die unter diese Bedingungen fallen, werden in den Stream DESTINATION_SQL_STREAM gestellt.


Wählen Sie im Zielblock den Stream special_stream und in der Dropdown-Liste Name DESTINATION_SQL_STREAM des Streams in der Anwendung Folgendes aus:


Als Ergebnis aller Manipulationen sollte sich etwas Ähnliches wie im folgenden Bild herausstellen:



Erstellen und Abonnieren des SNS-Themas


Gehen Sie zum Simple Notification Service und erstellen Sie ein neues Thema mit dem Namen Airlines:


Wir abonnieren dieses Thema und geben darin die Handynummer an, an die SMS-Benachrichtigungen gesendet werden:


Erstellen einer Tabelle in DynamoDB


Erstellen Sie in DynamoDB eine gleichnamige Tabelle, um die Rohdaten des Streams "Airline_Tickets" zu speichern. Als Primärschlüssel verwenden wir record_id:


Erstellen einer Lambda-Kollektorfunktion


Erstellen wir eine Lambda-Funktion namens Collector, deren Aufgabe es ist, den Stream airline_tickets abzufragen und diese Datensätze in die DynamoDB-Tabelle einzufügen, wenn dort neue Datensätze vorhanden sind. Zusätzlich zu den Standardrechten muss dieses Lambda Zugriff haben, um den Kinesis-Datenstrom lesen und in DynamoDB schreiben zu können.

Erstellen einer IAM-Rolle für eine Lambda-Kollektorfunktion
IAM Lambda-TicketsProcessingRole:


AmazonKinesisReadOnlyAccess AmazonDynamoDBFullAccess, :



Dieses Lambda sollte durch einen Kinesis-Trigger ausgelöst werden, wenn neue Einträge in den Stream airline_stream gelangen. Sie müssen also einen neuen Trigger hinzufügen:



Es bleibt, den Code einzufügen und das Lambda zu speichern.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Erstellen eines Lambda-Funktionsbenachrichtigers


Die zweite Lambda-Funktion, die den zweiten Stream (special_stream) überwacht und eine Benachrichtigung an SNS sendet, wird auf die gleiche Weise erstellt. Daher muss dieses Lambda Lesezugriff von Kinesis haben und Nachrichten an das angegebene SNS-Thema senden, die dann vom SNS-Dienst an alle Abonnenten dieses Themas (E-Mail, SMS usw.) gesendet werden.

Erstellen Sie IAM-Rollen
IAM Lambda-KinesisAlarm , alarm_notifier:



Dieses Lambda sollte gemäß dem Auslöser für neue Einträge funktionieren, um in den special_stream zu gelangen. Daher müssen Sie den Auslöser genauso konfigurieren wie für das Collector-Lambda.

Zur Vereinfachung der Konfiguration dieses Lambdas führen wir eine neue Umgebungsvariable ein - TOPIC_ARN, in die wir das ANR-Thema (Amazon Recourse Names) von Airlines einfügen:


Und geben Sie den Lambda-Code ein, es ist ganz einfach:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Dies scheint die manuelle Systemkonfiguration abzuschließen. Es bleibt nur zu testen und sicherzustellen, dass wir alles richtig konfiguriert haben.

Bereitstellung aus Terraform-Code


Notwendige Vorbereitung


Terraform ist ein sehr praktisches Open-Source-Tool zum Bereitstellen der Infrastruktur aus Code. Es hat eine eigene Syntax, die leicht zu erlernen ist, und viele Beispiele dafür, wie und was bereitgestellt werden soll. Es gibt viele praktische Plugins im Atom- oder Visual Studio-Code-Editor, die die Arbeit mit Terraform erleichtern.

Sie können das Distributionskit hier herunterladen . Eine detaillierte Analyse aller Funktionen von Terraform würde den Rahmen dieses Artikels sprengen, daher beschränken wir uns auf die Hauptpunkte.

Wie man anfängt


Der vollständige Projektcode befindet sich in meinem Repository . Wir klonen ein Repository für uns. Bevor Sie beginnen, müssen Sie sicherstellen, dass Sie AWS CLI installiert und konfiguriert haben Terraform sucht in der Datei ~ / .aws / credentials nach Anmeldeinformationen.

Es wird empfohlen, die gesamte Infrastruktur bereitzustellen, bevor Sie den Befehl plan starten, um zu sehen, was Terraform in der Cloud für uns erstellt:

terraform.exe plan

Sie werden aufgefordert, eine Telefonnummer einzugeben, um Benachrichtigungen an diese zu senden. Zu diesem Zeitpunkt ist dies optional.


Nach der Analyse des Arbeitsplans des Programms können wir mit der Erstellung von Ressourcen beginnen:

terraform.exe apply

Nach dem Senden dieses Befehls werden Sie erneut aufgefordert, eine Telefonnummer einzugeben. Geben Sie "Ja" ein, wenn die Frage zur tatsächlichen Ausführung von Aktionen angezeigt wird. Auf diese Weise können Sie die gesamte Infrastruktur erhöhen, alle erforderlichen Einstellungen für EC2 vornehmen, Lambda-Funktionen bereitstellen usw.

Nachdem alle Ressourcen erfolgreich über den Terraform-Code erstellt wurden, müssen Sie auf die Details der Kinesis Analytics-Anwendung eingehen (leider habe ich nicht herausgefunden, wie dies direkt aus dem Code heraus geschehen kann).

Starte die Anwendung:


Danach müssen Sie den Stream-Namen in der Anwendung explizit festlegen, indem Sie aus der Dropdown-Liste Folgendes auswählen:



Jetzt ist alles bereit zu gehen.

Anwendungstests


Unabhängig davon, wie Sie das System manuell oder über den Terraform-Code bereitgestellt haben, funktioniert es genauso.

Wir gehen über SSH zur virtuellen EC2-Maschine, auf der Kinesis Agent installiert ist, und führen das Skript api_caller.py aus

sudo ./api_caller.py TOKEN

Es bleibt noch auf eine SMS an Ihre Nummer zu warten:


SMS - Nachricht kommt in fast 1 Minute am Telefon an:


Es bleibt abzuwarten, ob Datensätze in der DynamoDB-Datenbank für eine spätere, detailliertere Analyse gespeichert werden. Die Airline_Tickets-Tabelle enthält ungefähr Folgendes:


Fazit


Im Laufe der Arbeit wurde ein auf Amazon Kinesis basierendes Online-Datenverarbeitungssystem aufgebaut. Wir haben die Optionen für die Verwendung von Kinesis Agent in Verbindung mit Kinesis Data Streams und die Echtzeitanalyse von Kinesis Analytics mithilfe von SQL-Befehlen sowie die Interaktion von Amazon Kinesis mit anderen AWS-Diensten untersucht.

Wir haben das oben genannte System auf zwei Arten bereitgestellt: lang genug manuell und schnell aus dem Terraform-Code.

Der gesamte Quellcode des Projekts ist in meinem Repository auf GitHub verfügbar. Ich empfehle Ihnen, sich damit vertraut zu machen.

Ich bin bereit, den Artikel gerne zu diskutieren, ich warte auf Ihre Kommentare. Ich hoffe auf konstruktive Kritik.

Wünsche dir Erfolg!

All Articles