Retry consuming messages while processing concurrently using Reactor Kafka

I have a project where I consume messages from an incoming topic, process them, and send them to an outgoing topic. Now consider that the outgoing topic is down because of some infrastructure issue, resulting in an error while sending the message. In this case, I commit up to the last successful message (one that was processed and sent to the outgoing topic successfully) and retry consuming from where the send failed.

This can easily be achieved in "non-reactive" Kafka using ack.nack(index, sleep), and the code looks something like this:

@KafkaListener(
        autoStartup = "false",
        topics = "incoming-topic",
        containerFactory = "myListenerContainerFactory",
        groupId = "test-consumer"
)
public void consume(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
    for (int i = 0; i < consumerRecords.size(); i++) {
        ConsumerRecord<String, String> record = consumerRecords.get(i);
        try {
            JsonObject recordNode = gson.fromJson(record.value(), JsonObject.class);
            CompletableFuture<String> future1 = transformationService.transform1(recordNode);
            CompletableFuture<String> future2 = transformationService.transform2(recordNode);
            CompletableFuture.allOf(future1, future2).join();

            outgoingTopicProducer.sendMessage(record, recordNode); // method that calls producer
            ack.acknowledge();
        } catch (ApiException e) { // ApiException thrown from producer
            logger.error("Kafka exception occurred while sending message, blocking queue... ");
            ack.nack(i, 2000); // this enables to read same message again after 2 secs
        } catch (Exception e) { // ignore any other exception like JsonProcessingException
            logger.error("error occurred while processing message. Partition: {}, Offset: {}, Value: {}",
                    record.partition(),
                    record.offset(),
                    record.value());
            logger.error("exception: " + e.getMessage());
        }
    }
}

Now, I want to make this code reactive in order to get better performance and handle backpressure. I also want to process each partition concurrently. Going through the reactor-kafka documentation, I came up with the following code:

private final Scheduler scheduler = Schedulers.newBoundedElastic(60, Integer.MAX_VALUE, "sample", 60, true);
private final KafkaSender<String, String> sender = KafkaSender.create(SenderOptions.create(getProducerConfig()));
@EventListener(ApplicationReadyEvent.class)
public void receiverFlux() {
    KafkaReceiver.create(getReceiverOptions())
        .receive()
        .groupBy(m -> m.receiverOffset().topicPartition())
        .flatMap(partitionFlux ->
            partitionFlux.publishOn(scheduler)
                    .<SenderRecord<String, String, ReceiverOffset>>handle((record, sink) -> {
                        JsonObject recordNode = gson.fromJson(record.value(), JsonObject.class);

                        CompletableFuture<String> future1 = transformationService.transform1(recordNode);
                        Mono<String> mono1 = Mono.fromFuture(future1)
                                .subscribeOn(Schedulers.parallel())
                                .doFinally(sig -> {
                                    if (sig == SignalType.CANCEL) {
                                        future1.cancel(true);
                                    }
                                });

                        CompletableFuture<String> future2 = transformationService.transform2(recordNode);
                        Mono<String> mono2 = Mono.fromFuture(future2)
                                .subscribeOn(Schedulers.parallel())
                                .doFinally(sig -> {
                                    if (sig == SignalType.CANCEL) {
                                        future2.cancel(true);
                                    }
                                });

                        try {
                            Mono.when(mono1, mono2).block();
                            sink.next(SenderRecord.create(new ProducerRecord<>("outgoing-topic", record.partition(), record.timestamp(), record.key(), recordNode.toString(), record.headers()),
                                    record.receiverOffset()));
                        } catch (Exception e) {
                            sink.error(e);
                        }
                    })
                    .onErrorResume(e -> {
                        logger.error("error occurred while processing record");
                        logger.error("exception: " + e.getMessage());
                        return Mono.empty();
                    })
                    .as(sender::send)
                    .doOnNext(record -> {
                        logger.info("ack offset");
                        record.correlationMetadata().acknowledge();
                    })
                    .doOnError(e -> {
                        logger.error("error occurred while sending message");
                        logger.error("exception: " + e.getMessage());
                    })
                    .retryWhen(Retry.backoff(10, Duration.ofSeconds(2)))
        ).onErrorResume(e -> {
            logger.error("Restart consumer ....");
            return Mono.empty();
        })
        .repeat()
        .subscribe();
}

To test the failure scenario, I changed the outgoing topic name to an incorrect topic (one the producer is not authorized to access) and got the following logs:

{"@message":"request(1)"}
{"@message":"[Producer clientId=clientId] Error while fetching metadata with correlation id 1 : {incorrect-topic=TOPIC_AUTHORIZATION_FAILED}"}
{"@message":"[Producer clientId=clientId] Topic authorization failed for topics [incorrect-topic]"}
{"@message":"Sender failed","@fields":{"level":"ERR","throwable":{"message":"Not authorized to access topics: [incorrect-topic]"}}}
{"@message":"error occurred while sending message"}
{"@message":"exception: Not authorized to access topics: [incorrect-topic]"}
{"@message":"Scheduler worker in group main failed with an uncaught exception","@fields":{"level":"ERR","throwable":{"message":"Cannot invoke \"java.util.Queue.isEmpty()\" because \"q\" is null"}}}
(stack trace, trimmed: reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync (FluxPublishOn.java:1004) -> reactor.core.scheduler.WorkerTask.call -> java.util.concurrent.ThreadPoolExecutor$Worker.run)

So, the observation is that this code does not retry sending the message after a failure; when a new message arrives, it reads the new message and drops the last failed one, resulting in the loss of that message. My priority is to not lose any message.
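To be explicit about the delivery semantics I am after, here is a minimal plain-Java sketch with no Kafka involved (sendToOutgoing is a hypothetical stand-in for the real producer call, and the failure counter just simulates the outgoing topic being down for a while): the same record is retried until the send succeeds, and only then is it committed and the next record taken up, so nothing is ever skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the desired semantics: retry the SAME record until the send
// succeeds, only then commit it and move on -- no record is skipped or lost.
public class RetrySemanticsSketch {

    // Stand-in for the real producer call: fails the first few attempts to
    // simulate the outgoing topic being unavailable, then recovers.
    private static int remainingFailures = 3;

    static boolean sendToOutgoing(String record) {
        if (remainingFailures > 0) {
            remainingFailures--;
            return false; // simulated ApiException from the producer
        }
        return true;
    }

    static List<String> deliverAll(List<String> incoming) {
        List<String> committed = new ArrayList<>();
        for (String record : incoming) {
            while (!sendToOutgoing(record)) {
                // back off before retrying, analogous to ack.nack(i, 2000)
            }
            committed.add(record); // analogous to ack.acknowledge()
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(deliverAll(List.of("m1", "m2", "m3"))); // prints [m1, m2, m3]
    }
}
```

This is exactly what the nack-based listener above gives me; the question is how to express the same per-record "retry before ack" behaviour in the reactive pipeline.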

Please help me achieve this behaviour. Any other suggestions that could help improve performance are welcome.
