Kafka / KAFKA-544

Retain key in producer and expose it in the consumer

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: None
    • Labels:

      Description

      KAFKA-506 added support for retaining a key in messages; however, this field is not yet set by the producer.

      The proposal is to change the producer API so that ProducerData allows only a single key/value pair, giving it a one-to-one mapping to Message. That is, change from
      ProducerData(topic: String, key: K, data: Seq[V])
      to
      ProducerData(topic: String, key: K, data: V)
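A minimal Scala sketch of what the one-to-one mapping means in practice. MultiValueProducerData and SingleValueProducerData are hypothetical stand-ins for the two shapes above, not Kafka classes: a record carrying a Seq of values is equivalent to one single-value record per value, all sharing the same topic and key.

```scala
// Hypothetical stand-ins for the current and proposed ProducerData shapes.
case class MultiValueProducerData[K, V](topic: String, key: K, data: Seq[V])
case class SingleValueProducerData[K, V](topic: String, key: K, data: V)

object ProducerDataMigration {
  // Expand one multi-value record into one record per value, so that each
  // resulting record corresponds to exactly one message.
  def split[K, V](pd: MultiValueProducerData[K, V]): Seq[SingleValueProducerData[K, V]] =
    pd.data.map(v => SingleValueProducerData(pd.topic, pd.key, v))
}
```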

      The key itself needs to be encoded. There are several ways this could be handled. A few of the options:
      1. Change the Encoder and Decoder to be MessageEncoder and MessageDecoder and have them take both a key and value.
      2. Change the type of the encoder/decoder so that it no longer refers to Message, so the same interface could be used for both the key and the value.

      I favor the second option but am open to feedback.
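To illustrate why option 2 gives up nothing relative to option 1, here is a sketch in which an option-1 style key/value encoder is assembled from two independently chosen option-2 style encoders. All trait and object names (KeyValueEncoder, GenericEncoder, IntGenericEncoder, Adapt, etc.) are hypothetical, and both shapes produce raw byte arrays rather than a Message.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Option 1 (hypothetical shape): a single encoder that sees key and value together.
trait KeyValueEncoder[K, V] {
  def encode(key: K, value: V): (Array[Byte], Array[Byte])
}

// Option 2 (hypothetical shape): a type-agnostic encoder, used once for the key
// and once for the value.
trait GenericEncoder[T] {
  def toBytes(t: T): Array[Byte]
}

object StringGenericEncoder extends GenericEncoder[String] {
  def toBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
}

object IntGenericEncoder extends GenericEncoder[Int] {
  // Fixed-width, big-endian 4-byte encoding.
  def toBytes(i: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(i).array()
}

object Adapt {
  // An option-1 encoder can always be built from two option-2 encoders,
  // and the key and value types need not match.
  def pair[K, V](keyEnc: GenericEncoder[K], valueEnc: GenericEncoder[V]): KeyValueEncoder[K, V] =
    new KeyValueEncoder[K, V] {
      def encode(key: K, value: V): (Array[Byte], Array[Byte]) =
        (keyEnc.toBytes(key), valueEnc.toBytes(value))
    }
}
```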

      One concern with our current approach to serialization, as well as with both of these proposals, is that they are inefficient. We go from Object=>byte[]=>Message=>MessageSet with a copy at each step. In the case of compression there are several more intermediate steps. We could theoretically clean this up by instead having an encoder interface something like
      Encoder.writeTo(buffer: ByteBuffer, object: AnyRef)
      and
      Decoder.readFrom(buffer:ByteBuffer): AnyRef
      However, there are two problems with this. The first is that we don't know the size of the data until it is serialized, so we can't allocate the ByteBuffer correctly up front and might need to resize it. The second is that in the case of compression there is a whole other path to consider. Originally I thought it might be good to try to fix this, but now I think it should be out of scope, and we should revisit the efficiency issue in a future release in conjunction with our internal handling of compression.
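A rough sketch of the buffer-based idea and of the first problem described above, assuming a hypothetical BufferEncoder trait (not a Kafka API): when the caller's buffer turns out to be too small, it has to be reallocated and its contents copied.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical buffer-oriented encoder along the lines of Encoder.writeTo above.
// It returns the buffer actually written to, which may be a new, larger one.
trait BufferEncoder[T] {
  def writeTo(buffer: ByteBuffer, t: T): ByteBuffer
}

object GrowableBuffer {
  // Return a buffer with at least `needed` bytes remaining. If the current one is
  // too small, allocate a larger buffer and copy the bytes written so far into it.
  def ensureRemaining(buffer: ByteBuffer, needed: Int): ByteBuffer =
    if (buffer.remaining() >= needed) buffer
    else {
      val newCapacity = math.max(buffer.capacity() * 2, buffer.position() + needed)
      val bigger = ByteBuffer.allocate(newCapacity)
      buffer.flip() // switch the old buffer to read mode before copying
      bigger.put(buffer)
      bigger
    }
}

object StringBufferEncoder extends BufferEncoder[String] {
  def writeTo(buffer: ByteBuffer, s: String): ByteBuffer = {
    // The serialized size is only known once getBytes has run, and that call
    // already builds an intermediate array: the copy this interface hoped to avoid.
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    val target = GrowableBuffer.ensureRemaining(buffer, bytes.length)
    target.put(bytes)
  }
}
```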

      1. KAFKA-544-joel-comments.patch (8 kB, Jay Kreps)
      2. KAFKA-544-joel-comments-v2.patch (10 kB, Jay Kreps)
      3. KAFKA-544-v1.patch (122 kB, Jay Kreps)
      4. KAFKA-544-v2.patch (175 kB, Jay Kreps)
      5. KAFKA-544-v3.patch (175 kB, Jay Kreps)
      6. KAFKA-544-v4.patch (177 kB, Jay Kreps)
      7. KAFKA-544-v5.patch (177 kB, Jay Kreps)
      8. KAFKA-544-v6.patch (177 kB, Jay Kreps)

        Activity

        Jay Kreps added a comment -

        After looking at the code I think there is a fair amount of work here. I recommend we put off the user-facing API change until 0.9. Instead I propose the following intermediate hack for 0.8:
        1. Use the existing ProducerData object to get the key and value. This is slightly unnatural because it allows you to associate a key with many values.
        2. Use option (2) above for the encoders

        So specifically this means that the two interfaces would now be
        trait Encoder[T] { def toBytes(t: T): Array[Byte] }
        trait Decoder[T] { def fromBytes(b: Array[Byte]): T }

        There would now be two encoders, one for the key and one for the value. The value encoder would still be configured by the property "serializer.class", but we would add a new property "key.serializer.class" which would default to the same value as the value serializer.

        The plan would be to hold off on any changes to the consumer for now.
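The defaulting rule for the two properties might look like the following sketch. SerializerConfig is a hypothetical helper, and it assumes serializer.class is always set; only the two property names come from the proposal.

```scala
import java.util.Properties

object SerializerConfig {
  // Property names as given in the proposal.
  val ValueSerializerProp = "serializer.class"
  val KeySerializerProp = "key.serializer.class"

  // Resolve the (key, value) serializer class names. This sketch assumes the
  // value serializer is always configured; the key serializer falls back to it.
  def resolve(props: Properties): (String, String) = {
    val valueClass = props.getProperty(ValueSerializerProp)
    require(valueClass != null, s"$ValueSerializerProp must be set")
    val keyClass = props.getProperty(KeySerializerProp, valueClass)
    (keyClass, valueClass)
  }
}
```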

        Jun Rao added a comment -

        This plan sounds good to me.

        Jay Kreps added a comment -

        This patch does the following:
        1. Change Encoder and Decoder to map between object and byte[] rather than between Message and object.
        2. Require two encoders for the producer and two decoders for the consumer. This follows the same pattern as before: the producer now has serializer.class and key.serializer.class, and by default key.serializer.class takes the same value as serializer.class. If no key is specified, this parameter is essentially ignored. In the consumer, ConsumerConnector now requires two decoders, one for the key and one for the value.
        3. Message is no longer exposed in the high-level APIs.
        4. All tests that used Message with the high-level APIs (i.e. almost all tests) have been converted to use Strings. This is easier to read and debug (since you can print the value) and generally less code.

        Overall I have some concern that we are changing the API without deeply thinking it through, but this does expose the key functionality which is needed.

        This set of changes is wide but shallow--it touches a lot of classes but there is nothing too tricky.

        All unit tests pass, but I haven't yet verified system tests.
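A sketch of items 1 and 2 together: object/byte[] interfaces with one encoder and one decoder each for key and value. It assumes UTF-8 strings and that a missing key travels as null bytes; KeyValueCodec is a hypothetical helper, and the StringEncoder/StringDecoder classes here are local illustrations rather than Kafka's own.

```scala
import java.nio.charset.StandardCharsets

// Local copies of the object <-> byte[] interfaces described in item 1.
trait Encoder[T] { def toBytes(t: T): Array[Byte] }
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

class StringEncoder extends Encoder[String] {
  def toBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
}

class StringDecoder extends Decoder[String] {
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
}

object KeyValueCodec {
  // Producer side: the key encoder is only used when a key is present;
  // a missing key is passed along as null bytes.
  def encode[K, V](key: K, value: V, keyEnc: Encoder[K], valueEnc: Encoder[V]): (Array[Byte], Array[Byte]) =
    (if (key == null) null else keyEnc.toBytes(key), valueEnc.toBytes(value))

  // Consumer side: one decoder for the key and one for the value.
  // Null key bytes decode to a null key.
  def decode[K, V](keyBytes: Array[Byte], valueBytes: Array[Byte],
                   keyDec: Decoder[K], valueDec: Decoder[V]): (K, V) =
    (if (keyBytes == null) null.asInstanceOf[K] else keyDec.fromBytes(keyBytes),
     valueDec.fromBytes(valueBytes))
}
```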

        Jay Kreps added a comment -

        One other minor thing in this patch:
        5. Both the key and the partition are now exposed in MessageAndMetadata

        Jay Kreps added a comment -

        Updated patch. This patch fixes one of the uglier things in the producer API. Previously the producer took a ProducerData object which contained a topic, an optional key and one or more messages.

        The send method in the producer could take one or more of these. This is a little odd since sending many messages attached to a single ProducerData is the same as sending many ProducerData objects with the same key.

        Since we now associate a key with each message, I changed this. I added a new class
        case class KeyedMessage[K, V](topic: String, key: K, message: V)
        This replaces ProducerData. Note that this class takes only a single key-value pair, but since you can send many at once in a single send() call, this does not make the API any less general.
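A sketch of how a single-pair KeyedMessage plus a varargs send() keeps the API as general as the old multi-value ProducerData. InMemoryProducer is a hypothetical stand-in that only records what it is given; the point is the shape of send, not the delivery logic.

```scala
import scala.collection.mutable.ArrayBuffer

// Shape of KeyedMessage as described above: one key, one message.
case class KeyedMessage[K, V](topic: String, key: K, message: V)

// Hypothetical in-memory producer stand-in; it just records the messages sent.
class InMemoryProducer[K, V] {
  val sent: ArrayBuffer[KeyedMessage[K, V]] = ArrayBuffer.empty[KeyedMessage[K, V]]

  // Many messages can still go out in a single call.
  def send(messages: KeyedMessage[K, V]*): Unit = {
    sent ++= messages
  }
}
```

What used to be one ProducerData with several values becomes several KeyedMessages sharing a key, passed together: `producer.send(values.map(v => KeyedMessage("events", "user-1", v)): _*)`.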

        Jay Kreps added a comment -

        Okay one more change--added offset to MessageAndMetadata.
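With key, partition and offset available on MessageAndMetadata, a consumer can do its own position tracking. The sketch below assumes a simplified case class carrying those fields (field names are illustrative) and computes the highest offset seen per partition.

```scala
// Simplified per-message record exposing key, value, topic, partition and offset.
case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long)

object Positions {
  // Highest offset seen for each partition in a batch of consumed messages.
  def latestOffsets[K, V](msgs: Seq[MessageAndMetadata[K, V]]): Map[Int, Long] =
    msgs.groupBy(_.partition).map { case (p, ms) => p -> ms.map(_.offset).max }
}
```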

        Jay Kreps added a comment -

        Final patch fixes a bug that affected the system tests. This patch is ready for review.

        To summarize, here are the changes listed in one place:
        1. Change encoder/decoder to
        def toBytes(t: T): Array[Byte]
        def fromBytes(bytes: Array[Byte]): T
        I also took the opportunity to pass properties into the encoders and decoders, so encoders and decoders now require a constructor that takes a VerifiableProperties. This allows for things like a schema registry URL, character encoding, etc.
        2. Rename ProducerData to KeyedMessage and make it only contain a single key-value pair.
        3. Add a new property for the producer, key.serializer.class to complement the already existing serializer.class. The key serializer defaults to the same value as the value serializer since I think that will be common (e.g. both Avro).
        4. ConsumerConnector now requires two decoders, one for the key and one for the value. The type of the resulting stream is now [K,V] rather than just [T].
        5. Exposed partition and offset in MessageAndMetadata class
        6. Changed unit tests to mostly use strings instead of Message or byte[]

        This code is ready for review (please).

        We also need to make a call whether we want this in 0.8. It is not a very tricky change, but it does touch a lot of files. Since it is a compatibility change it would be nice to do it in 0.8, but it is awfully late in the game for this...
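The constructor requirement in item 1 means encoders can be instantiated reflectively from a configured class name. The sketch below assumes a hypothetical Props class standing in for VerifiableProperties and a hypothetical serializer.encoding property; EncoderFactory loads the class by name and calls its one-argument constructor.

```scala
import java.nio.charset.Charset
import java.util.Properties

// Hypothetical stand-in for VerifiableProperties: a thin wrapper over Properties.
class Props(val underlying: Properties) {
  def getString(name: String, default: String): String = underlying.getProperty(name, default)
}

trait Encoder[T] { def toBytes(t: T): Array[Byte] }

// Every encoder exposes a constructor taking the properties, even if it ignores them.
// "serializer.encoding" is a hypothetical property name for this sketch.
class ConfigurableStringEncoder(props: Props) extends Encoder[String] {
  private val charset = Charset.forName(props.getString("serializer.encoding", "UTF-8"))
  def toBytes(s: String): Array[Byte] = s.getBytes(charset)
}

object EncoderFactory {
  // Load the configured class by name and invoke its one-argument (Props) constructor.
  def create[T](className: String, props: Props): Encoder[T] =
    Class.forName(className, true, getClass.getClassLoader)
      .getConstructor(classOf[Props])
      .newInstance(props)
      .asInstanceOf[Encoder[T]]
}
```

Because every encoder has the same constructor signature, the factory only ever looks up one constructor; this is the same simplicity argument made later in the thread for Partitioner.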

        Jun Rao added a comment -

        Thanks for patch v3. Looks good overall. Some minor comments:

        30. Encoder: It seems that we require the constructor of Encoder and Partitioner to take a VerifiableProperties. It would be good if we could add a comment about that in the trait.

        31. ConsumerConnector: Can we have a version of createMessageStreamsByFilter without the decoders?

        32. ConsumerFetcherManager: no change is needed.

        33. BlockingChannel: logger.debug() should be just debug().

        34. ChecksumMessageFormatter: We probably can't remove it since it may be used in our tests.

        Jay Kreps added a comment -

        30. Good point, added.
        31. No, for some reason Scala won't allow that since it is ambiguous with the other method (probably why we didn't have the equivalent before). Instead I gave default arguments for the decoders, which accomplishes the same thing.
        32. Fixed
        33. Fixed
        34. Added it back.

        New patch contains these changes.

        Jun Rao added a comment -

        +1 on patch v5. For 30, could you add the same constructor comment for Partitioner too?

        Neha Narkhede added a comment -

        Minor comment: the API docs in Producer.scala, Producer.java and ConsumerConnector.java are broken due to stale param names.

        Joel Koshy added a comment -

        Overall the patch looks good - I have a few scattered comments (apologies for the late review).

        Decoder.scala: What is the point of KeylessMessageDecoder - shouldn't it simply be called MessageDecoder? Phrased
        differently, why does it matter to the message decoder whether the message has an associated key or not?

        ConsoleConsumer: Does it make sense to provide an option (through formatter args to NewLineMessageFormatter) to also
        print the key?

        Nulls vs. options:
        I'm not sure if we discussed this much on the mailing list as far as coding convention goes, but there are some places
        where I personally prefer options over nulls. E.g., in KeyedMessage, using null forces the programmer to use hasKey or check
        for null. If something is missed, it could result in an NPE at runtime. Options would catch these at compile time.

        One problem is that options are unavailable to Java, but that can be handled by providing a javaapi class that does
        use option.

        ProducerSendThread: can use ArrayBuffer instead of ListBuffer.

        Partitioner.scala
        Why is it required to have a VerifiableProperties argument in the constructor? E.g., why should DefaultPartitioner have to take a
        props argument?

        Jay Kreps added a comment -

        Hey Joel, those are good points. Here is a follow-up patch that addresses these issues. Specifically:

        • KeylessMessageDecoder makes no sense; it was a holdover from an older, lower-impact approach to defining decoders that would have let us leave the consumer API more or less unchanged. It is irrelevant now, so I deleted it.
        • Printing the key in ConsoleConsumer does make sense. I had planned to leave this as a future follow-up since it is essentially a new feature, but it is not hard, so I implemented it on both the producer and consumer side.
        • Null vs. Options. This is controversial, but I am one of the people who think Option is not so good for us, for two reasons: (1) you have to create a new object and, if the value is a number, box it; (2) the resulting match statement is a lot less readable than if(x == null), imho. It is true that NullPointerExceptions are a drag, but in my experience they are just not that common as a production issue in our kind of software, because our testing and load are relatively heavy, so these issues are flushed out very quickly.
        • ProducerSendThread: good call, changed.
        • Partitioner constructor: yeah, I thought about this; I basically just thought it was simpler. The complication is what to do if there is both a no-arg constructor and one that takes the properties: the user has to know which one we will invoke. It seems simpler to just require the argument even if you don't use it.
        Joel Koshy added a comment -

        Thanks a lot for addressing the above. Changes look good but for a few typos:

        • misspelling (seperat*)
        • NewLineMessageFormatter may as well be called DefaultMessageFormatter given the arbitrary message separator.
        • ConsoleProducer - valueEncoderClass should use valueEncoderOpt (line 93).

        On null vs. options, I agree that Option is cumbersome to use and I generally try to avoid it, but lately I have begun to think it's not too bad, especially for cases like this where it is easy to forget to check for null. E.g., if (xOpt.isDefined) is only slightly more verbose than if (x != null).
        Or, xOpt.foreach(process(_)) (or map if you need the result) instead of if (x != null) process(x) removes the need for the isDefined check/match statement and is not much more verbose.

        All that said, you are right that something like this would be caught early on - so either way works.
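The two styles being compared, side by side in a small sketch (NullVsOption and its methods are illustrative names). Option(x) converts a possibly-null value, such as one coming from a Java caller, into None or Some, which is one way a Java-facing API could bridge to an Option-based one.

```scala
import scala.collection.mutable.ArrayBuffer

object NullVsOption {
  // Null style: nothing reminds the caller to do this check.
  def describeNullable(key: String): String =
    if (key != null) s"key=$key" else "no key"

  // Option style: the type forces the caller to deal with a missing key.
  def describeOption(key: Option[String]): String =
    key.map(k => s"key=$k").getOrElse("no key")

  // Bridge for values coming from Java callers: Option(null) is None.
  def fromJava(key: String): Option[String] = Option(key)

  // foreach runs only for Some, replacing an explicit isDefined check.
  def collectKeys(keys: Seq[Option[String]]): Seq[String] = {
    val found = ArrayBuffer.empty[String]
    keys.foreach(opt => opt.foreach(k => found += k))
    found.toList
  }
}
```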

        Jay Kreps added a comment -

        Nice catches, fixed.

        Eli Reisman added a comment -

        I like it!

        Joel Koshy added a comment -

        +1 - although can you fix the typo?

          • if(props.containsKey("key.seperator"))
            keySeparator = props.getProperty("key.separator").getBytes
          • if(props.containsKey("line.seperator"))
            lineSeparator = props.getProperty("line.separator").getBytes
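For comparison, a sketch of a formatter with the property names spelled consistently. Reading each setting once with getProperty(name, default) avoids the separate containsKey check where the typo crept in. KeyValueFormatter is a hypothetical class, print.key is used here for illustration, and the default separators (tab and newline) are assumptions.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets
import java.util.Properties

// Hypothetical formatter that optionally prints the key before the value.
class KeyValueFormatter(props: Properties) {
  // Each property name appears exactly once, so the lookup cannot disagree with itself.
  private val printKey = props.getProperty("print.key", "false").trim.toBoolean
  private val keySeparator = props.getProperty("key.separator", "\t").getBytes(StandardCharsets.UTF_8)
  private val lineSeparator = props.getProperty("line.separator", "\n").getBytes(StandardCharsets.UTF_8)

  def writeTo(key: Array[Byte], value: Array[Byte], out: PrintStream): Unit = {
    if (printKey) {
      // A missing key is printed as the literal "null" in this sketch.
      out.write(if (key == null) "null".getBytes(StandardCharsets.UTF_8) else key)
      out.write(keySeparator)
    }
    out.write(value)
    out.write(lineSeparator)
  }
}
```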

          People

          • Assignee:
            Jay Kreps
            Reporter:
            Jay Kreps
          • Votes:
            0
            Watchers:
            5
