Kafka Fail-Over Using Quarkus Reactive Messaging

Background

Recently, I came across a scenario where, at first glance, we needed an active-active Kafka cluster with bi-directional replication using MirrorMaker2. However, diving deeper into the nuances of what this particular project demanded, a number of factors showed this wasn’t really the case. For example, examining how the producers and consumers would handle temporary failures and guarantee exactly-once processing downstream added considerable complexity to the problem.

Taking a step back to the drawing board, we changed the approach to an active-standby scenario. Here, the active cluster replicates all of its configs, consumer groups, and topics to the standby cluster, which becomes active in the event of a failure. However, this posed a problem: how can we minimize downtime in our producers without having to restart them with a new configuration pointing at the new cluster?

In this article, we’ll look at a solution using Quarkus and its reactive messaging capabilities powered by SmallRye Reactive Messaging.

We will not cover how to set up both Kafka clusters; feel free to use whatever approach is easiest for you. In this example, I’m using Strimzi on top of an OpenShift cluster.

What Is SmallRye Reactive Messaging?

According to their documentation, “SmallRye Reactive Messaging is a framework for building event-driven, data streaming, and event-sourcing applications.”

That means we have a lightweight, container-friendly implementation for developing asynchronous messaging applications. It should also be noted that SmallRye can talk to AMQP and MQTT messaging systems as well.

Since we’re using this with Quarkus, I strongly suggest that you read this documentation as we’re going directly into SmallRye configurations, conventions, and libraries.

Configuring the SmallRye Connectors and Channels

SmallRye has the concepts of channels and connectors. Basically, each connector handles a different technology; in this case, we’re going to use the smallrye-kafka connector. Connectors can be inbound or outbound, and since this article only deals with producers, we’ll use outbound connectors exclusively.

One of the advantages of Quarkus is that by simply adding the dependency below, we don’t need to explicitly configure the connector; Quarkus assumes that smallrye-kafka is the correct one.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
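If you prefer to be explicit, or you have more than one connector on the classpath, the connector can still be declared per channel. A minimal sketch, using the channel names configured below:

```properties
# Explicitly bind each outgoing channel to the Kafka connector.
# With a single connector on the classpath, Quarkus infers this automatically.
mp.messaging.outgoing.expense-out.connector=smallrye-kafka
mp.messaging.outgoing.expense-out-failover.connector=smallrye-kafka
```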

As for the configuration, we’ll specify the bootstrap server, some producer configs such as linger.ms and delivery.timeout.ms, and also our serializer. (Note that Kafka requires delivery.timeout.ms to be at least linger.ms + request.timeout.ms.) The configs for the main channel should look like this:

mp.messaging.outgoing.expense-out.bootstrap.servers=main-cluster-kafka-bootstrap.kafka-main.svc.cluster.local:9092
mp.messaging.outgoing.expense-out.delivery.timeout.ms=5000
mp.messaging.outgoing.expense-out.linger.ms=10
mp.messaging.outgoing.expense-out.request.timeout.ms=4000
mp.messaging.outgoing.expense-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.outgoing.expense-out.topic=expense

We’re also going to configure our failover channel with this config:

mp.messaging.outgoing.expense-out-failover.bootstrap.servers=backup-cluster-kafka-bootstrap.kafka-backup.svc.cluster.local:9092
mp.messaging.outgoing.expense-out-failover.topic=expense
mp.messaging.outgoing.expense-out-failover.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Here we can see that we’re going to send our messages to a totally different cluster in case of a failure.

Of course, we’re simplifying things here just to show how it can be done, because a failure on this specific topic could be transient and isolated rather than a full cluster outage.

Using Mutiny!

For our implementation, we’re going to rely on the Mutiny! reactive stream APIs to handle the asynchronous nature of this solution. When we send a message with a Kafka producer, the message isn’t sent instantly to the broker: the producer waits either for the linger.ms interval to expire or for the batch to fill up with messages, and then sends it to the broker. In case of a failure, it also retries based on the retries and delivery.timeout.ms configs. Once these retries are exhausted, the producer fails the message with an error, and that’s when our fail-over code executes: only after the main producer has exhausted all of its attempts to send the message.

In this code, we’re using a Uni<T>, a specialized Mutiny! stream that emits either a single item or a failure. We can subscribe callbacks to it, which means we can log success when the emitter sends the message correctly, or call a failover method in case of a failure.

Our code will be fairly simple:

import java.nio.charset.StandardCharsets;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

@ApplicationScoped
public class ExpenseSender {

    private static final Logger LOG = Logger.getLogger(ExpenseSender.class);

    @Inject
    @Channel("expense-out")
    MutinyEmitter<Expense> mutinyEmitter;

    @Inject
    @Channel("expense-out-failover")
    Emitter<Expense> failoverSender;

    // Sends the expense through the failover channel, tagging the record
    // with a "channel: failover" header.
    private void sendExpenseFailOver(Expense expense) {
        failoverSender.send(Message.of(expense)
                .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                        .withHeaders(new RecordHeaders()
                                .add("channel", "failover".getBytes(StandardCharsets.UTF_8)))
                        .build()));
    }

    // Tries the main channel first; on failure, falls back to the failover channel.
    private void sendExpenseWithUni(Expense expense) {
        mutinyEmitter.send(expense)
                .map(x -> expense)
                .subscribe().with(
                        result -> LOG.info("Message sent using the main channel"),
                        failure -> {
                            LOG.info("Sending message using the failover channel");
                            sendExpenseFailOver(expense);
                        });
    }

    public void sendMessageWithFailover(Expense expense) {
        sendExpenseWithUni(expense);
    }
}

We’ll try to send the message using our mutinyEmitter and use the subscribe operation on the Uni. Subscribing means we get callbacks when the operation finishes: one for success and one for failure.

In case of success, we don’t need to perform any task beyond logging an info message; but in case of failure, we call the sendExpenseFailOver method, which uses a plain SmallRye Emitter. When sending the message, we also add a header indicating that this particular record was sent through the fail-over channel. If you are replicating this topic with MirrorMaker2, for example, a consumer can check whether a message was replicated or sent through another channel and act accordingly. This really depends on the use case.
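As a sketch of that consumer-side check: the helper below is hypothetical (not part of the article’s code) and models only the header comparison itself in plain Java. In a SmallRye consumer, the raw header bytes would be read from the record’s incoming Kafka metadata.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: decides whether a record arrived via the failover
// channel by inspecting the "channel" header set by sendExpenseFailOver.
// A missing header means the record came through the main channel.
public class FailoverHeaderCheck {

    static final byte[] FAILOVER = "failover".getBytes(StandardCharsets.UTF_8);

    public static boolean isFailover(byte[] channelHeaderValue) {
        return channelHeaderValue != null
                && Arrays.equals(channelHeaderValue, FAILOVER);
    }
}
```

A consumer could call such a helper inside its @Incoming handler and, for instance, deduplicate records that were both replicated by MirrorMaker2 and re-sent through the failover channel.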

Receiving the Messages Using REST

In order to receive our messages, we’re going to need a simple endpoint to receive the entity and asynchronously send the message to Kafka.

Our endpoint will look like this:

@POST
@Produces(MediaType.APPLICATION_JSON)
public Response addExpense(Expense expense) {
    expenseSender.sendMessageWithFailover(expense);
    return Response.accepted(expense).build();
}
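For completeness, the Expense payload can be sketched as a plain POJO whose field names match the JSON we POST in the curl examples below; the exact class in the article’s repository may differ.

```java
// Sketch of the Expense payload, with fields matching the JSON used in the
// curl examples: {"id":3,"desc":"Market","paymentMethod":"CreditCard","amount":10.0}.
// The amount is modeled as a String here because the consumed records show
// it serialized as "10.0"; the real class may use a numeric type instead.
public class Expense {

    public long id;
    public String desc;
    public String paymentMethod;
    public String amount;

    // No-args constructor needed by Jackson for JSON deserialization.
    public Expense() {
    }

    public Expense(long id, String desc, String paymentMethod, String amount) {
        this.id = id;
        this.desc = desc;
        this.paymentMethod = paymentMethod;
        this.amount = amount;
    }
}
```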

Testing the Solution

We’re going to use curl to test our solution. First, we need to check if the main channel is working, so we send this information:

curl --header "Content-Type: application/json" -X POST --data '{ "id":3, "desc":"Market", "paymentMethod":"CreditCard", "amount": 10.0}' http://localhost:8080/expense

Then we check on the broker with kafka-console-consumer.sh script to see if the message was delivered correctly. (By the way, all of these Kafka commands are being executed from inside the Broker Pods. That is why the bootstrap server is set to localhost.)

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic expense --from-beginning --property print.headers=true
{"id":3,"desc":"Market","paymentMethod":"CreditCard","amount":"10.0"}

Now, to test our failover method, we first need to kill our broker pods from the main cluster. We’ll start to see this message on the app log:

2022-05-10 16:23:56,198 WARN  [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-expense-out-failover) [Producer clientId=kafka-producer-expense-out-failover] Connection to node -1 (backup-cluster-kafka-bootstrap.kafka-backup.svc.cluster.local/172.30.69.10:9092) could not be established. Broker may not be available

Then we send our new record using curl:

curl --header "Content-Type: application/json" -X POST --data '{ "id":4, "desc":"Market", "paymentMethod":"CreditCard", "amount": 10.0}' http://localhost:8080/expense

Now we check with kafka-console-consumer on our backup cluster.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic expense --from-beginning --property print.headers=true
channel:failover        {"id":4,"desc":"Market","paymentMethod":"CreditCard","amount":"10.0"}

With the property print.headers=true, we can see that our message indeed went through the fail-over channel and was successfully delivered to the backup cluster, so our producer neither lost a message nor went down. Of course, this is a simplification of a real strategy, but it’s enough to start building an HA strategy for producers and consumers, and an active/warm-standby architecture.

All the code for this article can be found on GitHub.

Cheers!!!

