Kafka / KAFKA-8773

Static membership protocol borks on re-used group id

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

    Description

      I am using the new static group membership protocol in 2.3.0. I have a situation in which an application defines multiple consumers, let's call them:

      consumer-1
      consumer-2

      Each consumer uses the same group id "x", as they all belong to the same application "x". With dynamic group membership, this is no problem at all. With static membership, however, starting a single instance of this application (so that both consumers share the same group.instance.id) fails consistently with errors like:

      2019-08-08 16:56:47,223 ERROR --- org.apa.kaf.cli.con.int.AbstractCoordinator : [Consumer instanceId=x-1, clientId=consumer-2, groupId=x] Received fatal exception: group.instance.id gets fenced
      2019-08-08 16:56:47,229 ERROR --- org.apa.kaf.cli.con.int.AbstractCoordinator : [Consumer instanceId=x-1, clientId=consumer-1, groupId=x] Received fatal exception: group.instance.id gets fenced
      2019-08-08 16:56:47,234 ERROR --- red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling thread. Will die for safety. [[EXCEPTION: org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.
      ]]
      2019-08-08 16:56:47,229 ERROR --- red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling thread. Will die for safety. [[EXCEPTION: org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.
      ]]

      To top it off, I also get this obviously incorrect error:

      2019-08-08 16:56:47,235 ERROR --- red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling thread. Will die for safety. [[EXCEPTION: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
      at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) ~[kafka-clients-2.3.0.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106) ~[kafka-clients-2.3.0.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262) ~[kafka-clients-2.3.0.jar:?]
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424) ~[kafka-clients-2.3.0.jar:?]
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) ~[kafka-clients-2.3.0.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) ~[kafka-clients-2.3.0.jar:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) ~[kafka-clients-2.3.0.jar:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.3.0.jar:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.0.jar:?]
      at com.redock.microservice.kafka.BasicCommitAfterProcessingConsumer.run(BasicCommitAfterProcessingConsumer.kt:51) ~[classes/:?]
      at com.redock.microservice.kafka.AbstractKafkaAutoCommitConsumerService$start$2.invokeSuspend(AbstractKafkaAutoCommitConsumerService.kt:44) [classes/:?]
      ... suppressed 2 lines
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
      at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
      at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
      at java.lang.Thread.run(Thread.java:834) [?:?]
      ]]

      The broker logs contain this error:

      ERROR given member.id x-1-1565298855983 is identified as a known static member x-1,but not matching the expected member.id x-1-1565298855984 (kafka.coordinator.group.GroupMetadata)
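
The broker message above suggests that the coordinator keeps one expected member.id per group.instance.id and rejects any request whose member.id differs. A minimal sketch of that check follows. It is a toy model, not Kafka's coordinator code: the class `StaticMemberRegistry`, its methods, and the "latest registration wins" rule are assumptions made for illustration. Only the two member ids come from the log line.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of static-member bookkeeping. NOT Kafka's coordinator code. */
final class StaticMemberRegistry {
    // group.instance.id -> member.id currently expected for that instance
    private final Map<String, String> expectedMemberId = new HashMap<>();

    /** Assumption: the most recent registration for an instance id wins. */
    void register(String groupInstanceId, String memberId) {
        expectedMemberId.put(groupInstanceId, memberId);
    }

    /** True when the instance id is known but the presented member.id differs. */
    boolean isFenced(String groupInstanceId, String memberId) {
        String expected = expectedMemberId.get(groupInstanceId);
        return expected != null && !expected.equals(memberId);
    }

    public static void main(String[] args) {
        StaticMemberRegistry group = new StaticMemberRegistry();
        // Member ids taken from the broker log line above.
        group.register("x-1", "x-1-1565298855983");
        group.register("x-1", "x-1-1565298855984"); // same instance id, replaces the first
        System.out.println(group.isFenced("x-1", "x-1-1565298855983")); // true
        System.out.println(group.isFenced("x-1", "x-1-1565298855984")); // false
    }
}
```

Note that the lookup key in this model is the instance id alone. The client id plays no part, which would be consistent with the behaviour reported here.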

      It seems that the broker does not take the client id into account when determining static group membership?

      The workaround is simple (change the group id of each consumer to include the client id), but I don't believe this should be necessary.
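
The workaround could look roughly like the sketch below. It only builds `java.util.Properties` with the standard consumer config keys `client.id`, `group.id` and `group.instance.id`. The class `ConsumerConfigSketch`, the helper `workaroundProps`, and the `<app>-<clientId>` naming scheme are hypothetical. A real consumer would also need bootstrap servers, deserializers and so on.

```java
import java.util.Properties;

/** Sketch of the workaround: each consumer gets a group id that includes its client id. */
final class ConsumerConfigSketch {

    /** Hypothetical helper; the "<app>-<clientId>" scheme is an assumption, not from the report. */
    static Properties workaroundProps(String app, String instanceId, String clientId) {
        Properties props = new Properties();
        props.setProperty("client.id", clientId);
        // The group id now differs per consumer, so the shared instance id no longer collides.
        props.setProperty("group.id", app + "-" + clientId);
        props.setProperty("group.instance.id", instanceId);
        return props;
    }

    public static void main(String[] args) {
        Properties c1 = workaroundProps("x", "x-1", "consumer-1");
        Properties c2 = workaroundProps("x", "x-1", "consumer-2");
        System.out.println(c1.getProperty("group.id")); // x-consumer-1
        System.out.println(c2.getProperty("group.id")); // x-consumer-2
    }
}
```

With this scheme the two consumers end up in different groups, so they no longer share partition assignment or committed offsets with each other.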

          People

            Assignee: Unassigned
            Reporter: Raman Gupta (rocketraman)
            Votes: 2
            Watchers: 4
