KAFKA-593

Empty log index file created when it shouldn't be empty

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      We have seen empty index files during a system test when they shouldn't be empty. In this case there are around 100 messages in each segment, each around 100 bytes in size; given a "logIndexIntervalBytes" of 4096, there should be at least 2 log index entries per segment, but we see empty index files. The Kafka and ZooKeeper logs are attached.

      [yye@yye-ld kafka_server_3_logs]$ cd test_1-2/
      [yye@yye-ld test_1-2]$ ls -l
      total 84
      -rw-r--r-- 1 yye eng        8 Oct 29 15:22 00000000000000000000.index
      -rw-r--r-- 1 yye eng    10248 Oct 29 15:22 00000000000000000000.log
      -rw-r--r-- 1 yye eng        8 Oct 29 15:22 00000000000000000100.index
      -rw-r--r-- 1 yye eng    10296 Oct 29 15:22 00000000000000000100.log
      -rw-r--r-- 1 yye eng        0 Oct 29 15:23 00000000000000000200.index
      -rw-r--r-- 1 yye eng    10293 Oct 29 15:23 00000000000000000200.log
      -rw-r--r-- 1 yye eng        0 Oct 29 15:23 00000000000000000300.index
      -rw-r--r-- 1 yye eng    10274 Oct 29 15:23 00000000000000000300.log
      -rw-r--r-- 1 yye eng        0 Oct 29 15:23 00000000000000000399.index
      -rw-r--r-- 1 yye eng    10276 Oct 29 15:23 00000000000000000399.log
      -rw-r--r-- 1 yye eng        0 Oct 29 15:23 00000000000000000498.index
      -rw-r--r-- 1 yye eng    10256 Oct 29 15:23 00000000000000000498.log
      -rw-r--r-- 1 yye eng 10485760 Oct 29 15:23 00000000000000000596.index
      -rw-r--r-- 1 yye eng     3564 Oct 29 15:23 00000000000000000596.log
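      For reference, a back-of-the-envelope check of the entry count implied by the description above (plain Scala, purely illustrative, not Kafka code):

          // With ~100 messages of ~100 bytes per segment and logIndexIntervalBytes = 4096,
          // a ~10 KB segment should cross the index interval at least twice.
          object ExpectedIndexEntries {
            def main(args: Array[String]): Unit = {
              val segmentBytes       = 100 * 100   // ~100 messages x ~100 bytes each
              val indexIntervalBytes = 4096        // logIndexIntervalBytes
              val expectedEntries    = segmentBytes / indexIntervalBytes
              println(s"expect at least $expectedEntries index entries per segment")   // prints 2
            }
          }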

      Attachments

      1. kafka_583_zk_kafka_data.tar.gz (525 kB) - Yang Ye
      2. kafka_593_v1.diff (2 kB) - Yang Ye
      3. kafka_593_v2.diff (3 kB) - Yang Ye
      4. kafka_593_v3.diff (4 kB) - Yang Ye
      5. kafka_593_v4.diff (4 kB) - Yang Ye
      6. kafka_593_v5.diff (6 kB) - Yang Ye


          Activity

          Neha Narkhede added a comment -

          +1 on v5. Committed this patch

          Jay Kreps added a comment -

          +1

          Yang Ye added a comment -

           Thanks for the comments. Changes in v5:

           1. rename resetSize() to resize()
           2. use roundToMultiple() in resize()
           3. add comments to resize()
           4. add a unit test "testIndexResizingAtTruncation" in LogTest for this case

          Jay Kreps added a comment -

          Discussed with Victor. (1) and (3) should be doable, but (2) is not very convenient because the usage actually resizes to indexMaxSize which is in terms of bytes. So our solution was to have the API take bytes and just round to a multiple of 8.
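           For illustration, a minimal sketch of that rounding behaviour (the names roundToMultiple and resize follow the discussion above; the rest is assumed, not the actual patch):

               // Sketch only: the resize API takes a size in bytes (e.g. maxIndexSize)
               // and rounds it to a multiple of the 8-byte entry size, so the mmap
               // limit always lands on an entry boundary.
               class OffsetIndexSketch(var maxSizeInBytes: Int) {
                 private val entrySize = 8  // 4-byte relative offset + 4-byte position

                 // Round a byte count down to an exact multiple of `factor`.
                 private def roundToMultiple(bytes: Int, factor: Int): Int =
                   factor * (bytes / factor)

                 def resize(newSizeInBytes: Int): Unit = {
                   maxSizeInBytes = roundToMultiple(newSizeInBytes, entrySize)
                   // ...the real implementation would also grow/shrink the file and remap it...
                 }
               }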

          Jay Kreps added a comment -

           Looks good, three minor things:
           1. Can we rename resetSize to resize?
           2. Can we change the argument to be in terms of number of entries rather than number of bytes? It is incorrect to set it to a number of bytes that is not a multiple of the entry size, and the entry size is an implementation detail of that class, so this would be nicer.
           3. Can we add a test for this case?

          Yang Ye added a comment -

           For 30: it's fixed.

           For 31: it seems simpler to keep it as it is.

           Jay, can you help take a look at this patch?

          Jun Rao added a comment -

          Thanks for patch v3. Looks good. Some minor comments.

           30. Log.rollToOffset(): segmentsView.last.index.file.getName.split("\\.")(0).toLong can just be segmentsView.last.start.

          31. Log.loadSegments(): Just to be consistent. Should we use index.maxIndexSize instead of maxIndexSize in the following statement?
          logSegments.get(logSegments.size() - 1).index.resetSizeTo(maxIndexSize)

          Yang Ye added a comment -



          Thanks for the discussions, and here's the third patch which is also verified to be working.

          Yang Ye added a comment -

          Jay, Jun,

          Thanks for the comments.

           Jun also pointed out that there may still be cases where, at startup, the last log segment has an empty index and an empty log file, and trimOrReallocate() is not called because the previous shutdown was clean.

           What about an alternative idea: whenever we load a log segment, we always make sure its offset index has enough disk space and memory. In that case, when we truncate back to an old segment, its index will not be full; and when we start up with the last log segment having an empty log file and index file, its index will also not be full.

           To do this, we just need to change the constructor of OffsetIndex to always set the index file size and mmap limit to maxIndexSize.
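           A rough sketch of that idea, assuming a plain file-backed mmap (names and layout are illustrative, not the actual OffsetIndex constructor):

               import java.io.{File, RandomAccessFile}
               import java.nio.channels.FileChannel

               // Sketch: when a segment's index is loaded, give the index file (and the
               // mmap over it) room for maxIndexSize bytes up front, so a freshly loaded
               // or truncated-to segment never starts out with a "full" index.
               class PreallocatedIndexSketch(file: File, maxIndexSize: Int) {
                 private val raf = new RandomAccessFile(file, "rw")
                 if (raf.length() < maxIndexSize)
                   raf.setLength(maxIndexSize)          // grow the file to the maximum index size
                 // Map the whole region so the mmap limit is maxIndexSize as well.
                 val mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, maxIndexSize)
                 raf.close()                            // the mapping stays valid after closing
               }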

          Jay Kreps added a comment -

           trimOrReallocate(isReallocate: Boolean) is a bit of a hacky interface. It supports resizing to two sizes (entries or maxSize), but it would be better to just implement the more general
           resize(numEntries: Int)
           where numEntries is mmap.limit/8. It might be nice to keep the helper method trimToSize as a less error-prone alias:
           def trimToSize() = resize(entries)
           If we do this, we should be able to use resize to implement OffsetIndex.truncateTo (the difference is that truncateTo is in terms of an offset whereas resize is in terms of entries).

          We should also add a test case to cover these corner cases.

          Jun Rao added a comment -

          Thanks for the patch. A couple of comments:

           1. Log.rollToOffset(): We just need to verify that the starting offset of the last segment doesn't equal the new offset.

          2. OffsetIndex: When loading the log segment on broker startup, it is possible to have a segment with empty data and empty index. So, we will hit the same issue of rolling a segment with a duplicated name. We probably should extend the index size to max index size in the constructor of OffsetIndex.

          Yang Ye added a comment -

           Basically, the problem occurs when restarting the server or truncating existing log and index files.

           When, after the restart or truncation, the existing index file is empty (and therefore considered full, since it has no free entries), one of the conditions in maybeRoll() will be true, so a new log segment will be rolled, and we end up with two log segments starting at the same offset, one of them empty.

           To fix it, we change the function trimToSize() in OffsetIndex to trimOrReallocate(), which either trims or reallocates the offset index file (and its memory mapping). At truncation or restart it reallocates, so that enough space for the offset index is available.

           We also check the existing log segments in the roll() function and throw an exception if a segment already exists with the same offset as the target offset (this should not happen).
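           For illustration, the duplicate-offset guard could look roughly like this (a sketch of the behaviour described above, not the actual patch):

               object RollCheckSketch {
                 // Refuse to roll a new segment whose starting offset collides with an
                 // existing segment's start offset; per the description above this should
                 // never happen and indicates a bug elsewhere.
                 def checkRoll(existingSegmentStarts: Set[Long], newOffset: Long): Unit =
                   if (existingSegmentStarts.contains(newOffset))
                     throw new IllegalStateException(
                       s"Cannot roll a new segment at offset $newOffset: a segment with that start offset already exists")
               }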

          Jay Kreps added a comment -

          Err, that should read "intra-messageset positions".

          Jay Kreps added a comment -

          Makes sense, clever. I thought a bit about this during the implementation and decided not to try to make the index entries exact just to reduce complexity. It would be possible to be more exact by having LogSegment calculate inter-messageset positions and append multiple entries to the index if necessary. This would not necessarily be a bad change, but there is definitely some complexity, especially in the face of truncation, so this would need pretty thorough testing.

          Jun Rao added a comment -

          Here is the issue. We rolled a new segment in the follower. The follower in one fetch gets 10k bytes of data and appends to its log. This won't add any index entry since it's the very first append to this segment. After the append, the log rolled since the max segment size is reached. This leaves an empty index.

          Technically, the logic is still correct. It does mean that index entries may not be generated as frequently as one expects, depending on the fetch size used in the follower fetcher thread and how far behind a follower is. This may impact consumer performance a bit.

          Jun Rao added a comment -

           Producer used sync mode. So, there is 1 message per batch and each segment has about 100 messages. I expect at least 2 index entries to be added per segment.

          Jay Kreps added a comment -

          I am not sure that this is a bug. The index entries are not placed at exact intervals. Rather, we add a single index entry per append if the bytesSinceLastIndexEntry > indexIntervalBytes. What this means is that you could get a log of 10,296 bytes by appending a single message set of that size or by appending one message set <= 4095 and then another that made up the difference.

           What was the batch size being used in this test?

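           A rough sketch of the append-time check described in the comment above (simplified names, not the actual LogSegment code):

               // Sketch: at most one index entry per append, and only if at least
               // indexIntervalBytes have accumulated since the last entry. The check runs
               // before the counter is updated, so the very first append to a segment
               // never adds an entry, no matter how large the message set is.
               class IndexIntervalSketch(indexIntervalBytes: Int) {
                 private var bytesSinceLastIndexEntry = 0
                 private var indexEntries = 0

                 def append(messageSetBytes: Int): Unit = {
                   if (bytesSinceLastIndexEntry > indexIntervalBytes) {
                     indexEntries += 1                 // would write an (offset, position) entry here
                     bytesSinceLastIndexEntry = 0
                   }
                   bytesSinceLastIndexEntry += messageSetBytes
                 }

                 def entries: Int = indexEntries
               }

           Under this ordering, a follower that writes one ~10 KB message set into a fresh segment and then rolls (the scenario Jun describes above) ends up with an empty index, matching the listing in the description.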

            People

            • Assignee: Unassigned
            • Reporter: Yang Ye
            • Votes: 0
            • Watchers: 4
