Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-17719

Connect may fail to start tasks when reading from a compacted config topic

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • None
    • connect
    • None

    Description

      The fix for KAFKA-16838 (via https://github.com/apache/kafka/pull/16122) alters the logic for materializing a view of the config topic to ignore task configs when there is no configuration for that connector present earlier in the config topic. However, the logic fails to consider topics that might get compacted over time.

      In particular, when we have a connector C1 running fine, the records in the config topic for the connector will look something like C1, T1, T2, Task-commit-record.

      If the connector gets a config update that doesn't produce any new task configs (note that this is a valid case when there are no task config changes[1]) we only produce a Connector config record [2]. The config topic now looks like C1, T1, T2, Task-commit-record, C1. However, if the topic gets compacted we will end up with T1, T2, Task-commit-record, C1. This can be a common scenario in large and old connect clusters.

      Based on the changes for KAFKA-16838, when the connect worker reads this config state it ignores the task configs [3] for this while the connector is still active and we might have active assignments for the same. The symptom of this issue is an NPE which shows up when trying to start the tasks: 

      java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because "inputMap" is null
      	at org.apache.kafka.common.utils.Utils.castToStringObjectMap
      	at org.apache.kafka.common.config.AbstractConfig.<init>
      	at org.apache.kafka.common.config.AbstractConfig.<init>
      	at org.apache.kafka.connect.runtime.TaskConfig.<init>
      	at org.apache.kafka.connect.runtime.Worker.startTask)
      	at org.apache.kafka.connect.runtime.Worker.startSourceTask
      	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask
      	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41
      	at java.base/java.util.concurrent.FutureTask.run
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run
      	at java.base/java.lang.Thread.run(Thread.java:1583)

       

      [1] - https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047 
      [2] - https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524
      [3] - https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074

      Attachments

        Activity

          People

            Unassigned Unassigned
            cmukka20 Chaitanya Mukka
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: