Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Description
Currently, the BrokerProxy stop method only interrupts the BrokerProxy thread and does not stop the simple consumer. This causes the following exception when we stop the KafkaSystemConsumer:
2015-08-04 14:00:34 CoordinatorStreamSystemConsumer [INFO] Stopping coordinator stream system consumer.
2015-08-04 14:00:34 BrokerProxy [INFO] Shutting down BrokerProxy for 10.151.111.27:9092
2015-08-04 14:00:34 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
2015-08-04 14:00:34 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.
2015-08-04 14:00:34 BrokerProxy [DEBUG] Exception detail:
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
    at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
    at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
    at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:176)
    at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146)
    at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133)
    at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
    at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132)
    at java.lang.Thread.run(Thread.java:745)
2015-08-04 14:00:34 BrokerProxy [DEBUG] Removed [__samza_coordinator_simple-task0715_1,0]
2015-08-04 14:00:34 KafkaSystemConsumer [INFO] Abdicating for [__samza_coordinator_simple-task0715_1,0]
2015-08-04 14:00:34 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([__samza_coordinator_simple-task0715_1,0] -> 931)
2015-08-04 14:00:34 BrokerProxy [INFO] Shutting down due to interrupt.
We should stop the simple consumer first, then interrupt the BrokerProxy.
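To illustrate the proposed ordering, here is a minimal, self-contained Java sketch. It is not the Samza code: StubConsumer and StubBrokerProxy are hypothetical stand-ins for the simple consumer and BrokerProxy, and the sleep call only simulates a blocking fetch. The point is the order inside stop(): the consumer is closed before the thread is interrupted, so the fetch loop can see the closed state and exit instead of trying to reconnect.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ShutdownOrderSketch {

    /** Hypothetical stand-in for the simple consumer (not the Kafka class). */
    static final class StubConsumer {
        private volatile boolean closed = false;

        void close() { closed = true; }

        boolean isClosed() { return closed; }
    }

    /** Hypothetical stand-in for BrokerProxy: one fetch thread plus one consumer. */
    static final class StubBrokerProxy {
        // Thread-safe list so both the caller and the fetch thread can record events.
        final List<String> events = new CopyOnWriteArrayList<>();
        private final StubConsumer consumer = new StubConsumer();
        private final Thread thread = new Thread(this::fetchLoop, "stub-broker-proxy");

        void start() { thread.start(); }

        private void fetchLoop() {
            try {
                // Keep "fetching" until the consumer has been closed.
                while (!consumer.isClosed()) {
                    Thread.sleep(10); // simulates a blocking fetch
                }
                events.add("loop exited: consumer closed");
            } catch (InterruptedException e) {
                events.add("loop exited: interrupted");
            }
        }

        /** Proposed order: stop the consumer first, then interrupt the thread. */
        void stop() {
            consumer.close();
            events.add("consumer closed");
            thread.interrupt();
            events.add("thread interrupted");
            try {
                thread.join(1000); // wait briefly for the fetch thread to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            }
        }

        boolean isAlive() { return thread.isAlive(); }
    }

    public static void main(String[] args) {
        StubBrokerProxy proxy = new StubBrokerProxy();
        proxy.start();
        proxy.stop();
        System.out.println(proxy.events);
    }
}
```

Whether the loop exits through the closed check or through the interrupt depends on timing, but in both cases it exits after the consumer has been closed, which is the property the description asks for.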