تنفيذ خوارزمية توافق RAFT لتخزين KV الموزع في جافا

مرحبا مجددا. قبل أيام قليلة ، بدأ التدريب في مجموعة جديدة حول الدورة "مهندس البرمجيات" ، واليوم نود أن نشارك مقالًا كتبه أحد طلاب الدورة ، أنطون بليشاكوف (رئيس قسم التطوير في Program Logistics والمؤسس المشارك في Clusterra).




في الوقت الحالي ، أصبحت أنظمة الخدمات المصغرة الموزعة معيار الصناعة فعليًا ، وليس فقط في عالم الشركات. تم وصف ومناقشة فوائد استخدام الأنظمة الموزعة أكثر من مرة. لطالما عرف الجميع مزايا الخدمات المصغرة: تقنيات المهمة ، والتركيب ، والقابلية للتطوير ، وتطوير التطوير ، وتقليل TTM ، وما إلى ذلك. من الواضح أن تطوير التطبيقات الموزعة يوفر المزيد من الخيارات للاستجابة في الوقت المناسب لمتطلبات الأعمال المتزايدة ورقمنة كل شيء حولها.

من المهم أيضًا ملاحظة أنه من العوامل المهمة جدًا التي تؤثر حاليًا على اختيار استراتيجية تطوير لصالح الخدمات الدقيقة هو توفر جميع أنواع حلول البنية التحتية الجاهزة التي تأخذ حل المشكلات المرتبطة بالتكاليف الإضافية لتشغيل نظام موزع. نحن نتحدث عن أنظمة تنسيق الحاويات ، وهرس الخدمة ، ووسائل التتبع الموزع والمراقبة وقطع الأشجار وما إلى ذلك. يمكن القول بأمان أن معظم العوامل التي سبق ذكرها على أنها سلبيات منهج الخدمات الصغيرة اليوم لم يكن لها تأثير كبير مثل بضع سنوات مضت.

بناءً على الحقائق الحديثة ، يسعى معظم المطورين في أول فرصة للتبديل من بنية متجانسة إلى بنية خدمات صغيرة. من أولى الخطوات التي يمكن اتخاذها دون اللجوء إلى إعادة الهيكلة الكلية والتحلل الجاد هو تحقيق نظام قابلية التوسع الأفقي. أي ، لتحويل تطبيقك المترابط إلى كتلة ، ربما تتكون حتى من نفس الأحجار المتجانسة ، ولكن تسمح لك بتغيير رقمها ديناميكيًا.

عند محاولة تحقيق قابلية التوسع الأفقية ، تنشأ مسألة مزامنة البيانات داخل مجموعة بسرعة كبيرة وحادة للغاية. لحسن الحظ ، تدعم جميع نظم إدارة قواعد البيانات (DBMS) الحديثة نسخ البيانات بين العقد بطريقة أو بأخرى. يحتاج المطور فقط إلى تحديد DBMS للمهمة وتحديد خصائص النظام (وفقًا لنظرية CAP) التي يحتاجها ، CP أو AP ، ويتم حل المشكلة. في الحالة التي تكون فيها CP مطلوبة ومتطلبات الاتساق عالية ، فإن إحدى طرق حل مشكلة مزامنة البيانات هي استخدام مجموعة تدعم خوارزمية توافق RAFT.

هذه الخوارزمية الجديدة إلى حد ما (تم تطويرها في عام 2012) تعطي ضمانًا عاليًا للاتساق وتحظى بشعبية كبيرة. قررت معرفة كيفية عملها ، وكتبت تنفيذي لمستودع قيم ثابت متناسق في Java (Spring Boot).

هل يعقل تنفيذ أي خوارزمية موزعة بنفسك؟ من الواضح أنه يمكنك تنفيذ تنفيذ جاهز لخوارزمية موزعة ، وبأعلى درجة من الاحتمال سيكون هذا التنفيذ أفضل من "دراجة" محلية الصنع. على سبيل المثال ، يمكنك استخدام DBMS الذي يحافظ على مستوى الاتساق المطلوب. أو يمكنك نشر Zookeeper . أو يمكنك إيجاد إطار عمل يناسب لغتك. بالنسبة إلى Java ، يوجد Atomix ، الذي يحل تمامًا مشاكل مزامنة البيانات الموزعة.

لكن على الجانب الآخر. إذا كنت تستخدم حلًا جاهزًا ، فإن استخدام تطبيق خارجي عادة ما يضيف نقطة فشل إضافية إلى نظامك. وقد تكون الأطر زائدة عن الحاجة أو صعبة التشغيل والتعلم ، أو قد لا تكون موجودة على الإطلاق للغة البرمجة الخاصة بك. بالإضافة إلى ذلك ، يعد التنفيذ المستقل لخوارزمية الإجماع مهمة هندسية مثيرة للاهتمام للغاية توسع آفاقك وتمنحك فهمًا لكيفية حل المشكلات التي تنشأ عندما تتفاعل الخدمات في مجموعة باستخدام الطريقة الأكثر مثالية.

نظرًا لأن مواصفات الخوارزمية تحتوي على مجموعة من الإجراءات للحفاظ على تكامل البيانات ، يمكنك استخدام المعرفة المكتسبة وحتى استخدام الخوارزمية بالكامل. يمكن لأي جزء من الخوارزمية أن يكون مفيدًا في الحياة الواقعية. افترض أن لديك مجموعة من العاملين لتحليل الملفات بالتوازي. العمال متساوون ، لكنك تريد تعيين أحد العمال كمنسق ، وعندما يقع العامل المنسق ، قم بتعيين أي عامل مجاني آخر كمنسق. سيساعدك النصف الأول من خوارزمية RAFT ، التي تصف كيفية اختيار قائد بين العقد المتكافئة ، في ذلك. أو على سبيل المثال ، إذا كان لديك عقدتان فقط فيما يتعلق بالعبد الرئيسي ، يمكنك استخدام قواعد النسخ المتماثل الموضحة في مواصفات RAFT لتنظيم تبادل البيانات في حالتك الأبسط.

المقالة عبارة عن دليل عملي حول كيفية تنفيذ RAFT بنفسك. لن يتم فهم الخوارزمية نفسها والجوانب النظرية لعملها. يمكنك قراءة وصف موجز هنا في هذه المقالة الممتازة أو دراسة المواصفات الكاملة هنا . هناك يمكنك العثور على تصور واضح للغاية للخوارزمية.

وصف الحل العام


يتم تحليل جزء من التعليمات البرمجية المرتبطة مباشرة بتنفيذ الخوارزمية في المقالة. في نهاية المقال يوجد رابط إلى المستودع ، حيث يمكنك رؤية الرمز بالكامل.

كانت المهمة على النحو التالي. تطوير نظام موزع يسمح لك بتخزين البيانات في قاعدة بيانات ذات قيمة رئيسية. يجب أن تكون بيانات كل عقدة متسقة ، أي إذا دخلت البيانات في قاعدة بيانات عقدة واحدة وأكدت معظم العقد أنها تلقت هذه البيانات أيضًا ، فستكون هذه البيانات في قاعدة بيانات كل عقدة عاجلاً أم آجلاً. عندما يتم فصل جزء من الكتلة وعندما يتم توصيله مرة أخرى ، يجب أن تلتحق العقد التي كانت خارج المجموعة مع الكتلة الرئيسية وتتزامن. توفر كل عقدة واجهة برمجة تطبيقات REST لكتابة وقراءة بيانات قاعدة البيانات. يتكون النظام من وحدتين لنوعين من العقد: العميل والخادم. أدناه نعتبر ميزات تنفيذ الخادم نفسه. كود العميل موجود في المستودع.

يمكن أن تعمل عقدة الخادم في ثلاث حالات:

  • تابع (تابع). قبول طلبات القراءة من العميل. يأخذ ضربات القلب من القائد
  • مرشح (مرشح). قبول طلبات القراءة من العميل. يرسل طلبات التصويت إلى العقد الأخرى
  • زعيم يقبل طلبات القراءة والكتابة. يرسل طلبات ضربات القلب إلى العقد الأخرى. يرسل إلحاق طلبات البيانات إلى العقد الأخرى.

تسمى فترة "قيادة" إحدى العقد الجولة (المدى). مرشح جديد يفتح جولة جديدة.

مخزن البيانات


توفر كل عقدة الوصول إلى مستودع سجل العمليات ، حيث يتم تسجيل عمليات تغيير البيانات بالتسلسل.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/operations/OperationsLog.java


public interface OperationsLog {
   void append(Operation operation);
   Operation get(Integer index);
   List<Operation> all();

   Long getTerm(Integer index);
   Integer getLastIndex();
   Long getLastTerm();

   void removeAllFromIndex(int newOperationIndex);
}

تحتوي كل عملية ، بالإضافة إلى البيانات والنوع (إدراج ، تغيير ، حذف) ، على رقم الجولة التي تم إنشاؤها فيها. بالإضافة إلى ذلك ، تحتوي كل عملية على فهرس يزيد بالتسلسل. من المهم أن يتم إدراج جميع العمليات في سجلات المتابعين بنفس الترتيب الذي يتم فيه إدراجهم في سجل الزعيم.

كل عقدة لديها حق الوصول إلى قاعدة بيانات يتم فيها تخزين البيانات مباشرة.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/storage/Storage.java

public interface Storage {
   List<Entry> all();
   String get(Long key);
   void insert(Long key, String val);
   void update(Long key, String val);
   void delete(Long key);
}

في التطبيق الحالي ، يتم استخدام الحلول المضمنة في الذاكرة لكل من السجل وقاعدة البيانات (قائمة تنافسية عادية وخريطة). إذا لزم الأمر ، يمكنك ببساطة تنفيذ الواجهة المناسبة لدعم أنواع التخزين الأخرى.

يتم تطبيق العمليات من السجل إلى قاعدة البيانات بواسطة آلة حالة موزعة. جهاز الحالة هو مثل هذه الآلية المسؤولة عن تغيير حالة الكتلة ، وتقييد استخدام التغييرات غير الصحيحة (عمليات خارج الترتيب أو عقدة غير متصلة تعتبر نفسها رائدة). من أجل اعتبار التغييرات صالحة ومن أجل تطبيقها على قاعدة البيانات ، يجب أن يجتازوا سلسلة من عمليات التحقق وتفي بمعايير معينة ، وهو بالضبط ما توفره آلة الحالة.

بالنسبة للقائد ، يتم تطبيق عملية على قاعدة البيانات إذا أكدت معظم العقد حقيقة أن العملية يتم نسخها إلى سجلهم أيضًا. بالنسبة للمتابعين ، يتم تطبيق العملية على قاعدة البيانات إذا تم تلقي إشارة من القائد أنها دخلت إلى قاعدة بياناته.

المؤقتات


توفر كل عقدة تبادل البيانات مع العقد الأخرى.

يتم دعم نوعين من الاستعلامات:

  • التصويت عند إجراء جولة من التصويت
  • إلحاق ، ويعرف أيضًا باسم نبض القلب (إذا لم يكن هناك بيانات) ، لتكرار بيانات السجل للمتابعين ولمنع بدء جولة جديدة من التصويت.

يتم تحديد حقيقة بداية الحدث بواسطة جهاز ضبط الوقت. يتم إطلاق نوعين من المؤقتات على العقدة:

  • تصويت. لبدء جولة التصويت. كل عقدة لها فاصل زمني خاص بها ، وبعد ذلك ستحاول بدء تصويت جديد. يبدأ العد التنازلي من جديد عند تلقي نبضات من الزعيم.
  • نبض القلب. لإرسال طلب للمتابعين من قبل قائد الإلحاق. إذا لم تحصل العقدة على نبضات قلب وانتهت موقت التصويت ، فإنها تصبح مرشحًا وتبدأ الانتخابات ، وتزيد عدد جولة التصويت وترسل طلبات التصويت إلى العقد الأخرى. إذا جمعت العقدة غالبية الأصوات ، فإنها تصبح الزعيم وتبدأ في إرسال نبض القلب.

الحالة الحالية للعقدة


تخزن كل عقدة بيانات عن الحالة الحالية.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/context/Context.java

public interface Context {
   Integer getId(); //    
   State getState();//: , ,  
   Integer getVotedFor(); 
               //          
   Long getCurrentTerm(); //  
   Integer getCommitIndex(); //    
   List<Peer> getPeers(); //      
}

تقوم العقدة البادئة أيضًا بتخزين البيانات الوصفية للعقد التي تقوم بنسخ البيانات إليها.

https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/node/peers/Peer.java

public interface Peer {
   Integer getId(); //  
   Integer getNextIndex(); //  ,    
   Integer getMatchIndex();//   
   Boolean getVoteGranted(); //     
}

يتم تحديث بيانات تعريف العقدة من قبل الزعيم عند تلقي ردود من المتابعين. يتم استخدامها لتحديد القائد لعملية الفهرس التالية التي يكون المتابعون على استعداد لقبولها والعمليات التي تمت إضافتها بالفعل إلى سجل المتابع.

التصويت


إن فئة ElectionService مسؤولة عن التصويت

public interface ElectionService {
   void processElection();
   AnswerVoteDTO vote(RequestVoteDTO requestVoteDTO);
} 

إرسال طلب التصويت


إذا كانت العقدة متابعًا ولا تتلقى نبضات قلب للفترة المحددة للانتظار ، فإنها تزيد من جولتها الحالية ، وتعلن نفسها كمرشحة وتبدأ في إرسال طلبات التصويت إلى العقد الأخرى. إذا تمكن من جمع النصاب القانوني وأدلت معظم العقد بصوته ، فسيصبح القائد الجديد. في مصطلحات RAFT ، يبلغ النصاب القانوني أكثر من نصف العقد (51٪).

دعونا نحلل طريقة processElectionالفصل ElectionServiceImpl، والتي يطلق عليها موقت التصويت عندما ينتهي التصويت ويرسل العقد طلبًا للتصويت .

//1
context.setState(CANDIDATE); 
Long term = context.incCurrentTerm(); 
context.setVotedFor(context.getId()); 

List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());
long voteGrantedCount = 1L;
long voteRevokedCount = 0L;

//2
while (checkCurrentElectionStatus(term)) {
   List<AnswerVoteDTO> answers = getVoteFromAllPeers(term, peersIds);
   peersIds = new ArrayList<>();
   for (AnswerVoteDTO answer : answers) {
       //3
       if (answer.getStatusCode().equals(OK)) {
           //4
           if (answer.getTerm()>context.getCurrentTerm()) {
               context.setTermGreaterThenCurrent(answer.getTerm());
               return;
           }
           if (answer.isVoteGranted()) {
               //5 
               context.getPeer(answer.getId()).setVoteGranted(true);
               voteGrantedCount++;
           } else
               //6 
               voteRevokedCount++;
       } else {
          peersIds.add(answer.getId());
       }
   }
  //7
  if (voteGrantedCount >= context.getQuorum()) {
       winElection(term);
       return;
   } else if (voteRevokedCount >= context.getQuorum()) {
       loseElection(term);
       return;
   } 

  1. تعيين حالة "المرشح". ارفع الرقم وصوت لأنفسنا.
  2. , ( ). - , , heartbeat .
  3. - , . , , -.
  4. , . , heartbeat .
  5. العقدة صوتت لنا! نقوم بزيادة عدد العقد التي تدلي بصوتها لنا ونصلح أن هذه العقدة صوتت لنا.
  6. صوتنا ليس لنا ، نحن نعتقد أيضا.
  7. إذا تم جمع النصاب القانوني وفازت العقدة في الانتخابات ، فإننا نحدد وضع "القائد". خلاف ذلك ، نصبح تابعين وننتظر.

وتجدر الإشارة أيضًا إلى أنه عندما تصبح العقدة زعيمة ، يتم تعيين الفهرس التالي لكل عقدة في قائمة العُقد المخزنة بواسطة القائد ، والتي تساوي الفهرس الأخير في سجل القائد بالإضافة إلى 1. بدءًا من هذا الفهرس ، سيحاول القائد تحديث سجلات المتابعين. في الواقع ، قد لا يتوافق هذا الفهرس المخزن من قبل القائد مع الفهرس الحقيقي لسجل المتابع وسيتم الحصول على القيمة الفعلية فقط عند تبادل البيانات مع المتابع وسيتم تعديلها. ولكن هناك حاجة إلى بعض نقطة البداية .

  private void winElection(Long term) {
       context.setState(LEADER);
       context.getPeers().forEach(peer ->
               peer.setNextIndex(operationsLog.getLastIndex()+1)

       );
   }

معالجة طلب التصويت


عند التصويت ، تتلقى كل عقدة طلبًا من النموذج التالي من المرشح :

class RequestVoteDTO {
   private final Long term; //     
   private final Integer candidateId; //  
   private final Integer lastLogIndex; //     
   private final Long lastLogTerm; //       
}

الآن دعونا نلقي نظرة على إجراء voteالفصل ElectionServiceImpl، فهو يعالج طلب التصويت من المرشح ويعيد قرارًا بشأن ترشيحه لدور القائد.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/election/ElectionServiceImpl.java#L178


public AnswerVoteDTO vote(RequestVoteDTO dto) {
   
       boolean termCheck;
       //1
       if (dto.getTerm() < context.getCurrentTerm())
           return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),false);
       else //2
       if (dto.getTerm().equals(context.getCurrentTerm())) {
           termCheck = (context.getVotedFor() == null||
                          context.getVotedFor().equals(dto.getCandidateId()));
       }
       else
       {   //3
           termCheck = true;
             context.setTermGreaterThenCurrent(dto.getTerm());
       }

       //4  
       boolean logCheck = !((operationsLog.getLastTerm() > dto.getLastLogTerm()) ||
               ((operationsLog.getLastTerm().equals(dto.getLastLogTerm())) &&
                       (operationsLog.getLastIndex() > dto.getLastLogIndex())));


       boolean voteGranted = termCheck&&logCheck;

       //5
       if (voteGranted) {
           context.setVotedFor(dto.getCandidateId());
       }
       //6   
       return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),voteGranted);
   }

عند تلقي طلب من أحد المرشحين ، تقوم العقدة بإجراء فحصين: التحقق من جولة المرشح وطول سجله. إذا كانت جولة المرشح أعلى وكان سجله أطول أو متساوٍ ، فإن العقدة تعطي عقدة التصويت للمرشح

  1. إذا كانت الجولة الحالية من العقدة أكبر من جولة المرشح ، فإننا نرفض ، لأن هذا طلب لبعض العقدة المتأخرة ، والتي ، على ما يبدو ، كانت خارج الكتلة لبعض الوقت وبدأت في إجراء الانتخابات لأنها لم تر الزعيم الحالي.
  2. , , , , , , ; . — .
  3. ,
  4. . , , , , .
  5. بنتيجة إيجابية ، نثبت حقيقة أن العقدة شاركت في الانتخابات وأدلت بصوت للمرشح.
  6. أرسل النتيجة مرة أخرى إلى المرشح

بالتأكيد ، كان من الممكن أن تكون الشروط مكتوبة إلى حد ما أقصر وأكثر أناقة ، لكني تركت خيارًا أكثر "ساذجًا" حتى لا أخلط بيني وبين عدم إرباك أي شخص.

تكرار


يرسل قائد المؤقت متابعين دقات القلب إلى جميع العقد لإعادة ضبط مؤقتات التصويت الخاصة بهم. نظرًا لأن القائد يخزن في مؤشرات بياناته الوصفية الخاصة بالعمليات الأخيرة لجميع المتابعين ، يمكنه تقييم ما إذا كان إرسال العملية إلى العقد مطلوبًا. إذا أصبح سجل عمليات القائد أطول من سجل أي متابع ، فإنه ، مع نبضات القلب ، يرسل إليه العمليات المفقودة بالتتابع. نسميها طلب إلحاق. إذا أكدت معظم العقد استلام العمليات الجديدة ، يقوم القائد بتطبيق هذه العمليات على قاعدة بياناته ويزيد من فهرس آخر عملية تم تطبيقها. يتم إرسال هذا الفهرس أيضًا إلى المتابعين مع طلب ضربات القلب. وإذا كان مؤشر القائد أعلى من فهرس المتابعين ، فإن المتابع يطبق أيضًا العمليات على قاعدة بياناته من أجل مساواة المؤشرات.

هذا النوع من الإلحاق يرسله القائد إلى المتابع

class RequestAppendDTO {
   private final Long term; //   
   private final Integer leaderId; //   

   private final Integer prevLogIndex;//   
   private final Long prevLogTerm;//   
   private final Integer leaderCommit;//      
   private final Operation operation; //
}

هناك تطبيقات يتم فيها نقل العمليات على دفعات متعددة لكل طلب. في التطبيق الحالي ، يمكن إرسال عملية واحدة فقط لكل

طلب ، ويستجيب الفصل لإرسال ومعالجة طلب إلحاق ضربات القلب:

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationService.java

public interface ReplicationService {
   void appendRequest();
   AnswerAppendDTO append(RequestAppendDTO requestAppendDTO);
}

إرسال طلب تغيير البيانات


ضع في اعتبارك جزءًا من طريقةsendAppendForOnePeer فئة ReplicationServiceImpl

، فالطريقة مسؤولة عن إنشاء طلب إلى المتابع وإرساله .

private CompletableFuture<AnswerAppendDTO> sendAppendForOnePeer(Integer id) {
   return CompletableFuture.supplyAsync(() -> {
       try {
           //1
           Peer peer = context.getPeer(id);

           Operation operation;
           Integer prevIndex;
           //2    
           if (peer.getNextIndex() <= operationsLog.getLastIndex()) {
               operation = operationsLog.get(peer.getNextIndex());
               prevIndex = peer.getNextIndex() - 1;
           } else 
           //3  
           {
               operation = null;
               prevIndex = operationsLog.getLastIndex();
           }


           RequestAppendDTO requestAppendDTO = new RequestAppendDTO(
                   context.getCurrentTerm(), //   
                   context.getId(), //  
                   prevIndex,//      
                   operationsLog.getTerm(prevIndex),//  
                   context.getCommitIndex(),
                               //      
                   Operation //
           );

...
/*   http     */
}

  1. البيانات الوصفية للمتابعين
  2. , . ( ), , , , . , , , ,
  3. , , ; , , ,

بعد ذلك ، ضع في اعتبارك طريقة appendRequestالفصل ReplicationServiceImpl، المسؤولة عن إرسال طلب الإلحاق ومعالجة النتيجة لجميع المتابعين.

https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationServiceImpl.java#L109

public void appendRequest() {
       List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());

       //1 
       while (peersIds.size() > 0) {
           //2 
           List<AnswerAppendDTO> answers = sendAppendToAllPeers(peersIds);
           peersIds = new ArrayList<>();
           for (AnswerAppendDTO answer : answers) {
               //3
               if (answer.getStatusCode().equals(OK)) {
                   //4
                   if (answer.getTerm() > context.getCurrentTerm()) {
                        context.setTermGreaterThenCurrent(answer.getTerm());
                       return;
                   }
                   Peer peer = context.getPeer(answer.getId());
                   //5     
                   if (answer.getSuccess()) {                      
                       peer.setNextIndex(answer.getMatchIndex() + 1);
                       peer.setMatchIndex(answer.getMatchIndex());
                       if (peer.getNextIndex() <= operationsLog.getLastIndex())
                           peersIds.add(answer.getId());
                   //6      
                   } else {
                       peer.decNextIndex();
                       peersIds.add(answer.getId());
                   }
               }
           }
           //7
           tryToCommit();
       }
}

  1. نكرر الطلب حتى نتلقى استجابة من جميع المتابعين بأن النسخ المتماثل كان ناجحًا. نظرًا لإرسال عملية واحدة لكل طلب ، فقد يستغرق الأمر عدة تكرارات لمزامنة سجلات المتابعين
  2. إرسال الطلبات إلى جميع المتابعين والحصول على قائمة بالإجابات
  3. نعتبر الإجابات فقط من المتابعين المتاحين
  4. إذا اتضح أن جولة أحد المتابعين هي أكثر من جولة الزعيم ، فإننا نوقف كل شيء ونتحول إلى متابعين
  5. إذا رد المتابع بأن كل شيء كان ناجحًا ، فإننا نقوم بتحديث البيانات الوصفية للمتابعين: نقوم بحفظ الفهرس الأخير لسجل المتابع وفهرس العملية التالية التي يتوقعها المتابع.
  6. , , , , . , , . , . , .
  7. , . .


الآن دعونا نرى كيف يعالج التابع بالضبط طلب الإلحاق من القائد.
طريقة appendالفصلReplicationServiceImpl

public AnswerAppendDTO append(RequestAppendDTO dto) {
     
       //1     
       if (dto.getTerm() < context.getCurrentTerm()) {
           return new AnswerAppendDTO(context.getId(),context.getCurrentTerm(),false, null);
       } else if (dto.getTerm() > context.getCurrentTerm()) {
           //2 
           context.setCurrentTerm(dto.getTerm());
           context.setVotedFor(null);
       }
       //3  
       applicationEventPublisher.publishEvent(new ResetElectionTimerEvent(this));

       if (!context.getState().equals(FOLLOWER)) {
           context.setState(FOLLOWER);
       }
        
       //4  
       if ((dto.getPrevLogIndex() > operationsLog.getLastIndex()) ||                                                                                        !dto.getPrevLogTerm().equals(operationsLog.getTerm(dto.getPrevLogIndex()))) {
                      return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), false, null);
       }


       Operation newOperation = dto.getOperation();
       if (newOperation != null) {
           int newOperationIndex = dto.getPrevLogIndex() + 1;
           
         synchronized (this) {
               //5
               if ((newOperationIndex <= operationsLog.getLastIndex()) &&
                      (!newOperation.getTerm().equals(operationsLog.getTerm(newOperationIndex)))){
                   operationsLog.removeAllFromIndex(newOperationIndex);
               }
               //6
               if (newOperationIndex <= operationsLog.getLastIndex())
               {
                 return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true,      operationsLog.getLastIndex());
               }
               //7
               operationsLog.append(newOperation);
           }
        }
        //8 
        if (dto.getLeaderCommit() > context.getCommitIndex()) {
           context.setCommitIndex(Math.min(dto.getLeaderCommit(), operationsLog.getLastIndex()));
       }

                 
       return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true, operationsLog.getLastIndex());
   }

  1. إذا كانت جولة القائد أقل من جولة المتابعين ، فإننا نرسل قائدنا جولة وإشارة إلى رفض طلبه. بمجرد أن يتلقى الزعيم جولة أكبر من رده ، سوف يتحول إلى تابع
  2. إذا كانت جولة القائد أكثر من جولة المتابع ، فقم بتعيين هذه الجولة على المتابع.
  3. نظرًا لأنه تم تلقي طلب من القائد ، بغض النظر عما إذا كانت هناك بيانات أم لا ، فإننا نعيد تعيين موقت التصويت ، وإذا لم نكن متابعين ، فإننا نصبح
  4. , , , , , , . , ,
  5. , . . , , - , , , , . , .
  6. , . ,
  7. ,
  8. , , , .


يبقى فقط لمعرفة كيف يطبق القائد العمليات من السجل إلى قاعدة البيانات. في عملية إرسال العمليات إلى المتابعين ومعالجة الردود منهم ، يقوم القائد بتحديث البيانات الوصفية للعقد. بمجرد أن يصبح عدد العقد التي يكون فهرس العملية الأخيرة في السجل أكبر من فهرس العملية الأخيرة المطبقة على قاعدة البيانات من قبل القائد يساوي النصاب القانوني ، يمكننا أن نذكر أن معظم العقد تلقت العملية ويمكننا تطبيقها على قاعدة بيانات الزعيم. بمعنى آخر ، إذا أرسل قائد عملية إلى المتابعين وأدخلها معظمهم في سجله وأجاب على القائد ، فيمكننا تطبيق هذه العملية على قاعدة بيانات الزعيم وزيادة فهرس آخر عملية تم تطبيقها. سينتقل هذا الفهرس مع طلب إلحاق نبضات القلب التالي إلى المتابع وسيطبق العملية بنفس الفهرس من سجله إلى قاعدة بياناته.

دعونا نحلل طريقة tryToCommitالفصلReplicationServiceImpl

  private void tryToCommit() {
       while (true) {
           //1
           int N = context.getCommitIndex() + 1;
           //2
           Supplier<Long> count = () ->
               context.getPeers().stream().map(Peer::getMatchIndex).
                       filter(matchIndex -> matchIndex >= N).count() + 1;

           //3 
           if (operationsLog.getLastIndex() >= N &&
                   operationsLog.getTerm(N).equals(context.getCurrentTerm())&&
                      count.get()>=context.getQuorum()
           )
           {
               context.setCommitIndex(N);
           } else
               return;
       }
   }

  1. نحصل على الفهرس التالي للعملية المطبقة على قاعدة البيانات
  2. نحن نحسب عدد المتابعين الذين لديهم عملية بمثل هذا الفهرس في سجلاتهم ، ولا تنسوا إضافة قائد
  3. إذا كان عدد هؤلاء المتابعين هو النصاب القانوني وكانت العملية بمثل هذا الفهرس في سجل القائد ، وكانت جولة هذه العملية معادلة للعملية الحالية ، عندئذٍ يطبق القائد العملية على قاعدة البيانات ويزيد من فهرس آخر عملية تم تطبيقها. لا يمكن تطبيق العمليات من الجولة السابقة ، لأن قائدًا آخر كان مسؤولًا عنها وقد ينشأ صراع. يطبق كل قائد عمليات جولته الحالية فقط.

استنتاج


أي خوارزمية موزعة ، وممثلها RAFT ، هو حل متكامل قوي يضمن تحقيق النتيجة ، مع مراعاة جميع القواعد الموضحة في المواصفات.

هناك العديد من الخوارزميات الموزعة وهي مختلفة. هناك ZAB ، الذي يتم تنفيذه في Zookeeper ويستخدم ، على سبيل المثال ، لمزامنة البيانات في Kafka. هناك خوارزميات ذات متطلبات أقل صرامة للاتساق ، على سبيل المثال ، كتلة تطبيقات بروتوكول القيل والقال التي يتم استخدامها في أنظمة AP. هناك خوارزميات تتبع مبادئ RAFT ، وفي الوقت نفسه تستخدم بروتوكول القيل والقال لتبادل السجلات مثل MOKKA ، والذي يستخدم أيضًا التشفير.

أعتقد أن محاولة اكتشاف أي من هذه الخوارزميات مفيد للغاية لأي مطور ، وكما ذكرت أعلاه ، يمكن أن تكون الحلول مثيرة للاهتمام بشكل شامل وفي أجزاء منفصلة. ومن الواضح أنك تحتاج بالتأكيد إلى النظر في هذا الاتجاه إلى أولئك الذين ترتبط أنشطتهم بتطوير الأنظمة الموزعة وفيما يتعلق بقضايا مزامنة البيانات ، حتى إذا كانوا يستخدمون حلولًا صناعية قياسية.

المراجع



نأمل أن تكون المادة مفيدة لك. وإذا كنت ترغب في الالتحاق بدورة ، يمكنك القيام بذلك الآن.

All Articles