Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.1
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

      Description

      Currently Kafka has only one way to bound the space of the log, namely by deleting old segments. The policy that controls which segments are deleted can be configured based either on the number of bytes to retain or the age of the messages. This makes sense for event or log data which has no notion of primary key. However lots of data has a primary key and consists of updates by primary key. For this data it would be nice to be able to ensure that the log contained at least the last version of every key.

      As an example, say that the Kafka topic contains a sequence of User Account messages, each capturing the current state of a given user account. Rather than simply discarding old segments, since the set of user accounts is finite, it might make more sense to delete individual records that have been made obsolete by a more recent update for the same key. This would ensure that the topic contained at least the current state of each record.
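
      To make the idea concrete, here is a minimal sketch (plain Scala, not Kafka code; the Update type and the sample values are made up) of what "retaining at least the last version of every key" means for a stream of keyed updates:

        // Hypothetical keyed updates, in log order; only the latest value per key must survive.
        case class Update(key: String, value: String, offset: Long)

        val updates = Seq(
          Update("user-17", "email=a@x.com", 0L),
          Update("user-42", "email=b@y.com", 1L),
          Update("user-17", "email=a@z.com", 2L))

        // Deduplication keeps, for each key, the record with the highest offset.
        val latestByKey: Map[String, Update] =
          updates.groupBy(_.key).map { case (k, vs) => k -> vs.maxBy(_.offset) }

        // latestByKey("user-17").value == "email=a@z.com"; user-42's only record is retained as-is.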

      1. KAFKA-631-v9.patch
        178 kB
        Jay Kreps
      2. KAFKA-631-v8.patch
        178 kB
        Jay Kreps
      3. KAFKA-631-v7.patch
        175 kB
        Jay Kreps
      4. KAFKA-631-v6.patch
        173 kB
        Jay Kreps
      5. KAFKA-631-v5.patch
        171 kB
        Jay Kreps
      6. KAFKA-631-v4.patch
        170 kB
        Jay Kreps
      7. KAFKA-631-v3.patch
        170 kB
        Jay Kreps
      8. KAFKA-631-v2.patch
        169 kB
        Jay Kreps
      9. KAFKA-631-v1.patch
        166 kB
        Jay Kreps

        Issue Links

          Activity

          Jay Kreps added a comment -

          Here is a specific proposal:

          We will retain the existing settings that retain segments based on bytes and time, with data prior to these limits left unmolested. We will introduce a new setting for each topic, "cleanup.policy"={delete, dedupe}. cleanup.policy=delete will correspond to the current behavior. cleanup.policy=dedupe will correspond to the new behavior described in this JIRA. As now, data that falls inside the retention window will not be touched, but data that is outside that window will be deduplicated rather than deleted. It is intended that this be a per-topic setting specified at topic creation time. As a short-cut for the purpose of this ticket I will just add a configuration map setting the policy in the way we have for other topic-level settings; these can all be refactored into something set in the create/alter topic command as a follow-up item.
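
          As a rough illustration of that configuration-map short-cut (the topic:policy list format mirrors the topic.log.cleanup.policy style used later in this ticket; the function names here are hypothetical, not the patch's):

            // Parse e.g. "users:dedupe,clicks:delete" into a per-topic policy map.
            def parseCleanupPolicies(prop: String): Map[String, String] =
              prop.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
                val Array(topic, policy) = entry.split(":")
                topic -> policy
              }.toMap

            // Topics without an explicit entry fall back to the default delete policy.
            def policyFor(topic: String, overrides: Map[String, String]): String =
              overrides.getOrElse(topic, "delete")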

          Topics getting dedupe will be processed by a pool of background "cleaner" threads. These threads will periodically recopy old segment files removing obsolete messages and swapping in these new deduplicated files in place of the old segments. These sparse files should already be well-supported by the logical and sparse offset work in 0.8.

          Here are the specific changes intended:

          • Add a few new configs:
            • topic.cleanup.policy={delete,dedupe} // A map of cleanup policies, defaults to delete
            • cleaner.thread.pool.size=# // The number of background threads to use for cleaning
            • cleaner.buffer.size.bytes=# // The maximum amount of heap memory per cleaner thread that can be used for log deduplication
            • cleaner.max.{read,write}.throughput=# // The maximum bytes per second the cleaner can read or write

          • Add a new method Log.replaceSegments() that replaces one or more old segments with a new segment while holding the log lock
          • Implement a background cleaner thread that does the recopying. This thread will be owned and maintained by LogManager
          • Add a new file per data directory called cleaner-metadata that maintains the cleaned section of the logs in that directory that have dedupe enabled. This allows the cleaner to restart cleaning from the same point upon restart.

          The cleaning algorithm for a single log will work as follows:
          1. Scan the head of the log (i.e. all messages since the last cleaning) and create a Map of key => offset for messages in the head of the log. If the cleaner buffer is too small to scan the full head of the log then just scan whatever fits going from oldest to newest.
          2. Sequentially clean segments from oldest to newest.
          3. To clean a segment, first create a new empty copy of the segment file with a temp name. Check each message in the original segment. If it is contained in the map with a higher offset, ignore it; otherwise recopy it to the new temp segment. When the segment is complete swap in the new file and delete the old.

          The threads will iterate over the logs and clean them periodically (not sure the right frequency yet).
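
          For concreteness, here is a simplified sketch of the two passes in steps 1-3 above; it models a segment as an in-memory sequence of records rather than a file, so it is only an illustration of the algorithm, not the actual cleaner code:

            case class Record(key: String, value: Array[Byte], offset: Long)

            // Pass 1: map each key seen in the head of the log to its latest offset.
            def buildOffsetMap(head: Seq[Record]): Map[String, Long] =
              head.foldLeft(Map.empty[String, Long]) { (m, r) => m.updated(r.key, r.offset) }

            // Pass 2: recopy one segment, dropping any record that the map shows
            // has been obsoleted by a later message with the same key.
            def cleanSegment(segment: Seq[Record], latest: Map[String, Long]): Seq[Record] =
              segment.filter(r => latest.getOrElse(r.key, -1L) <= r.offset)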

          Some Nuances:
          1. The above tends to lead to smaller and smaller segment files in the tail of the log as records are overwritten. To avoid this we will combine files; that is, we will always collect the largest set of files that together are smaller than the max segment size into a single segment. Obviously this will be based on the starting sizes, so the resulting segment will likely still be smaller than the max segment size.
          2. The recopying procedure depends on the property that logs are immutable. However our logs are only mostly immutable. It is possible to truncate a log to any segment. It is important that the cleaner respect this and not have a race condition with potential truncate operations. But likewise we can't lock for the duration of the cleaning as it may be quite slow. To work around this I will add a generation counter to the log. Each truncate operation will increment this counter. The cleaner will record the generation before it begins cleaning and the swap operation that swaps in the new, cleaned segment will only occur if the generations match (i.e. if no truncates happened in that segment during cleaning). This will potentially result in some wasted cleaner work when truncations collide with cleanings, but since truncates are rare, and truncates deep enough into the log to interact with cleaning very rare, this should almost never happen.
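
          Two small sketches of the nuances above, using in-memory stand-ins rather than real segments (the names and types are illustrative only): greedy grouping of consecutive segments under the max segment size, and a swap that is abandoned if the truncate generation has changed since cleaning began.

            // Nuance 1: group consecutive segment sizes (oldest first) so that each group,
            // taken together, stays under maxSegmentSize; each group is recopied into one segment.
            def groupBySize(sizes: List[Long], maxSegmentSize: Long): List[List[Long]] =
              sizes match {
                case Nil => Nil
                case first :: rest =>
                  val extra = rest
                    .scanLeft(first)(_ + _).tail              // running totals including each extra segment
                    .zip(rest)
                    .takeWhile { case (total, _) => total < maxSegmentSize }
                    .map { case (_, size) => size }
                  (first :: extra) :: groupBySize(rest.drop(extra.size), maxSegmentSize)
              }

            // Nuance 2: truncates bump a generation counter; the cleaner only swaps in its
            // new segment if the generation it recorded at the start is still current.
            class TruncateGuard {
              private var generation = 0L
              def onTruncate(): Unit = synchronized { generation += 1 }
              def current: Long = synchronized { generation }
              def swapIfUnchanged(expected: Long)(swap: => Unit): Boolean = synchronized {
                if (generation == expected) { swap; true }   // safe: no truncate raced the cleaning
                else false                                   // a truncate happened; discard the cleaned copy
              }
            }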

          Prashanth Menon added a comment -

          Hey Jay, is this something you wanted to take on?

          Jay Kreps added a comment -

          Yes, I am actually working on it now (forgot to assign to myself). If you are looking for a cool project I actually have a number of ideas...

          Prashanth Menon added a comment -

          Sure, pass the ideas along onto the distribution list and I'll see what chunks I can bite off

          Jun Rao added a comment -

          Prashanth,

          Will you be interested in KAFKA-330 (delete topic)? I added some comments on how to approach this.

          Jay Kreps added a comment -

          This patch implements more or less what was described above.

          Specific Changes:

          • OffsetCheckpoint.scala: Generalize HighwaterMarkCheckpoint to OffsetCheckpoint for use in tracking the cleaner point. In the future we would use this for flush point too, if possible.
          • Move configuration parameters in Log to a single class, LogConfig, to prepare for dynamically changing log configuration (also a nice cleanup)
          • Implement a cleaner process in LogCleaner.scala that cleans logs, this is mostly standalone code. It is complicated but doesn't really touch anything else.
          • Implement an efficient OffsetMap (and associated tests) for log deduplication
          • Add an API in Log.scala that allows swapping in segments. This api is fairly specific to the cleaner for now and is not a public api.
          • Refactor segment delete in Log.scala to allow reuse of the async delete functionality in segment swap
          • Add logic in log recovery (Log.scala) to handle the case of a crash in the middle of cleaning or file swaps.
          • Add a set of unit tests on cleaner logic (CleanerTest.scala), an integration test (LogCleanerIntegrationTest.scala) for the cleaner, and a torture test to run against a standalone server (TestLogCleaning.scala). The torture test produces a bunch of messages to a server over a long period of time and simultaneously logs them out to a text file. Then it uses unix sort to dedupe this text file and compares the result to the result of consuming from the topic (if the unique key-set isn't the same for both it throws an error). It also measures the log size reduction.

          New configuration parameters:

          1. should we default to delete or dedupe for the cleanup policy?
            log.cleanup.policy = delete/dedupe
          2. per-topic override for cleanup policy
            topic.log.cleanup.policy = topic:delete/dedupe, …
          3. number of background threads to use for log cleaning
            log.cleaner.threads=1
          4. maximum I/O the cleaner is allowed to do (read & write combined)
            log.cleaner.io.max.bytes.per.second=Double.MaxValue
          5. the maximum memory the cleaner can use
            log.cleaner.buffer.size=100MB
          6. the amount of time to sleep when there is no cleaning to do
            log.cleaner.backoff.ms=30secs
          7. minimum ratio of new to old messages the log must have for cleaning to proceed
            log.cleaner.min.cleanable.ratio=0.5

          I also changed the configuration log.cleanup.interval.mins to log.retention.check.interval.ms because the word "cleanup" is confusing.

          New Persistent Data

          This patch adds a new persistent data structure, a per-data-directory file 'cleaner-offset-checkpoint'. This is the exact same format and code as the existing 'replication-offset-checkpoint'. The file contains the position in the log up to which the cleaner has cleaned.
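
          A rough sketch of what writing such a checkpoint might look like; the version/size/entry-per-line layout shown here is an assumption for illustration, the real format is whatever OffsetCheckpoint.scala writes:

            import java.io.{File, PrintWriter}

            // Hypothetical writer for a cleaner-offset-checkpoint style file: one entry per log,
            // recording how far the cleaner has gotten in that log.
            def writeCheckpoint(file: File, cleanedUpTo: Map[(String, Int), Long]): Unit = {
              val writer = new PrintWriter(file)
              try {
                writer.println(0)                    // format version (assumed)
                writer.println(cleanedUpTo.size)     // number of entries
                cleanedUpTo.foreach { case ((topic, partition), offset) =>
                  writer.println(s"$topic $partition $offset")
                }
              } finally writer.close()
            }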

          Current State

          This patch is mostly functional with a number of known limitations:
          1. It is a lot of code, so there are likely bugs. I think most bugs would only affect log cleaning.
          2. The cleaner is somewhat inefficient. Currently it does about 11MB/sec. I suspect this can be increased to around 70-100MB/sec by implementing batching of writes. I will do this as a follow-up ticket.
          3. I do not properly handle compressed logs. Cleaning will work correctly but all messages are written uncompressed. The reason for this is that logically it is pretty complex to figure out what codec messages should be written with (since there may be a mixture of compression types in the log). Rather than try to handle this now, I think it makes more sense to implement dynamic config and then add a new config for log compression so that each topic has a single compression type that all messages are written with.
          4. It would be nice to seed the hash with a different seed for each run so that collisions would get handled in the next run. This will also be done in a follow-up patch.
          5. It would be nice to integrate the torture test into the nightly integration test framework (since it is a pass/fail test). I will work to do this as a separate item.

          I would like to get this in in its current state and work on making log config dynamic. Without that feature this is not very useful, since you have to bounce the server every time you add a new topic to set the cleanup policy. Once that is done we can use it for real features, which will likely uncover more issues than further testing now would.

          Status of Testing

          • There is reasonable unit test coverage but I will likely add additional tests as real usage uncovers corner cases
          • I can run the torture test for many hours on a few dozen GB of data and get correct results.
          Jun Rao added a comment -

          Thanks for the patch. Do you know the revision of trunk on which this patch will apply? I can take a look before you rebase.

          Jay Kreps added a comment -

          362eba981de40a69ae509a291649531ead6f6aee

          Jay Kreps added a comment -

          New patch, only minor changes:
          1. Rebased against trunk at 9ee795ac563c3ce4c4f03e022c7f951e065ad1ed
          2. Implemented seeding for the offset map hash so that now a different hash is used on each iteration so collisions between cleaning iterations should be independent.
          3. Implemented batching in the cleaner's writes. This improves the per-thread performance from about 11MB/sec to about 64MB/sec on my laptop.
          4. Added a special log4j log for cleaner messages since they are kind of verbose.

          Jay Kreps added a comment -

          I did some testing on the I/O throttling and verified that this does indeed maintain the expected I/O rate. Two gotchas here: first, you can't just look at iostat, because the OS will batch up writes and then asynchronously flush them out at a rate greater than what we requested. Second, since the limit is on read and write combined, a limit of 5MB/sec will lead to the offset map building happening at exactly 5MB/sec, but the cleaning will be closer to 2.5MB/sec, because cleaning involves first reading in messages and then writing them back out, so 1MB of cleaning does 2MB of I/O (assuming 100% retention).
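
          To illustrate the combined read+write accounting, a toy throttler (not the Throttler class in the patch): with a 5MB/sec cap, offset map building only reads, so it runs at the full 5MB/sec, while cleaning charges each byte twice (once read, once written) and so progresses at roughly 2.5MB/sec.

            // Toy combined read+write throttler: callers report bytes moved and are put to
            // sleep whenever the observed rate creeps above the configured bytes/sec budget.
            class ToyThrottler(maxBytesPerSec: Double) {
              private var windowStartMs = System.currentTimeMillis()
              private var bytesInWindow = 0L

              def maybeThrottle(bytes: Long): Unit = synchronized {
                bytesInWindow += bytes
                val elapsedMs = math.max(1L, System.currentTimeMillis() - windowStartMs)
                val observed  = bytesInWindow * 1000.0 / elapsedMs
                if (observed > maxBytesPerSec) {
                  // sleep just long enough for the window average to fall back under the cap
                  val sleepMs = (bytesInWindow * 1000.0 / maxBytesPerSec - elapsedMs).toLong
                  if (sleepMs > 0) Thread.sleep(sleepMs)
                }
              }
            }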

          Jay Kreps added a comment -

          Attached patch v3. Two small changes:
          1. Make memory usage more intuitive now that there is a read and write buffer for each cleaner thread. These are both fixed at 1MB per thread and taken out of the total buffer size given to cleaning.
          2. Ensure that each new log segment is flushed before it is swapped into the log. Without this we can swap in a segment that is not on disk at all, delete the old segment, and then lose both in the event of a crash.

          Jay Kreps added a comment -

          Did some testing with multithreading, resulting in...

          Patch v4:
          1. Bug: Log selection wasn't eliminating logs already being cleaned which could lead to two simultaneous cleaner threads both cleaning the same log.
          2. Improve logging to always include the thread number.
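
          A sketch of the kind of selection logic the bug fix in item 1 implies: skip logs that a cleaner thread is already working on, and among the rest pick the dirtiest log that clears the min cleanable ratio. The LogStats type and field names are made up for illustration, not taken from the patch.

            case class LogStats(name: String, dirtyBytes: Long, totalBytes: Long) {
              def dirtyRatio: Double = if (totalBytes == 0L) 0.0 else dirtyBytes.toDouble / totalBytes
            }

            // Filter out in-progress logs so two threads never clean the same log, then take
            // the log with the highest dirty ratio, provided it exceeds min.cleanable.ratio.
            def selectLogToClean(logs: Seq[LogStats],
                                 inProgress: Set[String],
                                 minCleanableRatio: Double): Option[LogStats] =
              logs.filterNot(l => inProgress.contains(l.name))
                  .filter(_.dirtyRatio >= minCleanableRatio)
                  .sortBy(-_.dirtyRatio)
                  .headOption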

          Neha Narkhede added a comment -

          Reviewed patch v4 -

          1. CleanerConfig
          1.1 Typos - enableClenaer, dedupeBufferLoadFactor (probably dedupBufferLoadFactor is better?)

          2. VerifiableProperties
          "If the given key is not present" -> "If the given property is not present"

          3. KafkaConfig
          3.1 The comment for explaining log.cleaner.min.cleanable.ratio is confusing
          "/* the minimum ratio of bytes of log eligible for cleaning to bytes to total bytes which a log must
          contain to be eligible for cleaning */"
          3.2 The config "log.retention.check.interval.ms" says the retention check is in milliseconds, but the name of the config is logCleanupIntervalMinutes and we multiply this value by 60K before passing it into LogManager
          3.3 Can we document the different values for log.cleanup.policy in the comment ?

          4. OffsetMap
          4.1 Remove unused import "import java.util.concurrent._"
          4.2 entries should be updated in put() API

          5. Log
          5.1 Rolling new log segment in %s (log = %d/%d, index = %d/%d, age = %d/%d)
          This log statement got a little confusing but sophisticated. The last part of the statement should be index and the last but one should be age

          6. LogCleaner
          6.1 In the cleanSegments() API, we pass in SystemTime to the LogSegment. However, we already pass in a Time instance to LogCleaner. In order to test it independently, we can pass in MockTime to LogCleaner but we should pass in the same instance to LogSegment for it to work correctly.
          6.2 In the cleanInto() API, we log a custom message in the IllegalArgumentException. I'm not sure I quite understood that. Aren't the log segments to be cleaned a mix of previously cleaned segments and yet to be cleaned ones ? Why not just use "require" like we did while building the offsetmap ?
          6.3 Suppose the server crashes in replaceSegments() after addSegment() and before asyncDeleteSegment(), and let's say 2 log segments (xxxx.log, yyyy.log) were replaced with one new log segment (xxxx.log). Now, when this server restarts, the loadSegments() API will swap in the new xxxx.log.swap as xxxx.log, but it will leave yyyy.log.
          6.4 Do we have a unit test to cover the grouping logic in groupSegmentsBySize() API ? It looks correct to me, but I've been bitten by several scala collection append nuances before.
          6.5 Remove unused import "import java.util.concurrent.locks.ReentrantLock"
          6.6 allCleanerCheckpoints() is only called from within LogCleaner. Can we make this private ?

          7. CleanerConfig
          7.1 Typo in API doc "enableClenaer" and "clenaer"
          7.2 Why 3MB for the minimum buffer space per thread ? Can we keep this configurable as well ?

          8. LogManager
          8.1 Can we rename configs to topicConfigs or topicOverrides ?

          9. LogSegment
          9.1 Fix log4j statement for the .log renaming - "Failed to change the index file suffix"

          10. ReplicaManager
          In checkpointHighWatermarks(), it is better to use fatal("Error writing to highwatermark file: ", e)

          11. MockScheduler
          Even though this is not introduced in this patch, while reading the code, realized that the MockScheduler actually executes tasks before their nextExecution time is reached. This is because we just check if the nextExecutionTime <= now and then call task.fun() without waiting until nextExecution time.

          Jay Kreps added a comment -

          Great review. Attached patch v5 that addresses most of these issues:

          1.1 Fixed "enableClenaer", dedupe is actually a word and is spelled dedupe, though, I think…
          2. Changed
          3.1 This is hard to explain, but changed it to "the minimum ratio of dirty log to total log for a log to be eligible for cleaning"
          3.2 Changed to ms.
          3.3 Done
          4.1. Done
          4.2 Ah, nice catch. Fixed. Added test for it.
          5.1 "Confusing but sophisticated" is my middle name. Basically I didn't like the code duplication and it seemed nice to see all the criteria whenever we roll. Fixed the ordering.
          6.1 Fixed
          6.2 I think you are saying we could change this to a require() call, right? Made that change.
          6.3 Argh, you're right. I didn't think of that problem. It isn't easily fixable. Let's continue the review and I will think of a fix for this as a follow-up item. It isn't a critical problem because effectively you just duplicate a segment of the log needlessly (but with very low probability). The old segment will mask that portion of the new segment, but I don't think there is any other bad thing that happens.
          6.4 CleanerTest.testSegmentGrouping() is a beast
          6.5 Done
          6.6 It can but it seems reasonable to ask for the last known cleaner point?
          7.1 Fixed
          7.2 3MB should be enough for anyone. No the real reason is because I require you to have a 1MB read buffer, a 1MB write buffer which I cleverly subtract from the cleaner total buffer size. I don't think we need to make these configurable since 1MB is a good size (bigger won't help, and smaller will hurt). So you must have at least 2MB, but if you are trying to set a dedupe buffer that is less than 1MB well that is crazy. Maybe this is just me being a little too accurate about memory accounting and a better approach would just be to not count the I/O buffers at all. In that case the question is what is the minimum we should set for the cleaner buffer?
          8.1 We can't do topicOverrides since these are full log configs not overrides. I suppose topicConfigs is better in case there was a question of what the String in the map was. Changed that.
          9.1 Fixed.
          10.1 Fixed
          11. Not sure I follow. If you update the time manually and then call tick() we basically do "catch up" executing the tasks in order of next execution and cycling their period until we are caught up. The key point is that the user is the one who advances the time not the scheduler. That is the user says "it is now 12:15" and we execute our backlog of tasks. Perhaps you are saying that it should work the other way where the scheduler advances the clock rather than vice versa?
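
          A toy model of the catch-up behavior described in 11 (this is not the real MockScheduler, just an illustration of "the user advances the clock, then tick() drains the backlog"):

            import scala.collection.mutable

            // Periodic tasks only (periodMs > 0); the test advances the clock and tick()
            // then runs each task once per elapsed period until it is caught up to "now".
            class ToyMockScheduler {
              private case class Task(var nextMs: Long, periodMs: Long, fun: () => Unit)
              private val tasks = mutable.Buffer[Task]()
              private var nowMs = 0L

              def schedule(delayMs: Long, periodMs: Long)(f: => Unit): Unit =
                tasks += Task(nowMs + delayMs, periodMs, () => f)

              def advance(ms: Long): Unit = { nowMs += ms; tick() }

              private def tick(): Unit =
                tasks.sortBy(_.nextMs).foreach { t =>
                  while (t.nextMs <= nowMs) { t.fun(); t.nextMs += t.periodMs }
                }
            }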

          Sriram Subramanian added a comment -

          Good stuff.

          My feedback is below -

          1. Throttling

          The current scheme of throttling will work only if there is one physical disk that kafka uses which I guess is not going to be the case. For multiple disks, the single throttler is not going to prevent some disks from getting saturated. A more accurate but complex solution is to do the following -

          • Query the number of physical disks on the machine on startup. Divide the total bytes / sec allowed for the cleaner by this number (This is the value per disk).
          • Create a throttler per disk.
          • Have a background thread that refreshes the log directory mapping to the physical disk (this is in cases when the log directory gets moved to a different disk)
          • Use the appropriate throttler based on the mapping above
            This would be one of the ways you can control any single disk from getting saturated.

          2. Grouping segments by size

          The way the current grouping of segments is done based on size I think will not solve the problem of preventing very small segments. We decide the grouping even before deduplicating. I would assume somebody would choose dedup based GC only if they have lots of updates instead of new records. In such a scenario, all the old segments will eventually tend to 0 after dedup. If you were to calculate the total number of segments based on a max size before dedup, you could end up having very few records in that new segment. A more deterministic way to ensure each segment has a min size is to check the size as you append to the segment. If it has crossed the maxsize and is at the end of a segment boundary, do the swap with the segments read.

          3. Determination of end offset

          The code currently decides the end offset based on the map capacity. Consider the example you had quoted in the wiki about user updates. Very few active users would generate new events regularly and the majority would have unique records in a given time period. If many of the records within the dirty set are duplicates you would not be making optimum use of your memory (the map would end up being only partially filled). I don't have a good answer for this but it is something to note. One option would be to keep reading the dirty set till you hit the map capacity or the beginning of the active segment.

          4. Script to find actual deduplication

          As part of your tests, do you plan to measure the expected dedup level vs. the actual gain? As long as the gain is close to the expected value it is fine, but we do not want it to be way off.

          5. Integration tests

          Should the integration tests use more than one cleaner thread to catch any corner cases? I could have missed it but I did not find any test that does a sanity check of multiple cleaner threads functioning correctly.

          6. Salt generation

          Should the salt be randomly generated instead of being monotonically increasing? Empirically I have found it to perform better in terms of providing a more uniform distribution given a key namespace.

          Jay Kreps added a comment -

          Sriram--These are great suggestions. For the most part I am taking these to be "future work" because I don't think they block a "minimally viable product" which is what I hope to get in first. My intuition is to avoid doing anything hard or complicated until we have real operational experience with this functionality because otherwise we end up building a ton of fancy stuff that solves the wrong problems. Patches would be gladly accepted, though.

          1. This is a good suggestion. There is an additional assumption which is combining read and write I/O. Read I/O may be coming out of pagecache (shared) or from disks (not shared). Likewise it isn't really the number of disks per se since a RAID setup would effectively pool the I/O of all the disks (making the global throttler correct). We support multiple data directories with the recommendation that each data directory be a disk. We also know the mapping of log->data_directory. If we relied on this assumption we could do the throttling per data directory without too much difficulty. Of course that creates another additional scheduling problem which is that we should ideally choose a cleaning schedule that balances load over data directories. In any case, I think the global throttle, while not as precise as it could be, is pretty good. So I am going to add this to the "future work" page.

          2. Yes. In fact the current code can generate segments with size 0. This is okay though. There is nothing too bad about having a few small files. We just can't accumulate an unbounded number of small files that never disappear (some combining must occur). Small files will get cleaned up in the next run. So I knowingly chose this heuristic rather than doing dynamic grouping because it made the code easier and simpler to test (i.e. I can test grouping separate from cleaning).

          3. Since you have to size your heap statically in the case of a single thread shrinking the map size doesn't help anyone. Having a very sparse map just makes duplicates unlikely. However in the case where you had two threads it would be possible to schedule cleanings in such a way that you allocated small buffers for small logs and big buffers for big logs instead of medium buffers for both. Since these threads progress independently, though, it would be a bit complicated. Probably the small log would finish soon, so you would have to keep finding more small logs for the duration of the cleaning of the large log. And when the large cleaning did happen, you would probably have a small cleaning in progress so you would have to start another cleaning with the same large buffer size if you wanted memory to remain fixed. However one thing this brings up is that if your logs are non-uniform having non-uniform buffers (even if they are statically sized) could make it so you were able to efficiently clean large logs with less memory provided your scheduling was sophisticated enough. There are a number of gotchas here though.

          4. I created a cleaner log and after each cleaning I log the full cleaner stats (time, mb/sec, size reduction, etc).

          5. There are three tests in the patch. A simple non-threaded method-by-method unit test. A junit integration test of the full cleaner running as a background thread with concurrent appends. Finally a stand-alone torture test that runs against an arbitrary broker by producing to N topics and recording all its produced messages, then consuming from the broker to a file, then sorting and deduplicating both files by brute force and comparing them exactly. This latter test is very comprehensive and runs over many hours and can test any broker configuration. I ran it with multiple threads to validate that case (and found some bugs, which I fixed). I think a third thing that could be done (but which I haven't done) is to build a stand-alone log duplication checker that consumes a topic/partition and estimates the duplication of keys using a bloom filter or something like that. I haven't done the latter.

          6. Intuitively this should not be true. By definition "independent" means that a sequential salt should perform as well as any other salt, or else that would be an attack on md5, no?

          Jay Kreps added a comment -

          Updated patch v6:

          • Fixed a bug: messages larger than the I/O buffer would lead to an infinite loop. Now we raise the I/O buffer size when needed.
          • Made I/O buffer configurable instead of hardcoding to 1MB. I also no longer subtract this from overall buffer size. The two configurations are now:
            log.cleaner.dedupe.buffer.size
            log.cleaner.io.buffer.size
            Both give the total size over all threads, so the per-thread size is the total divided by log.cleaner.threads.
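
          A quick worked example of the split (the numbers are arbitrary):

            val dedupeBufferTotal = 500L * 1024 * 1024   // log.cleaner.dedupe.buffer.size, total over all threads
            val ioBufferTotal     = 10L * 1024 * 1024    // log.cleaner.io.buffer.size, total over all threads
            val cleanerThreads    = 5                    // log.cleaner.threads

            val dedupePerThread = dedupeBufferTotal / cleanerThreads   // 100 MB of offset map per thread
            val ioPerThread     = ioBufferTotal / cleanerThreads       // 2 MB of read/write buffer per thread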

          Neha, I think this addresses all your concerns.

          Jay Kreps added a comment -

          Patch v7, rebased.
          Minor changes:

          • Rebased
          • Removed some printlns that slipped in in the last patch
          • Improved logging for buffer size growing
          • Changed Logging.msgIdent to only do string concatenation if there is a message set. Not really related to this patch but seems like a good thing to do.
          Jun Rao added a comment -

          Thanks for patch v7. Looks good overall. Some comments:

          70. LogCleaner:
          70.1 buildOffsetMap(): need to consider growing readBuffer to accommodate maxMessageSize.
          70.2 cleanInto(): Can the payload ever be null?
          val retainRecord = lastOffset < 0 || (entry.offset >= lastOffset && entry.message.payload != null)
          70.3 CleanerThread.run(): Should we catch all Throwables, instead of Exceptions?

          71. Log:
          71.1 loadSegments(): The following comment is no longer true since it can happen to a segment with SwapFileSuffix.
          if(!hasIndex) {
          // this can only happen if someone manually deletes the index file
          71.2 maybeRoll(): move .format in debug to a separate line.
          71.3 truncateFullyAndStartAt(): This one behaves in the same way as truncateTo and is called directly from ReplicaFetcherThread. So need to increment truncates here too.

          72. KafkaConfig: Why do we have log.cleaner.enable? Shouldn't log cleaner be automatically enabled if logCleanupPolicy is dedup?

          Jun Rao added a comment -

          A few more comments:

          73. LogOffsetTest.createBrokerConfig(): We should set log.retention.check.interval.ms to 5 mins, instead of 5ms.

          74. CleanerConfig, LogConfig, TestLogCleaning: missing Apache header.

          75. TestLogCleaning: Could you write in the comment how this test works?

          76. In SkimpyOffsetMap, we use only the first 4 bytes (out of the 16 bytes of MD5) to calculate the array position of the hash. Would it be better to use all 16 bytes?

          Jay Kreps added a comment -

          Patch v8 includes Jun's comments. Specifically:
          Cleaner:
          70.1 Nice catch. Buffers now grow in offset map building. Also changed both offset map building and cleaning to keep the same buffer size for the duration of the segment to avoid growing and shrinking too frequently. (A rough sketch of the buffer-growing idea appears after this comment.)
          70.2 The message payload can be null, and this is used to indicate a delete (note that null messages do go into the offset map but never survive a cleaning). Currently, though, there is no way to set the payload to null, and there are a number of bugs around null payloads. I will open a ticket to address those.
          70.3 Usually catching Throwable is a mistake, I think. I.e. if we are out of memory, the thread should die.
          Log:
          71.1 Removed the comment about rebuilding indexes.
          71.2 Improved formatting for log statement in maybeRoll()
          71.3 Nice catch. Incrementing truncates count in truncateFullyAndStartAt()
          KafkaConfig
          72. It would be easy to implement something where the log cleaner starts only if we have a log with dedupe. However, it is a little trickier with topics that are dynamically added or whose config is changed dynamically. I would like to leave it simple/stupid for now and, once the config-change work is ironed out, make the cleaner start dynamically when the first log becomes dedupe-enabled.
          LogOffsetTest:
          73. Changed log.retention.check.interval.ms to 5*60*1000 in LogOffsetTest.createBrokerConfig().
          74. Added apache header to CleanerConfig, LogConfig, TestLogCleaning.
          75. Added a comment for TestLogCleaning.
          76. The bytes of a cryptographic hash are supposed to be uniformly distributed, so using just the first 4 bytes should be fine (I have previously tested this, and it works well empirically too). A small sketch of this position calculation follows this comment.
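          A small sketch of the position calculation being discussed, illustrative only and not the actual SkimpyOffsetMap code: hash the key with MD5 and use just the first 4 bytes, forced non-negative, to pick a slot.

          import java.nio.ByteBuffer
          import java.security.MessageDigest

          // Map a key to a slot using only the first 4 bytes of its 16-byte MD5 digest.
          def slotFor(key: Array[Byte], slots: Int): Int = {
            val digest = MessageDigest.getInstance("MD5").digest(key) // 16 bytes
            val firstFour = ByteBuffer.wrap(digest).getInt            // first 4 bytes as an Int
            (firstFour & 0x7fffffff) % slots                          // non-negative slot index
          }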

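          And a rough sketch of the buffer-growing idea from 70.1, with assumed names rather than the actual cleaner code: if a message does not fit in the current read buffer, allocate a larger one, bounded by the configured maximum message size (no message should exceed it).

          import java.nio.ByteBuffer

          // Grow the buffer only when the next message would not fit. Doubling amortizes the
          // cost of repeated growth; maxMessageSize is the natural upper bound.
          def growIfNeeded(buffer: ByteBuffer, neededSize: Int, maxMessageSize: Int): ByteBuffer =
            if (buffer.capacity >= neededSize) buffer
            else ByteBuffer.allocate(math.min(math.max(neededSize, buffer.capacity * 2), maxMessageSize))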
          Jay Kreps added a comment -

          Oops, I dropped one change I mentioned in the v8 patch. V9 restores the read and write buffers only at the end of the segment, to avoid churning on memory allocation (a one-liner).

          Neha Narkhede added a comment -

          +1 on v9. Some minor changes before you check it in -

          13. KafkaConfig
          Typo - accross -> across
          14. LogCleaner
          Typo: ellapsed -> elapsed
          15. We talked about this offline, but regarding review comment 6.3, I personally like renaming the .swap file to contain the names of the files it has cleaned, though there might be nuances, e.g. there is an OS limit on the length of a file name. Would you mind filing another bug to track that change?

          Jay Kreps added a comment -

          Cool, checked in with those fixes. Filed KAFKA-739, KAFKA-740, and KAFKA-741. Jun, if you have any follow-up comments I will do those as a second check-in.


            People

            • Assignee: Jay Kreps
            • Reporter: Jay Kreps
            • Votes: 0
            • Watchers: 7
