Kafka / KAFKA-757

System Test Hard Failure cases: "Fatal error during KafkaServerStable startup" when hard-failed broker is re-started

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    Attachments

    1. kafka-757-v3.patch (3 kB, Swapnil Ghike)
    2. kafka-757-v2.patch (2 kB, Swapnil Ghike)
    3. kafka-757-v1.patch (2 kB, Swapnil Ghike)

        Activity

        Jun Rao added a comment -

        Thanks for patch v3. Committed to 0.8.

        Swapnil Ghike added a comment -

        Yes, your point is valid. Jay also suggested implementing truncate() directly without calling indexSlotFor(). Patch v3 contains that change.

        Jun Rao added a comment -

        Thanks for the patch. Good catch. I think problem B can explain item 2 in KAFKA-750.

        I am not so sure about the fix in OffsetIndex though. indexSlotFor() assumes that the index is valid. However, when truncate() is called, the index may not be valid. Instead of changing the assumption in indexSlotFor(), it's probably better to implement truncate() directly without relying on index lookups.
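        A minimal, self-contained sketch of that approach (SimpleOffsetIndex and its fields are made-up names, and the real OffsetIndex operates on a memory-mapped file, so the committed patch differs):

        // Sketch only: truncate by a linear scan, with no assumption that the
        // index entries are internally consistent.
        class SimpleOffsetIndex(val baseOffset: Long, relOffsets: Array[Long], var entries: Int) {
          // Keep only the entries whose offset is strictly below the truncation
          // point. A zero-filled, unflushed tail has relative offset 0, so
          // truncating to baseOffset leaves zero entries, as recovery expects.
          def truncateTo(offset: Long): Unit = {
            val relOffset = offset - baseOffset
            var keep = 0
            while (keep < entries && relOffsets(keep) < relOffset)
              keep += 1
            entries = keep
          }
        }

        object TruncateDemo extends App {
          // An index that was rolled but never flushed: the file is all zeros.
          val idx = new SimpleOffsetIndex(baseOffset = 0L, relOffsets = Array(0L, 0L, 0L), entries = 3)
          idx.truncateTo(idx.baseOffset)
          println(idx.entries) // 0: the index is genuinely empty after truncation
        }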

        Swapnil Ghike added a comment -

        Sorry, in indexSlotFor() we should check whether lastOffset is less than or equal to baseOffset. Attached patch v2.
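        For illustration, a minimal standalone sketch of that guard (the parameter names mirror this discussion; the real indexSlotFor() searches a memory-mapped buffer, so the actual patch differs):

        object IndexSlotGuardSketch extends App {
          // Slot of the largest entry <= targetOffset, or -1 if there is nothing
          // valid to search. The guard treats an index whose lastOffset never
          // advanced past baseOffset (e.g. a zero-filled file) as empty.
          def indexSlotFor(entries: Int, baseOffset: Long, lastOffset: Long,
                           relOffsets: Array[Long], targetOffset: Long): Int =
            if (entries == 0 || lastOffset <= baseOffset) -1
            else {
              val rel = targetOffset - baseOffset
              var lo = 0
              var hi = entries - 1
              while (lo < hi) {
                val mid = (lo + hi + 1) / 2 // bias high so the range always narrows
                if (relOffsets(mid) <= rel) lo = mid else hi = mid - 1
              }
              if (relOffsets(lo) <= rel) lo else -1
            }

          // A zero-filled index of nominal size 3: the guard returns -1, so the
          // caller truncates to zero entries instead of to a bogus middle slot.
          println(indexSlotFor(entries = 3, baseOffset = 0L, lastOffset = 0L,
                               relOffsets = Array(0L, 0L, 0L), targetOffset = 0L))
        }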

        Swapnil Ghike added a comment -

        There are two parts to this patch:

        A. Move the sanity check that detects corrupt index files from the OffsetIndex constructor into the Log constructor, below the recovery logic. In case of a hard kill, checking for corrupt index files before the last segment has been recovered will fail the require() assertion.
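        Schematically, the reordering in A amounts to the following self-contained sketch (Index, recover and sanityCheck are simplified stand-ins for the real OffsetIndex/Log code paths, not the actual patch):

        object RecoveryOrderSketch extends App {
          final case class Index(baseOffset: Long, var entries: Int, var lastOffset: Long)

          // Recovery truncates a half-written index back to its base offset.
          def recover(idx: Index): Unit = { idx.entries = 0; idx.lastOffset = idx.baseOffset }

          // The sanity check formerly performed in the OffsetIndex constructor.
          def sanityCheck(idx: Index): Unit =
            require(idx.entries == 0 || idx.lastOffset > idx.baseOffset,
              s"Corrupt index found for segment ${idx.baseOffset}")

          // A hard-killed broker left a preallocated index full of zeroed entries,
          // as in the startup log below (entries = 1310720, lastOffset = 0).
          val idx = Index(baseOffset = 0L, entries = 1310720, lastOffset = 0L)

          // Old order: calling sanityCheck(idx) here would throw. New order
          // (part A): recover first, then check.
          recover(idx)
          sanityCheck(idx)
          println(s"entries = ${idx.entries}, lastOffset = ${idx.lastOffset}")
        }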

        B. The following corner case is possible:
        1. A broker rolled a new log segment file and an index file of non-zero size, and got hard killed before any appends to the index file were flushed.
        2. When the broker reboots and tries to load existing log segments, it will encounter this index file that has non-zero size, but has no data.
        3. Since the broker was hard killed, it will enter the recovery logic in Log.loadLogSegments().
        4. The recovery logic will try to truncate the index file to the base offset of the segment. It will look up indexSlotFor(baseOffset), and indexSlotFor() will return a non-zero slot, because relativeOffset(idx, mid) == relOffset == 0 on the first probe.
        5. This will set the size of the index file to a non-zero value (roughly half of its original size, which was maxEntries * 8, i.e. maxIndexSize bytes).
        6. Thus, the require() check for a corrupted index file in the Log constructor will not pass, since #entries and the file size are non-zero while lastOffset == baseOffset.

        The solution is to modify indexSlotFor() such that it returns -1 for a non-zero sized index file whose lastOffset is 0 (assuming that setLength() will fill the empty bytes with 0), so that the index file is truncated to #entries == size == 0.
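        As a self-contained illustration of steps 4 and 5 (buggySlotFor is a made-up stand-in; the real binary search in OffsetIndex differs in detail):

        object ZeroedIndexSketch extends App {
          // On a zero-filled index, a search that probes relativeOffset(mid) sees
          // 0 == relOffset at the very first probe and returns the middle slot,
          // so truncation keeps half the entries instead of none.
          def buggySlotFor(relOffsets: Array[Long], relOffset: Long): Int = {
            var lo = 0
            var hi = relOffsets.length - 1
            var slot = -1
            while (lo <= hi && slot < 0) {
              val mid = (lo + hi) / 2
              if (relOffsets(mid) == relOffset) slot = mid
              else if (relOffsets(mid) < relOffset) lo = mid + 1
              else hi = mid - 1
            }
            slot
          }

          val zeroed = Array.fill(8)(0L)    // preallocated entries that were never flushed
          println(buggySlotFor(zeroed, 0L)) // 3: half the file survives the truncation
        }

        With the -1 guard, truncateTo() instead cuts the index to zero entries, and the require() check in the Log constructor passes.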

        Testing done:
        1. Unit tests passed.
        2. Set the flush interval and index append interval to very low values. Produce data using the console producer (so the index file has flushed entries), hard-kill the broker, and restart it. Without A, the corrupt-index exception appears; with A, the broker starts cleanly. Then ctrl+C the broker.
        3. Clean up the kafka-logs directory, but don't clean up ZooKeeper. Restart the broker (to create empty log and index files for the topics created in step 2 above); it will boot up. Hard-kill it and restart it again: it should fail without B, and boot successfully with B.

        John Fung added a comment -

        This is happening in all hard-failure test cases in System Test: testcase_015[1-9]

        The following error is thrown when a hard-failed broker (kill -9) is re-started:

        [2013-02-12 10:07:15,015] INFO Started Kafka CSV metrics reporter with polling period 5 seconds (kafka.metrics.KafkaCSVMetricsReporter)
        [2013-02-12 10:07:15,020] INFO [Kafka Server 3], starting (kafka.server.KafkaServer)
        [2013-02-12 10:07:15,046] INFO [Log Manager on Broker 3] Loading log 'test_1-2' (kafka.log.LogManager)
        [2013-02-12 10:07:15,062] INFO Creating or reloading log segment /tmp/kafka_server_3_logs/test_1-2/00000000000000000000.log (kafka.log.FileMessageSet)
        [2013-02-12 10:07:15,066] INFO Loaded index file /tmp/kafka_server_3_logs/test_1-2/00000000000000000000.index with maxEntries = 1310720, maxIndexSize = 10485760, entries = 1310720, lastOffset = 0, file position = 10485760 (kafka.log.OffsetIndex)
        [2013-02-12 10:07:15,068] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
        java.lang.IllegalArgumentException: requirement failed: Corrupt index found, index file (/tmp/kafka_server_3_logs/test_1-2/00000000000000000000.index) has non-zero size but last offset is 0.
        at scala.Predef$.require(Predef.scala:145)
        at kafka.log.OffsetIndex.<init>(OffsetIndex.scala:95)
        at kafka.log.LogSegment.<init>(LogSegment.scala:36)
        at kafka.log.Log$$anonfun$loadSegments$2.apply(Log.scala:163)
        at kafka.log.Log$$anonfun$loadSegments$2.apply(Log.scala:147)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:827)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:826)
        at kafka.log.Log.loadSegments(Log.scala:147)
        at kafka.log.Log.<init>(Log.scala:125)
        at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:115)
        at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:109)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:109)
        at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:101)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
        at kafka.log.LogManager.loadLogs(LogManager.scala:101)
        at kafka.log.LogManager.<init>(LogManager.scala:62)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:59)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:46)
        at kafka.Kafka.main(Kafka.scala)
        [2013-02-12 10:07:15,069] INFO [Kafka Server 3], shutting down (kafka.server.KafkaServer)


          People

          • Assignee: Swapnil Ghike
          • Reporter: John Fung
          • Votes: 0
          • Watchers: 3
