concurrency – Incompatibility between Spring Kafka consumers with versions 2.3.13.RELEASE and 2.8.3

I am having problems with Kafka consumers in applications that use different versions of Spring Kafka, specifically 2.3.13.RELEASE and 2.8.3.

When deploying to production with a blue/green strategy, consumers that use concurrency end up with every partition assigned to the application running version 2.3.13.RELEASE. The new application, which runs a consumer with Spring Kafka 2.8.3 on the same topic and group name, is left with no partitions at all once the rebalance finishes.

This is the configuration currently used for the consumer:

    @KafkaListener(topics = "spring.kafka.test", groupId = "group-spring-kafka", concurrency = "10")
    public void processMessage(String content) {
        System.out.println("Message received: " + content);
    }

The topic has 10 partitions.

Below is the log of the applications in steps:

  1. Start app1 with spring-kafka version 2.3.13.RELEASE with a consumer:

APP1 LOGS

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::       (v2.2.13.RELEASE)

....9 thread consumer initialisations before this....

2022-04-10 10:56:44.516  INFO 2748 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = group-spring-kafka
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2022-04-10 10:56:44.519  INFO 2748 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2022-04-10 10:56:44.519  INFO 2748 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2022-04-10 10:56:44.519  INFO 2748 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1649581004519
2022-04-10 10:56:44.520  INFO 2748 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Subscribed to topic(s): spring.kafka.test

.....

2022-04-10 10:56:44.520  INFO 2748 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2022-04-10 10:56:44.528  INFO 2748 --- [           main] com.spring.kafka.sample.DemoApplication  : Started DemoApplication in 1.858 seconds (JVM running for 2.218)
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-0]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-4]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-1]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-3]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-6]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-2]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-9]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-7]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-5]

  2. Start app2 with spring-kafka version 2.8.3 with a consumer (same topic and group name as consumer app1):

APP1 LOGS

2022-04-10 11:00:02.696  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-7]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-7]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-9]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-0]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-0]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-9]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-1]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-1]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.699  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.699  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-2]
2022-04-10 11:00:02.699  INFO 2748 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-2]
2022-04-10 11:00:02.699  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.700  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.700  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-4]
2022-04-10 11:00:02.700  INFO 2748 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-4]
2022-04-10 11:00:02.700  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.701  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.701  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-6]
2022-04-10 11:00:02.701  INFO 2748 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-6]
2022-04-10 11:00:02.701  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-5]
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-5]
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-8]
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-7-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-8]
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.704  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.704  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-3]
2022-04-10 11:00:02.704  INFO 2748 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-3]
2022-04-10 11:00:02.704  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-9
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-5
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-7
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-4
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-6
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-1
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-0
2022-04-10 11:00:02.708  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-8
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-2
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-7 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-6 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-4 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-8 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-3
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-1 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-7]
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-5 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-7-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-8]
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-9 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-5]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-6]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-4]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-9]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-1]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-2]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-3 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.712  INFO 2748 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-3]
2022-04-10 11:00:02.750  INFO 2748 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-0]

APP2 LOGS

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

....9 thread consumer initialisations before this....

2022-04-10 11:00:02.096  INFO 2995 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-group-spring-kafka-10
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = group-spring-kafka
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 45000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

assigned partitions: 
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-7-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []

As the logs show, APP2 (the application with spring-kafka 2.8.3) is left without any assigned partitions, while APP1 keeps all 10 partitions.

I have tried deploying in reverse order and the result is the same.

If I deploy both applications with the same version, the result is correct: each application is assigned 5 partitions.
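One way to reason about these two outcomes is to simulate range assignment by hand. The sketch below is a simplified model, not the Kafka implementation: it assumes the assignor orders group members lexicographically by member id and that a member id starts with the consumer's client id (the `-app1` / `-app2` suffixes are hypothetical stand-ins for the broker-generated part). The client id prefixes come from the logs above: `consumer-10` for APP1 and `consumer-group-spring-kafka-10` for APP2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignmentSketch {

    // Simplified range assignment for a single topic: members are sorted
    // lexicographically (TreeMap key order), each gets numPartitions / members
    // partitions, and the remainder goes to the first members in sorted order.
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        if (memberIds.isEmpty()) {
            return result;
        }
        for (String m : memberIds) {
            result.put(m, new ArrayList<>());
        }
        int perMember = numPartitions / result.size();
        int extra = numPartitions % result.size();
        int next = 0;
        int index = 0;
        for (List<Integer> owned : result.values()) {
            int count = perMember + (index < extra ? 1 : 0);
            for (int p = 0; p < count; p++) {
                owned.add(next++);
            }
            index++;
        }
        return result;
    }

    // Hypothetical member ids of the form "<client id>-<app>"; real ids end
    // in a generated suffix rather than an application name.
    static List<String> members(String clientIdPrefix, String app, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int n = 1; n <= concurrency; n++) {
            ids.add(clientIdPrefix + n + "-" + app);
        }
        return ids;
    }

    // Counts how many partitions ended up with members of the given app.
    static int partitionsOwnedBy(Map<String, List<Integer>> assignment, String app) {
        int total = 0;
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            if (e.getKey().endsWith("-" + app)) {
                total += e.getValue().size();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Mixed versions: client ids as they appear in the APP1 and APP2 logs.
        List<String> mixed = new ArrayList<>(members("consumer-", "app1", 10));
        mixed.addAll(members("consumer-group-spring-kafka-", "app2", 10));
        Map<String, List<Integer>> a = assign(mixed, 10);
        System.out.println("mixed: app1=" + partitionsOwnedBy(a, "app1")
                + " app2=" + partitionsOwnedBy(a, "app2"));

        // Same version on both sides: identical client id prefix.
        List<String> same = new ArrayList<>(members("consumer-", "app1", 10));
        same.addAll(members("consumer-", "app2", 10));
        Map<String, List<Integer>> b = assign(same, 10);
        System.out.println("same:  app1=" + partitionsOwnedBy(b, "app1")
                + " app2=" + partitionsOwnedBy(b, "app2"));
    }
}
```

Under these assumptions the model gives APP1 all 10 partitions in the mixed case and 5 partitions to each application when both use the same prefix, which matches what I observe. It would also explain why `consumer-10` receives partition 1 in the APP1 log. This is only a possible explanation, not a confirmed diagnosis.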

I have also tried forcing the partition assignment strategy to “RangeAssignor” in both versions, but the problem persists.

Any ideas? Thanks in advance.
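For reference, forcing the strategy looks roughly like the sketch below. It is a minimal illustration using plain string keys so that it compiles without any Kafka dependency; the class and method names are hypothetical, while the property names and values are the ones printed in the `ConsumerConfig` dumps above.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerAssignorConfigSketch {

    // Builds consumer properties that pin the assignment strategy to
    // RangeAssignor. The keys match the names shown in the ConsumerConfig log.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group-spring-kafka");
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().get("partition.assignment.strategy"));
    }
}
```

A map like this can be handed to the consumer factory used by the listener container. With it in place on both applications, the APP2 consumers still end up with no partitions.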

[EDITED 5/17/2022]

I have run more tests with different versions of Spring Kafka. In the end, the problem first appears when upgrading from Spring Kafka 2.3.13.RELEASE to 2.4.0.RELEASE.

According to the 2.4.0 release announcement:

The 2.4.0 kafka-clients are not binary compatible with Spring for Apache Kafka 2.3 so if you wish to use the 2.4.0 clients, you must upgrade to this version. See the appendix in the reference manual for how to override the jar versions, especially if you are using Spring Boot for dependency management and/or you are using the test embedded Kafka broker.

I’m not sure… could the incompatibility described in the announcement be causing this behaviour?
