KAFKA-16837: Kafka Connect fails to update a connector when a previous task's ConfigProvider reference is invalid

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.5.1, 3.6.1, 3.8.0
    • Fix Version/s: 3.8.0, 3.7.1
    • Component/s: connect
    • Labels: None

    Description

      Hello,

      We faced an issue where it is not possible to update a connector's config if the previous task config contains a ConfigProvider reference whose value is invalid and leads to a ConfigException.

      Below is a simple test case that reproduces this with FileConfigProvider, but any ConfigProvider that raises an exception when something is wrong with its config (e.g. the referenced resource doesn't exist) will trigger the same failure.

      Prerequisites:

      A Kafka Connect instance configured with a file config provider:

       

      config.providers=file
      config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
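
      For context on the mechanism: Connect expands ${provider:path:key} placeholders via ConfigTransformer and the configured ConfigProvider. Below is a minimal sketch (the TransformDemo class and its variable names are mine, for illustration only) of how the ${file:...} reference used in the steps below gets resolved, and why resolution throws once the file disappears:

      import org.apache.kafka.common.config.ConfigException;
      import org.apache.kafka.common.config.ConfigTransformer;
      import org.apache.kafka.common.config.provider.ConfigProvider;
      import org.apache.kafka.common.config.provider.FileConfigProvider;

      import java.util.Collections;
      import java.util.Map;

      public class TransformDemo {
          public static void main(String[] args) {
              // Same provider alias ("file") as in config.providers above.
              ConfigProvider fileProvider = new FileConfigProvider();
              fileProvider.configure(Collections.emptyMap());
              ConfigTransformer transformer =
                      new ConfigTransformer(Map.of("file", fileProvider));

              Map<String, String> raw = Map.of(
                      "topics", "${file:/opt/kafka/provider.properties:topics}");
              try {
                  // Prints {topics=test} while the file exists ...
                  System.out.println(transformer.transform(raw).data());
              } catch (ConfigException e) {
                  // ... but throws "Could not read properties from file ..."
                  // once the file has been renamed (step 5 below).
                  System.err.println(e.getMessage());
              }
          }
      }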

       

      1. Create a Kafka topic "test".
      2. On the Kafka Connect instance, create the file "/opt/kafka/provider.properties" with the content:

      topics=test
      

      3. Create a simple FileStreamSink connector:

      PUT /connectors/local-file-sink/config
      {
        "connector.class": "FileStreamSink",
        "tasks.max": "1",
        "file": "/opt/kafka/test.sink.txt",
        "topics": "${file:/opt/kafka/provider.properties:topics}"
      }
      

      4. Check that everything works fine:

      GET /connectors?expand=info&expand=status
      ...
          "status": {
            "name": "local-file-sink",
            "connector": {
              "state": "RUNNING",
              "worker_id": "10.10.10.10:8083"
            },
            "tasks": [
              {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "10.10.10.10:8083"
              }
            ],
            "type": "sink"
          }
        }
      }
      

      Looks fine.

      5. Rename the file to "/opt/kafka/provider2.properties".
      6. Update the connector with the new, correct file name:

      PUT /connectors/local-file-sink/config
      {
        "connector.class": "FileStreamSink",
        "tasks.max": "1",
        "file": "/opt/kafka/test.sink.txt",
        "topics": "${file:/opt/kafka/provider2.properties:topics}"
      }
      

      The update succeeds with a 200 response.
      7. Check that everything works fine:

      {
        "local-file-sink": {
          "info": {
            "name": "local-file-sink",
            "config": {
              "connector.class": "FileStreamSink",
              "file": "/opt/kafka/test.sink.txt",
              "tasks.max": "1",
              "topics": "${file:/opt/kafka/provider2.properties:topics}",
              "name": "local-file-sink"
            },
            "tasks": [
              {
                "connector": "local-file-sink",
                "task": 0
              }
            ],
            "type": "sink"
          },
          "status": {
            "name": "local-file-sink",
            "connector": {
              "state": "RUNNING",
              "worker_id": "10.10.10.10:8083"
            },
            "tasks": [
              {
                "id": 0,
                "state": "FAILED",
                "worker_id": "10.10.10.10:8083",
                "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
              }
            ],
            "type": "sink"
          }
        }
      }
      

      The config has been updated, but the new task has not been created. As a result, the connector doesn't work.

      It fails with:

      [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff.
      org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
       at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
       at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
       at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
       at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
       at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       at java.base/java.lang.Thread.run(Thread.java:840)  

      As I understand it, this happens because on connector update the AbstractHerder tries to reconfigure the current tasks:

      https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1051

      and before doing so, the herder compares the old task configs with the new ones. But it doesn't compare the original (raw) values: it resolves the ConfigProvider references in the previous task configs first, and fails because the file referenced by the previous config can no longer be read.

      The main question: do we really need to compare the ConfigProvider-resolved values there, instead of comparing the original configs? As it stands, this causes real problems, since many ConfigProviders raise an exception when a resource is not found.
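
      To make that concrete, here is a minimal, hypothetical sketch (class and method names are mine; the real logic lives in AbstractHerder.taskConfigsChanged(), which resolves configs through ClusterConfigState.taskConfig() and WorkerConfigTransformer.transform()) contrasting the two comparison strategies:

      import java.util.List;
      import java.util.Map;
      import java.util.Objects;
      import java.util.function.UnaryOperator;

      // Hypothetical illustration only, not the actual Connect code.
      class TaskConfigComparison {

          // Current behavior (simplified): task configs are run through the
          // provider-backed transformer before comparison, so a stale
          // ${file:...} reference in the OLD config throws a ConfigException
          // before any comparison can happen, and the update is blocked.
          static boolean changedWithTransform(UnaryOperator<Map<String, String>> transform,
                                              List<Map<String, String>> oldRaw,
                                              List<Map<String, String>> newRaw) {
              if (oldRaw.size() != newRaw.size()) {
                  return true;
              }
              for (int i = 0; i < oldRaw.size(); i++) {
                  // transform.apply(oldRaw.get(i)) is where the repro above blows up.
                  if (!Objects.equals(transform.apply(oldRaw.get(i)),
                                      transform.apply(newRaw.get(i)))) {
                      return true;
                  }
              }
              return false;
          }

          // Alternative suggested above: compare the raw (untransformed) configs,
          // so a broken reference in a previous config can never block publishing
          // the new task configs.
          static boolean changedRaw(List<Map<String, String>> oldRaw,
                                    List<Map<String, String>> newRaw) {
              return !Objects.equals(oldRaw, newRaw);
          }
      }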

       

      As a workaround we can delete and re-create the connector instead of updating it (see the requests below). But there is one case where even that doesn't help: KAFKA-16838
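
      The workaround spelled out, assuming the setup above (DELETE /connectors/{name} is the standard Connect REST endpoint; the PUT body is the same as in step 6):

      DELETE /connectors/local-file-sink

      PUT /connectors/local-file-sink/config
      {
        "connector.class": "FileStreamSink",
        "tasks.max": "1",
        "file": "/opt/kafka/test.sink.txt",
        "topics": "${file:/opt/kafka/provider2.properties:topics}"
      }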

      Attachments

        1. kafka_connect_config.png (57 kB, Sergey Ivanov)


      People

        Assignee: Chris Egerton (ChrisEgerton)
        Reporter: Sergey Ivanov (mrMigles)
        Votes: 0
        Watchers: 2
