Implementierung des RAFT-Konsensalgorithmus für verteilten KV-Speicher in Java

Hallo wieder. Vor einigen Tagen begann die Ausbildung in einer neuen Gruppe zum Kurs „Software Architect“ . Heute möchten wir einen Artikel teilen, der von einem der Kursteilnehmer, Anton Pleshakov (Entwicklungsleiter bei Program Logistics und Mitbegründer bei Clusterra), verfasst wurde.




Derzeit sind verteilte Microservice-Systeme nicht nur in der Unternehmenswelt zum Industriestandard geworden. Die Vorteile der Verwendung verteilter Systeme wurden mehr als einmal beschrieben und diskutiert. Die Vorteile von Microservices sind seit langem allen bekannt: Technologien für die Aufgabe, Kompositionsfähigkeit, Skalierbarkeit, Entwicklungsskalierung, TTM-Reduzierung usw. Es ist offensichtlich, dass die Entwicklung verteilter Anwendungen mehr Optionen bietet, um rechtzeitig auf wachsende Geschäftsanforderungen zu reagieren und alles zu digitalisieren.

Es ist auch wichtig anzumerken, dass derzeit ein sehr wichtiger Faktor für die Wahl einer Entwicklungsstrategie zugunsten von Microservices die Verfügbarkeit aller Arten von vorgefertigten Infrastrukturlösungen ist, die die Lösung von Problemen übernehmen, die mit den zusätzlichen Kosten für den Betrieb eines verteilten Systems verbunden sind. Wir sprechen über Container-Orchestrierungssysteme, Service-Mash, Mittel zur verteilten Ablaufverfolgung, Überwachung, Protokollierung usw. Es kann mit Sicherheit festgestellt werden, dass die meisten Faktoren, die zuvor als Minuspunkte des heutigen Microservice-Ansatzes genannt wurden, nicht mehr so ​​viel Einfluss haben wie vor ein paar Jahren.

Basierend auf modernen Realitäten suchen die meisten Entwickler nach der ersten Möglichkeit, von einer monolithischen Struktur zu einer Microservice-Struktur zu wechseln. Einer der ersten Schritte, die unternommen werden können, ohne auf eine vollständige Umgestaltung und ernsthafte Zerlegung zurückzugreifen, ist die Erzielung eines horizontalen Skalierbarkeitssystems. Das heißt, Sie verwandeln Ihre monolithische Anwendung in einen Cluster, der möglicherweise sogar aus denselben Monolithen besteht, aber es Ihnen ermöglicht, deren Anzahl dynamisch zu variieren.

Beim Versuch, eine horizontale Skalierbarkeit zu erreichen, stellt sich die Frage der Datensynchronisation innerhalb eines Clusters sehr schnell und sehr akut. Glücklicherweise unterstützen alle modernen DBMS die Datenreplikation zwischen Knoten auf die eine oder andere Weise. Der Entwickler muss nur das DBMS für die Aufgabe auswählen und entscheiden, welche Eigenschaften des Systems (gemäß dem CAP-Theorem) er benötigt, CP oder AP, und das Problem ist behoben. In dem Fall, in dem CP erforderlich ist und die Anforderungen an die Konsistenz hoch sind, besteht eine der Methoden zur Lösung des Problems der Datensynchronisation darin, einen Cluster zu verwenden, der den RAFT-Konsensalgorithmus unterstützt.

Dieser ziemlich neue Algorithmus (wurde 2012 entwickelt) bietet eine hohe Konsistenzgarantie und ist sehr beliebt. Ich beschloss herauszufinden, wie es funktioniert, und schrieb meine Implementierung eines konsistenten Schlüsselwert-Repositorys in Java (Spring Boot).

Ist es sinnvoll, einen verteilten Algorithmus selbst zu implementieren? Es ist klar, dass Sie eine vorgefertigte Implementierung eines verteilten Algorithmus durchführen können, und mit der höchsten Wahrscheinlichkeit ist diese Implementierung besser als ein selbst hergestelltes „Fahrrad“. Beispielsweise können Sie ein DBMS verwenden, das die erforderliche Konsistenzstufe beibehält. Oder Sie können Zookeeper bereitstellen . Oder Sie finden einen Rahmen, der zu Ihrer Sprache passt. Für Java gibt es Atomix , das die Probleme der Synchronisierung verteilter Daten perfekt löst.

Aber auf der anderen Seite. Wenn Sie eine schlüsselfertige Lösung wählen, führt die Verwendung einer externen Anwendung normalerweise zu einem zusätzlichen Fehlerpunkt für Ihr System. Und Frameworks können redundant oder schwierig zu bedienen und zu erlernen sein oder für Ihre Programmiersprache überhaupt nicht vorhanden sein. Darüber hinaus ist die unabhängige Implementierung des Konsensalgorithmus eine äußerst interessante technische Aufgabe, die Ihren Horizont erweitert und Ihnen ein Verständnis dafür vermittelt, wie Sie die Probleme lösen können, die auftreten, wenn Dienste in einem Cluster mit der optimaleren Methode interagieren.

Da die Spezifikation des Algorithmus eine Reihe von Maßnahmen zur Aufrechterhaltung der Datenintegrität enthält, können Sie das erworbene Wissen und sogar den Algorithmus in seiner Gesamtheit verwenden. Jeder Teil des Algorithmus kann im wirklichen Leben nützlich sein. Angenommen, Sie haben eine Reihe von Workern zum parallelen Parsen von Dateien. Arbeiter sind gleichwertig, aber Sie möchten einen der Arbeiter als Koordinator bestimmen, und wenn der koordinierende Arbeiter fällt, weisen Sie einen anderen freien Arbeiter als Koordinator zu. Die erste Hälfte des RAFT-Algorithmus, in der beschrieben wird, wie ein Leader unter äquivalenten Knoten ausgewählt wird, hilft Ihnen dabei. Wenn Sie beispielsweise nur zwei Knoten in Bezug auf Master-Slave haben, können Sie die in der RAFT-Spezifikation beschriebenen Replikationsregeln sehr gut verwenden, um den Datenaustausch in Ihrem einfacheren Fall zu organisieren.

Der Artikel ist im Wesentlichen eine praktische Anleitung zur Implementierung von RAFT. Der Algorithmus selbst und die theoretischen Aspekte seiner Arbeit werden nicht verstanden. Sie können hier in diesem ausgezeichneten Artikel eine kurze Beschreibung lesen oder die vollständige Spezifikation hier studieren . Dort finden Sie eine sehr übersichtliche Visualisierung des Algorithmus.

Allgemeine Lösungsbeschreibung


Der Teil des Codes, der in direktem Zusammenhang mit der Implementierung des Algorithmus steht, wird im Artikel analysiert. Am Ende des Artikels befindet sich ein Link zum Repository, in dem Sie den gesamten Code sehen können.

Die Aufgabe war wie folgt. Entwickeln Sie ein verteiltes System, mit dem Sie Daten in einer Schlüsselwertdatenbank speichern können. Die Daten jedes Knotens müssen konsistent sein, dh wenn die Daten in die Datenbank eines Knotens gelangt sind und die meisten Knoten bestätigt haben, dass sie auch diese Daten empfangen haben, befinden sich diese Daten früher oder später in der Datenbank jedes Knotens. Wenn ein Teil des Clusters getrennt und wieder verbunden wird, müssen die Knoten außerhalb des Clusters den Hauptcluster einholen und synchronisieren. Jeder Knoten bietet eine REST-API zum Schreiben und Lesen von Datenbankdaten. Das System besteht aus zwei Modulen für zwei Knotentypen: Client und Server. Im Folgenden werden die Funktionen der Implementierung des Servers selbst betrachtet. Der Client-Code befindet sich im Repository.

Ein Serverknoten kann in drei Zuständen betrieben werden:

  • Anhänger (Anhänger). Akzeptiert Leseanforderungen vom Client. Nimmt dem Anführer einen Herzschlag ab
  • Kandidat (Kandidat). Akzeptiert Leseanforderungen vom Client. Sendet Abstimmungsanfragen an andere Knoten
  • Führer Akzeptiert Lese- und Schreibanforderungen. Sendet Heartbeat-Anforderungen an andere Knoten. Sendet Daten zum Anhängen von Anforderungen an andere Knoten.

Die Periode der „Führung“ eines der Knoten wird als Runde (Term) bezeichnet. Ein neuer Kandidat eröffnet eine neue Runde.

Datenspeicher


Jeder Knoten bietet Zugriff auf das Repository des Operationsprotokolls, in dem Operationen zum Ändern von Daten nacheinander aufgezeichnet werden.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/operations/OperationsLog.java


public interface OperationsLog {
   void append(Operation operation);
   Operation get(Integer index);
   List<Operation> all();

   Long getTerm(Integer index);
   Integer getLastIndex();
   Long getLastTerm();

   void removeAllFromIndex(int newOperationIndex);
}

Jede Operation enthält neben Daten und Typ (Einfügen, Ändern, Löschen) die Nummer der Runde, in der sie erstellt wurde. Zusätzlich hat jede Operation einen Index, der sequentiell ansteigt. Es ist wichtig, dass alle Vorgänge in derselben Reihenfolge in die Protokolle der Follower eingefügt werden, in der sie in das Protokoll des Anführers eingefügt werden.

Jeder Knoten hat Zugriff auf eine Datenbank, in der Daten direkt gespeichert werden.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/storage/Storage.java

public interface Storage {
   List<Entry> all();
   String get(Long key);
   void insert(Long key, String val);
   void update(Long key, String val);
   void delete(Long key);
}

In der aktuellen Implementierung werden eingebettete In-Memory-Lösungen sowohl für das Protokoll als auch für die Datenbank verwendet (normale Wettbewerbsliste und Karte). Bei Bedarf können Sie einfach die entsprechende Schnittstelle implementieren, um andere Speichertypen zu unterstützen.

Die Anwendung von Operationen aus dem Protokoll auf die Datenbank wird von einer verteilten Zustandsmaschine ausgeführt. Eine Zustandsmaschine ist ein solcher Mechanismus, der dafür verantwortlich ist, den Status eines Clusters zu ändern und die Verwendung falscher Änderungen einzuschränken (Operationen außerhalb der Reihenfolge oder ein getrennter Knoten, der sich selbst als führend betrachtet). Damit die Änderungen als gültig angesehen und auf die Datenbank angewendet werden können, müssen sie eine Reihe von Prüfungen bestehen und bestimmte Kriterien erfüllen, genau das bietet die Zustandsmaschine.

Für einen Leiter wird eine Operation auf die Datenbank angewendet, wenn die meisten Knoten die Tatsache bestätigt haben, dass die Operation auch in ihr Protokoll repliziert wird. Für einen Follower wird die Operation auf die Datenbank angewendet, wenn vom Leiter ein Signal empfangen wird, dass er in seine Datenbank gelangt ist.

Timer


Jeder Knoten bietet Datenaustausch mit anderen Knoten.

Es werden zwei Arten von Abfragen unterstützt:

  • Abstimmung bei der Durchführung einer Abstimmungsrunde
  • Anhängen, auch bekannt als Heartbeat (falls ohne Daten), um Protokolldaten an Follower zu replizieren und den Beginn einer neuen Abstimmungsrunde zu verhindern.

Die Tatsache, dass ein Ereignis einsetzt, wird vom Timer bestimmt. Auf dem Knoten werden zwei Arten von Timern gestartet:

  • Abstimmung. Eine Abstimmungsrunde starten. Jeder Knoten hat ein eigenes Intervall, nach dem er versucht, eine neue Abstimmung zu starten. Der Countdown beginnt von neuem, wenn der Anführer einen Herzschlag erhält.
  • Herzschlag. Senden einer Anfrage an Follower durch den Anhängeführer. Wenn der Knoten keinen Herzschlag empfängt und der Abstimmungszeitgeber abgelaufen ist, wird er ein Kandidat und leitet Wahlen ein, erhöht die Anzahl der Abstimmungsrunden und sendet Abstimmungsanfragen an andere Knoten. Wenn der Knoten die Mehrheit der Stimmen sammelt, wird er zum Anführer und sendet einen Herzschlag aus.

Der aktuelle Status des Knotens


Jeder Knoten speichert Daten über den aktuellen Status.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/context/Context.java

public interface Context {
   Integer getId(); //    
   State getState();//: , ,  
   Integer getVotedFor(); 
               //          
   Long getCurrentTerm(); //  
   Integer getCommitIndex(); //    
   List<Peer> getPeers(); //      
}

Ein Führungsknoten speichert auch Metadaten für die Knoten, auf die er Daten repliziert.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/node/peers/Peer.java

public interface Peer {
   Integer getId(); //  
   Integer getNextIndex(); //  ,    
   Integer getMatchIndex();//   
   Boolean getVoteGranted(); //     
}

Knotenmetadaten werden vom Leiter aktualisiert, wenn Antworten von Followern empfangen werden. Sie werden verwendet, um vom Leiter zu bestimmen, welche nächste Indexoperation der Follower akzeptieren möchte und welche Operationen bereits zum Protokoll des Followers hinzugefügt wurden.

Wählen


Die ElectionService- Klasse ist für die Abstimmung verantwortlich

public interface ElectionService {
   void processElection();
   AnswerVoteDTO vote(RequestVoteDTO requestVoteDTO);
} 

Senden eines Antrags auf Abstimmung


Wenn der Knoten ein Follower ist und für den für die Wartezeit festgelegten Zeitraum keinen Herzschlag erhält, erhöht er seine aktuelle Runde, erklärt sich selbst zum Kandidaten und beginnt, Abstimmungsanfragen an andere Knoten zu senden. Wenn es ihm gelingt, ein Quorum zu erreichen und die meisten Knoten seine Stimme abgeben, wird er der neue Führer. In RAFT-Begriffen ist das Quorum mehr als die Hälfte aller Knoten (51%).

Lassen Sie uns die processElectionKlassenmethode analysieren ElectionServiceImpl, die vom Abstimmungs-Timer aufgerufen wird, wenn die Abstimmung abläuft, und den Knoten eine Anforderung zur Abstimmung sendet .

//1
context.setState(CANDIDATE); 
Long term = context.incCurrentTerm(); 
context.setVotedFor(context.getId()); 

List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());
long voteGrantedCount = 1L;
long voteRevokedCount = 0L;

//2
while (checkCurrentElectionStatus(term)) {
   List<AnswerVoteDTO> answers = getVoteFromAllPeers(term, peersIds);
   peersIds = new ArrayList<>();
   for (AnswerVoteDTO answer : answers) {
       //3
       if (answer.getStatusCode().equals(OK)) {
           //4
           if (answer.getTerm()>context.getCurrentTerm()) {
               context.setTermGreaterThenCurrent(answer.getTerm());
               return;
           }
           if (answer.isVoteGranted()) {
               //5 
               context.getPeer(answer.getId()).setVoteGranted(true);
               voteGrantedCount++;
           } else
               //6 
               voteRevokedCount++;
       } else {
          peersIds.add(answer.getId());
       }
   }
  //7
  if (voteGrantedCount >= context.getQuorum()) {
       winElection(term);
       return;
   } else if (voteRevokedCount >= context.getQuorum()) {
       loseElection(term);
       return;
   } 

  1. Stellen Sie den Status "Kandidat" ein. Erhöhen Sie die runde Zahl und stimmen Sie für uns ab.
  2. , ( ). - , , heartbeat .
  3. - , . , , -.
  4. , . , heartbeat .
  5. Der Knoten hat für uns gestimmt! Wir erhöhen die Anzahl der Knoten, die Stimmen für uns abgeben, und korrigieren, dass dieser Knoten für uns gestimmt hat.
  6. Nicht für uns gewählt, glauben wir auch.
  7. Wenn das Quorum gesammelt wird und der Knoten die Wahl gewonnen hat, legen wir den Status „Leader“ fest. Ansonsten werden wir Anhänger und warten.

Es sollte auch beachtet werden, dass, wenn ein Knoten zum Leader wird, der nächste Index für jeden Knoten in der Liste der vom Leader gespeicherten Knoten festgelegt wird. Dies entspricht dem letzten Index im Leader-Protokoll plus 1. Ausgehend von diesem Index versucht der Leader, die Follower-Protokolle zu aktualisieren. Tatsächlich entspricht dieser vom Leader gespeicherte Index möglicherweise nicht dem tatsächlichen Index des Protokolls des Followers, und der tatsächliche Wert wird nur beim Datenaustausch mit dem Follower ermittelt und angepasst. Es ist jedoch ein Ausgangspunkt erforderlich .

  private void winElection(Long term) {
       context.setState(LEADER);
       context.getPeers().forEach(peer ->
               peer.setNextIndex(operationsLog.getLastIndex()+1)

       );
   }

Bearbeitung von Abstimmungsanfragen


Bei der Abstimmung erhält jeder Knoten vom Kandidaten eine Anfrage der folgenden Form :

class RequestVoteDTO {
   private final Long term; //     
   private final Integer candidateId; //  
   private final Integer lastLogIndex; //     
   private final Long lastLogTerm; //       
}

Schauen wir uns nun das voteKlassenverfahren an ElectionServiceImpl, es verarbeitet die Abstimmungsanfrage des Kandidaten und gibt eine Entscheidung bezüglich seiner Kandidatur für die Rolle des Leiters zurück.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/election/ElectionServiceImpl.java#L178


public AnswerVoteDTO vote(RequestVoteDTO dto) {
   
       boolean termCheck;
       //1
       if (dto.getTerm() < context.getCurrentTerm())
           return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),false);
       else //2
       if (dto.getTerm().equals(context.getCurrentTerm())) {
           termCheck = (context.getVotedFor() == null||
                          context.getVotedFor().equals(dto.getCandidateId()));
       }
       else
       {   //3
           termCheck = true;
             context.setTermGreaterThenCurrent(dto.getTerm());
       }

       //4  
       boolean logCheck = !((operationsLog.getLastTerm() > dto.getLastLogTerm()) ||
               ((operationsLog.getLastTerm().equals(dto.getLastLogTerm())) &&
                       (operationsLog.getLastIndex() > dto.getLastLogIndex())));


       boolean voteGranted = termCheck&&logCheck;

       //5
       if (voteGranted) {
           context.setVotedFor(dto.getCandidateId());
       }
       //6   
       return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),voteGranted);
   }

Nach Erhalt einer Anfrage von einem Kandidaten führt der Knoten zwei Überprüfungen durch: Überprüft die Runde des Kandidaten und die Länge seines Protokolls. Wenn die Runde des Kandidaten höher ist und sein Protokoll länger oder gleich ist, gibt der Knoten seinem Knoten eine Stimme für den Kandidaten

  1. Wenn die aktuelle Runde des Knotens größer ist als die Runde des Kandidaten, lehnen wir ab, da dies eine Anfrage eines verzögerten Knotens ist, der anscheinend einige Zeit außerhalb des Clusters lag und das Wahlverfahren gestartet hat, weil er den aktuellen Führer nicht gesehen hat.
  2. , , , , , , ; . — .
  3. ,
  4. . , , , , .
  5. Mit einem positiven Ergebnis korrigieren wir die Tatsache, dass der Knoten an den Wahlen teilgenommen hat, und geben eine Stimme für den Kandidaten ab.
  6. Senden Sie das Ergebnis an den Kandidaten zurück

Sicherlich hätten die Bedingungen etwas kürzer und eleganter geschrieben werden können, aber ich habe eine so „naivere“ Option gelassen, um mich nicht zu verwirren und niemanden zu verwirren.

Reproduzieren


Der Timer-Leader sendet Heartbeat-Follower an alle Knoten, um deren Abstimmungs-Timer zurückzusetzen. Da der Leiter in seinen Metadatenindizes die letzten Operationen aller Follower speichert, kann er bewerten, ob das Senden der Operation an Knoten erforderlich ist. Wenn das Operationsprotokoll des Anführers länger wird als das Protokoll eines Nachfolgers, sendet er ihm zusammen mit dem Herzschlag nacheinander die fehlenden Operationen. Nennen Sie es Append-Anfrage. Wenn die meisten Knoten den Empfang neuer Operationen bestätigen, wendet der Leiter diese Operationen auf seine Datenbank an und erhöht den Index der zuletzt angewendeten Operation. Dieser Index wird zusammen mit einer Heartbeat-Anfrage auch an Follower gesendet. Und wenn der Leader-Index höher als der Follower-Index ist, wendet der Follower auch Operationen auf seine Datenbank an, um die Indizes auszugleichen.

Diese Art von Append-Anfrage sendet der Leader an den Follower

class RequestAppendDTO {
   private final Long term; //   
   private final Integer leaderId; //   

   private final Integer prevLogIndex;//   
   private final Long prevLogTerm;//   
   private final Integer leaderCommit;//      
   private final Operation operation; //
}

Es gibt Implementierungen, bei denen Vorgänge in Stapeln von mehreren pro Anforderung übertragen werden. In der aktuellen Implementierung kann nur eine Operation pro

Anforderung übertragen werden. Die Klasse antwortet auf das Senden und Verarbeiten der Heartbeat-Append-Anforderung:

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationService.java

public interface ReplicationService {
   void appendRequest();
   AnswerAppendDTO append(RequestAppendDTO requestAppendDTO);
}

Senden Sie eine Datenänderungsanforderung


Stellen Sie sich ein Fragment einer sendAppendForOnePeerKlassenmethode vor ReplicationServiceImpl

. Die Methode ist dafür verantwortlich, eine Anforderung an den Follower zu generieren und diese zu senden .

private CompletableFuture<AnswerAppendDTO> sendAppendForOnePeer(Integer id) {
   return CompletableFuture.supplyAsync(() -> {
       try {
           //1
           Peer peer = context.getPeer(id);

           Operation operation;
           Integer prevIndex;
           //2    
           if (peer.getNextIndex() <= operationsLog.getLastIndex()) {
               operation = operationsLog.get(peer.getNextIndex());
               prevIndex = peer.getNextIndex() - 1;
           } else 
           //3  
           {
               operation = null;
               prevIndex = operationsLog.getLastIndex();
           }


           RequestAppendDTO requestAppendDTO = new RequestAppendDTO(
                   context.getCurrentTerm(), //   
                   context.getId(), //  
                   prevIndex,//      
                   operationsLog.getTerm(prevIndex),//  
                   context.getCommitIndex(),
                               //      
                   Operation //
           );

...
/*   http     */
}

  1. Follower-Metadaten
  2. , . ( ), , , , . , , , ,
  3. , , ; , , ,

Betrachten Sie als Nächstes die appendRequestKlassenmethode ReplicationServiceImpl, die für das Senden der Append-Anforderung und die Verarbeitung des Ergebnisses an alle Follower verantwortlich ist.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationServiceImpl.java#L109

public void appendRequest() {
       List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());

       //1 
       while (peersIds.size() > 0) {
           //2 
           List<AnswerAppendDTO> answers = sendAppendToAllPeers(peersIds);
           peersIds = new ArrayList<>();
           for (AnswerAppendDTO answer : answers) {
               //3
               if (answer.getStatusCode().equals(OK)) {
                   //4
                   if (answer.getTerm() > context.getCurrentTerm()) {
                        context.setTermGreaterThenCurrent(answer.getTerm());
                       return;
                   }
                   Peer peer = context.getPeer(answer.getId());
                   //5     
                   if (answer.getSuccess()) {                      
                       peer.setNextIndex(answer.getMatchIndex() + 1);
                       peer.setMatchIndex(answer.getMatchIndex());
                       if (peer.getNextIndex() <= operationsLog.getLastIndex())
                           peersIds.add(answer.getId());
                   //6      
                   } else {
                       peer.decNextIndex();
                       peersIds.add(answer.getId());
                   }
               }
           }
           //7
           tryToCommit();
       }
}

  1. Wir wiederholen die Anfrage, bis wir von allen Followern eine Antwort erhalten, dass die Replikation erfolgreich war. Da pro Anforderung eine Operation gesendet wird, kann es mehrere Iterationen dauern, um die Protokolle der Follower zu synchronisieren
  2. Senden Sie Anfragen an alle Follower und erhalten Sie eine Liste mit Antworten
  3. Wir berücksichtigen nur Antworten von verfügbaren Followern
  4. Wenn sich herausstellt, dass die Runde eines der Anhänger mehr ist als die Runde des Anführers, stoppen wir alles und verwandeln uns in einen Anhänger
  5. Wenn der Follower geantwortet hat, dass alles erfolgreich war, aktualisieren wir die Follower-Metadaten: Wir speichern den letzten Index des Follower-Protokolls und den Index der nächsten vom Follower erwarteten Operation.
  6. , , , , . , , . , . , .
  7. , . .


Nun wollen wir sehen, wie genau der Follower die Append-Anfrage vom Leader verarbeitet. Klasse -
MethodeappendReplicationServiceImpl

public AnswerAppendDTO append(RequestAppendDTO dto) {
     
       //1     
       if (dto.getTerm() < context.getCurrentTerm()) {
           return new AnswerAppendDTO(context.getId(),context.getCurrentTerm(),false, null);
       } else if (dto.getTerm() > context.getCurrentTerm()) {
           //2 
           context.setCurrentTerm(dto.getTerm());
           context.setVotedFor(null);
       }
       //3  
       applicationEventPublisher.publishEvent(new ResetElectionTimerEvent(this));

       if (!context.getState().equals(FOLLOWER)) {
           context.setState(FOLLOWER);
       }
        
       //4  
       if ((dto.getPrevLogIndex() > operationsLog.getLastIndex()) ||                                                                                        !dto.getPrevLogTerm().equals(operationsLog.getTerm(dto.getPrevLogIndex()))) {
                      return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), false, null);
       }


       Operation newOperation = dto.getOperation();
       if (newOperation != null) {
           int newOperationIndex = dto.getPrevLogIndex() + 1;
           
         synchronized (this) {
               //5
               if ((newOperationIndex <= operationsLog.getLastIndex()) &&
                      (!newOperation.getTerm().equals(operationsLog.getTerm(newOperationIndex)))){
                   operationsLog.removeAllFromIndex(newOperationIndex);
               }
               //6
               if (newOperationIndex <= operationsLog.getLastIndex())
               {
                 return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true,      operationsLog.getLastIndex());
               }
               //7
               operationsLog.append(newOperation);
           }
        }
        //8 
        if (dto.getLeaderCommit() > context.getCommitIndex()) {
           context.setCommitIndex(Math.min(dto.getLeaderCommit(), operationsLog.getLastIndex()));
       }

                 
       return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true, operationsLog.getLastIndex());
   }

  1. Wenn die Runde des Anführers geringer ist als die Runde des Nachfolgers, senden wir unserem Anführer eine Runde und ein Zeichen, dass seine Anfrage abgelehnt wurde. Sobald der Anführer eine Runde erhält, die größer ist als seine Antwort, wird er zu einem Anhänger
  2. Wenn die Runde des Anführers größer ist als die Runde des Nachfolgers, setzen Sie diese Runde auf den Nachfolger.
  3. Da die Anfrage vom Vorsitzenden eingegangen ist, setzen wir den Abstimmungszeitgeber zurück, unabhängig davon, ob Daten vorhanden sind oder nicht, und wenn wir kein Anhänger sind, werden wir es
  4. , , , , , , . , ,
  5. , . . , , - , , , , . , .
  6. , . ,
  7. ,
  8. , , , .


Es bleibt nur herauszufinden, wie der Leiter Operationen aus dem Protokoll auf die Datenbank anwendet. Beim Senden von Vorgängen an Follower und Verarbeiten von Antworten von diesen aktualisiert der Leiter die Metadaten der Knoten. Sobald die Anzahl der Knoten, deren Index der letzten Operation im Protokoll größer ist als der Index der letzten Operation, die vom Leader auf die Datenbank angewendet wurde, dem Quorum entspricht, können wir angeben, dass die meisten Knoten die Operation empfangen haben, und sie auf die Leader-Datenbank anwenden. Mit anderen Worten, wenn ein Anführer eine Operation an Follower gesendet hat und die meisten von ihnen sie in sein Protokoll eingefügt und dem Anführer geantwortet haben, können wir diese Operation auf die Datenbank des Anführers anwenden und den Index der zuletzt angewendeten Operation erhöhen. Dieser Index mit der nächsten Append-Heartbeat-Anforderung fliegt zum Follower und wendet die Operation mit demselben Index aus seinem Protokoll auf seine Datenbank an.

Lassen Sie uns die tryToCommitKlassenmethode analysierenReplicationServiceImpl

  private void tryToCommit() {
       while (true) {
           //1
           int N = context.getCommitIndex() + 1;
           //2
           Supplier<Long> count = () ->
               context.getPeers().stream().map(Peer::getMatchIndex).
                       filter(matchIndex -> matchIndex >= N).count() + 1;

           //3 
           if (operationsLog.getLastIndex() >= N &&
                   operationsLog.getTerm(N).equals(context.getCurrentTerm())&&
                      count.get()>=context.getQuorum()
           )
           {
               context.setCommitIndex(N);
           } else
               return;
       }
   }

  1. Wir erhalten den folgenden Index der Operation, die auf die Datenbank angewendet wird
  2. Wir zählen, wie viele Follower eine Operation mit einem solchen Index in ihren Protokollen haben, und vergessen nicht, einen Leader hinzuzufügen
  3. Wenn die Anzahl solcher Follower beschlussfähig ist und sich die Operation mit einem solchen Index im Protokoll des Leiters befindet und die Runde dieser Operation der aktuellen entspricht, wendet der Leiter die Operation auf die Datenbank an und erhöht den Index der zuletzt angewendeten Operation. Operationen aus der vorherigen Runde können nicht angewendet werden, da ein anderer Anführer für sie verantwortlich war und ein Konflikt entstehen könnte. Jeder Anführer wendet nur Operationen seiner aktuellen Runde an.

Fazit


Jeder verteilte Algorithmus, dessen Vertreter RAFT ist, ist eine leistungsstarke integrierte Lösung, die das Erreichen des Ergebnisses unter Einhaltung aller in der Spezifikation beschriebenen Regeln garantiert.

Es gibt viele verteilte Algorithmen und sie sind unterschiedlich. Es gibt ZAB, das in Zookeeper implementiert ist und beispielsweise zum Synchronisieren von Daten in Kafka verwendet wird. Es gibt Algorithmen mit weniger strengen Anforderungen an die Konsistenz, beispielsweise die Masse der Implementierungen des Gossip-Protokolls, die in AP-Systemen verwendet werden. Es gibt Algorithmen, die den Prinzipien von RAFT folgen und gleichzeitig das Klatschprotokoll für den Austausch von Protokollen verwenden, z. B. MOKKA, das auch Verschlüsselung verwendet.

Ich glaube, dass der Versuch, einen dieser Algorithmen herauszufinden, für jeden Entwickler äußerst nützlich ist, und wie oben erwähnt, können Lösungen sowohl umfassend als auch in separaten Teilen interessant sein. Und natürlich müssen Sie auf jeden Fall in diese Richtung schauen, wenn sich die Aktivitäten auf die Entwicklung verteilter Systeme und die Datensynchronisation beziehen, auch wenn sie industrielle Standardlösungen verwenden.

Verweise



Wir hoffen, dass das Material für Sie nützlich war. Und wenn Sie einen Kurs belegen möchten , können Sie dies jetzt tun.

All Articles