Kafka / KAFKA-7576

Dynamic update of replica fetcher threads may fail to start/close fetchers


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.1, 2.0.1, 2.1.0
    • Fix Version/s: 1.1.2, 2.1.1, 2.0.2
    • Component/s: core
    • Labels:
      None

      Description

      KAFKA-6051 moved ReplicaFetcherBlockingSend shutdown earlier in the shutdown sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers can now throw an exception, because the Selector may be closed on one thread while another thread is still writing data through it. KAFKA-7464 changed this behaviour for 2.0.1 and 2.1.0: the exception during close() is now logged and not propagated, to avoid exceptions during broker shutdown.

      When a config update notification for `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down before new ones are created. This migration is performed on the thread processing the ZK change notification. Shutting down the Selector of an existing fetcher from that thread is not safe, since the replica fetcher thread may be processing data using the same Selector at the time.
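      The unsafe close described above reflects the general NIO rule that a Selector and its channels should be torn down by the thread that drives them. Below is a minimal, hypothetical Java sketch of the safe pattern (the shutdown caller only signals; the owning thread closes its own Selector). The class and method names are illustrative and are not Kafka's actual code:

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: the loop thread owns the Selector and is the only
// thread that ever closes it; other threads request shutdown via a flag.
public class OwnedSelectorLoop extends Thread {
    private final Selector selector;
    private volatile boolean running = true;
    private final CountDownLatch closed = new CountDownLatch(1);

    public OwnedSelectorLoop() throws IOException {
        this.selector = Selector.open();
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Blocks until I/O is ready, the timeout expires,
                // or wakeup() is called from another thread.
                selector.select(100);
                // ... process selected keys / in-flight writes here ...
            }
        } catch (IOException e) {
            // log and fall through to cleanup
        } finally {
            try {
                selector.close();  // closed by the owning thread only
            } catch (IOException ignored) {
            }
            closed.countDown();
        }
    }

    // Safe to call from any thread (e.g. the thread processing the ZK
    // change notification): it never touches the Selector's channels.
    public void initiateShutdown() {
        running = false;
        selector.wakeup();  // Selector.wakeup() is documented as thread-safe
    }

    public void awaitShutdown() throws InterruptedException {
        closed.await();
    }

    public boolean isSelectorOpen() {
        return selector.isOpen();
    }
}
```

      With this shape, the notification thread never races the fetcher thread on channel writes; it only flips a flag and wakes the selector.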

      Without the fix from KAFKA-7464, if shutdown encounters an exception, another config update or a broker restart is required to restart the replica fetchers after a dynamic config update.

      Exception stack trace:

      java.lang.IllegalArgumentException
              at java.nio.Buffer.position(Buffer.java:244)
              at sun.nio.ch.IOUtil.write(IOUtil.java:68)
              at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
              at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
              at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
              at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
              at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
              at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
              at org.apache.kafka.common.network.Selector.close(Selector.java:736)
              at org.apache.kafka.common.network.Selector.close(Selector.java:698)
              at org.apache.kafka.common.network.Selector.close(Selector.java:314)
              at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
              at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
              at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
              at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
              at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
              at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
              at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
              at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
              at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
              at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
              at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
              at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
              at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
              at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
              at scala.collection.immutable.List.foreach(List.scala:392)
              at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
      <SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
      

      The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with creation of replica fetchers during dynamic update. But even for those branches, we should clean up the Selector to avoid a resource leak in the dynamic config update case (discarding the exception may be sufficient when the broker is shut down).
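      The "discard the exception but still clean up" idea in the last paragraph can be sketched as a best-effort close helper: attempt close() on every resource, and record rather than propagate failures, so one channel throwing (for example the IllegalArgumentException in the trace above) cannot leak the rest. This is a hypothetical illustration, not the helper Kafka actually ships:

```java
import java.io.Closeable;
import java.util.List;

public final class QuietClose {
    private QuietClose() {}

    // Best-effort cleanup: close every resource, counting failures instead
    // of propagating them, so a single failing close cannot leak the others.
    // In the broker, each failure would be logged rather than just counted.
    public static int closeAllQuietly(List<? extends Closeable> resources) {
        int failures = 0;
        for (Closeable c : resources) {
            try {
                c.close();
            } catch (Exception e) {  // IOException and runtime exceptions alike
                failures++;
            }
        }
        return failures;
    }
}
```

      Swallowing the exception alone is enough at broker shutdown, but in the dynamic-config path the remaining channels must still be closed, which is what iterating past the failure achieves.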

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                rsivaram Rajini Sivaram
                Reporter:
                rsivaram Rajini Sivaram
                Reviewer:
                Jason Gustafson
              • Votes:
                0
                Watchers:
                3

                Dates

                • Created:
                  Updated:
                  Resolved: