Avro-Serialisierung bei Kafka

Hier beschreibe ich ein Beispiel für die Serialisierung von Daten über Avro und deren Übertragung an Kafka. Für Avro gibt es einen Datenserialisierer für Kafka, der bei seiner Arbeit eine Registrierung von Schaltkreisen verwendet und die Versionierung auf einem separaten bereitgestellten Server unterstützt. Hier gibt es nur einen Serializer, und eine ggf. Versionsverwaltung kann beispielsweise in der Datenbank implementiert werden.


Github-Projekt



So könnten die von Avro vorbereiteten serialisierten Daten aussehen. Es gibt eine Überschriftenbeschreibung der Daten und dann der Daten selbst. Es stellt sich kompakt und schnell heraus, es gibt keine doppelten Feldnamen, das Datenformat ist binär. Daten werden beim Hinzufügen mithilfe eines Datenschemas überprüft.


Beispielschaltung:


{"namespace": "avro",
"type": "record",
"name": "Person",
"fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": ["int", "null"]}
]
}

Mit Spring Shell füge ich im ersten Befehl der Liste der Personen hinzu und überprüfe nach dem Avro-Schema:


@ShellComponent
public class Commands {

    private List<GenericRecord> records = new ArrayList<>();

    @ShellMethod("add user to list for send")
    public void add(String name, int age) {
        GenericRecord record = new GenericData.Record(SchemaRepository.instance().getSchemaObject());
        record.put("name", name);
        record.put("age", age);

        records.add(record);
    }

GenericRecord ist ein Avro-Datensatz, der auf der Grundlage des Schemas erstellt wird.


public class SchemaRepository {

    private static final String SCHEMA = "{\"namespace\": \"avro\",\n" +
            "\"type\": \"record\",\n" +
            "\"name\": \"Person\",\n" +
            "\"fields\": [\n" +
            "     {\"name\": \"name\", \"type\": \"string\"},\n" +
            "     {\"name\": \"age\",  \"type\": [\"int\", \"null\"]}\n" +
            "]\n" +
            "}\n";

    private static final Schema SCHEMA_OBJECT = new Schema.Parser().parse(SCHEMA);

    private static SchemaRepository INSTANCE = new SchemaRepository();

    public static SchemaRepository instance() {
      return INSTANCE;
    }

    public Schema getSchemaObject() {
        return SCHEMA_OBJECT;
    }

}

Hinzufügen von Shell-Personen zur Konsole und Senden des Themas an Kafka:



@ShellComponent
public class Commands {

    private List<GenericRecord> records = new ArrayList<>();

    final private KafkaTemplate template;

    public Commands(KafkaTemplate template) {
      this.template = template;
    }

    @ShellMethod("send list users to Kafka")
    public void send() {
        template.setDefaultTopic("test");
        template.sendDefault("1", records);
        template.flush();
        records.clear();
    }

Hier ist der Avro-Serializer für Kafka selbst:


public class AvroGenericRecordSerializer implements Serializer<List<GenericRecord>> {

    private Schema schema = null;

    @Override public void configure(Map<String, ?> map, boolean b) {
        schema = (Schema) map.get("SCHEMA");
    }

    @Override public byte[] serialize(String arg0, List<GenericRecord> records) {
        byte[] retVal = null;

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);

        DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter);
        try {
            dataFileWriter.create(schema, outputStream);
            for (GenericRecord record : records) {
                dataFileWriter.append(record);
            }
            dataFileWriter.flush();
            dataFileWriter.close();
            retVal = outputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return retVal;
    }

    @Override public void close() {
    }

}

Konfiguration des Kafka-Herstellers:


    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordSerializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }

Die Serialisierungsklasse wird hier angegeben - "com.example.model.AvroGenericRecordSerializer"
und der neue Parameter "SCHEMA" ist ein Schemaobjekt, das in AvroGenericRecordSerializer zur Vorbereitung von Binärdaten benötigt wird


Auf der Empfangsseite in der Konsole sehen wir die empfangenen Daten:



Avro Deserializer


public class AvroGenericRecordDeserializer implements Deserializer {

    private Schema schema = null;

    @Override
    public void configure(Map configs, boolean isKey) {
        schema = (Schema) configs.get("SCHEMA");
    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        SeekableByteArrayInput arrayInput = new SeekableByteArrayInput(bytes);
        List<GenericRecord> records = new ArrayList<>();

        DataFileReader<GenericRecord> dataFileReader = null;
        try {
            dataFileReader = new DataFileReader<>(arrayInput, datumReader);
            while (dataFileReader.hasNext()) {
                GenericRecord record = dataFileReader.next();
                records.add(record);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return records;

    }

}

Und ähnlich wie beim Kafka-Verbraucher:


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordDeserializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }

Kafka von Docker wurstmeister / kafka-docker verwendet , können Sie jede andere verwenden


Github-Projekt


avro.apache


All Articles