Die Entwicklung einer monolithischen Anwendung in Mikrodiensten

Wenn Sie schnell und kostengünstig etwas tun müssen, denken wir in der Regel nicht an die Fehlertoleranz und Skalierbarkeit unserer Anwendung, was nach einiger Zeit zwangsläufig zu Schmerzen führt. Mit modernen Lösungen können Sie dieses Problem schnell und einfach lösen.


Am Beispiel des Übergangs von einer monolithischen Anwendung zu Microservices werde ich versuchen, alle Vor- und Nachteile jedes Ansatzes aufzuzeigen. Der Artikel ist in drei Teile gegliedert:


  • Im ersten Teil werden wir eine monolithische Anwendung auf dem Dash-Webframework betrachten, d. H. Die Datenerzeugung und -anzeige erfolgt an einem Ort.
  • Der zweite Teil ist der Zersetzung einer monolithischen Anwendung in Mikrodienste gewidmet, d.h. Ein Dienst wird an der Datengenerierung beteiligt sein, ein anderer wird angezeigt und die Kommunikation zwischen ihnen wird über den Kafka-Nachrichtenbroker hergestellt.
  • Im dritten Teil werden Microservices in Docker-Containern "verpackt".

Die endgültige Anwendung sieht wie in der folgenden Abbildung aus.



Einführung


Um das Beispiel besser zu verstehen, ist es wünschenswert, mindestens Grundkenntnisse in Kafka und Docker zu haben. Ich werde meiner Meinung nach einige nützliche Kurse und Artikel geben:



github, .


1.


Dash (Plotly), , local_app. .


monolith.py
import datetime

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output

# pip install pyorbital
from pyorbital.orbital import Orbital
satellite = Orbital('TERRA')

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.H4('TERRA Satellite Live Feed'),
        html.Div(id='live-update-text'),
        dcc.Graph(id='live-update-graph'),
        dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        )
    ])
)

@app.callback(Output('live-update-text', 'children'),
              [Input('interval-component', 'n_intervals')])
def update_metrics(n):
    lon, lat, alt = satellite.get_lonlatalt(datetime.datetime.now())
    style = {'padding': '5px', 'fontSize': '16px'}
    return [
        html.Span('Longitude: {0:.2f}'.format(lon), style=style),
        html.Span('Latitude: {0:.2f}'.format(lat), style=style),
        html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
    ]

# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph', 'figure'),
              [Input('interval-component', 'n_intervals')])
def update_graph_live(n):
    satellite = Orbital('TERRA')
    data = {
        'time': [],
        'Latitude': [],
        'Longitude': [],
        'Altitude': []
    }

    # Collect some data
    for i in range(180):
        time = datetime.datetime.now() - datetime.timedelta(seconds=i*20)
        lon, lat, alt = satellite.get_lonlatalt(
            time
        )
        data['Longitude'].append(lon)
        data['Latitude'].append(lat)
        data['Altitude'].append(alt)
        data['time'].append(time)

    # Create the graph with subplots
    fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
    fig['layout']['margin'] = {
        'l': 30, 'r': 10, 'b': 30, 't': 10
    }
    fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

    fig.append_trace({
        'x': data['time'],
        'y': data['Altitude'],
        'name': 'Altitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 1, 1)
    fig.append_trace({
        'x': data['Longitude'],
        'y': data['Latitude'],
        'text': data['time'],
        'name': 'Longitude vs Latitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 2, 1)

    return fig

if __name__ == '__main__':
    app.run_server(debug=True)

- . python pyorbital, ( - Terra (EOS AM-1)). Dash (Plotly) : 127.0.0.1:8050.


, — altitude, longitude latitude (, ), .. , , — .



( () )


:


  • , .

:


  • , , , , .. ( , ).
  • / . , , , , .

2.


local_microservices_app, Kafka Docker, ( github Stephane Maarek)


, — backend (producer.py), Kafka, — frontend (consumer.py, graph_display.py) Kafka .


backend:
Producer ( ) Kafka ( 20 )


producer.py
from time import sleep
import datetime
from confluent_kafka import Producer
import json
from pyorbital.orbital import Orbital

satellite = Orbital('TERRA')

topic = 'test_topic'

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        print("Produced record to topic {} partition [{}] @ offset {}"
              .format(msg.topic(), msg.partition(), msg.offset()))

# send data every one second
while True:
    time = datetime.datetime.now()
    lon, lat, alt = satellite.get_lonlatalt(time)
    record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)})

    producer.produce(topic, key=None, value=record_value, on_delivery=acked)

    producer.poll()
    sleep(1)

frontend:
Consumer ( ) MyKafkaConnect consumer.py, Kafka 180 ( , ) . Kafka.


(monolith.py) , , , MyKafkaConnect, .


consumer.py
import datetime
from confluent_kafka import Consumer, TopicPartition
import json
from collections import deque
from time import sleep

class MyKafkaConnect:
    def __init__(self, topic, group, que_len=180):
        self.topic = topic

        self.conf = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': group,
            'enable.auto.commit': True,
        }

        # the application needs a maximum of 180 data units
        self.data = {
            'time': deque(maxlen=que_len),
            'Latitude': deque(maxlen=que_len),
            'Longitude': deque(maxlen=que_len),
            'Altitude': deque(maxlen=que_len)
        }

        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])

        # download first 180 messges
        self.partition = TopicPartition(topic=self.topic, partition=0)
        low_offset, high_offset = consumer.get_watermark_offsets(self.partition)

        # move offset back on 180 messages
        if high_offset > que_len:
            self.partition.offset = high_offset - que_len
        else:
            self.partition.offset = low_offset

        # set the moved offset to consumer
        consumer.assign([self.partition])

        self.__update_que(consumer)

    # https://docs.confluent.io/current/clients/python.html#delivery-guarantees
    def __update_que(self, consumer):
        try:
            while True:
                msg = consumer.poll(timeout=0.1)
                if msg is None:
                    break
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                    break
                else:
                    record_value = msg.value()
                    json_data = json.loads(record_value.decode('utf-8'))

                    self.data['Longitude'].append(json_data['lon'])
                    self.data['Latitude'].append(json_data['lat'])
                    self.data['Altitude'].append(json_data['alt'])
                    self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f'))

                    # save local offset
                    self.partition.offset += 1          
        finally:
            # Close down consumer to commit final offsets.
            # It may take some time, that why I save offset locally
            consumer.close()

    def get_graph_data(self):
        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])  

        # update low and high offsets (don't work without it)
        consumer.get_watermark_offsets(self.partition)

        # set local offset
        consumer.assign([self.partition])

        self.__update_que(consumer)

        # convert data to compatible format
        o = {key: list(value) for key, value in self.data.items()}
        return o        

    def get_last(self):
        lon = self.data['Longitude'][-1]
        lat = self.data['Latitude'][-1]
        alt = self.data['Altitude'][-1]

        return lon, lat, alt

# for test
if __name__ == '__main__':
    connect = MyKafkaConnect(topic='test_topic', group='test_group')

    while True:        
        test = connect.get_graph_data()

        print('number of messages:', len(test['time']), 
            'unique:', len(set(test['time'])), 
            'time:', test['time'][-1].second)

        sleep(0.1)

graph_display.py
import datetime

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output

from consumer import MyKafkaConnect

connect = MyKafkaConnect(topic='test_topic', group='test_group')

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.H4('TERRA Satellite Live Feed'),
        html.Div(id='live-update-text'),
        dcc.Graph(id='live-update-graph'),
        dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        )
    ])
)

@app.callback(Output('live-update-text', 'children'),
              [Input('interval-component', 'n_intervals')])
def update_metrics(n):
    lon, lat, alt = connect.get_last()

    print('update metrics')

    style = {'padding': '5px', 'fontSize': '16px'}
    return [
        html.Span('Longitude: {0:.2f}'.format(lon), style=style),
        html.Span('Latitude: {0:.2f}'.format(lat), style=style),
        html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
    ]

# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph', 'figure'),
              [Input('interval-component', 'n_intervals')])
def update_graph_live(n):
    # Collect some data
    data = connect.get_graph_data()
    print('Update graph, data units:', len(data['time']))

    # Create the graph with subplots
    fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
    fig['layout']['margin'] = {
        'l': 30, 'r': 10, 'b': 30, 't': 10
    }
    fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

    fig.append_trace({
        'x': data['time'],
        'y': data['Altitude'],
        'name': 'Altitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 1, 1)
    fig.append_trace({
        'x': data['Longitude'],
        'y': data['Latitude'],
        'text': data['time'],
        'name': 'Longitude vs Latitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 2, 1)

    return fig

if __name__ == '__main__':
    app.run_server(debug=True)

:


  • , backend , , ( , , ).
  • / , .. , backend , .
  • .

:


  • , Kafka, , .


( backend , )


3. Docker


docker_microservices_app. , backend frontend Docker. backend ( - Aura (EOS CH-1) Aqua (EOS PM-1)).


Docker
Quellcode-Backend / Dockerfile (gepackt von Producer.py)
FROM python:3.7

RUN python -m pip install confluent-kafka

RUN python -m pip install pyorbital

WORKDIR /app

COPY producer.py ./

CMD ["python", "producer.py"]

frontend/Dockerfile ( consumer.py graph_display.py)
FROM python:3.7

RUN python -m pip install confluent-kafka

RUN python -m pip install dash plotly

WORKDIR /app

COPY consumer.py graph_display.py ./

CMD ["python", "graph_display.py"]

docker-compose.yml ( backend, frontend Kafka)
version: '2.1'

# Stephane Maarek's kafka-docker
# https://github.com/simplesteph/kafka-stack-docker-compose/blob/master/zk-single-kafka-single.yml
services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    restart: unless-stopped
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.5.0
    hostname: kafka1
    ports:
      - "9092:9092"
    restart: unless-stopped
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  backend_terra:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "terra_topic"
      SATELLITE: "TERRA"
    depends_on:
      - kafka1

  backend_aqua:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "aqua_topic"
      SATELLITE: "AQUA"
    depends_on:
      - kafka1

  backend_aura:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "aura_topic"
      SATELLITE: "AURA"
    depends_on:
      - kafka1

  frontend:
    build:
      context: ./frontend
    ports:
      - "8050:8050"
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
    depends_on:
      - backend_terra
      - backend_aqua
      - backend_aura

Python
backend/producer.py
from time import sleep
import datetime
from confluent_kafka import Producer
import json
from pyorbital.orbital import Orbital
import os

topic = os.environ['TOPIC'] 
bootstrap_servers = os.environ['BOOTSTRAP_SERVERS'] 
s_name = os.environ['SATELLITE']

satellite = Orbital(s_name)

producer = Producer({'bootstrap.servers': bootstrap_servers})

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        print("Produced record to topic {} partition [{}] @ offset {}"
              .format(msg.topic(), msg.partition(), msg.offset()))

# send data every one second
while True:
    time = datetime.datetime.now()
    lon, lat, alt = satellite.get_lonlatalt(time)
    record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)})

    producer.produce(topic, key=None, value=record_value, on_delivery=acked)

    producer.poll()
    sleep(1)

frontend/consumer.py ( )
import datetime
from confluent_kafka import Consumer, TopicPartition
import json
from collections import deque
from time import sleep

class MyKafkaConnect:
    def __init__(self, topic, group, que_len=180):
        self.topic = topic

        self.conf = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': group,
            'enable.auto.commit': True,
        }

        # the application needs a maximum of 180 data units
        self.data = {
            'time': deque(maxlen=que_len),
            'Latitude': deque(maxlen=que_len),
            'Longitude': deque(maxlen=que_len),
            'Altitude': deque(maxlen=que_len)
        }

        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])

        # download first 180 messges
        self.partition = TopicPartition(topic=self.topic, partition=0)
        low_offset, high_offset = consumer.get_watermark_offsets(self.partition)

        # move offset back on 180 messages
        if high_offset > que_len:
            self.partition.offset = high_offset - que_len
        else:
            self.partition.offset = low_offset

        # set the moved offset to consumer
        consumer.assign([self.partition])

        self.__update_que(consumer)

    # https://docs.confluent.io/current/clients/python.html#delivery-guarantees
    def __update_que(self, consumer):
        try:
            while True:
                msg = consumer.poll(timeout=0.1)
                if msg is None:
                    break
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                    break
                else:
                    record_value = msg.value()
                    json_data = json.loads(record_value.decode('utf-8'))

                    self.data['Longitude'].append(json_data['lon'])
                    self.data['Latitude'].append(json_data['lat'])
                    self.data['Altitude'].append(json_data['alt'])
                    self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f'))

                    # save local offset
                    self.partition.offset += 1          
        finally:
            # Close down consumer to commit final offsets.
            # It may take some time, that why I save offset locally
            consumer.close()

    def get_graph_data(self):
        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])  

        # update low and high offsets (don't work without it)
        consumer.get_watermark_offsets(self.partition)

        # set local offset
        consumer.assign([self.partition])

        self.__update_que(consumer)

        # convert data to compatible format
        o = {key: list(value) for key, value in self.data.items()}
        return o        

    def get_last(self):
        lon = self.data['Longitude'][-1]
        lat = self.data['Latitude'][-1]
        alt = self.data['Altitude'][-1]

        return lon, lat, alt

# for test
if __name__ == '__main__':
    connect = MyKafkaConnect(topic='test_topic', group='test_group')

    while True:        
        test = connect.get_graph_data()

        print('number of messages:', len(test['time']), 
            'unique:', len(set(test['time'])), 
            'time:', test['time'][-1].second)

        sleep(0.1)

frontend/graph_display.py
import datetime

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output

from consumer import MyKafkaConnect

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.Div([
            html.H4('TERRA Satellite Live Feed'),
            html.Div(id='terra-text'),
            dcc.Graph(id='terra-graph')
            ], className="four columns"),
        html.Div([
            html.H4('AQUA Satellite Live Feed'),
            html.Div(id='aqua-text'),
            dcc.Graph(id='aqua-graph')
            ], className="four columns"),
        html.Div([
            html.H4('AURA Satellite Live Feed'),
            html.Div(id='aura-text'),
            dcc.Graph(id='aura-graph')
            ], className="four columns"),
        dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row")
)

def create_graphs(topic, live_update_text, live_update_graph):

    connect = MyKafkaConnect(topic=topic, group='test_group')

    @app.callback(Output(live_update_text, 'children'),
                  [Input('interval-component', 'n_intervals')])
    def update_metrics_terra(n):
        lon, lat, alt = connect.get_last()

        print('update metrics')

        style = {'padding': '5px', 'fontSize': '15px'}
        return [
            html.Span('Longitude: {0:.2f}'.format(lon), style=style),
            html.Span('Latitude: {0:.2f}'.format(lat), style=style),
            html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
        ]

    # Multiple components can update everytime interval gets fired.
    @app.callback(Output(live_update_graph, 'figure'),
                  [Input('interval-component', 'n_intervals')])
    def update_graph_live_terra(n):
        # Collect some data
        data = connect.get_graph_data()
        print('Update graph, data units:', len(data['time']))

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        fig.append_trace({
            'x': data['time'],
            'y': data['Altitude'],
            'name': 'Altitude',
            'mode': 'lines+markers',
            'type': 'scatter'
        }, 1, 1)
        fig.append_trace({
            'x': data['Longitude'],
            'y': data['Latitude'],
            'text': data['time'],
            'name': 'Longitude vs Latitude',
            'mode': 'lines+markers',
            'type': 'scatter'
        }, 2, 1)

        return fig

create_graphs('terra_topic', 'terra-text', 'terra-graph')
create_graphs('aqua_topic', 'aqua-text', 'aqua-graph')
create_graphs('aura_topic', 'aura-text', 'aura-graph')

if __name__ == '__main__':
    app.run_server(
        host='0.0.0.0',
        port=8050,
        debug=True)

( , ):


  • , .. ( Docker- ).
  • .
  • ,

:


  • .. Docker , .

, backend , ( ), , , , .



( , , )



, ( ) . , Kafka , , , , .. , Kafka connectors, - .


Zusätzliche verwandte Links:
Python + Kafka =? / Nikolay Saskovets / bitnet [Python Meetup 14.09.2019 ]
Nikolay Saskovets, Aufbau von Microservice-Systemen mit Kafka


All Articles