Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7112

StreamThread does not check for state again after pollRequests()

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: streams
    • Labels:
      None

      Description

      In StreamThread's main loop, we have:

              if (state == State.PARTITIONS_ASSIGNED) {
                  // try to fetch some records with zero poll millis
                  // to unblock the restoration as soon as possible
                  records = pollRequests(Duration.ZERO);
      
                  if (taskManager.updateNewAndRestoringTasks()) {
                      setState(State.RUNNING);
                  }
              }
      

      in which we first check for state, and if it is PARTITIONS_ASSIGNED then call `consumer.poll()` and then call `askManager.updateNewAndRestoringTasks()`. There is a race condition though, that if another rebalance gets triggered, then `onPartitionRevoked` will be called in which we will restoreConsumer.unsubscribe();, and then if we call taskManager.updateNewAndRestoringTasks() right away we will see this:

      Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
      java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
              at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
              at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
              at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
              at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
              at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                guozhang Guozhang Wang
                Reporter:
                guozhang Guozhang Wang
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: