  1. Kafka
  2. KAFKA-9607

Should not clear partition queue during task close

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.6.0
    • Component/s: streams
    • Labels:
      None

      Description

      We detected an issue where a corrupted task failed to revive:

      [2020-02-25T08:23:38-08:00] (streams-soak-trunk_soak_i-06305ad57801079ae_streamslog) [2020-02-25 16:23:38,137] INFO [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] stream-thread [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] Handle new assignment with:
              New active tasks: [0_0, 3_1]
              New standby tasks: []
              Existing active tasks: [0_0]
              Existing standby tasks: [] (org.apache.kafka.streams.processor.internals.TaskManager)
      [2020-02-25T08:23:38-08:00] (streams-soak-trunk_soak_i-06305ad57801079ae_streamslog) [2020-02-25 16:23:38,138] INFO [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] [Consumer clientId=stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1-consumer, groupId=stream-soak-test] Adding newly assigned partitions: k8sName-id-repartition-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
      [2020-02-25T08:23:38-08:00] (streams-soak-trunk_soak_i-06305ad57801079ae_streamslog) [2020-02-25 16:23:38,138] INFO [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] stream-thread [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread)
      [2020-02-25T08:23:39-08:00] (streams-soak-trunk_soak_i-06305ad57801079ae_streamslog) [2020-02-25 16:23:38,419] WARN [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] stream-thread [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000049-changelog-1], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializingit later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
      [2020-02-25T08:23:38-08:00] (streams-soak-trunk_soak_i-06305ad57801079ae_streamslog) [2020-02-25 16:23:38,139] INFO [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] [Consumer clientId=stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1-consumer, groupId=stream-soak-test] Setting offset for partition k8sName-id-repartition-1 to the committed offset FetchPosition{offset=3592242, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ip-172-31-25-115.us-west-2.compute.internal:9092 (id: 1003 rack: null)], epoch=absent}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
      [2020-02-25T08:23:39-08:00] (streams-soak-trunk_soak_i-06305ad57801079ae_streamslog) [2020-02-25 16:23:38,463] ERROR [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] stream-thread [stream-soak-test-8f2124ec-8bd0-410a-8e3d-f202a32ab774-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
      [2020-02-25T08:23:39-08:00] (streams-soak-trunk_soak_i-06305ad57801079ae_streamslog) java.lang.IllegalStateException: Partition k8sName-id-repartition-1 not found.
              at org.apache.kafka.streams.processor.internals.PartitionGroup.setPartitionTime(PartitionGroup.java:99)
              at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:651)
              at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:631)
              at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:209)
              at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:270)
              at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:834)
              at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
              at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:725)
      

      The root cause is that we accidentally clear the partition group map during task close, so the next time we revive the task its input partitions are missing.

      By not cleaning up the partition group, we may incur a slight GC overhead, which is acceptable. In terms of correctness, there is currently no way to revive the task with its partitions reassigned.
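
      To illustrate the failure mode, here is a minimal sketch in plain Java. It is not the actual Kafka Streams code: SimplePartitionGroup, closeByClearingMap, closeKeepingPartitions, and reviveSucceeds are hypothetical names made up for this example, and dropping buffered records while keeping the map entries is only one possible shape of a fix. The sketch shows that once the partition map is emptied on close, a later setPartitionTime call for the same partition fails with the "not found" error from the stack trace above.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class PartitionGroupSketch {

    // Hypothetical stand-in for a task's partition group: one record queue
    // and one partition time per input partition.
    static class SimplePartitionGroup {
        private final Map<String, Queue<String>> queues = new HashMap<>();
        private final Map<String, Long> partitionTimes = new HashMap<>();

        SimplePartitionGroup(String... partitions) {
            for (String partition : partitions) {
                queues.put(partition, new ArrayDeque<>());
            }
        }

        void addRecord(String partition, String record) {
            requireQueue(partition).add(record);
        }

        // Same kind of lookup-or-throw check that failed in the stack trace.
        void setPartitionTime(String partition, long time) {
            requireQueue(partition);
            partitionTimes.put(partition, time);
        }

        // Problematic close: forgets which partitions the task owns.
        void closeByClearingMap() {
            queues.clear();
            partitionTimes.clear();
        }

        // Assumed alternative: drop buffered records but keep the map entries.
        void closeKeepingPartitions() {
            for (Queue<String> queue : queues.values()) {
                queue.clear();
            }
            partitionTimes.clear();
        }

        private Queue<String> requireQueue(String partition) {
            Queue<String> queue = queues.get(partition);
            if (queue == null) {
                throw new IllegalStateException("Partition " + partition + " not found.");
            }
            return queue;
        }
    }

    // Closes the group one of two ways, then tries to "revive" it by
    // setting a partition time, as restoration completion would.
    static boolean reviveSucceeds(boolean clearMapOnClose) {
        String partition = "k8sName-id-repartition-1";
        SimplePartitionGroup group = new SimplePartitionGroup(partition);
        group.addRecord(partition, "some-record");
        if (clearMapOnClose) {
            group.closeByClearingMap();
        } else {
            group.closeKeepingPartitions();
        }
        try {
            group.setPartitionTime(partition, 42L);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("clear map on close, revive ok: " + reviveSucceeds(true));
        System.out.println("keep partitions on close, revive ok: " + reviveSucceeds(false));
    }
}
```

      In this sketch the retained map entries are the "slight overhead for GC" mentioned above: the empty queues stay reachable for as long as the closed task object does.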

              People

              • Assignee:
                bchen225242 Boyang Chen
                Reporter:
                bchen225242 Boyang Chen