diff --git a/core/src/main/scala/kafka/admin/ShutdownBroker.scala b/core/src/main/scala/kafka/admin/ShutdownBroker.scala deleted file mode 100644 index 2dd47e7..0000000 --- a/core/src/main/scala/kafka/admin/ShutdownBroker.scala +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.admin - - -import joptsimple.OptionParser -import kafka.utils._ -import org.I0Itec.zkclient.ZkClient -import javax.management.remote.{JMXServiceURL, JMXConnectorFactory} -import javax.management.ObjectName -import kafka.controller.KafkaController -import scala.Some -import kafka.common.{TopicAndPartition, BrokerNotAvailableException} - - -object ShutdownBroker extends Logging { - - private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer) - - private def invokeShutdown(params: ShutdownParams): Boolean = { - var zkClient: ZkClient = null - try { - zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer) - val controllerBrokerId = ZkUtils.getController(zkClient) - ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + controllerBrokerId)._1 match { - case Some(controllerInfo) => - var controllerHost: String = null - var controllerJmxPort: Int = -1 - try { - Json.parseFull(controllerInfo) match { - case Some(m) => - val brokerInfo = m.asInstanceOf[Map[String, Any]] - controllerHost = brokerInfo.get("host").get.toString - controllerJmxPort = brokerInfo.get("jmx_port").get.asInstanceOf[Int] - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId)) - } - } - val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(controllerHost, controllerJmxPort)) - info("Connecting to jmx url " + jmxUrl) - val jmxc = JMXConnectorFactory.connect(jmxUrl, null) - val mbsc = jmxc.getMBeanServerConnection - val leaderPartitionsRemaining = mbsc.invoke(new ObjectName(KafkaController.MBeanName), - "shutdownBroker", - Array(params.brokerId), - Array(classOf[Int].getName)).asInstanceOf[Set[TopicAndPartition]] - val shutdownComplete = (leaderPartitionsRemaining.size == 0) - info("Shutdown status: " + - (if (shutdownComplete) "complete" else "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining))) - shutdownComplete - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId)) - } - } catch { - case t: Throwable => - error("Operation failed due to controller failure", t) - false - } finally { - if (zkClient != null) - zkClient.close() - } - } - - def main(args: Array[String]) { - val parser = new OptionParser - val brokerOpt = parser.accepts("broker", "REQUIRED: The broker to shutdown.") - .withRequiredArg - .describedAs("Broker Id") - .ofType(classOf[java.lang.Integer]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - val numRetriesOpt = parser.accepts("num.retries", "Number of attempts to retry if shutdown does not complete.") - .withRequiredArg - .describedAs("number of retries") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val retryIntervalOpt = parser.accepts("retry.interval.ms", "Retry interval if retries requested.") - .withRequiredArg - .describedAs("retry interval in ms (> 1000)") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) - - val options = parser.parse(args : _*) - CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt) - - val retryIntervalMs = options.valueOf(retryIntervalOpt).intValue.max(1000) - val numRetries = options.valueOf(numRetriesOpt).intValue - - val shutdownParams = ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt)) - - if (!invokeShutdown(shutdownParams)) { - (1 to numRetries).takeWhile(attempt => { - info("Retry " + attempt) - try { - Thread.sleep(retryIntervalMs) - } - catch { - case ie: InterruptedException => // ignore - } - !invokeShutdown(shutdownParams) - }) - } - } - -} - diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 2fa1341..e776423 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -121,12 +121,8 @@ class ControllerContext(val zkClient: ZkClient, } } -trait KafkaControllerMBean { - def shutdownBroker(id: Int): Set[TopicAndPartition] -} object KafkaController extends Logging { - val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps" val stateChangeLogger = new StateChangeLogger("state.change.logger") val InitialControllerEpoch = 1 val InitialControllerEpochZkVersion = 1 @@ -155,7 +151,7 @@ object KafkaController extends Logging { } } -class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger @@ -238,7 +234,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id)) controllerContext.shuttingDownBrokerIds.add(id) - debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(",")) debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(",")) } @@ -251,31 +246,29 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt allPartitionsAndReplicationFactorOnBroker.foreach { case(topicAndPartition, replicationFactor) => - // Move leadership serially to relinquish lock. - inLock(controllerContext.controllerLock) { - controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch => - if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { - // If the broker leads the topic partition, transition the leader and update isr. Updates zk and - // notifies all affected brokers - partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, - controlledShutdownPartitionLeaderSelector) - } - else { - // Stop the replica first. The state change below initiates ZK changes which should take some time - // before which the stop replica request should be completed (in most cases) - brokerRequestBatch.newBatch() - brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, - topicAndPartition.partition, deletePartition = false) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) - - // If the broker is a follower, updates the isr in ZK and notifies the current leader - replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, - topicAndPartition.partition, id)), OfflineReplica) + // Move leadership serially to relinquish lock. + inLock(controllerContext.controllerLock) { + controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch => + if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id && replicationFactor > 1) { + // If the broker leads the topic partition, transition the leader and update isr. Updates zk and + // notifies all affected brokers + partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, + controlledShutdownPartitionLeaderSelector) + } else { + // Stop the replica first. The state change below initiates ZK changes which should take some time + // before which the stop replica request should be completed (in most cases) + brokerRequestBatch.newBatch() + brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, + topicAndPartition.partition, deletePartition = false) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + + // If the broker is a follower, updates the isr in ZK and notifies the current leader + replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, + topicAndPartition.partition, id)), OfflineReplica) + } } } - } } - def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) { trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(",")) controllerContext.partitionLeadershipInfo.filter { @@ -315,7 +308,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt partitionStateMachine.startup() // register the partition change listeners for all existing topics on failover controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) - Utils.registerMBean(this, KafkaController.MBeanName) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) brokerState.newState(RunningAsController) maybeTriggerPartitionReassignment() @@ -346,7 +338,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() - Utils.unregisterMBean(KafkaController.MBeanName) partitionStateMachine.shutdown() replicaStateMachine.shutdown() if(controllerContext.controllerChannelManager != null) { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d0bbeb6..c7508d5 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -31,7 +31,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro this(new VerifiableProperties(originalProps)) props.verify() } - + private def getLogRetentionTimeMillis(): Long = { val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute @@ -40,44 +40,44 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro } else { millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue)) } - + } /*********** General Configuration ***********/ - + /* the broker id for this server */ val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue)) /* the maximum size of message that the server can receive */ val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue)) - + /* the number of network threads that the server uses for handling network requests */ val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue)) /* the number of io threads that the server uses for carrying out network requests */ val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue)) - + /* the number of threads to use for various background processing tasks */ val backgroundThreads = props.getIntInRange("background.threads", 10, (1, Int.MaxValue)) - + /* the number of queued requests allowed before blocking the network threads */ val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue)) - + /*********** Socket Server Configuration ***********/ - + /* the port to listen and accept connections on */ val port: Int = props.getInt("port", 6667) /* hostname of broker. If this is set, it will only bind to this address. If this is not set, * it will bind to all interfaces */ val hostName: String = props.getString("host.name", null) - + /* hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may * need to be different from the interface to which the broker binds. If this is not set, * it will use the value for "host.name" if configured. Otherwise * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ val advertisedHostName: String = props.getString("advertised.host.name", hostName) - + /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may * need to be different from the port to which the broker binds. If this is not set, * it will publish the same port that the broker binds to. */ @@ -85,22 +85,22 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) - + /* the SO_RCVBUFF buffer of the socket sever sockets */ val socketReceiveBufferBytes: Int = props.getInt("socket.receive.buffer.bytes", 100*1024) - + /* the maximum number of bytes in a socket request */ val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue)) - + /*********** Log Configuration ***********/ /* the default number of log partitions per topic */ val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue)) - + /* the directories in which the log data is kept */ val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir", "/tmp/kafka-logs"))) require(logDirs.size > 0) - + /* the maximum size of a single log file */ val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) @@ -115,42 +115,42 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */ val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue)) - + /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "compact" */ val logCleanupPolicy = props.getString("log.cleanup.policy", "delete") - + /* the number of background threads to use for log cleaning */ val logCleanerThreads = props.getIntInRange("log.cleaner.threads", 1, (0, Int.MaxValue)) - + /* the log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average */ val logCleanerIoMaxBytesPerSecond = props.getDouble("log.cleaner.io.max.bytes.per.second", Double.MaxValue) - + /* the total memory used for log deduplication across all cleaner threads */ val logCleanerDedupeBufferSize = props.getLongInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024L, (0, Long.MaxValue)) require(logCleanerDedupeBufferSize / logCleanerThreads > 1024*1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.") - + /* the total memory used for log cleaner I/O buffers across all cleaner threads */ val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 512*1024, (0, Int.MaxValue)) - + /* log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value * will allow more log to be cleaned at once but will lead to more hash collisions */ val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d) - + /* the amount of time to sleep when there are no logs to clean */ val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 15*1000, (0L, Long.MaxValue)) - + /* the minimum ratio of dirty log to total log for a log to eligible for cleaning */ val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5) - + /* should we enable log cleaning? */ val logCleanerEnable = props.getBoolean("log.cleaner.enable", false) - + /* how long are delete records retained? */ val logCleanerDeleteRetentionMs = props.getLong("log.cleaner.delete.retention.ms", 24 * 60 * 60 * 1000L) - + /* the maximum size in bytes of the offset index */ val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue)) - + /* the interval with which we add an entry to the offset index */ val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue)) @@ -165,7 +165,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */ val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs) - + /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */ val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue)) @@ -210,7 +210,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* number of fetcher threads used to replicate messages from a source broker. * Increasing this value can increase the degree of I/O parallelism in the follower broker. */ val numReplicaFetchers = props.getInt("num.replica.fetchers", 1) - + /* the frequency with which the high watermark is saved out to disk */ val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L) @@ -245,10 +245,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val controlledShutdownRetryBackoffMs = props.getInt("controlled.shutdown.retry.backoff.ms", 5000) /* enable controlled shutdown of the server */ - val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", default = false) + val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", default = true) /*********** Offset management configuration ***********/ - + /* the maximum size for a metadata entry associated with an offset commit */ val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", OffsetManagerConfig.DefaultMaxMetadataSize)