Kafka, , , , , . — ! — , .
TopologyTestDriver
Kafka Streams TopologyTestDriver
. API 2.4 . :
TopologyTestDriver
( , ) Properties
:
TopologyTestDriver topologyTestDriver =
new TopologyTestDriver(topology, config.asProperties());
, , , Kafka-. «» , «» — :
TestInputTopic<String, String> inputTopic =
topologyTestDriver.createInputTopic(
INPUT_TOPIC, new StringSerializer(), new StringSerializer());
TestOutputTopic<String, String> outputTopic =
topologyTestDriver.createOutputTopic(
OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
TestInputTopic
TestOutputTopic
, / , .
/ , , , Record
-. time windows, .
TopologyTestDriver
, readKeyValuesToList()
readRecordsToList()
. , , , .
TopologyTestDriver
— key-value-:
KeyValueStore<String, Long> store = topologyTestDriver.getKeyValueStore(STORE_NAME);
«black box»-, «white box»- .
TopologyTestDriver
, .close()
tearDown
- . ( Windows, , - KAFKA-6647 2018 , - 2.5.1. Windows C:\tmp\kafka-streams\<APP-ID>
.)
TopologyTestDriver
. , , /, , . , Kafka Streams API TopologyTestDriver
. , , TopologyTestDriver
, .
TopologyTestDriver
, , TopologyTestDriver
, .
, .
, . . ( ) . , distinct()
Java Streams API.
, , distinct()
KStream
, , , , .
— key-value- Processor API, DSL. ( , , production):
final String STOP_VALUE = "<STOP>";
KStream<String, String> input =
streamsBuilder.stream(INPUT_TOPIC_WRONG, Consumed.with(Serdes.String(), Serdes.String()));
input.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.reduce((first, second) -> STOP_VALUE)
.toStream()
.filter((key, value) -> !STOP_VALUE.equals(value))
.to(OUTPUT_TOPIC_WRONG, Produced.with(Serdes.String(), Serdes.String()));
— «» : , , .
( ) , reduce
STOP_VALUE
. , , , . , , Java Streams API . Kafka Streams API — Java Streams API, ?
:
inputTopic.pipeKeyValueList(Arrays.asList(
KeyValue.pair("A", "A"),
KeyValue.pair("B", "B"),
KeyValue.pair("B", "B"),
KeyValue.pair("A", "A"),
KeyValue.pair("C", "C")
));
List<String> expected = Arrays.asList("A", "B", "C");
List<String> actual = outputTopic.readValuesToList();
assertEquals(expected, actual);
! ?!
, , ( ) . — , , , .
reduce
(eventually) , . , ( STOP_VALUE
) «» . , STOP_VALUE
.
TopologyTestDriver
-, .
Kafka Streams developer.confluent.io.
( , ) TopologyTestDriver
, , , Kafka.
EmbeddedKafka TestContainers
«» : EmbeddedKafka TestContainers.
EmbeddedKafka, , Kafka- Java-, .
TestContainers — Java-, , Docker-.
C . , Kafka- (embedded ), . Spring Boot Kafka spring.kafka.bootstrap-servers
. , Kafka- .
, EmbeddedKafka TestContainers . , EmbeddedKafka ( Scala) . TestContainers , « »: TestContainers Kafka, , . TestContainers . EmbeddedKafka , , ( ).
GitHub , .
« Spring» Kafka- :
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class TestTopologyKafkaEmbedded {
!
, , — Kafka , . . , , , , , . , . , .
, Kafka-:
try (Consumer<String, String> consumer =
configureConsumer(bootstrapServers, outputTopicName);
Producer<String, String> producer =
configureProducer(bootstrapServers)) {
producer.send(new ProducerRecord<>(inputTopicName, "A", "A"));
producer.send(new ProducerRecord<>(inputTopicName, "B", "B"));
producer.send(new ProducerRecord<>(inputTopicName, "B", "B"));
producer.send(new ProducerRecord<>(inputTopicName, "A", "A"));
producer.send(new ProducerRecord<>(inputTopicName, "C", "C"));
producer.flush();
List<String> actual = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer, 5000);
if (records.isEmpty()) break;
for (ConsumerRecord<String, String> rec : records) {
actual.add(rec.value());
}
}
List<String> expected = Arrays.asList("A", "B", "C");
Collections.sort(actual);
assertEquals(expected, actual);
}
, : TopologyTestDriver
, , , EmbeddedKafka TestContainers, (- ), :

, TopologyTestDriver
.
TopologyTestDriver
— KafkaStreams. , , «» TopologyTestDriver
, , , .- EmbeddedKafka TestContainers , . flakiness.
— Kafka Streams- .
.
Kafka , Heisenbug 2020 Piter. «Apache Kafka: » HolyJS 2020 Piter.