Kafka / KAFKA-7576

Dynamic update of replica fetcher threads may fail to start/close fetchers

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.1, 2.0.1, 2.1.0
    • Fix Version/s: 1.1.2, 2.1.1, 2.0.2
    • Component/s: core
    • Labels: None

    Description

      KAFKA-6051 moved the ReplicaFetcherBlockingSend shutdown earlier in the shutdown sequence of ReplicaFetcherThread. As a result, shutting down replica fetchers can now throw an exception, because the Selector may be closed on one thread while another thread is still writing data through it. KAFKA-7464 changed this behaviour for 2.0.1 and 2.1.0: the exception thrown during close() is now logged rather than propagated, to avoid exceptions during broker shutdown.

      When a config update notification for `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down before new ones are created. This migration is performed on the thread that processes the ZK change notification. Shutting down the Selector of an existing fetcher from that thread is not safe, since the replica fetcher thread may be processing data with the same Selector at the time.
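
      The hazard described above is a general one: a resource that is not thread-safe gets closed by a thread other than the one using it. The following is a minimal Java sketch of one common ordering that avoids it (signal the loop, wait for it to exit, then close). It is an illustration only, not the patch applied for this issue; FetcherSketch and all of its members are hypothetical names, and the sleep stands in for a blocking fetch on the real client.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a fetcher thread that owns a non-thread-safe client. */
class FetcherSketch extends Thread {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private volatile boolean inUse = false;            // true while the loop touches the client
    private volatile boolean clientClosed = false;
    private volatile boolean closedWhileInUse = false; // records the unsafe case

    @Override
    public void run() {
        while (running.get()) {
            inUse = true;
            try {
                Thread.sleep(5); // placeholder for a blocking fetch through the client
            } catch (InterruptedException e) {
                // Woken by initiateShutdown(); the loop condition ends the loop.
            } finally {
                inUse = false;
            }
        }
    }

    /** Step 1: ask the loop to stop and wake it up. Does not touch the client. */
    void initiateShutdown() {
        running.set(false);
        interrupt();
    }

    /** Placeholder for closing the client; notes whether the loop was still using it. */
    void closeClient() {
        if (inUse) {
            closedWhileInUse = true;
        }
        clientClosed = true;
    }

    /** Steps 1-3: signal, wait for the loop to exit, and only then close the client. */
    boolean shutdown() {
        initiateShutdown();
        try {
            join(); // Step 2: after this returns, the loop no longer uses the client
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false; // the loop may still be running, so the client is left open
        }
        closeClient(); // Step 3
        return true;
    }

    boolean isClientClosed() { return clientClosed; }

    boolean wasClosedWhileInUse() { return closedWhileInUse; }

    public static void main(String[] args) {
        FetcherSketch fetcher = new FetcherSketch();
        fetcher.start();
        boolean closed = fetcher.shutdown();
        System.out.println("closed=" + closed
                + " closedWhileInUse=" + fetcher.wasClosedWhileInUse());
    }
}
```

      Calling closeClient() directly from the notification thread, without the join(), corresponds to the unsafe sequence in this report: the loop may still be in the middle of a fetch when the client is closed.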

      Without the fix from KAFKA-7464, if shutdown encounters an exception during a dynamic config update, another config update or a broker restart is required to restart the replica fetchers.

      Exception stack trace:

      java.lang.IllegalArgumentException
              at java.nio.Buffer.position(Buffer.java:244)
              at sun.nio.ch.IOUtil.write(IOUtil.java:68)
              at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
              at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
              at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
              at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
              at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
              at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
              at org.apache.kafka.common.network.Selector.close(Selector.java:736)
              at org.apache.kafka.common.network.Selector.close(Selector.java:698)
              at org.apache.kafka.common.network.Selector.close(Selector.java:314)
              at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
              at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
              at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
              at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
              at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
              at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
              at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
              at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
              at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
              at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
              at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
              at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
              at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
              at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
              at scala.collection.immutable.List.foreach(List.scala:392)
              at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
              <SKIP>
              at kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
      

      The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the problem with creating replica fetchers during a dynamic update. But even on those branches, we should clean up the Selector to avoid a resource leak in the dynamic config update case (discarding the exception may be sufficient when the broker is being shut down).
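
      The distinction between discarding an exception and still releasing resources can be sketched as follows. This is a generic Java illustration, not Kafka code: CloseAllSketch and closeAll are hypothetical names. The helper keeps closing the remaining resources when one close() throws, and returns the failures so the caller can log them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: close everything, even if some close() calls fail. */
class CloseAllSketch {

    /** Closes each resource in order and returns the exceptions that were thrown. */
    static List<Exception> closeAll(List<? extends AutoCloseable> resources) {
        List<Exception> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                // Record the failure and keep going so later resources are not leaked.
                failures.add(e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        AtomicInteger closedCount = new AtomicInteger();
        // First resource fails on close, as a channel might when closed mid-write.
        AutoCloseable failing = () -> { throw new IllegalArgumentException("close failed"); };
        AutoCloseable healthy = closedCount::incrementAndGet;

        List<Exception> failures = closeAll(Arrays.asList(failing, healthy));
        for (Exception e : failures) {
            System.err.println("Failed to close resource: " + e);
        }
        System.out.println("closed=" + closedCount.get() + " failures=" + failures.size());
    }
}
```

      If the first failure were simply caught and ignored at the top of the call chain, the resources after it would never be closed. That is harmless when the process is about to exit, but not when the broker keeps running after a dynamic config update.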

            People

              Assignee: Rajini Sivaram (rsivaram)
              Reporter: Rajini Sivaram (rsivaram)
              Reviewer: Jason Gustafson