Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-2790

Flume Kafka NullPointerException while trying to connect to Kafka topic as source (consumer.ConsumerFetcherManager LeaderFinderThread Failed to find leader for Set)

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Blocker
    • Resolution: Later
    • 1.6.0
    • None
    • Sinks+Sources
    • None
    • HDP 2.3 fully kerberized including Kafka 0.8.2.2, using either the HDP 2.3 version of Flume 1.5.2 or Apache Flume 1.6 downloaded from apache.org

    Description

      I'm getting the following NullPointerException when trying to integrate Flume with Kafka as a source:

      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared all relevant queues for this fetcher
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared the data chunks in all the consumer message iterators
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Committing all offsets after clearing the fetcher queues
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Releasing partition ownership
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 rebalancing the following partitions: ArrayBuffer(0, 1) for topic <custom_scrubbed> with consumers: List(flume_<custom_scrubbed>-1441895965442-55ac2e21-0)
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 0
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 1
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 1 for topic <custom_scrubbed>
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 0 for topic <custom_scrubbed>
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Updating the cache
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 selected partitions : <custom_scrubbed>:0: fetched offset = -1: consumed offset = -1,<custom_scrubbed>:1: fetched offset = -1: consumed offset = -1
      15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Starting
      15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], end rebalancing consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 try #0
      15/09/10 14:39:25 INFO kafka.KafkaSource: Kafka source s1 started.
      15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
      15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
      15/09/10 14:39:25 INFO utils.VerifiableProperties: Verifying properties
      15/09/10 14:39:25 INFO utils.VerifiableProperties: Property client.id is overridden to flume
      15/09/10 14:39:25 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
      15/09/10 14:39:25 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
      15/09/10 14:39:25 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
      java.lang.NullPointerException
              at java.util.Hashtable.put(Hashtable.java:514)
              at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
              at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
              at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
              at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
      15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
      15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
      15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
      15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
      15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
      15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
      java.lang.NullPointerException
              at java.util.Hashtable.put(Hashtable.java:514)
              at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
              at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
              at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
              at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
      15/09/10 14:39:26 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
      15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
      15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
      15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
      15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
      15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
      java.lang.NullPointerException
      ...
      <repeats>
      

      There is a single kafka broker which definitely has a leader and ISR as seen here:

      Topic:<custom_scrubbed>  PartitionCount:2        ReplicationFactor:1     Configs:
              Topic: <custom_scrubbed> Partition: 0    Leader: 0       Replicas: 0     Isr: 0
              Topic: <custom_scrubbed> Partition: 1    Leader: 0       Replicas: 0     Isr: 0
      

      The relevant bit of Flume config is:

      a1.sources = s1
      a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
      a1.sources.s1.channels = c1
      a1.sources.s1.zookeeperConnect = myfqdn:2181
      a1.sources.s1.topic = mytopic
      a1.sources.s1.groupId = flume
      a1.sources.s1.kafka.consumer.timeout.ms = 100
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            harisekhon Hari Sekhon
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: