Implémentation de l'algorithme de consensus RAFT pour le stockage KV distribué en Java

Rebonjour. Il y a quelques jours, la formation a commencé dans un nouveau groupe sur le cours «Architecte logiciel» , et nous aimerions aujourd'hui partager un article écrit par l'un des étudiants du cours, Anton Pleshakov (responsable du développement chez Program Logistics et co-fondateur de Clusterra).




Actuellement, les systèmes de microservices distribués sont devenus pratiquement la norme de l'industrie, et pas seulement dans le monde de l'entreprise. Les avantages de l'utilisation de systèmes distribués ont été décrits et discutés plus d'une fois. Les avantages des microservices sont connus de tous depuis longtemps: technologies pour la tâche, composabilité, évolutivité, évolutivité du développement, réduction du TTM, etc. Il est évident que le développement d'applications distribuées offre plus d'options pour une réponse rapide aux demandes croissantes des entreprises et la numérisation de tout ce qui l'entoure.

Il est également important de noter qu'à l'heure actuelle, un facteur très important affectant le choix d'une stratégie de développement en faveur des microservices est la disponibilité de toutes sortes de solutions d'infrastructure prêtes à l'emploi qui résolvent les problèmes liés aux coûts supplémentaires d'exploitation d'un système distribué. Nous parlons de systèmes d'orchestration de conteneurs, de purée de service, de moyens de traçage distribué, de surveillance, de journalisation, etc. On peut affirmer avec certitude que la plupart des facteurs mentionnés précédemment comme les inconvénients de l'approche microservice aujourd'hui n'ont pas autant d'influence qu'il y a quelques années.

Sur la base des réalités modernes, la plupart des développeurs cherchent à la première occasion de passer d'une structure monolithique à une structure microservice. L'une des premières étapes qui peuvent être prises sans recourir au refactoring total et à une décomposition sérieuse est de parvenir à un système d'évolutivité horizontale. Autrement dit, pour transformer votre application monolithique en un cluster, peut-être même composé des mêmes monolithes, mais vous permettant de faire varier dynamiquement leur nombre.

Lorsque l'on cherche à atteindre une évolutivité horizontale, la question de la synchronisation des données au sein d'un cluster se pose très rapidement et de manière très aiguë. Heureusement, tous les SGBD modernes prennent en charge la réplication des données entre les nœuds d'une manière ou d'une autre. Le développeur a juste besoin de sélectionner le SGBD pour la tâche et de décider quelles propriétés du système (selon le théorème CAP) il a besoin, CP ou AP, et le problème est résolu. Dans le cas où CP est requis et les exigences de cohérence sont élevées, l'une des méthodes pour résoudre le problème de synchronisation des données consiste à utiliser un cluster qui prend en charge l'algorithme de consensus RAFT.

Cet algorithme assez nouveau (développé en 2012) offre une grande garantie de cohérence et est très populaire. J'ai décidé de comprendre comment cela fonctionne et j'ai écrit mon implémentation d'un référentiel de valeurs-clés cohérent en Java (Spring Boot).

Est-il judicieux d'implémenter vous-même un algorithme distribué? Il est clair que vous pouvez prendre une implémentation prête à l'emploi d'un algorithme distribué, et avec le plus haut degré de probabilité, cette implémentation sera meilleure qu'un «vélo» fait maison. Par exemple, vous pouvez utiliser un SGBD qui maintient le niveau de cohérence requis. Ou vous pouvez déployer Zookeeper . Ou vous pouvez trouver un cadre adapté à votre langue. Pour java, il y a Atomix , qui résout parfaitement les problèmes de synchronisation des données distribuées.

Mais de l'autre côté. Si vous prenez une solution clé en main, l'utilisation d'une application externe ajoute généralement un point de défaillance supplémentaire à votre système. Et les frameworks peuvent être redondants ou difficiles à utiliser et à apprendre, ou ils peuvent ne pas exister du tout pour votre langage de programmation. De plus, l'implémentation indépendante de l'algorithme de consensus est une tâche d'ingénierie extrêmement intéressante qui élargit vos horizons et vous permet de comprendre comment résoudre les problèmes qui surviennent lorsque les services interagissent dans un cluster en utilisant la méthode la plus optimale.

Étant donné que la spécification de l'algorithme contient un ensemble de mesures pour maintenir l'intégrité des données, vous pouvez utiliser les connaissances acquises et même utiliser l'algorithme dans son intégralité. N'importe quelle partie de l'algorithme peut être utile dans la vie réelle. Supposons que vous disposiez d'un ensemble de travailleurs pour analyser les fichiers en parallèle. Les travailleurs sont équivalents, mais vous souhaitez désigner l'un des travailleurs en tant que coordinateur, et lorsque le travailleur coordinateur tombe, affectez tout autre travailleur libre en tant que coordinateur. La première moitié de l'algorithme RAFT, qui décrit comment choisir un leader parmi des nœuds équivalents, vous y aidera. Ou par exemple, si vous n'avez que deux nœuds par rapport à maître-esclave, vous pouvez très bien utiliser les règles de réplication décrites dans la spécification RAFT pour organiser l'échange de données dans votre cas le plus simple.

L'article est essentiellement un guide pratique sur la façon d'implémenter RAFT vous-même. L'algorithme lui-même et les aspects théoriques de son travail ne seront pas compris. Vous pouvez lire une brève description ici dans cet excellent article ou étudier la spécification complète ici . Vous y trouverez une visualisation très claire de l'algorithme.

Description générale de la solution


La partie du code directement liée à la mise en œuvre de l'algorithme est analysée dans l'article. À la fin de l'article, il y a un lien vers le référentiel, où vous pouvez voir tout le code.

La tâche était la suivante. Développez un système distribué qui vous permet de stocker des données dans une base de données de valeurs-clés. Les données de chaque nœud doivent être cohérentes, à savoir si les données sont entrées dans la base de données d'un nœud et que la plupart des nœuds ont confirmé qu'ils avaient également reçu ces données, alors tôt ou tard ces données seront dans la base de données de chaque nœud. Lorsqu'une partie du cluster est déconnectée et lorsqu'elle est reconnectée, les nœuds qui se trouvaient en dehors du cluster doivent rattraper le cluster principal et se synchroniser. Chaque nœud fournit une API REST pour écrire et lire les données de la base de données. Le système se compose de deux modules pour deux types de nœuds: client et serveur. Ci-dessous, nous considérons les caractéristiques de la mise en œuvre du serveur lui-même. Le code client est dans le référentiel.

Un nœud de serveur peut fonctionner dans trois états:

  • Suiveur (suiveur). Accepte les demandes de lecture du client. Prend un battement de coeur du leader
  • Candidat (candidat). Accepte les demandes de lecture du client. Envoie des demandes de vote à d'autres nœuds
  • Chef Accepte les demandes de lecture et d'écriture. Envoie des requêtes de pulsation à d'autres nœuds. Envoie des données de demande d'ajout à d'autres nœuds.

La période de «leadership» de l'un des nœuds s'appelle la ronde (terme). Un nouveau candidat ouvre un nouveau tour.

Stockage de données


Chaque nœud donne accès au référentiel du journal des opérations, dans lequel les opérations de modification des données sont enregistrées séquentiellement.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/operations/OperationsLog.java


public interface OperationsLog {
   void append(Operation operation);
   Operation get(Integer index);
   List<Operation> all();

   Long getTerm(Integer index);
   Integer getLastIndex();
   Long getLastTerm();

   void removeAllFromIndex(int newOperationIndex);
}

Chaque opération, en plus des données et du type (insérer, modifier, supprimer), contient le numéro du cycle dans lequel elle a été créée. De plus, chaque opération a un index qui augmente séquentiellement. Il est important que toutes les opérations soient insérées dans les journaux des suiveurs dans le même ordre dans lequel elles sont insérées dans le journal du leader.

Chaque nœud a accès à une base de données dans laquelle les données sont stockées directement.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/storage/Storage.java

public interface Storage {
   List<Entry> all();
   String get(Long key);
   void insert(Long key, String val);
   void update(Long key, String val);
   void delete(Long key);
}

Dans l'implémentation actuelle, des solutions intégrées en mémoire sont utilisées à la fois pour le journal et pour la base de données (liste et carte concurrentes ordinaires). Si nécessaire, vous pouvez simplement implémenter l'interface appropriée pour prendre en charge d'autres types de stockage.

L'application des opérations du journal à la base de données est effectuée par une machine à états distribuée. Une machine à états est un tel mécanisme qui est chargé de changer l'état d'un cluster, de restreindre l'utilisation de modifications incorrectes (opérations hors service ou nœud déconnecté qui se considère comme un leader). Pour que les modifications soient considérées comme valides et pour qu'elles soient appliquées à la base de données, elles doivent passer une série de vérifications et répondre à certains critères, c'est précisément ce que fournit la machine d'état.

Pour un leader, une opération est appliquée à la base de données si la plupart des nœuds ont confirmé le fait que l'opération est également répliquée dans leur journal. Pour une adepte, l'opération est appliquée à la base de données si un signal est reçu du chef de file qu'elle est entrée dans sa base de données.

Minuteries


Chaque nœud assure l'échange de données avec d'autres nœuds.

Deux types de requêtes sont pris en charge:

  • voter lors d'un tour de scrutin
  • ajouter, alias battement de cœur (s'il n'y a pas de données), pour répliquer les données du journal aux abonnés et pour empêcher le début d'un nouveau tour de scrutin.

Le fait du début d'un événement est déterminé par le temporisateur. Deux types de temporisateurs sont lancés sur le nœud:

  • voter. Pour commencer un tour de scrutin. Chaque nœud a son propre intervalle, après quoi il essaiera de commencer un nouveau vote. Le compte à rebours recommence lorsque vous recevez un battement de cœur du leader.
  • battement de coeur. Pour envoyer une demande aux abonnés par le leader de l'ajout. Si le nœud ne reçoit pas de battement de cœur et que le délai de vote a expiré, il devient candidat et déclenche des élections, augmente le nombre de tours de scrutin et envoie des demandes de vote à d'autres nœuds. Si le nœud recueille la majorité des votes, il devient le leader et commence à envoyer des battements de cœur.

L'état actuel du nœud


Chaque nœud stocke des données sur l'état actuel.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/context/Context.java

public interface Context {
   Integer getId(); //    
   State getState();//: , ,  
   Integer getVotedFor(); 
               //          
   Long getCurrentTerm(); //  
   Integer getCommitIndex(); //    
   List<Peer> getPeers(); //      
}

Un nœud leader stocke également des métadonnées pour les nœuds vers lesquels il réplique les données.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/node/peers/Peer.java

public interface Peer {
   Integer getId(); //  
   Integer getNextIndex(); //  ,    
   Integer getMatchIndex();//   
   Boolean getVoteGranted(); //     
}

Les métadonnées des nœuds sont mises à jour par le responsable lors de la réception des réponses des abonnés. Ils sont utilisés pour déterminer par le leader quelle prochaine opération d'index le suiveur est prêt à accepter et quelles opérations ont déjà été ajoutées au journal du suiveur.

Vote


La classe ElectionService est responsable du vote

public interface ElectionService {
   void processElection();
   AnswerVoteDTO vote(RequestVoteDTO requestVoteDTO);
} 

Envoi d'une demande de vote


Si le nœud est un suiveur et ne reçoit pas de battement de cœur pendant la période définie pour l'attente, il augmente son tour en cours, se déclare candidat et commence à envoyer des demandes de vote à d'autres nœuds. S'il parvient à réunir un quorum et que la plupart des nœuds votent, il deviendra le nouveau chef. En termes RAFT, le quorum représente plus de la moitié de tous les nœuds (51%).

Analysons la méthode de processElectionclasse ElectionServiceImpl, qui est appelée par le temporisateur de vote lorsque le vote expire et envoie aux nœuds une demande de vote .

//1
context.setState(CANDIDATE); 
Long term = context.incCurrentTerm(); 
context.setVotedFor(context.getId()); 

List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());
long voteGrantedCount = 1L;
long voteRevokedCount = 0L;

//2
while (checkCurrentElectionStatus(term)) {
   List<AnswerVoteDTO> answers = getVoteFromAllPeers(term, peersIds);
   peersIds = new ArrayList<>();
   for (AnswerVoteDTO answer : answers) {
       //3
       if (answer.getStatusCode().equals(OK)) {
           //4
           if (answer.getTerm()>context.getCurrentTerm()) {
               context.setTermGreaterThenCurrent(answer.getTerm());
               return;
           }
           if (answer.isVoteGranted()) {
               //5 
               context.getPeer(answer.getId()).setVoteGranted(true);
               voteGrantedCount++;
           } else
               //6 
               voteRevokedCount++;
       } else {
          peersIds.add(answer.getId());
       }
   }
  //7
  if (voteGrantedCount >= context.getQuorum()) {
       winElection(term);
       return;
   } else if (voteRevokedCount >= context.getQuorum()) {
       loseElection(term);
       return;
   } 

  1. Définissez le statut de «Candidat». Augmentez le nombre de tours et votez pour nous-mêmes.
  2. , ( ). - , , heartbeat .
  3. - , . , , -.
  4. , . , heartbeat .
  5. Le nœud a voté pour nous! Nous augmentons le nombre de nœuds votant pour nous et corrigeons que ce nœud a voté pour nous.
  6. Voté non pour nous, nous croyons aussi.
  7. Si le quorum est collecté et que le nœud a remporté l'élection, nous établissons le statut de «Leader». Sinon, nous devenons un suiveur et attendons.

Il convient également de noter que lorsqu'un nœud devient un leader, l'index suivant est défini pour chaque nœud dans la liste des nœuds stockés par le leader, qui est égal au dernier index du journal du leader plus 1. À partir de cet index, le leader tentera de mettre à jour les journaux du suiveur. En fait, cet indice stocké par le leader peut ne pas correspondre à l'indice réel du journal du suiveur et la valeur réelle ne sera obtenue que lors de l'échange de données avec le suiveur et sera ajustée. Mais un point de départ est nécessaire .

  private void winElection(Long term) {
       context.setState(LEADER);
       context.getPeers().forEach(peer ->
               peer.setNextIndex(operationsLog.getLastIndex()+1)

       );
   }

Traitement des demandes de vote


Lors du vote, chaque nœud reçoit une demande d'un candidat comme ceci :

class RequestVoteDTO {
   private final Long term; //     
   private final Integer candidateId; //  
   private final Integer lastLogIndex; //     
   private final Long lastLogTerm; //       
}

Examinons maintenant la procédure de voteclasse ElectionServiceImpl, elle traite la demande de vote du candidat et retourne une décision concernant sa candidature au poste de leader.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/election/ElectionServiceImpl.java#L178


public AnswerVoteDTO vote(RequestVoteDTO dto) {
   
       boolean termCheck;
       //1
       if (dto.getTerm() < context.getCurrentTerm())
           return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),false);
       else //2
       if (dto.getTerm().equals(context.getCurrentTerm())) {
           termCheck = (context.getVotedFor() == null||
                          context.getVotedFor().equals(dto.getCandidateId()));
       }
       else
       {   //3
           termCheck = true;
             context.setTermGreaterThenCurrent(dto.getTerm());
       }

       //4  
       boolean logCheck = !((operationsLog.getLastTerm() > dto.getLastLogTerm()) ||
               ((operationsLog.getLastTerm().equals(dto.getLastLogTerm())) &&
                       (operationsLog.getLastIndex() > dto.getLastLogIndex())));


       boolean voteGranted = termCheck&&logCheck;

       //5
       if (voteGranted) {
           context.setVotedFor(dto.getCandidateId());
       }
       //6   
       return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),voteGranted);
   }

À la réception d'une demande d'un candidat, le nœud effectue deux vérifications: vérifie la ronde du candidat et la longueur de son journal. Si le tour du candidat est plus élevé et son journal est plus long ou égal, alors le nœud donne à son nœud un vote pour le candidat

  1. Si le tour du nœud actuel est plus grand que le tour du candidat, nous refusons, car il s'agit d'une demande de nœud en retard, qui, apparemment, était en dehors du cluster depuis un certain temps et a commencé la procédure électorale parce qu'il n'a pas vu le chef actuel.
  2. , , , , , , ; . — .
  3. ,
  4. . , , , , .
  5. Avec un résultat positif, nous corrigeons le fait que le nœud a participé aux élections et a voté pour le candidat.
  6. Renvoyer le résultat au candidat

Certes, les conditions auraient pu être écrites un peu plus courtes et plus élégantes, mais j'ai laissé une option tellement «naïve» afin de ne pas m'embrouiller et de ne dérouter personne.

La réplication


Le responsable du minuteur envoie des suiveurs de pulsations à tous les nœuds pour réinitialiser leurs minuteurs de vote. Puisque le leader stocke dans ses index de métadonnées les dernières opérations de tous les suiveurs, il peut évaluer si l'envoi de l'opération aux nœuds est nécessaire. Si le journal des opérations du leader devient plus long que le journal de n'importe quel suiveur, alors, avec le rythme cardiaque, il lui envoie séquentiellement les opérations manquantes. Appelez-le ajouter une demande. Si la plupart des nœuds confirment la réception de nouvelles opérations, le leader applique ces opérations à sa base de données et augmente l'index de la dernière opération appliquée. Cet index est également envoyé aux abonnés avec une demande de pulsation. Et si l'indice leader est supérieur à l'indice suiveur, le suiveur applique également des opérations à sa base de données afin d'égaliser les indices.

Ce type de demande d'ajout que le leader envoie au suiveur

class RequestAppendDTO {
   private final Long term; //   
   private final Integer leaderId; //   

   private final Integer prevLogIndex;//   
   private final Long prevLogTerm;//   
   private final Integer leaderCommit;//      
   private final Operation operation; //
}

Il existe des implémentations dans lesquelles les opérations sont transférées par lots de plusieurs par demande. Dans l'implémentation actuelle, une seule opération peut être transmise par

requête. La classe répond à l'envoi et au traitement de la requête heartbeat-append:

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationService.java

public interface ReplicationService {
   void appendRequest();
   AnswerAppendDTO append(RequestAppendDTO requestAppendDTO);
}

Soumettre une demande de modification des données


Considérez un fragment d'une méthode desendAppendForOnePeer classe ReplicationServiceImpl

. La méthode est responsable de générer une demande au suiveur et de l'envoyer .

private CompletableFuture<AnswerAppendDTO> sendAppendForOnePeer(Integer id) {
   return CompletableFuture.supplyAsync(() -> {
       try {
           //1
           Peer peer = context.getPeer(id);

           Operation operation;
           Integer prevIndex;
           //2    
           if (peer.getNextIndex() <= operationsLog.getLastIndex()) {
               operation = operationsLog.get(peer.getNextIndex());
               prevIndex = peer.getNextIndex() - 1;
           } else 
           //3  
           {
               operation = null;
               prevIndex = operationsLog.getLastIndex();
           }


           RequestAppendDTO requestAppendDTO = new RequestAppendDTO(
                   context.getCurrentTerm(), //   
                   context.getId(), //  
                   prevIndex,//      
                   operationsLog.getTerm(prevIndex),//  
                   context.getCommitIndex(),
                               //      
                   Operation //
           );

...
/*   http     */
}

  1. Métadonnées du suiveur
  2. , . ( ), , , , . , , , ,
  3. , , ; , , ,

Ensuite, considérez la méthode de appendRequestclasse ReplicationServiceImpl, qui est responsable de l'envoi de la demande d'ajout et du traitement du résultat à tous les abonnés.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationServiceImpl.java#L109

public void appendRequest() {
       List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());

       //1 
       while (peersIds.size() > 0) {
           //2 
           List<AnswerAppendDTO> answers = sendAppendToAllPeers(peersIds);
           peersIds = new ArrayList<>();
           for (AnswerAppendDTO answer : answers) {
               //3
               if (answer.getStatusCode().equals(OK)) {
                   //4
                   if (answer.getTerm() > context.getCurrentTerm()) {
                        context.setTermGreaterThenCurrent(answer.getTerm());
                       return;
                   }
                   Peer peer = context.getPeer(answer.getId());
                   //5     
                   if (answer.getSuccess()) {                      
                       peer.setNextIndex(answer.getMatchIndex() + 1);
                       peer.setMatchIndex(answer.getMatchIndex());
                       if (peer.getNextIndex() <= operationsLog.getLastIndex())
                           peersIds.add(answer.getId());
                   //6      
                   } else {
                       peer.decNextIndex();
                       peersIds.add(answer.getId());
                   }
               }
           }
           //7
           tryToCommit();
       }
}

  1. Nous répétons la demande jusqu'à ce que nous recevions une réponse de tous les abonnés que la réplication a réussi. Étant donné qu'une opération est envoyée par demande, la synchronisation des journaux des abonnés peut prendre plusieurs itérations.
  2. Envoyez des demandes à tous les abonnés et obtenez une liste de réponses
  3. Nous considérons les réponses uniquement des abonnés disponibles
  4. S'il s'avère que le tour d'un des suiveurs est supérieur au tour du leader, nous arrêtons tout et devenons un suiveur
  5. Si le suiveur répond que tout a réussi, nous mettons à jour les métadonnées du suiveur: nous enregistrons le dernier index du journal du suiveur et l'index de la prochaine opération attendue par le suiveur.
  6. , , , , . , , . , . , .
  7. , . .


Voyons maintenant comment exactement le suiveur traite la demande d'ajout du leader.
Méthode de appendclasseReplicationServiceImpl

public AnswerAppendDTO append(RequestAppendDTO dto) {
     
       //1     
       if (dto.getTerm() < context.getCurrentTerm()) {
           return new AnswerAppendDTO(context.getId(),context.getCurrentTerm(),false, null);
       } else if (dto.getTerm() > context.getCurrentTerm()) {
           //2 
           context.setCurrentTerm(dto.getTerm());
           context.setVotedFor(null);
       }
       //3  
       applicationEventPublisher.publishEvent(new ResetElectionTimerEvent(this));

       if (!context.getState().equals(FOLLOWER)) {
           context.setState(FOLLOWER);
       }
        
       //4  
       if ((dto.getPrevLogIndex() > operationsLog.getLastIndex()) ||                                                                                        !dto.getPrevLogTerm().equals(operationsLog.getTerm(dto.getPrevLogIndex()))) {
                      return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), false, null);
       }


       Operation newOperation = dto.getOperation();
       if (newOperation != null) {
           int newOperationIndex = dto.getPrevLogIndex() + 1;
           
         synchronized (this) {
               //5
               if ((newOperationIndex <= operationsLog.getLastIndex()) &&
                      (!newOperation.getTerm().equals(operationsLog.getTerm(newOperationIndex)))){
                   operationsLog.removeAllFromIndex(newOperationIndex);
               }
               //6
               if (newOperationIndex <= operationsLog.getLastIndex())
               {
                 return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true,      operationsLog.getLastIndex());
               }
               //7
               operationsLog.append(newOperation);
           }
        }
        //8 
        if (dto.getLeaderCommit() > context.getCommitIndex()) {
           context.setCommitIndex(Math.min(dto.getLeaderCommit(), operationsLog.getLastIndex()));
       }

                 
       return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true, operationsLog.getLastIndex());
   }

  1. Si le tour du leader est inférieur au tour du suiveur, alors nous envoyons à notre chef un tour et un signe que sa demande a été rejetée. Dès que le leader reçoit un round plus grand que le sien en réponse, il se transforme en suiveur
  2. Si le tour du leader est supérieur au tour du suiveur, définissez ce tour sur le suiveur.
  3. Depuis que la demande a été reçue du leader, qu'il y ait des données ou non, nous remettons à zéro la minuterie de vote et, si nous n'étions pas un suiveur, nous le devenons
  4. , , , , , , . , ,
  5. , . . , , - , , , , . , .
  6. , . ,
  7. ,
  8. , , , .


Il ne reste plus qu'à comprendre comment le leader applique les opérations du journal à la base de données. En train d'envoyer des opérations aux suiveurs et de traiter les réponses de ceux-ci, le leader met à jour les métadonnées des nœuds. Dès que le nombre de nœuds dont l'index de la dernière opération dans le journal est supérieur à l'index de la dernière opération appliquée à la base de données par le leader devient égal au quorum, nous pouvons déclarer que la plupart des nœuds ont reçu l'opération et nous pouvons l'appliquer à la base de données du leader. En d'autres termes, si un leader a envoyé une opération à des abonnés et que la plupart d'entre eux l'ont insérée dans son journal et ont répondu au leader, nous pouvons appliquer cette opération à la base de données du leader et augmenter l'index de la dernière opération appliquée. Cet index avec la prochaine demande append-heartbeat volera vers le suiveur et appliquera l'opération avec le même index de son journal à sa base de données.

Analysons la méthode de tryToCommitclasseReplicationServiceImpl

  private void tryToCommit() {
       while (true) {
           //1
           int N = context.getCommitIndex() + 1;
           //2
           Supplier<Long> count = () ->
               context.getPeers().stream().map(Peer::getMatchIndex).
                       filter(matchIndex -> matchIndex >= N).count() + 1;

           //3 
           if (operationsLog.getLastIndex() >= N &&
                   operationsLog.getTerm(N).equals(context.getCurrentTerm())&&
                      count.get()>=context.getQuorum()
           )
           {
               context.setCommitIndex(N);
           } else
               return;
       }
   }

  1. Nous obtenons l'index suivant de l'opération appliquée à la base de données
  2. Nous comptons combien de followers ont une opération avec un tel index dans leurs logs, et n'oubliez pas d'ajouter un leader
  3. Si le nombre de ces abonnés est le quorum et que l'opération avec un tel index est dans le journal du leader, et que le cycle de cette opération est équivalent à celui en cours, le leader applique l'opération à la base de données et augmente l'index de la dernière opération appliquée. Les opérations du cycle précédent ne peuvent pas être appliquées, car un autre chef en était responsable et un conflit pourrait survenir. Chaque chef applique des opérations uniquement de son tour en cours.

Conclusion


Tout algorithme distribué, dont le représentant de la famille est RAFT, est une solution intégrée puissante qui garantit l'obtention du résultat, sous réserve de toutes les règles décrites dans le cahier des charges.

Il existe de nombreux algorithmes distribués et ils sont différents. Il y a ZAB, qui est implémenté dans Zookeeper et qui est utilisé, par exemple, pour synchroniser les données dans Kafka. Il existe des algorithmes avec des exigences de cohérence moins strictes, par exemple, la masse des implémentations du protocole Gossip qui sont utilisées dans les systèmes AP. Il existe des algorithmes qui suivent les principes de RAFT et utilisent en même temps le protocole Gossip pour échanger des journaux tels que MOKKA, qui utilise également le cryptage.

Je crois qu'essayer de comprendre l'un de ces algorithmes est extrêmement utile pour tout développeur, et comme je l'ai mentionné ci-dessus, les solutions peuvent être intéressantes à la fois de manière globale et dans des parties distinctes. Et évidemment, il faut absolument aller dans cette direction vers ceux dont les activités sont liées au développement de systèmes distribués et concernant les problèmes de synchronisation des données, même s'ils utilisent des solutions industrielles standard.

Références



Nous espérons que le matériel vous a été utile. Et si vous voulez suivre un cours , vous pouvez le faire dès maintenant.

All Articles