Kafka / KAFKA-695

Broker shuts down due to attempt to read a closed index file

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: log
    • Labels:

      Description

      Broker shuts down with the following error message -

      2013/01/11 01:43:51.320 ERROR [KafkaApis] [request-expiration-task] [kafka] [] [KafkaApi-277] error when processing request (service_metrics,2,39192,2000000)
      java.nio.channels.ClosedChannelException
      at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
      at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:613)
      at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:82)
      at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
      at kafka.log.LogSegment.read(LogSegment.scala:106)
      at kafka.log.Log.read(Log.scala:386)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:369)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.map(Map.scala:93)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
      at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:519)
      at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:501)
      at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:222)
      at java.lang.Thread.run(Thread.java:619)
      2013/01/11 01:43:52.815 INFO [Processor] [kafka-processor-10251-2] [kafka] [] Closing socket connection to /172.20.72.244.
      2013/01/11 01:43:54.286 INFO [Processor] [kafka-processor-10251-3] [kafka] [] Closing socket connection to /172.20.72.243.
      2013/01/11 01:43:54.385 ERROR [LogManager] [kafka-logflusher-1] [kafka] [] [Log Manager on Broker 277] Error flushing topic service_metrics
      java.nio.channels.ClosedChannelException
      at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
      at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
      at kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
      at kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
      at kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
      at kafka.log.LogSegment.flush(LogSegment.scala:151)
      at kafka.log.Log.flush(Log.scala:493)
      at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
      at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
      at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
      at kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
      at kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
      at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:619)
      2013/01/11 01:43:54.447 FATAL [LogManager] [kafka-logflusher-1] [kafka] [] [Log Manager on Broker 277] Halting due to unrecoverable I/O error while flushing logs: null
      java.nio.channels.ClosedChannelException
      at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
      at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
      at kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
      at kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
      at kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
      at kafka.log.LogSegment.flush(LogSegment.scala:151)
      at kafka.log.Log.flush(Log.scala:493)
      at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
      at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
      at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
      at kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
      at kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
      at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:619)
      2013/01/11 01:43:54.512 INFO [ComponentsContextLoaderListener] [Thread-2] [kafka] [] Shutting down...

      Attachments

      1. kafka-695_followup_v2.patch (10 kB) - Jun Rao
      2. kafka-695_followup.patch (8 kB) - Jun Rao
      3. kafka-695_v2.patch (3 kB) - Jun Rao
      4. kafka-695.patch (2 kB) - Jun Rao


          Activity

          Jay Kreps added a comment -

          Hmm, so this can happen. Currently in 0.8, when we delete a file there is nothing that blocks reads on that file. The assumption is that this only happens at the tail of the log and is rare. This problem is fixed on trunk by the async delete patch.

          However, the question is whether this is really what is happening here.

          Jun Rao added a comment -

          The weird thing is that the file actually exists on disk (this is a low-volume topic that has only one segment), and there was no logging showing that the segment had been deleted.

          Jay Kreps added a comment -

          Some research:
          1. Taking KAFKA-719 into account as well, we have examples of this happening in the background flush, in reads, and in append(). Flush and append only happen on the active segment, which complicates things.
          2. The FileChannel in FileMessageSet is private, so the only way to close it is to call either close() or delete() on the message set.
          3. Since delete() first calls close(), it is hard to say which code path was taken (you would get the same error message either way; see the sketch after this list).
          4. The only call to close() is LogSegment.close(), and the only call to that is in Log.close(), so that's not it.
          5. There are several calls to delete(), but all are inside the lock except for deleteSegments().
          6. deleteSegments() should, intuitively, not delete the active segment, since we force a segment roll if needed inside the lock in markDeleteWhile().
          7. It is possible for deleteSegments() to collide with truncateTo(), but I am not sure how feasible that is.
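
          For illustration only, a minimal Scala sketch (a hypothetical SimpleMessageSet, not the real FileMessageSet) of why points 2 and 3 make the two paths indistinguishable: delete() closes the channel before removing the file, so a later read fails with the same ClosedChannelException either way.

          import java.io.{File, RandomAccessFile}
          import java.nio.ByteBuffer

          // Hypothetical stand-in for FileMessageSet: the channel is private,
          // so the only ways it can end up closed are close() or delete().
          class SimpleMessageSet(file: File) {
            private val channel = new RandomAccessFile(file, "rw").getChannel

            def read(position: Long): Int =
              channel.read(ByteBuffer.allocate(8), position)

            def close(): Unit = channel.close()

            // delete() closes the channel before removing the file, so a subsequent
            // read() throws the same ClosedChannelException as after a plain close().
            def delete(): Boolean = { close(); file.delete() }
          }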

          Jun Rao added a comment -

          Another piece of info: the log segment seems to exist after the broker is shut down (it is loaded on broker restart), so it does not seem that the segment was deleted. After a restart, the same problem shows up once the broker has been running for about a day.

          Jun Rao added a comment -

          I have a theory of what's happening here. What we overlooked is that there is another way to end up with a closed channel, other than explicitly closing it: if a thread is interrupted while it is in the middle of a read/write on a file channel, the channel is closed automatically. I guess the following is what happened.

          The ExpiredRequestReaper thread is in the middle of expiring a FetchRequest and gets interrupted by ExpiredRequestReaper.forcePurge(). When the interruption occurs, the reaper thread can be reading from the file channel (see the stack trace below). This causes the file channel to be closed, and all subsequent reads and writes on this file channel fail with ClosedChannelException.

          java.nio.channels.ClosedChannelException
          at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
          at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:613)
          at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:83)
          at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
          at kafka.log.LogSegment.read(LogSegment.scala:91)
          at kafka.log.Log.read(Log.scala:390)
          at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
          at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
          at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
          at scala.collection.immutable.Map$Map1.map(Map.scala:93)
          at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
          at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:528)
          at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:510)
          at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:222)
          at java.lang.Thread.run(Thread.java:619)
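
          As a standalone illustration of the JDK behavior described above (a sketch, not Kafka code): FileChannel is an interruptible channel, so interrupting a thread that is using it closes the channel for every other user as well.

          import java.io.RandomAccessFile
          import java.nio.ByteBuffer
          import java.nio.channels.ClosedChannelException

          object InterruptClosesChannel extends App {
            val file = java.io.File.createTempFile("segment", ".log")
            val channel = new RandomAccessFile(file, "rw").getChannel
            channel.write(ByteBuffer.wrap(new Array[Byte](1024)))

            // Reader thread loops over the channel, like the reaper thread reading
            // a log segment while expiring a fetch request.
            val reader = new Thread {
              override def run(): Unit =
                try { while (true) channel.read(ByteBuffer.allocate(64), 0) }
                catch { case _: ClosedChannelException => () } // ClosedByInterruptException
            }
            reader.start()
            Thread.sleep(100)
            reader.interrupt()   // plays the role of forcePurge() interrupting the reaper
            reader.join()

            // The shared channel is now closed for everyone, not just the reader.
            try channel.read(ByteBuffer.allocate(64), 0)
            catch { case e: ClosedChannelException => println("subsequent read failed: " + e) }
          }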

          Attaching a quick fix. The problem is that we shouldn't be using interrupt to communicate with the ExpiredRequestReaper thread, since interruption has this dangerous side effect. The patch basically uses a boolean flag to indicate that a full purge is needed and changes the ExpiredRequestReaper thread to never block for more than 500 ms (so that it gets a chance to do the full purge).

          Not sure what the best way to unit test this is, though.
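
          A rough sketch of the approach described above (illustrative names, not the actual RequestPurgatory code or the attached patch): forcePurge() only sets a flag, and the reaper polls with a bounded timeout so it notices the flag without ever being interrupted.

          import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
          import java.util.concurrent.atomic.AtomicBoolean

          // Illustrative reaper loop; T stands in for the purgatory's delayed requests.
          class ReaperSketch[T <: Delayed](queue: DelayQueue[T]) extends Runnable {
            private val running = new AtomicBoolean(true)
            private val needsPurge = new AtomicBoolean(false)

            // Instead of interrupting the reaper thread (which could close a file
            // channel it is reading), just record that a full purge is wanted.
            def forcePurge(): Unit = needsPurge.set(true)

            def shutdown(): Unit = running.set(false)

            override def run(): Unit = {
              while (running.get) {
                // Never block for more than 500 ms, so the purge flag is seen promptly.
                val expired = queue.poll(500, TimeUnit.MILLISECONDS)
                if (expired != null) expire(expired)
                if (needsPurge.getAndSet(false)) purgeSatisfied()
              }
            }

            private def expire(request: T): Unit = ()   // placeholder
            private def purgeSatisfied(): Unit = ()     // placeholder
          }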

          Jay Kreps added a comment -

          This is a pretty brilliant catch.

          That code should not be there. It looks like the interrupt from shutdown (which is maybe okay, since you are shutting down) somehow got pasted in elsewhere.

          Jun Rao added a comment -

          So, do you think the patch is ok or do you plan to provide a better patch?

          Jay Kreps added a comment -

          I'm not sure. I think I need to rewind back to what forcePurge() is trying to do. From what I can see, we are replacing an immediate interrupt with a 500 ms check, which may be fine, though obviously it's higher latency. The purpose of forcePurge() is to try to clean out dead memory?

          Neha Narkhede added a comment -

          Great catch! One question, just to see if I understood the problem correctly: the problem happens if forcePurge() interrupts the expired request reaper while it is reading a message set from the log. In that case, we should also see "error when processing request.." in the Kafka log, because we catch all throwables in the readMessageSet() API. Do you see that?

          >> The purpose of forcePurge is to try to clean out dead memory?

          Yes, this was added as part of fixing KAFKA-664.

          I think the patch looks good; it is fine to delay the full purge by a few ms.

          Neha Narkhede added a comment -

          LogRecoveryTest now throws the following new error messages:

          [2013-01-28 10:15:54,534] ERROR ExpiredRequestReaper-0 Error in long poll expiry thread: (kafka.server.RequestPurgatory$ExpiredRequestReaper:102)
          java.lang.InterruptedException
          at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2038)
          at java.util.concurrent.DelayQueue.poll(DelayQueue.java:209)
          at kafka.server.RequestPurgatory$ExpiredRequestReaper.pollExpired(RequestPurgatory.scala:269)
          at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:221)
          at java.lang.Thread.run(Thread.java:680)
          [2013-01-28 10:15:54,534] ERROR ExpiredRequestReaper-0 Error in long poll expiry thread: (kafka.server.RequestPurgatory$ExpiredRequestReaper:102)
          java.lang.InterruptedException
          at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2038)
          at java.util.concurrent.DelayQueue.poll(DelayQueue.java:209)
          at kafka.server.RequestPurgatory$ExpiredRequestReaper.pollExpired(RequestPurgatory.scala:269)
          at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:221)
          at java.lang.Thread.run(Thread.java:680)

          Jun Rao added a comment -

          Attaching patch v2. Simplified the logic a bit: ExpiredRequestReaper now checks directly whether it needs to do a full purge (since it wakes up periodically), and RequestPurgatory.watch() just increments the request count. Also reduced the wait from 500 ms to 200 ms.

          The new error in LogRecoveryTest happens when we interrupt the ExpiredRequestReaper thread during shutdown, so this is fine. The test still fails transiently, but that problem has been there for some time.

          Neha Narkhede added a comment -

          +1

          Jun Rao added a comment -

          Just realized there are a couple of other corner cases that we need to handle. First, we shut down ExpiredRequestReaper by interrupting the thread. This, of course, can close a file channel, which can cause a KafkaStorageException during a subsequent log append. This means we may do an unnecessary unclean shutdown. Second, the replica fetcher thread is shut down through interruption too. When that thread is interrupted, it can be in the middle of a log append, which again means an unnecessary unclean shutdown.

          We can remove interruption during shutdown and just rely on the isRunning flag (sketched below). This is fine for the first case, since ExpiredRequestReaper wakes up every 200 ms. In the second case, it means we may have to wait for the last outstanding fetch request to complete, which can make shutdown take longer. Not sure if this is a big concern.
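
          A minimal sketch of the flag-based shutdown described above (a hypothetical class, assuming each iteration of work is bounded): shutdown() flips the flag and waits for the in-flight iteration, e.g. a fetch or log append, to finish instead of interrupting the thread.

          import java.util.concurrent.CountDownLatch
          import java.util.concurrent.atomic.AtomicBoolean

          // Hypothetical shutdownable worker: no interrupt, so an in-flight log
          // append or fetch cannot have its file channel closed from under it.
          abstract class FlagShutdownThread(name: String) extends Thread(name) {
            private val isRunning = new AtomicBoolean(true)
            private val shutdownLatch = new CountDownLatch(1)

            def doWork(): Unit   // one bounded unit of work, e.g. one fetch or flush pass

            override def run(): Unit = {
              while (isRunning.get) doWork()
              shutdownLatch.countDown()
            }

            def shutdown(): Unit = {
              isRunning.set(false)
              shutdownLatch.await()   // may wait for the last outstanding request
            }
          }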

          Jun Rao added a comment -

          Attaching a follow-up patch.

          1. Make ExpiredRequestReaper a non-daemon thread since we want to shut it down explicitly. Remove interrupt since the thread now wakes up periodically.

          2. AbstractFetcher: Make it non-interruptible. Just wait for the last fetch request to complete.

          3. ReplicaFetcher: handle KafkaStorageException properly.

          Jun Rao added a comment -

          Reopening it to deal with follow-up issues.

          Neha Narkhede added a comment -

          Thanks for the follow-up patch; most of the changes are good. I have a few questions:

          1. ReplicaFetcherThread
          Shouldn't we also catch Throwable and at least log an error saying why the fetcher thread died? Otherwise, if there is some code bug, it will die anyway, but we will not know why.

          2. AbstractFetcherThread
          Why are we overriding isInterruptible in AbstractFetcherThread? I think you want ReplicaFetcherThread to be uninterruptible, but ConsumerFetcherThread can be interruptible, no?

          3. RequestPurgatory
          Shouldn't the expiration thread be a daemon thread? We don't really want the expiration thread to block the JVM from shutting down, do we?

          Jun Rao added a comment -

          Attaching follow-up patch v2.

          Jun Rao added a comment -

          1. Any unexpected exception in ReplicaFetcherThread will kill the thread and the error will be logged in ShutdownableThread.

          2. This is a good point and is fixed in the v2 patch.

          3. We are shutting down the expiration thread explicitly. If the thread can't be shut down, it indicates a bug, and it's probably better to expose the bug than to make this a daemon thread.

          So, the only change in patch v2 is to address #2.

          Neha Narkhede added a comment -

          1, 3. Makes sense, and the follow-up patch v2 looks good.
          +1

          Jay Kreps added a comment -

          Can we just remove the interrupt entirely instead of making it optional? Unless we want to write tests for both cases and maintain this functionality in perpetuity, it seems like we should just stop using interrupt.

          Jun Rao added a comment -

          It's possible. However, some subclasses of ShutdownableThread, such as ConsumerFetcherThread, do blocking writes to a queue. So if the queue is full, I am not sure we can shut down the consumer thread without sending an interrupt (see the sketch below).
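
          To illustrate the concern (hypothetical names, not the real consumer code): if the queue is full, the thread is parked inside put() and never rechecks an isRunning flag, so only an interrupt can unblock it.

          import java.util.concurrent.ArrayBlockingQueue
          import java.util.concurrent.atomic.AtomicBoolean

          object BlockedOnFullQueue extends App {
            val queue = new ArrayBlockingQueue[Int](1)
            queue.put(0)                               // the queue is now full
            val isRunning = new AtomicBoolean(true)

            val fetcher = new Thread {
              override def run(): Unit =
                try { while (isRunning.get) queue.put(1) }   // blocks: nobody is draining
                catch { case _: InterruptedException => () } // the only way out of put()
            }
            fetcher.start()
            Thread.sleep(100)

            isRunning.set(false)                       // ignored: the thread is parked in put()
            fetcher.join(200)
            println("alive after flag only: " + fetcher.isAlive)   // true

            fetcher.interrupt()                        // unblocks put() with InterruptedException
            fetcher.join()
            println("alive after interrupt: " + fetcher.isAlive)   // false
          }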


            People

            • Assignee: Jun Rao
            • Reporter: Neha Narkhede