Index: core/src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1208953) +++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -47,42 +47,35 @@ * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers */ def startup() { - try { - info("Starting Kafka server...") - var needRecovery = true - val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) - if (cleanShutDownFile.exists) { - needRecovery = false - cleanShutDownFile.delete - } - logManager = new LogManager(config, - scheduler, - SystemTime, - 1000L * 60 * config.logCleanupIntervalMinutes, - 1000L * 60 * 60 * config.logRetentionHours, - needRecovery) - - val handlers = new KafkaRequestHandlers(logManager) - socketServer = new SocketServer(config.port, - config.numThreads, - config.monitoringPeriodSecs, - handlers.handlerFor, - config.maxSocketRequestSize) - Utils.registerMBean(socketServer.stats, statsMBeanName) - socketServer.startup - Mx4jLoader.maybeLoad - /** - * Registers this broker in ZK. After this, consumers can connect to broker. - * So this should happen after socket server start. - */ - logManager.startup - info("Server started.") + info("Starting Kafka server...") + var needRecovery = true + val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) + if (cleanShutDownFile.exists) { + needRecovery = false + cleanShutDownFile.delete } - catch { - case e => - fatal("Fatal error during startup.", e) - shutdown - } + logManager = new LogManager(config, + scheduler, + SystemTime, + 1000L * 60 * config.logCleanupIntervalMinutes, + 1000L * 60 * 60 * config.logRetentionHours, + needRecovery) + + val handlers = new KafkaRequestHandlers(logManager) + socketServer = new SocketServer(config.port, + config.numThreads, + config.monitoringPeriodSecs, + handlers.handlerFor, + config.maxSocketRequestSize) + Utils.registerMBean(socketServer.stats, statsMBeanName) + socketServer.startup() + Mx4jLoader.maybeLoad + /** + * Registers this broker in ZK. After this, consumers can connect to broker. + * So this should happen after socket server start. + */ + logManager.startup() + info("Kafka server started.") } /** @@ -92,25 +85,19 @@ def shutdown() { val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { - info("Shutting down...") - try { - scheduler.shutdown() - if (socketServer != null) - socketServer.shutdown() - Utils.unregisterMBean(statsMBeanName) - if (logManager != null) - logManager.close() + info("Shutting down Kafka server") + scheduler.shutdown() + if (socketServer != null) + socketServer.shutdown() + Utils.unregisterMBean(statsMBeanName) + if (logManager != null) + logManager.close() - val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) - cleanShutDownFile.createNewFile - } - catch { - case e => - fatal(e) - fatal(Utils.stackTrace(e)) - } + val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) + cleanShutDownFile.createNewFile + shutdownLatch.countDown() - info("shut down completed") + info("Kafka server shut down completed") } } Index: core/src/main/scala/kafka/server/KafkaServerStartable.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServerStartable.scala (revision 1208953) +++ core/src/main/scala/kafka/server/KafkaServerStartable.scala (working copy) @@ -27,7 +27,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig, val consumerConfig: ConsumerConfig, - val producerConfig: ProducerConfig) { + val producerConfig: ProducerConfig) extends Logging { private var server : KafkaServer = null private var embeddedConsumer : EmbeddedConsumer = null @@ -43,15 +43,29 @@ } def startup() { - server.startup - if (embeddedConsumer != null) - embeddedConsumer.startup + try { + server.startup() + if (embeddedConsumer != null) + embeddedConsumer.startup() + } + catch { + case e => + fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e) + shutdown() + } } def shutdown() { - if (embeddedConsumer != null) - embeddedConsumer.shutdown - server.shutdown + try { + if (embeddedConsumer != null) + embeddedConsumer.shutdown() + server.shutdown() + } + catch { + case e => + fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e) + Runtime.getRuntime.halt(1) + } } def awaitShutdown() {