Description
When I begin a controlled shutdown of a broker that is the coordinator for a consumer group, the shutdown often fails with the following error:
[2016-12-12 16:24:15,424] INFO [Replica Manager on Broker 10]: Shut down completely (kafka.server.ReplicaManager)
[2016-12-12 16:24:15,424] INFO [ExpirationReaper-10], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-12 16:24:15,451] INFO Shutting down. (kafka.log.LogManager)
[2016-12-12 16:24:31,241] INFO [GroupCoordinator 10]: Preparing to restabilize group my-consumer-group with old generation 2673 (kafka.coordinator.GroupCoordinator)
[2016-12-12 16:24:32,499] INFO [GroupCoordinator 10]: Group my-consumer-group with generation 2674 is now empty (kafka.coordinator.GroupCoordinator)
[2016-12-12 16:24:32,515] FATAL [Replica Manager on Broker 10]: Halting due to unrecoverable I/O error while handling produce request: (kafka.server.ReplicaManager)
kafka.common.KafkaStorageException: I/O exception in append to log '__consumer_offsets-33'
    at kafka.log.Log.append(Log.scala:349)
    at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
    at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
    at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
    at kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:251)
    at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
    at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
    at scala.Option.foreach(Option.scala:236)
    at kafka.coordinator.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:726)
    at kafka.coordinator.DelayedJoin.onComplete(DelayedJoin.scala:39)
    at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
    at kafka.coordinator.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:37)
    at kafka.coordinator.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:672)
    at kafka.coordinator.DelayedJoin.tryComplete(DelayedJoin.scala:37)
    at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
    at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
    at kafka.coordinator.GroupCoordinator.onMemberFailure(GroupCoordinator.scala:665)
    at kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:740)
    at kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
    at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
    at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300)
    at kafka.log.FileMessageSet.truncateTo(FileMessageSet.scala:405)
    at kafka.log.FileMessageSet.trim(FileMessageSet.scala:378)
    at kafka.log.Log.roll(Log.scala:773)
    at kafka.log.Log.maybeRoll(Log.scala:742)
    at kafka.log.Log.append(Log.scala:405)
    ... 35 more
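In the trace, a heartbeat expiration completes a join and appends group metadata to '__consumer_offsets-33' about 17 seconds after the log manager began shutting down, so the append hits a closed file channel. This causes the broker to run recovery on all log segments on the next startup, which is not ideal. It looks like the group coordinator is shut down after the log manager [1]; should the order be reversed?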
[1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L588
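To illustrate the ordering question, here is a minimal sketch in plain Java. It is not Kafka code: ToyLog, ToyCoordinator and simulate are hypothetical names made up for this example, standing in for the log manager, the group coordinator and the broker shutdown sequence. It only assumes what the trace above suggests, namely that a late heartbeat expiration can trigger an append after the log's file channel has been closed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ShutdownOrderSketch {

    /** Hypothetical stand-in for a log owned by the log manager. */
    static final class ToyLog implements AutoCloseable {
        private final FileChannel channel;

        ToyLog(Path path) throws IOException {
            channel = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        }

        void append(String record) throws IOException {
            // Throws ClosedChannelException if the channel was already closed.
            channel.write(ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8)));
        }

        @Override
        public void close() throws IOException {
            channel.close(); // closing an already closed channel is a no-op
        }
    }

    /** Hypothetical stand-in for the group coordinator. */
    static final class ToyCoordinator {
        private final ToyLog log;
        private volatile boolean running = true;

        ToyCoordinator(ToyLog log) {
            this.log = log;
        }

        /** Simulates a heartbeat expiration that stores group metadata. */
        boolean onMemberExpired() throws IOException {
            if (!running) {
                return false; // coordinator already stopped: nothing is written
            }
            log.append("group-metadata\n");
            return true;
        }

        void shutdown() {
            running = false;
        }
    }

    /**
     * Runs one toy shutdown, then fires a late heartbeat expiration.
     * Returns "ClosedChannelException", "skipped" or "appended".
     */
    public static String simulate(boolean coordinatorFirst) {
        try {
            Path path = Files.createTempFile("toy-log", ".log");
            try (ToyLog log = new ToyLog(path)) {
                ToyCoordinator coordinator = new ToyCoordinator(log);
                if (coordinatorFirst) {
                    coordinator.shutdown(); // reversed order: coordinator stops first
                }
                log.close(); // the log manager closes the file channel
                try {
                    return coordinator.onMemberExpired() ? "appended" : "skipped";
                } catch (ClosedChannelException e) {
                    return "ClosedChannelException";
                }
            } finally {
                Files.deleteIfExists(path);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("log manager first: " + simulate(false));
        System.out.println("coordinator first: " + simulate(true));
    }
}
```

In this toy model, closing the log first reproduces the ClosedChannelException, while stopping the coordinator first causes the late expiration to be skipped. Whether simply swapping the two shutdown calls is sufficient in the real broker is a separate question that this sketch does not answer.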