Kafka / KAFKA-14419

Failed SyncGroup leading to partitions lost due to processing during rebalances

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.1
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None
    • Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64

    Description

      Trigger scenario:

      Four Kafka client application instances run on separate EC2 instances with a total of 8 active and 8 standby stream tasks for the same stream topology, consuming from an input topic with 8 partitions. Sometimes a handful of messages are consumed twice by one of the stream tasks when stream tasks on another application instance join the consumer group after an application instance restart.

      Additional information:

      Messages are produced to the topic by another Kafka Streams topology deployed on the same four application instances. I have verified that each message is produced only once by enabling debug logging in the topology flow right before producing each message to the topic.

      Logs from stream thread with duplicate consumption:

       

      2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is already rebalancing
      2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
      
      Input records consumed for the first time
      
      2022-11-21 15:09:33,919 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
      2022-11-21 15:09:33,920 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
      2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
      2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
      2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as lost since generation/memberID has been reset,indicating that consumer is in old state or no longer part of the group
      2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:354] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Lost previously assigned partitions messages.xms.mt.batch.enqueue.sms-1
      2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:104] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] at state RUNNING: partitions [messages.xms.mt.batch.enqueue.sms-1] lost due to missed rebalance.
              lost active tasks: [0_1]
              lost assigned standby tasks: []
      2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:1220] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended RUNNING
      2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:295] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended running
      2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:1082] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
      2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.RecordCollectorImpl [RecordCollectorImpl.java:333] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closing record collector dirty
      2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:537] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closed dirty
      2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:117] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] partitions lost took 19 ms.
      2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: rebalance failed due to 'The group is rebalancing, so a rejoin is needed.' (RebalanceInProgressException)
      2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
      2022-11-21 15:09:35,391 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
      2022-11-21 15:09:35,395 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully synced group in generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
      2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Updating assignment with
              Assigned partitions:                       [messages.xms.mt.batch.enqueue.sms-1]
              Current owned partitions:                  []
              Added partitions (assigned - owned):       [messages.xms.mt.batch.enqueue.sms-1]
              Revoked partitions (owned - assigned):     []
      2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Notifying assignor about the new Assignment(partitions=[messages.xms.mt.batch.enqueue.sms-1], userDataSize=52)
      2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
      2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Handle new assignment with:
              New active tasks: [0_1]
              New standby tasks: []
              Existing active tasks: []
              Existing standby tasks: []
      2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Adding newly assigned partitions: messages.xms.mt.batch.enqueue.sms-1
      2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED
      2022-11-21 15:09:35,398 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:968] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Setting offset for partition messages.xms.mt.batch.enqueue.sms-1 to the committed offset FetchPosition{offset=26744389, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 1 rack: use1-az6)], epoch=19}}
      2022-11-21 15:09:35,444 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:235] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Initialized
      2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:260] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Restored and ready to run
      2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Restoration took 49 ms for all tasks [0_1]
      2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
      2022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.KafkaStreams [KafkaStreams.java:342] stream-client [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b] State transition from REBALANCING to RUNNING
      2022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:2270] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Requesting the log end offset for messages.xms.mt.batch.enqueue.sms-1 in order to compute lag
      
      Same input records consumed for the second time
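
The logs above show task 0_1 being closed dirty after the partition was lost, and the consumer then resuming from the committed offset once the partition was assigned again. The following is a toy model of how that sequence could lead to records being seen twice. It is a sketch only and not Kafka Streams code: the function names, the record labels, and the parameters `commit_after` and `lost_after` are all hypothetical.

```python
from collections import Counter


def replay_after_lost_partition(records, commit_after, lost_after):
    """Toy model (not Kafka code): return every record the processor sees.

    commit_after: how many records had been processed at the last commit.
    lost_after:   how many records had been processed when the partition
                  was lost and the task was closed dirty.
    """
    if not 0 <= commit_after <= lost_after <= len(records):
        raise ValueError("expected 0 <= commit_after <= lost_after <= len(records)")

    seen = []
    committed = 0
    position = 0

    # First pass: process records until the partition is lost.
    while position < lost_after:
        seen.append(records[position])
        position += 1
        if position == commit_after:
            committed = position  # last successful offset commit

    # Partition lost: uncommitted progress is discarded, and the task
    # resumes from the committed offset once the partition is reassigned.
    position = committed
    while position < len(records):
        seen.append(records[position])
        position += 1
    return seen


def duplicates(seen):
    """Return records that were seen more than once, in first-seen order."""
    counts = Counter(seen)
    return [r for r in dict.fromkeys(seen) if counts[r] > 1]


seen = replay_after_lost_partition(["m0", "m1", "m2", "m3", "m4", "m5"],
                                   commit_after=2, lost_after=4)
print(duplicates(seen))
```

In this model, only the records processed between the last commit and the loss of the partition are repeated, which would be consistent with "a handful of messages" being consumed twice.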

      Streams consumer configuration:

              allow.auto.create.topics = false
              auto.commit.interval.ms = 5000
              auto.offset.reset = earliest
              bootstrap.servers = [b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094]
              check.crcs = true
              client.dns.lookup = use_all_dns_ips
              client.id = messages.xms.mms.mt-05bfc9d3-7f4b-48d4-9c8c-cf9d3e496fef-StreamThread-1-consumer
              client.rack = 
              connections.max.idle.ms = 540000
              default.api.timeout.ms = 60000
              enable.auto.commit = false
              exclude.internal.topics = true
              fetch.max.bytes = 52428800
              fetch.max.wait.ms = 500
              fetch.min.bytes = 1
              group.id = messages.xms.mms.mt
              group.instance.id = null
              heartbeat.interval.ms = 1500
              interceptor.classes = []
              internal.leave.group.on.close = true
              internal.throw.on.fetch.stable.offset.unsupported = false
              isolation.level = read_committed
              key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
              max.partition.fetch.bytes = 1048576
              max.poll.interval.ms = 300000
              max.poll.records = 1000
              metadata.max.age.ms = 300000
              metric.reporters = []
              metrics.num.samples = 2
              metrics.recording.level = INFO
              metrics.sample.window.ms = 30000
              partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
              receive.buffer.bytes = 65536
              reconnect.backoff.max.ms = 1000
              reconnect.backoff.ms = 50
              request.timeout.ms = 30000
              retry.backoff.ms = 100
              sasl.client.callback.handler.class = null
              sasl.jaas.config = null
              sasl.kerberos.kinit.cmd = /usr/bin/kinit
              sasl.kerberos.min.time.before.relogin = 60000
              sasl.kerberos.service.name = null
              sasl.kerberos.ticket.renew.jitter = 0.05
              sasl.kerberos.ticket.renew.window.factor = 0.8
              sasl.login.callback.handler.class = null
              sasl.login.class = null
              sasl.login.connect.timeout.ms = null
              sasl.login.read.timeout.ms = null
              sasl.login.refresh.buffer.seconds = 300
              sasl.login.refresh.min.period.seconds = 60
              sasl.login.refresh.window.factor = 0.8
              sasl.login.refresh.window.jitter = 0.05
              sasl.login.retry.backoff.max.ms = 10000
              sasl.login.retry.backoff.ms = 100
              sasl.mechanism = GSSAPI
              sasl.oauthbearer.clock.skew.seconds = 30
              sasl.oauthbearer.expected.audience = null
              sasl.oauthbearer.expected.issuer = null
              sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
              sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
              sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
              sasl.oauthbearer.jwks.endpoint.url = null
              sasl.oauthbearer.scope.claim.name = scope
              sasl.oauthbearer.sub.claim.name = sub
              sasl.oauthbearer.token.endpoint.url = null
              security.protocol = SSL
              security.providers = null
              send.buffer.bytes = 131072
              session.timeout.ms = 6000
              socket.connection.setup.timeout.max.ms = 30000
              socket.connection.setup.timeout.ms = 10000
              ssl.cipher.suites = null
              ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
              ssl.endpoint.identification.algorithm = https
              ssl.engine.factory.class = null
              ssl.key.password = null
              ssl.keymanager.algorithm = SunX509
              ssl.keystore.certificate.chain = null
              ssl.keystore.key = null
              ssl.keystore.location = /opt/apps/msl/xms-gateway/conf/xms.us1tst.jks
              ssl.keystore.password = [hidden]
              ssl.keystore.type = JKS
              ssl.protocol = TLSv1.3
              ssl.provider = null
              ssl.secure.random.implementation = null
              ssl.trustmanager.algorithm = PKIX
              ssl.truststore.certificates = null
              ssl.truststore.location = null
              ssl.truststore.password = null
              ssl.truststore.type = JKS
              value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
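
To compare configuration dumps like the one above between application instances, the `key = value` lines can be loaded into a dictionary. The sketch below does that and also checks the general guidance from the Kafka consumer documentation that `heartbeat.interval.ms` is typically set no higher than one third of `session.timeout.ms`. The helper names are hypothetical, and passing this check says nothing about whether the duplicate consumption described here can occur.

```python
def parse_config_dump(text):
    """Parse 'key = value' lines from a logged config dump into a dict."""
    config = {}
    for line in text.splitlines():
        key, sep, value = line.partition("=")
        if sep and key.strip():
            config[key.strip()] = value.strip()
    return config


def heartbeat_within_third_of_session(config):
    """True if heartbeat.interval.ms is at most 1/3 of session.timeout.ms."""
    heartbeat_ms = int(config["heartbeat.interval.ms"])
    session_ms = int(config["session.timeout.ms"])
    return heartbeat_ms * 3 <= session_ms


# Values copied from the configuration dump in this report.
dump = """
        enable.auto.commit = false
        heartbeat.interval.ms = 1500
        max.poll.interval.ms = 300000
        session.timeout.ms = 6000
        client.rack =
"""
config = parse_config_dump(dump)
print(heartbeat_within_third_of_session(config))
```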

       

      The lost-partition message highlighted in red above only occurs when messages are consumed twice, which happens in roughly two out of ten runs of my application restart test scenario.
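
To count how often a restart test run hits this sequence, the client logs can be scanned for a failed SyncGroup followed by a lost-partition message on the same stream thread. The sketch below assumes the log line layout shown in this report (timestamp, level, thread name in square brackets). The function name is hypothetical and the sample lines are abbreviated versions of the lines above.

```python
import re

# Timestamp, log level, then the thread name in the first pair of brackets.
LINE_RE = re.compile(
    r"^\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) \w+ \[([^\]]+)\]"
)
LOST_RE = re.compile(r"Lost previously assigned partitions (.+)$")


def find_lost_after_failed_sync(lines):
    """Return (thread, failed-sync timestamp, lost partitions) tuples."""
    pending = {}  # thread name -> timestamp of its latest failed SyncGroup
    hits = []
    for line in lines:
        match = LINE_RE.match(line)
        if not match:
            continue  # continuation lines such as "lost active tasks: [0_1]"
        timestamp, thread = match.groups()
        if "SyncGroup failed" in line:
            pending[thread] = timestamp
        elif "Successfully synced group" in line:
            pending.pop(thread, None)  # the rebalance completed normally
        else:
            lost = LOST_RE.search(line)
            if lost and thread in pending:
                hits.append((thread, pending.pop(thread), lost.group(1).strip()))
    return hits


# Abbreviated sample lines; the thread name is shortened for readability.
sample = [
    "2022-11-21 15:09:33,920 INFO [app-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator "
    "[AbstractCoordinator.java:826] SyncGroup failed: The group began another rebalance.",
    "2022-11-21 15:09:33,923 INFO [app-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator "
    "[ConsumerCoordinator.java:354] Lost previously assigned partitions "
    "messages.xms.mt.batch.enqueue.sms-1",
]
hits = find_lost_after_failed_sync(sample)
print(hits)
```

Each hit marks a point in the logs where duplicate consumption may have followed, so the timestamps can be matched against the application's own processing logs.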

      This issue no longer occurs when the patch suggested in KAFKA-14362 is applied.

            People

              Assignee: Unassigned
              Reporter: Carlstedt Mikael
              Votes: 0
              Watchers: 6
