Aviasales API与Amazon Kinesis的集成以及无服务器的简单性

哈Ha!

你喜欢坐飞机吗?我喜欢它,但出于自我隔离,我也喜欢分析一种著名资源Aviasales的机票数据。

今天,我们将分析Amazon Kinesis的工作,构建具有实时分析的流系统,将Amazon DynamoDB NoSQL数据库作为主要数据仓库,并为有趣的门票设置SMS警报。

所有细节都砍下!走!



介绍


例如,我们需要访问Aviasales API免费提供对它的访问,没有任何限制,您只需要在“开发人员”部分进行注册即可获取用于访问数据的API令牌。

本文的主要目的是提供对AWS中流信息的使用的一般理解,我们摆脱了所用API返回的数据并非严格相关,而是从缓存中传输的问题,缓存是根据Aviasales.ru和Jetradar.com网站用户的搜索生成的持续48小时。

通过生产者计算机上安装的API接收到的Kinesis-agent机票数据将被自动解析,并通过Kinesis Data Analytics传输到所需的流中。该流的未处理版本将直接写入存储库。部署在DynamoDB中的原始数据存储将允许通过BI工具(例如AWS Quick Sight)对票证进行更深入的分析。

我们将考虑两种方法来部署整个基础结构:

  • 手册-通过AWS管理控制台;
  • Terraform代码的基础结构-适用于懒惰的自动化工程师;

正在开发的系统架构



使用的组件:

  • Aviasales API-此API返回的数据将用于所有后续工作;
  • EC2生产者实例 -云中的常规虚拟机,将在其上生成输入数据流:

    • Kinesis Agent是本地安装的Java应用程序,它提供了一种简单的方法来收集数据并将其发送到Kinesis(Kinesis数据流或Kinesis Firehose)。代理会不断监视指定目录中的一组文件,并将新数据发送到Kinesis。
    • 调用者API脚本 -一个Python脚本,它发出API请求并将响应放入Kinesis Agent监视的文件夹中;
  • Kinesis Data Streams — ;
  • Kinesis Analytics — , . Amazon Kinesis Data Analytics ;
  • AWS Lambda — , . ;
  • Amazon DynamoDB — «‑» , 10 . DynamoDB - , . DynamoDB , . ;
  • Amazon SNS — « — » (Pub/Sub), , . SNS push-, SMS- .

初步训练


为了模拟数据流,我决定使用Aviasales API返回的机票信息。文档提供了相当广泛的各种方法列表,其中之一是“本月的价格日历”,它返回每月的每一天的价格,并按转账次数分组。如果您未在请求中传送搜索月份,则将返回当前月份之后的月份的信息。

因此,注册并获取令牌。

以下示例请求:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

上面的通过请求中的令牌从API接收数据的方法可以使用,但是我更喜欢通过标头传递访问令牌,因此我们将在api_caller.py脚本中使用此方法。

答案示例:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

上面的API响应示例显示了从圣彼得堡到Phuk的机票...哦,梦to以求...
由于我来自喀山,而普吉岛现在“只是在做梦”,我们将查找从圣彼得堡到喀山的机票。
假设您已经拥有一个AWS账户。我要立即特别注意,Kinesis和通过SMS发送通知不包含在年度免费套餐(免费使用)中但是尽管如此,只要记住几美元,就可以构建建议的系统并使用它。并且,当然,不要在不必要的所有资源后删除它们。
幸运的是,如果您保持每月免费使用限制,DynamoDb和lambda函数将成为我们的共享软件。例如,对于DynamoDB:25 GB的存储,25 WCU / RCU和1亿个请求。每月有100万个调用lambda函数。

手动部署系统


设置Kinesis数据流


转到Kinesis Data Streams服务并创建两个新的流,每个流一个分片。

什么是碎片?
— Amazon Kinesis. 1 / 2 /. 1000 PUT . . , . 2 / 4 / 2000 PUT .

流中的碎片越多,其吞吐量就越大。原则上,通过添加分片以这种方式缩放流。但是碎片越多,价格就越高。每个分片每小时的费用为1.5美分,每百万个PUT有效负载单位为1.4美分。

创建一个名称为airline_tickets的新线程,一个碎片就足够了:


现在创建另一个名为special_stream的


生产者设定


作为用于解析任务的数据生产者,使用常规的EC2实例就足够了。它不必是功能强大且昂贵的虚拟机;现货t2.micro非常适合。

重要说明:例如,您应该使用映像-Amazon Linux AMI 2018.03.0,使用该映像可以快速启动Kinesis Agent的设置较少。

转到EC2服务,创建一个新的虚拟机,然后选择t2.micro类型的所需AMI,该类型包含在Free Tier中:


为了使新创建的虚拟机能够与Kinesis服务交互,您必须授予它这样做的权利。最好的方法是分配一个IAM角色。因此,在“步骤3:配置实例详细信息”屏幕上,选择“ 创建新的IAM角色”

为EC2创建IAM角色

, , EC2 Permissions:


, : AmazonKinesisFullAccess CloudWatchFullAccess.

- , : EC2-KinesisStreams-FullAccess. , , :


, :


.

默认情况下,您也可以保留硬盘的参数,也可以保留标签(尽管使用标签是一种很好的做法,但至少要提供实例名称并指定环境)。

现在,我们在“第6步:配置安全组”选项卡上,您需要在其中创建一个新的安全组或指定现有的安全组,该组允许您通过ssh(端口22)连接到实例。选择“源”->“我的IP”,然后可以运行实例。


进入运行状态后,您可以尝试通过ssh连接到它。

为了成功使用Kinesis Agent,在成功连接到计算机之后,必须在终端中输入以下命令:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

创建用于保存API响应的文件夹:

sudo mkdir /var/log/airline_tickets

在启动代理之前,您需要配置其配置:

sudo vim /etc/aws-kinesis/agent.json

agent.json文件的内容应如下所示:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

从配置文件中可以看到,代理将监视/ var / log / airline_tickets /目录中具有.log扩展名的文件,解析它们并将其传输到airline_tickets流。

我们重新启动该服务,并确保它可以启动并运行:

sudo service aws-kinesis-agent restart

现在下载Python脚本,该脚本将从API请求数据:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py脚本从Aviasales请求数据,并将收到的响应保存在Kinesis代理扫描的目录中。这个脚本的实现是相当标准的,有一个TicketsApi类,它允许您异步提取API。在此类中,我们传递带有令牌和请求参数的标头:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

为了测试代理的正确设置和可操作性,我们将对api_caller.py脚本进行测试运行:

sudo ./api_caller.py TOKEN


我们会在座席的日志中以及airline_tickets数据流中的“监视”选项卡上查看工作结果:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log



如您所见,一切正常,Kinesis Agent成功将数据发送到流。现在配置使用者。

设置Kinesis Data Analytics


让我们继续整个系统的中心组件-在Kinesis Data Analytics中创建一个名为kinesis_analytics_airlines_app的新应用程序:


Kinesis Data Analytics支持使用SQL对Kinesis Streams中的数据进行实时分析。这是一个完全可自动缩放的服务(与Kinesis Streams不同),该服务:

  1. 允许您基于对源数据的查询来创建新流(输出流);
  2. 提供具有在应用程序操作期间发生的错误的流(错误流);
  3. 它可以自动确定输入数据方案(必要时可以手动重新定义)。

这是一项昂贵的服务-每小时0.11 USD,因此您应谨慎使用它,并在完成工作后将其删除。

将应用程序连接到数据源:


选择您要连接的流(airline_tickets):


接下来,您需要附加新的IAM角色,以便应用程序可以从流中读取并写入流中。为此,只要不更改访问权限块中的任何内容即可:


现在,我们请求在流中发现数据方案,为此,我们单击“发现模式”按钮。结果,将更新IAM角色(将创建一个新角色),并将启动从流中已经到达的数据中发现方案的过程:


现在,您需要转到SQL编辑器。当您单击此按钮时,将出现一个有关启动应用程序的问题的窗口-选择我们要运行的内容:


在“ SQL编辑器”窗口中,插入一个简单的查询,然后单击“保存并运行SQL”:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

在关系数据库中,可以使用INSERT语句来添加记录并使用SELECT语句来查询数据的表。在Amazon Kinesis Data Analytics中,您使用流(STREAM)和“泵”(PUMP)-连续插入请求,这些请求将数据从应用程序中的一个流插入到另一个流中。

在上面的SQL查询中,以低于五千卢布的价格搜索Aeroflot机票。属于这些条件的所有记录都将放置在DESTINATION_SQL_STREAM流中。


在目标块中,选择special_stream流,然后在应用程序内流名称DESTINATION_SQL_STREAM下拉列表中:


经过所有操作,结果应类似于下图:



创建和订阅SNS主题


转到简单通知服务,然后创建一个名为“航空公司”的新主题:


我们订阅了该主题,在其中指明了SMS通知将到达的手机号码:


在DynamoDB中创建表


要存储其airline_tickets流的原始数据,请在DynamoDB中创建一个具有相同名称的表。作为主键,我们将使用record_id:


创建Lambda收集器函数


让我们创建一个称为收集器的lambda函数,该函数的任务是轮询airline_tickets流,如果那里有新记录,则将这些记录插入DynamoDB表中。显然,除了默认权限外,此lambda还必须有权读取Kinesis数据流并写入DynamoDB。

为Lambda收集器功能创建IAM角色
IAM Lambda-TicketsProcessingRole:


AmazonKinesisReadOnlyAccess AmazonDynamoDBFullAccess, :



当新条目击中airline_stream流时,该lambda应该由Kinesis触发器触发,因此您需要添加一个新触发器:



仍然需要插入代码并保存lambda。

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

创建Lambda函数通知程序


以相同的方式创建第二个lambda函数,该函数将监视第二个流(special_stream)并向SNS发送通知。因此,此lambda必须具有来自Kinesis的读取访问权限,并将消息发送到指定的SNS主题,然后由SNS服务将其发送给该主题的所有订户(电子邮件,SMS等)。

创建IAM角色
IAM Lambda-KinesisAlarm , alarm_notifier:



该lambda应该根据触发新条目进入special_stream的触发器来工作,因此您需要以与收集器lambda相同的方式配置触发器。

为了方便配置此lambda,我们引入了一个新的环境变量TOPIC_ARN,在其中放置了Airlines的ANR(亚马逊资源名称)主题:


并插入lambda代码,这非常简单:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

看来这完成了手动系统配置。仅用于测试并确保我们正确配置了所有内容。

从Terraform代码进行部署


必要的准备


Terraform是一个非常方便的开源工具,用于通过代码部署基础架构。它具有自己的语法,易于学习,并提供了许多有关如何部署以及部署什么的示例。Atom或Visual Studio代码编辑器中有许多方便的插件,这些插件使使用Terraform更加容易。

您可以从此处下载分发工具包对Terraform的所有功能的详细分析不在本文的讨论范围之内,因此我们将限制自己的重点。

如何开始


完整的项目代码在我的存储库中我们为自己克隆一个存储库。在开始之前,您需要确保已安装并配置了AWS CLI,如下所示 Terraform将在〜/ .aws /凭证文件中查找凭证。

在启动plan命令之前先部署整个基础架构是一个好习惯,以查看Terraform现在正在云中为我们创建的内容:

terraform.exe plan

系统将提示您输入电话号码以向其发送通知。在此阶段,它是可选的。


在分析了程序的工作计划之后,我们可以开始创建资源:

terraform.exe apply

发送此命令后,将再次要求您输入电话号码,并在显示有关实际执行操作的问题时键入“是”。这将使您能够提升整个基础架构,执行EC2的所有必要设置,部署lambda函数等。

通过Terraform代码成功创建所有资源后,您需要进入Kinesis Analytics应用程序的详细信息(不幸的是,我没有直接从代码中找到如何执行此操作)。

启动应用程序:


之后,您必须通过从下拉列表中进行选择来显式设置应用程序内流名称:



现在一切准备就绪。

应用测试


无论您是如何手动部署系统还是通过Terraform代码部署系统,其工作方式都一样。

我们通过SSH进入安装了Kinesis Agent的EC2虚拟机,并运行api_caller.py脚本

sudo ./api_caller.py TOKEN

等待短信到您的电话号码仍然是:


短信-短信将在1分钟内到达电话:


记录是否保存在DynamoDB数据库中以进行后续更详细的分析还有待观察。airline_tickets表包含以下内容:


结论


在完成工作的过程中,构建了基于Amazon Kinesis的在线数据处理系统。我们检查了将Kinesis Agent与Kinesis Data Streams结合使用以及使用SQL命令对Kinesis Analytics进行实时分析的选项,以及Amazon Kinesis与其他AWS服务的交互。

我们以两种方式部署上述系统:足够长的手动时间和从Terraform代码快速获取的时间。

该项目的整个源代码都可以在我在GitHub上的存储库中找到,建议您熟悉一下它。

我准备愉快地讨论这篇文章,我在等待您的评论。我希望进行建设性的批评。

祝你成功!

All Articles