рдХрд╛рдлреНрдХрд╛ рдореЗрдВ рдПрд╡рд░реЛ рд╕реАрд░рд┐рдпрд▓

рдпрд╣рд╛рдВ рдореИрдВ рдПрд╡рд░реЛ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдбреЗрдЯрд╛ рдХреЛ рдХреНрд░рдордмрджреНрдз рдХрд░рдиреЗ рдФрд░ рдЗрд╕реЗ рдХрд╛рдлреНрдХрд╛ рдореЗрдВ рд╕реНрдерд╛рдирд╛рдВрддрд░рд┐рдд рдХрд░рдиреЗ рдХрд╛ рдПрдХ рдЙрджрд╛рд╣рд░рдг рдмрддрд╛рдКрдВрдЧрд╛ред рдПрд╡рд░реЛ рдХреЗ рд▓рд┐рдП рдХрд╛рдлреНрдХрд╛ рдХреЗ рд▓рд┐рдП рдПрдХ рдбреЗрдЯрд╛ рдзрд╛рд░рд╛рд╡рд╛рд╣рд┐рдХ рд╣реИ, рдпрд╣ рдЕрдкрдиреЗ рдХрд╛рдо рдореЗрдВ рд╕рд░реНрдХрд┐рдЯ рдХреА рдПрдХ рд░рдЬрд┐рд╕реНрдЯреНрд░реА рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддрд╛ рд╣реИ рдФрд░ рдПрдХ рдЕрд▓рдЧ рддреИрдирд╛рддреА рд╕рд░реНрд╡рд░ рдкрд░ рд╕рдВрд╕реНрдХрд░рдг рдХрд╛ рд╕рдорд░реНрдерди рдХрд░рддрд╛ рд╣реИред рдХреЗрд╡рд▓ рдПрдХ рдзрд╛рд░рд╛рд╡рд╛рд╣рд┐рдХ рд╣реЛрдЧрд╛, рдФрд░ рдпрджрд┐ рдЖрд╡рд╢реНрдпрдХ рд╣реЛ, рддреЛ рд╕рдВрд╕реНрдХрд░рдг рдХреЛ рд▓рд╛рдЧреВ рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ, рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдбреЗрдЯрд╛рдмреЗрд╕ рдореЗрдВред


рдЬреАрдердм рдкреНрд░реЛрдЬреЗрдХреНрдЯ



рдпрд╣ рдПрд╡рд░реЛ рджреНрд╡рд╛рд░рд╛ рддреИрдпрд╛рд░ рдХреНрд░рдордмрджреНрдз рдбреЗрдЯрд╛ рдЬреИрд╕рд╛ рд▓рдЧ рд╕рдХрддрд╛ рд╣реИред рдЗрд╕рдореЗрдВ рдбреЗрдЯрд╛ рдФрд░ рдЙрд╕рдХреЗ рдмрд╛рдж рдбреЗрдЯрд╛ рдХрд╛ рд╣реЗрдбрд┐рдВрдЧ рд╡рд┐рд╡рд░рдг рд╣реЛрддрд╛ рд╣реИред рдпрд╣ рдХреЙрдореНрдкреИрдХреНрдЯ рдФрд░ рддреЗрдЬрд╝ рдирд┐рдХрд▓рд╛, рдХреЛрдИ рдбреБрдкреНрд▓рд┐рдХреЗрдЯ рдлрд╝реАрд▓реНрдб рдирд╛рдо рдирд╣реАрдВ рд╣реИрдВ, рдбреЗрдЯрд╛ рдкреНрд░рд╛рд░реВрдк рджреНрд╡рд┐рдЖрдзрд╛рд░реА рд╣реИред рдбреЗрдЯрд╛ рд╕реНрдХреАрдорд╛ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╕рдордп рдбреЗрдЯрд╛ рдХреА рдЬрд╛рдБрдЪ рдХреА рдЬрд╛рддреА рд╣реИред


рдЙрджрд╛рд╣рд░рдг рд╕рд░реНрдХрд┐рдЯ:


{"namespace": "avro",
"type": "record",
"name": "Person",
"fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": ["int", "null"]}
]
}

рд╕реНрдкреНрд░рд┐рдВрдЧ рд╢реЗрд▓ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╣реБрдП, рдкрд╣рд▓реЗ рдХрдорд╛рдВрдб рдореЗрдВ рдореИрдВ рд╡реНрдпрдХреНрддрд┐рдпреЛрдВ рдХреА рд╕реВрдЪреА рдореЗрдВ рдЬреЛрдбрд╝рддрд╛ рд╣реВрдВ, рдПрд╡рд░реЛ рдпреЛрдЬрдирд╛ рдХреЗ рдЕрдиреБрд╕рд╛рд░ рдЬрд╛рдВрдЪ рдХрд░ рд░рд╣рд╛ рд╣реВрдВ:


@ShellComponent
public class Commands {

    private List<GenericRecord> records = new ArrayList<>();

    @ShellMethod("add user to list for send")
    public void add(String name, int age) {
        GenericRecord record = new GenericData.Record(SchemaRepository.instance().getSchemaObject());
        record.put("name", name);
        record.put("age", age);

        records.add(record);
    }

GenericRecord рдПрдХ рдПрд╡рд░реЛ рд░рд┐рдХреЙрд░реНрдб рд╣реИ рдЬреЛ рдпреЛрдЬрдирд╛ рдХреЗ рдЖрдзрд╛рд░ рдкрд░ рдмрдирддрд╛ рд╣реИред


public class SchemaRepository {

    private static final String SCHEMA = "{\"namespace\": \"avro\",\n" +
            "\"type\": \"record\",\n" +
            "\"name\": \"Person\",\n" +
            "\"fields\": [\n" +
            "     {\"name\": \"name\", \"type\": \"string\"},\n" +
            "     {\"name\": \"age\",  \"type\": [\"int\", \"null\"]}\n" +
            "]\n" +
            "}\n";

    private static final Schema SCHEMA_OBJECT = new Schema.Parser().parse(SCHEMA);

    private static SchemaRepository INSTANCE = new SchemaRepository();

    public static SchemaRepository instance() {
      return INSTANCE;
    }

    public Schema getSchemaObject() {
        return SCHEMA_OBJECT;
    }

}

рдХрдВрд╕реЛрд▓ рдореЗрдВ рд╢реЗрд▓ рд╡реНрдпрдХреНрддрд┐рдпреЛрдВ рдХреЛ рдЬреЛрдбрд╝рдирд╛, рдФрд░ рдХрдлрдХрд╛ рдХреЛ рд╡рд┐рд╖рдп рднреЗрдЬрдирд╛:



@ShellComponent
public class Commands {

    private List<GenericRecord> records = new ArrayList<>();

    final private KafkaTemplate template;

    public Commands(KafkaTemplate template) {
      this.template = template;
    }

    @ShellMethod("send list users to Kafka")
    public void send() {
        template.setDefaultTopic("test");
        template.sendDefault("1", records);
        template.flush();
        records.clear();
    }

рдпрд╣рд╛рдБ рдХрд╛рдлреНрдХрд╛ рдХреЗ рд▓рд┐рдП рдПрд╡рд░реЛ рдзрд╛рд░рд╛рд╡рд╛рд╣рд┐рдХ рд╣реИ:


public class AvroGenericRecordSerializer implements Serializer<List<GenericRecord>> {

    private Schema schema = null;

    @Override public void configure(Map<String, ?> map, boolean b) {
        schema = (Schema) map.get("SCHEMA");
    }

    @Override public byte[] serialize(String arg0, List<GenericRecord> records) {
        byte[] retVal = null;

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);

        DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter);
        try {
            dataFileWriter.create(schema, outputStream);
            for (GenericRecord record : records) {
                dataFileWriter.append(record);
            }
            dataFileWriter.flush();
            dataFileWriter.close();
            retVal = outputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return retVal;
    }

    @Override public void close() {
    }

}

рдХрдлрдХрд╛ рдЙрддреНрдкрд╛рджрдХ рд╡рд┐рдиреНрдпрд╛рд╕:


    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordSerializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }

рдХреНрд░рдорд╛рдВрдХрди рд╡рд░реНрдЧ рдпрд╣рд╛рдБ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ - "com.example.model.AvroGenericRecordSerializer"
рдФрд░ рдирдпрд╛ рдкреИрд░рд╛рдореАрдЯрд░ "SCHEMA" рдПрдХ рд╕реНрдХреАрдорд╛ рдСрдмреНрдЬреЗрдХреНрдЯ рд╣реИ, рдпрд╣ рджреНрд╡рд┐рдЖрдзрд╛рд░реА рдбреЗрдЯрд╛ рддреИрдпрд╛рд░ рдХрд░рдиреЗ рдореЗрдВ AvroGenericRecordSerializer рдореЗрдВ рдЖрд╡рд╢реНрдпрдХ рд╣реИред


рдХрдВрд╕реЛрд▓ рдореЗрдВ рдкреНрд░рд╛рдкреНрдд рдкрдХреНрд╖ рдкрд░, рд╣рдореЗрдВ рдкреНрд░рд╛рдкреНрдд рдбреЗрдЯрд╛ рджрд┐рдЦрд╛рдИ рджреЗрддрд╛ рд╣реИ:



рдПрд╡рд░реЛ рдбрд┐рд╕реЗрд░рд┐рдПрд▓рд╛рдЗрдЬрд╝рд░


public class AvroGenericRecordDeserializer implements Deserializer {

    private Schema schema = null;

    @Override
    public void configure(Map configs, boolean isKey) {
        schema = (Schema) configs.get("SCHEMA");
    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        SeekableByteArrayInput arrayInput = new SeekableByteArrayInput(bytes);
        List<GenericRecord> records = new ArrayList<>();

        DataFileReader<GenericRecord> dataFileReader = null;
        try {
            dataFileReader = new DataFileReader<>(arrayInput, datumReader);
            while (dataFileReader.hasNext()) {
                GenericRecord record = dataFileReader.next();
                records.add(record);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return records;

    }

}

рдФрд░ рдХрдлрдХрд╛ рдЙрдкрднреЛрдХреНрддрд╛ рдХреЗ рд╕рдорд╛рди:


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordDeserializer");
        props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());
        return props;
    }

Docker wurstmeister / kafka-docker рд╕реЗ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛ рдЬрд╛рдиреЗ рд╡рд╛рд▓рд╛ рдХрд╛рдлреНрдХрд╛ , рдЖрдк рдХрд┐рд╕реА рдЕрдиреНрдп рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ


рдЬреАрдердм рдкреНрд░реЛрдЬреЗрдХреНрдЯ


avro.apache


All Articles