سأصف هنا مثالاً على تسلسل البيانات من خلال Avro ونقلها إلى Kafka. بالنسبة لـ Avro ، يوجد مُسلسِل بيانات لـ Kafka ، ويستخدم سجلًا للدوائر في عمله ويدعم الإصدارات على خادم منشور منفصل. هنا لن يكون هناك سوى مُسلسل ، ويمكن تنفيذ الإصدارات إذا لزم الأمر ، على سبيل المثال ، في قاعدة البيانات على سبيل المثال.
مشروع جيثب

هذا ما قد تبدو عليه البيانات المتسلسلة التي أعدتها Avro. يوجد وصف عنوان البيانات ثم البيانات نفسها. اتضح أنه مضغوط وسريع ، لا توجد أسماء حقول مكررة ، تنسيق البيانات ثنائي. يتم التحقق من البيانات عند إضافتها باستخدام مخطط البيانات.
مثال للدائرة:
{"namespace": "avro",
"type": "record",
"name": "Person",
"fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": ["int", "null"]}
]
}
باستخدام Spring Shell ، في الأمر الأول ، أضيف إلى قائمة الأشخاص ، والتحقق وفقًا لمخطط Avro:
@ShellComponent
public class Commands {
    private List<GenericRecord> records = new ArrayList<>();
    @ShellMethod("add user to list for send")
    public void add(String name, int age) {
        GenericRecord record = new GenericData.Record(SchemaRepository.instance().getSchemaObject());
        record.put("name", name);
        record.put("age", age);
        records.add(record);
    }
GenericRecord هو سجل Avro الذي تم تشكيله على أساس المخطط.
public class SchemaRepository {
    private static final String SCHEMA = "{\"namespace\": \"avro\",\n" +
            "\"type\": \"record\",\n" +
            "\"name\": \"Person\",\n" +
            "\"fields\": [\n" +
            "     {\"name\": \"name\", \"type\": \"string\"},\n" +
            "     {\"name\": \"age\",  \"type\": [\"int\", \"null\"]}\n" +
            "]\n" +
            "}\n";
    private static final Schema SCHEMA_OBJECT = new Schema.Parser().parse(SCHEMA);
    private static SchemaRepository INSTANCE = new SchemaRepository();
    public static SchemaRepository instance() {
      return INSTANCE;
    }
    public Schema getSchemaObject() {
        return SCHEMA_OBJECT;
    }
}
إضافة صدف إلى وحدة التحكم وإرسال الموضوع إلى كافكا:

@ShellComponent
public class Commands {
    private List<GenericRecord> records = new ArrayList<>();
    final private KafkaTemplate template;
    public Commands(KafkaTemplate template) {
      this.template = template;
    }
    @ShellMethod("send list users to Kafka")
    public void send() {
        template.setDefaultTopic("test");
        template.sendDefault("1", records);
        template.flush();
        records.clear();
    }
هنا هو Avro serializer لـ Kafka نفسه:
public class AvroGenericRecordSerializer implements Serializer<List<GenericRecord>> {
    private Schema schema = null;
    @Override public void configure(Map<String, ?> map, boolean b) {
        schema = (Schema) map.get("SCHEMA");
    }
    @Override public byte[] serialize(String arg0, List<GenericRecord> records) {
        byte[] retVal = null;
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter);
        try {
            dataFileWriter.create(schema, outputStream);
            for (GenericRecord record : records) {
                dataFileWriter.append(record);
            }
            dataFileWriter.flush();
            dataFileWriter.close();
            retVal = outputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return retVal;
    }
    @Override public void close() {
    }
}
تكوين منتج كافكا:
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordSerializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }
يتم تحديد فئة التسلسل هنا - "com.example.model.AvroGenericRecordSerializer"
والمعلمة الجديدة "SCHEMA" هي كائن مخطط ، وهي مطلوبة في AvroGenericRecordSerializer في إعداد البيانات الثنائية
على جانب الاستلام في وحدة التحكم ، نرى البيانات المستلمة:

Avro deserializer
public class AvroGenericRecordDeserializer implements Deserializer {
    private Schema schema = null;
    @Override
    public void configure(Map configs, boolean isKey) {
        schema = (Schema) configs.get("SCHEMA");
    }
    @Override
    public Object deserialize(String s, byte[] bytes) {
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        SeekableByteArrayInput arrayInput = new SeekableByteArrayInput(bytes);
        List<GenericRecord> records = new ArrayList<>();
        DataFileReader<GenericRecord> dataFileReader = null;
        try {
            dataFileReader = new DataFileReader<>(arrayInput, datumReader);
            while (dataFileReader.hasNext()) {
                GenericRecord record = dataFileReader.next();
                records.add(record);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return records;
    }
}
وعلى غرار مستهلك كافكا:
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordDeserializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }
تستخدم كافكا من Docker wurstmeister / kafka-docker ، يمكنك استخدام أي أخرى
مشروع جيثب
avro.apache