WebFlux reactor kafka retry mechanism is not invoking the process method

May I know what’s wrong in my code as the retry mechanism is not working as expected. In my understanding processRecord method should be invoked with every retry.

@EventListener(ApplicationReadyEvent.class)
    public void processEvent() {
        log.info("processEvent:: started..");

        Scheduler scheduler = Schedulers.newBoundedElastic(threadPoolConfig.getThreadCap(),
                threadPoolConfig.getQueuedTaskCap(), threadPoolConfig.getThreadPrefix(), threadPoolConfig.getTtlSeconds());

        Flux<ReceiverRecord<String, String>> receiverRecordFlux = Flux.defer(requestReactiveKafkaConsumerTemplate::receive);
        receiverRecordFlux.groupBy(m -> m.receiverOffset().topicPartition())
                .doOnNext(partitionFlux -> log.info("processEvent:: topicPartition {}", partitionFlux.key()))
                .flatMap(partitionFlux -> partitionFlux.subscribeOn(scheduler)
                        .doOnNext(r -> log.info("processEvent:: Record received from offset {} from topicPartition {} with message key {}", r.receiverOffset().topicPartition(), r.key(), r.offset()))
                        .flatMap(this::processRecord)
                        .doOnNext(receiverRecordInfo -> log.info("processEvent:: Record processed from offset {} from topicPartition {} with message key {}", receiverRecordInfo.receiverOffset().offset(), receiverRecordInfo.receiverOffset().topicPartition()))
                        .retryWhen(Retry.backoff(3, Duration.ofMillis(200))
                                .jitter(0d)
                                .doAfterRetry(retrySignal -> log.error("Retried {}", retrySignal.totalRetries()))
                                .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> new KafkaRetryExhaustException())
                        )
                        .doOnError(KafkaRetryExhaustException.class, (msg) -> {
                            log.error("exception... ", msg);
                        })
                        .onErrorResume(throwable -> Mono.empty())
                )
                .subscribe(
                        key -> log.info("Successfully consumed messages, key {}", key),
                        error -> log.error("Error while consuming messages ", error));
    }

logs

*2022-06-06 13:50:33,867 INFO [reactive-kafka-consumergroupId-1] reactor.util.Loggers$Slf4JLogger: processEvent:: topicPartition test-topic-0 2022-06-06 13:50:33,879 INFO [CasprConsumer-1] reactor.util.Loggers$Slf4JLogger: processEvent:: Record received from offset test-topic-0 from topicPartition 937 with message key 45 2022-06-06 13:50:33,883 INFO [CasprConsumer-1] reactor.util.Loggers$Slf4JLogger: processRecord:: processing started 2022-06-06 13:50:33,918 INFO [CasprConsumer-1] reactor.util.Loggers$Slf4JLogger: processRecord:: is httpRequest null? false 2022-06-06 13:50:34,180 ERROR [reactor-http-nio-3] reactor.util.Loggers$Slf4JLogger: processRecord:: exception.. org.springframework.web.reactive.function.client.WebClientResponseException$NotFound: 404 Not Found from POST http://localhost:8080/caps-app/capservices/fulfillment /accesspoints/v3/slots at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:202) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site( s):
__checkpoint ⇢ 404 from POST http://localhost:8080/caps-app/capservices/fulfillment/accesspoints/v3/slots [DefaultWebClient]
Original Stack Trace: at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:202) at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1Reponse. :207) at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) at reactor.mpluxFcore.publisher$Error. DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) at reactor.core.publisher.FluxContextWrite$ExriteWriteFtextWNext(FluxMapFuseable.java:129) at reactor.core.publisher.FluxContextWrite$ExriteWritetextFonWlux:java. .core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:299) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java). erators$MonoSubscriber.complete(Operators.java:1816) at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159) at reactor.core.publisher.FluxMap$MapSubscriber.onCompletejava:144Map. at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) at reactor.netty.channel.FluxReceive.onInboundReceive(FluxReceive.onInboundComplete(FluxReceive.onInboundComplete) .java:400) at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419) at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473) at reactor.netty.http.client.HttpClientOperations. onInboundNext(HttpClientOperations.java:703) at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannel.texthanneljaHandlerConn.37) at io.netty.channel. .invokeChannelRead(AbstractCh annelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMestractHandlerHandlerContext.java:357) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMestractHandlerContext.java:357) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMestractHandlerHandlerHnetcoler10) at io.netty.handler. .invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.AbstractChannelHandlerChannelRead atchannelHandlerChannel7: CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.DuplexHandler.java:436) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.DuplexHandler.java:327)Message. at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) at io.netty.channel.Abstra ctChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.channelfireAbstractChannelHandlerContextHannelRead35(AbstractChannelHandlerContext.java:365) at io.netty.channel.channelfireChannelChannelReadChan.35 .DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerChanlerContext36 netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.netoptyNiochannelEventy. java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioE ventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEvent.java:Exe99) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread. (Thread.java:833) 2022-06-06 13:50:34,391 ERROR [parallel-2] reactor.util.Loggers$Slf4JLogger: Retried 0 2022-06-06 13:50:34,797 ERROR [parallel-3] reactor.util.Loggers$Slf4JLogger: Retried 1 2022-06-06 13:50:35,611 ERROR [parallel-4] reactor.util.Loggers$Slf4JLogger: Retried 2 2022-06-06 13:50:35,613 ERROR [CasprConsumer-1] reactor.util.Loggers$Slf4JLogger: exception… com.walmart.caspr.exception.KafkaRetryExhaustException: null at com.walmart.caspr.service.ReactiveConsumerService.lambda$processEvent$5(ReactiveConsumerService.java:71) at reactor.util. retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:376) at reactor.core.publisher.FluxMaluxConjapImexmedia$Contcat :251) at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491) at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299) at reactor.core.publisher.SinkManySerializedSerialized.tryEmitkNext java:100) at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190) at reactor.lux.publisherFeeker$Feeker. .onError(FluxPeek.java:222) at reactor.core.p ublisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:842) at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608) at reactor.core.publisher.FluxFlatMapFlatMap$Flux.va 588) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onError(FluxFlatMap.java:451) at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222) at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber .onError(FluxSubscribeOn.java:157) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:721) at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.rune.194).FluxS core.scheduler.WorkerTask.call(WorkerTask.java:84) at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java: 264) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) at java.base/ java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread. run(Thread.java:833)

Leave a Comment