Details

    • Type: New Feature
    • Status: Reopened
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: storm-kafka
    • Labels:
      None

      Description

      Feature request: support wildcard topics in the Kafka spout.

      We want to be able to run an aggregation stream over data coming from all tenants. Tenants are added dynamically, so new Kafka topics get created. All of the topics match a regex pattern.
      Example:
      clickstream:tenant1:log
      clickstream:tenant2:log
      clickstream:tenant3:log
      Storm should be able to auto-discover newly created topics and fetch from them at run time.

          Activity

          mtreadway@verizon.com Mike Treadway added a comment -

          Hi there, I'm seeing an issue with this implementation and I'm wondering how it ever worked with Trident.

          The OpaquePartitionedTridentSpoutExecutor assumes that the partition state directories are direct children of the transaction directory. When wildcard support is enabled, the state directories are grandchildren of the transaction directory. For example, without wildcard support, it all works and the directory structure looks like this:

          {stream}/user/partition_0

          With wildcard support enabled, the partition id goes from 'partition_0' to '{topic}/partition_0', so the directory structure looks like this:

          {stream}/user/{topic}/partition_0

          The implementation inside OpaquePartitionedTridentSpoutExecutor.commit() assumes that the partition state is directly under user because it does '_state.list("")', which is incorrect.

          Is anyone else seeing this?
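
The mismatch described in this comment can be illustrated with a small stand-alone sketch. It does not use Storm's transactional state classes; `StateLayoutSketch` and `directChildren` are hypothetical names, and the in-memory path list only models a state tree in which a listing returns direct children. Under that assumption, a listing under `user` returns partition nodes without wildcard support but topic nodes with it:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory model of the state tree; not Storm code.
final class StateLayoutSketch {

    // Returns the names of the direct children of `parent` among slash-separated paths.
    static Set<String> directChildren(List<String> paths, String parent) {
        String prefix = parent.isEmpty() ? "" : parent + "/";
        Set<String> children = new TreeSet<>();
        for (String path : paths) {
            if (path.startsWith(prefix) && path.length() > prefix.length()) {
                String rest = path.substring(prefix.length());
                int slash = rest.indexOf('/');
                // Keep only the first path segment below the parent.
                children.add(slash < 0 ? rest : rest.substring(0, slash));
            }
        }
        return children;
    }

    public static void main(String[] args) {
        List<String> withoutWildcard = Arrays.asList("user/partition_0", "user/partition_1");
        List<String> withWildcard = Arrays.asList("user/topicA/partition_0", "user/topicB/partition_0");

        // Partition nodes are direct children: [partition_0, partition_1]
        System.out.println(directChildren(withoutWildcard, "user"));
        // Topic nodes are direct children, partitions are one level deeper: [topicA, topicB]
        System.out.println(directChildren(withWildcard, "user"));
    }
}
```

Code that treats every direct child as a partition state node would, in the second layout, see topic names instead of partition ids.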

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla closed the pull request at:

          https://github.com/apache/storm/pull/561

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-153232953

          Closing the request as it's merged in https://github.com/apache/storm/commit/60d9f81ba1b16f7711c487f831f202e49eda258c

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-153214521

          @sumitchawla you can close the PR; for some reason it was not closed after I merged it in.

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-153176672

          Thanks a lot for your patience on this patch @sumitchawla. I merged it into master. I appreciate you following up on the reviews and the many upmerges. Hoping to see more patches from you in the future.

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-153157781

          Hi @harshach, merge is done and build is passing. Thanks for your review

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-152840039

          @sumitchawla I'd appreciate it if you could do one more upmerge. I'll merge it into master after that. Thanks

          githubbot ASF GitHub Bot added a comment -

          Github user ptgoetz commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-151599531

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-151593590

          Done another merge. @ptgoetz @revans2 @HeartSaVioR can you guys please review this change?

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-145363605

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user Parth-Brahmbhatt commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-143808292

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user Parth-Brahmbhatt commented on a diff in the pull request:

          https://github.com/apache/storm/pull/561#discussion_r40577765

          --- Diff: external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java ---
          @@ -82,11 +82,9 @@ public static long getOffset(SimpleConsumer consumer, String topic, int partitio
               public static class KafkaOffsetMetric implements IMetric {
                   Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
                   Set<Partition> _partitions;
          -        String _topic;
                   DynamicPartitionConnections _connections;
          -        public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
          -            _topic = topic;
          --- End diff --

          Any reason we have to delete this? I am thinking about the case where some users have graphs set up on a per-topic basis, which will break after this change.

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla commented on a diff in the pull request:

          https://github.com/apache/storm/pull/561#discussion_r40497455

          --- Diff: external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java ---
          @@ -40,6 +42,7 @@
               private CuratorFramework _curator;
               private String _zkPath;
               private String _topic;
          +    private Boolean _wildcardTopic;
          --- End diff --

          Thanks @Parth-Brahmbhatt, I have renamed the variable.

          githubbot ASF GitHub Bot added a comment -

          Github user Parth-Brahmbhatt commented on a diff in the pull request:

          https://github.com/apache/storm/pull/561#discussion_r40483221

          --- Diff: external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java ---
          @@ -40,6 +42,7 @@
               private CuratorFramework _curator;
               private String _zkPath;
               private String _topic;
          +    private Boolean _wildcardTopic;
          --- End diff --

          Let's name this something like _isWildCardTopic

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-138753701

          @sumitchawla sorry about the long delay in reviewing this. Overall this looks good to me; I am going to run some tests on this patch.
          @ptgoetz @revans2 @HeartSaVioR @Parth-Brahmbhatt please take a look at it as well.

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-120221230

          @harshach @Parth-Brahmbhatt I have done another upstream merge. I have already added some test cases. Do let me know if you think something is missing.

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-120216042

          @sumitchawla sorry for the delay. It looks like you need to do another upmerge. With this many changes, we need to run some tests. Also, can you add any new config options you introduced to the README?
          @Parth-Brahmbhatt can you also take a look.

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-120136074

          Any updates on this one?

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-116774534

          @harshach change is merged. Build failed for jdk 8 due to an unrelated error. Please review and provide your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-116740320

          @sumitchawla can you upmerge this I'll take a look. Thanks.

          githubbot ASF GitHub Bot added a comment -

          Github user jfgreen commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-116736180

          This would be super awesome to have merged. Currently having to enumerate a lot of topics into multiple spouts.

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-106882647

          Can anybody please review this PR and provide feedback or comments?

          githubbot ASF GitHub Bot added a comment -

          Github user sumitchawla commented on the pull request:

          https://github.com/apache/storm/pull/561#issuecomment-106096242

          How to use:
          1. Add the following to your config:
          config.put("kafka.topic.wildcard.match", true);

          2. Pass a wildcard topic to KafkaSpout:
          String topic = "my.clickstream.*.log";

          This will pick up all topics matching the above pattern, e.g.:
          my.clickstream.1.log
          my.clickstream.2.log
          my.clickstream.3.log
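
As a rough illustration of which topic names such a pattern selects, here is a minimal stand-alone sketch. It assumes the topic string is interpreted as a Java regular expression that must match the whole topic name, which this thread suggests but does not spell out. `WildcardTopicMatch` and `matchingTopics` are hypothetical names, not part of storm-kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of storm-kafka.
final class WildcardTopicMatch {

    // Returns the topics whose full name matches the given regular expression.
    static List<String> matchingTopics(String topicPattern, List<String> allTopics) {
        Pattern pattern = Pattern.compile(topicPattern);
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList(
                "my.clickstream.1.log", "my.clickstream.2.log", "other.topic");
        // prints [my.clickstream.1.log, my.clickstream.2.log]
        System.out.println(matchingTopics("my.clickstream.*.log", all));
    }
}
```

Under the regex assumption, the unescaped dots match any character, so the pattern is broader than it looks; escaping them as `my\.clickstream\..*\.log` would be stricter.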

          kabhwan Jungtaek Lim added a comment -

          Reopening this.
          The issue will be closed by a committer when the PR is merged to the master branch, so you don't need to close it yourself.
          The same applies to "Fix Version".

          sumitkchawla Sumit Chawla added a comment -

          Changes have been implemented and a pull request is opened.

          https://github.com/apache/storm/pull/561

          githubbot ASF GitHub Bot added a comment -

          GitHub user sumitchawla opened a pull request:

          https://github.com/apache/storm/pull/561

          STORM-817: Support for Kafka Wildcard topics

          Support for wildcard matching of topics based on the config "kafka.topic.wildcard.match". If this config is specified, topics matching the pattern are picked up for listening to messages.

          Implementation Details:
          1. DynamicBrokersReader now returns a list of GlobalPartitionInformation. Each object is tied to a topic.
          2. The Partition object now has a topic. Each partition belongs to a specific topic.
          3. DynamicPartitionConnections keeps a map of topic_partition registrations.
          4. Kafka metrics are written per topic.
          5. PartitionManager uses Partition.topic for calculating the commit path. The commit path for the wildcard topic scenario will be _spoutConfig.id/topic_ABC/partition_0; for non-wildcard use, the path will be _spoutConfig.id/partition_0. This way the change is backward compatible with previously committed ZK offsets.
          6. New test case DynamicBrokerReaderTests.testGetBrokerInfoWildcardMatch for testing topic discovery.
          7. Manually tested the Kafka spout with a single topic and with wildcard topics, and Trident with a single topic and with wildcard topics.
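
The commit-path rule from item 5 can be sketched as follows. This is a simplified illustration under the assumption that the path is a plain slash-separated string; `CommitPathSketch` and `commitPath` are hypothetical names and not the PartitionManager code from the pull request.

```java
// Hypothetical illustration of the commit-path rule; not the actual PartitionManager code.
final class CommitPathSketch {

    // With wildcard matching the topic becomes an extra path level;
    // without it the path keeps its earlier shape, so existing offsets are still found.
    static String commitPath(String spoutId, String topic, int partition, boolean wildcardTopic) {
        String partitionNode = "partition_" + partition;
        return wildcardTopic
                ? spoutId + "/" + topic + "/" + partitionNode
                : spoutId + "/" + partitionNode;
    }

    public static void main(String[] args) {
        // prints myspout/topic_ABC/partition_0
        System.out.println(commitPath("myspout", "topic_ABC", 0, true));
        // prints myspout/partition_0
        System.out.println(commitPath("myspout", "topic_ABC", 0, false));
    }
}
```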

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sumitchawla/storm master

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/storm/pull/561.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #561


          commit 455435adffd873d98eedc77f428b7d76f94f8bba
          Author: Sumit Chawla <sumitkchawla@gmail.com>
          Date: 2015-05-22T21:38:30Z

          STORM-817: Support for Kafka Wildcard topics


          sumitkchawla Sumit Chawla added a comment -

          Yes, I have also started digging along the same lines.

          zhuoliu Zhuo Liu added a comment - - edited

          This use case looks quite interesting and useful.
          Thus the KafkaSpout needs to periodically check ZooKeeper for matching topics, and then build connections for the new partitions. Some failure/security issues may need to be considered (e.g., some topics may match the wildcard but not be what the spout really needs).
          In terms of implementation, the current ZkCoordinator and DynamicPartitionConnections can support dynamic partition/connection updates.
          I guess the size of it will be reasonable. Sure.

          This issue is related to STORM-392:
          A "default" high-level kafka consumer has the ability to read from multiple topics using a WhiteList notation (which is basically just a regex). It would be great to have that ability in KafkaSpout too.
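
The periodic check mentioned in this comment could look roughly like the sketch below. It is a stand-alone illustration: `TopicDiscoverySketch` and `refresh` are hypothetical names, the topic list is passed in rather than read from ZooKeeper, and the pattern is assumed to be a Java regular expression matched against the whole topic name.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical sketch of periodic topic discovery; not Storm code.
final class TopicDiscoverySketch {
    private final Pattern pattern;
    private final Set<String> knownTopics = new TreeSet<>();

    TopicDiscoverySketch(String topicPattern) {
        this.pattern = Pattern.compile(topicPattern);
    }

    // Returns the matching topics seen for the first time on this call and remembers them.
    Set<String> refresh(Collection<String> currentTopics) {
        Set<String> added = new TreeSet<>();
        for (String topic : currentTopics) {
            // Set.add returns true only if the topic was not known before.
            if (pattern.matcher(topic).matches() && knownTopics.add(topic)) {
                added.add(topic);
            }
        }
        return added;
    }

    public static void main(String[] args) {
        TopicDiscoverySketch discovery = new TopicDiscoverySketch("clickstream:.*:log");
        // First poll: prints [clickstream:tenant1:log, clickstream:tenant2:log]
        System.out.println(discovery.refresh(Arrays.asList(
                "clickstream:tenant1:log", "clickstream:tenant2:log")));
        // Later poll after a tenant was added: prints [clickstream:tenant3:log]
        System.out.println(discovery.refresh(Arrays.asList(
                "clickstream:tenant1:log", "clickstream:tenant2:log",
                "clickstream:tenant3:log", "unrelated")));
    }
}
```

A caller would run `refresh` on a timer and open connections only for the partitions of the returned topics. Topics that match the pattern by accident, the concern raised above, would need an additional filter.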

          sumitkchawla Sumit Chawla added a comment - - edited

          Hi Zhuo

          My use case matches "a". We have multiple topics separated by tenant namespaces. We want to be able to run an aggregation stream over data coming from all tenants. Tenants are added dynamically, so new Kafka topics get created. All of the topics match a regex pattern.
          Example:
          clickstream:tenant1:log
          clickstream:tenant2:log
          clickstream:tenant3:log

          Storm should be able to auto-discover newly created topics and fetch from them at run time.

          What do you think the size of this effort will be? I am also willing to put some time into this feature. Maybe we can collaborate on it.

          zhuoliu Zhuo Liu added a comment -

          Hi Sumit, would you please give a bit more description about this issue?
          a. Does the wildcard feature mean: "the user passes a wildcard string in the config (e.g., '^WordCount'), then the KafkaSpout may find multiple matching topics, so multiple streams need to be constructed for reading messages from multiple Kafka topics"?
          b. If <a> is what is described in this issue, does it have any real use cases?
          c. Or do you mean a WildcardTopicSelector that chooses among multiple matching topics (and finally picks one of them for data input)?
          Thanks.


            People

            • Assignee:
              sumitkchawla Sumit Chawla
              Reporter:
              sumitkchawla Sumit Chawla
            • Votes:
              1 Vote for this issue
              Watchers:
              8 Start watching this issue
