diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index d41fd33..458bace 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -40,6 +40,7 @@ import org.I0Itec.zkclient.{IZkChildListener, ZkClient} * To update a topic config we first update the topic config properties. Then we create a new sequential * znode under the change path which contains the name of the topic that was updated, say * /brokers/config_changes/config_change_13321 + * This is just a notification--the actual config change is stored only once under the /brokers/topics//config path. * * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications. * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds @@ -59,7 +60,7 @@ import org.I0Itec.zkclient.{IZkChildListener, ZkClient} */ class TopicConfigManager(private val zkClient: ZkClient, private val logManager: LogManager, - private val changeExpirationMs: Long = 10*60*1000, + private val changeExpirationMs: Long = 15*60*1000, private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L @@ -86,7 +87,7 @@ class TopicConfigManager(private val zkClient: ZkClient, */ private def processConfigChanges(notifications: Seq[String]) { if (notifications.size > 0) { - info("Processing %d topic config change notification(s)...".format(notifications.size)) + info("Processing config change notification(s)...") val now = time.milliseconds val logs = logManager.logsByTopicPartition.toBuffer val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) @@ -94,26 +95,38 @@ class TopicConfigManager(private val zkClient: ZkClient, val changeId = changeNumber(notification) if (changeId > lastExecutedChange) { val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification - val (topicJson, stat) = ZkUtils.readData(zkClient, changeZnode) - val topic = topicJson.substring(1, topicJson.length - 1) // dequote - if (logsByTopic.contains(topic)) { - /* combine the default properties with the overrides in zk to create the new LogConfig */ - val props = new Properties(logManager.defaultConfig.toProps) - props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) - val logConfig = LogConfig.fromProps(props) - for (log <- logsByTopic(topic)) - log.config = logConfig - lastExecutedChange = changeId - info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) - } else { - if (now - stat.getCtime > changeExpirationMs) { - /* this change is now obsolete, try to delete it unless it is the last change left */ - error("Ignoring topic config change %d for topic %s since the change has expired") - } else { - error("Ignoring topic config change %d for topic %s since the topic may have been deleted") + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode) + if(jsonOpt.isDefined) { + val json = jsonOpt.get + val topic = json.substring(1, json.length - 1) // hacky way to dequote + if (logsByTopic.contains(topic)) { + /* combine the default properties with the overrides in zk to create the new LogConfig */ + val props = new Properties(logManager.defaultConfig.toProps) + props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) + val logConfig = LogConfig.fromProps(props) + for (log <- logsByTopic(topic)) + log.config = logConfig + info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) + purgeObsoleteNotifications(now, notifications) } - ZkUtils.deletePath(zkClient, changeZnode) } + lastExecutedChange = changeId + } + } + } + } + + private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) { + val eligible = notifications.sorted.dropRight(1) // never purge newest notification--we need it for the seq number + for(notification <- eligible) { + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification) + if(jsonOpt.isDefined) { + val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification + if (now - stat.getCtime > changeExpirationMs) { + debug("Purging config change notification " + notification) + ZkUtils.deletePath(zkClient, changeZnode) + } else { + return } } } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala new file mode 100644 index 0000000..5c48796 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import junit.framework.Assert._ +import java.util.Properties +import java.io.File +import org.junit.{After, Before, Test} +import kafka.integration.KafkaServerTestHarness +import kafka.utils._ +import kafka.common._ +import kafka.log.LogConfig +import kafka.admin.AdminUtils +import org.scalatest.junit.JUnit3Suite + +class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { + + override val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort))) + + @Test + def testConfigChange() { + val oldVal = 100000 + val newVal = 200000 + val tp = TopicAndPartition("test", 0) + AdminUtils.createTopic(zkClient, tp.topic, 1, 1, LogConfig(flushInterval = oldVal).toProps) + TestUtils.retry(10000) { + val logOpt = this.servers(0).logManager.getLog(tp) + assertTrue(logOpt.isDefined) + assertEquals(oldVal, logOpt.get.config.flushInterval) + } + AdminUtils.changeTopicConfig(zkClient, tp.topic, LogConfig(flushInterval = newVal).toProps) + TestUtils.retry(10000) { + assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval) + } + } + +} \ No newline at end of file