Kafka / KAFKA-888

Problems when shutting down the Java consumer

    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: consumer
    • Environment:
      Linux kacper-pc 3.2.0-40-generic #64-Ubuntu SMP 2013 x86_64 x86_64 x86_64 GNU/Linux, scala 2.9.2

      Description

      I got the following error when shutting down the consumer:

      [ConsumerFetcherThread-test-consumer-group_kacper-pc-1367268338957-12cb5a0b-0-0] INFO kafka.consumer.SimpleConsumer - Reconnect due to socket error:
      java.nio.channels.ClosedByInterruptException: null
      at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.7.0_21]
      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:386) ~[na:1.7.0_21]
      at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) ~[na:1.7.0_21]
      at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.7.0_21]
      at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.7.0_21]
      at kafka.utils.Utils$.read(Utils.scala:394) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.Receive$class.readCompletely(Transmission.scala:56) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:75) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]

      And this is how I create my consumer:

      public Boolean call() throws Exception {
          // Request a single stream for the topic.
          Map<String, Integer> topicCountMap = new HashMap<>();
          topicCountMap.put(topic, 1);
          Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                  consumer.createMessageStreams(topicCountMap);
          KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
          ConsumerIterator<byte[], byte[]> it = stream.iterator();
          // Block until the first message arrives, then commit and shut down.
          it.next();
          LOGGER.info("Received the message. Shutting down");
          consumer.commitOffsets();
          consumer.shutdown();
          return true;
      }

        Activity

        Jun Rao added a comment -

        This logging itself is OK. We interrupt the fetcher thread when shutting down the consumer. Other than the logging, did your consumer shut down? Also, normally you iterate over and consume messages in a separate thread. See the example at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
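
        To illustrate why interrupting a fetcher thread can produce the log entry in the description: when a thread blocked in a java.nio channel read is interrupted, the JDK closes the channel and raises ClosedByInterruptException in that thread. The sketch below reproduces this with JDK classes only. It is not Kafka code, and the names InterruptedReadDemo, interruptBlockedRead, and stand-in-fetcher are hypothetical, chosen for illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptedReadDemo {

    /**
     * Interrupts a thread that reads from a socket channel on which no data
     * ever arrives, and returns the class name of the exception it observed.
     */
    public static String interruptBlockedRead() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Bind to an ephemeral port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {
                AtomicReference<String> seen = new AtomicReference<>("none");
                CountDownLatch aboutToRead = new CountDownLatch(1);

                // Hypothetical stand-in for a fetcher thread: it blocks in read()
                // because the peer never writes anything.
                Thread fetcher = new Thread(() -> {
                    try {
                        aboutToRead.countDown();
                        client.read(ByteBuffer.allocate(16));
                        seen.set("read returned");
                    } catch (ClosedByInterruptException e) {
                        seen.set(e.getClass().getName());
                    } catch (IOException e) {
                        seen.set("other: " + e);
                    }
                }, "stand-in-fetcher");

                fetcher.start();
                aboutToRead.await();
                // Analogous to a shutdown path interrupting the fetcher thread.
                fetcher.interrupt();
                fetcher.join();
                return seen.get();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(interruptBlockedRead());
    }
}
```

        The read fails with ClosedByInterruptException whether the interrupt arrives just before the read starts or while it is blocked, so in this sketch the exception follows from the interrupt rather than from a network fault.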


          People

          • Assignee: Neha Narkhede
          • Reporter: kacper chwialkowski
          • Votes: 0
          • Watchers: 2
