KAFKA-888

Problems when shutting down the Java consumer

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Cannot Reproduce
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: consumer
    • Environment:
      Linux kacper-pc 3.2.0-40-generic #64-Ubuntu SMP 2013 x86_64 x86_64 x86_64 GNU/Linux, scala 2.9.2

      Description

      I got the following error when shutting down the consumer:

      [ConsumerFetcherThread-test-consumer-group_kacper-pc-1367268338957-12cb5a0b-0-0] INFO kafka.consumer.SimpleConsumer - Reconnect due to socket error:
      java.nio.channels.ClosedByInterruptException: null
      at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.7.0_21]
      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:386) ~[na:1.7.0_21]
      at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) ~[na:1.7.0_21]
      at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.7.0_21]
      at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.7.0_21]
      at kafka.utils.Utils$.read(Utils.scala:394) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.Receive$class.readCompletely(Transmission.scala:56) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:75) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]

      and this is how I create and use my consumer:

      public Boolean call() throws Exception {
          Map<String, Integer> topicCountMap = new HashMap<>();
          topicCountMap.put(topic, new Integer(1));
          Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
          KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
          ConsumerIterator<byte[], byte[]> it = stream.iterator();
          it.next();
          LOGGER.info("Received the message. Shutting down");
          consumer.commitOffsets();
          consumer.shutdown();
          return true;
      }
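      The consumer connector referenced above is not shown being created; a minimal sketch of how the 0.8 high-level (ZooKeeper-based) consumer is typically constructed follows, with the ZooKeeper address and property values as illustrative placeholders rather than values from this report:

      import java.util.Properties;
      import kafka.consumer.Consumer;
      import kafka.consumer.ConsumerConfig;
      import kafka.javaapi.consumer.ConsumerConnector;

      // Sketch only: the connection settings below are placeholders, not values from this report.
      private ConsumerConnector createConsumer() {
          Properties props = new Properties();
          props.put("zookeeper.connect", "localhost:2181");   // assumed ZooKeeper address
          props.put("group.id", "test-consumer-group");       // group id as it appears in the log above
          props.put("auto.commit.enable", "false");           // offsets committed manually via commitOffsets()
          return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
      }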

        Activity

        omkreddy Manikumar added a comment -

        Please reopen if you think the issue still exists.

        junrao Jun Rao added a comment -

        This logging itself is okay; we interrupt the fetcher thread when shutting down the consumer. Other than the logging, was your consumer able to shut down? Also, you would normally iterate and consume messages in a separate thread. See the example in https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

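        For reference, a minimal sketch of the pattern Jun describes (adapted from the Consumer Group Example linked above, with illustrative names): the stream is iterated on a worker thread, and shutdown is triggered from the main thread.

        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.TimeUnit;
        import kafka.consumer.ConsumerIterator;
        import kafka.consumer.KafkaStream;
        import kafka.javaapi.consumer.ConsumerConnector;

        // Sketch only: consumer and topic are assumed to be set up as in the description above.
        public void consumeThenShutdown(final ConsumerConnector consumer, String topic) throws InterruptedException {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
            final KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);

            // Iterate and consume on a separate thread, as recommended.
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.submit(new Runnable() {
                public void run() {
                    // hasNext() blocks until a message arrives or the connector is shut down.
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        byte[] message = it.next().message();
                        // process the message here
                    }
                }
            });

            // From the main thread: stop the fetcher threads, then stop the worker.
            consumer.shutdown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }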

          People

          • Assignee: Neha Narkhede (nehanarkhede)
          • Reporter: kacper chwialkowski (kacper)
          • Votes: 0
          • Watchers: 3
