Description
If the filesystems of a multi-worker Connect cluster are inconsistent, the FileConfigProvider may be able to find a configuration on worker A but not on worker B.
This can cause worker B to crash when it is assigned a connector or task whose configuration was previously validated by worker A.
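The inconsistency arises because the provider reads a path on the local disk of whichever worker resolves the configuration. The Java sketch below is a simplified, stdlib-only model, not Kafka's FileConfigProvider: the class `WorkerLocalSecrets`, its `resolve` method, and `SecretResolutionException` are hypothetical, and each worker's filesystem is modeled as a separate temporary directory so both workers can be simulated in one JVM. It parses the `${file:PATH:KEY}` placeholder form that Connect uses when a provider is registered under the name `file` (for example with `config.providers=file` and `config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider` in the worker config).

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class WorkerLocalSecrets {

    /** Hypothetical stand-in for Kafka's ConfigException; thrown when the file cannot be read. */
    static class SecretResolutionException extends RuntimeException {
        SecretResolutionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Matches placeholders of the form ${file:/some/path.properties:key} (assumes no ':' in the path).
    private static final Pattern FILE_REF = Pattern.compile("\\$\\{file:([^:}]+):([^}]+)\\}");

    /**
     * Resolves one ${file:PATH:KEY} placeholder against a single worker's filesystem.
     * workerRoot models that worker's "/" so two workers can be simulated in one JVM.
     */
    static String resolve(Path workerRoot, String value) {
        Matcher m = FILE_REF.matcher(value);
        if (!m.matches()) {
            return value; // not a placeholder: returned unchanged
        }
        Path file = workerRoot.resolve(m.group(1).replaceFirst("^/", ""));
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file)) {
            props.load(reader);
        } catch (IOException e) {
            // A missing file surfaces here as java.nio.file.NoSuchFileException.
            throw new SecretResolutionException("Could not read properties from file " + m.group(1), e);
        }
        return props.getProperty(m.group(2));
    }

    public static void main(String[] args) throws IOException {
        // Worker A has the secret file; worker B does not.
        Path workerA = Files.createTempDirectory("workerA");
        Path workerB = Files.createTempDirectory("workerB");
        Path secret = workerA.resolve("path/to/secrets/file.properties");
        Files.createDirectories(secret.getParent());
        Files.writeString(secret, "password=s3cr3t\n");

        String ref = "${file:/path/to/secrets/file.properties:password}";
        System.out.println("worker A: " + resolve(workerA, ref)); // worker A: s3cr3t
        try {
            resolve(workerB, ref);
        } catch (SecretResolutionException e) {
            System.out.println("worker B: " + e.getCause());
        }
    }
}
```

The same placeholder resolves on worker A and fails on worker B with a NoSuchFileException cause, which mirrors the exception in the log below.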
Steps to reproduce:
1. Configure a two-worker Connect cluster to use the FileConfigProvider.
2. Place a secret file on worker A (leader) but not worker B (member).
3. Create a connector via the REST API whose configuration references the secret file on disk.
4. Observe that connector creation succeeds.
5. Wait for a rebalance which assigns either the connector or task to worker B.
Expected behavior:
The connector or task is marked FAILED, and the failure is attributed to the FileConfigProvider being unable to find the file.
Actual behavior:
Worker B prints this log message and shuts down:
[Worker clientId=connect-1, groupId=my-connect-cluster] Uncaught exception in herder work thread, exiting:
org.apache.kafka.common.config.ConfigException: Invalid value java.nio.file.NoSuchFileException: /path/to/secrets/file.properties for configuration Could not read properties from file /path/to/secrets/file.properties
	at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:92)
	at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
	at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
	at org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:135)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1464)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:638)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:457)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:326)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:831)
Running workers with inconsistent filesystems is not a recommended configuration, but even then a connector configuration error should not crash the worker irrecoverably; the error should be confined to the affected connector or task.
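One possible shape for the expected behavior is to treat a configuration-resolution failure as a per-connector error instead of letting it escape the herder's work loop, which is what the stack trace shows happening via `ClusterConfigState.connectorConfig` inside `DistributedHerder.startConnector`. The sketch below is hypothetical and deliberately simplified, not Kafka's DistributedHerder: `TolerantHerderSketch`, `startAssigned`, `State`, and the `Status` record are illustrative names. It resolves each assigned connector's configuration inside its own try/catch and records FAILED with the exception text rather than rethrowing.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TolerantHerderSketch {

    enum State { RUNNING, FAILED }

    /** Hypothetical per-connector status: the state plus the failure text, if any. */
    record Status(State state, String trace) {}

    /**
     * Starts each assigned connector after resolving its config. A resolution failure marks
     * only that connector FAILED; the loop, and therefore the worker thread, keeps going.
     */
    static Map<String, Status> startAssigned(List<String> assigned,
                                             Function<String, Map<String, String>> resolveConfig) {
        Map<String, Status> statuses = new LinkedHashMap<>();
        for (String connector : assigned) {
            try {
                // A real worker would start the connector with the resolved config here.
                resolveConfig.apply(connector);
                statuses.put(connector, new Status(State.RUNNING, null));
            } catch (RuntimeException e) {
                // Attribute the failure to this connector instead of exiting the work loop.
                statuses.put(connector, new Status(State.FAILED, e.toString()));
            }
        }
        return statuses;
    }

    public static void main(String[] args) {
        // Simulates worker B: resolving the secret-backed connector fails, the other succeeds.
        Function<String, Map<String, String>> resolver = name -> {
            if (name.equals("needs-secret")) {
                throw new IllegalStateException(
                        "Could not read properties from file /path/to/secrets/file.properties");
            }
            return Map.of("name", name);
        };
        Map<String, Status> result = startAssigned(List.of("needs-secret", "plain"), resolver);
        result.forEach((name, status) -> System.out.println(name + " -> " + status.state()));
        // needs-secret -> FAILED
        // plain -> RUNNING
    }
}
```

Catching `RuntimeException` is enough to cover Kafka's `ConfigException`, which is unchecked. A real fix would presumably also need to publish the FAILED status and its trace so that it is visible through the REST status endpoint.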
Issue Links
- is related to: KAFKA-14670 Refactor connect plugins to be called from wrapper classes (In Progress)