Kafka / KAFKA-3

Consumer needs a pluggable decoder

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7
    • Component/s: None
    • Labels: None

      Description

      Kafka producer allows a user to plug in an encoder (from type T to Message). We need to do the same thing on the consumer side, by allowing a user to plug in a decoder (from Message to type T).
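The symmetry the description asks for might be sketched as follows. The trait names follow the discussion, but the exact shapes are assumptions, not the actual Kafka source; in particular, Kafka's Message type is stood in for by a raw byte array here.

```scala
// Hypothetical sketch of the producer/consumer symmetry described above.
// Array[Byte] stands in for Kafka's Message type; names are illustrative.
trait Encoder[T] {
  def toMessage(value: T): Array[Byte]   // type T -> message payload
}

trait Decoder[T] {
  def toEvent(message: Array[Byte]): T   // message payload -> type T
}

// A user-supplied decoder that would be plugged in via consumer config:
class StringDecoder extends Decoder[String] {
  def toEvent(message: Array[Byte]): String = new String(message, "UTF-8")
}
```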

      1. KAFKA-3_v1.patch
        46 kB
        Joel Koshy
      2. KAFKA-3_v2.patch
        25 kB
        Joel Koshy
      3. KAFKA-3_v3.patch
        26 kB
        Joel Koshy

        Activity

        Joel Koshy added a comment -

        Here is a first stab at plugging in a decoder for the consumer API. The
        html docs/quickstart also need to be updated after this is finalized, so
        this is more of a checkpoint review request.

        The consumer API becomes a bit clunky with the parameterized type so if
        anyone has ideas on making this simpler/cleaner please comment.

        There is an example of the API in action in the new test under
        ZookeeperConsumerConnectorTest, but here is a summary:

        // create consumerConfig and set config.deserializerClass to
        // MyDecoder (extends Decoder[MyRichType])

        val consumer = new ZookeeperConsumerConnector[MyRichType](consumerConfig)
        val streams = consumer.createMessageStreams(topicMap) // each stream is of type KafkaMessageStream[MyRichType]
        for ((topic, streamList) <- streams; stream <- streamList) {
          for (myRichTypeMessage <- stream) {
            // use myRichTypeMessage: MyRichType
          }
        }

        • With this approach, createMessageStreams can handle at most one decoder
          class. If the client needs different decoders for different topics, the
          client will need to create separate consumer connectors with a different
          deserializer for each one. One possible way to avoid that would be to make
          the deserializer config a topic:deserializer map, and move the type
          parameter to the createMessageStreams method. The client will still need
          to call createMessageStreams for each deserializer type, but will not need
          multiple connector objects - not sure if this adds much value though, and
          it makes the config more cumbersome.
        • For ConsoleConsumer, the MessageFormatter cannot be parameterized from the
          config. So that poses a bit of a redundancy problem in that the formatter
          presumably needs to decode the message. I guess you can have a decoder
          implementation that implements both the decoder and formatter traits, and
          have the formatter implementation use the decoder if necessary.
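The decoder-plus-formatter workaround in the last bullet might look roughly like this; the trait shapes are assumptions for illustration, not ConsoleConsumer's actual interfaces:

```scala
import java.io.PrintStream

// Hypothetical trait shapes; not the actual Kafka interfaces.
trait Decoder[T] { def toEvent(message: Array[Byte]): T }
trait MessageFormatter { def writeTo(message: Array[Byte], output: PrintStream): Unit }

// One class implementing both, so the formatter can reuse the decoding logic:
class StringDecoderFormatter extends Decoder[String] with MessageFormatter {
  def toEvent(message: Array[Byte]): String = new String(message, "UTF-8")
  def writeTo(message: Array[Byte], output: PrintStream): Unit =
    output.println(toEvent(message))
}
```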

        Since this is a client API change, it would be great to discuss and resolve
        any concerns about it at this point - should we post this comment on
        kafka-user as well?

        Jay Kreps added a comment -

        One way to do this would be to make use of type inference off the method arguments. For example, you could make the API be
        val firstStreamConfig = new StreamConfig[String](topic = "my_topic", parallelism = 5, decoder=new StringDecoder())
        val secondStreamConfig = new StreamConfig[Integer](topic = "your_topic", parallelism = 5, decoder=new IntDecoder())
        val streamSet: StreamSet = consumer.createMessageStreams(firstStreamConfig, secondStreamConfig)
        val streamOne: List[KafkaMessageStream[String]] = streamSet.get(firstStreamConfig)

        Here the type of the iterator is actually inferred from the parametrization on the config object passed in. This trick is used a lot in Java APIs to help pass through type parameters. The other advantage is that it gives us a general per-topic config object. Currently we don't have a very clean way to do per-topic config.
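A minimal sketch of the inference trick, assuming hypothetical StreamConfig/StreamSet shapes (the internal map and cast are illustrative plumbing, not a real implementation):

```scala
// Sketch only: the stream element type rides on the config's type parameter,
// so callers never write the type argument explicitly.
case class StreamConfig[T](topic: String, parallelism: Int, decoder: Array[Byte] => T)

class StreamSet(streams: Map[StreamConfig[_], List[Any]]) {
  // get() recovers the element type from the config it is keyed by
  def get[T](config: StreamConfig[T]): List[T] =
    streams(config).asInstanceOf[List[T]]
}

def createMessageStreams(configs: StreamConfig[_]*): StreamSet =
  // a real implementation would wire up actual streams; empty lists here
  new StreamSet(configs.map(c => (c, List.empty[Any])).toMap)

val firstStreamConfig =
  StreamConfig("my_topic", 5, (bytes: Array[Byte]) => new String(bytes, "UTF-8"))
val streamSet = createMessageStreams(firstStreamConfig)
val streamOne: List[String] = streamSet.get(firstStreamConfig)  // T = String, inferred
```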

        Another way to do this would be to ask if there is a way to just make the stream creation be one at a time without causing a "rebalance storm" for the clients. In that case you could do
        val consumer = new ZookeeperConsumerConnector[MyRichType](consumerConfig)
        val streamsA = consumer.createMessageStream("topic_a", decoder, parallelism)
        val streamsB = consumer.createMessageStream("topic_b", decoder, parallelism)
        To my eyes this latter approach is much more friendly, especially in the common case where you just have one topic to consume from. I think the reason we didn't do this was to avoid having a sequence of these cause a ton of rebalancing activity. I wonder if there is a direct fix for that.

        Joel Koshy added a comment -

        Thanks for the tip - that's an interesting idea. I had actually tried using type inference to avoid having to specify the default [Message] when a decoder is not required, but couldn't get it to work - I don't recall exactly why, but now that I read your comment, I think parameterizing the API methods instead of the class makes it more tractable. I'll experiment tomorrow.

        Joel Koshy added a comment -

        I tried various experiments for this - the difficulty lies in bringing the Java
        API up to par with the Scala API mainly due to limitations in (or my
        understanding of) the interoperability of Java-Scala generics and default
        arguments.

        The current consumer API is:

        createMessageStreams(topicCountMap: Map[String, Int]): Map[String, List[KafkaMessageStream]]

        With consumer decoders, we want to be able to return a Map[String,
        List[KafkaMessageStream[T]]], and if possible have different types bound to
        T in the map. This is doable in Scala, but I don't see how we can support an
        equally flexible API in Java.

        So I think our alternatives are:

        1 - Use type inference, and allow disparity between the Scala and Java APIs
        2 - Go with a simpler API that requires an explicit type parameter, but is
        consistent across Java/Scala
        3 - Other thoughts?

        For the first approach, the API could be:

        createMessageStreams[T](topicCountMap: Map[String, StreamConfig[T]]): Map[String, List[KafkaMessageStream[T]]]

        There are a couple of issues in getting this to work. Although it works well
        on the Scala side, Java is more limited. E.g., if the type parameter is
        declared covariant (i.e., +T) with Scala it is possible to mix different
        decoder types in a single call to createMessageStreams, but it is not
        possible to do this cleanly in Java. Anyway, I think this use case would be
        rare. More importantly, the other useful benefit of this approach (default
        type if decoder is not specified) is also not available to the Java API,
        since Java doesn't understand default arguments. I tried a wrapper factory
        object to make StreamConfigs, but ultimately an explicit type and casting is
        required on the Java side.
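The covariance point above can be sketched in isolation; StreamConfig here is a hypothetical stand-in, with stream wiring elided:

```scala
// +T makes StreamConfig covariant, so differently-parameterized configs can
// share one map on the Scala side. Names are illustrative.
class StreamConfig[+T](val topic: String)

def createMessageStreams(configs: Map[String, StreamConfig[Any]]): Set[String] =
  configs.keySet   // only the typing point matters here

// Mixing StreamConfig[Int] and StreamConfig[String] in one call compiles
// because both are subtypes of StreamConfig[Any]:
val mixed: Map[String, StreamConfig[Any]] = Map(
  "ints"    -> new StreamConfig[Int]("ints"),
  "strings" -> new StreamConfig[String]("strings"))
```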

        The second approach would be along the following lines:

        Consumer connector provides:

        createMessageStreams[T](topicCountMap: Map[String, Int], decoder: Decoder[T]): Map[String, List[KafkaMessageStream[T]]]

        So pretty much the existing API with the addition of the parameterized type
        and the decoder argument.

        Scala usage:

        // c is a ConsumerConnector

        val intStreamMap = c.createMessageStreams(intTopicMap, new IntDecoder)
        val floatStreamMap = c.createMessageStreams(floatTopicMap, new FloatDecoder)
        val aFloatTopicStreams = floatStreamMap.get("aFloatTopic")

        Java usage:

        java.util.Map<String, List<KafkaMessageStream<Float>>> floatStreamMap =
            c.createMessageStreams(floatTopicMap, new FloatDecoder());

        List<KafkaMessageStream<Float>> aFloatTopicStreams =
            floatStreamMap.get("aFloatTopic");

        Thoughts?

        Jun Rao added a comment -

        I vote for the second approach. The API is simpler and is consistent between java and scala.

        Joel Koshy added a comment -

        Here is an updated patch. (Btw, I added a new method to the Java consumer connector interface to allow creating streams without having to specify the default decoder each time.)
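The convenience method mentioned might look like this sketch; the connector internals are elided, and a DefaultDecoder that passes raw bytes through is an assumption:

```scala
trait Decoder[T] { def toEvent(message: Array[Byte]): T }

// Assumed default: pass the payload through as raw bytes
class DefaultDecoder extends Decoder[Array[Byte]] {
  def toEvent(message: Array[Byte]): Array[Byte] = message
}

class ConsumerConnectorSketch {
  def createMessageStreams[T](topicCountMap: Map[String, Int],
                              decoder: Decoder[T]): Map[String, List[T]] =
    topicCountMap.map { case (topic, _) => topic -> List.empty[T] }  // streams elided

  // The added overload: callers that don't care about decoding get the
  // default decoder without having to spell it out each time.
  def createMessageStreams(topicCountMap: Map[String, Int]): Map[String, List[Array[Byte]]] =
    createMessageStreams(topicCountMap, new DefaultDecoder)
}
```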

        Jun Rao added a comment -

        Overall, the patch looks good. Some minor comments:

        1. Does ZookeeperConsumerConnector.consume need a default decoder? The caller always passes in a decoder.

        2. Shouldn't ConsumerShell use StringDecoder?

        3. Consumer.java doesn't need to import kafka.serializer.DefaultDecoder.

        4. It seems you don't need the default decoder in ZookeeperConsumerConnector.createMessageStreams, since the default decoder is already defined in the trait.

        Joel Koshy added a comment -

        Made the changes in this patch. For 2, either way works, but it makes more sense to use StringDecoder now that we have the decoder in the consumer API.

        Jun Rao added a comment -

        Thanks, Joel. Just committed this.


          People

          • Assignee: Unassigned
          • Reporter: Anonymous
          • Votes: 0
          • Watchers: 0
