Kafka / KAFKA-126

Log flush should complete upon broker shutdown

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7
    • Component/s: None
    • Labels: None

      Description

      Broker shutdown currently stops the flush scheduler via shutdownNow(), which interrupts any flush that is in progress. This leads to an unclean shutdown. cleanupLogs may be affected by a similar scenario.

      2011/08/31 09:45:34.833 ERROR [LogManager] [kafka-logflusher-0] [kafka] Error flushing topic MyTopic
      java.nio.channels.ClosedByInterruptException
      at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
      at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:362)
      at kafka.message.FileMessageSet.flush(FileMessageSet.scala:174)
      at kafka.log.Log.flush(Log.scala:306)
      at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushAllLogs$1.apply(LogManager.scala:274)
      at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushAllLogs$1.apply(LogManager.scala:263)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
      at kafka.log.LogManager.kafka$log$LogManager$$flushAllLogs(LogManager.scala:263)
      at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:129)
      at kafka.utils.Utils$$anon$2.run(Utils.scala:58)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:619)

      A possible fix would be to use shutdown() instead of shutdownNow() in the scheduler.
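
To illustrate the difference, here is a minimal, self-contained sketch using only java.util.concurrent. It is not taken from the Kafka source: the class name ShutdownDemo, the helper flushSurvives, and the 200 ms sleep standing in for a slow flush are all assumptions made for the example. With shutdown() the in-flight task should run to completion, while shutdownNow() should interrupt it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownDemo {
    /**
     * Starts one simulated flush, shuts the scheduler down while the flush is
     * running, and reports whether the flush ran to completion.
     */
    static boolean flushSurvives(boolean useShutdownNow) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean completed = new AtomicBoolean(false);

        scheduler.schedule(() -> {
            started.countDown();
            try {
                Thread.sleep(200);      // hypothetical stand-in for a slow flush
                completed.set(true);
            } catch (InterruptedException e) {
                // shutdownNow() interrupted the worker; the flush is abandoned
            }
        }, 0, TimeUnit.MILLISECONDS);

        try {
            started.await();            // make sure the flush is in progress
            if (useShutdownNow) {
                scheduler.shutdownNow(); // interrupts the running task
            } else {
                scheduler.shutdown();    // lets the running task finish
            }
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("shutdown():    flush completed = " + flushSurvives(false));
        System.out.println("shutdownNow(): flush completed = " + flushSurvives(true));
    }
}
```

In the real broker the interrupted call is FileChannel.force(), which surfaces the interrupt as the ClosedByInterruptException shown in the trace above.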

      1. KAFKA-126_v1.patch
        3 kB
        Joel Koshy
      2. KAFKA-126_v2.patch
        11 kB
        Joel Koshy
      3. KAFKA-126_v3.patch
        4 kB
        Joel Koshy

          Activity

          Joel Koshy added a comment -

          This is pretty much the simple fix that I suggested in the description, i.e., change the shutdown behavior of KafkaScheduler to use shutdown() instead of shutdownNow(). I also added a shutdownNow() method to KafkaScheduler for callers that need the immediate shutdown behavior.
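
As a rough sketch of what such a wrapper could look like (this is not the KafkaScheduler source; the class SimpleScheduler and its method names scheduleWithRate and awaitTermination are hypothetical), a thin layer over ScheduledExecutorService can expose both shutdown modes so each caller picks the one it needs:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a scheduler wrapper; not the actual KafkaScheduler. */
public class SimpleScheduler {
    private final ScheduledExecutorService executor;

    public SimpleScheduler(int numThreads) {
        this.executor = Executors.newScheduledThreadPool(numThreads);
    }

    /** Runs the task repeatedly at a fixed rate. */
    public void scheduleWithRate(Runnable task, long delayMs, long periodMs) {
        executor.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    /** Graceful: no further runs are started, but a run in progress finishes. */
    public void shutdown() {
        executor.shutdown();
    }

    /** Immediate: additionally interrupts a run in progress. */
    public void shutdownNow() {
        executor.shutdownNow();
    }

    /** Waits for worker threads to exit; returns false on timeout. */
    public boolean awaitTermination(long timeoutMs) {
        try {
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

By default a ScheduledThreadPoolExecutor cancels periodic tasks on shutdown() without interrupting a run that has already started, which is the behavior the log flusher needs.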

          For testing, I changed the flush settings to:
          log.flush.interval=10000
          log.default.flush.interval.ms=0
          log.default.flush.scheduler.interval.ms=20
          I then ran ProducerPerformance and shut down the broker a couple of times. These settings consistently reproduce the exception without this fix.

          The other callers of the old shutdown (now called shutdownNow) are the ZooKeeper offset committer and the LogManager's log cleanup. I think these schedulers should also switch to shutdown(), and this patch does that.

          Finally, I'm piggybacking a small unrelated warning message in the producer config, i.e., when both zk.connect and broker.list are specified, zk.connect takes precedence. (Let me know if you prefer this to be removed from this patch.)
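
The precedence rule can be sketched as follows. This is an illustration only, not the producer config code from the patch: the class DiscoveryConfig, the helper chooseDiscovery, the warning text, and the exception thrown when neither property is set are all assumptions.

```java
import java.util.Properties;

public class DiscoveryConfig {
    /**
     * Returns the name of the property this sketch would use to locate
     * brokers: "zk.connect" wins whenever it is present.
     */
    static String chooseDiscovery(Properties props) {
        String zk = props.getProperty("zk.connect");
        String brokers = props.getProperty("broker.list");
        if (zk != null && brokers != null) {
            // Hypothetical wording; the real message may differ.
            System.err.println("WARN: both zk.connect and broker.list are specified; "
                    + "zk.connect takes precedence");
            return "zk.connect";
        }
        if (zk != null) {
            return "zk.connect";
        }
        if (brokers != null) {
            return "broker.list";
        }
        // Assumption for the sketch: treat a missing setting as an error.
        throw new IllegalArgumentException("Neither zk.connect nor broker.list is specified");
    }
}
```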

          Neha Narkhede added a comment -

          Using the executor shutdown API, the process would block until the currently executing thread finishes execution. I guess that works fine unless, for some odd reason, the thread blocks on one action forever, in which case the process would have to be killed manually. I can't think of a concrete example of when it could block forever, though.

          On the other hand, I guess that if/when that kind of blocking happens, it is a serious problem/bug that needs attention anyways. So using the shutdown() API looks like a good approach.
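
For reference, one common way to bound the wait is to call shutdown(), wait with awaitTermination for a limited time, and fall back to shutdownNow() only if the executor has not drained. The sketch below is not what the patch does; the class BoundedShutdown and the helper shutdownWithTimeout are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedShutdown {
    /**
     * Tries a graceful shutdown first. Returns true if the executor drained
     * within the timeout, false if it had to be interrupted instead.
     */
    static boolean shutdownWithTimeout(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();                     // stop accepting work, let running tasks finish
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the caller's interrupt status
        }
        executor.shutdownNow();                  // last resort: interrupt whatever is still running
        return false;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> System.out.println("flushing..."));
        boolean graceful = shutdownWithTimeout(executor, 5, TimeUnit.SECONDS);
        System.out.println("graceful = " + graceful);
    }
}
```

The trade-off is that a task stuck past the timeout is still interrupted, so the timeout has to be generous enough for a normal flush.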

          +1

          Jun Rao added a comment -

          Thanks, Joel, for the patch. The ZooKeeper offset committer should use shutdownNow. If there is anything wrong with the ZK server, we still want to be able to shut down a consumer immediately.

          Joel Koshy added a comment -

          That makes sense. Here is the updated patch.

          Joel Koshy added a comment -

          Oops - forgot to rebase.

          Jun Rao added a comment -

          Thanks, Joel. Just committed this.


            People

            • Assignee: Unassigned
            • Reporter: Joel Koshy
