Aviasales API integration with Amazon Kinesis and serverless simplicity

Hello, Habr!

Do you like to fly airplanes? I love it, but on self-isolation I also liked to analyze the airfare data of one well-known resource - Aviasales.

Today we will analyze the work of Amazon Kinesis, build a streaming system with real-time analytics, put the Amazon DynamoDB NoSQL database as the main data warehouse and set up SMS alerts for interesting tickets.

All the details under the cut! Go!



Introduction


For example, we need access to the Aviasales API . Access to it is provided free of charge and without restrictions, you only need to register in the "Developers" section to get your API token for accessing data.

The main purpose of this article is to give a general understanding of the use of streaming information in AWS, we make it out of the question that the data returned by the API used is not strictly relevant and is transferred from the cache, which is generated based on searches of users of the Aviasales.ru and Jetradar.com sites for last 48 hours.

The Kinesis-agent ticket data received through the API installed on the producer machine will be automatically parsed and transferred to the desired stream through Kinesis Data Analytics. An unprocessed version of this stream will be written directly to the repository. Deployed in DynamoDB storage of raw data will allow for a more in-depth analysis of tickets through BI tools, such as AWS Quick Sight.

We will consider two options for deploying the entire infrastructure:

  • Manual - through the AWS Management Console;
  • Infrastructure from Terraform code - for lazy automation engineers;

Architecture of the system under development



Components Used:

  • Aviasales API - the data returned by this API will be used for all subsequent work;
  • EC2 Producer Instance - a regular virtual machine in the cloud on which the input data stream will be generated:

    • Kinesis Agent is a locally installed Java application that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent constantly monitors a set of files in the specified directories and sends new data to Kinesis;
    • Caller API Script - a Python script that makes API requests and puts the response in a folder that Kinesis Agent monitors;
  • Kinesis Data Streams — ;
  • Kinesis Analytics — , . Amazon Kinesis Data Analytics ;
  • AWS Lambda — , . ;
  • Amazon DynamoDB — «‑» , 10 . DynamoDB - , . DynamoDB , . ;
  • Amazon SNS — « — » (Pub/Sub), , . SNS push-, SMS- .

Initial training


To emulate the data stream, I decided to use the airfare information returned by the Aviasales API. The documentation has a fairly extensive list of different methods, take one of them - the “Price Calendar for the Month”, which returns prices for each day of the month, grouped by the number of transfers. If you do not transmit the search month in the request, information will be returned for the month following the current one.

So, register, get your token.

Example request below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

The above method of receiving data from the API with the token in the request will work, but I prefer to pass the access token through the header, so we will use this method in the api_caller.py script.

Answer example:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The API response example above shows a ticket from St. Petersburg to Phuk ... Oh, what to dream of ...
Since I'm from Kazan, and Phuket is "just dreaming about us" now, we will look for tickets from St. Petersburg to Kazan.
It is assumed that you already have an AWS account. I want to pay special attention right away that Kinesis and sending notifications via SMS are not included in the annual Free Tier (free use) . But even despite this, having a couple of dollars in mind, it is quite possible to build the proposed system and play with it. And, of course, do not forget to delete all resources after they become unnecessary.
Fortunately, DynamoDb and lambda functions will be shareware for us if you keep within the monthly free limits. For example, for DynamoDB: 25 GB of storage, 25 WCU / RCU and 100 million requests. And a million calls to lambda functions per month.

Manual deployment system


Setting up Kinesis Data Streams


Go to the Kinesis Data Streams service and create two new streams, one shard for each.

What is a shard?
— Amazon Kinesis. 1 / 2 /. 1000 PUT . . , . 2 / 4 / 2000 PUT .

The more shards in your stream, the greater its throughput. In principle, streams are scaled this way by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour and an additional 1.4 cents for every million PUT payload units.

Create a new thread with the name airline_tickets , 1 shard will be enough for it:


Now create another stream called special_stream :


Producer Setting


As a data producer for parsing a task, it is enough to use a regular EC2 instance. It does not have to be a powerful expensive virtual machine; spot t2.micro is quite suitable.

Important note: for example, you should use image - Amazon Linux AMI 2018.03.0, with it there are fewer settings to quickly launch the Kinesis Agent.

Go to the EC2 service, create a new virtual machine, select the desired AMI with the t2.micro type, which is included in Free Tier:


In order for the newly created virtual machine to be able to interact with the Kinesis service, you must give it the right to do so. The best way to do this is to assign an IAM Role. Therefore, on the Step 3: Configure Instance Details screen, select Create new IAM Role :

Creating IAM Roles for EC2

, , EC2 Permissions:


, : AmazonKinesisFullAccess CloudWatchFullAccess.

- , : EC2-KinesisStreams-FullAccess. , , :


, :


.

You can leave the parameters of the hard disk by default, the tags too (although it’s good practice to use tags, at least give the name of the instance and specify the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new one or specify your existing Security group, which allows you to connect via ssh (port 22) to the instance. Select Source -> My IP there and you can run the instance.


Once it enters running status, you can try to connect to it via ssh.

To be able to work with Kinesis Agent, after a successful connection to the machine, you must enter the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Create a folder for saving API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure its config:

sudo vim /etc/aws-kinesis/agent.json

The contents of the agent.json file should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As you can see from the configuration file, the agent will monitor files with the .log extension in the / var / log / airline_tickets / directory, parse them and transfer them to the airline_tickets stream.

We restart the service and make sure that it starts and works:

sudo service aws-kinesis-agent restart

Now download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the received response in the directory that Kinesis agent scans. The implementation of this script is fairly standard, there is a class TicketsApi, it allows you to pull the API asynchronously. In this class, we pass the header with the token and the request parameters:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

To test the correct settings and operability of the agent, we will make a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN


And we look at the result of work in the Agent’s logs and on the Monitoring tab in the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log



As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now configure consumer.

Setting up Kinesis Data Analytics


Let's move on to the central component of the entire system - create a new application in Kinesis Data Analytics called kinesis_analytics_airlines_app:


Kinesis Data Analytics enables real-time analytics of data from Kinesis Streams using SQL. This is a fully auto-scalable service (unlike Kinesis Streams), which:

  1. allows you to create new streams (Output Stream) based on queries to the source data;
  2. provides a stream with errors that occurred during application operation (Error Stream);
  3. It can automatically determine the input data scheme (it can be manually redefined if necessary).

This is an expensive service - 0.11 USD per hour, so you should use it carefully and remove it when you complete the work.

Connect the application to the data source:


Select the stream you want to connect to (airline_tickets):


Next, you need to attach the new IAM Role so that the application can read from the stream and write to the stream. To do this, it is enough to not change anything in the Access permissions block:


Now we request the discovery of the data scheme in the stream, for this we click on the button "Discover schema". As a result, the IAM role is updated (a new one is created) and the discovery of the scheme from the data that has already arrived in the stream is launched:


Now you need to go to the SQL editor. When you click on this button, a window with a question about launching the application will appear - choose what we want to run:


In the SQL editor window, insert such a simple query and click Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables using INSERT statements to add records and a SELECT statement to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAM) and “pumps” (PUMP) - continuous insertion requests that insert data from one stream in an application into another stream.

In the above SQL query, Aeroflot tickets are searched for at prices below five thousand rubles. All records that fall under these conditions will be placed in the DESTINATION_SQL_STREAM stream.


In the Destination block, select the special_stream stream, and in the In-application stream name DESTINATION_SQL_STREAM drop-down list:


As a result of all the manipulations, something similar to the picture below should turn out:



Creating and subscribing to the SNS topic


Go to the Simple Notification Service and create a new topic named Airlines:


We subscribe to this topic, in it we indicate the mobile phone number to which SMS notifications will come:


Creating a table in DynamoDB


To store the raw data of their airline_tickets stream, create a table in DynamoDB with the same name. As the primary key, we will use record_id:


Creating a lambda collector function


Let's create a lambda function called Collector, whose task is to poll the airline_tickets stream and, if there are new records there, insert these records into the DynamoDB table. Obviously, in addition to the default rights, this lambda must have access to read the Kinesis data stream and write to DynamoDB.

Creating an IAM role for a lambda collector function
IAM Lambda-TicketsProcessingRole:


AmazonKinesisReadOnlyAccess AmazonDynamoDBFullAccess, :



This lambda should be triggered by a Kinesis trigger when new entries hit the airline_stream stream, so you need to add a new trigger:



It remains to insert the code and save the lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Creating a lambda function notifier


The second lambda function, which will monitor the second stream (special_stream) and send a notification to SNS, is created in the same way. Therefore, this lambda must have read access from Kinesis and send messages to the specified SNS topic, which is then sent by the SNS service to all subscribers of this topic (email, SMS, etc.).

Create IAM Roles
IAM Lambda-KinesisAlarm , alarm_notifier:



This lambda should work according to the trigger for new entries to enter the special_stream, so you need to configure the trigger in the same way as we did for the Collector lambda.

For the convenience of configuring this lambda, we introduce a new environment variable - TOPIC_ARN, where we place the ANR (Amazon Recourse Names) topic of Airlines:


And insert the lambda code, it’s very simple:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

It seems that this completes the manual system configuration. It remains only to test and make sure that we configured everything correctly.

Deploy from Terraform code


Necessary preparation


Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn and many examples of how and what to deploy. There are many convenient plugins in the Atom or Visual Studio Code editor that make it easier to work with Terraform.

You can download the distribution kit from here . A detailed analysis of all the features of Terraform is beyond the scope of this article, so we will restrict ourselves to the main points.

How to start


The full project code is in my repository . We clone a repository to ourselves. Before starting, you need to make sure that you have installed and configured AWS CLI, as Terraform will look for credentials in the ~ / .aws / credentials file.

It’s good practice to deploy the entire infrastructure before launching the plan command to see what Terraform is creating for us in the cloud:

terraform.exe plan

You will be prompted to enter a phone number to send notifications to it. At this stage, it is optional.


After analyzing the program’s work plan, we can start the creation of resources:

terraform.exe apply

After sending this command, you will again be asked to enter a phone number, type “yes” when the question about the actual execution of actions is displayed. This will allow you to raise the entire infrastructure, carry out all the necessary settings for EC2, deploy lambda functions, etc.

After all the resources have been successfully created through the Terraform code, you need to go into the details of the Kinesis Analytics application (unfortunately, I did not find how to do this directly from the code).

Launch the application:


After that, you must explicitly set the in-application stream name by choosing from the drop-down list:



Now everything is ready to go.

Application Testing


Regardless of how you deployed the system, manually or through the Terraform code, it will work the same.

We go through SSH to the EC2 virtual machine where Kinesis Agent is installed and run the api_caller.py script

sudo ./api_caller.py TOKEN

It remains to wait for an SMS to your number:


SMS - message arrives on the phone in almost 1 minute:


It remains to be seen whether records are saved in the DynamoDB database for subsequent, more detailed analysis. The airline_tickets table contains something like this:


Conclusion


In the course of the work done, an online data processing system based on Amazon Kinesis was built. We examined the options for using Kinesis Agent in conjunction with Kinesis Data Streams and real-time analytics of Kinesis Analytics using SQL commands, as well as the interaction of Amazon Kinesis with other AWS services.

We deployed the above system in two ways: long enough manual and fast from the Terraform code.

The entire source code of the project is available in my repository on GitHub , I suggest you familiarize yourself with it.

With pleasure I am ready to discuss the article, I'm waiting for your comments. I hope for constructive criticism.

Wish you success!

All Articles