Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Olá Habr!

Você gosta de pilotar aviões? Adoro, mas no auto-isolamento também gostei de analisar os dados das passagens aéreas de um recurso bem conhecido - Aviasales.

Hoje, analisaremos o trabalho do Amazon Kinesis, construiremos um sistema de streaming com análises em tempo real, colocaremos o banco de dados Amazon DynamoDB NoSQL como o principal data warehouse e configuraremos alertas por SMS para tickets interessantes.

Todos os detalhes sob o corte! Vai!



Introdução


Por exemplo, precisamos acessar a API Aviasales . O acesso a ele é fornecido gratuitamente e sem restrições. Você só precisa se registrar na seção "Desenvolvedores" para obter seu token de API para acessar dados.

O principal objetivo deste artigo é fornecer uma compreensão geral do uso de informações de streaming na AWS, e deixamos de questionar que os dados retornados pela API usada não são estritamente relevantes e são transferidos do cache, que é gerado com base em pesquisas de usuários dos sites Aviasales.ru e Jetradar.com para últimas 48 horas.

Os dados de passagem aérea do agente Kinesis recebidos por meio da API instalada na máquina produtora serão analisados ​​e transferidos automaticamente para o fluxo desejado por meio do Kinesis Data Analytics. Uma versão não processada desse fluxo será gravada diretamente no repositório. A implantação do armazenamento de dados brutos no DynamoDB permitirá uma análise mais profunda dos tickets por meio de ferramentas de BI, como o AWS Quick Sight.

Consideraremos duas opções para implantar toda a infraestrutura:

  • Manual - por meio do AWS Management Console;
  • Infraestrutura do código Terraform - para engenheiros de automação preguiçosos;

Arquitetura do sistema em desenvolvimento



Componentes Utilizados:

  • API Aviasales - os dados retornados por essa API serão usados ​​para todo o trabalho subsequente;
  • Instância do produtor EC2 - uma máquina virtual comum na nuvem na qual o fluxo de dados de entrada será gerado:

    • O Kinesis Agent é um aplicativo Java instalado localmente que fornece uma maneira fácil de coletar e enviar dados para o Kinesis (Kinesis Data Streams ou Kinesis Firehose). O agente monitora constantemente um conjunto de arquivos nos diretórios especificados e envia novos dados ao Kinesis;
    • Script da API do chamador - um script Python que faz solicitações de API e coloca a resposta em uma pasta que o Kinesis Agent monitora;
  • Kinesis Data Streams — ;
  • Kinesis Analytics — , . Amazon Kinesis Data Analytics ;
  • AWS Lambda — , . ;
  • Amazon DynamoDB — «‑» , 10 . DynamoDB - , . DynamoDB , . ;
  • Amazon SNS — « — » (Pub/Sub), , . SNS push-, SMS- .

Treino inicial


Para emular o fluxo de dados, decidi usar as informações de passagem aérea retornadas pela API Aviasales. A documentação possui uma lista bastante extensa de métodos diferentes, use um deles - o “Calendário de Preços para o Mês”, que retorna preços para cada dia do mês, agrupados pelo número de transferências. Se você não transmitir o mês de pesquisa na solicitação, as informações serão retornadas para o mês seguinte ao atual.

Então, registre-se, pegue seu token.

Exemplo de solicitação abaixo:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

O método acima de receber dados da API com o token na solicitação funcionará, mas eu prefiro passar o token de acesso pelo cabeçalho, portanto, usaremos esse método no script api_caller.py.

Exemplo de resposta:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

O exemplo de resposta da API acima mostra uma passagem de São Petersburgo para Phuk ... Ah, com o que sonhar ...
Como eu sou de Kazan e Phuket está "apenas sonhando conosco" agora, procuraremos passagens de São Petersburgo para Kazan.
Supõe-se que você já tenha uma conta da AWS. Quero prestar atenção especial imediatamente, pois o Kinesis e o envio de notificações por SMS não estão incluídos no nível gratuito anual (uso gratuito) . Mas, apesar disso, tendo alguns dólares em mente, é bem possível construir o sistema proposto e brincar com ele. E, é claro, não esqueça de excluir todos os recursos depois que eles se tornarem desnecessários.
Felizmente, as funções DynamoDb e lambda serão shareware para nós, se você permanecer dentro dos limites mensais gratuitos. Por exemplo, para o DynamoDB: 25 GB de armazenamento, 25 WCU / RCU e 100 milhões de solicitações. E um milhão de chamadas para funções lambda por mês.

Sistema de implantação manual


Configurando o Kinesis Data Streams


Acesse o serviço Kinesis Data Streams e crie dois novos fluxos, um shard para cada um.

O que é um fragmento?
— Amazon Kinesis. 1 / 2 /. 1000 PUT . . , . 2 / 4 / 2000 PUT .

Quanto mais fragmentos em seu fluxo, maior a taxa de transferência. Em princípio, os fluxos são dimensionados dessa maneira adicionando shards. Mas quanto mais fragmentos você tiver, maior será o preço. Cada fragmento custa 1,5 centavo por hora e 1,4 centavo adicional para cada milhão de unidades de carga útil de PUT.

Crie um novo thread com o nome airlines_tickets , 1 fragmento será suficiente para isso:


Agora crie outro fluxo chamado special_stream :


Configuração do Produtor


Como produtor de dados para analisar uma tarefa, basta usar uma instância regular do EC2. Ele não precisa ser uma máquina virtual cara e poderosa; o spot t2.micro é bastante adequado.

Nota importante: por exemplo, você deve usar image - Amazon Linux AMI 2018.03.0, com poucas configurações para iniciar rapidamente o Kinesis Agent.

Vá para o serviço EC2, crie uma nova máquina virtual, selecione a AMI desejada com o tipo t2.micro, incluído no nível gratuito:


Para que a máquina virtual recém-criada possa interagir com o serviço Kinesis, você deve conceder o direito de fazê-lo. A melhor maneira de fazer isso é atribuir uma função do IAM. Portanto, na tela Etapa 3: Configurar detalhes da instância, selecione Criar nova função do IAM :

Criando funções do IAM para EC2

, , EC2 Permissions:


, : AmazonKinesisFullAccess CloudWatchFullAccess.

- , : EC2-KinesisStreams-FullAccess. , , :


, :


.

Você pode deixar os parâmetros do disco rígido por padrão, também as tags (embora seja uma boa prática usar tags, pelo menos forneça o nome da instância e especifique o ambiente).

Agora, estamos na etapa 6: guia Configurar grupo de segurança, onde é necessário criar um novo ou especificar seu grupo de segurança existente, que permite a conexão via ssh (porta 22) à instância. Selecione Origem -> Meu IP lá e você pode executar a instância.


Depois de entrar no status de execução, você pode tentar se conectar a ele via ssh.

Para poder trabalhar com o Kinesis Agent, após uma conexão bem-sucedida à máquina, você deve inserir os seguintes comandos no terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Crie uma pasta para salvar as respostas da API:

sudo mkdir /var/log/airline_tickets

Antes de iniciar o agente, você precisa configurar sua configuração:

sudo vim /etc/aws-kinesis/agent.json

O conteúdo do arquivo agent.json deve ficar assim:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Como você pode ver no arquivo de configuração, o agente irá monitorar arquivos com a extensão .log no diretório / var / log / airlines_tickets /, analisá-los e transferi-los para o fluxo da companhia aérea.

Reiniciamos o serviço e garantimos que ele inicie e funcione:

sudo service aws-kinesis-agent restart

Agora baixe o script Python que solicitará dados da API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

O script api_caller.py solicita dados da Aviasales e salva a resposta recebida no diretório que o agente Kinesis verifica. A implementação desse script é bastante padrão; existe uma classe TicketsApi, que permite extrair a API de forma assíncrona. Nesta classe, passamos o cabeçalho com o token e os parâmetros de solicitação:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Para testar as configurações e operacionalidade corretas do agente, faremos uma execução de teste do script api_caller.py:

sudo ./api_caller.py TOKEN


E analisamos o resultado do trabalho nos logs do agente e na guia Monitoramento no fluxo de dados da companhia aérea_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log



Como você pode ver, tudo funciona e o Kinesis Agent envia com êxito dados ao fluxo. Agora configure consumidor.

Configurando o Kinesis Data Analytics


Vamos para o componente central de todo o sistema - crie um novo aplicativo no Kinesis Data Analytics chamado kinesis_analytics_airlines_app:


O Kinesis Data Analytics permite análises em tempo real dos dados do Kinesis Streams usando SQL. Este é um serviço totalmente escalonável automaticamente (diferente do Kinesis Streams), que:

  1. permite criar novos fluxos (fluxo de saída) com base em consultas aos dados de origem;
  2. fornece um fluxo com erros que ocorreram durante a operação do aplicativo (fluxo de erro);
  3. Ele pode determinar automaticamente o esquema de dados de entrada (pode ser redefinido manualmente, se necessário).

Este é um serviço caro - 0,11 USD por hora, portanto, você deve usá-lo com cuidado e removê-lo quando concluir o trabalho.

Conecte o aplicativo à fonte de dados:


Selecione o fluxo ao qual você deseja se conectar (airlines_tickets):


Em seguida, você precisa anexar a nova função do IAM para que o aplicativo possa ler e transmitir no fluxo. Para fazer isso, basta não alterar nada no bloco de permissões de acesso:


Agora solicitamos a descoberta do esquema de dados no fluxo, para isso, clicamos no botão "Discover schema". Como resultado, a função do IAM será atualizada (uma nova será criada) e a descoberta do esquema a partir dos dados que já chegaram no fluxo será iniciada:


Agora você precisa ir para o editor SQL. Quando você clica nesse botão, uma janela com uma pergunta sobre o lançamento do aplicativo será exibida - escolha o que queremos executar:


Na janela do editor SQL, insira uma consulta tão simples e clique em Salvar e Executar SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Nos bancos de dados relacionais, você trabalha com tabelas usando instruções INSERT para adicionar registros e uma instrução SELECT para consultar dados. No Amazon Kinesis Data Analytics, você trabalha com fluxos (STREAM) e "bombas" (PUMP) - solicitações de inserção contínua que inserem dados de um fluxo em um aplicativo em outro fluxo.

Na consulta SQL acima, os bilhetes Aeroflot são pesquisados ​​a preços abaixo de cinco mil rublos. Todos os registros que se enquadram nessas condições serão colocados no fluxo DESTINATION_SQL_STREAM.


No bloco Destino, selecione o fluxo special_stream e, na lista suspensa Nome do fluxo no aplicativo DESTINATION_SQL_STREAM:


Como resultado de todas as manipulações, algo semelhante à figura abaixo deve acontecer:



Criando e assinando o tópico SNS


Vá para o Serviço de notificação simples e crie um novo tópico chamado Companhias aéreas:


Assinamos este tópico, nele indicamos o número do celular para o qual as notificações por SMS chegarão:


Criando uma tabela no DynamoDB


Para armazenar os dados brutos do fluxo airlines_tickets, crie uma tabela no DynamoDB com o mesmo nome. Como chave primária, usaremos record_id:


Criando uma função de coletor lambda


Vamos criar uma função lambda chamada Collector, cuja tarefa é pesquisar o fluxo da companhia aérea_tickets e, se houver novos registros, inserir esses registros na tabela do DynamoDB. Obviamente, além dos direitos padrão, esse lambda deve ter acesso para ler o fluxo de dados do Kinesis e gravar no DynamoDB.

Criando uma função do IAM para uma função de coletor lambda
IAM Lambda-TicketsProcessingRole:


AmazonKinesisReadOnlyAccess AmazonDynamoDBFullAccess, :



Esse lambda deve ser acionado por um gatilho Kinesis quando novas entradas atingirem o fluxo airlines_stream, portanto, você precisa adicionar um novo gatilho:



Resta inserir o código e salvar o lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Criando um Notificador de Função Lambda


A segunda função lambda, que monitorará o segundo fluxo (special_stream) e enviará uma notificação ao SNS, é criada da mesma maneira. Portanto, esse lambda deve ter acesso de leitura do Kinesis e enviar mensagens para o tópico especificado do SNS, que é então enviado pelo serviço SNS a todos os assinantes deste tópico (email, SMS, etc.).

Criar funções do IAM
IAM Lambda-KinesisAlarm , alarm_notifier:



Esse lambda deve funcionar de acordo com o gatilho para que novas entradas entrem no special_stream, portanto, você deve configurar o gatilho da mesma maneira que fizemos no lambda do Collector.

Para facilitar a configuração desse lambda, apresentamos uma nova variável de ambiente - TOPIC_ARN, onde colocamos o tópico ANR (Amazon Recourse Names) de Airlines:


E insira o código lambda, é muito simples:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Parece que isso completa a configuração manual do sistema. Resta apenas testar e garantir que configuramos tudo corretamente.

Implantar a partir do código Terraform


Preparação necessária


O Terraform é uma ferramenta de código aberto muito conveniente para implantar infraestrutura a partir do código. Ele possui sua própria sintaxe, fácil de aprender e muitos exemplos de como e o que implantar. Existem muitos plugins convenientes no editor de código Atom ou Visual Studio que facilitam o trabalho com o Terraform.

Você pode baixar o kit de distribuição aqui . Uma análise detalhada de todos os recursos do Terraform está além do escopo deste artigo, portanto, nos restringiremos aos pontos principais.

Como começar


O código completo do projeto está no meu repositório . Clonamos um repositório para nós mesmos. Antes de iniciar, você deve certificar-se de ter instalado e configurado o AWS CLI, conforme O Terraform procurará credenciais no arquivo ~ / .aws / credentials.

É uma boa prática implantar toda a infraestrutura antes de iniciar o comando plan para ver o que a Terraform está criando para nós na nuvem:

terraform.exe plan

Você será solicitado a inserir um número de telefone para enviar notificações a ele. Nesta fase, é opcional.


Após analisar o plano de trabalho do programa, podemos começar a criação de recursos:

terraform.exe apply

Após o envio deste comando, você será solicitado a inserir um número de telefone novamente, digite "yes" quando a pergunta sobre a execução real das ações for exibida. Isso permitirá aumentar toda a infraestrutura, realizar todas as configurações necessárias para o EC2, implantar funções lambda etc.

Depois que todos os recursos foram criados com sucesso por meio do código Terraform, é necessário entrar nos detalhes do aplicativo Kinesis Analytics (infelizmente, não encontrei como fazer isso diretamente no código).

Inicie o aplicativo:


Depois disso, você deve definir explicitamente o nome do fluxo no aplicativo escolhendo na lista suspensa:



Agora está tudo pronto para ir.

Teste de Aplicação


Independentemente de como você implantou o sistema, manualmente ou através do código Terraform, ele funcionará da mesma maneira.

Passamos pelo SSH até a máquina virtual EC2 onde o Kinesis Agent está instalado e executamos o script api_caller.py

sudo ./api_caller.py TOKEN

Resta aguardar um SMS no seu número:


SMS - a mensagem chega ao telefone em quase 1 minuto:


Resta saber se os registros são salvos no banco de dados do DynamoDB para análises subsequentes e mais detalhadas. A tabela airlines_tickets contém algo como isto:


Conclusão


No decorrer do trabalho, foi construído um sistema de processamento de dados on-line baseado no Amazon Kinesis. Examinamos as opções de uso do Kinesis Agent em conjunto com o Kinesis Data Streams e a análise em tempo real do Kinesis Analytics usando comandos SQL, bem como a interação do Amazon Kinesis com outros serviços da AWS.

Implementamos o sistema acima de duas maneiras: manual o suficiente e rápido a partir do código Terraform.

Todo o código fonte do projeto está disponível no meu repositório no GitHub . Sugiro que você se familiarize com ele.

Estou pronto para discutir o artigo com prazer, estou aguardando seus comentários. Espero por críticas construtivas.

Desejo-lhe sucesso!

All Articles