Comme j'ai écrit mon messager

Un soir, après une autre journée frustrante, remplie de tentatives d'équilibrer le jeu, j'ai décidé que j'avais un besoin urgent de repos. Je vais passer à un autre projet, le faire rapidement, rendre l’estime de soi qui a baissé au cours du développement du jeu et qui va prendre le jeu par la tempête avec une vigueur renouvelée! L'essentiel est de choisir un projet agréable et relaxant ... Ecrivez votre propre messager? Ha! À quel point cela peut-il être dur?

Le code peut être trouvé ici .


Bref historique


Pendant près d'un an avant de commencer à travailler sur le messager, il travaillait sur le jeu en ligne multijoueur Line Tower Wars. La programmation s'est bien passée, tout le reste (équilibre et visuel en particulier) n'était pas très bon. Soudain, il s'est avéré que faire un jeu et faire un jeu amusant (amusant pour quelqu'un d'autre que lui-même) sont deux choses différentes. Après un an d'épreuve, j'avais besoin de me laisser distraire, alors j'ai décidé de m'essayer à autre chose. Le choix s'est porté sur le développement mobile, à savoir Flutter. J'ai entendu beaucoup de bonnes choses à propos de Flutter, et j'ai aimé la fléchette après une courte expérience. J'ai décidé d'écrire mon propre messager. Tout d'abord, il est recommandé d'implémenter à la fois le client et le serveur. Deuxièmement, il y aura quelque chose d'important à mettre dans le portefeuille pour chercher du travail, je suis juste dans le processus.

Fonctionnalité planifiée


  • Discussions privées et en groupe
  • Envoi de texte, d'images et de vidéos
  • Appels audio et vidéo
  • Confirmation de réception et lecture (ticks de Votsap)
  • "Impressions ..."
  • Notifications
  • Recherche par code QR et géolocalisation

Pour l'avenir, je peux dire avec fierté (et avec soulagement) que presque tout ce qui est prévu a été mis en œuvre, et qui n'a pas encore été mis en œuvre - le sera dans un proche avenir.



Sélection de la langue


Je n'ai pas réfléchi longtemps au choix de la langue. Au début, il était tentant d'utiliser la fléchette pour le client et le serveur, mais une inspection plus détaillée a montré qu'il n'y avait pas beaucoup de pilotes pour les fléchettes disponibles, et ceux qui n'inspiraient pas beaucoup de confiance. Bien que je ne me porte pas garant de parler de l'instant présent, la situation s'est peut-être améliorée. Mon choix s'est donc porté sur C #, avec lequel j'ai travaillé dans Unity.

Architecture


Il a commencé par réfléchir à l'architecture. Bien sûr, étant donné que 3 personnes et demie utiliseront très probablement mon messager, on n'aurait pas à se soucier de l'architecture en général. Vous prenez et faites comme dans d'innombrables tutoriels. Voici le nœud, voici le mongo, voici les sockets web. Terminé. Et Firebase est ici. Mais ce n'est pas intéressant. J'ai décidé de créer un messager qui peut facilement évoluer horizontalement, comme si j'attendais des millions de clients simultanés. Cependant, n'ayant aucune expérience dans ce domaine, j'ai dû tout apprendre en pratique par la méthode des erreurs et encore des erreurs.

L'architecture finale ressemble à ceci


Je ne prétends pas qu'une telle architecture est super cool et fiable, mais elle est viable et devrait en théorie résister à de lourdes charges et évoluer horizontalement, mais je ne comprends pas vraiment comment vérifier. Et j'espère que je n'ai pas manqué un moment évident qui est connu de tout le monde sauf moi.

Vous trouverez ci-dessous une description détaillée des composants individuels.

Serveur frontal


Avant même de commencer à créer le jeu, j'étais fasciné par le concept d'un serveur asynchrone à thread unique. Effectivement et sans race'ov potentielle - que pouvez-vous demander d'autre. Afin de comprendre comment ces serveurs sont organisés, j'ai commencé à me plonger dans le module de asynciolangage python. La solution que j'ai vue semblait très élégante. En bref, la solution de pseudo-code ressemble à ceci.
//  ,      ,    
//       .      socket.Receive
//     , :
var bytesReceived = Completer<object>();
selector.Register(
    socket,
    SocketEvent.Receive,
    () => bytesReceived.Complete(null)
);

await bytesReceived.Future;

int n = socket.Receive(...); //   

// selector -     poll.   
//        (Receive 
//  ), ,    ,  .
//   completer,      ,
//        , ,     .
//     ,       .

En utilisant cette technique simple, nous pouvons servir un grand nombre de sockets dans un seul thread. Nous ne bloquons jamais un flux en attendant la réception ou l'envoi d'octets. Le flux est toujours occupé par un travail utile. Concurrence, en un mot.

Les serveurs frontaux sont implémentés de cette façon. Ils sont tous à un seul thread et asynchrones. Par conséquent, pour des performances maximales, vous devez exécuter autant de serveurs sur une seule machine que de cœurs (4 sur la photo).

Le serveur Frontend lit le message du client et, en fonction du code du message, l'envoie à l'une des rubriques de Kafka.

Une petite note de bas de page pour ceux qui ne connaissent pas kafa
, RabbitMQ. . , ( authentication backend authentication, ). ? - , (partition). , . , , . , ( , , , (headers)).

? ? . (consumer) ( consumer'), ( ) . , , , 2 , . 3 — 2. .

Le serveur frontal envoie un message à la kafka sans clé (lorsqu'il n'y a pas de clé, la kafka envoie simplement des messages à la partie à son tour). Le message est extrait de la rubrique par l'un des serveurs principaux correspondants. Le serveur traite le message et ... et ensuite? Et ce qui dépend en plus du type de message.

Dans le cas le plus courant, un cycle demande-réponse se produit. Par exemple, pour une demande d'inscription, il suffit de donner une réponse au client ( Success,EmailAlreadyInUse, etc). Mais à un message contenant une invitation à un chat existant de nouveaux membres (Vasya, Emil et Julia), nous devons répondre immédiatement avec trois types de messages différents. Le premier type - vous devez informer l'invitateur du résultat de l'opération (soudain, une erreur de serveur s'est produite). Le deuxième type - vous devez informer tous les membres actuels du chat qu'il y a maintenant tel ou tel nouveau membre dans le chat. La troisième consiste à envoyer des invitations à Vasya, Emil et Yulia.

D'accord, cela ne semble pas très difficile, mais pour envoyer un message à un client, nous devons: 1) savoir à quel serveur frontal ce client est connecté (nous ne choisissons pas le serveur auquel le client se connectera, l'équilibreur décide pour nous); 2) envoyer un message du serveur principal au serveur frontal souhaité; 3) en fait, envoyez un message au client.

Pour implémenter les points 1 et 2, j'ai décidé d'utiliser une rubrique distincte (rubrique "serveurs frontaux"). La séparation des thèmes d'authentification, de session et d'appel en partitions sert de mécanisme de parallélisation. Nous voyons que le serveur de session est lourdement chargé? Nous venons d'ajouter quelques nouveaux serveurs de partition et de session, et Kafka redistribuera la charge pour nous, déchargeant les serveurs de session existants. La séparation de la rubrique "serveurs frontaux" dans la partition sert de mécanisme de routage .

Chaque serveur frontal correspond à une partie de la rubrique "serveurs frontaux" (avec le même index que le serveur lui-même). Autrement dit, serveur 0 - partition 0, et ainsi de suite. Kafka permet de souscrire non seulement à un sujet spécifique, mais aussi à une partie spécifique d'un certain sujet. Tous les serveurs frontaux des startups s'abonnent à la partition correspondante. Ainsi, le serveur principal est capable d'envoyer un message à un serveur frontal spécifique en envoyant un message à une partition spécifique.

D'accord, maintenant, lorsque le client se joint, il vous suffit d'enregistrer quelque part une paire de UserId - Frontend Server Index. En cas de déconnexion - supprimer. À ces fins, n'importe laquelle des nombreuses bases de données de valeurs-clés en mémoire fera l'affaire. J'ai choisi un radis.

À quoi cela ressemble dans la pratique. Tout d'abord, une fois la connexion établie, le client Andrey envoie un message au serveur Join. Le serveur frontend reçoit le message et le transmet à la rubrique de session, en ajoutant préalablement l'en-tête «Frontend Server»: {index}. L'un des serveurs de session d'arrière-plan recevra un message, lira le jeton d'autorisation, déterminera le type d'utilisateur auquel il s'est joint, lira l'index ajouté par le serveur frontal et écrira UserId - Index sur le radis. À partir de ce moment, le client est considéré comme étant en ligne, et maintenant nous savons par quel serveur frontal (et, par conséquent, par quelle partie de la rubrique "serveurs frontaux") nous pouvons "tendre la main" lorsque d'autres clients envoient des messages à Andrey.

* En fait, le processus est un peu plus compliqué que je l'ai décrit. Vous pouvez le trouver dans le code source.

Pseudo-code du serveur frontal


// Frontend Server 6
while (true) {
    // Consume from "Frontend Servers" topic, partition 6
    var messageToClient = consumer.Consume();
    if (message != null) {
        relayMessageToClient(messageToClient);
    }

    var callbacks = selector.Poll();
    while (callbacks.TryDequeue(out callback)) {
        callback();
    }

    long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    while (!callAtQueue.IsEmpty && callAtQueue.PeekPriority() <= now) {
        callAtQueue.Dequeue()();
    }

    while (messagesToRelayToBackendServers.TryDequeue(out messageFromClient)) {
        // choose topic
        producer.Produce(topic, messageFromClient);
    }
}


Il y a quelques astuces ici.
1) relayMessageToClient. Ce sera une erreur de prendre simplement le socket que vous voulez et de commencer immédiatement à lui envoyer un message, car nous envoyons peut-être déjà un autre message au client. Si nous commençons à envoyer des octets sans vérifier si le socket est actuellement occupé, les messages seront mélangés. Comme dans de nombreux autres endroits où un traitement ordonné des données est requis, l'astuce consiste à utiliser une file d'attente, à savoir une file d'attente de Completers ( TaskCompletionSourceen C #).
void async relayMessageToClient(message) {
    // find client
    await client.ReadyToSend();
    await sendMessage(client, message);
    client.CompleteSend();
}

class Client {
    // ...
    sendMessageQueue = new LinkedList<Completer<object>>();

    async Future ReadyToSend() {
        var sendMessage = Completer<object>();
	if (sendMessageQueue.IsEmpty) {
	    sendMessageQueue.AddLast(sendMessage);
	} else {
	    var prevSendMessage = sendMessageQueue.Last;
	    sendMessageQueue.AddLast(sendMessage);
	    await prevSendMessage.Future;
	}
    }

    void CompleteSend() {
        var sendMessage = sendMessageQueue.RemoveFirst();
	sendMessage.Complete(null);
    }
}

Si la file d'attente n'est pas vide, le socket est déjà occupé pour le moment. Créez-en un nouveau completer, ajoutez-le à la file d'attente et à awaitla précédente completer . Ainsi, lorsque le message précédent est envoyé, il CompleteSendse termine completer, ce qui amène le serveur à commencer à envoyer le message suivant. Une telle file d'attente permet également de propager en douceur des exceptions. Supposons qu'une erreur s'est produite lors de l'envoi d'un message à un client. Dans ce cas, nous devons terminer, à l'exception de l'envoi non seulement de ce message, mais aussi de tous les messages qui sont actuellement en attente dans la file d'attente (attendez await'ah). Si nous ne le faisons pas, ils continueront de se bloquer et nous recevrons une fuite de mémoire. Par souci de concision, le code qui fait cela n'est pas affiché ici.

2)selector.Poll. En fait, ce n’est même pas une astuce, mais juste une tentative pour aplanir les lacunes de l’implémentation de la méthode Socket.Select( selector- juste un wrapper sur cette méthode). Selon le système d'exploitation sous le capot, cette méthode utilise soit selectou poll. Mais ce n'est pas important ici. L'important est de savoir comment cette méthode fonctionne avec les listes que nous alimentons en entrée (liste des sockets pour la lecture, l'écriture, la vérification des erreurs). Cette méthode prend des listes, interroge les sockets et ne laisse que les sockets dans les listes qui sont prêts à effectuer l'opération requise. Toutes les autres sockets sont supprimées des listes. "Coup de pied" se produit à traversRemoveAt(c'est-à-dire que tous les éléments suivants sont décalés, ce qui est inefficace). De plus, comme nous devons interroger toutes les sockets enregistrées à chaque itération du cycle, un tel «nettoyage» est généralement nuisible, nous devons à nouveau remplir les listes à chaque fois. Nous pouvons contourner tous ces problèmes en utilisant un problème personnalisé List, RemoveAtdont la méthode ne supprime pas l'élément de la liste, mais le marque simplement comme supprimé. La classe ListForPollingest mon implémentation d'une telle liste. ListForPollingne fonctionne qu'avec la méthode Socket.Selectet ne convient à rien d'autre.

3)callAtQueue. Dans la plupart des cas, le serveur frontal, après avoir envoyé le message client au serveur principal, attend une réponse (confirmation que l'opération a réussi, ou une erreur en cas de problème). S'il n'attend pas de réponse dans un délai configurable, il envoie une erreur au client pour qu'il n'attende pas une réponse qui ne viendra jamais. callAtQueueEst une file d'attente prioritaire. Immédiatement après que le serveur envoie le message à Kafka, il fait quelque chose comme ceci:
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
callAtQueue.Enqueue(callback, now + config.WaitForReplyMSec);

Dans le rappel, l'attente d'une réponse est annulée et l'envoi de l'erreur du serveur commence. Si une réponse du serveur principal est reçue, le rappel ne fait rien. Il n'y a aucun moyen de l'

utiliser await Task.WhenAny(answerReceivedTask, Task.Delay(x)), car le code après son Task.Delayexécution est exécuté sur le thread du pool.

Ici, en fait, tout sur les serveurs frontaux. Une légère correction s'impose ici. En fait, le serveur n'est pas complètementfiletage simple. Bien sûr, kafka sous le capot utilise des threads, mais je veux dire le code de l'application. Le fait est que l'envoi d'un message sur le sujet de la kafka (produit) peut ne pas réussir. En cas d'échec, Kafka répète l'envoi d'un certain nombre de fois configurable, mais, si les départs répétés échouent, Kafka abandonne cette activité comme désespérée. Vous pouvez vérifier si le message a été envoyé avec succès ou non dans deliveryHandlerlequel nous passons à la méthode Produce. Kafka appelle ce gestionnaire dans le thread d'E / S du producteur (le thread qui envoie des messages). Nous devons nous assurer que le message a été envoyé avec succès, et sinon, annuler l'attente d'une réponse du serveur principal (la réponse ne viendra pas car la demande n'a pas été envoyée) et envoyer une erreur au client. Autrement dit, nous ne pouvons pas éviter d'interagir avec un autre fil.

* Lors de la rédaction d'un article, je me suis soudain rendu compte que nous ne pouvions pas passer deliveryHandlerà la méthode Produceou simplement ignorer toutes les erreurs kafka (l'erreur sera toujours envoyée au client dans le délai que j'ai décrit plus tôt) - alors tout notre code sera monotrame. Maintenant, je pense à mieux faire.

Pourquoi, en fait, du kafka, pas du lapin?
, , , , , RabbitMQ? . , , . ? , frontend . , backend , , . , , . , error-prone. , basicGet , , , . . basicGet, , . .


Serveur backend


Par rapport au serveur frontal, il n'y a pratiquement pas de points intéressants ici. Tous les serveurs principaux fonctionnent de la même manière. Au démarrage, le serveur s'abonne à la rubrique (authentification, session ou appel selon le rôle), et la kafka lui assigne une ou plusieurs partitions. Le serveur reçoit le message de Kafka, traite et envoie généralement un ou plusieurs messages en réponse. Code presque réel:
void Run() {
    long lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    while (true) {
        var consumeResult = consumer.Consume(
            TimeSpan.FromMilliseconds(config.Consumer.PollTimeoutMSec)
        );

        if (consumeResult != null) {
            var workUnit = new WorkUnit() {
                ConsumeResult = consumeResult,
            };

            LinkedList<WorkUnit> workUnits;
            if (partitionToWorkUnits.ContainsKey(consumeResult.Partition)) {
                workUnits = partitionToWorkUnits[consumeResult.Partition];
            } else {
                workUnits = partitionToWorkUnits[consumeResult.Partition] =
                    new LinkedList<WorkUnit>();
            }

            workUnits.AddLast(workUnit);

            handleWorkUnit(workUnit);
        }

	if (
            DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - lastCommitTime >=
            config.Consumer.CommitIntervalMSec
        ) {
            commitOffsets();
	    lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
	}
    }
}

Quel genre de compensations engager?
. — (offset) (0, 1 ). 0. TopicPartitionOffset. (consume) , ConsumeResult, , , TopicPartitionOffset. ?

at least once delivery, , ( ). , (commited) . , consumer 16, , 16 , , , . - consumer' consumer' , 16 + 1 ( + 1). 17 . N , .

J'ai désactivé la validation automatique et je me suis engagé. Ceci est nécessaire car handleWorkUnit, lorsque le traitement du message est effectivement effectué, il s'agit d'une async voidméthode, il n'y a donc aucune garantie que le message 5 sera traité avant le message 6. Kafka ne stocke qu'un seul décalage validé (et non un ensemble de décalage), respectivement, avant de valider le décalage 6, nous devons également nous assurer que tous les messages précédents ont été traités. En outre, un serveur principal peut consommer des messages de plusieurs partitions en même temps et doit donc s'assurer qu'il valide le décalage correct sur la partition correspondante. Pour cela, nous utilisons une carte de hachage de la partition de formulaire: unités de travail. Voici à quoi ressemble le code commitOffsets(du vrai code cette fois):
private void commitOffsets() {
    foreach (LinkedList<WorkUnit> workUnits in partitionToWorkUnits.Values) {
        WorkUnit lastFinishedWorkUnit = null;
        LinkedListNode<WorkUnit> workUnit;
        while ((workUnit = workUnits.First) != null && workUnit.Value.IsFinished) {
            lastFinishedWorkUnit = workUnit.Value;
            workUnits.RemoveFirst();
        }

        if (lastFinishedWorkUnit != null) {
            offsets.Add(lastFinishedWorkUnit.ConsumeResult.TopicPartitionOffset);
        }
    }

    if (offsets.Count > 0) {
        consumer.Commit(offsets);
        foreach (var offset in offsets) {
            logger.Debug(
                "{Identifier}: Commited offset {TopicPartitionOffset}",
                identifier,
                offset
            );
        }
        offsets.Clear();
    }
}

Comme vous pouvez le voir, nous parcourons les unités, trouvons la dernière unité terminée à ce moment, après quoi il n'y en a pas de incomplète , et validons le décalage correspondant. Une telle boucle nous permet d'éviter les commits "troués". Par exemple, si nous avons actuellement 4 unités ( 0: Finished, 1: Not Finished, 2: Finished, 3: Finished), nous ne pouvons valider que la 0ème unité, car si nous validons la 3ème immédiatement, cela peut entraîner la perte potentielle de la 1ère, si le serveur meurt en ce moment.
class WorkUnit {
    public ConsumeResult<Null, byte[]> ConsumeResult { get; set; }
    private int finished = 0;

    public bool IsFinished => finished == 1;

    public void Finish() {
        Interlocked.Increment(ref finished);
    }
}


handleWorkUnitcomme cela a été dit, la async voidméthode et, en conséquence, est complètement enveloppée try-catch-finally. En tryil appelle le service nécessaire, et en finally- workUnit.Finish().

Les services sont assez banals. Ici, par exemple, quel code est exécuté lorsque l'utilisateur envoie un nouveau message:
private async Task<ServiceResult> createShareItem(CreateShareItemMessage msg) {
    byte[] message;
    byte[] messageToPals1 = null;
    int?[] partitions1 = null;

    //  UserId  .
    long? userId = hashService.ValidateSessionIdentifier(msg.SessionIdentifier);
    if (userId != null) {
        var shareItem = new ShareItemModel(
            requestIdentifier: msg.RequestIdentifier,
            roomIdentifier: msg.RoomIdentifier,
            creatorId: userId,
            timeOfCreation: null,
            type: msg.ShareItemType,
            content: msg.Content
        );

        //      null,
        //     .
        long? timeOfCreation = await storageService.CreateShareItem(shareItem);
        if (timeOfCreation != null) {
            //      .
            List<long> pals = await inMemoryStorageService.GetRoomPals(
                msg.RoomIdentifier
            );
            if (pals == null) {
            	//     -       .
                pals = await storageService.GetRoomPals(msg.RoomIdentifier);
                await inMemoryStorageService.SaveRoomPals(msg.RoomIdentifier, pals);
            }

            //    ,  .
            pals.Remove(userId.Value);

            if (pals.Count > 0) {
            	//  ack,  ,    
                //    .
                await storageService.CreateAck(
                    msg.RequestIdentifier, userId.Value, msg.RoomIdentifier,
                    timeOfCreation.Value, pals
                );

                // in -  UserId, out -   frontend ,
                //    .  -   -
                //   null.
                partitions1 = await inMemoryStorageService.GetUserPartitions(pals);

                List<long> onlinePals = getOnlinePals(pals, partitions1);

                //    ,       .
                //         .
                if (onlinePals.Count > 0) {
                    messageToPals1 = converterService.EncodeNewShareItemMessage(
                        userId.Value, timeOfCreation.Value, onlinePals, shareItem
                    );
                    nullRepeatedPartitions(partitions1);
                    // -         
                    // frontend ,    null' .
                }
            }

            message = converterService.EncodeSuccessfulShareItemCreationMessage(
                msg.RequestIdentifier, timeOfCreation.Value
            );
        } else {
            message = converterService.EncodeMessage(
                MessageCode.RoomNotFound, msg.RequestIdentifier
            );
        }
    } else {
        message = converterService.EncodeMessage(
            MessageCode.UserNotFound, msg.RequestIdentifier
        );
    }

    return new ServiceResult(
        message: message, //    .
        messageToPals1: messageToPals1, //  -    .
        partitions1: partitions1
    );
}


Base de données


La plupart des fonctionnalités des services appelés par les serveurs backend consistent simplement à ajouter de nouvelles données à la base de données et à traiter celles existantes. De toute évidence, la façon dont la base de données est organisée et comment nous fonctionnons est très importante pour le messager, et ici, je voudrais dire que j'ai abordé la question du choix d'une base de données très soigneusement après avoir soigneusement étudié toutes les options, mais ce n'est pas le cas. Je viens de choisir CockroachDb car il promet beaucoup avec un minimum d'effort et possède une syntaxe compatible postgres (j'ai déjà travaillé avec postgres). Il y avait des pensées d'utiliser Cassandra, mais à la fin j'ai décidé de m'attarder sur quelque chose de familier. Je n'avais jamais travaillé avec Kafka, ni avec Rabbit, ni avec Flutter and Dart, ni avec WebRtc, alors j'ai décidé de ne pas traîner Cassandra aussi, parce que j'avais peur de me noyer dans une multitude de nouvelles technologies pour moi.

De toutes les parties de mon projet, la conception de bases de données est la chose dont je doute le plus. Je ne suis pas sûr que les décisions que j'ai prises soient vraiment de bonnes décisions. Tout fonctionne, mais pourrait être mieux fait. Par exemple, il existe des tables ShareRooms (lorsque j'appelle des chats) et ShareItems (lorsque j'appelle des messages). Ainsi, tous les utilisateurs entrant dans une pièce sont enregistrés dans le champ jsonb de cette pièce. C'est pratique, mais évidemment très lent, donc je vais probablement le refaire en utilisant des clés étrangères. Ou, par exemple, la table ShareItems stocke tous les messages. Ce qui est également pratique, mais puisque ShareItems est l'une des tables les plus chargées (persistantes selectetinsert), il pourrait être utile de créer une nouvelle table pour chaque pièce ou quelque chose comme ça. Kokroach diffuse des enregistrements sur différents nœuds, en conséquence, vous devez soigneusement réfléchir à l'enregistrement qui ira afin d'obtenir des performances maximales, mais je ne l'ai pas fait. En général, comme on peut le comprendre à partir de tout ce qui précède, les bases de données ne sont pas mon point fort. En ce moment, je teste généralement tout pour les postgres, et non les kokroach, car il y a moins de charge sur ma machine de travail, elle est déjà si pauvre en charges qu'elle va bientôt décoller. Heureusement, le code pour postgres et kokroach diffère beaucoup, donc la commutation n'est pas difficile.

Maintenant, je suis en train d'étudier le fonctionnement réel du cocroach (comment le mappage se produit entre SQL et la valeur-clé (le cocroach utilise RocksDb sous le capot), comment il répartit les données entre les nœuds, les répliques, etc.). Il valait bien sûr la peine d'étudier le cocotier avant de l'utiliser, mais mieux vaut tard que jamais.

Je pense que la base subira de grands changements lorsque je comprendrai mieux cette question. En ce moment, la table Acks me hante. Dans ce tableau, je stocke des données sur qui n'a pas encore reçu et qui n'a pas encore lu le message (pour afficher les coches de l'utilisateur). Il est facile d'informer l'utilisateur que son message a été lu si l'utilisateur est en ligne maintenant, mais sinon, nous devons enregistrer ces informations afin d'avertir l'utilisateur plus tard. Et comme les discussions de groupe sont disponibles, il ne suffit pas de stocker l'indicateur, vous avez besoin de données sur les utilisateurs individuels. Donc, ici, nous demandons directement l'utilisation de chaînes de bits (une ligne pour les utilisateurs qui n'ont pas encore reçu, la seconde - pour ceux qui n'ont pas encore lu). Surtout le support kokroach bitetbit varying. Cependant, je n'ai jamais compris comment mettre en œuvre cette activité, étant donné que la composition des pièces peut constamment changer. Pour que les chaînes de bits conservent leur signification, les utilisateurs dans la salle doivent rester dans le même ordre, ce qui est assez difficile à faire lorsque, par exemple, certains utilisateurs quittent la salle. Il y a des options ici. Il vaut peut-être la peine d'écrire -1 au lieu de supprimer l'utilisateur du champ jsonb afin que l'ordre soit préservé, ou d'utiliser une méthode de versioning, afin que nous sachions que cette chaîne de bits fait référence à l'ordre des utilisateurs, qui était alors à l'époque, et non dans l'ordre actuel des utilisateurs. Je suis toujours en train de réfléchir à une meilleure implémentation de cette activité, mais pour le moment, ceux qui n'ont pas encore reçu et n'ont pas lu les utilisateurs ne sont que des champs jsonb. Étant donné que la table Acks est écrite avec chaque message, la quantité de données est importante.Bien que l'enregistrement, bien sûr, soit supprimé lorsque le message est reçu et lu par tout le monde.

Battement


Pendant longtemps, j'ai travaillé côté serveur et utilisé des clients de console simples pour le test, donc je n'ai même pas créé de projet Flutter. Et quand je l'ai créé, je pensais que la partie serveur était une partie complexe , et l'application est comme ça, poubelle, je vais le comprendre dans quelques jours. En travaillant sur le serveur, j'ai créé Hello Worlds pour flutter plusieurs fois pour avoir une idée du cadre, et comme le messager n'a pas besoin d'interface utilisateur complexe, je pensais qu'il était complètement prêt. Donc, l'interface utilisateur, vraiment, est une ordure, mais la mise en œuvre de la fonctionnalité m'a posé des problèmes (et elle sera toujours livrée, car tout n'est pas prêt).

Gestion de l'État


Le sujet le plus populaire. Il existe mille façons de gérer votre état et l'approche recommandée est modifiée tous les six mois. Maintenant, le courant dominant est le fournisseur. Personnellement, j'ai choisi 2 façons pour moi: bloc et redux. Bloc (Business Logic Component) pour la gestion de l'état local et redux pour la gestion globale.

Bloc n'est pas une sorte de bibliothèque (bien que, bien sûr, il y ait aussi une bibliothèque qui réduit le passe-partout, mais je ne l'utilise pas). Bloc est une approche basée sur les flux. En général, le dart est une langue assez agréable et les flux sont généralement si doux. L'essence de cette approche est que nous poussons toute la logique métier dans les services et que nous communiquons entre l'interface utilisateur et les services via un contrôleur qui nous fournit différents flux. L'utilisateur a-t-il cliqué sur le bouton «rechercher un contact»? En utilisantsink(l'autre extrémité du flux) nous envoyons un événement au contrôleur SearchContactsEvent, le contrôleur appellera le service souhaité, attendra le résultat et renverra la liste des utilisateurs à l'interface utilisateur également via le flux. L'interface utilisateur attend les résultats à l'aide de StreamBuilder(widget qui est reconstruit chaque fois que de nouvelles données arrivent dans le flux auquel elles sont abonnées). En fait, c'est tout. Dans certains cas, nous devons mettre à jour l'interface utilisateur sans aucune intervention de l'utilisateur (par exemple, lorsqu'un nouveau message est arrivé), mais cela se fait également facilement via les flux. En fait, un simple MVC avec des flux, pas de magie.

Par rapport à d'autres approches, le bloc nécessite plus de passe-partout, mais, à mon avis, il est préférable d'utiliser des solutions natives sans la participation de bibliothèques tierces, sauf si l'utilisation d'une solution tierce donne desavantages. Plus il y a d'abstractions en haut, plus il est difficile de comprendre ce qu'est l'erreur lorsqu'une erreur se produit. Je ne considère pas que les avantages du fournisseur soient suffisamment importants pour y basculer. Mais j'ai peu d'expérience dans ce domaine, il est donc probable que je changerai de camp à l'avenir.

Eh bien, à propos de redux, et donc tout le monde sait tout, donc il n'y a rien à dire. De plus, je l'ai coupé de l'application :) Je l'ai utilisé pour gérer mon compte, mais ensuite, réalisant que dans ce cas il n'y a pas d'avantages particuliers par rapport au bloc, je l'ai coupé pour ne pas trop traîner. Mais en général, je considère que redux est une chose utile pour gérer l'état global.

La partie la plus atroce


Que dois-je faire si l'utilisateur envoie un message, mais avant qu'il ne soit envoyé, la connexion Internet est perdue? Que dois-je faire si l'utilisateur a reçu une confirmation de lecture, mais qu'il a fermé l'application avant la mise à jour de l'enregistrement correspondant dans la base de données? Que dois-je faire si l'utilisateur a invité son ami dans la chambre, mais avant que l'invitation ne soit envoyée, sa batterie est morte? Avez-vous déjà posé des questions similaires? Je suis ici. Avant. Mais dans le processus de développement, j'ai commencé à me demander. Étant donné que la connexion peut disparaître à tout moment et que le téléphone s'éteint à tout moment, tout doit être confirmé . Pas drôle. Par conséquent, le tout premier message que le client envoie au serveur ( Joinsi vous vous en souvenez) n'est pas seulement «Bonjour je suis en ligne» , c'est"Bonjour, je suis en ligne et voici les salles non confirmées, voici les remerciements non confirmés, voici les opérations d'adhésion aux salles non confirmées, et voici les derniers messages reçus par chambre . " Et le serveur répond avec une feuille similaire: «Pendant que vous étiez hors ligne, tel ou tel vos messages ont été lus par tel ou tel utilisateur, et ils ont également invité Petya dans cette salle, et Sveta a quitté cette salle, et vous avez été invité dans cette salle, mais pour ces deux salles ont 40 nouveaux postes . " J'aimerais vraiment savoir comment des choses similaires sont faites dans d'autres messagers, car ma mise en œuvre ne brille pas avec grâce.

Images


Pour le moment, vous pouvez envoyer du texte, du texte + des images et seulement des images. Le téléchargement de vidéos n'a pas encore été mis en œuvre. Les images sont un peu compressées et enregistrées dans le stockage Firebase. Le message lui-même contient des liens. À la réception du message, le client télécharge des images, génère des miniatures et enregistre tout dans le système de fichiers. Les chemins d'accès aux fichiers sont écrits dans la base de données. Soit dit en passant, la génération de miniatures est le seul code exécuté sur un thread séparé, car il s'agit d'une opération lourde en calcul. Je démarre simplement un flux de travail, je lui donne une image et en retour, je reçois une miniature. Le code est extrêmement simple, car dart fournit des abstractions pratiques pour travailler avec des flux.

ThumbnailGeneratorService
class ThumbnailGeneratorService {
  SendPort _sendPort;
  final Queue<Completer<Uint8List>> _completerQueue =
      Queue<Completer<Uint8List>>();

  ThumbnailGeneratorService() {
    var receivePort = ReceivePort();
    Isolate.spawn(startWorker, receivePort.sendPort);

    receivePort.listen((data) {
      if (data is SendPort) {
        _sendPort = data;
      } else {
        var completer = _completerQueue.removeFirst();
        completer.complete(data);
      }
    });
  }

  static void startWorker(SendPort sendPort) async {
    var receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);

    receivePort.listen((imageBytes) {
      Image image = decodeImage(imageBytes);
      Image thumbnail = copyResize(image, width: min(image.width, 200));

      sendPort.send(Uint8List.fromList(encodePng(thumbnail)));
    });
  }

  Future<Uint8List> generate(Uint8List imageBytes) {
    var completer = Completer<Uint8List>();
    _completerQueue.add(completer);
    
    _sendPort.send(imageBytes);

    return completer.future;
  }
}


L'authentification Firebase est également utilisée, mais uniquement pour l'autorisation d'accès au stockage Firebase (afin que l'utilisateur ne puisse pas, par exemple, remplir la photo de profil pour quelqu'un d'autre ). Toutes les autres autorisations se font via mes serveurs.

Format du message


Vous êtes probablement horrifié ici, car j'utilise des tableaux d'octets réguliers. Json disparaît car l'efficacité est requise, et je ne connaissais pas protobuf lorsque j'ai commencé. L'utilisation de tableaux nécessite beaucoup de prudence car un index est incorrect et les choses tournent mal.

Les 4 premiers octets correspondent à la longueur du message.
L'octet suivant est le code du message.
Les 16 octets suivants sont l'identifiant de la demande (uuid).
Les 40 octets suivants constituent le jeton d'autorisation.
Le reste du message .

Longueur du messagerequis, car je n'utilise pas de sockets http ou web, ou un autre protocole qui permet de séparer un message d'un autre. Mes serveurs frontaux ne voient que les flux d'octets, et ils ont besoin de savoir où un message se termine et un autre commence. Il existe plusieurs façons de séparer les messages (par exemple, utiliser un type de caractère jamais trouvé dans les messages comme séparateur), mais j'ai préféré spécifier la longueur, car cette méthode est la plus simple, bien qu'elle entraîne des frais généraux, car la plupart des messages sont manquants et un octet pour indiquer la longueur.

Le code du message n'est qu'un des membres de l'énumérationMessageCode. Le routage est effectué selon le code, et comme nous pouvons extraire le code du tableau sans désérialisation préalable, le serveur frontal décide dans quel sujet de la kafka envoyer un message au lieu de déléguer cette responsabilité à quelqu'un d'autre.

Identifiant de demandeprésent dans la plupart des postes, mais pas dans tous. Il remplit 2 fonctions: par cet identifiant, le client établit la correspondance entre la demande envoyée et la réponse reçue (si le client a envoyé les messages A, B, C dans cet ordre, cela ne signifie pas que les réponses viendront également dans l'ordre). La deuxième fonction est d'éviter les doublons. Comme mentionné précédemment, kafka garantit au moins une fois la livraison. Autrement dit, dans de rares cas, les messages peuvent toujours être dupliqués. En ajoutant la colonne RequestIdentifier avec une contrainte unique à la table de base de données souhaitée, nous pouvons éviter d'insérer un doublon.

Jeton d'autorisationEst un UserId (8 octets) + 32 octets de signature HmacSha256. Je ne pense pas que cela vaille la peine d'utiliser Jwt ici. Jwt est environ 7 à 8 fois plus grand pour quoi? Mes utilisateurs n'ont aucune réclamation, donc une simple signature hmac est très bien. L'autorisation par le biais d'autres services n'est pas et n'est pas prévue.

Appels audio et vidéo


C'est drôle que j'ai délibérément reporté la mise en œuvre des appels audio et vidéo, car j'étais sûr que je ne serais pas en mesure de résoudre les problèmes, mais en fait, il s'est avéré que c'était l'une des fonctionnalités les plus faciles à mettre en œuvre. Au moins la fonctionnalité de base. En général, l'ajout de WebRtc à l'application et l'obtention de la première session vidéo n'ont pris que quelques heures et, miraculeusement, le premier test a réussi. Avant cela, je pensais que le code qui fonctionnait la première fois était un mythe. Habituellement, le premier test d'une nouvelle fonctionnalité échoue toujours en raison d'une sorte d'erreur stupide comme «a ajouté un service, mais ne l'a pas enregistré dans un conteneur DI».

Pas très bref sur WebRtc pour les non-initiés
WebRtc — , peer-to-peer , , peer-to-peer , . - , , . , .

(peer-to-peer), , 3 ( , 3 . 3 ).

— stun . stun , — Source IP Source Port , . ? - . IP . - , , , Source IP Source Port IP - NAT [ Source IP | Source Port | Router External IP | Router Port ]. - , Dest IP Dest Port Router External IP Router Port NAT, , Source IP — Source Port , . , , , , , , NAT . stun NAT . stun Router External IP — Router Port. — . , «» NAT (NAT traversal) , NAT , stun .
* NAT , . , , WebRtc .

— turn. , , peer-to-peer . Fallback . , , , , , peer-to-peer . turn — coturn, .

— . , . , . — . . , , , :) — .

WebRtc 3 : offer, answer candidate. offer , answer, . , , , , . ( ) , .


La technologie WebRtc elle-même établit une connexion et est engagée dans le transfert de flux aller-retour, mais ce n'est pas un cadre pour créer des appels à part entière. Par appel, je veux dire une session de communication avec la possibilité d'annuler, de rejeter et d'accepter l'appel, ainsi que de raccrocher. De plus, vous devez faire savoir à l'appelant si l'autre partie est déjà prise. Et aussi pour implémenter de petites choses comme "attendre une réponse à l'appel N secondes, puis réinitialiser". Si vous implémentez simplement WebRtc dans l'application sous une forme nue, alors avec un appel entrant, la caméra et la vidéo s'allumeront spontanément, ce qui, bien sûr, est inacceptable.

Dans sa forme pure, WebRtc implique généralement l'envoi de candidats à l'autre partie dès que possible afin que les négociations commencent le plus rapidement possible, ce qui est logique. Lors de mes tests, les candidats au parti d'accueil sont généralement toujourscommencé à venir avant même que l'offre arrive. Ces candidats «précoces» ne peuvent pas être écartés, ils doivent être mémorisés pour que plus tard, lorsque l'offre arrive et RTCPeerConnectionest créée, les ajouter à la connexion. Le fait que les candidats puissent commencer à arriver avant même l'offre, ainsi que d'autres raisons, font de la mise en œuvre d'appels à part entière une tâche non triviale. Que faire si plusieurs utilisateurs nous appellent en même temps? Nous recevrons des candidats de tous, et bien que nous puissions séparer les candidats d'un utilisateur d'un autre, il devient difficile de savoir quels candidats rejeter, car nous ne savons pas quelle offre viendra plus tôt. Il y aura également des problèmes si les candidats commencent à venir vers nous et ensuite une offre au moment où nous appelons nous- mêmes quelqu'un.

Après avoir testé plusieurs options avec WebRtc nu, je suis arrivé à la conclusion que tenter de faire des appels sous cette forme serait problématique et lourd de fuites de mémoire, j'ai donc décidé d'ajouter une autre étape au processus de négociation WebRtc. J'appelle cette étape Inquire - Grant/Refuse.

L'idée est très simple, mais il m'a fallu un certain temps pour y arriver. L'appelant avant même de créer le flux et RTCPeerConnection(et généralement avant d'exécuter tout code lié à WebRtc) envoie un message via le serveur de signaux de l'autre côté Inquire. Du côté de la réception, il est vérifié si l'utilisateur est actuellement dans une autre session de communication ( boolchamp simple ). Si c'est le cas, un message est renvoyé.Refuse, et de cette façon, nous faisons savoir à l'appelant que l'utilisateur est occupé, et le récepteur - que tel ou tel téléphone a appelé alors qu'il était occupé par une autre conversation. Si l'utilisateur est actuellement libre, il est réservé . L' Inquireidentifiant de session est envoyé dans le message , et cet identifiant est défini comme l'identifiant de la courant session. Si l'utilisateur est réservé, il rejettera tous les Inquire/Offer/Candidatemessages avec des identifiants de session autres que celui en cours. Après la réservation, le récepteur envoie un message via le serveur de signaux à l'appelant Grant. Il convient de dire que ce processus n'est pas visible pour l'utilisateur récepteur, car il n'y a pas encore d'appel. Et l'essentiel ici n'est pas d'oublier de raccrocher un timeout côté réception. Du coup, nous réserverons une session, et aucune offre ne suivra.

L'appelant reçoit Grant, et c'est là que WebRtc commence par les offres, les candidats, et c'est tout pour tout le monde. L'offre vole vers le destinataire et celui-ci, à la réception, affiche un écran avec les boutons Répondre / Rejeter. Mais les candidats, comme d'habitude, n'attendent personne. Ils recommencent à arriver encore plus tôt que l'offre, car il n'y a aucune raison d'attendre que l'utilisateur réponde à l'appel. Il peut ne pas répondre, mais rejeter ou attendre l'expiration du délai d'expiration - les candidats seront simplement expulsés.

Situation actuelle et plans futurs


  • Discussions privées et en groupe
  • Envoi de texte, d'images et de vidéos
  • Appels audio et vidéo
  • Accusé de réception et lecture
  • "Impressions ..."
  • Notifications
  • Recherche par code QR et géolocalisation


La recherche par code QR est, de manière inattendue, assez problématique à mettre en œuvre, car presque tous les plugins pour l'analyse de code que j'ai essayé refusent de démarrer ou ne fonctionnent pas correctement. Mais je pense que les problèmes seront résolus ici. Et pour la mise en œuvre de la recherche de géolocalisation, je n'ai pas encore repris. En théorie, il ne devrait pas y avoir de problèmes particuliers.

Notifications en cours, ainsi que l'envoi de vidéos.

Que faut-il faire d'autre?


Oh, beaucoup.
Tout d'abord, il n'y a pas de tests. Les collègues écrivaient des tests, alors je me suis complètement détendu.
Deuxièmement, il n'est actuellement pas possible d'inviter des utilisateurs à un chat existant et de quitter le chat. Le code serveur est prêt pour cela, le code client ne l'est pas.
Troisièmement, si la gestion des erreurs sur le serveur est plus ou moins, il n'y a pas de gestion des erreurs sur le client. Il ne suffit pas de faire une entrée de journal; vous devez réessayer l'opération. Désormais, par exemple, le mécanisme de renvoi des messages n'est pas implémenté.
Quatrièmement, le serveur ne fait pas de ping sur le client, donc la déconnexion n'est pas détectée si, par exemple, le client a perdu Internet. La déconnexion n'est détectée que lorsque le client ferme l'application.
Cinquièmement, les index ne sont pas utilisés dans la base de données.
Sixièmement, l' optimisation. Le code a un grand nombre d'endroits où quelque chose comme est écrit // @@TODO: Pool. La plupart des tableaux ne sont que newcela. Le serveur principal crée de nombreux tableaux de longueur fixe, donc ici vous pouvez et devez utiliser le pool.
Septièmement, il existe de nombreux emplacements sur le client où le code awaitse termine, bien que cela ne soit pas nécessaire. L'envoi d'images, par exemple, semble donc lent car le codeawaitIl enregistre les images dans le système de fichiers et génère des miniatures avant d'afficher le message, bien que cela ne doive pas être fait. Ou, par exemple, si vous ouvrez l'application et pendant votre absence, ils vous ont envoyé des images, le démarrage sera lent, car à nouveau toutes ces images sont téléchargées, enregistrées dans le système, des vignettes sont générées, et seulement après que le démarrage se termine et que vous êtes jeté de l'écran de démarrage sur l'écran d'accueil. Tous ces redondants awaitont été conçus pour faciliter le débogage, mais, bien sûr, vous devez vous débarrasser des attentes inutiles avant la publication.
HuitièmeL'interface utilisateur est maintenant à moitié prête, car je n'ai pas encore décidé comment je veux la voir. Par conséquent, maintenant tout n'est pas intuitif, la moitié des boutons ne savent pas ce qu'ils font. Et les boutons ne sont souvent pas enfoncés la première fois, car maintenant ils ne sont plus que des icônes avec GestureDetectoret sans remplissage, il n’est donc pas toujours possible d’y entrer. De plus, dans certains endroits, le dépassement de pixels n'est pas résolu.
Neuvièmement, il est désormais impossible de se connecter à un compte, il suffit de s'inscrire. Par conséquent, si vous désinstallez l'application et la réinstallez, vous ne pourrez pas vous connecter à votre compte :)
Dixièmement, le code de vérification n'est pas envoyé par la poste. Maintenant, le code est généralement toujours le même, car il est plus facile à déboguer.
OnzièmeLe principe de la responsabilité unique est violé dans de nombreux endroits. Besoin d'un refactor. Les classes chargées d'interagir avec la base de données (à la fois sur le client et sur le serveur) sont généralement très gonflées, car elles sont engagées dans toutes les opérations de base de données.
Douzièmement, le serveur frontal attend désormais toujours une réponse du serveur principal, même si le message n'implique pas l'envoi d'une réponse (par exemple, un message avec un code IsTypinget certains messages liés à WebRtc). Par conséquent, sans attendre de réponse, il écrit une erreur dans la console, bien que ce ne soit pas une erreur.
Treizièmement, les images complètes ne s'ouvrent pas au toucher.
Cent millions de cinquièmescertains messages qui doivent être envoyés par lots sont envoyés séparément. La même chose s'applique à certaines opérations de base de données. Au lieu d'exécuter une seule commande, les commandes sont exécutées en boucle avec await(brr ..).
Cent millions de sixièmes, certaines valeurs sont codées en dur, au lieu d'être configurables.
Cent un million septièmesla journalisation sur le serveur se fait désormais uniquement sur la console, et sur le client en général, directement sur le widget. Sur l'écran principal, il y a un onglet Journaux, où tous les journaux sur le robinet sont supprimés. Le fait est que ma machine de travail refuse d'exécuter à la fois l'émulateur et tout ce qui est nécessaire pour le serveur (kafka, base de données, radis et tous les serveurs). Le débit avec un appareil connecté n'a pas fonctionné non plus, tout était bien suspendu dans la moitié des cas, car l'ordinateur ne pouvait pas supporter les charges. Par conséquent, vous devez créer une version à chaque fois, la déposer sur l'appareil, l'installer et la tester comme ceci. Pour voir les journaux, je les dépose directement dans le widget. Perversion, je sais, mais il n'y a pas le choix. Pour la même raison, de nombreuses méthodes renvoient FutureetawaitIls le sont (pour intercepter l'exception et les lancer dans le widget), mais ils ne devraient pas. Si vous regardez le code, vous verrez une _logErrorméthode laide dans de nombreuses classes qui fait cela. Bien sûr, cela ira également à la poubelle.
Cent millions huit, pas de sons.
Cent millions et neuvième, vous devez utiliser davantage la mise en cache.
Cent millions de dixièmes, beaucoup de code répétitif. Par exemple, de nombreuses actions vérifient tout d'abord la validité du jeton, et s'il n'est pas valide, elles envoient une erreur. Je pense que vous devez implémenter un pipeline de middleware simple.

Et beaucoup de petites choses, comme la concaténation de chaînes au lieu d'utiliser StringBuilder'a,Disposeon ne l'appelle pas partout où il faut, et ainsi de suite. En général, l'état normal du projet est en cours d'élaboration. Tout ce qui précède est résoluble, mais il y a un problème fondamental auquel je n'ai pas pensé jusqu'au dernier moment, car cela m'a échappé: le messager devrait fonctionner même lorsque l'application n'est pas ouverte et la mienne ne fonctionne pas. Pour être honnête, la solution à ce problème ne m'a pas encore traversé l'esprit. Ici, apparemment, vous ne pouvez pas vous passer du code natif.

J'évaluerais l'état de préparation du projet à 70%.

Sommaire


Six mois se sont écoulés depuis le début des travaux sur le projet. Combiné avec un travail à temps partiel et a pris de longues pauses, mais il a quand même fallu beaucoup de temps et d'énergie. Je prévois de mettre en œuvre toutes les fonctionnalités déclarées + ajouter quelque chose d'inhabituel comme du tic-tac-toe ou des ébauches directement dans la pièce. Sans raison, juste parce que c'est intéressant.

Si vous avez des questions, écrivez. Le courrier est sur github.

All Articles