用Java实现分布式KV存储的RAFT共识算法。

再一次问好。几天前,一个新的小组开始了关于“软件架构师”课程的培训,今天,我们想分享一个由该课程的学生之一Anton Pleshakov(Program Logistics开发负责人和Clusterra联合创始人)撰写的文章。




当前,分布式微服务系统已经实际上已经成为行业标准,不仅在企业界。已经多次描述和讨论了使用分布式系统的好处。微服务的优势早已为大家所熟知:任务技术,可组合性,可伸缩性,开发扩展,TTM缩减等。显然,分布式应用程序的开发提供了更多选择,以便及时响应不断增长的业务需求和周围所有事物的数字化。

还需要注意的是,目前影响选择支持微服务的开发策略的一个非常重要的因素是各种现成的基础架构解决方案的可用性,这些解决方案解决了与操作分布式系统的额外成本相关的问题。我们正在谈论容器编排系统,服务mash,分布式跟踪,监视,日志记录等方法。可以肯定地说,今天提到的大多数今天作为微服务方法缺点的因素并没有几年前那么大的影响。

基于现代现实,大多数开发人员都在第一时间寻求从单一结构转换为微服务结构的机会。在不诉诸总重构和严重分解的情况下可以采取的第一步是实现水平可伸缩性系统。也就是说,将您的整体应用程序变成一个集群,甚至可能由相同的整体组件组成,但允许您动态更改其数量。

当试图实现水平可伸缩性时,群集内的数据同步问题非常迅速且非常尖锐。幸运的是,所有现代DBMS都以一种或另一种方式支持节点之间的数据复制。开发人员只需选择用于任务的DBMS并根据CP或AP决定他需要的系统属性(根据CAP定理),CP或AP,问题就可以解决。在需要CP且一致性要求很高的情况下,解决数据同步问题的方法之一是使用支持RAFT共识算法的集群。

这种相当新的算法(2012年开发)提供了高度的一致性保证,并且非常受欢迎。我决定弄清楚它是如何工作的,并用Java(Spring Boot)编写了一个一致的键值存储库的实现。

自己实现任何分布式算法是否有意义?显然,您可以采用分布式算法的现成实现,并且以最高的概率实现比自制“自行车”更好。例如,您可以使用维护所需一致性级别的DBMS。或者,您可以部署Zookeeper。或者,您可以找到适合您的语言的框架。对于Java,有Atomix,它可以完美解决同步分布式数据的问题。

但另一方面。如果采用交钥匙解决方案,则通常使用外部应用程序会给系统增加一个额外的故障点。框架可能是多余的,或者难以操作和学习,或者对于您的编程语言而言,它们可能根本不存在。此外,共识算法的独立实现是一项非常有趣的工程任务,它可以拓宽您的视野,并使您了解如何使用更优化的方法解决服务在集群中交互时出现的问题。

由于算法规范包含一组维护数据完整性的措施,因此您可以使用所获得的知识,甚至可以完整地使用算法。该算法的任何部分在现实生活中都可能有用。假设您有一组用于并行解析文件的工作程序。工人是等价的,但是您想指定其中一位工人作为协调员,当协调员跌倒时,请指派任何其他自由工人作为协调员。 RAFT算法的前半部分描述了如何在等效节点之间选择一个领导者,将为您提供帮助。例如,如果您只有两个与主从节点有关的节点,那么您可以很好地使用RAFT规范中描述的复制规则,以便在较简单的情况下组织数据交换。

本文本质上是有关如何自己实施RAFT的实用指南。该算法本身及其工作的理论方面将不被理解。您可以在这篇出色的文章中阅读简要说明,或在此处学习完整的规范在那里,您可以找到算法的非常清晰的可视化。

通用解决方案说明


本文分析了与算法实现直接相关的那部分代码。本文结尾处有一个到存储库的链接,您可以在其中查看整个代码。

任务如下。开发一个分布式系统,使您可以将数据存储在键值数据库中。每个节点的数据必须一致,即,如果数据进入一个节点的数据库,并且大多数节点确认它们也已接收到该数据,则该数据迟早会在每个节点的数据库中。当集群的一部分断开连接并重新连接时,集群外部的节点必须赶上主集群并进行同步。每个节点都提供用于写入和读取数据库数据的REST API。该系统由用于两种类型的节点的两个模块组成:客户端和服务器。下面我们考虑服务器本身实现的功能。客户端代码在存储库中。

服务器节点可以在三种状态下运行:

  • 追随者(追随者)。接受来自客户端的读取请求。得到领导者的心跳
  • 候选人(候选人)。接受来自客户端的读取请求。将投票请求发送到其他节点
  • 领导 接受读写请求。将心跳请求发送到其他节点。将附加请求数据发送到其他节点。

节点之一的“领导”期间称为回合(任期)。新的候选人开始新一轮。

数据存储


每个节点都提供对操作日志存储库的访问,其中顺序记录了用于更改数据的操作。

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/operations/OperationsLog.java


public interface OperationsLog {
   void append(Operation operation);
   Operation get(Integer index);
   List<Operation> all();

   Long getTerm(Integer index);
   Integer getLastIndex();
   Long getLastTerm();

   void removeAllFromIndex(int newOperationIndex);
}

除数据和类型(插入,更改,删除)外,每个操作还包含创建操作的回合编号。此外,每个操作的索引都按顺序增加。重要的是,所有操作均应按照插入领导者日志的顺序插入跟随者的日志。

每个节点都可以访问直接存储数据的数据库。

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/storage/Storage.java

public interface Storage {
   List<Entry> all();
   String get(Long key);
   void insert(Long key, String val);
   void update(Long key, String val);
   void delete(Long key);
}

在当前的实现中,嵌入式内存解决方案既用于日志又用于数据库(普通竞争列表和地图)。如有必要,您可以简单地实现适当的接口以支持其他类型的存储。

从日志到数据库的操作应用是由分布式状态机执行的。状态机是一种机制,负责更改群集的状态,限制使用不正确的更改(乱序操作或认为自己是领导者的断开节点)。为了使更改被视为有效并将更改应用到数据库,它们必须通过一系列检查并满足某些条件,这正是状态机提供的条件。

对于领导者,如果大多数节点都已确认该操作也已复制到其日志中,那么该操作将应用于数据库。对于跟随者,如果从领导者接收到进入其数据库的信号,则该操作将应用于数据库。

计时器


每个节点提供与其他节点的数据交换。

支持两种查询:

  • 进行一轮投票时投票
  • 附加(也称为心跳)(如果没有数据)将日志数据复制到关注者,并防止开始新一轮投票。

事件发生的事实由计时器确定。在节点上启动两种类型的计时器:

  • 投票。开始一轮投票。每个节点都有其自己的间隔,在此间隔之后,它将尝试开始新的投票。收到领导者的心跳后,倒数重新开始。
  • 心跳。由追加领导者向关注者发送请求。如果该节点未收到心跳并且投票计时器已过期,则它将成为候选者并发起选举,增加投票回合的数量并将投票请求发送到其他节点。如果节点收集了多数票,则它将成为领导者并开始发出心跳信号。

节点的当前状态


每个节点都存储有关当前状态的数据。

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/context/Context.java

public interface Context {
   Integer getId(); //    
   State getState();//: , ,  
   Integer getVotedFor(); 
               //          
   Long getCurrentTerm(); //  
   Integer getCommitIndex(); //    
   List<Peer> getPeers(); //      
}

领导节点还存储其向其复制数据的节点的元数据。

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/node/peers/Peer.java

public interface Peer {
   Integer getId(); //  
   Integer getNextIndex(); //  ,    
   Integer getMatchIndex();//   
   Boolean getVoteGranted(); //     
}

领导者在收到来自跟随者的响应时,会更新节点元数据。领导者使用它们来确定跟随者准备接受的下一个索引操作以及已经将哪些操作添加到跟随者的日志中。

表决


ElectionService 类负责投票

public interface ElectionService {
   void processElection();
   AnswerVoteDTO vote(RequestVoteDTO requestVoteDTO);
} 

发送投票请求


如果该节点是跟随者,并且在为等待设置的时间内未接收到心跳,则它将增加其当前回合,声明自己为候选者并开始向其他节点发送投票请求。如果他设法达到法定人数,并且大多数节点都投了赞成票,那么他将成为新的领导人。用RAFT术语来说,仲裁占所有节点的一半以上(51%)。

让我们分析一下processElectionclass 方法ElectionServiceImpl,当投票到期时,投票计时器会调用它,并向节点发送投票请求

//1
context.setState(CANDIDATE); 
Long term = context.incCurrentTerm(); 
context.setVotedFor(context.getId()); 

List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());
long voteGrantedCount = 1L;
long voteRevokedCount = 0L;

//2
while (checkCurrentElectionStatus(term)) {
   List<AnswerVoteDTO> answers = getVoteFromAllPeers(term, peersIds);
   peersIds = new ArrayList<>();
   for (AnswerVoteDTO answer : answers) {
       //3
       if (answer.getStatusCode().equals(OK)) {
           //4
           if (answer.getTerm()>context.getCurrentTerm()) {
               context.setTermGreaterThenCurrent(answer.getTerm());
               return;
           }
           if (answer.isVoteGranted()) {
               //5 
               context.getPeer(answer.getId()).setVoteGranted(true);
               voteGrantedCount++;
           } else
               //6 
               voteRevokedCount++;
       } else {
          peersIds.add(answer.getId());
       }
   }
  //7
  if (voteGrantedCount >= context.getQuorum()) {
       winElection(term);
       return;
   } else if (voteRevokedCount >= context.getQuorum()) {
       loseElection(term);
       return;
   } 

  1. 设置状态为“候选”。提高轮数并为自己投票。
  2. , ( ). - , , heartbeat .
  3. - , . , , -.
  4. , . , heartbeat .
  5. 节点投票给我们!我们增加了为我们投票的节点数量,并确定该节点为我们投票。
  6. 我们也相信不投票给我们。
  7. 如果达到了法定人数,并且节点赢得了选举,我们将建立“领导者”状态。否则,我们将成为追随者并等待。

还应注意,当节点成为领导者时,将为领导者存储的节点列表中的每个节点设置下一个索引,该索引等于领导者日志中的最后一个索引加1。从该索引开始,领导者将尝试更新跟随者日志。实际上,领导者存储的该索引可能与追随者日志的真实索引不符,只有与追随者交换数据时才会获取实际值并进行调整。但是需要一些起点

  private void winElection(Long term) {
       context.setState(LEADER);
       context.getPeers().forEach(peer ->
               peer.setNextIndex(operationsLog.getLastIndex()+1)

       );
   }

投票请求处理


投票时,每个节点都会收到来自候选人以下形式请求

class RequestVoteDTO {
   private final Long term; //     
   private final Integer candidateId; //  
   private final Integer lastLogIndex; //     
   private final Long lastLogTerm; //       
}

现在让我们看一下vote课程程序ElectionServiceImpl,它处理候选人的投票请求,并返回有关其候选人担任领导职务的决定。

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/election/ElectionServiceImpl.java#L178


public AnswerVoteDTO vote(RequestVoteDTO dto) {
   
       boolean termCheck;
       //1
       if (dto.getTerm() < context.getCurrentTerm())
           return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),false);
       else //2
       if (dto.getTerm().equals(context.getCurrentTerm())) {
           termCheck = (context.getVotedFor() == null||
                          context.getVotedFor().equals(dto.getCandidateId()));
       }
       else
       {   //3
           termCheck = true;
             context.setTermGreaterThenCurrent(dto.getTerm());
       }

       //4  
       boolean logCheck = !((operationsLog.getLastTerm() > dto.getLastLogTerm()) ||
               ((operationsLog.getLastTerm().equals(dto.getLastLogTerm())) &&
                       (operationsLog.getLastIndex() > dto.getLastLogIndex())));


       boolean voteGranted = termCheck&&logCheck;

       //5
       if (voteGranted) {
           context.setVotedFor(dto.getCandidateId());
       }
       //6   
       return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),voteGranted);
   }

收到候选人的请求后,节点会进行两项检查:检查候选人的回合和他的日志长度。如果候选人的回合较高,并且其对数更长或相等,则该节点为其候选人投票给其节点

  1. 如果当前回合的结局大于候选人的回合,我们将拒绝,因为这是一些滞后结的请求,显然是在集群之外一段时间了,并开始选举程序,因为它没有看到现任领导人。
  2. , , , , , , ; . — .
  3. ,
  4. . , , , , .
  5. 有了积极的结果,我们修复了该节点参加选举并为候选人投票的事实。
  6. 将结果发回考生

当然,条件可以写得更短,更优雅,但是我留下了一个更“天真”的选择,以免使自己困惑,也不会使任何人感到困惑。

复写


计时器领导者将心跳跟随者发送到所有节点以重置其投票计时器。由于领导者在他的元数据中存储了所有跟随者的最后操作的索引,因此他可以评估是否需要将操作发送到节点。如果领导者的操作日志变得比任何跟随者的日志都要长,那么他将与心跳一道,依次向他发送缺少的操作。称其为追加请求。如果大多数节点确认收到新操作,则领导者将这些操作应用于他的数据库,并增加上一次应用的操作的索引。该索引也随心跳请求一起发送给关注者。并且,如果领导者索引高于跟随者索引,那么跟随者还将对其数据库应用操作以使索引相等。

领导者发送给跟随者的这种追加请求

class RequestAppendDTO {
   private final Long term; //   
   private final Integer leaderId; //   

   private final Integer prevLogIndex;//   
   private final Long prevLogTerm;//   
   private final Integer leaderCommit;//      
   private final Operation operation; //
}

在某些实现中,每个请求分批转移操作。在当前的实现中,每个

请求只能发送一个操作,该类响应心跳附加请求的发送和处理:

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationService.java

public interface ReplicationService {
   void appendRequest();
   AnswerAppendDTO append(RequestAppendDTO requestAppendDTO);
}

提交数据更改请求


考虑一个sendAppendForOnePeerReplicationServiceImpl

方法的一个片段,该方法负责产生一个请求给跟随者并发送给它

private CompletableFuture<AnswerAppendDTO> sendAppendForOnePeer(Integer id) {
   return CompletableFuture.supplyAsync(() -> {
       try {
           //1
           Peer peer = context.getPeer(id);

           Operation operation;
           Integer prevIndex;
           //2    
           if (peer.getNextIndex() <= operationsLog.getLastIndex()) {
               operation = operationsLog.get(peer.getNextIndex());
               prevIndex = peer.getNextIndex() - 1;
           } else 
           //3  
           {
               operation = null;
               prevIndex = operationsLog.getLastIndex();
           }


           RequestAppendDTO requestAppendDTO = new RequestAppendDTO(
                   context.getCurrentTerm(), //   
                   context.getId(), //  
                   prevIndex,//      
                   operationsLog.getTerm(prevIndex),//  
                   context.getCommitIndex(),
                               //      
                   Operation //
           );

...
/*   http     */
}

  1. 追随者元数据
  2. , . ( ), , , , . , , , ,
  3. , , ; , , ,

接下来,考虑appendRequest方法ReplicationServiceImpl,该方法负责发送附加请求并将结果处理给所有关注者。

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationServiceImpl.java#L109

public void appendRequest() {
       List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());

       //1 
       while (peersIds.size() > 0) {
           //2 
           List<AnswerAppendDTO> answers = sendAppendToAllPeers(peersIds);
           peersIds = new ArrayList<>();
           for (AnswerAppendDTO answer : answers) {
               //3
               if (answer.getStatusCode().equals(OK)) {
                   //4
                   if (answer.getTerm() > context.getCurrentTerm()) {
                        context.setTermGreaterThenCurrent(answer.getTerm());
                       return;
                   }
                   Peer peer = context.getPeer(answer.getId());
                   //5     
                   if (answer.getSuccess()) {                      
                       peer.setNextIndex(answer.getMatchIndex() + 1);
                       peer.setMatchIndex(answer.getMatchIndex());
                       if (peer.getNextIndex() <= operationsLog.getLastIndex())
                           peersIds.add(answer.getId());
                   //6      
                   } else {
                       peer.decNextIndex();
                       peersIds.add(answer.getId());
                   }
               }
           }
           //7
           tryToCommit();
       }
}

  1. 我们重复该请求,直到收到所有关注者的复制成功的响应。由于每个请求都会发送一个操作,因此可能需要进行多次迭代才能同步关注者的日志
  2. 向所有关注者发送请求并获取答案列表
  3. 我们仅考虑来自可用关注者的答案
  4. 如果事实证明某个跟随者的回合超过了领导者的回合,我们将停止一切并变成跟随者
  5. 如果关注者回答一切都成功,则我们将更新关注者元数据:我们将保存关注者日志的最后一个索引以及关注者期望的下一个操作的索引。
  6. , , , , . , , . , . , .
  7. , . .


现在,让我们看看跟随者如何精确地处理领导者的追加请求。
方法appendReplicationServiceImpl

public AnswerAppendDTO append(RequestAppendDTO dto) {
     
       //1     
       if (dto.getTerm() < context.getCurrentTerm()) {
           return new AnswerAppendDTO(context.getId(),context.getCurrentTerm(),false, null);
       } else if (dto.getTerm() > context.getCurrentTerm()) {
           //2 
           context.setCurrentTerm(dto.getTerm());
           context.setVotedFor(null);
       }
       //3  
       applicationEventPublisher.publishEvent(new ResetElectionTimerEvent(this));

       if (!context.getState().equals(FOLLOWER)) {
           context.setState(FOLLOWER);
       }
        
       //4  
       if ((dto.getPrevLogIndex() > operationsLog.getLastIndex()) ||                                                                                        !dto.getPrevLogTerm().equals(operationsLog.getTerm(dto.getPrevLogIndex()))) {
                      return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), false, null);
       }


       Operation newOperation = dto.getOperation();
       if (newOperation != null) {
           int newOperationIndex = dto.getPrevLogIndex() + 1;
           
         synchronized (this) {
               //5
               if ((newOperationIndex <= operationsLog.getLastIndex()) &&
                      (!newOperation.getTerm().equals(operationsLog.getTerm(newOperationIndex)))){
                   operationsLog.removeAllFromIndex(newOperationIndex);
               }
               //6
               if (newOperationIndex <= operationsLog.getLastIndex())
               {
                 return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true,      operationsLog.getLastIndex());
               }
               //7
               operationsLog.append(newOperation);
           }
        }
        //8 
        if (dto.getLeaderCommit() > context.getCommitIndex()) {
           context.setCommitIndex(Math.min(dto.getLeaderCommit(), operationsLog.getLastIndex()));
       }

                 
       return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true, operationsLog.getLastIndex());
   }

  1. 如果领导者的回合少于跟随者的回合,则我们向领导发送一回合,并发出拒绝其要求的标志。领导者收到一轮大于其回应的回合后,就会变成跟随者
  2. 如果领导者的回合大于关注者的回合,请将此回合设置为关注者。
  3. 由于是从领导者那里收到请求的,所以不管那里是否有数据,我们都会重置投票计时器,如果我们不是追随者,那么我们将成为投票者
  4. , , , , , , . , ,
  5. , . . , , - , , , , . , .
  6. , . ,
  7. ,
  8. , , , .


仅需弄清楚领导者如何将日志中的操作应用于数据库。在将操作发送给跟随者并处理来自跟随者的响应的过程中,领导者更新节点的元数据。一旦日志中最后一个操作的索引大于领导者对数据库执行的最后一个操作的索引的节点数等于定额,我们就可以说大多数节点已接收到该操作并将其应用于领导者数据库。换句话说,如果领导者将操作发送给关注者,并且大多数人将其插入他的日志中并回答了领导者,那么我们可以将此操作应用于领导者的数据库并增加上一次应用操作的索引。具有下一个append-heartbeat请求的索引将飞向跟随者,并将从日志中将具有相同索引的操作应用于其数据库。

让我们分析tryToCommit方法ReplicationServiceImpl

  private void tryToCommit() {
       while (true) {
           //1
           int N = context.getCommitIndex() + 1;
           //2
           Supplier<Long> count = () ->
               context.getPeers().stream().map(Peer::getMatchIndex).
                       filter(matchIndex -> matchIndex >= N).count() + 1;

           //3 
           if (operationsLog.getLastIndex() >= N &&
                   operationsLog.getTerm(N).equals(context.getCurrentTerm())&&
                      count.get()>=context.getQuorum()
           )
           {
               context.setCommitIndex(N);
           } else
               return;
       }
   }

  1. 我们得到以下应用于数据库的操作的索引
  2. 我们计算有多少追随者在其日志中使用该索引进行操作,并且不要忘记添加领导者
  3. 如果此类关注者的数量为法定人数,并且具有此类索引的操作位于领导者的日志中,并且此操作的轮次与当前操作相等,则领导者将操作应用于数据库并增加上一次应用的操作的索引。上一轮的操作无法应用,因为另一位领导人对此负责,可能会发生冲突。每个领导者仅应用当前轮次的操作。

结论


任何分布式算法(以该家族的代表为RAFT)都是功能强大的集成解决方案,可以保证在遵循本规范中描述的所有规则的前提下实现结果。

分布式算法有很多,它们是不同的。 ZAB是在Zookeeper中实现的,例如用于在Kafka中同步数据。有些算法对一致性的要求不那么严格,例如,在AP系统中使用的Gossip协议的实现量很大。有些算法遵循RAFT原理,同时使用八卦协议交换日志,例如MOKKA,它也使用加密。

我相信尝试找出这些算法中的任何一种对任何开发人员都非常有用,并且如上所述,解决方案可能会很有趣,无论是全面的还是单独的部分。显然,您绝对需要朝着那些与分布式系统开发以及数据同步问题相关的活动的方向看,即使他们使用标准的工业解决方案。

参考文献



我们希望这些材料对您有用。而且,如果您想学习一门课程,可以立即进行。

All Articles