Index: core/src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1309618) +++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -17,13 +17,13 @@ package kafka.server -import java.util.concurrent._ -import java.util.concurrent.atomic._ import java.io.File import kafka.network.{SocketServerStats, SocketServer} import kafka.log.LogManager import kafka.utils._ import kafka.cluster.Replica +import java.util.concurrent._ +import atomic.AtomicBoolean /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required @@ -32,8 +32,8 @@ class KafkaServer(val config: KafkaConfig) extends Logging { val CleanShutdownFile = ".kafka_cleanshutdown" - private val isShuttingDown = new AtomicBoolean(false) - private val shutdownLatch = new CountDownLatch(1) + private var isServerDown = new AtomicBoolean(true) + private var shutdownLatch = new CountDownLatch(1) private val statsMBeanName = "kafka:type=kafka.SocketServerStats" var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null @@ -47,6 +47,13 @@ */ def startup() { info("Starting Kafka server...") + val canStart = isServerDown.compareAndSet(true, false) + if (!canStart) { + info("kafka server already started") + return + } + + shutdownLatch = new CountDownLatch(1) var needRecovery = true val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile) if (cleanShutDownFile.exists) { @@ -90,7 +97,7 @@ * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread */ def shutdown() { - val canShutdown = isShuttingDown.compareAndSet(false, true); + val canShutdown = isServerDown.compareAndSet(false, true); if (canShutdown) { info("Shutting down Kafka server with id " + config.brokerId) kafkaZookeeper.close