  1. Kafka
  2. KAFKA-2585

ConsoleConsumer should not hang infinitely upon exception


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0.0
    • Component/s: None
    • Labels: None

    Description

      Due to an incompatibility between Java 1.7 and Java 1.8, if Kafka ConsoleConsumer is compiled with Java 1.8 but run on Java 1.7, and consumer.timeout.ms is set in consumer.properties, then when the consumer times out its shutdown path throws the NoSuchMethodError shown in the log below, and the process hangs indefinitely.

      This causes problems for, e.g., the Ducktape system tests, which currently run the consumer with Java 1.7 and wait for it to time out. With this bug, Ducktape hangs for a long time.

      To prevent ConsoleConsumer from hanging on such an exception, we can wrap run() in a try/catch and call System.exit(0) at the end of main() in ConsoleConsumer.
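
The proposed fix can be illustrated with a minimal Java sketch. It is not the actual ConsoleConsumer patch (ConsoleConsumer is written in Scala): `ConsoleToolSketch`, `runGuarded`, and the `leftover` thread are hypothetical names, and the sketch assumes the hang comes from a non-daemon thread that is still alive after shutdown fails, which the report does not state explicitly.

```java
import java.util.concurrent.CountDownLatch;

final class ConsoleToolSketch {

    /** Runs the tool body; returns false instead of propagating any failure. */
    static boolean runGuarded(Runnable body) {
        try {
            body.run();
            return true;
        } catch (Throwable t) {
            // Catch Throwable, not Exception: NoSuchMethodError is an Error.
            System.err.println("Error while running, exiting anyway: " + t);
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: a non-daemon thread left behind by a failed shutdown
        // keeps the JVM alive after main() returns or throws.
        Thread leftover = new Thread(() -> {
            try {
                new CountDownLatch(1).await(); // blocks forever
            } catch (InterruptedException ignored) {
                // nothing to clean up in this sketch
            }
        });
        leftover.start();

        // Stand-in for run(): simulate the failure seen during shutdown.
        runGuarded(() -> {
            throw new NoSuchMethodError("simulated failure during shutdown");
        });

        // Without this call the JVM would keep waiting on 'leftover'.
        System.exit(0);
    }
}
```

The explicit System.exit(0) at the end of main() is what ends the process here; the try/catch only ensures that the call is always reached.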

      [2015-09-28 05:41:31,499] ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$)
      kafka.consumer.ConsumerTimeoutException
      at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
      at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
      at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
      at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
      at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:38)
      at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
      at kafka.consumer.OldConsumer.receive(BaseConsumer.scala:70)
      at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:94)
      at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:57)
      at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:41)
      at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
      Processed a total of 78 messages
      Exception in thread "main" java.lang.NoSuchMethodError: java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
      at kafka.utils.Pool.keys(Pool.scala:77)
      at kafka.consumer.FetchRequestAndResponseStatsRegistry$.removeConsumerFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:69)
      at kafka.metrics.KafkaMetricsGroup$.removeAllConsumerMetrics(KafkaMetricsGroup.scala:189)
      at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:202)
      at kafka.consumer.OldConsumer.stop(BaseConsumer.scala:75)
      at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:98)
      at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:57)
      at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:41)
      at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
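
The NoSuchMethodError in the log names the method descriptor `ConcurrentHashMap.keySet()` returning `ConcurrentHashMap$KeySetView`, a return type that exists in Java 1.8 but not in Java 1.7. The sketch below shows one way code can avoid binding to that descriptor: calling `keySet()` through the `Map` interface. It is an illustration only, not the fix adopted in this issue; `KeySetBindingSketch` and `keysViaInterface` are hypothetical names, and how kafka.utils.Pool holds its map internally is an assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class KeySetBindingSketch {

    // The receiver's static type is Map, so the compiler emits a call to
    // Map.keySet() returning java.util.Set, which exists on both 1.7 and 1.8.
    // With a ConcurrentHashMap-typed receiver compiled against Java 1.8, the
    // emitted call would instead reference the KeySetView return type.
    static Set<String> keysViaInterface(Map<String, Integer> pool) {
        return pool.keySet();
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> pool = new ConcurrentHashMap<>();
        pool.put("consumer-1", 1);
        System.out.println(keysViaInterface(pool));
    }
}
```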

          People

            Assignee: Dong Lin (lindong)
            Reporter: Dong Lin (lindong)
            Votes: 0
            Watchers: 3
