  Kafka / KAFKA-16047

Source connectors with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 3.6.1
    • Fix Version/s: 3.8.0, 3.7.1
    • Component/s: connect, mirrormaker
    • Labels: None

    Description

      Source connectors with 'exactly.once.support = required' may have some of their tasks time out on the InitProducerId requests issued through the admin client. In the case of MirrorSourceConnector, which is the source connector where I found the bug, this effectively drove all of the tasks into the FAILED state. Once one of the tasks failed with 'COORDINATOR_NOT_AVAILABLE' errors (caused by the timeouts), no matter how many times I restarted the connector or its tasks, I could not get the MirrorSourceConnector back into a healthy RUNNING state.

      Due to the very low timeout that has been hard-coded in the code (1ms), there is a chance that the `InitProducerId` requests time out on "slower-than-expected" Kafka brokers, i.e. brokers that do not process and respond to that request within <= 1ms. (More details are in the "More context & logs" section below.)
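
      The hard-coded value also shows up on the broker side: the transaction coordinator records the transition metadata with txnTimeoutMs=1 (see the broker logs further down). A quick way to confirm this on your own brokers is a sketch like the following, where the log path and the transactional-id prefix are placeholders for your environment:

      # Rough sketch: confirm the 1 ms transaction timeout in the broker logs.
      # Log path and transactional-id prefix are placeholders.
      grep "txnTimeoutMs=1," /var/log/kafka/server.log | grep "kafka-connect-"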

      ChrisEgerton I would appreciate it if you could answer the following questions:

      • How and why was the 1ms magic number chosen for the transaction timeout?
      • Is there any specific reason to believe that the `InitProducerId` request can be guaranteed to be processed in such a small time window?
      • I have tried the above on multiple Kafka clusters hosted on different underlying datacenter hosts, and I don't believe those brokers are "slow" for any particular reason. If you feel the brokers are slower than expected, I would appreciate any pointers on how I could find out what the bottleneck is; one way I might start looking is sketched just after this list.
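
      For what it's worth, one way to start measuring broker-side latency for this request type is the per-request RequestMetrics exposed over JMX. This is only a sketch: it assumes JMX is reachable on the broker, the host/port are placeholders, and the JmxTool class name differs between Kafka versions (kafka.tools.JmxTool on older releases, org.apache.kafka.tools.JmxTool on newer ones).

      # Sample the total handling time of InitProducerId requests on a broker via JMX.
      bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
        --jmx-url service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi \
        --object-name 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=InitProducerId' \
        --attributes Mean,99thPercentile \
        --reporting-interval 5000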

      Temporary Mitigation

      I have increased the timeout to 1000ms (a number picked somewhat arbitrarily, simply to give the brokers enough time to always complete these types of requests). The fix can be found in my fork: https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
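
      In case anyone wants to try the mitigation, building a distribution from the fork follows the standard Kafka build steps. This is only a sketch; it assumes the commit above is reachable from the fork's branches and that a suitable JDK is installed:

      # Clone the fork and check out the mitigation commit (hash from the link above).
      git clone https://github.com/akaltsikis/kafka.git
      cd kafka
      git checkout 8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
      # Build a binary release tarball, skipping tests; output lands under core/build/distributions/.
      ./gradlew clean releaseTarGz -x test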

      Final solution

      The temporary mitigation is not ideal, as it still picks an arbitrary timeout for this operation; it may be high enough in practice, but there is no guarantee that it always will be. Shall we introduce something client-configurable?
      At the same time, I was wondering whether it makes sense to introduce tests that simulate brokers slower than the "blazing fast" mocked brokers used in the unit tests, so that this kind of overly low timeout, which can make some features unusable, is caught earlier.
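
      To make the "client configurable" idea concrete, one possible shape is a worker-level property that overrides the transaction timeout used for the fencing InitProducerId request. The property name below is purely hypothetical and does not exist in upstream Kafka today:

      # Hypothetical illustration only: this property does not exist in upstream Kafka.
      # The idea is to let operators override the zombie-fencing transaction timeout
      # instead of relying on the hard-coded 1 ms.
      echo "exactly.once.source.fencing.transaction.timeout.ms=1000" >> config/connect-distributed.properties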

      What is affected

      The above bug affects MirrorSourceConnector tasks running in a distributed Kafka Connect cluster, as well as MirrorMaker 2 jobs that run in distributed mode (a prerequisite for exactly.once.support to work). I believe the same applies to other source connectors, since the code path to blame is Connect-specific and not MirrorMaker-specific.

      More context & logs

      Connector Logs

      Caused by: java.util.concurrent.CompletionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
      

      Broker Logs

      [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=<id>] Returning COORDINATOR_NOT_AVAILABLE error code to client for kafka-connect-uat-mm2-msc-20th-7's InitProducerId request (kafka.coordinator.transaction.TransactionCoordinator)
      
      
      [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the callback (kafka.coordinator.transaction.TransactionStateManager)
      

      How to reproduce it

      While the bug also exists in the standalone MM2 deployment, it is easier to reproduce by deploying the connector to a Kafka Connect cluster (since it is possible to update the configuration, delete, restart, pause, stop and resume via the Kafka Connect REST API).
      Thus, deploy a MirrorSourceConnector on a Kafka Connect cluster (with `exactly.once.source.support = enabled` on the workers) and, after the initial start, update its configuration or restart the connector and its tasks.
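
      For reference, the deploy/restart steps can be driven through the REST API roughly as follows. The connector name, Connect URL, cluster aliases, bootstrap servers and topic pattern are placeholders for your environment, and `exactly.once.source.support = enabled` must already be set on the Connect workers:

      # Create or update the MirrorSourceConnector (all values below are placeholders).
      curl -s -X PUT -H "Content-Type: application/json" \
        "$KAFKA_CONNECT_URL/connectors/mm2-msc/config" \
        -d '{
              "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
              "source.cluster.alias": "source",
              "target.cluster.alias": "target",
              "source.cluster.bootstrap.servers": "source-broker:9092",
              "target.cluster.bootstrap.servers": "target-broker:9092",
              "topics": ".*",
              "exactly.once.support": "required"
            }'

      # Later, restart the connector and all of its tasks to re-trigger zombie fencing.
      curl -s -X POST "$KAFKA_CONNECT_URL/connectors/mm2-msc/restart?includeTasks=true"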

      To test whether my fork fixed the issue once and for all, I created the following script, which repeatedly restarts the connector every few seconds (after its tasks reach the RUNNING state). I ran the script for a few hours and the MirrorSourceConnector never got into a non-recoverable state (as it did on the upstream versions).

      #!/bin/bash
      
      # Source vars
      source /<path>/connect.sh
      # Kafka Connect API endpoint
      KAFKA_CONNECT_API=$KAFKA_CONNECT_URL
      
      # Kafka Connect connector name
      CONNECTOR_NAME="<connector_name>"
      
      while true; do
          # Fetch the connector status
          connector_status=$(curl -k -s -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/status")
      
          # Check if connector is in FAILED state
          if echo "$connector_status" | grep -q '"state":"FAILED"'; then
              echo "Connector has failed. Exiting."
              exit 1
          fi
      
          # Fetch and check all task statuses
          task_statuses=$(echo "$connector_status" | jq '.tasks[].state')
          all_running=true
          for status in $task_statuses; do
              if [ "$status" != '"RUNNING"' ]; then
                  all_running=false
                  break
              fi
          done
      
          # If all tasks and the connector are RUNNING, restart them after 90 seconds
          if $all_running; then
              echo "All tasks are running. Restarting in 90 seconds."
              sleep 90
              date; curl -k -X POST -H "Content-Type: application/json" -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/restart?includeTasks=true"
          else
              echo "Not all tasks are running. Checking again..."
          fi
      
          # Sleep for a while before checking again
          sleep 10
      done
      

          People

            Assignee: Edoardo Comar (ecomar)
            Reporter: Angelos Kaltsikis (akaltsikis)