Acara pemrosesan ulang yang diterima dari Kafka


Halo Habr.


Baru-baru ini, saya berbagi pengalaman tentang parameter mana dalam tim yang paling sering kami gunakan untuk Kafka Produser dan Konsumen untuk lebih dekat dengan pengiriman yang dijamin. Dalam artikel ini saya ingin memberi tahu bagaimana kami mengatur pemrosesan kembali suatu peristiwa yang diterima dari Kafka sebagai akibat dari tidak tersedianya sementara suatu sistem eksternal.


. -, , Docker-, Kubernetes OpenShift, enterprise- . - , — -.


Kafka


IBM MQ . - dead-letter-queue (DLQ) . DLQ , IBM MQ.


(, ResourceAccessException HTTP- MongoTimeoutException MongoDb), . , , , - . , . , , DLQ .



, . , Consumer’, .



, . , , -, .


, Kafka- , - . retention.ms -, . .


, spring spring-kafka . Spring-kafka spring-retry, BackOffPolicy. , . , - , . , .


spring-kafka ContainerAwareErrorHandler, SeekToCurrentErrorHandler, , offset , . spring-kafka 2.3 BackOffPolicy.


, DLQ - . 2019 , , DLQ ( ). SeekToCurrentErrorHandler. , offset, .



, SeekToCurrentErrorHandler, .


. , . , . DLQ .


, .


Consumer’a


spring-kafka Consumer’a :


public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

retryAt — , MessageListenerContainer, . , TaskScheduler, spring.


retryAt :


  1. .
  2. . , JSON.
  3. JSON- , . , retryAt.
  4. , retryAt null DLQ .

, , . , . spring-retry , .


, - . , circuit breaker.


1, - , . , , .



, (Retryer), DESTINATION RETRY_AT:



public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

, . RETRY_AT , Consumer’a. DESTINATION RETRY_AT :


  • GROUP_ID, .
  • ORIGINAL_PARTITION, Consumer . null, partition record.key() .
  • COUNTER, .
  • SEND_TO — , , RETRY_AT DLQ.
  • REASON — , .

Retryer PostgreSQL. , RETRY_AT ORIGINAL_PARTITION DESTINATION record.key().


PostgreSQL. UI, Retryer REST API. DLQ, , .


, , Retryer, Retryer’ DESTINATION . , , , DLQ UI .


, consumer-, . Retryer . , . retry- Consumer - .



circuit breaker’a, spring-cloud-netflix spring cloud circuit breaker, . , bulkhead , . , spring-cloud-netflix thread pool .



, - .


, , Kafka-, ! retry-, Kafka- Retryer. . Retryer , Consumer.

Source: https://habr.com/ru/post/undefined/


All Articles