Serialisasi avro di Kafka

Di sini saya akan menjelaskan contoh serialisasi data melalui Avro dan mentransfernya ke Kafka. Untuk Avro ada serializer data untuk Kafka, ia menggunakan registri sirkuit dalam pekerjaannya dan mendukung versi pada server yang dikerahkan terpisah. Di sini hanya akan ada serializer, dan versi jika perlu, misalnya, dapat diimplementasikan, misalnya, dalam database.


Proyek Github



Ini adalah seperti apa data serial yang disiapkan oleh Avro mungkin terlihat. Ada deskripsi judul data dan kemudian data itu sendiri. Ternyata ringkas dan cepat, tidak ada nama bidang duplikat, format data adalah biner. Data diperiksa ketika ditambahkan menggunakan skema data.


Contoh rangkaian:


{"namespace": "avro",
"type": "record",
"name": "Person",
"fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": ["int", "null"]}
]
}

Menggunakan Spring Shell, pada perintah pertama saya menambahkan ke daftar orang, memeriksa sesuai dengan skema Avro:


@ShellComponent
public class Commands {

    private List<GenericRecord> records = new ArrayList<>();

    @ShellMethod("add user to list for send")
    public void add(String name, int age) {
        GenericRecord record = new GenericData.Record(SchemaRepository.instance().getSchemaObject());
        record.put("name", name);
        record.put("age", age);

        records.add(record);
    }

GenericRecord adalah catatan Avro yang dibentuk berdasarkan skema.


public class SchemaRepository {

    private static final String SCHEMA = "{\"namespace\": \"avro\",\n" +
            "\"type\": \"record\",\n" +
            "\"name\": \"Person\",\n" +
            "\"fields\": [\n" +
            "     {\"name\": \"name\", \"type\": \"string\"},\n" +
            "     {\"name\": \"age\",  \"type\": [\"int\", \"null\"]}\n" +
            "]\n" +
            "}\n";

    private static final Schema SCHEMA_OBJECT = new Schema.Parser().parse(SCHEMA);

    private static SchemaRepository INSTANCE = new SchemaRepository();

    public static SchemaRepository instance() {
      return INSTANCE;
    }

    public Schema getSchemaObject() {
        return SCHEMA_OBJECT;
    }

}

Menambahkan shell people ke konsol, dan mengirim topik ke Kafka:



@ShellComponent
public class Commands {

    private List<GenericRecord> records = new ArrayList<>();

    final private KafkaTemplate template;

    public Commands(KafkaTemplate template) {
      this.template = template;
    }

    @ShellMethod("send list users to Kafka")
    public void send() {
        template.setDefaultTopic("test");
        template.sendDefault("1", records);
        template.flush();
        records.clear();
    }

Ini adalah serializer Avro untuk Kafka sendiri:


public class AvroGenericRecordSerializer implements Serializer<List<GenericRecord>> {

    private Schema schema = null;

    @Override public void configure(Map<String, ?> map, boolean b) {
        schema = (Schema) map.get("SCHEMA");
    }

    @Override public byte[] serialize(String arg0, List<GenericRecord> records) {
        byte[] retVal = null;

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);

        DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter);
        try {
            dataFileWriter.create(schema, outputStream);
            for (GenericRecord record : records) {
                dataFileWriter.append(record);
            }
            dataFileWriter.flush();
            dataFileWriter.close();
            retVal = outputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return retVal;
    }

    @Override public void close() {
    }

}

Konfigurasi produsen Kafka:


    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordSerializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }

Kelas serialisasi ditentukan di sini - "com.example.model.AvroGenericRecordSerializer"
dan parameter baru "SCHEMA" adalah objek skema, diperlukan di AvroGenericRecordSerializer dalam menyiapkan data biner


Di sisi penerima di konsol, kami melihat data yang diterima:



Deserializer avro


public class AvroGenericRecordDeserializer implements Deserializer {

    private Schema schema = null;

    @Override
    public void configure(Map configs, boolean isKey) {
        schema = (Schema) configs.get("SCHEMA");
    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        SeekableByteArrayInput arrayInput = new SeekableByteArrayInput(bytes);
        List<GenericRecord> records = new ArrayList<>();

        DataFileReader<GenericRecord> dataFileReader = null;
        try {
            dataFileReader = new DataFileReader<>(arrayInput, datumReader);
            while (dataFileReader.hasNext()) {
                GenericRecord record = dataFileReader.next();
                records.add(record);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return records;

    }

}

Dan mirip dengan konsumen Kafka:


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordDeserializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }

Kafka digunakan dari Docker wurstmeister / kafka-docker , Anda dapat menggunakan yang lain


Proyek Github


avro.apache


All Articles