Las trampas de probar las corrientes de Kafka


Kafka, , , , , . — ! — , .


TopologyTestDriver


Kafka Streams TopologyTestDriver. API 2.4 . :



TopologyTestDriver ( , ) Properties:


TopologyTestDriver topologyTestDriver = 
       new TopologyTestDriver(topology, config.asProperties());

, , , Kafka-. «» , «» — :


TestInputTopic<String, String> inputTopic =
   topologyTestDriver.createInputTopic(
         INPUT_TOPIC, new StringSerializer(), new StringSerializer());
TestOutputTopic<String, String> outputTopic =
   topologyTestDriver.createOutputTopic(
         OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());

TestInputTopic TestOutputTopic , / , .


/ , , , Record-. time windows, .


TopologyTestDriver , readKeyValuesToList() readRecordsToList(). , , , .


TopologyTestDriver — key-value-:


KeyValueStore<String, Long> store = topologyTestDriver.getKeyValueStore(STORE_NAME);

«black box»-, «white box»- .


TopologyTestDriver , .close() tearDown- . ( Windows, , - KAFKA-6647 2018 , - 2.5.1. Windows C:\tmp\kafka-streams\<APP-ID> .)


TopologyTestDriver . , , /, , . , Kafka Streams API TopologyTestDriver. , , TopologyTestDriver , .


TopologyTestDriver


, , TopologyTestDriver, .


, .


, . . ( ) . , distinct() Java Streams API.


, , distinct() KStream , , , , .


— key-value- Processor API, DSL. ( , , production):


final String STOP_VALUE = "<STOP>";
KStream<String, String> input =
    streamsBuilder.stream(INPUT_TOPIC_WRONG, Consumed.with(Serdes.String(), Serdes.String()));

input.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
  .reduce((first, second) -> STOP_VALUE)
  .toStream()
  .filter((key, value) -> !STOP_VALUE.equals(value))
  .to(OUTPUT_TOPIC_WRONG, Produced.with(Serdes.String(), Serdes.String()));

— «» : , , .


( ) , reduce STOP_VALUE. , , , . , , Java Streams API . Kafka Streams API — Java Streams API, ?


:


inputTopic.pipeKeyValueList(Arrays.asList(
     KeyValue.pair("A", "A"),
     KeyValue.pair("B", "B"),
     KeyValue.pair("B", "B"),
     KeyValue.pair("A", "A"),
     KeyValue.pair("C", "C")
));
List<String> expected = Arrays.asList("A", "B", "C");
List<String> actual = outputTopic.readValuesToList();
assertEquals(expected, actual);

! ?!


, , ( ) . — , , , .


reduce (eventually) , . , ( STOP_VALUE) «» . , STOP_VALUE .


TopologyTestDriver-, .


Kafka Streams developer.confluent.io.


( , ) TopologyTestDriver, , , Kafka.


EmbeddedKafka TestContainers


«» : EmbeddedKafka TestContainers.


EmbeddedKafka, , Kafka- Java-, .


TestContainers — Java-, , Docker-.


C . , Kafka- (embedded ), . Spring Boot Kafka spring.kafka.bootstrap-servers. , Kafka- .


, EmbeddedKafka TestContainers . , EmbeddedKafka ( Scala) . TestContainers , « »: TestContainers Kafka, , . TestContainers . EmbeddedKafka , , ( ).


GitHub , .


« Spring» Kafka- :


@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class TestTopologyKafkaEmbedded {
   /*     KafkaStreams
           EmbeddedKafka*/

!


, , — Kafka , . . , , , , , . , . , .


, Kafka-:


try (Consumer<String, String> consumer = 
          configureConsumer(bootstrapServers, outputTopicName);
      Producer<String, String> producer = 
          configureProducer(bootstrapServers)) {
   producer.send(new ProducerRecord<>(inputTopicName, "A", "A"));
   producer.send(new ProducerRecord<>(inputTopicName, "B", "B"));
   producer.send(new ProducerRecord<>(inputTopicName, "B", "B"));
   producer.send(new ProducerRecord<>(inputTopicName, "A", "A"));
   producer.send(new ProducerRecord<>(inputTopicName, "C", "C"));
   producer.flush();
   List<String> actual = new ArrayList<>();

   while (true) {
       ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer, 5000);
       if (records.isEmpty()) break;
          for (ConsumerRecord<String, String> rec : records) {
             actual.add(rec.value());
          }
       }
       List<String> expected = Arrays.asList("A", "B", "C");
       Collections.sort(actual);
       assertEquals(expected, actual);
   }

, : TopologyTestDriver, , , EmbeddedKafka TestContainers, (- ), :



, TopologyTestDriver .



  • TopologyTestDriver — KafkaStreams. , , «» TopologyTestDriver , , , .
  • EmbeddedKafka TestContainers , . flakiness.

— Kafka Streams- .


.


Kafka , Heisenbug 2020 Piter. «Apache Kafka: » HolyJS 2020 Piter.

All Articles