KAFKA-739: Handle null values in Message payload

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.1
    • Component/s: None
    • Labels: None

      Description

      Add tests for null message payloads in producer, server, and consumer.
      Ensure log cleaner treats these as deletes.
      Test that null keys are rejected on dedupe logs.
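
      The compaction rule implied by the description can be sketched in a few lines. This is a minimal, hypothetical model (Record and compact are illustrative names, not the actual Log/Cleaner API): a record whose payload is null acts as a delete marker (tombstone) for its key.

```scala
// Hypothetical sketch of the compaction rule: a null value deletes the key.
case class Record(key: String, value: Option[String])

def compact(records: Seq[Record]): Map[String, String] =
  records.foldLeft(Map.empty[String, String]) { (state, r) =>
    r.value match {
      case Some(v) => state + (r.key -> v) // normal record: upsert
      case None    => state - r.key        // null payload: delete the key
    }
  }
```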

      1. KAFKA-739-v1.patch
        53 kB
        Jay Kreps
      2. KAFKA-739-v2.patch
        56 kB
        Jay Kreps
      3. KAFKA-739-v3.patch
        58 kB
        Jay Kreps
      4. KAFKA-739-v4.patch
        61 kB
        Jay Kreps
      5. KAFKA-739-v5.patch
        63 kB
        Jay Kreps

          Activity

          Jay Kreps added a comment -

          This patch is more extensive than I expected because I found a hole in the logic when handling deletes in the log compactor. The changes are as follows:

          1. Handle null properly in Message.scala and miscellaneous other places.
          2. Fix the logic for handling deletes. Previously we guaranteed that we would retain delete records only in the dirty section of the log. This is not sufficient, because a bootstrapping consumer might see a message, but the subsequent delete message might be gc'd before the consumer sees it.
          3. OffsetMap.scala: make the map exact using a probing scheme. This means that the tail of the log is actually now fully deduplicated. The motivation for this is making delete-handling easier since to remove a delete tombstone you need to ensure that there are no prior occurrences of that message. Also added a counter on the number of collisions, just to help with any debugging.
           4. Added a new configuration, log.cleaner.delete.retention.ms, that controls the length of time for which delete records are retained. This is implicitly a limit on the amount of time a consumer can spend bootstrapping and still get a consistent bootstrap. Once the topic-level config patch goes in, this will be made available at the topic level and can be set with the create-topic tool.
           5. Added a peek() method to IteratorTemplate. Didn't end up using it, but it is a useful feature.
          6. Changed the integration test tool to issue deletes and changed the verification to handle delete records properly. Redid testing now with deletes included.
          7. Added a variety of unit tests for null messages
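
           The retention rule in point 4 can be sketched as follows; Entry and mayDiscardTombstone are illustrative names, not the actual cleaner code. A tombstone (null value) may only be discarded once it is older than log.cleaner.delete.retention.ms, which bounds how long a bootstrapping consumer can take and still observe every delete.

```scala
// Hypothetical sketch: discard a tombstone only after the retention window.
case class Entry(key: String, value: Option[String], timestampMs: Long)

def mayDiscardTombstone(e: Entry, nowMs: Long, deleteRetentionMs: Long): Boolean =
  e.value.isEmpty && nowMs - e.timestampMs > deleteRetentionMs
```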

          Sriram Subramanian added a comment -

          I am not able to apply this patch using git apply. Was this created using git format-patch?

          Jay Kreps added a comment -

          No, actually it is just git diff against the base revision.

          Neha Narkhede added a comment -

          There was a conflict on DefaultEventHandler, but I reviewed the patch.

          1. KafkaConfig
           Should the default for log.cleaner.delete.retention.ms be 24 hours instead of 1 hour?

          2. LogCleaner
          2.1 Should the check for dedup buffer be
          config.dedupeBufferSize / config.numThreads > Int.MaxValue

          3. DefaultEventHandler (There was a conflict, maybe you already handled this)
           Need to check for a null payload in the following trace -
           trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))

          4. DumpLogSegments
           Should this be reading message.key instead?
          print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))

          5. SimpleKafkaETLMapper
           Should probably check for null here in getData as well -
          ByteBuffer buf = message.payload();

           6. OffsetMap
           6.1 If I understand correctly from getPosition(), it seems that the probe length will change arbitrarily each time. What is the advantage of doing this vs. picking a fixed probe length that is relatively prime to the total number of entries the hash table can fit? The purpose of this property is that every slot in the hash table can eventually be traversed.
           6.2 Why does attempts increment by 1 and not by 4?

          7. TestLogCleaning
          The purpose of dumpLogs config is not clear from the command line option description.
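
           The check suggested in 2.1 can be sketched in one line (dedupeBufferTooLarge is an illustrative name, not the actual KafkaConfig/LogCleaner code): each cleaner thread gets an equal share of the dedupe buffer, and that per-thread share must fit in an Int-indexed buffer, so the division by the thread count has to happen before the comparison.

```scala
// Hypothetical sketch of the per-thread dedupe buffer size check.
def dedupeBufferTooLarge(dedupeBufferSize: Long, numThreads: Int): Boolean =
  dedupeBufferSize / numThreads > Int.MaxValue
```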

          Jay Kreps added a comment -

          New patch rebased to trunk and addresses Neha's comments:

          1. Changed delete retention to 24 hours
          2. Fixed broken logic in warning statement so it warns when your buffer is too big.
          3. Yes, that was in the patch, just got lost in the conflict?
          4. Dump log segments was printing the value as the key, fixed.
           5. SimpleKafkaETLMapper didn't handle null. This isn't an easy fix since the text format doesn't have an out-of-range marker to represent null. Returning the empty string, which is ambiguous but better than crashing.
           6. Linear probing has the problem that it tends to lead to "runs": with a fixed probing step, once you have a collision the probability that the slot one step over is also full is higher. So the ideal probing approach would be a sequence of fully random hashes, completely uncorrelated with one another. That is the motivation for using the rest of the md5 before degrading to linear probing, since we have already computed 16 bytes of random hash. The second question is whether it is legitimate to increment byte by byte, since this effectively reuses bytes of the hash. I agree it is a little sketchy, though it does seem to work.
          7. Clarified the purpose of dump logs.

          Jay Kreps added a comment -

          Patch version V3:
          1. Rebased to include the dynamic config change.
          2. Made delete retention a per-topic config

          Sriram Subramanian added a comment -

          1. OffsetMap
           a. The way the probe is calculated, we could end up with the same probe multiple times: from attempt = hashSize - 4 through attempt = hashSize, the probe is the same.
          val probe = Utils.readInt(hash, math.min(attempt, hashSize-4)) + math.max(0, attempt - hashSize)

          2. LogCleaner
          a. Cleaner doc comments need to be updated

          Will look at the test changes and provide comments if any.
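
           The collision Sriram points out can be demonstrated with a small sketch (ProbeSketch and readInt are stand-ins for the actual OffsetMap code and Utils.readInt). With hashSize = 16, the quoted form yields the same probe for every attempt from hashSize - 4 through hashSize, because the second term stays zero until attempt exceeds hashSize; saturating at hashSize - 4 instead keeps the probes distinct. The corrected variant here is an assumption about the eventual fix, not a quote from the patch.

```scala
// Sketch contrasting the quoted probe computation with a corrected one.
object ProbeSketch {
  val hashSize = 16

  // Big-endian int from 4 bytes, standing in for Utils.readInt.
  def readInt(b: Array[Byte], off: Int): Int =
    ((b(off) & 0xff) << 24) | ((b(off + 1) & 0xff) << 16) |
    ((b(off + 2) & 0xff) << 8) | (b(off + 3) & 0xff)

  // As quoted in the review: attempts hashSize-4 .. hashSize all collide.
  def probeQuoted(hash: Array[Byte], attempt: Int): Int =
    readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize)

  // Corrected: the linear term kicks in as soon as the hash bytes run out.
  def probeFixed(hash: Array[Byte], attempt: Int): Int =
    readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - (hashSize - 4))
}
```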

          Jay Kreps added a comment -

           Nice catch, Sriram - that actually drops the collision rate by 7%.

           Here is a new patch that fixes that bug, fixes the docs, and exposes the cleaner buffer load factor as a configuration parameter.

          Neha Narkhede added a comment -

          +1 on patch v4

          Jun Rao added a comment -

          Thanks for patch v4. Looks good. Some minor comments:

           40. IteratorTemplate: Not sure that I understand how peek() is different from next(). In both cases, they call hasNext() and therefore move nextItem to the next item, right?

          41. LogCleaner:
          41.1 Could you add some comments in the header to describe how delete retention works?
          41.2 cleanSegments(): val now not used.

          42. Decoder:
          42.1 Could we add a comment in the trait saying that bytes can be null?
          42.2 We need to fix StringDecoder to return null if input is null.
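
           The null-safe decoder suggested in 42.2 might look like the following sketch (simplified trait; the real Decoder also takes a VerifiableProperties constructor argument, omitted here):

```scala
// Sketch of a decoder trait whose input bytes may be null (see 42.1).
trait Decoder[T] {
  def fromBytes(bytes: Array[Byte]): T
}

// Null in, null out; otherwise decode as UTF-8.
class StringDecoder extends Decoder[String] {
  def fromBytes(bytes: Array[Byte]): String =
    if (bytes == null) null else new String(bytes, "UTF-8")
}
```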

          Jay Kreps added a comment -

          Jun, attached v5 patch to address your comments.
           40. This is actually right: peek, hasNext, and next will all call makeNext() if there isn't an item ready. But peek and hasNext are idempotent and next() isn't--it advances the iterator. I wrote a unit test that demonstrates this.
          41. Added the comment and removed the stray variable.
           42. Actually the handling of nulls is not done in the serializers, it is done in Kafka. That is, no matter what serializer you use, null always deserializes to null. You could argue either way whether this is a good thing. The downside to pushing it into the serializer is that all serializers have to remember to handle null. The advantage is that the serializer could yield a different value for null if it wanted. Couldn't think of a use for the latter so I went with the simple thing.
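
           The peek()/next() distinction discussed in point 40 can be illustrated with a minimal hypothetical IteratorTemplate (names and structure are a sketch, not the actual class): peek and hasNext both force makeNext() when no item is buffered, but only next() consumes the buffered item, so peek and hasNext are idempotent.

```scala
// Minimal sketch: makeNext() produces items, peek() observes, next() consumes.
abstract class IteratorTemplate[T] extends Iterator[T] {
  private var item: Option[T] = None

  // Subclasses produce the next item, or None at end of iteration.
  protected def makeNext(): Option[T]

  private def maybeFill(): Unit =
    if (item.isEmpty) item = makeNext()

  override def hasNext: Boolean = { maybeFill(); item.isDefined }

  def peek(): T = { maybeFill(); item.get } // does not advance

  override def next(): T = {
    maybeFill()
    val result = item.get
    item = None // consumed: the iterator advances
    result
  }
}
```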

          Jun Rao added a comment -

          Thanks for patch v5. +1.


            People

             • Assignee: Jay Kreps
             • Reporter: Jay Kreps
             • Votes: 0
             • Watchers: 4
