KAFKA-17436: Follower index dump mismatch


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

    Description

      When a follower appends fetched data to its log, a single index entry is written for the whole append: the starting file position and the last offset of the fetched data. When the index is dumped, however, each entry is verified against the starting position and last offset of a single batch. So if the follower fetches multiple batches in one request, the index entries no longer line up with batch boundaries, and dumping the index file reports mismatches.
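
      The disagreement between the two code paths, as a minimal self-contained sketch (the case classes and numbers are illustrative, not the actual Kafka internals):

      // Illustrative model of a record batch and an offset-index entry.
      final case class Batch(baseOffset: Long, lastOffset: Long, sizeInBytes: Int)
      final case class IndexEntry(lastOffset: Long, position: Int)

      object IndexMismatchSketch extends App {
        // One follower fetch returns three batches, appended in a single call.
        val batches = Seq(Batch(0, 4, 100), Batch(5, 9, 100), Batch(10, 14, 100))

        // Write path: ONE entry for the whole append - the last offset of the
        // final batch, paired with the file position where the append starts.
        val entry = IndexEntry(batches.last.lastOffset, position = 0)

        // Verify path: read the single batch found at entry.position and expect
        // its last offset to equal the entry's. The batch at position 0 is the
        // first one (lastOffset = 4), not the third (lastOffset = 14).
        val batchAtPosition = batches.head
        if (batchAtPosition.lastOffset != entry.lastOffset)
          println(s"mismatch: entry ${entry.lastOffset} vs batch ${batchAtPosition.lastOffset}")
      }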


      Reproduction:

      Unit test, to be added to DumpLogSegmentsTest (it relies on that class's existing BatchInfo, createTestLog, and indexFilePath members):

      def addMultiRecordBatches(): Unit = {
        var recordsCount = 0L
        val batches = new ArrayBuffer[BatchInfo]
        val now = System.currentTimeMillis()
        val firstBatchRecords = (0 until 10).map { i => new SimpleRecord(now + i * 2, s"message key $i".getBytes, s"message value $i".getBytes)}
        batches += BatchInfo(firstBatchRecords, true, true)
        val secondBatchRecords = (10 until 30).map { i => new SimpleRecord(now + i * 3, s"message key $i".getBytes, null)}
        batches += BatchInfo(secondBatchRecords, true, false)
        val thirdBatchRecords = (30 until 50).map { i => new SimpleRecord(now + i * 5, null, s"message value $i".getBytes)}
        batches += BatchInfo(thirdBatchRecords, false, true)
        val fourthBatchRecords = (50 until 60).map { i => new SimpleRecord(now + i * 7, null)}
        batches += BatchInfo(fourthBatchRecords, false, false)
      
        batches.foreach { batchInfo =>
          // Build several record batches (5 records each) into one buffer and
          // append them with a single appendAsFollower call, mimicking a follower
          // fetch response that carries multiple batches.
          val buf = ByteBuffer.allocate(2048)
          batchInfo.records.grouped(5).foreach { records =>
            val builder = MemoryRecords.builder(buf, RecordBatch.MAGIC_VALUE_V1, Compression.NONE, TimestampType.CREATE_TIME, recordsCount)
            records.foreach(builder.append)
            builder.close()
            recordsCount += records.size.toLong
          }
          buf.flip()
          log.appendAsFollower(MemoryRecords.readableRecords(buf.slice()))
        }
        // Flush, but don't close so that the indexes are not trimmed and contain some zero entries
        log.flush(false)
      }
      
      @Test
      def testDumpIndexMismatches2(): Unit = {
        log = createTestLog
        addMultiRecordBatches()
        val offsetMismatches = mutable.Map[String, List[(Long, Long)]]()
        DumpLogSegments.dumpIndex(new File(indexFilePath), indexSanityOnly = false, verifyOnly = true, offsetMismatches,
          Int.MaxValue)
        // This should be empty, but with the current code the multi-batch appends
        // above produce index entries that do not match the batch found at the
        // indexed position, so mismatches are reported.
        assertEquals(Map.empty, offsetMismatches)
      }

      System test:

      1. Start the brokers and produce records.
      2. Stop a follower and delete all of its log data.
      3. Restart the follower so that it re-fetches the data from the leader.

      Result:

      • the follower's index file is smaller than the leader's
      • dumping the follower's index reports index mismatch errors
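
      For example, the errors can be reproduced with the log dump tool (substitute the follower's actual index file path):

      bin/kafka-dump-log.sh --files <follower-log-dir>/<topic-partition>/00000000000000000000.index --verify-index-only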


      Solution options:
      1. Fix DumpLogSegments so that it no longer reports these entries as mismatches (verify an entry against the range of batches it covers rather than a single batch).
      2. After the follower fetches multiple batches, write one index entry per batch instead of a single entry for the whole append (see the sketch below).
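
      A rough sketch of option 2, assuming a hypothetical appendIndexEntry callback in place of the real LogSegment/OffsetIndex plumbing:

      import org.apache.kafka.common.record.MemoryRecords
      import scala.jdk.CollectionConverters._

      // Walk the fetched batches and index each one, instead of writing a single
      // entry for the whole append. appendIndexEntry(lastOffset, position) is a
      // stand-in for the real offset-index update, not an actual Kafka API.
      def indexEachBatch(records: MemoryRecords, startPosition: Int,
                         appendIndexEntry: (Long, Int) => Unit): Unit = {
        var position = startPosition
        records.batches.asScala.foreach { batch =>
          appendIndexEntry(batch.lastOffset, position) // one entry per batch
          position += batch.sizeInBytes
        }
      }

      In the real append path an entry would still only be written every log.index.interval.bytes; the sketch only illustrates the per-batch granularity.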


      Which of these solutions is recommended? I can submit a patch.


      Attachments

        1. image-2024-08-28-14-42-44-152.png (479 kB, uploaded by Xiaobing Fang)

      People

        Assignee: Xiaobing Fang (fxbing)
        Reporter: Xiaobing Fang (fxbing)
        Votes: 0
        Watchers: 1
