KAFKA-249

Separate out Kafka mirroring into a stand-alone app

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.1
    • Component/s: core
    • Labels:
      None

      Description

      I would like to use this jira to discuss the feasibility and benefits of
      separating Kafka's mirroring feature out of the broker into a stand-alone
      app, as the current embedded approach has a couple of limitations and issues.

      For example, we recently had to deal with Kafka mirrors that were in fact
      idle: the mirror threads were never created at start-up because of a
      rebalancing exception, yet the Kafka broker itself did not shut down. This
      has since been fixed, but it is indicative of the (avoidable) problems that
      come with embedding non-broker-specific features in the broker.

      Logically, it seems to make sense to separate it out to achieve a better
      division of labor. Furthermore, enhancements to mirroring may be less
      clunky to implement and use in a stand-alone app. For example, to support
      custom partitioning on the target cluster, or to mirror from multiple
      clusters, we would probably need to be able to pass in multiple embedded
      consumer/embedded producer configs, which would be less ugly if the
      mirroring process were a stand-alone app. Also, if we break it out, it
      would be convenient to use as a "consumption engine" for the console
      consumer, which would make it easier to add features such as wildcards in
      topic consumption, since it contains a ZooKeeper topic discovery component.

      Any suggestions and/or objections to this?

      1. KAFKA-249.v1.patch
        73 kB
        Joel Koshy
      2. KAFKA-249.v2.patch
        113 kB
        Joel Koshy
      3. KAFKA-249.v3.patch
        189 kB
        Joel Koshy
      4. KAFKA-249.v3-v4.incremental.patch
        67 kB
        Joel Koshy
      5. KAFKA-249.v4.patch
        191 kB
        Joel Koshy
      6. KAFKA-249.v4-v5.incremental.patch
        13 kB
        Joel Koshy
      7. KAFKA-249.v5.patch
        192 kB
        Joel Koshy

        Activity

        Jun Rao added a comment -

        Joel, thanks for the patch. Just committed to trunk.

        Joel Koshy added a comment -

        Incremental patch over v4 in case it helps.

        Joel Koshy added a comment -

        Done. I also enabled shallow iteration in the system test.

        Jun Rao added a comment -

        Actually, just committed kafka-315. Could you rebase?

        Jun Rao added a comment -

        v4 patch looks good to me. If there is no objection, I will commit the code tomorrow.

        Joel Koshy added a comment -

        Incremental patch v3 -> v4.

        Joel Koshy added a comment -

        Thanks for the comments. If it helps quicker review, I can upload an incremental diff (over the previous one). Changes in this patch:

        • Removed TopicFilterSpec - I'll update the consumer API proposal wiki with this info.
        • Restored MessageAndOffset - agreed that it is a bit weird for MessageSet to use MessageAndMetadata, which is why I put "invalid" defaults for the metadata fields if unused.
        • Added explicit types for the vals in reinitializeConsumer along with comments showing examples (would have been good if Scala supported named components when specifying maps/tuples, etc.).
        • Added TODO for 0.8 to deal with deleted topics. We probably need to do more in addition to removing the topic from the streams map.
        • Fixed mirrormaker with the producer.close - thanks for catching this.
        Jun Rao added a comment -

        v2 looks much better now. Some additional comments:

        21. Should isTopicAllowed be part of TopicFilterSpec, especially if we want to extend it in the future? If so, we don't need TopicFilter.
        22. In this patch, I suggest that we only put message and topic in MessageAndMetadata. We can have a separate jira on how to expose offsets to the consumer. There, we need to discuss how a consumer can rewind consumption using the returned offset.
        23. It's probably better to rename KafkaMessageAndMetadataStream to KafkaStream.
        24. ZookeeperConsumerConnector:
        24.1 reinitializeConsumer: I think it will make the code easier to understand if we explicitly define the type of val consumerThreadIdsPerTopic, topicThreadIds and threadQueueStreamPairs. It would be also very useful to explicitly define the return type of consumerThreadIdsPerTopic.flatten.
        24.2 reinitializeConsumer: This method is called every time a new topic is discovered. It feels strange that we have to register the consumer here. Ideally, each consumer is registered exactly once. Also, it seems that each time this method is called, we only add new entries to loadBalancerListener.kafkaMessageAndMetadataStreams. Shouldn't we clear this map first so that deleted topics can be removed?

        25. ByteBufferMessageSet: It's not clear to me if the iterator of ByteBufferMessageSet should return MessageAndMetadata. This is because ByteBufferMessageSet itself doesn't know all the metadata, such as topic and partition. So, it seems the iterator of this class should probably remain MessageAndOffset. MessageAndMetadata is only used for the client api.

        26. MirrorMaker: The shutdown hook should close producer.
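        The producer-close in point 26 can be sketched like this (in Java, for illustration only; the Producer interface here is a stand-in, not the Kafka API):

```java
// Sketch of point 26 (Producer here is a stand-in interface, not the Kafka
// API): register a JVM shutdown hook so the mirror's producer is closed
// (and therefore flushed) when the process exits or receives SIGTERM.
public class ShutdownHookSketch {
    interface Producer { void close(); }

    // Returns the hook thread so callers (or tests) can inspect it.
    public static Thread install(Producer producer) {
        Thread hook = new Thread(producer::close, "mirror-maker-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

        The important ordering detail is that the hook should close the producer only after the consumer threads have stopped feeding it, so no in-flight messages are dropped.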

        Joel Koshy added a comment -

        This patch addresses the points raised in the review for v2.

        I'm terrible at naming methods, so let me know if you have a better name for reinitializeConsumer.

        Also, Jay had brought up some good points wrt the API (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+API+changes). I think it would be good to get this reviewed for any other issues even as that conversation goes on.

        Neha Narkhede added a comment -

        > 5. There are several new class variables called wildcard* in
        > ZookeeperConsumerConnector. I'm thinking they can just be variables local
        > to createMessageStreamsByFilter ?

        >> Related to above. consumeWildcardTopics actually needs to access these so
        >> that's why it's global - in this case global makes sense in that you really
        >> wouldn't need to (and currently cannot) make multiple calls to
        >> createMessageStreamsByFilter. However, it would be good to localize them if
        >> possible to make the code easier to read.

        Also, is there a reason for making ZookeeperConsumerConnector implement the TopicEventHandler trait? It can simply be a separate class that implements handleTopicEvent, right? If you do that, you can pass in the wildcard-related variables in the constructor of this topic handler. I guess only the wildcardTopicWatcher needs to be a class variable, since it is required during shutdown.

        Joel Koshy added a comment -

        Thanks for the reviews. Further comments inline.

        Jun's comments:

        > 1. For future extension, I am thinking that we should probably unify
        > KafkaMessageStream and KafkaMessageAndTopicStream into something like
        > KafkaMessageMetadataStream. The stream gives an iterator of Message and
        > its associated metadata. For now, the metadata can be just topic. In the
        > future, it may include things like partition id and offset.

        That's a good suggestion. I'm not sure if it is better to factor that change
        for the existing createMessageStreams into 0.8 instead of trunk, because it
        is a fundamental API change that would break existing clients (at compile
        time). I can propose this to the mailing list to see if anyone has a
        preference. If no one objects, then we can remove it.

        > 2. ZookeeperConsumerConnector: 2.1 updateFetcher: no need to pass in
        > messageStreams

        Will do

        > 2.2 ZKRebalancerListener: It seems that kafkaMessageStream can be
        > immutable.

        It is mutable because it is updated in consumeWildcardTopics.

        > 2.3 createMessageStreamByFilter: topicsStreamsMap is empty when passed to
        > ZKRebalanceListener. This means that the queue is not cleared during
        > rebalance.

        Related to previous comment. The topicsStreamsMap is bootstrapped in
        consumeWildCardTopics and updated at every topic event if there are new
        allowed topics. So it will be populated before any rebalance occurs.

        > 2.4 consumeWildCardTopics: I find it hard to read the code in this method.
        > Is there a real benefit to use implicit conversion here, instead of
        > explicit conversion? It's not clear to me where the conversion is used.
        > The 2-level tuple makes it hard to figure out what the referred fields
        > represent. Is the code relying on groupedTopicThreadIds being sorted by
        > (topic, threadid)? If so, where is that enforced?

        The map flatten method is a bit confusing. I'm using (and hopefully not
        misusing) this variant:

        def flatten [B] (implicit asTraversable: ((A, B)) ⇒ TraversableOnce[B]): Traversable[B]

        Converts this map of traversable collections into a map in which all element
        collections are concatenated.

        It basically allows you to take the KV pairs of a map and generate some
        traversable collection out of it. Here is how I'm using it: We have a list
        of queues (e.g., List(queue1, queue2)) and a map of
        consumerThreadIdsPerTopic (e.g.,

        { "topic1" -> Set("topic1-1", "topic1-2"), "topic2" -> Set("topic2-1", "topic2-2"), "topic3" -> Set("topic3-1", "topic3-2") }

        ).

        From the above I need to create pairs of topic/thread -> queue, like this:

        { ("topic1", "topic1-1") -> queue1, ("topic1", "topic1-2") -> queue2, ("topic2", "topic2-1") -> queue1, ("topic2", "topic2-2") -> queue2, ("topic3", "topic3-1") -> queue1, ("topic3", "topic3-2") -> queue2 }

        This is a bit tricky and I had trouble finding a clearer way to write it.
        However, I agree that this snippet is hard to read - even I'm having
        difficulty reading it now, but I think keeping it concise as is and adding
        comments such as the above example to explain what is going on should help.
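        The same pairing can also be written out explicitly. The following is a Java rendering purely for illustration (names come from the example above, not from the patch): each topic's sorted thread ids are dealt out over the queue list in order.

```java
import java.util.*;

// Explicit version of the mapping described above: for every topic, pair
// its sorted consumer thread ids with the queues in order, producing
// (topic, threadId) -> queue entries.
public class ThreadQueuePairing {
    public static Map<List<String>, String> pair(List<String> queues,
            Map<String, Set<String>> consumerThreadIdsPerTopic) {
        Map<List<String>, String> pairs = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> e : consumerThreadIdsPerTopic.entrySet()) {
            List<String> threadIds = new ArrayList<>(e.getValue());
            Collections.sort(threadIds);  // the pairing relies on sorted thread ids
            for (int i = 0; i < threadIds.size(); i++)
                pairs.put(Arrays.asList(e.getKey(), threadIds.get(i)),
                          queues.get(i % queues.size()));
        }
        return pairs;
    }
}
```

        This makes the implicit sorting assumption visible, which is exactly the point raised in review comment 2.4.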

        > 3. KafkaServerStartable: Should we remove the embedded consumer now?

        My original thought was that it would be good to keep it around for
        fall-back, but I guess it can be removed.

        > 4. Utils, UtilsTest: unused import

        Will do.

        --------------------------------------------------------------------------------

        Neha's comments:

        > 1. It seems awkward that there is a MessageStream trait and the only API
        > it exposes is clear(). Any reason it doesn't expose the iterator() API ?
        > From a user's perspective, one might think, since it is a stream, it would
        > expose stream specific APIs too. It will be good to add docs to that API
        > to explain exactly what it is meant for.

        The only reason it was added was because I have two message stream types
        now. Anyway, this will go away if we switch to the common
        KafkaMessageMetadataStream.

        > 3. There is some critical code that is duplicated in the
        > ZookeeperConsumerConnector. consume() and consumeWildcardTopics() have
        > some code in common. It would be great if this can be refactored to share
        > the logic of registering session expiration listeners, registering watches
        > on consumer group changes and topic partition changes.

        Will do

        > 4. Could you merge all the logic that wraps the wildcard handling in one
        > API ? Right now, it is distributed between createMessageStreamsByFilter
        > and consumeWildcardTopics. It will be great if there is one API that will
        > pre process the wild cards, create the relevant queues and then call a
        > common consume() that has the logic described in item 5 above.

        Slightly involved, but it is worth doing.

        > 5. There are several new class variables called wildcard* in
        > ZookeeperConsumerConnector. I'm thinking they can just be variables local
        > to createMessageStreamsByFilter ?

        Related to above. consumeWildcardTopics actually needs to access these so
        that's why it's global - in this case global makes sense in that you really
        wouldn't need to (and currently cannot) make multiple calls to
        createMessageStreamsByFilter. However, it would be good to localize them if
        possible to make the code easier to read.

        > 6. There is a MessageAndTopic class, that seems to act as a container to
        > hold message and other metadata, but only exposes one API to get the
        > message. Topic is exposed by making it a public val. Would it make sense
        > to either make it a case class or provide consistent APIs for all fields
        > it holds ?

        Ok, but this will likely go away due to the MessageMetadata discussion.

        > 7. Since now we seem to have more than one iterators for the consumer,
        > would it make sense to rename ConsumerIterator to MessageIterator, and
        > TopicalConsumerIterator to MessageAndMetadataIterator ?

        Makes sense, but it could break existing users of KafkaMessageStream. Also,
        if we can get rid of KafkaMessageStream and just go with
        KafkaMessageAndMetadataStream we will have only one iterator type.

        > 8. rat fails on this patch. There are some files without the Apache header

        Good catch and reminder that reviews should ideally include running rat. I
        do need to add the header for some files.

        Neha Narkhede added a comment -

        This is a cool feature. Thanks for the patch!

        A couple of questions -

        1. It seems awkward that there is a MessageStream trait and the only API it exposes is clear(). Any reason it doesn't expose the iterator() API ? From a user's perspective, one might think, since it is a stream, it would expose stream specific APIs too. It will be good to add docs to that API to explain exactly what it is meant for.

        2. I like the idea of having just one KafkaMessageMetadataStream.

        3. There is some critical code that is duplicated in the ZookeeperConsumerConnector. consume() and consumeWildcardTopics() have some code in common. It would be great if this can be refactored to share the logic of registering session expiration listeners, registering watches on consumer group changes and topic partition changes.

        4. Could you merge all the logic that wraps the wildcard handling in one API ? Right now, it is distributed between createMessageStreamsByFilter and consumeWildcardTopics. It will be great if there is one API that will pre process the wild cards, create the relevant queues and then call a common consume() that has the logic described in item 5 above.

        5. There are several new class variables called wildcard* in ZookeeperConsumerConnector. I'm thinking they can just be variables local to createMessageStreamsByFilter ?

        6. There is a MessageAndTopic class, that seems to act as a container to hold message and other metadata, but only exposes one API to get the message. Topic is exposed by making it a public val. Would it make sense to either make it a case class or provide consistent APIs for all fields it holds ?

        7. Since now we seem to have more than one iterators for the consumer, would it make sense to rename ConsumerIterator to MessageIterator, and TopicalConsumerIterator to MessageAndMetadataIterator ?

        8. rat fails on this patch. There are some files without the Apache header

        Jun Rao added a comment -

        Thanks for the patch. Some comments:

        1. For future extension, I am thinking that we should probably unify KafkaMessageStream and KafkaMessageAndTopicStream into something like KafkaMessageMetadataStream. The stream gives an iterator of Message and its associated metadata. For now, the metadata can be just topic. In the future, it may include things like partition id and offset.

        2. ZookeeperConsumerConnector:
        2.1 updateFetcher: no need to pass in messageStreams
        2.2 ZKRebalancerListener: It seems that kafkaMessageStream can be immutable.
        2.3 createMessageStreamByFilter: topicsStreamsMap is empty when passed to ZKRebalanceListener. This means that the queue is not cleared during rebalance.
        2.4 consumeWildCardTopics: I find it hard to read the code in this method. Is there a real benefit to use implicit conversion here, instead of explicit conversion? It's not clear to me where the conversion is used. The 2-level tuple makes it hard to figure out what the referred fields represent. Is the code relying on groupedTopicThreadIds being sorted by (topic, threadid)? If so, where is that enforced?

        3. KafkaServerStartable: Should we remove the embedded consumer now?

        4. Utils, UtilsTest: unused import

        Joel Koshy added a comment -

        Sorry about the delay on this. Here is a patch incorporating the design
        change. Overview of changes:

        • Added topic watcher to ZookeeperConsumerConnector for creating message
          streams based on filters.
        • The API is slightly different from the previous patch - just one
          createMessageStreamsByFilter call instead of separate ones for
          whitelist/blacklist.
        • Since we may now iterate over messages from multiple topics, added a new
          KafkaMessageAndTopicStream and TopicalConsumerIterator, that iterates over
          MessageAndTopic objects.
        • For wildcarded consumption, the topic count string may need to change in
          ZK for new topics. To avoid additional logic to handle this, added
          WildcardTopicCount for this, distinguished from StaticTopicCount.
          WildcardTopicCount's encoding is described in the TopicCount class.
        • New mirror maker tool.
        • Updated mirror maker system test.
        • Updated console consumer, which now allows exactly one of: topic,
          whitelist, blacklist.
        • Added logIdent field to Logging trait, defaults to "". Not so sure this
          is a great idea, but the reason I needed this is that the mirror-maker may
          instantiate multiple ZK connectors and it is very unclear which messages
          come from which connector.
          E.g.,
          [2012-03-26 17:10:06,114] INFO group1_jkoshy-ld-1332807005602-c0176b3f Committing all offsets after clearing the fetcher queues (kafka.consumer.ZookeeperConsumerConnector)

        A few other comments:

        • Apparently there are issues in recreating message streams from the same
          zkconnector, so I have disabled it for all the createMessageStreams*
          methods. The createMessageStreamsByFilter method will only allow one call
          to it the way it is implemented, but I don't think that is a serious
          limitation.
        • I noticed a small caveat in using joptsimple - if I say --whitelist ".*"
          it interprets it as ".". However, --whitelist=".*" and --whitelist ".+"
          both work.
        • I encountered a small shutdown issue - in the system test, I have two
          connectors. When shutting them down, the first one to shut down triggers a
          rebalance in the other connector. However, that connector is itself
          shutting down and sets zkclient to null. So I see null pointer exceptions
          due to accessing ZK as part of the rebalance. We should probably add an
          isRebalancing atomic bool and not shut down while it is set, and vice
          versa. I can roll that in as part of this patch if it makes sense.
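The guard proposed above could look something like the following sketch (class and method names here are hypothetical, not code from the patch): shutdown and rebalance each check the other's flag before touching zkclient, so neither races the other into a null pointer exception.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: mutual exclusion between rebalance and shutdown.
public class ConnectorStateGuard {
    private final AtomicBoolean rebalancing = new AtomicBoolean(false);
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    // A rebalance may start only if we are not shutting down and no
    // rebalance is already in flight.
    public boolean tryStartRebalance() {
        if (shuttingDown.get()) return false;
        return rebalancing.compareAndSet(false, true);
    }

    public void finishRebalance() {
        rebalancing.set(false);
    }

    // Shutdown marks itself first (blocking new rebalances), then reports
    // whether it is safe to tear down zkclient immediately.
    public boolean tryShutdown() {
        shuttingDown.set(true);
        return !rebalancing.get();
    }
}
```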
        Jaroslaw Odzga added a comment -

        +1 for feature

        Joel Koshy added a comment -

        I meant +1 on the suggestion, not my own patch.

        The approach you propose is more generic and simpler. It should be possible by using just one queue for all the fetchers.

        Joel Koshy added a comment -

        +1

        Jun Rao added a comment -

        Could we support subscribing to wildcard topics directly in consumerConnector? We could add a new api like the following:

        createMessageStream(topicRegex: String) : KafkaMessageStream

        All topics that match topicRegex will be returned in a single message stream, which can then be iterated.
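A minimal sketch of the proposed semantics (the class and method names below are hypothetical illustrations, not part of the patch): every known topic whose name matches topicRegex would feed the single returned stream.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration only: selecting the topics that a
// createMessageStream(topicRegex) call would merge into one stream.
public class WildcardTopicSelector {
    public static List<String> matchingTopics(List<String> knownTopics,
                                              String topicRegex) {
        Pattern pattern = Pattern.compile(topicRegex);
        return knownTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }
}
```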

        Joel Koshy added a comment -

        Overview of changes:

        • New abstract consumer-agent that embeds a topic event watcher to allow
          topic discovery (similar to what the embedded consumer does). It provides
          a processMessage hook that concrete implementations can specify.
        • New stand-alone mirror-maker tool that extends from consumer-agent.
        • Console consumer now also extends from consumer-agent, and supports
          wildcarding in the topic list.
        • New mirror-maker system test that is similar to the embedded consumer
          system test, but tests with multiple source clusters. When/if we deprecate
          the embedded consumer we can delete its system test.

        Some comments:

        • Shutdown logic is somewhat tricky - and some of it was driven by
          requirements from console consumer. Let me know if you have suggestions on
          restructuring to make it simpler. The afterStoppingWorkerThread call is
          before the workersStoppedLatch.countDown so the concrete implementation
          should not block there forever. Or we could just move it after the
          countDown, or remove that hook altogether.
        • I thought it would be best to continue to have the embedded consumer and
          deprecate it later. So I also kept the "mirror" prefix in the
          whitelist/blacklist consumer config options.
        • The mirror-maker tool right now only uses one producer - which is "ok"
          if one hacks around the broker.list config property to use multiple
          producer send threads underneath. However, we probably want multiple
          producers. So we can keep this under review until after KAFKA-253 is
          merged into 0.7/trunk, as that will affect this piece of code.
        • Wildcarding is standard Java regex. However, for convenience a
          comma-separated list of topics is allowed, in which case ',' is replaced
          by '|'. In doing so, I have assumed that commas are never allowed as
          part of topic names.
        • If the whitelist is non-trivial (where triviality is defined as having
          only alphanumeric characters and '|') then a topic event watcher is
          created. Otherwise, no watcher is created. So for example, "whitetopic.*"
          would require a topic watcher, but "whitetopic01,whitetopic02" would not.
        • I would have preferred making consumer agent a trait, but that would limit
          extending it in Java as it contains implementation.
        • I needed to synchronize console consumer's processMessage to implement
          support for the maxMessages option. One way to avoid this complexity is to
          add maxMessages option to ConsumerConfig. If it is set, then we count
          numMessages as an atomic long in ConsumerAgent's loop and add (numMessages
          < maxMessages) to the Stream.continually condition.
        • There is a consumer shutdown bug (KAFKA-282). It is a simple fix but I
          think it is better to keep that in a separate jira.
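The comma-to-regex convenience and the triviality check described above could be sketched as follows (method names are mine, not the patch's):

```java
// Hypothetical sketch of the wildcard handling described in the comment.
public class TopicFilterSketch {
    // Commas are assumed to be illegal in topic names, so a comma-separated
    // topic list can be rewritten as a regex alternation.
    public static String toRegex(String whitelist) {
        return whitelist.replace(',', '|');
    }

    // A "trivial" filter contains only alphanumerics and '|': it is a static
    // list of topic names, so no ZooKeeper topic event watcher is needed.
    public static boolean needsTopicWatcher(String regex) {
        return !regex.matches("[a-zA-Z0-9|]*");
    }
}
```

Under this sketch, "whitetopic01,whitetopic02" becomes the trivial regex "whitetopic01|whitetopic02" (no watcher), while "whitetopic.*" contains regex metacharacters and would require one.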

        So how about we keep this pending until the producer refactoring has been
        well-tested and makes it to trunk? In the interim I can also prepare changes
        to update the Mirroring-howto and add a console consumer how-to as well.

        (Also, thanks to Neha and Jun for helping with debugging this.)

        Show
        Joel Koshy added a comment - Overview of changes: New abstract consumer-agent that embeds a topic event watcher to allow topic discovery (similar to what the embedded consumer does). It provides a processMessage hook that concrete implementations can specify. New stand-alone mirror-maker tool that extends from consumer-agent. Console consumer now also extends from consumer-agent, and supports wildcarding in the topic list. New mirror-maker system test that is similar to the embedded consumer system test, but tests with multiple source clusters. When/if we deprecate the embedded consumer we can delete its system test. Some comments: Shutdown logic is somewhat tricky - and some of it was driven by requirements from console consumer. Let me know if you have suggestions on restructuring to make it simpler. The afterStoppingWorkerThread call is before the workersStoppedLatch.countDown so the concrete implementation should not block there forever. Or we could just move it after the countDown, or remove that hook altogether. I thought it would be best to continue to have the embedded consumer and deprecate it later. So I also kept the "mirror" prefix in the whitelist/blacklist consumer config options. The mirror-maker tool right now only uses one producer - which is "ok" with hacking around the broker.list config property to use multiple producer send threads underneath. However, we probably want multiple producers. So we can keep this under review until after Kafka-253 is merged into 0.7/trunk as that will affect this piece of code. Wildcarding is standard Java regex. However, for convenience a comma-separated list of topics is allowed, in which case ',' is replaced by '|'. In doing so, I have assumed that commas were never allowed as part of topic names. If the whitelist is non-trivial (where triviality is defined by having only alpha-numeric characters and '|') then a topic event watcher is created. Otherwise, no watcher is created. 
So for example, "whitetopic.*" would require a topic watcher, but "whitetopic01,whitetopic02" would not. I would have preferred making consumer agent a trait, but that would limit extending it in Java as it contains implementation. I needed to synchronize console consumer's processMessage to implement support for the maxMessages option. One way to avoid this complexity is to add maxMessages option to ConsumerConfig. If it is set, then we count numMessages as an atomic long in ConsumerAgent's loop and add (numMessages < maxMessages) to the Stream.continually condition. There is a consumer shutdown bug ( KAFKA-282 ). It is a simple fix but I think it is better to keep that in a separate jira. So how about we keep this pending until the producer refactoring has been well-tested and makes it to trunk? In the interim I can also prepare changes to update the Mirroring-howto and add a console consumer how-to as well. (Also, thanks to Neha and Jun for helping with debugging this.)

          People

          • Assignee:
            Joel Koshy
            Reporter:
            Joel Koshy
          • Votes:
            1