أباتشي كافكا لالدمى

ستكون هذه المقالة مفيدة لأولئك الذين بدأوا للتو في التعرف على هندسة الخدمات الصغيرة وخدمة Apache Kafka. لا تدعي المادة أنها برنامج تعليمي مفصل ، ولكنها ستساعدك على البدء بسرعة باستخدام هذه التكنولوجيا. سأتحدث عن كيفية تثبيت وتكوين Kafka على Windows 10. وسنقوم أيضًا بإنشاء مشروع باستخدام Intellij IDEA و Spring Boot.

لماذا؟


غالبًا ما ترتبط الصعوبات في فهم الأدوات المختلفة بحقيقة أن المطور لم يواجه أبدًا مواقف قد تكون فيها هذه الأدوات مطلوبة. مع كافكا ، هذا هو نفسه تمامًا. نصف الوضع الذي ستكون فيه هذه التقنية مفيدة. إذا كان لديك بنية تطبيق متجانسة ، فأنت بالطبع لا تحتاج إلى أي كافكا. كل شيء يتغير مع الانتقال إلى الخدمات المصغرة. في الواقع ، كل خدمة مايكروسيرف هي برنامج منفصل يقوم بوظيفة واحدة أو أخرى ، ويمكن إطلاقه بشكل مستقل عن الخدمات الصغيرة الأخرى. يمكن مقارنة الخدمات الصغيرة مع الموظفين في المكتب ، الذين يجلسون في مكاتب منفصلة ويحلون مشكلتهم بشكل مستقل. عمل هذا الفريق الموزع لا يمكن تصوره بدون تنسيق مركزي.يجب أن يكون الموظفون قادرين على تبادل الرسائل ونتائج عملهم مع بعضهم البعض. تم تصميم Apache Kafka للخدمات الصغيرة لحل هذه المشكلة.

أباتشي كافكا وسيط رسائل. مع ذلك ، يمكن أن تتفاعل الخدمات المصغرة مع بعضها البعض ، وإرسال واستقبال معلومات مهمة. السؤال الذي يطرح نفسه ، لماذا لا تستخدم POST العادي - reqest لهذه الأغراض ، في جسمك يمكنك نقل البيانات اللازمة والحصول على الإجابة بنفس الطريقة؟ هذا النهج له عدد من العيوب الواضحة. على سبيل المثال ، يمكن للمنتج (خدمة ترسل رسالة) إرسال البيانات فقط في شكل استجابة استجابة لطلب من المستهلك (خدمة تستقبل البيانات). افترض أن المستهلك يرسل طلب POST ويجيب عليه المنتج. في هذا الوقت ، لسبب ما ، لا يمكن للمستهلك قبول الإجابة. ماذا سيحدث للبيانات؟ سوف يضيعون. سيضطر المستهلك مرة أخرى إلى إرسال طلب ويأمل في عدم تغير البيانات التي يريد تلقيها خلال هذه الفترة ،والمنتج لا يزال على استعداد لقبول الطلب.

أباتشي كافكا يحل هذا والعديد من المشاكل الأخرى التي تنشأ عند تبادل الرسائل بين الخدمات الصغيرة. لن يكون من الخطأ التذكير بأن تبادل البيانات بشكل متواصل ومريح هو أحد المشاكل الرئيسية التي يجب حلها لضمان التشغيل المستقر لهيكل الخدمات الصغيرة.

قم بتثبيت وتكوين ZooKeeper و Apache Kafka على Windows 10


أول شيء تحتاج إلى معرفته للبدء هو أن Apache Kafka يعمل على خدمة ZooKeeper. ZooKeeper هي خدمة تهيئة ومزامنة موزعة ، وهذا كل ما نحتاج إلى معرفته عنه في هذا السياق. يجب علينا تنزيله وتكوينه وتشغيله قبل أن نبدأ مع Kafka. قبل البدء في العمل مع ZooKeeper ، تأكد من تثبيت JRE وتكوينه.

يمكنك تنزيل أحدث إصدار من ZooKeeper من الموقع الرسمي .

نقوم باستخراج الملفات من أرشيف ZooKeeper الذي تم تنزيله إلى مجلد على القرص.
في مجلد zookeeper برقم الإصدار ، نجد مجلد conf وفيه الملف "zoo_sample.cfg".



قم بنسخه وتغيير اسم النسخة إلى "zoo.cfg". افتح ملف النسخ وابحث عن سطر dataDir = / tmp / zookeeper فيه. في هذا السطر نكتب المسار الكامل لمجلد zookeeper-x.x.x. يبدو لي هذا: dataDir = C: \\ ZooKeeper \\ zookeeper-3.6.0

الآن نضيف متغير بيئة النظام: ZOOKEEPER_HOME = C: \ ZooKeeper \ zookeeper-3.4.9 وفي نهاية متغير النظام ، قم بإضافة الإدخال: ؛٪ ZOOKEEPER_HOME ٪ \ سلة مهملات؛

شغّل سطر الأوامر واكتب الأمر:

zkserver

إذا تم تنفيذ كل شيء بشكل صحيح ، فسترى شيئًا مثل ما يلي.



هذا يعني أن ZooKeeper بدأ بشكل طبيعي. ننتقل مباشرة إلى تثبيت وتكوين خادم Apache Kafka. قم بتنزيل أحدث إصدار من الموقع الرسمي واستخرج محتويات الأرشيف: kafka.apache.org/downloads

في المجلد مع Kafka نجد مجلد التكوين ، حيث نجد ملف server.properties ونفتحه.



نجد سطر log.dirs = / tmp / kafka-logs ونشير فيه إلى المسار الذي سيحفظ فيه Kafka السجلات: log.dirs = c: / kafka / kafka-logs.



في نفس المجلد ، قم بتحرير ملف zookeeper.properties. نقوم بتغيير السطر dataDir = / tmp / zookeeper إلى dataDir = c: / kafka / zookeeper-data ، دون نسيان الإشارة إلى المسار إلى مجلد Kafka بعد اسم القرص. إذا فعلت كل شيء بشكل صحيح ، يمكنك تشغيل ZooKeeper و Kafka.



بالنسبة للبعض ، قد تكون مفاجأة غير سارة أنه لا يوجد واجهة المستخدم الرسومية للسيطرة على كافكا. ربما يرجع ذلك إلى أن الخدمة مصممة للمهتمين الذين يعملون بشكل حصري مع وحدة التحكم. بطريقة أو بأخرى ، لتشغيل كافكا نحن بحاجة إلى سطر أوامر.

تحتاج أولاً إلى بدء ZooKeeper. في المجلد مع kafka نجد مجلد bin / windows ، حيث نجد الملف لبدء خدمة zookeeper-server-start.bat ، انقر فوقه. لم يحدث شيء؟ يجب أن يكون الأمر كذلك. افتح وحدة التحكم في هذا المجلد واكتب:

 start zookeeper-server-start.bat

لا يعمل مرة أخرى؟ هذا هو المعيار. ذلك لأن zookeeper-server-start.bat يتطلب المعلمات المحددة في ملف zookeeper.properties ، والتي ، كما نتذكر ، تكمن في مجلد التكوين لعمله. نكتب إلى وحدة التحكم:

start zookeeper-server-start.bat c:\kafka\config\zookeeper.properties 

الآن يجب أن يبدأ كل شيء بشكل طبيعي.



مرة أخرى ، افتح وحدة التحكم في هذا المجلد (لا تغلق ZooKeeper!) وقم بتشغيل kafka:

start kafka-server-start.bat c:\kafka\config\server.properties

لكي لا تكتب أوامر في كل مرة على سطر الأوامر ، يمكنك استخدام الطريقة القديمة المثبتة وإنشاء ملف دفعي بالمحتويات التالية:

start C:\kafka\bin\windows\zookeeper-server-start.bat C:\kafka\config\zookeeper.properties
timeout 10
start C:\kafka\bin\windows\kafka-server-start.bat C:\kafka\config\server.properties

خط المهلة 10 مطلوب لتعيين وقفة بين بدء zookeeper و kafka. إذا قمت بكل شيء بشكل صحيح ، عند النقر فوق ملف الدُفعات ، يجب فتح وحدتي تحكم مع تشغيل zookeeper و kafka. الآن يمكننا إنشاء منتج رسالة ومستهلك بالمعلمات الضرورية مباشرة من سطر الأوامر. ولكن ، من الناحية العملية ، قد تكون هناك حاجة إليها إلا لاختبار الخدمة. سنكون أكثر اهتمامًا بكيفية العمل مع كافكا من IDEA.

العمل مع كافكا من IDEA


سنكتب أبسط تطبيق ممكن ، والذي سيكون في نفس الوقت منتج الرسالة ومستهلكها ، ثم نضيف ميزات مفيدة إليها. إنشاء مشروع ربيعي جديد. يتم ذلك بشكل أكثر ملاءمة باستخدام مُهيئ الربيع. أضف التبعيات org.springframework.kafka و spring-boot-starter-web





ونتيجة لذلك ، يجب أن يبدو ملف pom.xml كما يلي:



لإرسال الرسائل ، نحتاج إلى كائن KafkaTemplate <K، V>. كما نرى يتم كتابة الكائن. المعلمة الأولى هي نوع المفتاح ، والثانية هي الرسالة نفسها. في الوقت الحالي ، سنشير إلى كلا المعلمتين كسلسلة. سنقوم بإنشاء الكائن في وحدة تحكم الفئة. قم بتعريف قالب Kafka واطلب من الربيع أن يقوم بتهيئته عن طريق التعليقAutowired.

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

من حيث المبدأ ، منتجنا جاهز. كل ما عليك فعله هو استدعاء طريقة الإرسال () عليه. هناك العديد من الإصدارات المحملة بهذه الطريقة. نستخدم في مشروعنا خيارًا يحتوي على 3 معلمات - إرسال (موضوع السلسلة ، مفتاح K ، بيانات V). نظرًا لكتابة KafkaTemplate بواسطة String ، فسيكون المفتاح والبيانات في طريقة الإرسال عبارة عن سلسلة. تحدد المعلمة الأولى الموضوع ، أي الموضوع الذي سيتم إرسال الرسائل إليه ، والمستهلكون الذين يمكنهم الاشتراك فيه لتلقيها. إذا كان الموضوع المحدد في طريقة الإرسال غير موجود ، فسيتم إنشاؤه تلقائيًا. يبدو النص الكامل للصف مثل هذا.

@RestController
@RequestMapping("msg")
public class MsgController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping
    public void sendOrder(String msgId, String msg){
        kafkaTemplate.send("msg", msgId, msg);
    }
}

تحكم وحدة التحكم إلى localhost : 8080 / msg ، يتم إرسال المفتاح والرسالة نفسها في نص الطلب.

مرسل الرسالة جاهز ، الآن قم بإنشاء مستمع. يتيح لك Spring أيضًا القيام بذلك دون بذل الكثير من الجهد. يكفي إنشاء طريقة ووضع علامة عليها مع التعليق التوضيحيKafkaListener ، في معلمات يمكنك تحديد الموضوع الذي سيتم الاستماع إليه فقط. في حالتنا ، يبدو هذا.

@KafkaListener(topics="msg")

يمكن للطريقة نفسها ، المميزة بعلامة توضيحية ، أن تحدد معلمة واحدة مقبولة ، لها نوع الرسالة التي يرسلها المنتج.

يجب تمييز الفصل الدراسي الذي سيتم إنشاء المستهلك فيه بتعليق علىEnableKafka.

@EnableKafka
@SpringBootApplication
public class SimpleKafkaExampleApplication {

    @KafkaListener(topics="msg")
    public void msgListener(String msg){
        System.out.println(msg);
    }

    public static void main(String[] args) {
        SpringApplication.run(SimpleKafkaExampleApplication.class, args);
    }
}

أيضًا ، في ملف إعدادات application.property ، يجب عليك تحديد معرف مجموعة معلمة الكونسيرج. إذا لم يتم ذلك ، فلن يبدأ التطبيق. المعلمة من نوع String ويمكن أن تكون أي شيء.

spring.kafka.consumer.group-id=app.1

أبسط مشروع كافكا جاهز. لدينا مرسل ومستلم للرسائل. يبقى فقط لتشغيل. أولاً ، قم بتشغيل ZooKeeper و Kafka باستخدام ملف دفعي كتبناه سابقًا ، ثم قم بتشغيل تطبيقنا. من الأنسب إرسال طلب باستخدام Postman. في نص الطلب ، لا تنس تحديد المعلمات msgId و msg.

إذا رأينا مثل هذه الصورة في IDEA ، فإن كل شيء يعمل: أرسل المنتج رسالة ، واستلمها المستهلك وعرضها على وحدة التحكم.


نحن نعقد المشروع


المشاريع الحقيقية التي تستخدم كافكا هي بالتأكيد أكثر تعقيدًا من تلك التي أنشأناها. الآن بعد أن اكتشفنا الوظائف الأساسية للخدمة ، فكر في الميزات الإضافية التي توفرها. بادئ ذي بدء ، سوف نقوم بتحسين المنتج.

إذا فتحت طريقة send () ، فقد تلاحظ أن جميع المتغيرات لها قيمة إرجاع لـ ListenableFuture <SendResult <K، V >>. الآن لن نفكر بالتفصيل في قدرات هذه الواجهة. هنا سيكون كافياً القول أنه مطلوب لعرض نتيجة إرسال رسالة.

@PostMapping
public void sendMsg(String msgId, String msg){
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("msg", msgId, msg);
    future.addCallback(System.out::println, System.err::println);
    kafkaTemplate.flush();
}

تقبل طريقة addCallback () معلمتين - SuccessCallback و FailureCallback. كلاهما واجهات وظيفية. من الاسم يمكنك أن تفهم أن الطريقة الأولى سيتم استدعاؤها كنتيجة للإرسال الناجح للرسالة ، والثانية - نتيجة لخطأ. الآن ، إذا قمنا بتشغيل المشروع ، فسوف نرى شيئًا مثل ما يلي على وحدة التحكم:

SendResult [producerRecord=ProducerRecord(topic=msg, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=Hello, world!, timestamp=null), recordMetadata=msg-0@6]

دعونا ننظر بعناية إلى منتجنا مرة أخرى. ومن المثير للاهتمام ، ما الذي يحدث إذا لم يكن المفتاح هو String ، ولكن ، على سبيل المثال ، Long ، ولكن كرسالة مرسلة ، والأسوأ من ذلك - نوع من DTO المعقد؟ أولاً ،



دعنا نحاول تغيير المفتاح إلى قيمة رقمية ... إذا حددنا Long باعتباره المفتاح في المنتج ، فسيبدأ التطبيق بشكل طبيعي ، ولكن عند محاولة إرسال رسالة ، سيتم طرح ClassCastException وسيتم الإبلاغ عن أنه لا يمكن إرسال الفئة الطويلة إلى فئة String.



إذا حاولنا إنشاء كائن KafkaTemplate يدويًا ، فسنرى أن كائن واجهة ProducerFactory <K، V> ، على سبيل المثال ، DefaultKafkaProducerFactory <> ، يتم تمريره إلى المُنشئ كمعلمة. من أجل إنشاء DefaultKafkaProducerFactory ، نحتاج إلى تمرير خريطة تحتوي على إعدادات المنتج الخاصة بها إلى مُنشئها. سيتم وضع جميع التعليمات البرمجية لتكوين وإنشاء المنتج في فئة منفصلة. للقيام بذلك ، قم بإنشاء حزمة التكوين وفيها فئة KafkaProducerConfig.

@Configuration
public class KafkaProducerConfig {

    private String kafkaServer="localhost:9092";

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Long, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

في طريقة productionConfigs () ، قم بإنشاء خريطة مع التكوينات وحدد LongSerializer.class كمسلسل للمفتاح. نبدأ ، نرسل طلبًا من Postman ونرى أن كل شيء يعمل الآن كما ينبغي: المنتج يرسل رسالة ، ويستلمها المستهلك.

الآن دعنا نغير نوع القيمة المرسلة. ماذا لو لم يكن لدينا فئة قياسية من مكتبة Java ، ولكن نوعًا من DTO مخصص. دعنا نقول هذا.

@Data
public class UserDto {
    private Long age;
    private String name;
    private Address address;
}

@Data
@AllArgsConstructor
public class Address {
    private String country;
    private String city;
    private String street;
    private Long homeNumber;
    private Long flatNumber;
}

لإرسال DTO كرسالة ، تحتاج إلى إجراء بعض التغييرات على تكوين المنتج. حدد JsonSerializer.class كمسلسل لقيمة الرسالة ولا تنس تغيير نوع السلسلة إلى UserDto في كل مكان.

@Configuration
public class KafkaProducerConfig {

    private String kafkaServer="localhost:9092";

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Long, UserDto> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, UserDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

ارسل رسالة. سيتم عرض السطر التالي في وحدة التحكم:



الآن سنقوم بتعقيد المستهلك. قبل ذلك ، كانت طريقتنا العامة msgListener (String msg) ، والمعلمة بالتعليقKafkaListener (المواضيع = "msg") تأخذ String كمعلمة وعرضها على وحدة التحكم. ماذا لو أردنا الحصول على معلمات أخرى للرسالة المرسلة ، على سبيل المثال ، مفتاح أو قسم؟ في هذه الحالة ، يجب تغيير نوع القيمة المرسلة.

@KafkaListener(topics="msg")
public void orderListener(ConsumerRecord<Long, UserDto> record){
    System.out.println(record.partition());
    System.out.println(record.key());
    System.out.println(record.value());
}

من كائن ConsumerRecord ، يمكننا الحصول على جميع المعلمات التي نهتم بها.



نرى أنه بدلاً من المفتاح الموجود على وحدة التحكم ، يتم عرض نوع من krakozyabry. هذا لأنه يتم استخدام StringDeserializer بشكل افتراضي لإلغاء تسلسل المفتاح ، وإذا أردنا أن يتم عرض المفتاح بالتنسيق الصحيح بشكل صحيح ، فيجب تغييره إلى LongDeserializer. لتكوين المستهلك في حزمة التكوين ، قم بإنشاء فئة KafkaConsumerConfig.

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaServer;

    @Value("${spring.kafka.consumer.group-id}")
    private String kafkaGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, UserDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Long, UserDto> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
}

تشبه فئة KafkaConsumerConfig تمامًا فئة KafkaProducerConfig التي أنشأناها سابقًا. هناك أيضًا خريطة تحتوي على التكوينات الضرورية ، على سبيل المثال ، مثل أداة إلغاء المفتاح للمفتاح والقيمة. يتم استخدام الخريطة التي تم إنشاؤها لإنشاء ConsumerFactory <> ، والتي بدورها لازمة لإنشاء KafkaListenerContainerFactory <؟>. تفاصيل مهمة: طريقة إرجاع KafkaListenerContainerFactory <؟> يجب أن تسمى kafkaListenerContainerFactory () ، وإلا لن يتمكن الربيع من العثور على الفول المطلوب ولن يتم تجميع المشروع. نحن بدأنا.



نرى الآن أنه يتم عرض المفتاح كما ينبغي ، مما يعني أن كل شيء يعمل. بالطبع ، تتجاوز قدرات Apache Kafka بكثير تلك المذكورة في هذه المقالة ، ومع ذلك ، آمل أنه بعد قراءتها ستحصل على فكرة عن هذه الخدمة ، والأهم من ذلك ، يمكنك البدء في العمل معها.

اغسل يديك كثيرًا ، وارتدي الأقنعة ، ولا تخرج في الخارج دون الحاجة ، وكن بصحة جيدة.

All Articles