Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.4.0
None
None
Description
When Kafka Connect connects to a broker over SSL and the producer.* or consumer.* properties are not set, connectors and tasks incorrectly report the status RUNNING. The status should be FAILED, because the logs contain SSL errors.
For example, the FILE-TEST connector reports RUNNING even though the logs show SSL errors from its producer and the file file.txt does not exist.
GET https://host:8084/connectors/FILE-TEST/status
{
  "name": "FILE-TEST",
  "connector": { "state": "RUNNING", "worker_id": "host:8084" },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "host:8084" }
  ],
  "type": "source"
}
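To make the mismatch concrete, here is a minimal sketch of how a monitoring script might read this status response. The helper names collect_states and not_running are hypothetical and not part of Kafka Connect; the payload is the one returned above. The point is that a check based only on the REST status finds nothing wrong.

```python
import json

# Status payload as returned by GET /connectors/FILE-TEST/status in this report.
STATUS_JSON = """
{
  "name": "FILE-TEST",
  "connector": {"state": "RUNNING", "worker_id": "host:8084"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "host:8084"}],
  "type": "source"
}
"""


def collect_states(status):
    """Map each component (connector and every task) to its reported state."""
    states = {"connector": status["connector"]["state"]}
    for task in status.get("tasks", []):
        states["task-{}".format(task["id"])] = task["state"]
    return states


def not_running(states):
    """Return only the components whose state is anything other than RUNNING."""
    return {name: state for name, state in states.items() if state != "RUNNING"}


status = json.loads(STATUS_JSON)
states = collect_states(status)
print(states)
print(not_running(states))
```

With the payload from this report, not_running finds no failing component, so the SSL handshake failure is visible only in the logs.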
connect.log
[2020-09-21 09:56:15,794] DEBUG [Producer clientId=connector-producer-FILE-TEST-0] Connection with host/1.2.3.4 disconnected (org.apache.kafka.common.network.Selector)
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
    at java.lang.Thread.run(Thread.java:748)
[2020-09-21 09:56:16,648] DEBUG [Producer clientId=connector-producer-FILE-TEST-0] Give up sending metadata request since no node is available (org.apache.kafka.clients.NetworkClient)
[2020-09-21 09:56:16,690] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Node 1 sent an incremental fetch response for session 542278996 with 0 response partition(s), 1 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler)
[2020-09-21 09:56:16,691] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=host:9093 (id: 1 rack: null), epoch=0}} to node host:9093 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2020-09-21 09:56:16,691] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Built incremental fetch (sessionId=542278996, epoch=512) for node 1. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s) (org.apache.kafka.clients.FetchSessionHandler)
[2020-09-21 09:56:16,691] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(connect-offsets-0)) to broker tkli-host:9093 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
messages:
Sep 21 09:56:46 host kafka-server-start: [2020-09-21 09:56:46,987] INFO [SocketServer brokerId=1] Failed authentication with /1.2.3.4 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
connect-distributed.properties
bootstrap.servers=host:9093
config.storage.replication.factor=1
config.storage.topic=connect-configs
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
listeners=https://host:8084
listeners.https.ssl.client.auth=required
listeners.https.ssl.enabled.protocols=TLSv1.2
listeners.https.ssl.key.password=q1w2e3r4
listeners.https.ssl.keystore.location=connect.keystore.jks
listeners.https.ssl.keystore.password=q1w2e3r4
listeners.https.ssl.truststore.location=connect.truststore.jks
listeners.https.ssl.truststore.password=q1w2e3r4
offset.flush.interval.ms=10000
offset.storage.replication.factor=1
offset.storage.topic=connect-offsets
plugin.path=share/java
rest.advertised.listener=https
security.protocol=SSL
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=https
ssl.key.password=q1w2e3r4
ssl.keystore.location=connect.keystore.jks
ssl.keystore.password=q1w2e3r4
ssl.truststore.location=connect.truststore.jks
ssl.truststore.password=q1w2e3r4
status.storage.replication.factor=1
status.storage.topic=connect-status
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
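For comparison, the configuration above sets security.protocol and ssl.* only at the worker level. In Kafka Connect those settings configure the worker's own clients, while the producers and consumers created for connector tasks take their settings from producer.- and consumer.-prefixed properties. The fragment below is a sketch of what the missing entries might look like, assuming the task clients should reuse the worker's keystore and truststore. That reuse is an assumption, and the password values are placeholders; the report does not state the intended values.

```properties
# Assumed: task producers reuse the worker's SSL material (not stated in the report).
producer.security.protocol=SSL
producer.ssl.keystore.location=connect.keystore.jks
producer.ssl.keystore.password=<keystore-password>
producer.ssl.key.password=<key-password>
producer.ssl.truststore.location=connect.truststore.jks
producer.ssl.truststore.password=<truststore-password>

# Assumed: task consumers (used by sink connectors) reuse the same SSL material.
consumer.security.protocol=SSL
consumer.ssl.keystore.location=connect.keystore.jks
consumer.ssl.keystore.password=<keystore-password>
consumer.ssl.key.password=<key-password>
consumer.ssl.truststore.location=connect.truststore.jks
consumer.ssl.truststore.password=<truststore-password>
```

Adding these entries would only avoid the handshake failure. It does not address the reported problem, which is that the status stays RUNNING when the entries are absent.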