Wiederaufbereitungsereignisse von Kafka erhalten


Hallo Habr.


Kürzlich habe ich meine Erfahrungen darüber geteilt, welche Parameter im Team wir am häufigsten für Kafka Producer und Consumer verwenden, um der garantierten Lieferung näher zu kommen. In diesem Artikel möchte ich erläutern, wie wir die Wiederaufbereitung eines von Kafka erhaltenen Ereignisses aufgrund der vorübergehenden Nichtverfügbarkeit eines externen Systems organisiert haben.


. -, , Docker-, Kubernetes OpenShift, enterprise- . - , — -.


Kafka


IBM MQ . - dead-letter-queue (DLQ) . DLQ , IBM MQ.


(, ResourceAccessException HTTP- MongoTimeoutException MongoDb), . , , , - . , . , , DLQ .



, . , Consumer’, .



, . , , -, .


, Kafka- , - . retention.ms -, . .


, spring spring-kafka . Spring-kafka spring-retry, BackOffPolicy. , . , - , . , .


spring-kafka ContainerAwareErrorHandler, SeekToCurrentErrorHandler, , offset , . spring-kafka 2.3 BackOffPolicy.


, DLQ - . 2019 , , DLQ ( ). SeekToCurrentErrorHandler. , offset, .



, SeekToCurrentErrorHandler, .


. , . , . DLQ .


, .


Consumer’a


spring-kafka Consumer’a :


public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

retryAt — , MessageListenerContainer, . , TaskScheduler, spring.


retryAt :


  1. .
  2. . , JSON.
  3. JSON- , . , retryAt.
  4. , retryAt null DLQ .

, , . , . spring-retry , .


, - . , circuit breaker.


1, - , . , , .



, (Retryer), DESTINATION RETRY_AT:



public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

, . RETRY_AT , Consumer’a. DESTINATION RETRY_AT :


  • GROUP_ID, .
  • ORIGINAL_PARTITION, Consumer . null, partition record.key() .
  • COUNTER, .
  • SEND_TO — , , RETRY_AT DLQ.
  • REASON — , .

Retryer PostgreSQL. , RETRY_AT ORIGINAL_PARTITION DESTINATION record.key().


PostgreSQL. UI, Retryer REST API. DLQ, , .


, , Retryer, Retryer’ DESTINATION . , , , DLQ UI .


, consumer-, . Retryer . , . retry- Consumer - .



circuit breaker’a, spring-cloud-netflix spring cloud circuit breaker, . , bulkhead , . , spring-cloud-netflix thread pool .



, - .


, , Kafka-, ! retry-, Kafka- Retryer. . Retryer , Consumer.

Source: https://habr.com/ru/post/undefined/


All Articles