Kafka / KAFKA-7464

Fail to shutdown ReplicaManager during broker cleaned shutdown


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: None
    • Labels: None

    Description

      In a 2.0 deployment, we saw the following log while shutting down the ReplicaManager during a broker clean shutdown:

      2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
      java.lang.IllegalArgumentException: null
              at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
              at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
              at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
              at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
              at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
              at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
              at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
              at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
              at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
              at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
              at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
              at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
              at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
              at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
              at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
              at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
              at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
              at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
              at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
      

      After that, we noticed that some of the replica fetcher threads failed to shut down:

      2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
      java.nio.channels.ClosedChannelException: null
              at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
              at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
              at org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) ~[kafka-clients-2.0.0.22.jar:?]
              at org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) ~[kafka-clients-2.0.0.22.jar:?]
              at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
              at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
              at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log.roll(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1465) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log$$anonfun$append$2.apply(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log.append(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.log.Log.appendAsFollower(Log.scala:743) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.cluster.Partition$$anonfun$doAppendRecordsToFollowerOrFutureReplica$1.apply(Partition.scala:601) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:588) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:608) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:188) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
              at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
              at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
              at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.12.jar:?]
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.12.jar:?]
              at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:115) ~[kafka_2.11-2.0.0.22.jar:?]
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) ~[kafka_2.11-2.0.0.22.jar:?]

      Worse, we found that if an exception is thrown in ReplicaFetcherManager shutdown, we will skip the purgatory shutdowns and the HW checkpoint entirely; in our case we never saw the "Shut down completely" log:

       def shutdown(checkpointHW: Boolean = true) {
          info("Shutting down")
          removeMetrics()
          if (logDirFailureHandler != null)
            logDirFailureHandler.shutdown()
          replicaFetcherManager.shutdown()
          replicaAlterLogDirsManager.shutdown()
          delayedFetchPurgatory.shutdown()
          delayedProducePurgatory.shutdown()
          delayedDeleteRecordsPurgatory.shutdown()
          if (checkpointHW)
            checkpointHighWatermarks()
          info("Shut down completely")
        }
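
      As a minimal sketch (not the actual Kafka code), wrapping each step in a swallow-style helper, similar to the CoreUtils.swallow that KafkaServer.shutdown already uses, would guarantee the purgatory shutdowns and the HW checkpoint still run even when the fetcher manager step throws. The method bodies below are illustrative stand-ins, not the real components:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch modeled on kafka.utils.CoreUtils.swallow: contain a
// failure in one shutdown step so the steps after it still execute.
object ShutdownSketch {
  val completed = ArrayBuffer.empty[String]

  def swallow(action: => Unit): Unit =
    try action catch {
      case t: Throwable => println(s"Swallowed exception during shutdown: $t")
    }

  def shutdown(checkpointHW: Boolean = true): Unit = {
    // Stand-in for replicaFetcherManager.shutdown(), which threw in this report.
    swallow { throw new IllegalArgumentException("simulated fetcher failure") }
    swallow { completed += "purgatory" }       // still runs despite the throw
    if (checkpointHW)
      swallow { completed += "hw-checkpoint" } // still runs despite the throw
  }
}
```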
      

      The reason we see this is that after KAFKA-6051, we close leaderEndpoint in the replica fetcher thread's initiateShutdown to preempt an in-progress fetch request and accelerate replica fetcher thread shutdown. However, leaderEndpoint.close() can throw an exception while the replica fetcher thread is still actively fetching.

       

      I am wondering whether we should catch the exception thrown in "leaderEndpoint.close()" instead of letting it propagate up the call stack. In my opinion, it is safe to do so because ReplicaFetcherThread.initiateShutdown is called in only two cases:

      1. Server shutdown – We are shutting down the process anyway, so even if we fail to close the leader endpoint cleanly there is no harm.
      2. shutdownIdleFetcherThread – The fetcher thread is idle and will not be used again, so there is no harm either.
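
      The proposal above could look like the following sketch. This is hypothetical, not the actual patch; BlockingSend and ReplicaFetcherThreadSketch are simplified stand-ins for ReplicaFetcherBlockingSend and ReplicaFetcherThread:

```scala
// Hypothetical sketch of the proposed fix: catch and log any exception from
// leaderEndpoint.close() inside initiateShutdown instead of letting it
// propagate up to ReplicaManager.shutdown.
trait BlockingSend {
  def close(): Unit
}

class ReplicaFetcherThreadSketch(leaderEndpoint: BlockingSend) {
  @volatile private var running = true

  // Returns true if the thread was running before this call (mirroring
  // ShutdownableThread.initiateShutdown's contract).
  def initiateShutdown(): Boolean = {
    val wasRunning = running
    running = false
    try {
      // Preempts an in-progress fetch (KAFKA-6051); this may throw while
      // the thread is actively fetching, so the failure is contained here.
      leaderEndpoint.close()
    } catch {
      case t: Throwable =>
        println(s"Failed to close leader endpoint during shutdown: $t")
    }
    wasRunning
  }
}
```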

      Attachments

        Activity


          People

             Assignee: hzxa21 Zhanxiang (Patrick) Huang
             Reporter: hzxa21 Zhanxiang (Patrick) Huang
             Votes: 0
             Watchers: 4

