KAFKA-4523: Controlled shutdown fails if consumer group restabilizes during shutdown


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.1.0
    • Fix Version/s: 0.10.1.2, 0.10.2.0
    • Component/s: None
    • Labels: None

    Description

      If I begin a controlled shutdown of a broker that is the coordinator for a consumer group, the shutdown often fails with the following error:

      [2016-12-12 16:24:15,424] INFO [Replica Manager on Broker 10]: Shut down completely (kafka.server.ReplicaManager)
      [2016-12-12 16:24:15,424] INFO [ExpirationReaper-10], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
      [2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Stopped  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
      [2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
      [2016-12-12 16:24:15,451] INFO Shutting down. (kafka.log.LogManager)
      [2016-12-12 16:24:31,241] INFO [GroupCoordinator 10]: Preparing to restabilize group my-consumer-group with old generation 2673 (kafka.coordinator.GroupCoordinator)
      [2016-12-12 16:24:32,499] INFO [GroupCoordinator 10]: Group my-consumer-group with generation 2674 is now empty (kafka.coordinator.GroupCoordinator)
      [2016-12-12 16:24:32,515] FATAL [Replica Manager on Broker 10]: Halting due to unrecoverable I/O error while handling produce request:  (kafka.server.ReplicaManager)
      kafka.common.KafkaStorageException: I/O exception in append to log '__consumer_offsets-33'
      	at kafka.log.Log.append(Log.scala:349)
      	at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
      	at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
      	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
      	at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
      	at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
      	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
      	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      	at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
      	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
      	at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
      	at kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:251)
      	at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
      	at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
      	at scala.Option.foreach(Option.scala:236)
      	at kafka.coordinator.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:726)
      	at kafka.coordinator.DelayedJoin.onComplete(DelayedJoin.scala:39)
      	at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
      	at kafka.coordinator.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:37)
      	at kafka.coordinator.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:672)
      	at kafka.coordinator.DelayedJoin.tryComplete(DelayedJoin.scala:37)
      	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
      	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
      	at kafka.coordinator.GroupCoordinator.onMemberFailure(GroupCoordinator.scala:665)
      	at kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:740)
      	at kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
      	at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.nio.channels.ClosedChannelException
      	at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
      	at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300)
      	at kafka.log.FileMessageSet.truncateTo(FileMessageSet.scala:405)
      	at kafka.log.FileMessageSet.trim(FileMessageSet.scala:378)
      	at kafka.log.Log.roll(Log.scala:773)
      	at kafka.log.Log.maybeRoll(Log.scala:742)
      	at kafka.log.Log.append(Log.scala:405)
      	... 35 more
      

      This then causes the broker to attempt to run recovery on all log segments on the next startup, which is obviously not ideal. It looks like the group coordinator is shut down after the log manager [1]; should the order be reversed?

      [1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L588
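
      For illustration, the reordering I'm suggesting would look roughly like the sketch below, inside KafkaServer.shutdown(). This is only a sketch of the idea, not a patch: the error-swallowing wrappers and the other shutdown steps of the real method are omitted, and the field names are assumed from the linked source.

      // Sketch only (assumed field names; other shutdown steps omitted).
      // Stop the group coordinator first, so it can no longer append
      // rebalance/offset records to __consumer_offsets partitions...
      if (groupCoordinator != null)
        groupCoordinator.shutdown()

      // ...and only then flush and close the logs, so no coordinator write
      // can race with the log file channels being closed.
      if (logManager != null)
        logManager.shutdown()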

People

    • Assignee: Steve Niemitz
    • Reporter: Steve Niemitz
    • Votes: 0
    • Watchers: 3
