哈Ha!你喜欢坐飞机吗?我喜欢它,但出于自我隔离,我也喜欢分析一种著名资源Aviasales的机票数据。今天,我们将分析Amazon Kinesis的工作,构建具有实时分析的流系统,将Amazon DynamoDB NoSQL数据库作为主要数据仓库,并为有趣的门票设置SMS警报。所有细节都砍下!走!介绍
例如,我们需要访问Aviasales API。免费提供对它的访问,没有任何限制,您只需要在“开发人员”部分进行注册即可获取用于访问数据的API令牌。本文的主要目的是提供对AWS中流信息的使用的一般理解,我们摆脱了所用API返回的数据并非严格相关,而是从缓存中传输的问题,缓存是根据Aviasales.ru和Jetradar.com网站用户的搜索生成的持续48小时。
通过生产者计算机上安装的API接收到的Kinesis-agent机票数据将被自动解析,并通过Kinesis Data Analytics传输到所需的流中。该流的未处理版本将直接写入存储库。部署在DynamoDB中的原始数据存储将允许通过BI工具(例如AWS Quick Sight)对票证进行更深入的分析。我们将考虑两种方法来部署整个基础结构:- 手册-通过AWS管理控制台;
- Terraform代码的基础结构-适用于懒惰的自动化工程师;
正在开发的系统架构
使用的组件:初步训练
为了模拟数据流,我决定使用Aviasales API返回的机票信息。该文档提供了相当广泛的各种方法列表,其中之一是“本月的价格日历”,它返回每月的每一天的价格,并按转账次数分组。如果您未在请求中传送搜索月份,则将返回当前月份之后的月份的信息。因此,注册并获取令牌。以下示例请求:http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API
上面的通过请求中的令牌从API接收数据的方法可以使用,但是我更喜欢通过标头传递访问令牌,因此我们将在api_caller.py脚本中使用此方法。答案示例:{{
"success":true,
"data":[{
"show_to_affiliates":true,
"trip_class":0,
"origin":"LED",
"destination":"HKT",
"depart_date":"2015-10-01",
"return_date":"",
"number_of_changes":1,
"value":29127,
"found_at":"2015-09-24T00:06:12+04:00",
"distance":8015,
"actual":true
}]
}
上面的API响应示例显示了从圣彼得堡到Phuk的机票...哦,梦to以求...由于我来自喀山,而普吉岛现在“只是在做梦”,我们将查找从圣彼得堡到喀山的机票。假设您已经拥有一个AWS账户。我要立即特别注意,Kinesis和通过SMS发送通知不包含在年度免费套餐(免费使用)中。但是尽管如此,只要记住几美元,就可以构建建议的系统并使用它。并且,当然,不要在不必要的所有资源后删除它们。
幸运的是,如果您保持每月免费使用限制,DynamoDb和lambda函数将成为我们的共享软件。例如,对于DynamoDB:25 GB的存储,25 WCU / RCU和1亿个请求。每月有100万个调用lambda函数。手动部署系统
设置Kinesis数据流
转到Kinesis Data Streams服务并创建两个新的流,每个流一个分片。什么是碎片?— Amazon Kinesis. 1 / 2 /. 1000 PUT . . , . 2 / 4 / 2000 PUT .
流中的碎片越多,其吞吐量就越大。原则上,通过添加分片以这种方式缩放流。但是碎片越多,价格就越高。每个分片每小时的费用为1.5美分,每百万个PUT有效负载单位为1.4美分。创建一个名称为airline_tickets的新线程,一个碎片就足够了:现在创建另一个名为special_stream的流:生产者设定
作为用于解析任务的数据生产者,使用常规的EC2实例就足够了。它不必是功能强大且昂贵的虚拟机;现货t2.micro非常适合。重要说明:例如,您应该使用映像-Amazon Linux AMI 2018.03.0,使用该映像可以快速启动Kinesis Agent的设置较少。转到EC2服务,创建一个新的虚拟机,然后选择t2.micro类型的所需AMI,该类型包含在Free Tier中:为了使新创建的虚拟机能够与Kinesis服务交互,您必须授予它这样做的权利。最好的方法是分配一个IAM角色。因此,在“步骤3:配置实例详细信息”屏幕上,选择“ 创建新的IAM角色”:为EC2创建IAM角色, , EC2 Permissions:
, : AmazonKinesisFullAccess CloudWatchFullAccess.
- , : EC2-KinesisStreams-FullAccess. , , :
, :
.
默认情况下,您也可以保留硬盘的参数,也可以保留标签(尽管使用标签是一种很好的做法,但至少要提供实例名称并指定环境)。现在,我们在“第6步:配置安全组”选项卡上,您需要在其中创建一个新的安全组或指定现有的安全组,该组允许您通过ssh(端口22)连接到实例。选择“源”->“我的IP”,然后可以运行实例。进入运行状态后,您可以尝试通过ssh连接到它。为了成功使用Kinesis Agent,在成功连接到计算机之后,必须在终端中输入以下命令:sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent
创建用于保存API响应的文件夹:sudo mkdir /var/log/airline_tickets
在启动代理之前,您需要配置其配置:sudo vim /etc/aws-kinesis/agent.json
agent.json文件的内容应如下所示:{
"cloudwatch.emitMetrics": true,
"kinesis.endpoint": "",
"firehose.endpoint": "",
"flows": [
{
"filePattern": "/var/log/airline_tickets/*log",
"kinesisStream": "airline_tickets",
"partitionKeyOption": "RANDOM",
"dataProcessingOptions": [
{
"optionName": "CSVTOJSON",
"customFieldNames": ["cost","trip_class","show_to_affiliates",
"return_date","origin","number_of_changes","gate","found_at",
"duration","distance","destination","depart_date","actual","record_id"]
}
]
}
]
}
从配置文件中可以看到,代理将监视/ var / log / airline_tickets /目录中具有.log扩展名的文件,解析它们并将其传输到airline_tickets流。我们重新启动该服务,并确保它可以启动并运行:sudo service aws-kinesis-agent restart
现在下载Python脚本,该脚本将从API请求数据:REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer
wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt
api_caller.py脚本从Aviasales请求数据,并将收到的响应保存在Kinesis代理扫描的目录中。这个脚本的实现是相当标准的,有一个TicketsApi类,它允许您异步提取API。在此类中,我们传递带有令牌和请求参数的标头:class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
为了测试代理的正确设置和可操作性,我们将对api_caller.py脚本进行测试运行:sudo ./api_caller.py TOKEN
我们会在座席的日志中以及airline_tickets数据流中的“监视”选项卡上查看工作结果:tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
如您所见,一切正常,Kinesis Agent成功将数据发送到流。现在配置使用者。设置Kinesis Data Analytics
让我们继续整个系统的中心组件-在Kinesis Data Analytics中创建一个名为kinesis_analytics_airlines_app的新应用程序:Kinesis Data Analytics支持使用SQL对Kinesis Streams中的数据进行实时分析。这是一个完全可自动缩放的服务(与Kinesis Streams不同),该服务:- 允许您基于对源数据的查询来创建新流(输出流);
- 提供具有在应用程序操作期间发生的错误的流(错误流);
- 它可以自动确定输入数据方案(必要时可以手动重新定义)。
这是一项昂贵的服务-每小时0.11 USD,因此您应谨慎使用它,并在完成工作后将其删除。
将应用程序连接到数据源:选择您要连接的流(airline_tickets):接下来,您需要附加新的IAM角色,以便应用程序可以从流中读取并写入流中。为此,只要不更改访问权限块中的任何内容即可:现在,我们请求在流中发现数据方案,为此,我们单击“发现模式”按钮。结果,将更新IAM角色(将创建一个新角色),并将启动从流中已经到达的数据中发现方案的过程:现在,您需要转到SQL编辑器。当您单击此按钮时,将出现一个有关启动应用程序的问题的窗口-选择我们要运行的内容:在“ SQL编辑器”窗口中,插入一个简单的查询,然后单击“保存并运行SQL”:CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
在关系数据库中,可以使用INSERT语句来添加记录并使用SELECT语句来查询数据的表。在Amazon Kinesis Data Analytics中,您使用流(STREAM)和“泵”(PUMP)-连续插入请求,这些请求将数据从应用程序中的一个流插入到另一个流中。
在上面的SQL查询中,以低于五千卢布的价格搜索Aeroflot机票。属于这些条件的所有记录都将放置在DESTINATION_SQL_STREAM流中。在目标块中,选择special_stream流,然后在应用程序内流名称DESTINATION_SQL_STREAM下拉列表中:经过所有操作,结果应类似于下图:创建和订阅SNS主题
转到简单通知服务,然后创建一个名为“航空公司”的新主题:我们订阅了该主题,在其中指明了SMS通知将到达的手机号码:在DynamoDB中创建表
要存储其airline_tickets流的原始数据,请在DynamoDB中创建一个具有相同名称的表。作为主键,我们将使用record_id:创建Lambda收集器函数
让我们创建一个称为收集器的lambda函数,该函数的任务是轮询airline_tickets流,如果那里有新记录,则将这些记录插入DynamoDB表中。显然,除了默认权限外,此lambda还必须有权读取Kinesis数据流并写入DynamoDB。为Lambda收集器功能创建IAM角色IAM Lambda-TicketsProcessingRole:
AmazonKinesisReadOnlyAccess AmazonDynamoDBFullAccess, :
当新条目击中airline_stream流时,该lambda应该由Kinesis触发器触发,因此您需要添加一个新触发器:仍然需要插入代码并保存lambda。"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
创建Lambda函数通知程序
以相同的方式创建第二个lambda函数,该函数将监视第二个流(special_stream)并向SNS发送通知。因此,此lambda必须具有来自Kinesis的读取访问权限,并将消息发送到指定的SNS主题,然后由SNS服务将其发送给该主题的所有订户(电子邮件,SMS等)。创建IAM角色IAM Lambda-KinesisAlarm , alarm_notifier:
该lambda应该根据触发新条目进入special_stream的触发器来工作,因此您需要以与收集器lambda相同的方式配置触发器。为了方便配置此lambda,我们引入了一个新的环境变量TOPIC_ARN,在其中放置了Airlines的ANR(亚马逊资源名称)主题:并插入lambda代码,这非常简单:import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
看来这完成了手动系统配置。仅用于测试并确保我们正确配置了所有内容。从Terraform代码进行部署
必要的准备
Terraform是一个非常方便的开源工具,用于通过代码部署基础架构。它具有自己的语法,易于学习,并提供了许多有关如何部署以及部署什么的示例。Atom或Visual Studio代码编辑器中有许多方便的插件,这些插件使使用Terraform更加容易。您可以从此处下载分发工具包。对Terraform的所有功能的详细分析不在本文的讨论范围之内,因此我们将限制自己的重点。如何开始
完整的项目代码在我的存储库中。我们为自己克隆一个存储库。在开始之前,您需要确保已安装并配置了AWS CLI,如下所示 Terraform将在〜/ .aws /凭证文件中查找凭证。在启动plan命令之前先部署整个基础架构是一个好习惯,以查看Terraform现在正在云中为我们创建的内容:terraform.exe plan
系统将提示您输入电话号码以向其发送通知。在此阶段,它是可选的。在分析了程序的工作计划之后,我们可以开始创建资源:terraform.exe apply
发送此命令后,将再次要求您输入电话号码,并在显示有关实际执行操作的问题时键入“是”。这将使您能够提升整个基础架构,执行EC2的所有必要设置,部署lambda函数等。通过Terraform代码成功创建所有资源后,您需要进入Kinesis Analytics应用程序的详细信息(不幸的是,我没有直接从代码中找到如何执行此操作)。启动应用程序:之后,您必须通过从下拉列表中进行选择来显式设置应用程序内流名称:现在一切准备就绪。应用测试
无论您是如何手动部署系统还是通过Terraform代码部署系统,其工作方式都一样。我们通过SSH进入安装了Kinesis Agent的EC2虚拟机,并运行api_caller.py脚本sudo ./api_caller.py TOKEN
等待短信到您的电话号码仍然是:短信-短信将在1分钟内到达电话:记录是否保存在DynamoDB数据库中以进行后续更详细的分析还有待观察。airline_tickets表包含以下内容:结论
在完成工作的过程中,构建了基于Amazon Kinesis的在线数据处理系统。我们检查了将Kinesis Agent与Kinesis Data Streams结合使用以及使用SQL命令对Kinesis Analytics进行实时分析的选项,以及Amazon Kinesis与其他AWS服务的交互。我们以两种方式部署上述系统:足够长的手动时间和从Terraform代码快速获取的时间。该项目的整个源代码都可以在我在GitHub上的存储库中找到,建议您熟悉一下它。我准备愉快地讨论这篇文章,我在等待您的评论。我希望进行建设性的批评。祝你成功!