Événements de retraitement reçus de Kafka


Bonjour, Habr.


Récemment, j'ai partagé mon expérience sur les paramètres de l'équipe que nous utilisons le plus souvent pour Kafka Producer and Consumer afin de se rapprocher de la livraison garantie. Dans cet article, je veux dire comment nous avons organisé le retraitement d'un événement reçu de Kafka à la suite d'une indisponibilité temporaire d'un système externe.


. -, , Docker-, Kubernetes OpenShift, enterprise- . - , — -.


Kafka


IBM MQ . - dead-letter-queue (DLQ) . DLQ , IBM MQ.


(, ResourceAccessException HTTP- MongoTimeoutException MongoDb), . , , , - . , . , , DLQ .



, . , Consumer’, .



, . , , -, .


, Kafka- , - . retention.ms -, . .


, spring spring-kafka . Spring-kafka spring-retry, BackOffPolicy. , . , - , . , .


spring-kafka ContainerAwareErrorHandler, SeekToCurrentErrorHandler, , offset , . spring-kafka 2.3 BackOffPolicy.


, DLQ - . 2019 , , DLQ ( ). SeekToCurrentErrorHandler. , offset, .



, SeekToCurrentErrorHandler, .


. , . , . DLQ .


, .


Consumer’a


spring-kafka Consumer’a :


public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

retryAt — , MessageListenerContainer, . , TaskScheduler, spring.


retryAt :


  1. .
  2. . , JSON.
  3. JSON- , . , retryAt.
  4. , retryAt null DLQ .

, , . , . spring-retry , .


, - . , circuit breaker.


1, - , . , , .



, (Retryer), DESTINATION RETRY_AT:



public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

, . RETRY_AT , Consumer’a. DESTINATION RETRY_AT :


  • GROUP_ID, .
  • ORIGINAL_PARTITION, Consumer . null, partition record.key() .
  • COUNTER, .
  • SEND_TO — , , RETRY_AT DLQ.
  • REASON — , .

Retryer PostgreSQL. , RETRY_AT ORIGINAL_PARTITION DESTINATION record.key().


PostgreSQL. UI, Retryer REST API. DLQ, , .


, , Retryer, Retryer’ DESTINATION . , , , DLQ UI .


, consumer-, . Retryer . , . retry- Consumer - .



circuit breaker’a, spring-cloud-netflix spring cloud circuit breaker, . , bulkhead , . , spring-cloud-netflix thread pool .



, - .


, , Kafka-, ! retry-, Kafka- Retryer. . Retryer , Consumer.

Source: https://habr.com/ru/post/undefined/


All Articles