Intégration d'API Aviasales avec Amazon Kinesis et simplicité sans serveur

Bonjour, Habr!

Aimez-vous piloter des avions? J'adore ça, mais sur l'auto-isolement, j'ai aussi aimé analyser les données sur les billets d'avion d'une ressource bien connue - Aviasales.

Aujourd'hui, nous analyserons le travail d'Amazon Kinesis, construirons un systÚme de streaming avec des analyses en temps réel, placerons la base de données Amazon DynamoDB NoSQL comme entrepÎt de données principal et configurerons des alertes SMS pour les tickets intéressants.

Tous les détails sous la coupe! Aller!



introduction


Par exemple, nous devons avoir accÚs à l'API Aviasales . L'accÚs à celui-ci est gratuit et sans restriction, il vous suffit de vous inscrire dans la section "Développeurs" pour obtenir votre token API pour accéder aux données.

L'objectif principal de cet article est de donner une compréhension générale de l'utilisation des informations de streaming dans AWS, nous faisons en sorte que les données renvoyées par l'API utilisée ne soient pas strictement pertinentes et soient transférées à partir du cache, qui est généré à partir des recherches des utilisateurs des sites Aviasales.ru et Jetradar.com pour derniÚres 48 heures.

Les données des billets d'avion de l'agent Kinesis reçues via l'API installée sur la machine du producteur seront automatiquement analysées et transférées vers le flux souhaité via Kinesis Data Analytics. Une version non traitée de ce flux sera écrite directement dans le référentiel. Déployé dans le stockage DynamoDB des données brutes permettra une analyse plus approfondie des tickets via des outils de BI, tels qu'AWS Quick Sight.

Nous considérerons deux options pour déployer l'ensemble de l'infrastructure:

  • Manuel - via AWS Management Console;
  • Infrastructure Ă  partir du code Terraform - pour les ingĂ©nieurs d'automatisation paresseux;

Architecture du systÚme en cours de développement



Composants utilisés:

  • API Aviasales - les donnĂ©es renvoyĂ©es par cette API seront utilisĂ©es pour tous les travaux ultĂ©rieurs;
  • EC2 Producer Instance - une machine virtuelle ordinaire dans le cloud sur laquelle le flux de donnĂ©es d'entrĂ©e sera gĂ©nĂ©rĂ©:

    • Kinesis Agent est une application Java installĂ©e localement qui fournit un moyen simple de collecter et d'envoyer des donnĂ©es Ă  Kinesis (Kinesis Data Streams ou Kinesis Firehose). L'agent surveille en permanence un ensemble de fichiers dans les rĂ©pertoires spĂ©cifiĂ©s et envoie de nouvelles donnĂ©es Ă  Kinesis;
    • Script de l'API de l'appelant - un script Python qui fait des demandes d'API et place la rĂ©ponse dans un dossier surveillĂ© par l'agent Kinesis;
  • Kinesis Data Streams — ;
  • Kinesis Analytics — , . Amazon Kinesis Data Analytics ;
  • AWS Lambda — , . ;
  • Amazon DynamoDB — «‑» , 10 . DynamoDB - , . DynamoDB , . ;
  • Amazon SNS — « — » (Pub/Sub), , . SNS push-, SMS- .

Formation initiale


Pour émuler le flux de données, j'ai décidé d'utiliser les informations sur les billets d'avion renvoyées par l'API Aviasales. La documentation contient une liste assez complÚte de différentes méthodes, prenez l'une d'entre elles - le «Calendrier des prix pour le mois», qui renvoie les prix pour chaque jour du mois, regroupés par nombre de transferts. Si vous ne transmettez pas le mois de recherche dans la demande, les informations seront retournées pour le mois suivant celui en cours.

Alors, inscrivez-vous, obtenez votre jeton.

Exemple de demande ci-dessous:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

La mĂ©thode ci-dessus pour recevoir des donnĂ©es de l'API avec le jeton dans la demande fonctionnera, mais je prĂ©fĂšre passer le jeton d'accĂšs via l'en-tĂȘte, nous allons donc utiliser cette mĂ©thode dans le script api_caller.py.

Exemple de réponse:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

L'exemple de rĂ©ponse API ci-dessus montre un billet de Saint-PĂ©tersbourg Ă  Phuk ... Oh, de quoi rĂȘver ...
Puisque je viens de Kazan et que Phuket "rĂȘve tout simplement de nous" maintenant, nous chercherons des billets de Saint-PĂ©tersbourg Ă  Kazan.
Il est supposĂ© que vous disposez dĂ©jĂ  d'un compte AWS. Je veux tout de suite faire particuliĂšrement attention Ă  ce que Kinesis et l'envoi de notifications par SMS ne soient pas inclus dans le niveau gratuit annuel (utilisation gratuite) . Mais malgrĂ© cela, avec quelques dollars en tĂȘte, il est tout Ă  fait possible de construire le systĂšme proposĂ© et de jouer avec. Et, bien sĂ»r, n'oubliez pas de supprimer toutes les ressources lorsqu'elles deviennent inutiles.
Heureusement, les fonctions DynamoDb et lambda seront des partagiciels pour nous si vous respectez les limites mensuelles gratuites. Par exemple, pour DynamoDB: 25 Go de stockage, 25 WCU / RCU et 100 millions de demandes. Et un million d'appels aux fonctions lambda par mois.

SystÚme de déploiement manuel


Configuration des flux de données Kinesis


Accédez au service Kinesis Data Streams et créez deux nouveaux flux, un fragment pour chacun.

Qu'est-ce qu'un Ă©clat?
— Amazon Kinesis. 1 / 2 /. 1000 PUT . . , . 2 / 4 / 2000 PUT .

Plus il y a d'éclats dans votre flux, plus son débit est important. En principe, les flux sont mis à l'échelle de cette façon en ajoutant des fragments. Mais plus vous avez d'éclats, plus le prix est élevé. Chaque fragment coûte 1,5 centime par heure et 1,4 centime supplémentaire pour chaque million d'unités de charge utile PUT.

Créez un nouveau thread avec le nom flight_tickets , 1 fragment suffira pour cela:


Créez maintenant un autre flux appelé special_stream :


RĂ©glage du producteur


En tant que producteur de données pour analyser une tùche, il suffit d'utiliser une instance EC2 standard. Il n'est pas nécessaire que ce soit une machine virtuelle puissante et coûteuse; le spot t2.micro convient parfaitement.

Remarque importante: par exemple, vous devez utiliser l'image - Amazon Linux AMI 2018.03.0, avec elle, il y a moins de paramĂštres pour lancer rapidement l'agent Kinesis.

Accédez au service EC2, créez une nouvelle machine virtuelle, sélectionnez l'AMI souhaitée avec le type t2.micro, qui est inclus dans Free Tier:


Pour que la machine virtuelle nouvellement crĂ©Ă©e puisse interagir avec le service Kinesis, vous devez lui en donner le droit. La meilleure façon de procĂ©der consiste Ă  attribuer un rĂŽle IAM. Par consĂ©quent, sur l'Ă©cran Étape 3: Configurer les dĂ©tails de l'instance, sĂ©lectionnez CrĂ©er un nouveau rĂŽle IAM :

Création de rÎles IAM pour EC2

, , EC2 Permissions:


, : AmazonKinesisFullAccess CloudWatchFullAccess.

- , : EC2-KinesisStreams-FullAccess. , , :


, :


.

Vous pouvez laisser les paramÚtres du disque dur par défaut, les balises aussi (bien qu'il soit recommandé d'utiliser des balises, donnez au moins le nom de l'instance et spécifiez l'environnement).

Nous sommes maintenant sur l'onglet Étape 6: Configurer le groupe de sĂ©curitĂ©, oĂč vous devez en crĂ©er un nouveau ou spĂ©cifier votre groupe de sĂ©curitĂ© existant, ce qui vous permet de vous connecter via ssh (port 22) Ă  l'instance. SĂ©lectionnez Source -> Mon IP lĂ -bas et vous pouvez exĂ©cuter l'instance.


Une fois qu'il entre en Ă©tat de fonctionnement, vous pouvez essayer de vous y connecter via ssh.

Pour pouvoir travailler avec Kinesis Agent, aprÚs une connexion réussie à la machine, vous devez entrer les commandes suivantes dans le terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Créez un dossier pour enregistrer les réponses de l'API:

sudo mkdir /var/log/airline_tickets

Avant de démarrer l'agent, vous devez configurer sa configuration:

sudo vim /etc/aws-kinesis/agent.json

Le contenu du fichier agent.json doit ressembler Ă  ceci:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Comme vous pouvez le voir dans le fichier de configuration, l'agent surveillera les fichiers avec l'extension .log dans le répertoire / var / log / airlines_tickets /, les analysera et les transférera vers le flux de airlines_tickets.

Nous redémarrons le service et nous assurons qu'il démarre et fonctionne:

sudo service aws-kinesis-agent restart

Téléchargez maintenant le script Python qui demandera des données à l'API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Le script api_caller.py demande des donnĂ©es Ă  Aviasales et enregistre la rĂ©ponse reçue dans le rĂ©pertoire analysĂ© par l'agent Kinesis. L'implĂ©mentation de ce script est assez standard, il existe une classe TicketsApi, elle vous permet d'extraire l'API de maniĂšre asynchrone. Dans cette classe, nous passons l'en-tĂȘte avec le jeton et les paramĂštres de requĂȘte:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Pour tester les paramÚtres et l'opérabilité corrects de l'agent, nous allons effectuer un test de test du script api_caller.py:

sudo ./api_caller.py TOKEN


Et nous examinons le résultat du travail dans les journaux de l'agent et sur l'onglet Surveillance dans le flux de données de airlines_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log



Comme vous pouvez le voir, tout fonctionne et l'agent Kinesis envoie avec succÚs des données au flux. Configurez maintenant le consommateur.

Configuration de Kinesis Data Analytics


Passons au composant central de tout le systÚme - créons une nouvelle application dans Kinesis Data Analytics appelée kinesis_analytics_airlines_app:


Kinesis Data Analytics permet une analyse en temps réel des données de Kinesis Streams à l'aide de SQL. Il s'agit d'un service entiÚrement auto-évolutif (contrairement à Kinesis Streams), qui:

  1. vous permet de crĂ©er de nouveaux flux (flux de sortie) en fonction des requĂȘtes sur les donnĂ©es source;
  2. fournit un flux avec des erreurs survenues pendant le fonctionnement de l'application (flux d'erreurs);
  3. Il peut dĂ©terminer automatiquement le schĂ©ma de donnĂ©es d'entrĂ©e (il peut ĂȘtre redĂ©fini manuellement si nĂ©cessaire).

Il s'agit d'un service coûteux - 0,11 USD par heure, vous devez donc l'utiliser avec soin et le retirer lorsque vous avez terminé le travail.

Connectez l'application à la source de données:


Sélectionnez le flux auquel vous souhaitez vous connecter (billets_aériens):


Ensuite, vous devez attacher le nouveau rĂŽle IAM afin que l'application puisse lire dans le flux et Ă©crire dans le flux. Pour ce faire, il suffit de ne rien changer dans le bloc Autorisations d'accĂšs:


Maintenant, nous demandons la découverte du schéma de données dans le flux, pour cela nous cliquons sur le bouton "Découvrir le schéma". En conséquence, le rÎle IAM sera mis à jour (un nouveau sera créé) et la découverte du schéma à partir des données déjà arrivées dans le flux sera lancée:


Maintenant, vous devez aller dans l'Ă©diteur SQL. Lorsque vous cliquez sur ce bouton, une fenĂȘtre avec une question sur le lancement de l'application apparaĂźtra - choisissez ce que nous voulons exĂ©cuter:


Dans la fenĂȘtre de l'Ă©diteur SQL, insĂ©rez une requĂȘte aussi simple et cliquez sur Enregistrer et exĂ©cuter SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Dans les bases de données relationnelles, vous travaillez avec des tables à l'aide d'instructions INSERT pour ajouter des enregistrements et d'une instruction SELECT pour interroger des données. Dans Amazon Kinesis Data Analytics, vous travaillez avec des flux (STREAM) et des «pompes» (PUMP) - des demandes d'insertion continues qui insÚrent des données d'un flux d'une application dans un autre flux.

Dans la requĂȘte SQL ci-dessus, les billets Aeroflot sont recherchĂ©s Ă  des prix infĂ©rieurs Ă  cinq mille roubles. Tous les enregistrements qui tombent dans ces conditions seront placĂ©s dans le flux DESTINATION_SQL_STREAM.


Dans le bloc Destination, sélectionnez le flux de flux spécial et dans la liste déroulante Nom du flux intégré à l'application DESTINATION_SQL_STREAM:


À la suite de toutes les manipulations, quelque chose de similaire à l'image ci-dessous devrait se produire:



Création et abonnement à la rubrique SNS


Accédez au service de notification simple et créez un nouveau sujet nommé Airlines:


Nous souscrivons à ce sujet, nous y indiquons le numéro de téléphone mobile auquel les notifications SMS seront envoyées:


Création d'une table dans DynamoDB


Pour stocker les donnĂ©es brutes de leur flux airlines_tickets, crĂ©ez une table dans DynamoDB avec le mĂȘme nom. Comme clĂ© primaire, nous utiliserons record_id:


Création d'une fonction de collecteur lambda


Créons une fonction lambda appelée Collector, dont la tùche est d'interroger le flux airlines_tickets et, s'il y a de nouveaux enregistrements, d'insérer ces enregistrements dans la table DynamoDB. De toute évidence, en plus des droits par défaut, ce lambda doit avoir accÚs pour lire le flux de données Kinesis et écrire dans DynamoDB.

Création d'un rÎle IAM pour une fonction de collecteur lambda
IAM Lambda-TicketsProcessingRole:


AmazonKinesisReadOnlyAccess AmazonDynamoDBFullAccess, :



Ce lambda doit ĂȘtre dĂ©clenchĂ© par un dĂ©clencheur Kinesis lorsque de nouvelles entrĂ©es atteignent le flux airlines_stream, vous devez donc ajouter un nouveau dĂ©clencheur:



Il reste à insérer le code et à sauvegarder le lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Création d'un notificateur de fonction lambda


La deuxiĂšme fonction lambda, qui surveillera le deuxiĂšme flux (flux_ spĂ©cial) et enverra une notification Ă  SNS, est crĂ©Ă©e de la mĂȘme maniĂšre. Par consĂ©quent, ce lambda doit avoir un accĂšs en lecture depuis Kinesis et envoyer des messages Ă  la rubrique SNS spĂ©cifiĂ©e, qui est ensuite envoyĂ©e par le service SNS Ă  tous les abonnĂ©s de cette rubrique (e-mail, SMS, etc.).

Créer des rÎles IAM
IAM Lambda-KinesisAlarm , alarm_notifier:



Ce lambda devrait fonctionner en fonction du dĂ©clencheur pour que de nouvelles entrĂ©es entrent dans le flux spĂ©cial, vous devez donc configurer le dĂ©clencheur de la mĂȘme maniĂšre que nous l'avons fait pour le lambda Collector.

Pour la commoditĂ© de la configuration de cette lambda, nous introduisons une nouvelle variable d'environnement - TOPIC_ARN, oĂč nous plaçons le sujet ANR (Amazon Recourse Names) de Airlines:


Et insérez le code lambda, c'est trÚs simple:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Il semble que cela termine la configuration manuelle du systÚme. Il ne reste plus qu'à tester et à s'assurer que nous avons tout configuré correctement.

DĂ©ployer Ă  partir du code Terraform


Préparation nécessaire


Terraform est un outil open source trÚs pratique pour déployer l'infrastructure à partir du code. Il a sa propre syntaxe qui est facile à apprendre et de nombreux exemples de comment et quoi déployer. Il existe de nombreux plugins pratiques dans l'éditeur de code Atom ou Visual Studio qui facilitent le travail avec Terraform.

Vous pouvez télécharger le kit de distribution ici . Une analyse détaillée de toutes les fonctionnalités de Terraform dépasse le cadre de cet article, nous nous limiterons donc aux points principaux.

Comment commencer


Le code de projet complet est dans mon rĂ©fĂ©rentiel . Nous clonons un rĂ©fĂ©rentiel pour nous-mĂȘmes. Avant de commencer, vous devez vous assurer que vous avez installĂ© et configurĂ© l'AWS CLI, comme Terraform recherchera les informations d'identification dans le fichier ~ / .aws / credentials.

Il est recommandé de déployer l'intégralité de l'infrastructure avant de lancer la commande plan pour voir ce que Terraform crée pour nous dans le cloud:

terraform.exe plan

Vous serez invitĂ© Ă  entrer un numĂ©ro de tĂ©lĂ©phone pour lui envoyer des notifications. À ce stade, il est facultatif.


AprÚs avoir analysé le plan de travail du programme, nous pouvons commencer la création de ressources:

terraform.exe apply

AprÚs l'envoi de cette commande, il vous sera à nouveau demandé de saisir un numéro de téléphone, tapez «oui» lorsque la question sur l'exécution réelle des actions s'affiche. Cela vous permettra d'augmenter l'ensemble de l'infrastructure, d'effectuer tous les réglages nécessaires pour EC2, de déployer des fonctions lambda, etc.

Une fois que toutes les ressources ont été créées avec succÚs via le code Terraform, vous devez entrer dans les détails de l'application Kinesis Analytics (malheureusement, je n'ai pas trouvé comment le faire directement à partir du code).

Lancez l'application:


AprÚs cela, vous devez définir explicitement le nom du flux dans l'application en choisissant dans la liste déroulante:



Maintenant, tout est prĂȘt.

Test d'application


Quelle que soit la façon dont vous avez dĂ©ployĂ© le systĂšme, manuellement ou via le code Terraform, il fonctionnera de la mĂȘme maniĂšre.

Nous passons par SSH Ă  la machine virtuelle EC2 oĂč Kinesis Agent est installĂ© et exĂ©cutons le script api_caller.py

sudo ./api_caller.py TOKEN

Reste à attendre un SMS sur votre numéro:


SMS - le message arrive sur le téléphone en presque 1 minute:


Il reste à voir si les enregistrements sont sauvegardés dans la base de données DynamoDB pour une analyse ultérieure plus détaillée. Le tableau des billets d'avion contient quelque chose comme ceci:


Conclusion


Au cours du travail effectué, un systÚme de traitement des données en ligne basé sur Amazon Kinesis a été construit. Nous avons examiné les options d'utilisation de Kinesis Agent en conjonction avec Kinesis Data Streams et l'analyse en temps réel de Kinesis Analytics à l'aide de commandes SQL, ainsi que l'interaction d'Amazon Kinesis avec d'autres services AWS.

Nous avons déployé le systÚme ci-dessus de deux maniÚres: suffisamment longue et manuelle à partir du code Terraform.

L'intégralité du code source du projet est disponible dans mon référentiel sur GitHub , je vous suggÚre de vous familiariser avec celui-ci.

Je suis prĂȘt Ă  discuter de l'article avec plaisir, j'attends vos commentaires. J'espĂšre des critiques constructives.

Te souhaite du succĂšs!

All Articles