Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 3.6.1
-
None
Description
Source connectors with 'exactly.once.support = required' may have some of their tasks issue InitProducerId requests from the admin client that time out. In the case of MirrorSourceConnector, which is the source connector where I found the bug, it effectively made all the tasks become "FAILED". As soon as one of the tasks became FAILED due to the 'COORDINATOR_NOT_AVAILABLE' messages (caused by the timeouts), no matter how many times I restarted the connector/tasks, I couldn't get the MirrorSourceConnector back into a healthy RUNNING state.
Because of the low timeout that has been hard-coded (1ms), there is a chance that the `InitProducerId` requests time out when the Kafka brokers are "slower than expected", that is, when they do not process and respond to the request in <= 1ms. (Feel free to read more about the issue in the "More context & logs" section below.)
Chris Egerton, I would appreciate it if you could respond to the following questions:
- How and why was the 1ms magic number chosen for the transaction timeout?
- Is there any specific reason why the `InitProducerId` request can be guaranteed to be processed in such a small time window?
- I have tried the above in multiple different Kafka clusters that are hosted on different underlying datacenter hosts, and I don't believe that those brokers are "slow" for some reason. If you feel that the brokers are slower than expected, I would appreciate any pointers on how I could find out what the bottleneck is.
Temporary Mitigation
I have increased the timeout to 1000ms (I picked this number arbitrarily, just to give the brokers enough time to always complete this type of request). The fix can be found in my fork: https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
Final solution
The temporary mitigation is not ideal, as it still picks an arbitrary timeout for the operation. That timeout may be high enough, but there is no guarantee that it will always be high enough. Shall we introduce something client-configurable?
At the same time, I was wondering whether it makes sense to introduce some tests that simulate brokers slower than the "blazing fast" mocked brokers used in the unit tests, so that low timeouts of this kind, which can make some features unusable, are caught.
What is affected
The above bug exists in MirrorSourceConnector tasks running in a distributed Kafka Connect cluster, or in MirrorMaker 2 jobs that run with distributed mode enabled (a prerequisite for exactly.once.support to work). I believe this should be true for other source connectors as well, as the code path to blame is Connect-specific and not MirrorMaker-specific.
More context & logs
Connector Logs
Caused by: java.util.concurrent.CompletionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
Broker Logs
[2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=<id>] Returning COORDINATOR_NOT_AVAILABLE error code to client for kafka-connect-uat-mm2-msc-20th-7's InitProducerId request (kafka.coordinator.transaction.TransactionCoordinator)
[2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the callback (kafka.coordinator.transaction.TransactionStateManager)
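The broker log above records the transaction timeout requested by the client in the `txnTimeoutMs` field, which is where the 1ms value shows up. Below is a minimal sketch for pulling the affected transactional ids and their timeouts out of a broker log. It assumes log lines shaped like the excerpt above; the function name `extract_txn_timeouts` is hypothetical and only used for illustration.

```shell
#!/bin/bash
# Hypothetical helper: read broker log lines on stdin and print
# "<transactionalId> <txnTimeoutMs>" for every Transaction State Manager line
# that reports a failed state transition with COORDINATOR_NOT_AVAILABLE.
# Assumes the log format shown in the excerpt above.
extract_txn_timeouts() {
  grep 'transition failed due to COORDINATOR_NOT_AVAILABLE' \
    | sed -n 's/.*TransactionalId \([^ ]*\) append transaction log.*txnTimeoutMs=\([0-9]*\).*/\1 \2/p'
}
```

It can be fed a broker log on stdin, for example `extract_txn_timeouts < <broker_log_file>`, where the file is whichever log contains the broker's transaction coordinator output.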
How to reproduce it
While the bug also exists in the standalone MM2 deployment, it is easier to reproduce by deploying the connector to a Kafka Connect cluster (as it is possible to update the config and delete/restart/pause/stop/resume the connector via the Kafka Connect REST API).
Thus, deploy a MirrorSourceConnector on a Kafka Connect cluster (with `exactly.once.source.support = enabled`) and, after the initial start, update its configuration or restart the connector and tasks.
To test whether my fork has fixed the issue for good, I have created the following script, which restarts the connector every 90 seconds once all of its tasks are in the RUNNING state. I have been running the script for a few hours and the MirrorSourceConnector never got into a non-recoverable state (as was happening on the upstream versions).
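The reproduction step above could be sketched roughly as follows. This is only an illustration under assumptions: the cluster aliases, the topic pattern, `tasks.max` and the two function names are placeholders that are not taken from my actual setup, and the environment variables are the same ones used by the restart script in this report. The worker-level `exactly.once.source.support = enabled` setting lives in the Connect worker configuration and is not part of this connector config.

```shell
#!/bin/bash
# Hypothetical helper: print a MirrorSourceConnector config as JSON.
# $1 = source bootstrap servers, $2 = target bootstrap servers.
# Aliases, topic pattern and tasks.max are placeholder values.
mirror_source_config() {
  local source_bootstrap="$1" target_bootstrap="$2"
  cat <<EOF
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "source.cluster.alias": "source",
  "target.cluster.alias": "target",
  "source.cluster.bootstrap.servers": "${source_bootstrap}",
  "target.cluster.bootstrap.servers": "${target_bootstrap}",
  "topics": ".*",
  "tasks.max": "8",
  "exactly.once.support": "required"
}
EOF
}

# Hypothetical helper: create or update the connector through the Kafka
# Connect REST API, then restart the connector together with its tasks.
deploy_and_restart() {
  mirror_source_config "$1" "$2" | curl -k -s -X PUT \
    -H "Content-Type: application/json" \
    -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
    --data @- "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/config"
  curl -k -s -X POST \
    -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
    "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/restart?includeTasks=true"
}
```

Calling `deploy_and_restart "<source_brokers>" "<target_brokers>"` a few times in a row should be enough to exercise the config update and restart paths described above.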
#!/bin/bash

# Source vars
source /<path>/connect.sh

# Kafka Connect API endpoint
KAFKA_CONNECT_API=$KAFKA_CONNECT_URL

# Kafka Connect connector name
CONNECTOR_NAME="<connector_name>"

while true; do
  # Fetch the connector status
  connector_status=$(curl -k -u $KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD -s "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/status")

  # Check if connector is in FAILED state
  if echo "$connector_status" | grep -q '"state":"FAILED"'; then
    echo "Connector has failed. Exiting."
    exit 1
  fi

  # Fetch and check all task statuses
  task_statuses=$(echo "$connector_status" | jq '.tasks[].state')
  all_running=true
  for status in $task_statuses; do
    if [ "$status" != '"RUNNING"' ]; then
      all_running=false
      break
    fi
  done

  # If all tasks and the connector are RUNNING, restart them after 90 seconds
  if $all_running; then
    echo "All tasks are running. Restarting in 90 seconds."
    sleep 90
    date;curl -k -X POST -H "Content-Type: application/json" -u $KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD $KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/restart\?includeTasks=true
  else
    echo "Not all tasks are running. Checking again..."
  fi

  # Sleep for a while before checking again
  sleep 10
done