Index: core/src/main/scala/kafka/server/KafkaServerStartable.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServerStartable.scala (revision 1213769) +++ core/src/main/scala/kafka/server/KafkaServerStartable.scala (working copy) @@ -39,7 +39,7 @@ server = new KafkaServer(serverConfig) if (consumerConfig != null) embeddedConsumer = - new EmbeddedConsumer(consumerConfig, producerConfig, server) + new EmbeddedConsumer(consumerConfig, producerConfig, this) } def startup() { @@ -75,7 +75,7 @@ class EmbeddedConsumer(private val consumerConfig: ConsumerConfig, private val producerConfig: ProducerConfig, - private val kafkaServer: KafkaServer) extends TopicEventHandler[String] with Logging { + private val kafkaServerStartable: KafkaServerStartable) extends TopicEventHandler[String] with Logging { private val whiteListTopics = consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim) @@ -160,7 +160,8 @@ } def startup() { - topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this) + info("staring up embedded consumer") + topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this, kafkaServerStartable) /* * consumer threads are (re-)started upon topic events (which includes an * initial startup event which lists the current topics) Index: core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala =================================================================== --- core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala (revision 0) +++ core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala (revision 0) @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Thrown when a request is made for broker but no brokers with that topic + * exist. + */ +class ConsumerRebalanceFailedException(message: String) extends RuntimeException(message) { + def this() = this(null) +} \ No newline at end of file Index: core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (revision 1213769) +++ core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (working copy) @@ -21,9 +21,11 @@ import kafka.utils.{Utils, ZkUtils, ZKStringSerializer, Logging} import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState +import kafka.server.KafkaServerStartable +import kafka.common.ConsumerRebalanceFailedException class ZookeeperTopicEventWatcher(val config:ConsumerConfig, - val eventHandler: TopicEventHandler[String]) extends Logging { + val eventHandler: TopicEventHandler[String], kafkaServerStartable: KafkaServerStartable) extends Logging { val lock = new Object() @@ -33,7 +35,7 @@ startWatchingTopicEvents() private def startWatchingTopicEvents() { - val topicEventListener = new ZkTopicEventListener + val topicEventListener = new ZkTopicEventListener(kafkaServerStartable) ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath) zkClient.subscribeStateChanges( @@ -50,24 +52,17 @@ def shutdown() { lock.synchronized { - try { - if (zkClient != null) { - stopWatchingTopicEvents() - zkClient.close() - zkClient = null - } - else - warn("Cannot shutdown already shutdown topic event watcher.") + if (zkClient != null) { + stopWatchingTopicEvents() + zkClient.close() + zkClient = null } - catch { - case e => - fatal(e) - fatal(Utils.stackTrace(e)) - } + else + warn("Cannot shutdown already shutdown topic event watcher.") } } - class ZkTopicEventListener() extends IZkChildListener { + class ZkTopicEventListener(val kafkaServerStartable: KafkaServerStartable) extends IZkChildListener { @throws(classOf[Exception]) def handleChildChange(parent: String, children: java.util.List[String]) { @@ -81,9 +76,11 @@ } } catch { + case e: ConsumerRebalanceFailedException => + fatal("can't rebalance in embedded consumer; proceed to shutdown", e) + kafkaServerStartable.shutdown() case e => - fatal(e) - fatal(Utils.stackTrace(e)) + error("error in handling child changes in embedded consumer", e) } } } Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1213769) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -29,7 +29,7 @@ import kafka.api.OffsetRequest import java.util.UUID import kafka.serializer.Decoder -import kafka.common.InvalidConfigException +import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException} /** * This class handles the consumers interaction with zookeeper @@ -446,7 +446,7 @@ } } - throw new RuntimeException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries") + throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries") } private def rebalance(): Boolean = {