  1. Kafka
  2. KAFKA-8676

Avoid Stopping Unnecessary Connectors and Tasks

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.4.0, 2.3.1
    • Component/s: KafkaConnect
    • Labels:
    • Environment: centOS
    • Flags: Patch

      Description

      When a new connector is added or a connector configuration is changed, Kafka Connect 2.3.0 stops all existing tasks and then starts all tasks, both the new ones and the existing ones. This is unnecessary: only the new connector and its tasks need to be started, because rebalancing can be applied to both running and suspended tasks. The following patch fixes this problem and starts only the new tasks and connectors.

      The problem lies in the KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623 in KafkaConfigBackingStore.java). When record.key() starts with "commit-", the tasks are being committed and the deferred tasks are processed. Some new tasks are added to 'updatedTasks' (line 623 in KafkaConfigBackingStore.java), and 'updatedTasks' is sent to updateListener to complete the task configuration update (line 638 in KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate() function, the 'updatedTasks' are added to 'taskConfigUpdates', a member variable of class DistributedHerder (line 1295 in DistributedHerder.java).

      In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' in updateConfigsWithIncrementalCooperative() (line 445 in DistributedHerder.java). The copy is subsequently used in processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in DistributedHerder.java). This function uses 'taskConfigUpdatesCopy' to find the connectors to stop (line 492 in DistributedHerder.java) and from them derives the tasks to stop, which turn out to be all the tasks. The worker thread performs the actual stop (line 499 in DistributedHerder.java).
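      The derivation described above can be sketched roughly as follows. This is a simplified illustration and not the DistributedHerder code: the class StopSetSketch, the TaskId stand-in, and both method signatures are hypothetical, and only the names 'taskConfigUpdatesCopy', 'connectorsWhoseTasksToStop' and 'tasksToStop' come from the description. It assumes that a connector is selected as soon as any of its task IDs appears in the updates, and that every running task of a selected connector is then stopped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

class StopSetSketch {
    /** Hypothetical stand-in for a task identifier: connector name plus task index. */
    static final class TaskId {
        final String connector;
        final int task;

        TaskId(String connector, int task) {
            this.connector = connector;
            this.task = task;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TaskId)) {
                return false;
            }
            TaskId other = (TaskId) o;
            return connector.equals(other.connector) && task == other.task;
        }

        @Override
        public int hashCode() {
            return Objects.hash(connector, task);
        }

        @Override
        public String toString() {
            return connector + "-" + task;
        }
    }

    /** Collects the connector name of every task ID reported as updated. */
    static Set<String> connectorsWhoseTasksToStop(Collection<TaskId> taskConfigUpdatesCopy) {
        Set<String> connectors = new TreeSet<>();
        for (TaskId id : taskConfigUpdatesCopy) {
            connectors.add(id.connector);
        }
        return connectors;
    }

    /** Selects every running task that belongs to one of the given connectors. */
    static List<TaskId> tasksToStop(Collection<TaskId> runningTasks, Set<String> connectors) {
        List<TaskId> result = new ArrayList<>();
        for (TaskId id : runningTasks) {
            if (connectors.contains(id.connector)) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TaskId> running = Arrays.asList(new TaskId("existing", 0), new TaskId("existing", 1));

        // Every known task ID is reported as updated, so the existing connector is selected too.
        List<TaskId> allReported = Arrays.asList(
                new TaskId("existing", 0), new TaskId("existing", 1), new TaskId("added", 0));
        System.out.println(tasksToStop(running, connectorsWhoseTasksToStop(allReported)));

        // Only the newly added task is reported, so no running task is selected.
        List<TaskId> onlyNew = Arrays.asList(new TaskId("added", 0));
        System.out.println(tasksToStop(running, connectorsWhoseTasksToStop(onlyNew)));
    }
}
```

      Under these assumptions, the size of the stop set depends entirely on which task IDs reach 'taskConfigUpdates'. The herder-side logic itself does not need to change.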

      In the original code, all the tasks are added to 'updatedTasks' (line 623 in KafkaConfigBackingStore.java), which means that all the active connectors end up in the 'connectorsWhoseTasksToStop' set and all the tasks end up in the 'tasksToStop' list. This causes every task to be stopped and, of course, subsequently restarted.

      So, adding only the 'deferred' tasks to 'updatedTasks' avoids stopping and restarting tasks unnecessarily.
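      The difference between the two behaviours can be illustrated with a minimal sketch. It is not the actual patch: the class UpdatedTasksSketch and its methods are hypothetical, task IDs are plain strings, and task configurations are reduced to a single string per task. It only assumes that processing a "commit-" record applies the deferred task configs and then reports a set of updated task IDs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class UpdatedTasksSketch {
    /** Behaviour described for the original code: every task in the store is reported. */
    static Set<String> commitReportingAll(Map<String, String> taskConfigs,
                                          Map<String, String> deferredTaskUpdates) {
        taskConfigs.putAll(deferredTaskUpdates);
        return new TreeSet<>(taskConfigs.keySet());
    }

    /** Proposed behaviour: only the tasks whose configs were deferred are reported. */
    static Set<String> commitReportingDeferredOnly(Map<String, String> taskConfigs,
                                                   Map<String, String> deferredTaskUpdates) {
        taskConfigs.putAll(deferredTaskUpdates);
        return new TreeSet<>(deferredTaskUpdates.keySet());
    }

    public static void main(String[] args) {
        // Two tasks of an existing connector are already configured.
        Map<String, String> existing = new HashMap<>();
        existing.put("existing-0", "cfg");
        existing.put("existing-1", "cfg");

        // One task of a newly added connector is waiting for the commit record.
        Map<String, String> deferred = new HashMap<>();
        deferred.put("added-0", "cfg");

        System.out.println(commitReportingAll(new HashMap<>(existing), deferred));
        System.out.println(commitReportingDeferredOnly(new HashMap<>(existing), deferred));
    }
}
```

      In both variants the store ends up with the same task configs. Only the set handed to the update listener differs, and that set is what determines which tasks get stopped.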

            People

            • Assignee: Unassigned
            • Reporter: Echolly Luying Liu
            • Reviewer: Konstantine Karantasis
            • Votes: 0
            • Watchers: 3

                Time Tracking

                • Estimated: 1h
                • Remaining: 1h
                • Logged: Not Specified