Description
In KRaft mode, if listeners and advertised.listeners are configured without a host address, the host field of each Listener in BrokerRegistrationRequestData is null. When the broker starts, serializing the registration request throws a NullPointerException in the broker-to-controller send thread. The thread stops and broker startup fails.
A feasible fix is for BrokerServer, when building networkListeners, to replace an empty endpoint host in advertisedListeners with InetAddress.getLocalHost.getCanonicalHostName.
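The substitution can be illustrated with a minimal, standalone Java sketch. It is not the actual BrokerServer change: the class name ListenerHostFallback and the helper resolveHost are hypothetical, and only the InetAddress calls come from the JDK. It assumes an unset host may appear as either null or an empty string.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical illustration only; not code from the Kafka code base.
class ListenerHostFallback {

    // Returns the configured host, or the fallback when the host is unset
    // (assumed here to mean null or blank).
    static String resolveHost(String configuredHost, String fallbackHost) {
        if (configuredHost == null || configuredHost.trim().isEmpty()) {
            return fallbackHost;
        }
        return configuredHost;
    }

    // The fallback value named in the description above.
    static String localCanonicalHostName() throws UnknownHostException {
        return InetAddress.getLocalHost().getCanonicalHostName();
    }

    public static void main(String[] args) {
        try {
            // A listener configured without a host resolves to the local
            // canonical host name instead of staying null.
            System.out.println(resolveHost(null, localCanonicalHostName()));
        } catch (UnknownHostException e) {
            System.err.println("Could not resolve local host name: " + e.getMessage());
        }
    }
}
```

With a fallback like this, the Listener entry in the registration request carries a non-null host, so serialization no longer hits the null value.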
The following is the debug log:
before fixing:
[2021-04-21 14:15:20,032] DEBUG (broker-2-to-controller-send-thread org.apache.kafka.clients.NetworkClient 522) [broker-2-to-controller] Sending BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGISTRATION, apiVersion=0, clientId=2, correlationId=6) and timeout 30000 to node 2: BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=X8w4_1DFT2yUjOm6asPjIQ, listeners=[Listener(name='PLAINTEXT', host=null, port=9092, securityProtocol=0)], features=[], rack=null)
[2021-04-21 14:15:20,033] ERROR (broker-2-to-controller-send-thread kafka.server.BrokerToControllerRequestThread 76) [broker-2-to-controller-send-thread]: unhandled exception caught in InterBrokerSendThread
java.lang.NullPointerException
at org.apache.kafka.common.message.BrokerRegistrationRequestData$Listener.addSize(BrokerRegistrationRequestData.java:515)
at org.apache.kafka.common.message.BrokerRegistrationRequestData.addSize(BrokerRegistrationRequestData.java:216)
at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:525)
at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:501)
at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:461)
at kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104)
at kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)
at kafka.common.InterBrokerSendThread$$Lambda$259/910445654.apply(Unknown Source)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
at kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)
at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)
at kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-04-21 14:15:20,034] INFO (broker-2-to-controller-send-thread kafka.server.BrokerToControllerRequestThread 66) [broker-2-to-controller-send-thread]: Stopped
after fixing:
[2021-04-21 15:05:01,095] DEBUG (BrokerToControllerChannelManager broker=2 name=heartbeat org.apache.kafka.clients.NetworkClient 512) [BrokerToControllerChannelManager broker=2 name=heartbeat] Sending BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGISTRATION, apiVersion=0, clientId=2, correlationId=0) and timeout 30000 to node 2: BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=xF29h_IRR1KzrERWwssQ2w, listeners=[Listener(name='PLAINTEXT', host='hdpxxx.cn', port=9092, securityProtocol=0)], features=[], rack=null)