Kafka
  1. Kafka
  2. KAFKA-888

problems when shutting down the java consumer .

    Details

    • Type: Bug Bug
    • Status: Open
    • Priority: Minor Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: consumer
    • Environment:
      Linux kacper-pc 3.2.0-40-generic #64-Ubuntu SMP 2013 x86_64 x86_64 x86_64 GNU/Linux, scala 2.9.2

      Description

      I got the following error when shutting down the consumer :

      ConsumerFetcherThread-test-consumer-group_kacper-pc-1367268338957-12cb5a0b-0-0] INFO kafka.consumer.SimpleConsumer - Reconnect due to socket error:
      java.nio.channels.ClosedByInterruptException: null
      at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.7.0_21]
      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:386) ~[na:1.7.0_21]
      at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) ~[na:1.7.0_21]
      at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.7.0_21]
      at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.7.0_21]
      at kafka.utils.Utils$.read(Utils.scala:394) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.Receive$class.readCompletely(Transmission.scala:56) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:75) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]

      and this is how I create my Consumer

      public Boolean call() throws Exception

      { Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); it.next(); LOGGER.info("Received the message. Shutting down"); consumer.commitOffsets(); consumer.shutdown(); return true; }

        Activity

          People

          • Assignee:
            Neha Narkhede
            Reporter:
            kacper chwialkowski
          • Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

            • Created:
              Updated:

              Development