Apache NiFi / NIFI-3363

PutKafka throws NullPointerException when User-Defined partition strategy is used

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.0, 0.4.0, 0.5.0, 0.6.0, 0.4.1, 0.5.1, 0.7.0, 0.6.1, 1.1.0, 0.7.1, 1.1.1, 1.0.1
    • Fix Version/s: 0.8.0, 1.2.0, 0.7.3
    • Component/s: Extensions
    • Labels: None

    Description

      A NullPointerException is thrown because PutKafka tries to put null into the properties: the following if statements don't cover USER_DEFINED_PARTITIONING, so partitionerClass stays null.

      PutKafka.java, buildKafkaConfigProperties:
      String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
      String partitionerClass = null;
      if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
          partitionerClass = Partitioners.RoundRobinPartitioner.class.getName();
      } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
          partitionerClass = Partitioners.RandomPartitioner.class.getName();
      }
      properties.setProperty("partitioner.class", partitionerClass); // NullPointerException is thrown here when partitionerClass is null
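
      The failure can be reproduced outside NiFi. The following is a minimal sketch, not code from PutKafka: the class name NullPropertyDemo and its method are made up for illustration. It only shows that java.util.Properties rejects a null value, which is what happens when neither branch above assigns partitionerClass.

```java
import java.util.Properties;

public class NullPropertyDemo {

    /** Returns true if setting a null value on Properties raises a NullPointerException. */
    static boolean setPropertyRejectsNull() {
        Properties properties = new Properties();
        // Mirrors the state PutKafka reaches for the user-defined strategy:
        // no branch matched, so the variable is still null.
        String partitionerClass = null;
        try {
            properties.setProperty("partitioner.class", partitionerClass);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE on null value: " + setPropertyRejectsNull());
    }
}
```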
      

      A naive fix would be to add one more if statement so that the 'partitioner.class' property is set only when partitionerClass is non-null.
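
      As a rough sketch of that naive guard (not the change that was eventually made): the class NaiveGuardSketch, the strategy strings and the partitioner class names below are hypothetical stand-ins, since the real PutKafka constants are processor property values and the real partitioners live in the NiFi Kafka bundle.

```java
import java.util.Properties;

public class NaiveGuardSketch {

    // Hypothetical stand-ins for the strategy values used by PutKafka.
    static final String ROUND_ROBIN = "Round Robin";
    static final String RANDOM = "Random";
    static final String USER_DEFINED = "User-Defined";

    /** Builds producer properties, setting 'partitioner.class' only when a class was chosen. */
    static Properties build(String partitionStrategy) {
        Properties properties = new Properties();
        String partitionerClass = null;
        if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN)) {
            partitionerClass = "example.RoundRobinPartitioner"; // placeholder class name
        } else if (partitionStrategy.equalsIgnoreCase(RANDOM)) {
            partitionerClass = "example.RandomPartitioner"; // placeholder class name
        }
        // The extra guard: skip the property entirely for the user-defined strategy.
        if (partitionerClass != null) {
            properties.setProperty("partitioner.class", partitionerClass);
        }
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(build(USER_DEFINED).containsKey("partitioner.class"));
        System.out.println(build(RANDOM).getProperty("partitioner.class"));
    }
}
```

      The guard avoids the exception, but it still leaves a property in place for the other two strategies, which is why the rest of this description questions whether the property should be set at all.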

      However, while testing that fix, I found the following facts, which showed that this approach would not be the right solution for this issue.

      In short, we don't have to set the 'partitioner.class' property with the Kafka 0.8.x client in the first place. I assume it is still there because of PutKafka's long history.

      PutKafka history analysis

      • PutKafka used to cover both Kafka 0.8 and 0.9.
      • Around the time Kafka 0.9 was released, PutKafka added 'partitioner.class' via NIFI-1097 (start using the new API). There were two client libraries, kafka-clients and kafka_2.9.1, both at version 0.8.2.2.
      • Then PublishKafka was added for Kafka 0.9. At that point we could have added a 'partition' property to PublishKafka, but for some reason we did not. PublishKafka doesn't support user-defined partitions as of this writing. See NIFI-1296.
      • The code adding 'partitioner.class' was left in PutKafka.
      • Later, we split the NAR into 0.8, 0.9 and 0.10 versions.
      • Now only PutKafka (0.8) sets the 'partitioner.class' property, but the 0.8 client doesn't use that property, so we don't need that code at all.

      Then, how should we fix this?

      Since PutKafka in both the master and 0.x branches specifically uses the Kafka 0.8.x client, we can simply remove the code that adds 'partitioner.class', and probably the PARTITION_STRATEGY processor property, too.

      Expected result after fix

      • Users can specify the Kafka partition with the PutKafka 'partition' property, with no need to specify 'partition strategy', and NullPointerException is no longer thrown.
      • A warning that used to be written to nifi-app.log is no longer logged:
        2017-01-18 13:53:33,071 WARN [Timer-Driven Process Thread-9] o.a.k.clients.producer.ProducerConfig The configuration partitioner.class = null was supplied but isn't a known config.

          People

            ijokarumawak Koji Kawamura