Menerapkan Algoritma Konsensus RAFT untuk Penyimpanan KV Terdistribusi di Jawa

Halo lagi. Beberapa hari yang lalu, pelatihan dimulai dalam kelompok baru pada kursus "Arsitek Perangkat Lunak" , dan hari ini kami ingin berbagi artikel yang ditulis oleh salah satu siswa kursus, Anton Pleshakov (kepala pengembangan di Program Logistik dan salah satu pendiri di Clusterra).




Saat ini, sistem microservice terdistribusi telah menjadi standar industri, dan tidak hanya di dunia usaha. Manfaat menggunakan sistem terdistribusi telah dijelaskan dan dibahas lebih dari satu kali. Keuntungan dari layanan microser telah lama diketahui oleh semua orang: teknologi untuk tugas, kompabilitas, skalabilitas, pengembangan skala, pengurangan TTM, dan sebagainya. Jelas bahwa pengembangan aplikasi terdistribusi menyediakan lebih banyak opsi untuk respons yang tepat waktu terhadap permintaan bisnis yang berkembang dan digitalisasi segala sesuatu di sekitarnya.

Penting juga untuk dicatat bahwa saat ini faktor yang sangat penting yang mempengaruhi pilihan strategi pengembangan yang mendukung layanan-mikro adalah ketersediaan semua jenis solusi infrastruktur siap pakai yang mengambil solusi masalah yang terkait dengan biaya tambahan pengoperasian sistem terdistribusi. Kita berbicara tentang sistem orkestrasi wadah, layanan mash, sarana penelusuran yang terdistribusi, pemantauan, penebangan dan sebagainya. Dapat dengan aman dinyatakan bahwa sebagian besar faktor yang sebelumnya disebutkan sebagai minus dari pendekatan layanan-mikro saat ini tidak memiliki pengaruh sebanyak beberapa tahun yang lalu.

Berdasarkan realitas modern, sebagian besar pengembang mencari pada kesempatan pertama untuk beralih dari struktur monolitik ke struktur mikro. Salah satu langkah pertama yang dapat diambil tanpa menggunakan refactoring total dan dekomposisi serius adalah untuk mencapai sistem skalabilitas horizontal. Yaitu, untuk mengubah aplikasi monolitik Anda menjadi sebuah cluster, bahkan mungkin terdiri dari monolit yang sama, tetapi memungkinkan Anda untuk secara dinamis memvariasikan jumlahnya.

Ketika mencoba mencapai skalabilitas horizontal, pertanyaan tentang sinkronisasi data dalam sebuah cluster sangat cepat dan sangat akut muncul. Untungnya, semua DBMS modern mendukung replikasi data antara node dalam satu atau lain cara. Pengembang hanya perlu memilih DBMS untuk tugas tersebut dan memutuskan sifat sistem apa (sesuai dengan teorema CAP) yang ia butuhkan, CP atau AP, dan masalahnya telah teratasi. Dalam kasus ketika CP diperlukan dan persyaratan untuk konsistensi tinggi, salah satu metode untuk menyelesaikan masalah sinkronisasi data adalah dengan menggunakan cluster yang mendukung algoritma konsensus RAFT.

Algoritma yang agak baru ini (dikembangkan pada 2012) memberikan jaminan konsistensi yang tinggi dan sangat populer. Saya memutuskan untuk mencari tahu cara kerjanya, dan menulis implementasi saya dari repositori kunci-nilai yang konsisten di Java (Spring Boot).

Apakah masuk akal untuk mengimplementasikan algoritma terdistribusi sendiri? Jelas bahwa Anda dapat mengambil implementasi yang sudah jadi dari algoritma terdistribusi, dan dengan tingkat probabilitas tertinggi implementasi ini akan lebih baik daripada "sepeda" buatan sendiri. Misalnya, Anda dapat menggunakan DBMS yang mempertahankan tingkat konsistensi yang diperlukan. Atau Anda bisa menggunakan Zookeeper . Atau Anda dapat menemukan kerangka kerja yang cocok untuk bahasa Anda. Untuk java, ada Atomix , yang dengan sempurna memecahkan masalah sinkronisasi data terdistribusi.

Tetapi di sisi lain. Jika Anda mengambil solusi turnkey, maka menggunakan aplikasi eksternal biasanya menambahkan titik kegagalan tambahan ke sistem Anda. Dan kerangka kerja bisa berlebihan atau sulit untuk dioperasikan dan dipelajari, atau mungkin tidak ada sama sekali untuk bahasa pemrograman Anda. Selain itu, implementasi independen dari algoritma konsensus adalah tugas rekayasa yang sangat menarik yang memperluas wawasan Anda dan memberi Anda pemahaman tentang bagaimana memecahkan masalah yang muncul ketika layanan berinteraksi dalam sebuah cluster menggunakan metode yang lebih optimal.

Karena spesifikasi algoritme berisi serangkaian tindakan untuk menjaga integritas data, Anda dapat menggunakan pengetahuan yang diperoleh dan bahkan menggunakan algoritme secara keseluruhan. Bagian mana pun dari algoritma dapat bermanfaat dalam kehidupan nyata. Misalkan Anda memiliki satu set pekerja untuk mem-parsing file secara paralel. Pekerja adalah setara, tetapi Anda ingin menunjuk salah satu pekerja sebagai koordinator, dan ketika pekerja koordinator jatuh, tetapkan pekerja bebas lainnya sebagai koordinator. Bagian pertama dari algoritma RAFT, yang menjelaskan cara memilih pemimpin di antara node yang setara, akan membantu Anda dalam hal ini. Atau, misalnya, jika Anda hanya memiliki dua node dalam kaitannya dengan master-slave, Anda dapat menggunakan aturan replikasi yang dijelaskan dalam spesifikasi RAFT untuk mengatur pertukaran data dalam kasus yang lebih sederhana.

Artikel ini pada dasarnya adalah panduan praktis tentang bagaimana menerapkan RAFT sendiri. Algoritme itu sendiri dan aspek teoretis dari kerjanya tidak akan dipahami. Anda dapat membaca deskripsi singkat di sini di artikel luar biasa ini atau mempelajari spesifikasi lengkapnya di sini . Di sana Anda dapat menemukan visualisasi algoritma yang sangat jelas.

Deskripsi Solusi Umum


Bagian dari kode yang terkait langsung dengan implementasi algoritma dianalisis dalam artikel. Di akhir artikel ada tautan ke repositori, tempat Anda dapat melihat seluruh kode.

Tugasnya adalah sebagai berikut. Kembangkan sistem terdistribusi yang memungkinkan Anda untuk menyimpan data dalam basis data nilai kunci. Data dari setiap node harus konsisten, yaitu, jika data jatuh ke dalam basis data dari satu simpul dan sebagian besar dari node mengkonfirmasi bahwa mereka juga menerima data ini, maka cepat atau lambat data ini akan berada di basis data setiap node. Ketika bagian dari cluster terputus dan ketika terhubung kembali, node yang berada di luar cluster harus mengejar ketinggalan dengan cluster utama dan menyinkronkan. Setiap node menyediakan REST API untuk menulis dan membaca data basis data. Sistem ini terdiri dari dua modul untuk dua jenis node: klien dan server. Di bawah ini kami mempertimbangkan fitur-fitur dari implementasi server itu sendiri. Kode klien ada di repositori.

Node server dapat beroperasi di tiga negara:

  • Pengikut (pengikut). Menerima permintaan baca dari klien. Mengambil detak jantung dari pemimpin
  • Calon (kandidat). Menerima permintaan baca dari klien. Mengirim permintaan suara ke node lain
  • Pemimpin Menerima permintaan baca dan tulis. Mengirim permintaan detak jantung ke node lain. Mengirimkan menambahkan permintaan data ke node lain.

Periode "kepemimpinan" dari salah satu simpul disebut babak (term). Seorang kandidat baru membuka babak baru.

Penyimpanan data


Setiap node menyediakan akses ke repositori log operasi, di mana operasi untuk mengubah data direkam secara berurutan.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/operations/OperationsLog.java


public interface OperationsLog {
   void append(Operation operation);
   Operation get(Integer index);
   List<Operation> all();

   Long getTerm(Integer index);
   Integer getLastIndex();
   Long getLastTerm();

   void removeAllFromIndex(int newOperationIndex);
}

Setiap operasi, selain data dan tipe (masukkan, ubah, hapus), berisi jumlah putaran di mana ia dibuat. Selain itu, setiap operasi memiliki indeks yang meningkat secara berurutan. Adalah penting bahwa semua operasi dimasukkan ke dalam log pengikut dalam urutan yang sama di mana mereka dimasukkan ke dalam log pemimpin.

Setiap node memiliki akses ke database di mana data disimpan secara langsung.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/storage/Storage.java

public interface Storage {
   List<Entry> all();
   String get(Long key);
   void insert(Long key, String val);
   void update(Long key, String val);
   void delete(Long key);
}

Dalam implementasi saat ini, solusi tertanam dalam memori digunakan baik untuk log dan untuk database (Daftar dan Peta kompetitif biasa). Jika perlu, Anda cukup mengimplementasikan antarmuka yang sesuai untuk mendukung jenis penyimpanan lain.

Aplikasi operasi dari log ke database dilakukan oleh mesin status terdistribusi. Mesin negara adalah mekanisme yang bertanggung jawab untuk mengubah keadaan kluster, membatasi penggunaan perubahan yang salah (operasi yang tidak sesuai pesanan atau node terputus yang menganggap dirinya sebagai pemimpin). Agar perubahan dianggap valid dan agar dapat diterapkan ke database, mereka harus melewati serangkaian pemeriksaan dan memenuhi kriteria tertentu, yang persis seperti yang disediakan oleh mesin negara.

Untuk seorang pemimpin, operasi diterapkan ke database jika sebagian besar node telah mengkonfirmasi fakta bahwa operasi direplikasi ke log mereka juga. Untuk pengikut, operasi diterapkan ke basis data jika sinyal diterima dari pemimpin yang ia masuki ke dalam basis datanya.

Pengatur waktu


Setiap node menyediakan pertukaran data dengan node lain.

Dua jenis pertanyaan didukung:

  • memilih saat melakukan putaran pemungutan suara
  • append, alias detak jantung (jika tanpa data), untuk mereplikasi data log ke pengikut dan untuk mencegah dimulainya putaran suara baru.

Fakta dari awal suatu peristiwa ditentukan oleh penghitung waktu. Dua jenis timer diluncurkan pada node:

  • Pilih. Untuk memulai putaran pemungutan suara. Setiap node memiliki intervalnya sendiri, setelah itu akan mencoba untuk memulai suara baru. Hitungan mundur dimulai lagi ketika menerima detak jantung dari pemimpin.
  • denyut jantung. Untuk mengirim permintaan ke pengikut oleh pemimpin menambahkan. Jika node tidak menerima detak jantung dan timer pemungutan suara telah kedaluwarsa, ia menjadi kandidat dan memulai pemilihan, meningkatkan jumlah putaran pemungutan suara dan mengirimkan permintaan pemungutan suara ke node lain. Jika simpul mengumpulkan mayoritas suara, maka ia menjadi pemimpin dan mulai mengirimkan detak jantung.

Keadaan node saat ini


Setiap node menyimpan data tentang keadaan saat ini.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/context/Context.java

public interface Context {
   Integer getId(); //    
   State getState();//: , ,  
   Integer getVotedFor(); 
               //          
   Long getCurrentTerm(); //  
   Integer getCommitIndex(); //    
   List<Peer> getPeers(); //      
}

Node pemimpin juga menyimpan metadata untuk node yang digunakan untuk mereplikasi data.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/node/peers/Peer.java

public interface Peer {
   Integer getId(); //  
   Integer getNextIndex(); //  ,    
   Integer getMatchIndex();//   
   Boolean getVoteGranted(); //     
}

Metadata simpul diperbarui oleh pemimpin saat menerima tanggapan dari pengikut. Mereka digunakan untuk menentukan oleh pemimpin operasi indeks berikutnya yang siap diterima oleh pengikut dan operasi mana yang telah ditambahkan ke log pengikut.

Pemungutan suara


Kelas ElectionService bertanggung jawab untuk memilih

public interface ElectionService {
   void processElection();
   AnswerVoteDTO vote(RequestVoteDTO requestVoteDTO);
} 

Mengirim permintaan untuk pemungutan suara


Jika node adalah pengikut dan tidak menerima detak jantung untuk periode yang ditentukan untuk menunggu, maka ia meningkatkan putaran saat ini, menyatakan dirinya sebagai kandidat dan mulai mengirim permintaan suara ke node lain. Jika dia berhasil mengumpulkan kuorum dan sebagian besar node memberikan suaranya, dia akan menjadi pemimpin baru. Dalam istilah RAFT, kuorum lebih dari setengah dari semua simpul (51%).

Mari kita menganalisis metode processElectionkelas ElectionServiceImpl, yang dipanggil oleh penghitung waktu suara ketika suara berakhir dan mengirim node permintaan untuk memilih .

//1
context.setState(CANDIDATE); 
Long term = context.incCurrentTerm(); 
context.setVotedFor(context.getId()); 

List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());
long voteGrantedCount = 1L;
long voteRevokedCount = 0L;

//2
while (checkCurrentElectionStatus(term)) {
   List<AnswerVoteDTO> answers = getVoteFromAllPeers(term, peersIds);
   peersIds = new ArrayList<>();
   for (AnswerVoteDTO answer : answers) {
       //3
       if (answer.getStatusCode().equals(OK)) {
           //4
           if (answer.getTerm()>context.getCurrentTerm()) {
               context.setTermGreaterThenCurrent(answer.getTerm());
               return;
           }
           if (answer.isVoteGranted()) {
               //5 
               context.getPeer(answer.getId()).setVoteGranted(true);
               voteGrantedCount++;
           } else
               //6 
               voteRevokedCount++;
       } else {
          peersIds.add(answer.getId());
       }
   }
  //7
  if (voteGrantedCount >= context.getQuorum()) {
       winElection(term);
       return;
   } else if (voteRevokedCount >= context.getQuorum()) {
       loseElection(term);
       return;
   } 

  1. Tetapkan status "Calon". Naikkan angka bulat dan pilih untuk diri kita sendiri.
  2. , ( ). - , , heartbeat .
  3. - , . , , -.
  4. , . , heartbeat .
  5. Node memilih kami! Kami meningkatkan jumlah node yang memberikan suara untuk kami dan memperbaiki bahwa simpul ini memilih kami.
  6. Dipilih bukan untuk kita, kita juga percaya.
  7. Jika kuorum dikumpulkan dan simpul memenangkan pemilihan, kami menetapkan status "Pemimpin". Kalau tidak, kita menjadi pengikut dan menunggu.

Perlu juga dicatat bahwa ketika sebuah simpul menjadi pemimpin, Indeks Berikutnya diatur untuk setiap simpul dalam daftar node yang disimpan di pemimpin, yang sama dengan indeks terakhir dalam log pemimpin ditambah 1. Mulai dari indeks ini, pemimpin akan mencoba memperbarui log pengikut. Bahkan, indeks yang disimpan oleh pemimpin ini mungkin tidak sesuai dengan indeks sebenarnya dari log pengikut dan nilai aktual hanya akan diperoleh saat bertukar data dengan pengikut dan akan disesuaikan. Tetapi beberapa titik awal diperlukan .

  private void winElection(Long term) {
       context.setState(LEADER);
       context.getPeers().forEach(peer ->
               peer.setNextIndex(operationsLog.getLastIndex()+1)

       );
   }

Voting memproses permintaan


Saat memilih, setiap node menerima permintaan dari formulir berikut dari kandidat :

class RequestVoteDTO {
   private final Long term; //     
   private final Integer candidateId; //  
   private final Integer lastLogIndex; //     
   private final Long lastLogTerm; //       
}

Sekarang mari kita lihat prosedur votekelas ElectionServiceImpl, itu memproses permintaan suara dari kandidat dan mengembalikan keputusan mengenai pencalonannya untuk peran pemimpin.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/election/ElectionServiceImpl.java#L178


public AnswerVoteDTO vote(RequestVoteDTO dto) {
   
       boolean termCheck;
       //1
       if (dto.getTerm() < context.getCurrentTerm())
           return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),false);
       else //2
       if (dto.getTerm().equals(context.getCurrentTerm())) {
           termCheck = (context.getVotedFor() == null||
                          context.getVotedFor().equals(dto.getCandidateId()));
       }
       else
       {   //3
           termCheck = true;
             context.setTermGreaterThenCurrent(dto.getTerm());
       }

       //4  
       boolean logCheck = !((operationsLog.getLastTerm() > dto.getLastLogTerm()) ||
               ((operationsLog.getLastTerm().equals(dto.getLastLogTerm())) &&
                       (operationsLog.getLastIndex() > dto.getLastLogIndex())));


       boolean voteGranted = termCheck&&logCheck;

       //5
       if (voteGranted) {
           context.setVotedFor(dto.getCandidateId());
       }
       //6   
       return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),voteGranted);
   }

Setelah menerima permintaan dari seorang kandidat, simpul melakukan dua pemeriksaan: memeriksa putaran kandidat dan panjang log-nya. Jika ronde kandidat lebih tinggi dan log-nya lebih panjang atau sama, maka node memberikan simpulnya suara untuk kandidat

  1. Jika putaran simpul saat ini lebih besar daripada putaran kandidat, kami menolak, karena ini adalah permintaan dari beberapa simpul yang tertinggal, yang, tampaknya, berada di luar cluster untuk beberapa waktu dan memulai prosedur pemilihan karena tidak melihat pemimpin saat ini.
  2. , , , , , , ; . — .
  3. ,
  4. . , , , , .
  5. Dengan hasil positif, kami memperbaiki fakta bahwa simpul tersebut ikut serta dalam pemilihan dan memberikan suara untuk kandidat.
  6. Kirim hasilnya kembali ke kandidat

Tentunya, kondisinya bisa ditulis agak lebih pendek dan lebih elegan, tetapi saya meninggalkan opsi yang lebih "naif" agar tidak bingung sendiri dan tidak membingungkan siapa pun.

Replikasi


Pemimpin penghitung waktu mengirimkan pengikut detak jantung ke semua node untuk mengatur ulang penghitung waktu pemilihan mereka. Karena pemimpin menyimpan dalam indeks metadata operasi terakhir dari semua pengikut, ia dapat mengevaluasi apakah mengirim operasi ke node diperlukan. Jika log operasi pemimpin menjadi lebih panjang dari log pengikut mana pun, maka ia, bersama dengan detak jantung, secara berurutan mengirimkan kepadanya operasi yang hilang. Sebut saja tambahkan permintaan. Jika sebagian besar node mengkonfirmasi penerimaan operasi baru, pemimpin menerapkan operasi ini ke database-nya dan meningkatkan indeks operasi yang diterapkan terakhir. Indeks ini juga dikirim ke pengikut bersama dengan permintaan detak jantung. Dan jika indeks pemimpin lebih tinggi dari indeks pengikut, maka pengikut juga menerapkan operasi ke database untuk menyamakan indeks.

Jenis tambahan ini meminta pemimpin mengirim ke pengikut

class RequestAppendDTO {
   private final Long term; //   
   private final Integer leaderId; //   

   private final Integer prevLogIndex;//   
   private final Long prevLogTerm;//   
   private final Integer leaderCommit;//      
   private final Operation operation; //
}

Ada implementasi di mana operasi ditransfer dalam batch beberapa permintaan. Dalam implementasi saat ini, hanya satu operasi yang dapat ditransmisikan per

permintaan. Kelas merespons pengiriman dan pemrosesan permintaan detak jantung-append:

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationService.java

public interface ReplicationService {
   void appendRequest();
   AnswerAppendDTO append(RequestAppendDTO requestAppendDTO);
}

Kirim permintaan perubahan data


Pertimbangkan fragmen metodesendAppendForOnePeer kelas ReplicationServiceImpl

. Metode ini bertanggung jawab untuk membuat permintaan kepada pengikut dan mengirimkannya .

private CompletableFuture<AnswerAppendDTO> sendAppendForOnePeer(Integer id) {
   return CompletableFuture.supplyAsync(() -> {
       try {
           //1
           Peer peer = context.getPeer(id);

           Operation operation;
           Integer prevIndex;
           //2    
           if (peer.getNextIndex() <= operationsLog.getLastIndex()) {
               operation = operationsLog.get(peer.getNextIndex());
               prevIndex = peer.getNextIndex() - 1;
           } else 
           //3  
           {
               operation = null;
               prevIndex = operationsLog.getLastIndex();
           }


           RequestAppendDTO requestAppendDTO = new RequestAppendDTO(
                   context.getCurrentTerm(), //   
                   context.getId(), //  
                   prevIndex,//      
                   operationsLog.getTerm(prevIndex),//  
                   context.getCommitIndex(),
                               //      
                   Operation //
           );

...
/*   http     */
}

  1. Metadata pengikut
  2. , . ( ), , , , . , , , ,
  3. , , ; , , ,

Selanjutnya, pertimbangkan metode appendRequestkelas ReplicationServiceImpl, yang bertanggung jawab untuk mengirim permintaan append dan memproses hasilnya ke semua pengikut.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationServiceImpl.java#L109

public void appendRequest() {
       List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());

       //1 
       while (peersIds.size() > 0) {
           //2 
           List<AnswerAppendDTO> answers = sendAppendToAllPeers(peersIds);
           peersIds = new ArrayList<>();
           for (AnswerAppendDTO answer : answers) {
               //3
               if (answer.getStatusCode().equals(OK)) {
                   //4
                   if (answer.getTerm() > context.getCurrentTerm()) {
                        context.setTermGreaterThenCurrent(answer.getTerm());
                       return;
                   }
                   Peer peer = context.getPeer(answer.getId());
                   //5     
                   if (answer.getSuccess()) {                      
                       peer.setNextIndex(answer.getMatchIndex() + 1);
                       peer.setMatchIndex(answer.getMatchIndex());
                       if (peer.getNextIndex() <= operationsLog.getLastIndex())
                           peersIds.add(answer.getId());
                   //6      
                   } else {
                       peer.decNextIndex();
                       peersIds.add(answer.getId());
                   }
               }
           }
           //7
           tryToCommit();
       }
}

  1. Kami mengulangi permintaan sampai kami menerima tanggapan dari semua pengikut bahwa replikasi berhasil. Karena satu operasi dikirim per permintaan, mungkin diperlukan beberapa iterasi untuk menyinkronkan log pengikut
  2. Kirim permintaan ke semua pengikut dan dapatkan daftar dengan jawaban
  3. Kami menganggap jawaban hanya dari pengikut yang tersedia
  4. Jika ternyata putaran salah satu pengikut lebih dari putaran pemimpin, kami menghentikan semuanya dan berubah menjadi pengikut
  5. Jika pengikut menjawab bahwa semuanya berjalan dengan baik, kami memperbarui metadata pengikut: kami menyimpan indeks terakhir dari log pengikut dan indeks dari operasi berikutnya yang diharapkan oleh pengikut.
  6. , , , , . , , . , . , .
  7. , . .


Sekarang mari kita lihat bagaimana sebenarnya pengikut memproses permintaan tambahan dari pemimpin.
Metode appendkelasReplicationServiceImpl

public AnswerAppendDTO append(RequestAppendDTO dto) {
     
       //1     
       if (dto.getTerm() < context.getCurrentTerm()) {
           return new AnswerAppendDTO(context.getId(),context.getCurrentTerm(),false, null);
       } else if (dto.getTerm() > context.getCurrentTerm()) {
           //2 
           context.setCurrentTerm(dto.getTerm());
           context.setVotedFor(null);
       }
       //3  
       applicationEventPublisher.publishEvent(new ResetElectionTimerEvent(this));

       if (!context.getState().equals(FOLLOWER)) {
           context.setState(FOLLOWER);
       }
        
       //4  
       if ((dto.getPrevLogIndex() > operationsLog.getLastIndex()) ||                                                                                        !dto.getPrevLogTerm().equals(operationsLog.getTerm(dto.getPrevLogIndex()))) {
                      return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), false, null);
       }


       Operation newOperation = dto.getOperation();
       if (newOperation != null) {
           int newOperationIndex = dto.getPrevLogIndex() + 1;
           
         synchronized (this) {
               //5
               if ((newOperationIndex <= operationsLog.getLastIndex()) &&
                      (!newOperation.getTerm().equals(operationsLog.getTerm(newOperationIndex)))){
                   operationsLog.removeAllFromIndex(newOperationIndex);
               }
               //6
               if (newOperationIndex <= operationsLog.getLastIndex())
               {
                 return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true,      operationsLog.getLastIndex());
               }
               //7
               operationsLog.append(newOperation);
           }
        }
        //8 
        if (dto.getLeaderCommit() > context.getCommitIndex()) {
           context.setCommitIndex(Math.min(dto.getLeaderCommit(), operationsLog.getLastIndex()));
       }

                 
       return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true, operationsLog.getLastIndex());
   }

  1. Jika ronde pemimpin kurang dari ronde pengikut, maka kami mengirim ronde pemimpin kami dan tanda bahwa permintaannya telah ditolak. Begitu pemimpin menerima satu ronde yang lebih besar daripada tanggapannya, dia akan berubah menjadi pengikut
  2. Jika ronde pemimpin lebih dari ronde pengikut, atur ronde ini ke pengikut.
  3. Karena permintaan diterima dari pemimpin, terlepas dari apakah ada data di sana atau tidak, kami menyetel ulang penghitung waktu suara dan, jika kami bukan pengikut, kami menjadi pengikutnya.
  4. , , , , , , . , ,
  5. , . . , , - , , , , . , .
  6. , . ,
  7. ,
  8. , , , .


Tetap hanya untuk mengetahui bagaimana pemimpin menerapkan operasi dari log ke database. Dalam proses pengiriman operasi kepada pengikut dan pemrosesan tanggapan dari mereka, pemimpin memperbarui metadata dari node. Segera setelah jumlah node yang indeks operasi terakhir dalam log lebih besar dari indeks operasi terakhir yang diterapkan ke database oleh pemimpin menjadi sama dengan kuorum, kita dapat menyatakan bahwa sebagian besar node menerima operasi dan kita dapat menerapkannya pada database pemimpin. Dengan kata lain, jika seorang pemimpin mengirim operasi ke pengikut dan sebagian besar dari mereka memasukkannya ke dalam log-nya dan menjawab pemimpin itu, maka kita dapat menerapkan operasi ini ke basis data pemimpin dan meningkatkan indeks operasi terakhir yang diterapkan. Indeks ini dengan permintaan append-heartbeat berikutnya akan terbang ke pengikut dan itu akan menerapkan operasi dengan indeks yang sama dari log-nya ke database-nya.

Mari kita menganalisis metode tryToCommitkelasReplicationServiceImpl

  private void tryToCommit() {
       while (true) {
           //1
           int N = context.getCommitIndex() + 1;
           //2
           Supplier<Long> count = () ->
               context.getPeers().stream().map(Peer::getMatchIndex).
                       filter(matchIndex -> matchIndex >= N).count() + 1;

           //3 
           if (operationsLog.getLastIndex() >= N &&
                   operationsLog.getTerm(N).equals(context.getCurrentTerm())&&
                      count.get()>=context.getQuorum()
           )
           {
               context.setCommitIndex(N);
           } else
               return;
       }
   }

  1. Kami mendapatkan indeks operasi berikut yang diterapkan ke database
  2. Kami menghitung berapa banyak pengikut yang memiliki operasi dengan indeks sedemikian dalam log mereka, dan jangan lupa untuk menambahkan seorang pemimpin
  3. Jika jumlah pengikut seperti itu adalah kuorum dan operasi dengan indeks seperti itu ada di log pemimpin, dan putaran operasi ini setara dengan yang sekarang, maka pemimpin menerapkan operasi ke database dan meningkatkan indeks operasi yang terakhir diterapkan. Operasi dari babak sebelumnya tidak dapat diterapkan, karena pemimpin lain bertanggung jawab atas mereka dan konflik dapat muncul. Setiap pemimpin hanya menerapkan operasi putarannya saat ini.

Kesimpulan


Setiap algoritma terdistribusi, yang mewakili keluarga RAFT, adalah solusi terintegrasi yang kuat yang menjamin pencapaian hasil, tunduk pada semua aturan yang dijelaskan dalam spesifikasi.

Ada banyak algoritma terdistribusi dan mereka berbeda. Ada ZAB, yang diterapkan di Zookeeper dan digunakan, misalnya, untuk menyinkronkan data di Kafka. Ada algoritma dengan persyaratan yang kurang ketat untuk konsistensi, misalnya, massa implementasi protokol Gossip yang digunakan dalam sistem AP. Ada algoritma yang mengikuti prinsip-prinsip RAFT, dan pada saat yang sama menggunakan protokol gosip untuk bertukar log seperti MOKKA, yang juga menggunakan enkripsi.

Saya percaya bahwa mencoba mencari tahu salah satu dari algoritma ini sangat berguna untuk setiap pengembang, dan seperti yang saya sebutkan di atas, solusi dapat menarik baik secara komprehensif maupun di bagian yang terpisah. Dan jelas, Anda pasti perlu melihat ke arah ini kepada mereka yang kegiatannya terkait dengan pengembangan sistem terdistribusi dan mengenai masalah sinkronisasi data, bahkan jika mereka menggunakan solusi industri standar.

Referensi



Kami berharap materi ini bermanfaat bagi Anda. Dan jika Anda ingin mengikuti kursus , Anda bisa melakukannya sekarang.

All Articles