Integración API de Aviasales con Amazon Kinesis y simplicidad sin servidor

Hola Habr!

¿Te gusta volar aviones? Me encanta, pero en el autoaislamiento también me gustaba analizar los datos de tarifas aéreas de un recurso conocido: Aviasales.

Hoy analizaremos el trabajo de Amazon Kinesis, crearemos un sistema de transmisión con análisis en tiempo real, colocaremos la base de datos Amazon DynamoDB NoSQL como el almacén de datos principal y configuraremos alertas por SMS para tickets interesantes.

¡Todos los detalles debajo del corte! ¡Vamos!



Introducción


Por ejemplo, necesitamos acceso a la API de Aviasales . El acceso a él se proporciona de forma gratuita y sin restricciones, solo necesita registrarse en la sección "Desarrolladores" para obtener su token API para acceder a los datos.

El objetivo principal de este artículo es proporcionar una comprensión general del uso de la información de transmisión en AWS, lo que hace que la información devuelta por la API utilizada no sea estrictamente relevante y se transfiera desde la memoria caché, que se genera en base a búsquedas de usuarios de los sitios Aviasales.ru y Jetradar.com para últimas 48 horas

Los datos de los boletos de avión de Kinesis-agent recibidos a través de la API instalada en la máquina del productor se analizarán automáticamente y se transferirán a la transmisión deseada a través de Kinesis Data Analytics. Una versión no procesada de esta secuencia se escribirá directamente en el repositorio. Implementado en el almacenamiento de datos sin procesar de DynamoDB permitirá un análisis más profundo de los tickets a través de herramientas de BI, como AWS Quick Sight.

Consideraremos dos opciones para implementar toda la infraestructura:

  • Manual: a través de la consola de administración de AWS;
  • Infraestructura del código Terraform - para ingenieros de automatización perezosos;

Arquitectura del sistema en desarrollo.



Componentes utilizados:

  • API de Aviasales : los datos devueltos por esta API se utilizarán para todo el trabajo posterior;
  • Instancia de productor EC2 : una máquina virtual normal en la nube en la que se generará el flujo de datos de entrada:

    • Kinesis Agent es una aplicación Java instalada localmente que proporciona una forma sencilla de recopilar y enviar datos a Kinesis (Kinesis Data Streams o Kinesis Firehose). El agente monitorea constantemente un conjunto de archivos en los directorios especificados y envía nuevos datos a Kinesis;
    • Caller API Script : un script de Python que realiza solicitudes de API y coloca la respuesta en una carpeta que Kinesis Agent monitorea;
  • Kinesis Data Streams — ;
  • Kinesis Analytics — , . Amazon Kinesis Data Analytics ;
  • AWS Lambda — , . ;
  • Amazon DynamoDB — «‑» , 10 . DynamoDB - , . DynamoDB , . ;
  • Amazon SNS — « — » (Pub/Sub), , . SNS push-, SMS- .

Entrenamiento inicial


Para emular el flujo de datos, decidí usar la información de la tarifa aérea devuelta por la API de Aviasales. La documentación tiene una lista bastante extensa de diferentes métodos, tome uno de ellos: el "Calendario de precios para el mes", que devuelve los precios para cada día del mes, agrupados por la cantidad de transferencias. Si no transmite el mes de búsqueda en la solicitud, se devolverá la información del mes siguiente al actual.

Entonces, regístrese, obtenga su token.

Ejemplo de solicitud a continuación:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

El método anterior de recibir datos de la API con el token en la solicitud funcionará, pero prefiero pasar el token de acceso a través del encabezado, por lo que utilizaremos este método en el script api_caller.py.

Ejemplo de respuesta:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

El ejemplo de respuesta API anterior muestra un boleto de San Petersburgo a Phuk ... Oh, con qué soñar ...
Ya que soy de Kazan, y Phuket "solo está soñando con nosotros" ahora, buscaremos boletos de San Petersburgo a Kazan.
Se supone que ya tiene una cuenta de AWS. Quiero prestar especial atención de inmediato para que Kinesis y el envío de notificaciones por SMS no estén incluidos en el nivel gratuito anual (uso gratuito) . Pero incluso a pesar de esto, teniendo en cuenta un par de dólares, es muy posible construir el sistema propuesto y jugar con él. Y, por supuesto, no olvide eliminar todos los recursos después de que sean innecesarios.
Afortunadamente, las funciones de DynamoDb y lambda serán shareware para nosotros si se mantiene dentro de los límites gratuitos mensuales. Por ejemplo, para DynamoDB: 25 GB de almacenamiento, 25 WCU / RCU y 100 millones de solicitudes. Y un millón de llamadas a funciones lambda por mes.

Sistema de despliegue manual


Configurar flujos de datos de Kinesis


Vaya al servicio Kinesis Data Streams y cree dos nuevas transmisiones, un fragmento para cada una.

¿Qué es un fragmento?
— Amazon Kinesis. 1 / 2 /. 1000 PUT . . , . 2 / 4 / 2000 PUT .

Cuantos más fragmentos haya en su transmisión, mayor será su rendimiento. En principio, las secuencias se escalan de esta manera agregando fragmentos. Pero cuantos más fragmentos tenga, mayor será el precio. Cada fragmento cuesta 1,5 centavos por hora y 1,4 centavos adicionales por cada millón de unidades de carga PUT.

Cree un nuevo hilo con el nombre airline_tickets , 1 fragmento será suficiente para ello:


Ahora cree otra secuencia llamada special_stream :


Configuración del productor


Como productor de datos para analizar una tarea, es suficiente usar una instancia EC2 normal. No tiene que ser una máquina virtual poderosa y costosa; spot t2.micro es bastante adecuado.

Nota importante: por ejemplo, debe usar la imagen: Amazon Linux AMI 2018.03.0, ya que hay menos configuraciones para iniciar rápidamente el Agente Kinesis.

Vaya al servicio EC2, cree una nueva máquina virtual, seleccione la AMI deseada con el tipo t2.micro, que se incluye en el nivel gratuito:


Para que la máquina virtual recién creada pueda interactuar con el servicio de Kinesis, debe otorgarle el derecho de hacerlo. La mejor manera de hacerlo es asignar un rol de IAM. Por lo tanto, en el Paso 3: pantalla Configurar detalles de instancia, seleccione Crear nuevo rol de IAM :

Creación de roles de IAM para EC2

, , EC2 Permissions:


, : AmazonKinesisFullAccess CloudWatchFullAccess.

- , : EC2-KinesisStreams-FullAccess. , , :


, :


.

Puede dejar los parámetros del disco duro de forma predeterminada, también las etiquetas (aunque es una buena práctica usar etiquetas, al menos dé el nombre de la instancia y especifique el entorno).

Ahora estamos en el Paso 6: pestaña Configurar grupo de seguridad, donde debe crear uno nuevo o especificar su grupo de seguridad existente, que le permite conectarse a través de ssh (puerto 22) a la instancia. Seleccione Fuente -> Mi IP allí y puede ejecutar la instancia.


Una vez que ingresa al estado de ejecución, puede intentar conectarse a través de ssh.

Para poder trabajar con Kinesis Agent, después de una conexión exitosa a la máquina, debe ingresar los siguientes comandos en el terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Cree una carpeta para guardar las respuestas de la API:

sudo mkdir /var/log/airline_tickets

Antes de iniciar el agente, debe configurar su configuración:

sudo vim /etc/aws-kinesis/agent.json

El contenido del archivo agent.json debería verse así:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Como puede ver en el archivo de configuración, el agente supervisará los archivos con la extensión .log en el directorio / var / log / airline_tickets /, los analizará y los transferirá a la secuencia airline_tickets.

Reiniciamos el servicio y nos aseguramos de que se inicie y funcione:

sudo service aws-kinesis-agent restart

Ahora descargue el script de Python que solicitará datos de la API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

El script api_caller.py solicita datos de Aviasales y guarda la respuesta recibida en el directorio que escanea el agente de Kinesis. La implementación de este script es bastante estándar, hay una clase TicketsApi, le permite extraer la API de forma asincrónica. En esta clase, pasamos el encabezado con el token y los parámetros de solicitud:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Para probar la configuración correcta y la operatividad del agente, haremos una prueba de ejecución del script api_caller.py:

sudo ./api_caller.py TOKEN


Y observamos el resultado del trabajo en los registros del Agente y en la pestaña Monitoreo en el flujo de datos de airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log



Como puede ver, todo funciona y el Agente Kinesis envía con éxito los datos a la transmisión. Ahora configure el consumidor.

Configurar Kinesis Data Analytics


Pasemos al componente central de todo el sistema: cree una nueva aplicación en Kinesis Data Analytics llamada kinesis_analytics_airlines_app:


Kinesis Data Analytics permite el análisis en tiempo real de los datos de Kinesis Streams utilizando SQL. Este es un servicio totalmente autoescalable (a diferencia de Kinesis Streams), que:

  1. le permite crear nuevas secuencias (secuencia de salida) basadas en consultas a los datos de origen;
  2. proporciona una secuencia con errores que ocurrieron durante el funcionamiento de la aplicación (secuencia de errores);
  3. Puede determinar automáticamente el esquema de datos de entrada (puede redefinirse manualmente si es necesario).

Este es un servicio costoso: 0,11 USD por hora, por lo que debe usarlo con cuidado y eliminarlo cuando complete el trabajo.

Conecte la aplicación a la fuente de datos:


Seleccione la transmisión a la que desea conectarse (airline_tickets):


A continuación, debe adjuntar el nuevo rol de IAM para que la aplicación pueda leer de la transmisión y escribir en ella. Para hacer esto, es suficiente no cambiar nada en el bloque de permisos de acceso:


Ahora solicitamos el descubrimiento del esquema de datos en la secuencia, para esto hacemos clic en el botón "Descubrir esquema". Como resultado, se actualizará el rol de IAM (se creará uno nuevo) y se lanzará el descubrimiento del esquema a partir de los datos que ya han llegado a la secuencia:


Ahora debe ir al editor de SQL. Cuando haga clic en este botón, aparecerá una ventana con una pregunta sobre el inicio de la aplicación: elija lo que queremos ejecutar:


En la ventana del editor de SQL, inserte una consulta tan simple y haga clic en Guardar y ejecutar SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

En las bases de datos relacionales, trabaja con tablas utilizando instrucciones INSERT para agregar registros y una instrucción SELECT para consultar datos. En Amazon Kinesis Data Analytics, usted trabaja con flujos (STREAM) y "bombas" (PUMP): solicitudes de inserción continua que insertan datos de un flujo en una aplicación a otro flujo.

En la consulta SQL anterior, los boletos de Aeroflot se buscan a precios inferiores a cinco mil rublos. Todos los registros que se encuentren en estas condiciones se colocarán en la secuencia DESTINATION_SQL_STREAM.


En el bloque Destino, seleccione la secuencia de flujo especial, y en la lista desplegable Nombre del flujo en la aplicación DESTINATION_SQL_STREAM:


Como resultado de todas las manipulaciones, debería resultar algo similar a la imagen a continuación:



Crear y suscribirse al tema SNS


Vaya al Servicio de notificación simple y cree un nuevo tema llamado Aerolíneas:


Nos suscribimos a este tema, en él indicamos el número de teléfono móvil al que llegarán las notificaciones por SMS:


Crear una tabla en DynamoDB


Para almacenar los datos sin procesar de su flujo de airline_tickets, cree una tabla en DynamoDB con el mismo nombre. Como clave principal, usaremos record_id:


Crear una función de colector lambda


Creemos una función lambda llamada Collector, cuya tarea es sondear el flujo de airline_tickets y, si hay nuevos registros allí, insertar estos registros en la tabla DynamoDB. Obviamente, además de los derechos predeterminados, esta lambda debe tener acceso para leer el flujo de datos de Kinesis y escribir en DynamoDB.

Crear un rol de IAM para una función de recopilador lambda
IAM Lambda-TicketsProcessingRole:


AmazonKinesisReadOnlyAccess AmazonDynamoDBFullAccess, :



Esta lambda debe ser activada por un activador de Kinesis cuando nuevas entradas lleguen a la línea stream_stream, por lo que debe agregar un nuevo activador:



Queda por insertar el código y guardar la lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Crear un notificador de función lambda


La segunda función lambda, que supervisará la segunda secuencia (special_stream) y enviará una notificación a SNS, se crea de la misma manera. Por lo tanto, esta lambda debe tener acceso de lectura desde Kinesis y enviar mensajes al tema SNS especificado, que luego es enviado por el servicio SNS a todos los suscriptores de este tema (correo electrónico, SMS, etc.).

Crear roles de IAM
IAM Lambda-KinesisAlarm , alarm_notifier:



Esta lambda debería funcionar de acuerdo con el activador para que nuevas entradas ingresen a special_stream, por lo que debe configurar el activador de la misma manera que lo hicimos para el recopilador lambda.

Para la conveniencia de configurar este lambda, presentamos una nueva variable de entorno: TOPIC_ARN, donde colocamos el tema ANR (Nombres de recurso de Amazon) de Aerolíneas:


E inserte el código lambda, es muy simple:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Parece que esto completa la configuración manual del sistema. Solo queda probar y asegurarnos de que configuramos todo correctamente.

Implementar desde código Terraform


Preparación necesaria


Terraform es una herramienta de código abierto muy conveniente para implementar infraestructura desde el código. Tiene su propia sintaxis que es fácil de aprender y muchos ejemplos de cómo y qué implementar. Hay muchos complementos convenientes en el editor de Atom o Visual Studio Code que hacen que sea más fácil trabajar con Terraform.

Puede descargar el kit de distribución desde aquí . Un análisis detallado de todas las características de Terraform está más allá del alcance de este artículo, por lo que nos limitaremos a los puntos principales.

Cómo empezar


El código completo del proyecto está en mi repositorio . Clonamos un repositorio para nosotros mismos. Antes de comenzar, debe asegurarse de haber instalado y configurado AWS CLI, como Terraform buscará credenciales en el archivo ~ / .aws / credentials.

Es una buena práctica implementar toda la infraestructura antes de iniciar el comando de plan para ver qué nos está creando Terraform en la nube:

terraform.exe plan

Se le pedirá que ingrese un número de teléfono para enviarle notificaciones. En esta etapa, es opcional.


Después de analizar el plan de trabajo del programa, podemos comenzar la creación de recursos:

terraform.exe apply

Después de enviar este comando, nuevamente se le pedirá que ingrese un número de teléfono, escriba "sí" cuando aparezca la pregunta sobre la ejecución real de las acciones. Esto le permitirá elevar toda la infraestructura, llevar a cabo todas las configuraciones necesarias para EC2, implementar funciones lambda, etc.

Después de que todos los recursos se hayan creado con éxito a través del código Terraform, debe entrar en los detalles de la aplicación Kinesis Analytics (desafortunadamente, no encontré cómo hacerlo directamente desde el código).

Iniciar la aplicacion:


Después de eso, debe establecer explícitamente el nombre del flujo en la aplicación eligiendo de la lista desplegable:



Ahora todo está listo para funcionar.

Prueba de aplicación


Independientemente de cómo haya implementado el sistema, manualmente o mediante el código Terraform, funcionará igual.

Pasamos por SSH a la máquina virtual EC2 donde está instalado Kinesis Agent y ejecutamos el script api_caller.py

sudo ./api_caller.py TOKEN

Queda por esperar un SMS a su número:


SMS: el mensaje llega al teléfono en casi 1 minuto:


Queda por ver si los registros se guardan en la base de datos DynamoDB para su posterior análisis más detallado. La tabla airline_tickets contiene algo como esto:


Conclusión


En el curso del trabajo realizado, se creó un sistema de procesamiento de datos en línea basado en Amazon Kinesis. Examinamos las opciones para usar Kinesis Agent junto con Kinesis Data Streams y análisis en tiempo real de Kinesis Analytics usando comandos SQL, así como la interacción de Amazon Kinesis con otros servicios de AWS.

Implementamos el sistema anterior de dos maneras: lo suficientemente manual y rápido desde el código Terraform.

El código fuente completo del proyecto está disponible en mi repositorio en GitHub , le sugiero que se familiarice con él.

Estoy listo para discutir el artículo con gusto, espero sus comentarios. Espero una crítica constructiva.

¡Le deseo éxito!

All Articles