If a log is being flushed when Kafka is shut down, KafkaScheduler immediately terminates the flush tasks, causing exceptions to be thrown and aborting the clean shutdown process. When restarted, the instance launches in recovery mode, scanning the most recent log segment of each topic before it begins serving.
This issue is particularly pronounced in environments with a large number of topics and a low flush threshold. In our case, nearly every deploy to our Kafka service in which the app is restarted triggers an unclean shutdown followed by startup in recovery mode. The real cost is rereading every log file on each restart, and it can be avoided entirely.
The behavior is caused by the way the KafkaScheduler threadpool (responsible for log flushing) is shut down. Currently, KafkaScheduler.shutdown() calls executor.shutdownNow(), which attempts to cancel all queued tasks and interrupts the threads running in-flight tasks. The interrupt causes each Log to throw a java.nio.channels.ClosedByInterruptException as its flush to disk is cut short, aborting the clean shutdown. Here's a sample log of the exceptions we see thrown when restarting Kafka: https://gist.github.com/1265940
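The interrupt behavior can be reproduced outside Kafka with a plain java.util.concurrent executor. This is an illustrative sketch, not Kafka code: the sleep stands in for a flush that is mid-write when shutdownNow() fires, and the InterruptedException plays the role of the ClosedByInterruptException seen in the gist above.

```java
import java.util.concurrent.*;

public class ShutdownNowDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newScheduledThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);

        executor.submit(() -> {
            started.countDown();
            try {
                // Stand-in for a long-running log flush.
                Thread.sleep(5000);
                System.out.println("flush completed");
            } catch (InterruptedException e) {
                // shutdownNow() interrupted the in-flight "flush",
                // analogous to ClosedByInterruptException in Log.flush().
                System.out.println("flush interrupted");
            }
        });

        started.await();            // wait until the task is mid-"flush"
        executor.shutdownNow();     // cancels queued tasks AND interrupts running ones
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Running this prints "flush interrupted": the in-flight task never gets to finish its work.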
The fix is simple: rather than calling executor.shutdownNow() in KafkaScheduler.shutdown(), call executor.shutdown() (compare the Javadoc: http://bit.ly/mZUSaD). This method "initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted." In-flight flushes are allowed to complete (generally very quickly), future flush tasks are cancelled, no exceptions are thrown, and the shutdown finishes cleanly within about a second.
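The orderly variant can be sketched the same way. With a ScheduledThreadPoolExecutor's default policy, shutdown() lets the currently executing run of a periodic task finish uninterrupted and cancels its future runs, which mirrors the desired flush behavior; again, the sleep is a stand-in for a flush, not Kafka code.

```java
import java.util.concurrent.*;

public class OrderlyShutdownDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);

        // Periodic "flush" task, scheduled the way a flusher might be.
        executor.scheduleAtFixedRate(() -> {
            started.countDown();
            try {
                Thread.sleep(200);  // stand-in for writing the log to disk
                System.out.println("flush completed");
            } catch (InterruptedException e) {
                System.out.println("flush interrupted");
            }
        }, 0, 1, TimeUnit.SECONDS);

        started.await();        // wait until a "flush" is in flight
        executor.shutdown();    // in-flight run completes; future runs are cancelled
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Running this prints "flush completed" exactly once: the in-flight run finishes cleanly, no interrupt is delivered, and the executor then terminates because the periodic task is not rescheduled after shutdown.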
I've attached the patch we've applied and deployed. It applies cleanly to 0.6.1.