Kafka, , , , , . — ! — , .
TopologyTestDriver
Kafka Streams ships with TopologyTestDriver for testing topologies. Its API was reworked in version 2.4. It is used as follows:
TopologyTestDriver ( , ) Properties:
// Build the test driver from the topology under test and the application
// configuration converted to java.util.Properties via config.asProperties().
TopologyTestDriver topologyTestDriver = 
       new TopologyTestDriver(topology, config.asProperties());
, , , Kafka-. «» , «» — :
// Handle for feeding records into INPUT_TOPIC through the driver; both the
// key and the value are serialized with StringSerializer.
TestInputTopic<String, String> inputTopic =
   topologyTestDriver.createInputTopic(
         INPUT_TOPIC, new StringSerializer(), new StringSerializer());
// Handle for reading records from OUTPUT_TOPIC through the driver; both the
// key and the value are deserialized with StringDeserializer.
TestOutputTopic<String, String> outputTopic =
   topologyTestDriver.createOutputTopic(
         OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
TestInputTopic TestOutputTopic , / , .
/ , , , Record-. time windows, .
TopologyTestDriver , readKeyValuesToList() readRecordsToList(). , , , .
TopologyTestDriver — key-value-:
// Fetch the key-value state store registered under STORE_NAME from the driver,
// giving the test direct access to the store (String keys, Long values).
KeyValueStore<String, Long> store = topologyTestDriver.getKeyValueStore(STORE_NAME);
«black box»-, «white box»- .
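When you are done with a TopologyTestDriver, release it by calling .close() in the tearDown method of the test. (On Windows this cleanup runs into bug KAFKA-6647, reported in 2018 and fixed in 2.5.1; on affected versions the state directory C:\tmp\kafka-streams\<APP-ID> may have to be deleted manually.)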
TopologyTestDriver . , , /, , . , Kafka Streams API TopologyTestDriver. , , TopologyTestDriver , .
TopologyTestDriver
, , TopologyTestDriver, .
, .
, . . ( ) . , distinct() Java Streams API.
, , distinct() KStream , , , , .
— key-value- Processor API, DSL. ( , , production):
// Sentinel value the reducer below substitutes for the aggregate; the filter
// further down removes every record carrying it.
final String STOP_VALUE = "<STOP>";
// Source stream over INPUT_TOPIC_WRONG with String keys and String values.
KStream<String, String> input =
    streamsBuilder.stream(INPUT_TOPIC_WRONG, Consumed.with(Serdes.String(), Serdes.String()));
// NOTE(review): the *_WRONG topic names suggest this topology is shown as a
// deliberately flawed example — confirm against the surrounding text before
// reusing it.
input.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
  // The reducer ignores both arguments and always yields STOP_VALUE.
  .reduce((first, second) -> STOP_VALUE)
  // Convert the reduced table back into a stream of its updates.
  .toStream()
  // Keep only records whose value is not the sentinel.
  .filter((key, value) -> !STOP_VALUE.equals(value))
  .to(OUTPUT_TOPIC_WRONG, Produced.with(Serdes.String(), Serdes.String()));
— «» : , , .
( ) , reduce STOP_VALUE. , , , . , , Java Streams API . Kafka Streams API — Java Streams API, ?
:
// Feed five records in which the keys "A" and "B" each occur twice.
List<KeyValue<String, String>> records = Arrays.asList(
     new KeyValue<>("A", "A"),
     new KeyValue<>("B", "B"),
     new KeyValue<>("B", "B"),
     new KeyValue<>("A", "A"),
     new KeyValue<>("C", "C"));
inputTopic.pipeKeyValueList(records);
// The output topic must contain exactly the values A, B, C, in that order.
assertEquals(Arrays.asList("A", "B", "C"), outputTopic.readValuesToList());
! ?!
, , ( ) . — , , , .
reduce (eventually) , . , ( STOP_VALUE) «» . , STOP_VALUE .
TopologyTestDriver-, .
Kafka Streams developer.confluent.io.
( , ) TopologyTestDriver, , , Kafka.
EmbeddedKafka TestContainers
«» : EmbeddedKafka TestContainers.
EmbeddedKafka, as the name suggests, runs a Kafka broker inside the same Java process as the test.
TestContainers is a Java library that starts the services a test depends on in Docker containers.
C . , Kafka- (embedded ), . Spring Boot Kafka spring.kafka.bootstrap-servers. , Kafka- .
, EmbeddedKafka TestContainers . , EmbeddedKafka ( Scala) . TestContainers , « »: TestContainers Kafka, , . TestContainers . EmbeddedKafka , , ( ).
GitHub , .
« Spring» Kafka- :
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class TestTopologyKafkaEmbedded {
   
!
, , — Kafka , . . , , , , , . , . , .
, Kafka-:
// Consumer and producer are opened together and closed automatically when the
// block exits.
try (Consumer<String, String> consumer =
         configureConsumer(bootstrapServers, outputTopicName);
     Producer<String, String> producer =
         configureProducer(bootstrapServers)) {
    // Publish the five input records (key == value), then flush the producer
    // before starting to read.
    for (String item : Arrays.asList("A", "B", "B", "A", "C")) {
        producer.send(new ProducerRecord<>(inputTopicName, item, item));
    }
    producer.flush();

    // Drain the output topic: keep fetching batches (5000 is the timeout
    // argument passed to getRecords) until a fetch comes back empty.
    List<String> actual = new ArrayList<>();
    ConsumerRecords<String, String> batch = KafkaTestUtils.getRecords(consumer, 5000);
    while (!batch.isEmpty()) {
        for (ConsumerRecord<String, String> rec : batch) {
            actual.add(rec.value());
        }
        batch = KafkaTestUtils.getRecords(consumer, 5000);
    }

    // The values are sorted before comparison, so the check does not depend on
    // the order in which they were received.
    Collections.sort(actual);
    assertEquals(Arrays.asList("A", "B", "C"), actual);
}
, : TopologyTestDriver, , , EmbeddedKafka TestContainers, (- ), :

, TopologyTestDriver .
- TopologyTestDriver— KafkaStreams. , , «»- TopologyTestDriver, , , .
- EmbeddedKafka TestContainers , . flakiness.
— Kafka Streams- .
.
Kafka , Heisenbug 2020 Piter. «Apache Kafka: » HolyJS 2020 Piter.