Sérialisation Avro chez Kafka

Ici, je vais décrire un exemple de sérialisation de données via Avro et de transfert vers Kafka. Pour Avro, il existe un sérialiseur de données pour Kafka, il utilise un registre de circuits dans son travail et prend en charge la gestion des versions sur un serveur déployé distinct. Ici, il n'y aura qu'un sérialiseur, et la version si nécessaire, par exemple, peut être implémentée, par exemple, dans la base de données.


Projet Github



Voici à quoi pourraient ressembler les données sérialisées préparées par Avro. Il y a une description de l'en-tête des données, puis les données elles-mêmes. Il s'avère compact et rapide, il n'y a pas de noms de champs en double, le format des données est binaire. Les données sont vérifiées lorsqu'elles sont ajoutées à l'aide d'un schéma de données.


Exemple de circuit:


{"namespace": "avro",
"type": "record",
"name": "Person",
"fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": ["int", "null"]}
]
}

En utilisant Spring Shell, dans la première commande, j'ajoute à la liste des personnes, en vérifiant selon le schéma Avro:


@ShellComponent
public class Commands {

    private List<GenericRecord> records = new ArrayList<>();

    @ShellMethod("add user to list for send")
    public void add(String name, int age) {
        GenericRecord record = new GenericData.Record(SchemaRepository.instance().getSchemaObject());
        record.put("name", name);
        record.put("age", age);

        records.add(record);
    }

GenericRecord est un enregistrement Avro qui est formé sur la base du schéma.


public class SchemaRepository {

    private static final String SCHEMA = "{\"namespace\": \"avro\",\n" +
            "\"type\": \"record\",\n" +
            "\"name\": \"Person\",\n" +
            "\"fields\": [\n" +
            "     {\"name\": \"name\", \"type\": \"string\"},\n" +
            "     {\"name\": \"age\",  \"type\": [\"int\", \"null\"]}\n" +
            "]\n" +
            "}\n";

    private static final Schema SCHEMA_OBJECT = new Schema.Parser().parse(SCHEMA);

    private static SchemaRepository INSTANCE = new SchemaRepository();

    public static SchemaRepository instance() {
      return INSTANCE;
    }

    public Schema getSchemaObject() {
        return SCHEMA_OBJECT;
    }

}

Ajout de personnes shell à la console et envoi du sujet à Kafka:



@ShellComponent
public class Commands {

    private List<GenericRecord> records = new ArrayList<>();

    final private KafkaTemplate template;

    public Commands(KafkaTemplate template) {
      this.template = template;
    }

    @ShellMethod("send list users to Kafka")
    public void send() {
        template.setDefaultTopic("test");
        template.sendDefault("1", records);
        template.flush();
        records.clear();
    }

Voici le sérialiseur Avro pour Kafka lui-même:


public class AvroGenericRecordSerializer implements Serializer<List<GenericRecord>> {

    private Schema schema = null;

    @Override public void configure(Map<String, ?> map, boolean b) {
        schema = (Schema) map.get("SCHEMA");
    }

    @Override public byte[] serialize(String arg0, List<GenericRecord> records) {
        byte[] retVal = null;

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);

        DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter);
        try {
            dataFileWriter.create(schema, outputStream);
            for (GenericRecord record : records) {
                dataFileWriter.append(record);
            }
            dataFileWriter.flush();
            dataFileWriter.close();
            retVal = outputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return retVal;
    }

    @Override public void close() {
    }

}

Configuration du producteur Kafka:


    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordSerializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }

La classe de sérialisation est spécifiée ici - "com.example.model.AvroGenericRecordSerializer"
et le nouveau paramètre "SCHEMA" est un objet de schéma, il est nécessaire dans AvroGenericRecordSerializer pour préparer les données binaires


Du côté de la réception dans la console, nous voyons les données reçues:



Désérialiseur Avro


public class AvroGenericRecordDeserializer implements Deserializer {

    private Schema schema = null;

    @Override
    public void configure(Map configs, boolean isKey) {
        schema = (Schema) configs.get("SCHEMA");
    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        SeekableByteArrayInput arrayInput = new SeekableByteArrayInput(bytes);
        List<GenericRecord> records = new ArrayList<>();

        DataFileReader<GenericRecord> dataFileReader = null;
        try {
            dataFileReader = new DataFileReader<>(arrayInput, datumReader);
            while (dataFileReader.hasNext()) {
                GenericRecord record = dataFileReader.next();
                records.add(record);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return records;

    }

}

Et similaire au consommateur Kafka:


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordDeserializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }

Kafka utilisé à partir de Docker wurstmeister / kafka-docker , vous pouvez utiliser n'importe quel autre


Projet Github


avro.apache


All Articles