Kafka
  1. Kafka
  2. KAFKA-1180

WhiteList topic filter gets a NullPointerException on complex Regex

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.2.0
    • Component/s: consumer
    • Labels:
      None

      Description

      We are needing to create a stream selector that essentially combines the logic of the BlackList and WhiteList classes (which is not easily exposed in the high-level consumer api). That is, we want to select a topic that contains a certain prefix, as long as it doesn't also contain a secondary string.

      This should be easy to do with ordinary java Regex's, but we're running into some issues, trying to do this with the WhiteList class only.

      We have a pattern that uses negative lookahead, like this:

      "test-(?!bad
      b)\\w+"

      So this should select a topic like: "test-good", but exclude a topic like "test-bad", and also exclude a topic without the "test" prefix, like "foo-bar".

      Instead, what we see is a NullPointerException in the call to createMessageStreamsByFilter (after having previously sent a message to "test-good" followed by a message to "test-bad"):

      21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
      kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
      at kafka.utils.Utils$.inLock(Utils.scala:565)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
      Caused by: java.lang.NullPointerException
      at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
      at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
      ... 9 more

      1. apply-patch-1180-to-0.8.1.patch
        2 kB
        Jason Rosenberg
      2. KAFKA-1180_2013-12-22_01:24:57.patch
        2 kB
        Joe Stein
      3. KAFKA-1180_2014-02-09_20:21:49.patch
        4 kB
        Joe Stein
      4. KAFKA-1180_2014-02-13_14:50:40.patch
        9 kB
        Joe Stein
      5. KAFKA-1180_2014-02-13_15:13:17.patch
        5 kB
        Joe Stein
      6. KAFKA-1180_2014-02-13_15:21:51.patch
        4 kB
        Joe Stein
      7. KAFKA-1180_2014-02-13_15:23:28.patch
        4 kB
        Joe Stein
      8. KAFKA-1180.patch
        2 kB
        Joe Stein
      9. KAFKA-1180.patch
        2 kB
        Joe Stein

        Activity

        Jason Rosenberg created issue -
        Jason Rosenberg made changes -
        Field Original Value New Value
        Description We are needing to create a stream selector that essentially combines the logic of the BlackList and WhiteList classes (which is not easily exposed in the high-level consumer api). That is, we want to select a topic that contains a certain prefix, as long as it doesn't also contain a secondary string.

        This should be easy to do with ordinary java Regex's, but we're running into some issues, trying to do this with the WhiteList class only.

        We have a pattern that uses negative lookahead, like this:

        "test-(?!bad\\b)[\\w]+"

        So this should select a topic like: "test-good", but exclude a topic like "test-bad", and also exclude a topic without the "test" prefix, like "foo-bar".

        Instead, what we see is a NullPointerException in the ConsumerIterator, and the consumer just hangs, if we send a message like "test-topic" followed by "test-bad":

        21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
        kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.utils.Utils$.inLock(Utils.scala:565)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
        Caused by: java.lang.NullPointerException
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
        ... 9 more
        We are needing to create a stream selector that essentially combines the logic of the BlackList and WhiteList classes (which is not easily exposed in the high-level consumer api). That is, we want to select a topic that contains a certain prefix, as long as it doesn't also contain a secondary string.

        This should be easy to do with ordinary java Regex's, but we're running into some issues, trying to do this with the WhiteList class only.

        We have a pattern that uses negative lookahead, like this:

        "test-(?!bad\\b)[\\w]+"

        So this should select a topic like: "test-good", but exclude a topic like "test-bad", and also exclude a topic without the "test" prefix, like "foo-bar".

        Instead, what we see is a NullPointerException in the call to createMessageStreamsByFilter:

        21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
        kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.utils.Utils$.inLock(Utils.scala:565)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
        Caused by: java.lang.NullPointerException
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
        ... 9 more
        Jason Rosenberg made changes -
        Description We are needing to create a stream selector that essentially combines the logic of the BlackList and WhiteList classes (which is not easily exposed in the high-level consumer api). That is, we want to select a topic that contains a certain prefix, as long as it doesn't also contain a secondary string.

        This should be easy to do with ordinary java Regex's, but we're running into some issues, trying to do this with the WhiteList class only.

        We have a pattern that uses negative lookahead, like this:

        "test-(?!bad\\b)[\\w]+"

        So this should select a topic like: "test-good", but exclude a topic like "test-bad", and also exclude a topic without the "test" prefix, like "foo-bar".

        Instead, what we see is a NullPointerException in the call to createMessageStreamsByFilter:

        21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
        kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.utils.Utils$.inLock(Utils.scala:565)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
        Caused by: java.lang.NullPointerException
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
        ... 9 more
        We are needing to create a stream selector that essentially combines the logic of the BlackList and WhiteList classes (which is not easily exposed in the high-level consumer api). That is, we want to select a topic that contains a certain prefix, as long as it doesn't also contain a secondary string.

        This should be easy to do with ordinary java Regex's, but we're running into some issues, trying to do this with the WhiteList class only.

        We have a pattern that uses negative lookahead, like this:

        "test-(?!bad\\b)[\\w]+"

        So this should select a topic like: "test-good", but exclude a topic like "test-bad", and also exclude a topic without the "test" prefix, like "foo-bar".

        Instead, what we see is a NullPointerException in the call to createMessageStreamsByFilter (after having previously sent a message to "test-good" followed by a message to "test-bad"):

        21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
        kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.utils.Utils$.inLock(Utils.scala:565)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
        Caused by: java.lang.NullPointerException
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
        ... 9 more
        Joe Stein made changes -
        Attachment KAFKA-1180.patch [ 12619873 ]
        Joe Stein made changes -
        Attachment KAFKA-1180_2013-12-22_01:24:57.patch [ 12620067 ]
        Joe Stein made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Joe Stein made changes -
        Fix Version/s 0.8.1 [ 12322960 ]
        Joe Stein made changes -
        Attachment KAFKA-1180.patch [ 12621898 ]
        Joe Stein made changes -
        Attachment KAFKA-1180_2014-02-09_20:21:49.patch [ 12627907 ]
        Joe Stein made changes -
        Attachment KAFKA-1180_2014-02-13_14:50:40.patch [ 12628831 ]
        Joe Stein made changes -
        Attachment KAFKA-1180_2014-02-13_15:13:17.patch [ 12628836 ]
        Joe Stein made changes -
        Attachment KAFKA-1180_2014-02-13_15:21:51.patch [ 12628839 ]
        Joe Stein made changes -
        Attachment KAFKA-1180_2014-02-13_15:23:28.patch [ 12628840 ]
        Joe Stein made changes -
        Fix Version/s 0.8.2 [ 12326167 ]
        Fix Version/s 0.8.1 [ 12322960 ]
        Jason Rosenberg made changes -
        Attachment apply-patch-1180-to-0.8.1.patch [ 12655482 ]
        Neha Narkhede made changes -
        Assignee Neha Narkhede [ nehanarkhede ] Joel Koshy [ jjkoshy ]
        Joe Stein made changes -
        Assignee Joel Koshy [ jjkoshy ] Joe Stein [ joestein ]
        Joe Stein made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]

          People

          • Assignee:
            Joe Stein
            Reporter:
            Jason Rosenberg
          • Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development