Guide de développement du service backend Python

Bonjour, je m'appelle Alexander Vasin, je suis dĂ©veloppeur backend Ă  Edadil. L'idĂ©e de ce matĂ©riel a commencĂ© avec le fait que je voulais analyser le travail d'introduction ( Ya.Disk ) dans l'Ă©cole de dĂ©veloppement Yandex Backend . J'ai commencĂ© Ă  dĂ©crire toutes les subtilitĂ©s du choix de certaines technologies, la mĂ©thodologie de test ... Il s'est avĂ©rĂ© ne pas ĂȘtre du tout une analyse, mais un guide trĂšs dĂ©taillĂ© sur la façon d'Ă©crire des backends en Python. De l'idĂ©e initiale, il n'y avait que des exigences pour le service, sur l'exemple duquel il est pratique de dĂ©monter les outils et les technologies. En consĂ©quence, je me suis rĂ©veillĂ© sur cent mille caractĂšres. Il fallait tellement de choses pour tout examiner en dĂ©tail. Ainsi, le programme pour les 100 prochains kilo-octets: comment construire un backend de service, du choix des outils au dĂ©ploiement.



TL; DR: Voici un représentant GitHub avec application, et qui aime les (vrais) longs livres - s'il vous plaßt, sous cat.

Nous allons développer et tester le service API REST en Python, le placer dans un conteneur Docker léger et le déployer à l'aide d'Ansible.

Vous pouvez implĂ©menter le service API REST de diffĂ©rentes maniĂšres Ă  l'aide de diffĂ©rents outils. La solution dĂ©crite n'est pas la seule bonne, j'ai choisi la mise en Ɠuvre et les outils en fonction de mon expĂ©rience et de mes prĂ©fĂ©rences personnelles.


Qu'est-ce qu'on fait?


Imaginez qu'une boutique de cadeaux en ligne prévoit de lancer une action dans différentes régions. Pour qu'une stratégie de vente soit efficace, une analyse de marché est nécessaire. Le magasin a un fournisseur qui envoie réguliÚrement (par exemple, par courrier) le déchargement des données avec des informations sur les résidents.

Développons un service d'API REST Python qui analysera les données fournies et identifiera la demande de cadeaux de résidents de différents groupes d'ùge dans différentes villes par mois.

Nous implémentons les gestionnaires suivants dans le service:

  • POST /imports
    Ajoute un nouveau téléchargement avec des données;
  • GET /imports/$import_id/citizens
    Renvoie les résidents de la sortie spécifiée;
  • PATCH /imports/$import_id/citizens/$citizen_id
    Modifie les informations sur le résident (et ses proches) dans le déchargement spécifié;
  • GET /imports/$import_id/citizens/birthdays
    , ( ), ;
  • GET /imports/$import_id/towns/stat/percentile/age
    50-, 75- 99- ( ) .

?


Nous Ă©crivons donc un service en Python en utilisant des frameworks, des bibliothĂšques et des SGBD familiers.

Dans 4 confĂ©rences du cours vidĂ©o, divers SGBD et leurs fonctionnalitĂ©s sont dĂ©crits. Pour ma mise en Ɠuvre, j'ai choisi le SGBD PostgreSQL , qui s'est imposĂ© comme une solution fiable avec une excellente documentation en russe , une forte communautĂ© russe (vous pouvez toujours trouver la rĂ©ponse Ă  une question en russe), et mĂȘme des cours gratuits . Le modĂšle relationnel est assez polyvalent et bien compris par de nombreux dĂ©veloppeurs. Bien que la mĂȘme chose puisse ĂȘtre faite sur n'importe quel SGBD NoSQL, dans cet article, nous considĂ©rerons PostgreSQL.

L'objectif principal du service - la transmission de donnĂ©es sur le rĂ©seau entre la base de donnĂ©es et les clients - n'implique pas une charge importante sur le processeur, mais nĂ©cessite la capacitĂ© de traiter plusieurs demandes Ă  la fois. Dans 10 confĂ©rences considĂ©rĂ©es comme une approche asynchrone. Il vous permet de servir efficacement plusieurs clients dans le mĂȘme processus de systĂšme d'exploitation (contrairement, par exemple, au modĂšle de prĂ©-fork utilisĂ© dans Flask / Django, qui crĂ©e plusieurs processus pour traiter les demandes des utilisateurs, chacun consomme de la mĂ©moire, mais est inactif la plupart du temps ) Par consĂ©quent, en tant que bibliothĂšque pour l'Ă©criture du service, j'ai choisi l' aiohttp asynchrone . La 5e confĂ©rence du cours vidĂ©o raconte que SQLAlchemy



vous permet de dĂ©composer des requĂȘtes complexes en parties, de les rĂ©utiliser, de gĂ©nĂ©rer des requĂȘtes avec un ensemble dynamique de champs (par exemple, le processeur PATCH permet la mise Ă  jour partielle d'un rĂ©sident avec des champs arbitraires) et de se concentrer directement sur la logique mĂ©tier. Le pilote asyncpg peut gĂ©rer ces demandes et transfĂ©rer les donnĂ©es le plus rapidement possible , et asyncpgsa les aidera Ă  se faire des amis .

Mon outil préféré pour gérer l'état de la base de données et travailler avec les migrations est Alembic . Au fait, j'en ai récemment parlé à Moscou Python .

La logique de validation a été succinctement décrite par les programmes de guimauve (y compris les vérifications des liens familiaux). Utilisation du module aiohttp-specJ'ai lié des gestionnaires et des schémas aiohttp pour la validation des données, et le bonus était de générer de la documentation au format Swagger et de l'afficher dans une interface graphique .

Pour les tests d'écriture, j'ai choisi pytest, plus à ce sujet en 3 conférences .

Pour déboguer et profiler ce projet, j'ai utilisé le débogueur PyCharm ( cours 9 ).

Dans 7, la confĂ©rence dĂ©crit comment n'importe quel ordinateur Docker (ou mĂȘme sur un systĂšme d'exploitation diffĂ©rent) peut fonctionner en mode pack sans avoir Ă  ajuster l'environnement d'application pour dĂ©marrer et facile Ă  installer / mettre Ă  jour / supprimer l'application sur le serveur.

Pour le déploiement, j'ai choisi Ansible. Il vous permet de décrire de maniÚre déclarative l'état souhaité du serveur et de ses services, fonctionne via ssh et ne nécessite pas de logiciel spécial.

DĂ©veloppement


J'ai décidé de donner un nom au package Python analyzeret d'utiliser la structure suivante:



Dans le fichier, analyzer/__init__.pyj'ai posté des informations générales sur le package: description ( docstring ), version, licence, contacts développeur.

Il peut ĂȘtre consultĂ© avec l'aide intĂ©grĂ©e
$ python
>>> import analyzer
>>> help(analyzer)

Help on package analyzer:

NAME
    analyzer

DESCRIPTION
      REST API,    .

PACKAGE CONTENTS
    api (package)
    db (package)
    utils (package)

DATA
    __all__ = ('__author__', '__email__', '__license__', '__maintainer__',...
    __email__ = 'alvassin@yandex.ru'
    __license__ = 'MIT'
    __maintainer__ = 'Alexander Vasin'

VERSION
    0.0.1

AUTHOR
    Alexander Vasin

FILE
    /Users/alvassin/Work/backendschool2019/analyzer/__init__.py

Le package a deux points d'entrée - le service API REST ( analyzer/api/__main__.py) et l'utilitaire de gestion d'état de la base de données ( analyzer/db/__main__.py). Les fichiers sont appelés __main__.pypour une raison - tout d'abord, un tel nom attire l'attention, il indique clairement que le fichier est un point d'entrée.

DeuxiÚmement, grùce à cette approche des points d'entrée python -m:

# REST API
$ python -m analyzer.api --help

#    
$ python -m analyzer.db --help

Pourquoi avez-vous besoin de commencer avec setup.py?


À l'avenir, nous rĂ©flĂ©chirons Ă  la façon de distribuer l'application: elle peut ĂȘtre emballĂ©e dans une archive zip (ainsi que wheel / egg-), un package rpm, un fichier pkg pour macOS et installĂ© sur un ordinateur distant, une machine virtuelle, MacBook ou Docker- rĂ©cipient.

L'objectif principal du fichier setup.pyest de décrire le package avec l'application pour . Le fichier doit contenir des informations générales sur le package (nom, version, auteur, etc.), mais vous pouvez également y spécifier les modules requis pour le travail, les dépendances «supplémentaires» (par exemple, pour les tests), les points d'entrée (par exemple, les commandes exécutables ) et les exigences de l'interprÚte. Les plugins Setuptools vous permettent de collecter des artefacts à partir du package décrit. Il existe des plugins intégrés: zip, egg, rpm, paquet macOS. Les plugins restants sont distribués via PyPI: wheel ,distutils/setuptools



xar , pex .

En fin de compte, en décrivant un fichier, nous obtenons de grandes opportunités. C'est pourquoi le développement d'un nouveau projet doit commencer setup.py.

Dans la fonction, setup()les modules dépendants sont indiqués par une liste:

setup(..., install_requires=["aiohttp", "SQLAlchemy"])

Mais j'ai dĂ©crit les dĂ©pendances dans des fichiers sĂ©parĂ©s requirements.txtet requirements.dev.txtdont le contenu est utilisĂ© dans setup.py. Il me semble plus flexible, en plus il y a un secret: plus tard, il vous permettra de construire une image Docker plus rapidement. Les dĂ©pendances seront dĂ©finies en tant qu'Ă©tape distincte avant d'installer l'application elle-mĂȘme et lors de la reconstruction du conteneur Docker, il se trouve dans le cache.

Pour setup.pypouvoir lire les dépendances des fichiers requirements.txtet requirements.dev.txt, la fonction s'écrit:

def load_requirements(fname: str) -> list:
    requirements = []
    with open(fname, 'r') as fp:
        for req in parse_requirements(fp.read()):
            extras = '[{}]'.format(','.join(req.extras)) if req.extras else ''
            requirements.append(
                '{}{}{}'.format(req.name, extras, req.specifier)
            )
    return requirements

Il est intĂ©ressant de noter que setuptoolslorsque la distribution des sources d'assemblage par dĂ©faut inclut uniquement les fichiers d'assemblage .py, .c, .cppet .h. Pour un fichier de dĂ©pendance requirements.txtet requirements.dev.txtfrapper le sac, ils doivent ĂȘtre clairement spĂ©cifiĂ©s dans le fichier MANIFEST.in.

setup.py entiĂšrement
import os
from importlib.machinery import SourceFileLoader

from pkg_resources import parse_requirements
from setuptools import find_packages, setup

module_name = 'analyzer'

# ,     (   ), 
#   __init__.py   machinery.
module = SourceFileLoader(
    module_name, os.path.join(module_name, '__init__.py')
).load_module()

def load_requirements(fname: str) -> list:
    requirements = []
    with open(fname, 'r') as fp:
        for req in parse_requirements(fp.read()):
            extras = '[{}]'.format(','.join(req.extras)) if req.extras else ''
            requirements.append(
                '{}{}{}'.format(req.name, extras, req.specifier)
            )
    return requirements

setup(
    name=module_name,
    version=module.__version__,
    author=module.__author__,
    author_email=module.__email__,
    license=module.__license__,
    description=module.__doc__,
    long_description=open('README.rst').read(),
    url='https://github.com/alvassin/backendschool2019',
    platforms='all',
    classifiers=[
        'Intended Audience :: Developers',
        'Natural Language :: Russian',
        'Operating System :: MacOS',
        'Operating System :: POSIX',
        'Programming Language :: Python',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.8',
        'Programming Language :: Python :: Implementation :: CPython'
    ],
    python_requires='>=3.8',
    packages=find_packages(exclude=['tests']),
    install_requires=load_requirements('requirements.txt'),
    extras_require={'dev': load_requirements('requirements.dev.txt')},
    entry_points={
        'console_scripts': [
            # f-strings  setup.py   - 
            # .
            #   ,     Python 3.8, 
            # source distribution       
            #   Python.     
            # .
            '{0}-api = {0}.api.__main__:main'.format(module_name),
            '{0}-db = {0}.db.__main__:main'.format(module_name)
        ]
    },
    include_package_data=True
)

Vous pouvez installer le projet en mode développement à l'aide de la commande suivante (en mode éditable, Python n'installera pas le package entier dans un dossier site-packages, mais créera uniquement des liens, donc toutes les modifications apportées aux fichiers du package seront immédiatement visibles):

#      extra- "dev"
pip install -e '.[dev]'

#      
pip install -e .

Comment spécifier des versions de dépendances?


C'est formidable lorsque les dĂ©veloppeurs travaillent activement sur leurs packages - les bogues y sont activement corrigĂ©s, de nouvelles fonctionnalitĂ©s apparaissent et les commentaires peuvent ĂȘtre obtenus plus rapidement. Mais parfois, les changements dans les bibliothĂšques dĂ©pendantes ne sont pas rĂ©trocompatibles et peuvent entraĂźner des erreurs dans votre application si vous n'y pensez pas au prĂ©alable.

Par exemple, pour chaque package dĂ©pendant, vous pouvez spĂ©cifier une version spĂ©cifique aiohttp==3.6.2. Ensuite, l'application sera garantie d'ĂȘtre construite spĂ©cifiquement avec les versions des bibliothĂšques dĂ©pendantes avec lesquelles elle a Ă©tĂ© testĂ©e. Mais cette approche a un inconvĂ©nient - si les dĂ©veloppeurs corrigent un bogue critique dans un package dĂ©pendant qui n'affecte pas la compatibilitĂ© descendante, ce correctif n'entrera pas dans l'application.

Il existe une approche pour le contrÎle de version Version sémantique, ce qui suggÚre de soumettre la version au format MAJOR.MINOR.PATCH:

  • MAJOR - augmente lorsque des modifications incompatibles vers l'arriĂšre sont ajoutĂ©es;
  • MINOR - Augmente lors de l'ajout de nouvelles fonctionnalitĂ©s avec prise en charge de la compatibilitĂ© descendante;
  • PATCH - augmente lors de l'ajout de corrections de bogues avec prise en charge de la compatibilitĂ© descendante.

Si un paquet dépendant suit cette approche (dont les auteurs sont généralement signalés dans les fichiers README et Changelog), il suffit de fixer la valeur de MAJOR, MINORet de limiter la valeur minimale pour PATCH version: >= MAJOR.MINOR.PATCH, == MAJOR.MINOR.*.

Une telle exigence peut ĂȘtre implĂ©mentĂ©e Ă  l'aide de l'opĂ©rateur ~ = . Par exemple, cela aiohttp~=3.6.2permettra Ă  PIP de s'installer pour la aiohttpversion 3.6.3, mais pas 3.7.

Si vous spécifiez l'intervalle des versions de dépendance, cela donnera un avantage supplémentaire - il n'y aura pas de conflits de version entre les bibliothÚques dépendantes.

Si vous dĂ©veloppez une bibliothĂšque qui nĂ©cessite un package de dĂ©pendance diffĂ©rent, autorisez-le non pas une version spĂ©cifique, mais un intervalle. Il sera alors beaucoup plus facile pour les utilisateurs de votre bibliothĂšque de l'utiliser (tout d'un coup leur application nĂ©cessite le mĂȘme package de dĂ©pendance, mais d'une version diffĂ©rente).

Le versioning sémantique n'est qu'un accord entre les auteurs et les consommateurs de packages. Cela ne garantit pas que les auteurs écrivent du code sans bogues et ne peuvent pas faire d'erreur dans la nouvelle version de leur package.

Base de données


Nous concevons le schéma


La description du gestionnaire POST / imports fournit un exemple de déchargement avec des informations sur les résidents:

Exemple de téléchargement
{
  "citizens": [
    {
      "citizen_id": 1,
      "town": "",
      "street": " ",
      "building": "1675",
      "apartment": 7,
      "name": "  ",
      "birth_date": "26.12.1986",
      "gender": "male",
      "relatives": [2]
    },
    {
      "citizen_id": 2,
      "town": "",
      "street": " ",
      "building": "1675",
      "apartment": 7,
      "name": "  ",
      "birth_date": "01.04.1997",
      "gender": "male",
      "relatives": [1]
    },
    {
      "citizen_id": 3,
      "town": "",
      "street": " ",
      "building": "2",
      "apartment": 11,
      "name": "  ",
      "birth_date": "23.11.1986",
      "gender": "female",
      "relatives": []
    },
    ...
  ]
}

La premiĂšre pensĂ©e a Ă©tĂ© de stocker toutes les informations sur le rĂ©sident dans une seule table citizens, oĂč la relation serait reprĂ©sentĂ©e par un champ relativessous la forme d'une liste d'entiers .

Mais cette méthode présente plusieurs inconvénients
  1. GET /imports/$import_id/citizens/birthdays , , citizens . relatives UNNEST.

    , 10- :
    SELECT 
        relations.citizen_id, 
        relations.relative_id, 
        date_part('month', relatives.birth_date) as relative_birth_month
    FROM (
    	SELECT
            citizens.import_id, 
            citizens.citizen_id,
            UNNEST(citizens.relatives) as relative_id
    	FROM citizens
        WHERE import_id = 1
    ) as relations
    INNER JOIN citizens as relatives ON
        relations.import_id = relatives.import_id AND
        relations.relative_id = relatives.citizen_id
    

  2. relatives PostgreSQL, : relatives , . ( ) .

De plus, j'ai décidé de rassembler toutes les données nécessaires au travail sous une troisiÚme forme normale , et la structure suivante a été obtenue:



  1. Le tableau des importations se compose d'une colonne à incrémentation automatique import_id. Il est nécessaire de créer une vérification de clé étrangÚre dans le tableau citizens.
  2. La table des citoyens stocke des données scalaires sur le résident (tous les champs sauf les informations sur les relations familiales).

    Une paire ( import_id, citizen_id) est utilisée comme clé primaire , garantissant l'unicité des résidents citizen_iddans le cadre import_id.

    Une clé étrangÚre citizens.import_id -> imports.import_idgarantit que le champ citizens.import_idcontient uniquement les déchargements existants.
  3. relations .

    ( ): citizens relations .
    (import_id, citizen_id, relative_id) , import_id citizen_id c relative_id.

    : (relations.import_id, relations.citizen_id) -> (citizens.import_id, citizens.citizen_id) (relations.import_id, relations.relative_id) -> (citizens.import_id, citizens.citizen_id), , citizen_id relative_id .

Cette structure garantit l' intĂ©gritĂ© des donnĂ©es Ă  l'aide des outils PostgreSQL , elle vous permet d' obtenir efficacement des rĂ©sidents avec des proches de la base de donnĂ©es, mais est soumise Ă  une condition de concurrence lors de la mise Ă  jour des informations sur les rĂ©sidents avec des requĂȘtes compĂ©titives (nous examinerons de plus prĂšs la mise en Ɠuvre du gestionnaire PATCH).

Décrire le schéma dans SQLAlchemy


Dans le chapitre 5, j'ai expliquĂ© comment crĂ©er des requĂȘtes Ă  l'aide de SQLAlchemy, vous devez dĂ©crire le schĂ©ma de la base de donnĂ©es Ă  l'aide d'objets spĂ©ciaux: les tables sont dĂ©crites Ă  l'aide sqlalchemy.Tableet liĂ©es Ă  un registre sqlalchemy.MetaDataqui stocke toutes les mĂ©ta-informations sur la base de donnĂ©es. Soit dit en passant, le registre MetaDatapeut non seulement stocker les mĂ©ta-informations dĂ©crites en Python, mais Ă©galement reprĂ©senter l'Ă©tat rĂ©el de la base de donnĂ©es sous la forme d'objets SQLAlchemy.

Cette fonctionnalité permet également à Alembic de comparer les conditions et de générer automatiquement un code de migration.

Soit dit en passant, chaque base de donnĂ©es a son propre schĂ©ma de dĂ©nomination des contraintes par dĂ©faut. Pour que vous ne perdiez pas de temps Ă  nommer de nouvelles contraintes ou Ă  rechercher / rappeler quelle contrainte vous ĂȘtes sur le point de supprimer, SQLAlchemy suggĂšre d'utiliser des modĂšles de dĂ©nomination pour les conventions de dĂ©nomination . Ils peuvent ĂȘtre dĂ©finis dans le registre MetaData.

Créer un registre MetaData et lui passer des modÚles de dénomination
# analyzer/db/schema.py
from sqlalchemy import MetaData

convention = {
    'all_column_names': lambda constraint, table: '_'.join([
        column.name for column in constraint.columns.values()
    ]),

    #  
    'ix': 'ix__%(table_name)s__%(all_column_names)s',

    #   
    'uq': 'uq__%(table_name)s__%(all_column_names)s',

    #  CHECK-constraint-
    'ck': 'ck__%(table_name)s__%(constraint_name)s',

    #   
    'fk': 'fk__%(table_name)s__%(all_column_names)s__%(referred_table_name)s',

    #   
    'pk': 'pk__%(table_name)s'
}
metadata = MetaData(naming_convention=convention)

Si vous spĂ©cifiez des modĂšles de dĂ©nomination, Alembic les utilisera lors de la gĂ©nĂ©ration automatique des migrations et nommera toutes les contraintes en fonction d'eux. À l'avenir, le registre crĂ©Ă© MetaDatasera nĂ©cessaire pour dĂ©crire les tables:

Nous décrivons le schéma de base de données avec des objets SQLAlchemy
# analyzer/db/schema.py
from enum import Enum, unique

from sqlalchemy import (
    Column, Date, Enum as PgEnum, ForeignKey, ForeignKeyConstraint, Integer,
    String, Table
)


@unique
class Gender(Enum):
    female = 'female'
    male = 'male'


imports_table = Table(
    'imports',
    metadata,
    Column('import_id', Integer, primary_key=True)
)

citizens_table = Table(
    'citizens',
    metadata,
    Column('import_id', Integer, ForeignKey('imports.import_id'),
           primary_key=True),
    Column('citizen_id', Integer, primary_key=True),
    Column('town', String, nullable=False, index=True),
    Column('street', String, nullable=False),
    Column('building', String, nullable=False),
    Column('apartment', Integer, nullable=False),
    Column('name', String, nullable=False),
    Column('birth_date', Date, nullable=False),
    Column('gender', PgEnum(Gender, name='gender'), nullable=False),
)

relations_table = Table(
    'relations',
    metadata,
    Column('import_id', Integer, primary_key=True),
    Column('citizen_id', Integer, primary_key=True),
    Column('relative_id', Integer, primary_key=True),
    ForeignKeyConstraint(
        ('import_id', 'citizen_id'),
        ('citizens.import_id', 'citizens.citizen_id')
    ),
    ForeignKeyConstraint(
        ('import_id', 'relative_id'),
        ('citizens.import_id', 'citizens.citizen_id')
    ),
)

Personnaliser Alembic


Lorsque le schéma de la base de données est décrit, il est nécessaire de générer des migrations, mais pour cela, vous devez d'abord configurer Alembic, qui est également abordé au chapitre 5 .

Pour utiliser la commande alembic, vous devez effectuer les Ă©tapes suivantes:

  1. Installer le paquet: pip install alembic
  2. Initialize Alambic: cd analyzer && alembic init db/alembic.

    Cette commande créera un fichier de configuration analyzer/alembic.iniet un dossier analyzer/db/alembicavec le contenu suivant:
    • env.py- AppelĂ© chaque fois que vous dĂ©marrez Alembic. Se connecte au registre Alembic sqlalchemy.MetaDataavec une description de l'Ă©tat souhaitĂ© de la base de donnĂ©es et contient des instructions pour dĂ©marrer les migrations.
    • script.py.mako - le modĂšle sur la base duquel les migrations sont gĂ©nĂ©rĂ©es.
    • versions - le dossier dans lequel Alembic recherchera (et gĂ©nĂ©rera) les migrations.
  3. Spécifiez l'adresse de la base de données dans le fichier alembic.ini:

    ; analyzer/alembic.ini
    [alembic] 
    sqlalchemy.url = postgresql://user:hackme@localhost/analyzer
  4. Spécifiez une description de l'état souhaité de la base de données (registre sqlalchemy.MetaData) afin qu'Alembic puisse générer automatiquement des migrations:

    # analyzer/db/alembic/env.py
    from analyzer.db import schema
    target_metadata = schema.metadata

Alembic est configurĂ© et peut dĂ©jĂ  ĂȘtre utilisĂ©, mais dans notre cas, cette configuration prĂ©sente plusieurs inconvĂ©nients:

  1. L'utilitaire alembicrecherche alembic.inidans le répertoire de travail actuel. Vous alembic.inipouvez spécifier le chemin d'accÚs à l' argument de ligne de commande, mais cela n'est pas pratique: je veux pouvoir appeler la commande à partir de n'importe quel dossier sans paramÚtres supplémentaires.
  2. Pour configurer Alembic pour fonctionner avec une base de données spécifique, vous devez modifier le fichier alembic.ini. Il serait beaucoup plus pratique de spécifier les paramÚtres de base de données pour la variable d'environnement et / ou un argument de ligne de commande, par exemple --pg-url.
  3. Le nom de l'utilitaire alembicne correspond pas trÚs bien au nom de notre service (et l'utilisateur peut en fait ne pas avoir du tout Python et ne rien savoir sur Alembic). Il serait beaucoup plus pratique pour l'utilisateur final que toutes les commandes exécutables du service aient un préfixe commun, par exemple analyzer-*.

Ces problÚmes sont résolus avec un petit wrapper. analyzer/db/__main__.py:

  • Alembic utilise un module standard pour traiter les arguments de ligne de commande argparse. Il vous permet d'ajouter un argument facultatif --pg-urlavec une valeur par dĂ©faut Ă  partir d'une variable d'environnement ANALYZER_PG_URL.

    Le code
    import os
    from alembic.config import CommandLine, Config
    from analyzer.utils.pg import DEFAULT_PG_URL
    
    
    def main():
        alembic = CommandLine()
        alembic.parser.add_argument(
            '--pg-url', default=os.getenv('ANALYZER_PG_URL', DEFAULT_PG_URL),
            help='Database URL [env var: ANALYZER_PG_URL]'
        )
        options = alembic.parser.parse_args()
    
        #    Alembic
        config = Config(file_=options.config, ini_section=options.name,
                        cmd_opts=options)
    
        #   sqlalchemy.url   Alembic
        config.set_main_option('sqlalchemy.url', options.pg_url)
    
        #   alembic
        exit(alembic.run_cmd(config, options))
    
    
    if __name__ == '__main__':
        main()
  • Le chemin d'accĂšs au fichier alembic.inipeut ĂȘtre calculĂ© par rapport Ă  l'emplacement du fichier exĂ©cutable et non au rĂ©pertoire de travail actuel de l'utilisateur.

    Le code
    import os
    from alembic.config import CommandLine, Config
    from pathlib import Path
    
    
    PROJECT_PATH = Path(__file__).parent.parent.resolve()
    
    
    def main():
        alembic = CommandLine()
        options = alembic.parser.parse_args()
    
        #     (alembic.ini),   
        #    
        if not os.path.isabs(options.config):
            options.config = os.path.join(PROJECT_PATH, options.config)
    
        #    Alembic
        config = Config(file_=options.config, ini_section=options.name,
                        cmd_opts=options)
    
        #      alembic   (,  alembic
        #   env.py,       )
        alembic_location = config.get_main_option('script_location')
        if not os.path.isabs(alembic_location):
            config.set_main_option('script_location',
                                   os.path.join(PROJECT_PATH, alembic_location))
    
        #   alembic
        exit(alembic.run_cmd(config, options))
    
    
    if __name__ == '__main__':
        main()

Lorsque l' utilitaire de gestion de l'Ă©tat de la base de donnĂ©es est prĂȘt, il peut ĂȘtre enregistrĂ© en setup.pytant que commande exĂ©cutable avec un nom comprĂ©hensible pour l'utilisateur final, par exemple analyzer-db:

Enregistrez une commande exécutable dans setup.py
from setuptools import setup

setup(..., entry_points={
    'console_scripts': [
        'analyzer-db = analyzer.db.__main__:main'
    ]
})

AprÚs avoir réinstallé le module, un fichier sera généré env/bin/analyzer-dbet la commande analyzer-dbdeviendra disponible:

$ pip install -e '.[dev]'

Nous générons des migrations


Pour générer des migrations, deux états sont requis: l'état souhaité (que nous avons décrit avec des objets SQLAlchemy) et l'état réel (la base de données, dans notre cas, est vide).

J'ai décidé que le moyen le plus simple d'augmenter Postgres était avec Docker, et pour plus de commodité, j'ai ajouté une commande make postgresqui exécute un conteneur en arriÚre-plan avec PostgreSQL sur le port 5432:

Augmentez PostgreSQL et générez la migration
$ make postgres
...
$ analyzer-db revision --message="Initial" --autogenerate
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.autogenerate.compare] Detected added table 'imports'
INFO  [alembic.autogenerate.compare] Detected added table 'citizens'
INFO  [alembic.autogenerate.compare] Detected added index 'ix__citizens__town' on '['town']'
INFO  [alembic.autogenerate.compare] Detected added table 'relations'
  Generating /Users/alvassin/Work/backendschool2019/analyzer/db/alembic/versions/d5f704ed4610_initial.py ...  done

Alembic fait généralement un bon travail de routine pour générer des migrations, mais je voudrais attirer l'attention sur les points suivants:

  • Les types de donnĂ©es utilisateur spĂ©cifiĂ©s dans les tables crĂ©Ă©es sont crĂ©Ă©s automatiquement (dans notre cas - gender), mais le code pour les supprimer n'est downgradepas gĂ©nĂ©rĂ©. Si vous appliquez, annulez, puis appliquez Ă  nouveau la migration, cela provoquera une erreur car le type de donnĂ©es spĂ©cifiĂ© existe dĂ©jĂ .

    Supprimer le type de données de genre dans la méthode de rétrogradation
    from alembic import op
    from sqlalchemy import Column, Enum
    
    GenderType = Enum('female', 'male', name='gender')
    
    
    def upgrade():
        ...
        #      GenderType   
        op.create_table('citizens', ...,
                        Column('gender', GenderType, nullable=False))
        ...
    
    
    def downgrade():
        op.drop_table('citizens')
    
        #       
        GenderType.drop(op.get_bind())
  • Dans la mĂ©thode, downgradecertaines actions peuvent parfois ĂȘtre supprimĂ©es (si nous supprimons la table entiĂšre, vous ne pouvez pas supprimer ses index sĂ©parĂ©ment):

    par exemple
    def downgrade():
    op.drop_table('relations')
    
    #      citizens,    
    #    
    op.drop_index(op.f('ix__citizens__town'), table_name='citizens')
    op.drop_table('citizens')
    op.drop_table('imports')

Lorsque la migration est fixe et prĂȘte, nous l'appliquons:

$ analyzer-db upgrade head
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> d5f704ed4610, Initial

application


Avant de commencer à créer des gestionnaires, vous devez configurer l'application aiohttp.

Si vous regardez le démarrage rapide aiohttp, vous pouvez écrire quelque chose comme ceci
import logging

from aiohttp import web


def main():
    #  
    logging.basicConfig(level=logging.DEBUG)

    #  
    app = web.Application()

    #  
    app.router.add_route(...)

    #  
    web.run_app(app)

Ce code soulÚve un certain nombre de questions et présente un certain nombre d'inconvénients:

  • Comment configurer l'application? Au minimum, vous devez spĂ©cifier l'hĂŽte et le port de connexion des clients, ainsi que les informations de connexion Ă  la base de donnĂ©es.

    J'aime vraiment rĂ©soudre ce problĂšme avec l'aide du module ConfigArgParse: il Ă©tend le standard argparseet permet d'utiliser des arguments de ligne de commande, des variables d'environnement (indispensables pour configurer les conteneurs Docker) et mĂȘme des fichiers de configuration (ainsi que la combinaison de ces mĂ©thodes) pour la configuration. En l'utilisant ConfigArgParse, vous pouvez Ă©galement valider les valeurs des paramĂštres de configuration d'application.

    Un exemple de traitement des paramĂštres Ă  l'aide de ConfigArgParse
    from aiohttp import web
    from configargparse import ArgumentParser, ArgumentDefaultsHelpFormatter
    
    from analyzer.utils.argparse import positive_int
    
    parser = ArgumentParser(
        #        ANALYZER_,
        #  ANALYZER_API_ADDRESS  ANALYZER_API_PORT
        auto_env_var_prefix='ANALYZER_',
    
        #     
        formatter_class=ArgumentDefaultsHelpFormatter
    )
    
    parser.add_argument('--api-address', default='0.0.0.0',
                        help='IPv4/IPv6 address API server would listen on')
    
    #      
    parser.add_argument('--api-port', type=positive_int, default=8081,
                        help='TCP port API server would listen on')
    
    
    def main():
        #   ,     
        #  ,    
        args = parser.parse_args()
    
        #       
        app = web.Application()
        web.run_app(app, host=args.api_address, port=args.api_port)
    
    
    if __name__ == '__main__':
        main()

    , ConfigArgParse, argparse, ( -h --help). :

    $ python __main__.py --help
    usage: __main__.py [-h] [--api-address API_ADDRESS] [--api-port API_PORT]
    
    If an arg is specified in more than one place, then commandline values override environment variables which override defaults.
    
    optional arguments:
      -h, --help            show this help message and exit
      --api-address API_ADDRESS
                            IPv4/IPv6 address API server would listen on [env var: ANALYZER_API_ADDRESS] (default: 0.0.0.0)
      --api-port API_PORT   TCP port API server would listen on [env var: ANALYZER_API_PORT] (default: 8081)
  • — , «» . , .

    os.environ.clear(), Python (, asyncio?), , ConfigArgParser.

    import os
    from typing import Callable
    from configargparse import ArgumentParser
    from yarl import URL
    
    from analyzer.api.app import create_app
    from analyzer.utils.pg import DEFAULT_PG_URL
    
    ENV_VAR_PREFIX = 'ANALYZER_'
    
    parser = ArgumentParser(auto_env_var_prefix=ENV_VAR_PREFIX)
    parser.add_argument('--pg-url', type=URL, default=URL(DEFAULT_PG_URL),
                       help='URL to use to connect to the database')
    
    
    def clear_environ(rule: Callable):
        """
          ,     
         rule
        """
        #   os.environ    tuple,    
        # os.environ   
        for name in filter(rule, tuple(os.environ)):
            os.environ.pop(name)
    
    
    def main():
        #  
        args = parser.parse_args()
    
        #      ANALYZER_
        clear_environ(lambda i: i.startswith(ENV_VAR_PREFIX))
    
        #  
        app = create_app(args)
        ...
    
    
    if __name__ == '__main__':
        main()
  • stderr/ .

    9 , logging.basicConfig() stderr.

    , . aiomisc.

    aiomisc
    import logging
    
    from aiomisc.log import basic_config
    
    basic_config(logging.DEBUG, buffered=True)    
    
  • , ? , fork , (, Windows ).

    import os
    from sys import argv
    
    import forklib
    from aiohttp.web import Application, run_app
    from aiomisc import bind_socket
    from setproctitle import setproctitle
    
    
    def main():
        sock = bind_socket(address='0.0.0.0', port=8081, proto_name='http')
        setproctitle(f'[Master] {os.path.basename(argv[0])}')
    
        def worker():
            setproctitle(f'[Worker] {os.path.basename(argv[0])}')
            app = Application()
            run_app(app, sock=sock)
    
        forklib.fork(os.cpu_count(), worker, auto_restart=True)
    
    
    if __name__ == '__main__':
        main()
    
  • - ? , ( — ) , nobody. — .

    import os
    import pwd
    
    from aiohttp.web import run_app
    from aiomisc import bind_socket
    
    from analyzer.api.app import create_app
    
    
    def main():
        #  
        sock = bind_socket(address='0.0.0.0', port=8085, proto_name='http')
    
        user = pwd.getpwnam('nobody')
        os.setgid(user.pw_gid)
        os.setuid(user.pw_uid)
    
        app = create_app(...)
        run_app(app, sock=sock)
    
    
    if __name__ == '__main__':
        main()
  • create_app, .


Toutes les réponses de gestionnaire réussies seront renvoyées au format JSON. Il serait également pratique pour les clients de recevoir des informations sur les erreurs sous une forme sérialisée (par exemple, pour voir quels champs n'ont pas passé la validation).

La documentation aiohttppropose une mĂ©thode json_responsequi prend un objet, le sĂ©rialise en JSON et renvoie un nouvel objet aiohttp.web.Responseavec un en-tĂȘte Content-Type: application/jsonet des donnĂ©es sĂ©rialisĂ©es Ă  l'intĂ©rieur.

Comment sérialiser des données à l'aide de json_response
from aiohttp.web import Application, View, run_app
from aiohttp.web_response import json_response


class SomeView(View):
    async def get(self):
        return json_response({'hello': 'world'})


app = Application()
app.router.add_route('*', '/hello', SomeView)
run_app(app)

Mais il existe un autre moyen: aiohttp vous permet d'enregistrer un sérialiseur arbitraire pour un type spécifique de données de réponse dans le registre aiohttp.PAYLOAD_REGISTRY. Par exemple, vous pouvez spécifier un sérialiseur aiohttp.JsonPayloadpour les objets de type Mapping .

Dans ce cas, il suffira que le gestionnaire retourne un objet Responseavec les données de réponse dans le paramÚtre body. aiohttp trouvera un sérialiseur qui correspond au type de données et sérialisera la réponse.

Outre le fait que la sérialisation des objets est décrite en un seul endroit, cette approche est également plus flexible - elle vous permet d'implémenter des solutions trÚs intéressantes (nous considérerons l'un des cas d'utilisation dans le gestionnaire GET /imports/$import_id/citizens).

Comment sérialiser des données à l'aide de aiohttp.PAYLOAD_REGISTRY
from types import MappingProxyType
from typing import Mapping

from aiohttp import PAYLOAD_REGISTRY, JsonPayload
from aiohttp.web import run_app, Application, Response, View

PAYLOAD_REGISTRY.register(JsonPayload, (Mapping, MappingProxyType))


class SomeView(View):
    async def get(self):
        return Response(body={'hello': 'world'})


app = Application()
app.router.add_route('*', '/hello', SomeView)
run_app(app)

Il est important de comprendre que json_response, par exemple aiohttp.JsonPayload, ils utilisent une méthode standard json.dumpsqui ne peut pas sérialiser des types de données complexes, par exemple, datetime.dateou asyncpg.Record( asyncpgrenvoie des enregistrements de la base de données en tant qu'instances de cette classe). De plus, certains objets complexes peuvent en contenir d'autres: dans un enregistrement de la base de données, il peut y avoir un champ type datetime.date.

Les développeurs Python ont résolu ce problÚme: la méthode json.dumpsvous permet d'utiliser l'argument defaultpour spécifier une fonction qui est appelée lorsqu'il est nécessaire de sérialiser un objet inconnu. La fonction devrait convertir un objet inconnu en un type qui peut sérialiser le module json.

Comment étendre JsonPayload pour sérialiser des objets arbitraires
import json
from datetime import date
from functools import partial, singledispatch
from typing import Any

from aiohttp.payload import JsonPayload as BaseJsonPayload
from aiohttp.typedefs import JSONEncoder

@singledispatch
def convert(value):
    raise NotImplementedError(f'Unserializable value: {value!r}')


@convert.register(Record)
def convert_asyncpg_record(value: Record):
    """
        , 
    asyncpg
    """
    return dict(value)


@convert.register(date)
def convert_date(value: date):
    """
       date      —  
      .     
      ..
    """
    return value.strftime('%d.%m.%Y')
    
 
dumps = partial(json.dumps, default=convert)


class JsonPayload(BaseJsonPayload):
    def __init__(self,
                 value: Any,
                 encoding: str = 'utf-8',
                 content_type: str = 'application/json',
                 dumps: JSONEncoder = dumps,
                 *args: Any,
                 **kwargs: Any) -> None:
        super().__init__(value, encoding, content_type, dumps, *args, **kwargs)

Gestionnaires


aiohttp vous permet d'implĂ©menter des gestionnaires avec des fonctions et classes asynchrones. Les classes sont plus extensibles: premiĂšrement, le code appartenant Ă  un gestionnaire peut ĂȘtre placĂ© Ă  un endroit, et deuxiĂšmement, les classes vous permettent d'utiliser l'hĂ©ritage pour vous dĂ©barrasser de la duplication de code (par exemple, chaque gestionnaire nĂ©cessite une connexion Ă  la base de donnĂ©es).

Classe de base du gestionnaire
from aiohttp.web_urldispatcher import View
from asyncpgsa import PG


class BaseView(View):
    URL_PATH: str

    @property
    def pg(self) -> PG:
        return self.request.app['pg']

Comme il est difficile de lire un gros fichier, j'ai décidé de diviser les gestionnaires en fichiers. Les petits fichiers encouragent une faible connectivité et, par exemple, s'il y a des importations en anneau à l'intérieur des gestionnaires, cela signifie que quelque chose ne va pas dans la composition des entités.

POST / importations


Le gestionnaire d'entrĂ©e reçoit json avec des donnĂ©es sur les rĂ©sidents. La taille de requĂȘte maximale autorisĂ©e dans aiohttp est contrĂŽlĂ©e par l'option client_max_sizeet est de 2 Mo par dĂ©faut . Si la limite est dĂ©passĂ©e, aiohttp retournera une rĂ©ponse HTTP avec un Ă©tat de 413: Erreur d'entitĂ© trop grande demandĂ©e.

En mĂȘme temps, le json correct avec les lignes et les nombres les plus longs pĂšsera ~ 63 mĂ©gaoctets, donc les restrictions sur la taille de la demande doivent ĂȘtre Ă©tendues.

Ensuite, vous devez vérifier et désérialiser les données . S'ils sont incorrects, vous devez renvoyer une réponse HTTP 400: Bad Request.

J'avais besoin de deux régimes Marhsmallow. Le premier CitizenSchemavérifie les données de chaque résident individuel et désérialise également la chaßne joyeux anniversaire dans l'objet datetime.date:

  • Type de donnĂ©es, format et disponibilitĂ© de tous les champs obligatoires;
  • Manque de champs inconnus;
  • La date de naissance doit ĂȘtre indiquĂ©e dans le format DD.MM.YYYYet ne peut avoir aucune signification pour l'avenir;
  • La liste des proches de chaque rĂ©sident doit contenir des identifiants uniques des rĂ©sidents prĂ©sents dans ce tĂ©lĂ©chargement.

Le deuxiÚme schéma ImportSchema,, vérifie le déchargement dans son ensemble:

  • citizen_id chaque rĂ©sident du dĂ©chargement doit ĂȘtre unique;
  • Les liens familiaux doivent ĂȘtre Ă  double sens (si le rĂ©sident n ° 1 a un rĂ©sident n ° 2 dans la liste des parents, le rĂ©sident n ° 2 doit Ă©galement avoir un parent n ° 1).

Si les donnĂ©es sont correctes, elles doivent ĂȘtre ajoutĂ©es Ă  la base de donnĂ©es avec une nouvelle unique import_id.
Pour ajouter des donnĂ©es, vous devrez effectuer plusieurs requĂȘtes dans diffĂ©rentes tables. Afin d'Ă©viter des donnĂ©es partiellement partiellement ajoutĂ©es dans la base de donnĂ©es en cas d'erreur ou d'exception (par exemple, lorsque vous dĂ©connectez un client qui n'a pas reçu de rĂ©ponse complĂšte, aiohttp lĂšvera une exception CancelledError ), vous devez utiliser une transaction .

Il est nĂ©cessaire d'ajouter des donnĂ©es aux tables par parties , car dans une requĂȘte Ă  PostgreSQL, il ne peut y avoir plus de 32 767 arguments. Il y a citizens9 champs dans le tableau . Par consĂ©quent, pour 1 requĂȘte, seules 32 767/9 = 3 640 lignes peuvent ĂȘtre insĂ©rĂ©es dans ce tableau, et en un tĂ©lĂ©chargement, il peut y avoir jusqu'Ă  10 000 habitants.

GET / importations / $ import_id / citoyens


Le gestionnaire renvoie tous les rĂ©sidents pour dĂ©chargement avec le spĂ©cifiĂ© import_id. Si le tĂ©lĂ©chargement spĂ©cifiĂ© n'existe pas , vous devez renvoyer la rĂ©ponse HTTP 404: Not Found. Ce comportement semble ĂȘtre courant pour les gestionnaires qui ont besoin d'un dĂ©chargement existant, j'ai donc extrait le code de vĂ©rification dans une classe distincte.

Classe de base pour les gestionnaires avec déchargements
from aiohttp.web_exceptions import HTTPNotFound
from sqlalchemy import select, exists

from analyzer.db.schema import imports_table


class BaseImportView(BaseView):
    @property
    def import_id(self):
        return int(self.request.match_info.get('import_id'))

    async def check_import_exists(self):
        query = select([
            exists().where(imports_table.c.import_id == self.import_id)
        ])
        if not await self.pg.fetchval(query):
            raise HTTPNotFound()

Pour obtenir une liste de parents pour chaque résident, vous devrez effectuer LEFT JOINde table citizensen table relations, en agrégeant le champ relations.relative_idregroupé par import_idet citizen_id.

Si le résident n'a pas de parents, il LEFT JOINretournera la relations.relative_idvaleur pour lui sur le terrain NULLet, à la suite de l'agrégation, la liste des parents ressemblera [NULL].

Pour corriger cette valeur incorrecte, j'ai utilisé la fonction array_remove .

La base de données stocke la date dans un format YYYY-MM-DD, mais nous avons besoin d'un format DD.MM.YYYY.

Techniquement, vous pouvez formater la date avec une requĂȘte SQL ou du cĂŽtĂ© Python au moment de la sĂ©rialisation de la rĂ©ponse avec json.dumps(asyncpg renvoie la valeur du champ en birth_datetant qu'instance de la classedatetime.date)

J'ai choisi la sérialisation cÎté Python, étant donné que c'est birth_datele seul objet datetime.datedu projet avec un seul format (voir la section «Sérialisation des données» ).

MalgrĂ© le fait que le processeur exĂ©cute deux requĂȘtes (vĂ©rification de l'existence d'un dĂ©chargement et d'une demande de liste de rĂ©sidents), il n'est pas nĂ©cessaire d'utiliser une transaction . Par dĂ©faut, PostgreSQL utilise le niveau d'isolement, READ COMMITTEDet mĂȘme au sein d'une transaction, toutes les modifications apportĂ©es Ă  d'autres transactions terminĂ©es avec succĂšs seront visibles (ajout de nouvelles lignes, modification de celles existantes).

Le tĂ©lĂ©chargement le plus important dans une vue texte peut prendre environ 63 mĂ©gaoctets, ce qui est beaucoup, surtout si l'on considĂšre que plusieurs demandes de rĂ©ception de donnĂ©es peuvent arriver en mĂȘme temps. Il existe un moyen assez intĂ©ressant d'obtenir des donnĂ©es de la base de donnĂ©es Ă  l'aide du curseur et de les envoyer au client en plusieurs parties .

Pour ce faire, nous devons implémenter deux objets:

  1. Un objet SelectQuerytype AsyncIterablequi renvoie des enregistrements de la base de données. Au premier appel, il se connecte à la base de données, ouvre une transaction et crée un curseur; lors d'une nouvelle itération, il renvoie les enregistrements de la base de données. Il est retourné par le gestionnaire.

    Code SelectQuery
    from collections import AsyncIterable
    from asyncpgsa.transactionmanager import ConnectionTransactionContextManager
    from sqlalchemy.sql import Select
    
    
    class SelectQuery(AsyncIterable):
        """
        ,     PostgreSQL   
        ,  ,    
        """
        PREFETCH = 500
    
        __slots__ = (
            'query', 'transaction_ctx', 'prefetch', 'timeout'
        )
    
        def __init__(self, query: Select,
                     transaction_ctx: ConnectionTransactionContextManager,
                     prefetch: int = None,
                     timeout: float = None):
            self.query = query
            self.transaction_ctx = transaction_ctx
            self.prefetch = prefetch or self.PREFETCH
            self.timeout = timeout
    
        async def __aiter__(self):
            async with self.transaction_ctx as conn:
                cursor = conn.cursor(self.query, prefetch=self.prefetch,
                                     timeout=self.timeout)
                async for row in cursor:
                    yield row
    
  2. Un sérialiseur AsyncGenJSONListPayloadqui peut parcourir les générateurs asynchrones, sérialiser les données d'un générateur asynchrone vers JSON et envoyer des données aux clients en plusieurs parties. Il est enregistré aiohttp.PAYLOAD_REGISTRYcomme sérialiseur d'objets AsyncIterable.

    Code AsyncGenJSONListPayload
    import json
    from functools import partial
    
    from aiohttp import Payload
    
    
    # ,    JSON  asyncpg.Record  datetime.date
    dumps = partial(json.dumps, default=convert, ensure_ascii=False)
    
    
    class AsyncGenJSONListPayload(Payload):
        """
           AsyncIterable,     
         JSON   
        """
        def __init__(self, value, encoding: str = 'utf-8',
                     content_type: str = 'application/json',
                     root_object: str = 'data',
                     *args, **kwargs):
            self.root_object = root_object
            super().__init__(value, content_type=content_type, encoding=encoding,
                             *args, **kwargs)
    
        async def write(self, writer):
            #  
            await writer.write(
                ('{"%s":[' % self.root_object).encode(self._encoding)
            )
    
            first = True
            async for row in self._value:
                #      
                if not first:
                    await writer.write(b',')
                else:
                    first = False
    
                await writer.write(dumps(row).encode(self._encoding))
    
            #  
            await writer.write(b']}')

De plus, dans le gestionnaire, il sera possible de crĂ©er un objet SelectQuery, de lui passer une requĂȘte SQL et une fonction pour ouvrir la transaction et la renvoyer Ă  Response body:

Code gestionnaire
# analyzer/api/handlers/citizens.py
from aiohttp.web_response import Response
from aiohttp_apispec import docs, response_schema

from analyzer.api.schema import CitizensResponseSchema
from analyzer.db.schema import citizens_table as citizens_t
from analyzer.utils.pg import SelectQuery

from .query import CITIZENS_QUERY
from .base import BaseImportView


class CitizensView(BaseImportView):
    URL_PATH = r'/imports/{import_id:\d+}/citizens'

    @docs(summary='    ')
    @response_schema(CitizensResponseSchema())
    async def get(self):
        await self.check_import_exists()

        query = CITIZENS_QUERY.where(
            citizens_t.c.import_id == self.import_id
        )
        body = SelectQuery(query, self.pg.transaction())
        return Response(body=body)

aiohttpil détecte un aiohttp.PAYLOAD_REGISTRYsérialiseur enregistré AsyncGenJSONListPayloadpour les objets de type dans le registre AsyncIterable. Ensuite, le sérialiseur parcourt l'objet SelectQueryet envoie des données au client. Lors du premier appel, l'objet SelectQueryreçoit une connexion à la base de données, ouvre une transaction et crée un curseur; lors d'une nouvelle itération, il recevra les données de la base de données avec le curseur et les renverra ligne par ligne.

Cette approche permet de ne pas allouer de mĂ©moire pour la totalitĂ© du volume de donnĂ©es Ă  chaque requĂȘte, mais elle a une particularitĂ©: l'application ne pourra pas renvoyer l'Ă©tat HTTP correspondant au client en cas d'erreur (aprĂšs tout, l'Ă©tat HTTP, les en-tĂȘtes ont dĂ©jĂ  Ă©tĂ© envoyĂ©s au client et les donnĂ©es sont en cours d'Ă©criture).

Lorsqu'une exception se produit, il ne reste plus qu'Ă  se dĂ©connecter. Une exception, bien sĂ»r, peut ĂȘtre sĂ©curisĂ©e, mais le client ne pourra pas comprendre exactement quelle erreur s'est produite.

D'un autre cĂŽtĂ©, une situation similaire peut se produire mĂȘme si le processeur reçoit toutes les donnĂ©es de la base de donnĂ©es, mais le rĂ©seau clignote lors de la transmission des donnĂ©es au client - personne n'est Ă  l'abri de cela.

PATCH / imports / $ import_id / citoyens / $ citizen_id


Le gestionnaire reçoit l'identifiant du dĂ©chargement import_id, le rĂ©sident citizen_id, ainsi que json avec les nouvelles donnĂ©es sur le rĂ©sident. Dans le cas d'un dĂ©chargement inexistant ou d'un rĂ©sident , une rĂ©ponse HTTP doit ĂȘtre retournĂ©e 404: Not Found.

Les donnĂ©es transmises par le client doivent ĂȘtre vĂ©rifiĂ©es et dĂ©sĂ©rialisĂ©es . S'ils sont incorrects, vous devez renvoyer une rĂ©ponse HTTP 400: Bad Request. J'ai implĂ©mentĂ© un schĂ©ma Marshmallow PatchCitizenSchemaqui vĂ©rifie:

  • Le type et le format des donnĂ©es pour les champs spĂ©cifiĂ©s.
  • Date de naissance. Il doit ĂȘtre spĂ©cifiĂ© dans un format DD.MM.YYYYet ne peut ĂȘtre significatif Ă  l'avenir.
  • Une liste des proches de chaque rĂ©sident. Il doit avoir des identifiants uniques pour les rĂ©sidents.

L'existence des parents indiquĂ©s dans le champ relativesne peut pas ĂȘtre vĂ©rifiĂ©e sĂ©parĂ©ment: si un relationsrĂ©sident inexistant est ajoutĂ© Ă  la table, PostgreSQL renvoie une erreur ForeignKeyViolationErrorqui peut ĂȘtre traitĂ©e et le statut HTTP peut ĂȘtre retournĂ© 400: Bad Request.

Quel statut doit ĂȘtre retournĂ© si le client a envoyĂ© des donnĂ©es incorrectes pour un rĂ©sident inexistant ou un dĂ©chargement ? Il est sĂ©mantiquement plus correct de vĂ©rifier d'abord l'existence d'un dĂ©chargement et d'un rĂ©sident (s'il n'y en a pas, retour 404: Not Found) et ensuite seulement si le client a envoyĂ© les donnĂ©es correctes (sinon, retour 400: Bad Request). En pratique, il est souvent moins coĂ»teux de vĂ©rifier d'abord les donnĂ©es, et seulement si elles sont correctes, d'accĂ©der Ă  la base de donnĂ©es.

Les deux options sont acceptables, mais j'ai décidé de choisir une deuxiÚme option moins chÚre, car dans tous les cas, le résultat de l'opération est une erreur qui n'affecte rien (le client corrigera les données et découvrira ensuite que le résident n'existe pas).

Si les donnĂ©es sont correctes, il est nĂ©cessaire de mettre Ă  jour les informations sur le rĂ©sident dans la base de donnĂ©es . Dans le gestionnaire, vous devrez effectuer plusieurs requĂȘtes sur diffĂ©rentes tables. Si une erreur ou une exception se produit, les modifications apportĂ©es Ă  la base de donnĂ©es doivent ĂȘtre annulĂ©es, de sorte que les requĂȘtes doivent ĂȘtre effectuĂ©es dans une transaction .

La méthode PATCH vous permet de transférer uniquement certains champs pour un résident.

Le gestionnaire doit ĂȘtre Ă©crit de telle maniĂšre qu'il ne se bloque pas lors de l'accĂšs aux donnĂ©es que le client n'a pas spĂ©cifiĂ© et n'exĂ©cute pas non plus les requĂȘtes sur les tables dans lesquelles les donnĂ©es n'ont pas changĂ©.

Si le client a prĂ©cisĂ© le champ relatives, il est nĂ©cessaire d'obtenir une liste des proches existants. S'il a changĂ©, dĂ©terminez les enregistrements de la table qui relativesdoivent ĂȘtre supprimĂ©s et ceux Ă  ajouter afin d'aligner la base de donnĂ©es sur la demande du client. Par dĂ©faut, PostgreSQL utilise l'isolement des transactions READ COMMITTED. Cela signifie que dans le cadre de la transaction en cours, les modifications seront visibles pour les enregistrements existants (ainsi que les nouveaux) des autres transactions terminĂ©es. Cela peut conduire Ă  une condition de concurrence entre les demandes concurrentielles .

Supposons qu'il y ait un déchargement avec les résidents#1. #2, #3sans parenté. Le service reçoit deux demandes simultanées de changement de résident n ° 1: {"relatives": [2]}et {"relatives": [3]}. aiohttp créera deux gestionnaires qui recevront simultanément l'état actuel du résident de PostgreSQL.

Chaque gestionnaire ne dĂ©tectera pas une seule relation associĂ©e et dĂ©cidera d'ajouter une nouvelle relation avec le parent spĂ©cifiĂ©. En consĂ©quence, le rĂ©sident n ° 1 a le mĂȘme domaine que les proches [2,3].



Ce comportement ne peut pas ĂȘtre qualifiĂ© d'Ă©vident. Deux options sont prĂ©vues pour dĂ©cider du rĂ©sultat de la course: pour terminer uniquement la premiĂšre demande, et pour la seconde, renvoyer une rĂ©ponse HTTP
409: Conflict(afin que le client répÚte la demande), ou pour exécuter les demandes à tour de rÎle (la deuxiÚme demande ne sera traitée qu'aprÚs la fin de la premiÚre).

La premiĂšre option peut ĂȘtre implĂ©mentĂ©e en activant le mode d'isolementSERIALIZABLE. Si au cours du traitement de la demande, quelqu'un a dĂ©jĂ  rĂ©ussi Ă  modifier et Ă  valider les donnĂ©es, une exception sera levĂ©e, qui peut ĂȘtre traitĂ©e et le statut HTTP correspondant retournĂ©.

L'inconvĂ©nient de cette solution - un grand nombre de verrous dans PostgreSQL, SERIALIZABLElĂšvera une exception, mĂȘme si les requĂȘtes concurrentielles modifient les enregistrements des rĂ©sidents de diffĂ©rents dĂ©chargements.

Vous pouvez également utiliser le mécanisme de verrouillage des recommandations . Si vous obtenez un tel verrouillage import_id, les demandes concurrentielles de différents déchargements pourront s'exécuter en parallÚle.

Pour traiter les demandes concurrentielles en un seul téléchargement, vous pouvez implémenter le comportement de l'une des options: la fonction pg_try_advisory_xact_lockessaie d'obtenir un verrou et
il renvoie le rĂ©sultat boolĂ©en immĂ©diatement (s'il n'Ă©tait pas possible d'obtenir le verrou - une exception peut ĂȘtre levĂ©e), mais il pg_advisory_xact_lockattend que la
ressource soit disponible pour le blocage (dans ce cas, les requĂȘtes seront exĂ©cutĂ©es sĂ©quentiellement, j'ai optĂ© pour cette option).

Par consĂ©quent, le gestionnaire doit renvoyer les informations actuelles sur le rĂ©sident mis Ă  jour . Il a Ă©tĂ© possible de nous limiter au retour des donnĂ©es de sa demande au client (puisque nous renvoyons une rĂ©ponse au client, cela signifie qu'il n'y a eu aucune exception et que toutes les demandes ont Ă©tĂ© traitĂ©es avec succĂšs). Ou - utilisez le mot clĂ© RETURNING dans les requĂȘtes qui modifient la base de donnĂ©es et gĂ©nĂšrent une rĂ©ponse Ă  partir des rĂ©sultats. Mais ces deux approches ne nous permettraient pas de voir et de tester le cas de la race des États.

Il n'y avait aucune exigence de charge Ă©levĂ©e pour le service, j'ai donc dĂ©cidĂ© de demander Ă  nouveau toutes les donnĂ©es sur le rĂ©sident et de renvoyer au client un rĂ©sultat honnĂȘte de la base de donnĂ©es.

GET / importations / $ import_id / citoyens / anniversaires


Le gestionnaire calcule le nombre de cadeaux que chaque rĂ©sident du dĂ©chargement recevra Ă  ses proches (premiĂšre commande). Le numĂ©ro est groupĂ© par mois pour le tĂ©lĂ©chargement avec le spĂ©cifiĂ© import_id. Dans le cas d'un tĂ©lĂ©chargement inexistant , une rĂ©ponse HTTP doit ĂȘtre retournĂ©e 404: Not Found.

Il existe deux options de mise en Ɠuvre:

  1. Obtenez des données pour les résidents avec des proches dans la base de données et du cÎté Python, agrégez les données par mois et générez des listes pour les mois pour lesquels il n'y a pas de données dans la base de données.
  2. Compilez une requĂȘte json dans la base de donnĂ©es et ajoutez des stubs pour les mois manquants.

J'ai optĂ© pour la premiĂšre option - visuellement, elle semble plus comprĂ©hensible et prise en charge. Le nombre d'anniversaires dans un mois donnĂ© peut ĂȘtre obtenu en faisant JOINdu tableau avec les liens familiaux ( relations.citizen_id- le rĂ©sident pour lequel nous considĂ©rons les anniversaires des proches) dans le tableau citizens(contenant la date de naissance Ă  partir de laquelle vous souhaitez obtenir le mois).

Les valeurs de mois ne doivent pas contenir de zĂ©ros en tĂȘte. Le mois obtenu Ă  partir du champ birth_dateutilisant la fonction date_partpeut contenir un zĂ©ro non significatif. Pour l' enlever, je fis castĂ  integerla requĂȘte SQL.

Malgré le fait que le gestionnaire doive répondre à deux demandes (vérifier l'existence du déchargement et obtenir des informations sur les anniversaires et les cadeaux), aucune transaction n'est requise .

Par défaut, PostgreSQL utilise le mode READ COMMITTED, dans lequel tous les nouveaux enregistrements (ajoutés par d'autres transactions) et existants (modifiés par d'autres transactions) sont visibles dans la transaction en cours une fois qu'ils sont terminés avec succÚs.

Par exemple, si un nouveau téléchargement est ajouté au moment de la réception des données, cela n'affectera pas les données existantes. Si, au moment de la réception des données, une demande de changement de résident est satisfaite, soit les données ne seront pas encore visibles (si la transaction modifiant les données n'est pas terminée), soit la transaction se terminera complÚtement et toutes les modifications seront immédiatement visibles. L'intégrité obtenue de la base de données ne sera pas violée.

GET / imports / $ import_id / towns / stat / percentile / age


Le gestionnaire calcule les 50e, 75e et 99e centiles de l'Ăąge (annĂ©es entiĂšres) des rĂ©sidents par ville dans l'Ă©chantillon avec l'id_import_id spĂ©cifiĂ©. Dans le cas d'un tĂ©lĂ©chargement inexistant , une rĂ©ponse HTTP doit ĂȘtre retournĂ©e 404: Not Found.

MalgrĂ© le fait que le processeur exĂ©cute deux requĂȘtes (vĂ©rification de l'existence du dĂ©chargement et obtention d'une liste de rĂ©sidents), il n'est pas nĂ©cessaire d'utiliser une transaction .

Il existe deux options de mise en Ɠuvre:

  1. Obtenez l'ùge des résidents à partir de la base de données, regroupés par ville, puis du cÎté Python, calculez les centiles en utilisant numpy (qui est spécifié comme référence dans la tùche) et arrondissez à deux décimales.
  2. PostgreSQL: percentile_cont , SQL-, numpy .

La deuxiÚme option nécessite de transférer moins de données entre l'application et PostgreSQL, mais elle n'a pas d'écueil trÚs évident: dans PostgreSQL, l'arrondi est mathématique, ( SELECT ROUND(2.5)renvoie 3), et en Python - comptabilité, à l'entier le plus proche ( round(2.5)retourne 2).

Pour tester le gestionnaire, l'implĂ©mentation doit ĂȘtre la mĂȘme dans PostgreSQL et Python (l'implĂ©mentation d'une fonction avec arrondi mathĂ©matique en Python semble plus facile). Il convient de noter que lors du calcul des centiles, numpy et PostgreSQL peuvent renvoyer des nombres lĂ©gĂšrement diffĂ©rents, mais en tenant compte de l'arrondissement, cette diffĂ©rence ne sera pas perceptible.

Essai


Que faut-il vĂ©rifier dans cette application? PremiĂšrement, que les gestionnaires rĂ©pondent aux exigences et effectuent le travail requis dans un environnement aussi proche que possible de l'environnement de combat. DeuxiĂšmement, les migrations qui modifient l'Ă©tat de la base de donnĂ©es fonctionnent sans erreur. TroisiĂšmement, il existe un certain nombre de fonctions auxiliaires qui pourraient Ă©galement ĂȘtre correctement couvertes par des tests.

J'ai dĂ©cidĂ© d'utiliser le framework pytest en raison de sa flexibilitĂ© et de sa facilitĂ© d'utilisation. Il offre un mĂ©canisme puissant pour prĂ©parer l'environnement aux tests - les luminaires , c'est-Ă -dire qu'il fonctionne avec un dĂ©corateurpytest.mark.fixturedont les noms peuvent ĂȘtre spĂ©cifiĂ©s par le paramĂštre du test. Si pytest dĂ©tecte un paramĂštre avec un nom de luminaire dans l'annotation de test, il exĂ©cutera ce luminaire et transmettra le rĂ©sultat dans la valeur de ce paramĂštre. Et si le luminaire est un gĂ©nĂ©rateur, le paramĂštre de test prendra la valeur renvoyĂ©e yieldet une fois le test terminĂ©, la deuxiĂšme partie du luminaire sera exĂ©cutĂ©e, ce qui peut effacer les ressources ou fermer les connexions.

Pour la plupart des tests, nous avons besoin d'une base de données PostgreSQL. Pour isoler les tests les uns des autres, vous pouvez créer une base de données distincte avant chaque test et la supprimer aprÚs l'exécution.

Créer une base de données d'appareils pour chaque test
import os
import uuid

import pytest
from sqlalchemy import create_engine
from sqlalchemy_utils import create_database, drop_database
from yarl import URL

from analyzer.utils.pg import DEFAULT_PG_URL

PG_URL = os.getenv('CI_ANALYZER_PG_URL', DEFAULT_PG_URL)


@pytest.fixture
def postgres():
    tmp_name = '.'.join([uuid.uuid4().hex, 'pytest'])
    tmp_url = str(URL(PG_URL).with_path(tmp_name))
    create_database(tmp_url)

    try:
        #      postgres  -
        yield tmp_url
    finally:
        drop_database(tmp_url)


def test_db(postgres):
    """
     ,  PostgreSQL
    """
    engine = create_engine(postgres)
    assert engine.execute('SELECT 1').scalar() == 1
    engine.dispose()

Le module sqlalchemy_utils a fait un excellent travail de cette tùche , en tenant compte des caractéristiques des différentes bases de données et pilotes. Par exemple, PostgreSQL ne permet pas l'exécution CREATE DATABASEdans un bloc de transaction. Lors de la création d'une base de données, il sqlalchemy_utilstraduit psycopg2(qui exécute généralement toutes les demandes dans une transaction) en mode de validation automatique.

Autre caractĂ©ristique importante: si au moins un client est connectĂ© Ă  PostgreSQL, la base de donnĂ©es ne peut pas ĂȘtre supprimĂ©e, mais sqlalchemy_utilsdĂ©connecte tous les clients avant de supprimer la base de donnĂ©es. La base de donnĂ©es sera supprimĂ©e avec succĂšs mĂȘme si un test avec des connexions actives s'y bloque.

Nous avons besoin de PostgreSQL dans différents états: pour tester les migrations, nous avons besoin d'une base de données propre, tandis que les gestionnaires exigent que toutes les migrations soient appliquées. Vous pouvez modifier par programme l'état d'une base de données à l'aide des commandes Alembic; elles nécessitent l'objet de configuration Alembic pour les appeler.

Créer un objet de configuration de luminaire Alembic
from types import SimpleNamespace

import pytest

from analyzer.utils.pg import make_alembic_config


@pytest.fixture()
def alembic_config(postgres):
    cmd_options = SimpleNamespace(config='alembic.ini', name='alembic',
                                  pg_url=postgres, raiseerr=False, x=None)
    return make_alembic_config(cmd_options)

Veuillez noter que les appareils alembic_configont un paramÚtre postgres- pytestpermet non seulement d'indiquer la dépendance du test sur les appareils, mais aussi les dépendances entre appareils.

Ce mécanisme vous permet de séparer de maniÚre flexible la logique et d'écrire du code trÚs concis et réutilisable.

Gestionnaires


Le test des gestionnaires nĂ©cessite une base de donnĂ©es avec des tables et des types de donnĂ©es crĂ©Ă©s. Pour appliquer des migrations, vous devez appeler par programme la commande de mise Ă  niveau d'Alembic. Pour l'appeler, vous avez besoin d'un objet avec la configuration Alembic, que nous avons dĂ©jĂ  dĂ©fini avec des fixtures alembic_config. La base de donnĂ©es avec migrations ressemble Ă  une entitĂ© complĂštement indĂ©pendante, et elle peut ĂȘtre reprĂ©sentĂ©e comme un appareil:

from alembic.command import upgrade

@pytest.fixture
async def migrated_postgres(alembic_config, postgres):
    upgrade(alembic_config, 'head')
    #  DSN  ,    
    return postgres

Lorsqu'il y a de nombreuses migrations dans le projet, leur application pour chaque test peut prendre trop de temps. Pour accélérer le processus, vous pouvez créer une base de données avec des migrations une fois , puis l' utiliser comme modÚle .

En plus de la base de données pour tester les gestionnaires, vous aurez besoin d'une application en cours d'exécution, ainsi que d'un client configuré pour fonctionner avec cette application. Pour rendre l'application facile à tester, j'ai mis sa création dans une fonction create_appqui prend des paramÚtres à exécuter: une base de données, un port pour l'API REST, et autres.

Les arguments pour lancer l'application peuvent Ă©galement ĂȘtre reprĂ©sentĂ©s comme un appareil distinct. Pour les crĂ©er, vous devrez dĂ©terminer le port libre pour exĂ©cuter l'application de test et l'adresse de la base de donnĂ©es temporaire migrĂ©e.

Pour déterminer le port libre, j'ai utilisé le luminaire aiomisc_unused_portdu package aiomisc.

Un appareil standard aiohttp_unused_portconviendrait également, mais il renvoie une fonction pour déterminer les ports libres, tandis qu'il aiomisc_unused_portrenvoie immédiatement le numéro de port. Pour notre application, nous devons déterminer un seul port libre, j'ai donc décidé de ne pas écrire une ligne de code supplémentaire avec un appel aiohttp_unused_port.

@pytest.fixture
def arguments(aiomisc_unused_port, migrated_postgres):
    return parser.parse_args(
        [
            '--log-level=debug',
            '--api-address=127.0.0.1',
            f'--api-port={aiomisc_unused_port}',
            f'--pg-url={migrated_postgres}'
        ]
    )

Tous les tests avec des gestionnaires impliquent des demandes à l'API REST; travailler directement avec l'application n'est aiohttppas requis. Par conséquent, j'ai créé un appareil qui lance l'application et, en utilisant l'usine, aiohttp_clientcrée et renvoie un client de test standard connecté à l'application aiohttp.test_utils.TestClient.

from analyzer.api.app import create_app

@pytest.fixture
async def api_client(aiohttp_client, arguments):
    app = create_app(arguments)
    client = await aiohttp_client(app, server_kwargs={
        'port': arguments.api_port
    })

    try:
        yield client
    finally:
        await client.close()

Maintenant, si vous spécifiez le luminaire dans les paramÚtres de test api_client, les événements suivants se produisent:

  1. postgres ( migrated_postgres).
  2. alembic_config Alembic, ( migrated_postgres).
  3. migrated_postgres ( arguments).
  4. aiomisc_unused_port ( arguments).
  5. arguments ( api_client).
  6. api_client .
  7. .
  8. api_client .
  9. postgres .

Les appareils peuvent Ă©viter la duplication de code, mais en plus de prĂ©parer l'environnement dans les tests, il y a un autre endroit potentiel oĂč il y aura beaucoup du mĂȘme code - les demandes d'application.

Tout d'abord, en faisant une demande, nous nous attendons Ă  obtenir un certain statut HTTP. DeuxiĂšmement, si l'Ă©tat correspond Ă  celui attendu, avant de travailler avec les donnĂ©es, vous devez vous assurer qu'elles ont le bon format. Il est facile de faire une erreur ici et d'Ă©crire un gestionnaire qui effectue les calculs corrects et renvoie le rĂ©sultat correct, mais ne passe pas la validation automatique en raison du format de rĂ©ponse incorrect (par exemple, oubliez d'envelopper la rĂ©ponse dans un dictionnaire avec une clĂ© data). Toutes ces vĂ©rifications pourraient ĂȘtre effectuĂ©es en un seul endroit.

Dans le moduleanalyzer.testing J'ai préparé pour chaque gestionnaire une fonction d'aide qui vérifie l'état de HTTP, ainsi que le format de réponse à l'aide de Marshmallow.

GET / importations / $ import_id / citoyens


J'ai décidé de commencer avec un gestionnaire qui renvoie des résidents, car il est trÚs utile pour vérifier les résultats d'autres gestionnaires qui modifient l'état de la base de données.

Je n'ai intentionnellement pas utilisĂ© de code qui ajoute des donnĂ©es Ă  la base de donnĂ©es du gestionnaire POST /imports, bien qu'il ne soit pas difficile d'en faire une fonction distincte. Le code du gestionnaire a la propriĂ©tĂ© de changer et s'il y a une erreur dans le code qui s'ajoute Ă  la base de donnĂ©es, il y a une chance que le test cesse de fonctionner comme prĂ©vu et implicitement pour les dĂ©veloppeurs arrĂȘte d'afficher des erreurs.

Pour ce test, j'ai défini les ensembles de données de test suivants:

  • DĂ©chargement avec plusieurs proches. VĂ©rifie que pour chaque rĂ©sident une liste avec les identifiants des proches sera correctement constituĂ©e.
  • DĂ©chargement avec un rĂ©sident sans parents. VĂ©rifie que le champ relativesest une liste vide (en raison LEFT JOINde la requĂȘte SQL, la liste des parents peut ĂȘtre Ă©gale [None]).
  • DĂ©chargement avec un rĂ©sident qui est un parent de lui-mĂȘme.
  • DĂ©chargement vide. VĂ©rifie que le gestionnaire autorise l'ajout de dĂ©chargement vide et ne se bloque pas avec une erreur.

Pour exĂ©cuter le mĂȘme test sĂ©parĂ©ment Ă  chaque tĂ©lĂ©chargement, j'ai utilisĂ© un autre mĂ©canisme Pytest trĂšs puissant - le paramĂ©trage . Ce mĂ©canisme vous permet d'envelopper la fonction de test dans le dĂ©corateur pytest.mark.parametrizeet de dĂ©crire les paramĂštres que la fonction de test doit prendre pour chaque cas de test individuel.

Comment paramétrer un test
import pytest

from analyzer.utils.testing import generate_citizen

datasets = [
    #    
    [
        generate_citizen(citizen_id=1, relatives=[2, 3]),
        generate_citizen(citizen_id=2, relatives=[1]),
        generate_citizen(citizen_id=3, relatives=[1])
    ],

    #   
    [
        generate_citizen(relatives=[])
    ],

    #   ,    
    [
        generate_citizen(citizen_id=1, name='', gender='male',
                         birth_date='17.02.2020', relatives=[1])
    ],

    #  
    [],
]


@pytest.mark.parametrize('dataset', datasets)
async def test_get_citizens(api_client, dataset):
    """
        4 ,    
    """

Ainsi, le test ajoutera le téléchargement à la base de données, puis, à l'aide d'une demande au gestionnaire, il recevra des informations sur les résidents et comparera le téléchargement de référence avec celui reçu. Mais comment comparez-vous les résidents?

Chaque rĂ©sident se compose de champs scalaires et d'un champ relatives- une liste d'identifiants de parents. Une liste en Python est un type ordonnĂ©, et lors de la comparaison de l'ordre des Ă©lĂ©ments de chaque liste importe, mais lors de la comparaison des listes avec les frĂšres et sƓurs, l'ordre ne devrait pas avoir d'importance.

Si vous apportez relativesĂ  l'ensemble avant la comparaison, alors lors de la comparaison, cela ne fonctionne pas pour trouver une situation oĂč l'un des habitants sur le terrain relativesa des doublons. Si vous triez la liste avec les identifiants des proches, cela Ă©vitera le problĂšme de l'ordre diffĂ©rent des identifiants des proches, mais en mĂȘme temps dĂ©tectera les doublons.

Lors de la comparaison de deux listes avec des rĂ©sidents, l'un peut rencontrer un problĂšme similaire: techniquement, l'ordre des rĂ©sidents dans le dĂ©chargement n'est pas important, mais il est important de dĂ©tecter s'il y a deux rĂ©sidents avec les mĂȘmes identifiants dans un dĂ©chargement et pas dans l'autre. Ainsi, en plus d'organiser la liste avec des proches, les proches de chaque rĂ©sident doivent organiser les rĂ©sidents Ă  chaque dĂ©chargement.

Puisque la tùche de comparer les résidents se posera plus d'une fois, j'ai mis en place deux fonctions: une pour comparer deux résidents, et la seconde pour comparer deux listes avec des résidents:

Comparez les résidents
from typing import Iterable, Mapping

def normalize_citizen(citizen):
    """
         
    """
    return {**citizen, 'relatives': sorted(citizen['relatives'])}


def compare_citizens(left: Mapping, right: Mapping) -> bool:
    """
      
    """
    return normalize_citizen(left) == normalize_citizen(right)


def compare_citizen_groups(left: Iterable, right: Iterable) -> bool:
    """
          ,   
      
    """
    left = [normalize_citizen(citizen) for citizen in left]
    left.sort(key=lambda citizen: citizen['citizen_id'])

    right = [normalize_citizen(citizen) for citizen in right]
    right.sort(key=lambda citizen: citizen['citizen_id'])
    return left == right

Pour m'assurer que ce gestionnaire ne retourne pas les résidents d'autres déchargements, j'ai décidé d'ajouter un déchargement supplémentaire avec un habitant avant chaque test.

POST / importations


J'ai défini les jeux de données suivants pour tester le gestionnaire:

  • DonnĂ©es correctes, censĂ©es ĂȘtre ajoutĂ©es avec succĂšs Ă  la base de donnĂ©es.

    • ( ).

      . , , insert , .
    • ( , ).

      , .
    • .

      , . :)


    • , aiohttp PostgreSQL 32 767 ( ).
    • DĂ©chargement vide

      Le gestionnaire doit prendre en compte un tel cas et ne pas tomber, en essayant d'effectuer une insertion vide dans la table avec les habitants.

  • DonnĂ©es avec des erreurs, attendez-vous Ă  une rĂ©ponse HTTP de 400: Bad Request.

    • La date de naissance est incorrecte (futur).
    • citizen_id n'est pas unique dans le tĂ©lĂ©chargement.
    • Une parentĂ© est indiquĂ©e de maniĂšre incorrecte (il n'y a qu'un rĂ©sident Ă  l'autre, mais il n'y a pas de retour).
    • Le rĂ©sident a un parent inexistant au dĂ©chargement.
    • Les liens familiaux ne sont pas uniques.

Si le processeur a fonctionné correctement et que les données ont été ajoutées, vous devez ajouter les résidents à la base de données et les comparer avec le déchargement standard. Pour obtenir des résidents, j'ai utilisé le gestionnaire déjà testé GET /imports/$import_id/citizenset, à titre de comparaison, une fonction compare_citizen_groups.

PATCH / imports / $ import_id / citoyens / $ citizen_id


La validation des données est à bien des égards similaire à celle décrite dans le gestionnaire POST /importsà quelques exceptions prÚs: il n'y a qu'un seul résident et le client ne peut passer que les champs qu'il souhaite .

J'ai décidé d'utiliser les ensembles suivants avec des données incorrectes pour vérifier que le gestionnaire retournera une réponse HTTP 400: Bad request:

  • Le champ est spĂ©cifiĂ©, mais a un type et / ou un format de donnĂ©es incorrects
  • La date de naissance est incorrecte (heure future).
  • Le champ relativescontient un parent qui n'existe pas dans le dĂ©chargement.

Il est également nécessaire de vérifier que le gestionnaire met correctement à jour les informations sur le résident et ses proches.

Pour ce faire, créez un téléchargement avec trois habitants, dont deux sont des parents, et envoyez une demande avec de nouvelles valeurs pour tous les champs scalaires et un nouvel identifiant relatif dans le champ relatives.

Pour m'assurer que le gestionnaire distingue les rĂ©sidents de dĂ©chargements diffĂ©rents avant le test (et, par exemple, ne change pas les rĂ©sidents avec les mĂȘmes identifiants d'un autre dĂ©chargement), j'ai crĂ©Ă© un dĂ©chargement supplĂ©mentaire avec trois rĂ©sidents qui ont les mĂȘmes identifiants.

Le gestionnaire doit enregistrer les nouvelles valeurs des champs scalaires, ajouter un nouveau parent spĂ©cifiĂ© et supprimer la relation avec un ancien parent non spĂ©cifiĂ©. Tous les changements de parentĂ© devraient ĂȘtre bilatĂ©raux. Il ne devrait pas y avoir de changement dans les autres dĂ©chargements.

Étant donnĂ© qu'un tel gestionnaire peut ĂȘtre soumis Ă  des conditions de concurrence (cela a Ă©tĂ© discutĂ© dans la section DĂ©veloppement), j'ai ajoutĂ© deux tests supplĂ©mentaires . L'un reproduit le problĂšme avec l'Ă©tat de race (Ă©tend la classe de gestionnaire et supprime le verrou), le second prouve que le problĂšme avec l'Ă©tat de race n'est pas reproduit.

GET / importations / $ import_id / citoyens / anniversaires


Pour tester ce gestionnaire, j'ai sélectionné les jeux de données suivants:

  • Un dĂ©chargement dans lequel un rĂ©sident a un parent en un mois et deux parents en un autre.
  • DĂ©chargement avec un rĂ©sident sans parents. VĂ©rifie que le gestionnaire n'en tient pas compte dans les calculs.
  • DĂ©chargement vide. VĂ©rifie que le gestionnaire n'Ă©chouera pas et renverra le dictionnaire correct avec 12 mois dans la rĂ©ponse.
  • DĂ©chargement avec un rĂ©sident qui est un parent de lui-mĂȘme. VĂ©rifie qu'un rĂ©sident achĂštera un cadeau pour le mois de sa naissance.

Le gestionnaire doit retourner tous les mois dans la rĂ©ponse, mĂȘme s'il n'y a pas d'anniversaire au cours de ces mois. Pour Ă©viter les doublons, j'ai crĂ©Ă© une fonction Ă  laquelle vous pouvez passer le dictionnaire afin qu'il le complĂšte avec des valeurs pour les mois manquants.

Pour m'assurer que le gestionnaire fait la distinction entre les résidents de déchargements différents, j'ai ajouté un déchargement supplémentaire avec deux parents. Si le gestionnaire les utilise par erreur dans les calculs, les résultats seront incorrects et le gestionnaire tombera avec une erreur.

GET / imports / $ import_id / towns / stat / percentile / age


La particularitĂ© de ce test est que les rĂ©sultats de son travail dĂ©pendent de l'heure actuelle: l'Ăąge des habitants est calculĂ© en fonction de la date du jour. Pour s'assurer que les rĂ©sultats des tests ne changent pas au fil du temps, la date actuelle, les dates de naissance des rĂ©sidents et les rĂ©sultats attendus doivent ĂȘtre enregistrĂ©s. Cela facilitera la reproduction de tous les cas de bord, mĂȘme.

Quelle est la meilleure date de correction? Le gestionnaire utilise la fonction PostgreSQL pour calculer l'ùge des résidents AGE, qui prend le premier paramÚtre comme date pour laquelle il est nécessaire de calculer l'ùge et le second comme date de base (définie par une constante TownAgeStatView.CURRENT_DATE).

Nous remplaçons la date de base dans le gestionnaire par l'heure du test
from unittest.mock import patch

import pytz

CURRENT_DATE = datetime(2020, 2, 17, tzinfo=pytz.utc)


@patch('analyzer.api.handlers.TownAgeStatView.CURRENT_DATE', new=CURRENT_DATE)
async def test_get_ages(...):
    ...

Pour tester le gestionnaire, j'ai sélectionné les ensembles de données suivants (pour tous les résidents, j'ai indiqué une ville, car le gestionnaire agrÚge les résultats par ville):

  • DĂ©chargement avec plusieurs rĂ©sidents dont l'anniversaire est demain (Ăąge - plusieurs annĂ©es et 364 jours). VĂ©rifie que le processeur utilise uniquement le nombre d'annĂ©es complĂštes dans les calculs.
  • DĂ©chargement avec un rĂ©sident dont l'anniversaire est aujourd'hui (Ăąge - exactement quelques annĂ©es). Il vĂ©rifie le cas rĂ©gional - l'Ăąge d'un rĂ©sident dont l'anniversaire est aujourd'hui ne doit pas ĂȘtre calculĂ© comme rĂ©duit d'un an.
  • DĂ©chargement vide. Le gestionnaire ne doit pas tomber dessus.

La numpyréférence pour le calcul des centiles - avec interpolation linéaire, et les résultats de référence pour les tests que j'ai calculés pour eux.

Vous devez Ă©galement arrondir les valeurs de percentile fractionnaire Ă  deux dĂ©cimales. Si vous avez utilisĂ© PostgreSQL pour l'arrondi dans le gestionnaire et Python pour le calcul des donnĂ©es de rĂ©fĂ©rence, vous remarquerez peut-ĂȘtre que l' arrondi dans Python 3 et PostgreSQL peut donner des rĂ©sultats diffĂ©rents .

par exemple
# Python 3
round(2.5)
> 2

-- PostgreSQL
SELECT ROUND(2.5)
> 3

Le fait est que Python utilise l'arrondi de banque au pair le plus proche , et PostgreSQL utilise mathématique (demi-hausse). Si les calculs et l'arrondi sont effectués dans PostgreSQL, il serait correct d'utiliser également l'arrondi mathématique dans les tests.

Au début, j'ai décrit des ensembles de données avec des dates de naissance dans un format texte, mais il n'était pas pratique de lire un test dans ce format: chaque fois, je devais calculer l'ùge de chaque habitant dans mon esprit afin de me rappeler ce qu'un ensemble de données particulier vérifiait. Bien sûr, vous pouviez vous en sortir avec les commentaires du code, mais j'ai décidé d'aller un peu plus loin et j'ai écrit une fonction age2datequi vous permet de décrire la date de naissance sous forme d'ùge: le nombre d'années et de jours.

Par exemple, comme ceci
import pytz

from analyzer.utils.testing import generate_citizen


CURRENT_DATE = datetime(2020, 2, 17, tzinfo=pytz.utc)

def age2date(years: int, days: int = 0, base_date=CURRENT_DATE) -> str:
    birth_date = copy(base_date).replace(year=base_date.year - years)
    birth_date -= timedelta(days=days)
    return birth_date.strftime(BIRTH_DATE_FORMAT)

#    ?  ,     ?
generate_citizen(birth_date='17.02.2009')

#   11       
generate_citizen(birth_date=age2date(years=11))

Pour m'assurer que le gestionnaire distingue les résidents de déchargements différents, j'ai ajouté un déchargement supplémentaire avec un résident d'une autre ville: si le gestionnaire l'utilise par erreur, une ville supplémentaire apparaßtra dans les résultats et le test se cassera.

Un fait intéressant: lorsque j'ai écrit ce test le 29 février 2020, j'ai soudainement cessé de générer des décharges avec les résidents en raison d'un bug dans Faker (2020 est une année bissextile, et d'autres années que Faker a choisies n'étaient pas toujours des années bissextiles aussi) n'était pas le 29 février). N'oubliez pas d'enregistrer les dates et de tester les cas limites!

Migrations


Le code de migration Ă  premiĂšre vue semble Ă©vident et le moins sujet aux erreurs, pourquoi le tester? Il s'agit d'une erreur trĂšs dangereuse: les erreurs de migration les plus insidieuses peuvent se manifester au moment le plus inopportun. MĂȘme s'ils ne gĂąchent pas les donnĂ©es, ils peuvent entraĂźner des temps d'arrĂȘt inutiles.

La migration initiale existante dans le projet modifie la structure de la base de donnĂ©es, mais ne modifie pas les donnĂ©es. Quelles erreurs courantes peuvent ĂȘtre protĂ©gĂ©es contre de telles migrations?

  • downgrade ( , , ).

    , (--): , — .
  • C .
  • ( ).

La plupart de ces erreurs seront dĂ©tectĂ©es par le test d'escalier . Son idĂ©e - d'utiliser une seule migration, effectuer systĂ©matiquement les mĂ©thodes upgrade, downgrade, upgradepour chaque migration. Un tel test suffit Ă  ĂȘtre ajoutĂ© au projet une fois, il ne nĂ©cessite pas de support et servira fidĂšlement.

Mais si la migration, en plus de la structure, modifiait les donnĂ©es, il faudrait alors Ă©crire au moins un test distinct, en vĂ©rifiant que les donnĂ©es changent correctement dans la mĂ©thode upgradeet reviennent Ă  l'Ă©tat initial dans downgrade. Juste au cas oĂč: un projet avec des exemples de tests de diffĂ©rentes migrations , que j'ai prĂ©parĂ© pour un rapport sur Alembic Ă  Moscou Python.

Assemblée


L'artefact final que nous allons dĂ©ployer et que nous voulons obtenir Ă  la suite de l'assemblage est une image Docker. Pour construire, vous devez sĂ©lectionner l'image de base avec Python. L'image officielle python:latestpĂšse ~ 1 Go et, si elle est utilisĂ©e comme image de base, l'image avec l'application sera Ă©norme. Il existe des images basĂ©es sur Alpine OS , dont la taille est beaucoup plus petite. Mais avec un nombre croissant de packages installĂ©s, la taille de l'image finale augmentera et, par consĂ©quent, mĂȘme l'image collectĂ©e sur la base d'Alpine ne sera pas si petite. J'ai choisi snakepacker / python comme image de base - elle pĂšse un peu plus que les images alpines, mais est basĂ©e sur Ubuntu, qui propose une vaste sĂ©lection de packages et de bibliothĂšques.

AutrementrĂ©duire la taille de l'image avec l'application - n'incluez pas dans l'image finale le compilateur, les bibliothĂšques et les fichiers avec des en-tĂȘtes pour l'assemblage, qui ne sont pas nĂ©cessaires au fonctionnement de l'application.

Pour ce faire, vous pouvez utiliser l' assemblage en plusieurs Ă©tapes de Docker:

  1. À l'aide d'une image «lourde» snakepacker/python:all(~ 1 Go, ~ 500 Mo compressĂ©s), crĂ©ez un environnement virtuel, installez-y toutes les dĂ©pendances et le package d'application. Cette image est nĂ©cessaire exclusivement pour l'assemblage, elle peut contenir un compilateur, toutes les bibliothĂšques et fichiers nĂ©cessaires avec des en-tĂȘtes.

    FROM snakepacker/python:all as builder
    
    #   
    RUN python3.8 -m venv /usr/share/python3/app
    
    #  source distribution     
    COPY dist/ /mnt/dist/
    RUN /usr/share/python3/app/bin/pip install /mnt/dist/*
  2. Nous copions l'environnement virtuel fini dans une image «légÚre» snakepacker/python:3.8(~ 100 Mo, compressée ~ 50 Mo), qui ne contient que l'interpréteur de la version requise de Python.

    Important: dans un environnement virtuel, des chemins absolus sont utilisĂ©s, il doit donc ĂȘtre copiĂ© Ă  la mĂȘme adresse Ă  laquelle il a Ă©tĂ© assemblĂ© dans le conteneur collecteur.

    FROM snakepacker/python:3.8 as api
    
    #       builder
    COPY --from=builder /usr/share/python3/app /usr/share/python3/app
    
    #  ,     
    # 
    RUN ln -snf /usr/share/python3/app/bin/analyzer-* /usr/local/bin/
    
    #        
    CMD ["analyzer-api"]

Pour rĂ©duire le temps nĂ©cessaire Ă  la crĂ©ation de l'image , les modules dĂ©pendants de l'application peuvent ĂȘtre installĂ©s avant d'ĂȘtre installĂ©s dans l'environnement virtuel. Docker les mettra en cache et ne rĂ©installera pas s'ils n'ont pas changĂ©.

Dockerfile entiĂšrement
###############      ################
#  — «» (~1 ,    ~500 )    
#    
FROM snakepacker/python:all as builder

#      pip
RUN python3.8 -m venv /usr/share/python3/app
RUN /usr/share/python3/app/bin/pip install -U pip

#   ,  .   
# Docker   ,  requirements.txt  
COPY requirements.txt /mnt/
RUN /usr/share/python3/app/bin/pip install -Ur /mnt/requirements.txt

#  source distribution     
COPY dist/ /mnt/dist/
RUN /usr/share/python3/app/bin/pip install /mnt/dist/* \
    && /usr/share/python3/app/bin/pip check

###########################   ############################
#    «» (~100 ,    ~50 )   Python
FROM snakepacker/python:3.8 as api

#         builder
COPY --from=builder /usr/share/python3/app /usr/share/python3/app

#  ,     
# 
RUN ln -snf /usr/share/python3/app/bin/analyzer-* /usr/local/bin/

#        
CMD ["analyzer-api"]

Pour faciliter l'assemblage, j'ai ajouté une commande make uploadqui collecte l'image Docker et la télécharge sur hub.docker.com.

Ci


Maintenant que le code est couvert de tests et que nous pouvons construire une image Docker, il est temps d'automatiser ces processus. La premiÚre chose qui vous vient à l'esprit: exécutez des tests pour créer des demandes de pool et, lorsque vous ajoutez des modifications à la branche principale, collectez une nouvelle image Docker et téléchargez-la sur le Docker Hub (ou les packages GitHub , si vous n'allez pas distribuer l'image publiquement).

J'ai résolu ce problÚme avec les actions GitHub . Pour ce faire, il a fallu créer un fichier YAML dans un dossier .github/workflowset y décrire un workflow (avec deux tùches: testet publish), que j'ai nommé CI.

La tùche testest exécutée à chaque démarrage du workflow CI, à l'aide de servicesrécupÚre un conteneur avec PostgreSQL, attend qu'il soit disponible et se lance pytestdans le conteneur snakepacker/python:all.

La tùche publishn'est exécutée que si les modifications ont été ajoutées à la branche masteret si la tùche testa réussi. Il collecte la distribution source par le conteneur snakepacker/python:all, puis collecte et charge l'image Docker avec docker/build-push-action@v1.

Description complĂšte du workflow
name: CI

# Workflow      
#   -  master
on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

jobs:
  #       workflow
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: docker://postgres
        ports:
          - 5432:5432
        env:
          POSTGRES_USER: user
          POSTGRES_PASSWORD: hackme
          POSTGRES_DB: analyzer

    steps:
      - uses: actions/checkout@v2
      - name: test
        uses: docker://snakepacker/python:all
        env:
          CI_ANALYZER_PG_URL: postgresql://user:hackme@postgres/analyzer
        with:
          args: /bin/bash -c "pip install -U '.[dev]' && pylama && wait-for-port postgres:5432 && pytest -vv --cov=analyzer --cov-report=term-missing tests"

  #    Docker-  
  publish:
    #        master
    if: github.event_name == 'push' && github.ref == 'refs/heads/master'
    # ,   test   
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: sdist
        uses: docker://snakepacker/python:all
        with:
          args: make sdist

      - name: build-push
        uses: docker/build-push-action@v1
        with:
          username: ${{ secrets.REGISTRY_LOGIN }}
          password: ${{ secrets.REGISTRY_TOKEN }}
          repository: alvassin/backendschool2019
          target: api
          tags: 0.0.1, latest

Maintenant, lorsque vous ajoutez des modifications au maĂźtre dans l'onglet Actions sur GitHub, vous pouvez voir le lancement des tests, l'assemblage et le chargement de l'image Docker:



Et lors de la création d'une demande de pool dans la branche principale, les résultats de la tùche y seront également affichés test:



DĂ©ployer


Pour déployer l'application sur le serveur fourni, vous devez installer Docker, Docker Compose, démarrer les conteneurs avec l'application et PostgreSQL et appliquer les migrations.

Ces Ă©tapes peuvent ĂȘtre automatisĂ©es Ă  l'aide du systĂšme de gestion de configuration d'Ansible. Il est Ă©crit en Python, ne nĂ©cessite pas d'agents spĂ©ciaux (se connecte directement via ssh), utilise des modĂšles jinja et permet de dĂ©crire de maniĂšre dĂ©clarative l'Ă©tat souhaitĂ© dans les fichiers YAML. L'approche dĂ©clarative vous permet de ne pas penser Ă  l'Ă©tat actuel du systĂšme et aux actions nĂ©cessaires pour amener le systĂšme Ă  l'Ă©tat souhaitĂ©. Tout ce travail repose sur les Ă©paules des modules Ansible.

Ansible vous permet de regrouper les tùches logiquement liées en rÎles , puis de les réutiliser. Nous aurons besoin de deux rÎles:docker(installe et configure Docker) et analyzer(installe et configure l'application).

Le rÎledocker ajoute un référentiel avec Docker au systÚme, installe et configure les packages docker-ceet docker-compose.

Vous pouvez Ă©ventuellement configurer l'API REST pour qu'elle reprenne automatiquement aprĂšs un redĂ©marrage du serveur. Ubuntu vous permet de rĂ©soudre ce problĂšme Ă  l'aide d'un systĂšme d'initialisation systemd. Il contrĂŽle des unitĂ©s reprĂ©sentant diverses ressources (dĂ©mons, sockets, points de montage et autres). Pour ajouter une nouvelle unitĂ© Ă  systemd, vous devez dĂ©crire sa configuration dans un fichier .service distinct et placer ce fichier dans l'un des dossiers spĂ©ciaux, par exemple, dans /etc/systemd/system. Ensuite, l'unitĂ© peut ĂȘtre lancĂ©e et activer le chargement automatique pour elle.

Paquetdocker-celors de l'installation, il créera automatiquement un fichier avec la configuration de l'unité - il vous suffit de vous assurer qu'il est en cours d'exécution et qu'il s'allume au démarrage du systÚme. Pour Docker Compose docker-compose@.servicesera créé par Ansible. Le symbole @dans le nom indique à systemd que l'unité est un modÚle. Cela vous permet de démarrer le service docker-composeavec un paramÚtre - par exemple, avec le nom de notre service, qui sera remplacé à la place du %ifichier de configuration de l'unité:

[Unit]
Description=%i service with docker compose
Requires=docker.service
After=docker.service

[Service]
Type=oneshot
RemainAfterExit=true
WorkingDirectory=/etc/docker/compose/%i
ExecStart=/usr/local/bin/docker-compose up -d --remove-orphans
ExecStop=/usr/local/bin/docker-compose down

[Install]
WantedBy=multi-user.target

Le rĂŽleanalyzer gĂ©nĂ©rera un fichier Ă  partir du modĂšle docker-compose.ymlĂ  l'adresse /etc/docker/compose/analyzer, enregistrera l'application en tant que service lancĂ© automatiquement systemdet appliquera la migration. Lorsque les rĂŽles sont prĂȘts, vous devez dĂ©crire le playbook.

---

- name: Gathering facts
  hosts: all
  become: yes
  gather_facts: yes

- name: Install docker
  hosts: docker
  become: yes
  gather_facts: no
  roles:
    - docker

- name: Install analyzer
  hosts: api
  become: yes
  gather_facts: no
  roles:
    - analyzer

La liste des hĂŽtes, ainsi que les variables utilisĂ©es dans les rĂŽles, peuvent ĂȘtre spĂ©cifiĂ©es dans le fichier d'inventaire hosts.ini.

[api]
130.193.51.154

[docker:children]
api

[api:vars]
analyzer_image = alvassin/backendschool2019
analyzer_pg_user = user
analyzer_pg_password = hackme
analyzer_pg_dbname = analyzer

Une fois tous les fichiers Ansible prĂȘts, exĂ©cutez-le:

$ ansible-playbook -i hosts.ini deploy.yml

À propos des tests de rĂ©sistance
, , . , - . : , — , 10 . , (, , CI-): .

, , , 10 . ? , , . , , .

RPS, : . , , import_id, POST /imports . .

, Python 3, Locust.

, locustfile.py locust. - .

Locust . , .
self.round .

locustfile.py
# locustfile.py
import logging
from http import HTTPStatus

from locust import HttpLocust, constant, task, TaskSet
from locust.exception import RescheduleTask

from analyzer.api.handlers import (
    CitizenBirthdaysView, CitizensView, CitizenView, TownAgeStatView
)
from analyzer.utils.testing import generate_citizen, generate_citizens, url_for


class AnalyzerTaskSet(TaskSet):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.round = 0

    def make_dataset(self):
        citizens = [
            #     .   
            # PATCH-  relatives    
            # ,     - 
            # (     ,    
            # ).
            generate_citizen(citizen_id=1, relatives=[2]),
            generate_citizen(citizen_id=2, relatives=[1]),
            *generate_citizens(citizens_num=9998, relations_num=1000,
                               start_citizen_id=3)
        ]
        return {citizen['citizen_id']: citizen for citizen in citizens}

    def request(self, method, path, expected_status, **kwargs):
        with self.client.request(
                method, path, catch_response=True, **kwargs
        ) as resp:
            if resp.status_code != expected_status:
                resp.failure(f'expected status {expected_status}, '
                             f'got {resp.status_code}')
            logging.info(
                'round %r: %s %s, http status %d (expected %d), took %rs',
                self.round, method, path, resp.status_code, expected_status,
                resp.elapsed.total_seconds()
            )
            return resp

    def create_import(self, dataset):
        resp = self.request('POST', '/imports', HTTPStatus.CREATED,
                            json={'citizens': list(dataset.values())})
        if resp.status_code != HTTPStatus.CREATED:
            raise RescheduleTask
        return resp.json()['data']['import_id']

    def get_citizens(self, import_id):
        url = url_for(CitizensView.URL_PATH, import_id=import_id)
        self.request('GET', url, HTTPStatus.OK,
                     name='/imports/{import_id}/citizens')

    def update_citizen(self, import_id):
        url = url_for(CitizenView.URL_PATH, import_id=import_id, citizen_id=1)
        self.request('PATCH', url, HTTPStatus.OK,
                     name='/imports/{import_id}/citizens/{citizen_id}',
                     json={'relatives': [i for i in range(3, 10)]})

    def get_birthdays(self, import_id):
        url = url_for(CitizenBirthdaysView.URL_PATH, import_id=import_id)
        self.request('GET', url, HTTPStatus.OK,
                     name='/imports/{import_id}/citizens/birthdays')

    def get_town_stats(self, import_id):
        url = url_for(TownAgeStatView.URL_PATH, import_id=import_id)
        self.request('GET', url, HTTPStatus.OK,
                     name='/imports/{import_id}/towns/stat/percentile/age')

    @task
    def workflow(self):
        self.round += 1
        dataset = self.make_dataset()

        import_id = self.create_import(dataset)
        self.get_citizens(import_id)
        self.update_citizen(import_id)
        self.get_birthdays(import_id)
        self.get_town_stats(import_id)


class WebsiteUser(HttpLocust):
    task_set = AnalyzerTaskSet
    wait_time = constant(1)

100 c , , :



, ( — 95 , — ). .



— Ansible ~20.15 ~20.30 Locust.


Que peut-on faire d'autre?


Le profilage de l'application a montrĂ© qu'environ un quart du temps total d'exĂ©cution des requĂȘtes est consacrĂ© Ă  la sĂ©rialisation et Ă  la dĂ©sĂ©rialisation de JSON: de nombreuses donnĂ©es sont envoyĂ©es et reçues du service. Ces processus peuvent ĂȘtre considĂ©rablement accĂ©lĂ©rĂ©s Ă  l'aide de la bibliothĂšque orjson , mais le service devra ĂȘtre un peu prĂ©parĂ© - orjsonil ne s'agit pas d'un remplacement direct pour le module standard. json

Habituellement, la production nĂ©cessite plusieurs copies du service pour garantir la tolĂ©rance aux pannes et faire face Ă  la charge. Pour gĂ©rer un groupe de services, vous avez besoin d'un outil qui indique si une copie du service est "vivante". Ce problĂšme peut ĂȘtre rĂ©solu par un gestionnaire /healthqui interroge toutes les ressources nĂ©cessaires au travail, dans notre cas, une base de donnĂ©es. SiSELECT 1exĂ©cutĂ© en moins d'une seconde, puis le service est vivant. Sinon, vous devez y prĂȘter attention.

Lorsqu'une application fonctionne de maniÚre trÚs intensive avec un réseau, uvloop peut augmenter froidement les performances.

Un facteur important est la lisibilité du code. Un de mes collÚgues, Yuri Shikanov, a écrit un module gris combinant plusieurs outils pour la vérification automatique et l'exécution de code, qui est facile à ajouter à un pre-commithook Git, mis en place avec un seul fichier de configuration ou des variables d'environnement. Gray vous permet de trier les importations ( isort ), optimise les expressions python en fonction des nouvelles versions du langage ( pyupgrade ), ajoute des virgules à la fin des appels de fonction, les importations, les listes, etc. (add-trailing-virgule ), ainsi que des guillemets à un seul formulaire ( unifier ).

* * *


C'est tout pour moi: nous avons développé, couvert de tests, assemblé et déployé le service, et également effectué des tests de charge.

Remerciements


Je voudrais exprimer ma profonde gratitude aux gars qui ont pris le temps de participer à la rédaction de cet article, de revoir le code, de présenter mes idées et commentaires: à Maria Zelenova zelma, Vladimir Solomatin leenr, Anastasia Semenova morkov, Yuri Shikanov dizballanze, Mikhail Shushpanov mishush, Pavel Mosein pavkazzz et surtout à Dmitry Orlov orlovdl.

All Articles