diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 933de9d..2cf7d30 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -28,7 +28,7 @@ import kafka.cluster.Broker import kafka.common._ import kafka.log.LogConfig import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} -import kafka.server.{ZookeeperLeaderElector, KafkaConfig} +import kafka.server._ import kafka.utils.ZkUtils._ import kafka.utils._ import kafka.utils.Utils._ @@ -39,6 +39,13 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock import scala.Some import kafka.common.TopicAndPartition +import scala.Some +import kafka.common.TopicAndPartition +import scala.None +import kafka.controller.ReassignedPartitionsContext +import kafka.controller.PartitionAndReplica +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.server.BrokerState class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { @@ -154,7 +161,7 @@ object KafkaController extends Logging { } } -class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger @@ -316,6 +323,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) Utils.registerMBean(this, KafkaController.MBeanName) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) + 
brokerState.newState(RunningAsController) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() /* send partition leadership info to all live brokers */ @@ -349,6 +357,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null } + brokerState.newState(RunningAsBroker) } } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b7bc5ff..1d30a69 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -175,6 +175,7 @@ class Log(val dir: File, return } + // okay we need to actually recover this log val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator while(unflushed.hasNext) { diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index ac67f08..1e77b68 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} -import kafka.server.OffsetCheckpoint +import kafka.server.{RecoveringFromUncleanShutdown, BrokerStates, BrokerState, OffsetCheckpoint} /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. @@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File], val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler, + val brokerState: BrokerState, private val time: Time) extends Logging { val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" @@ -110,6 +111,9 @@ class LogManager(val logDirs: Array[File], val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info("Found clean shutdown file. 
Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) + else + brokerState.newState(RecoveringFromUncleanShutdown) + for(dir <- subDirs) { if(dir.isDirectory) { info("Loading log '" + dir.getName + "'") diff --git a/core/src/main/scala/kafka/server/BrokerStates.scala b/core/src/main/scala/kafka/server/BrokerStates.scala new file mode 100644 index 0000000..e6ee77e --- /dev/null +++ b/core/src/main/scala/kafka/server/BrokerStates.scala @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +/** + * Broker states are the possible states that a kafka broker can be in. + * A broker should be only in one state at a time. 
+ * The expected state transition with the following defined states is: + * + * +-----------+ + * |Not Running| + * +-----+-----+ + * | + * v + * +-----+-----+ + * |Starting +--+ + * +-----+-----+ | +----+------------+ + * | +>+RecoveringFrom | + * v |UncleanShutdown | + * +----------+ +-----+-----+ +-------+---------+ + * |RunningAs | |RunningAs | | + * |Controller+<--->+Broker +<-----------+ + * +----------+ +-----+-----+ + * | | + * | v + * | +-----+------------+ + * |-----> |PendingControlled | + * |Shutdown | + * +-----+------------+ + * | + * v + * +-----+----------+ + * |BrokerShutting | + * |Down | + * +-----+----------+ + * | + * v + * +-----+-----+ + * |Not Running| + * +-----------+ + * + * Custom states are also allowed for cases where there are custom kafka states for different scenarios. + */ +sealed trait BrokerStates { def state: Byte } +case object NotRunning extends BrokerStates { val state: Byte = 0 } +case object Starting extends BrokerStates { val state: Byte = 1 } +case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 } +case object RunningAsBroker extends BrokerStates { val state: Byte = 3 } +case object RunningAsController extends BrokerStates { val state: Byte = 4 } +case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 } +case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 } + + +case class BrokerState() { + @volatile var currentState: Byte = NotRunning.state + + def newState(newState: BrokerStates) { + this.newState(newState.state) + } + + // Allowing undefined custom state + def newState(newState: Byte) { + currentState = newState + } +} diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c208f83..c22e51e 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -31,16 +31,19 @@ import kafka.cluster.Broker import
kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest} import kafka.common.ErrorMapping import kafka.network.{Receive, BlockingChannel, SocketServer} +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. */ -class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging { +class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { this.logIdent = "[Kafka Server " + config.brokerId + "], " private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) private var startupComplete = new AtomicBoolean(false) + val brokerState: BrokerState = new BrokerState val correlationId: AtomicInteger = new AtomicInteger(0) var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null @@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) var zkClient: ZkClient = null + newGauge( + "BrokerState", + new Gauge[Int] { + def value = brokerState.currentState + } + ) + /** * Start up API for bringing up a single instance of the Kafka server. 
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers */ def startup() { info("starting") + brokerState.newState(Starting) isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) @@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg zkClient = initZk() /* start log manager */ - logManager = createLogManager(zkClient) + logManager = createLogManager(zkClient, brokerState) logManager.startup() socketServer = new SocketServer(config.brokerId, @@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start offset manager */ offsetManager = createOffsetManager() - kafkaController = new KafkaController(config, zkClient) + kafkaController = new KafkaController(config, zkClient, brokerState) /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) + brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() @@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var prevController : Broker = null var shutdownSuceeded : Boolean = false try { + brokerState.newState(PendingControlledShutdown) while (!shutdownSuceeded && remainingRetries > 0) { remainingRetries = remainingRetries - 1 @@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // send the controlled shutdown request val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId) channel.send(request) + response = channel.receive() + val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer) if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null && 
shutdownResponse.partitionsRemaining.size == 0) { @@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val canShutdown = isShuttingDown.compareAndSet(false, true) if (canShutdown) { Utils.swallow(controlledShutdown()) + brokerState.newState(BrokerShuttingDown) if(kafkaHealthcheck != null) Utils.swallow(kafkaHealthcheck.shutdown()) if(socketServer != null) @@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if(zkClient != null) Utils.swallow(zkClient.close()) + brokerState.newState(NotRunning) shutdownLatch.countDown() startupComplete.set(false) info("shut down completed") @@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def getLogManager(): LogManager = logManager - private def createLogManager(zkClient: ZkClient): LogManager = { + private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, segmentMs = 60L * 60L * 1000L * config.logRollHours, flushInterval = config.logFlushIntervalMessages, @@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, scheduler = kafkaScheduler, + brokerState = brokerState, time = time) } diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala index acda52b..5e35380 100644 --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala @@ -52,6 +52,10 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { } } + def setServerState(newState: Byte) { + server.brokerState.newState(newState) + } + def awaitShutdown() = server.awaitShutdown