Description
KAFKA-6051 moved the shutdown of ReplicaFetcherBlockingSend earlier in the shutdown sequence of ReplicaFetcherThread. As a result, shutting down replica fetchers can now throw an exception, because the Selector may be closed on one thread while data is being written on another. KAFKA-7464 changed this behaviour for 2.0.1 and 2.1.0: the exception during close() is now logged and not propagated, to avoid exceptions during broker shutdown.
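A minimal sketch of the KAFKA-7464-style behaviour, assuming hypothetical names (BlockingSendSketch is illustrative, not the real ReplicaFetcherBlockingSend class): the exception from closing the underlying network client is logged rather than propagated up the shutdown path.

import org.slf4j.{Logger, LoggerFactory}

// Hypothetical sketch; not the actual Kafka class.
class BlockingSendSketch(networkClient: AutoCloseable, sourceBroker: Int) {
  private val log: Logger = LoggerFactory.getLogger(classOf[BlockingSendSketch])

  def close(): Unit = {
    try networkClient.close()
    catch {
      // Selector.close() can race with a send in flight on the fetcher
      // thread (e.g. the IllegalArgumentException from Buffer.position in
      // the stack trace below); log instead of propagating so broker
      // shutdown is not disrupted.
      case e: Exception =>
        log.warn(s"Exception while closing connection to broker $sourceBroker", e)
    }
  }
}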
When a config update notification for `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads: existing fetchers are shut down before new ones are created. This migration is performed on the thread that processes the ZK change notification. Closing the Selector of an existing fetcher from that thread is not safe, since the replica fetcher thread may be using the same Selector to process data at that moment (see the sketch below).
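An illustrative sketch of the resize path, with hypothetical names (this is not the real AbstractFetcherManager API). The hazard is that shutdown() runs on the ZK change-notification thread while each fetcher's own thread may still be writing to the same Selector.

import scala.collection.mutable

trait FetcherSketch {
  def shutdown(): Unit          // closes the fetcher's Selector
  def partitions: Set[String]   // partitions currently owned by this fetcher
}

class FetcherManagerSketch(createFetcher: Int => FetcherSketch) {
  private val fetchers = mutable.Map.empty[Int, FetcherSketch]

  // Called on the ZK change-notification thread.
  def resizeThreadPool(newSize: Int): Unit = synchronized {
    val migrating = fetchers.values.flatMap(_.partitions).toSet
    // Unsafe step: closing Selectors of running fetchers from this thread.
    fetchers.values.foreach(_.shutdown())
    fetchers.clear()
    for (tp <- migrating) {
      val fetcherId = math.abs(tp.hashCode) % newSize
      fetchers.getOrElseUpdate(fetcherId, createFetcher(fetcherId))
      // the partition would be re-added to the new fetcher here
    }
  }
}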
Without the fix from KAFKA-7464, if shutdown encounters an exception, another config update or a broker restart is required to restart the replica fetchers after the dynamic config update.
Exception stack trace:
java.lang.IllegalArgumentException
at java.nio.Buffer.position(Buffer.java:244)
at sun.nio.ch.IOUtil.write(IOUtil.java:68)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
at org.apache.kafka.common.network.Selector.close(Selector.java:736)
at org.apache.kafka.common.network.Selector.close(Selector.java:698)
at org.apache.kafka.common.network.Selector.close(Selector.java:314)
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
<SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with creation of replica fetchers during a dynamic update. But even on those branches, we should clean up the Selector to avoid a resource leak in the dynamic config update case (discarding the exception may be sufficient when the broker is shutting down).
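One possible restructuring, sketched here with hypothetical names (this is not the committed fix): let the fetcher thread itself close its Selector after its run loop exits, so close() never races with an in-flight send and the Selector's channels and buffers are still released. initiateShutdown() only signals and wakes.

trait SelectorLike extends AutoCloseable {
  def wakeup(): Unit
}

class FetcherThreadSketch(selector: SelectorLike) extends Thread {
  @volatile private var running = true

  // Safe to call from any thread (e.g. the ZK notification thread).
  def initiateShutdown(): Unit = {
    running = false
    selector.wakeup()            // interrupt a blocking poll; do not close here
  }

  override def run(): Unit = {
    try {
      while (running) doWork()
    } finally {
      selector.close()           // closed on the owning thread: no race, no leak
    }
  }

  private def doWork(): Unit = Thread.sleep(10) // placeholder for fetch/process logic
}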
Issue Links
- is duplicated by KAFKA-7607: NetworkClientUtils.sendAndReceive can take a long time to return during shutdown (Resolved)