Kafka / KAFKA-15005

Status of KafkaConnect task not correct


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.5.1, 3.0.0, 3.3.2
    • Fix Version/s: None
    • Component/s: connect
    • Labels: None

    Description

      Our MM2 is running version 2.5.1.

      After a rebalance of our MM2 source tasks, we found that several tasks stayed in UNASSIGNED status even though the actual tasks had already started.

      So we dumped the payloads of the Kafka Connect status topic and found that the last two status changes for the task were RUNNING followed by UNASSIGNED.

      LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 headerKeys: [] key: task-7 payload: {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643}
      
      LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 headerKeys: [] key: task-7 payload: {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643}
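
      For reference, below is a minimal sketch of one way to re-read such status records with a plain Java consumer; the description does not say which tool produced the dump above, and the bootstrap address and topic name here are placeholders (use whatever status.storage.topic is configured for the Connect/MM2 cluster being inspected).

      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;

      public class StatusTopicInspector {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "status-topic-inspector");
              props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

              // Placeholder: set this to the status.storage.topic of the cluster.
              String statusTopic = "connect-status";

              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                  consumer.subscribe(Collections.singletonList(statusTopic));
                  // Poll a few times from the beginning and print timestamp, key and payload,
                  // which is enough to see the last state written for each task key.
                  for (int i = 0; i < 10; i++) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.printf("ts=%d key=%s payload=%s%n",
                                  record.timestamp(), record.key(), record.value());
                      }
                  }
              }
          }
      }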
       

      But normally the RUNNING status should be appended after the UNASSIGNED one, because the worker coordinator revokes tasks before starting new ones.

      Then we checked the logs of our MM2 workers and found that, during that time, the task was revoked on worker-2 and started on worker-1.

       

      Worker-1

      [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, groupId=xxxx__group] Starting task task-7 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
      
      [2023-05-15 09:24:45,951] INFO Creating task task-7 (org.apache.kafka.connect.runtime.Worker) 

      Worker-2

      [2023-05-15 09:24:40,922] INFO Stopping task task-7 (org.apache.kafka.connect.runtime.Worker) 

       

      So I think the incorrect status was caused by the revoked task finishing later than the newly started task, which made the UNASSIGNED status get appended to the status topic after the RUNNING status.

       

      After reading the code of DistributedHerder, I found that task revocation runs in a thread pool, and the revoke operation just returns after submitting all the callables. So I think that even on the same worker there is no guarantee that the revoke operation will always finish before the new tasks start.

      for (final ConnectorTaskId taskId : tasks) {
          callables.add(getTaskStoppingCallable(taskId));
      }
      
      // The actual timeout for graceful task/connector stop is applied in worker's
      // stopAndAwaitTask/stopAndAwaitConnector methods.
      startAndStop(callables);
      log.info("Finished stopping tasks in preparation for rebalance"); 
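
      To make the suspected interleaving concrete, here is a simplified, self-contained sketch (not the actual Connect code; the worker names, delay, and in-memory list are made up) of two workers publishing task status independently, where a slow stop on the old worker publishes UNASSIGNED after the new worker has already published RUNNING:

      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.TimeUnit;

      public class StatusRaceSketch {
          // Stand-in for the compacted status topic: for each task key the last
          // record wins, so only the append order matters here.
          private static final List<String> statusTopic = new ArrayList<>();

          private static synchronized void append(String worker, String state) {
              statusTopic.add(worker + " -> " + state);
          }

          public static void main(String[] args) throws InterruptedException {
              ExecutorService pool = Executors.newFixedThreadPool(2);

              // worker-2: the revoked task stops slowly; UNASSIGNED is only
              // published once the stop has actually completed.
              pool.submit(() -> {
                  try {
                      Thread.sleep(200); // simulated slow task shutdown
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                  }
                  append("worker-2", "UNASSIGNED");
              });

              // worker-1: the newly assigned task starts quickly and publishes RUNNING.
              pool.submit(() -> append("worker-1", "RUNNING"));

              pool.shutdown();
              pool.awaitTermination(5, TimeUnit.SECONDS);

              // Prints RUNNING first, then UNASSIGNED: a reader of the compacted
              // topic is left believing the task is UNASSIGNED although it is running.
              statusTopic.forEach(System.out::println);
          }
      }

      This mirrors the dump above, where RUNNING was appended around the time worker-1 started the task and UNASSIGNED only arrived roughly 17 seconds later.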

       


          People

            Assignee: Unassigned
            Reporter: Yu Wang (LucentWong)
