Description
I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the following, all embedded in the same java process:
– spins up a zk instance
– spins up a kafka server using a fresh log directory
– creates a producer and sends a message
– creates a high-level consumer and verifies that it can consume the message
– shuts down the consumer
– stops the kafka server
– stops zk
The test seems to be working fine now, however, I consistently see the following exception, when the consumer connector is shutdown:
1699 [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683] WARN kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: group1-ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test-topic,0] -> PartitionFetchInfo(1,1048576)
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
1721 [Thread-12] INFO com.squareup.kafka.server.KafkaServer - Shutting down KafkaServer
2030 [main] INFO com.squareup.kafka.server.KafkaServer - Shut down complete for KafkaServer
Disconnected from the target VM, address: '127.0.0.1:49243', transport: 'socket'
It would be great if instead, something meaningful was logged, like:
"Consumer connector has been shutdown"