
Hello, Habr.
Recently I described which Kafka Producer and Consumer parameters our team uses most often to get closer to guaranteed delivery. In this article I want to explain how we organized the reprocessing of an event received from Kafka when an external system is temporarily unavailable.
. -, , Docker-, Kubernetes OpenShift, enterprise- . - , — -.
Kafka
IBM MQ . - dead-letter-queue (DLQ) . DLQ , IBM MQ.
(, ResourceAccessException HTTP- MongoTimeoutException MongoDb), . , , , - . , . , , DLQ .
, . , Consumer’, .

, . , , -, .
, Kafka- , - . retention.ms -, . .
, spring spring-kafka . Spring-kafka spring-retry, BackOffPolicy. , . , - , . , .
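spring-kafka ships implementations of ContainerAwareErrorHandler, for example SeekToCurrentErrorHandler, which lets you process the message later without moving the offset when an error occurs. Starting with spring-kafka 2.3 it is also possible to set a BackOffPolicy.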
, DLQ - . 2019 , , DLQ ( ). SeekToCurrentErrorHandler. , offset, .
, SeekToCurrentErrorHandler, .
. , . , . DLQ .
, .
Stopping the Consumer
With spring-kafka, the code that stops the Consumer can look like this:
public void pauseListenerContainer(MessageListenerContainer listenerContainer,
                                   Instant retryAt) {
    // Pause only when a retry time is known and the container is still running.
    if (nonNull(retryAt) && listenerContainer.isRunning()) {
        listenerContainer.stop();
        // Restart the container once retryAt is reached.
        taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
    }
}
Here retryAt is the time at which the MessageListenerContainer is restarted. The restart is scheduled through a TaskScheduler, which Spring also provides.
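The stop-then-restart idea does not depend on Spring. Below is a minimal sketch using only the JDK scheduler. The `Restartable` interface is a hypothetical stand-in for the three `MessageListenerContainer` methods used above, and `ContainerPauser` and `pauseUntil` are illustrative names, not part of spring-kafka.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the part of MessageListenerContainer used here. */
interface Restartable {
    boolean isRunning();
    void stop();
    void start();
}

/** Illustrative helper: stops a container and schedules its restart. */
final class ContainerPauser {
    private final ScheduledExecutorService scheduler;

    ContainerPauser(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /** Stops the container now and restarts it at retryAt. Returns true if it was paused. */
    boolean pauseUntil(Restartable container, Instant retryAt) {
        if (retryAt == null || !container.isRunning()) {
            return false; // no retry requested, or the container is already stopped
        }
        container.stop();
        // A retryAt in the past gives a zero delay, so the restart runs as soon as possible.
        long delayMillis = Math.max(0, Duration.between(Instant.now(), retryAt).toMillis());
        scheduler.schedule(container::start, delayMillis, TimeUnit.MILLISECONDS);
        return true;
    }
}
```

Returning a boolean is a design choice of this sketch only: it lets the caller see whether a pause actually happened.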
retryAt :
- .
- . , JSON.
- JSON- , . , retryAt.
- , retryAt null DLQ .
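One way to produce a retryAt that becomes null once attempts run out can be sketched as follows. The exponential backoff, the attempt limit and the names `RetryAtCalculator` and `nextRetryAt` are assumptions made for illustration; the article does not state which policy is actually used.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: decides when, and whether, to retry a failed event. */
final class RetryAtCalculator {
    private final int maxAttempts;    // assumed limit before giving up
    private final Duration baseDelay; // delay before the first retry
    private final Duration maxDelay;  // upper bound for any single delay

    RetryAtCalculator(int maxAttempts, Duration baseDelay, Duration maxDelay) {
        this.maxAttempts = maxAttempts;
        this.baseDelay = baseDelay;
        this.maxDelay = maxDelay;
    }

    /**
     * @param attempt number of attempts already made (0 for the first failure)
     * @param now     current time, passed in so the method is easy to test
     * @return time of the next attempt, or null when attempts are exhausted
     */
    Instant nextRetryAt(int attempt, Instant now) {
        if (attempt >= maxAttempts) {
            return null; // the caller sends the message to the DLQ instead
        }
        // Exponential backoff: baseDelay * 2^attempt, capped at maxDelay.
        long factor = 1L << Math.max(0, Math.min(attempt, 30));
        Duration delay = baseDelay.multipliedBy(factor);
        if (delay.compareTo(maxDelay) > 0) {
            delay = maxDelay;
        }
        return now.plus(delay);
    }
}
```

A null result matches the convention in the surrounding code, where a null retryAt routes the message to the DLQ.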
, , . , . spring-retry , .
, - . , circuit breaker.
1, - , . , , .
The message is handed to a separate service (Retryer), which resends it to the DESTINATION topic once RETRY_AT is reached:
public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic,
                         Instant retryAt, String counter, String groupId, Exception e) {
    // Copy the original headers so that they survive the round trip.
    Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
    List<Header> arrayOfHeaders =
            new ArrayList<>(Arrays.asList(headers.toArray()));
    updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
    updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
    updateHeader(arrayOfHeaders, ORIGINAL_PARTITION,
            () -> Integer.toString(record.partition()).getBytes());
    if (nonNull(retryAt)) {
        // A retry time is known: ask for redelivery at RETRY_AT.
        updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
        updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
        updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
    } else {
        // No retry time: send the message to the DLQ together with the failure reason.
        updateHeader(arrayOfHeaders, REASON,
                ExceptionUtils.getStackTrace(e)::getBytes);
        updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
    }
    // retryTopic (presumably a field) is the Retryer's input topic;
    // the final target topic travels in the DESTINATION header.
    ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
    kafkaTemplate.send(messageToSend);
}
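The `updateHeader` helper is called in the snippet above but not shown. A plausible shape, inferred only from its call sites, is "replace the header if present, otherwise append it". The sketch below is a guess at that behavior. It uses a hypothetical `SimpleHeader` class so that it compiles without kafka-clients; real code would work with Kafka's `Header` and `RecordHeader` types instead.

```java
import java.util.List;
import java.util.function.Supplier;

/** Stand-in for Kafka's Header; used here only to keep the sketch dependency-free. */
final class SimpleHeader {
    final String key;
    final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

final class HeaderUtils {
    private HeaderUtils() {}

    /** Replaces any header with the given key, or appends it if absent (assumed semantics). */
    static void updateHeader(List<SimpleHeader> headers, String key, Supplier<byte[]> value) {
        // Drop stale values left over from earlier attempts, e.g. an old COUNTER.
        headers.removeIf(h -> h.key.equals(key));
        headers.add(new SimpleHeader(key, value.get()));
    }
}
```

Taking a `Supplier<byte[]>` matches call sites such as `groupId::getBytes`, where the value is passed as a method reference.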
, . RETRY_AT , Consumer’a. DESTINATION RETRY_AT :
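- GROUP_ID, the group identifier passed in as groupId.
- ORIGINAL_PARTITION, to keep the same Consumer. This parameter may be null, in which case the new partition is derived from the record.key() of the original message.
- COUNTER, the counter of reprocessing attempts.
- SEND_TO, a constant that says whether the message is sent for reprocessing when RETRY_AT is reached or placed in the DLQ.
- REASON, the reason why processing of the message was interrupted.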
The Retryer stores messages in PostgreSQL. When RETRY_AT is reached, it sends the message back to the ORIGINAL_PARTITION of the DESTINATION topic with the key record.key().
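On the receiving side, the headers written by the retry method have to be decoded again. The sketch below shows one way to do that, assuming the header keys are literally "RETRY_AT" and "ORIGINAL_PARTITION" (the article shows only the constant names, not their values) and representing headers as a plain map. `RetryHeaders` and its methods are hypothetical names and are not taken from the Retryer's code.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Map;

/** Hypothetical decoder for the retry headers. */
final class RetryHeaders {
    private RetryHeaders() {}

    /** RETRY_AT was written with Instant.toString(), so Instant.parse reverses it. */
    static Instant retryAt(Map<String, byte[]> headers) {
        byte[] raw = headers.get("RETRY_AT");
        return raw == null ? null : Instant.parse(new String(raw, StandardCharsets.UTF_8));
    }

    /** ORIGINAL_PARTITION was written as a decimal string; null leaves the choice to the key. */
    static Integer originalPartition(Map<String, byte[]> headers) {
        byte[] raw = headers.get("ORIGINAL_PARTITION");
        return raw == null ? null : Integer.valueOf(new String(raw, StandardCharsets.UTF_8));
    }

    /** A message is due once the current time has reached RETRY_AT. */
    static boolean isDue(Map<String, byte[]> headers, Instant now) {
        Instant at = retryAt(headers);
        return at != null && !now.isBefore(at);
    }
}
```

A null partition can be passed straight to the `ProducerRecord` constructor, in which case the producer's partitioner picks the partition from the key.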
PostgreSQL. UI, Retryer REST API. DLQ, , .
, , Retryer, Retryer’ DESTINATION . , , , DLQ UI .
, consumer-, . Retryer . , . retry- Consumer - .

circuit breaker’a, spring-cloud-netflix spring cloud circuit breaker, . , bulkhead , . , spring-cloud-netflix thread pool .
, - .
, , Kafka-, ! retry-, Kafka- Retryer. . Retryer , Consumer.