The evolution of a monolithic application into microservices

As a rule, when we need to do something quickly and cheaply, we don't think about the fault tolerance and scalability of our application, and after a while that inevitably leads to pain. Modern tools make it possible to solve this problem quickly and easily.


Using the example of a transition from a monolithic application to microservices, I will try to show the pros and cons of each approach. The article is divided into three parts:


  • In the first part, we consider a monolithic application built on the Dash web framework, i.e. data generation and display live in one place.
  • The second part is devoted to decomposing the monolithic application into microservices: one service generates the data, another displays it, and they communicate through the Kafka message broker.
  • In the third part, the microservices are "packed" into Docker containers.

The final application is shown in the diagram below (several backend services send satellite data to Kafka, and a frontend service reads and displays it; everything runs in Docker containers).



Introduction


To better understand the example, it is desirable to have at least basic knowledge of Kafka and Docker.



The full source code for all three parts is available on github.


1. Monolithic application


The application in this part is built with the Dash (Plotly) web framework; the code lives in the local_app folder.


monolith.py
import datetime

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output

# pip install pyorbital
from pyorbital.orbital import Orbital
satellite = Orbital('TERRA')

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.H4('TERRA Satellite Live Feed'),
        html.Div(id='live-update-text'),
        dcc.Graph(id='live-update-graph'),
        dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        )
    ])
)

@app.callback(Output('live-update-text', 'children'),
              [Input('interval-component', 'n_intervals')])
def update_metrics(n):
    lon, lat, alt = satellite.get_lonlatalt(datetime.datetime.now())
    style = {'padding': '5px', 'fontSize': '16px'}
    return [
        html.Span('Longitude: {0:.2f}'.format(lon), style=style),
        html.Span('Latitude: {0:.2f}'.format(lat), style=style),
        html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
    ]

# Multiple components can update every time the interval fires.
@app.callback(Output('live-update-graph', 'figure'),
              [Input('interval-component', 'n_intervals')])
def update_graph_live(n):
    satellite = Orbital('TERRA')
    data = {
        'time': [],
        'Latitude': [],
        'Longitude': [],
        'Altitude': []
    }

    # Collect some data
    for i in range(180):
        time = datetime.datetime.now() - datetime.timedelta(seconds=i*20)
        lon, lat, alt = satellite.get_lonlatalt(
            time
        )
        data['Longitude'].append(lon)
        data['Latitude'].append(lat)
        data['Altitude'].append(alt)
        data['time'].append(time)

    # Create the graph with subplots
    fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
    fig['layout']['margin'] = {
        'l': 30, 'r': 10, 'b': 30, 't': 10
    }
    fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

    fig.append_trace({
        'x': data['time'],
        'y': data['Altitude'],
        'name': 'Altitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 1, 1)
    fig.append_trace({
        'x': data['Longitude'],
        'y': data['Latitude'],
        'text': data['time'],
        'name': 'Longitude vs Latitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 2, 1)

    return fig

if __name__ == '__main__':
    app.run_server(debug=True)

A quick walk through the code: the python library pyorbital computes the coordinates of a satellite (in this example, Terra (EOS AM-1)) for a given moment in time, and Dash (Plotly) renders the web page with the data, served at 127.0.0.1:8050.
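
For reference, getting a satellite's position with pyorbital takes only a few lines (a minimal sketch; on creation the library downloads the current TLE orbital elements for the named satellite, so it needs network access):

import datetime

from pyorbital.orbital import Orbital

satellite = Orbital('TERRA')
# pyorbital works with UTC timestamps
lon, lat, alt = satellite.get_lonlatalt(datetime.datetime.utcnow())
print(lon, lat, alt)  # longitude and latitude in degrees, altitude in km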


The page is updated every second and shows the satellite's current altitude, longitude and latitude as a line of text, plus two graphs: the altitude over time and the ground track (longitude vs latitude), both rebuilt from the last hour of positions (180 points at 20-second steps).



(Screenshot of the running monolithic application)


Pros of the monolith:


  • Simplicity: all the code is in one place, so the application is easy to write, run and debug.

Cons:


  • If any part of the application fails (for example, the data generation), the whole application fails with it.
  • The parts cannot be updated or scaled independently: to change the display logic you have to stop and redeploy the entire application, including the data generation.

2. Microservices


The code for this part is in the local_microservices_app folder. Kafka itself runs locally in Docker (I used a docker-compose file from Stephane Maarek's github).


The application is split in two: the backend (producer.py) computes the data and sends it to Kafka, while the frontend (consumer.py, graph_display.py) reads the data from Kafka and displays it.
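
The Kafka topic is effectively the contract between the two services: the backend writes JSON records, and the frontend only needs to know their shape. One record looks like this (a sketch with illustrative values; the real ones are produced by json.dumps in producer.py):

# one record as it lands in the topic (values are illustrative)
{"lon": -51.23, "lat": 12.45, "alt": 705.12, "time": "2020-06-01 12:00:00.000000"}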


backend:
The producer computes the current coordinates of the satellite and sends them to the Kafka topic once a second:


producer.py
from time import sleep
import datetime
from confluent_kafka import Producer
import json
from pyorbital.orbital import Orbital

satellite = Orbital('TERRA')

topic = 'test_topic'

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        print("Produced record to topic {} partition [{}] @ offset {}"
              .format(msg.topic(), msg.partition(), msg.offset()))

# send data every second
while True:
    time = datetime.datetime.now()
    lon, lat, alt = satellite.get_lonlatalt(time)
    record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)})

    producer.produce(topic, key=None, value=record_value, on_delivery=acked)

    producer.poll()
    sleep(1)
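
To check that records are really landing in the topic, you can point a throwaway consumer at it (a sketch that assumes the same local broker and topic as above and uses only the standard confluent_kafka API):

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'smoke_test',
    'auto.offset.reset': 'earliest',  # read the topic from the beginning
})
consumer.subscribe(['test_topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # no new messages yet
        if msg.error():
            print('error: {}'.format(msg.error()))
            continue
        print(msg.value().decode('utf-8'))
finally:
    consumer.close()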

frontend:
The consumer logic is wrapped in the MyKafkaConnect class in consumer.py: on startup it reads the last 180 messages from Kafka (exactly as many as the graphs need), and on every subsequent call it picks up whatever new messages have arrived. Since the data is stored in Kafka, it also survives frontend restarts.


Compared to the monolith (monolith.py), the display code barely changes: the layout and callbacks are the same, but the data now comes from MyKafkaConnect instead of being computed in place.


consumer.py
import datetime
from confluent_kafka import Consumer, TopicPartition
import json
from collections import deque
from time import sleep

class MyKafkaConnect:
    def __init__(self, topic, group, que_len=180):
        self.topic = topic

        self.conf = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': group,
            'enable.auto.commit': True,
        }

        # the application needs a maximum of 180 data units
        self.data = {
            'time': deque(maxlen=que_len),
            'Latitude': deque(maxlen=que_len),
            'Longitude': deque(maxlen=que_len),
            'Altitude': deque(maxlen=que_len)
        }

        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])

        # download the last 180 messages
        self.partition = TopicPartition(topic=self.topic, partition=0)
        low_offset, high_offset = consumer.get_watermark_offsets(self.partition)

        # move the offset back by 180 messages
        if high_offset > que_len:
            self.partition.offset = high_offset - que_len
        else:
            self.partition.offset = low_offset

        # set the moved offset to consumer
        consumer.assign([self.partition])

        self.__update_que(consumer)

    # https://docs.confluent.io/current/clients/python.html#delivery-guarantees
    def __update_que(self, consumer):
        try:
            while True:
                msg = consumer.poll(timeout=0.1)
                if msg is None:
                    break
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                    break
                else:
                    record_value = msg.value()
                    json_data = json.loads(record_value.decode('utf-8'))

                    self.data['Longitude'].append(json_data['lon'])
                    self.data['Latitude'].append(json_data['lat'])
                    self.data['Altitude'].append(json_data['alt'])
                    self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f'))

                    # save local offset
                    self.partition.offset += 1          
        finally:
            # Close down consumer to commit final offsets.
            # It may take some time, which is why I save the offset locally
            consumer.close()

    def get_graph_data(self):
        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])  

        # update low and high offsets (it doesn't work without this)
        consumer.get_watermark_offsets(self.partition)

        # set local offset
        consumer.assign([self.partition])

        self.__update_que(consumer)

        # convert data to compatible format
        o = {key: list(value) for key, value in self.data.items()}
        return o        

    def get_last(self):
        lon = self.data['Longitude'][-1]
        lat = self.data['Latitude'][-1]
        alt = self.data['Altitude'][-1]

        return lon, lat, alt

# for test
if __name__ == '__main__':
    connect = MyKafkaConnect(topic='test_topic', group='test_group')

    while True:        
        test = connect.get_graph_data()

        print('number of messages:', len(test['time']), 
            'unique:', len(set(test['time'])), 
            'time:', test['time'][-1].second)

        sleep(0.1)

graph_display.py
import datetime

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output

from consumer import MyKafkaConnect

connect = MyKafkaConnect(topic='test_topic', group='test_group')

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.H4('TERRA Satellite Live Feed'),
        html.Div(id='live-update-text'),
        dcc.Graph(id='live-update-graph'),
        dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        )
    ])
)

@app.callback(Output('live-update-text', 'children'),
              [Input('interval-component', 'n_intervals')])
def update_metrics(n):
    lon, lat, alt = connect.get_last()

    print('update metrics')

    style = {'padding': '5px', 'fontSize': '16px'}
    return [
        html.Span('Longitude: {0:.2f}'.format(lon), style=style),
        html.Span('Latitude: {0:.2f}'.format(lat), style=style),
        html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
    ]

# Multiple components can update every time the interval fires.
@app.callback(Output('live-update-graph', 'figure'),
              [Input('interval-component', 'n_intervals')])
def update_graph_live(n):
    # Collect some data
    data = connect.get_graph_data()
    print('Update graph, data units:', len(data['time']))

    # Create the graph with subplots
    fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
    fig['layout']['margin'] = {
        'l': 30, 'r': 10, 'b': 30, 't': 10
    }
    fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

    fig.append_trace({
        'x': data['time'],
        'y': data['Altitude'],
        'name': 'Altitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 1, 1)
    fig.append_trace({
        'x': data['Longitude'],
        'y': data['Latitude'],
        'text': data['time'],
        'name': 'Longitude vs Latitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 2, 1)

    return fig

if __name__ == '__main__':
    app.run_server(debug=True)

Pros of this approach:


  • If the backend fails, the frontend keeps working and shows the last data it received (the page does not break, the graphs simply stop growing).
  • The services can be updated and redeployed independently: while the backend is restarting, the frontend keeps serving users, and afterwards it catches up on the messages accumulated in Kafka.
  • The services can be scaled independently of each other.

Cons:


  • The infrastructure becomes more complex: you now also have to run and maintain Kafka, so there are more moving parts that can fail.


(Screenshot: the backend is stopped, but the frontend keeps displaying the data it has already received)


3. Docker containers


The code for this part is in the docker_microservices_app folder. Here the backend and frontend are packed into Docker containers. To demonstrate scaling, I added two more backend instances that track the Aura (EOS CH-1) and Aqua (EOS PM-1) satellites.


Docker files
backend/Dockerfile (packages producer.py)
FROM python:3.7

RUN python -m pip install confluent-kafka

RUN python -m pip install pyorbital

WORKDIR /app

COPY producer.py ./

CMD ["python", "producer.py"]

frontend/Dockerfile (packages consumer.py and graph_display.py)
FROM python:3.7

RUN python -m pip install confluent-kafka

RUN python -m pip install dash plotly

WORKDIR /app

COPY consumer.py graph_display.py ./

CMD ["python", "graph_display.py"]

docker-compose.yml (starts the three backends, the frontend, and Kafka with Zookeeper)
version: '2.1'

# Stephane Maarek's kafka-docker
# https://github.com/simplesteph/kafka-stack-docker-compose/blob/master/zk-single-kafka-single.yml
services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    restart: unless-stopped
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.5.0
    hostname: kafka1
    ports:
      - "9092:9092"
    restart: unless-stopped
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  backend_terra:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "terra_topic"
      SATELLITE: "TERRA"
    depends_on:
      - kafka1

  backend_aqua:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "aqua_topic"
      SATELLITE: "AQUA"
    depends_on:
      - kafka1

  backend_aura:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "aura_topic"
      SATELLITE: "AURA"
    depends_on:
      - kafka1

  frontend:
    build:
      context: ./frontend
    ports:
      - "8050:8050"
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
    depends_on:
      - backend_terra
      - backend_aqua
      - backend_aura
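
A note on the broker configuration above: KAFKA_ADVERTISED_LISTENERS declares two listeners, so containers reach the broker at kafka1:19092 (the internal listener), while applications on the host keep using localhost:9092 (the external one). That is why every application service in the compose file gets BOOTSTRAP_SERVERS: "kafka1:19092".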

Python code
backend/producer.py
from time import sleep
import datetime
from confluent_kafka import Producer
import json
from pyorbital.orbital import Orbital
import os

topic = os.environ['TOPIC'] 
bootstrap_servers = os.environ['BOOTSTRAP_SERVERS'] 
s_name = os.environ['SATELLITE']

satellite = Orbital(s_name)

producer = Producer({'bootstrap.servers': bootstrap_servers})

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        print("Produced record to topic {} partition [{}] @ offset {}"
              .format(msg.topic(), msg.partition(), msg.offset()))

# send data every second
while True:
    time = datetime.datetime.now()
    lon, lat, alt = satellite.get_lonlatalt(time)
    record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)})

    producer.produce(topic, key=None, value=record_value, on_delivery=acked)

    producer.poll()
    sleep(1)

frontend/consumer.py (the same as before, except that the broker address is now taken from the BOOTSTRAP_SERVERS environment variable set in docker-compose.yml)
import datetime
from confluent_kafka import Consumer, TopicPartition
import json
import os
from collections import deque
from time import sleep

class MyKafkaConnect:
    def __init__(self, topic, group, que_len=180):
        self.topic = topic

        self.conf = {
            # inside Docker the broker is reachable at kafka1:19092, not localhost,
            # so take the address from the environment set by docker-compose.yml
            'bootstrap.servers': os.environ.get('BOOTSTRAP_SERVERS', 'localhost:9092'),
            'group.id': group,
            'enable.auto.commit': True,
        }

        # the application needs a maximum of 180 data units
        self.data = {
            'time': deque(maxlen=que_len),
            'Latitude': deque(maxlen=que_len),
            'Longitude': deque(maxlen=que_len),
            'Altitude': deque(maxlen=que_len)
        }

        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])

        # download the last 180 messages
        self.partition = TopicPartition(topic=self.topic, partition=0)
        low_offset, high_offset = consumer.get_watermark_offsets(self.partition)

        # move the offset back by 180 messages
        if high_offset > que_len:
            self.partition.offset = high_offset - que_len
        else:
            self.partition.offset = low_offset

        # set the moved offset to consumer
        consumer.assign([self.partition])

        self.__update_que(consumer)

    # https://docs.confluent.io/current/clients/python.html#delivery-guarantees
    def __update_que(self, consumer):
        try:
            while True:
                msg = consumer.poll(timeout=0.1)
                if msg is None:
                    break
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                    break
                else:
                    record_value = msg.value()
                    json_data = json.loads(record_value.decode('utf-8'))

                    self.data['Longitude'].append(json_data['lon'])
                    self.data['Latitude'].append(json_data['lat'])
                    self.data['Altitude'].append(json_data['alt'])
                    self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f'))

                    # save local offset
                    self.partition.offset += 1          
        finally:
            # Close down consumer to commit final offsets.
            # It may take some time, which is why I save the offset locally
            consumer.close()

    def get_graph_data(self):
        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])  

        # update low and high offsets (it doesn't work without this)
        consumer.get_watermark_offsets(self.partition)

        # set local offset
        consumer.assign([self.partition])

        self.__update_que(consumer)

        # convert data to compatible format
        o = {key: list(value) for key, value in self.data.items()}
        return o        

    def get_last(self):
        lon = self.data['Longitude'][-1]
        lat = self.data['Latitude'][-1]
        alt = self.data['Altitude'][-1]

        return lon, lat, alt

# for test
if __name__ == '__main__':
    connect = MyKafkaConnect(topic='terra_topic', group='test_group')

    while True:        
        test = connect.get_graph_data()

        print('number of messages:', len(test['time']), 
            'unique:', len(set(test['time'])), 
            'time:', test['time'][-1].second)

        sleep(0.1)

frontend/graph_display.py
import datetime

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output

from consumer import MyKafkaConnect

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.Div([
            html.H4('TERRA Satellite Live Feed'),
            html.Div(id='terra-text'),
            dcc.Graph(id='terra-graph')
            ], className="four columns"),
        html.Div([
            html.H4('AQUA Satellite Live Feed'),
            html.Div(id='aqua-text'),
            dcc.Graph(id='aqua-graph')
            ], className="four columns"),
        html.Div([
            html.H4('AURA Satellite Live Feed'),
            html.Div(id='aura-text'),
            dcc.Graph(id='aura-graph')
            ], className="four columns"),
        dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row")
)

def create_graphs(topic, live_update_text, live_update_graph):

    connect = MyKafkaConnect(topic=topic, group='test_group')

    @app.callback(Output(live_update_text, 'children'),
                  [Input('interval-component', 'n_intervals')])
    def update_metrics(n):
        lon, lat, alt = connect.get_last()

        print('update metrics')

        style = {'padding': '5px', 'fontSize': '15px'}
        return [
            html.Span('Longitude: {0:.2f}'.format(lon), style=style),
            html.Span('Latitude: {0:.2f}'.format(lat), style=style),
            html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
        ]

    # Multiple components can update every time the interval fires.
    @app.callback(Output(live_update_graph, 'figure'),
                  [Input('interval-component', 'n_intervals')])
    def update_graph_live(n):
        # Collect some data
        data = connect.get_graph_data()
        print('Update graph, data units:', len(data['time']))

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        fig.append_trace({
            'x': data['time'],
            'y': data['Altitude'],
            'name': 'Altitude',
            'mode': 'lines+markers',
            'type': 'scatter'
        }, 1, 1)
        fig.append_trace({
            'x': data['Longitude'],
            'y': data['Latitude'],
            'text': data['time'],
            'name': 'Longitude vs Latitude',
            'mode': 'lines+markers',
            'type': 'scatter'
        }, 2, 1)

        return fig

create_graphs('terra_topic', 'terra-text', 'terra-graph')
create_graphs('aqua_topic', 'aqua-text', 'aqua-graph')
create_graphs('aura_topic', 'aura-text', 'aura-graph')

if __name__ == '__main__':
    app.run_server(
        host='0.0.0.0',
        port=8050,
        debug=True)

Pros (in addition to those from the previous part):


  • The application is easy to scale: tracking one more satellite is just one more backend service in docker-compose.yml, since each Docker container is isolated from the others (see the sketch after this list).
  • The environment is reproducible: the images contain all the dependencies, so the project starts the same way on any machine with Docker.
  • Deployment comes down to a single docker-compose up command.
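
For example, adding a hypothetical fourth backend for the NOAA 19 satellite (assuming pyorbital can resolve that name to TLE data) is just one more block in docker-compose.yml:

  backend_noaa19:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "noaa19_topic"
      SATELLITE: "NOAA 19"
    depends_on:
      - kafka1

The frontend would also need a matching create_graphs('noaa19_topic', ...) call and a layout block for the new panel.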

Cons:


  • Everything now runs through Docker: images have to be built and stored, and debugging code inside containers is less convenient than on the host.

As before, fault tolerance is easy to check: stop any of the backend containers (for example, the one tracking Terra), and the graphs of the other two satellites keep updating while the stopped satellite's panel simply freezes on the last received data; once the container is started again, new points appear.



(Screenshot: three live-updating panels, one per satellite)



To sum up: this example is deliberately simplified. In a real deployment, Kafka itself would also need to be made fault tolerant: a cluster of several brokers, topic replication, monitoring, and so on; and for integration with external systems there are ready-made Kafka connectors, and much more.


Additional related links:
Python + Kafka =? / Nikolay Saskovets / bitnet [Python Meetup 09/14/2019]
Nikolay Saskovets, Building microservice systems using Kafka

