Kafka / KAFKA-1180

WhiteList topic filter gets a NullPointerException on complex Regex

    Details

    • Type: Bug
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.2
    • Component/s: consumer
    • Labels: None

      Description

      We need to create a stream selector that essentially combines the logic of the Blacklist and Whitelist classes (which is not easily exposed in the high-level consumer API). That is, we want to select a topic that contains a certain prefix, as long as it doesn't also contain a secondary string.

      This should be easy to do with ordinary Java regexes, but we're running into some issues trying to do this with the Whitelist class only.

      We have a pattern that uses negative lookahead, like this:

      "test-(?!bad\\b)\\w+"

      So this should select a topic like: "test-good", but exclude a topic like "test-bad", and also exclude a topic without the "test" prefix, like "foo-bar".

      Instead, what we see is a NullPointerException in the call to createMessageStreamsByFilter (after having previously sent a message to "test-good" followed by a message to "test-bad"):

      21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
      kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
      at kafka.utils.Utils$.inLock(Utils.scala:565)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
      Caused by: java.lang.NullPointerException
      at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
      at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
      ... 9 more
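The selection behaviour expected from the pattern in the description can be checked with plain java.util.regex, independently of Kafka. This is a minimal sketch, not Kafka code: the class name `LookaheadTopicPattern` and the helper `allows` are hypothetical, and the pattern is assumed to be the `\\b` (word boundary) form of the regex quoted above. Whole-string matching is used because the Whitelist code quoted in this issue tests topics with `topic.matches(regex)`.

```java
import java.util.regex.Pattern;

class LookaheadTopicPattern {
    // Assumed form of the pattern from the description: "test-" followed by
    // word characters, unless the text right after "test-" is the whole word "bad".
    static final Pattern TOPIC = Pattern.compile("test-(?!bad\\b)\\w+");

    // Whole-string match, the same semantics as String.matches.
    static boolean allows(String topic) {
        return TOPIC.matcher(topic).matches();
    }

    public static void main(String[] args) {
        for (String topic : new String[] {"test-good", "test-bad", "foo-bar"}) {
            System.out.println(topic + " -> " + (allows(topic) ? "allowed" : "filtered"));
        }
    }
}
```

Note that with this pattern a topic such as "test-badger" is still allowed, because `\b` only rejects names where "bad" is followed by a word boundary.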

      1. KAFKA-1180.patch
        2 kB
        Joe Stein
      2. KAFKA-1180_2013-12-22_01:24:57.patch
        2 kB
        Joe Stein
      3. KAFKA-1180.patch
        2 kB
        Joe Stein
      4. KAFKA-1180_2014-02-09_20:21:49.patch
        4 kB
        Joe Stein
      5. KAFKA-1180_2014-02-13_14:50:40.patch
        9 kB
        Joe Stein
      6. KAFKA-1180_2014-02-13_15:13:17.patch
        5 kB
        Joe Stein
      7. KAFKA-1180_2014-02-13_15:21:51.patch
        4 kB
        Joe Stein
      8. KAFKA-1180_2014-02-13_15:23:28.patch
        4 kB
        Joe Stein

        Activity

        Jason Rosenberg added a comment -

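        Here's some code which reproduces the issue. Assume zkConnect points to a running zk cluster (and there's also a running kafka instance using the same zkConnect), and also that kafka is running on localhost and using 'metadataport':

        // Imports (Kafka 0.8 Java API and Guava)
        import java.util.List;
        import java.util.Properties;
        import com.google.common.collect.ImmutableList;
        import kafka.api.OffsetRequest;
        import kafka.consumer.Consumer;
        import kafka.consumer.ConsumerConfig;
        import kafka.consumer.KafkaStream;
        import kafka.consumer.Whitelist;
        import kafka.javaapi.consumer.ConsumerConnector;
        import kafka.javaapi.producer.Producer;
        import kafka.producer.KeyedMessage;
        import kafka.producer.ProducerConfig;

        // One message for a topic the regex should allow, one for a topic it should filter.
        List<KeyedMessage<Integer, String>> toSend = ImmutableList.of(
            new KeyedMessage<Integer, String>("test-topic", "test-message1"),
            new KeyedMessage<Integer, String>("test-bad", "test-message2"));
        String regex = "test-(?!bad\\b)\\w+";

        // Produce both messages so that both topics exist before the consumer starts.
        Properties pProps = new Properties();
        pProps.put("metadata.broker.list", "localhost:" + metadataport);
        pProps.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig pConfig = new ProducerConfig(pProps);
        Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
        for (KeyedMessage<Integer, String> data : toSend) {
            System.out.println("write");
            producer.send(data);
        }
        producer.close();

        // Consume from the smallest offset using the lookahead regex as a whitelist.
        Properties cProps = new Properties();
        cProps.put("zookeeper.connect", zkConnect);
        cProps.put("group.id", "group1");
        cProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString());
        ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        List<KafkaStream<byte[], byte[]>> streams =
            consumerConnector.createMessageStreamsByFilter(new Whitelist(regex), 1);
        System.out.println("create streams");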

        Joe Stein added a comment -

        That code is all in TopicFilter. Can you write up a StreamSelector that extends TopicFilter(rawRegex), or another TopicFilter?

        case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
          override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""")

          override def isTopicAllowed(topic: String) = {
            val allowed = topic.matches(regex)
            debug("%s %s".format(topic, if (allowed) "allowed" else "filtered"))
            allowed
          }
        }

        case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
          override def requiresTopicEventWatcher = true

          override def isTopicAllowed(topic: String) = {
            val allowed = !topic.matches(regex)
            debug("%s %s".format(topic, if (allowed) "allowed" else "filtered"))
            allowed
          }
        }

        If you can write a patch, I can help get it upstream. I might get a chance to do some of this myself in the next week or two.

        Jason Rosenberg added a comment -

        Joe, I'm not sure I understand your suggestion. We are calling the Whitelist constructor from Java. Unfortunately, it does not look like there is an easy way to subclass Whitelist (or even TopicFilter) from Java.

        Anyway, the bottom line is that the regex I used in the example above causes an NPE. Can you reproduce that? I think that's the main issue in this bug. Going beyond that, it would be great if Whitelist allowed arbitrary regexes that can use negative lookahead, etc., to allow complex patterns. Or, alternatively, a separate filter class which is more general purpose.

        There is some code I'm uncertain about in that Whitelist class: I am really not clear on what the distinction between 'regex' and 'rawRegex' is, or on the business around 'requiresTopicEventWatcher'.
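One way to picture the "more general purpose" filter asked for here is an include pattern combined with an exclude pattern. The following is only a sketch in plain Java: `IncludeExcludeFilter` and its `isTopicAllowed` method are hypothetical names, the class does not extend Kafka's TopicFilter, and nothing like it is claimed to exist in any Kafka release.

```java
import java.util.regex.Pattern;

class IncludeExcludeFilter {
    private final Pattern include;
    private final Pattern exclude;

    IncludeExcludeFilter(String includeRegex, String excludeRegex) {
        this.include = Pattern.compile(includeRegex);
        this.exclude = Pattern.compile(excludeRegex);
    }

    // A topic is allowed when the whole name matches the include pattern
    // and the exclude pattern is found nowhere in the name.
    boolean isTopicAllowed(String topic) {
        return include.matcher(topic).matches() && !exclude.matcher(topic).find();
    }

    public static void main(String[] args) {
        IncludeExcludeFilter filter = new IncludeExcludeFilter("test-\\w+", "bad");
        for (String topic : new String[] {"test-good", "test-bad", "foo-bar"}) {
            System.out.println(topic + " -> " + (filter.isTopicAllowed(topic) ? "allowed" : "filtered"));
        }
    }
}
```

Keeping the two patterns separate avoids lookahead entirely, at the cost of a slightly different rule: any topic containing the excluded string is filtered, not only topics where it appears as a whole word.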

        Joe Stein added a comment -

        Created reviewboard https://reviews.apache.org/r/16425/

        Joe Stein added a comment -

        reproduced

        [2013-12-22 01:07:15,984] ERROR [ConsumerFetcherThread-console-consumer-55777_Joes-MacBook-Air.local-1387692435196-f52625ba-0-0], Error due to (kafka.consumer.ConsumerFetcherThread)
        kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
        at scala.collection.immutable.Map$Map4.foreach(Map.scala:180)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.utils.Utils$.inLock(Utils.scala:565)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
        Caused by: java.lang.NullPointerException
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
        ... 9 more

        Joe Stein added a comment -

        Updated reviewboard https://reviews.apache.org/r/16425/

        Jason Rosenberg added a comment -

        So, is this patch applicable to the 0.8 branch? I assume the fix will be committed only to trunk / 0.8.1?

        Joe Stein added a comment -

        Hi Jason, yes, this patch was developed for you to try out against the 0.8 branch.

        I would +1 a 0.8.0.1 release with this fix (and anything else that springs up), as it would also resolve the 2.8.0 zero-length maven issue (that one is just build related, so it would be two JIRA tickets). I understand if folks don't want to switch Scala versions in production while also making other changes, in case something goes wrong and the cause is unclear. 2.8.0 should be supported in maven, but I still think we should document sbt and maven using Scala 2.10 (I need to put up a documentation patch for this; sorry for conflating JIRAs here).

        I haven't tried the patch on 0.8.1 yet, as I first wanted to make 100% sure that this resolved your issue and that you didn't hit another issue after this was fixed.

        0.8.1 also might not be too far away; with release time and testing, 0.8.1 may come out within the same time frame in which the community would be looking for 0.8.0.1. I haven't tried the Log Compaction work in 0.8.1 yet, but it looks really awesome: https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction. I will do that when I try this patch on 0.8.1, once you confirm it works for you as expected.

        Jason Rosenberg added a comment -

        Joe Stein I've applied this patch against the 0.8 branch, and can confirm that it seems to solve the issue we've been having. Please go ahead and add it to the 0.8.1 branch. I'm not sure it makes sense at this point to add a 0.8.0.1 release, but if you did, I would use that (for now, I'll run with my own patched 0.8.0 version).

        Thanks!

        Joe Stein added a comment -

        The KAFKA-1180_2013-12-22_01:24:57.patch is for the 0.8 branch

        Joe Stein added a comment -

        Created reviewboard https://reviews.apache.org/r/16718/
        against branch origin/trunk

        Joe Stein added a comment -

        Trunk patch = https://issues.apache.org/jira/secure/attachment/12621898/KAFKA-1180.patch

        Jason Rosenberg added a comment -

        Any updates on getting this merged into trunk, in time for 0.8.1?

        Joe Stein added a comment -

        I have made the changes requested in the review; I need to dig them out of my branch and upload them for follow-up. I should be able to do that before Monday.

        Joe Stein added a comment -

        Updated reviewboard https://reviews.apache.org/r/16718/
        against branch origin/0.8.1

        Joe Stein added a comment -

        Updated reviewboard https://reviews.apache.org/r/16718/
        against branch origin/0.8.1

        Joe Stein added a comment -

        The last patch, uploaded just now, is rebased on 0.8.1 and adds the Blacklist test case that was missing.

        Joe Stein added a comment -

        Updated reviewboard https://reviews.apache.org/r/16718/
        against branch origin/0.8.1

        Joe Stein added a comment -

        Updated reviewboard https://reviews.apache.org/r/16718/
        against branch origin/0.8.1

        Joe Stein added a comment -

        Updated reviewboard https://reviews.apache.org/r/16718/
        against branch origin/0.8.1

        Joe Stein added a comment -

        Versions 3, 4, and 5 had some issues caused by my local branches. I cleaned that up and fixed it in version 6 (latest), which is reviewable now.

        Added test cases and comments per the reviews by Joel & Neha, and the missing Blacklist test per the review by Guozhang.

        Jason Rosenberg added a comment -

        Is it too late to reconsider this for 0.8.1?
        Will there at least be a patch applicable to 0.8.1?

        Jason Rosenberg added a comment -

        Joe Stein What's the status of this? Will it be available only in 0.8.2? If so, will there be a patch we can apply to 0.8.1 and 0.8.1.1? Will the patch previously supplied for 0.8.0 work with 0.8.1.*?

        We are currently using 0.8.0 with your patch. We'd like to upgrade to 0.8.1, but would need to have the patch available before doing so!

        Thanks,

        Jason


          People

          • Assignee: Neha Narkhede
          • Reporter: Jason Rosenberg
          • Votes: 0
          • Watchers: 3
