Kafka / KAFKA-5073

Kafka Streams stuck rebalancing after exception thrown in rebalance listener

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.11.0.0
    • Fix Version/s: 0.11.0.0
    • Component/s: streams
    • Labels: None

    Description

      An exception thrown in the Streams rebalance listener causes the Kafka consumer coordinator to log an error, but the Streams app does not propagate the exception to the uncaught exception handler.

      This leaves the app stuck in the rebalancing state if, for instance, the consumer throws an exception during state restore.

      Here is an example log showing the error when the consumer throws a CRC error during state restore.

      [2017-04-13 14:46:41,409] ERROR [XXX-StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group XXXXXXX failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:269)
      org.apache.kafka.common.KafkaException: Record batch for partition _my_topic-0 at offset 42 is invalid, cause: Record is corrupt (stored crc = 1982353474, computed crc = 1572524932)
              at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.maybeEnsureValid(Fetcher.java:904)
              at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:936)
              at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:960)
              at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:864)
              at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:517)
              at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
              at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1069)
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
              at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:145)
              at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:1329)
              at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
              at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
              at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
              at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:546)
              at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:702)
              at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
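
      The failure mode described above can be illustrated with a small, self-contained sketch. It does not use the real Kafka classes: `RebalanceListener`, `SwallowingCoordinator`, `StreamThreadSketch`, and `run` are hypothetical stand-ins that assume only what this report states, namely that the coordinator catches and logs exceptions from the rebalance listener instead of propagating them. The pattern shown (record the exception inside the listener, then rethrow it from the poll loop so it reaches the thread's uncaught exception handler) is one plausible approach, not necessarily the change that resolved this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class RebalanceListenerErrorSketch {

    // Hypothetical listener interface (stand-in for a consumer rebalance callback).
    interface RebalanceListener {
        void onPartitionsAssigned(List<String> partitions);
    }

    // Hypothetical coordinator: like the one in the log, it logs listener errors and drops them.
    static final class SwallowingCoordinator {
        final List<String> log = new ArrayList<>();

        void completeJoin(RebalanceListener listener, List<String> assigned) {
            try {
                listener.onPartitionsAssigned(assigned);
            } catch (RuntimeException e) {
                log.add("ERROR listener failed on partition assignment: " + e.getMessage());
            }
        }
    }

    // Hypothetical stream thread that remembers listener failures and rethrows them after the join.
    static final class StreamThreadSketch {
        final AtomicReference<RuntimeException> rebalanceException = new AtomicReference<>();
        final SwallowingCoordinator coordinator = new SwallowingCoordinator();

        final RebalanceListener listener = partitions -> {
            try {
                restoreState(partitions);
            } catch (RuntimeException e) {
                rebalanceException.compareAndSet(null, e); // keep the first failure
                throw e; // the coordinator still logs (and swallows) it
            }
        };

        // Simulated state restore: fails on any partition whose name starts with "corrupt".
        void restoreState(List<String> partitions) {
            for (String p : partitions) {
                if (p.startsWith("corrupt")) {
                    throw new IllegalStateException("Record is corrupt for partition " + p);
                }
            }
        }

        // One poll-loop iteration: join the group, then surface any captured listener error.
        void pollOnce(List<String> assigned) {
            coordinator.completeJoin(listener, assigned);
            RuntimeException e = rebalanceException.get();
            if (e != null) {
                throw new IllegalStateException("Failed to rebalance", e);
            }
        }
    }

    // Runs one poll on a real thread and reports what reached its uncaught exception handler.
    static String run(List<String> assigned) {
        StreamThreadSketch sketch = new StreamThreadSketch();
        AtomicReference<Throwable> uncaught = new AtomicReference<>();
        Thread t = new Thread(() -> sketch.pollOnce(assigned), "sketch-StreamThread-1");
        t.setUncaughtExceptionHandler((thread, ex) -> uncaught.set(ex));
        t.start();
        try {
            t.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        Throwable ex = uncaught.get();
        return ex == null ? "ok" : "uncaught: " + ex.getCause().getMessage();
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("my_topic-0")));      // ok
        System.out.println(run(Arrays.asList("corrupt_topic-0"))); // uncaught: Record is corrupt for partition corrupt_topic-0
    }
}
```

      Without the rethrow at the end of `pollOnce`, the corrupt-partition case would also return `ok`: the failure would appear only in the coordinator's log, which mirrors the silent behavior reported above.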
      

            People

              Assignee: Matthias J. Sax (mjsax)
              Reporter: Xavier Léauté (xvrl)
              Votes: 0
              Watchers: 6