Implementing the RAFT Consensus Algorithm for Distributed KV Storage in Java

Hello again. A few days ago, training began for a new group on the “Software Architect” course, and today we would like to share an article written by one of the students of the course, Anton Pleshakov (head of development at Program Logistics and co-founder at Clusterra).




Distributed microservice systems have become virtually the industry standard, and not only in the enterprise world. The benefits of distributed systems have been described and discussed many times, and the advantages of microservices are well known: the right technology for each task, composability, scalability, the ability to scale development teams, shorter time to market, and so on. Developing distributed applications gives you more ways to respond in time to growing business demands and to the digitalization of everything around.

Another important factor in choosing microservices today is the availability of ready-made infrastructure solutions that absorb the additional cost of operating a distributed system: container orchestration systems, service meshes, and tools for distributed tracing, monitoring, logging and so on. It is safe to say that most of the factors previously cited as drawbacks of the microservice approach carry less weight today than they did a couple of years ago.

Given these realities, most developers try to move from a monolithic structure to a microservice one at the first opportunity. One of the first steps that can be taken without total refactoring and serious decomposition is to make the system horizontally scalable. That is, to turn your monolithic application into a cluster, possibly even one consisting of the same monoliths, but allowing you to vary their number dynamically.

As soon as you try to achieve horizontal scalability, the question of data synchronization within the cluster arises quickly and acutely. Fortunately, all modern DBMSs support data replication between nodes in one way or another. The developer just needs to select a DBMS for the task and decide which properties of the system (in terms of the CAP theorem) are needed, CP or AP, and the issue is resolved. When CP is required and the consistency requirements are high, one way to solve the data synchronization problem is to use a cluster that supports the RAFT consensus algorithm.

This rather new algorithm (developed in 2012) gives a strong consistency guarantee and is very popular. I decided to figure out how it works and wrote my own implementation of a consistent key-value store in Java (Spring Boot).

Does it make sense to implement a distributed algorithm yourself? Clearly you can take a ready-made implementation, and it will almost certainly be better than a home-made reinvention of the wheel. For example, you can use a DBMS that maintains the required level of consistency. Or you can deploy Zookeeper. Or you can find a framework suitable for your language. For Java, there is Atomix, which solves the problems of synchronizing distributed data very well.

On the other hand, if you take a turnkey solution, an external application usually adds another point of failure to your system. Frameworks can be redundant or difficult to operate and learn, or they may not exist at all for your programming language. In addition, implementing a consensus algorithm yourself is an extremely interesting engineering task that broadens your horizons and helps you understand how to solve the problems that arise when services interact in a cluster.

Since the specification of the algorithm contains a whole set of measures for maintaining data integrity, you can use the algorithm in its entirety or borrow only parts of it. Any part of the algorithm can be useful in real life. Suppose you have a set of workers for parsing files in parallel. The workers are equivalent, but you want to designate one of them as a coordinator, and when the coordinating worker fails, assign any other free worker as coordinator. The first half of the RAFT algorithm, which describes how to choose a leader among equivalent nodes, will help you with this. Or, for example, if you have only two nodes in a master-slave relationship, you can use the replication rules described in the RAFT specification to organize data exchange in your simpler case.

The article is essentially a practical guide on how to implement RAFT yourself. The algorithm itself and the theoretical aspects of its work will not be covered. You can read a brief description here in this excellent article or study the full specification here. There you can also find a very clear visualization of the algorithm.

General Solution Description


The part of the code that is directly related to the implementation of the algorithm is analyzed in the article. At the end of the article there is a link to the repository, where you can see the whole code.

The task was as follows. Develop a distributed system that stores data in a key-value database. The data of each node must be consistent: if data was written to the database of one node and most of the nodes confirmed that they also received it, then sooner or later this data will be in the database of every node. When a part of the cluster is disconnected and later reconnected, the nodes that were outside the cluster must catch up with the main cluster and synchronize. Each node provides a REST API for writing and reading database data. The system consists of two modules for two types of nodes: client and server. Below we consider the implementation of the server itself. The client code is in the repository.

A server node can operate in three states:

  • Follower. Accepts read requests from the client. Receives heartbeats from the leader.
  • Candidate. Accepts read requests from the client. Sends vote requests to other nodes.
  • Leader. Accepts read and write requests. Sends heartbeat requests to other nodes. Sends append requests with data to other nodes.

The period of “leadership” of one of the nodes is called a round (a term in the RAFT specification). A new candidate opens a new round.

Data storage


Each node keeps an operations log, in which data-changing operations are recorded sequentially.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/operations/OperationsLog.java


public interface OperationsLog {
   void append(Operation operation);
   Operation get(Integer index);
   List<Operation> all();

   Long getTerm(Integer index);
   Integer getLastIndex();
   Long getLastTerm();

   void removeAllFromIndex(int newOperationIndex);
}

Each operation, in addition to data and type (insert, change, delete), contains the number of the round in which it was created. In addition, each operation has an index that increases sequentially. It is important that all operations are inserted into the logs of followers in the same order in which they are inserted into the leader’s log.
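The properties described above can be illustrated with a minimal in-memory sketch. It assumes zero-based indices, -1 as the last index of an empty log and 0 as its last round; `SketchOperation` and `InMemoryLogSketch` are hypothetical names, not classes from the repository.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operation: the round (term) in which it was created plus a key/value payload.
final class SketchOperation {
    final long term;
    final long key;
    final String value;

    SketchOperation(long term, long key, String value) {
        this.term = term;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical in-memory log; the index of an operation is its position in the list.
final class InMemoryLogSketch {
    private final List<SketchOperation> operations = new ArrayList<>();

    synchronized void append(SketchOperation operation) {
        operations.add(operation);
    }

    synchronized SketchOperation get(int index) {
        return operations.get(index);
    }

    // Assumption: an empty log reports -1 as its last index.
    synchronized int getLastIndex() {
        return operations.size() - 1;
    }

    // Assumption: an empty log reports round 0.
    synchronized long getLastTerm() {
        return operations.isEmpty() ? 0L : operations.get(operations.size() - 1).term;
    }

    // Drops the operation at the given index and everything after it.
    synchronized void removeAllFromIndex(int index) {
        if (index >= 0 && index < operations.size()) {
            operations.subList(index, operations.size()).clear();
        }
    }
}
```

Because the index is simply the list position, appending in the order the leader sends operations keeps follower logs in the leader's order.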

Each node has access to a database in which data is stored directly.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/storage/Storage.java

public interface Storage {
   List<Entry> all();
   String get(Long key);
   void insert(Long key, String val);
   void update(Long key, String val);
   void delete(Long key);
}

In the current implementation, embedded in-memory solutions are used both for the log and for the database (ordinary concurrent List and Map). If necessary, you can simply implement the appropriate interface to support other types of storage.
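As an illustration, a map-backed store could look roughly like this. It is a sketch, not the repository code: `InMemoryStorageSketch` is a hypothetical name, and the choice that insert never overwrites and update never creates a key is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory key-value store backed by a concurrent map.
final class InMemoryStorageSketch {
    private final Map<Long, String> data = new ConcurrentHashMap<>();

    String get(Long key) {
        return data.get(key);
    }

    // Assumption: insert does not overwrite an existing key.
    void insert(Long key, String val) {
        data.putIfAbsent(key, val);
    }

    // Assumption: update only changes a key that already exists.
    void update(Long key, String val) {
        data.replace(key, val);
    }

    void delete(Long key) {
        data.remove(key);
    }
}
```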

Operations from the log are applied to the database by a distributed state machine. The state machine is the mechanism responsible for changing the state of the cluster while rejecting incorrect changes (out-of-order operations, or operations from a disconnected node that still considers itself the leader). For changes to be considered valid and applied to the database, they must pass a series of checks and meet certain criteria, which is exactly what the state machine ensures.

On the leader, an operation is applied to the database once most of the nodes have confirmed that the operation has been replicated to their logs. On a follower, an operation is applied to the database once the leader signals that it has applied the operation to its own database.
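The step from "confirmed" to "applied" can be sketched as follows. The sketch assumes a zero-based log and a `lastApplied` counter (a name borrowed from the RAFT specification, not from the repository); `ApplierSketch` and its nested types are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical applier: moves confirmed operations from the log into the database.
final class ApplierSketch {
    enum Type { INSERT, UPDATE, DELETE }

    static final class Op {
        final Type type;
        final long key;
        final String value;

        Op(Type type, long key, String value) {
            this.type = type;
            this.key = key;
            this.value = value;
        }
    }

    private final Map<Long, String> database = new HashMap<>();
    private int lastApplied = -1; // assumption: zero-based log, nothing applied yet

    // Applies every logged operation after lastApplied, up to and including commitIndex.
    void applyCommitted(List<Op> log, int commitIndex) {
        int upTo = Math.min(commitIndex, log.size() - 1);
        while (lastApplied < upTo) {
            Op op = log.get(++lastApplied);
            switch (op.type) {
                case INSERT:
                case UPDATE:
                    database.put(op.key, op.value);
                    break;
                case DELETE:
                    database.remove(op.key);
                    break;
            }
        }
    }

    String get(long key) {
        return database.get(key);
    }

    int getLastApplied() {
        return lastApplied;
    }
}
```

Operations are applied strictly in log order, so every node that applies the same prefix of the log ends up with the same database contents.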

Timers


Each node exchanges data with other nodes.

Two types of requests are supported:

  • vote, used when conducting a round of voting.
  • append, also known as heartbeat when it carries no data, used to replicate log data to followers and to prevent the start of a new round of voting.

Events are triggered by timers. Two types of timers run on a node:

  • vote. Starts a round of voting. Each node has its own interval, after which it will try to start a new vote. The countdown starts anew when a heartbeat is received from the leader.
  • heartbeat. Used by the leader to send append requests to followers.

If a node does not receive a heartbeat and its vote timer expires, it becomes a candidate and initiates an election: it increases the round number and sends vote requests to other nodes. If the node collects a majority of votes, it becomes the leader and starts sending heartbeats.
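A resettable vote timer can be sketched with a scheduled executor. This is not the repository's timer: `ElectionTimerSketch` is a hypothetical class, and drawing the interval at random from a range (150 to 300 ms in the usage below, the range suggested in the RAFT paper) is an assumption about how each node gets "its own interval".

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical vote timer: fires onTimeout unless reset() is called in time.
final class ElectionTimerSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long minMillis;
    private final long maxMillis;
    private final Runnable onTimeout;
    private ScheduledFuture<?> pending;

    ElectionTimerSketch(long minMillis, long maxMillis, Runnable onTimeout) {
        this.minMillis = minMillis;
        this.maxMillis = maxMillis;
        this.onTimeout = onTimeout;
    }

    // A random interval per countdown makes simultaneous timeouts on several nodes unlikely.
    long nextTimeoutMillis() {
        return ThreadLocalRandom.current().nextLong(minMillis, maxMillis + 1);
    }

    // Called at startup and on every heartbeat: cancel the countdown and start it again.
    synchronized void reset() {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(onTimeout, nextTimeoutMillis(), TimeUnit.MILLISECONDS);
    }

    // Stops the countdown and releases the scheduler thread.
    synchronized void stop() {
        if (pending != null) {
            pending.cancel(false);
        }
        scheduler.shutdownNow();
    }
}
```

A node would create the timer as `new ElectionTimerSketch(150, 300, this::startElection)`, where `startElection` is a hypothetical callback, and call `reset()` from its heartbeat handler.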

The current state of the node


Each node stores data about the current state.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/context/Context.java

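public interface Context {
   Integer getId(); // id of this node
   State getState(); // state: follower, candidate or leader
   Integer getVotedFor(); // id of the candidate this node voted for in the current round
   Long getCurrentTerm(); // current round (term)
   Integer getCommitIndex(); // index of the last operation applied to the database
   List<Peer> getPeers(); // the other nodes of the cluster with their metadata
}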

A leader node also stores metadata for the nodes to which it replicates data.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/node/peers/Peer.java

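public interface Peer {
   Integer getId(); // node id
   Integer getNextIndex(); // index of the next operation to send to this node
   Integer getMatchIndex(); // index of the last operation known to be in this node's log
   Boolean getVoteGranted(); // whether this node granted us its vote
}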

The leader updates this metadata when it receives responses from followers. It uses the metadata to determine which operation a follower is ready to accept next and which operations have already been added to the follower's log.

Voting


The ElectionService interface is responsible for voting.

public interface ElectionService {
   void processElection();
   AnswerVoteDTO vote(RequestVoteDTO requestVoteDTO);
} 

Sending a request for voting


If a node is a follower and does not receive a heartbeat within the waiting period, it increases its current round, declares itself a candidate and begins to send vote requests to other nodes. If it manages to gather a quorum, that is, most of the nodes vote for it, it becomes the new leader. In RAFT terms, a quorum is more than half of all nodes, for example 2 of 3 or 3 of 5.
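The quorum size follows directly from "more than half". A minimal sketch, with `QuorumSketch` as a hypothetical helper (the repository exposes the value through `context.getQuorum()`, whose implementation is not shown here):

```java
// Hypothetical helper: quorum size for a cluster of a given size.
final class QuorumSketch {
    // Smallest number of nodes that is strictly more than half of the cluster.
    static int quorum(int clusterSize) {
        return clusterSize / 2 + 1;
    }
}
```

For clusters of 3, 4 and 5 nodes this gives 2, 3 and 3, which is why an even-sized cluster tolerates no more failures than the odd-sized cluster one node smaller.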

Let's analyze the processElection method of the ElectionServiceImpl class, which is called by the vote timer when it expires and sends vote requests to the other nodes.

//1
context.setState(CANDIDATE); 
Long term = context.incCurrentTerm(); 
context.setVotedFor(context.getId()); 

List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());
long voteGrantedCount = 1L;
long voteRevokedCount = 0L;

//2
while (checkCurrentElectionStatus(term)) {
   List<AnswerVoteDTO> answers = getVoteFromAllPeers(term, peersIds);
   peersIds = new ArrayList<>();
   for (AnswerVoteDTO answer : answers) {
       //3
       if (answer.getStatusCode().equals(OK)) {
           //4
           if (answer.getTerm()>context.getCurrentTerm()) {
               context.setTermGreaterThenCurrent(answer.getTerm());
               return;
           }
           if (answer.isVoteGranted()) {
               //5 
               context.getPeer(answer.getId()).setVoteGranted(true);
               voteGrantedCount++;
           } else
               //6 
               voteRevokedCount++;
       } else {
          peersIds.add(answer.getId());
       }
   }
   //7
   if (voteGrantedCount >= context.getQuorum()) {
       winElection(term);
       return;
   } else if (voteRevokedCount >= context.getQuorum()) {
       loseElection(term);
       return;
   }
}

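  1. Set the status to “Candidate”. Increase the round number and vote for ourselves.
  2. Loop while the election is still in progress, that is, while the node is still a candidate in the same round. The loop ends if, for example, a heartbeat from a leader arrives in the meantime.
  3. Only answers from nodes that actually responded are processed. Nodes that did not respond are put back on the list and asked again on the next iteration.
  4. If the round of a responding node is greater than ours, we adopt that round, stop the election and become a follower, then wait for a heartbeat from the leader.
  5. The node voted for us! We increase the number of votes cast for us and record that this node voted for us.
  6. The node voted against us; we count that as well.
  7. If a quorum is collected and the node has won the election, we set the status to “Leader”. If a quorum voted against us, we become a follower and wait.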
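The decision in step 7 can be isolated into a small pure function. This is a sketch under the assumption that the quorum is computed over the whole cluster including the candidate; `VoteTallySketch` and `Outcome` are hypothetical names.

```java
import java.util.List;

// Hypothetical tally of the answers a candidate has received so far.
final class VoteTallySketch {
    enum Outcome { WON, LOST, UNDECIDED }

    // votes: one entry per peer that answered; true means the peer granted its vote.
    static Outcome tally(List<Boolean> votes, int clusterSize) {
        int quorum = clusterSize / 2 + 1;
        int granted = 1; // the candidate always votes for itself
        int refused = 0;
        for (boolean vote : votes) {
            if (vote) {
                granted++;
            } else {
                refused++;
            }
        }
        if (granted >= quorum) {
            return Outcome.WON;
        }
        if (refused >= quorum) {
            return Outcome.LOST;
        }
        return Outcome.UNDECIDED; // some peers have not answered yet; keep asking
    }
}
```

The UNDECIDED outcome corresponds to another pass of the while loop, in which only the peers that did not answer are asked again.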

It should also be noted that when a node becomes the leader, it sets the next index for every node in its list to the last index in the leader’s log plus 1. Starting from this index, the leader will try to update the follower logs. This index may not correspond to the real state of the follower’s log; the actual value will be found only when exchanging data with the follower, and the index will then be adjusted. But some starting point is needed.

  private void winElection(Long term) {
       context.setState(LEADER);
       context.getPeers().forEach(peer ->
               peer.setNextIndex(operationsLog.getLastIndex()+1)

       );
   }

Voting request processing


When voting, each node receives a request of the following form from the candidate:

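class RequestVoteDTO {
   private final Long term; // candidate's round
   private final Integer candidateId; // candidate id
   private final Integer lastLogIndex; // index of the last operation in the candidate's log
   private final Long lastLogTerm; // round of the last operation in the candidate's log
}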

Now let's look at the vote method of the ElectionServiceImpl class. It processes the vote request from the candidate and returns a decision regarding its candidacy for the role of leader.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/election/ElectionServiceImpl.java#L178


public AnswerVoteDTO vote(RequestVoteDTO dto) {
   
       boolean termCheck;
       //1
       if (dto.getTerm() < context.getCurrentTerm())
           return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),false);
       else //2
       if (dto.getTerm().equals(context.getCurrentTerm())) {
           termCheck = (context.getVotedFor() == null||
                          context.getVotedFor().equals(dto.getCandidateId()));
       }
       else
       {   //3
           termCheck = true;
             context.setTermGreaterThenCurrent(dto.getTerm());
       }

       //4  
       boolean logCheck = !((operationsLog.getLastTerm() > dto.getLastLogTerm()) ||
               ((operationsLog.getLastTerm().equals(dto.getLastLogTerm())) &&
                       (operationsLog.getLastIndex() > dto.getLastLogIndex())));


       boolean voteGranted = termCheck&&logCheck;

       //5
       if (voteGranted) {
           context.setVotedFor(dto.getCandidateId());
       }
       //6   
       return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),voteGranted);
   }

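Upon receiving a request from a candidate, the node makes two checks: it checks the candidate’s round and how up to date the candidate's log is. If the candidate’s round is not lower than its own and the candidate's log is at least as up to date, the node gives its vote to the candidate.

  1. If the current round of the node is greater than the round of the candidate, we refuse, because this is a request from a lagging node which, apparently, was outside the cluster for some time and started the election procedure because it did not see the current leader.
  2. If the rounds are equal, we check whether this node has already voted in this round. The check passes if the node has not voted yet or has already voted for this same candidate; otherwise it fails. One node gives one vote per round.
  3. If the candidate's round is greater than ours, we adopt it, and the round check passes.
  4. Log check. The vote is refused if our log is more up to date than the candidate's: the round of our last operation is greater, or the rounds are equal and our last index is greater.
  5. With a positive outcome, we record the fact that the node took part in the elections and cast a vote for the candidate.
  6. Send the result back to the candidate.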

Surely the conditions could be written shorter and more elegantly, but I kept this more “naive” version so as not to confuse myself or anyone else.
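For comparison, one possible shorter form of the log check is sketched below. It is meant to be logically equivalent to the logCheck expression in the vote method; `LogCheckSketch` and its parameter names are hypothetical.

```java
// Hypothetical compact form of the log check performed before granting a vote.
final class LogCheckSketch {
    // True when the candidate's log is at least as up to date as the voter's log.
    static boolean candidateLogIsUpToDate(long voterLastTerm, int voterLastIndex,
                                          long candidateLastTerm, int candidateLastIndex) {
        if (candidateLastTerm != voterLastTerm) {
            // Different rounds: the log whose last operation has the later round wins.
            return candidateLastTerm > voterLastTerm;
        }
        // Same round: the longer (or equal) log wins.
        return candidateLastIndex >= voterLastIndex;
    }
}
```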

Replication


On each tick of the heartbeat timer, the leader sends a heartbeat to all followers to reset their vote timers. Since the leader keeps in its metadata the indices of the last operations of all followers, it can tell whether operations need to be sent to a given node. If the leader’s operations log is longer than a follower's log, the leader sends the missing operations to that follower one by one along with the heartbeat. We call this an append request. If most nodes confirm receipt of new operations, the leader applies these operations to its database and increases the index of the last applied operation. This index is also sent to followers along with the heartbeat request. If the leader's index is higher than the follower's index, the follower also applies operations to its database in order to equalize the indices.

This is the append request that the leader sends to a follower:

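class RequestAppendDTO {
   private final Long term; // leader's round
   private final Integer leaderId; // leader id

   private final Integer prevLogIndex; // index of the operation preceding the new one
   private final Long prevLogTerm; // round of that preceding operation
   private final Integer leaderCommit; // index of the last operation the leader applied to its database
   private final Operation operation; // operation to replicate (null for a bare heartbeat)
}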

There are implementations in which operations are transferred in batches of several per request. In the current implementation, only one operation can be transmitted per request.

The following interface is responsible for sending and processing the heartbeat-append request:

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationService.java

public interface ReplicationService {
   void appendRequest();
   AnswerAppendDTO append(RequestAppendDTO requestAppendDTO);
}

Submit a data change request


Consider a fragment of the sendAppendForOnePeer method of the ReplicationServiceImpl class. The method is responsible for generating a request to the follower and sending it.

private CompletableFuture<AnswerAppendDTO> sendAppendForOnePeer(Integer id) {
   return CompletableFuture.supplyAsync(() -> {
       try {
           //1
           Peer peer = context.getPeer(id);

           Operation operation;
           Integer prevIndex;
           //2    
           if (peer.getNextIndex() <= operationsLog.getLastIndex()) {
               operation = operationsLog.get(peer.getNextIndex());
               prevIndex = peer.getNextIndex() - 1;
           } else 
           //3  
           {
               operation = null;
               prevIndex = operationsLog.getLastIndex();
           }


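           RequestAppendDTO requestAppendDTO = new RequestAppendDTO(
                   context.getCurrentTerm(), // leader's round
                   context.getId(), // leader id
                   prevIndex, // index of the operation preceding the one being sent
                   operationsLog.getTerm(prevIndex), // round of that preceding operation
                   context.getCommitIndex(), // index of the last operation the leader applied to its database
                   operation // operation to replicate, or null for a bare heartbeat
           );

...
/* the HTTP call to the follower is omitted here */
}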

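  1. Get the follower metadata.
  2. If the follower's next index is within the leader's log, there is something to send: we take the operation at that index and use the index just before it as the previous index.
  3. Otherwise the follower is assumed to be up to date: we send no operation (a bare heartbeat) and use the leader's last index as the previous index.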
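The choice between steps 2 and 3 can be sketched in isolation. The sketch assumes a zero-based log represented as a plain list of strings; `AppendPlanSketch` is a hypothetical class.

```java
import java.util.List;

// Hypothetical result of deciding what to send to one follower.
final class AppendPlanSketch {
    final int prevIndex;    // index of the entry just before the one being sent
    final String operation; // entry to send, or null for a bare heartbeat

    AppendPlanSketch(int prevIndex, String operation) {
        this.prevIndex = prevIndex;
        this.operation = operation;
    }

    // log: the leader's entries, zero-based; nextIndex: the leader's guess for this follower.
    static AppendPlanSketch plan(List<String> log, int nextIndex) {
        int lastIndex = log.size() - 1;
        if (nextIndex <= lastIndex) {
            // The follower is behind: send exactly one missing entry.
            return new AppendPlanSketch(nextIndex - 1, log.get(nextIndex));
        }
        // The follower is believed to be up to date: heartbeat only.
        return new AppendPlanSketch(lastIndex, null);
    }
}
```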

Next, consider the appendRequest method of the ReplicationServiceImpl class, which is responsible for sending the append request to all followers and processing the results.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationServiceImpl.java#L109

public void appendRequest() {
       List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());

       //1 
       while (peersIds.size() > 0) {
           //2 
           List<AnswerAppendDTO> answers = sendAppendToAllPeers(peersIds);
           peersIds = new ArrayList<>();
           for (AnswerAppendDTO answer : answers) {
               //3
               if (answer.getStatusCode().equals(OK)) {
                   //4
                   if (answer.getTerm() > context.getCurrentTerm()) {
                        context.setTermGreaterThenCurrent(answer.getTerm());
                       return;
                   }
                   Peer peer = context.getPeer(answer.getId());
                   //5     
                   if (answer.getSuccess()) {                      
                       peer.setNextIndex(answer.getMatchIndex() + 1);
                       peer.setMatchIndex(answer.getMatchIndex());
                       if (peer.getNextIndex() <= operationsLog.getLastIndex())
                           peersIds.add(answer.getId());
                   //6      
                   } else {
                       peer.decNextIndex();
                       peersIds.add(answer.getId());
                   }
               }
           }
           //7
           tryToCommit();
       }
}

  1. We repeat the request until all followers that respond have confirmed that replication was successful. Since one operation is sent per request, it may take several iterations to synchronize the logs of followers.
  2. Send requests to all followers and get a list of answers.
  3. We consider answers only from available followers.
  4. If it turns out that the round of one of the followers is greater than the leader’s round, we stop everything and turn into a follower.
  5. If the follower replied that everything was successful, we update the follower metadata: we save the last index of the follower’s log and the index of the next operation expected by the follower.
  6. If the follower rejected the request, its log does not match the leader's log at the previous index. We decrease the next index for this follower by one and try again, stepping back until the logs match.
  7. After processing the answers, we try to apply to the database the operations that a quorum has confirmed (tryToCommit).
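Steps 5 and 6 amount to a small piece of bookkeeping per follower. The following sketch shows it under a few assumptions: zero-based indices, -1 meaning "nothing known to be replicated", and a next index that never drops below 0. `PeerProgressSketch` is a hypothetical class, not the Peer implementation from the repository.

```java
// Hypothetical per-follower bookkeeping kept by the leader.
final class PeerProgressSketch {
    private int nextIndex;
    private int matchIndex = -1; // assumption: -1 means nothing is known to be replicated

    // A new leader starts with an optimistic guess: the follower has everything.
    PeerProgressSketch(int leaderLastIndex) {
        this.nextIndex = leaderLastIndex + 1;
    }

    // Returns true if another append request to this follower is needed.
    boolean onReply(boolean success, int followerLastIndex, int leaderLastIndex) {
        if (success) {
            matchIndex = followerLastIndex;
            nextIndex = followerLastIndex + 1;
            return nextIndex <= leaderLastIndex;
        }
        // Rejected: step one entry back and retry (assumption: never below 0).
        nextIndex = Math.max(0, nextIndex - 1);
        return true;
    }

    int getNextIndex() {
        return nextIndex;
    }

    int getMatchIndex() {
        return matchIndex;
    }
}
```

Stepping back one entry at a time is simple but can take many round trips for a follower that is far behind.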


Now let's see how exactly the follower processes the append request from the leader. This is the append method of the ReplicationServiceImpl class.

public AnswerAppendDTO append(RequestAppendDTO dto) {
     
       //1     
       if (dto.getTerm() < context.getCurrentTerm()) {
           return new AnswerAppendDTO(context.getId(),context.getCurrentTerm(),false, null);
       } else if (dto.getTerm() > context.getCurrentTerm()) {
           //2 
           context.setCurrentTerm(dto.getTerm());
           context.setVotedFor(null);
       }
       //3  
       applicationEventPublisher.publishEvent(new ResetElectionTimerEvent(this));

       if (!context.getState().equals(FOLLOWER)) {
           context.setState(FOLLOWER);
       }
        
       //4
       if ((dto.getPrevLogIndex() > operationsLog.getLastIndex()) ||
               !dto.getPrevLogTerm().equals(operationsLog.getTerm(dto.getPrevLogIndex()))) {
           return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), false, null);
       }


       Operation newOperation = dto.getOperation();
       if (newOperation != null) {
           int newOperationIndex = dto.getPrevLogIndex() + 1;
           
         synchronized (this) {
               //5
               if ((newOperationIndex <= operationsLog.getLastIndex()) &&
                      (!newOperation.getTerm().equals(operationsLog.getTerm(newOperationIndex)))){
                   operationsLog.removeAllFromIndex(newOperationIndex);
               }
               //6
               if (newOperationIndex <= operationsLog.getLastIndex())
               {
                 return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true,      operationsLog.getLastIndex());
               }
               //7
               operationsLog.append(newOperation);
           }
        }
        //8 
        if (dto.getLeaderCommit() > context.getCommitIndex()) {
           context.setCommitIndex(Math.min(dto.getLeaderCommit(), operationsLog.getLastIndex()));
       }

                 
       return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true, operationsLog.getLastIndex());
   }

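  1. If the leader’s round is less than the follower’s round, we send the leader our round and a flag saying that its request has been rejected. As soon as the leader receives a round greater than its own in a response, it will turn into a follower.
  2. If the leader’s round is greater than the follower’s round, the follower adopts this round.
  3. Since the request was received from the leader, regardless of whether it carries data or not, we reset the vote timer and, if we were not a follower, become one.
  4. If the previous index from the request is beyond the end of our log, or the round of our operation at that index differs from the one in the request, the logs diverge and we reject the request. The leader will step back by one index and try again.
  5. If our log already has an operation at the new index but with a different round, it is a leftover from another leader. We delete it together with everything after it.
  6. If an operation at this index is still in the log, it is the same operation the leader is sending. We do not insert it again and simply report success.
  7. Otherwise, we append the new operation to the log.
  8. If the leader's commit index is greater than ours, we raise ours to the smaller of the leader's commit index and the index of our last operation.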
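Steps 4 to 7 can be tried out on a log reduced to the rounds of its entries. This is a sketch, not the repository code: it assumes a zero-based log, a previous index of -1 for the very first entry, and omits payloads and the round and commit handling of steps 1 to 3 and 8. `FollowerAppendSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical follower log that stores only the round (term) of each entry.
final class FollowerAppendSketch {
    final List<Long> terms = new ArrayList<>();

    // Returns true if the request is consistent with this log; appends the entry if needed.
    // newTerm is the round of the entry being sent, or null for a bare heartbeat.
    boolean append(int prevIndex, long prevTerm, Long newTerm) {
        int lastIndex = terms.size() - 1;
        // Consistency check: the entry preceding the new one must exist with the same round.
        if (prevIndex > lastIndex) {
            return false;
        }
        if (prevIndex >= 0 && terms.get(prevIndex) != prevTerm) {
            return false;
        }
        if (newTerm == null) {
            return true; // heartbeat: nothing to store
        }
        int newIndex = prevIndex + 1;
        // Conflict: a different entry already sits at newIndex, so drop it and its tail.
        if (newIndex <= lastIndex && !terms.get(newIndex).equals(newTerm)) {
            terms.subList(newIndex, terms.size()).clear();
        }
        // Append only if the entry is not already present.
        if (newIndex > terms.size() - 1) {
            terms.add(newTerm);
        }
        return true;
    }
}
```

Sending the same request twice leaves the log unchanged, which is what makes retries by the leader safe.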


It remains only to figure out how the leader applies operations from the log to the database. While sending operations to followers and processing their responses, the leader updates the metadata of the nodes. As soon as the number of nodes whose last log index is greater than the index of the last operation the leader has applied to the database reaches the quorum, we can state that most nodes have received the next operation, and it can be applied to the leader's database. In other words, if the leader sent an operation to followers and most of them inserted it into their logs and answered the leader, then the leader can apply this operation to its database and increase the index of the last applied operation. With the next append-heartbeat request this index reaches the followers, and each follower applies the operation with the same index from its log to its own database.

Let's analyze the tryToCommit method of the ReplicationServiceImpl class.

  private void tryToCommit() {
       while (true) {
           //1
           int N = context.getCommitIndex() + 1;
           //2
           Supplier<Long> count = () ->
               context.getPeers().stream().map(Peer::getMatchIndex).
                       filter(matchIndex -> matchIndex >= N).count() + 1;

           //3 
           if (operationsLog.getLastIndex() >= N &&
                   operationsLog.getTerm(N).equals(context.getCurrentTerm())&&
                      count.get()>=context.getQuorum()
           )
           {
               context.setCommitIndex(N);
           } else
               return;
       }
   }

  1. We take the index of the next operation to be applied to the database.
  2. We count how many followers have an operation with this index in their logs, and do not forget to add the leader.
  3. If the number of such nodes reaches the quorum, the operation with this index is in the leader’s log, and the round of this operation equals the current one, then the leader applies the operation to the database and increases the index of the last applied operation. Operations from a previous round are not applied directly, because another leader was responsible for them and a conflict could arise; each leader applies only operations of its current round. In the RAFT specification, operations from earlier rounds become committed indirectly, once a later operation from the current round is committed.
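For comparison, here is a sketch of the commit rule as the RAFT specification states it: pick the highest index above the current commit index that is stored on a quorum and belongs to the current round. It is not a copy of tryToCommit, which advances one index at a time. The sketch assumes zero-based indices and a log reduced to the rounds of its entries; `CommitSketch` is a hypothetical class.

```java
import java.util.List;

// Hypothetical computation of the leader's new commit index.
final class CommitSketch {
    // matchIndexes: highest replicated index per follower (the leader is counted separately);
    // logTerms: rounds of the leader's log entries, zero-based.
    static int advanceCommitIndex(int commitIndex, List<Integer> matchIndexes,
                                  List<Long> logTerms, long currentTerm) {
        int quorum = (matchIndexes.size() + 1) / 2 + 1;
        for (int n = logTerms.size() - 1; n > commitIndex; n--) {
            final int candidate = n;
            // The leader always holds its own entries, hence the "1 +".
            long holders = 1 + matchIndexes.stream().filter(m -> m >= candidate).count();
            if (logTerms.get(n) == currentTerm && holders >= quorum) {
                return n; // everything up to n is now committed
            }
        }
        return commitIndex; // nothing new can be committed yet
    }
}
```

With this rule an entry from an earlier round is never committed on its own count, but it becomes committed together with the first current-round entry after it that reaches a quorum.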

Conclusion


Any distributed algorithm of this family, RAFT being one representative, is a powerful integrated solution that guarantees the result, provided that all the rules described in the specification are followed.

There are many distributed algorithms, and they differ. There is ZAB, which is implemented in Zookeeper and is used, for example, to synchronize data in Kafka. There are algorithms with less stringent consistency requirements, for example, the many implementations of the Gossip protocol that are used in AP systems. There are algorithms that follow the principles of RAFT and at the same time use the gossip protocol for exchanging logs, such as MOKKA, which also uses encryption.

I believe that trying to figure out any of these algorithms is extremely useful for any developer, and as I mentioned above, the solutions can be interesting both as a whole and in separate parts. Those whose work involves developing distributed systems and dealing with data synchronization should certainly look in this direction, even if they use standard industrial solutions.

References


