Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:
      None

      Description

      Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.

      To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.

      It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.

      As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).

      This combination of features would allow a key-based retention policy that cleans up obsolete values, retaining only the latest message for each user-defined key.

      The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.

      The following changes would be part of this:
      1. The log format would now be
      8 byte offset
      4 byte message_size
      N byte message
      2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
      3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
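The proposed on-disk layout can be sketched as follows (an illustrative Python sketch, not the actual Kafka implementation; the helper names are hypothetical):

```python
import struct

# Each log entry: 8-byte offset, 4-byte message_size, N-byte message.
# ">qi" = big-endian signed 64-bit offset followed by a signed 32-bit size.
LOG_OVERHEAD = 12

def write_entry(offset, message):
    return struct.pack(">qi", offset, len(message)) + message

def read_entry(buf, pos):
    """Return (offset, message, next_position) for the entry starting at pos."""
    offset, size = struct.unpack_from(">qi", buf, pos)
    start = pos + LOG_OVERHEAD
    return offset, buf[start:start + size], start + size

# Offsets are sequential logical numbers (0, 1, 2, ...), not byte positions.
log = write_entry(0, b"first") + write_entry(1, b"second")
```

Note that because the stored offset is logical, recopying a segment to drop a message leaves the remaining entries' offsets unchanged.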

      I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.

      Here are a few issues to be considered for the first patch:
      1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
      2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
      3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).
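The offset semantics proposed in point 1 can be sketched like this (illustrative Python, not the Scala API):

```python
from dataclasses import dataclass

@dataclass
class MessageAndOffset:
    message: bytes
    offset: int  # the offset of THIS message, not of the next one

    def next_offset(self):
        # With logical offsets the next offset is always offset + 1, even
        # though the previous offset need not be offset - 1 (it may have
        # been deleted, since offsets are allowed to be sparse).
        return self.offset + 1
```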

      1. KAFKA-506-neha-post-review.patch
        4 kB
        Jay Kreps
      2. KAFKA-506-neha-post-review-v2.patch
        6 kB
        Jay Kreps
      3. KAFKA-506-phase-2.patch
        163 kB
        Jay Kreps
      4. KAFKA-506-phase-2-v1.patch
        262 kB
        Jay Kreps
      5. KAFKA-506-phase-2-v2.patch
        262 kB
        Jay Kreps
      6. KAFKA-506-phase-2-v3.patch
        281 kB
        Jay Kreps
      7. KAFKA-506-phase-2-v4.patch
        284 kB
        Jay Kreps
      8. KAFKA-506-phase-2-v5.patch
        286 kB
        Jay Kreps
      9. KAFKA-506-phase-2-v5.patch
        286 kB
        Jay Kreps
      10. KAFKA-506-v1.patch
        125 kB
        Jay Kreps
      11. KAFKA-506-v1-draft.patch
        101 kB
        Jay Kreps
      12. KAFKA-506-v4-changes-since-v3.patch
        16 kB
        Jay Kreps

        Issue Links

          Activity

          Jay Kreps created issue -
          Jay Kreps added a comment -

          Add key to message and reorder some fields
          Bump up Message magic number to 2
          Add offset to MessageSet format
          Make MessageAndOffset contain the current offset and add a nextOffset() method to get the next offset
          Some misc. cleanups (delete some obsolete files, fix bad formatting)

          There are still two problems with this patch:
          1. Not handling offsets properly in compressed messages
          2. Unit test failures in LogRecoveryTest

          Jay Kreps made changes -
          Field Original Value New Value
          Attachment KAFKA-506-v1-draft.patch [ 12545057 ]
          Jay Kreps added a comment -

          Updated the patch. This patch fixes the remaining failing tests and correctly handles compressed messages.

          This patch is ready for review.

          Jay Kreps made changes -
          Attachment KAFKA-506-v1.patch [ 12545234 ]
          Jay Kreps added a comment -

          I am going to begin phase two of this, implementing the logical offset management in Log.

          Jun Rao added a comment -

          Thanks for patch v1. Overall, the log format change is reasonable. Some comments:

          1. MessageAndOffset: nextOffset is not correct for compressed messages. Currently, in the high-level consumer, after iterating each message, the consume offset is moved to the offset of the next message. So, if one consumes a message and then commits the offset, the committed offset points to the next message to be consumed. We could probably change the protocol to move the consumer offset to the offset of the current message. Then, the caller will need to commit the offset first and then consumes the message to get the same semantics.

          2. Message:
          2.1 The comment of the message has a bug. Payload should have (N - K - 10) bytes.
          2.2 In the constructor, should we assert that offset is between 0 and bytes.length - 1? Also, just to be clear that offset and size are for the payload, should we rename bytes, offset and size to something like payload, payloadOffset and payloadSize?
          2.3 computeChecksum(): can use MagicOffset for both starting offset and length
          2.4 remove unused import

          3. MessageSet: Fix the comment in second line "A The format".

          4. ByteBufferMessageSet: remove unused comment

          5. Log:
          5.1 append(): For verifying message size, we need to use the shallow iterator since a compressed message has to be smaller than the configured max message size.
          5.2 append(): Compressed messages are forced to be decompressed and then compressed again. This will introduce some CPU overhead. What's the increase in CPU utilization if incoming messages are compressed? Also, for replicaFetchThread, it can just put the data fetched from the leader directly into the log without recomputing the offsets. Could we add a flag in append to bypass regenerating the offsets?
          5.3 trimInvalidBytes(): There is a bug in the following statement: messages.size should be messages.sizeInBytes.
          if(messageSetValidBytes == messages.size) {

          6. javaapi.ByteBufferMessageSet: Java users shouldn't really be using buffer. So, we don't need the bean property.

          7. PartitionData: Do we need to override equal and hash since this is already a case class?

          8. ZkUtils.conditionalUpdatePersistenPath(): This method expects an exception due to a version conflict, so there is no need to log the exception.

          9. SyncProducerTest: remove unused imports

          10. How do we handle the case that a consumer uses too small a fetch size?

          Jay Kreps added a comment -

          Great feedback, thanks.

          1. Good point about nextOffset. I think this is slightly tricky to fix. I think I will ignore this problem and work on phase 2 which will fix that issue by making nextOffset=offset+1. This means taking both patches at once which will be a bit big. Sound feasible?
          2-4 Good feedback
          5.1. Good point.
          5.2. I will do a little micro-benchmark on decompression/re-compression. Yes, we can definitely avoid this for the replica fetcher thread. Depending on how much we want to optimize that path there are a lot of options. On the extreme side of total trust I think it might actually be possible to do FileChannel.transferTo directly from the socket buffer, though there are complications around metrics and the high watermark. I think for now it makes sense to just skip decompression. One question: let's say recompression turns out to be expensive; there are two options: (1) do not set internal offsets (as today), (2) eat the cost and recommend snappy instead of gzip. Personally I prefer (2) since I think we need to fix the correctness bugs, but I am open to implementing either if there is a consensus.
          5.3. Good catch
          6. OK
          7. I am not sure. We had a custom implementation of equals but no hashcode which I think was likely wrong. We can remove both, but I would want to figure out why we added the equals.
          8-9. OK
          10. Ah, forgot to add that. I think the right thing is just to check (currentDataChunk.messages.size > 0 && currentDataChunk.buffer.size == fetchSize) and throw an exception in the ConsumerIterator. The only thing to consider is that this means there is no check for the SimpleConsumer.
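The check in point 10 might look roughly like this, reading the intended condition as "the fetch buffer is full but no complete message could be parsed from it" (an illustrative sketch; the names are hypothetical, not actual consumer code):

```python
class FetchSizeTooSmall(Exception):
    pass

def check_chunk(message_count, buffer_size, fetch_size):
    # A completely full fetch buffer that yields zero complete messages
    # means the configured fetch size is smaller than the next message.
    if message_count == 0 and buffer_size == fetch_size:
        raise FetchSizeTooSmall(
            "fetch size %d is smaller than the next message" % fetch_size)
```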

          Jun Rao added a comment -

          Actually, there is another thing.

          11. We need to change DefaultEventHandler to put the key data into messages sent to the broker. Also, Producer currently can take any type as key, do we want to restrict it to bytes or do we want to define a serializer for key too?

          Neha Narkhede added a comment -

          Thanks for the patch ! The log format change doesn't interfere with replication as of this patch. A few comments in addition to Jun's -

          1. CompressionUtils: How about re-using the ByteBufferMessageSet.writeMessage() API for serializing the compressed message to a byte buffer ?

          2. ByteBufferMessageSet.scala, FileMessageSet: Can we use MessageSet.LogOverhead instead of 12 for byte arithmetic ?

          3. ConsumerIterator
          The nextOffset issue for compressed message sets will get resolved when we actually use the sequential logical offsets. With that, the advantage is that the consumer will be able to fetch a message even if it is inside a compressed message. Today, there is no good way to achieve this unless we have level-2 message offsets for compressed messages. Even if we cannot make that change in time for replication, we can take this change and leave the message set iterator to return the next offset (valid fetch offset), just like we do today. So, either way, we are covered here.

          Jay Kreps added a comment -

          This patch is incremental from the previous one. I will rebase and provide an up-to-date patch that covers both phases, but this shows the new work required to support logical offsets.

          I think I have addressed most of the comments on the original patch, except:
          1. I have put off any performance optimization (avoiding recompression for replicas, memory-mapping the log, etc). I would like to break this into a separate JIRA and write a reasonable standalone Log benchmark that covers these cases and then work against that. I have several other cleanups I would like to do as well: (1) get rid of SegmentList, (2) move more functionality in Log into LogSegment.
          2. I am not yet storing the key in the message; this may change the produce API slightly, so I think this should be a separate JIRA too.
          3. Neha--I changed most of the uses of magic numbers except where the concrete number is clearer.

          Here is a description of the new changes.

          • Offset now always refers to a logical log offset. I have tried to change any instances where offset meant file offset to instead use the terminology "position". References to file positions should only occur in Log.scala and classes internal to that.
          • As in the previous patch MessageAndOffset gives three things: (1) the message, (2) the offset of THAT message, and (3) a helper method to calculate the next offset.
          • Log.append() is responsible for maintaining the logEndOffset and using it to assign offsets to the messageset before appending to the log.
          • Offsets are now assigned to compressed messages too. One nuance is that the offset of the wrapper message is equal to the last offset of the messages it contains. This will be more clear in the discussion of the offset search changes.
          • Log.read now accepts a new argument maxOffset, which is the largest (logical) offset that will be returned in addition to the maxSize which limits the size in bytes.
          • I have changed Log.read to now support sparse offsets. That is, it is valid to have missing offsets. This sparseness is needed both for the key-retention but also for the correct handling of compressed messages. I will describe the read path in more detail below.
          • I moved FileMessageSet to the package kafka.log as already much of its functionality was specific to the log implementation.
          • I changed FetchPurgatory back to use a simple counter for accumulated bytes. It was previously re-calculating the available bytes, but because this now is a more expensive operation, and because this calculation is redone for each topic/partition produce (i.e. potentially 200 times per produce request), I think this is better. This is less accurate, but since long poll is a heuristic anyway I think that is okay.
          • I changed the default suffix of .kafka files to .log and added a new .index file that contains a sparse index of offset=>file_position to help efficiently resolve logical offsets.
          • Entries are added to this index at a configurable frequency, controlled by a new configuration log.index.interval.bytes which defaults to 4096
          • I removed numerous instances of byte calculations. I think this is a good thing for code quality.
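The sparse index maintenance described above can be sketched like this (illustrative Python, assuming the entry-interval behavior described for log.index.interval.bytes; this is not the kafka.log.OffsetIndex implementation):

```python
class OffsetIndex:
    """Sparse mapping from logical offset to file position (sketch)."""

    def __init__(self, interval_bytes=4096):
        self.interval_bytes = interval_bytes  # cf. log.index.interval.bytes
        self.entries = []  # sorted list of (offset, position) pairs
        self.bytes_since_last_entry = 0

    def maybe_append(self, offset, position, entry_size):
        # Index roughly every interval_bytes of log data so the index stays
        # small; lookups finish with a short linear scan of the log itself.
        # The interval is approximate: a single large message can exceed it.
        if not self.entries or self.bytes_since_last_entry >= self.interval_bytes:
            self.entries.append((offset, position))
            self.bytes_since_last_entry = 0
        self.bytes_since_last_entry += entry_size
```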

          Here is a description of the new read path.
          1. First log tries to find the correct segment to read from using the existing binary search on log segments. I modified this search slightly in two ways. First we had a corner case bug which only occurred if you have two files with successive offsets (unlikely now, impossible before). Second, I now no longer check ranges but instead return the largest segment file less than or equal to the requested offset.
          2. Once the segment is found we check the index on that segment. The index returns the largest offset less than or equal to the requested offset and the associated file position in the log file. This position is a greatest lower bound on the target position in the file, and it is the position from which we begin a linear search checking each message. The index itself is just a sorted sequence of (offset, position) pairs. Complete details are in the header comments on kafka.log.OffsetIndex.scala. It is not required that all messages have an entry in the OffsetIndex; instead there is a configurable frequency in terms of bytes which is set in LogSegment. So, for example, we might have an entry every 4096 bytes. This frequency is approximate, since a single message may be larger than that.
          3. Once we have a greatest lower bound on the location we use FileMessageSet.searchFor to search for the position of the first message with an offset at least as large as the target offset. This search just skips through the file checking the offset only.
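Steps 2 and 3 of the read path can be sketched as follows (hypothetical Python, using an in-memory list of (offset, position) pairs as a stand-in for the log file; not the actual OffsetIndex or FileMessageSet.searchFor code):

```python
import bisect

def lookup(index_entries, log_entries, target_offset):
    """index_entries: sorted sparse (offset, position) pairs;
    log_entries: (offset, position) for every message in the segment.
    Returns the file position of the first message whose offset is at
    least target_offset, or None if there is none."""
    # Step 2: largest index entry with offset <= target_offset.
    i = bisect.bisect_right(index_entries, (target_offset, float("inf"))) - 1
    start_pos = index_entries[i][1] if i >= 0 else 0
    # Step 3: linear scan from that lower-bound position, checking offsets only.
    for offset, position in log_entries:
        if position >= start_pos and offset >= target_offset:
            return position
    return None
```

Because only offsets at or after the target qualify, this also handles sparse offsets: a deleted offset simply resolves to the next surviving message.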

          Jay Kreps made changes -
          Attachment KAFKA-506-phase-2.patch [ 12546813 ]
          Jay Kreps added a comment -

          Okay attached a fully rebased patch that contains both phase 1 and phase 2 changes.

          Jay Kreps made changes -
          Attachment KAFKA-506-phase-2-v1.patch [ 12546937 ]
          Jay Kreps made changes -
          Attachment KAFKA-506-phase-2-v2.patch [ 12547037 ]
          Jay Kreps made changes -
          Attachment KAFKA-506-phase-2-v2.patch [ 12547037 ]
          Jay Kreps made changes -
          Attachment KAFKA-506-phase-2-v2.patch [ 12547038 ]
          Jay Kreps added a comment -

          Three preliminary comments from Neha while she does deeper interrogations:

          • Would be nice if the DumpLogSegment tool also dumped the contents of the index file
          • This patch implicitly assumes file segments are limited to 2GB (I use a 4 byte position pointer in the index). Turns out this isn't true. Proposed fix is to limit log segments to 2GB.
          • We decided the corner case with sparse messages at the end of a segment isn't really a corner case, as it affects compressed messages too. So I will fix that in the scope of this patch.
          Neha Narkhede added a comment -

          2 additions to the preliminary comments -

          • 3 unit tests fail on patch v2 - http://pastebin.com/ECUA2n1f
          • It will be nice for maxIndexEntries to be a configurable property on the server
          Jun Rao added a comment -

          Thanks for patch v2. Some more comments:

          20. Log:
          20.1 findRange(): Add to the comment that now this method returns the largest segment file <= the requested offset.
          20.2 close(): move the closing } for the for loop to a new line.
          20.3 bytesSinceLastIndexEntry is only set but is never read.
          20.4 append(): This method returns the offset of the first message to be appended. This is ok for the purpose of returning the offset to the producer. However, when determining whether all replicas have received the appended messages, we need to use the log end offset after the messages are appended. So, what we should do is to have append() return 2 offsets, one before the append and one after the append. We use the former in producer response and use the latter for the replica check. To avoid complicating this patch further, another approach is to, in the jira, have append return the log end offset after the append and use it in both producer response and replica check. We can file a separate jira to have append return 2 offsets.
          20.5 read(): The trace statement: last format pattern should be %d instead of %s.
          20.6 truncateTo(): The usage of logEndOffset in the following statement is incorrect. It should be the offset of the next segment.
          segments.view.find(segment => targetOffset >= segment.start && targetOffset < logEndOffset)
          20.7 There are several places where we need to create a log segment and the code for creating the new data file and the new index file is duplicate. Could we create a utility function createNewSegment to share the code?

          21. LogSegment: bytesSinceLastIndexEntry needs to be updated in append().

          22. FileMessageSet.searchFor(): The following check seems to be a bit strange. Shouldn't we use position + 12 or just position instead?
          while(position + 8 < size) {

          23. OffsetIndex:
          23.1 In the comment, "mutable index can be created to" seems to have a grammar bug.
          23.2 mmap initialization: The following statement seems unnecessary. However, we do need to set the mapped buffer's position to end of file for mutable indexes.
          idx.position(idx.limit).asInstanceOf[MappedByteBuffer]
          23.3 append(): If index entry is full, should we automatically roll the log segment? It's ok if this is tracked in a separate jira.
          23.4 makeReadOnly(): should we call flush after raf.setLength()? Also, should we remap the index file to the current length and make it read only?

          24. LogManager.shutdown(): log indentation already adds LogManager in the prefix of each log entry.

          25. KafkaApis:
          25.1 handleFetchRequest: topicDatas is weird since data is the plural form of datum. How about topicDataMap?
          25.2 ProducerRequestPurgatory: It seems that it's useful to keep the logIndent since it can distinguish logs from the ProducerRequestPurgatory and FetchRequestPurgatory. Also, it's probably useful to pass in brokerId to RequestPurgatory for debugging unit tests.

          26. Partition: There are a few places that the first character of info log is changed to lower case. The current convention is to already use upper case.

          27. javaapi.ByteBufferMessageSet: underlying should be private val.

          28. DumpLogSegment: Now that each message stores an offset, we should just print the offset in MessageAndOffset. There is no need for var offset now.

          29. FetchedDataChunk: No need to use val for parameters in constructor since this is a case class now.

          30. PartitionData:
          30.1 No need to redefine equals and hashcode since this is already a case class.
          30.2 initialOffset is no longer needed.

          31. PartitionTopicInfo.enqueue(): It seems that next can be computed using shallow iterator since the offset of a compressed message is always the offset of the last internal message.

          32. ByteBufferMessageSet: In create() and decompress(), we probably should close the output and the input stream in a finally clause in case we hit any exception during compression and decompression.
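The pattern being suggested is the standard try/finally close; a hedged sketch (the method name is illustrative, not the real ByteBufferMessageSet.create signature):

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

// Close the compression stream in a finally clause so that an exception
// thrown during compression cannot leak the stream (or its Deflater).
def compress(bytes: Array[Byte]): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val out = new GZIPOutputStream(baos)
  try {
    out.write(bytes)
  } finally {
    out.close() // also flushes the GZIP trailer into baos
  }
  baos.toByteArray
}
```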

          33. remove unused imports.

          The following comment from the first round of review is still not addressed.
          10. How do we handle the case that a consumer uses too small a fetch size?

Hide
          Jay Kreps added a comment -

          New patch with a few new things:

          I rebased a few more times to pick up changes.

          WRT Neha's comments:

          • I made maxIndexEntries configurable by adding the property log.index.max.size. I did this in terms of index file size rather than entries since the user doesn't really know the entry size but may care about the file size.
          • For the failing tests: (1) The message set failure is due to scalatest not handling parameterized tests; I had fixed this but somehow it didn't make it into the previous patch. It is in the current one. (2) testHWCheckpointWithFailuresSingleLogSegment fails because of a timing assumption in that test; fixed it by adding a sleep. (3) The producer test failure I cannot reproduce.
          • Wrote a test case using compressed messages to try to produce the corner case at the end of a segment. But actually this turns out not to be possible with compressed messages since the numbering is by the last offset. So effectively our segments are always dense right now. As such I would rather wait until I refactor segment list to fix it since it will be duplicate work otherwise.
          • Turns out that log segments are limited to 2GB already, via a restriction in the config. Not actually sure why this is. Given this limitation one cleanup that might be nice to do would be to convert MessageSet.sizeInBytes to an Int, which would remove a lot of casts. Since this is an unrelated cleanup I will not do it in this patch.
          • I added support to DumpLogSegment tool to display the index file. I had to revert Jun's change to check that last offset=file size since this is no longer true.

          Jun's Comments:
          First of all, this is an impressively thorough code review. Thanks!
          20.1 Made the Log.findRange comment more reflective of what the method does. I hope to remove this entirely in the next phase.
          20.2 Fixed mangled paren in close()
          20.3 bytesSinceLastIndexEntry. Yes, good catch. This is screwed up. This was moved into LogSegment, but the read and update are split in two places. Fixed.
          20.4 append(): "We need to have both the begin offset and the end offset returned by Log.append()". Made Log.append return (Long, Long). I am not wild about this change, but I see the need. I had to refactor KafkaApis slightly since we were constructing an intermediate response object in the produceToLocalLog method (which was kind of weird anyway) so there was only one offset and since this is an API object we can't change it. I think the use of API objects in the business logic is a bit dangerous for this reason.
          20.5 Fixed broken log statement to use correct format param.
          20.6 truncateTo(): The usage of logEndOffset in the following statement is incorrect. Changed this to use Log.findInRange which I think is the intention.
          20.7 "There are several places where we need to create a log segment and the code for creating the new data file and the new index file is duplicate. Could we create a utility function createNewSegment to share the code?" Good idea, done. There is still a lot more refactoring that could be done between Log and LogSegment, but I am kind of putting that off.
          21. LogSegment: "bytesSinceLastIndexEntry needs to be updated in append()." Fixed.
          22. FileMessageSet.searchFor() fixed bad byte arithmetic.
          23. OffsetIndex:
          23.1 Fixed bad english in comment
          23.2 mmap initialization: Yes, this doesn't make sense. The correct logic is that the mutable case must be set to index 0, and the read-only case doesn't matter. This was happening implicitly since byte buffers initialize to 0, but I switched it to make it explicit.
          23.3 append(): "If index entry is full, should we automatically roll the log segment?" This is already handled in Log.maybeRoll(segment) which checks segment.index.isFull
          23.4 makeReadOnly(): "should we call flush after raf.setLength()?" This is a good point. I think what you are saying is that the truncate call itself needs the metadata to flush to be considered stable. Calling force on the mmap after the setLength won't do this. Instead I changed the file open to use synchronous mode "rws" which should automatically fsync metadata when we call setLength. The existing flush is okay: I verified that flush doesn't cause the sparse file to desparsify or anything like that. "Also, should we remap the index file to the current length and make it read only?" Well, this isn't really needed. There is no problem with truncating a file post mmap, but I guess making the mapping read-only could prevent corruption due to any bugs we might have, so I made that change.
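The mode change described here can be sketched as follows (the method and file names are illustrative; the real code lives in OffsetIndex): opening the file in "rws" mode asks the OS to write both content and metadata synchronously, so a subsequent setLength() is durable without a separate metadata flush.

```scala
import java.io.{File, RandomAccessFile}

// "rws" = read/write with synchronous updates to file content AND metadata,
// so the length change made by setLength() is persisted when the call returns.
def truncateIndex(file: File, newLength: Long): Unit = {
  val raf = new RandomAccessFile(file, "rws")
  try raf.setLength(newLength)
  finally raf.close()
}
```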
          LogManager
          24. "log indentation already adds LogManager in the prefix of each log entry." Oops.
          25. KafkaApis:
          25.1 "handleFetchRequest: topicDatas is weird since data is the plural form of datum. How about topicDataMap?" Changed to dataRead (I don't like having the type in the name).
          25.2 "ProducerRequestPurgatory: It seems that it's useful to keep the logIndent since it can distinguish logs from the ProducerRequestPurgatory and FetchRequestPurgatory. Also, it's probably useful to pass in brokerId to RequestPurgatory for debugging unit tests." Agreed, accidentally removed this; added it back.
          26. "Partition: There are a few places that the first character of info log is changed to lower case. The current convention is to already use upper case." Made all upper case.
          27. "javaapi.ByteBufferMessageSet: underlying should be private val." Changed.
          28. "DumpLogSegment: Now that each message stores an offset, we should just print the offset in MessageAndOffset. There is no need for var offset now." Removed.
          29. "FetchedDataChunk: No need to use val for parameters in constructor since this is a case class now." Wait, is everything a val in a case class? I made this change, but don't know what it means...
          30. PartitionData:
          30.1 "No need to redefine equals and hashcode since this is already a case class." Yeah, this was fixing a bug in the equals/hashcode stuff due to the array that went away when I rebased. Removed it.
          30.2 "initialOffset is no longer needed." I think PartitionData is also used by ProducerRequest. This is a bug, but I think we do need the initial offset for the other case. Until we separate these two I don't think I can remove it.
          31. "PartitionTopicInfo.enqueue(): It seems that next can be computed using shallow iterator." Ah, very nice. Changed that.
          32. "ByteBufferMessageSet: In create() and decompress(), we probably should close the output and the input stream in a finally clause in case we hit any exception during compression and decompression." These are not real output streams. I can close them, but they are just arrays so I think it is just noise, no?
          33. "remove unused imports." Eclipse doesn't identify them, will swing by.
          34. "How do we handle the case that a consumer uses too small a fetch size?" Added a check and throw for this in ConsumerIterator.

          Jay Kreps made changes -
          Attachment KAFKA-506-phase-2-v3.patch [ 12547447 ]
          Jay Kreps added a comment -

          Ran system test, passes:

          2012-10-02 14:11:50,376 - INFO - ======================================================
          2012-10-02 14:11:50,376 - INFO - stopping all entities
          2012-10-02 14:11:50,376 - INFO - ======================================================

          2012-10-02 14:12:43,105 - INFO - =================================================
          2012-10-02 14:12:43,105 - INFO - TEST REPORTS
          2012-10-02 14:12:43,105 - INFO - =================================================
          2012-10-02 14:12:43,105 - INFO - test_case_name : testcase_1
          2012-10-02 14:12:43,105 - INFO - test_class_name : ReplicaBasicTest
          2012-10-02 14:12:43,105 - INFO - validation_status :
          2012-10-02 14:12:43,105 - INFO - Leader Election Latency - iter 2 brokerid 3 : 49636.00 ms
          2012-10-02 14:12:43,105 - INFO - Validate leader election successful : PASSED
          2012-10-02 14:12:43,106 - INFO - Unique messages from consumer : 850
          2012-10-02 14:12:43,106 - INFO - Validate for data matched : PASSED
          2012-10-02 14:12:43,106 - INFO - Unique messages from producer : 850
          2012-10-02 14:12:43,106 - INFO - Leader Election Latency - iter 1 brokerid 2 : 354.00 ms

          Jun Rao added a comment -

          Thanks for patch v3. We are almost there. A few more comments:

          40. Log.append: It seems that it's easier if lastOffset returned is just nextOffset instead of nextOffset -1. Then, in KafkaApis, we can just pass end, instead of end+1 to ProducerResponseStatus.

          41. OffsetIndex: When initializing mmap, if the index is mutable, shouldn't we move the position to the end of the buffer for append operations?

          42. KafkaApis: It's useful to pass in brokerId to RequestPurgatory for debugging unit tests.

          43. DumpLogSegments: Currently, the message iterator in FileMessageSet will stop when it hits the first non parsable message. So, we need to check if at the end of the message iteration, location == FileMessageSet.sizeInBytes(). If not, we should report the offset from which data is corrupted.

          44. ConsumerIterator: The check for guarding small fetch size doesn't work. This is because in PartitionTopicInfo.enqueue(), we only add ByteBufferMessageSet that has positive valid bytes. We can log an error in PartitionTopicInfo.enqueue() and enqueue a special instance of FetchedDataChunk that indicates an error. In ConsumerIterator, when seeing the special FetchedDataChunk, it can throw an exception.

          29. Yes, all parameters in the constructor in a case class are implicitly val.
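A minimal illustration of this point (the class name is made up, not the real FetchedDataChunk): case-class constructor parameters are implicitly public vals, so writing `val` on them is redundant, and the compiler also derives equals/hashCode from them, which is what makes item 30.1 possible.

```scala
// Constructor parameters of a case class are implicitly public vals;
// equals and hashCode are synthesized from them by the compiler.
case class Chunk(messages: String, fetchOffset: Long)

val c = Chunk("payload", 42L)
// Both fields are readable even though no `val` keyword was written.
```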

          Jun Rao added a comment -

          There is another issue:

          45. ConsumerIterator: Now that we index each message inside a compressed message, we need to handle the case when a fetch request starting on an offset in the middle of a compressed message. In makeNext(), we need to first skip messages whose offset is less than currentDataChunk.fetchOffset. Otherwise, the consumer would get duplicates. We probably can do this in a followup jira since currently the consumer can get duplicates on compressed messages too.
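The skip Jun describes amounts to dropping the decompressed messages whose offsets precede the requested fetch offset; a hedged sketch (the names mirror the thread, but this is not the actual makeNext() code):

```scala
// A compressed wrapper message decompresses into messages whose offsets may
// begin before the requested fetch offset; drop those leading duplicates.
case class MessageAndOffset(offset: Long, payload: String)

def skipToFetchOffset(decompressed: Iterator[MessageAndOffset],
                      fetchOffset: Long): Iterator[MessageAndOffset] =
  decompressed.dropWhile(_.offset < fetchOffset)
```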

          Jay Kreps added a comment -

          Here is a new patch that addresses these comments. I also did an incremental diff against the previous patch so you can see the specific changes for the below items (that is KAFKA-506-v4-changes-since-v3.patch)

          Also rebased again.

          40. I actually disagree. It is more code to add and subtract, but I think it makes more sense. This way we would say the append api returns "the first and last offset for the messages you appended" rather than "the first offset for the messages you appended and the offset of the next message that would be appended". This is not a huge deal so I can go either way, but I did think about it both ways and that was my rationale.
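The convention being defended can be shown in a few lines (the method name is illustrative): append reports the inclusive first and last offsets of the messages it wrote, rather than the first offset plus the next offset to be assigned.

```scala
// Inclusive convention: (first, last) are the offsets of the first and last
// appended messages; the next append would begin at last + 1.
def assignOffsets(nextOffset: Long, messageCount: Int): (Long, Long) = {
  require(messageCount > 0, "must append at least one message")
  val first = nextOffset
  val last  = nextOffset + messageCount - 1
  (first, last)
}
```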

          41. My thinking was that there were only two cases: re-creating a new, mutable index (at position 0) and opening a read-only index. In reality there are three cases: in addition to the previous two, you can be re-opening an existing log that went through clean shutdown. I was not handling this properly and in fact was truncating the index on re-open, so the existing entries in the last segment would be unindexed. There are now two cases for mutable indexes. Recall that on clean shutdown the index is always truncated to the last valid entry. So now when we open an index, if the file exists, I set the position to the end of the file; if the file doesn't exist, I allocate it and start at position 0. The recovery process will still re-create the index if it runs; if the shutdown was clean, we will just roll to a new segment on the first append (since the index was truncated, it is now full).
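The two mutable-index cases described here can be sketched as follows (names and the pre-allocation scheme are illustrative, not the actual OffsetIndex code): an existing file resumes appending at its end, a new file is pre-allocated and starts at position 0.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Open a mutable index: an existing file (truncated to its last valid entry
// on clean shutdown) resumes at end-of-file; a new file is pre-allocated to
// its maximum size and starts appending at position 0.
def openMutableIndex(file: File, maxSizeBytes: Int): MappedByteBuffer = {
  val existed = file.exists()
  val raf = new RandomAccessFile(file, "rw")
  try {
    if (!existed)
      raf.setLength(maxSizeBytes) // new index: pre-allocate the full file
    val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
    idx.position(if (existed) raf.length().toInt else 0)
    idx
  } finally {
    raf.close() // the memory mapping stays valid after the channel is closed
  }
}
```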

          43. I removed that feature since the iterator only has the offset not the file position. However after thinking about it I can add it back by just using MessageSet.entrySize(message) on each entry and use the sum of these to compare to the messageSet.sizeInBytes. Added that.
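The re-added check can be summarized as follows (a sketch assuming the 12-byte header layout of 8-byte offset plus 4-byte size; the function names are made up, the real code sums MessageSet.entrySize over the iterator):

```scala
// Sum the bytes each parsed entry accounts for (12-byte header + payload) and
// compare with the file size; a shortfall gives the position where the data
// stopped being parsable.
def validBytesConsumed(entryPayloadSizes: Seq[Int]): Int =
  entryPayloadSizes.map(12 + _).sum

def corruptionPoint(entryPayloadSizes: Seq[Int], fileSizeInBytes: Int): Option[Int] = {
  val consumed = validBytesConsumed(entryPayloadSizes)
  if (consumed == fileSizeInBytes) None // fully parsed: no corruption
  else Some(consumed)                   // byte position of the first bad entry
}
```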

          44. Changed the check to be the messageSet.sizeInBytes. This check was really meant to guard the case where we are at the end of the log and get an empty set. I think it was using validBytes because it needed to calculate the next offset. Now that calculation is gone, so I think it is okay to just use messageSet.sizeInBytes. This would result in a set with 0 valid bytes being enqueued, and then the error getting thrown to the consumer. The fetcher would likely continue to fetch this message set, but that should be bounded by the consumer queue size.
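The resulting guard on the consumer side can be sketched like this (the exception name follows the thread; the method and its parameters are illustrative): a chunk that contains bytes but no complete message means the fetch size is smaller than the next message in the log.

```scala
// Thrown to the consumer when a fetched chunk holds a partial message only.
class MessageSizeTooLargeException(msg: String) extends RuntimeException(msg)

// sizeInBytes > 0 with validBytes == 0 means bytes were fetched but no whole
// message fits; an entirely empty chunk (end of log) is not an error.
def checkFetchedChunk(sizeInBytes: Long, validBytes: Long,
                      topic: String, partition: Int, offset: Long): Unit =
  if (sizeInBytes > 0 && validBytes == 0)
    throw new MessageSizeTooLargeException(
      s"Message at $topic-$partition offset $offset is larger than the fetch size; " +
        "increase the fetch size or decrease the broker's maximum message size")
```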

          45. The behavior after this patch should be exactly the same as the current behavior, so my hope was to do this as a follow up patch.

          Also: Found that I wasn't closing the index when the log was closed, and found a bug in the index re-creation logic in recovery; fixed both, and expanded tests for this.

          Jay Kreps made changes -
          Attachment KAFKA-506-phase-2-v4.patch [ 12547593 ]
          Attachment KAFKA-506-v4-changes-since-v3.patch [ 12547594 ]
          Jun Rao added a comment -

          Patch v4 looks good overall. A couple of remaining issues:

          50. testCompressionSetConsumption seems to fail transiently for me with the following exception. This seems to be related to the change made for #44.
          kafka.common.MessageSizeTooLargeException: The broker contains a message larger than the maximum fetch size of this consumer. Increase the fetch size, or decrease the maximum message size the broker will allow.
          at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:87)

          51. ConsumerIterator: When throwing MessageSizeTooLargeException, could we add the topic/partition/offset to the message string in the exception?

          Jay Kreps added a comment -

          Rebased again and fixed the above issues to make v5

          50. I looked into this. It is slightly subtle. The problem was that validBytes is cached, and the incremental computation was done on the member variable in ByteBufferMessageSet. The next problem was that AbstractFetcherThread and the ConsumerIterator could both be calling this at the same time, which would lead to setting validBytes to 0 and then iterating over the messages to count the bytes. If the check and the computation occurred at precisely the same time, it was possible for validBytes to return essentially any value. The fix is (1) avoid mucking with the MessageSet once it is handed over to ConsumerFetcherThread.processPartitionData, and (2) use a local variable to compute validBytes; this way, even if we do have future threading bugs, the worst case is that we recompute the same cached value twice instead of accessing a partial computation (we could also make the variable volatile, but that doesn't really add any additional protection since we don't need precise memory visibility).
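The pattern in (2) can be sketched as follows. This is a hedged, standalone illustration with hypothetical names, not the actual ByteBufferMessageSet code: the sum is accumulated in a local variable and only the finished value is published, so a racing reader can at worst trigger a redundant recomputation, never observe a half-done sum.

```java
// Hypothetical sketch of computing a cached value through a local variable.
// Names are illustrative; this is not the real ByteBufferMessageSet.
public class CachedValidBytes {
    private final int[] messageSizes;   // stands in for the underlying buffer
    private int cachedValidBytes = -1;  // -1 means "not yet computed"

    public CachedValidBytes(int[] messageSizes) { this.messageSizes = messageSizes; }

    public int validBytes() {
        int cached = cachedValidBytes;
        if (cached >= 0)
            return cached;              // already computed and fully published
        int total = 0;                  // local accumulator: never visible half-done
        for (int size : messageSizes)
            total += size;
        cachedValidBytes = total;       // publish only the completed sum
        return total;
    }
}
```

A second concurrent caller may redo the loop, but both callers always return the same complete value.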

          51. Done.

          Jay Kreps made changes -
          Attachment KAFKA-506-phase-2-v5.patch [ 12547788 ]
          Jun Rao added a comment -

          Thanks for patch v5.

          50. There is still a potential issue in that shallowValidByteCount is a long, and a long value is not guaranteed to be read or written atomically without synchronization in Java, so one thread could see a partially updated long value. Thinking about this: since ByteBufferMessageSet is not updatable, would it be better to compute validBytes once in the constructor?
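The compute-once-in-constructor alternative suggested here can be sketched like this (hypothetical names, not the actual Kafka classes). A final field assigned in the constructor is safely published after construction under the Java memory model, so no reader can observe a torn value.

```java
// Sketch of Jun's alternative: compute validBytes once, store it in a
// final field. Final fields are safely published after construction
// (JLS 17.5), sidestepping the long-atomicity concern entirely.
// Names are illustrative, not the real ByteBufferMessageSet.
public final class ImmutableMessageSet {
    private final long validBytes;      // fixed for the object's lifetime

    public ImmutableMessageSet(long[] messageSizes) {
        long total = 0;
        for (long size : messageSizes)
            total += size;
        this.validBytes = total;
    }

    public long validBytes() { return validBytes; }
}
```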

          51. ConsumerIterator: Could you include currentDataChunk.fetchOffset in the message string in MessageSizeTooLargeException? This will make debugging easier.

          Since this is a large patch, it would be good if someone else takes a closer look at it too. At least Neha expressed interests in taking another look at the latest patch.

          Jay Kreps added a comment -

          50. It can actually only take Int values, so I don't think this can happen. I will file a follow-up clean-up issue to change sizeInBytes to be an Int (I had mentioned that earlier in the thread), since it anyway leads to innumerable safe-but-annoying casts to int. I think this is better than pre-computing it because in many cases we instantiate a ByteBufferMessageSet without necessarily using validBytes.

          51. Yes, I will add this as part of the checkin.

          Neha Narkhede added a comment -

          I will free up tomorrow after the Grace Hopper conference is over. I would like to take another closer look at the follow-up patches. If you guys don't mind, can we please hold this at least through the weekend?

          Jay Kreps added a comment -

          It is really hard/error-prone to keep this patch alive and functioning; I basically spend half of each day rebasing, then debugging the new bugs I introduce during rebasing. Could we do it as a post-commit review? I am totally down to fix/change things, but the problem is each new change may take a few iterations, and meanwhile the whole hunk has to be kept alive. In an ideal world I would have found a way to do this in smaller pieces, but it is a cross-cutting change, so that was hard.

          Jun Rao added a comment -

          What we can do is to hold off committing other conflicting patches for now and have this patch more thoroughly reviewed. If there are no major concerns, we can just commit the patch and have follow-up jiras to address minor issues. Neha, do you think that you can finish the review by Saturday?

          Joel Koshy added a comment -

          Rebasing is painful for sure, especially since 0.8 is moving quite fast. I think the other patches in flight are either small or otherwise straightforward to rebase as they don't have significant overlap. So it seems holding off all check-ins until after this weekend would work for everyone right?

          Jay Kreps added a comment -

          jkreps-mn:kafka-git jkreps$ git pull
          remote: Counting objects: 72, done.
          remote: Compressing objects: 100% (37/37), done.
          remote: Total 42 (delta 26), reused 0 (delta 0)
          Unpacking objects: 100% (42/42), done.
          From git://git.apache.org/kafka
          0aa1500..65e139c 0.8 -> origin/0.8
          Auto-merging core/src/main/scala/kafka/api/FetchResponse.scala
          CONFLICT (content): Merge conflict in core/src/main/scala/kafka/api/FetchResponse.scala
          Auto-merging core/src/main/scala/kafka/api/ProducerRequest.scala
          CONFLICT (content): Merge conflict in core/src/main/scala/kafka/api/ProducerRequest.scala
          Auto-merging core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
          Auto-merging core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          CONFLICT (content): Merge conflict in core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          Auto-merging core/src/main/scala/kafka/server/KafkaApis.scala
          CONFLICT (content): Merge conflict in core/src/main/scala/kafka/server/KafkaApis.scala
          Auto-merging core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
          Auto-merging core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
          Auto-merging core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
          Auto-merging core/src/test/scala/unit/kafka/utils/TestUtils.scala
          Automatic merge failed; fix conflicts and then commit the result.

          Jay Kreps added a comment -

          Rebased patch and improved error message for the MessageSizeTooLargeException.

          Jay Kreps made changes -
          Attachment KAFKA-506-phase-2-v5.patch [ 12548036 ]
          Neha Narkhede added a comment -

          Agree that rebasing is painful. In addition to more reviews, the hope was to check in ~20 more test cases as part of KAFKA-502, so we could test it out thoroughly. But we can check it in and return to fixing issues later as well.

          Neha Narkhede added a comment -

          Btw, which svn revision does patch v5 apply correctly on?

          Jay Kreps added a comment -

          Should apply on HEAD.

          Jun Rao added a comment -

          +1 from me.

          Jay Kreps added a comment -

          Committed.

          Jay Kreps made changes -
          Status Open [ 1 ] Resolved [ 5 ]
          Resolution Fixed [ 1 ]
          Neha Narkhede added a comment - - edited

          I have to mention that there is a possibility that some of my comments are not related to this patch directly, but were found while inspecting the new code closely. Since you know the code better, feel free to file follow-up JIRAs.

          1. Log

          1.2 In findRange, the following statement runs the risk of integer overflow, giving incorrect results from the binary search -
          val mid = ceil((high + low) / 2.0).toInt
          It would probably be better to use
          val mid = low + ceil((high - low) / 2.0).toInt
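As a standalone illustration of the overflow in 1.2 (not Kafka code, values chosen only for demonstration): whenever low + high exceeds Int.MaxValue, the naive midpoint wraps negative, while the subtraction form stays in range.

```java
// Demonstrates the classic binary-search midpoint overflow.
// (high + low) / 2 wraps past Integer.MAX_VALUE; low + (high - low) / 2 does not.
public class MidpointOverflow {
    static int naiveMid(int low, int high) { return (high + low) / 2; }       // overflows
    static int safeMid(int low, int high)  { return low + (high - low) / 2; } // always in [low, high]

    public static void main(String[] args) {
        int low = 2_000_000_000, high = 2_100_000_000;
        System.out.println(naiveMid(low, high)); // negative: the int sum wrapped around
        System.out.println(safeMid(low, high));  // 2050000000, the correct midpoint
    }
}
```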
          1.3 It seems that there are only 2 usages of the findRange API that takes in the array length. We already have an API that covers that use case - findRange[T <: Range](ranges: Array[T], value: Long) - which is used by the majority of call sites.
          We can make the findRange method that has the actual binary search logic private and change the 2 use cases in Log.scala to use the public method that assumes the array length.
          1.4 In truncateTo, it is possible that the log file was successfully deleted but the index file was not. In this case, we would end up with an unused index file that is never deleted from the kafka log directory.
          1.5 In loadSegments, we need to rebuild any missing index files, or it will error out at a later time. Do we have a follow-up JIRA to cover this? It seems like a blocker to me.

          2. LogManager
          2.1 numPartitions is an unused class variable

          3. FileMessageSet
          3.1 In the searchFor API, fix the comment to say that it searches for the first/least offset that is >= the target offset. Right now it says it searches for the last offset that is >= the target offset.
          3.2 The searchFor API returns a pair of (offset, position). Right now, it does not always return the offset of the message at the returned position: if the file message set is sparse, it returns the offset of the next message, so the offset and position do not point to the same message in the log. Currently we are not using the offset returned by the read() API, but if we do in the future, it will be good for it to be consistent.
          3.3 In the searchFor API, one of the statements uses 12 and the other uses MessageSet.LogOverhead. I think the while condition is better understood if it said MessageSet.LogOverhead.

          4. LogSegment
          4.1 It would be better to make translateOffset return an Option. That way, every usage of this API will be forced to handle the case where the position was not found in the log segment.
          4.2 It might make sense to have all the places that use this segment size use an Int instead of a Long.

          5. ConsumerIterator

          Right now, while committing offsets for a compressed message set, the consumer can still get duplicates. However, we could probably fix this by making the ConsumerIterator smarter and discarding messages with offset < fetch offset.
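The fix suggested here could look roughly like the following. This is a hedged sketch with illustrative names and types, not the real ConsumerIterator: when a fetch lands in the middle of a compressed wrapper, the decompressed set starts before the requested offset, and the iterator can simply drop those leading messages.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of discarding the already-consumed prefix of a
// decompressed message set; not the actual ConsumerIterator code.
public class OffsetFilter {
    /** Keep only messages at or beyond the offset the consumer requested. */
    public static List<Long> fromOffset(List<Long> decompressedOffsets, long fetchOffset) {
        List<Long> kept = new ArrayList<>();
        for (long offset : decompressedOffsets)
            if (offset >= fetchOffset)   // drop duplicates before the fetch offset
                kept.add(offset);
        return kept;
    }
}
```

For example, if the wrapper decompresses to offsets 5..8 but the consumer committed through 6, filtering from offset 7 yields only the unseen messages.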

          6. ReplicaFetcherThread

          When the follower fetches data from the leader, it uses log.append, which re-computes the logical message ids. This involves recompression when the data is compressed, which it is in production. This could be avoided by making the data copy from leader -> follower smarter.

          7. MessageCompressionTest
          There are 2 unused imports in this file

          8. ByteBufferMessageSet
          8.1 There are 3 unused imports in this file
          8.2 The return statement in create() API is redundant

          9. OffsetIndex
          9.1 The last return statement in indexSlotFor is redundant
          9.2 The first return statement in indexSlotFor can be safely removed by using case-match or putting the rest of the logic in the else part of the if-else block.

          10. Performance
          We need a performance test to see the impact on throughput/latency, if any, due to this patch. What I am curious about is the performance impact of the following, which are the changes that could affect performance compared to pre-KAFKA-506 -
          10.1 Recompression of data during replica reads
          10.2 Recompression of data to assign correct offsets inside a compressed message set
          10.3 The linear search in the file segment to find the message with a given id. This depends on the index interval and there needs to be a balance between index size and index interval.
          10.4 The impact of making the log memory mapped.
          10.5 Overhead of using the index to read/write data in Kafka
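The lookup in 10.3 can be modeled as follows. This is a toy sketch with hypothetical names, not the actual OffsetIndex: a sparse index stores every Nth logical offset with its file position, a read finds the greatest indexed offset <= the target, and a linear scan starts there. The index interval is exactly the size-versus-scan-length trade-off mentioned above.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of sparse-index lookup: index every Nth offset, then
// binary-search (via TreeMap.floorEntry) for the scan start position.
// All names are hypothetical, not the real Kafka OffsetIndex code.
public class SparseIndexSketch {
    private final TreeMap<Long, Long> index = new TreeMap<>(); // offset -> file position
    private final int interval;

    public SparseIndexSketch(int interval) { this.interval = interval; }

    public void maybeIndex(long offset, long filePosition) {
        if (offset % interval == 0)      // keep only every Nth entry sparse
            index.put(offset, filePosition);
    }

    /** File position where the linear scan for `target` should begin. */
    public long scanStartFor(long target) {
        Map.Entry<Long, Long> floor = index.floorEntry(target);
        return floor == null ? 0L : floor.getValue();
    }
}
```

A larger interval shrinks the index but lengthens the worst-case linear scan from the returned position to the target message.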

          11. KafkaApis
          Unused imports in this file

          Just to summarize so that we understand the follow up work and also the JIRAs that got automatically resolved due to this feature. Please correct me if I missed something here -

          Follow up JIRAs
          1. Retain key in producer (KAFKA-544)
          2. Change sizeInBytes() to Int (KAFKA-556)
          3. Fix consumer offset commit in ConsumerIterator for compressed message sets (KAFKA-546)
          4. Remove the recompression involved while fetching data from follower to leader (KAFKA-557)
          5. Rebuild missing index files (KAFKA-561)
          6. Add performance test for log subsystem (KAFKA-545)
          7. Overall Performance analysis due to the factors listed above

          JIRAs resolved due to this feature
          1. Fix offsets returned as part of producer response (KAFKA-511)
          2. Consumer offset issue during unclean leader election (KAFKA-497)

          Jay Kreps added a comment -

          Hi Neha, here are some comments on your comments and a patch that addresses the comments we are in agreement on.

          1. Log
          1.2, 1.3 True. This problem exists in both OffsetIndex and Log, though I don't think either is actually possible. In Log this would require having 2 billion segment files, which is not physically possible; in OffsetIndex one would need ~2 billion entries in an index, which isn't possible since the message overhead would fill up the log segment first. I am going to leave it alone in Log since I want to delete that code asap anyway. I fixed it in OffsetIndex since that code is meant to last.
          1.4. This logic is a little odd, I will fix it, but actually this reminds me of a bigger problem. If file.delete() fails on the log file, the presence of that log file will effectively corrupt the log on restart (since we will have a file with the given offset but will also start another log with a parallel offset that we actually append to--on restart the bad file will mask part of the new file). Obviously if file.delete() fails things are pretty fucked and there is nothing we can do in software to recover. So what I would like to do is throw KafkaStorageException and have Partition.makeFollower() shut down the server. What would happen in the leadership transfer if I did that?
          1.5 Filed a JIRA for this.

          LogManager
          2.1 Deleted numPartitions (not related to this patch, I don't think)

          FileMessageSet
          3.1 Good catch, fixed.
          3.2 Right, so I return the offset specifically to be able to differentiate the case where I found the exact location versus the next message. This is important for things like truncate. I always return the offset and corresponding file position of the first offset that meets the >= criteria. So either I am confused, or I think it works the way you are saying it should.
          3.3 Well, but the code actually reads an Int and a Long out of the resulting buffer, so if MessageSet.LogOverhead != 12 there is a bug; we aren't abstracting anything, just adding a layer of obfuscation. But, yes, it should be consistent, so changed it.

          LogSegment
          4. LogSegment
          4.1 I don't want to allocate an object for each call as this method is internal to LogSegment. I will make it private to emphasize that.
          4.2 I agree, though we have had the 2gb limit for a while now so this isn't new. We repurposed KAFKA-556 for this.

          5. ConsumerIterator
          Agreed. Broke this into a separate issue since current state is no worse than 0.7.x. JIRA is KAFKA-546.

          6. ReplicaFetcherThread
          Agreed this was discussed above. JIRA is KAFKA-557.

          7. Only IDEA detects this, which I don't have. So can't help on this.

          8. ByteBufferMessageSet
          8.2 Fixed

          9. OffsetIndex
          9.1 Fixed
          9.2 This is true but I think it would be more convoluted. Simple test and exits make it so you don't have to add another layer of nesting.

          10 Agreed, of the various things on my plate I think this is the most important. Any issues here are resolvable, but we need to first get the data.

          Jay Kreps made changes -
          Attachment KAFKA-506-neha-post-review.patch [ 12548519 ]
          Jay Kreps added a comment -

          This patch is identical to the previous Neha related patch except that now in the event that a log segment can't be deleted we throw KafkaStorageException. In KafkaApis.handleLeaderAndISRRequest we catch this exception and shutdown the server.

          Jay Kreps made changes -
          Attachment KAFKA-506-neha-post-review-v2.patch [ 12548670 ]
          Neha Narkhede added a comment -

          +1. Looks good and thanks for addressing the late review comments. One minor comment -

          The following error statement is slightly misleading. The broker could either be in the middle of becoming a leader or a follower, not necessarily the former.

          fatal("Disk error while becoming leader.")

          Jay Kreps added a comment -

          Ah, nice catch. Changed it to "Disk error during leadership change."

          Checked in with the change.

          Sam Meder added a comment -

          I think you missed a change to KafkaETLContext. It needs:

          diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
          index bca1757..9498169 100644
          --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
          +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
          @@ -205,7 +205,7 @@ public class KafkaETLContext {

          key.set(_index, _offset, messageAndOffset.message().checksum());

          - _offset = messageAndOffset.offset(); //increase offset
          + _offset = messageAndOffset.nextOffset(); //increase offset
            _count ++; //increase count

          return true;

          or something similar. As it stands it'll run forever...
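          The infinite loop follows from the 0.8 move to logical offsets: offset() now identifies the current message rather than pointing past it, so a consumer that stores offset() as its next fetch position re-fetches the same message forever. A hypothetical sketch (MessageAndOffset here is a stand-in, assuming nextOffset() == offset + 1 as in the new scheme):

```java
/** Hypothetical sketch of why reusing offset() loops forever under logical offsets. */
public class OffsetAdvanceSketch {

    /** Stand-in for MessageAndOffset under the sequential logical-offset scheme. */
    public record MessageAndOffset(long offset) {
        /** Offset to request in order to fetch the following message. */
        public long nextOffset() { return offset + 1; }
    }

    public static void main(String[] args) {
        long fetchOffset = 7;
        MessageAndOffset mo = new MessageAndOffset(fetchOffset);

        // Old habit: offset() used to be the byte position past the message, so it advanced.
        long broken = mo.offset();      // still 7: the next fetch returns the same message
        long fixed  = mo.nextOffset();  // 8: the consumer makes progress

        System.out.println(broken == fetchOffset);      // true, i.e. no progress
        System.out.println(fixed == fetchOffset + 1);   // true, i.e. progress
    }
}
```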

          Jay Kreps added a comment -

          Ack, nice catch, fixed it.

          Maxime Brugidou made changes -
          Link This issue is related to KAFKA-634 [ KAFKA-634 ]

            People

            • Assignee: Jay Kreps
            • Reporter: Jay Kreps
            • Votes: 0
            • Watchers: 5
