From 22296ed11a383f02824f7e955d76456dad989397 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 20 Feb 2015 23:50:37 -0800 Subject: [PATCH] KAFKA-1887. controller error message on shutting the last broker. --- core/src/main/scala/kafka/server/KafkaServer.scala | 14 ++++++-------- .../kafka/api/ProducerFailureHandlingTest.scala | 7 ++++--- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7e5ddcb..5d4acfb 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -133,6 +133,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start offset manager */ offsetManager = createOffsetManager() + /* tell everyone we are alive */ + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck.startup() + /* start kafka controller */ kafkaController = new KafkaController(config, zkClient, brokerState) kafkaController.startup() @@ -152,10 +156,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager = new TopicConfigManager(zkClient, logManager) topicConfigManager.startup() - /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) - kafkaHealthcheck.startup() - /* register broker metrics */ registerStats() @@ -310,6 +310,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if (canShutdown) { Utils.swallow(controlledShutdown()) brokerState.newState(BrokerShuttingDown) + if(kafkaController != null) + Utils.swallow(kafkaController.shutdown()) if(kafkaHealthcheck != null) Utils.swallow(kafkaHealthcheck.shutdown()) if(socketServer != null) @@ -327,13 +329,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(logManager.shutdown()) if(consumerCoordinator != null) Utils.swallow(consumerCoordinator.shutdown()) - if(kafkaController != null) - Utils.swallow(kafkaController.shutdown()) if(zkClient != null) Utils.swallow(zkClient.close()) - brokerState.newState(NotRunning) - startupComplete.set(false) isShuttingDown.set(false) shutdownLatch.countDown() diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index ba48a63..19a2fd0 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -32,7 +32,7 @@ import kafka.integration.KafkaServerTestHarness import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils} import org.apache.kafka.common.KafkaException -import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException} +import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException} import org.apache.kafka.clients.producer._ class ProducerFailureHandlingTest extends KafkaServerTestHarness { @@ -348,8 +348,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas") } catch { case e: ExecutionException => - if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) { - fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas") + if (!e.getCause.isInstanceOf[NotEnoughReplicasException] && + !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException]) { + fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas ") } } -- 1.9.3 (Apple Git-50)