Kafka / KAFKA-12777

AutoTopicCreationManager does not handle response errors

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: None
    • Labels: None

    Description

      The request completion callback in AutoTopicCreationManager assumes the response is present.

      override def onComplete(response: ClientResponse): Unit = {
        debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.")
        clearInflightRequests(creatableTopics)
      }
      

      We should at least check to see if the response exists before logging it and clearing the in-flight flag.
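
A minimal sketch of what such a check could look like. It is not the patch that resolved this issue. `StubResponse` and `NullSafeCompletion.describe` are hypothetical stand-ins so the example runs without Kafka on the classpath; the field names mirror the `responseBody`, `versionMismatch` and `wasDisconnected` accessors on `org.apache.kafka.clients.ClientResponse`, and the message wording is invented for illustration.

```scala
// Hypothetical stand-in for org.apache.kafka.clients.ClientResponse.
// Only the fields relevant to this issue are modelled.
final case class StubResponse(
  responseBody: AnyRef,               // null when no response was received
  versionMismatch: Throwable = null,  // non-null on an unsupported-version error
  wasDisconnected: Boolean = false
)

object NullSafeCompletion {
  // Builds the log message without ever dereferencing a null body.
  def describe(topics: Set[String], response: StubResponse): String = {
    val names = topics.toSeq.sorted.mkString(", ")
    if (response.versionMismatch != null)
      s"Auto topic creation for [$names] failed: ${response.versionMismatch.getMessage}"
    else if (response.wasDisconnected)
      s"Auto topic creation for [$names] failed: disconnected"
    else
      Option(response.responseBody) match {
        case Some(body) => s"Auto topic creation for [$names] completed with response $body."
        case None       => s"Auto topic creation for [$names] completed without a response body."
      }
  }
}
```

In the real callback the in-flight flag would presumably still need to be cleared on every branch so that the topics can be retried; that part is an assumption and is left out of the sketch.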

      This was found while debugging a separate issue. Here is a log snippet:

      [2021-05-13 11:21:03,890] DEBUG [BrokerToControllerChannelManager broker=1 name=forwarding] Version mismatch when attempting to send EnvelopeRequestData(requestData=java.nio.HeapByteBuffer[pos=0 lim=43 cap=43], requestPrincipal=[0, 0, 5, 85, 115, 101, 114, 10, 65, 78, 79, 78, 89, 77, 79, 85, 83, 0, 0], clientHostAddress=[127, 0, 0, 1]) with correlation id 2 to 0 (org.apache.kafka.clients.NetworkClient:495)
      org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support ENVELOPE
      [2021-05-13 11:21:03,893] ERROR [BrokerToControllerChannelManager broker=1 name=forwarding]: Request EnvelopeRequestData(requestData=java.nio.HeapByteBuffer[pos=0 lim=43 cap=43], requestPrincipal=[0, 0, 5, 85, 115, 101, 114, 10, 65, 78, 79, 78, 89, 77, 79, 85, 83, 0, 0], clientHostAddress=[127, 0, 0, 1]) failed due to unsupported version error (kafka.server.BrokerToControllerRequestThread:76)
      org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support ENVELOPE
      [2021-05-13 11:21:03,894] ERROR [BrokerToControllerChannelManager broker=1 name=forwarding] Uncaught error in request completion: (org.apache.kafka.clients.NetworkClient:576)
      java.lang.NullPointerException
      	at kafka.server.DefaultAutoTopicCreationManager$$anon$1.$anonfun$onComplete$1(AutoTopicCreationManager.scala:179)
      	at kafka.utils.Logging.debug(Logging.scala:62)
      	at kafka.utils.Logging.debug$(Logging.scala:62)
      	at kafka.server.DefaultAutoTopicCreationManager.debug(AutoTopicCreationManager.scala:67)
      	at kafka.server.DefaultAutoTopicCreationManager$$anon$1.onComplete(AutoTopicCreationManager.scala:179)
      	at kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManager.scala:355)
      	at kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManager.scala:339)
      	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
      	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:574)
      	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
      	at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:74)
      	at kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:374)
      	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
      

      I suspect the NPE is due to response.responseBody being null.
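
The suspicion fits how Scala string interpolation treats null, which the following sketch illustrates. `NullBodyDemo` and its members are hypothetical names, and `body` stands in for a `responseBody` that was never populated. Dropping the explicit `.toString` would only avoid the exception; it would not handle the underlying error.

```scala
import scala.util.Try

object NullBodyDemo {
  // Stand-in for a response body that was never populated.
  val body: AnyRef = null

  // An explicit .toString dereferences the null reference and throws.
  val explicitCall: Try[String] = Try(s"with response ${body.toString}.")

  // Plain interpolation renders a null reference as the text "null".
  val plain: String = s"with response $body."
}
```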

          People

            Assignee: David Arthur (mumrah)
            Reporter: David Arthur (mumrah)
            Votes: 0
            Watchers: 4