Apache Kafka untuk Dummies

Artikel ini akan berguna bagi mereka yang baru mulai berkenalan dengan arsitektur layanan mikro dan dengan layanan Apache Kafka. Materi tersebut tidak mengklaim sebagai tutorial terperinci, tetapi akan membantu Anda dengan cepat memulai dengan teknologi ini. Saya akan berbicara tentang cara menginstal dan mengkonfigurasi Kafka di Windows 10. Kami juga akan membuat proyek menggunakan Intellij IDEA dan Spring Boot.

Untuk apa?


Kesulitan dalam memahami berbagai alat sering dikaitkan dengan fakta bahwa pengembang tidak pernah mengalami situasi di mana alat ini mungkin diperlukan. Dengan Kafka, ini persis sama. Kami menggambarkan situasi di mana teknologi ini akan berguna. Jika Anda memiliki arsitektur aplikasi monolitik, maka tentu saja Anda tidak memerlukan Kafka. Semuanya berubah dengan transisi ke layanan microser. Faktanya, setiap microservice adalah program terpisah yang melakukan satu atau beberapa fungsi lainnya, dan yang dapat diluncurkan secara independen dari layanan microser lainnya. Layanan microser dapat dibandingkan dengan karyawan di kantor, yang duduk di meja terpisah dan secara mandiri menyelesaikan masalah mereka. Pekerjaan tim terdistribusi seperti itu tidak terpikirkan tanpa koordinasi pusat.Karyawan harus dapat saling bertukar pesan dan hasil pekerjaan mereka satu sama lain. Apache Kafka untuk layanan microser dirancang untuk memecahkan masalah ini.

Apache Kafka adalah broker pesan. Dengannya, layanan microser dapat berinteraksi satu sama lain, mengirim dan menerima informasi penting. Pertanyaannya adalah, mengapa tidak menggunakan POST biasa - diperlukan untuk keperluan ini, di mana Anda dapat mentransfer data yang diperlukan dan mendapatkan jawabannya dengan cara yang sama? Pendekatan ini memiliki sejumlah kelemahan yang jelas. Misalnya, produsen (layanan yang mengirim pesan) hanya dapat mengirim data dalam bentuk respons sebagai tanggapan atas permintaan dari konsumen (layanan yang menerima data). Misalkan konsumen mengirimkan permintaan POST dan produsen menjawabnya. Pada saat ini, karena suatu alasan, konsumen tidak dapat menerima jawabannya. Apa yang akan terjadi pada data? Mereka akan hilang. Konsumen sekali lagi harus mengirim permintaan dan berharap bahwa data yang dia ingin terima tidak berubah selama ini,dan produsen masih siap menerima permintaan.

Apache Kafka memecahkan ini dan banyak masalah lain yang muncul saat bertukar pesan antara layanan microser. Tidak akan salah untuk mengingat bahwa pertukaran data yang tidak terganggu dan nyaman adalah salah satu masalah utama yang harus dipecahkan untuk memastikan operasi yang stabil dari arsitektur layanan-mikro.

Instal dan konfigurasikan ZooKeeper dan Apache Kafka di Windows 10


Hal pertama yang perlu Anda ketahui untuk memulai adalah bahwa Apache Kafka berjalan di atas layanan ZooKeeper. ZooKeeper adalah layanan konfigurasi dan sinkronisasi yang terdistribusi, dan hanya itu yang perlu kita ketahui tentang hal ini dalam konteks ini. Kita harus mengunduh, mengkonfigurasi, dan menjalankannya sebelum memulai dengan Kafka. Sebelum Anda mulai bekerja dengan ZooKeeper, pastikan Anda telah menginstal dan mengkonfigurasi JRE.

Anda dapat mengunduh ZooKeeper versi terbaru dari situs web resmi .

Kami mengekstrak file dari arsip ZooKeeper yang diunduh ke folder pada disk.
Di folder zookeeper dengan nomor versi, kami menemukan folder conf dan di dalamnya file "zoo_sample.cfg".



Salin dan ubah nama salinan menjadi "zoo.cfg". Buka file salin dan temukan baris dataDir = / tmp / zookeeper di dalamnya. Pada baris ini kita menulis path lengkap ke folder zookeeper-x.x.x kita. Kelihatannya seperti ini untuk saya: dataDir = C: \\ ZooKeeper \\ zookeeper-3.6.0

Sekarang kita menambahkan variabel lingkungan sistem: ZOOKEEPER_HOME = C: \ ZooKeeper \ zookeeper-3.4.9 dan pada akhir variabel sistem Path menambahkan entri:;% ZOOKEEPER_HOME % \ tempat sampah;

Jalankan baris perintah dan tulis perintah:

zkserver

Jika semuanya dilakukan dengan benar, Anda akan melihat sesuatu seperti yang berikut ini.



Ini berarti bahwa ZooKeeper memulai secara normal. Kami langsung melanjutkan ke instalasi dan konfigurasi server Apache Kafka. Unduh versi terbaru dari situs resmi dan ekstrak konten arsip: kafka.apache.org/downloads

Dalam folder dengan Kafka kita menemukan folder konfigurasi, di dalamnya kita menemukan file server.properties dan membukanya.



Kami menemukan baris log.dirs = / tmp / kafka-log dan menunjukkan di dalamnya jalur di mana Kafka akan menyimpan log: log.dirs = c: / kafka / kafka-logs.



Di folder yang sama, edit file properti zookeeper.prop. Kami mengubah baris dataDir = / tmp / zookeeper ke dataDir = c: / kafka / zookeeper-data, tanpa lupa menunjukkan jalur ke folder Kafka Anda setelah nama disk. Jika Anda melakukan semuanya dengan benar, Anda dapat menjalankan ZooKeeper dan Kafka.



Bagi sebagian orang, mungkin merupakan kejutan yang tidak menyenangkan bahwa tidak ada GUI untuk mengendalikan Kafka. Mungkin ini karena layanan ini dirancang untuk nerd keras yang bekerja secara eksklusif dengan konsol. Dengan satu atau lain cara, untuk menjalankan Kafka kita perlu baris perintah.

Pertama, Anda perlu memulai ZooKeeper. Dalam folder dengan kafka kita menemukan folder bin / windows, di dalamnya kita menemukan file untuk meluncurkan layanan zookeeper-server-start.bat, klik di atasnya. Tidak ada yang terjadi? Seharusnya begitu. Buka konsol di folder ini dan tulis:

 start zookeeper-server-start.bat

Tidak bekerja lagi Ini adalah norma. Itu karena zookeeper-server-start.bat membutuhkan parameter yang ditentukan dalam file properti zookeeper.prop, yang, seperti yang kita ingat, terletak di folder konfigurasi untuk pekerjaannya. Kami menulis ke konsol:

start zookeeper-server-start.bat c:\kafka\config\zookeeper.properties 

Sekarang semuanya harus dimulai dengan normal.



Sekali lagi, buka konsol di folder ini (jangan tutup ZooKeeper!) Dan jalankan kafka:

start kafka-server-start.bat c:\kafka\config\server.properties

Agar tidak menulis perintah setiap kali pada baris perintah, Anda dapat menggunakan metode yang sudah terbukti dan membuat file batch dengan konten berikut:

start C:\kafka\bin\windows\zookeeper-server-start.bat C:\kafka\config\zookeeper.properties
timeout 10
start C:\kafka\bin\windows\kafka-server-start.bat C:\kafka\config\server.properties

Timeout 10 line diperlukan untuk mengatur jeda antara mulai zookeeper dan kafka. Jika Anda melakukan segalanya dengan benar, ketika Anda mengklik file batch, dua konsol akan terbuka dengan zookeeper dan kafka berjalan. Sekarang kita dapat membuat penghasil pesan dan konsumen dengan parameter yang diperlukan langsung dari baris perintah. Tetapi, dalam praktiknya, itu mungkin diperlukan kecuali untuk menguji layanan. Kami akan jauh lebih tertarik dengan cara bekerja dengan kafka dari IDEA.

Bekerja dengan kafka dari IDEA


Kami akan menulis aplikasi sesederhana mungkin, yang sekaligus akan menjadi penghasil dan konsumen pesan, dan kemudian menambahkan fitur-fitur yang bermanfaat untuknya. Buat proyek pegas baru. Ini paling mudah dilakukan menggunakan inisialisasi pegas. Tambahkan dependensi org.springframework.kafka dan spring-boot-starter-web





Akibatnya, file pom.xml akan terlihat seperti ini:



Untuk mengirim pesan, kita memerlukan objek KafkaTemplate <K, V>. Seperti yang kita lihat objek diketik. Parameter pertama adalah jenis kunci, yang kedua adalah pesan itu sendiri. Untuk saat ini, kami akan menunjukkan kedua parameter sebagai String. Kami akan membuat objek di kelas-controller. Nyatakan KafkaTemplate dan minta Spring untuk menginisialisasi dengan anotasiAutowired.

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

Pada prinsipnya, produsen kami siap. Yang masih harus dilakukan adalah memanggil metode send () di atasnya. Ada beberapa versi berlebihan dari metode ini. Kami menggunakan dalam proyek kami opsi dengan 3 parameter - kirim (topik String, kunci K, data V). Karena KafkaTemplate diketik oleh String, kunci dan data dalam metode kirim akan berupa string. Parameter pertama menunjukkan topik, yaitu topik ke mana pesan akan dikirim, dan konsumen mana yang dapat berlangganan untuk menerimanya. Jika topik yang ditentukan dalam metode pengiriman tidak ada, itu akan dibuat secara otomatis. Teks kelas penuh terlihat seperti ini.

@RestController
@RequestMapping("msg")
public class MsgController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping
    public void sendOrder(String msgId, String msg){
        kafkaTemplate.send("msg", msgId, msg);
    }
}

Pengontrol memetakan ke localhost : 8080 / msg, kunci dan pesan itu sendiri ditransmisikan dalam tubuh permintaan.

Pengirim pesan sudah siap, sekarang buat pendengar. Musim semi juga memungkinkan Anda melakukan ini tanpa banyak usaha. Cukup membuat metode dan menandainya dengan penjelasan @KafkaListener, pada parameter yang Anda hanya dapat menentukan topik yang akan didengarkan. Dalam kasus kami, sepertinya ini.

@KafkaListener(topics="msg")

Metode itu sendiri, ditandai dengan anotasi, dapat menentukan satu parameter yang diterima, yang memiliki jenis pesan yang dikirimkan oleh produsen.

Kelas di mana konsumen akan dibuat harus ditandai dengan penjelasan @EnableKafka.

@EnableKafka
@SpringBootApplication
public class SimpleKafkaExampleApplication {

    @KafkaListener(topics="msg")
    public void msgListener(String msg){
        System.out.println(msg);
    }

    public static void main(String[] args) {
        SpringApplication.run(SimpleKafkaExampleApplication.class, args);
    }
}

Juga, dalam file pengaturan application.property, Anda harus menentukan parameter parameter concierge-id. Jika ini tidak dilakukan, aplikasi tidak akan memulai. Parameternya adalah tipe String dan bisa apa saja.

spring.kafka.consumer.group-id=app.1

Proyek kafka kami yang paling sederhana sudah siap. Kami memiliki pengirim dan penerima pesan. Tetap hanya berjalan. Pertama, luncurkan ZooKeeper dan Kafka menggunakan file batch yang kami tulis sebelumnya, kemudian luncurkan aplikasi kami. Paling mudah mengirim permintaan menggunakan tukang pos. Dalam isi permintaan, jangan lupa untuk menentukan parameter msgId dan msg.

Jika kita melihat gambar seperti itu di IDEA, maka semuanya berfungsi: produser mengirim pesan, konsumen menerimanya dan menampilkannya di konsol.


Kami menyulitkan proyek


Proyek nyata menggunakan Kafka tentu lebih rumit daripada yang kami buat. Sekarang kami telah mengetahui fungsi dasar layanan, pertimbangkan fitur tambahan apa yang disediakannya. Untuk memulainya, kami akan meningkatkan produser.

Jika Anda membuka metode send (), Anda mungkin memperhatikan bahwa semua variannya memiliki nilai balik dari ListenableFuture <SendResult <K, V >>. Sekarang kami tidak akan mempertimbangkan secara terperinci kemampuan antarmuka ini. Di sini akan cukup untuk mengatakan bahwa perlu untuk melihat hasil pengiriman pesan.

@PostMapping
public void sendMsg(String msgId, String msg){
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("msg", msgId, msg);
    future.addCallback(System.out::println, System.err::println);
    kafkaTemplate.flush();
}

Metode addCallback () menerima dua parameter - SuccessCallback dan FailureCallback. Keduanya adalah antarmuka fungsional. Dari namanya, Anda dapat memahami bahwa metode yang pertama akan dipanggil sebagai hasil dari pengiriman pesan yang berhasil, yang kedua - sebagai akibat dari kesalahan. Sekarang, jika kita menjalankan proyek, kita akan melihat sesuatu seperti yang berikut di konsol:

SendResult [producerRecord=ProducerRecord(topic=msg, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=Hello, world!, timestamp=null), recordMetadata=msg-0@6]

Mari kita teliti lagi produsen kita. Menariknya, apa yang terjadi jika kuncinya bukan String, tetapi, katakanlah, Long, tetapi sebagai pesan yang dikirimkan, bahkan lebih buruk lagi - semacam DTO yang rumit? Pertama,



mari kita coba untuk mengubah kunci ke nilai numerik ... Jika kita menetapkan Long sebagai kunci di produser, aplikasi akan mulai secara normal, tetapi ketika Anda mencoba mengirim pesan, ClassCastException akan dilempar dan akan dilaporkan bahwa kelas Long tidak dapat dilemparkan ke kelas String.



Jika kita mencoba membuat objek KafkaTemplate secara manual, kita akan melihat bahwa objek antarmuka ProducerFactory <K, V>, misalnya, DefaultKafkaProducerFactory <>, diteruskan ke konstruktor sebagai parameter. Untuk membuat DefaultKafkaProducerFactory, kita perlu meneruskan Peta yang berisi pengaturan produsennya ke konstruktornya. Semua kode untuk konfigurasi dan pembuatan produser akan ditempatkan di kelas yang terpisah. Untuk melakukan ini, buat paket config dan di dalamnya kelas KafkaProducerConfig.

@Configuration
public class KafkaProducerConfig {

    private String kafkaServer="localhost:9092";

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Long, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Dalam metode producerConfigs (), buat peta dengan konfigurasi dan tentukan LongSerializer.class sebagai serializer untuk kunci tersebut. Kami mulai, mengirim permintaan dari Postman dan melihat bahwa sekarang semuanya berfungsi sebagaimana mestinya: produsen mengirim pesan, dan konsumen menerimanya.

Sekarang mari kita ubah jenis nilai yang dikirimkan. Bagaimana jika kita tidak memiliki kelas standar dari perpustakaan Java, tetapi semacam DTO kustom. Katakan ini.

@Data
public class UserDto {
    private Long age;
    private String name;
    private Address address;
}

@Data
@AllArgsConstructor
public class Address {
    private String country;
    private String city;
    private String street;
    private Long homeNumber;
    private Long flatNumber;
}

Untuk mengirim DTO sebagai pesan, Anda perlu membuat beberapa perubahan pada konfigurasi produsen. Tentukan JsonSerializer.class sebagai serializer dari nilai pesan dan jangan lupa untuk mengubah tipe String ke UserDto di mana-mana.

@Configuration
public class KafkaProducerConfig {

    private String kafkaServer="localhost:9092";

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Long, UserDto> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, UserDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Kirim pesan. Baris berikut akan ditampilkan di konsol:



Sekarang kita akan menyulitkan konsumen. Sebelum ini, metode kami public void msgListener (String msg), ditandai dengan anotasi @KafkaListener (topik = "msg") mengambil String sebagai parameter dan menampilkannya di konsol. Bagaimana jika kita ingin mendapatkan parameter lain dari pesan yang dikirimkan, misalnya, kunci atau partisi? Dalam hal ini, jenis nilai yang dikirimkan harus diubah.

@KafkaListener(topics="msg")
public void orderListener(ConsumerRecord<Long, UserDto> record){
    System.out.println(record.partition());
    System.out.println(record.key());
    System.out.println(record.value());
}

Dari objek ConsumerRecord kita bisa mendapatkan semua parameter yang kita minati.



Kami melihat bahwa alih-alih tombol pada konsol, beberapa jenis krakozyabry ditampilkan. Ini karena StringDeserializer digunakan secara default untuk deserialize kunci, dan jika kita ingin kunci dalam format integer ditampilkan dengan benar, kita harus mengubahnya ke LongDeserializer. Untuk mengkonfigurasi konsumen dalam paket konfigurasi, buat kelas KafkaConsumerConfig.

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaServer;

    @Value("${spring.kafka.consumer.group-id}")
    private String kafkaGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, UserDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Long, UserDto> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
}

Kelas KafkaConsumerConfig sangat mirip dengan KafkaProducerConfig yang kami buat sebelumnya. Ada juga Peta yang berisi konfigurasi yang diperlukan, misalnya, seperti deserializer untuk kunci dan nilai. Peta yang dibuat digunakan untuk membuat ConsumerFactory <>, yang, pada gilirannya, diperlukan untuk membuat KafkaListenerContainerFactory <?>. Detail penting: metode mengembalikan KafkaListenerContainerFactory <?> Harus disebut kafkaListenerContainerFactory (), jika tidak, Spring tidak akan dapat menemukan kacang yang diinginkan dan proyek tidak akan dikompilasi. Kami mulai.



Kita melihat bahwa sekarang kunci ditampilkan sebagaimana mestinya, yang berarti semuanya berfungsi. Tentu saja, kemampuan Apache Kafka jauh melampaui yang dijelaskan dalam artikel ini, namun, saya berharap bahwa setelah membacanya Anda akan mendapatkan ide tentang layanan ini dan, yang paling penting, Anda dapat mulai bekerja dengannya.

Cuci tangan Anda lebih sering, pakai masker, jangan keluar tanpa perlu, dan sehatlah.

All Articles