Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.3.0
Description
I am using the new static group membership protocol in 2.3.0. I have a situation in which an application defines multiple consumers; let's call them:
consumer-1
consumer-2
Each consumer uses the same group id "x", since they all belong to the same application "x". With dynamic group membership, this works without any problem. However, with static membership, starting a single instance of this application (so both consumers have the same group.instance.id) consistently fails with errors like:
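To make the setup concrete, here is a minimal sketch of the consumer configuration as described: both consumers share group.id "x" and the same group.instance.id, and differ only in client.id. It uses the standard consumer config key names as plain strings so it runs without kafka-clients on the classpath. The bootstrap address is a hypothetical placeholder, and the instance id value "x-1" is taken from instanceId=x-1 in the log below; the real application's code is not shown in this report.

```java
import java.util.Properties;

class StaticMembershipConfig {

    // Builds properties for one consumer in the application. Every consumer gets the
    // same group.id and the same group.instance.id; only client.id varies.
    public static Properties consumerProps(String clientId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "x");                       // shared by all consumers of app "x"
        props.put("client.id", clientId);                 // "consumer-1" or "consumer-2"
        props.put("group.instance.id", "x-1");            // assumed value, from instanceId=x-1 in the log
        return props;
    }

    public static void main(String[] args) {
        Properties c1 = consumerProps("consumer-1");
        Properties c2 = consumerProps("consumer-2");
        // Prints whether both consumers would join group "x" with the same static instance id.
        System.out.println(c1.getProperty("group.instance.id")
                .equals(c2.getProperty("group.instance.id")));
    }
}
```

In the real application, each Properties object would be passed to its own KafkaConsumer; the point here is only that the (group.id, group.instance.id) pair is identical for both.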
2019-08-08 16:56:47,223 ERROR --- org.apa.kaf.cli.con.int.AbstractCoordinator : [Consumer instanceId=x-1, clientId=consumer-2, groupId=x] Received fatal exception: group.instance.id gets fenced
2019-08-08 16:56:47,229 ERROR --- org.apa.kaf.cli.con.int.AbstractCoordinator : [Consumer instanceId=x-1, clientId=consumer-1, groupId=x] Received fatal exception: group.instance.id gets fenced
2019-08-08 16:56:47,234 ERROR --- red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling thread. Will die for safety. [[EXCEPTION: org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id. ]]
2019-08-08 16:56:47,229 ERROR --- red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling thread. Will die for safety. [[EXCEPTION: org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id. ]]
To top it off, I also get this clearly incorrect error:
2019-08-08 16:56:47,235 ERROR --- red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling thread. Will die for safety. [[EXCEPTION: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) ~[kafka-clients-2.3.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106) ~[kafka-clients-2.3.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262) ~[kafka-clients-2.3.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424) ~[kafka-clients-2.3.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) ~[kafka-clients-2.3.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) ~[kafka-clients-2.3.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) ~[kafka-clients-2.3.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.3.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.0.jar:?]
    at com.redock.microservice.kafka.BasicCommitAfterProcessingConsumer.run(BasicCommitAfterProcessingConsumer.kt:51) ~[classes/:?]
    at com.redock.microservice.kafka.AbstractKafkaAutoCommitConsumerService$start$2.invokeSuspend(AbstractKafkaAutoCommitConsumerService.kt:44) [classes/:?]
    ... suppressed 2 lines
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?] ]]
The broker logs contain this error:
ERROR given member.id x-1-1565298855983 is identified as a known static member x-1,but not matching the expected member.id x-1-1565298855984 (kafka.coordinator.group.GroupMetadata)
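The broker message suggests that static members are looked up by group.instance.id alone, with client.id playing no part. The sketch below is a deliberately simplified, hypothetical model of that behavior, not the broker's actual GroupMetadata code: a join always issues a fresh member.id for the instance id and overwrites the previous one, and any later request whose member.id no longer matches is treated as fenced. A counter is used for member id suffixes for determinism; the real broker's ids (e.g. x-1-1565298855983) are generated differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of static-member bookkeeping keyed only by group.instance.id.
class StaticMemberRegistry {

    private final Map<String, String> memberIdByInstanceId = new HashMap<>();
    private long nextSuffix = 0;

    // A joining static member gets a fresh member.id that replaces whatever
    // member.id was previously registered under the same group.instance.id.
    public String join(String groupInstanceId) {
        String memberId = groupInstanceId + "-" + (nextSuffix++);
        memberIdByInstanceId.put(groupInstanceId, memberId);
        return memberId;
    }

    // Later requests carry both ids; a stale member.id means the member is fenced.
    public boolean isFenced(String groupInstanceId, String memberId) {
        return !memberId.equals(memberIdByInstanceId.get(groupInstanceId));
    }

    public static void main(String[] args) {
        StaticMemberRegistry registry = new StaticMemberRegistry();
        String consumer1 = registry.join("x-1"); // consumer-1 joins as instance x-1
        String consumer2 = registry.join("x-1"); // consumer-2 joins as x-1 and overwrites it
        System.out.println("consumer-1 fenced: " + registry.isFenced("x-1", consumer1));
        System.out.println("consumer-2 fenced: " + registry.isFenced("x-1", consumer2));
    }
}
```

In this toy model whichever consumer joined most recently owns the instance id and the other is fenced; nothing in the registry distinguishes consumer-1 from consumer-2, which matches the symptom described in this report.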
It seems the broker does not take the client id into account when determining static group membership?
While the workaround is simple (change the group id of each consumer to include the client id), I don't believe it should be necessary.
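A minimal sketch of the workaround as described: the client id is folded into the group id, so each consumer joins its own group and the shared group.instance.id no longer collides within a single group. The "x-" + clientId naming scheme and the bootstrap address are assumptions for illustration; the report only says the group id should include the client id.

```java
import java.util.Properties;

class WorkaroundConfig {

    // Same settings as before, except group.id now embeds the client id, giving each
    // consumer a separate group. group.instance.id is left unchanged on purpose.
    public static Properties consumerProps(String clientId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "x-" + clientId);           // assumed scheme, e.g. "x-consumer-1"
        props.put("client.id", clientId);
        props.put("group.instance.id", "x-1");
        return props;
    }

    public static void main(String[] args) {
        for (String clientId : new String[] {"consumer-1", "consumer-2"}) {
            Properties p = consumerProps(clientId);
            System.out.println(p.getProperty("group.id") + " / " + p.getProperty("group.instance.id"));
        }
    }
}
```

One side effect worth keeping in mind: committed offsets are stored per group, so moving consumers to new group ids means they do not pick up offsets previously committed under group "x".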