Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7.1
    • Fix Version/s: 0.7.2, 0.8.0
    • Component/s: None
    • Labels:

      Description

      Some applications might want their data to be deleted from the Kafka servers earlier than the default retention time.

      1. kafka-475-0.8-v1.patch
        23 kB
        Swapnil Ghike
      2. kafka-475-v1.patch
        9 kB
        Swapnil Ghike
      3. kafka-475-v2.patch
        25 kB
        Swapnil Ghike
      4. kafka-475-v3.patch
        25 kB
        Swapnil Ghike
      5. kafka-475-v4.patch
        24 kB
        Swapnil Ghike
      6. kafka-475-v5.patch
        25 kB
        Swapnil Ghike

        Activity

        Swapnil Ghike added a comment -

        To facilitate this, we can roll a new log segment whenever a time threshold is reached, if the size limit has not been hit already. We can set this roll-out time limit equal to the retention time limit. With these values, the number of open file handles in the system can at most double at any point.
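
        A minimal Scala sketch of this scheme, using hypothetical names (SegmentState, RollPolicy, shouldRoll) rather than anything from the actual patch:

        class SegmentState(val created: Long, var size: Long)

        class RollPolicy(maxSegmentSize: Long, rollIntervalMs: Long) {
          // Roll when the size limit is hit, or when the segment has been open
          // longer than the roll interval (here set equal to the retention time,
          // so at most one extra segment per log is open at any moment).
          def shouldRoll(segment: SegmentState, nowMs: Long): Boolean =
            segment.size >= maxSegmentSize ||
              nowMs - segment.created >= rollIntervalMs
        }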

        Jun Rao added a comment -

        Thanks for patch v1. Some comments:

        1. The condition for testing whether we should roll a new log segment doesn't seem right. Currently, it will roll a new segment if the last segment hasn't been updated for the retention time. What we should do is roll a new segment every retention interval, independent of the last update time, as long as (a) no segment has been rolled since the last retention interval and (b) the last segment has a size larger than 0 (sketched below).

        2. We should add a unit test to test rolling a new segment by time.
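
        A small sketch contrasting the two conditions described in point 1; Seg and the field names are illustrative, not the patch's actual code:

        class Seg(var rolledAt: Long, var lastUpdated: Long, var size: Long)

        object RollCheck {
          // v1's condition: rolls only once the segment has sat idle for a full
          // retention interval, so a steadily written segment would never roll.
          def idleBased(s: Seg, nowMs: Long, retentionMs: Long): Boolean =
            nowMs - s.lastUpdated >= retentionMs

          // Suggested condition: roll once per retention interval regardless of
          // the last update time, provided (a) no roll has happened within the
          // current interval and (b) the segment is non-empty.
          def intervalBased(s: Seg, nowMs: Long, retentionMs: Long): Boolean =
            nowMs - s.rolledAt >= retentionMs && s.size > 0
        }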

        Neha Narkhede added a comment -

        If you roll log segments based on retention time, it seems you can have only one segment for that log at any point in time. If you want to roll 5-minute segments, it means you can only have 5 minutes' worth of data for that partition. On the contrary, if I choose size-based rolling and size-based retention, I can have multiple log segments, each of a specific size. What seems desirable is to have time-based rolling + retention behave the same way. I would imagine applications wanting to roll segments every 1 hour and retain 24 hours' worth of data. This is an advantage for applications using getOffsetsBefore() to do time-indexed fetches of the data, since getOffsetsBefore() only returns offsets at log segment granularity. It also gives applications a way to reason about the time window of the data retained for a partition.

        One potential downside is that you can end up creating a large number of log segments for your partition if you choose too small a value for log.file.time.ms. But this problem exists today with size-based log segment rolling too, so we are not introducing any regression in behavior.
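
        A rough sketch of the desired behavior (illustrative names only, not the real API): cleanup then operates at segment granularity, so with hourly rolling and 24-hour retention a partition carries up to roughly 24 one-hour segments.

        object RetentionSketch {
          final case class TimeSegment(startMs: Long)

          // Retention drops whole segments whose data has aged past the limit;
          // getOffsetsBefore() likewise resolves offsets per segment boundary.
          def retainedSegments(segments: List[TimeSegment],
                               nowMs: Long,
                               retentionMs: Long): List[TimeSegment] =
            segments.filter(s => nowMs - s.startMs < retentionMs)
        }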

        Other review comments -

        1. Log
        1.1 Rename currentMS to currentMs (following the camelCase convention).
        1.2 How about renaming retentionMSInterval to retentionIntervalMs to be consistent with the naming convention?
        1.3 In maybeRoll, it looks like currentMS is only used to compute the time difference. How about removing currentMS?

        2. LogManager
        2.1 This is unrelated to your patch, but let's also rename logRetentionMSMap to logRetentionMsMap.

        Swapnil Ghike added a comment -

        Jun: Thanks for pointing out the mistake. I don't see why (a) in your suggestion is important, though. Could you please elaborate on whether it makes a difference if we did not implement (a)?

        Neha: Please correct me if I failed to see your point. In this proposed scheme, a new segment will be rolled depending on whichever of the size limit or the time limit is hit first. So if a producer produces data fast enough, it can still create multiple segments due to the size limit on each segment. I have set the rolling time interval equal to the retention time interval. In this case, if the segments don't hit the size limit within the retention time (due to an aggressive retention time or slow production of data), then what you said will be true and there will be at most two active segments in the log at any point in time. In the first case, the application indeed wanted its data cleaned up fast, and in the second case, hopefully the number of segments should not matter.
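
        A toy illustration (not Kafka code) of the two regimes described above, where a segment closes on whichever of the size or time limit is hit first:

        object RollRegimes extends App {
          // Approximate number of segments produced over one retention interval.
          def segmentCount(bytesPerMs: Double, maxSizeBytes: Long, retentionMs: Long): Int = {
            val msToFill = maxSizeBytes / bytesPerMs   // time to reach the size limit
            if (msToFill < retentionMs)
              math.ceil(retentionMs / msToFill).toInt  // fast producer: many size-capped segments
            else 1                                     // slow producer: one time-capped segment
          }
          println(segmentCount(bytesPerMs = 1000.0, maxSizeBytes = 1 << 20, retentionMs = 3600L * 1000)) // ~3434
          println(segmentCount(bytesPerMs = 0.1, maxSizeBytes = 1 << 20, retentionMs = 3600L * 1000))    // 1
        }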

        Including your other suggestions in the patch.

        Swapnil Ghike added a comment - edited

        Patch attached:
        1. Time-based log segment rollout added. As discussed with Neha, the values of config.logRollHours and config.logRetentionHours are now decoupled.

        2. Moved the position of the maybeRoll(segment) call in the Log to make sure that a new message does not get appended to a segment that has expired in time.
        i. Accordingly modified testCleanupSegmentsToMaintainSizeWithSizeBasedLogRoll.

        3. I have currently set the range of logRetentionHours and logRollHours to (1, 24 * 7). An upper cap on the hours value is necessary because a very large value can overflow and become negative when converted to milliseconds (see the sketch after this list).

        4. Unit tests added in LogTest
        i. testTimeBasedLogRoll
        ii. testSizeBasedLogRoll

        5. Unit tests added in LogManagerTest (sorry couldn't come up with more concise names :\ )
        i. testCleanupSegmentsToMaintainSizeWithTimeBasedLogRoll
        ii. testCleanupExpiredSegmentsWithTimeBasedLogRoll
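
        A quick sketch of why the cap in point 3 matters when the hours value is held in an Int (the numbers are illustrative; storing the ms value in a Long avoids the overflow entirely):

        object OverflowSketch extends App {
          val hours = 600000                                 // ~68 years, a pathological config value
          val wrongMs: Int  = hours * 60 * 60 * 1000         // Int arithmetic wraps: -368549888
          val rightMs: Long = hours.toLong * 60 * 60 * 1000  // widen to Long first: 2160000000000
          println(s"$wrongMs vs $rightMs")
        }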

        Swapnil Ghike added a comment -

        Removed an unnecessary assert statement. Please see v3 of the patch.

        Jun Rao added a comment -

        Thanks for patch v3. A few other comments:
        20. KafkaConfig:
        20.1 To be consistent, we probably should add topic level log file size for rolling.
        20.2 We probably don't need to cap logRoll and logRetention hours at 24 * 7, since we store ms in a long, which can hold up to 2^63 milliseconds.

        21. LogSegment: Unlike Java, we can just have "val startTime" and use it directly; Scala already wraps the val with a public getter (sketched after this list).

        22. LogManagerTest: It seems to me that we can test log rolling (covered in LogTest) and log cleanup (covered in LogManagerTest) independently. Is there any value in testing all 4 combinations of log rolling and log cleanup?
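
        Point 21 in miniature (LogSegmentLike is an illustrative stand-in, not the real class):

        object ValAccessorSketch extends App {
          // A `val` constructor parameter gets a compiler-generated public
          // accessor, so no Java-style getStartTime() is needed.
          class LogSegmentLike(val startTime: Long)

          val seg = new LogSegmentLike(System.currentTimeMillis())
          println(seg.startTime) // direct access via the generated accessor
        }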

        Swapnil Ghike added a comment -

        1. Similarly, should we also add topic level log retention size?
        2. Ok.
        3. Ok. I am actually changing it to a var because there is one small change to be made to the rolling policy: we don't roll a new log when the previous segment, which has expired in time, is empty. When a new message is finally appended to this empty expired segment, its timeOfCreation should also be reset to a new value.
        4. I implemented the new tests to make sure that the independent mechanisms of roll and recovery don't interfere with each other. But now that I look at them, they indeed look like a working model of rolling followed by a working model of recovery. We can either remove them, or I can try to combine all modes of roll and recovery in one new test to check for any interference.

        Also, should we have a check for illegal values in getTopic* methods in Utils?

        Jun Rao added a comment -

        1. Yes, adding a topic level log retention size will be useful.

        3. Yes, we can make timeOfCreation an Option. Initially, it will be None. It becomes a non-empty value on the next append (see the sketch below).

        4. It doesn't seem that rolling logs are interfering with log cleanup. So, removing those tests should be fine.
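
        A hedged sketch of the Option-based timestamp suggested in 3 (names are illustrative, not the patch's API):

        object FirstAppendSketch {
          class SegmentLike {
            var firstAppendTime: Option[Long] = None // None until data arrives

            def recordAppend(bytes: Long, nowMs: Long): Unit =
              if (bytes > 0 && firstAppendTime.isEmpty)
                firstAppendTime = Some(nowMs) // set once, on the first real append

            // An empty segment never ages: the roll clock starts at first append.
            def expiredByTime(nowMs: Long, rollMs: Long): Boolean =
              firstAppendTime.exists(t => nowMs - t >= rollMs)
          }
        }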

        Swapnil Ghike added a comment -

        1. Topic level log roll size and retention size limits added.
        2. Removed the cap on logRollHours and logRetentionHours.
        3. Created an Option for the timeOfFirstAppend.
        4. Removed the unnecessary unit tests.

        Created kafka-481 for adding require() to getTopic* methods.

        Jun Rao added a comment -

        Thanks for patch v4. A couple more comments:

        30. LogSegment.updateFirstAppendTime(): Could we instead add a LogSegment.append(ByteBufferMessageSet) which appends the data and updates the timestamp? Also, if ByteBufferMessageSet.sizeInBytes is 0, in addition to not updating the time, we can avoid appending the message set (a sketch follows below).

        31. Could you rebase?
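
        A sketch of the shape point 30 suggests, with MessageSetLike standing in for ByteBufferMessageSet (none of this is the real API):

        object AppendSketch {
          trait MessageSetLike { def sizeInBytes: Long }

          class SegmentLike {
            var firstAppendTime: Option[Long] = None
            var size: Long = 0L

            // One method both writes and stamps; an empty message set is
            // neither appended nor allowed to start the roll clock.
            def append(messages: MessageSetLike, nowMs: Long): Unit =
              if (messages.sizeInBytes > 0) {
                size += messages.sizeInBytes // stand-in for the actual file write
                if (firstAppendTime.isEmpty) firstAppendTime = Some(nowMs)
              }
          }
        }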

        Swapnil Ghike added a comment -

        Made the changes.

        Jun Rao added a comment -

        Thanks for patch v5. Committed to trunk. Could you port to 0.8 too?

        Swapnil Ghike added a comment -

        Patch for 0.8 attached. After rebasing, I can apply the patch for 481 to 0.8.

        Jun Rao added a comment -

        Thanks for the patch. Committed to 0.8.


          People

          • Assignee:
            Swapnil Ghike
            Reporter:
            Swapnil Ghike
          • Votes:
            0
            Watchers:
            3


              Time Tracking

              Estimated: 48h
              Remaining: 48h
              Logged: Not Specified
